NCBI C++ ToolKit
sockets_man.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: sockets_man.cpp 89003 2020-02-11 17:42:13Z ucko $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Pavel Ivanov
27  *
28  */
29 
30 #include "task_server_pch.hpp"
31 
32 #include <corelib/request_ctx.hpp>
33 #include <corelib/ncbireg.hpp>
34 
35 #include "sockets_man.hpp"
36 #include "threads_man.hpp"
37 #include "srv_stat.hpp"
38 
39 #ifdef NCBI_OS_LINUX
40 # include <sys/types.h>
41 # include <sys/socket.h>
42 # include <netinet/ip.h>
43 # include <netinet/tcp.h>
44 # include <netinet/in.h>
45 # include <arpa/inet.h>
46 # include <netdb.h>
47 # include <sys/epoll.h>
48 # include <unistd.h>
49 # include <fcntl.h>
50 # include <errno.h>
51 
52 # ifndef EPOLLRDHUP
53 # ifdef POLLRDHUP
54 # define EPOLLRDHUP POLLRDHUP
55 # else
56 # define EPOLLRDHUP 0x2000
57 # endif
58 # endif
59 
60 #else
61 # define EPOLLIN 0x0001
62 # define EPOLLOUT 0x0004
63 # define EPOLLERR 0x0008
64 # define EPOLLHUP 0x0010
65 # define EPOLLRDHUP 0x2000
66 #endif
67 
68 
70 
71 
72 static const Uint1 kEpollEventsArraySize = 100;
73 /// 1000 below is chosen to be a little bit less than maximum packet size in
74 /// Ethernet (~1500 bytes).
75 static const Uint2 kSockReadBufSize = 1000;
76 static const Uint2 kSockMinWriteSize = 1000;
77 /// In calculations in the file it's assumed that kSockWriteBufSize is at least
78 /// twice as large as kSockMinWriteSize.
79 static const Uint2 kSockWriteBufSize = 2000;
80 /// 16 Uint4s on x86_64 is the size of CPU's cacheline. And it should be more
81 /// than enough for NetCache.
82 static const Uint1 kMaxCntListeningSocks = 16;
83 
84 
85 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_MEMBER_HOOK
86 typedef intr::member_hook<CSrvSocketTask,
88  &CSrvSocketTask::m_SockListHook> TSockListHookOpt;
89 typedef intr::list<CSrvSocketTask,
90  TSockListHookOpt,
91  intr::constant_time_size<false> > TSockList;
92 #elif NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_BASE_HOOK
93 typedef intr::list<CSrvSocketTask,
94  intr::base_hook<TSrvSockListHook>,
95  intr::constant_time_size<false> > TSockList;
96 #elif NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
97 typedef std::list<CSrvSocketTask*> TSockList;
98 #endif
99 
100 /// Per-thread structure containing information about sockets.
102 {
103  /// List of all open and not yet deleted sockets which were opened in this
104  /// thread.
106  /// "Number of sockets" that this thread created/deleted. When new socket is
107  /// created in this thread this number is increased by 1, when this thread
108  /// deletes socket this number is decreased by 1. From time to time this
109  /// number is added to the global variable of number of sockets. At that
110  /// time 0 is written to sock_cnt to avoid adding the same number several
111  /// times.
113 
115  : sock_cnt(0)
116  {
117  }
118 };
119 
120 
122 {
123  /// Index in the s_ListenSocks array.
125  /// Port to listen to.
127  /// File descriptor for the listening socket.
128  int fd;
129  /// Factory that will create CSrvSocketTask for each incoming socket.
131 };
132 
133 
134 class CSrvListener : public CSrvTask
135 {
136 public:
137  CSrvListener(void);
138  virtual ~CSrvListener(void);
139 
140  virtual void ExecuteSlice(TSrvThreadNum thread_idx);
141 
142 public:
143  /// Per-listening-socket numbers copied from s_ListenEvents when events are
144  /// processed.
146  /// Per-listening-socket numbers copied from s_ListenErrors when errors are
147  /// processed.
149 };
150 
157 static int s_EpollFD = -1;
162 static int s_SocketTimeout = 0;
164 static Uint8 s_ConnTimeout = 10;
165 static string s_HostName;
166 static Uint8 s_AcceptDelay = 1000000;
167 
168 
169 extern Uint8 s_CurJiffies;
170 extern CSrvTime s_JiffyTime;
171 extern SSrvThread** s_Threads;
172 
173 
174 
175 
176 void
178 {
179  s_SoftSocketLimit = reg->GetInt(section, "soft_sockets_limit", 1000);
180  s_HardSocketLimit = reg->GetInt(section, "hard_sockets_limit", 2000);
181  s_ConnTimeout = Uint8(reg->GetInt(section, "connection_timeout", 100))
183  s_SocketTimeout = reg->GetInt(section, "min_socket_inactivity", 300);
184  s_OldSocksDelBatch = Uint1(reg->GetInt(section, "sockets_cleaning_batch", 10));
185  if (s_OldSocksDelBatch < 10)
186  s_OldSocksDelBatch = 10;
187  s_AcceptDelay = Uint8(reg->GetInt(section, "socket_accept_delay", 1000)) * kUSecsPerMSec;
188 }
189 
190 bool ReConfig_Sockets(const CTempString& section, const CNcbiRegistry& new_reg, string&)
191 {
192  ConfigureSockets( &new_reg, section);
193  return true;
194 }
195 
197 {
198  string is("\": "), eol(",\n\"");
199  task.WriteText(eol).WriteText("soft_sockets_limit").WriteText(is ).WriteNumber( s_SoftSocketLimit);
200  task.WriteText(eol).WriteText("hard_sockets_limit").WriteText(is ).WriteNumber( s_HardSocketLimit);
201  task.WriteText(eol).WriteText("connection_timeout").WriteText(is ).WriteNumber( s_ConnTimeout * s_JiffyTime.NSec() / kNSecsPerMSec);
202  task.WriteText(eol).WriteText("min_socket_inactivity").WriteText(is ).WriteNumber( s_SocketTimeout);
203  task.WriteText(eol).WriteText("sockets_cleaning_batch").WriteText(is ).WriteNumber( s_OldSocksDelBatch);
204  task.WriteText(eol).WriteText("socket_accept_delay").WriteText(is ).WriteNumber( s_AcceptDelay / kUSecsPerMSec);
205 }
206 
207 void
209 {
210  SSocketsData* socks = new SSocketsData();
211  thr->socks = socks;
212 }
213 
214 void
216 {}
217 
218 static void
220  const char* err_msg,
221  int x_errno,
222  const char* errno_str,
223  const char* file,
224  int line,
225  const char* func)
226 {
227  if (CSrvDiagMsg::IsSeverityVisible(severity)) {
228  CSrvDiagMsg().StartSrvLog(severity, file, line, func)
229  << err_msg << ", errno=" << x_errno << " (" << errno_str << ")";
230  }
232 }
233 
234 static void
236  const char* err_msg,
237  int x_errno,
238  const char* file,
239  int line,
240  const char* func)
241 {
242  s_LogWithErrStr(severity, err_msg, x_errno, strerror(x_errno), file, line, func);
243 }
244 
245 #define LOG_WITH_ERRNO(sev, msg, x_errno) \
246  s_LogWithErrno(CSrvDiagMsg::sev, msg, x_errno, \
247  __FILE__, __LINE__, NCBI_CURRENT_FUNCTION) \
248 /**/
249 
250 #ifdef NCBI_OS_LINUX
251 
252 static void
253 s_LogWithAIErr(CSrvDiagMsg::ESeverity severity,
254  const char* err_msg,
255  int x_aierr,
256  const char* file,
257  int line,
258  const char* func)
259 {
260  s_LogWithErrStr(severity, err_msg, x_aierr, gai_strerror(x_aierr), file, line, func);
261 }
262 
263 #define LOG_WITH_AIERR(sev, msg, x_aierr) \
264  s_LogWithAIErr(CSrvDiagMsg::sev, msg, x_aierr, \
265  __FILE__, __LINE__, NCBI_CURRENT_FUNCTION) \
266 /**/
267 
268 #endif
269 
270 static inline bool
272 {
273 #ifdef NCBI_OS_LINUX
274  int res = fcntl(sock, F_SETFL, O_NONBLOCK);
275  if (res) {
276  LOG_WITH_ERRNO(Critical, "Cannot set socket non-blocking", errno);
277  return false;
278  }
279 #endif
280  return true;
281 }
282 
283 static bool
285 {
286 #ifdef NCBI_OS_LINUX
287  int value = 1;
288  int res = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &value, sizeof(value));
289  if (res) {
290  LOG_WITH_ERRNO(Critical, "Cannot set socket's keep-alive property", errno);
291  return false;
292  }
293  res = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));
294  if (res) {
295  LOG_WITH_ERRNO(Critical, "Cannot set socket's no-delay property", errno);
296  return false;
297  }
298 #endif
299  return true;
300 }
301 
302 static void
304 {
305 #ifdef NCBI_OS_LINUX
306  int value = 1;
307  int res = setsockopt(sock, IPPROTO_TCP, TCP_QUICKACK, &value, sizeof(value));
308  if (res)
309  LOG_WITH_ERRNO(Critical, "Cannot set socket's quick-ack property", errno);
310 #endif
311 }
312 
313 static bool
315 {
316  SListenSockInfo& sock_info = s_ListenSocks[idx];
317 #ifdef NCBI_OS_LINUX
318  int sock = socket(AF_INET, SOCK_STREAM, 0);
319  if (sock == -1) {
320  LOG_WITH_ERRNO(Critical, "Cannot create socket", errno);
321  return false;
322  }
323  if (!s_SetSocketNonBlock(sock)) {
324  close(sock);
325  return false;
326  }
327 
328  int value = 1;
329  int res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
330  if (res)
331  LOG_WITH_ERRNO(Error, "Cannot set socket's reuse-address property", errno);
332 
333  struct sockaddr_in addr;
334  memset(&addr, 0, sizeof(addr));
335  addr.sin_family = AF_INET;
336  addr.sin_addr.s_addr = htonl(INADDR_ANY);
337  addr.sin_port = htons(sock_info.port);
338  res = bind(sock, (struct sockaddr*)&addr, sizeof(addr));
339  if (res) {
340  string err_msg("Cannot bind socket to port ");
341  err_msg += NStr::NumericToString(sock_info.port);
342  LOG_WITH_ERRNO(Critical, err_msg.c_str(), errno);
343  close(sock);
344  return false;
345  }
346  res = listen(sock, 128);
347  if (res) {
348  LOG_WITH_ERRNO(Critical, "Cannot listen on a socket", errno);
349  close(sock);
350  return false;
351  }
352 
353  struct epoll_event evt;
354  evt.events = EPOLLIN | EPOLLET;
355  evt.data.ptr = (void*)&sock_info;
356  res = epoll_ctl(s_EpollFD, EPOLL_CTL_ADD, sock, &evt);
357  if (res) {
358  LOG_WITH_ERRNO(Critical, "Cannot add listening socket to epoll", errno);
359  close(sock);
360  return false;
361  }
362 
363  sock_info.fd = sock;
364 #endif
366  return true;
367 }
368 
369 static bool
371 {
372  for (Uint1 i = 0; i < s_CntListeningSocks; ++i) {
374  return false;
375  }
376  return true;
377 }
378 
379 static inline void
381 {
382  if (event & EPOLLIN)
383  ++s_ListenEvents[sock_info->index];
384  else if (event & (EPOLLERR + EPOLLHUP))
385  ++s_ListenErrors[sock_info->index];
387 }
388 
389 static inline void
391 {
392  if ((event & EPOLLIN) && task->m_SeenReadEvts == task->m_RegReadEvts)
393  ++task->m_RegReadEvts;
394  if ((event & EPOLLOUT) && task->m_SeenWriteEvts == task->m_RegWriteEvts)
395  ++task->m_RegWriteEvts;
396  if (event & EPOLLRDHUP)
397  task->m_RegReadHup = true;
398  if (event & (EPOLLERR + EPOLLHUP))
399  task->m_RegError = true;
400  task->SetRunnable();
401 }
402 
403 static void
405  int fd,
406  const char* prefix,
407  const char* file,
408  int line,
409  const char* func)
410 {
411 #ifdef NCBI_OS_LINUX
412  int x_errno = 0;
413  socklen_t x_size = sizeof(x_errno);
414  int res = getsockopt(fd, SOL_SOCKET, SO_ERROR, &x_errno, &x_size);
415  if (res)
416  x_errno = errno;
417  if (x_errno)
418  s_LogWithErrno(severity, prefix, x_errno, file, line, func);
419 #endif
420 }
421 
422 #define LOG_SOCK_ERROR(sev, fd, prefix) \
423  s_LogSocketError(CSrvDiagMsg::sev, fd, prefix, __FILE__, __LINE__, NCBI_CURRENT_FUNCTION)
424 
425 static void
426 s_CloseSocket(int fd, bool do_abort)
427 {
428 #ifdef NCBI_OS_LINUX
429  int res;
430 
431  if (do_abort) {
432  struct linger lgr;
433  lgr.l_linger = 0;
434  lgr.l_onoff = 1;
435  res = setsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&lgr, sizeof(lgr));
436  if (res)
437  LOG_WITH_ERRNO(Critical, "Error setting so_linger", errno);
438  }
439 
440  int val = -1;
441  res = setsockopt(fd, IPPROTO_TCP, TCP_LINGER2, (void*)&val, sizeof(val));
442  if (res)
443  LOG_WITH_ERRNO(Critical, "Error setting tcp_linger2", errno);
444 
445  int x_errno = 0;
446  do {
447  res = close(fd);
448  }
449  while (res && (x_errno = errno) == EINTR);
450  if (res)
451  LOG_WITH_ERRNO(Critical, "Error closing socket", x_errno);
452 #endif
454 }
455 
456 static void
458 {
459  task->m_Fd = -1;
460  CRequestContext* ctx = task->GetDiagCtx();
461  ctx->SetBytesRd(task->m_ReadBytes);
462  ctx->SetBytesWr(task->m_WrittenBytes);
463  Uint8 open_time = Uint8(ctx->GetRequestTimer().Elapsed() * kUSecsPerSecond);
464  GetCurThread()->stat->SockClose(ctx->GetRequestStatus(), open_time);
466  task->ReleaseDiagCtx();
468  --thr->socks->sock_cnt;
469 }
470 
471 string
473 {
474  char buf[20];
475 #ifdef NCBI_OS_LINUX
476  Uint1* hb = (Uint1*)&ip;
477  snprintf(buf, 20, "%u.%u.%u.%u", hb[0], hb[1], hb[2], hb[3]);
478 #endif
479  return buf;
480 }
481 
482 string
484 {
485  char buf[256];
486 #ifdef NCBI_OS_LINUX
487  struct sockaddr_in addr;
488  memset(&addr, 0, sizeof(addr));
489  addr.sin_family = AF_INET;
490  addr.sin_addr.s_addr = ip;
491  int x_errno = getnameinfo((struct sockaddr*)&addr, sizeof(addr),
492  buf, sizeof(buf), NULL, 0, NI_NAMEREQD | NI_NOFQDN);
493  if (x_errno != 0) {
494  LOG_WITH_AIERR(Critical, "Error from getnameinfo", x_errno);
495  buf[0] = 0;
496  }
497 #endif
498  return buf;
499 }
500 
501 const string&
503 {
504  if (s_HostName.empty()) {
505 #ifdef NCBI_OS_LINUX
506  char buf[256];
507  if (gethostname(buf, sizeof(buf)))
508  buf[0] = 0;
509  s_HostName = buf;
510 #endif
511  }
512  return s_HostName;
513 }
514 
515 Uint4
516 CTaskServer::GetIPByHost(const string& host)
517 {
518  Uint4 ip = 0;
519 #ifdef NCBI_OS_LINUX
520  ip = inet_addr(host.c_str());
521  if (ip == INADDR_NONE) {
522  struct addrinfo in_addr;
523  memset(&in_addr, 0, sizeof(in_addr));
524  in_addr.ai_family = AF_INET;
525  struct addrinfo* out_addr = NULL;
526  int x_errno = getaddrinfo(host.c_str(), NULL, &in_addr, &out_addr);
527  if (x_errno) {
528  LOG_WITH_AIERR(Critical, "Error from getaddrinfo", x_errno);
529  ip = 0;
530  }
531  else
532  ip = ((struct sockaddr_in*)out_addr->ai_addr)->sin_addr.s_addr;
533  freeaddrinfo(out_addr);
534  }
535 #endif
536  return ip;
537 }
538 
539 static void
541 {
542  task->CreateNewDiagCtx();
543  string peer(CTaskServer::IPToString(phost));
544  task->GetDiagCtx()->SetClientIP(peer);
546 
548  .PrintParam("_type", "conn")
549  .PrintParam("phost", peer)
550  .PrintParam("pport", pport)
551  .PrintParam("port", port)
552  .PrintParam("conn", task->m_ConnReqId);
553 
554  task->m_ReadBytes = task->m_WrittenBytes = 0;
555 }
556 
557 static void
559 {
560  s_Listener.m_SeenErrors[sock_idx] = s_ListenErrors[sock_idx];
561 
562  SListenSockInfo& sock_info = s_ListenSocks[sock_idx];
563  LOG_SOCK_ERROR(Critical, sock_info.fd, "Error in listening socket");
564 // try to reopen
565  s_CloseSocket(sock_info.fd, true);
566  s_CreateListeningSocket(sock_idx);
567 }
568 
569 static void
571 {
572  s_Listener.m_SeenEvents[sock_idx] = s_ListenEvents[sock_idx];
573  SListenSockInfo& sock_info = s_ListenSocks[sock_idx];
574  CSrvTime cmd_start = CSrvTime::Current();
575  for (;;) {
576 #ifdef NCBI_OS_LINUX
577  struct sockaddr_in addr;
578  socklen_t len = sizeof(addr);
579  int new_sock = accept(sock_info.fd, (struct sockaddr*)&addr, &len);
580  CSrvTime cmd_len = CSrvTime::Current();
581  cmd_len -= cmd_start;
582  Uint8 len_usec = cmd_len.AsUSec();
583  cmd_start = CSrvTime::Current();
584  if (cmd_len > s_AcceptDelay) {
585  SRV_LOG(Warning, "socket accept takes: " << len_usec << "us");
586  }
587  if (new_sock == -1) {
588  int x_errno = errno;
589  if (x_errno != EAGAIN && x_errno != EWOULDBLOCK) {
590  LOG_WITH_ERRNO(Critical, "Error accepting new socket", x_errno);
591  s_CloseSocket(sock_info.fd, true);
592  s_CreateListeningSocket(sock_idx);
593  }
594  break;
595  }
597  SRV_LOG(Error, "Number of open sockets " << s_TotalSockets
598  << " is bigger than hard limit " << s_HardSocketLimit
599  << ". Rejecting new connection.");
600  s_CloseSocket(new_sock, true);
601  continue;
602  }
603  if (!s_SetSocketNonBlock(new_sock) || !s_SetSocketOptions(new_sock)) {
604  s_CloseSocket(new_sock, true);
605  continue;
606  }
607 
608  CSrvSocketTask* task = sock_info.factory->CreateSocketTask();
609  task->m_Fd = new_sock;
610  task->m_PeerAddr = addr.sin_addr.s_addr;
611  task->m_PeerPort = addr.sin_port;
612  s_CreateDiagRequest(task, s_ListenSocks[sock_idx].port,
613  task->m_PeerAddr, task->m_PeerPort);
614  s_Threads[thread_num]->stat->SockOpenPassive();
616  if (!task->StartProcessing(thread_num, true))
617  task->Terminate();
618 #endif
619  }
620 }
621 
622 void
624 {
625 #ifdef NCBI_OS_LINUX
626  CSrvTime wait_time = s_JiffyTime;
627  Uint4 wait_msec = wait_time.NSec() / 1000000;
628  if (wait_msec == 0)
629  wait_msec = 1;
630  struct epoll_event events[kEpollEventsArraySize];
631  int res = epoll_wait(s_EpollFD, events, kEpollEventsArraySize, wait_msec);
632  if (res < 0) {
633  int x_errno = errno;
634  if (x_errno != EINTR)
635  LOG_WITH_ERRNO(Critical, "Error in epoll_wait", x_errno);
636  }
637  for (int i = 0; i < res; ++i) {
638  struct epoll_event& evt = events[i];
639  SSrvSocketInfo* sock_info = (SSrvSocketInfo*)evt.data.ptr;
640  if (sock_info->is_listening)
641  s_RegisterListenEvent((SListenSockInfo*)sock_info, evt.events);
642  else
643  s_RegisterClientEvent((CSrvSocketTask*)sock_info, evt.events);
644  }
645 #endif
646 }
647 
648 bool
650 {
651 #ifdef NCBI_OS_LINUX
652  s_EpollFD = epoll_create(1);
653  if (s_EpollFD == -1) {
654  LOG_WITH_ERRNO(Critical, "Cannot create epoll descriptor", errno);
655  return false;
656  }
657 #endif
658 
660  LOG_WITH_ERRNO(Critical, "Error in gethostname", errno);
661  return false;
662  }
663  return true;
664 }
665 
666 bool
668 {
669  if (s_CntListeningSocks == 0) {
670  SRV_LOG(Critical, "There's no listening sockets, shutting down");
671  return false;
672  }
673  if (!s_StartListening())
674  return false;
675  return true;
676 }
677 
678 void
680 {
681 #ifdef NCBI_OS_LINUX
682  close(s_EpollFD);
683 #endif
684 }
685 
686 static void
688 {
690  SSocketsData* socks = thr->socks;
691 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
692  socks->sock_list.push_front(task);
693 #else
694  socks->sock_list.push_front(*task);
695 #endif
696  ++socks->sock_cnt;
697 
699 }
700 
701 void
703 {
704 #if defined(NCBI_COMPILER_GCC) || defined(NCBI_COMPILER_ANY_CLANG)
706  int old_active[s_OldSocksDelBatch];
707 #else
708  CSrvSocketTask* old_socks[100];
709  int old_active[100];
710 #endif
711 
712  memset(old_socks, 0, sizeof(old_socks));
713  memset(old_active, 0, sizeof(old_active));
714 
715  // Search in the socket list sockets that were used least recently and
716  // that were not used for at least s_SocketTimeout seconds.
717  int limit_time = CSrvTime::CurSecs() - s_SocketTimeout;
718  Uint1 cnt_old = 0;
719  NON_CONST_ITERATE(TSockList, it, lst) {
720 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
721  CSrvSocketTask* task = *it;
722 #else
723  CSrvSocketTask* task = &*it;
724 #endif
725  int active = task->m_LastActive;
726  if (active >= limit_time)
727  continue;
728 
729  // Binary search to find the place where to put new socket in our sorted
730  // list of candidates for closing.
731  Uint1 low = 0, high = cnt_old;
732  while (high > low) {
733  Uint1 mid = (high + low) / 2;
734  if (old_active[mid] > active)
735  high = mid;
736  else
737  low = mid + 1;
738  }
739  if (low >= cnt_old && cnt_old == s_OldSocksDelBatch)
740  continue;
741  if (cnt_old == s_OldSocksDelBatch)
742  --cnt_old;
743  if (low < cnt_old) {
744  memmove(&old_socks[low + 1], &old_socks[low],
745  (cnt_old - low) * sizeof(old_socks[0]));
746  memmove(&old_active[low + 1], &old_active[low],
747  (cnt_old - low) * sizeof(old_active[0]));
748  }
749  old_socks[low] = task;
750  old_active[low] = active;
751  ++cnt_old;
752  }
753 
754  for (Uint1 i = 0; i < cnt_old; ++i) {
755  CSrvSocketTask* task = old_socks[i];
756  if (task->m_LastActive < limit_time) {
757  // We cannot physically close here not only because it can race with
758  // socket actually starting to do something but also because it can
759  // need to do some finalization before it will be possible to close
760  // and delete it.
761  task->m_NeedToClose = true;
762  task->SetRunnable();
763  }
764  }
765 }
766 
767 void
768 MoveAllSockets(SSocketsData* dst_socks, SSocketsData* src_socks)
769 {
770  // Move all sockets from src_socks to dst_socks.
771  dst_socks->sock_list.splice(dst_socks->sock_list.begin(), src_socks->sock_list);
772  dst_socks->sock_cnt += src_socks->sock_cnt;
773  src_socks->sock_cnt = 0;
774 }
775 
776 void
778 {
780  socks->sock_cnt = 0;
781 }
782 
783 void
785 {
786  TSockList& lst = socks->sock_list;
787  NON_CONST_ITERATE(TSockList, it, lst) {
788 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
789  CSrvSocketTask* task = *it;
790 #else
791  CSrvSocketTask* task = &*it;
792 #endif
793  if (task->m_ConnStartJfy != 0
795  && task->m_RegWriteEvts == task->m_SeenWriteEvts
796  && !task->m_RegError)
797  {
798  task->m_RegError = true;
799  task->SetRunnable();
800  }
801  }
802 }
803 
804 void
806 {
807  // Terminate all sockets which have their Terminate() method called already.
808  TSockList& lst = socks->sock_list;
809  ERASE_ITERATE(TSockList, it, lst) {
810 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
811  CSrvSocketTask* task = *it;
812 #else
813  CSrvSocketTask* task = &*it;
814 #endif
815  if (task->m_TaskFlags & fTaskNeedTermination) {
816  lst.erase(it);
817  MarkTaskTerminated(task);
818  }
819  }
820 
821  // Also ask some unused sockets to close if necessary
824 }
825 
826 void
828 {
829  TSockList& lst = socks->sock_list;
830  NON_CONST_ITERATE(TSockList, it, lst) {
831 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
832  (*it)->SetRunnable();
833 #else
834  it->SetRunnable();
835 #endif
836  }
837 }
838 
839 void
841 {
843 }
844 
845 static size_t
847 {
848  if (!task->m_SockHasRead && task->m_SeenReadEvts == task->m_RegReadEvts)
849  return 0;
850  if (size == 0)
851  return 0;
852 
853  ssize_t n_read = 0;
854 #ifdef NCBI_OS_LINUX
855 retry:
856  n_read = recv(task->m_Fd, buf, size, 0);
857  if (n_read == -1) {
858  int x_errno = errno;
859  if (x_errno == EINTR)
860  goto retry;
861  // We should have returned at the very top but due to some races
862  // we fell down here. Anyway we should avoid changing variables
863  // at the bottom.
864  if (x_errno == EWOULDBLOCK) {
865  return 0;
866  }
867  if (x_errno == EAGAIN) {
868  task->m_SeenReadEvts = task->m_RegReadEvts;
869  return 0;
870  }
871  LOG_WITH_ERRNO(Warning, "Error reading from socket", x_errno);
872  task->m_RegError = true;
873  n_read = 0;
874  }
875 #endif
876  task->m_ReadBytes += n_read;
877  task->m_SockHasRead = size_t(n_read) == size;
878  task->m_SockCanReadMore = n_read != 0
879  && (task->m_SockHasRead || !task->m_RegReadHup);
880 
881  return size_t(n_read);
882 }
883 
884 static size_t
885 s_WriteToSocket(CSrvSocketTask* task, const void* buf, size_t size)
886 {
887  if (!task->m_SockCanWrite && task->m_SeenWriteEvts == task->m_RegWriteEvts)
888  return 0;
889  if (size == 0)
890  return 0;
891 
892  task->m_SeenWriteEvts = task->m_RegWriteEvts;
893  ssize_t n_written = 0;
894 #ifdef NCBI_OS_LINUX
895 retry:
896  n_written = send(task->m_Fd, buf, size, 0);
897  if (n_written == -1) {
898  int x_errno = errno;
899  if (x_errno == EINTR)
900  goto retry;
901  if (x_errno == EAGAIN || x_errno == EWOULDBLOCK) {
902  // We should have returned at the very top but due to some races
903  // we fell down here. Anyway we should avoid changing variables
904  // at the bottom.
905  return 0;
906  }
907  LOG_WITH_ERRNO(Warning, "Error writing to socket", x_errno);
908  task->m_RegError = true;
909  n_written = 0;
910  }
911 #endif
912  task->m_WrittenBytes += n_written;
913  task->m_SockCanWrite = size_t(n_written) == size;
914 
915  return size_t(n_written);
916 }
917 
918 static inline void
920 {
921  if (pos != 0) {
922  if (pos > size) {
923  SRV_FATAL("IO buffer broken");
924  }
925  Uint2 new_size = size - pos;
926  memmove(buf, buf + pos, new_size);
927  size = new_size;
928  pos = 0;
929  }
930 }
931 
932 static inline void
934 {
935  if (task->m_RdPos < task->m_RdSize) {
936  char c = task->m_RdBuf[task->m_RdPos];
937  if (c == '\n' || c == '\0') {
938  ++task->m_RdPos;
939  task->m_CRMet = false;
940  }
941  }
942 }
943 
944 static inline Uint2
945 s_ReadFromBuffer(CSrvSocketTask* task, void* dest, size_t size)
946 {
947  Uint2 to_copy = task->m_RdSize - task->m_RdPos;
948  if (size < to_copy)
949  to_copy = Uint2(size);
950  memcpy(dest, task->m_RdBuf + task->m_RdPos, to_copy);
951  task->m_RdPos += to_copy;
952  return to_copy;
953 }
954 
955 static inline void
956 s_CopyData(CSrvSocketTask* task, const void* buf, Uint2 size)
957 {
958  memcpy(task->m_WrBuf + task->m_WrSize, buf, size);
959  task->m_WrSize += size;
960 }
961 
962 static inline size_t
963 s_WriteNoPending(CSrvSocketTask* task, const void* buf, size_t size)
964 {
965  if (size < kSockMinWriteSize) {
966  s_CopyData(task, buf, Uint2(size));
967  return size;
968  }
969  else {
970  return s_WriteToSocket(task, buf, size);
971  }
972 }
973 
974 static inline void
976 {
977  if (task->m_WrSize < task->m_WrPos) {
978  SRV_FATAL("IO buffer broken");
979  }
980  Uint2 n_written = Uint2(s_WriteToSocket(task,
981  task->m_WrBuf + task->m_WrPos,
982  task->m_WrSize - task->m_WrPos));
983  task->m_WrPos += n_written;
984 }
985 
986 static inline void
988 {
989  s_CompactBuffer(task->m_WrBuf, task->m_WrSize, task->m_WrPos);
990 }
991 
992 static void
994 {
995  CSrvSocketTask* dst = src->m_ProxyDst;
996  Uint8& size = src->m_ProxySize;
997 
998  if (src->NeedEarlyClose() || dst->NeedEarlyClose())
999  goto finish_with_error;
1000 
1001  while (size != 0) {
1002  if (src->m_RdPos < src->m_RdSize) {
1003  // If there's something in src's read buffer we should copy it first.
1004  Uint2 to_write = src->m_RdSize - src->m_RdPos;
1005  if (to_write > size)
1006  to_write = Uint2(size);
1007  // We call Write() so that it could figure out by itself whether
1008  // the new data should go to dst's write buffer, or to socket directly,
1009  // or some combination of that.
1010  Uint2 n_done = Uint2(dst->Write(src->m_RdBuf + src->m_RdPos, to_write));
1011  size -= n_done;
1012  src->m_RdPos += n_done;
1013  if (dst->NeedEarlyClose())
1014  goto finish_with_error;
1015  if (n_done < to_write) {
1016  // If there's still something left in src's read buffer then return
1017  // now.
1018  return;
1019  }
1020  continue;
1021  }
1022  // Read buffer in src is empty, we'll need to read directly from src's
1023  // socket. Now let's see how much we should read from there.
1024  Uint2 to_read = dst->m_WrMemSize - dst->m_WrSize;
1025  if (to_read == 0) {
1026  // Write buffer in dst is full, we need to flush it first.
1027  s_FlushData(dst);
1028  if (dst->NeedEarlyClose())
1029  goto finish_with_error;
1030  s_CompactWrBuffer(dst);
1031  to_read = dst->m_WrMemSize - dst->m_WrSize;
1032  if (to_read == 0) {
1033  // If nothing was flushed we can't continue further.
1034  return;
1035  }
1036  }
1037  if (to_read > size)
1038  to_read = Uint2(size);
1039 
1040  Uint2 n_done;
1041  if (to_read < kSockReadBufSize) {
1042  // If very small amount is needed (either because nothing else should
1043  // be proxied or because the rest of write buffer in dst is filled)
1044  // then we better read into src's read buffer first and then copy
1045  // whatever is necessary into dst's write buffer.
1046  src->ReadToBuf();
1047  if (src->NeedEarlyClose())
1048  goto finish_with_error;
1049  _ASSERT(src->m_RdPos == 0);
1050  n_done = src->m_RdSize;
1051  if (n_done > to_read)
1052  n_done = to_read;
1053  memcpy(dst->m_WrBuf + dst->m_WrSize, src->m_RdBuf, n_done);
1054  src->m_RdPos = n_done;
1055  }
1056  else {
1057  // If amount we need is pretty big we'll read directly from src's
1058  // socket into dst's write buffer. And later dst's write buffer will
1059  // be flushed into dst's socket.
1060  n_done = Uint2(s_ReadFromSocket(src, dst->m_WrBuf + dst->m_WrSize,
1061  to_read));
1062  if (src->NeedEarlyClose())
1063  goto finish_with_error;
1064  }
1065  if (n_done == 0) {
1066  // If nothing was copied in the above if/else then we should return
1067  // and wait when more data will be available in src's socket.
1068  return;
1069  }
1070 
1071  dst->m_WrSize += n_done;
1072  size -= n_done;
1073  if (dst->m_WrSize >= kSockMinWriteSize) {
1074  // If dst's write buffer is already filled enough we can flush data
1075  // into the socket right now.
1076  s_FlushData(dst);
1077  if (dst->NeedEarlyClose())
1078  goto finish_with_error;
1079  s_CompactWrBuffer(dst);
1080  }
1081  }
1082  goto finish_proxy;
1083 
1084 finish_with_error:
1085  src->m_ProxyHadError = true;
1086  dst->m_ProxyHadError = true;
1087 
1088 finish_proxy:
1089  src->m_ProxyDst = NULL;
1090  dst->m_ProxySrc = NULL;
1091  dst->SetRunnable();
1092 }
1093 
1094 
1095 
1096 
1097 bool
1099 {
1100  bool result = false;
1102  Uint1 idx = s_CntListeningSocks;
1103  if (idx == kMaxCntListeningSocks)
1104  goto unlock_and_exit;
1105 
1106  SListenSockInfo* sock_info;
1107  sock_info = &s_ListenSocks[idx];
1108  sock_info->is_listening = true;
1109  sock_info->index = idx;
1110  sock_info->port = port;
1111  sock_info->factory = factory;
1112  ACCESS_ONCE(s_CntListeningSocks) = idx + 1;
1113 
1115  result = true;
1116 
1117 unlock_and_exit:
1119  return result;
1120 }
1121 
1122 
1124 {
1125 #if __NC_TASKS_MONITOR
1126  m_TaskName = "CSrvListener";
1127 #endif
1128 }
1129 
1131 {}
1132 
1133 void
1135 {
1136 // process events added by main thread
1137  Uint1 cnt_listen = ACCESS_ONCE(s_CntListeningSocks);
1138  for (Uint1 i = 0; i < cnt_listen; ++i) {
1139  if (m_SeenErrors[i] != s_ListenErrors[i])
1141  if (m_SeenEvents[i] != s_ListenEvents[i])
1142  s_ProcessListenEvent(i, thread_idx);
1143 
1144  if (CTaskServer::IsInShutdown()) {
1145  SListenSockInfo& sock_info = s_ListenSocks[i];
1146  if (sock_info.fd != -1)
1147  s_CloseSocket(sock_info.fd, false);
1148  sock_info.fd = -1;
1149  }
1150  }
1151 }
1152 
1153 
1155  : m_ProxySrc(NULL),
1156  m_ProxyDst(NULL),
1157  m_ConnStartJfy(0),
1158  m_Fd(-1),
1159  m_RdSize(0),
1160  m_RdPos(0),
1161  m_WrMemSize(kSockWriteBufSize),
1162  m_WrSize(0),
1163  m_WrPos(0),
1164  m_CRMet(false),
1165  m_ProxyHadError(false),
1166  m_SockHasRead(false),
1167  m_SockCanWrite(false),
1168  m_SockCanReadMore(true),
1169  m_NeedToClose(false),
1170  m_NeedToFlush(false),
1171  m_SeenReadEvts(0),
1172  m_SeenWriteEvts(0),
1173  m_RegReadEvts(0),
1174  m_RegWriteEvts(0),
1175  m_RegReadHup(false),
1176  m_RegError(false),
1177  m_ErrorPrinted(false)
1178 {
1179  is_listening = false;
1180  m_RdBuf = (char*)malloc(kSockReadBufSize);
1181  m_WrBuf = (char*)malloc(kSockWriteBufSize);
1182 #if __NC_TASKS_MONITOR
1183  m_TaskName = "CSrvSocketTask";
1184 #endif
1185 }
1186 
1188 {
1189  if (m_Fd != -1) {
1190  SRV_LOG(Critical, "SocketTask failed to close socket");
1191  s_CloseSocket(m_Fd, true);
1192  }
1193  free(m_RdBuf);
1194  free(m_WrBuf);
1195 }
1196 
1197 void
1199 {
1200  if (!m_ErrorPrinted) {
1201  LOG_SOCK_ERROR(Warning, m_Fd, "Error in the socket");
1202  m_ErrorPrinted = true;
1203  }
1204 }
1205 
1206 bool
1208 {
1210  Uint2 n_read = Uint2(s_ReadFromSocket(this, m_RdBuf + m_RdSize,
1212  m_RdSize += n_read;
1213  if (m_CRMet)
1214  s_ReadLF(this);
1215  return m_RdSize > 0;
1216 }
1217 
// CSrvSocketTask::ReadLine -- extract one line from the socket's read buffer
// (the signature line is absent from this extract; the file's index lists it
// as "bool ReadLine(CTempString *line)").
// Returns true and assigns the line, without its terminator, to *line when a
// '\n', '\r' or '\0' is found. Returns false when no data is available or no
// terminator has arrived yet.
1218 bool
1220 {
1221  if (!ReadToBuf())
1222  return false;
1223 
// Scan the unread part of the buffer for the first line terminator.
1224  Uint2 crlf_pos;
1225  for (crlf_pos = m_RdPos; crlf_pos < m_RdSize; ++crlf_pos) {
1226  char c = m_RdBuf[crlf_pos];
1227  if (c == '\n' || c == '\r' || c == '\0')
1228  break;
1229  }
// No terminator found. If m_RdSize has reached kSockReadBufSize the line is reported
// as too long and the socket is put into the error state via m_RegError.
1230  if (crlf_pos >= m_RdSize) {
1231  if (m_RdSize == kSockReadBufSize) {
1232  SRV_LOG(Warning, "Too long line in the protocol - at least "
1233  << m_RdSize << " bytes");
1234  m_RegError = true;
1235  }
1236  return false;
1237  }
1238 
// NOTE(review): CTempString does not own its storage, so *line presumably refers
// directly to m_RdBuf memory -- confirm how long callers may keep it.
1239  line->assign(m_RdBuf + m_RdPos, crlf_pos - m_RdPos);
// For '\r' the terminator is skipped here and m_CRMet records that a '\n' may still follow.
1240  if (m_RdBuf[crlf_pos] == '\r') {
1241  m_CRMet = true;
1242  ++crlf_pos;
1243  }
1244  m_RdPos = crlf_pos;
// s_ReadLF presumably consumes the '\n' terminator (helper not visible here -- confirm).
1245  s_ReadLF(this);
1246  return true;
1247 }
1248 
// CSrvSocketTask::Read -- read up to `size` bytes into `buf`
// (the signature line is absent from this extract; the file's index lists it
// as "size_t Read(void *buf, size_t size)").
// Returns the number of bytes actually stored in `buf`, which may be less
// than `size`.
1249 size_t
1251 {
1252  size_t n_read = 0;
// First hand out whatever is still unread in the internal read buffer.
1253  if (size != 0 && m_RdPos < m_RdSize) {
1254  Uint2 copied = s_ReadFromBuffer(this, buf, size);
1255  n_read += copied;
1256  buf = (char*)buf + copied;
1257  size -= copied;
1258  }
1259  if (size == 0)
1260  return n_read;
1261 
// A remainder smaller than kSockReadBufSize goes through the internal buffer;
// a larger one is read from the socket straight into the caller's memory.
1262  if (size < kSockReadBufSize) {
1263  if (ReadToBuf())
1264  n_read += s_ReadFromBuffer(this, buf, size);
1265  }
1266  else {
1267  n_read += s_ReadFromSocket(this, buf, size);
1268  }
1269  return n_read;
1270 }
1271 
1272 size_t
1273 CSrvSocketTask::Write(const void* buf, size_t size)
1274 {
1275  Uint2 has_size = m_WrSize - m_WrPos;
1276  if (has_size == 0) {
1277  // If there's nothing in our write buffer then we either copy given data
1278  // into write buffer or (if it's too much of data) write directly into
1279  // socket.
1280  return s_WriteNoPending(this, buf, size);
1281  }
1282  else if (has_size + size <= kSockWriteBufSize) {
1283  // If write buffer has room for the new data then just copy data into
1284  // the write buffer.
1285  s_CompactWrBuffer(this);
1286  s_CopyData(this, buf, Uint2(size));
1287  return size;
1288  }
1289  else if (has_size < kSockMinWriteSize) {
1290  // If write buffer doesn't have enough room to fit the new data and
1291  // amount of data it already has is too small then we copy part of the new
1292  // data to fulfill minimum requirements and then write the rest to the
1293  // socket directly, because it's guaranteed that the rest will be more
1294  // than kSockMinWriteSize (see comment to kSockWriteBufSize - it's guaranteed
1295  // that kSockWriteBufSize is at least 2 times bigger than kSockMinWriteSize).
1296  Uint2 to_copy = kSockMinWriteSize - has_size;
1297  s_CompactWrBuffer(this);
1298  s_CopyData(this, buf, to_copy);
1299  s_FlushData(this);
1300  if (IsWriteDataPending())
1301  return to_copy;
1302  s_CompactWrBuffer(this);
1303 
1304  buf = (const char*)buf + to_copy;
1305  size -= to_copy;
1306  size_t n_written = s_WriteToSocket(this, buf, size);
1307  return to_copy + n_written;
1308  }
1309  else {
1310  // If write buffer already has enough data to fulfill requirement of minimum
1311  // amount to write to socket then we just flush it and then we are back
1312  // to the very first "if" above when we don't have anything in our
1313  // write buffer.
1314  s_FlushData(this);
1315  if (IsWriteDataPending())
1316  return 0;
1317  s_CompactWrBuffer(this);
1318  return s_WriteNoPending(this, buf, size);
1319  }
1320 }
1321 
// Append exactly `size` bytes from `buf` to the write buffer, growing the
// buffer when the data does not fit into the currently allocated memory.
// NOTE(review): original lines 1325 and 1330 are absent from this extract.
// Line 1325 is presumably the overflow condition guarding SRV_FATAL, and line
// 1330 presumably updates m_WrMemSize after the realloc -- confirm both.
1322 void
1323 CSrvSocketTask::WriteData(const void* buf, size_t size)
1324 {
1326  SRV_FATAL("IO buffer overflow");
1327  }
1328  if (m_WrSize + size > m_WrMemSize) {
// NOTE(review): the realloc() result is assigned directly to m_WrBuf; no NULL check is
// visible, so a failed realloc would lose the old buffer -- confirm against line 1330.
1329  m_WrBuf = (char*)realloc(m_WrBuf, m_WrSize + size);
1331  }
// size is narrowed to Uint2 here.
1332  s_CopyData(this, buf, Uint2(size));
1333 }
1334 
// CSrvSocketTask::Flush -- push buffered write data to the socket
// (the signature line is absent from this extract).
// Does nothing when the socket has an error or nothing is pending.
1335 void
1337 {
1338  if (HasError() || !IsWriteDataPending())
1339  return;
1340 
1341  s_FlushData(this);
// If everything went out the buffer is compacted; otherwise m_NeedToFlush is set so
// that InternalRunSlice() keeps flushing on later slices.
1342  if (!IsWriteDataPending())
1343  s_CompactWrBuffer(this);
1344  else
1345  m_NeedToFlush = true;
1346 }
1347 
// CSrvSocketTask::StartProxyTo -- start proxying raw data from this socket to
// the one in dst_task (the first signature line is absent from this extract).
// proxy_size is stored in m_ProxySize, the amount left to proxy.
1348 void
1350  Uint8 proxy_size)
1351 {
// Link both tasks to each other and clear the error flags of any previous proxying.
1352  m_ProxyDst = dst_task;
1353  m_ProxySize = proxy_size;
1354  m_ProxyHadError = false;
1355  dst_task->m_ProxyHadError = false;
1356  dst_task->m_ProxySrc = this;
// Proxying is attempted immediately, not deferred to the next time slice.
1357  s_DoDataProxy(this);
1358 }
1359 
// CSrvSocketTask::InternalRunSlice -- internal per-slice driver of a socket
// task. It handles the connect timeout, proxying and pending flushes, and
// calls the derived class's ExecuteSlice() when none of these takes over.
// NOTE(review): the signature line and original lines 1361, 1364, 1368, 1379,
// 1385, 1414, 1418 and 1421 are absent from this extract. Comments below
// describe only the visible statements.
1360 void
1362 {
1363 // remember last activity time stamp (to close inactive ones later on)
1365 
1366 // if just connected, check for timeout
// m_ConnStartJfy != 0 marks a connection started by Connect() that is still being tracked.
1367  if (m_ConnStartJfy != 0) {
// NOTE(review): the condition for this first branch (line 1368) is missing here.
1369  m_ConnStartJfy = 0;
1370  else if (s_CurJiffies - m_ConnStartJfy > s_ConnTimeout) {
1371  m_RegError = true;
1372  m_ConnStartJfy = 0;
1373  SRV_LOG(Warning, "Connection has timed out");
1374  }
1375  }
1376 
1377  if (m_ProxyDst) {
1378 // to just forward data
1380  s_DoDataProxy(this);
// A NULL m_ProxyDst after the call means proxying is over, so the derived class runs.
1381  if (!m_ProxyDst)
1382  ExecuteSlice(thr_num);
1383  }
1384  else {
// NOTE(review): the declaration of `src` (line 1385) is missing; presumably it is a
// snapshot of m_ProxySrc -- confirm.
1386  if (src) {
1387  // Proxying is in progress and by design all proxying should be done
1388  // by source socket. So we just transfer this event to it -- probably
1389  // we just became writable and this proxying will be able to continue.
1390  src->SetRunnable();
1391  }
1392  else if (!m_NeedToFlush) {
1393  // In the most often "standard" situation (when there's no proxying
1394  // involved and there's no need to flush write buffers) we will end up
1395  // here and call code from derived class.
1396  ExecuteSlice(thr_num);
1397  }
1398  else {
1399  s_FlushData(this);
1400  if (!IsWriteDataPending() || NeedEarlyClose()) {
1401  // If this socket was already closed by client or server should
1402  // urgently shutdown then we don't care about lost data in the
1403  // write buffers. Otherwise we'll be here when all data has been
1404  // flushed and thus it's time for code in the derived class
1405  // to execute.
1406  s_CompactWrBuffer(this);
1407  m_NeedToFlush = false;
1408  ACCESS_ONCE(m_FlushIsDone) = true;
1409  ExecuteSlice(thr_num);
1410  }
1411  }
1412  }
1413 
// NOTE(review): the first part of this condition (line 1414) and the guarded statement
// (line 1418) are missing; only the "socket still open" check is visible.
1415  && m_Fd != -1)
1416  {
1417  // Ask kernel to give us data from client as quickly as possible.
1419  }
1420 
1422 }
1423 
// CSrvSocketTask::Connect -- create a new non-blocking TCP socket and start
// connecting it to the given IPv4 address and port (the signature line is
// absent from this extract; the file's index lists it as
// "bool Connect(Uint4 host, Uint2 port)").
// Returns true when the connection was established or is in progress
// (EINPROGRESS), false on any failure. Outside NCBI_OS_LINUX it always
// returns false.
// NOTE(review): original lines 1460, 1463 and 1464 are absent from this
// extract. They presumably record the connect start time and peer address --
// confirm in the real file.
1424 bool
1426 {
// Any socket this task still holds is closed with do_abort == true before reconnecting.
1427  if (m_Fd != -1) {
1428  s_CloseSocket(m_Fd, true);
1429  m_Fd = -1;
1430  }
1431  m_RegError = false;
1432 
1433 #ifdef NCBI_OS_LINUX
1434  int sock = socket(AF_INET, SOCK_STREAM, 0);
1435  if (sock == -1) {
1436  LOG_WITH_ERRNO(Critical, "Cannot create socket", errno);
1437  goto error_return;
1438  }
1439  if (!s_SetSocketNonBlock(sock) || !s_SetSocketOptions(sock))
1440  goto close_and_error;
1441 
1442  struct sockaddr_in addr;
1443  memset(&addr, 0, sizeof(addr));
1444  addr.sin_family = AF_INET;
// host is stored as-is, so it is presumably already in network byte order -- confirm.
// port is converted with htons().
1445  addr.sin_addr.s_addr = host;
1446  addr.sin_port = htons(port);
1447 retry:
1448  int res;
1449  res = connect(sock, (struct sockaddr*)&addr, sizeof(addr));
1450  if (res) {
// EINTR restarts the call. EINPROGRESS is the normal outcome for a non-blocking socket
// and is not treated as a failure.
1451  int x_errno = errno;
1452  if (x_errno == EINTR)
1453  goto retry;
1454  if (x_errno != EINPROGRESS) {
1455  LOG_WITH_ERRNO(Critical, "Cannot connect socket", x_errno);
1456  goto close_and_error;
1457  }
1458  }
1459 
1461  m_Fd = sock;
1462  s_CreateDiagRequest(this, GetLocalPort(), host, port);
1465  return true;
1466 
// Failure paths: close the half-initialized socket (when one was created) and report failure.
1467 close_and_error:
1468  s_CloseSocket(sock, true);
1469 error_return:
1470 #endif
1471  return false;
1472 }
1473 
1474 bool
1475 CSrvSocketTask::StartProcessing(TSrvThreadNum thread_num /* = 0 */, bool boost /*= false*/)
1476 {
1477  s_SaveSocket(this);
1478  m_LastThread = (thread_num? thread_num: GetCurThread()->thread_num);
1479 
1480 #ifdef NCBI_OS_LINUX
1481  struct epoll_event evt;
1482  evt.events = EPOLLIN | EPOLLOUT | EPOLLET;
1483  evt.data.ptr = (SSrvSocketInfo*)this;
1484  int res = epoll_ctl(s_EpollFD, EPOLL_CTL_ADD, m_Fd, &evt);
1485  if (res) {
1486  LOG_WITH_ERRNO(Critical, "Cannot add socket to epoll", errno);
1487  return false;
1488  }
1489 #endif
1490 
1491  SetRunnable(boost);
1492  return true;
1493 }
1494 
// CSrvSocketTask::GetLocalPort -- local port this socket is bound to, in host
// byte order (the signature line is absent from this extract).
// Returns 0 when getsockname() fails (the failure is logged as Critical) and
// always when built without NCBI_OS_LINUX.
1495 Uint2
1497 {
1498 #ifdef NCBI_OS_LINUX
1499  struct sockaddr_in addr;
1500  socklen_t len = sizeof(addr);
1501  memset(&addr, 0, len);
1502  if (getsockname(m_Fd, (struct sockaddr*)&addr, &len) == 0)
1503  return ntohs(addr.sin_port);
1504 
1505  LOG_WITH_ERRNO(Critical, "Cannot read local port of socket", errno);
1506 #endif
1507 
1508  return 0;
1509 }
1510 
// CSrvSocketTask::GetPeerAddress -- report the remembered peer address
// (the signature line is absent from this extract; the file's index lists it
// as "void GetPeerAddress(string &host, Uint2 &port)").
// NOTE(review): original line 1515 is missing; it presumably fills `host`
// from m_PeerAddr -- confirm. Only the port assignment is visible.
1511 void
1513 {
1514  port = m_PeerPort;
1516 }
1517 
// CSrvSocketTask::x_CloseSocket -- close or abort the socket, depending on
// do_abort, and release its resources (the signature line is absent from this
// extract). Does nothing when no socket is open.
1518 void
1520 {
1521  if (m_Fd == -1)
1522  return;
1523  s_CloseSocket(m_Fd, do_abort);
// NOTE(review): m_Fd is not reset in the visible code; presumably
// s_CleanSockResources() does that -- confirm.
1524  s_CleanSockResources(this);
1525 }
1526 
// CSrvSocketTask::Terminate -- terminate the task (the signature line is
// absent from this extract). A socket that is still open is closed via
// CloseSocket() first; the task is then marked terminated with
// immediate == false.
1527 void
1529 {
1530  if (m_Fd != -1)
1531  CloseSocket();
1532  MarkTaskTerminated(this, false);
1533 }
1534 
1535 
1537 {}
1538 
1540 {}
1541 
void MarkTaskTerminated(CSrvTask *task, bool immediate)
Definition: scheduler.cpp:466
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilit...
Definition: srv_sync.hpp:193
void Unlock(void)
Unlock the mutex.
Definition: srv_sync.cpp:119
void Lock(void)
Lock the mutex.
Definition: srv_sync.cpp:108
CNcbiRegistry –.
Definition: ncbireg.hpp:913
Class used in all diagnostic logging.
Definition: srv_diag.hpp:73
const CSrvDiagMsg & StartSrvLog(ESeverity sev, const char *file, int line, const char *func) const
Starts log message which will include severity, filename, line number and function name.
Definition: logging.cpp:909
CSrvDiagMsg & StartRequest(void)
Starts "request-start" message.
Definition: logging.cpp:998
CSrvDiagMsg & PrintParam(CTempString name, CTempString value)
Adds parameter to "request-start" or "extra" message.
Definition: logging.cpp:1040
void StopRequest(void)
Prints "request-stop" message.
Definition: logging.cpp:1084
static bool IsSeverityVisible(ESeverity sev)
Checks if given severity level is visible, i.e.
Definition: logging.cpp:903
ESeverity
Severity levels for logging.
Definition: srv_diag.hpp:79
virtual ~CSrvListener(void)
Uint4 m_SeenErrors[kMaxCntListeningSocks]
Per-listening-socket numbers copied from s_ListenErrors when errors are processed.
virtual void ExecuteSlice(TSrvThreadNum thread_idx)
This is the main method to do all work this task should do.
Uint4 m_SeenEvents[kMaxCntListeningSocks]
Per-listening-socket numbers copied from s_ListenEvents when events are processed.
Factory that creates CSrvSocketTask-derived object for each connection coming to listening port which...
virtual CSrvSocketTask * CreateSocketTask(void)=0
virtual ~CSrvSocketFactory(void)
Task controlling a socket.
Uint2 m_WrPos
Position of current writing pointer in the write buffer.
Uint8 m_ConnStartJfy
Jiffy number when Connect() method was called.
bool Connect(Uint4 host, Uint2 port)
Create new socket and connect it to given IP and port.
void x_CloseSocket(bool do_abort)
Close or abort the socket – they have little difference, thus they joined in one method.
bool m_RegReadHup
Flag showing if epoll returned RDHUP on this socket.
CSrvSocketTask * m_ProxySrc
Source task for proxying.
Uint1 m_SeenWriteEvts
Number of last write event seen by Write() when it wrote to socket.
bool HasError(void)
Checks if socket has some error in it.
bool m_CRMet
Flag showing if '\r' symbol was seen at the end of last line but '\n' wasn't seen yet.
Uint8 m_ReadBytes
Total number of bytes read from socket.
int m_Fd
File descriptor for the socket.
size_t Write(const void *buf, size_t size)
Write into the socket as much as immediately possible (including writing into internal write buffers ...
Uint2 m_PeerPort
Remembered peer port.
CSrvSocketTask & WriteText(CTempString message)
Write text into socket.
CSrvSocketTask & WriteNumber(NumType num)
Write number into socket as string, i.e.
void GetPeerAddress(string &host, Uint2 &port)
Get peer IP and port for this socket.
void CloseSocket(void)
Close the socket gracefully, i.e.
Uint2 m_WrMemSize
Size of memory allocated for write buffer.
Uint8 m_ProxySize
Amount left to proxy if proxying operation is in progress.
bool m_NeedToFlush
Flag showing that task needs to flush all write buffers.
virtual void InternalRunSlice(TSrvThreadNum thr_num)
Internal function to execute time slice work.
virtual void Terminate(void)
Terminate the task.
bool m_NeedToClose
Flag showing that socket needs to be closed because of long inactivity.
char * m_RdBuf
Read buffer.
Uint4 m_PeerAddr
Remembered peer IP address.
bool m_ErrorPrinted
Flag showing if pending error in socket was printed in logs.
bool m_ProxyHadError
Flag showing that last proxying operation finished with error.
bool m_SockCanReadMore
Flag showing that socket can have more reads, i.e. there was no EOF yet.
bool m_FlushIsDone
Flag showing that write buffers were flushed.
bool m_RegError
Flag showing if there's error pending on the socket.
CSrvSocketTask * m_ProxyDst
Destination task for proxying.
bool ReadToBuf(void)
Read from socket into internal buffer.
Uint2 m_RdPos
Position of current reading in the read buffer, i.e.
void Flush(void)
Flush all data saved in internal write buffers to socket.
size_t Read(void *buf, size_t size)
Read from socket into memory.
char * m_WrBuf
Write buffer.
virtual ~CSrvSocketTask(void)
Uint2 GetLocalPort(void)
Get local port this socket was created on.
void WriteData(const void *buf, size_t size)
Write the exact amount of data into the socket.
Uint1 m_SeenReadEvts
Number of last read event seen by Read() when it read from socket.
bool IsWriteDataPending(void)
Checks if there's some data pending in write buffers and waiting to be sent to kernel.
Uint2 m_WrSize
Size of data in the write buffer waiting for writing.
Uint8 m_WrittenBytes
Total number of bytes written to socket.
Uint2 m_RdSize
Size of data available for reading in the read buffer.
bool m_SockCanWrite
Flag showing that socket is writable.
bool StartProcessing(TSrvThreadNum thread_num=0, bool boost=false)
Start processing of the socket and include it into TaskServer's central epoll.
bool ReadLine(CTempString *line)
Read from socket one line which ends with '\n', '\r\n' or '\0'.
bool NeedEarlyClose(void)
Checks if socket should be closed because of internal reasons (long inactivity or "hard" shutdown as ...
void x_PrintError(void)
Prints socket's error if there's any error pending on the socket.
void StartProxyTo(CSrvSocketTask *dst_task, Uint8 proxy_size)
Start proxying of raw data from this socket to the one in dst_task.
Uint1 m_RegWriteEvts
Counter of "writable" events received from epoll.
bool m_SockHasRead
Flag showing that socket is readable.
Uint1 m_RegReadEvts
Counter of "readable" events received from epoll.
void SockOpenActive(void)
Definition: srv_stat.cpp:207
void SockClose(int status, Uint8 open_time)
Definition: srv_stat.cpp:219
void SockOpenPassive(void)
Definition: srv_stat.cpp:213
void ErrorOnSocket(void)
Definition: srv_stat.cpp:229
Main working entity in TaskServer.
Definition: srv_tasks.hpp:88
TSrvTaskFlags m_TaskFlags
Bit-OR of flags for this task.
Definition: srv_tasks.hpp:209
int m_LastActive
Time (in seconds) when the task was active last time, i.e.
Definition: srv_tasks.hpp:212
CRequestContext * GetDiagCtx(void)
Get current diagnostic context for the task.
TSrvThreadNum m_LastThread
Thread number where this task was executed last time.
Definition: srv_tasks.hpp:206
virtual void ExecuteSlice(TSrvThreadNum thr_num)=0
This is the main method to do all work this task should do.
void SetRunnable(bool boost=false)
Set this task "runnable", i.e.
Definition: scheduler.cpp:618
void ReleaseDiagCtx(void)
Releases current diagnostic context of the task.
Definition: logging.cpp:1257
void CreateNewDiagCtx(void)
Create new diagnostic context for this task to work in.
Definition: logging.cpp:1249
Class incorporating convenient methods to work with struct timespec.
Definition: srv_time.hpp:61
static int CurSecs(void)
Current time in seconds since epoch (time_t).
static CSrvTime Current(void)
Exact current time with precision up to nanoseconds.
Uint8 AsUSec(void) const
Converts object's value to microseconds since epoch.
long & NSec(void)
Read/set number of nanoseconds stored in the object.
static const string & GetHostName(void)
Returns name of server this application is executing on.
static string GetHostByIP(Uint4 ip)
Converts 4-byte encoded IP address into server name.
static bool AddListeningPort(Uint2 port, CSrvSocketFactory *factory)
Adds port for TaskServer to listen to.
static bool IsRunning(void)
Checks if TaskServer is running now, i.e.
static string IPToString(Uint4 ip)
Converts 4-byte encoded IP address into its string representation.
static bool IsInShutdown(void)
Checks if TaskServer received request to shutdown.
static Uint4 GetIPByHost(const string &host)
Converts server name (or IP address written as string) to encoded 4-byte IP address.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
int close(int fd)
Definition: connection.cpp:45
static const char ip[]
Definition: des.c:75
CS_CONTEXT * ctx
Definition: t0006.c:12
#define true
Definition: bool.h:35
#define false
Definition: bool.h:36
#define getnameinfo(a, b, c, d, e, f, g)
Definition: replacements.h:100
#define getaddrinfo(n, s, h, r)
Definition: replacements.h:99
#define addrinfo
Definition: replacements.h:98
#define freeaddrinfo(a)
Definition: replacements.h:101
#define ERASE_ITERATE(Type, Var, Cont)
Non-constant version with ability to erase current element, if container permits.
Definition: ncbimisc.hpp:843
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
#define NULL
Definition: ncbistd.hpp:225
void SetClientIP(const string &client)
TCount GetRequestID(void) const
Get request ID (or zero if not set).
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
void Error(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1197
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
uint8_t Uint1
1-byte (8-bit) unsigned integer
Definition: ncbitype.h:99
int16_t Int2
2-byte (16-bit) signed integer
Definition: ncbitype.h:100
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
uint16_t Uint2
2-byte (16-bit) unsigned integer
Definition: ncbitype.h:101
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
Definition: ncbireg.cpp:362
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
CTempString & assign(const char *src_str, size_type len)
Assign new values to the content of the a string.
Definition: tempstr.hpp:733
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
Definition: ncbistr.hpp:5168
int socklen_t
Definition: config.h:95
FILE * file
char * buf
int i
if(yy_accept[yy_current_state])
int len
constexpr bool empty(list< Ts... >) noexcept
const struct ncbi::grid::netcache::search::fields::SIZE size
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
#define INADDR_NONE
Definition: ncbi_ifconf.h:47
#define INADDR_ANY
Definition: ncbi_ifconf.h:50
static size_t x_size(const char *dst, size_t len, const char *ptr)
Definition: ncbi_iprange.c:138
int ssize_t
Definition: ncbiconf_msvc.h:93
Process information in the NCBI Registry, including working with configuration files.
T max(T x_, T y_)
#define memmove(a, b, c)
char * strerror(int n)
Definition: pcregrep.c:835
static const char * prefix[]
Definition: pcregrep.c:405
Defines CRequestContext class for NCBI C++ diagnostic API.
static void s_FlushData(CSrvSocketTask *task)
void CheckConnectsTimeout(SSocketsData *socks)
intr::list< CSrvSocketTask, intr::base_hook< TSrvSockListHook >, intr::constant_time_size< false > > TSockList
Definition: sockets_man.cpp:95
static void s_CloseSocket(int fd, bool do_abort)
bool ReConfig_Sockets(const CTempString &section, const CNcbiRegistry &new_reg, string &)
void ReleaseThreadSocks(SSrvThread *thr)
void RequestStopListening(void)
bool InitSocketsMan(void)
static void s_CopyData(CSrvSocketTask *task, const void *buf, Uint2 size)
static void s_ProcessListenError(Uint1 sock_idx)
static size_t s_ReadFromSocket(CSrvSocketTask *task, void *buf, size_t size)
#define EPOLLERR
Definition: sockets_man.cpp:63
bool StartSocketsMan(void)
static bool s_StartListening(void)
static void s_LogSocketError(CSrvDiagMsg::ESeverity severity, int fd, const char *prefix, const char *file, int line, const char *func)
static void s_CompactWrBuffer(CSrvSocketTask *task)
static const Uint1 kEpollEventsArraySize
Definition: sockets_man.cpp:72
static void s_CreateDiagRequest(CSrvSocketTask *task, Uint2 port, Uint4 phost, Uint2 pport)
static string s_HostName
void s_DeleteOldestSockets(TSockList &lst)
static void s_DoDataProxy(CSrvSocketTask *src)
#define LOG_WITH_ERRNO(sev, msg, x_errno)
static size_t s_WriteNoPending(CSrvSocketTask *task, const void *buf, size_t size)
SSrvThread ** s_Threads
Definition: threads_man.cpp:59
static void s_RegisterClientEvent(CSrvSocketTask *task, Uint4 event)
void MoveAllSockets(SSocketsData *dst_socks, SSocketsData *src_socks)
#define EPOLLOUT
Definition: sockets_man.cpp:62
void ConfigureSockets(const CNcbiRegistry *reg, CTempString section)
Uint8 s_CurJiffies
Definition: time_man.cpp:53
void AssignThreadSocks(SSrvThread *thr)
static const Uint2 kSockMinWriteSize
Definition: sockets_man.cpp:76
#define LOG_SOCK_ERROR(sev, fd, prefix)
static Uint4 s_ListenEvents[kMaxCntListeningSocks]
static Uint1 s_CntListeningSocks
static int s_EpollFD
static const Uint2 kSockReadBufSize
1000 below is chosen to be a little bit less than maximum packet size in Ethernet (~1500 bytes).
Definition: sockets_man.cpp:75
static const Uint2 kSockWriteBufSize
In calculations in the file it's assumed that kSockWriteBufSize is at least twice as large as kSockMi...
Definition: sockets_man.cpp:79
void CleanSocketList(SSocketsData *socks)
static Uint1 s_OldSocksDelBatch
#define EPOLLIN
Definition: sockets_man.cpp:61
int s_SoftSocketLimit
void DoSocketWait(void)
static CSrvListener s_Listener
static SListenSockInfo s_ListenSocks[kMaxCntListeningSocks]
static Uint8 s_ConnTimeout
static Uint8 s_AcceptDelay
static Uint2 s_ReadFromBuffer(CSrvSocketTask *task, void *dest, size_t size)
static void s_LogWithErrStr(CSrvDiagMsg::ESeverity severity, const char *err_msg, int x_errno, const char *errno_str, const char *file, int line, const char *func)
static void s_LogWithErrno(CSrvDiagMsg::ESeverity severity, const char *err_msg, int x_errno, const char *file, int line, const char *func)
static void s_SaveSocket(CSrvSocketTask *task)
void WriteSetup_Sockets(CSrvSocketTask &task)
static bool s_SetSocketOptions(int sock)
static void s_SetSocketQuickAck(int sock)
int s_HardSocketLimit
static void s_CleanSockResources(CSrvSocketTask *task)
static void s_CompactBuffer(char *buf, Uint2 &size, Uint2 &pos)
static const Uint1 kMaxCntListeningSocks
16 Uint4s on x86_64 is the size of CPU's cacheline.
Definition: sockets_man.cpp:82
static CMiniMutex s_ListenSocksLock
static void s_ReadLF(CSrvSocketTask *task)
void SetAllSocksRunnable(SSocketsData *socks)
void PromoteSockAmount(SSocketsData *socks)
static bool s_CreateListeningSocket(Uint1 idx)
static void s_RegisterListenEvent(SListenSockInfo *sock_info, Uint4 event)
void FinalizeSocketsMan(void)
static void s_ProcessListenEvent(Uint1 sock_idx, TSrvThreadNum thread_num)
static Uint4 s_ListenErrors[kMaxCntListeningSocks]
static size_t s_WriteToSocket(CSrvSocketTask *task, const void *buf, size_t size)
#define EPOLLHUP
Definition: sockets_man.cpp:64
CSrvTime s_JiffyTime
Definition: time_man.cpp:54
int s_AllSocketsCount
int s_TotalSockets
static int s_SocketTimeout
static bool s_SetSocketNonBlock(int sock)
#define EPOLLRDHUP
Definition: sockets_man.cpp:65
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
Definition: srv_diag.hpp:162
#define SRV_FATAL(msg)
Definition: srv_diag.hpp:173
intr::list_base_hook< intr::tag< SSrvSockList_tag > > TSrvSockListHook
Definition: srv_sockets.hpp:78
T AtomicAdd(T volatile &var, T add_value)
Definition: srv_sync.hpp:69
#define ACCESS_ONCE(x)
Purpose of this macro is to force compiler to access variable exactly at the place it's written (no m...
Definition: srv_sync.hpp:51
T AtomicSub(T volatile &var, T sub_value)
Definition: srv_sync.hpp:76
@ kUSecsPerSecond
Definition: srv_time.hpp:46
@ kNSecsPerMSec
Definition: srv_time.hpp:47
@ kUSecsPerMSec
Definition: srv_time.hpp:44
CSrvSocketFactory * factory
Factory that will create CSrvSocketTask for each incoming socket.
Uint2 port
Port to listen to.
int fd
File descriptor for the listening socket.
Uint1 index
Index in the s_ListenSocks array.
Per-thread structure containing information about sockets.
Int2 sock_cnt
"Number of sockets" that this thread created/deleted.
TSockList sock_list
List of all open and not yet deleted sockets which were opened in this thread.
SSocketsData(void)
For TaskServer's internal use only.
Definition: srv_sockets.hpp:46
CSrvStat * stat
Definition: threads_man.hpp:93
TSrvThreadNum thread_num
Definition: threads_man.hpp:79
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
Definition: task_server.hpp:42
@ fTaskNeedTermination
Definition: task_server.hpp:51
#define _ASSERT
CRef< CTestThread > thr[k_NumThreadsMax]
Definition: test_mt.cpp:267
SSrvThread * GetCurThread(void)
Definition: threads_man.cpp:82
else result
Definition: token2.c:20
void free(voidpf ptr)
voidp malloc(uInt size)
Modified on Sun Apr 21 03:40:22 2024 by modify_doxy.py rev. 669887