NCBI C++ ToolKit
app_job_dispatcher.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: app_job_dispatcher.cpp 34478 2016-01-13 16:19:10Z katargir $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Andrey Yazhuk, Anatoliy Kuznetsov
27  *
28  * File Description:
29  *
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
39 
40 #include <corelib/ncbimtx.hpp>
41 #include <corelib/ncbi_system.hpp>
42 
43 
45 
46 #define DISP_CATCH_AND_THROW(message, job) \
47  catch (CException& e) { \
48  string s(message); \
49  s += job.GetDescr(); \
50  NCBI_RETHROW(e, CAppJobException, eEngineFailed, s); \
51  } catch (std::exception& ee) { \
52  NCBI_THROW(CAppJobException, eFatalError, ee.what()); \
53  }
54 
55 #define DISP_CATCH_AND_REPORT(message, job) \
56  catch (CException& e) { \
57  ERR_POST(message << job.GetDescr()); \
58  ERR_POST(e.ReportAll()); \
59  } catch (std::exception& ee) { \
60  ERR_POST(message << ee.what()); \
61  }
62 
63 
64 
65 ///////////////////////////////////////////////////////////////////////////////
66 /// CAppJobDispatcher::SJobRecord
69  IAppJobEngine& engine,
70  CAppJobEventTranslator* listener,
71  int report_period, bool auto_delete)
72 : m_Job(&job), m_ID(id), m_State(state),
73  m_Engine(engine), m_Listener(listener),
74  m_ReportPeriod(report_period),
75  m_AutoDelete(auto_delete)
76 {
77  if (m_ReportPeriod > 0) {
78  if (m_ReportPeriod < 3) // limit the minimum report period
79  m_ReportPeriod = 3;
80  }
81 }
82 
83 
84 ///////////////////////////////////////////////////////////////////////////////
85 /// CAppJobDispatcher
86 DEFINE_CLASS_STATIC_MUTEX(CAppJobDispatcher::sm_Mutex);
87 
89 
91 {
92  if( ! sm_Dispatcher) {
93  CMutexGuard Guard(sm_Mutex);
94  if( ! sm_Dispatcher) {
96  }
97  }
98  return *sm_Dispatcher;
99 }
100 
101 
103 {
104  CMutexGuard Guard(sm_Mutex);
106 }
107 
108 
111  m_Mute(false),
114 {
115 }
116 
117 
119 {
120 }
121 
122 
124 {
125  m_MainMutex.Lock();
126 }
127 
128 
130 {
132 }
133 
134 
136 {
137  //LOG_POST(Info << "CAppJobDispatcher::ShutDown()");
138 
139  static const char *kMessage = "CAppJobDispatcher::ShutDown() ";
140 
141  m_ShutDownInProgress = true;
142 
143  {{
144  TDispatcherGuard guard(*this);
145 
146  /// shutdown polling
147  m_PollQueue.clear();
148 
149  /// cancel all running Jobs and delete all records
151  SJobRecord* rec = it->second;
152  if(rec->m_State == IAppJob::eRunning) {
153  IAppJob& job = rec->m_Job.GetObject();
154 
155  try {
156  rec->m_Engine.CancelJob(job);
157  } catch(CAppJobException& e) {
158  // analyze what happened
159  switch(e.GetErrCode()) {
162  break; // this is normal
163  default:
164  ERR_POST(kMessage << job.GetDescr());
165  ERR_POST(e.ReportAll());
166  }
167  } catch (std::exception& ee) {
168  ERR_POST(kMessage << ee.what());
169  }
170  }
171  delete rec;
172  }
173  m_PtrToRec.clear();
174  // clear m_JobRecs to block processing of notifications from Engines
175  m_JobRecs.clear();
176 
177  }}
178 
179 
180  // now shut down all engines
182  IAppJobEngine& engine = *it->second;
183  try {
184  engine.ShutDown();
185  } NCBI_CATCH(kMessage);
186  }
187 
188  /// clear Engines
190 }
191 
192 
194 {
195  return x_PollEngines();
196 }
197 
198 
199 bool CAppJobDispatcher::RegisterEngine(const string& name, IAppJobEngine& engine)
200 {
201  _ASSERT(&engine != NULL);
202 
204 
206  if(it == m_NameToEngine.end()) {
207  CIRef<IAppJobEngine> ref(&engine);
208  m_NameToEngine[name] = ref;
209  if(engine.IsActive()) {
210  engine.SetListener(this);
211  }
212  return true;
213  } else {
214  ERR_POST("CAppJobDispatcher::RegisterEngine() engine \"" << name <<
215  "\" is already registered");
216  return false;
217  }
218 }
219 
221 {
223 
225  it->second->RequestCancel();
226  }
227 }
228 
229 
231  CAppJobDispatcher::StartJob(IAppJob& job, const string& engine_name,
232  IEngineParams* params)
233 {
234  // start in passive mode - no default listener, no auto delete
235  return x_StartJob(job, engine_name, NULL, 0, false, params);
236 }
237 
238 
240  CAppJobDispatcher::StartJob(IAppJob& job, const string& engine_name,
241  CEventHandler& listener, int report_period,
242  bool auto_delete, IEngineParams* params)
243 {
244  CAppJobEventTranslator* translator = new CAppJobEventTranslator(listener);
245  translator->SetDispatcher(*this);
246  return x_StartJob(job, engine_name, translator, report_period,
247  auto_delete, params);
248 }
249 
250 
252  CAppJobDispatcher::StartJob(IAppJob& job, const string& engine_name,
253  CAppJobEventTranslator& listener, int report_period,
254  bool auto_delete, IEngineParams* params)
255 {
256  return x_StartJob(job, engine_name, &listener, report_period,
257  auto_delete, params);
258 }
259 
260 
261 static const char* kCannotStart = "Cannot start the job ";
262 static const char* kJobRegistered = "Cannot start the job - it is already registered";
263 static const char* kUnknownEngine = "Cannot start the job - engine is not registred";
264 
265 
267  CAppJobDispatcher::x_StartJob(IAppJob& job, const string& engine_name,
268  CAppJobEventTranslator* listener, int report_period,
269  bool auto_delete, IEngineParams* params)
270 {
271  CIRef<IAppJob> job_rf(&job);
272 
273  IAppJobEngine* engine = x_GetRegisteredEngine(engine_name);
274  if(!engine) {
275  NCBI_THROW(CAppJobException, eInvalidOperation, kUnknownEngine);
276  }
277  TJobID id = eInvalidJobID;
278 
279  // create job record and register it
280  {{
281  TDispatcherGuard guard(*this);
282  SJobRecord* rec = x_GetJobRecord(job);
283  if(rec) {
284  NCBI_THROW(CAppJobException, eInvalidOperation, kJobRegistered);
285  }
286 
287  id = ++m_LastJobID;
288  rec = new SJobRecord(job, id, IAppJob::eRunning, *engine,
289  listener, report_period, auto_delete);
290  x_AddJobRecord(*rec);
291  }}
292 
293 
294  // Submit job to the engine
295  // In this section we do NOT hold the main lock
296  {{
297  unsigned re_try = 0;
298  do {
299  try {
300  engine->StartJob(job, params);
301  }
302  catch (CAppJobException& ex) {
303  if (ex.GetErrCode() == CAppJobException::eEngineBusy) {
304  ++re_try;
305  SleepMilliSec(2);
306  }
307  re_try = 0;
308  }
309  catch (CException& e) {
310  string s(kCannotStart);
311  s += job.GetDescr();
312  NCBI_RETHROW(e, CAppJobException, eEngineFailed, s);
313  } catch (std::exception& e) {
314  NCBI_THROW(CAppJobException, eFatalError, e.what());
315  }
316 
317  } while (re_try);
318  }}
319 
320  // Post notification about job start
321  {{
322  TDispatcherGuard guard(*this);
323  x_OnJobStarted(job, *engine, listener, report_period, auto_delete);
324  }}
325 
326  return id;
327 
328 }
329 
330 
331 /// handles state transition
333  CAppJobEventTranslator* listener,
334  int report_period, bool /*auto_delete*/)
335 {
336  SJobRecord* rec = x_GetJobRecord(job);
337  if (rec == 0) {
338  // Job record already deleted (job finished?)
339  // in the current implementation it is hard to check (no post-execution trace)...
340  return;
341  }
342  if (rec->m_State == IAppJob::eRunning) {
343  if(report_period > 0) {
344  _ASSERT(listener);
345  if(listener) {
346  time_t t = time(NULL) + report_period;
348  }
349  }
350  }
351 }
352 
353 
354 static const char* kCancelErrEngine =
355  "CAppJobDispatcher::CancelJob() cannot cancel job";
356 static const char* kCancelErrNotRunning =
357  "CAppJobDispatcher::CancelJob() cannot cancel job - the job is not running.";
358 static const char* kCancelErrNotReg =
359  "CAppJobDispatcher::CancelJob() cannot cancel job - the job is not registered.";
360 static const char* kDefaultErrorMessage =
361  "CAppJobDispatcher: Tool failed with unspecified error";
362 
363 
364 
365 bool CAppJobDispatcher::x_IsCanceled(int job_id) const
366 {
367  return m_CancelVect[job_id];
368 }
369 
370 
372 {
373  TDispatcherGuard guard(*this);
374 
375  if (m_CancelVect[job_id]) {
376  LOG_POST("Repeated job cancel: ignored. job=" << job_id);
377  return;
378  }
379  m_CancelVect[job_id] = true;
380 
381  SJobRecord* rec = x_GetJobRecord(job_id);
382  if(rec) {
383 
384  try {
385  switch (rec->m_State) {
386  case IAppJob::eRunning:
387  rec->m_Engine.CancelJob(*rec->m_Job);
388  break;
389  case IAppJob::eCompleted:
390  case IAppJob::eFailed:
391  case IAppJob::eCanceled:
392  // nothing to do
393  break;
394  default:
395  NCBI_THROW(CAppJobException, eInvalidOperation, kCancelErrNotRunning);
396  }
397  }
399 
400  } else {
402  }
403 }
404 
405 
406 static const char* kSuspendErrEngine =
407  "CAppJobDispatcher::SuspendJob() cannot suspend job";
408 static const char* kSuspendErrNotRunning =
409  "CAppJobDispatcher::SuspendJob() cannot suspend job - the job is not running.";
410 static const char* kSuspendErrNotReg =
411  "CAppJobDispatcher::SuspendJob() cannot suspend job - the job is not registered.";
412 
413 
415 {
416  TDispatcherGuard guard(*this);
417 
418  SJobRecord* rec = x_GetJobRecord(job_id);
419  if(rec) {
420  if(rec->m_State == IAppJob::eRunning) {
421  IAppJob& job = rec->m_Job.GetObject();
422  try {
423  rec->m_Engine.SuspendJob(job);
424  }
426 
427  // if we are here the request was accepted
429  x_OnJobStateChanged(*rec, rec->m_State); //TODO ?
430  } else {
432  }
433  } else {
435  }
436 }
437 
438 
439 static const char* kResumeErrEngine =
440  "CAppJobDispatcher::ResumeJob() cannot resume job";
441 static const char* kResumeErrNotRunning =
442  "CAppJobDispatcher::ResumeJob() cannot resume job - the job is not running.";
443 static const char* kResumeErrNotReg =
444  "CAppJobDispatcher::ResumeJob() cannot resume job - the job is not registered.";
445 
447 {
448  TDispatcherGuard guard(*this);
449 
450  SJobRecord* rec = x_GetJobRecord(job_id);
451  if(rec) {
452  if(rec->m_State == IAppJob::eSuspended) {
453  IAppJob& job = rec->m_Job.GetObject();
454  try {
455  rec->m_Engine.ResumeJob(job);
456  }
458 
459  // if we are here the request was accepted
460  rec->m_State = IAppJob::eRunning;
461  //x_OnJobStateChanged(*rec, rec->m_State); TODO
462  } else {
463  NCBI_THROW(CAppJobException, eInvalidOperation, kResumeErrNotRunning);
464  }
465  } else {
467  }
468 }
469 
470 
471 static const char* kDeleteErrEngine =
472  "CAppJobDispatcher::DeleteJob() cannot delete job";
473 
475 {
476  TDispatcherGuard guard(*this);
477 
478  SJobRecord* rec = x_GetJobRecord(job_id);
479  if( rec ){
480  CIRef<IAppJob> job(rec->m_Job);
481  IAppJobEngine& engine = rec->m_Engine;
482 
483  m_CancelVect.set(rec->m_ID, true); // mark as canceled
484 
485  // delete the job from Registry
486  bool running = rec->m_State == IAppJob::eRunning;
487  x_RemoveJobRecord(*rec);
488  delete rec;
489 
490  guard.Release(); // Important! Release guard without waiting for engine
491 
492  if (running) {
493  try {
494  engine.CancelJob(*job);
495  }
496  catch(CAppJobException& e) {
497  // analyze what happened
498  switch(e.GetErrCode()) {
501  break; // this is normal
502  default:
503  NCBI_RETHROW(e, CAppJobException, eEngineFailed, kDeleteErrEngine);
504  }
505  } catch (std::exception& ee) {
506  NCBI_THROW(CAppJobException, eFatalError, ee.what());
507  }
508  return true;
509  }
510  }
511 
512  return false;
513 }
514 
515 
516 //////////////////////////////////////////////////////////////////////////////
517 /// All Get() functions return values stored in the Registy not the actual
518 
520 {
521  TDispatcherGuard guard(*this);
522 
523  const SJobRecord* rec = x_GetJobRecord(job_id);
524  return rec ? rec->m_State : IAppJob::eInvalid;
525 }
526 
527 
529 {
530  TDispatcherGuard guard(*this);
531 
532  SJobRecord* rec = x_GetJobRecord(job_id);
533  if(rec) {
534  if(rec->ActiveProgress()) {
535  /// active reporting - x_PollEngines() function updates the record
536  /// periodically, just return the last value
537  return rec->m_Progress;
538  } else {
539  ///
540  return rec->m_Job->GetProgress();
541  }
542  }
544 }
545 
546 
548 {
549  TDispatcherGuard guard(*this);
550 
551  SJobRecord* rec = x_GetJobRecord(job_id);
552  CRef<CObject> res(rec ? rec->m_Job->GetResult().GetPointer() : NULL);
553  return res;
554 }
555 
556 
558 {
559  TDispatcherGuard guard(*this);
560 
561  SJobRecord* rec = x_GetJobRecord(job_id);
562 
563  CConstIRef<IAppJobError> err(rec ? rec->m_Job->GetError().GetPointer() : NULL);
564 
565  // if job failed and there is no error object, create one here marked at CAppJobDispatcher
566  if (!err && (rec->m_State == IAppJob::eFailed)) {
568  }
569 
570  return err;
571 }
572 
573 
574 ///////////////////////////////////////////////////////////////////////////////
575 /// Helper functions
576 /// these functions rely on external synchronization and do not throw / catch exception
577 
579 {
581 
582  TNameToEngine::iterator it = m_NameToEngine.find(engine_name);
583  if(it == m_NameToEngine.end()) {
584  ERR_POST("x_CAppJobDispatcher::x_GetRegisteredEngine() engine \""
585  << engine_name << "\" is not registered.");
586  return NULL;
587  }
588  return it->second.GetPointer();
589 }
590 
591 
593 {
595  return (it == m_PtrToRec.end()) ? NULL : it->second;
596 }
597 
598 
600 {
601  if(job_id > eInvalidJobID && job_id <= m_LastJobID) {
603  return (it != m_JobRecs.end()) ? it->second : NULL;
604  }
605  return NULL;
606 }
607 
608 
610 {
611  _ASSERT(m_JobRecs.find(rec.m_ID) == m_JobRecs.end());
613 
614  m_JobRecs[rec.m_ID] = &rec;
615  m_PtrToRec[rec.m_Job.GetPointer()] = &rec;
616 }
617 
618 
620 {
621  _ASSERT(m_JobRecs.find(rec.m_ID) != m_JobRecs.end());
623 
624  m_JobRecs.erase(rec.m_ID);
626 }
627 
628 
629 static const char* kListenerException =
630  "Exception in CAppJobDispatcher::x_OnJobStateChangedNotify() ";
631 
632 // notify job Listener about state change
634 {
635  //if(rec.m_State == IAppJob::eCompleted) {
636  // LOG_POST("CAppJobDispatcher::x_OnJobStateChangedNotify - Completed, id = " << rec.m_ID);
637  //}
638 
639  if(rec.m_Listener && !m_Mute) {
640  //LOG_POST("m_Listener->OnJobStateChanged() " << rec.m_ID << " " << StateToStr(rec.m_State));
641  try {
642  rec.m_Listener->OnJobStateChanged(&rec, rec.m_State);
643  }
645  }
646  else {
647  LOG_POST("CAppJobDispatcher: Job Notification not delivered (muted/no listener) ");
648  }
649 }
650 
651 
653 {
654  if(rec.m_Listener && !m_Mute) {
655  rec.m_Listener->OnJobProgress(&rec);
656  }
657 }
658 
659 void CAppJobDispatcher::Mute(bool bMute)
660 {
661  m_Mute = bMute;
662 }
663 
664 ///////////////////////////////////////////////////////////////////////////////
665 /// IAppJobEngineListener
666 
667 
668 /// handles state change notifications from active Engines
670 {
671  //string s = StateToStr(new_state);
672  //LOG_POST("CAppJobDispatcher::OnEngineJobStateChanged() new state = " <<
673  // s << ", " << job.GetDescr());
674 
675 
676  if (m_ShutDownInProgress) {
677  return;
678  }
679 
680  // Job execution start is not important, so it gets silenced to minimize mutex clash
681  if (new_state == IAppJob::eRunning) {
682  return;
683  }
684 
685 
686 
687  // Anatoliy July-10-2009 :
688  // Code being re-written to fix bug GB-500 reported by Andrew Shkeda
689  //
690  // JobDispatcher uses 2 connected event queues here
691  // m_StateEventQueue is used to pre-queue the job state change event, one of the functions
692  // it guarantees Job::Complete is not coming before Job::Running (logical state-races in event delivery)
693  // so all notofication is serialized here (probably a good deal of parallel processing too...)
694  //
695 
696  {{
698  m_StateEventQueue.push_back(SJobStateEvent(job, new_state));
699  }}
700 
701  // Second part of the story:
702  //
703  // spin-flush to send all events to the event re-translator
704  // Here we MUST guaratee the event pre-queue is flushed to GUI retranslator (another queue)
705  // The spin guarantees that the queue is flushed, TryLock-sleep is "to avoid a deadlock"
706  // (previous author), but no idea of what resources are involved..
707  //
708  // TODO: need to find a clear way to fix the "deadlock" and remove the m_StateEventQueue
709  //
710  for (unsigned i = 0; true; ++i) {
711  if(m_MainMutex.TryLock()) {
712  try {
714  }
715  catch (exception&)
716  {
718  throw;
719  }
721  break; // queue is empty at this point, our job notification being retranslated to main GUI queue
722  }
723  if (i > 5) {
724  SleepMilliSec( 5+i );
725  }
726  if (i > 20) {
727  //LOG_POST("CAppJobDispatcher: event spin-translation takes too long...");
728  NCBI_THROW(CAppJobException, eEngineBusy, "");
729  }
730  } // for
731 
732 
733 }
734 
735 
736 
737 // this function must be exception-safe
739 {
740  // loop to process one event per mutex lock at a time
741  while (true) {
742  {{
744  if(m_StateEventQueue.empty()) {
745  break;
746  }
747 
748  SJobStateEvent& event = *m_StateEventQueue.begin();
749 
750  IAppJob& job = *event.m_Job;
751  TJobState new_state = event.m_NewState;
752 
753  try {
754  m_StateEventQueue.pop_front();
755  guard.Release(); // Important! m_StateEventMutex is no protection from this point
756 
757  SJobRecord* rec = x_GetJobRecord(job);
758  if (rec) {
759  x_OnJobStateChanged(*rec, new_state);
760 
761  if (!m_Mute)
762  {
763  // call to event re-translator
764  if (rec->m_Listener) {
765  rec->m_Listener->OnJobStateChanged(rec, rec->m_State);
766  }
767  }
768 
769  // if job finished and AutoDelete flag is specified - delete the Job record
770  if (IsTerminal(rec->m_State) && rec->m_AutoDelete) {
771  x_RemoveJobRecord(*rec);
772  delete rec;
773  }
774  }
775  }
776  DISP_CATCH_AND_REPORT("CAppJobDispatcher::x_FlushStateEventQueue() notification failed", job)
777 
778  }}
779  } // while
780 }
781 
783 {
784  switch (job_state)
785  {
786  case IAppJob::eInvalid:
787  return "eInvalid";
788  case IAppJob::eRunning:
789  return "eRunning";
790  case IAppJob::eCompleted:
791  return "eCompleted";
792  case IAppJob::eFailed:
793  return "eFailed";
794  case IAppJob::eCanceled:
795  return "eCanceled";
796  case IAppJob::eSuspended:
797  return "eSuspended";
798  default:
799  return "Unknown";
800  }
801 }
802 
803 
804 
805 // validates state transition and issues notifications
807 {
808  // check that the new state is valid (there is a valid transition from
809  // the current state. If there is not, then Engine is wrong - throw an
810  // exception
811  bool error = false;
812  switch(rec.m_State) {
813  case IAppJob::eRunning:
814  case IAppJob::eCanceled:
815  error = (new_state == IAppJob::eInvalid);
816  break;
817  case IAppJob::eCompleted:
818  case IAppJob::eFailed:
819  error = true; // Job finished, now new states possible
820  break;
821  case IAppJob::eSuspended:
822  error = (new_state != IAppJob::eRunning);
823  break;
824  default:
825  _ASSERT(false); // not a valid state
826  }
827 
828  if(error) {
829  string s = "Job " + rec.m_Job->GetDescr();
830  s += "Transition from state ";
831  s += StateToStr(rec.m_State);
832  s += " to state ";
833  s += StateToStr(new_state);
834  s += " is invalid.";
835  NCBI_THROW(CAppJobException, eEngineFailed, s);
836  //return; false; // unreachable
837  } else {
838  // change state and notify clients
839  rec.m_State = new_state;
840  //return true;
841  }
842 }
843 
844 
846 {
847  switch(state) {
848  case IAppJob::eInvalid: return "Invalid";
849  case IAppJob::eRunning: return "Running";
850  case IAppJob::eCompleted: return "Completed";
851  case IAppJob::eFailed: return "Failed";
852  case IAppJob::eCanceled: return "Cancelled";
853  case IAppJob::eSuspended: return "Suspended";
854  }
855  _ASSERT(false);
856  return "";
857 }
858 
859 
861 {
862  return (state == IAppJob::eCompleted) || (state == IAppJob::eFailed)
863  || (state == IAppJob::eCanceled);
864 }
865 
866 
868 {
869 #ifdef _DEBUG
870  if( ! rec.m_Progress) {
871  string descr = rec.m_Job->GetDescr();
872  LOG_POST(Error << "Active progress reporting is requested for job \""
873  << descr << "\".\n The job must implement GetProgress() funtion!");
874  _ASSERT(false);
875  }
876 #endif
877 }
878 
879 
881 {
882  //LOG_POST("CAppJobDispatcher::x_PollEngines()");
883  /// check Job state every kPeriod seconds. All local Engines are supposed to
884  /// be Active, so this delay is for remote Engines only
885  static const int kPeriod = 15;
886  bool done_something = false;
887 
888  // fast check to see if the queue is not empty
889  if( ! m_PollQueue.empty()) {
890  time_t now = time(NULL);
891 
892  TDispatcherGuard guard(*this);
893 
894  // now synchronized check
895  if( ! m_PollQueue.empty()) {
896  time_t check_time = m_PollQueue.begin()->first;
897  if(now >= check_time) {
898  done_something = true;
899 
900  // it's time to check this Job
901  SQueueItem& item = m_PollQueue.begin()->second;
902  //SJobRecord* rec = item.m_Rec;
903  SJobRecord* rec = x_GetJobRecord(item.m_JobId);
904  bool progress = item.m_Progress;
905 
906  /// delete this item from the Queue (item variable becomes invalid)
908  if (rec == NULL) { // record is already removed, no need to poll
909  return done_something;
910  }
911 
912  if(progress) {
913  // obtain Progress information from the job
914  if(rec->m_State == IAppJob::eRunning) {
915  rec->m_Progress = rec->m_Job->GetProgress();
917  x_OnJobProgressNotify(*rec);
918 
919  time_t new_time = now + rec->m_ReportPeriod;
920  m_PollQueue.insert(TTimeToItem::value_type(new_time, SQueueItem(rec->m_ID, true)));
921  }
922  } else {
923  // request Status information from the passive engine
924  IAppJob& job = rec->m_Job.GetObject();
925  try {
926  TJobState new_state = rec->m_Engine.GetJobState(job);
927 
928  if(rec->m_State != new_state) {
929  x_OnJobStateChanged(*rec, new_state);
931  }
932  }
933  DISP_CATCH_AND_REPORT("CAppJobDispatcher::x_PollEngines()", job)
934 
935  if(IsTerminal(rec->m_State)) {
936  // passive engine requires deleting jobs manually
937  x_RemoveJobRecord(*rec);
938  delete rec;
939  } else {
940  // reschedule Job monitoring
941  time_t new_time = now + kPeriod;
942  m_PollQueue.insert(TTimeToItem::value_type(new_time, SQueueItem(rec->m_ID, false)));
943  }
944  }
945  }
946  }
947  }
948  return done_something;
949 }
950 
951 
953 {
954  jobId = 7777;
955 
956  IAppJob::EJobState state = job.Run();
957 
958  switch (state)
959  {
960  case IAppJob::eCompleted:
961  {
962  CRef<CObject> res(job.GetResult().GetPointer());
963  CRef<CEvent> evt(new CAppJobNotification(jobId, res.GetPointer()));
964  listener.Send(evt);
965  break;
966  }
967  case IAppJob::eSuspended:
968  case IAppJob::eRunning:
969  case IAppJob::eFailed:
970  {
972  if (err) {
973  CRef<CEvent> evt(new CAppJobNotification(jobId, *err));
974  listener.Send(evt);
975  }
976  else {
977  ERR_POST("Job failed -- NULL error job_id= " << jobId);
978  CRef<CEvent> evt(new CAppJobNotification(jobId, state));
979  listener.Send(evt);
980  }
981  break;
982  }
983 
984  case IAppJob::eCanceled:
985  {
986  CRef<CEvent> evt(new CAppJobNotification(jobId, state));
987  listener.Send(evt);
988  break;
989  }
990  default:
991  _ASSERT(false);
992  }
993 }
994 
static const char * kCancelErrNotReg
static const char * kJobRegistered
static const char * kResumeErrNotRunning
static const char * kSuspendErrNotReg
#define DISP_CATCH_AND_REPORT(message, job)
static const char * kResumeErrEngine
static const char * kDeleteErrEngine
static const char * kSuspendErrEngine
static const char * kSuspendErrNotRunning
static const char * kCannotStart
static const char * kResumeErrNotReg
static const char * kUnknownEngine
static const char * kCancelErrEngine
static const char * kCancelErrNotRunning
DEFINE_CLASS_STATIC_MUTEX(CAppJobDispatcher::sm_Mutex)
CAppJobDispatcher.
static const char * kListenerException
#define DISP_CATCH_AND_THROW(message, job)
static const char * kDefaultErrorMessage
CAppJobDispatcher.
CAppJobError Default implementation for IAppJobError - encapsulates a text error message.
CAppJobEventTranslator Standard Listener that generates notification events.
IAppJobListener Interface for components that need to be notified about changes in Jobs.
CAppJobNotification Notification send by CAppJobEventTranslator.
CEventHandler.
void Release()
Manually force the resource to be released.
Definition: guard.hpp:166
IAppJobEngine.
IAppJob.
Definition: app_job.hpp:82
bvector< Alloc > & set(size_type n, bool val=true)
Sets bit n if val is true, clears bit n if val is false.
Definition: bm.h:4188
void erase(iterator pos)
Definition: map.hpp:167
const_iterator end() const
Definition: map.hpp:152
void clear()
Definition: map.hpp:169
const_iterator find(const key_type &key) const
Definition: map.hpp:153
void clear()
Definition: map.hpp:309
void erase(iterator pos)
Definition: map.hpp:307
iterator insert(const value_type &val)
Definition: map.hpp:305
const_iterator begin() const
Definition: map.hpp:291
bool empty() const
Definition: map.hpp:289
#define false
Definition: bool.h:36
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
#define NULL
Definition: ncbistd.hpp:225
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define LOG_POST(message)
This macro is deprecated and it's strongly recomended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
void Error(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1197
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
#define NCBI_CATCH(message)
Catch CExceptions as well This macro is deprecated - use *_X or *_XX variant instead of it.
Definition: ncbiexpt.hpp:580
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
#define NCBI_RETHROW(prev_exception, exception_class, err_code, message)
Generic macro to re-throw an exception.
Definition: ncbiexpt.hpp:737
virtual void StartJob(IAppJob &job, IEngineParams *params=NULL)=0
If Engine cannot start the Job and exception shall be thrown.
TJobID x_StartJob(IAppJob &job, const string &engine_name, CAppJobEventTranslator *listener, int report_period, bool auto_delete, IEngineParams *params)
void x_OnJobStateChangedNotify(SJobRecord &rec)
virtual void ResumeJob(IAppJob &job)=0
SJobRecord * x_GetJobRecord(TJobID job_id)
bool x_IsCanceled(int job_id) const
virtual void SuspendJob(IAppJob &job)=0
IAppJobEngine * x_GetRegisteredEngine(const string &engine_name)
Helper functions these functions rely on external synchronization and do not throw / catch exception.
void RunSync(IAppJob &job, TJobID &jobId, CEventHandler &listener)
Runs jon synchronously sending job notifications synchronously Returns when job is finished.
TTimeToItem m_PollQueue
Job Index (by pointer)
void ShutDown()
Terminates all jobs and releases Engines.
static bool IsTerminal(TJobState state)
virtual void SetListener(IAppJobEngineListener *listener)=0
For "active" mode set a Listener that will be notified when the state of a Job changes.
CAppJobDispatcher()
Release the singleton.
static CRef< CAppJobDispatcher > sm_Dispatcher
global dispatcher, this instance is used by default in most cases, however it is possible to create a...
static CAppJobDispatcher & GetInstance()
void Mute(bool bMute=true)
Mute all notifications.
void x_VerifyProgressNotNull(CAppJobDispatcher::SJobRecord &rec)
SJobRecord(IAppJob &job, TJobID id, TJobState state, IAppJobEngine &engine, CAppJobEventTranslator *listener, int report_period, bool auto_delete)
CAppJobDispatcher::SJobRecord.
virtual CConstIRef< IAppJobError > GetError()=0
Returns IAppJobError object describing internal error that caused the Job to fail.
bool IdleCallback()
this function shall be called in the the application idle function.
void SetDispatcher(CAppJobDispatcher &disp)
static void ReleaseInstance()
get the Singleton Dispatcher
virtual void CancelJob(IAppJob &job)=0
Cancel job in the engine If job is not running yet - just remove from the pending queue otherwise use...
static string StateToStr(TJobState state)
TStateEventQueue m_StateEventQueue
priority queue for Dispatcher to poll on
friend class CAppJobEventTranslator
virtual CRef< CObject > GetResult()=0
Returns the Job Result.
static string GetStatusString(TJobState job_state)
Debugging method for status strings.
bool m_AutoDelete
delete the record when job finishes
int m_ReportPeriod
if > 0, active progress reporting is required
CConstIRef< IAppJobError > GetJobError(TJobID job_id)
void ResumeJob(TJobID job_id)
CFastMutex m_EngineMutex
Engines registry mutex.
CIRef< CAppJobEventTranslator > m_Listener
if not null - "active" mode
void x_OnJobStarted(IAppJob &job, IAppJobEngine &, CAppJobEventTranslator *listener, int report_period, bool)
handles state transition
bool DeleteJob(TJobID job_id)
when a Job is deleted the listener is not notified
EJobState
Job states (describe FSM)
Definition: app_job.hpp:86
bool m_ShutDownInProgress
Shutdown flag.
virtual void OnEngineJobStateChanged(IAppJob &job, TJobState new_state)
IAppJobEngineListener.
TPtrToRec m_PtrToRec
Job Registry (index by JobID)
TJobID StartJob(IAppJob &job, const string &engine_name, IEngineParams *params=NULL)
Starts a Job on the specified engine in "passive mode" - no notifications or progress reports will be...
virtual string GetDescr() const =0
Returns a human readable description of the Job (optional)
virtual bool IsActive()=0
Returns true if Engine supports "active" model i.e.
virtual EJobState Run()=0
Function that does all the useful work, called by the Engine.
void x_AddJobRecord(SJobRecord &rec)
CRef< CObject > GetJobResult(TJobID job_id)
CConstIRef< IAppJobProgress > GetJobProgress(TJobID job_id)
TNameToEngine m_NameToEngine
Engines Registry.
TJobState GetJobState(TJobID job_id)
All Get() functions return values stored in the Registy not the actual.
virtual TJobState GetJobState(IAppJob &job) const =0
void CancelAllJobs()
Request to cancel all jobs (func returns without waiting)
void CancelJob(TJobID job_id)
bool RegisterEngine(const string &name, IAppJobEngine &engine)
Registers a new Engine, returns true if successful.
CConstIRef< IAppJobProgress > m_Progress
static void x_OnJobStateChanged(SJobRecord &rec, TJobState new_state)
Update job record, throws an exception if new state change is incorrect.
virtual bool Send(CEvent *evt, EDispatch disp_how=eDispatch_Default, int pool_name=ePool_Default)
Sends an event synchronously.
void SuspendJob(TJobID job_id)
CMutex m_MainMutex
guards this instance of the Dispatcher
void x_RemoveJobRecord(SJobRecord &rec)
void x_OnJobProgressNotify(SJobRecord &rec)
virtual void ShutDown()=0
stop any background threads and free resources associated with the Engine
bm::bvector m_CancelVect
Canceled jobs vector.
@ eEngine_UnknownJob
the job is not registered in the Engine
@ eEngineBusy
Engine is busy, caller needs to re-try the operation.
@ eEngine_InvalidOperation
Engine - operation is invalid.
@ eCanceled
Definition: app_job.hpp:91
@ eCompleted
Definition: app_job.hpp:89
@ eRunning
Definition: app_job.hpp:88
@ eSuspended
Definition: app_job.hpp:92
@ eInvalid
Definition: app_job.hpp:87
@ eFailed
Definition: app_job.hpp:90
TObjectType * GetPointer(void) const THROWS_NONE
Get pointer,.
Definition: ncbiobj.hpp:1684
TObjectType * GetPointer(void) THROWS_NONE
Get pointer,.
Definition: ncbiobj.hpp:998
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:1439
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
TObjectType & GetObject(void)
Get object.
Definition: ncbiobj.hpp:1011
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
void Lock(void)
Lock mutex.
bool TryLock(void)
Try locking mutex.
void Unlock(void)
Unlock mutex.
@ BM_GAP
GAP compression is ON.
Definition: bmconst.h:148
int i
#include<zmmintrin.h>
Definition: bm.h:78
EIPRangeType t
Definition: ncbi_localip.c:101
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Multi-threading – mutexes; rw-locks; semaphore.
SJobRecord describes a Job registered in Dispatcher.
SQueueItem - element of the Polling Queue.
#define _ASSERT
Modified on Fri Sep 20 14:57:25 2024 by modify_doxy.py rev. 669887