NCBI C++ ToolKit
queue_database.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef NETSCHEDULE_QUEUE_DATABASE__HPP
2 #define NETSCHEDULE_QUEUE_DATABASE__HPP
3 
4 
5 /* $Id: queue_database.hpp 84641 2018-11-26 13:25:46Z satskyse $
6  * ===========================================================================
7  *
8  * PUBLIC DOMAIN NOTICE
9  * National Center for Biotechnology Information
10  *
11  * This software/database is a "United States Government Work" under the
12  * terms of the United States Copyright Act. It was written as part of
13  * the author's official duties as a United States Government employee and
14  * thus cannot be copyrighted. This software/database is freely available
15  * to the public for use. The National Library of Medicine and the U.S.
16  * Government have not placed any restriction on its use or reproduction.
17  *
18  * Although all reasonable efforts have been taken to ensure the accuracy
19  * and reliability of the software and data, the NLM and the U.S.
20  * Government do not and cannot warrant the performance or results that
21  * may be obtained by using this software or data. The NLM and the U.S.
22  * Government disclaim all warranties, express or implied, including
23  * warranties of performance, merchantability or fitness for any particular
24  * purpose.
25  *
26  * Please cite the author in any work or product based on this material.
27  *
28  * ===========================================================================
29  *
30  * Authors: Anatoliy Kuznetsov, Victor Joukov
31  *
32  * File Description:
33  * Top level queue database (Thread-Safe, synchronized).
34  *
35  */
36 
37 
38 #include <corelib/ncbimtx.hpp>
39 #include <corelib/ncbicntr.hpp>
40 
41 #include <utility>
42 
45 #include "ns_util.hpp"
46 #include "job_status.hpp"
47 #include "queue_clean_thread.hpp"
48 #include "ns_notifications.hpp"
49 #include "ns_queue.hpp"
50 #include "queue_vc.hpp"
51 #include "background_host.hpp"
52 #include "ns_service_thread.hpp"
53 #include "ns_precise_time.hpp"
54 
55 BEGIN_NCBI_SCOPE
56 
57 class CNetScheduleServer;
58 
59 
60 // Holds parameters together with a queue instance
61 typedef map<string,
62  pair<SQueueParameters, CRef<CQueue> >,
63  PNocase> TQueueInfo;
64 
65 
66 // Top level queue database. (Thread-Safe, synchronized.)
67 class CQueueDataBase
68 {
69 public:
70  CQueueDataBase(CNetScheduleServer * server,
71  const string & path,
72  unsigned int max_queues,
73  bool diskless,
74  bool reinit);
76 
77  // Read queue information from registry and configure queues
78  // accordingly.
79  // returns minimum run timeout, necessary for watcher thread
80  time_t Configure(const IRegistry & reg,
81  CJsonNode & diff);
82 
83  // Count Pending and Running jobs in all queues
84  unsigned int CountActiveJobs(void) const;
85  unsigned int CountAllJobs(void) const;
86  bool AnyJobs(void) const;
87 
88  CRef<CQueue> OpenQueue(const string & name);
89 
90  void CreateDynamicQueue(const CNSClientId & client,
91  const string & qname, const string & qclass,
92  const string & description = "");
93  void DeleteDynamicQueue(const CNSClientId & client,
94  const string & qname);
95  SQueueParameters QueueInfo(const string & qname) const;
96  string GetQueueNames(const string & sep) const;
97 
98  void Close(void);
99  bool QueueExists(const string & qname) const;
100 
101  // Remove old jobs
102  void Purge(void);
103  void StopPurge(void);
104  void RunPurgeThread(void);
105  void StopPurgeThread(void);
106 
107  // Collect garbage from affinities
108  void PurgeAffinities(void);
109  void PurgeGroups(void);
110  void StaleWNodes(void);
111  void PurgeBlacklistedJobs(void);
112  void PurgeClientRegistry(void);
113 
114  // Notify all listeners
115  void NotifyListeners(void);
116  void RunNotifThread(void);
117  void StopNotifThread(void);
118  void WakeupNotifThread(void);
120 
121  // Print statistics
122  void PrintStatistics(size_t & aff_count);
123  void PrintJobCounters(void);
124  void RunServiceThread(void);
125  void StopServiceThread(void);
126 
127  void CheckExecutionTimeout(bool logging);
128  void RunExecutionWatcherThread(const CNSPreciseTime & run_delay);
129  void StopExecutionWatcherThread(void);
130 
131  string PrintTransitionCounters(void);
132  string PrintJobsStat(const CNSClientId & client);
133  string GetQueueClassesInfo(void) const;
134  string GetQueueClassesConfig(void) const;
135  string GetQueueInfo(void) const;
136  string GetQueueConfig(void) const;
137  string GetLinkedSectionConfig(void) const;
138 
139  map<string, string> GetLinkedSection(const string & section_name) const;
140 
141  // map: queue name -> pause state (integer, CQueue::EPauseState)
142  // the only paused queues are reported
143  map<string, int> GetPauseQueues(void) const;
144  vector<string> GetRefuseSubmitQueues(void) const;
145  string GetDataPath(void) const
146  { return m_DataPath; }
147 
148 private:
149  // No copy
152 
153 protected:
154  // get next job id (counter increment)
155  unsigned int GetNextId();
156 
157  // Returns first id for the batch
158  unsigned int GetNextIdBatch(unsigned int count);
159 
160 private:
161  void x_Open(bool reinit);
162  void x_CreateAndMountQueue(const string & qname,
163  const SQueueParameters & params);
164 
165  unsigned x_PurgeUnconditional(void);
166  void x_OptimizeStatusMatrix(const CNSPreciseTime & current_time);
167  bool x_CheckStopPurge(void);
169 
171  string m_DataPath;
172  string m_DumpPath;
173  unsigned int m_MaxQueues;
175 
177 
178  // Effective queue classes
179  TQueueParams m_QueueClasses;
180  // Effective queues
181  TQueueInfo m_Queues;
182 
183  bool m_StopPurge; // Purge stop flag
185  unsigned int m_FreeStatusMemCnt; // Free memory counter
186  time_t m_LastFreeMem; // time of the last memory opt
187 
192 
194 
195 private:
196  // Last scan attributes
197  string m_PurgeQueue; // The queue name
198  size_t m_PurgeStatusIndex; // Scanned status index
199  unsigned int m_PurgeJobScanned; // Scanned job ID within status
200 
201  // Linked sections support
202  CFastMutex m_LinkedSectionsGuard;
203  // Section name -> section values
204  map<string, map<string, string> > m_LinkedSections;
205 
206  bool x_PurgeQueue(CQueue & queue,
207  size_t status_to_start,
208  size_t status_to_end,
209  unsigned int start_job_id,
210  unsigned int end_job_id,
211  size_t max_scanned,
212  size_t max_mark_deleted,
213  const CNSPreciseTime & current_time,
214  size_t & total_scanned,
215  size_t & total_mark_deleted);
216  void x_DeleteQueuesAndClasses(void);
218  CRef<CQueue> x_GetFirst(void);
219  CRef<CQueue> x_GetNext(const string & current_name);
220 
221  // Crash detect support:
222  // - upon start the server creates CRASH_FLAG file
223  // - when gracefully finished the file is deleted
224  // - at the start it is checked if the file is there. If it is then
225  // it means the server crashed
226  void x_CreateCrashFlagFile(void);
227  bool x_DoesCrashFlagFileExist(void) const;
228  void x_RemoveCrashFlagFile(void);
229 
230  // Dump problem detect support:
231  // - upon start the server creates DUMP_ERROR_FLAG file
232  // - if all the NS info was dumped successfully the file is deleted
233  // - at the start it is checked if the file is there. If it is then
234  // it means the previous instance had problems dumping something
235  void x_CreateDumpErrorFlagFile(void);
236  bool x_DoesDumpErrorFlagFileExist(void) const;
237  void x_RemoveDumpErrorFlagFile(void);
238 
239 
240  bool x_ConfigureQueueClasses(const TQueueParams & classes_from_ini,
241  CJsonNode & diff);
242  bool x_ConfigureQueues(const TQueueParams & queues_from_ini,
243  CJsonNode & diff);
244 
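245  TQueueParams x_ReadIniFileQueueClassDescriptions(const IRegistry & reg);
246  TQueueParams x_ReadIniFileQueueDescriptions(const IRegistry & reg,
247  const TQueueParams & classes);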
248  void x_ReadLinkedSections(const IRegistry & reg,
249  CJsonNode & diff);
250  CJsonNode x_DetectChangesInLinkedSection(
251  const map<string, string> & old_values,
252  const map<string, string> & new_values);
253 
254  void x_ValidateConfiguration(const TQueueParams & queues_from_ini) const;
255  unsigned int
256  x_CountQueuesToAdd(const TQueueParams & queues_from_ini) const;
257 
258  CRef<CQueue> x_GetQueueAt(unsigned int index);
259 
260  void x_Dump(void);
261  void x_DumpQueueOrClass(FILE * f,
262  const string & qname, const string & qclass,
263  bool is_queue,
264  const SQueueParameters & params);
265  void x_DumpLinkedSection(FILE * f, const string & sname,
266  const map<string, string> & values);
267  void x_RemoveDump(void);
268  void x_RemoveDataFiles(void);
269  void x_CreateStorageVersionFile(void);
270 
271  bool x_CheckOpenPreconditions(bool reinit);
272  void x_ReadDumpQueueDesrc(set<string, PNocase> & dump_static_queues,
273  map<string, string,
274  PNocase> & dump_dynamic_queues,
275  TQueueParams & dump_queue_classes);
277  void x_AppendDumpLinkedSections(void);
279  void x_BackupDump(void);
280  void x_CreateSpaceReserveFile(void);
281  bool x_RemoveSpaceReserveFile(void);
282  string x_GetDumpSpaceFileName(void) const;
283  void x_RestorePauseState(const map<string, int> & paused_queues);
284  void x_RestoreRefuseSubmitState(const vector<string> & refuse_submit_queues);
285 }; // CQueueDataBase
286 
287 
288 END_NCBI_SCOPE
289 
290 #endif /* NETSCHEDULE_QUEUE_DATABASE__HPP */
291 
CFastMutex –.
Definition: ncbimtx.hpp:667
JSON node abstraction.
NetScheduler threaded server.
Definition: ns_server.hpp:57
string PrintJobsStat(const CNSClientId &client)
CRef< CServiceThread > m_ServiceThread
set< string, PNocase > x_GetConfigQueues(void)
SQueueParameters QueueInfo(const string &qname) const
void RunServiceThread(void)
CQueueDataBase(CNetScheduleServer *server, const string &path, unsigned int max_queues, bool diskless, bool reinit)
string GetQueueConfig(void) const
time_t Configure(const IRegistry &reg, CJsonNode &diff)
unsigned int m_MaxQueues
void RunExecutionWatcherThread(const CNSPreciseTime &run_delay)
CQueueDataBase(const CQueueDataBase &)
void PrintStatistics(size_t &aff_count)
void RunNotifThread(void)
CRef< CQueue > x_GetNext(const string &current_name)
unsigned int x_CountQueuesToAdd(const TQueueParams &queues_from_ini) const
unsigned int m_FreeStatusMemCnt
void x_AppendDumpLinkedSections(void)
void WakeupNotifThread(void)
CFastMutex m_ConfigureLock
void x_BackupDump(void)
CRef< CQueue > x_GetQueueAt(unsigned int index)
bool x_RemoveSpaceReserveFile(void)
void CreateDynamicQueue(const CNSClientId &client, const string &qname, const string &qclass, const string &description="")
void x_CreateDumpErrorFlagFile(void)
TQueueParams x_ReadIniFileQueueClassDescriptions(const IRegistry &reg)
bool x_CheckStopPurge(void)
string GetDataPath(void) const
string GetQueueInfo(void) const
void x_DeleteQueuesAndClasses(void)
void PurgeGroups(void)
void PurgeClientRegistry(void)
void x_ValidateConfiguration(const TQueueParams &queues_from_ini) const
void x_RemoveDump(void)
bool x_DoesDumpErrorFlagFileExist(void) const
void StopNotifThread(void)
bool AnyJobs(void) const
void DeleteDynamicQueue(const CNSClientId &client, const string &qname)
void StopPurgeThread(void)
void x_ReadLinkedSections(const IRegistry &reg, CJsonNode &diff)
CJsonNode x_DetectChangesInLinkedSection(const map< string, string > &old_values, const map< string, string > &new_values)
map< string, int > GetPauseQueues(void) const
CQueueDataBase & operator=(const CQueueDataBase &)
string GetQueueClassesConfig(void) const
void x_DumpQueueOrClass(FILE *f, const string &qname, const string &qclass, bool is_queue, const SQueueParameters &params)
void StopServiceThread(void)
void RunPurgeThread(void)
void StopExecutionWatcherThread(void)
bool QueueExists(const string &qname) const
CNSPreciseTime SendExactNotifications(void)
bool x_CheckOpenPreconditions(bool reinit)
void StaleWNodes(void)
void StopPurge(void)
map< string, map< string, string > > m_LinkedSections
void PurgeAffinities(void)
vector< string > GetRefuseSubmitQueues(void) const
unsigned int GetNextId()
void x_RemoveDumpErrorFlagFile(void)
bool x_DoesCrashFlagFileExist(void) const
bool x_ConfigureQueues(const TQueueParams &queues_from_ini, CJsonNode &diff)
void x_OptimizeStatusMatrix(const CNSPreciseTime &current_time)
CRef< CJobQueueCleanerThread > m_PurgeThread
CNetScheduleServer * m_Server
TQueueParams x_ReadIniFileQueueDescriptions(const IRegistry &reg, const TQueueParams &classes)
void x_RemoveDataFiles(void)
unsigned int GetNextIdBatch(unsigned int count)
CNSPreciseTime CalculateRuntimePrecision(void) const
CFastMutex m_PurgeLock
void x_CreateSpaceReserveFile(void)
unsigned int m_PurgeJobScanned
SQueueParameters x_SingleQueueInfo(TQueueInfo::const_iterator found) const
void x_CreateCrashFlagFile(void)
string GetQueueNames(const string &sep) const
void x_RestoreRefuseSubmitState(const vector< string > &refuse_submit_queues)
string PrintTransitionCounters(void)
TQueueInfo m_Queues
void NotifyListeners(void)
CFastMutex m_LinkedSectionsGuard
map< string, string > GetLinkedSection(const string &section_name) const
string x_GetDumpSpaceFileName(void) const
void x_CreateStorageVersionFile(void)
void x_Open(bool reinit)
CRef< CQueue > x_GetLastPurged(void)
unsigned x_PurgeUnconditional(void)
void x_RestorePauseState(const map< string, int > &paused_queues)
string GetLinkedSectionConfig(void) const
unsigned int CountActiveJobs(void) const
unsigned int CountAllJobs(void) const
CRef< CQueue > x_GetFirst(void)
void PrintJobCounters(void)
bool x_PurgeQueue(CQueue &queue, size_t status_to_start, size_t status_to_end, unsigned int start_job_id, unsigned int end_job_id, size_t max_scanned, size_t max_mark_deleted, const CNSPreciseTime &current_time, size_t &total_scanned, size_t &total_mark_deleted)
void x_DumpLinkedSection(FILE *f, const string &sname, const map< string, string > &values)
CRef< CJobQueueExecutionWatcherThread > m_ExeWatchThread
CBackgroundHost & m_Host
CRef< CQueue > OpenQueue(const string &name)
void x_ReadDumpQueueDesrc(set< string, PNocase > &dump_static_queues, map< string, string, PNocase > &dump_dynamic_queues, TQueueParams &dump_queue_classes)
void PurgeBlacklistedJobs(void)
string GetQueueClassesInfo(void) const
void CheckExecutionTimeout(bool logging)
bool x_ConfigureQueueClasses(const TQueueParams &classes_from_ini, CJsonNode &diff)
CRef< CGetJobNotificationThread > m_NotifThread
TQueueParams m_QueueClasses
void x_CreateAndMountQueue(const string &qname, const SQueueParameters &params)
void x_RemoveCrashFlagFile(void)
CRef –.
Definition: ncbiobj.hpp:618
IRegistry –.
Definition: ncbireg.hpp:73
Definition: map.hpp:338
string
Definition: cgiapp.hpp:687
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
NetSchedule job status tracker.
Multi-threading – mutexes; rw-locks; semaphore.
NetSchedule client specs.
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
map< string, pair< SQueueParameters, CRef< CQueue > >, PNocase > TQueueInfo
NetSchedule queue client version control.
static CNamedPipeClient * client
Modified on Mon Jun 17 05:11:23 2024 by modify_doxy.py rev. 669887