46 #define DISP_CATCH_AND_THROW(message, job) \
47 catch (CException& e) { \
49 s += job.GetDescr(); \
50 NCBI_RETHROW(e, CAppJobException, eEngineFailed, s); \
51 } catch (std::exception& ee) { \
52 NCBI_THROW(CAppJobException, eFatalError, ee.what()); \
55 #define DISP_CATCH_AND_REPORT(message, job) \
56 catch (CException& e) { \
57 ERR_POST(message << job.GetDescr()); \
58 ERR_POST(e.ReportAll()); \
59 } catch (std::exception& ee) { \
60 ERR_POST(message << ee.what()); \
71 int report_period,
bool auto_delete)
72 : m_Job(&job), m_ID(id), m_State(
state),
73 m_Engine(engine), m_Listener(listener),
74 m_ReportPeriod(report_period),
75 m_AutoDelete(auto_delete)
139 static const char *kMessage =
"CAppJobDispatcher::ShutDown() ";
159 switch(e.GetErrCode()) {
167 }
catch (std::exception& ee) {
214 ERR_POST(
"CAppJobDispatcher::RegisterEngine() engine \"" << name <<
215 "\" is already registered");
225 it->second->RequestCancel();
232 IEngineParams* params)
242 bool auto_delete, IEngineParams* params)
246 return x_StartJob(job, engine_name, translator, report_period,
247 auto_delete, params);
254 bool auto_delete, IEngineParams* params)
256 return x_StartJob(job, engine_name, &listener, report_period,
257 auto_delete, params);
262 static const char*
kJobRegistered =
"Cannot start the job - it is already registered";
263 static const char*
kUnknownEngine =
"Cannot start the job - engine is not registred";
269 bool auto_delete, IEngineParams* params)
289 listener, report_period, auto_delete);
313 }
catch (std::exception& e) {
323 x_OnJobStarted(job, *engine, listener, report_period, auto_delete);
334 int report_period,
bool )
343 if(report_period > 0) {
346 time_t
t = time(
NULL) + report_period;
355 "CAppJobDispatcher::CancelJob() cannot cancel job";
357 "CAppJobDispatcher::CancelJob() cannot cancel job - the job is not running.";
359 "CAppJobDispatcher::CancelJob() cannot cancel job - the job is not registered.";
361 "CAppJobDispatcher: Tool failed with unspecified error";
376 LOG_POST(
"Repeated job cancel: ignored. job=" << job_id);
407 "CAppJobDispatcher::SuspendJob() cannot suspend job";
409 "CAppJobDispatcher::SuspendJob() cannot suspend job - the job is not running.";
411 "CAppJobDispatcher::SuspendJob() cannot suspend job - the job is not registered.";
440 "CAppJobDispatcher::ResumeJob() cannot resume job";
442 "CAppJobDispatcher::ResumeJob() cannot resume job - the job is not running.";
444 "CAppJobDispatcher::ResumeJob() cannot resume job - the job is not registered.";
472 "CAppJobDispatcher::DeleteJob() cannot delete job";
498 switch(e.GetErrCode()) {
505 }
catch (std::exception& ee) {
540 return rec->
m_Job->GetProgress();
584 ERR_POST(
"x_CAppJobDispatcher::x_GetRegisteredEngine() engine \""
585 << engine_name <<
"\" is not registered.");
588 return it->second.GetPointer();
630 "Exception in CAppJobDispatcher::x_OnJobStateChangedNotify() ";
647 LOG_POST(
"CAppJobDispatcher: Job Notification not delivered (muted/no listener) ");
710 for (
unsigned i = 0;
true; ++
i) {
829 string s =
"Job " + rec.
m_Job->GetDescr();
830 s +=
"Transition from state ";
871 string descr = rec.
m_Job->GetDescr();
872 LOG_POST(
Error <<
"Active progress reporting is requested for job \""
873 << descr <<
"\".\n The job must implement GetProgress() funtion!");
885 static const int kPeriod = 15;
886 bool done_something =
false;
890 time_t now = time(
NULL);
897 if(now >= check_time) {
898 done_something =
true;
909 return done_something;
928 if(rec->
m_State != new_state) {
941 time_t new_time = now + kPeriod;
948 return done_something;
977 ERR_POST(
"Job failed -- NULL error job_id= " << jobId);
static const char * kCancelErrNotReg
static const char * kJobRegistered
static const char * kResumeErrNotRunning
static const char * kSuspendErrNotReg
#define DISP_CATCH_AND_REPORT(message, job)
static const char * kResumeErrEngine
static const char * kDeleteErrEngine
static const char * kSuspendErrEngine
static const char * kSuspendErrNotRunning
static const char * kCannotStart
static const char * kResumeErrNotReg
static const char * kUnknownEngine
static const char * kCancelErrEngine
static const char * kCancelErrNotRunning
DEFINE_CLASS_STATIC_MUTEX(CAppJobDispatcher::sm_Mutex)
CAppJobDispatcher.
static const char * kListenerException
#define DISP_CATCH_AND_THROW(message, job)
static const char * kDefaultErrorMessage
CAppJobError Default implementation for IAppJobError - encapsulates a text error message.
CAppJobEventTranslator Standard Listener that generates notification events.
IAppJobListener Interface for components that need to be notified about changes in Jobs.
CAppJobNotification Notification send by CAppJobEventTranslator.
void Release()
Manually force the resource to be released.
bvector< Alloc > & set(size_type n, bool val=true)
Sets bit n if val is true, clears bit n if val is false.
container_type::const_iterator const_iterator
container_type::iterator iterator
const_iterator end() const
const_iterator find(const key_type &key) const
iterator insert(const value_type &val)
const_iterator begin() const
container_type::value_type value_type
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
#define LOG_POST(message)
This macro is deprecated and it's strongly recomended to move in all projects (except tests) to macro...
void Error(CExceptionArgs_Base &args)
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
#define NCBI_CATCH(message)
Catch CExceptions as well This macro is deprecated - use *_X or *_XX variant instead of it.
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
#define NCBI_RETHROW(prev_exception, exception_class, err_code, message)
Generic macro to re-throw an exception.
virtual void StartJob(IAppJob &job, IEngineParams *params=NULL)=0
If Engine cannot start the Job and exception shall be thrown.
TJobID x_StartJob(IAppJob &job, const string &engine_name, CAppJobEventTranslator *listener, int report_period, bool auto_delete, IEngineParams *params)
void x_OnJobStateChangedNotify(SJobRecord &rec)
virtual void ResumeJob(IAppJob &job)=0
SJobRecord * x_GetJobRecord(TJobID job_id)
virtual ~CAppJobDispatcher()
bool x_IsCanceled(int job_id) const
virtual void SuspendJob(IAppJob &job)=0
IAppJobEngine * x_GetRegisteredEngine(const string &engine_name)
Helper functions these functions rely on external synchronization and do not throw / catch exception.
void RunSync(IAppJob &job, TJobID &jobId, CEventHandler &listener)
Runs jon synchronously sending job notifications synchronously Returns when job is finished.
TTimeToItem m_PollQueue
Job Index (by pointer)
void ShutDown()
Terminates all jobs and releases Engines.
static bool IsTerminal(TJobState state)
virtual void SetListener(IAppJobEngineListener *listener)=0
For "active" mode set a Listener that will be notified when the state of a Job changes.
void x_FlushStateEventQueue()
CAppJobDispatcher()
Release the singleton.
static CRef< CAppJobDispatcher > sm_Dispatcher
global dispatcher, this instance is used by default in most cases, however it is possible to create a...
static CAppJobDispatcher & GetInstance()
void Mute(bool bMute=true)
Mute all notifications.
void x_VerifyProgressNotNull(CAppJobDispatcher::SJobRecord &rec)
SJobRecord(IAppJob &job, TJobID id, TJobState state, IAppJobEngine &engine, CAppJobEventTranslator *listener, int report_period, bool auto_delete)
CAppJobDispatcher::SJobRecord.
virtual CConstIRef< IAppJobError > GetError()=0
Returns IAppJobError object describing internal error that caused the Job to fail.
bool IdleCallback()
this function shall be called in the the application idle function.
void SetDispatcher(CAppJobDispatcher &disp)
static void ReleaseInstance()
get the Singleton Dispatcher
virtual void CancelJob(IAppJob &job)=0
Cancel job in the engine If job is not running yet - just remove from the pending queue otherwise use...
static string StateToStr(TJobState state)
TStateEventQueue m_StateEventQueue
priority queue for Dispatcher to poll on
friend class CAppJobEventTranslator
virtual CRef< CObject > GetResult()=0
Returns the Job Result.
static string GetStatusString(TJobState job_state)
Debugging method for status strings.
bool m_AutoDelete
delete the record when job finishes
int m_ReportPeriod
if > 0, active progress reporting is required
CConstIRef< IAppJobError > GetJobError(TJobID job_id)
void ResumeJob(TJobID job_id)
CFastMutex m_EngineMutex
Engines registry mutex.
CIRef< CAppJobEventTranslator > m_Listener
if not null - "active" mode
void x_OnJobStarted(IAppJob &job, IAppJobEngine &, CAppJobEventTranslator *listener, int report_period, bool)
handles state transition
bool DeleteJob(TJobID job_id)
when a Job is deleted the listener is not notified
EJobState
Job states (describe FSM)
bool m_ShutDownInProgress
Shutdown flag.
virtual void OnEngineJobStateChanged(IAppJob &job, TJobState new_state)
IAppJobEngineListener.
TPtrToRec m_PtrToRec
Job Registry (index by JobID)
TJobID StartJob(IAppJob &job, const string &engine_name, IEngineParams *params=NULL)
Starts a Job on the specified engine in "passive mode" - no notifications or progress reports will be...
virtual string GetDescr() const =0
Returns a human readable description of the Job (optional)
virtual bool IsActive()=0
Returns true if Engine supports "active" model i.e.
virtual EJobState Run()=0
Function that does all the useful work, called by the Engine.
void x_AddJobRecord(SJobRecord &rec)
CRef< CObject > GetJobResult(TJobID job_id)
bool ActiveProgress() const
CConstIRef< IAppJobProgress > GetJobProgress(TJobID job_id)
TNameToEngine m_NameToEngine
Engines Registry.
TJobState GetJobState(TJobID job_id)
All Get() functions return values stored in the Registy not the actual.
virtual TJobState GetJobState(IAppJob &job) const =0
void CancelAllJobs()
Request to cancel all jobs (func returns without waiting)
void CancelJob(TJobID job_id)
bool RegisterEngine(const string &name, IAppJobEngine &engine)
Registers a new Engine, returns true if successful.
CConstIRef< IAppJobProgress > m_Progress
static void x_OnJobStateChanged(SJobRecord &rec, TJobState new_state)
Update job record, throws an exception if new state change is incorrect.
virtual bool Send(CEvent *evt, EDispatch disp_how=eDispatch_Default, int pool_name=ePool_Default)
Sends an event synchronously.
void SuspendJob(TJobID job_id)
CMutex m_MainMutex
guards this instance of the Dispatcher
void x_RemoveJobRecord(SJobRecord &rec)
void x_OnJobProgressNotify(SJobRecord &rec)
CFastMutex m_StateEventMutex
virtual void ShutDown()=0
stop any background threads and free resources associated with the Engine
bm::bvector m_CancelVect
Canceled jobs vector.
@ eEngine_UnknownJob
the job is not registered in the Engine
@ eEngineBusy
Engine is busy, caller needs to re-try the operation.
@ eEngine_InvalidOperation
Engine - operation is invalid.
TObjectType * GetPointer(void) const THROWS_NONE
Get pointer,.
TObjectType * GetPointer(void) THROWS_NONE
Get pointer,.
void Reset(void)
Reset reference object.
void Reset(void)
Reset reference object.
TObjectType & GetObject(void)
Get object.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
void Lock(void)
Lock mutex.
bool TryLock(void)
Try locking mutex.
void Unlock(void)
Unlock mutex.
@ BM_GAP
GAP compression is ON.
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Multi-threading – mutexes; rw-locks; semaphore.
SJobRecord describes a Job registered in Dispatcher.
SQueueItem - element of the Polling Queue.