1 #ifndef NETSCHEDULE_NS_QUEUE__HPP
2 #define NETSCHEDULE_NS_QUEUE__HPP
106 CQueue(
const string& queue_name,
130 unsigned int & max_input_size,
131 unsigned int & max_output_size,
160 const string & aff_token,
161 const string & group,
168 vector< pair<CJob, string> > & batch,
169 const string & group,
176 const string & job_key,
178 const string & auth_token,
185 unsigned int timeout,
187 const list<string> * aff_list,
190 bool exclusive_new_affinity,
191 bool prioritized_aff,
193 const list<string> * group_list,
196 string & added_pref_aff);
202 const list<string> & aff_to_add,
203 const list<string> & aff_to_del,
206 const list<string> & aff,
209 const string &
data,
int data_version);
221 string & client_phid,
222 string & progress_msg,
230 unsigned int address,
234 bool need_progress_msg,
235 size_t * last_event_index);
244 const string & job_key,
246 const string & auth_token,
251 const string & job_key,
252 const string & auth_token,
253 const string & aff_token,
254 const string & group,
255 bool & auth_token_ok,
259 const string & job_key,
269 const string & job_key,
271 bool is_ns_rollback =
false);
276 const string & group,
277 const string & aff_token,
278 const vector<TJobStatus> & statuses,
280 vector<string> & warnings);
295 unsigned int timeout,
296 const list<string> * aff_list,
297 bool reader_affinity,
299 bool exclusive_new_affinity,
300 bool prioritized_aff,
301 const list<string> * group_list,
302 bool affinity_may_change,
303 bool group_may_change,
307 string & added_pref_aff);
311 const string & job_key,
313 const string & auth_token);
317 const string & job_key,
319 const string & auth_token,
320 const string & err_msg,
325 const string & job_key,
327 const string & auth_token,
334 const string & job_key,
346 const string & job_key,
348 const string & auth_token,
349 const string & err_msg,
358 bool & client_was_found,
359 string & old_session,
360 bool & had_wn_pref_affs,
361 bool & had_reader_pref_affs);
383 unsigned int last_job,
393 unsigned int add_job_id,
396 unsigned int DeleteBatch(
unsigned int max_deleted);
409 const string & group,
410 const string & aff_token,
411 const vector<TJobStatus> & job_statuses,
412 unsigned int start_after_job_id,
425 bool & client_was_found,
426 bool & session_was_reset,
427 string & old_session,
428 bool & had_wn_pref_affs,
429 bool & had_reader_pref_affs);
439 const string & group_token,
440 const string & aff_token,
441 vector<string> & warnings)
const;
443 const string & group_token,
444 const string & aff_token,
446 vector<string> & warnings)
const;
468 void Dump(
const string & dump_dir_name);
469 void RemoveDump(
const string & dump_dir_name);
470 unsigned int LoadFromDump(
const string & dump_dir_name);
478 const string & job_key,
480 const string & auth_token,
481 const string & err_msg,
503 const vector<unsigned int> & aff_ids,
504 bool use_pref_affinity,
506 bool exclusive_new_affinity,
507 bool prioritized_aff,
514 const vector<unsigned int> & aff_ids,
515 bool use_pref_affinity,
517 bool exclusive_new_affinity,
518 bool prioritized_aff,
522 const string & scope);
529 unsigned int picked_earlier,
533 unsigned int picked_earlier,
535 const string & scope);
538 unsigned int picked_earlier,
542 unsigned int picked_earlier,
544 const string & scope);
547 const string & auth_token,
583 unsigned int timeout,
587 bool exclusive_new_affinity,
592 unsigned int timeout,
596 bool exclusive_new_affinity,
599 unsigned short port);
606 unsigned int start_after_job_id,
617 bool reader_affinity,
619 bool exclusive_new_affinity,
621 bool affinity_may_change,
622 bool group_may_change);
627 const string & job_key,
752 vector<CNetScheduleAPI::EJobStatus>
805 if (program_name.empty())
unsigned int Count(void) const
bool IsAllowed(unsigned int ha) const
NetScheduler threaded server.
All clients registered to connect.
bool IsMatchingClient(const CQueueClientInfo &cinfo) const
bool IsConfigured() const
CFastMutex m_OperationLock
void x_ResetReadingDueToClear(const CNSClientId &client, const TNSBitVector &jobs)
TPauseStatus GetPauseStatus(void) const
void SetPauseStatus(const CNSClientId &client, TPauseStatus status)
CFastMutex m_JobsToDeleteLock
unsigned int CountActiveJobs(void) const
map< string, size_t > x_GetRunningJobsPerClientIP(void)
CNSScopeRegistry m_ScopeRegistry
bool IsSubmitAllowed(unsigned host) const
void x_ResetRunningDueToNewSession(const CNSClientId &client, const TNSBitVector &jobs)
CNSPreciseTime m_ClientRegistryTimeoutSubmitter
TJobStatus ConfirmReadingJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token)
string DecorateJob(unsigned int job_id) const
void TimeLineRemove(unsigned int job_id)
void PurgeClientRegistry(const CNSPreciseTime &current_time)
map< unsigned int, CJob > m_Jobs
void TimeLineMove(unsigned int job_id, const CNSPreciseTime &old_time, const CNSPreciseTime &new_time)
unsigned int m_DumpAffBufferSize
int SetClientData(const CNSClientId &client, const string &data, int data_version)
unsigned m_ReadFailedRetries
TPauseStatus m_PauseStatus
CNetScheduleServer * m_Server
TJobStatus x_ChangeReadingStatus(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, const string &err_msg, TJobStatus target_status, bool is_ns_rollback, bool no_retries)
unsigned CountStatus(TJobStatus) const
bool IsProgramAllowed(const string &program_name) const
CQueueDataBase & m_QueueDB
CNSPreciseTime m_PendingTimeout
CJobStatusTracker m_StatusTracker
size_t GetClientsCount(void) const
unsigned m_MaxJobsPerClient
void x_RegisterGetListener(const CNSClientId &client, unsigned short port, unsigned int timeout, const TNSBitVector &aff_ids, bool wnode_aff, bool any_aff, bool exclusive_new_affinity, bool new_format, const TNSBitVector &group_ids)
void x_CheckExecutionTimeout(const CNSPreciseTime &queue_run_timeout, const CNSPreciseTime &queue_read_timeout, unsigned job_id, const CNSPreciseTime &curr_time, bool logging)
unsigned int CancelAllJobs(const CNSClientId &client, bool logging)
string PrintNotificationsList(bool verbose) const
unsigned int m_ClientRegistryMinWorkerNodes
void SetAffinity(const CNSClientId &client, const list< string > &aff, ECommandGroup cmd_group)
const string & GetQueueName() const
void SetRefuseSubmits(bool val)
void PrintStatistics(size_t &aff_count) const
void StatusStatistics(TJobStatus status, TNSBitVector::statistics *st) const
unsigned int LoadFromDump(const string &dump_dir_name)
string PrintClientsList(bool verbose) const
x_SJobPick x_FindOutdatedPendingJob(const CNSClientId &client, unsigned int picked_earlier, const TNSBitVector &group_ids)
void NotifyListenersPeriodically(const CNSPreciseTime &current_time)
CNSPreciseTime m_ReadTimeout
CStatisticsCounters m_StatisticsCountersLastPrinted
bool PutProgressMessage(unsigned int job_id, CJob &job, const string &msg)
unsigned int CancelSelectedJobs(const CNSClientId &client, const string &group, const string &aff_token, const vector< TJobStatus > &statuses, bool logging, vector< string > &warnings)
bool ShouldPerfLogTransitions(void) const
void CheckExecutionTimeout(bool logging)
unsigned SubmitBatch(const CNSClientId &client, vector< pair< CJob, string > > &batch, const string &group, bool logging, CNSRollbackInterface *&rollback_action)
TJobStatus ReadAndTouchJob(unsigned int job_id, CJob &job, CNSPreciseTime *lifetime)
void RestorePauseStatus(TPauseStatus status)
CNSPreciseTime m_ClientRegistryTimeoutReader
unsigned int m_ClientRegistryMinUnknowns
void TimeLineAdd(unsigned int job_id, const CNSPreciseTime &job_time)
unsigned int GetJobsToDeleteCount(void) const
CNSPreciseTime m_ReadBlacklistTime
TJobStatus FailReadingJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, const string &err_msg, bool no_retries)
TJobStatus SetJobListener(unsigned int job_id, CJob &job, unsigned int address, unsigned short port, const CNSPreciseTime &timeout, bool need_stolen, bool need_progress_msg, size_t *last_event_index)
void ClearWorkerNode(const CNSClientId &client, bool &client_was_found, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
CNSPreciseTime m_ReaderTimeout
void RegisterQueueResumeNotification(unsigned int address, unsigned short port, bool new_format)
string GetAffinityTokenByID(unsigned int aff_id) const
CJobTimeLine * m_RunTimeLine
void Dump(const string &dump_dir_name)
bool GetJobForReadingOrWait(const CNSClientId &client, unsigned int port, unsigned int timeout, const list< string > *aff_list, bool reader_affinity, bool any_affinity, bool exclusive_new_affinity, bool prioritized_aff, const list< string > *group_list, bool affinity_may_change, bool group_may_change, CJob *job, bool *no_more_jobs, CNSRollbackInterface *&rollback_action, string &added_pref_aff)
unsigned int PurgeAffinities(void)
void UpdatePerfLoggingSettings(const string &qclass)
void SetParameters(const SQueueParameters &params)
CNetScheduleKeyGenerator m_KeyGenerator
TJobStatus JobDelayExpiration(unsigned int job_id, CJob &job, const CNSPreciseTime &tm)
void SetClientScope(const CNSClientId &client)
CQueueClientInfoList m_ProgramVersionList
unsigned int x_CancelJobs(const CNSClientId &client, const TNSBitVector &jobs_to_cancel, bool logging)
string PrintTransitionCounters(void) const
string PrintScopesList(bool verbose) const
CNSPreciseTime m_BlacklistTime
CNSPreciseTime NotifyExactListeners(void)
void CancelWaitRead(const CNSClientId &client)
CNetScheduleAccessList m_SubmHosts
TJobStatus FailJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, const string &err_msg, const string &output, int ret_code, bool no_retries, string warning)
TJobStatus ReturnReadingJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, bool is_ns_rollback, bool blacklist, TJobStatus target_status)
unsigned int GetNextJobIdForBatch(unsigned count)
TParameterList GetParameters() const
void MarkClientAsAdmin(const CNSClientId &client)
bool x_UnregisterGetListener(const CNSClientId &client, unsigned short port)
TQueueKind GetQueueKind(void) const
CNSClientsRegistry m_ClientsRegistry
size_t GetNotifCount(void) const
unsigned GetFailedRetries() const
string x_GetJobsDumpFileName(const string &dump_dname) const
TJobStatus JobDelayReadExpiration(unsigned int job_id, CJob &job, const CNSPreciseTime &tm)
string MakeJobKey(unsigned int job_id) const
string PrintJobDbStat(const CNSClientId &client, unsigned int job_id, TDumpFields dump_fields)
TJobStatus PutResult(const CNSClientId &client, const CNSPreciseTime &curr, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, int ret_code, const string &output)
unsigned int m_JobsToDeleteOps
bool GetRefuseSubmits(void) const
size_t GetGroupsCount(void) const
unsigned int Submit(const CNSClientId &client, CJob &job, const string &aff_token, const string &group, bool logging, CNSRollbackInterface *&rollback_action)
bool GetJobOrWait(const CNSClientId &client, unsigned short port, unsigned int timeout, const CNSPreciseTime &curr, const list< string > *aff_list, bool wnode_affinity, bool any_affinity, bool exclusive_new_affinity, bool prioritized_aff, bool new_format, const list< string > *group_list, CJob *new_job, CNSRollbackInterface *&rollback_action, string &added_pref_aff)
TJobStatus GetStatusAndLifetime(unsigned int job_id, string &client_ip, string &client_sid, string &client_phid, string &progress_msg, CNSPreciseTime *lifetime)
void GetJobsPerState(const CNSClientId &client, const string &group_token, const string &aff_token, size_t *jobs, vector< string > &warnings) const
void RemoveDump(const string &dump_dir_name)
TJobStatus x_ResetDueTo(const CNSClientId &client, unsigned int job_id, const CNSPreciseTime &current_time, TJobStatus status_from, CJobEvent::EJobEvent event_type)
CNSPreciseTime m_ClientRegistryTimeoutAdmin
unsigned int CountAllJobs(void) const
SPurgeAttributes CheckJobsExpiry(const CNSPreciseTime &current_time, SPurgeAttributes attributes, unsigned int last_job, TJobStatus status)
void x_LogSubmit(const CJob &job)
void x_Erase(const TNSBitVector &job_ids, TJobStatus status)
Erase jobs from all structures, request delayed db deletion.
unsigned int m_DumpGroupBufferSize
x_SJobPick x_FindOutdatedJobForReading(const CNSClientId &client, unsigned int picked_earlier, const TNSBitVector &group_ids)
CNSPreciseTime m_WNodeTimeout
list< pair< string, string > > TParameterList
CNetScheduleAccessList m_WnodeHosts
TJobStatus ReturnJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, string &warning, TJobReturnOption how)
unsigned int m_ClientRegistryMinSubmitters
bool IsReaderAllowed(unsigned host) const
void TimeLineExchange(unsigned int remove_job_id, unsigned int add_job_id, const CNSPreciseTime &new_time)
void x_ResetReadingDueToNewSession(const CNSClientId &client, const TNSBitVector &jobs)
CNSPreciseTime m_ClientRegistryTimeoutWorkerNode
string PrintGroupsList(const CNSClientId &client, bool verbose) const
TJobStatus GetJobStatus(unsigned job_id) const
CNSPreciseTime GetMaxPendingWaitTimeout() const
void GetLinkedSections(map< string, map< string, string > > &linked_sections) const
unsigned int m_ReadJobsOps
TNSBitVector m_JobsToDelete
string PrintAllJobDbStat(const CNSClientId &client, const string &group, const string &aff_token, const vector< TJobStatus > &job_statuses, unsigned int start_after_job_id, unsigned int count, bool order_first, TDumpFields dump_fields, bool logging)
TJobStatus RescheduleJob(const CNSClientId &client, unsigned int job_id, const string &job_key, const string &auth_token, const string &aff_token, const string &group, bool &auth_token_ok, CJob &job)
CNSPreciseTime GetTimeout() const
CNSPreciseTime GetReadTimeout() const
string PrintAffinitiesList(const CNSClientId &client, bool verbose) const
string x_DumpJobs(const TNSBitVector &jobs_to_dump, unsigned int start_after_job_id, unsigned int count, TDumpFields dump_fields, bool order_first)
unsigned int m_ClientRegistryMinReaders
CNSNotificationList m_NotificationsList
CQueue(const string &queue_name, TQueueKind queue_kind, CNetScheduleServer *server, CQueueDataBase &qdb)
size_t GetGroupSlotsUsed(void) const
void RegisterSocketWriteError(const CNSClientId &client)
size_t GetAffSlotsUsed(void) const
unsigned int m_ClientRegistryMinAdmins
void x_UpdateDB_ProvideJobNoLock(const CNSClientId &client, const CNSPreciseTime &curr, unsigned int job_id, ECommandGroup cmd_group, CJob &job)
CNSPreciseTime m_MaxPendingReadWaitTimeout
CNSPreciseTime m_ClientRegistryTimeoutUnknown
unsigned int PurgeGroups(void)
size_t GetScopeSlotsUsed(void) const
CStatisticsCounters m_StatisticsCounters
bool x_NoMoreReadJobs(const CNSClientId &client, const TNSBitVector &aff_list, bool reader_affinity, bool any_affinity, bool exclusive_new_affinity, const TNSBitVector &group_list, bool affinity_may_change, bool group_may_change)
bool IsWorkerAllowed(unsigned host) const
CNSAffinityRegistry m_AffinityRegistry
unsigned int m_DumpBufferSize
const bool & m_LogBatchEachJob
x_SJobPick x_FindVacantJob(const CNSClientId &client, const TNSBitVector &explicit_affs, const vector< unsigned int > &aff_ids, bool use_pref_affinity, bool any_affinity, bool exclusive_new_affinity, bool prioritized_aff, const TNSBitVector &group_ids, bool has_groups, ECommandGroup cmd_group)
CNetScheduleAccessList m_ReaderHosts
CNSPreciseTime GetPendingTimeout() const
CNSPreciseTime x_GetEstimatedJobLifetime(unsigned int job_id, TJobStatus status) const
map< string, string > m_LinkedSections
void PrintJobCounters(void) const
size_t GetGCBacklogCount(void) const
TJobStatus RedoJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job)
TJobStatus RereadJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, bool &no_op)
CNSGroupsRegistry m_GroupRegistry
void x_NotifyJobChanges(const CJob &job, const string &job_key, ENotificationReason reason, const CNSPreciseTime &current_time)
CNSPreciseTime GetRunTimeout() const
CNSPreciseTime m_NotifHifreqPeriod
string PrintJobsStat(const CNSClientId &client, const string &group_token, const string &aff_token, vector< string > &warnings) const
void GetMaxIOSizesAndLinkedSections(unsigned int &max_input_size, unsigned int &max_output_size, map< string, map< string, string > > &linked_sections) const
unsigned int m_DumpClientBufferSize
void StaleNodes(const CNSPreciseTime &current_time)
void x_ResetRunningDueToClear(const CNSClientId &client, const TNSBitVector &jobs)
CJobGCRegistry m_GCRegistry
void PurgeBlacklistedJobs(void)
bool x_ValidateMaxJobsPerClientIP(unsigned int job_id, const map< string, size_t > &jobs_per_client_ip) const
void CountTransition(CNetScheduleAPI::EJobStatus from, CNetScheduleAPI::EJobStatus to)
TJobStatus GetStatusAndLifetimeAndTouch(unsigned int job_id, CJob &job, CNSPreciseTime *lifetime)
void TouchClientsRegistry(CNSClientId &client, bool &client_was_found, bool &session_was_reset, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
void EraseJob(unsigned job_id, TJobStatus status)
void CancelWaitGet(const CNSClientId &client)
CNSPreciseTime m_MaxPendingWaitTimeout
list< string > ChangeAffinity(const CNSClientId &client, const list< string > &aff_to_add, const list< string > &aff_to_del, ECommandGroup cmd_group)
void x_UpdateDB_PutResultNoLock(unsigned job_id, const string &auth_token, const CNSPreciseTime &curr, int ret_code, const string &output, CJob &job, const CNSClientId &client)
bool m_ShouldPerfLogTransitions
CNSPreciseTime m_RunTimeout
void x_RegisterReadListener(const CNSClientId &client, unsigned short port, unsigned int timeout, const TNSBitVector &aff_ids, bool reader_aff, bool any_aff, bool exclusive_new_affinity, const TNSBitVector &group_ids)
unsigned int m_NotifLofreqMult
unsigned int DeleteBatch(unsigned int max_deleted)
CNSPreciseTime m_StatisticsCountersLastPrintedTimestamp
CNSPreciseTime m_HandicapTimeout
CRWLock m_RunTimeLineLock
vector< CNetScheduleAPI::EJobStatus > m_StatesForRead
TJobStatus Cancel(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, bool is_ns_rollback=false)
CNSPreciseTime m_NotifHifreqInterval
void CountTransition(CNetScheduleAPI::EJobStatus from, CNetScheduleAPI::EJobStatus to, ETransitionPathOption path_option=eNone)
Timeline class for fast approximate time tracking.
Bitvector Bit-vector container with runtime compression of bits.
size_type count() const noexcept
population count (count of ON bits)
static const struct attribute attributes[]
static SQLCHAR output[256]
EJobStatus
Job status codes.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
void ParseVersionString(const string &vstr, string *program_name, CVersionInfo *ver)
Parse string, extract version info and program name (case insensitive)
NetSchedule job status tracker.
Process information in the NCBI Registry, including working with configuration files.
The NCBI C++/STL use hints.
NetSchedule garbage collection registry.
CTimeLine< TNSBitVector > CJobTimeLine
NetSchedule queue client version control.
static CNamedPipeClient * client
Netschedule queue client info.
CVersionInfo version_info
x_SJobPick(unsigned int jid, bool excl, unsigned int aid)
Statistical information about bitset's memory allocation details.