NCBI C++ ToolKit
connection_pool.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: connection_pool.cpp 73499 2016-07-11 15:26:50Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Aaron Ucko, Victor Joukov
27  *
28  * File Description:
29  * Threaded server connection pool
30  *
31  * ===========================================================================
32  */
33 
34 #include <ncbi_pch.hpp>
35 #include <connect/error_codes.hpp>
36 #include "connection_pool.hpp"
37 
38 
39 #define NCBI_USE_ERRCODE_X Connect_ThrServer
40 
41 
43 
// Body of g_ServerConnTypeToString(enum EServerConnType conn_type): returns
// the enumerator's name as text, or "UnknownServerConnType" when the value
// matches none of the listed cases.
// NOTE(review): the signature line (listing line 44) is missing from this
// extract; the name comes from the cross-reference index at the end of the page.
45 {
46  switch (conn_type) {
47  case eInactiveSocket: return "eInactiveSocket";
48  case eActiveSocket: return "eActiveSocket";
49  case eListener: return "eListener";
50  case ePreDeferredSocket: return "ePreDeferredSocket";
51  case eDeferredSocket: return "eDeferredSocket";
52  case ePreClosedSocket: return "ePreClosedSocket";
53  case eClosedSocket: return "eClosedSocket";
54  }
55  return "UnknownServerConnType";
56 }
57 
58 
// Constructor initializer list: remembers the connection limit and records
// that listening has not been started yet.
// NOTE(review): the constructor's signature line (listing line 59) is missing
// from this extract.
60  m_MaxConnections(max_connections), m_ListeningStarted(false)
61 {}
62 
// Calls Erase() and does nothing else.
// NOTE(review): the signature line (listing line 63) is missing from this
// extract; presumably this is the destructor -- confirm against the real file.
64 {
65  Erase();
66 }
67 
// Erase all connections. While holding m_Mutex, each element is notified
// (OnSocketEvent(eServIO_OurClose) if it is a CServer_Connection, OnTimeout()
// otherwise) and then deleted; finally m_Data is cleared.
// NOTE(review): the signature (listing line 68) and the loop header (listing
// line 71) are missing from this extract, which is why `it` appears undeclared
// and the closing brace on listing line 79 looks unmatched.
69 {
70  CMutexGuard guard(m_Mutex);
72  CServer_Connection* conn = dynamic_cast<CServer_Connection*>(*it);
73  if (conn)
74  conn->OnSocketEvent(eServIO_OurClose);
75  else
76  (*it)->OnTimeout();
77 
78  delete *it;
79  }
80  m_Data.clear();
81 }
82 
// Body of x_UpdateExpiration(TConnBase *conn): recomputes conn->expiration.
// If conn is a CSocket with a finite eIO_ReadWrite timeout, the expiration
// becomes the current fast local time plus that timeout. Otherwise (not a
// CSocket, or the timeout is kDefaultTimeout / kInfiniteTimeout) the
// expiration is cleared.
// NOTE(review): the signature line (listing line 83) is missing from this
// extract; the name comes from the cross-reference index at the end of the page.
84 {
85  const STimeout* timeout = kDefaultTimeout;
86  const CSocket* socket = dynamic_cast<const CSocket*>(conn);
87 
88  if (socket) {
89  timeout = socket->GetTimeout(eIO_ReadWrite);
90  }
91  if (timeout != kDefaultTimeout && timeout != kInfiniteTimeout) {
92  conn->expiration = GetFastLocalTime();
93  conn->expiration.AddSecond(timeout->sec, CTime::eIgnoreDaylight);
// STimeout::usec is in microseconds; convert to nanoseconds
94  conn->expiration.AddNanoSecond(timeout->usec * 1000);
95  } else {
96  conn->expiration.Clear();
97  }
98 }
99 
// Body of Add(TConnBase *conn, EServerConnType type): sets the connection's
// type under its type_lock, then inserts it into m_Data under m_Mutex.
// Returns false without inserting when the pool already holds
// m_MaxConnections entries; aborts the process if conn is already present.
// A listener added after listening has started is activated immediately.
// Returns true on success.
// NOTE(review): listing lines 100 (signature), 103 and 124 are missing from
// this extract, so one statement inside the type_lock section and one just
// before `return true` are not shown here.
101 {
102  conn->type_lock.Lock();
104  conn->type = type;
105  conn->type_lock.Unlock();
106 
107  {{
108  CMutexGuard guard(m_Mutex);
109  if (m_Data.size() >= m_MaxConnections)
110  return false;
111 
112  if (m_Data.find(conn) != m_Data.end())
113  abort();
114  m_Data.insert(conn);
115  }}
116 
117  if (type == eListener)
118  if (m_ListeningStarted)
119  // That's a new listener which should be activated right away
120  // because the StartListening() had already been called earlier
121  // (e.g. in CServer::Run())
122  conn->Activate();
123 
125  return true;
126 }
127 
// Removes conn from m_Data while holding m_Mutex. The connection object
// itself is not deleted here.
// NOTE(review): the signature line (listing line 128) is missing from this
// extract; presumably this is Remove(TConnBase *conn) from the
// cross-reference index -- confirm.
129 {
130  CMutexGuard guard(m_Mutex);
131  m_Data.erase(conn);
132 }
133 
134 
// Requests removal of the listener bound to `port`.
// The port is appended to m_ListenerPortsToStop; the listener object itself
// is deleted later, when GetPollAndTimerVec() finds its port in that list.
// Returns true if a matching listener was found, and false if none exists
// or the port was already queued for removal (a warning is posted in both
// cases).
// NOTE(review): listing line 172 (the statement executed when `found` is
// true) is missing from this extract, so `if (found)` appears to run
// straight into `else`.
135 bool CServer_ConnectionPool::RemoveListener(unsigned short port)
136 {
137  bool found = false;
138 
139  {{
140  CMutexGuard guard(m_Mutex);
141 
// Refuse a second removal request for the same port
142  if (std::find(m_ListenerPortsToStop.begin(),
143  m_ListenerPortsToStop.end(), port) !=
144  m_ListenerPortsToStop.end()) {
145  ERR_POST(Warning << "Removing listener on port " << port <<
146  " which has already been requested for removal");
147  return false;
148  }
149 
150  ITERATE (TData, it, m_Data) {
151  TConnBase * conn_base = *it;
152 
153  conn_base->type_lock.Lock(); // To be on the safe side
154  if (conn_base->type == eListener) {
155  CServer_Listener * listener = dynamic_cast<CServer_Listener *>(
156  conn_base);
157  if (listener) {
158  if (listener->GetPort() == port) {
159  m_ListenerPortsToStop.push_back(port);
// Unlock here because the `break` below skips the unlock at the loop's end
160  conn_base->type_lock.Unlock();
161 
162  found = true;
163  break;
164  }
165  }
166  }
167  conn_base->type_lock.Unlock();
168  }
169  }}
170 
171  if (found)
173  else
174  ERR_POST(Warning << "No listener on port " << port << " found");
175  return found;
176 }
177 
178 
// Body of SetConnType(TConnBase *conn, EServerConnType type): changes the
// connection's type under its type_lock. A connection that is already
// eClosedSocket is left untouched. When the requested type is
// eInactiveSocket, a pending state wins over it: ePreDeferredSocket becomes
// eDeferredSocket and ePreClosedSocket becomes eClosedSocket.
// NOTE(review): listing lines 179 (signature), 190 (the final `else` branch)
// and 199 (the statement guarded by the last `if`) are missing from this
// extract.
180 {
181  conn->type_lock.Lock();
182  if (conn->type != eClosedSocket) {
183  EServerConnType new_type = type;
184  if (type == eInactiveSocket) {
185  if (conn->type == ePreDeferredSocket)
186  new_type = eDeferredSocket;
187  else if (conn->type == ePreClosedSocket)
188  new_type = eClosedSocket;
189  else
191  }
192  conn->type = new_type;
193  }
194  conn->type_lock.Unlock();
195 
196  // Signal poll cycle to re-read poll vector by sending
197  // byte to control socket
198  if (type == eInactiveSocket)
200 }
201 
// Sets m_ControlTrigger and reports a failure if Set() does not return
// eIO_Success.
// NOTE(review): listing lines 202 (signature) and 206 (the start of the
// error-posting statement) are missing from this extract. The name
// PingControlConnection is inferred from the log message below -- confirm.
203 {
204  EIO_Status status = m_ControlTrigger.Set();
205  if (status != eIO_Success) {
207  << "PingControlConnection: failed to set control trigger: "
208  << IO_StatusStr(status));
209  }
210 }
211 
212 
// Body of CloseConnection(TConnBase *conn): closes the connection as if the
// server had initiated the close. Only connections currently in
// eActiveSocket, ePreDeferredSocket or ePreClosedSocket state are accepted;
// any other state is reported as Critical and the call returns without
// changes. Otherwise the type becomes ePreClosedSocket, the socket is
// aborted and OnSocketEvent(eServIO_OurClose) is delivered.
// NOTE(review): listing lines 213 (signature) and 220 (the middle of the
// error message expression) are missing from this extract.
214 {
215  conn->type_lock.Lock();
216  if (conn->type != eActiveSocket && conn->type != ePreDeferredSocket
217  && conn->type != ePreClosedSocket)
218  {
219  ERR_POST(Critical << "Unexpected connection type (" <<
221  ") when closing the connection. Ignore and continue.");
222  conn->type_lock.Unlock();
223  return;
224  }
225  conn->type = ePreClosedSocket;
226  conn->type_lock.Unlock();
227 
// NOTE(review): static_cast assumes conn really is a CServer_Connection -- confirm
228  CServer_Connection* srv_conn = static_cast<CServer_Connection*>(conn);
229  srv_conn->Abort();
230  srv_conn->OnSocketEvent(eServIO_OurClose);
231 }
232 
// GetPollAndTimerVec: rebuilds the poll vector from the pool contents and
// sorts connections into the caller-supplied output vectors.
//   polls           - cleared, then filled with the control trigger followed
//                     by every open inactive socket and open listener
//   timer_requests  - connections whose alarm time is the earliest one seen;
//                     cleared only once a first alarm time is encountered
//   timer_timeout   - set to the time until that earliest alarm (zero if it
//                     is already due); written only when true is returned
//   revived_conns   - deferred sockets that report IsReadyToProcess(); they
//                     are switched to eActiveSocket here
//   to_close_conns  - inactive sockets whose expiration has passed; removed
//                     from the pool
//   to_delete_conns - closed sockets; removed from the pool
// Returns true if at least one polled connection supplied an alarm time,
// false otherwise.
// NOTE(review): the first line of the signature (listing line 233) is
// missing from this extract.
234  vector<CSocketAPI::SPoll>& polls,
235  vector<IServer_ConnectionBase*>& timer_requests,
236  STimeout* timer_timeout,
237  vector<IServer_ConnectionBase*>& revived_conns,
238  vector<IServer_ConnectionBase*>& to_close_conns,
239  vector<IServer_ConnectionBase*>& to_delete_conns)
240 {
241  CTime now = GetFastLocalTime();
242  polls.clear();
243  revived_conns.clear();
244  to_close_conns.clear();
245  to_delete_conns.clear();
246 
// alarm_time receives each connection's requested alarm; min_alarm_time
// tracks the earliest future alarm and stays NULL while the earliest alarm
// is already due
247  const CTime * alarm_time = NULL;
248  const CTime * min_alarm_time = NULL;
249  bool alarm_time_defined = false;
250  CTime current_time(CTime::eEmpty);
251 
252  CMutexGuard guard(m_Mutex);
253 
254  // Control trigger goes here as well
255  polls.push_back(CSocketAPI::SPoll(&m_ControlTrigger, eIO_Read));
256 
257  ERASE_ITERATE(TData, it, m_Data) {
258  // Check that socket is not processing packet - safeguards against
259  // out-of-order packet processing by effectively pulling socket from
260  // poll vector until it is done with previous packet. See comments in
261  // server.cpp: CServer_Connection::CreateRequest() and
262  // CServerConnectionRequest::Process()
263  TConnBase* conn_base = *it;
264  conn_base->type_lock.Lock();
265  EServerConnType conn_type = conn_base->type;
266 
267  // There might be a request to delete a listener
268  if (conn_type == eListener) {
269  CServer_Listener * listener = dynamic_cast<CServer_Listener *>(
270  conn_base);
271  if (listener) {
272  unsigned short port = listener->GetPort();
273 
274  vector<unsigned short>::iterator port_it =
275  std::find(m_ListenerPortsToStop.begin(),
276  m_ListenerPortsToStop.end(), port);
277  if (port_it != m_ListenerPortsToStop.end()) {
// The lock lives inside the object, so release it before deleting
278  conn_base->type_lock.Unlock();
279  m_ListenerPortsToStop.erase(port_it);
280  delete conn_base;
281  m_Data.erase(it);
282  continue;
283  }
284  }
285  }
286 
287 
288  if (conn_type == eClosedSocket
289  || (conn_type == eInactiveSocket && !conn_base->IsOpen()))
290  {
291  // If it's not eClosedSocket then This connection was closed
292  // by the client earlier in CServer::Run after Poll returned
293  // eIO_Close which was converted into eServIO_ClientClose.
294  // Then during OnSocketEvent(eServIO_ClientClose) it was marked
295  // as closed. Here we just clean it up from the connection pool.
296  to_delete_conns.push_back(conn_base);
297  m_Data.erase(it);
298  }
299  else if (conn_type == eInactiveSocket && conn_base->expiration <= now)
300  {
301  to_close_conns.push_back(conn_base);
302  m_Data.erase(it);
303  }
304  else if ((conn_type == eInactiveSocket || conn_type == eListener)
305  && conn_base->IsOpen())
306  {
307  CPollable* pollable = dynamic_cast<CPollable*>(conn_base);
308  _ASSERT(pollable);
309  polls.push_back(CSocketAPI::SPoll(pollable,
310  conn_base->GetEventsToPollFor(&alarm_time)));
311  if (alarm_time != NULL) {
312  if (!alarm_time_defined) {
// First alarm seen in this pass: it becomes the reference point
313  alarm_time_defined = true;
314  current_time = GetFastLocalTime();
315  min_alarm_time = *alarm_time > current_time? alarm_time: NULL;
316  timer_requests.clear();
317  timer_requests.push_back(conn_base);
318  } else if (min_alarm_time == NULL) {
// Earliest alarm is already due: collect only other due alarms
319  if (*alarm_time <= current_time)
320  timer_requests.push_back(conn_base);
321  } else if (*alarm_time <= *min_alarm_time) {
// Strictly earlier alarm replaces the set; an equal one joins it
322  if (*alarm_time != *min_alarm_time) {
323  min_alarm_time = *alarm_time > current_time? alarm_time
324  : NULL;
325  timer_requests.clear();
326  }
327  timer_requests.push_back(conn_base);
328  }
329  alarm_time = NULL;
330  }
331  }
332  else if (conn_type == eDeferredSocket && conn_base->IsReadyToProcess())
333  {
334  conn_base->type = eActiveSocket;
335  revived_conns.push_back(conn_base);
336  }
337  conn_base->type_lock.Unlock();
338  }
339  guard.Release();
340 
341  if (alarm_time_defined) {
342  if (min_alarm_time == NULL)
343  timer_timeout->usec = timer_timeout->sec = 0;
344  else {
345  CTimeSpan span(min_alarm_time->DiffTimeSpan(current_time));
346  if (span.GetCompleteSeconds() < 0 ||
347  span.GetNanoSecondsAfterSecond() < 0)
348  {
349  timer_timeout->usec = timer_timeout->sec = 0;
350  }
351  else {
352  timer_timeout->sec = (unsigned) span.GetCompleteSeconds();
// Nanoseconds to microseconds
353  timer_timeout->usec = span.GetNanoSecondsAfterSecond() / 1000;
354  }
355  }
356  return true;
357  }
358  return false;
359 }
360 
361 void CServer_ConnectionPool::SetAllActive(const vector<CSocketAPI::SPoll>& polls)
362 {
363  ITERATE(vector<CSocketAPI::SPoll>, it, polls) {
364  if (!it->m_REvent)
365  continue;
366 
367  CTrigger * trigger = dynamic_cast<CTrigger *>(it->m_Pollable);
368  if (trigger)
369  continue;
370 
371  IServer_ConnectionBase* conn_base =
372  dynamic_cast<IServer_ConnectionBase*>(it->m_Pollable);
373 
374  conn_base->type_lock.Lock();
375  if (conn_base->type == eInactiveSocket)
376  conn_base->type = eActiveSocket;
377  else if (conn_base->type != eListener)
378  abort();
379  conn_base->type_lock.Unlock();
380  }
381 }
382 
383 void CServer_ConnectionPool::SetAllActive(const vector<IServer_ConnectionBase*>& conns)
384 {
385  ITERATE(vector<IServer_ConnectionBase*>, it, conns) {
386  IServer_ConnectionBase* conn_base = *it;
387  conn_base->type_lock.Lock();
388  if (conn_base->type != eInactiveSocket)
389  abort();
390  conn_base->type = eActiveSocket;
391  conn_base->type_lock.Unlock();
392  }
393 }
394 
395 
// Calls Activate() on every element of m_Data while holding m_Mutex, then
// sets m_ListeningStarted so that listeners added later are activated on
// insertion.
// NOTE(review): the signature line (listing line 396) is missing from this
// extract; presumably this is StartListening() -- confirm.
397 {
398  CMutexGuard guard(m_Mutex);
399  ITERATE (TData, it, m_Data) {
400  (*it)->Activate();
401  }
402  m_ListeningStarted = true;
403 }
404 
405 
// Calls Passivate() on every element of m_Data while holding m_Mutex.
// m_ListeningStarted is not changed here.
// NOTE(review): the signature line (listing line 406) is missing from this
// extract; presumably this is the counterpart of StartListening() -- confirm.
407 {
408  CMutexGuard guard(m_Mutex);
409  ITERATE (TData, it, m_Data) {
410  (*it)->Passivate();
411  }
412 }
413 
414 
415 vector<unsigned short> CServer_ConnectionPool::GetListenerPorts(void)
416 {
417  vector<unsigned short> ports;
418 
419  CMutexGuard guard(m_Mutex);
420  ITERATE (TData, it, m_Data) {
421  TConnBase * conn_base = *it;
422 
423  conn_base->type_lock.Lock(); // To be on the safe side
424  if (conn_base->type == eListener) {
425  CServer_Listener * listener = dynamic_cast<CServer_Listener *>(
426  conn_base);
427  if (listener) {
428  unsigned short port = listener->GetPort();
429 
430  if (std::find(m_ListenerPortsToStop.begin(),
431  m_ListenerPortsToStop.end(), port) ==
432  m_ListenerPortsToStop.end())
433  ports.push_back(listener->GetPort());
434  }
435  }
436  conn_base->type_lock.Unlock();
437  }
438 
439  return ports;
440 }
441 
void Release()
Manually force the resource to be released.
Definition: guard.hpp:166
CSocket::
CTimeSpan.
Definition: ncbitime.hpp:1313
CTime –.
Definition: ncbitime.hpp:296
CTrigger::
Definition: ncbi_socket.hpp:88
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
void clear()
Definition: set.hpp:153
size_type size() const
Definition: set.hpp:132
const_iterator find(const key_type &key) const
Definition: set.hpp:137
void erase(iterator pos)
Definition: set.hpp:151
const_iterator end() const
Definition: set.hpp:136
Internal header for threaded server connection pools.
static CS_CONNECTION * conn
Definition: ct_dynamic.c:25
#define false
Definition: bool.h:36
static int type
Definition: getdata.c:31
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define ERASE_ITERATE(Type, Var, Cont)
Non-constant version with ability to erase current element, if container permits.
Definition: ncbimisc.hpp:843
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
string
Definition: cgiapp.hpp:687
#define NULL
Definition: ncbistd.hpp:225
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
Definition: ncbidiag.hpp:550
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
EIO_Status Set(void)
const STimeout * GetTimeout(EIO_Event event) const
Get timeout for I/O in the specified direction.
EIO_Status Abort(void)
Abort socket connection.
void CloseConnection(TConnBase *conn)
Close connection as if it was initiated by server (not by client).
std::string g_ServerConnTypeToString(enum EServerConnType conn_type)
void Erase(void)
Erase all connections.
void SetAllActive(const vector< CSocketAPI::SPoll > &polls)
virtual bool IsReadyToProcess(void)
CServer_ConnectionPool(unsigned max_connections)
volatile EServerConnType type
bool GetPollAndTimerVec(vector< CSocketAPI::SPoll > &polls, vector< IServer_ConnectionBase * > &timer_requests, STimeout *timer_timeout, vector< IServer_ConnectionBase * > &revived_conns, vector< IServer_ConnectionBase * > &to_close_conns, vector< IServer_ConnectionBase * > &to_delete_conns)
vector< unsigned short > GetListenerPorts(void)
Provides a list of ports on which the server is listening.
EServerConnType
unsigned short GetPort(void) const
bool Add(TConnBase *conn, EServerConnType type)
virtual EIO_Event GetEventsToPollFor(const CTime **) const
virtual bool IsOpen(void)
void Remove(TConnBase *conn)
void SetConnType(TConnBase *conn, EServerConnType type)
Guard connection from out-of-order packet processing by pulling eActiveSocket's from poll vector Rese...
bool RemoveListener(unsigned short port)
void OnSocketEvent(EServIO_Event event)
Definition: server.cpp:533
void x_UpdateExpiration(TConnBase *conn)
vector< unsigned short > m_ListenerPortsToStop
@ eServIO_OurClose
Definition: server.hpp:67
@ eClosedSocket
@ eActiveSocket
@ ePreDeferredSocket
@ eDeferredSocket
@ eInactiveSocket
@ ePreClosedSocket
@ eListener
void Lock(void)
Acquire mutex for the current thread with no nesting checks.
void Unlock(void)
Release mutex with no owner or nesting checks.
long GetNanoSecondsAfterSecond(void) const
Get number of nanoseconds.
Definition: ncbitime.hpp:2563
CTimeSpan DiffTimeSpan(const CTime &t) const
Difference in nanoseconds from specified time.
Definition: ncbitime.cpp:2304
CTime GetFastLocalTime(void)
Quick and dirty getter of local time.
Definition: ncbitime.cpp:4166
long GetCompleteSeconds(void) const
Get number of complete seconds.
Definition: ncbitime.hpp:2560
@ eEmpty
Use "empty" time.
Definition: ncbitime.hpp:301
@ eIgnoreDaylight
Ignore daylight saving time.
Definition: ncbitime.hpp:368
#define kInfiniteTimeout
Definition: ncbi_types.h:82
EIO_Status
I/O status.
Definition: ncbi_core.h:132
unsigned int usec
microseconds (modulo 1,000,000)
Definition: ncbi_types.h:78
const char * IO_StatusStr(EIO_Status status)
Get the text form of an enum status value.
Definition: ncbi_core.c:56
unsigned int sec
seconds
Definition: ncbi_types.h:77
#define kDefaultTimeout
Definition: ncbi_types.h:81
@ eIO_Success
everything is fine, no error occurred
Definition: ncbi_core.h:133
@ eIO_ReadWrite
eIO_Read | eIO_Write (also, eCONN_OnFlush)
Definition: ncbi_core.h:122
@ eIO_Read
read
Definition: ncbi_core.h:120
Definition of all error codes used in connect library (xconnect.lib, xconnext.lib etc).
void abort()
Polling structure m_Event can be either of eIO_Open, eIO_Read, eIO_Write, eIO_ReadWrite.
Timeout structure.
Definition: ncbi_types.h:76
Definition: type.c:6
#define _ASSERT
Modified on Mon Apr 22 04:00:10 2024 by modify_doxy.py rev. 669887