40 # include <sys/types.h>
41 # include <sys/socket.h>
42 # include <netinet/ip.h>
43 # include <netinet/tcp.h>
44 # include <netinet/in.h>
45 # include <arpa/inet.h>
47 # include <sys/epoll.h>
54 # define EPOLLRDHUP POLLRDHUP
56 # define EPOLLRDHUP 0x2000
61 # define EPOLLIN 0x0001
62 # define EPOLLOUT 0x0004
63 # define EPOLLERR 0x0008
64 # define EPOLLHUP 0x0010
65 # define EPOLLRDHUP 0x2000
85 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_MEMBER_HOOK
88 &CSrvSocketTask::m_SockListHook> TSockListHookOpt;
91 intr::constant_time_size<false> >
TSockList;
92 #elif NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_BASE_HOOK
94 intr::base_hook<TSrvSockListHook>,
96 #elif NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
97 typedef std::list<CSrvSocketTask*>
TSockList;
198 string is(
"\": "), eol(
",\n\"");
222 const char* errno_str,
229 << err_msg <<
", errno=" << x_errno <<
" (" << errno_str <<
")";
245 #define LOG_WITH_ERRNO(sev, msg, x_errno) \
246 s_LogWithErrno(CSrvDiagMsg::sev, msg, x_errno, \
247 __FILE__, __LINE__, NCBI_CURRENT_FUNCTION) \
263 #define LOG_WITH_AIERR(sev, msg, x_aierr) \
264 s_LogWithAIErr(CSrvDiagMsg::sev, msg, x_aierr, \
265 __FILE__, __LINE__, NCBI_CURRENT_FUNCTION) \
274 int res = fcntl(sock, F_SETFL, O_NONBLOCK);
288 int res = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &
value,
sizeof(
value));
293 res = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &
value,
sizeof(
value));
307 int res = setsockopt(sock, IPPROTO_TCP, TCP_QUICKACK, &
value,
sizeof(
value));
318 int sock = socket(AF_INET, SOCK_STREAM, 0);
329 int res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &
value,
sizeof(
value));
333 struct sockaddr_in
addr;
335 addr.sin_family = AF_INET;
337 addr.sin_port = htons(sock_info.
port);
338 res = bind(sock, (
struct sockaddr*)&
addr,
sizeof(
addr));
340 string err_msg(
"Cannot bind socket to port ");
346 res = listen(sock, 128);
353 struct epoll_event evt;
354 evt.events =
EPOLLIN | EPOLLET;
355 evt.data.ptr = (
void*)&sock_info;
356 res = epoll_ctl(
s_EpollFD, EPOLL_CTL_ADD, sock, &evt);
414 int res = getsockopt(fd, SOL_SOCKET, SO_ERROR, &x_errno, &
x_size);
422 #define LOG_SOCK_ERROR(sev, fd, prefix) \
423 s_LogSocketError(CSrvDiagMsg::sev, fd, prefix, __FILE__, __LINE__, NCBI_CURRENT_FUNCTION)
435 res = setsockopt(fd, SOL_SOCKET, SO_LINGER, (
void*)&
lgr,
sizeof(
lgr));
441 res = setsockopt(fd, IPPROTO_TCP, TCP_LINGER2, (
void*)&
val,
sizeof(
val));
449 while (res && (x_errno = errno) == EINTR);
468 --
thr->socks->sock_cnt;
477 snprintf(
buf, 20,
"%u.%u.%u.%u", hb[0], hb[1], hb[2], hb[3]);
487 struct sockaddr_in
addr;
489 addr.sin_family = AF_INET;
490 addr.sin_addr.s_addr =
ip;
492 buf,
sizeof(
buf),
NULL, 0, NI_NAMEREQD | NI_NOFQDN);
494 LOG_WITH_AIERR(
Critical,
"Error from getnameinfo", x_errno);
507 if (gethostname(
buf,
sizeof(
buf)))
520 ip = inet_addr(host.c_str());
523 memset(&in_addr, 0,
sizeof(in_addr));
524 in_addr.ai_family = AF_INET;
528 LOG_WITH_AIERR(
Critical,
"Error from getaddrinfo", x_errno);
532 ip = ((
struct sockaddr_in*)out_addr->ai_addr)->sin_addr.s_addr;
577 struct sockaddr_in
addr;
579 int new_sock = accept(sock_info.
fd, (
struct sockaddr*)&
addr, &
len);
581 cmd_len -= cmd_start;
587 if (new_sock == -1) {
589 if (x_errno != EAGAIN && x_errno != EWOULDBLOCK) {
599 <<
". Rejecting new connection.");
609 task->
m_Fd = new_sock;
627 Uint4 wait_msec = wait_time.
NSec() / 1000000;
634 if (x_errno != EINTR)
637 for (
int i = 0;
i < res; ++
i) {
638 struct epoll_event& evt = events[
i];
691 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
704 #if defined(NCBI_COMPILER_GCC) || defined(NCBI_COMPILER_ANY_CLANG)
712 memset(old_socks, 0,
sizeof(old_socks));
713 memset(old_active, 0,
sizeof(old_active));
720 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
726 if (active >= limit_time)
731 Uint1 low = 0, high = cnt_old;
733 Uint1 mid = (high + low) / 2;
734 if (old_active[mid] > active)
744 memmove(&old_socks[low + 1], &old_socks[low],
745 (cnt_old - low) *
sizeof(old_socks[0]));
746 memmove(&old_active[low + 1], &old_active[low],
747 (cnt_old - low) *
sizeof(old_active[0]));
749 old_socks[low] = task;
750 old_active[low] = active;
754 for (
Uint1 i = 0;
i < cnt_old; ++
i) {
788 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
810 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
831 #if NC_SOCKLIST_USE_TYPE == NC_SOCKLIST_USE_STD_LIST
832 (*it)->SetRunnable();
859 if (x_errno == EINTR)
864 if (x_errno == EWOULDBLOCK) {
867 if (x_errno == EAGAIN) {
881 return size_t(n_read);
897 if (n_written == -1) {
899 if (x_errno == EINTR)
901 if (x_errno == EAGAIN || x_errno == EWOULDBLOCK) {
915 return size_t(n_written);
937 if (c ==
'\n' || c ==
'\0') {
999 goto finish_with_error;
1005 if (to_write >
size)
1014 goto finish_with_error;
1015 if (n_done < to_write) {
1029 goto finish_with_error;
1048 goto finish_with_error;
1051 if (n_done > to_read)
1063 goto finish_with_error;
1078 goto finish_with_error;
1104 goto unlock_and_exit;
1109 sock_info->
index = idx;
1110 sock_info->
port = port;
1125 #if __NC_TASKS_MONITOR
1126 m_TaskName =
"CSrvListener";
1138 for (
Uint1 i = 0;
i < cnt_listen; ++
i) {
1146 if (sock_info.
fd != -1)
1165 m_ProxyHadError(
false),
1166 m_SockHasRead(
false),
1167 m_SockCanWrite(
false),
1168 m_SockCanReadMore(
true),
1169 m_NeedToClose(
false),
1170 m_NeedToFlush(
false),
1175 m_RegReadHup(
false),
1177 m_ErrorPrinted(
false)
1182 #if __NC_TASKS_MONITOR
1183 m_TaskName =
"CSrvSocketTask";
1227 if (c ==
'\n' || c ==
'\r' || c ==
'\0')
1240 if (
m_RdBuf[crlf_pos] ==
'\r') {
1256 buf = (
char*)
buf + copied;
1276 if (has_size == 0) {
1304 buf = (
const char*)
buf + to_copy;
1307 return to_copy + n_written;
1433 #ifdef NCBI_OS_LINUX
1434 int sock = socket(AF_INET, SOCK_STREAM, 0);
1440 goto close_and_error;
1442 struct sockaddr_in
addr;
1444 addr.sin_family = AF_INET;
1445 addr.sin_addr.s_addr = host;
1446 addr.sin_port = htons(port);
1449 res = connect(sock, (
struct sockaddr*)&
addr,
sizeof(
addr));
1451 int x_errno = errno;
1452 if (x_errno == EINTR)
1454 if (x_errno != EINPROGRESS) {
1456 goto close_and_error;
1480 #ifdef NCBI_OS_LINUX
1481 struct epoll_event evt;
1498 #ifdef NCBI_OS_LINUX
1499 struct sockaddr_in
addr;
1502 if (getsockname(
m_Fd, (
struct sockaddr*)&
addr, &
len) == 0)
1503 return ntohs(
addr.sin_port);
void MarkTaskTerminated(CSrvTask *task, bool immediate)
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilit...
void Unlock(void)
Unlock the mutex.
void Lock(void)
Lock the mutex.
Class used in all diagnostic logging.
const CSrvDiagMsg & StartSrvLog(ESeverity sev, const char *file, int line, const char *func) const
Starts log message which will include severity, filename, line number and function name.
CSrvDiagMsg & StartRequest(void)
Starts "request-start" message.
CSrvDiagMsg & PrintParam(CTempString name, CTempString value)
Adds parameter to "request-start" or "extra" message.
void StopRequest(void)
Prints "request-stop" message.
static bool IsSeverityVisible(ESeverity sev)
Checks if given severity level is visible, i.e.
ESeverity
Severity levels for logging.
virtual ~CSrvListener(void)
Uint4 m_SeenErrors[kMaxCntListeningSocks]
Per-listening-socket numbers copied from s_ListenErrors when errors are processed.
virtual void ExecuteSlice(TSrvThreadNum thread_idx)
This is the main method to do all work this task should do.
Uint4 m_SeenEvents[kMaxCntListeningSocks]
Per-listening-socket numbers copied from s_ListenEvents when events are processed.
Factory that creates CSrvSocketTask-derived object for each connection coming to listening port which...
virtual CSrvSocketTask * CreateSocketTask(void)=0
virtual ~CSrvSocketFactory(void)
Task controlling a socket.
Uint2 m_WrPos
Position of current writing pointer in the write buffer.
Uint8 m_ConnStartJfy
Jiffy number when Connect() method was called.
bool Connect(Uint4 host, Uint2 port)
Create new socket and connect it to given IP and port.
void x_CloseSocket(bool do_abort)
Close or abort the socket – they have little difference, thus they joined in one method.
bool m_RegReadHup
Flag showing if epoll returned RDHUP on this socket.
CSrvSocketTask * m_ProxySrc
Source task for proxying.
Uint1 m_SeenWriteEvts
Number of last write event seen by Write() when it wrote to socket.
bool HasError(void)
Checks if socket has some error in it.
bool m_CRMet
Flag showing if '\r' symbol was seen at the end of last line but '\n' wasn't seen yet.
Uint8 m_ReadBytes
Total number of bytes read from socket.
int m_Fd
File descriptor for the socket.
size_t Write(const void *buf, size_t size)
Write into the socket as much as immediately possible (including writing into internal write buffers ...
Uint2 m_PeerPort
Remembered peer port.
CSrvSocketTask & WriteText(CTempString message)
Write text into socket.
CSrvSocketTask & WriteNumber(NumType num)
Write number into socket as string, i.e.
void GetPeerAddress(string &host, Uint2 &port)
Get peer IP and port for this socket.
void CloseSocket(void)
Close the socket gracefully, i.e.
Uint2 m_WrMemSize
Size of memory allocated for write buffer.
Uint8 m_ProxySize
Amount left to proxy if proxying operation is in progress.
bool m_NeedToFlush
Flag showing that task needs to flush all write buffers.
virtual void InternalRunSlice(TSrvThreadNum thr_num)
Internal function to execute time slice work.
virtual void Terminate(void)
Terminate the task.
bool m_NeedToClose
Flag showing that socket needs to be closed because of long inactivity.
char * m_RdBuf
Read buffer.
Uint4 m_PeerAddr
Remembered peer IP address.
bool m_ErrorPrinted
Flag showing if pending error in socket was printed in logs.
bool m_ProxyHadError
Flag showing that last proxying operation finished with error.
bool m_SockCanReadMore
Flag showing that socket can have more reads, i.e. there was no EOF yet.
bool m_FlushIsDone
Flag showing that write buffers were flushed.
bool m_RegError
Flag showing if there's error pending on the socket.
CSrvSocketTask * m_ProxyDst
Destination task for proxying.
bool ReadToBuf(void)
Read from socket into internal buffer.
Uint2 m_RdPos
Position of current reading in the read buffer, i.e.
void Flush(void)
Flush all data saved in internal write buffers to socket.
size_t Read(void *buf, size_t size)
Read from socket into memory.
char * m_WrBuf
Write buffer.
virtual ~CSrvSocketTask(void)
Uint2 GetLocalPort(void)
Get local port this socket was created on.
void WriteData(const void *buf, size_t size)
Write the exact amount of data into the socket.
Uint1 m_SeenReadEvts
Number of last read event seen by Read() when it read from socket.
bool IsWriteDataPending(void)
Checks if there's some data pending in write buffers and waiting to be sent to kernel.
Uint2 m_WrSize
Size of data in the write buffer waiting for writing.
Uint8 m_WrittenBytes
Total number of bytes written to socket.
Uint2 m_RdSize
Size of data available for reading in the read buffer.
bool m_SockCanWrite
Flag showing that socket is writable.
bool StartProcessing(TSrvThreadNum thread_num=0, bool boost=false)
Start processing of the socket and include it into TaskServer's central epoll.
bool ReadLine(CTempString *line)
Read from socket one line which ends with '\n', '\r\n' or '\0'.
bool NeedEarlyClose(void)
Checks if socket should be closed because of internal reasons (long inactivity or "hard" shutdown as ...
void x_PrintError(void)
Prints socket's error if there's any error pending on the socket.
void StartProxyTo(CSrvSocketTask *dst_task, Uint8 proxy_size)
Start proxying of raw data from this socket to the one in dst_task.
Uint1 m_RegWriteEvts
Counter of "writable" events received from epoll.
bool m_SockHasRead
Flag showing that socket is readable.
Uint1 m_RegReadEvts
Counter of "readable" events received from epoll.
void SockOpenActive(void)
void SockClose(int status, Uint8 open_time)
void SockOpenPassive(void)
Main working entity in TaskServer.
TSrvTaskFlags m_TaskFlags
Bit-OR of flags for this task.
int m_LastActive
Time (in seconds) when the task was active last time, i.e.
CRequestContext * GetDiagCtx(void)
Get current diagnostic context for the task.
TSrvThreadNum m_LastThread
Thread number where this task was executed last time.
virtual void ExecuteSlice(TSrvThreadNum thr_num)=0
This is the main method to do all work this task should do.
void SetRunnable(bool boost=false)
Set this task "runnable", i.e.
void ReleaseDiagCtx(void)
Releases current diagnostic context of the task.
void CreateNewDiagCtx(void)
Create new diagnostic context for this task to work in.
Class incorporating convenient methods to work with struct timespec.
static int CurSecs(void)
Current time in seconds since epoch (time_t).
static CSrvTime Current(void)
Exact current time with precision up to nanoseconds.
Uint8 AsUSec(void) const
Converts object's value to microseconds since epoch.
long & NSec(void)
Read/set number of nanoseconds stored in the object.
static const string & GetHostName(void)
Returns name of server this application is executing on.
static string GetHostByIP(Uint4 ip)
Converts 4-byte encoded IP address into server name.
static bool AddListeningPort(Uint2 port, CSrvSocketFactory *factory)
Adds port for TaskServer to listen to.
static bool IsRunning(void)
Checks if TaskServer is running now, i.e.
static string IPToString(Uint4 ip)
Converts 4-byte encoded IP address into its string representation.
static bool IsInShutdown(void)
Checks if TaskServer received request to shutdown.
static Uint4 GetIPByHost(const string &host)
Converts server name (or IP address written as string) to encoded 4-byte IP address.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
#define getnameinfo(a, b, c, d, e, f, g)
#define getaddrinfo(n, s, h, r)
#define ERASE_ITERATE(Type, Var, Cont)
Non-constant version with ability to erase current element, if container permits.
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
void SetClientIP(const string &client)
TCount GetRequestID(void) const
Get request ID (or zero if not set).
void Critical(CExceptionArgs_Base &args)
void Error(CExceptionArgs_Base &args)
void Warning(CExceptionArgs_Base &args)
uint8_t Uint1
1-byte (8-bit) unsigned integer
int16_t Int2
2-byte (16-bit) signed integer
uint32_t Uint4
4-byte (32-bit) unsigned integer
uint16_t Uint2
2-byte (16-bit) unsigned integer
uint64_t Uint8
8-byte (64-bit) unsigned integer
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
CTempString & assign(const char *src_str, size_type len)
Assign new values to the content of the string.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
if(yy_accept[yy_current_state])
constexpr bool empty(list< Ts... >) noexcept
const struct ncbi::grid::netcache::search::fields::SIZE size
const GenericPointer< typename T::ValueType > T2 value
static size_t x_size(const char *dst, size_t len, const char *ptr)
Process information in the NCBI Registry, including working with configuration files.
Defines CRequestContext class for NCBI C++ diagnostic API.
static SLJIT_INLINE sljit_ins lgr(sljit_gpr dst, sljit_gpr src)
static void s_FlushData(CSrvSocketTask *task)
void CheckConnectsTimeout(SSocketsData *socks)
intr::list< CSrvSocketTask, intr::base_hook< TSrvSockListHook >, intr::constant_time_size< false > > TSockList
static void s_CloseSocket(int fd, bool do_abort)
bool ReConfig_Sockets(const CTempString &section, const CNcbiRegistry &new_reg, string &)
void ReleaseThreadSocks(SSrvThread *thr)
void RequestStopListening(void)
bool InitSocketsMan(void)
static void s_CopyData(CSrvSocketTask *task, const void *buf, Uint2 size)
static void s_ProcessListenError(Uint1 sock_idx)
static size_t s_ReadFromSocket(CSrvSocketTask *task, void *buf, size_t size)
bool StartSocketsMan(void)
static bool s_StartListening(void)
static void s_LogSocketError(CSrvDiagMsg::ESeverity severity, int fd, const char *prefix, const char *file, int line, const char *func)
static void s_CompactWrBuffer(CSrvSocketTask *task)
static const Uint1 kEpollEventsArraySize
static void s_CreateDiagRequest(CSrvSocketTask *task, Uint2 port, Uint4 phost, Uint2 pport)
void s_DeleteOldestSockets(TSockList &lst)
static void s_DoDataProxy(CSrvSocketTask *src)
#define LOG_WITH_ERRNO(sev, msg, x_errno)
static size_t s_WriteNoPending(CSrvSocketTask *task, const void *buf, size_t size)
static void s_RegisterClientEvent(CSrvSocketTask *task, Uint4 event)
void MoveAllSockets(SSocketsData *dst_socks, SSocketsData *src_socks)
void ConfigureSockets(const CNcbiRegistry *reg, CTempString section)
void AssignThreadSocks(SSrvThread *thr)
static const Uint2 kSockMinWriteSize
#define LOG_SOCK_ERROR(sev, fd, prefix)
static Uint4 s_ListenEvents[kMaxCntListeningSocks]
static Uint1 s_CntListeningSocks
static const Uint2 kSockReadBufSize
1000 below is chosen to be a little bit less than maximum packet size in Ethernet (~1500 bytes).
static const Uint2 kSockWriteBufSize
In calculations in the file it's assumed that kSockWriteBufSize is at least twice as large as kSockMi...
void CleanSocketList(SSocketsData *socks)
static Uint1 s_OldSocksDelBatch
static CSrvListener s_Listener
static SListenSockInfo s_ListenSocks[kMaxCntListeningSocks]
static Uint8 s_ConnTimeout
static Uint8 s_AcceptDelay
static Uint2 s_ReadFromBuffer(CSrvSocketTask *task, void *dest, size_t size)
static void s_LogWithErrStr(CSrvDiagMsg::ESeverity severity, const char *err_msg, int x_errno, const char *errno_str, const char *file, int line, const char *func)
static void s_LogWithErrno(CSrvDiagMsg::ESeverity severity, const char *err_msg, int x_errno, const char *file, int line, const char *func)
static void s_SaveSocket(CSrvSocketTask *task)
void WriteSetup_Sockets(CSrvSocketTask &task)
static bool s_SetSocketOptions(int sock)
static void s_SetSocketQuickAck(int sock)
static void s_CleanSockResources(CSrvSocketTask *task)
static void s_CompactBuffer(char *buf, Uint2 &size, Uint2 &pos)
static const Uint1 kMaxCntListeningSocks
16 Uint4s on x86_64 is the size of CPU's cacheline.
static CMiniMutex s_ListenSocksLock
static void s_ReadLF(CSrvSocketTask *task)
void SetAllSocksRunnable(SSocketsData *socks)
void PromoteSockAmount(SSocketsData *socks)
static bool s_CreateListeningSocket(Uint1 idx)
static void s_RegisterListenEvent(SListenSockInfo *sock_info, Uint4 event)
void FinalizeSocketsMan(void)
static void s_ProcessListenEvent(Uint1 sock_idx, TSrvThreadNum thread_num)
static Uint4 s_ListenErrors[kMaxCntListeningSocks]
static size_t s_WriteToSocket(CSrvSocketTask *task, const void *buf, size_t size)
static int s_SocketTimeout
static bool s_SetSocketNonBlock(int sock)
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
intr::list_base_hook< intr::tag< SSrvSockList_tag > > TSrvSockListHook
T AtomicAdd(T volatile &var, T add_value)
#define ACCESS_ONCE(x)
Purpose of this macro is to force compiler to access variable exactly at the place it's written (no m...
T AtomicSub(T volatile &var, T sub_value)
CSrvSocketFactory * factory
Factory that will create CSrvSocketTask for each incoming socket.
Uint2 port
Port to listen to.
int fd
File descriptor for the listening socket.
Uint1 index
Index in the s_ListenSocks array.
Per-thread structure containing information about sockets.
Int2 sock_cnt
"Number of sockets" that this thread created/deleted.
TSockList sock_list
List of all open and not yet deleted sockets which were opened in this thread.
For TaskServer's internal use only.
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
CRef< CTestThread > thr[k_NumThreadsMax]
SSrvThread * GetCurThread(void)