NCBI C++ ToolKit
grid_globals.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: grid_globals.cpp 89834 2020-04-27 16:12:51Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
36 
37 #include <corelib/ncbi_system.hpp>
38 #include <corelib/ncbimtx.hpp>
39 #include <corelib/ncbidiag.hpp>
41 
42 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
43 
45 
46 /////////////////////////////////////////////////////////////////////////////
47 //
48 // CWorkerNodeStatistics
49 /// @internal
51  : m_JobsStarted(0), m_JobsSucceeded(0), m_JobsFailed(0), m_JobsReturned(0),
52  m_JobsRescheduled(0), m_JobsCanceled(0), m_JobsLost(0),
53  m_MaxJobsAllowed(0), m_MaxFailuresAllowed(0),
54  m_InfiniteLoopTime(0)
55 {
56 }
58 {
59 }
60 
62  EEvent event)
63 {
64  auto& grid_globals = CGridGlobals::GetInstance();
65 
66  switch (event) {
67  case eJobStarted:
68  {
70  m_ActiveJobs[const_cast<CWorkerNodeJobContext*>(&job_context)] =
71  SJobActivity();
72  ++m_JobsStarted;
73  if (m_MaxJobsAllowed > 0 && m_JobsStarted > m_MaxJobsAllowed - 1 && !grid_globals.IsShuttingDown()) {
74  LOG_POST_X(1, "The maximum number of allowed jobs (" <<
75  m_MaxJobsAllowed << ") has been reached. "
76  "Sending the shutdown request." );
77  grid_globals.RequestShutdown(CNetScheduleAdmin::eNormalShutdown);
78  }
79  }
80  break;
81  case eJobStopped:
82  {
85  const_cast<CWorkerNodeJobContext*>(&job_context));
86  }
87  break;
88  case eJobFailed:
89  ++m_JobsFailed;
91  grid_globals.GetShutdownLevel() < CNetScheduleAdmin::eShutdownImmediate) {
92  ERR_POST_X(2, Warning << "The maximum number of failed jobs (" <<
93  m_MaxFailuresAllowed << ") has been reached. "
94  "Shutting down..." );
95  grid_globals.RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
96  }
97  break;
98  case eJobSucceeded:
100  break;
101  case eJobReturned:
102  ++m_JobsReturned;
103  break;
104  case eJobRescheduled:
106  break;
107  case eJobCanceled:
108  ++m_JobsCanceled;
109  break;
110  case eJobLost:
111  ++m_JobsLost;
112  break;
113  }
114 
115  if (event != eJobStarted && !grid_globals.IsShuttingDown()) {
116  CGridWorkerNode worker_node(job_context.GetWorkerNode());
117  Uint8 total_memory_limit = worker_node.GetTotalMemoryLimit();
118  if (total_memory_limit > 0) { // memory check requested
119  CCurrentProcess::SMemoryUsage memory_usage;
120  if (!CCurrentProcess::GetMemoryUsage(memory_usage)) {
121  ERR_POST("Could not check self memory usage" );
122  } else if (memory_usage.total > total_memory_limit) {
123  ERR_POST(Warning << "Memory usage (" << memory_usage.total <<
124  ") is above the configured limit (" <<
125  total_memory_limit << ")");
126  const auto kExitCode = 100; // See also one in wn_main_loop.cpp
127  grid_globals.RequestShutdown(CNetScheduleAdmin::eNormalShutdown, kExitCode);
128  }
129  }
130  }
131 }
132 
134 {
135  os << "Started: " <<
137  "\nJobs Succeeded: " << m_JobsSucceeded <<
138  "\nJobs Failed: " << m_JobsFailed <<
139  "\nJobs Returned: " << m_JobsReturned <<
140  "\nJobs Rescheduled: " << m_JobsRescheduled <<
141  "\nJobs Canceled: " << m_JobsCanceled <<
142  "\nJobs Lost: " << m_JobsLost << "\n";
143 
145  os << "Jobs Running: " << m_ActiveJobs.size() << "\n";
147  os << it->first->GetJobKey() << " \"" <<
148  NStr::PrintableString(it->first->GetJobInput()) <<
149  "\" -- running for " <<
150  (int) it->second.elasped_time.Elapsed() << " seconds.";
151  if (it->second.is_stuck)
152  os << "!!! LONG RUNNING JOB !!!";
153  os << "\n";
154  }
155 }
156 
158 {
159  if (m_InfiniteLoopTime > 0) {
160  size_t count = 0;
163  if (!it->second.is_stuck) {
164  if ( it->second.elasped_time.Elapsed() > m_InfiniteLoopTime) {
165  const auto job_key = it->first->GetJobKey();
166  ERR_POST_X(3, "An infinite loop is detected in job " << job_key);
167  GetDiagContext().Extra().Print("job_key", job_key);
168 
169  it->second.is_stuck = true;
171  RequestShutdown(CNetScheduleAdmin::eShutdownImmediate);
172  }
173  } else
174  ++count;
175  }
176  if (count > 0 && count == m_ActiveJobs.size()) {
177  ERR_POST_X(4, "All jobs are in infinite loops. "
178  "Server is shutting down.");
180  }
181  }
182 }
183 
185 {
188  CNetScheduleJob& job = it->first->GetJob();
189  if (!it->second.is_stuck)
190  worker.GetNSExecutor().ReturnJob(job);
191  else {
192  job.error_msg = "Job execution time exceeded " +
194  unsigned(it->second.elasped_time.Elapsed()));
195  job.error_msg += " seconds.";
196  worker.GetNSExecutor().PutFailure(job);
197  }
198  }
199  TPid cpid = CCurrentProcess::GetPid();
200  CProcess(cpid).Kill();
201 }
202 
203 
204 /////////////////////////////////////////////////////////////////////////////
205 //
207  m_ReuseJobObject(false),
208  m_ShutdownLevel(CNetScheduleAdmin::eNoShutdown),
209  m_ExitCode(0),
210  m_StartTime(GetFastLocalTime()),
211  m_Worker(NULL),
212  m_UDPPort(0)
213 {
214 }
215 
217 {
218 }
219 
220 /* static */
222 {
223  static CSafeStatic<CGridGlobals> global_instance;
224 
225  return global_instance.Get();
226 }
227 
228 
230 {
231  return (unsigned) m_JobsStarted.Add(1);
232 }
233 
235 {
236  if (!m_JobWatcher.get())
237  m_JobWatcher.reset(new CWNJobWatcher);
238  return *m_JobWatcher;
239 }
240 
242 {
243  _ASSERT(m_Worker);
244  if (m_Worker)
246 }
247 
249 {
250  if (m_UDPPort != 0)
251  CDatagramSocket().Send("INTERRUPT", sizeof("INTERRUPT"),
252  "127.0.0.1", m_UDPPort);
253 }
254 
CDatagramSocket::
unsigned int GetNewJobNumber()
CAtomicCounter_WithAutoInit m_JobsStarted
CWNJobWatcher & GetJobWatcher()
unsigned short m_UDPPort
void InterruptUDPPortListening()
SGridWorkerNodeImpl * m_Worker
unique_ptr< CWNJobWatcher > m_JobWatcher
const CTime & GetStartTime() const
static CGridGlobals & GetInstance()
Grid Worker Node.
@ eNormalShutdown
Normal shutdown was requested.
@ eShutdownImmediate
Urgent shutdown was requested.
CProcess –.
CSafeStatic<>::
T & Get(void)
Create the variable if not created yet, return the reference.
virtual void Notify(const CWorkerNodeJobContext &job, EEvent event)
CMutex m_ActiveJobsMutex
void CheckForInfiniteLoop()
virtual ~CWNJobWatcher()
void x_KillNode(CGridWorkerNode)
unsigned int m_JobsStarted
unsigned int m_MaxFailuresAllowed
unsigned int m_InfiniteLoopTime
unsigned int m_JobsLost
unsigned int m_JobsFailed
unsigned int m_JobsRescheduled
unsigned int m_JobsSucceeded
void Print(CNcbiOstream &os) const
unsigned int m_JobsCanceled
unsigned int m_JobsReturned
unsigned int m_MaxJobsAllowed
TActiveJobs m_ActiveJobs
Worker Node job context.
void erase(iterator pos)
Definition: map.hpp:167
size_type size() const
Definition: map.hpp:148
#define false
Definition: bool.h:36
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
#define NULL
Definition: ncbistd.hpp:225
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
Definition: ncbicntr.hpp:278
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
Definition: ncbidiag.cpp:2622
#define LOG_POST_X(err_subcode, message)
Definition: ncbidiag.hpp:553
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
CDiagContext_Extra Extra(void) const
Create a temporary CDiagContext_Extra object.
Definition: ncbidiag.hpp:2095
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
Definition: ncbidiag.hpp:550
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
CGridWorkerNode GetWorkerNode() const
void ReturnJob(const CNetScheduleJob &job)
Switch the job back to the "Pending" status so that it can be run again on a different worker node.
CNetScheduleExecutor GetNSExecutor() const
void PutFailure(const CNetScheduleJob &job, bool no_retries=false)
Submit job failure diagnostics.
Uint8 GetTotalMemoryLimit() const
Get total memory limit (automatic restart if node grows more than that)
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
static TPid GetPid(void)
Get process identifier (pid) for the current process.
size_t total
Total memory usage.
bool Kill(unsigned long timeout=kDefaultKillTimeout)
Terminate process.
static bool GetMemoryUsage(SMemoryUsage &usage)
Get current process memory usage.
pid_t TPid
Process identifier (PID) and process handle.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
EIO_Status Send(const void *data, size_t datalen, const string &host=string(), unsigned short port=0)
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3944
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
string AsString(const CTimeFormat &format=kEmptyStr, TSeconds out_tz=eCurrentTimeZone) const
Transform time to string.
Definition: ncbitime.cpp:1512
CTime GetFastLocalTime(void)
Quick and dirty getter of local time.
Definition: ncbitime.cpp:4167
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
Definition of all error codes used in connect services library (xconnserv.lib and others).
Static variables safety - create on demand, destroy on application termination.
Defines NCBI C++ diagnostic APIs, classes, and macros.
Multi-threading – mutexes; rw-locks; semaphore.
#define count
Job description.
Process memory usage information, in bytes.
#define _ASSERT
Modified on Fri Sep 20 14:57:27 2024 by modify_doxy.py rev. 669887