44 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
60 return m_Impl->m_Job.job_id;
65 return m_Impl->m_Job.input;
75 m_Impl->m_Job.ret_code = ret_code;
80 return m_Impl->m_InputBlobSize;
85 return m_Impl->m_Job.output;
95 return m_Impl->m_JobNumber;
106 return m_Impl->m_JobCommitStatus;
116 return m_Impl->m_CleanupEventSource;
121 return m_Impl->m_WorkerNode;
126 m_WorkerNode(worker_node),
127 m_CleanupEventSource(
130 m_StatusThrottler(1,
CTimeSpan(worker_node->m_CheckStatusPeriod, 0)),
131 m_ProgressMsgThrottler(1),
132 m_NetScheduleExecutor(worker_node->m_NSExecutor),
133 m_NetCacheAPI(worker_node->m_NetCacheAPI),
135 m_CommitExpiration(0, 0),
142 return m_Impl->m_WorkerNode->GetQueueName();
146 return m_Impl->m_WorkerNode->GetClientName();
156 return m_Impl->GetIStream();
166 return m_Impl->GetOStream();
172 m_Impl->m_ProgressMsgThrottler.Reset(1);
173 m_Impl->m_StatusThrottler.Reset(1,
176 m_Impl->m_GridRead.Reset();
177 m_Impl->m_GridWrite.Reset();
184 m_Impl->CheckIfJobIsLost();
191 m_Impl->CheckIfJobIsLost();
193 m_Impl->m_DisableRetries = no_retries;
194 m_Impl->m_Job.error_msg = err_msg;
199 m_Impl->CheckIfJobIsLost();
204 const string& affinity,
const string& group)
206 m_Impl->CheckIfJobIsLost();
208 m_Impl->m_Job.affinity = affinity;
209 m_Impl->m_Job.group = group;
213 bool send_immediately,
216 m_Impl->PutProgressMessage(
msg, send_immediately, overwrite);
220 bool send_immediately,
224 if (!send_immediately &&
227 msg <<
"\" has been suppressed.");
255 catch (exception& ex) {
256 ERR_POST_X(6,
"Couldn't send a progress message: " << ex.what());
262 m_Impl->CheckIfJobIsLost();
263 m_Impl->JobDelayExpiration(runtime_inc);
271 catch (exception& ex) {
272 ERR_POST_X(8,
"CWorkerNodeJobContext::JobDelayExpiration: " <<
279 return m_Impl->m_WorkerNode->m_LogRequested;
284 return m_Impl->GetShutdownLevel();
294 switch (job_status) {
299 LOG_POST(
"Pullback request from the server, "
300 "(default) pullback timeout=" <<
316 ERR_POST(
"Cannot proceed with job processing: job '" <<
323 catch(exception& ex) {
361 if (!
m_Impl->m_ExclusiveJob) {
362 if (!
m_Impl->m_WorkerNode->EnterExclusiveMode()) {
364 eExclusiveModeIsAlreadySet,
"");
366 m_Impl->m_ExclusiveJob =
true;
373 switch (commit_status) {
381 return "rescheduled";
385 return "not committed";
438 &client_ip_registration)) {
439 ERR_POST(
"Too many jobs with client IP \"" <<
445 &session_id_registration)) {
446 ERR_POST(
"Too many jobs with session ID \"" <<
466 " will be returned back to the queue "
467 "because it requested exclusive mode while "
468 "another exclusive job is already running.");
485 ERR_POST(
"Cannot proceed with job processing: job '" <<
494 catch (exception& e) {
511 if (TWorkerNode_AllowImplicitJobReturn::GetDefault() ||
555 request_state_guard);
560 const auto kRetryDelay =
static_cast<unsigned long>(TServConn_RetryDelay::GetDefault() *
kMilliSecondsPerSecond);
563 TWorkerNode_MaxWaitForServers::GetDefault());
571 unsigned try_count = 0;
597 RecycleJobContextAndCommitJob(job_context, no_op);
603 LOG_POST(
"The total runtime limit (" << total_time_limit <<
" seconds) has been reached");
604 const auto kExitCode = 100;
609 max_wait_for_servers =
610 CDeadline(TWorkerNode_MaxWaitForServers::GetDefault());
618 "configured servers, exiting...");
624 if (++try_count >= TServConn_ConnMaxRetries::GetDefault()) {
632 catch (exception& ex) {
634 if (TWorkerNode_StopOnJobErrors::GetDefault()) {
662 m_NotificationHandler.WaitForNotification(
m_Timeout);
671 m_NotificationHandler.ReceiveNotification())
672 return x_ProcessRequestJobNotification();
680 m_NotificationHandler.WaitForNotification(deadline)) {
681 return x_ProcessRequestJobNotification();
693 m_NotificationHandler.CheckRequestJobNotification(
706 const string& prio_aff_list,
713 m_Timeout, prio_aff_list, any_affinity, job);
762 return m_Impl->m_QueueEmbeddedOutputSize;
unsigned int GetNewJobNumber()
CNetScheduleAdmin::EShutdownLevel GetShutdownLevel(void)
Check if shutdown was requested.
void RequestShutdown(CNetScheduleAdmin::EShutdownLevel level)
Request node shutdown.
static CGridGlobals & GetInstance()
void RecycleJobContextAndCommitJob(SWorkerNodeJobContextImpl *job_context, CRequestContextSwitcher &rctx_switcher)
CWorkerNodeJobContext AllocJobContext()
bool MoreJobs(const SEntry &entry)
CNetServer ReadNotifications()
bool CheckEntry(SEntry &entry, const string &prio_aff_list, bool any_affinity, CNetScheduleJob &job, CNetScheduleAPI::EJobStatus *job_status)
void ReturnJob(CNetScheduleJob &job)
CNetServer x_ProcessRequestJobNotification()
CNetServer WaitForNotifications(const CDeadline &deadline)
SGridWorkerNodeImpl * m_WorkerNode
const string m_ThreadName
CNetScheduleGetJobImpl< CImpl > m_Timeline
bool x_GetNextJob(CNetScheduleJob &job, const CDeadline &deadline)
SGridWorkerNodeImpl * m_WorkerNode
virtual void * Main()
Derived (user-created) class must provide a real thread function.
EShutdownLevel
Shutdown level.
@ eNormalShutdown
Normal shutdown was requested.
@ eNoShutdown
No Shutdown was requested.
@ eShutdownImmediate
Urgent shutdown was requested.
NetSchedule internal exception.
bool CountJob(const string &job_group, CJobRunRegistration *job_run_registration)
virtual void CallEventHandlers()
Clean-up event source for the worker node.
static SQLCHAR output[256]
bool g_IsRequestStopEventEnabled()
bool g_IsRequestStartEventEnabled()
NetSchedule worker node application.
void PrintRequestStop(void)
Print request stop message (for request-driven applications)
#define LOG_POST_X(err_subcode, message)
CDiagContext & GetDiagContext(void)
Get diag context instance.
void SetAppState(EDiagAppState state)
void PrintRequestStart(const string &message)
Print request start message (for request-driven applications)
void SetRequestID(TCount rid)
Set request ID.
void SetSessionID(const string &session)
bool IsSetRequestStatus(void) const
void SetClientIP(const string &client)
void SetRequestStatus(int status)
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
void SetHitID(const string &hit)
Set explicit hit id. The id is reset on request end.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
void Reset(void)
Reset all properties to the initial state.
#define LOG_POST(message)
This macro is deprecated and it's strongly recommended to move in all projects (except tests) to macro...
EDiagAppState GetAppState(void) const
Application state.
@ eDiagAppState_RequestEnd
RE.
@ eDiagAppState_RequestBegin
RB.
@ eDiagAppState_Request
R.
void Critical(CExceptionArgs_Base &args)
#define NCBI_CATCH_ALL_X(err_subcode, message)
TErrCode GetErrCode(void) const
Get error code.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void Warning(CExceptionArgs_Base &args)
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
CCompoundIDPool GetCompoundIDPool()
static bool IsValidKey(const char *key_str, size_t key_len, CCompoundIDPool::TInstance id_pool=NULL)
string PutData(const void *buf, size_t size, const CNamedParameterList *optional=NULL)
Put BLOB to server.
void CommitJob()
Confirm that a job is done and result is ready to be sent back to the client.
CGridWorkerNode GetWorkerNode() const
CNcbiOstream & GetOStream()
Get a stream where a job can write its result.
size_t GetInputBlobSize() const
Get the size of an input stream.
void CommitJobWithFailure(const string &err_msg, bool no_retries=false)
Confirm that a job is finished, but an error has happened during its execution.
EJobStatus
Job status codes.
const string & GetQueueName() const
Get the name of the queue that this node is connected to.
string output
Job result data.
virtual int Do(CWorkerNodeJobContext &context)=0
Execute the job.
bool IsJobCommitted() const
CNetScheduleAPI::EJobStatus GetJobStatus(const CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get the current status of the specified job.
const string & GetJobOutput() const
size_t GetServerOutputSize()
IWorkerNodeCleanupEventSource * GetCleanupEventSource()
CNetScheduleAdmin::EShutdownLevel GetShutdownLevel()
Check if job processing must be aborted.
void PutProgressMessage(const string &msg, bool send_immediately=false, bool overwrite=true)
Put progress message.
void ReturnJob(const CNetScheduleJob &job)
Switch the job back to the "Pending" status so that it can be run again on a different worker node.
const CNetScheduleJob & GetJob() const
Get the associated job structure to access all of its fields.
void ReturnJob()
Schedule the job for return.
bool IsLogRequested() const
Check if logging was requested in config file.
CNetScheduleAPI::TJobMask GetJobMask() const
CNetScheduleAPI::TJobMask mask
int ret_code
Job return code.
CNetRef< SWorkerNodeJobContextImpl > m_Impl
static string StatusToString(EJobStatus status)
Printable status type.
void SetJobRetCode(int ret_code)
Set the return code of the job.
void SetJobOutput(const string &output)
Set a job's output.
void JobDelayExpiration(const CNetScheduleJob &job, unsigned runtime_inc)
Increment job execution timeout.
const string & GetJobKey() const
Get a job key.
const string & GetClientName() const
Get a node name.
void PutProgressMsg(const CNetScheduleJob &job)
Put job interim (progress) message.
unsigned int GetJobNumber() const
void GetProgressMsg(CNetScheduleJob &job)
Get progress message.
void RequestExclusiveMode()
Instruct the system that this job requires all system's resources. If this method is called,...
CNcbiIstream & GetIStream()
Get a stream with input data for a job.
void JobDelayExpiration(unsigned runtime_inc)
Increment job execution timeout.
string job_id
Output job key.
void RescheduleJob(const string &affinity, const string &group=kEmptyStr)
Put the job back in the queue with the specified affinity and group.
const string & GetJobInput() const
Get a job input string.
ECommitStatus GetCommitStatus() const
static const char * GetCommitStatusDescription(ECommitStatus commit_status)
@ eCanceled
Explicitly canceled.
@ eRunning
Running on a worker node.
@ ePending
Waiting for execution.
@ eExclusiveModeIsAlreadySet
@ eExclusiveJob
Exclusive job - the node executes only this job, even if there are processor resources.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
IO_PREFIX::istream CNcbiIstream
Portable alias for istream.
static string TruncateSpaces(const string &str, ETrunc where=eTrunc_Both)
Truncate whitespace in a string.
@ eTrunc_End
Truncate trailing whitespace only.
void WaitForRoom(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0)
Wait for the room in the queue up to timeout_sec + timeout_nsec/1E9 seconds.
TItemHandle AcceptRequest(const TRequest &request, TUserPriority priority=0, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Put a request in the queue with a given priority.
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
bool IsExpired(void) const
Check if the deadline is expired.
const long kMilliSecondsPerSecond
Number milliseconds in one second.
@ eInfinite
Infinite deadline.
bool Approve(EThrottleAction action=eDefault)
Approve a request.
@ eErrCode
Return immediately with err code == FALSE.
const struct ncbi::grid::netcache::search::fields::KEY key
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
void SleepSec(unsigned long sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Sleep.
ENetScheduleQueuePauseMode
Defines whether the job queue is paused, and if so, defines the pause mode set by the administrator.
const CNSPreciseTime default_timeout(3600, 0)
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
SSocketAddress server_address
Meaningful information encoded in the NetSchedule key.
bool Add(const CNetScheduleJob &job)
IWorkerNodeJob * GetJobProcessor()
bool EnterExclusiveMode()
SJobsInProgress m_JobsInProgress
CRef< CJobCommitterThread > m_JobCommitterThread
unsigned m_TotalTimeLimit
CStdPoolOfThreads * m_ThreadPool
void x_NotifyJobWatchers(const CWorkerNodeJobContext &job_context, IWorkerNodeJobWatcher::EEvent event)
CRunningJobLimit m_JobsPerSessionID
bool m_ProgressLogRequested
bool IsExclusiveMode() const
bool x_AreMastersBusy() const
size_t m_QueueEmbeddedOutputSize
void LeaveExclusiveMode()
SThreadSafe< SSuspendResume > m_SuspendResume
unsigned m_ThreadPoolTimeout
bool WaitForExclusiveJobToFinish()
CNetScheduleExecutor m_NSExecutor
CRunningJobLimit m_JobsPerClientIP
CNetScheduleGetJob::TAffinityLadder m_AffinityLadder
void ReturnJob(const CNetScheduleJob &job, bool blacklist=true)
bool x_GetJobWithAffinityLadder(SNetServerImpl *server, const CDeadline &timeout, const string &prio_aff_list, bool any_affinity, CNetScheduleJob &job)
EState CheckState() volatile
unsigned GetCurrentJobGeneration() const volatile
CRequestRateControl m_ProgressMsgThrottler
CNcbiIstream & GetIStream()
void x_PrintRequestStop()
CNcbiOstream & GetOStream()
virtual void JobDelayExpiration(unsigned runtime_inc)
CRef< CRequestContext > m_RequestContext
CNetScheduleExecutor m_NetScheduleExecutor
virtual CNetScheduleAdmin::EShutdownLevel GetShutdownLevel()
CRef< CWorkerNodeCleanup > m_CleanupEventSource
CWorkerNodeJobContext::ECommitStatus m_JobCommitStatus
virtual void PutProgressMessage(const string &msg, bool send_immediately, bool overwrite)
SGridWorkerNodeImpl * m_WorkerNode
CRequestRateControl m_StatusThrottler
SWorkerNodeJobContextImpl(SGridWorkerNodeImpl *worker_node)
CNetCacheAPI m_NetCacheAPI