40 #define NCBI_USE_ERRCODE_X Connect_ThrServer
48 CSERVER_CATCH_UNHANDLED_EXCEPTIONS);
49 typedef NCBI_PARAM_TYPE(server, Catch_Unhandled_Exceptions) TParamServerCatchExceptions;
73 string err_message(
"Error reading from the client socket (");
89 if (message_tail < 0) {
94 int consumed =
int(n_read) - message_tail;
107 const char * msg = (
const char *) data;
109 if (
size && seen_CR && msg[0] ==
'\n') {
113 for (
n = skip;
n <
size; ++
n) {
114 if (msg[
n] ==
'\r' || msg[
n] ==
'\n' || msg[
n] ==
'\0') {
115 seen_CR = msg[
n] ==
'\r';
160 m_Thread->x_UnregisterThread();
191 }
catch (
const std::exception & e) {
192 handle->MarkAsForciblyCaught();
196 handle->MarkAsForciblyCaught();
223 bool catch_all = TParamThreadPoolCatchExceptions::GetDefault();
240 const string& thr_suffix)
242 m_ThrSuffix(thr_suffix),
257 <<
"CPoolOfThreads_ForServer::~CPoolOfThreads_ForServer: "
258 <<
n <<
" thread(s) still active");
265 for (
unsigned int i = 0;
i < num_threads;
i++)
371 virtual void Cancel(
void);
389 unique_ptr<CServer_Connection>
conn(
462 virtual void Cancel(
void);
569 static const STimeout zero_timeout = {0, 0};
581 m_ConnectionPool(
NULL),
624 "CServer::SetParameters: Bad parameters");
662 vector<CSocketAPI::SPoll> polls;
664 typedef vector<IServer_ConnectionBase*> TConnsList;
665 TConnsList timer_requests;
666 TConnsList revived_conns;
667 TConnsList to_close_conns;
668 TConnsList to_delete_conns;
674 polls, timer_requests, &timer_timeout,
675 revived_conns, to_close_conns,
678 ITERATE(TConnsList, it, revived_conns) {
686 ITERATE(TConnsList, it, to_close_conns) {
693 ITERATE(TConnsList, it, to_delete_conns) {
706 timer_timeout < *timeout)) {
707 timeout = &timer_timeout;
733 if (timeout != &timer_timeout) {
738 ITERATE (vector<IServer_ConnectionBase*>, it, timer_requests)
750 ITERATE (vector<CSocketAPI::SPoll>, it, polls) {
828 "Cannot add connection, pool has overflowed.");
853 max_connections(10000),
854 temporarily_stop_listening(
false),
865 switch (GetErrCode()) {
CServer_Connection * m_Connection
virtual void Cancel(void)
virtual void Process(void)
Do the actual job. Called by whichever thread handles this request.
CAcceptRequest(EServIO_Event event, CServer_ConnectionPool &conn_pool, const STimeout *timeout, CServer_Listener *listener)
It may be desirable to store handles obtained from GetHandle() in instances of CCompletingHandle to e...
void Process(void)
Do the actual job. Called by whichever thread handles this request.
void Release()
Manually force the resource to be released.
virtual void Cancel(void)
CServer_Connection * m_Connection
virtual void Process(void)
Do the actual job. Called by whichever thread handles this request.
CServerConnectionRequest(EServIO_Event event, CServer_ConnectionPool &conn_pool, const STimeout *timeout, CServer_Connection *connection)
const STimeout * m_IdleTimeout
CServer_Request(EServIO_Event event, CServer_ConnectionPool &conn_pool, const STimeout *timeout)
virtual void Cancel(void)=0
CServer_ConnectionPool & m_ConnPool
IServer_ConnectionFactory::
Internal header for threaded server connection pools.
static CS_CONNECTION * conn
static CNcbiApplicationGuard InstanceGuard(void)
Singleton method.
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
const string & GetProgramDisplayName(void) const
Get the application's "display" name.
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
int BUF_Write(BUF *pBuf, const void *data, size_t size)
TNCBIAtomicValue TValue
Alias TValue for TNCBIAtomicValue.
void Set(TValue new_value) THROWS_NONE
Set atomic counter value.
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
TValue Get(void) const THROWS_NONE
Get atomic counter value.
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
void Critical(CExceptionArgs_Base &args)
#define NCBI_CATCH_ALL_X(err_subcode, message)
#define STD_CATCH_ALL_X(err_subcode, message)
Standard handling of "exception"-derived exceptions; catches non-standard exceptions and generates "u...
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void Warning(CExceptionArgs_Base &args)
virtual const char * GetErrCodeString(void) const
Get error code interpreted as text.
#define NCBI_REPORT_EXCEPTION_X(err_subcode, title, ex)
Generate a report on the exception with default error code and given subcode.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
EIO_Status Accept(CSocket *&sock, const STimeout *timeout=kInfiniteTimeout, TSOCK_Flags flags=fSOCK_LogDefault) const
static EIO_Status Poll(vector< SPoll > &polls, const STimeout *timeout, size_t *n_ready=0)
Poll a vector of CPollable objects for I/O readiness.
EIO_Status SetTimeout(EIO_Event event, const STimeout *timeout)
Set timeout for I/O in the specified direction.
void GetPeerAddress(unsigned int *host, unsigned short *port, ENH_ByteOrder byte_order) const
Get peer address.
EIO_Status Read(void *buf, size_t size, size_t *n_read=0, EIO_ReadMethod how=eIO_ReadPlain)
Read from socket.
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
bool Register(TThread &thread)
Register a thread.
CAutoUnregGuard(TThread *thr)
CMutex m_Mutex
The guard for m_MaxThreads and m_MaxUrgentThreads.
TItemHandle GetHandle(void)
TPool * m_Pool
The pool that holds this thread.
void UnRegister(TThread &)
Unregister a thread.
CAtomicCounter::TValue TACValue
void AcceptRequest(const TRequest &request)
Put a request in the queue with a given priority.
void x_UnregisterThread(void)
CPoolOfThreads_ForServer(unsigned int max_threads, const string &thr_suffix)
Constructor.
CMutex m_Mutex
Guards access to queue.
list< CRef< TThread > > TThreads
TRealQueue m_Queue
The queue.
volatile TACValue m_MaxThreads
The maximum number of threads the pool can hold.
virtual ~CPoolOfThreads_ForServer(void)
Destructor.
void KillAllThreads(bool wait)
Causes all threads in the pool to exit cleanly after finishing all pending requests,...
TItemHandle Put(const TRequest &request)
Put a request into the queue.
TItemHandle GetHandle(void)
Get the first available request from the queue, and return a handle to it.
virtual ~CThreadInPool_ForServer(void)
Destructor.
TThread * NewThread(void)
Create a new thread.
CConditionVariable m_GetCond
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
void ProcessRequest(TItemHandle handle)
Process a request.
void x_HandleOneRequest(bool catch_all)
CAtomicCounter m_ThreadCount
The current number of threads in the pool.
void Spawn(unsigned int num_threads)
Start processing threads.
@ eActive
extracted but not yet released
void StartListening(void)
virtual void Init()
Initialize the server.
virtual bool ShutdownRequested(void)
Runs synchronously between iterations.
void CloseConnection(TConnBase *conn)
Close connection as if it was initiated by server (not by client).
EServIO_Event
Extended copy of the type EIO_Event allowing to distinguish between connection closing from client an...
virtual void Exit()
Cleanup the server.
virtual void OnTimeout(void)
void RemoveConnectionFromPool(CServer_Connection *conn)
Remove externally created connection from pool.
virtual int CheckMessage(BUF *buffer, const void *data, size_t size)=0
void AddConnectionToPool(CServer_Connection *conn)
Add externally created connection to the connection pool which server polls on.
virtual void OnClose(EClosePeer)
The connection has closed (with information on type of closing)
SServer_Parameters()
Create structure with the default set of parameters.
void Erase(void)
Erase all connections.
void GetParameters(SServer_Parameters *params)
virtual ~CServer_Connection()
void DeferConnectionProcessing(IServer_ConnectionBase *conn)
Mark connection as deferred for processing, i.e.
void SetAllActive(const vector< CSocketAPI::SPoll > &polls)
void WakeUpPollCycle(void)
Force poll cycle to make another iteration.
void SetMaxConnections(unsigned max_connections)
virtual void OnRead(void)
The client has just sent data.
virtual CStdRequest * CreateRequest(EServIO_Event event, CServer_ConnectionPool &connPool, const STimeout *timeout)
SServer_Parameters * m_Parameters
const STimeout * accept_timeout
Maximum time between exit checks.
bool GetPollAndTimerVec(vector< CSocketAPI::SPoll > &polls, vector< IServer_ConnectionBase * > &timer_requests, STimeout *timer_timeout, vector< IServer_ConnectionBase * > &revived_conns, vector< IServer_ConnectionBase * > &to_close_conns, vector< IServer_ConnectionBase * > &to_delete_conns)
void AddListener(IServer_ConnectionFactory *factory, unsigned short port)
Register a listener.
vector< unsigned short > GetListenerPorts(void)
Provides a list of ports on which the server is listening.
bool Add(TConnBase *conn, EServerConnType type)
virtual void OnError(const string &)
Runs when a socket error is detected.
CPoolOfThreads_ForServer * m_ThreadPool
void CloseConnection(CSocket *sock)
Close connection.
virtual EIO_Event GetEventsToPollFor(const CTime **) const
void SetParameters(const SServer_Parameters &new_params)
unsigned int max_threads
Maximum simultaneous threads.
EServIO_Event IOEventToServIOEvent(EIO_Event event)
Transform EIO_Event type to EServIO_Event.
vector< unsigned short > GetListenerPorts(void)
Provides a list of ports on which the server is listening.
virtual void OnOverflow(EOverflowReason reason)
virtual void OnMessage(BUF buffer)=0
unique_ptr< IServer_ConnectionFactory > m_Factory
CServer_ConnectionPool * m_ConnectionPool
void Run(void)
Enter the main loop.
unsigned int max_connections
Maximum # of open connections.
int Server_CheckLineMessage(BUF *buffer, const void *data, size_t size, bool &seen_CR)
virtual const char * GetErrCodeString(void) const override
void Remove(TConnBase *conn)
virtual void ProcessTimeout(void)
Runs synchronously when no socket activity has occurred in a while (as determined by m_AcceptTimeout)...
CSocket & GetSocket(void)
Get underlying socket.
void SetConnType(TConnBase *conn, EServerConnType type)
Guard connection from out-of-order packet processing by pulling eActiveSocket's from poll vector Rese...
void SubmitRequest(const CRef< CStdRequest > &request)
Submit request to be executed by the server thread pool.
friend class CAcceptRequest
virtual void OnTimeout(void)
Runs when a client has been idle for too long, prior to closing the connection [synchronous].
unsigned int m_MaxThreads
Maximum simultaneous threads.
bool RemoveListener(unsigned short port)
unsigned int init_threads
Number of initial threads.
virtual CStdRequest * CreateRequest(EServIO_Event event, CServer_ConnectionPool &connPool, const STimeout *timeout)
virtual CStdRequest * CreateRequest(EServIO_Event event, CServer_ConnectionPool &connPool, const STimeout *timeout)=0
const STimeout * idle_timeout
For how long to keep inactive non-listening sockets open (default: 10 minutes)
void OnSocketEvent(EServIO_Event event)
virtual bool IsOpen(void)
void StartListening(void)
Start listening before the main loop.
unique_ptr< IServer_ConnectionHandler > m_Handler
void PingControlConnection(void)
bool RemoveListener(unsigned short port)
Removes a listener.
@ eClientClose
Connection closed by other peer.
@ eOurClose
Connection closed by ourselves.
@ ePoolOverflow
Connection pool overflowed.
@ eBadParameters
Out-of-range parameters given.
@ eCouldntListen
Unable to bind listening port.
static void Exit(void *exit_data)
Cancel current thread.
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
bool WaitForSignal(CMutex &mutex, const CDeadline &deadline=CDeadline::eInfinite)
Release mutex and lock the calling thread until the condition variable is signalled.
void SignalAll(void)
Wake all threads that are currently waiting on the condition variable.
unsigned int usec
microseconds (modulo 1,000,000)
const char * IO_StatusStr(EIO_Status status)
Get the text form of an enum status value.
@ eIO_Timeout
timeout expired before any I/O succeeded
@ eIO_Success
everything is fine, no error occurred
@ eIO_ReadWrite
eIO_Read | eIO_Write (also, eCONN_OnFlush)
@ eIO_Close
also serves as an error indicator in SOCK_Poll
unsigned int
A callback function used to compare two keys in a database.
Definition of all error codes used in connect library (xconnect.lib, xconnext.lib etc).
const struct ncbi::grid::netcache::search::fields::SIZE size
static const STimeout kZeroTimeout
static pcre_uint8 * buffer
NCBI_PARAM_DECL(bool, server, Catch_Unhandled_Exceptions)
static CSafeStatic< TParamServerCatchExceptions > s_ServerCatchExceptions
static bool operator<(const STimeout &to1, const STimeout &to2)
NCBI_PARAM_DEF_EX(bool, server, Catch_Unhandled_Exceptions, true, 0, CSERVER_CATCH_UNHANDLED_EXCEPTIONS)
typedef NCBI_PARAM_TYPE(server, Catch_Unhandled_Exceptions) TParamServerCatchExceptions
static const STimeout k_DefaultIdleTimeout
CRef< CTestThread > thr[k_NumThreadsMax]
unsigned read_buf(z_streamp strm, Bytef *buf, unsigned size)