NCBI C++ ToolKit
wn_main_loop.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: wn_main_loop.cpp 94826 2021-09-15 19:17:14Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Maxim Didenko, Anatoliy Kuznetsov, Dmitry Kazimirov
27  *
28  * File Description:
29  * NetSchedule Worker Node implementation
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "wn_commit_thread.hpp"
35 #include "wn_cleanup.hpp"
36 #include "grid_worker_impl.hpp"
37 #include "netschedule_api_impl.hpp"
38 
42 
43 
44 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
45 
47 
49 {
50  return m_Impl->m_Job;
51 }
52 
54 {
55  return m_Impl->m_Job;
56 }
57 
// Get the key of the job held by this context.
//
// Read-only accessor: returns a reference to the job_id field of the job
// stored in the implementation object (m_Impl->m_Job); no copy is made.
// NOTE(review): the reference points into m_Impl, so it is presumably valid
// only while the implementation object is alive -- confirm against callers.
58 const string& CWorkerNodeJobContext::GetJobKey() const
59 {
60  return m_Impl->m_Job.job_id;
61 }
62 
64 {
65  return m_Impl->m_Job.input;
66 }
67 
69 {
70  m_Impl->m_Job.output = output;
71 }
72 
74 {
75  m_Impl->m_Job.ret_code = ret_code;
76 }
77 
79 {
80  return m_Impl->m_InputBlobSize;
81 }
82 
84 {
85  return m_Impl->m_Job.output;
86 }
87 
89 {
90  return m_Impl->m_Job.mask;
91 }
92 
94 {
95  return m_Impl->m_JobNumber;
96 }
97 
99 {
100  return m_Impl->m_JobCommitStatus != eCS_NotCommitted;
101 }
102 
105 {
106  return m_Impl->m_JobCommitStatus;
107 }
108 
110 {
111  return m_Impl->m_JobCommitStatus == eCS_JobIsLost;
112 }
113 
115 {
116  return m_Impl->m_CleanupEventSource;
117 }
118 
120 {
121  return m_Impl->m_WorkerNode;
122 }
123 
125  SGridWorkerNodeImpl* worker_node) :
126  m_WorkerNode(worker_node),
127  m_CleanupEventSource(
128  new CWorkerNodeJobCleanup(worker_node->m_CleanupEventSource)),
129  m_RequestContext(new CRequestContext),
130  m_StatusThrottler(1, CTimeSpan(worker_node->m_CheckStatusPeriod, 0)),
131  m_ProgressMsgThrottler(1),
132  m_NetScheduleExecutor(worker_node->m_NSExecutor),
133  m_NetCacheAPI(worker_node->m_NetCacheAPI),
134  m_JobGeneration(0),
135  m_CommitExpiration(0, 0),
136  m_Deadline(0, 0)
137 {
138 }
139 
141 {
142  return m_Impl->m_WorkerNode->GetQueueName();
143 }
145 {
146  return m_Impl->m_WorkerNode->GetClientName();
147 }
148 
150 {
152 }
153 
155 {
156  return m_Impl->GetIStream();
157 }
158 
160 {
162 }
163 
165 {
166  return m_Impl->GetOStream();
167 }
168 
170 {
171  try {
172  m_Impl->m_ProgressMsgThrottler.Reset(1);
173  m_Impl->m_StatusThrottler.Reset(1,
174  CTimeSpan(m_Impl->m_WorkerNode->m_CheckStatusPeriod, 0));
175 
176  m_Impl->m_GridRead.Reset();
177  m_Impl->m_GridWrite.Reset();
178  }
179  NCBI_CATCH_ALL_X(61, "Could not close IO streams");
180 }
181 
183 {
184  m_Impl->CheckIfJobIsLost();
185  m_Impl->m_JobCommitStatus = eCS_Done;
186 }
187 
189  bool no_retries)
190 {
191  m_Impl->CheckIfJobIsLost();
192  m_Impl->m_JobCommitStatus = eCS_Failure;
193  m_Impl->m_DisableRetries = no_retries;
194  m_Impl->m_Job.error_msg = err_msg;
195 }
196 
198 {
199  m_Impl->CheckIfJobIsLost();
200  m_Impl->m_JobCommitStatus = eCS_Return;
201 }
202 
204  const string& affinity, const string& group)
205 {
206  m_Impl->CheckIfJobIsLost();
207  m_Impl->m_JobCommitStatus = eCS_Reschedule;
208  m_Impl->m_Job.affinity = affinity;
209  m_Impl->m_Job.group = group;
210 }
211 
213  bool send_immediately,
214  bool overwrite)
215 {
216  m_Impl->PutProgressMessage(msg, send_immediately, overwrite);
217 }
218 
220  bool send_immediately,
221  bool overwrite)
222 {
224  if (!send_immediately &&
226  ERR_POST(Warning << "Progress message \"" <<
227  msg << "\" has been suppressed.");
228  return;
229  }
230 
232  LOG_POST(m_Job.job_id << " progress: " <<
234  }
235 
236  try {
237  if (!overwrite) {
238  // Cached progress_msg (blob ID) might be outdated, refreshing it
240 
241  if (!m_Job.progress_msg.empty()) {
242  return;
243  }
244  }
245 
247  m_NetCacheAPI.PutData(m_Job.progress_msg, msg.data(), msg.length());
248  } else {
249  m_Job.progress_msg = m_NetCacheAPI.PutData(msg.data(), msg.length());
250  }
251 
252  // Cached progress_msg (blob ID) might differ from the actual, syncing them
254  }
255  catch (exception& ex) {
256  ERR_POST_X(6, "Couldn't send a progress message: " << ex.what());
257  }
258 }
259 
261 {
262  m_Impl->CheckIfJobIsLost();
263  m_Impl->JobDelayExpiration(runtime_inc);
264 }
265 
267 {
268  try {
270  }
271  catch (exception& ex) {
272  ERR_POST_X(8, "CWorkerNodeJobContext::JobDelayExpiration: " <<
273  ex.what());
274  }
275 }
276 
278 {
279  return m_Impl->m_WorkerNode->m_LogRequested;
280 }
281 
283 {
284  return m_Impl->GetShutdownLevel();
285 }
286 
288 {
290  try {
291  ENetScheduleQueuePauseMode pause_mode;
292  CNetScheduleAPI::EJobStatus job_status =
294  switch (job_status) {
296  if (pause_mode == eNSQ_WithPullback) {
297  auto default_timeout = m_WorkerNode->m_SuspendResume.GetLock()->GetDefaultPullbackTimeout();
298  m_WorkerNode->m_SuspendResume.GetLock()->SetJobPullbackTimer(default_timeout);
299  LOG_POST("Pullback request from the server, "
300  "(default) pullback timeout=" <<
302  }
303  /* FALL THROUGH */
304 
306  // NetSchedule will still allow to commit this job.
307  break;
308 
310  LOG_POST(Warning << "Job " << m_Job.job_id << " has been canceled");
311  MarkJobAsLost();
313 
314  default:
315  // The worker node does not "own" the job any longer.
316  ERR_POST("Cannot proceed with job processing: job '" <<
317  m_Job.job_id << "' changed status to '" <<
318  CNetScheduleAPI::StatusToString(job_status) << "'.");
319  MarkJobAsLost();
321  }
322  }
323  catch(exception& ex) {
324  ERR_POST("Cannot retrieve job status for " << m_Job.job_id <<
325  ": " << ex.what());
326  }
327 
329  m_WorkerNode->m_SuspendResume.GetLock()->IsJobPullbackTimerExpired()) {
330  LOG_POST("Pullback timeout for " << m_Job.job_id);
332  }
333 
335 }
336 
338 {
341  "Job " << m_Job.job_id << " has been canceled");
342  }
343 }
344 
346 {
348 
350  m_DisableRetries = false;
351  m_InputBlobSize = 0;
354 
357 }
358 
360 {
361  if (!m_Impl->m_ExclusiveJob) {
362  if (!m_Impl->m_WorkerNode->EnterExclusiveMode()) {
364  eExclusiveModeIsAlreadySet, "");
365  }
366  m_Impl->m_ExclusiveJob = true;
367  }
368 }
369 
372 {
373  switch (commit_status) {
374  case eCS_Done:
375  return "done";
376  case eCS_Failure:
377  return "failed";
378  case eCS_Return:
379  return "returned";
380  case eCS_Reschedule:
381  return "rescheduled";
382  case eCS_JobIsLost:
383  return "lost";
384  default:
385  return "not committed";
386  }
387 }
388 
390 {
392 
396  m_Job.ret_code == 0 ? 200 : 500);
397 
400 
403 }
404 
406 {
407  CWorkerNodeJobContext this_job_context(this);
408 
409  m_RequestContext->SetRequestID((int) this_job_context.GetJobNumber());
410 
411  if (!m_Job.client_ip.empty())
413 
414  if (!m_Job.session_id.empty())
416 
417  if (!m_Job.page_hit_id.empty())
419 
421 
422  CRequestContextSwitcher request_state_guard(m_RequestContext);
423 
425  auto extra = GetDiagContext().PrintRequestStart();
426  extra.Print("jid", m_Job.job_id);
427 
429  if (key.ParseJobKey(m_Job.job_id)) extra.Print("_queue", key.queue);
430  }
431 
433 
434  CJobRunRegistration client_ip_registration, session_id_registration;
435 
436  if (!m_Job.client_ip.empty() &&
438  &client_ip_registration)) {
439  ERR_POST("Too many jobs with client IP \"" <<
440  m_Job.client_ip << "\"; job " <<
441  m_Job.job_id << " will be returned.");
443  } else if (!m_Job.session_id.empty() &&
445  &session_id_registration)) {
446  ERR_POST("Too many jobs with session ID \"" <<
447  m_Job.session_id << "\"; job " <<
448  m_Job.job_id << " will be returned.");
450  } else {
451  m_WorkerNode->x_NotifyJobWatchers(this_job_context,
453 
454  try {
455  m_Job.ret_code =
456  m_WorkerNode->GetJobProcessor()->Do(this_job_context);
457  }
458  catch (CGridWorkerNodeException& ex) {
459  switch (ex.GetErrCode()) {
461  break;
462 
465  LOG_POST_X(21, "Job " << m_Job.job_id <<
466  " will be returned back to the queue "
467  "because it requested exclusive mode while "
468  "another exclusive job is already running.");
469  }
470  if (m_JobCommitStatus ==
473  break;
474 
475  default:
476  ERR_POST_X(62, ex);
477  if (m_JobCommitStatus ==
480  }
481  }
482  catch (CNetScheduleException& e) {
483  ERR_POST_X(20, "job " << m_Job.job_id << " failed: " << e);
485  ERR_POST("Cannot proceed with job processing: job '" <<
486  m_Job.job_id << "' has expired.");
487  MarkJobAsLost();
488  } else if (m_JobCommitStatus ==
491  m_Job.error_msg = e.what();
492  }
493  }
494  catch (exception& e) {
495  ERR_POST_X(18, "job " << m_Job.job_id << " failed: " << e.what());
498  m_Job.error_msg = e.what();
499  }
500  }
501 
502  this_job_context.CloseStreams();
503 
504  switch (m_JobCommitStatus) {
506  m_WorkerNode->x_NotifyJobWatchers(this_job_context,
508  break;
509 
511  if (TWorkerNode_AllowImplicitJobReturn::GetDefault() ||
512  this_job_context.GetShutdownLevel() !=
515  m_WorkerNode->x_NotifyJobWatchers(this_job_context,
517  break;
518  }
519 
521  m_Job.error_msg = "Job was not explicitly committed";
522  /* FALL THROUGH */
523 
525  m_WorkerNode->x_NotifyJobWatchers(this_job_context,
527  break;
528 
530  m_WorkerNode->x_NotifyJobWatchers(this_job_context,
532  break;
533 
535  m_WorkerNode->x_NotifyJobWatchers(this_job_context,
537  break;
538 
539  default: // eCanceled - no action needed.
540  // This object will be recycled in x_CommitJob().
541  break;
542  }
543 
544  m_WorkerNode->x_NotifyJobWatchers(this_job_context,
546  }
547 
550 
551  if (!CGridGlobals::GetInstance().IsShuttingDown())
553 
555  request_state_guard);
556 }
557 
559 {
560  const auto kRetryDelay = static_cast<unsigned long>(TServConn_RetryDelay::GetDefault() * kMilliSecondsPerSecond);
562  CDeadline max_wait_for_servers(
563  TWorkerNode_MaxWaitForServers::GetDefault());
564 
565  CWorkerNodeJobContext job_context(
567 
568  const auto total_time_limit = m_WorkerNode->m_TotalTimeLimit;
569  CDeadline deadline(total_time_limit ? CDeadline(total_time_limit) : CDeadline::eInfinite);
571  unsigned try_count = 0;
572  while (!CGridGlobals::GetInstance().IsShuttingDown()) {
573  try {
574  try {
577  }
578  catch (CBlockingQueueException&) {
579  // threaded pool is busy
580  continue;
581  }
582 
583  if (x_GetNextJob(job_context->m_Job, deadline)) {
584  job_context->ResetJobContext();
585 
586  try {
588  new CWorkerNodeRequest(job_context)));
589  }
590  catch (CBlockingQueueException& ex) {
591  ERR_POST_X(28, ex);
592  // that must not happen after CBlockingQueue is fixed
593  _ASSERT(0);
594  job_context->m_JobCommitStatus =
597  RecycleJobContextAndCommitJob(job_context, no_op);
598  }
599  job_context =
601 
602  } else if (deadline.IsExpired()) {
603  LOG_POST("The total runtime limit (" << total_time_limit << " seconds) has been reached");
604  const auto kExitCode = 100; // See also one in grid_globals.cpp
606  break;
607 
608  }
609  max_wait_for_servers =
610  CDeadline(TWorkerNode_MaxWaitForServers::GetDefault());
611  }
612  catch (CNetSrvConnException& e) {
613  SleepMilliSec(kRetryDelay);
615  !max_wait_for_servers.GetRemainingTime().IsZero())
616  continue;
617  ERR_POST(Critical << "Could not connect to the "
618  "configured servers, exiting...");
621  }
622  catch (CNetServiceException& ex) {
623  ERR_POST_X(40, ex);
624  if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) {
627  } else {
628  SleepMilliSec(kRetryDelay);
629  continue;
630  }
631  }
632  catch (exception& ex) {
633  ERR_POST_X(29, ex.what());
634  if (TWorkerNode_StopOnJobErrors::GetDefault()) {
637  }
638  }
639  try_count = 0;
640  }
641 
642  return NULL;
643 }
644 
645 
647 {
648  EState ret = eWorking;
649 
650  while (!CGridGlobals::GetInstance().IsShuttingDown()) {
653  return ret;
655  ret = eRestarted;
656  break;
658  break;
659  }
660 
662  m_NotificationHandler.WaitForNotification(m_Timeout);
663  }
664 
665  return eStopped;
666 }
667 
669 {
671  m_NotificationHandler.ReceiveNotification())
672  return x_ProcessRequestJobNotification();
673 
674  return CNetServer();
675 }
676 
678 {
680  m_NotificationHandler.WaitForNotification(deadline)) {
681  return x_ProcessRequestJobNotification();
682  }
683 
684  return CNetServer();
685 }
686 
688 {
689  CNetServer server;
690 
691  // No need to check state here, it will be checked before entry processing
693  m_NotificationHandler.CheckRequestJobNotification(
694  m_WorkerNode->m_NSExecutor, &server);
695 
696  return server;
697 }
698 
700 {
701  return true;
702 }
703 
705  SEntry& entry,
706  const string& prio_aff_list,
707  bool any_affinity,
708  CNetScheduleJob& job,
709  CNetScheduleAPI::EJobStatus* /*job_status*/)
710 {
711  CNetServer server(m_API.GetService()->GetServer(entry.server_address));
713  m_Timeout, prio_aff_list, any_affinity, job);
714 }
715 
717 {
718  m_WorkerNode->m_NSExecutor->ReturnJob(job, false);
719 }
720 
722 {
723  if (!m_WorkerNode->x_AreMastersBusy()) {
725  return false;
726  }
727 
729  return false;
730 
731  const bool any_affinity = m_Impl.m_API->m_AffinityLadder.empty();
732 
733  if (m_Timeline.GetJob(deadline, job, NULL, any_affinity) != CNetScheduleGetJob::eJob) {
734  return false;
735  }
736 
737  // Already executing this job, so do nothing
738  // (and rely on that execution to report its result later)
739  if (!m_WorkerNode->m_JobsInProgress.Add(job)) {
740  ERR_POST(Warning << "Got already processing job " << job.job_id);
741  return false;
742  }
743 
747  return false;
748  }
749  }
750 
751  // No need to check for idleness here, running jobs won't be stopped anyway
752  if (CGridGlobals::GetInstance().IsShuttingDown()) {
754  return false;
755  }
756 
757  return true;
758 }
759 
761 {
762  return m_Impl->m_QueueEmbeddedOutputSize;
763 }
764 
CDeadline.
Definition: ncbitime.hpp:1830
unsigned int GetNewJobNumber()
CNetScheduleAdmin::EShutdownLevel GetShutdownLevel(void)
Check if shutdown was requested.
void RequestShutdown(CNetScheduleAdmin::EShutdownLevel level)
Request node shutdown.
static CGridGlobals & GetInstance()
Grid Worker Node.
void RecycleJobContextAndCommitJob(SWorkerNodeJobContextImpl *job_context, CRequestContextSwitcher &rctx_switcher)
CWorkerNodeJobContext AllocJobContext()
bool MoreJobs(const SEntry &entry)
CNetServer ReadNotifications()
bool CheckEntry(SEntry &entry, const string &prio_aff_list, bool any_affinity, CNetScheduleJob &job, CNetScheduleAPI::EJobStatus *job_status)
void ReturnJob(CNetScheduleJob &job)
CNetServer x_ProcessRequestJobNotification()
CNetServer WaitForNotifications(const CDeadline &deadline)
SGridWorkerNodeImpl * m_WorkerNode
const string m_ThreadName
CNetScheduleGetJobImpl< CImpl > m_Timeline
bool x_GetNextJob(CNetScheduleJob &job, const CDeadline &deadline)
SGridWorkerNodeImpl * m_WorkerNode
virtual void * Main()
Derived (user-created) class must provide a real thread function.
EShutdownLevel
Shutdown level.
@ eNormalShutdown
Normal shutdown was requested.
@ eNoShutdown
No Shutdown was requested.
@ eShutdownImmediate
Urgent shutdown was requested.
NetSchedule internal exception.
Net Service exception.
Net Service exception.
bool CountJob(const string &job_group, CJobRunRegistration *job_run_registration)
CTimeSpan.
Definition: ncbitime.hpp:1313
virtual void CallEventHandlers()
Definition: wn_cleanup.cpp:57
Worker Node job context.
Clean-up event source for the worker node.
Definition: grid_worker.hpp:98
static SQLCHAR output[256]
Definition: print.c:5
bool g_IsRequestStopEventEnabled()
bool g_IsRequestStartEventEnabled()
NetSchedule worker node application.
#define NULL
Definition: ncbistd.hpp:225
void PrintRequestStop(void)
Print request stop message (for request-driven applications)
Definition: ncbidiag.cpp:2778
#define LOG_POST_X(err_subcode, message)
Definition: ncbidiag.hpp:553
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
void SetAppState(EDiagAppState state)
void PrintRequestStart(const string &message)
Print request start message (for request-driven applications)
Definition: ncbidiag.cpp:2762
void SetRequestID(TCount rid)
Set request ID.
void SetSessionID(const string &session)
bool IsSetRequestStatus(void) const
void SetClientIP(const string &client)
void SetRequestStatus(int status)
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
Definition: ncbidiag.hpp:550
void SetHitID(const string &hit)
Set explicit hit id. The id is reset on request end.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
void Reset(void)
Reset all properties to the initial state.
#define LOG_POST(message)
This macro is deprecated, and it is strongly recommended to move all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
EDiagAppState GetAppState(void) const
Application state.
@ eDiagAppState_RequestEnd
RE.
Definition: ncbidiag.hpp:796
@ eDiagAppState_RequestBegin
RB.
Definition: ncbidiag.hpp:794
@ eDiagAppState_Request
R.
Definition: ncbidiag.hpp:795
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
#define NCBI_CATCH_ALL_X(err_subcode, message)
Definition: ncbiexpt.hpp:619
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
Definition: ncbiexpt.hpp:719
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
CCompoundIDPool GetCompoundIDPool()
static bool IsValidKey(const char *key_str, size_t key_len, CCompoundIDPool::TInstance id_pool=NULL)
string PutData(const void *buf, size_t size, const CNamedParameterList *optional=NULL)
Put BLOB to server.
void CommitJob()
Confirm that a job is done and result is ready to be sent back to the client.
CGridWorkerNode GetWorkerNode() const
CNcbiOstream & GetOStream()
Get a stream where a job can write its result.
size_t GetInputBlobSize() const
Get the size of an input stream.
void CommitJobWithFailure(const string &err_msg, bool no_retries=false)
Confirm that a job is finished, but an error has happened during its execution.
EJobStatus
Job status codes.
const string & GetQueueName() const
Get a name of a queue where this node is connected to.
string output
Job result data.
virtual int Do(CWorkerNodeJobContext &context)=0
Execute the job.
bool IsJobCommitted() const
CNetScheduleAPI::EJobStatus GetJobStatus(const CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get the current status of the specified job.
const string & GetJobOutput() const
size_t GetServerOutputSize()
IWorkerNodeCleanupEventSource * GetCleanupEventSource()
CNetScheduleAdmin::EShutdownLevel GetShutdownLevel()
Check if job processing must be aborted.
void PutProgressMessage(const string &msg, bool send_immediately=false, bool overwrite=true)
Put progress message.
void ReturnJob(const CNetScheduleJob &job)
Switch the job back to the "Pending" status so that it can be run again on a different worker node.
const CNetScheduleJob & GetJob() const
Get the associated job structure to access all of its fields.
void ReturnJob()
Schedule the job for return.
bool IsLogRequested() const
Check if logging was requested in config file.
CNetScheduleAPI::TJobMask GetJobMask() const
@ eNSQ_WithPullback
CNetScheduleAPI::TJobMask mask
int ret_code
Job return code.
CNetRef< SWorkerNodeJobContextImpl > m_Impl
static string StatusToString(EJobStatus status)
Printable status type.
string input
Input data.
void SetJobRetCode(int ret_code)
Set the return code of the job.
void SetJobOutput(const string &output)
Set a job's output.
void JobDelayExpiration(const CNetScheduleJob &job, unsigned runtime_inc)
Increment job execution timeout.
const string & GetJobKey() const
Get a job key.
const string & GetClientName() const
Get a node name.
void PutProgressMsg(const CNetScheduleJob &job)
Put job interim (progress) message.
unsigned int GetJobNumber() const
void GetProgressMsg(CNetScheduleJob &job)
Get progress message.
void RequestExclusiveMode()
Instruct the system that this job requires all of the system's resources. If this method is called,...
CNcbiIstream & GetIStream()
Get a stream with input data for a job.
void JobDelayExpiration(unsigned runtime_inc)
Increment job execution timeout.
string job_id
Output job key.
void RescheduleJob(const string &affinity, const string &group=kEmptyStr)
Put the job back in the queue with the specified affinity and group.
const string & GetJobInput() const
Get a job input string.
ECommitStatus GetCommitStatus() const
static const char * GetCommitStatusDescription(ECommitStatus commit_status)
@ eCanceled
Explicitly canceled.
@ eRunning
Running on a worker node.
@ ePending
Waiting for execution.
@ eExclusiveJob
Exclusive job - the node executes only this job, even if there are processor resources.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
IO_PREFIX::istream CNcbiIstream
Portable alias for istream.
Definition: ncbistre.hpp:146
static string TruncateSpaces(const string &str, ETrunc where=eTrunc_Both)
Truncate spaces in a string.
Definition: ncbistr.cpp:3186
@ eTrunc_End
Truncate trailing spaces only.
Definition: ncbistr.hpp:2241
void WaitForRoom(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0)
Wait for the room in the queue up to timeout_sec + timeout_nsec/1E9 seconds.
TItemHandle AcceptRequest(const TRequest &request, TUserPriority priority=0, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Put a request in the queue with a given priority.
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
Definition: ncbithr.cpp:958
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
Definition: ncbitime.cpp:3859
bool IsExpired(void) const
Check if the deadline is expired.
Definition: ncbitime.hpp:1855
const long kMilliSecondsPerSecond
Number of milliseconds in one second.
Definition: ncbitime.hpp:96
bool IsZero() const
Definition: ncbitime.cpp:3476
@ eInfinite
Infinite deadline.
Definition: ncbitime.hpp:1834
bool Approve(EThrottleAction action=eDefault)
Approve a request.
@ eErrCode
Return immediately with err code == FALSE.
const struct ncbi::grid::netcache::search::fields::KEY key
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
void SleepSec(unsigned long sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Sleep.
ENetScheduleQueuePauseMode
Defines whether the job queue is paused, and if so, defines the pause mode set by the administrator.
const CNSPreciseTime default_timeout(3600, 0)
Job description.
Meaningful information encoded in the NetSchedule key.
bool Add(const CNetScheduleJob &job)
IWorkerNodeJob * GetJobProcessor()
SJobsInProgress m_JobsInProgress
CRef< CJobCommitterThread > m_JobCommitterThread
CStdPoolOfThreads * m_ThreadPool
void x_NotifyJobWatchers(const CWorkerNodeJobContext &job_context, IWorkerNodeJobWatcher::EEvent event)
CRunningJobLimit m_JobsPerSessionID
bool IsExclusiveMode() const
bool x_AreMastersBusy() const
SThreadSafe< SSuspendResume > m_SuspendResume
bool WaitForExclusiveJobToFinish()
CNetScheduleExecutor m_NSExecutor
CRunningJobLimit m_JobsPerClientIP
CNetScheduleGetJob::TAffinityLadder m_AffinityLadder
void ReturnJob(const CNetScheduleJob &job, bool blacklist=true)
bool x_GetJobWithAffinityLadder(SNetServerImpl *server, const CDeadline &timeout, const string &prio_aff_list, bool any_affinity, CNetScheduleJob &job)
EState CheckState() volatile
unsigned GetCurrentJobGeneration() const volatile
SLock< TType > GetLock()
CRequestRateControl m_ProgressMsgThrottler
CNcbiIstream & GetIStream()
CNcbiOstream & GetOStream()
virtual void JobDelayExpiration(unsigned runtime_inc)
CRef< CRequestContext > m_RequestContext
CNetScheduleExecutor m_NetScheduleExecutor
virtual CNetScheduleAdmin::EShutdownLevel GetShutdownLevel()
CRef< CWorkerNodeCleanup > m_CleanupEventSource
CWorkerNodeJobContext::ECommitStatus m_JobCommitStatus
virtual void PutProgressMessage(const string &msg, bool send_immediately, bool overwrite)
SGridWorkerNodeImpl * m_WorkerNode
CRequestRateControl m_StatusThrottler
SWorkerNodeJobContextImpl(SGridWorkerNodeImpl *worker_node)
#define _ASSERT
Modified on Thu May 23 12:27:58 2024 by modify_doxy.py rev. 669887