NCBI C++ ToolKit
grid_worker.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: grid_worker.cpp 100701 2023-08-31 19:21:12Z lavr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Maxim Didenko, Anatoliy Kuznetsov, Dmitry Kazimirov
27  *
28  * File Description:
29  * NetSchedule Worker Node implementation
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "grid_worker_impl.hpp"
35 #include "grid_control_thread.hpp"
36 #include "netcache_api_impl.hpp"
37 
39 
40 #ifdef NCBI_OS_UNIX
41 #include <sys/types.h>
42 #include <signal.h>
43 #include <unistd.h>
44 #endif
45 
46 
47 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
48 
49 #define DEFAULT_NS_TIMEOUT 30
50 
52 
53 /////////////////////////////////////////////////////////////////////////////
54 //
56 {
57 }
58 
59 /////////////////////////////////////////////////////////////////////////////
60 //
63 
65 {
66  // There are two implementations of x_RunJob(): one is for
67  // the real worker node, and the other is for the offline run.
68  // Do not add any implementation-specific code here.
70 }
71 
72 /////////////////////////////////////////////////////////////////////////////
73 //
74 // CGridControlThread
75 /// @internal
77 {
78 public:
80  unsigned short start_port, unsigned short end_port) : m_Control(
81  worker_node, start_port, end_port),
82  m_ThreadName(worker_node->GetAppName() + "_ct")
83  {
84  }
85 
87  unsigned short GetControlPort() const { return m_Control.GetControlPort(); }
89 
90 protected:
91  virtual void* Main(void)
92  {
94  m_Control.Run();
95  return NULL;
96  }
97  virtual void OnExit(void)
98  {
102  LOG_POST_X(46, Info << "Control Thread has been stopped.");
103  }
104 
105 private:
107  const string m_ThreadName;
108 };
109 
110 /////////////////////////////////////////////////////////////////////////////
111 //
112 // CWorkerNodeIdleThread --
114 {
115 public:
117  SGridWorkerNodeImpl* worker_node,
118  unsigned run_delay,
119  unsigned int auto_shutdown);
120 
122  {
123  m_ShutdownFlag = true;
124  m_Wait1.Post();
125  m_Wait2.Post();
126  }
127  void Schedule()
128  {
129  CFastMutexGuard guard(m_Mutex);
131  if (m_StopFlag) {
132  m_StopFlag = false;
133  m_Wait1.Post();
134  }
135  }
136  void Suspend()
137  {
138  CFastMutexGuard guard(m_Mutex);
141  if (!m_StopFlag) {
142  m_StopFlag = true;
143  m_Wait2.Post();
144  }
145  }
146 
147  bool IsShutdownRequested() const { return m_ShutdownFlag; }
148 
149 
150 protected:
151  virtual void* Main(void);
152  virtual void OnExit(void);
153 
155 
156 private:
157 
158  unsigned int x_GetInterval() const
159  {
160  CFastMutexGuard guard(m_Mutex);
161  return m_AutoShutdown > 0 ? min(m_AutoShutdown -
162  (unsigned) m_AutoShutdownSW.Elapsed(),
164  }
165  bool x_GetStopFlag() const
166  {
167  CFastMutexGuard guard(m_Mutex);
168  return m_StopFlag;
169  }
170  unsigned x_GetIdleTimeIfShutdown() const
171  {
172  CFastMutexGuard guard(m_Mutex);
173  auto elapsed = static_cast<unsigned>(m_AutoShutdownSW.Elapsed());
174  return (m_AutoShutdown && elapsed >= m_AutoShutdown) ? elapsed : 0;
175  }
176 
182  volatile bool m_StopFlag;
183  volatile bool m_ShutdownFlag;
184  unsigned int m_RunInterval;
185  unsigned int m_AutoShutdown;
188  const string m_ThreadName;
189 
192 };
193 
195  SGridWorkerNodeImpl* worker_node,
196  unsigned run_delay,
197  unsigned int auto_shutdown)
198  : m_Task(task), m_WorkerNode(worker_node),
199  m_TaskContext(*this),
200  m_Wait1(0,100000), m_Wait2(0,1000000),
201  m_StopFlag(false), m_ShutdownFlag(false),
202  m_RunInterval(run_delay),
203  m_AutoShutdown(auto_shutdown), m_AutoShutdownSW(CStopWatch::eStart),
204  m_ThreadName(worker_node->GetAppName() + "_id")
205 {
206 }
208 {
210 
211  while (!m_ShutdownFlag) {
212  if (auto idle = x_GetIdleTimeIfShutdown()) {
213  LOG_POST_X(47, "Has been idle (no jobs to process) over " << idle << " seconds. Exiting.");
216  break;
217  }
218  unsigned interval = m_AutoShutdown > 0 ? min (m_RunInterval,
220  if (m_Wait1.TryWait(interval, 0)) {
221  if (m_ShutdownFlag)
222  continue;
223  interval = x_GetInterval();
224  if (m_Wait2.TryWait(interval, 0)) {
225  continue;
226  }
227  }
228  if (m_Task && !x_GetStopFlag()) {
229  try {
230  do {
231  if (auto idle = x_GetIdleTimeIfShutdown()) {
232  LOG_POST_X(47, "Has been idle (no jobs to process) over " << idle << " seconds. Exiting.");
235  m_ShutdownFlag = true;
236  break;
237  }
238  GetContext().Reset();
239  m_Task->Run(GetContext());
240  } while (GetContext().NeedRunAgain() && !m_ShutdownFlag);
241  } NCBI_CATCH_ALL_X(58,
242  "CWorkerNodeIdleThread::Main: Idle Task failed");
243  }
244  }
245  return 0;
246 }
247 
249 {
250  LOG_POST_X(49, Info << "Idle Thread has been stopped.");
251 }
252 
254 {
255  return m_TaskContext;
256 }
257 
258 /////////////////////////////////////////////////////////////////////////////
259 //
260 // CWorkerNodeIdleTaskContext --
262  CWorkerNodeIdleThread& thread) :
263  m_Thread(thread), m_RunAgain(false)
264 {
265 }
267 {
268  return m_Thread.IsShutdownRequested();
269 }
271 {
272  m_RunAgain = false;
273 }
274 
276 {
280 }
281 
282 /////////////////////////////////////////////////////////////////////////////
283 //
284 // CIdleWatcher
285 /// @internal
287 {
288 public:
290 
291  virtual ~CIdleWatcher() {};
292 
293  virtual void Notify(const CWorkerNodeJobContext&, EEvent event)
294  {
295  if (event == eJobStarted) {
296  m_RunningJobs.Add(1);
297  m_Idle.Suspend();
298  } else if (event == eJobStopped) {
299  if (m_RunningJobs.Add(-1) == 0)
300  m_Idle.Schedule();
301  }
302  }
303 
304 private:
307 };
308 
309 
311 {
312  switch (m_Event.exchange(eNoEvent)) {
313  case eNoEvent:
314  break;
315  case eSuspend:
316  return m_IsSuspended.exchange(true) ? eSuspended : eSuspending;
317  case eResume:
318  m_IsSuspended.store(false);
319  return eRunning;
320  }
321 
322  return m_IsSuspended.load() ? eSuspended : eRunning;
323 }
324 
325 
326 /////////////////////////////////////////////////////////////////////////////
327 //
329 {
331 
332  return *m_Registry;
333 }
334 
336 {
337  return m_App.GetArgs();
338 }
339 
341 {
342  return m_App.GetEnvironment();
343 }
344 
346 {
347  // It should be safe to do the cast here
348  // as SGridWorkerNodeImpl is only used as const IWorkerNodeInitContext in
349  // its own constructor where 'this' is not const
350  return const_cast<SGridWorkerNodeImpl*>(this)->m_CleanupEventSource;
351 }
352 
354 {
355  return m_NetScheduleAPI;
356 }
357 
359 {
360  return m_NetCacheAPI;
361 }
362 
364  IWorkerNodeJobFactory* job_factory) :
365  m_Impl(new SGridWorkerNodeImpl(app, job_factory))
366 {
367 }
368 
370  IWorkerNodeJobFactory* job_factory) :
371  m_JobProcessorFactory(job_factory),
372  m_ThreadPool(NULL),
373  m_MaxThreads(1),
374  m_NSTimeout(DEFAULT_NS_TIMEOUT),
375  m_CommitJobInterval(2),
376  m_CheckStatusPeriod(2),
377  m_ExclusiveJobSemaphore(1, 1),
378  m_IsProcessingExclusiveJob(false),
379  m_TotalMemoryLimit(0),
380  m_TotalTimeLimit(0),
381  m_StartupTime(0),
382  m_CleanupEventSource(new CWorkerNodeCleanup()),
383  m_Listener(new CGridWorkerNodeApp_Listener()),
384  m_App(app),
385  m_SingleThreadForced(false)
386 {
387  if (!m_JobProcessorFactory.get())
389  eJobFactoryIsNotSet, "The JobFactory is not set.");
390 }
391 
393 {
394  CSynRegistryBuilder registry_builder(m_App);
395  m_SynRegistry = registry_builder.Get();
397 
398  m_Listener->OnInit(this);
399 
400  // This parameter is deprecated, "[Diag]Merge_Lines" should be used instead
401  if (m_SynRegistry->Get("log", "merge_lines", false)) {
404  }
405 
406  m_NetScheduleAPI = new SNetScheduleAPIImpl(registry_builder, kEmptyStr);
408  m_JobProcessorFactory->Init(*this);
409 }
410 
412 {
413  m_Impl->Init();
414 }
415 
416 void CGridWorkerNode::Suspend(bool pullback, unsigned timeout)
417 {
418  m_Impl->m_SuspendResume.GetLock()->Suspend(pullback, timeout);
419 }
420 
421 void SSuspendResume::Suspend(bool pullback, unsigned timeout)
422 {
423  if (pullback)
424  SetJobPullbackTimer(timeout);
425  if (m_Event.exchange(eSuspend) == eNoEvent)
427 }
428 
430 {
431  m_Impl->m_SuspendResume->Resume();
432 }
433 
434 void SSuspendResume::Resume() volatile
435 {
436  if (m_Event.exchange(eResume) == eNoEvent)
438 }
439 
441 {
442  m_JobPullbackTime = CDeadline(seconds);
444 }
445 
447 {
448  return m_JobPullbackTime.IsExpired();
449 }
450 
452 {
454 
455  if (!m_SingleThreadForced) {
456  string max_threads = m_SynRegistry->Get("server", "max_threads", "8");
457  if (NStr::CompareNocase(max_threads, "auto") == 0)
459  else {
460  try {
461  m_MaxThreads = NStr::StringToUInt(max_threads);
462  }
463  catch (exception&) {
465  ERR_POST_X(51, "Could not convert [server"
466  "] max_threads parameter to number.\n"
467  "Using \'auto\' option (" << m_MaxThreads << ").");
468  }
469  }
470  }
471  m_NSTimeout = m_SynRegistry->Get("server", "job_wait_timeout", DEFAULT_NS_TIMEOUT);
472 
473  {{
474  string memlimitstr = m_SynRegistry->Get("server", "total_memory_limit", kEmptyStr);
475 
476  if (!memlimitstr.empty())
478  }}
479 
480  m_TotalTimeLimit = m_SynRegistry->Get("server", "total_time_limit", 0u);
481 
482  m_StartupTime = time(0);
483 
484  CGridGlobals::GetInstance().SetReuseJobObject(m_SynRegistry->Get("server", "reuse_job_object", false));
486 
487  m_LogRequested = m_SynRegistry->Get("server", "log", false);
488  m_ProgressLogRequested = m_SynRegistry->Get("server", "log_progress", false);
489  m_ThreadPoolTimeout = m_SynRegistry->Get("server", "thread_pool_timeout", 30);
490 }
491 
493 {
494  _ASSERT(m_MaxThreads > 0);
496 
498  GetAppName() + "_wr");
499 
500  try {
501  unsigned init_threads = m_SynRegistry->Get("server", "init_threads", 1);
502 
503  m_ThreadPool->Spawn(init_threads <= m_MaxThreads ?
504  init_threads : m_MaxThreads);
505  }
506  catch (exception& ex) {
507  ERR_POST_X(26, ex.what());
510  throw;
511  }
512 }
513 
514 bool CRunningJobLimit::CountJob(const string& job_group,
515  CJobRunRegistration* job_run_registration)
516 {
517  if (m_MaxNumber == 0)
518  return true;
519 
520  CFastMutexGuard guard(m_Mutex);
521 
522  pair<TJobCounter::iterator, bool> insertion(
523  m_Counter.insert(TJobCounter::value_type(job_group, 1)));
524 
525  if (!insertion.second) {
526  if (insertion.first->second == m_MaxNumber)
527  return false;
528 
529  ++insertion.first->second;
530  }
531 
532  job_run_registration->RegisterRun(this, insertion.first);
533 
534  return true;
535 }
536 
538 #ifdef NCBI_OS_UNIX
539  ESwitch daemonize,
540 #endif
541  string procinfo_file_name)
542 {
543  return m_Impl->Run(
544 #ifdef NCBI_OS_UNIX
545  daemonize,
546 #endif
547  procinfo_file_name);
548 }
549 
551 #ifdef NCBI_OS_UNIX
552  ESwitch daemonize,
553 #endif
554  string procinfo_file_name)
555 {
557 
558  x_WNCoreInit();
559 
560  const SBuildInfo& build_info(m_App.GetFullVersion().GetBuildInfo());
561 
562  LOG_POST_X(50, Info << m_JobProcessorFactory->GetJobVersion() <<
563  " build " << build_info.date << " tag " << build_info.tag);
564 
565  const CArgs& args = m_App.GetArgs();
566 
567  unsigned short start_port, end_port;
568 
569  {{
570  string control_port_arg(args["control_port"] ?
571  args["control_port"].AsString() :
572  m_SynRegistry->Get("server", "control_port", "9300"));
573 
574  CTempString from_port, to_port;
575 
576  NStr::SplitInTwo(control_port_arg, "- ",
577  from_port, to_port,
579 
580  start_port = NStr::StringToNumeric<unsigned short>(from_port);
581  end_port = to_port.empty() ? start_port : NStr::StringToNumeric<unsigned short>(to_port);
582  }}
583 
584 #ifdef NCBI_OS_UNIX
585  bool is_daemon = daemonize != eDefault ? daemonize == eOn :
586  m_SynRegistry->Get("server", "daemon", false);
587 #endif
588 
589  vector<string> vhosts;
590 
591  NStr::Split(m_SynRegistry->Get("server", "master_nodes", kEmptyStr), " ;,", vhosts);
592 
593  ITERATE(vector<string>, it, vhosts) {
594  if (auto address = SSocketAddress::Parse(NStr::TruncateSpaces(*it))) {
595  m_Masters.insert(std::move(address));
596  }
597  }
598 
599  if (vhosts.size() && m_Masters.empty()) {
600  LOG_POST_X(41, Warning << "All hosts from master_nodes were ignored");
601  }
602 
603  vhosts.clear();
604 
605  NStr::Split(m_SynRegistry->Get("server", "admin_hosts", kEmptyStr), " ;,", vhosts);
606 
607  ITERATE(vector<string>, it, vhosts) {
608  unsigned int ha = CSocketAPI::gethostbyname(*it);
609  if (ha != 0)
610  m_AdminHosts.insert(ha);
611  }
612 
613  if (vhosts.size() && m_AdminHosts.empty()) {
614  LOG_POST_X(42, Warning << "All hosts from admin_hosts were ignored");
615  }
616 
617  auto commit_job_interval = m_SynRegistry->Get("server", "commit_job_interval", m_CommitJobInterval);
618  m_CommitJobInterval = static_cast<unsigned>(max(1, commit_job_interval));
619 
620  m_CheckStatusPeriod = m_SynRegistry->Get("server", "check_status_period", 2);
621  if (m_CheckStatusPeriod == 0)
623 
624  auto default_timeout = m_SynRegistry->Get("server", "default_pullback_timeout", 0);
625  m_SuspendResume.GetLock()->SetDefaultPullbackTimeout(default_timeout);
626 
627  if (m_SynRegistry->Has("server", "wait_server_timeout")) {
628  ERR_POST_X(52, "[server"
629  "] \"wait_server_timeout\" is not used anymore.\n"
630  "Use [" << kNetScheduleAPIDriverName <<
631  "] \"communication_timeout\" parameter instead.");
632  }
633 
634  if (!procinfo_file_name.empty()) {
635  // Make sure the process info file is writable.
636  CFile proc_info_file(procinfo_file_name);
637  if (proc_info_file.Exists()) {
638  // Already exists
639  if (!proc_info_file.CheckAccess(CFile::fWrite)) {
640  fprintf(stderr, "'%s' is not writable.\n",
641  procinfo_file_name.c_str());
642  return 21;
643  }
644  } else {
645  // The process info file does not exist yet.
646  // Create a temporary file in the designated directory
647  // to make sure the location is writable.
648  string test_file_name = procinfo_file_name + ".TEST";
649  FILE* f = fopen(test_file_name.c_str(), "w");
650  if (f == NULL) {
651  perror(("Cannot create " + test_file_name).c_str());
652  return 22;
653  }
654  fclose(f);
655  // The worker node may fail (e.g. if the control port is busy)
656  // before the process info file can be written; remove the
657  // empty file to avoid confusion.
658  remove(test_file_name.c_str());
659  }
660  }
661 
662 #ifdef NCBI_OS_UNIX
663  if (is_daemon) {
664  LOG_POST_X(53, "Entering UNIX daemon mode...");
665  CCurrentProcess::Daemonize("/dev/null",
670  }
671 #endif
672 
673  AddJobWatcher(CGridGlobals::GetInstance().GetJobWatcher());
674 
676 
677  CRef<CGridControlThread> control_thread(
678  new CGridControlThread(this, start_port, end_port));
679 
680  try {
681  control_thread->Prepare();
682  }
683  catch (CServer_Exception& e) {
684  if (e.GetErrCode() == CServer_Exception::eCouldntListen) {
685  ERR_POST("Couldn't start a listener on a port from the "
686  "specified control port range; last port tried: " <<
687  control_thread->GetControlPort() << ". Another "
688  "process (probably another instance of this worker "
689  "node) is occupying the port(s).");
690  } else {
691  ERR_POST(e);
692  }
693  return 3;
694  }
695 
696  const string& client(GetClientName());
697  const string& host(CSocketAPI::gethostname());
698  const string& port(NStr::NumericToString(control_thread->GetControlPort()));
699  LOG_POST_X(60, "Control port: " << port);
700  m_NetScheduleAPI->SetAuthParam("control_port", port);
701  m_NetScheduleAPI->SetAuthParam("client_host", host);
702 
703  // This overrides default client node format (omits user name),
704  // so deployed worker nodes could be determined by GRID Dashboard.
705  m_NetScheduleAPI.SetClientNode(client + "::" + host + ':' + port);
706 
708 
710 
712 
713 #ifdef NCBI_OS_UNIX
714  bool reliable_cleanup = m_SynRegistry->Get("server", "reliable_cleanup", false);
715 
716  if (reliable_cleanup) {
717  TPid child_pid = CCurrentProcess::Fork();
718  if (child_pid != 0) {
719  CProcess child_process(child_pid);
720  CProcess::CExitInfo exit_info;
721  child_process.Wait(kInfiniteTimeoutMs, &exit_info);
722 
723  x_ClearNode();
724  remove(procinfo_file_name.c_str());
725 
726  int retcode = 0;
727  if (exit_info.IsPresent()) {
728  if (exit_info.IsSignaled()) {
729  int signum = exit_info.GetSignal();
730  ERR_POST(Critical << "Child process " << child_pid <<
731  " was terminated by signal " << signum);
733  } else if (exit_info.IsExited()) {
734  retcode = exit_info.GetExitCode();
735  if (retcode == 0) {
736  LOG_POST("Worker node process " << child_pid <<
737  " terminated normally (exit code 0)");
738  } else {
739  ERR_POST(Error << "Worker node process " << child_pid <<
740  " terminated with exit code " << retcode);
741  }
742  }
743  }
744  // Exit the parent process with the same return code.
745  return retcode;
746  }
747  }
748 #endif
749 
750  // Now that most of parameters have been checked, create the
751  // "procinfo" file.
752  if (!procinfo_file_name.empty()) {
753  FILE* procinfo_file;
754  if ((procinfo_file = fopen(procinfo_file_name.c_str(), "wt")) == NULL) {
755  perror(procinfo_file_name.c_str());
756  return 23;
757  }
758  fprintf(procinfo_file, "pid: %lu\nport: %s\n"
759  "client_node: %s\nclient_session: %s\n",
760  (unsigned long) CDiagContext::GetPID(),
761  port.c_str(),
764  fclose(procinfo_file);
765  }
766 
769 
770  CDeadline max_wait_for_servers(TWorkerNode_MaxWaitForServers::GetDefault());
771 
772  for (;;) {
773  try {
775 
776  m_NetScheduleAPI.GetAdmin().GetQueueInfo(queue_info);
777 
778  m_QueueTimeout = NStr::StringToUInt(queue_info["timeout"]);
779 
782  break;
783  }
784  catch (CException& e) {
785  int s = (int) m_NSTimeout;
786  do {
787  ERR_POST(e);
788  if (CGridGlobals::GetInstance().IsShuttingDown() ||
789  max_wait_for_servers.GetRemainingTime().IsZero())
790  return 1;
791  SleepSec(1);
792  } while (--s > 0);
793  }
794  }
795 
796  m_JobsPerClientIP.ResetJobCounter( (unsigned) m_SynRegistry->Get("server", "max_jobs_per_client_ip", 0));
797  m_JobsPerSessionID.ResetJobCounter((unsigned) m_SynRegistry->Get("server", "max_jobs_per_session_id", 0));
798 
799  CWNJobWatcher& watcher(CGridGlobals::GetInstance().GetJobWatcher());
800  watcher.SetMaxJobsAllowed( m_SynRegistry->Get("server", "max_total_jobs", 0));
801  watcher.SetMaxFailuresAllowed(m_SynRegistry->Get("server", "max_failed_jobs", 0));
802  watcher.SetInfiniteLoopTime( m_SynRegistry->Get("server", "infinite_loop_time", 0));
805 
806  IWorkerNodeIdleTask* task = NULL;
807 
808  unsigned idle_run_delay = m_SynRegistry->Get("server", "idle_run_delay", 30);
809  unsigned auto_shutdown = m_SynRegistry->Get("server", "auto_shutdown_if_idle", 0);
810 
811  if (idle_run_delay > 0)
812  task = m_JobProcessorFactory->GetIdleTask();
813  if (task || auto_shutdown > 0) {
815  task ? idle_run_delay : auto_shutdown, auto_shutdown));
816  m_IdleThread->Run();
818  }
819 
821  control_thread->Run();
822 
823  LOG_POST_X(54, Info << "\n=================== NEW RUN : " <<
824  CGridGlobals::GetInstance().GetStartTime().AsString() <<
825  " ===================\n" <<
826  m_JobProcessorFactory->GetJobVersion() << " build " <<
827  build_info.date << " tag " << build_info.tag <<
828  " is started.\n"
829  "Waiting for control commands on " << CSocketAPI::gethostname() <<
830  ":" << control_thread->GetControlPort() << "\n"
831  "Queue name: " << GetQueueName() << "\n"
832  "Maximum job threads: " << m_MaxThreads << "\n");
833 
834  m_Listener->OnGridWorkerStart();
835 
837 
838  {
839  CRef<CMainLoopThread> main_loop_thread(new CMainLoopThread(this));
840 
841  main_loop_thread->Run();
842  main_loop_thread->Join();
843  }
844 
845  LOG_POST_X(31, Info << "Shutting down...");
846 
847  bool force_exit = m_SynRegistry->Get("server", "force_exit", false);
848  if (force_exit) {
849  ERR_POST_X(45, "Force exit (worker threads will not be waited for)");
850  } else
852 
853  LOG_POST_X(55, Info << "Stopping the job committer thread...");
855 
856  CNcbiOstrstream os;
858  LOG_POST_X(56, Info << string(CNcbiOstrstreamToString(os)));
859 
860  if (m_IdleThread) {
862  LOG_POST_X(57, "Stopping Idle thread...");
864  }
865  m_IdleThread->Join();
866  }
867 
869 
870 #ifdef NCBI_OS_UNIX
871  // Clear the node only if reliable CLRN mode is not enabled.
872  // Otherwise, the node will be cleared in the parent process.
873  if (!reliable_cleanup)
874 #endif
875  {
876  x_ClearNode();
877  remove(procinfo_file_name.c_str());
878  }
879 
880  int exit_code = x_WNCleanUp();
881 
882  LOG_POST(Info << "Stopping the socket server thread...");
883  control_thread->Stop();
884  control_thread->Join();
885 
886  LOG_POST_X(38, Info << "Worker Node has been stopped.");
887 
888  if (force_exit) {
889  SleepMilliSec(200);
890  _exit(exit_code);
891  }
892 
893  return exit_code;
894 }
895 
897 {
898  if (m_ThreadPool != NULL) {
899  try {
900  LOG_POST_X(32, Info << "Stopping worker threads...");
902  delete m_ThreadPool;
903  m_ThreadPool = NULL;
904  }
905  catch (exception& ex) {
906  ERR_POST_X(33, "Could not stop worker threads: " << ex.what());
907  }
908  }
909 }
910 
912 {
913  try {
915  }
916  catch (CNetServiceException& ex) {
917  // if server does not understand this new command just ignore the error
919  || NStr::Find(ex.what(), "Server error:Unknown request") == NPOS) {
920  ERR_POST_X(35, "Could not unregister from NetSchedule services: "
921  << ex);
922  }
923  }
924  catch (exception& ex) {
925  ERR_POST_X(36, "Could not unregister from NetSchedule services: " <<
926  ex.what());
927  }
928 }
929 
931 {
934 }
935 
936 
938  EOwnership owner)
939 {
940  if (m_Watchers.find(&job_watcher) == m_Watchers.end())
941  m_Watchers[&job_watcher] = owner == eTakeOwnership ?
942  AutoPtr<IWorkerNodeJobWatcher>(&job_watcher) :
944 }
945 
947 {
948  m_Impl->m_Listener.reset(
949  listener ? listener : new CGridWorkerNodeApp_Listener());
950 }
951 
953  const CWorkerNodeJobContext& job_context,
955 {
958  try {
959  const_cast<IWorkerNodeJobWatcher*>(it->first)->Notify(job_context,
960  event);
961  }
962  NCBI_CATCH_ALL_X(66, "Error while notifying a job watcher");
963  }
964 }
965 
966 bool CGridWorkerNode::IsHostInAdminHostsList(const string& host) const
967 {
968  if (m_Impl->m_AdminHosts.empty())
969  return true;
970  unsigned int ha = CSocketAPI::gethostbyname(host);
971  if (m_Impl->m_AdminHosts.find(ha) != m_Impl->m_AdminHosts.end())
972  return true;
973  unsigned int ha_lh = CSocketAPI::gethostbyname("");
974  if (ha == ha_lh) {
975  ha = CSocketAPI::gethostbyname("localhost");
976  if (m_Impl->m_AdminHosts.find(ha) != m_Impl->m_AdminHosts.end())
977  return true;
978  }
979  return false;
980 }
981 
983 {
985  STimeout tmo = {0, 500};
986  CSocket socket(it->host, it->port, &tmo, eOff);
987  if (socket.GetStatus(eIO_Open) != eIO_Success)
988  continue;
989 
990  CNcbiOstrstream os;
991  os << GetClientName() << endl <<
992  GetQueueName() << ";" <<
993  GetServiceName() << endl <<
994  "GETLOAD" << endl << ends;
995 
996  string msg = CNcbiOstrstreamToString(os);
997  if (socket.Write(msg.data(), msg.size()) != eIO_Success)
998  continue;
999  string reply;
1000  if (socket.ReadLine(reply) != eIO_Success)
1001  continue;
1002  if (NStr::StartsWith(reply, "ERR:")) {
1003  NStr::Replace(reply, "ERR:", "", msg);
1004  ERR_POST_X(43, "Worker Node at " << it->AsString() <<
1005  " returned error: " << msg);
1006  } else if (NStr::StartsWith(reply, "OK:")) {
1007  NStr::Replace(reply, "OK:", "", msg);
1008  try {
1009  int load = NStr::StringToInt(msg);
1010  if (load > 0)
1011  return false;
1012  } catch (exception&) {}
1013  } else {
1014  ERR_POST_X(44, "Worker Node at " << it->AsString() <<
1015  " returned unknown reply: " << reply);
1016  }
1017  }
1018  return true;
1019 }
1020 
1022 {
1024 }
1025 
1027 {
1030 }
1031 
1033 {
1036 
1038  return true;
1039  }
1040  return false;
1041 }
1042 
1044 {
1046 
1049 }
1050 
1052 {
1055  return true;
1056  }
1057  return false;
1058 }
1059 
1062 {
1063  s_ReqEventsDisabled = disabled_events;
1064 }
1065 
1067 {
1068  m_Impl->m_SingleThreadForced = true;
1069 }
1070 
1072 {
1073  return *m_Impl->m_JobProcessorFactory;
1074 }
1075 
1077 {
1078  return m_Impl->m_MaxThreads;
1079 }
1080 
1082 {
1083  return m_Impl->m_TotalMemoryLimit;
1084 }
1085 
1087 {
1088  return m_Impl->m_TotalTimeLimit;
1089 }
1090 
1092 {
1093  return m_Impl->m_StartupTime;
1094 }
1095 
1097 {
1098  return m_Impl->m_QueueTimeout;
1099 }
1100 
1102 {
1103  return m_Impl->m_CommitJobInterval;
1104 }
1105 
1107 {
1108  return m_Impl->m_CheckStatusPeriod;
1109 }
1110 
1112 {
1113  CFastMutexGuard guard(m_Impl->m_JobProcessorMutex);
1114  return m_Impl->m_JobProcessorFactory->GetAppName();
1115 }
1116 
1118 {
1119  const CVersionAPI& version(m_Impl->m_App.GetFullVersion());
1120  const CVersionInfo& version_info(version.GetVersionInfo());
1121  const SBuildInfo& build_info(version.GetBuildInfo());
1122 
1123  const auto& job_factory(m_Impl->m_JobProcessorFactory);
1124  _ASSERT(job_factory.get());
1125  const string job_version(job_factory->GetAppVersion());
1126 
1127  return make_pair(job_version.empty() ? version_info.Print() : job_version, build_info);
1128 }
1129 
1131 {
1132  return m_Impl->m_NetCacheAPI;
1133 }
1134 
1136 {
1137  return m_Impl->m_NetScheduleAPI;
1138 }
1139 
1141 {
1142  return m_Impl->m_NSExecutor;
1143 }
1144 
1146 {
1147  return m_Impl->m_CleanupEventSource;
1148 }
1149 
1151 {
1152  return m_Impl->m_SuspendResume->IsSuspended();
1153 }
1154 
1155 const string& CGridWorkerNode::GetQueueName() const
1156 {
1157  return m_Impl->GetQueueName();
1158 }
1159 
1160 const string& CGridWorkerNode::GetClientName() const
1161 {
1162  return m_Impl->GetClientName();
1163 }
1164 
1165 const string& CGridWorkerNode::GetServiceName() const
1166 {
1167  return m_Impl->GetServiceName();
1168 }
1169 
#define false
Definition: bool.h:36
CArgs –.
Definition: ncbiargs.hpp:379
CAtomicCounter_WithAutoInit –.
Definition: ncbicntr.hpp:120
CDeadline.
Definition: ncbitime.hpp:1830
CFastMutex –.
Definition: ncbimtx.hpp:667
CFile –.
Definition: ncbifile.hpp:1604
CGridControlThread(SGridWorkerNodeImpl *worker_node, unsigned short start_port, unsigned short end_port)
Definition: grid_worker.cpp:79
CWorkerNodeControlServer m_Control
const string m_ThreadName
unsigned short GetControlPort() const
Definition: grid_worker.cpp:87
virtual void OnExit(void)
Override this to execute finalization code.
Definition: grid_worker.cpp:97
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
Definition: grid_worker.cpp:91
void SetWorker(SGridWorkerNodeImpl *worker)
CWNJobWatcher & GetJobWatcher()
void SetUDPPort(unsigned short udp_port)
void RequestShutdown(CNetScheduleAdmin::EShutdownLevel level)
Request node shutdown.
void SetReuseJobObject(bool value)
void InterruptUDPPortListening()
static CGridGlobals & GetInstance()
An adapter class for IGridWorkerNodeApp_Listener.
virtual ~CIdleWatcher()
virtual void Notify(const CWorkerNodeJobContext &, EEvent event)
CIdleWatcher(CWorkerNodeIdleThread &idle)
CAtomicCounter_WithAutoInit m_RunningJobs
CWorkerNodeIdleThread & m_Idle
void RegisterRun(CRunningJobLimit *job_counter, CRunningJobLimit::TJobCounter::iterator job_group_it)
CNcbiApplicationAPI –.
CNcbiEnvironment –.
Definition: ncbienv.hpp:104
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string. Sample usage:
Definition: ncbistre.hpp:802
Client API for NetCache server.
Client API for NCBI NetSchedule server.
void GetQueueInfo(CNetServer server, const string &queue_name, TQueueInfo &queue_info)
@ eShutdownImmediate
Urgent shutdown was requested.
Smart pointer to a part of the NetSchedule API that does job retrieval and processing on the worker n...
Net Service exception.
Extended exit information for waited process.
CProcess –.
CRef –.
Definition: ncbiobj.hpp:618
void ResetJobCounter(unsigned max_number)
bool CountJob(const string &job_group, CJobRunRegistration *job_run_registration)
CSemaphore –.
Definition: ncbimtx.hpp:1375
CServer_Exception::
Definition: server.hpp:465
CSocket::
CStopWatch –.
Definition: ncbitime.hpp:1938
static unsigned int GetCpuCount(void)
Return number of active CPUs/cores (never less than 1).
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
CVersionInfo –.
void SetInfiniteLoopTime(unsigned int infinite_loop_time)
void SetMaxFailuresAllowed(unsigned int max_failures_allowed)
void Print(CNcbiOstream &os) const
void SetMaxJobsAllowed(unsigned int max_jobs_allowed)
unsigned short GetControlPort() const
Worker Node Idle Task Context.
unsigned int m_RunInterval
SGridWorkerNodeImpl * m_WorkerNode
CWorkerNodeIdleThread(IWorkerNodeIdleTask *, SGridWorkerNodeImpl *worker_node, unsigned run_delay, unsigned int auto_shutdown)
unsigned int m_AutoShutdown
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
unsigned x_GetIdleTimeIfShutdown() const
volatile bool m_StopFlag
CStopWatch m_AutoShutdownSW
CWorkerNodeIdleTaskContext & GetContext()
bool IsShutdownRequested() const
bool x_GetStopFlag() const
CWorkerNodeIdleTaskContext m_TaskContext
CWorkerNodeIdleThread & operator=(const CWorkerNodeIdleThread &)
IWorkerNodeIdleTask * m_Task
virtual void OnExit(void)
Override this to execute finalization code.
volatile bool m_ShutdownFlag
unsigned int x_GetInterval() const
CWorkerNodeIdleThread(const CWorkerNodeIdleThread &)
const string m_ThreadName
Worker Node job context.
virtual void Process()
Do the actual job. Called by whichever thread handles this request.
Definition: grid_worker.cpp:64
CWorkerNodeJobContext m_JobContext
Listener of events generated by CGridWorkerNodeApp.
IRegistry –.
Definition: ncbireg.hpp:73
Clean-up event source for the worker node.
Definition: grid_worker.hpp:98
Worker Node Idle Task Interface.
Worker Node Job Factory interface.
Jobs watcher interface.
const_iterator end() const
Definition: map.hpp:152
iterator_bool insert(const value_type &val)
Definition: map.hpp:165
const_iterator find(const key_type &key) const
Definition: map.hpp:153
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
bool empty() const
Definition: set.hpp:133
static void DLIST_NAME() remove(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
Definition: dlist.tmpl.h:90
static CGridWorkerNode::EDisabledRequestEvents s_ReqEventsDisabled
Definition: grid_worker.cpp:61
bool g_IsRequestStopEventEnabled()
#define DEFAULT_NS_TIMEOUT
Definition: grid_worker.cpp:49
bool g_IsRequestStartEventEnabled()
const CNcbiEnvironment & GetEnvironment(void) const
Get the application's cached environment.
virtual const CArgs & GetArgs(void) const
Get parsed command line arguments.
Definition: ncbiapp.cpp:285
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
const CVersionAPI & GetFullVersion(void) const
Get the program version information.
Definition: ncbiapp.cpp:1169
@ eOn
Definition: ncbi_types.h:111
@ eTakeOwnership
An object can take ownership of another.
Definition: ncbi_types.h:136
#define NULL
Definition: ncbistd.hpp:225
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
Definition: ncbicntr.hpp:278
void SetDiagPostFlag(EDiagPostFlag flag)
Set the specified flag (globally).
Definition: ncbidiag.cpp:6070
#define LOG_POST_X(err_subcode, message)
Definition: ncbidiag.hpp:553
void SetSessionID(const string &session)
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
Definition: ncbidiag.cpp:1901
static TPID GetPID(void)
Get cached PID (read real PID if not cached yet).
Definition: ncbidiag.cpp:1526
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
Definition: ncbidiag.hpp:550
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define LOG_POST(message)
This macro is deprecated and it's strongly recomended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
@ eDPF_MergeLines
Escape EOLs.
Definition: ncbidiag.hpp:751
@ eDPF_PreMergeLines
Obsolete. Use eDPF_MergeLines.
Definition: ncbidiag.hpp:752
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
void Error(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1197
#define NCBI_CATCH_ALL_X(err_subcode, message)
Definition: ncbiexpt.hpp:619
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
void Info(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1185
bool CheckAccess(TMode access_mode) const
Check access rights.
Definition: ncbifile.cpp:1720
virtual bool Exists(void) const
Check existence of file.
Definition: ncbifile.hpp:4038
@ fWrite
Write permission.
Definition: ncbifile.hpp:1152
time_t GetStartupTime() const
CWorkerNodeIdleThread & m_Thread
IWorkerNodeJobFactory & GetJobFactory()
void ForceSingleThread()
const string & GetQueueName() const
virtual ~IWorkerNodeJobWatcher()
Definition: grid_worker.cpp:55
void SetListener(IGridWorkerNodeApp_Listener *listener)
unsigned GetCommitJobInterval() const
bool IsHostInAdminHostsList(const string &host) const
bool IsShutdownRequested() const
TVersion GetAppVersion() const
const string & GetClientName() const
CNetCacheAPI GetNetCacheAPI() const
pair< string, SBuildInfo > TVersion
virtual void Run(CWorkerNodeIdleTaskContext &)=0
Do the Idle task here.
EDisabledRequestEvents
Disable the automatic logging of request-start and request-stop events by the framework itself.
int Run(ESwitch daemonize=eDefault, string procinfo_file_name=string())
Start job execution loop.
unsigned int GetMaxThreads() const
Get the maximum threads running simultaneously.
unsigned GetTotalTimeLimit() const
Get total time limit (automatic restart after that)
unsigned GetQueueTimeout() const
CNetScheduleExecutor GetNSExecutor() const
IWorkerNodeCleanupEventSource * GetCleanupEventSource()
const string & GetServiceName() const
const SServerParams & GetServerParams()
void RequestShutdown()
static void DisableDefaultRequestEventLogging(EDisabledRequestEvents disabled_events=eDisableStartStop)
bool IsSuspended() const
string GetAppName() const
void SetProgramVersion(const string &pv)
Set program version (like: MyProgram v.
CNetScheduleExecutor GetExecutor()
Create an instance of CNetScheduleExecutor.
CWorkerNodeIdleTaskContext(CWorkerNodeIdleThread &thread)
CNetScheduleAPI GetNetScheduleAPI() const
CNetScheduleAdmin GetAdmin()
void Suspend(bool pullback, unsigned timeout)
Uint8 GetTotalMemoryLimit() const
Get total memory limit (automatic restart if node grows more than that)
unsigned GetCheckStatusPeriod() const
CNetRef< SGridWorkerNodeImpl > m_Impl
void SetClientNode(const string &client_node)
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
#define kMax_UInt
Definition: ncbi_limits.h:185
const unsigned long kInfiniteTimeoutMs
Infinite timeout in milliseconds.
bool IsPresent(void) const
TRUE if the object contains information about the process state.
static TPid GetPid(void)
Get process identifier (pid) for the current process.
bool IsExited(void) const
TRUE if the process terminated normally.
int GetSignal(void) const
Get the signal number that has caused the process to terminate (UNIX only).
static TPid Daemonize(const char *logfile=0, TDaemonFlags flags=0)
Go daemon.
static TPid Fork(TForkFlags flags=fFF_UpdateDiag)
Fork the process.
int Wait(unsigned long timeout=kInfiniteTimeoutMs, CExitInfo *info=0) const
Wait until process terminates.
pid_t TPid
Process identifier (PID) and process handle.
bool IsSignaled(void) const
TRUE if the process terminated by a signal (UNIX only).
int GetExitCode(void) const
Get process exit code.
@ fDF_AllowExceptions
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
EIO_Status ReadLine(string &str)
Read a line from socket (up to CR-LF, LF, or null character, discarding any of the EOLs).
static string gethostname(ESwitch log=eOff)
Return empty string on error.
static unsigned int gethostbyname(const string &host, ESwitch log=eOff)
Return 0 on error.
EIO_Status GetStatus(EIO_Event direction) const
Return status of *last* I/O operation without making any actual I/O.
EIO_Status Write(const void *buf, size_t size, size_t *n_written=0, EIO_WriteMethod how=eIO_WritePersist)
Write to socket.
#define kEmptyStr
Definition: ncbistr.hpp:123
static int CompareNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive compare of a substring with another string.
Definition: ncbistr.cpp:219
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
Definition: ncbistr.cpp:630
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
Definition: ncbistr.cpp:3457
static Uint8 StringToUInt8_DataSize(const CTempString str, TStringToNumFlags flags=0)
Convert string that can contain "software" qualifiers to Uint8.
Definition: ncbistr.cpp:1539
#define NPOS
Definition: ncbistr.hpp:133
static SIZE_TYPE Find(const CTempString str, const CTempString pattern, ECase use_case=eCase, EDirection direction=eForwardSearch, SIZE_TYPE occurrence=0)
Find the pattern in the string.
Definition: ncbistr.cpp:2887
bool empty(void) const
Return true if the represented string is empty (i.e., the length is zero)
Definition: tempstr.hpp:334
static string & Replace(const string &src, const string &search, const string &replace, string &dst, SIZE_TYPE start_pos=0, SIZE_TYPE max_replace=0, SIZE_TYPE *num_replace=0)
Replace occurrences of a substring within a string.
Definition: ncbistr.cpp:3310
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5411
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3550
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
Definition: ncbistr.cpp:642
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
static string TruncateSpaces(const string &str, ETrunc where=eTrunc_Both)
Truncate spaces in a string.
Definition: ncbistr.cpp:3182
@ fSplit_Truncate
Definition: ncbistr.hpp:2501
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
Definition: ncbistr.hpp:2498
virtual void KillAllThreads(TKillFlags flags)
Causes all threads in the pool to exit cleanly after finishing all pending requests,...
void Spawn(unsigned int num_threads)
Start processing threads.
void Run(void)
Enter the main loop.
Definition: server.cpp:771
unsigned int m_MaxThreads
Maximum simultaneous threads.
void StartListening(void)
Start listening before the main loop.
Definition: server.cpp:637
@ eCouldntListen
Unable to bind listening port.
Definition: server.hpp:469
bool Run(TRunMode flags=fRunDefault)
Run the thread.
Definition: ncbithr.cpp:724
virtual void OnExit(void)
Override this to execute finalization code.
Definition: ncbithr.cpp:940
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
Definition: ncbithr.cpp:958
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
Definition: ncbimtx.cpp:1844
void Post(unsigned int count=1)
Increment the semaphore by "count".
Definition: ncbimtx.cpp:1971
void Join(void **exit_data=0)
Wait for the thread termination.
Definition: ncbithr.cpp:863
double Restart(void)
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2817
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
Definition: ncbitime.cpp:3858
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2776
bool IsExpired(void) const
Check if the deadline is expired.
Definition: ncbitime.hpp:1856
void Stop(void)
Suspend the timer.
Definition: ncbitime.hpp:2793
bool IsZero() const
Definition: ncbitime.cpp:3475
enum ENcbiSwitch ESwitch
Aux.
enum ENcbiOwnership EOwnership
Ownership relations between objects.
@ eIO_Success
everything is fine, no error occurred
Definition: ncbi_core.h:133
@ eIO_Open
also serves as no-event indicator in SOCK_Poll
Definition: ncbi_core.h:119
const SBuildInfo & GetBuildInfo() const
Get build info (date and tag, if set)
Definition: version.cpp:695
virtual string Print(void) const
Print version information.
Definition: version.cpp:120
string date
Definition: version_api.hpp:79
string tag
Definition: version_api.hpp:80
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
static int version
Definition: mdb_load.c:29
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
void SleepSec(unsigned long sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Sleep.
#define NCBI_OS_UNIX
const char *const kNetScheduleAPIDriverName
T signum(T x_)
T max(T x_, T y_)
T min(T x_, T y_)
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
const CNSPreciseTime default_timeout(3600, 0)
static CNamedPipeClient * client
CSynRegistry::TPtr Get()
This class allows to add build info (date and tag) to application version.
Definition: version_api.hpp:62
unique_ptr< IGridWorkerNodeApp_Listener > m_Listener
CSynRegistry::TPtr m_SynRegistry
string GetAppName() const
CNetScheduleAPI m_NetScheduleAPI
CRef< CJobCommitterThread > m_JobCommitterThread
CNcbiApplicationAPI & m_App
IWorkerNodeCleanupEventSource * GetCleanupEventSource() const override
Get interface for registering clean-up event listeners.
const CArgs & GetArgs() const override
Get command line arguments.
CStdPoolOfThreads * m_ThreadPool
CRef< CWorkerNodeIdleThread > m_IdleThread
const string & GetServiceName() const
void x_NotifyJobWatchers(const CWorkerNodeJobContext &job_context, IWorkerNodeJobWatcher::EEvent event)
CSemaphore m_ExclusiveJobSemaphore
CRunningJobLimit m_JobsPerSessionID
unique_ptr< IWorkerNodeJobFactory > m_JobProcessorFactory
CRef< CWorkerNodeCleanup > m_CleanupEventSource
bool x_AreMastersBusy() const
set< unsigned int > m_AdminHosts
CNetScheduleAPI GetNetScheduleAPI() const override
Get the shared NetScheduleAPI object used by the worker node framework.
set< SSocketAddress > m_Masters
const string & GetQueueName() const
CRef< IRegistry > m_Registry
SGridWorkerNodeImpl(CNcbiApplicationAPI &app, IWorkerNodeJobFactory *job_factory)
int Run(ESwitch daemonize, string procinfo_file_name)
CNetCacheAPI GetNetCacheAPI() const override
Get the shared NetCacheAPI object used by the worker node framework.
const string & GetClientName() const
void AddJobWatcher(IWorkerNodeJobWatcher &job_watcher, EOwnership owner=eNoOwnership)
SThreadSafe< SSuspendResume > m_SuspendResume
const CNcbiEnvironment & GetEnvironment() const override
Get environment variables.
bool WaitForExclusiveJobToFinish()
CNetScheduleExecutor m_NSExecutor
CRunningJobLimit m_JobsPerClientIP
const IRegistry & GetConfig() const override
Get a config file registry.
void SetAuthParam(const string &param_name, const string &param_value)
CNetScheduleNotificationHandler m_NotificationHandler
static SSocketAddress Parse(const string &address, SHost::EName name=SHost::EName::eResolved)
EState CheckState() volatile
bool IsJobPullbackTimerExpired()
void Resume() volatile
atomic< unsigned > m_CurrentJobGeneration
atomic< bool > m_IsSuspended
CDeadline m_JobPullbackTime
void SetJobPullbackTimer(unsigned seconds)
void Suspend(bool pullback, unsigned timeout)
atomic< EEvent > m_Event
SLock< TType > GetLock()
Timeout structure.
Definition: ncbi_types.h:76
#define _ASSERT
Modified on Sat Dec 09 04:47:39 2023 by modify_doxy.py rev. 669887