NCBI C++ ToolKit
threads_man.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: threads_man.cpp 76997 2017-03-17 13:19:30Z gouriano $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Pavel Ivanov
27  *
28  */
29 
30 #include "task_server_pch.hpp"
31 
32 #include <corelib/ncbireg.hpp>
33 
34 #include "threads_man.hpp"
35 #include "timers.hpp"
36 #include "sockets_man.hpp"
37 #include "memory_man.hpp"
38 #include "logging.hpp"
39 #include "time_man.hpp"
40 #include "server_core.hpp"
41 #include "rcu.hpp"
42 #include "scheduler.hpp"
43 #include "srv_stat.hpp"
44 
45 #ifdef NCBI_OS_LINUX
46 # include <sys/prctl.h>
47 #endif
48 
49 
51 
52 
53 // We should reserve space for main thread and service thread, so that total
54 // amount doesn't overflow.
57 
58 
65 
66 #ifdef NCBI_OS_LINUX
67 static pthread_key_t s_CurThreadKey;
68 #endif
70 
72 
73 
74 extern Uint8 s_CurJiffies;
75 extern CSrvTime s_JiffyTime;
76 extern string s_AppBaseName;
77 
78 
79 
80 
83 {
84  SSrvThread* thr = NULL;
85 #ifdef NCBI_OS_LINUX
86  thr = (SSrvThread*)pthread_getspecific(s_CurThreadKey);
87 #endif
88  return thr;
89 }
90 
93 {
95  if (thr->thread_num == 0 || thr->thread_num == s_MaxRunningThreads + 1) {
96  SRV_FATAL("Unexpected ThreadNum: " << thr->thread_num);
97  }
98  return thr->thread_num - 1;
99 }
100 
103 {
104  TSrvThreadNum result = 0;
105  for (TSrvThreadNum i = 1; i <= s_MaxRunningThreads; ++i, ++result) {
106  if (!IsThreadRunning(s_Threads[i]))
107  break;
108  }
109  return result;
110 }
111 
112 static void
114 {
115 #ifdef NCBI_OS_LINUX
116  pthread_setspecific(s_CurThreadKey, thr);
117  char buf[20];
118  if (thr->thread_state != eThreadDormant)
119  snprintf(buf, 20, "%s_%d", s_AppBaseName.c_str(), thr->thread_num);
120  else if (thr->thread_num == 0)
121  snprintf(buf, 20, "%s", s_AppBaseName.c_str());
122  else
123  snprintf(buf, 20, "%s_S", s_AppBaseName.c_str());
124  prctl(PR_SET_NAME, (unsigned long)buf, 0, 0, 0);
125 #endif
126 }
127 
128 static void
130 {
131  thr->thread_state = eThreadRunning;
134  thr->stat->ThreadStarted();
135  s_ThrMgrLock.Lock();
136  if (s_CurMgrThread != thr) {
137  SRV_FATAL("Invalid SrvThread state");
138  }
142  }
143 }
144 
145 static void
147 {
149  RCUPassQS(thr->rcu);
150  return;
151  }
152 
153  if (thr->seen_jiffy == s_CurJiffies)
154  return;
155  thr->seen_jiffy = s_CurJiffies;
156  RCUPassQS(thr->rcu);
157 
158  if (thr->seen_secs == CSrvTime::CurSecs())
159  return;
160  thr->seen_secs = CSrvTime::CurSecs();
162 }
163 
164 static void
166 {
168  RCUPassQS(thr->rcu);
169  return;
170  }
171 
172  if (thr->seen_jiffy == s_CurJiffies)
173  return;
174  thr->seen_jiffy = s_CurJiffies;
175  RCUPassQS(thr->rcu);
177 
178  if (thr->seen_secs == CSrvTime::CurSecs())
179  return;
180  thr->seen_secs = CSrvTime::CurSecs();
182  TimerTick();
183 }
184 
185 static void
187 {
189  if (thr->seen_srv_state != s_SrvState) {
190  thr->seen_srv_state = s_SrvState;
191  SetAllSocksRunnable(thr->socks);
192  }
193  RCUPassQS(thr->rcu);
194  PromoteSockAmount(thr->socks);
195  CheckConnectsTimeout(thr->socks);
196  if (IsServerStopping() && !RCUHasCalls(thr->rcu))
197  thr->thread_state = eThreadStopped;
198  return;
199  }
200 
201  if (thr->seen_jiffy == s_CurJiffies)
202  return;
203  thr->seen_jiffy = s_CurJiffies;
204  RCUPassQS(thr->rcu);
206  PromoteSockAmount(thr->socks);
207  CheckConnectsTimeout(thr->socks);
208 
209  if (thr->seen_secs == CSrvTime::CurSecs())
210  return;
211  thr->seen_secs = CSrvTime::CurSecs();
213  if (thr->thread_state != eThreadLockedForStop) {
214  CleanSocketList(thr->socks);
215  if (thr->thread_num == 1 && s_ThreadMgrState == eThrMgrThreadExited) {
217  s_ThrMgrLock.Lock();
220  }
221  }
222 }
223 
224 static SSrvThread*
226 {
227  SSrvThread* new_thr = new SSrvThread();
228  new_thr->thread_num = thread_num;
229  new_thr->stat = new CSrvStat();
230  AssignThreadMemMgr(new_thr);
231  AssignThreadLogging(new_thr);
232  AssignThreadSched(new_thr);
233  AssignThreadSocks(new_thr);
234  s_Threads[thread_num] = new_thr;
235  return new_thr;
236 }
237 
238 static void*
240 {
244 
245  while (thr->thread_state != eThreadStopped) {
248  }
249 
251 
252  if (!IsServerStopping()) {
253  s_ThrMgrLock.Lock();
255  SRV_FATAL("Invalid WorkerThread state");
256  }
259  }
260  return NULL;
261 }
262 
263 static bool
264 s_StartThread(SSrvThread* thr, void* (*thr_func)(void*))
265 {
266 #ifdef NCBI_OS_LINUX
267  int res = pthread_create(&thr->thread_handle, NULL, thr_func, (void*)thr);
268  if (res != 0) {
269  SRV_LOG(Critical, "Unable to create new thread, result=" << res);
270  return false;
271  }
272 #endif
273  return true;
274 }
275 
276 static void
278 {
279 #ifdef NCBI_OS_LINUX
280  int res = pthread_join(s_CurMgrThread->thread_handle, NULL);
281  if (res != 0) {
282  SRV_LOG(Critical, "Cannot join the thread, res=" << res);
283  }
284 #endif
285 
291 
292  s_ThrMgrLock.Lock();
297 }
298 
299 static void
301 {
302  s_ThrMgrLock.Lock();
307  s_ThrMgrLock.Lock();
312  }
313 }
314 
315 static void*
317 {
320 
321  CSrvTime next_jfy_time = CSrvTime::Current();
322  next_jfy_time += s_JiffyTime;
323  while (!IsServerStopping() || RCUHasCalls(s_SvcThr->rcu)) {
325 
330 
333 
334  CSrvTime cur_time = CSrvTime::Current();
335  if (next_jfy_time > cur_time) {
336  CSrvTime wait_time = next_jfy_time;
337  wait_time -= cur_time;
338  s_SvcSignal.WaitValueChange(0, wait_time);
339  }
340  IncCurJiffies();
341 
342  next_jfy_time = s_LastJiffyTime;
343  next_jfy_time += s_JiffyTime;
344  }
345 
347  return NULL;
348 }
349 
350 void
352 {
354  return;
355 
356  s_ThrMgrLock.Lock();
357  if (s_ThreadMgrState == eThrMgrIdle && thr->thread_state == eThreadReleased) {
360  thr->thread_state = eThreadStarting;
361  }
363 }
364 
365 void
367 {
369  return;
370 
371  s_ThrMgrLock.Lock();
372  if (s_ThreadMgrState == eThrMgrIdle && thr->thread_state == eThreadRunning) {
375  thr->thread_state = eThreadLockedForStop;
376  thr->CallRCU();
377  }
379 }
380 
381 void
383 {
384  s_ThrMgrLock.Lock();
385  if (thr->thread_state != eThreadLockedForStop ||
387  SRV_FATAL("Invalid thread state");
388  }
389  thr->thread_state = eThreadRevived;
393 }
394 
395 static bool
397 {
399 
401  return false;
402  for (TSrvThreadNum i = 1; i <= s_MaxRunningThreads; ++i) {
404  for (TSrvThreadNum j = i; j <= s_MaxRunningThreads; ++j) {
406  }
407  break;
408  }
409  }
410  return true;
411 }
412 
413 static void
415 {
417 
418 #ifdef NCBI_OS_LINUX
419  for (TSrvThreadNum i = 1; i <= s_MaxRunningThreads + 1; ++i) {
421  if (thr->thread_state < eThreadReleased) {
422  pthread_join(thr->thread_handle, NULL);
424  }
425  }
427 #endif
428 }
429 
430 void
432 {
433 #ifdef NCBI_OS_LINUX
434  int res = pthread_key_create(&s_CurThreadKey, NULL);
435  if (res) {
436  printf("terminating after pthread_key_create returned error %d\n", res);
437  SRV_FATAL("pthread_key_create returned error: " << res);
438  }
439 #endif
440 }
441 
442 void
444 {
445  s_MaxRunningThreads = TSrvThreadNum(reg->GetInt(section, "max_threads", 20));
448 }
449 
450 bool ReConfig_Threads(const CTempString&, const CNcbiRegistry&, string&)
451 {
452  return true;
453 }
454 
456 {
457  string is("\": "), eol(",\n\"");
458  task.WriteText(eol).WriteText("max_threads").WriteText(is ).WriteNumber( s_MaxRunningThreads);
459 }
460 
461 bool
463 {
465  for (TSrvThreadNum i = 0; i <= s_MaxRunningThreads + 1; ++i) {
466  s_AllocThread(i);
467  }
468 
469  s_MainThr = s_Threads[0];
471 
476 
477  return true;
478 }
479 
480 void
482 {
483  if (!s_StartAllThreads())
484  return;
485  if (!IsThreadRunning(s_Threads[1]))
487 
488  while (!IsServerStopping()) {
490  DoSocketWait();
491  }
492 
494 }
495 
496 void
498 {}
499 
500 
501 
503  : seen_jiffy(0),
504  seen_secs(0),
505  thread_state(eThreadStarting),
506  seen_srv_state(eSrvRunning),
507  cur_task(NULL),
508  mm_pool(NULL),
509  sched(NULL),
510  log_data(NULL),
511  rcu(NULL),
512  socks(NULL)
513 {}
514 
516 {}
517 
518 void
520 {
521  switch (thread_state) {
524  break;
525  case eThreadRevived:
527  break;
528  default:
529  SRV_FATAL("Unexpected thread state: " << thread_state);
530  }
531 }
532 
void SchedStartJiffy(SSrvThread *thr)
Definition: scheduler.cpp:389
void ReleaseThreadSched(SSrvThread *thr)
Definition: scheduler.cpp:602
void AssignThreadSched(SSrvThread *thr)
Definition: scheduler.cpp:593
void SchedCheckOverloads(void)
Definition: scheduler.cpp:351
void SchedExecuteTask(SSrvThread *thr)
Definition: scheduler.cpp:521
Wrapper around Linux's futex.
Definition: srv_sync.hpp:141
EWaitResult WaitValueChange(int old_value)
Wait for futex's value to change (with and without timeout).
Definition: srv_sync.cpp:44
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilit...
Definition: srv_sync.hpp:193
void Unlock(void)
Unlock the mutex.
Definition: srv_sync.cpp:119
void Lock(void)
Lock the mutex.
Definition: srv_sync.cpp:108
CNcbiRegistry –.
Definition: ncbireg.hpp:913
Task controlling a socket.
CSrvSocketTask & WriteText(CTempString message)
Write text into socket.
CSrvSocketTask & WriteNumber(NumType num)
Write number into socket as string, i.e.
void ThreadStopped(void)
Definition: srv_stat.cpp:201
Class incorporating convenient methods to work with struct timespec.
Definition: srv_time.hpp:61
static int CurSecs(void)
Current time in seconds since epoch (time_t).
static CSrvTime Current(void)
Exact current time with precision up to nanoseconds.
static void RequestShutdown(ESrvShutdownType shutdown_type)
Asks server to start shutdown procedures.
static TSrvThreadNum GetCurThreadNum(void)
Returns number of current worker thread. Number is 0-based.
Definition: threads_man.cpp:92
static bool IsInShutdown(void)
Checks if TaskServer received request to shutdown.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
char data[12]
Definition: iconv.c:80
#define NULL
Definition: ncbistd.hpp:225
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
Definition: ncbireg.cpp:362
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
char * buf
int i
void AssignThreadLogging(SSrvThread *thr)
Definition: logging.cpp:606
void CheckLoggingFlush(SSrvThread *thr)
Definition: logging.cpp:619
void LogNoteThreadsStarted(void)
Definition: logging.cpp:517
void ReleaseThreadLogging(SSrvThread *thr)
Definition: logging.cpp:653
void StartThreadLogging(SSrvThread *thr)
Definition: logging.cpp:635
void StopThreadLogging(SSrvThread *thr)
Definition: logging.cpp:641
void AssignThreadMemMgr(SSrvThread *thr)
void ReleaseThreadMemMgr(SSrvThread *thr)
Process information in the NCBI Registry, including working with configuration files.
T max(T x_, T y_)
bool RCUHasCalls(SRCUInfo *rcu)
Definition: rcu.cpp:137
void RCUFinalizeThread(SSrvThread *thr)
Definition: rcu.cpp:160
void RCUInitNewThread(SSrvThread *thr)
Definition: rcu.cpp:143
void RCUPassQS(SRCUInfo *rcu)
Definition: rcu.cpp:122
void TrackShuttingDown(void)
EServerState s_SrvState
Definition: server_core.cpp:68
void CheckConnectsTimeout(SSocketsData *socks)
void ReleaseThreadSocks(SSrvThread *thr)
void MoveAllSockets(SSocketsData *dst_socks, SSocketsData *src_socks)
void AssignThreadSocks(SSrvThread *thr)
void CleanSocketList(SSocketsData *socks)
void DoSocketWait(void)
void SetAllSocksRunnable(SSocketsData *socks)
void PromoteSockAmount(SSocketsData *socks)
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
Definition: srv_diag.hpp:162
#define SRV_FATAL(msg)
Definition: srv_diag.hpp:173
@ eSrvRunning
Definition: srv_inlines.hpp:50
bool IsServerStopping(void)
CSrvTime s_LastJiffyTime
Definition: time_man.cpp:52
SSocketsData * socks
Definition: threads_man.hpp:92
SSrvThread(void)
EThreadState thread_state
Definition: threads_man.hpp:82
CSrvStat * stat
Definition: threads_man.hpp:93
virtual void ExecuteRCU(void)
Method implementing RCU job that was scheduled earlier by CallRCU().
virtual ~SSrvThread(void)
SRCUInfo * rcu
Definition: threads_man.hpp:91
TSrvThreadNum thread_num
Definition: threads_man.hpp:79
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
Definition: task_server.hpp:42
@ eSrvFastShutdown
CRef< CTestThread > thr[k_NumThreadsMax]
Definition: test_mt.cpp:267
static CMiniMutex s_ThrMgrLock
Definition: threads_man.cpp:60
TSrvThreadNum GetCntRunningThreads(void)
static void * s_WorkerThreadMain(void *data)
static void s_RegisterNewThread(SSrvThread *thr)
string s_AppBaseName
Definition: server_core.cpp:77
static void s_PerJiffyTasks_Main(SSrvThread *thr)
void RequestThreadStart(SSrvThread *thr)
static void s_StartCurMgrThread(void)
static void s_StopCurMgrThread(void)
static SSrvThread * s_SvcThr
Definition: threads_man.cpp:64
static void s_PerJiffyTasks_Worker(SSrvThread *thr)
void RunMainThread(void)
static const TSrvThreadNum kMaxNumberOfThreads
Definition: threads_man.cpp:56
void RequestThreadStop(SSrvThread *thr)
SSrvThread * GetCurThread(void)
Definition: threads_man.cpp:82
SSrvThread ** s_Threads
Definition: threads_man.cpp:59
static SSrvThread * s_MainThr
Definition: threads_man.cpp:63
void ConfigureThreads(const CNcbiRegistry *reg, CTempString section)
Uint8 s_CurJiffies
Definition: time_man.cpp:53
void InitCurThreadStorage(void)
static EThreadMgrState s_ThreadMgrState
Definition: threads_man.cpp:61
bool InitThreadsMan(void)
bool ReConfig_Threads(const CTempString &, const CNcbiRegistry &, string &)
TSrvThreadNum s_MaxRunningThreads
Definition: threads_man.cpp:71
void RequestThreadRevive(SSrvThread *thr)
static bool s_StartThread(SSrvThread *thr, void *(*thr_func)(void *))
void WriteSetup_Threads(CSrvSocketTask &task)
void FinalizeThreadsMan(void)
static void s_PerJiffyTasks_Service(SSrvThread *thr)
static void s_SetCurThread(SSrvThread *thr)
static void s_JoinAllThreads(void)
static bool s_StartAllThreads(void)
static SSrvThread * s_CurMgrThread
Definition: threads_man.cpp:62
CSrvTime s_JiffyTime
Definition: time_man.cpp:54
static CFutex s_SvcSignal
Definition: threads_man.cpp:69
static void * s_ServiceThreadMain(void *)
static SSrvThread * s_AllocThread(TSrvThreadNum thread_num)
bool IsThreadRunning(SSrvThread *thr)
@ eThreadStopped
Definition: threads_man.hpp:72
@ eThreadStarting
Definition: threads_man.hpp:68
@ eThreadLockedForStop
Definition: threads_man.hpp:71
@ eThreadRevived
Definition: threads_man.hpp:70
@ eThreadReleased
Definition: threads_man.hpp:73
@ eThreadDormant
Definition: threads_man.hpp:74
@ eThreadRunning
Definition: threads_man.hpp:69
EThreadMgrState
@ eThrMgrIdle
@ eThrMgrThreadExited
@ eThrMgrStarting
@ eThrMgrSocksMoved
@ eThrMgrNeedNewThread
@ eThrMgrPreparesToStop
void IncCurJiffies(void)
Definition: time_man.cpp:110
void TimerTick(void)
Definition: timers.cpp:177
else result
Definition: token2.c:20
voidp calloc(uInt items, uInt size)
Modified on Sat Jun 15 11:54:12 2024 by modify_doxy.py rev. 669887