NCBI C++ ToolKit
server.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef CONNECT___SERVER__HPP
2 #define CONNECT___SERVER__HPP
3 
4 /* $Id: server.hpp 84663 2018-11-27 18:22:00Z ucko $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Aaron Ucko, Victor Joukov, Denis Vakatov
30  *
31  */
32 
33 /// @file server.hpp
34 /// Framework to create multithreaded network servers with thread-per-request
35 /// scheduling.
36 
40 #include <connect/ncbi_socket.hpp>
41 
42 
43 /** @addtogroup ThreadedServer
44  *
45  * @{
46  */
47 
48 
50 
51 
54 struct SServer_Parameters;
56 class CServer_Connection;
57 
58 
59 /// Extended copy of the type EIO_Event allowing to distinguish between
60 /// connection closing from client and from ourselves
62  eServIO_Open = 0x00,
63  eServIO_Read = 0x01,
64  eServIO_Write = 0x02,
65  eServIO_ReadWrite = 0x03, /**< eIO_Read | eIO_Write */
70  eServIO_Alarm = 0x40
71 };
72 
73 
74 /// Transform EIO_Event type to EServIO_Event
75 inline EServIO_Event
77 {
78  return (EServIO_Event) event;
79 }
80 
81 
82 class CNetCacheServer;
83 
84 /////////////////////////////////////////////////////////////////////////////
85 ///
86 /// CServer::
87 ///
88 /// Thread-pool based server. On every event it allocates one of threads
89 /// from pool to serve the event, thereby making possible to serve large
90 /// number of concurrent connections efficiently. You need to subclass it
91 /// only if you want to provide gentle shutdown ability (override
92 /// ShutdownRequested) or process data in main thread on timeout (override
93 /// ProcessTimeout and set parameter accept_timeout to non-zero value).
94 ///
95 
97 {
98 public:
99  // 'ctors
100  CServer(void);
101  virtual ~CServer();
102 
103  /// Register a listener
104  void AddListener(IServer_ConnectionFactory* factory,
105  unsigned short port);
106 
107  /// Removes a listener
108  /// @param port
109  /// the listener on the port will be removed
110  /// @return
111  /// true if the listener has been removed, false if the server does not
112  /// listen on the port.
113  bool RemoveListener(unsigned short port);
114 
115  ///
116  void SetParameters(const SServer_Parameters& new_params);
117 
118  ///
119  void GetParameters(SServer_Parameters* params);
120 
121  /// Start listening before the main loop. If called, tries
122  /// to listen on all requested ports for all listeners, correcting
123  /// errors by calling listeners' OnFailure
124  void StartListening(void);
125 
126  /// Enter the main loop
127  void Run(void);
128 
129  /// Submit request to be executed by the server thread pool
130  void SubmitRequest(const CRef<CStdRequest>& request);
131 
132  /// Mark connection as deferred for processing, i.e. do not poll on it
133  /// and wait when IsReadyToProcess() will return true.
134  void DeferConnectionProcessing(IServer_ConnectionBase* conn);
135  void DeferConnectionProcessing(CSocket* sock);
136 
137  /// Close connection. Method should be called only when closing is
138  /// initiated by server itself, because it will generate then event
139  /// eServIO_OurClose.
140  ///
141  /// @sa EServIO_Event
142  void CloseConnection(CSocket* sock);
143 
144  /// Add externally created connection to the connection pool which server
145  /// polls on. Throws exception if pool is full.
146  /// NOTE: events to this connection can come theoretically even
147  /// NOTE: if connection gets some error or its peer closes it then conn
148  /// object will be deleted after processing OnClose event. If you don't
149  /// want that you have to call RemoveConnectionFromPool while handling
150  /// OnClose.
151  void AddConnectionToPool(CServer_Connection* conn);
152  /// Remove externally created connection from pool.
153  void RemoveConnectionFromPool(CServer_Connection* conn);
154  /// Force poll cycle to make another iteration.
155  /// Should be called if IsReadyToProcess() for some connection handler
156  /// became true.
157  void WakeUpPollCycle(void);
158  /// Set custom suffix to use on all threads in the server's pool.
159  /// Value can be set only before call to Run(), any change of the value
160  /// after call to Run() will be ignored.
161  void SetCustomThreadSuffix(const string& suffix)
162  { m_ThreadSuffix = suffix; }
163 
164  /// Provides a list of ports on which the server is listening
165  /// @return
166  /// currently listened ports
167  vector<unsigned short> GetListenerPorts(void);
168 
169 protected:
170  /// Initialize the server
171  ///
172  /// Called by Run method before poll cycle.
173  virtual void Init() {}
174 
175  /// Cleanup the server
176  ///
177  /// Called by Run method after poll cycle when all processing threads
178  /// are stopped, but before releasing listening ports. Here you're still
179  /// guaranteed that another instance running on the same set of ports will
180  /// fail at StartListening point.
181  virtual void Exit() {}
182 
183  /// Runs synchronously when no socket activity has occurred in a
184  /// while (as determined by m_AcceptTimeout).
185  /// @sa m_Parameters->accept_timeout
186  virtual void ProcessTimeout(void) {}
187 
188  /// Runs synchronously between iterations.
189  /// @return
190  /// whether to shut down service and return from Run.
191  virtual bool ShutdownRequested(void) { return false; }
192 
193 private:
194  void x_DoRun(void);
195 
196  friend class CNetCacheServer;
197  CPoolOfThreads_ForServer* GetThreadPool(void) { return m_ThreadPool; }
198 
203 };
204 
205 
206 /////////////////////////////////////////////////////////////////////////////
207 ///
208 /// Error codes for OnOverflow method in IServer_ConnectionHandler
210 {
215 };
216 
217 
218 /////////////////////////////////////////////////////////////////////////////
219 ///
220 /// IServer_ConnectionHandler::
221 ///
222 /// Implement this interface to provide server functionality.
223 ///
224 
226 {
227 public:
229 
230  /// Following three methods are guaranteed to be called NOT
231  /// at the same time as On*, so if you implement them
232  /// you should not guard the variables which they can use with
233  /// mutexes.
234  /// @param alarm_time
235  /// Set this parameter to a pointer to a CTime object to recieve
236  /// an OnTimer event at the moment in time specified by this object.
237  /// @return
238  /// Returns the set of events for which Poll should check.
239  virtual EIO_Event GetEventsToPollFor(const CTime** /*alarm_time*/) const
240  { return eIO_Read; }
241  /// Returns the timeout for this connection
242  virtual const STimeout* GetTimeout(void)
243  { return kDefaultTimeout; }
244  /// Returns connection handler's perception of whether we open or not.
245  /// It is unsafe to just close underlying socket because of the race,
246  /// emerging due to the fact that the socket can linger for a while.
247  virtual bool IsOpen(void)
248  { return true; }
249  /// Returns the handler's readiness to process input data or to write
250  /// some output data. OnRead() and OnWrite() are not called unless this
251  /// method return true.
252  virtual bool IsReadyToProcess(void) const
253  { return true; }
254 
255 
256  /// Runs in response to an external event [asynchronous].
257  /// You can get socket by calling GetSocket(), if you close the socket
258  /// this object will be destroyed.
259  /// Individual events are:
260  /// A client has just established this connection.
261  virtual void OnOpen(void) = 0;
262  /// The client has just sent data.
263  virtual void OnRead(void) = 0;
264  /// The client is ready to receive data.
265  virtual void OnWrite(void) = 0;
266 
267  /// Type of connection closing
268  enum EClosePeer {
269  eOurClose, ///< Connection closed by ourselves
270  eClientClose ///< Connection closed by other peer
271  };
272 
273  /// The connection has closed (with information on type of closing)
274  virtual void OnClose(EClosePeer /*peer*/) { }
275 
276  /// Runs when a client has been idle for too long, prior to
277  /// closing the connection [synchronous].
278  virtual void OnTimeout(void) { }
279 
280  /// This method is called at the moment in time specified earlier by the
281  /// alarm_time parameter of the GetEventsToPollFor method [synchronous].
282  virtual void OnTimer(void) { }
283 
284  /// Runs when there are insufficient resources to queue a
285  /// connection, prior to closing it. Provides a reason why the
286  /// connection is being close, which can be reported back to the client.
287  // See comment for CAcceptRequest::Process and CServer::CreateRequest
288  virtual void OnOverflow(EOverflowReason) { }
289 
290  /// Runs when a socket error is detected
291  virtual void OnError(const string & /*err_message*/) { }
292 
293  /// Get underlying socket
294  CSocket& GetSocket(void) { return *m_Socket; }
295 
296 public: // TODO: make it protected. Public is for DEBUG purposes only
297  friend class CServer_Connection;
298  void SetSocket(CSocket *socket) { m_Socket = socket; }
299 
300 private:
302 };
303 
304 
305 
306 /////////////////////////////////////////////////////////////////////////////
307 ///
308 /// IServer_MessageHandler::
309 ///
310 /// TODO:
313 {
314 public:
316  m_Buffer(0)
317  { }
318  virtual ~IServer_MessageHandler() { BUF_Destroy(m_Buffer); }
319  virtual void OnRead(void);
320  // You should implement this look-ahead function to decide, did you get
321  // a message in the series of read events. If not, you should return -1.
322  // If yes, return number of chars, beyond well formed message. E.g., if
323  // your message spans all the buffer, return 0. If you returned non-zero
324  // value, this piece of data will be used in the next CheckMessage to
325  // simplify client state management.
326  // You also need to copy bytes, comprising the message from data to buffer.
327  virtual int CheckMessage(BUF* buffer, const void *data, size_t size) = 0;
328  // Process incoming message in the buffer, by using
329  // BUF_Read(buffer, your_data_buffer, BUF_Size(buffer)).
330  virtual void OnMessage(BUF buffer) = 0;
331 private:
333 };
334 
335 
337 Server_CheckLineMessage(BUF* buffer, const void *data, size_t size,
338  bool& seen_CR);
339 
340 /////////////////////////////////////////////////////////////////////////////
341 ///
342 /// IServer_LineMessageHandler::
343 ///
344 /// TODO:
347 {
348 public:
350  IServer_MessageHandler(), m_SeenCR(false)
351  { }
352  virtual int CheckMessage(BUF* buffer, const void *data, size_t size) {
353  return Server_CheckLineMessage(buffer, data, size, m_SeenCR);
354  }
355 private:
356  bool m_SeenCR;
357 };
358 
359 
360 
361 /////////////////////////////////////////////////////////////////////////////
362 ///
363 /// IServer_StreamHandler::
364 ///
365 /// TODO:
368 {
369 public:
371 private:
373 };
374 
375 
376 
377 /////////////////////////////////////////////////////////////////////////////
378 ///
379 /// IServer_ConnectionFactory::
380 ///
381 /// Factory to be registered with CServer instance. You usually do not
382 /// need to implement it, default template CServer_ConnectionFactory will
383 /// suffice. You NEED to implement it manually to pass server-wide parameters
384 /// to ConnectionHandler instances, e.g. for implementation of gentle shutdown.
385 ///
386 
388 {
389 public:
390  /// What to do if the port is busy
392  eLAFail = 0, // Can not live without this port, default
393  eLAIgnore = 1, // Do nothing, throw away this listener
394  eLARetry = 2 // Listener should provide another port to try
395  };
397 
398  /// @return
399  /// a new instance of handler for connection
400  virtual IServer_ConnectionHandler* Create(void) = 0;
401  /// Return desired action if the port, mentioned in AddListener is busy.
402  /// If the action is eLARetry, provide new port. The
403  virtual EListenAction OnFailure(unsigned short* /* port */)
404  { return eLAFail; }
405 };
406 
407 
408 
409 /////////////////////////////////////////////////////////////////////////////
410 ///
411 /// CServer_ConnectionFactory::
412 ///
413 /// Reasonable default implementation for IServer_ConnectionFactory
414 ///
415 
416 template <class TServer_ConnectionHandler>
418 {
419 public:
421  { return new TServer_ConnectionHandler(); }
422 };
423 
424 
425 
426 /////////////////////////////////////////////////////////////////////////////
427 ///
428 /// SServer_Parameters::
429 ///
430 /// Settings for CServer
431 ///
432 
434 {
435  /// Maximum # of open connections
436  unsigned int max_connections;
437  /// Temporarily close listener when queue fills?
439  /// Maximum t between exit checks
441  /// For how long to keep inactive non-listening sockets open
442  /// (default: 10 minutes)
444 
445  // (settings for the thread pool)
446  unsigned int init_threads; ///< Number of initial threads
447  unsigned int max_threads; ///< Maximum simultaneous threads
448  unsigned int spawn_threshold; ///< Controls when to spawn more threads
449 
450  /// Create structure with the default set of parameters
452 };
453 
454 
455 
456 /////////////////////////////////////////////////////////////////////////////
457 ///
458 /// CServer_Exception::
459 ///
460 /// Exceptions thrown by CServer::Run()
461 ///
462 
465 {
466 public:
467  enum EErrCode {
468  eBadParameters, ///< Out-of-range parameters given
469  eCouldntListen, ///< Unable to bind listening port
470  ePoolOverflow ///< Connection pool overflowed
471  };
472  virtual const char* GetErrCodeString(void) const override;
474 };
475 
476 
477 
479 
480 
481 /* @} */
482 
483 #endif /* CONNECT___SERVER__HPP */
Generic CONN exception.
Helper hook-up class that installs default logging/registry/locking (but only if they have not yet be...
This stream exchanges data in a TCP channel, using the SOCK socket API.
CServer_ConnectionFactory::
Definition: server.hpp:418
CServer_Exception::
Definition: server.hpp:465
CServer::
Definition: server.hpp:97
CSocket::
CTime –.
Definition: ncbitime.hpp:296
IServer_ConnectionFactory::
Definition: server.hpp:388
IServer_ConnectionHandler::
Definition: server.hpp:226
IServer_LineMessageHandler::
Definition: server.hpp:347
IServer_MessageHandler::
Definition: server.hpp:313
IServer_StreamHandler::
Definition: server.hpp:368
static CS_CONNECTION * conn
Definition: ct_dynamic.c:25
#define false
Definition: bool.h:36
char data[12]
Definition: iconv.c:80
void BUF_Destroy(BUF buf)
Definition: ncbi_buffer.c:500
#define EXCEPTION_VIRTUAL_BASE
Do not use virtual base classes in exception declaration at all, because in this case derived class s...
Definition: ncbiexpt.hpp:1388
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
SOCK m_Socket
IO_PREFIX::iostream CNcbiIostream
Portable alias for iostream.
Definition: ncbistre.hpp:152
virtual void Init()
Initialize the server.
Definition: server.hpp:173
virtual bool ShutdownRequested(void)
Runs synchronously between iterations.
Definition: server.hpp:191
virtual void OnOverflow(EOverflowReason)
Runs when there are insufficient resources to queue a connection, prior to closing it.
Definition: server.hpp:288
EServIO_Event
Extended copy of the type EIO_Event allowing to distinguish between connection closing from client an...
Definition: server.hpp:61
virtual void Exit()
Cleanup the server.
Definition: server.hpp:181
virtual EIO_Event GetEventsToPollFor(const CTime **) const
Following three methods are guaranteed to be called NOT at the same time as On*, so if you implement ...
Definition: server.hpp:239
virtual void OnRead(void)=0
The client has just sent data.
virtual const STimeout * GetTimeout(void)
Returns the timeout for this connection.
Definition: server.hpp:242
virtual int CheckMessage(BUF *buffer, const void *data, size_t size)=0
virtual void OnOpen(void)=0
Runs in response to an external event [asynchronous].
virtual void OnClose(EClosePeer)
The connection has closed (with information on type of closing)
Definition: server.hpp:274
EListenAction
What to do if the port is busy.
Definition: server.hpp:391
NCBI_EXCEPTION_DEFAULT(CServer_Exception, CConnException)
EClosePeer
Type of connection closing.
Definition: server.hpp:268
void SetCustomThreadSuffix(const string &suffix)
Set custom suffix to use on all threads in the server's pool.
Definition: server.hpp:161
SServer_Parameters * m_Parameters
Definition: server.hpp:199
CNcbiIostream & GetStream()
unsigned int spawn_threshold
Controls when to spawn more threads.
Definition: server.hpp:448
const STimeout * accept_timeout
Maximum t between exit checks.
Definition: server.hpp:440
virtual IServer_ConnectionHandler * Create(void)=0
void Run(void)
Enter the main loop.
virtual EListenAction OnFailure(unsigned short *)
Return desired action if the port, mentioned in AddListener is busy.
Definition: server.hpp:403
EOverflowReason
Error codes for OnOverflow method in IServer_ConnectionHandler.
Definition: server.hpp:210
virtual void OnError(const string &)
Runs when a socket error is detected.
Definition: server.hpp:291
CPoolOfThreads_ForServer * m_ThreadPool
Definition: server.hpp:201
bool temporarily_stop_listening
Temporarily close listener when queue fills?
Definition: server.hpp:438
void StartListening(void)
Start listening immediately, or throw an exception if it is impossible to do so.
CPoolOfThreads_ForServer * GetThreadPool(void)
Definition: server.hpp:197
virtual void OnTimer(void)
This method is called at the moment in time specified earlier by the alarm_time parameter of the GetE...
Definition: server.hpp:282
virtual ~IServer_ConnectionHandler()
Definition: server.hpp:228
unsigned int max_threads
Maximum simultaneous threads.
Definition: server.hpp:447
EServIO_Event IOEventToServIOEvent(EIO_Event event)
Transform EIO_Event type to EServIO_Event.
Definition: server.hpp:76
CConn_SocketStream m_Stream
Definition: server.hpp:372
virtual bool IsOpen(void)
Returns connection handler's perception of whether we open or not.
Definition: server.hpp:247
virtual void OnMessage(BUF buffer)=0
CServer_ConnectionPool * m_ConnectionPool
Definition: server.hpp:200
virtual int CheckMessage(BUF *buffer, const void *data, size_t size)
Definition: server.hpp:352
void SetSocket(CSocket *socket)
Definition: server.hpp:298
unsigned int max_connections
Maximum # of open connections.
Definition: server.hpp:436
int Server_CheckLineMessage(BUF *buffer, const void *data, size_t size, bool &seen_CR)
Definition: server.cpp:103
virtual bool IsReadyToProcess(void) const
Returns the handler's readiness to process input data or to write some output data.
Definition: server.hpp:252
string m_ThreadSuffix
Definition: server.hpp:202
virtual void ProcessTimeout(void)
Runs synchronously when no socket activity has occurred in a while (as determined by m_AcceptTimeout)...
Definition: server.hpp:186
CSocket & GetSocket(void)
Get underlying socket.
Definition: server.hpp:294
virtual void OnTimeout(void)
Runs when a client has been idle for too long, prior to closing the connection [synchronous].
Definition: server.hpp:278
virtual IServer_ConnectionHandler * Create()
Definition: server.hpp:420
virtual void OnWrite(void)=0
The client is ready to receive data.
unsigned int init_threads
Number of initial threads.
Definition: server.hpp:446
const STimeout * idle_timeout
For how long to keep inactive non-listening sockets open (default: 10 minutes)
Definition: server.hpp:443
virtual ~IServer_MessageHandler()
Definition: server.hpp:318
virtual ~IServer_ConnectionFactory()
Definition: server.hpp:396
@ eServIO_ClientClose
Definition: server.hpp:66
@ eServIO_Write
Definition: server.hpp:64
@ eServIO_Open
Definition: server.hpp:62
@ eServIO_Alarm
Definition: server.hpp:70
@ eServIO_Inactivity
Definition: server.hpp:68
@ eServIO_OurClose
Definition: server.hpp:67
@ eServIO_Read
Definition: server.hpp:63
@ eServIO_Delete
Definition: server.hpp:69
@ eServIO_ReadWrite
eIO_Read | eIO_Write
Definition: server.hpp:65
@ eOurClose
Connection closed by ourselves.
Definition: server.hpp:269
@ eOR_ConnectionPoolFull
Definition: server.hpp:212
@ eOR_UnpollableSocket
Definition: server.hpp:214
@ eOR_RequestQueueFull
Definition: server.hpp:213
@ eOR_Unknown
Definition: server.hpp:211
@ eBadParameters
Out-of-range parameters given.
Definition: server.hpp:468
@ eCouldntListen
Unable to bind listening port.
Definition: server.hpp:469
EIO_Event
I/O event (or direction).
Definition: ncbi_core.h:118
#define kDefaultTimeout
Definition: ncbi_types.h:81
@ eIO_Read
read
Definition: ncbi_core.h:120
#define NCBI_XCONNECT_EXPORT
const struct ncbi::grid::netcache::search::fields::SIZE size
static const char * suffix[]
Definition: pcregrep.c:408
static pcre_uint8 * buffer
Definition: pcretest.c:1051
SServer_Parameters::
Definition: server.hpp:434
Timeout structure.
Definition: ncbi_types.h:76
Modified on Fri Apr 12 17:15:39 2024 by modify_doxy.py rev. 669887