NCBI C++ ToolKit
object_manager_engine.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: object_manager_engine.cpp 43654 2019-08-13 18:41:30Z katargir $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Andrey Yazhuk, Eugene Vasilchenko, Anatoliy Kuznetsov
27  *
28  * File Description:
29  * Implementation of Application Job Engine using object manager prefetcher.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
36 #include <corelib/ncbi_system.hpp>
37 
38 
40 
41 using namespace objects;
42 
44  public IPrefetchAction
45 {
46 public:
48  : m_Job(job)
49  {
50  }
51 
53  {
54  return *m_Job;
55  }
56 
58  {
59  IAppJob::EJobState state = m_Job->Run();
60  if ( state == IAppJob::eCanceled ) {
61  //NCBI_THROW(CPrefetchCanceled, eCanceled, "canceled");
62  return false;
63  }
64  return state == IAppJob::eCompleted;
65  }
66 
67 private:
69 };
70 
71 
72 ///////////////////////////////////////////////////////////////////////////////
73 /// CObjectManagerEngine
75 : m_PrefetchManager(max_threads), m_Listener(NULL)
76 {
77 }
78 
79 
81 {
82 }
83 
85 {
86  SetListener( NULL );
87  m_PrefetchManager.Shutdown();
88 }
89 
90 
92 {
93  m_PrefetchManager.CancelAllTasks();
94 }
95 
97 {
98  return true;
99 }
100 
101 
103 {
104  CMutexGuard guard(m_Mutex);
105  m_Listener = listener;
106 }
107 
108 
109 void CObjectManagerEngine::StartJob(IAppJob& job, IEngineParams* params)
110 {
111  CMutexGuard guard(m_Mutex);
112  m_Jobs[&job] = m_PrefetchManager.AddAction(new CObjectManagerEngineAction(&job), this);
113 }
114 
115 
117 {
118  CMutexGuard guard(m_Mutex);
119  job.RequestCancel();
120  TJobHandles::iterator iter = m_Jobs.find(&job);
121  if ( iter == m_Jobs.end() ) {
122  NCBI_THROW(CAppJobException, eEngine_UnknownJob, "cannot cancel job");
123  }
124  iter->second->RequestToCancel();
125 }
126 
127 
129 {
130  NCBI_THROW(CException, eUnknown, "cannot suspend job");
131 }
132 
133 
135 {
136  NCBI_THROW(CException, eUnknown, "cannot resume job");
137 }
138 
139 
142 {
143  CMutexGuard guard(m_Mutex);
145  if ( iter == m_Jobs.end() ) {
146  return IAppJob::eInvalid;
147  }
148  CThreadPool_Task::EStatus status = iter->second->GetStatus();
149  switch ( status ) {
152  return IAppJob::eSuspended;
154  return IAppJob::eRunning;
156  return IAppJob::eCompleted;
158  return IAppJob::eFailed;
160  return IAppJob::eCanceled;
161  default:
162  return IAppJob::eInvalid;
163  }
164 }
165 
166 
168  CRef<CPrefetchRequest> token, EEvent event
169 ){
170  IAppJob::EJobState new_state;
171  bool remove = true;
172 
173  switch ( event ) {
174  case eStarted:
175  remove = false;
176  new_state = IAppJob::eRunning;
177  break;
178  case eCompleted:
179  new_state = IAppJob::eCompleted;
180  break;
181  case eFailed:
182  new_state = IAppJob::eFailed;
183  break;
184  case eCanceled:
185  new_state = IAppJob::eCanceled;
186  break;
187  default:
188  return;
189  }
190 
191  // Here we try to avoid multiple mutex lock (Engine -> AppJobDispatcher)
192  // Engine can throw a eEngineBusy error code, caller takes a short break with all mutex free
193  //
194 
195  for( unsigned re_try = 0;; ){
196 
197  IAppJob* app_job = 0;
198  {{
199  CMutexGuard guard(m_Mutex);
201  dynamic_cast<CObjectManagerEngineAction&>( *token->GetAction() )
202  ;
203  app_job = &action.GetIAppJob();
204  }}
205 
206  try {
207  if( m_Listener ){
208  m_Listener->OnEngineJobStateChanged( *app_job, new_state );
209  } else {
210  LOG_POST( Warning <<
211  "OME::PrefetchNotify: No listener already! Probably, late job."
212  );
213  }
214 
215  if( remove ){
216  CMutexGuard guard(m_Mutex);
217  m_Jobs.erase( app_job );
218  }
219 
220  break;
221 
222  } catch( CAppJobException& ex ){
223  if( ex.GetErrCode() == CAppJobException::eEngineBusy ){
224  SleepMilliSec( ++re_try );
225 
226  } else {
227  throw;
228  }
229  }
230  }
231 }
232 
233 
IAppJobListener Interface for components that need to be notified about changes in Jobs.
bool Execute(CRef< CPrefetchRequest > token)
CObjectManagerEngine(unsigned max_threads=3)
CObjectManagerEngine.
void PrefetchNotify(CRef< objects::CPrefetchRequest > token, EEvent event)
virtual void ResumeJob(IAppJob &job)
virtual void RequestCancel()
asyncronous request all jobs to Cancel
IAppJobEngineListener * m_Listener
objects::CPrefetchManager m_PrefetchManager
virtual void CancelJob(IAppJob &job)
Cancel job in the engine If job is not running yet - just remove from the pending queue otherwise use...
virtual TJobState GetJobState(IAppJob &job) const
virtual void StartJob(IAppJob &job, IEngineParams *params=NULL)
If Engine cannot start the Job and exception shall be thrown.
virtual void ShutDown()
stop any background threads and free resources associated with the Engine
virtual void SetListener(IAppJobEngineListener *listener)
For "active" mode set a Listener that will be notified when the state of a Job changes.
virtual void SuspendJob(IAppJob &job)
virtual bool IsActive()
Returns true if Engine supports "active" model i.e.
CObject –.
Definition: ncbiobj.hpp:180
CRef –.
Definition: ncbiobj.hpp:618
IAppJobEngineListener - receives notifications about changes in job state.
IAppJob.
Definition: app_job.hpp:82
void erase(iterator pos)
Definition: map.hpp:167
const_iterator end() const
Definition: map.hpp:152
const_iterator find(const key_type &key) const
Definition: map.hpp:153
static void DLIST_NAME() remove(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
Definition: dlist.tmpl.h:90
#define NULL
Definition: ncbistd.hpp:225
#define LOG_POST(message)
This macro is deprecated and it's strongly recomended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
virtual void RequestCancel()=0
RequestCancel() is called to notify the Job that it shall exit Run() function ASAP.
EJobState
Job states (describe FSM)
Definition: app_job.hpp:86
virtual void OnEngineJobStateChanged(IAppJob &job, TJobState new_state)=0
@ eEngineBusy
Engine is busy, caller needs to re-try the operation.
@ eCanceled
Definition: app_job.hpp:91
@ eCompleted
Definition: app_job.hpp:89
@ eRunning
Definition: app_job.hpp:88
@ eSuspended
Definition: app_job.hpp:92
@ eInvalid
Definition: app_job.hpp:87
@ eFailed
Definition: app_job.hpp:90
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
EStatus
Status of the task.
Definition: thread_pool.hpp:79
@ eIdle
has not been placed in queue yet
Definition: thread_pool.hpp:80
@ eQueued
in the queue, awaiting execution
Definition: thread_pool.hpp:81
@ eExecuting
being executed
Definition: thread_pool.hpp:82
@ eFailed
failure during execution
Definition: thread_pool.hpp:84
@ eCompleted
executed successfully
Definition: thread_pool.hpp:83
@ eCanceled
canceled - possible only if canceled before processing was started or if method Execute() returns res...
Definition: thread_pool.hpp:85
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
@ eCanceled
Request canceled.
Modified on Fri Dec 08 08:19:18 2023 by modify_doxy.py rev. 669887