44 # include <sys/socket.h>
45 # include <netinet/in.h>
46 # include <netinet/tcp.h>
50 #define NCBI_USE_ERRCODE_X ConnServ_Connection
52 #define END_OF_MULTILINE_OUTPUT "END"
148 m_Generation(server->m_ServerInPool->m_CurrentConnectionGeneration.
Get()),
151 if (TServConn_UserLinger2::GetDefault())
163 int upper_limit = TServConn_MaxConnPoolSize::GetDefault();
166 m_FreeConnectionListSize < upper_limit) {
179 #define STRING_LEN(str) (sizeof(str) - 1)
180 #define WARNING_PREFIX "WARNING:"
181 #define WARNING_PREFIX_LEN STRING_LEN(WARNING_PREFIX)
184 bool multiline_output)
193 "Communication timeout while reading"
200 "Connection closed");
205 "Communication error while reading");
217 const char* semicolon = strchr(reply,
';');
218 if (semicolon ==
NULL) {
219 conn_listener->OnWarning(
string(reply, reply + reply_len),
224 conn_listener->OnWarning(
string(reply, semicolon),
m_Server);
225 reply_len -= semicolon - reply + 1;
226 reply = semicolon + 1;
235 }
else if (!multiline_output) {
236 if (TServConn_ErrorOnUnexpectedReply::GetDefault()) {
238 }
else if (TServConn_WarnOnUnexpectedReply::GetDefault()) {
239 conn_listener->OnWarning(
"Unexpected reply: " +
result,
m_Server);
263 string str(line +
"\r\n");
265 const char*
buf =
str.data();
285 bool multiline_output,
292 m_Impl->m_Socket.SetCork(
false);
295 m_Impl->m_Socket.GetOSHandle(&fd,
sizeof(fd));
296 setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &
val,
sizeof(
val));
315 m_Address(std::move(address)),
316 m_ServerProperties(server_properties),
317 m_ThrottleStats(std::move(throttle_params))
368 rv = std::move(args.
GetArgs());
371 const char*
version = version_string.c_str();
372 const char* prev_part_end =
version;
374 string attr_name, attr_value;
378 memcmp(
version + 1,
"ersion", 6) == 0) {
379 const char* version_number =
version + 7;
380 attr_name.assign(prev_part_end, version_number);
382 attr_name[attr_name.size() - 7] =
'v';
383 while (
isspace(*version_number) ||
384 *version_number ==
':' || *version_number ==
'=')
386 const char* version_number_end = version_number;
387 while (
isdigit(*version_number_end) ||
388 *version_number_end ==
'.')
389 ++version_number_end;
390 attr_value.assign(version_number, version_number_end);
393 prev_part_end = version_number_end;
394 while (
isspace(*prev_part_end) || *prev_part_end ==
'&')
397 memcmp(
version + 1,
"uil", 3) == 0 &&
399 const char* build =
version + 5;
400 attr_name.assign(
version, build);
402 attr_name[attr_name.size() - 5] =
'B';
403 while (
isspace(*build) || *build ==
':' || *build ==
'=')
405 attr_value.assign(build);
414 if (--
version == prev_part_end)
419 "Details",
string(prev_part_end,
version)));
428 m_NextAttribute(m_Attributes.begin())
445 return m_Impl->GetNextAttribute(attr_name, attr_value);
450 return m_Impl->m_ServerInPool->m_Address;
467 conn->m_Server = server;
473 conn->m_Generation !=
478 conn->m_Socket.GetSOCK(),
487 conn->m_Socket.Close();
505 if (remaining.
IsZero())
return true;
517 if (t1.
sec < t2.
sec)
return t1;
518 if (t1.
sec > t2.
sec)
return t2;
530 #ifndef NCBI_OS_MSWIN
541 auto& socket =
conn->m_Socket;
545 socket.SetDataLogging(TServConn_ConnDataLogging::GetDefault() ?
eOn :
eOff);
548 socket.DisableOSSendDelay();
549 socket.SetReuseAddress(
eOn);
610 auto& throttle_stats = server_in_pool.m_ThrottleStats;
612 throttle_stats.Check(
this);
615 server_in_pool.TryExec(
this,
handler, timeout);
622 throttle_stats.Adjust(
this, -1);
629 bool multiline_output,
662 bool multiline_output,
667 exec_result, exec_listener);
669 TryExec(exec_handler, timeout);
679 const auto op_result = err_code >= 0;
694 " reached the maximum number of connection failures in a row";
702 if (reg[index] != op_result) {
703 reg[index] = op_result;
708 " aborted as it was considered bad/overloaded";
744 os <<
"Disabling throttling for server " << address.
AsString() <<
745 " before new attempt after " <<
duration.AsString() <<
" seconds wait" <<
770 bool multiline_output,
bool retry_on_exception)
777 unsigned attempt = 0;
789 if (++attempt > max_retries ||
794 ERR_POST(
"Timeout (max_connection_time=" <<
802 if (++attempt > max_retries ||
807 ERR_POST(
"Timeout (max_connection_time=" <<
815 warning +=
", reconnecting: attempt ";
829 return m_Impl->ConnectAndExec(
cmd, multiline_output,
true);
834 string cmd(
"VERSION");
856 {
"throttle_by_consecutive_connection_failures",
"throttle_by_subsequent_connection_failures" }, 0);
867 const string error_rate =
registry.
Get(sections,
"throttle_by_connection_error_rate",
kEmptyStr);
869 if (error_rate.empty())
return;
871 string numerator_str, denominator_str;
873 if (!
NStr::SplitInTwo(error_rate,
"/", numerator_str, denominator_str))
return;
void Release()
Manually force the resource to be released.
CNanoTimeout – Timeout interval, using nanoseconds.
NetSchedule internal exception.
CNetRef< SNetServerConnectionImpl > m_Impl
string Exec(const string &cmd, bool multiline_output=false, const STimeout *timeout=NULL)
Execute remote command 'cmd', wait for the reply, check that it starts with 'OK:',...
CNetServerExecHandler(const string &cmd, bool multiline_output, CNetServer::SExecResult &exec_result, INetServerExecListener *exec_listener)
INetServerExecListener * m_ExecListener
CNetServer::SExecResult & m_ExecResult
virtual void Exec(CNetServerConnection::TInstance conn_impl, const STimeout *timeout)
CNetRef< SNetServerInfoImpl > m_Impl
bool GetNextAttribute(string &attr_name, string &attr_value)
Return the next attribute.
CNetServerMultilineCmdOutput()
bool ReadLine(string &output)
CNetRef< SNetServerMultilineCmdOutputImpl > m_Impl
const SSocketAddress & GetAddress() const
CNetRef< SNetServerImpl > m_Impl
CNetServerInfo GetServerInfo()
Retrieve basic information about the server as attribute name-value pairs.
SExecResult ExecWithRetry(const string &cmd, bool multiline_output=false)
Execute remote command 'cmd', wait for the reply, check if it starts with 'OK:', and return the remai...
CTimeout – Timeout interval.
virtual void OnExec(CNetServerConnection::TInstance conn_impl, const string &cmd)=0
void(*)(CSeq_entry_Handle seh, IWorkbench *wb, const CSerialObject &obj) handler
static CS_CONNECTION * conn
static SQLCHAR output[256]
static const char * str(char *buf, int n)
void Set(TValue new_value) THROWS_NONE
Set atomic counter value.
TValue Get(void) const THROWS_NONE
Get atomic counter value.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
TErrCode GetErrCode(void) const
Get error code.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
const string & GetMsg(void) const
Get message string.
bool Referenced(void) const THROWS_NONE
Check if object is referenced.
virtual const string & Get(const string &section, const string &name, TFlags flags=0) const
Get the parameter value.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
EIO_Status Connect(const string &host, unsigned short port, const STimeout *timeout=kDefaultTimeout, TSOCK_Flags flags=fSOCK_LogDefault)
Connect to "host:port".
EIO_Status Close(void)
Close socket.
static unsigned short HostToNetShort(unsigned short value)
EIO_Status ReadLine(string &str)
Read a line from socket (up to CR-LF, LF, or null character, discarding any of the EOLs).
const STimeout * GetTimeout(EIO_Event event) const
Get timeout for I/O in the specified direction.
static string ntoa(unsigned int host)
BSD-like API. NB: when int, "host" must be in network byte order.
EIO_Status SetTimeout(EIO_Event event, const STimeout *timeout)
Set timeout for I/O in the specified direction.
EIO_Status Abort(void)
Abort socket connection.
static unsigned int NetToHostLong(unsigned int value)
EIO_Status GetStatus(EIO_Event direction) const
Return status of *last* I/O operation without making any actual I/O.
EIO_Status SOCK_Poll(size_t n, SSOCK_Poll polls[], const STimeout *timeout, size_t *n_ready)
Block until at least one of the sockets enlisted in "polls" array (of size "n") becomes available for...
EIO_Event revent
[in] one of: eIO_Open/Read/Write/ReadWrite
EIO_Status Write(const void *buf, size_t size, size_t *n_written=0, EIO_WriteMethod how=eIO_WritePersist)
Write to socket.
@ fSOCK_KeepAlive
keep socket alive (if supported by OS)
@ fSOCK_LogOff
NB: logging is inherited in accepted SOCK.
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
static string ParseEscapes(const CTempString str, EEscSeqRange mode=eEscSeqRange_Standard, char user_char='?')
Parse C-style escape sequences in the specified string.
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
@ fAllowTrailingSpaces
Ignore trailing whitespace characters.
@ fConvErr_NoThrow
Do not throw an exception on error.
@ fAllowLeadingSpaces
Ignore leading whitespace characters in converted string.
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
CTime & AddSecond(TSeconds seconds=1, EDaylight adl=eDaylightDefault)
Add specified seconds.
bool IsExpired(void) const
Check if the deadline is expired.
string AsString(const CTimeFormat &format=kEmptyStr, TSeconds out_tz=eCurrentTimeZone) const
Transform time to string.
double GetAsDouble(void) const
Get as number of seconds (fractional value).
CTime & SetCurrent(void)
Make the time current in the presently active time zone.
unsigned long GetAsMilliSeconds(void) const
Get as number of milliseconds.
CTime GetFastLocalTime(void)
Quick and dirty getter of local time.
void Get(unsigned int *sec, unsigned int *microsec) const
Get timeout in seconds and microseconds.
const TArgs & GetArgs(void) const
Get the const list of arguments.
unsigned long NcbiTimeoutToMs(const STimeout *timeout)
unsigned int usec
microseconds (modulo 1,000,000)
const char * IO_StatusStr(EIO_Status status)
Get the text form of an enum status value.
@ eIO_Timeout
timeout expired before any I/O succeeded
@ eIO_Success
everything is fine, no error occurred
@ eIO_ReadWrite
eIO_Read | eIO_Write (also, eCONN_OnFlush)
@ eIO_Open
also serves as no-event indicator in SOCK_Poll
@ eIO_Close
also serves as an error indicator in SOCK_Poll
Definition of all error codes used in connect services library (xconnserv.lib and others).
const TYPE & Get(const CNamedParameterList *param)
const string version
version string
chrono::system_clock::duration duration
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
void g_AppendClientIPSessionIDHitID(string &cmd)
#define CONNSERV_THROW_FMT(exception_class, err_code, server, message)
CUrlArgs::TArgs s_GetAttributes(const string &version_string)
#define END_OF_MULTILINE_OUTPUT
#define WARNING_PREFIX_LEN
static const STimeout s_ZeroTimeout
CNetServerInfo g_ServerInfoFromString(const string &server_info)
CNetServerConnection conn
void OnError(const string &err_msg, CNetServer &server)
void SetErrorHandler(TEventHandler error_handler)
INetServerConnectionListener & operator=(const INetServerConnectionListener &)
virtual TPropCreator GetPropCreator() const
function< INetServerProperties *()> TPropCreator
virtual void OnErrorImpl(const string &err_msg, CNetServer &server)=0
CNetService::TEventHandler TEventHandler
virtual void OnConnected(CNetServerConnection &connection)=0
virtual void OnWarningImpl(const string &warn_msg, CNetServer &server)=0
void SetWarningHandler(TEventHandler warning_handler)
TEventHandler m_WarningHandler
INetServerConnectionListener()=default
TEventHandler m_ErrorHandler
void OnWarning(const string &warn_msg, CNetServer &server)
SNetServerConnectionImpl(SNetServerImpl *pool)
virtual ~SNetServerConnectionImpl()
SNetServerConnectionImpl * m_NextFree
CAtomicCounter::TValue m_Generation
void ReadCmdOutputLine(string &result, bool multiline_output)
void WriteLine(const string &line)
virtual void DeleteThis()
Virtual method "deleting" this object.
static const STimeout kMaxTryTimeout
SConnectDeadline(const STimeout &conn_timeout)
CTimeout GetTotal() const
static STimeout Min(const STimeout &t1, const STimeout &t2)
const STimeout * GetRemaining() const
CRef< SNetServerInPool > m_ServerInPool
static void ConnectImpl(CSocket &, SConnectDeadline &, const SSocketAddress &, const SSocketAddress &)
void TryExec(INetServerExecHandler &handler, const STimeout *timeout=NULL)
CNetServer::SExecResult ConnectAndExec(const string &cmd, bool multiline_output, bool retry_on_exception=false)
SNetServerInPool(SSocketAddress address, INetServerProperties *server_properties, SThrottleParams throttle_params)
virtual ~SNetServerInPool()
int m_FreeConnectionListSize
void TryExec(SNetServerImpl *server, INetServerExecHandler &handler, const STimeout *timeout)
SNetServerConnectionImpl * m_FreeConnectionListHead
CNetServerPool m_ServerPool
virtual void DeleteThis()
Virtual method "deleting" this object.
CNetServerConnection GetConnectionFromPool(SNetServerImpl *server)
CAtomicCounter m_CurrentConnectionGeneration
CFastMutex m_FreeConnectionListLock
CNetServerConnection Connect(SNetServerImpl *server, const STimeout *timeout)
bool GetNextAttribute(string &attr_name, string &attr_value)
SNetServerInfoImpl(const string &version_string)
TAttributes::const_iterator m_NextAttribute
virtual ~SNetServerMultilineCmdOutputImpl()
bool m_NetCacheCompatMode
CNetServerConnection m_Connection
bool ReadLine(string &output)
CRef< INetServerConnectionListener > m_Listener
unsigned GetConnectionMaxRetries() const
unsigned long GetConnectionRetryDelay() const
static void ConnectXSite(CSocket &, SNetServerImpl::SConnectDeadline &, const SSocketAddress &, const string &)
void Init(CSynRegistry &registry, const SRegSynonyms &sections)
constexpr static size_t kMaxDenominator
int max_consecutive_io_failures
bool throttle_until_discoverable
struct SThrottleParams::SIOFailureThreshold io_failure_threshold
bool connect_failures_only
void Init(CSynRegistry &registry, const SRegSynonyms &sections)
void Check(SNetServerImpl *server_impl)
bool m_DiscoveredAfterThrottling
CFastMutex m_ThrottleLock
const SThrottleParams m_Params
void Adjust(SNetServerImpl *server_impl, int err_code)
pair< bitset< SThrottleParams::SIOFailureThreshold::kMaxDenominator >, size_t > m_IOFailureRegister
int m_NumberOfConsecutiveIOFailures
int g(Seg_Gsm *spe, Seq_Mtf *psm, Thd_Gsm *tdg)