NCBI C++ ToolKit
wn_commit_thread.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: wn_commit_thread.cpp 94781 2021-09-13 13:32:22Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  * NetSchedule Worker Node implementation
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "wn_commit_thread.hpp"
35 #include "grid_worker_impl.hpp"
36 
40 
41 #include <corelib/ncbiexpt.hpp>
42 #include <corelib/ncbi_system.hpp>
43 
44 
45 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
46 
47 
49 
50 /////////////////////////////////////////////////////////////////////////////
51 //
52 ///@internal
/// Cleanup callback registered with s_tls by GetJobProcessor().
/// That function calls AddReference() when it caches a job object in TLS;
/// this callback releases that reference. An empty (NULL) slot is ignored.
53 static void s_TlsCleanup(IWorkerNodeJob* p_value, void* /* data */ )
54 {
55  if (p_value == NULL) return;
56  p_value->RemoveReference();
57 }
58 /// @internal
60
// NOTE(review): the "@internal" tag above documents listing line 59, and the
// constructor signature is listing line 61; neither is shown in this listing.
/// Constructor: keeps a pointer to the owning worker node and creates
/// m_Semaphore with arguments (0, 1) (presumably initial count 0, maximum
/// count 1 — confirm against CSemaphore). It names the thread after the
/// worker node's application name with an "_cm" suffix.
62  m_WorkerNode(worker_node),
63  m_Semaphore(0, 1),
64  m_ThreadName(worker_node->GetAppName() + "_cm")
65 {
66 }
67 
/// Hands out a job context for a new job. The signature (listing line 68)
/// is not shown here.
///
/// While holding m_TimelineMutex, takes the first context from
/// m_JobContextPool and resets its m_Job before returning it.
/// NOTE(review): the statement executed for an empty pool (listing line 73)
/// is not shown. It presumably returns a newly constructed context —
/// confirm against the repository.
69 {
70  TFastMutexGuard mutex_lock(m_TimelineMutex);
71
72  if (m_JobContextPool.empty())
74
75  CWorkerNodeJobContext job_context(m_JobContextPool.front());
76  m_JobContextPool.pop_front();
77
78  job_context->m_Job.Reset();
79  return job_context;
80 }
81 
/// Queues a job context so the committer thread commits it immediately.
/// The first signature line (listing line 82) is not shown here.
///
/// Resets m_FirstCommitAttempt so that x_CommitJob() starts its retry
/// bookkeeping afresh. Then, while holding m_TimelineMutex, it wakes the
/// committer thread and appends a reference (TEntry) to the context to
/// m_ImmediateActions. rctx_switcher is released while the mutex is still
/// held.
83  SWorkerNodeJobContextImpl* job_context,
84  CRequestContextSwitcher& rctx_switcher)
85 {
86  job_context->m_FirstCommitAttempt = true;
87
88  TFastMutexGuard mutex_lock(m_TimelineMutex);
89
90  // Must be called prior to adding the job context to
91  // m_ImmediateActions: when empty, m_ImmediateActions
92  // indicates that the committer thread is waiting and
93  // the semaphore must be incremented.
94  WakeUp();
95
96  m_ImmediateActions.push_back(TEntry(job_context));
97
98  // We must do it here, before m_TimelineMutex is unlocked
99  rctx_switcher.Release();
100 }
101 
/// Requests termination of the committer thread. The signature (listing
/// line 102) is not shown here.
///
/// Sets m_IsShuttingDown while holding m_TimelineMutex and wakes the thread.
/// Main() tests the flag at the end of each loop iteration, after draining
/// m_ImmediateActions.
/// NOTE(review): entries still waiting in m_Timeline for a retry appear not
/// to be processed once Main() leaves its loop — confirm this is intended.
103 {
104  TFastMutexGuard mutex_lock(m_TimelineMutex);
105
106  m_IsShuttingDown = true;
107  WakeUp();
108 }
109 
/// Waits until the deadline of the entry at the front of m_Timeline. The
/// signature (listing line 110) is not shown here.
///
/// Preconditions: m_TimelineMutex is held and m_Timeline is non-empty, since
/// front() is dereferenced unconditionally.
/// @return true immediately if the deadline has already passed. Otherwise
///   the mutex is released while waiting on m_Semaphore for the remaining
///   time (the unlock guard re-locks it on scope exit). The function returns
///   true if that wait timed out, and false if the semaphore was signaled
///   first (presumably by WakeUp()).
111 {
112  CNanoTimeout timeout = m_Timeline.front()->GetTimeout().GetRemainingTime();
113
114  if (timeout.IsZero()) {
115  return true;
116  }
117
118  TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
119  return !m_Semaphore.TryWait(timeout);
120 }
121 
/// Committer thread body. Listing lines 122 and 124 are not shown here.
///
/// Holds m_TimelineMutex except in three places: while blocked on
/// m_Semaphore here, inside WaitForTimeout(), and inside x_CommitJob()
/// (which unlocks it). Each iteration does the following:
///   - If m_Timeline is empty, waits on m_Semaphore until woken.
///   - Otherwise waits for the deadline of the front m_Timeline entry. If it
///     expires before a wake-up, the entry moves to m_ImmediateActions.
///   - Then drains m_ImmediateActions. Entries for which x_CommitJob()
///     returns true go back to m_JobContextPool; the rest are appended to
///     m_Timeline for a later retry.
/// The loop ends once m_IsShuttingDown is set. Always returns NULL.
123 {
125  TFastMutexGuard mutex_lock(m_TimelineMutex);
126
127  do {
128  if (m_Timeline.empty()) {
129  TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
130
131  m_Semaphore.Wait();
132  } else if (WaitForTimeout()) {
133  m_ImmediateActions.push_back(m_Timeline.front());
134  m_Timeline.pop_front();
135  }
136
137  while (!m_ImmediateActions.empty()) {
138  TEntry& entry = m_ImmediateActions.front();
139
140  // Do not remove the job context from m_ImmediateActions
141  // prior to calling x_CommitJob() to avoid race conditions
142  // (otherwise, the semaphore can be Post()'ed multiple times
143  // by the worker threads while this thread is in x_CommitJob()).
144  if (x_CommitJob(entry)) {
145  m_JobContextPool.push_back(entry);
146  } else {
147  m_Timeline.push_back(entry);
148  }
149
150  m_ImmediateActions.pop_front();
151  }
152
153  // Cannot use CGridGlobals::GetInstance().IsShuttingDown() below,
154  // this thread could exit before committing all pending jobs otherwise.
155  } while (!m_IsShuttingDown);
156
157  return NULL;
158 }
159 
/// Performs one commit attempt for job_context. The signature (listing
/// line 160) is not shown here.
///
/// Called from Main() with m_TimelineMutex held. The mutex is released for
/// the duration of the call, and the job's m_RequestContext is switched in.
/// Depending on m_JobCommitStatus, the job is sent via PutResult(),
/// PutFailure() (presumably — its call starts on listing lines 175-176,
/// which are not shown), ReturnJob() (also used for eCS_NotCommitted) or
/// Reschedule(). If the job was taken away from the node, nothing is sent.
///
/// @return true if the context can be recycled. That happens when the
///   commit succeeded, when a CNetScheduleException was thrown (treated as
///   final), or when other exceptions kept occurring past
///   m_CommitExpiration. Returns false if the commit should be retried; the
///   context's timeout has then been reset to m_CommitJobInterval seconds.
/// NOTE(review): several lines are not shown in this listing: the case
/// labels (lines 171, 175-176, 184, 188, 192), the start of the condition
/// on line 201, and line 216.
161 {
162  TFastMutexUnlockGuard mutext_unlock(m_TimelineMutex);
163
164  CRequestContextSwitcher request_state_guard(job_context->m_RequestContext);
165
166  bool recycle_job_context = false;
167  m_WorkerNode->m_JobsInProgress.Update(job_context->m_Job);
168
169  try {
170  switch (job_context->m_JobCommitStatus) {
172  m_WorkerNode->m_NSExecutor.PutResult(job_context->m_Job);
173  break;
174
177  job_context->m_DisableRetries);
178  break;
179
180  default: /* eCS_NotCommitted */
181  // In the unlikely event of eCS_NotCommitted, return the job.
182  /* FALL THROUGH */
183
185  m_WorkerNode->m_NSExecutor.ReturnJob(job_context->m_Job);
186  break;
187
189  m_WorkerNode->m_NSExecutor.Reschedule(job_context->m_Job);
190  break;
191
193  // Job is cancelled or otherwise taken away from the worker
194  // node. Whatever the cause is, it has been reported already.
195  break;
196  }
197
198  recycle_job_context = true;
199  }
// Server-side rejection: never retried. Failures on cancelled jobs are
// logged as warnings only; everything else is logged as an error.
200  catch (CNetScheduleException& e) {
202  e.GetMsg().find("job is in Canceled state") != string::npos) {
203  LOG_POST(Warning << "Could not commit " << job_context->m_Job.job_id << ": " << e.what());
204  } else {
205  ERR_POST_X(65, "Could not commit " << job_context->m_Job.job_id << ": " << e.what());
206  }
207
208  recycle_job_context = true;
209  }
// Any other failure schedules a retry after m_CommitJobInterval seconds.
// The first failure fixes m_CommitExpiration. Later failures give up once
// that expiration is earlier than the freshly reset deadline.
210  catch (exception& e) {
211  unsigned commit_interval = m_WorkerNode->m_CommitJobInterval;
212  job_context->ResetTimeout(commit_interval);
213  if (job_context->m_FirstCommitAttempt) {
214  job_context->m_FirstCommitAttempt = false;
215  job_context->m_CommitExpiration =
217  } else if (job_context->m_CommitExpiration <
218  job_context->GetTimeout()) {
219  ERR_POST_X(64, "Could not commit " <<
220  job_context->m_Job.job_id << ": " << e.what());
221  recycle_job_context = true;
222  }
223  if (!recycle_job_context) {
224  ERR_POST_X(63, "Error while committing " <<
225  job_context->m_Job.job_id << ": " << e.what() <<
226  "; will retry in " << commit_interval << " seconds.");
227  }
228  }
229
// A context that is done with (successfully or not) is no longer tracked
// as in progress, and its request is finalized.
230  if (recycle_job_context) {
231  m_WorkerNode->m_JobsInProgress.Remove(job_context->m_Job);
232  job_context->x_PrintRequestStop();
233  }
234
235  return recycle_job_context;
236 }
237 
238 /// @internal
/// Returns the job processor to use on the calling thread. The signature
/// (listing line 239) is not shown here.
///
/// If s_tls already holds an instance for this thread, that instance is
/// returned. Otherwise a new one is created by m_JobProcessorFactory. When
/// CGridGlobals::GetInstance().ReuseJobObject() is true, the new object is
/// cached in s_tls with an extra reference, which s_TlsCleanup releases
/// later, so subsequent calls on this thread reuse it.
///
/// Exceptions from CreateInstance() are logged and rethrown. A NULL result
/// is also treated as an error. NOTE(review): listing lines 244, 250-251
/// and 255-257 are not shown; the NULL case presumably raises via
/// NCBI_THROW — confirm against the repository.
240 {
241  IWorkerNodeJob* ret = s_tls.GetValue();
242  if (ret == NULL) {
243  try {
245  ret = m_JobProcessorFactory->CreateInstance();
246  }
247  catch (exception& e) {
// Separate the fixed text from e.what() with ": " so the two do not run
// together in the log. This matches the other messages in this file.
248  ERR_POST_X(9, "Could not create an instance of the "
249  "job processor class: " << e.what());
252  throw;
253  }
254  if (ret == NULL) {
258  "Could not create an instance of the job processor class.");
259  }
260  if (CGridGlobals::GetInstance().ReuseJobObject()) {
261  s_tls.SetValue(ret, s_TlsCleanup);
262  ret->AddReference();
263  }
264  }
265  return ret;
266 }
267 
CDeadline.
Definition: ncbitime.hpp:1830
void RequestShutdown(CNetScheduleAdmin::EShutdownLevel level)
Request node shutdown.
static CGridGlobals & GetInstance()
SGridWorkerNodeImpl * m_WorkerNode
CJobCommitterThread(SGridWorkerNodeImpl *worker_node)
CRef< SWorkerNodeJobContextImpl > TEntry
TCommitJobTimeline m_JobContextPool
void RecycleJobContextAndCommitJob(SWorkerNodeJobContextImpl *job_context, CRequestContextSwitcher &rctx_switcher)
virtual void * Main()
Derived (user-created) class must provide a real thread function.
TCommitJobTimeline m_Timeline
TCommitJobTimeline m_ImmediateActions
bool x_CommitJob(SWorkerNodeJobContextImpl *job_context)
CWorkerNodeJobContext AllocJobContext()
CNanoTimeout – Timeout interval, using nanoseconds.
Definition: ncbitime.hpp:1810
@ eShutdownImmediate
Urgent shutdown was requested.
NetSchedule internal exception.
Worker Node job context.
Worker Node Job interface.
#define NULL
Definition: ncbistd.hpp:225
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
Definition: ncbidiag.hpp:550
#define LOG_POST(message)
This macro is deprecated; it is strongly recommended that all projects (except tests) move to macro...
Definition: ncbidiag.hpp:226
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
const string & GetMsg(void) const
Get message string.
Definition: ncbiexpt.cpp:461
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
void ReturnJob(const CNetScheduleJob &job)
Switch the job back to the "Pending" status so that it can be run again on a different worker node.
void PutResult(const CNetScheduleJob &job)
Put job result (job should be received by GetJob() or WaitJob())
void Reschedule(const CNetScheduleJob &job)
Reschedule a job with new affinity and/or group information.
void PutFailure(const CNetScheduleJob &job, bool no_retries=false)
Submit job failure diagnostics.
string job_id
Output job key.
void AddReference(void) const
Add reference to object.
Definition: ncbiobj.hpp:489
void RemoveReference(void) const
Remove reference to object.
Definition: ncbiobj.hpp:500
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
void Wait(void)
Wait on semaphore.
Definition: ncbimtx.cpp:1787
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
Definition: ncbithr.cpp:958
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
Definition: ncbimtx.cpp:1844
bool IsZero() const
Definition: ncbitime.cpp:3476
Definition of all error codes used in connect services library (xconnserv.lib and others).
Defines NCBI C++ exception handling.
void Remove(const CNetScheduleJob &job)
IWorkerNodeJob * GetJobProcessor()
SJobsInProgress m_JobsInProgress
unique_ptr< IWorkerNodeJobFactory > m_JobProcessorFactory
CNetScheduleExecutor m_NSExecutor
void ResetTimeout(unsigned seconds)
CRef< CRequestContext > m_RequestContext
const CDeadline GetTimeout() const
CWorkerNodeJobContext::ECommitStatus m_JobCommitStatus
static CStaticTls< IWorkerNodeJob > s_tls
static void s_TlsCleanup(IWorkerNodeJob *p_value, void *)
Modified on Mon May 06 04:49:51 2024 by modify_doxy.py rev. 669887