NCBI C++ ToolKit
server.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: server.cpp 91806 2020-12-14 15:02:47Z grichenk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Aaron Ucko, Victor Joukov
27  *
28  */
29 
30 /// @file server.cpp
31 /// Framework for a multithreaded network server
32 
33 #include <ncbi_pch.hpp>
34 #include <corelib/ncbi_param.hpp>
35 #include "connection_pool.hpp"
36 #include <connect/ncbi_buffer.h>
37 #include <connect/error_codes.hpp>
38 
39 
40 #define NCBI_USE_ERRCODE_X Connect_ThrServer
41 
42 
44 
45 
// Configuration parameter "Catch_Unhandled_Exceptions" in section "server",
// of type bool, declared with the literal `true` in its definition.
// TParamServerCatchExceptions names the resulting parameter type; this file
// consults it (via s_ServerCatchExceptions->Get()) to decide whether request
// processing and the main loop are wrapped in try/catch.
// NOTE(review): the last NCBI_PARAM_DEF_EX argument is presumably the name of
// an environment variable that overrides the value -- confirm against
// corelib/ncbi_param.hpp.
46 NCBI_PARAM_DECL(bool, server, Catch_Unhandled_Exceptions);
47 NCBI_PARAM_DEF_EX(bool, server, Catch_Unhandled_Exceptions, true, 0,
48  CSERVER_CATCH_UNHANDLED_EXCEPTIONS);
49 typedef NCBI_PARAM_TYPE(server, Catch_Unhandled_Exceptions) TParamServerCatchExceptions;
51 
52 
53 /////////////////////////////////////////////////////////////////////////////
54 // IServer_MessageHandler implementation
56 {
57  CSocket & socket = GetSocket();
58  CServer_Connection * conn = static_cast<CServer_Connection*>(&socket);
59  char read_buf[4096];
60  size_t n_read;
61  EIO_Status status = socket.Read(read_buf, sizeof(read_buf),
62  &n_read);
63  switch (status) {
64  case eIO_Success:
65  break;
66  case eIO_Timeout:
67  this->OnTimeout();
68  return;
69  case eIO_Closed:
71  return;
72  default:
73  string err_message("Error reading from the client socket (");
74 
75  err_message += socket.GetPeerAddress() + "): " +
76  string(IO_StatusStr(status)) + "(" +
77  NStr::NumericToString(static_cast<int>(status)) + ")";
78 
79  this->OnError(err_message);
80  return;
81  }
82 
83  int message_tail;
84  char * buf_ptr = read_buf;
85  for ( ;n_read > 0 && conn->type == eActiveSocket; ) {
86  message_tail = this->CheckMessage(&m_Buffer, buf_ptr, n_read);
87 
88  // TODO: what should we do if message_tail > n_read?
89  if (message_tail < 0) {
90  return;
91  }
92 
93  this->OnMessage(m_Buffer);
94  int consumed = int(n_read) - message_tail;
95  buf_ptr += consumed;
96  n_read -= consumed;
97  }
98 }
99 
100 
101 /////////////////////////////////////////////////////////////////////////////
102 // Server_CheckLineMessage implementation
103 int Server_CheckLineMessage(BUF* buffer, const void *data, size_t size,
104  bool& seen_CR)
105 {
106  size_t n, skip;
107  const char * msg = (const char *) data;
108  skip = 0;
109  if (size && seen_CR && msg[0] == '\n') {
110  ++skip;
111  }
112  seen_CR = false;
113  for (n = skip; n < size; ++n) {
114  if (msg[n] == '\r' || msg[n] == '\n' || msg[n] == '\0') {
115  seen_CR = msg[n] == '\r';
116  break;
117  }
118  }
119  BUF_Write(buffer, msg+skip, n-skip);
120  return int(size - n - 1);
121 }
122 
123 
126 {
127  CMutexGuard guard(m_Mutex);
128  if (m_Queue.empty()) {
130  }
131  TItemHandle handle(new CQueueItem(data));
132  m_Queue.push_back(handle);
133  return handle;
134 }
135 
138 {
139  CMutexGuard guard(m_Mutex);
140 
141  while (m_Queue.empty()) {
143  }
144 
145  TItemHandle handle(m_Queue.front());
146  m_Queue.pop_front();
147 
148  guard.Release(); // avoid possible deadlocks from x_SetStatus
149  handle->x_SetStatus(CQueueItemBase::eActive);
150  return handle;
151 }
152 
153 
155  : m_Thread(thr)
156 {}
157 
159 {
160  m_Thread->x_UnregisterThread();
161 }
162 
163 void
165 {
166  _ASSERT( !m_Counted );
168  m_Counted = true;
169 }
170 
172 {
173  if (m_Counted) {
174  m_Pool->m_ThreadCount.Add(-1);
175  }
176 }
177 
178 void
180 {
181  m_Pool->UnRegister(*this);
182 }
183 
184 void
186 {
187  TItemHandle handle(m_Pool->GetHandle());
188  if (catch_all) {
189  try {
190  ProcessRequest(handle);
191  } catch (const std::exception & e) {
192  handle->MarkAsForciblyCaught();
193  NCBI_REPORT_EXCEPTION_X(9, "Exception from thread in pool: ", e);
194  // throw;
195  } catch (...) {
196  handle->MarkAsForciblyCaught();
197  // silently propagate non-standard exceptions because they're
198  // likely to be CExitThreadException.
199  throw;
200  }
201  }
202  else {
203  ProcessRequest(handle);
204  }
205 }
206 
207 void*
209 {
210  if (!m_Pool->m_ThrSuffix.empty()) {
212  string thr_name = app ? app->GetProgramDisplayName() : "";
213  thr_name += m_Pool->m_ThrSuffix;
214  SetCurrentThreadName(thr_name.c_str());
215  }
216 
217  if ( !m_Pool->Register(*this) ) {
218  ERR_POST(Warning << "New worker thread blocked at the last minute.");
219  return NULL;
220  }
221  CAutoUnregGuard guard(this);
222 
223  bool catch_all = TParamThreadPoolCatchExceptions::GetDefault();
224  for (;;) {
225  x_HandleOneRequest(catch_all);
226  }
227 
228  return NULL;
229 }
230 
231 void
233 {
234  TCompletingHandle completer = handle;
235  ProcessRequest(completer->GetRequest());
236 }
237 
238 
240  const string& thr_suffix)
241  : m_MaxThreads(max_threads),
242  m_ThrSuffix(thr_suffix),
243  m_KilledAll(false)
244 {
245  m_ThreadCount.Set(0);
246 }
247 
249 {
250  try {
251  KillAllThreads(false);
252  } catch(...) {} // Just to be sure that we will not throw from the destructor.
253 
255  if (n) {
256  ERR_POST_X(10, Warning
257  << "CPoolOfThreads_ForServer::~CPoolOfThreads_ForServer: "
258  << n << " thread(s) still active");
259  }
260 }
261 
262 void
263 CPoolOfThreads_ForServer::Spawn(unsigned int num_threads)
264 {
265  for (unsigned int i = 0; i < num_threads; i++)
266  {
268  thr->CountSelf();
269  thr->Run();
270  }
271 }
272 
273 void
275 {
276  m_Queue.Put(req);
277 }
278 
281 {
282  return m_Queue.GetHandle();
283 }
284 
285 
287 {
288 protected:
289  void Process(void) { CThread::Exit(0); } // Kill the current thread
290 };
291 
292 
293 void
295 {
296  m_KilledAll = true;
297 
299 
300  for (TACValue i = 0; i < m_MaxThreads; ++i) {
301  AcceptRequest(poison);
302  }
304  if (wait) {
305  (*it)->Join();
306  } else {
307  (*it)->Detach();
308  }
309  }
310  m_Threads.clear();
311 }
312 
313 
314 bool
316 {
317  CMutexGuard guard(m_Mutex);
318  if (m_KilledAll) {
319  return false;
320  } else {
321  m_Threads.push_back(CRef<TThread>(&thread));
322  return true;
323  }
324 }
325 
326 void
328 {
329  CMutexGuard guard(m_Mutex);
330  if (!m_KilledAll) {
331  TThreads::iterator it = find(m_Threads.begin(), m_Threads.end(),
332  CRef<TThread>(&thread));
333  if (it != m_Threads.end()) {
334  (*it)->Detach();
335  m_Threads.erase(it);
336  }
337  }
338 }
339 
340 
341 
342 /////////////////////////////////////////////////////////////////////////////
343 // Abstract class for CAcceptRequest and CServerConnectionRequest
345 {
346 public:
348  CServer_ConnectionPool& conn_pool,
349  const STimeout* timeout)
350  : m_Event(event), m_ConnPool(conn_pool), m_IdleTimeout(timeout) {}
351 
352  virtual void Cancel(void) = 0;
353 
354 protected:
358 } ;
359 
360 
361 /////////////////////////////////////////////////////////////////////////////
362 // CAcceptRequest
364 {
365 public:
367  CServer_ConnectionPool& conn_pool,
368  const STimeout* timeout,
369  CServer_Listener* listener);
370  virtual void Process(void);
371  virtual void Cancel(void);
372 private:
373  void x_DoProcess(void);
374 
376 } ;
377 
379  CServer_ConnectionPool& conn_pool,
380  const STimeout* timeout,
381  CServer_Listener* listener)
382  : CServer_Request(event, conn_pool, timeout),
383  m_Connection(NULL)
384 {
385  // Accept connection in main thread to avoid race for listening
386  // socket's accept method, but postpone connection's OnOpen for
387  // pool thread because it can be arbitrarily long.
388  static const STimeout kZeroTimeout = { 0, 0 };
389  unique_ptr<CServer_Connection> conn(
390  new CServer_Connection(listener->m_Factory->Create()));
391  if (listener->Accept(*conn, &kZeroTimeout) != eIO_Success)
392  return;
393 /*
394 #ifdef NCBI_OS_UNIX
395  if (conn->Wait(eIO_Write, &kZeroTimeout) == eIO_Unknown) {
396  int fd;
397  _VERIFY(conn->GetOSHandle(&fd, sizeof(fd)) == eIO_Success);
398  if (fd >= 1024) {
399  ERR_POST(Error << "Accepted unpollable file descriptor "
400  << fd << ", aborting connection");
401  conn->OnOverflow(eOR_UnpollableSocket);
402  conn->Abort();
403  return;
404  }
405  }
406 #endif
407 */
408  conn->SetTimeout(eIO_ReadWrite, m_IdleTimeout);
409  m_Connection = conn.release();
410 }
411 
413 {
417  }
418  else {
419  // The connection pool is full
420  // This place is the only one which can call OnOverflow now
422  delete m_Connection;
423  }
424 }
425 
427 {
428  if (!m_Connection) return;
429  if (s_ServerCatchExceptions->Get()) {
430  try {
431  x_DoProcess();
432  } STD_CATCH_ALL_X(5, "CAcceptRequest::Process");
433  }
434  else {
435  x_DoProcess();
436  }
437 }
438 
440 {
441  // As of now, Cancel can not be called.
442  // See comment at CServer::CreateRequest
443  if (m_Connection) {
445  delete m_Connection;
446  }
447 }
448 
449 /////////////////////////////////////////////////////////////////////////////
450 // CServerConnectionRequest
452 {
453 public:
455  CServer_ConnectionPool& conn_pool,
456  const STimeout* timeout,
457  CServer_Connection* connection)
458  : CServer_Request(event, conn_pool, timeout),
459  m_Connection(connection)
460  {}
461  virtual void Process(void);
462  virtual void Cancel(void);
463 private:
464  void x_Process(void);
466 } ;
467 
468 
470 {
471  try {
473  } catch (...) {
475  throw;
476  }
477 }
478 
479 
481 {
482  if (s_ServerCatchExceptions->Get()) {
483  try {
484  x_Process();
485  } NCBI_CATCH_ALL_X(6, "CServerConnectionRequest::Process");
486  }
487  else {
488  x_Process();
489  }
491  // Return socket to poll vector
493  }
494 }
495 
496 
498 {
499  // As of now, Cancel can not be called.
500  // See comment at CServer::CreateRequest
502  // Return socket to poll vector
504 }
505 
506 
507 /////////////////////////////////////////////////////////////////////////////
508 // CServer_Listener
511  CServer_ConnectionPool& conn_pool,
512  const STimeout* timeout)
513 {
514  return new CAcceptRequest(event, conn_pool, timeout, this);
515 }
516 
517 
518 /////////////////////////////////////////////////////////////////////////////
519 // CServer_Connection
522  CServer_ConnectionPool& conn_pool,
523  const STimeout* timeout)
524 {
525  return new CServerConnectionRequest(event, conn_pool, timeout, this);
526 }
527 
529 {
530  return m_Open;
531 }
532 
534 {
535  switch (event) {
536  case eServIO_Open:
537  m_Handler->OnOpen();
538  break;
539  case eServIO_OurClose:
541  m_Open = false;
542  break;
543  case eServIO_ClientClose:
545  m_Open = false;
546  break;
547  case eServIO_Inactivity:
548  OnTimeout();
550  //m_Open = false;
551  // fall through
552  case eServIO_Delete:
553  delete this;
554  break;
555  case eServIO_Alarm:
556  m_Handler->OnTimer();
557  break;
558  default:
559  if (eServIO_Read & event)
560  m_Handler->OnRead();
561  if (eServIO_Write & event)
562  m_Handler->OnWrite();
563  break;
564  }
565 }
566 
568 {
569  static const STimeout zero_timeout = {0, 0};
570 
571  // Set zero timeout to prevent the socket from sitting in
572  // TIME_WAIT state on the server.
573  SetTimeout(eIO_Close, &zero_timeout);
574 }
575 
576 /////////////////////////////////////////////////////////////////////////////
577 // CServer implementation
578 
580  m_Parameters(new SServer_Parameters()),
581  m_ConnectionPool(NULL),
582  m_ThreadPool(NULL)
583 {
584  try {
587  } catch (...) {
588  delete m_Parameters;
589  throw;
590  }
591 }
592 
593 
595 {
596  delete m_ThreadPool;
597  m_ThreadPool = NULL;
598  delete m_ConnectionPool;
600  delete m_Parameters;
601  m_Parameters = NULL;
602 }
603 
604 
606  unsigned short port)
607 {
608  m_ConnectionPool->Add(new CServer_Listener(factory, port), eListener);
609 }
610 
611 
612 bool CServer::RemoveListener(unsigned short port)
613 {
614  return m_ConnectionPool->RemoveListener(port);
615 }
616 
617 
619 {
620  if (new_params.init_threads <= 0 ||
621  new_params.max_threads < new_params.init_threads ||
622  new_params.max_threads > 1000) {
623  NCBI_THROW(CServer_Exception, eBadParameters,
624  "CServer::SetParameters: Bad parameters");
625  }
626  *m_Parameters = new_params;
628 }
629 
630 
632 {
633  *params = *m_Parameters;
634 }
635 
636 
638 {
640 }
641 
642 
644 {
646  static_cast<CServer_Connection*>(sock)));
647 }
648 
649 
650 static inline bool operator <(const STimeout& to1, const STimeout& to2)
651 {
652  return to1.sec != to2.sec ? to1.sec < to2.sec : to1.usec < to2.usec;
653 }
654 
655 
657 {
659 
660  Init();
661 
662  vector<CSocketAPI::SPoll> polls;
663  size_t count;
664  typedef vector<IServer_ConnectionBase*> TConnsList;
665  TConnsList timer_requests;
666  TConnsList revived_conns;
667  TConnsList to_close_conns;
668  TConnsList to_delete_conns;
669  STimeout timer_timeout;
670  const STimeout* timeout;
671 
672  while (!ShutdownRequested()) {
673  bool has_timer = m_ConnectionPool->GetPollAndTimerVec(
674  polls, timer_requests, &timer_timeout,
675  revived_conns, to_close_conns,
676  to_delete_conns);
677 
678  ITERATE(TConnsList, it, revived_conns) {
679  IServer_ConnectionBase* conn_base = *it;
681  CRef<CStdRequest> req(conn_base->CreateRequest(
682  evt, *m_ConnectionPool,
685  }
686  ITERATE(TConnsList, it, to_close_conns) {
687  IServer_ConnectionBase* conn_base = *it;
688  CRef<CStdRequest> req(conn_base->CreateRequest(
692  }
693  ITERATE(TConnsList, it, to_delete_conns) {
694  IServer_ConnectionBase* conn_base = *it;
695  CRef<CStdRequest> req(conn_base->CreateRequest(
699  }
700 
701  timeout = m_Parameters->accept_timeout;
702 
703  if (has_timer &&
704  (timeout == kDefaultTimeout ||
705  timeout == kInfiniteTimeout ||
706  timer_timeout < *timeout)) {
707  timeout = &timer_timeout;
708  }
709 
710  EIO_Status status = CSocketAPI::Poll(polls, timeout, &count);
711 
712  if (status != eIO_Success && status != eIO_Timeout) {
713  int x_errno = errno;
714  const char* temp = IO_StatusStr(status);
715  string ststr(temp
716  ? temp
717  : NStr::UIntToString((unsigned int) status));
718  string erstr;
719  if (x_errno) {
720  erstr = ", {" + NStr::IntToString(x_errno);
721  if (temp && *temp) {
722  erstr += ',';
723  erstr += temp;
724  }
725  erstr += '}';
726  }
727  ERR_POST_X(8, Critical << "Poll failed with status "
728  << ststr << erstr);
729  continue;
730  }
731 
732  if (count == 0) {
733  if (timeout != &timer_timeout) {
734  ProcessTimeout();
735  }
736  else {
737  m_ConnectionPool->SetAllActive(timer_requests);
738  ITERATE (vector<IServer_ConnectionBase*>, it, timer_requests)
739  {
740  IServer_ConnectionBase* conn_base = *it;
741  CRef<CStdRequest> req(conn_base->CreateRequest(
742  eServIO_Alarm, *m_ConnectionPool, timeout));
744  }
745  }
746  continue;
747  }
748 
750  ITERATE (vector<CSocketAPI::SPoll>, it, polls) {
751  if (!it->m_REvent)
752  continue;
753  CTrigger * trigger = dynamic_cast<CTrigger *>(it->m_Pollable);
754  if (trigger) {
755  trigger->Reset();
756  continue;
757  }
758 
759  IServer_ConnectionBase* conn_base =
760  dynamic_cast<IServer_ConnectionBase*>(it->m_Pollable);
761  _ASSERT(conn_base);
762  CRef<CStdRequest> req(conn_base->CreateRequest(
763  IOEventToServIOEvent(it->m_REvent),
767  }
768  }
769 }
770 
771 void CServer::Run(void)
772 {
773  StartListening(); // detect unavailable ports ASAP
774 
777  if (s_ServerCatchExceptions->Get()) {
778  try {
779  x_DoRun();
780  } catch (const CException & ex) {
781  ERR_POST(ex);
782  // Avoid collateral damage from destroying the thread pool
783  // while worker threads are active (or, worse, initializing).
786  throw;
787  }
788  }
789  else {
790  x_DoRun();
791  }
792 
793  // We need to kill all processing threads first, so that there
794  // is no request with already destroyed connection left.
796  Exit();
797  // We stop listening only here, so that the port stays locked until the
798  // application has cleaned up after execution.
800  // Here we are finally free to erase the connection pool.
802 }
803 
804 
806 {
807  if (m_ThreadPool != NULL)
808  m_ThreadPool->AcceptRequest(request);
809 }
810 
811 
813 {
815 }
816 
817 
819 {
821 }
822 
823 
825 {
827  NCBI_THROW(CServer_Exception, ePoolOverflow,
828  "Cannot add connection, pool has overflowed.");
829  }
830 }
831 
833 {
835 }
836 
838 {
840 }
841 
842 vector<unsigned short> CServer::GetListenerPorts(void)
843 {
845 }
846 
847 /////////////////////////////////////////////////////////////////////////////
848 // SServer_Parameters implementation
849 
850 static const STimeout k_DefaultIdleTimeout = { 600, 0 };
851 
853  max_connections(10000),
854  temporarily_stop_listening(false),
855  accept_timeout(kInfiniteTimeout),
856  idle_timeout(&k_DefaultIdleTimeout),
857  init_threads(5),
858  max_threads(10),
859  spawn_threshold(1)
860 {
861 }
862 
863 const char* CServer_Exception::GetErrCodeString(void) const
864 {
865  switch (GetErrCode()) {
866  case eBadParameters: return "eBadParameters";
867  case eCouldntListen: return "eCouldntListen";
868  case ePoolOverflow: return "ePoolOverflow";
869  default: return CException::GetErrCodeString();
870  }
871 }
872 
#define false
Definition: bool.h:36
CServer_Connection * m_Connection
Definition: server.cpp:375
virtual void Cancel(void)
Definition: server.cpp:439
virtual void Process(void)
Do the actual job Called by whichever thread handles this request.
Definition: server.cpp:426
CAcceptRequest(EServIO_Event event, CServer_ConnectionPool &conn_pool, const STimeout *timeout, CServer_Listener *listener)
Definition: server.cpp:378
void x_DoProcess(void)
Definition: server.cpp:412
It may be desirable to store handles obtained from GetHandle() in instances of CCompletingHandle to e...
void Process(void)
Do the actual job Called by whichever thread handles this request.
Definition: server.cpp:289
void Release()
Manually force the resource to be released.
Definition: guard.hpp:166
CRef –.
Definition: ncbiobj.hpp:618
CSafeStatic<>::
virtual void Cancel(void)
Definition: server.cpp:497
CServer_Connection * m_Connection
Definition: server.cpp:465
virtual void Process(void)
Do the actual job Called by whichever thread handles this request.
Definition: server.cpp:480
CServerConnectionRequest(EServIO_Event event, CServer_ConnectionPool &conn_pool, const STimeout *timeout, CServer_Connection *connection)
Definition: server.cpp:454
CServer_Exception::
Definition: server.hpp:465
const STimeout * m_IdleTimeout
Definition: server.cpp:357
CServer_Request(EServIO_Event event, CServer_ConnectionPool &conn_pool, const STimeout *timeout)
Definition: server.cpp:347
EServIO_Event m_Event
Definition: server.cpp:355
virtual void Cancel(void)=0
CServer_ConnectionPool & m_ConnPool
Definition: server.cpp:356
CSocket::
CTrigger::
Definition: ncbi_socket.hpp:88
IServer_ConnectionFactory::
Definition: server.hpp:388
Internal header for threaded server connection pools.
static CS_CONNECTION * conn
Definition: ct_dynamic.c:25
static CNcbiApplicationGuard InstanceGuard(void)
Singleton method.
Definition: ncbiapp.cpp:133
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
const string & GetProgramDisplayName(void) const
Get the application's "display" name.
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
int BUF_Write(BUF *pBuf, const void *data, size_t size)
Definition: ncbi_buffer.c:224
string
Definition: cgiapp.hpp:687
#define NULL
Definition: ncbistd.hpp:225
TNCBIAtomicValue TValue
Alias TValue for TNCBIAtomicValue.
Definition: ncbicntr.hpp:73
void Set(TValue new_value) THROWS_NONE
Set atomic counter value.
Definition: ncbicntr.hpp:185
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
Definition: ncbicntr.hpp:278
TValue Get(void) const THROWS_NONE
Get atomic counter value.
Definition: ncbicntr.hpp:168
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
Definition: ncbidiag.hpp:550
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
#define NCBI_CATCH_ALL_X(err_subcode, message)
Definition: ncbiexpt.hpp:619
#define STD_CATCH_ALL_X(err_subcode, message)
Standard handling of "exception"-derived exceptions; catches non-standard exceptions and generates "u...
Definition: ncbiexpt.hpp:608
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
virtual const char * GetErrCodeString(void) const
Get error code interpreted as text.
Definition: ncbiexpt.cpp:444
#define NCBI_REPORT_EXCEPTION_X(err_subcode, title, ex)
Generate a report on the exception with default error code and given subcode.
Definition: ncbiexpt.hpp:761
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
EIO_Status Accept(CSocket *&sock, const STimeout *timeout=kInfiniteTimeout, TSOCK_Flags flags=fSOCK_LogDefault) const
static EIO_Status Poll(vector< SPoll > &polls, const STimeout *timeout, size_t *n_ready=0)
Poll a vector of CPollable objects for I/O readiness.
EIO_Status SetTimeout(EIO_Event event, const STimeout *timeout)
Set timeout for I/O in the specified direction.
EIO_Status Reset(void)
void GetPeerAddress(unsigned int *host, unsigned short *port, ENH_ByteOrder byte_order) const
Get peer address.
EIO_Status Read(void *buf, size_t size, size_t *n_read=0, EIO_ReadMethod how=eIO_ReadPlain)
Read from socket.
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
Definition: ncbistr.hpp:5083
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
Definition: ncbistr.hpp:5108
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
void CountSelf(void)
Definition: server.cpp:164
bool Register(TThread &thread)
Register a thread.
Definition: server.cpp:315
CMutex m_Mutex
The guard for m_MaxThreads and m_MaxUrgentThreads.
TItemHandle GetHandle(void)
Definition: server.cpp:280
TPool * m_Pool
The pool that holds this thread.
void UnRegister(TThread &)
Unregister a thread.
Definition: server.cpp:327
CAtomicCounter::TValue TACValue
void AcceptRequest(const TRequest &request)
Put a request in the queue with a given priority.
Definition: server.cpp:274
void x_UnregisterThread(void)
Definition: server.cpp:179
CPoolOfThreads_ForServer(unsigned int max_threads, const string &thr_suffix)
Constructor.
Definition: server.cpp:239
CMutex m_Mutex
Guards access to queue.
list< CRef< TThread > > TThreads
volatile TACValue m_MaxThreads
The maximum number of threads the pool can hold.
virtual ~CPoolOfThreads_ForServer(void)
Destructor.
Definition: server.cpp:248
void KillAllThreads(bool wait)
Causes all threads in the pool to exit cleanly after finishing all pending requests,...
Definition: server.cpp:294
TItemHandle Put(const TRequest &request)
Put a request into the queue.
Definition: server.cpp:125
TItemHandle GetHandle(void)
Get the first available request from the queue, and return a handle to it.
Definition: server.cpp:137
virtual ~CThreadInPool_ForServer(void)
Destructor.
Definition: server.cpp:171
TThread * NewThread(void)
Create a new thread.
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
Definition: server.cpp:208
void ProcessRequest(TItemHandle handle)
Process a request.
Definition: server.cpp:232
void x_HandleOneRequest(bool catch_all)
Definition: server.cpp:185
CAtomicCounter m_ThreadCount
The current number of threads in the pool.
void Spawn(unsigned int num_threads)
Start processing threads.
Definition: server.cpp:263
@ eActive
extracted but not yet released
virtual void Init()
Initialize the server.
Definition: server.hpp:173
virtual bool ShutdownRequested(void)
Runs synchronously between iterations.
Definition: server.hpp:191
void CloseConnection(TConnBase *conn)
Close connection as if it was initiated by server (not by client).
EServIO_Event
Extended copy of the type EIO_Event allowing to distinguish between connection closing from client an...
Definition: server.hpp:61
virtual void Exit()
Cleanup the server.
Definition: server.hpp:181
virtual void OnTimeout(void)
void RemoveConnectionFromPool(CServer_Connection *conn)
Remove externally created connection from pool.
Definition: server.cpp:832
virtual int CheckMessage(BUF *buffer, const void *data, size_t size)=0
void AddConnectionToPool(CServer_Connection *conn)
Add externally created connection to the connection pool which server polls on.
Definition: server.cpp:824
virtual void OnClose(EClosePeer)
The connection has closed (with information on type of closing)
Definition: server.hpp:274
SServer_Parameters()
Create structure with the default set of parameters.
Definition: server.cpp:852
void Erase(void)
Erase all connections.
void GetParameters(SServer_Parameters *params)
Definition: server.cpp:631
virtual ~CServer_Connection()
Definition: server.cpp:567
void DeferConnectionProcessing(IServer_ConnectionBase *conn)
Mark connection as deferred for processing, i.e.
Definition: server.cpp:812
void SetAllActive(const vector< CSocketAPI::SPoll > &polls)
void WakeUpPollCycle(void)
Force poll cycle to make another iteration.
Definition: server.cpp:837
void SetMaxConnections(unsigned max_connections)
virtual void OnRead(void)
The client has just sent data.
Definition: server.cpp:55
virtual CStdRequest * CreateRequest(EServIO_Event event, CServer_ConnectionPool &connPool, const STimeout *timeout)
Definition: server.cpp:510
SServer_Parameters * m_Parameters
Definition: server.hpp:199
const STimeout * accept_timeout
Maximum t between exit checks.
Definition: server.hpp:440
bool GetPollAndTimerVec(vector< CSocketAPI::SPoll > &polls, vector< IServer_ConnectionBase * > &timer_requests, STimeout *timer_timeout, vector< IServer_ConnectionBase * > &revived_conns, vector< IServer_ConnectionBase * > &to_close_conns, vector< IServer_ConnectionBase * > &to_delete_conns)
void AddListener(IServer_ConnectionFactory *factory, unsigned short port)
Register a listener.
Definition: server.cpp:605
vector< unsigned short > GetListenerPorts(void)
Provides a list of ports on which the server is listening.
bool Add(TConnBase *conn, EServerConnType type)
virtual void OnError(const string &)
Runs when a socket error is detected.
Definition: server.hpp:291
CPoolOfThreads_ForServer * m_ThreadPool
Definition: server.hpp:201
void CloseConnection(CSocket *sock)
Close connection.
Definition: server.cpp:643
virtual EIO_Event GetEventsToPollFor(const CTime **) const
void SetParameters(const SServer_Parameters &new_params)
Definition: server.cpp:618
unsigned int max_threads
Maximum simultaneous threads.
Definition: server.hpp:447
EServIO_Event IOEventToServIOEvent(EIO_Event event)
Transform EIO_Event type to EServIO_Event.
Definition: server.hpp:76
vector< unsigned short > GetListenerPorts(void)
Provides a list of ports on which the server is listening.
Definition: server.cpp:842
virtual void OnOverflow(EOverflowReason reason)
virtual void OnMessage(BUF buffer)=0
unique_ptr< IServer_ConnectionFactory > m_Factory
CServer_ConnectionPool * m_ConnectionPool
Definition: server.hpp:200
void Run(void)
Enter the main loop.
Definition: server.cpp:771
unsigned int max_connections
Maximum # of open connections.
Definition: server.hpp:436
int Server_CheckLineMessage(BUF *buffer, const void *data, size_t size, bool &seen_CR)
Definition: server.cpp:103
string m_ThreadSuffix
Definition: server.hpp:202
virtual const char * GetErrCodeString(void) const override
Definition: server.cpp:863
void Remove(TConnBase *conn)
virtual void ProcessTimeout(void)
Runs synchronously when no socket activity has occurred in a while (as determined by m_AcceptTimeout)...
Definition: server.hpp:186
CSocket & GetSocket(void)
Get underlying socket.
Definition: server.hpp:294
void SetConnType(TConnBase *conn, EServerConnType type)
Guard connection from out-of-order packet processing by pulling eActiveSocket's from poll vector Rese...
void SubmitRequest(const CRef< CStdRequest > &request)
Submit request to be executed by the server thread pool.
Definition: server.cpp:805
friend class CAcceptRequest
virtual void OnTimeout(void)
Runs when a client has been idle for too long, prior to closing the connection [synchronous].
Definition: server.hpp:278
unsigned int m_MaxThreads
Maximum simultaneous threads.
bool RemoveListener(unsigned short port)
unsigned int init_threads
Number of initial threads.
Definition: server.hpp:446
virtual CStdRequest * CreateRequest(EServIO_Event event, CServer_ConnectionPool &connPool, const STimeout *timeout)
Definition: server.cpp:521
virtual CStdRequest * CreateRequest(EServIO_Event event, CServer_ConnectionPool &connPool, const STimeout *timeout)=0
const STimeout * idle_timeout
For how long to keep inactive non-listening sockets open (default: 10 minutes)
Definition: server.hpp:443
void OnSocketEvent(EServIO_Event event)
Definition: server.cpp:533
void x_DoRun(void)
Definition: server.cpp:656
CServer(void)
Definition: server.cpp:579
virtual bool IsOpen(void)
Definition: server.cpp:528
void StartListening(void)
Start listening before the main loop.
Definition: server.cpp:637
unique_ptr< IServer_ConnectionHandler > m_Handler
virtual ~CServer()
Definition: server.cpp:594
bool RemoveListener(unsigned short port)
Removes a listener.
Definition: server.cpp:612
@ eServIO_ClientClose
Definition: server.hpp:66
@ eServIO_Write
Definition: server.hpp:64
@ eServIO_Open
Definition: server.hpp:62
@ eServIO_Alarm
Definition: server.hpp:70
@ eServIO_Inactivity
Definition: server.hpp:68
@ eServIO_OurClose
Definition: server.hpp:67
@ eServIO_Read
Definition: server.hpp:63
@ eServIO_Delete
Definition: server.hpp:69
@ eClientClose
Connection closed by other peer.
Definition: server.hpp:270
@ eOurClose
Connection closed by ourselves.
Definition: server.hpp:269
@ eActiveSocket
@ ePreDeferredSocket
@ eInactiveSocket
@ eListener
@ eOR_ConnectionPoolFull
Definition: server.hpp:212
@ eOR_RequestQueueFull
Definition: server.hpp:213
@ ePoolOverflow
Connection pool overflowed.
Definition: server.hpp:470
@ eBadParameters
Out-of-range parameters given.
Definition: server.hpp:468
@ eCouldntListen
Unable to bind listening port.
Definition: server.hpp:469
static void Exit(void *exit_data)
Cancel current thread.
Definition: ncbithr.cpp:906
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
Definition: ncbithr.cpp:958
bool WaitForSignal(CMutex &mutex, const CDeadline &deadline=CDeadline::eInfinite)
Release mutex and lock the calling thread until the condition variable is signalled.
Definition: ncbimtx.cpp:2554
void SignalAll(void)
Wake all threads that are currently waiting on the condition variable.
Definition: ncbimtx.cpp:2616
#define kInfiniteTimeout
Definition: ncbi_types.h:82
EIO_Status
I/O status.
Definition: ncbi_core.h:132
unsigned int usec
microseconds (modulo 1,000,000)
Definition: ncbi_types.h:78
const char * IO_StatusStr(EIO_Status status)
Get the text form of an enum status value.
Definition: ncbi_core.c:56
unsigned int sec
seconds
Definition: ncbi_types.h:77
#define kDefaultTimeout
Definition: ncbi_types.h:81
@ eIO_Timeout
timeout expired before any I/O succeeded
Definition: ncbi_core.h:134
@ eIO_Success
everything is fine, no error occurred
Definition: ncbi_core.h:133
@ eIO_ReadWrite
eIO_Read | eIO_Write (also, eCONN_OnFlush)
Definition: ncbi_core.h:122
@ eIO_Close
also serves as an error indicator in SOCK_Poll
Definition: ncbi_core.h:123
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
Definition of all error codes used in connect library (xconnect.lib, xconnext.lib etc).
int i
yy_size_t n
const struct ncbi::grid::netcache::search::fields::SIZE size
static const STimeout kZeroTimeout
static pcre_uint8 * buffer
Definition: pcretest.c:1051
NCBI_PARAM_DECL(bool, server, Catch_Unhandled_Exceptions)
static CSafeStatic< TParamServerCatchExceptions > s_ServerCatchExceptions
Definition: server.cpp:50
static bool operator<(const STimeout &to1, const STimeout &to2)
Definition: server.cpp:650
NCBI_PARAM_DEF_EX(bool, server, Catch_Unhandled_Exceptions, true, 0, CSERVER_CATCH_UNHANDLED_EXCEPTIONS)
typedef NCBI_PARAM_TYPE(server, Catch_Unhandled_Exceptions) TParamServerCatchExceptions
static const STimeout k_DefaultIdleTimeout
Definition: server.cpp:850
SServer_Parameters::
Definition: server.hpp:434
Timeout structure.
Definition: ncbi_types.h:76
#define _ASSERT
CRef< CTestThread > thr[k_NumThreadsMax]
Definition: test_mt.cpp:267
unsigned read_buf(z_streamp strm, Bytef *buf, unsigned size)
Definition: deflate.c:1204
Modified on Fri Dec 01 04:51:28 2023 by modify_doxy.py rev. 669887