NCBI C++ ToolKit
job_status.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: job_status.cpp 89004 2020-02-11 19:47:28Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Anatoliy Kuznetsov, Victor Joukov
27  *
28  * File Description: Network scheduler job status store
29  *
30  */
31 #include <ncbi_pch.hpp>
32 #include <corelib/ncbi_system.hpp>
33 
36 #include <util/bitset/bmalgo.h>
37 
38 #include "job_status.hpp"
39 #include "ns_gc_registry.hpp"
40 
41 
43 
44 
// Constructor (its signature line is not visible in this listing).
// Zero-initializes m_DoneCnt and appends one heap-allocated TNSBitVector to
// m_StatusStor for every index in [0, CNetScheduleAPI::eLastStatus), so a
// status value can be used directly as an index into m_StatusStor.
46  : m_DoneCnt(0)
47 {
48  // Note: one bit vector is not used - the corresponding job state became
49  // obsolete and was deleted. The matrix though uses job statuses as indexes
50  // for fast access, so that missed status vector is also created. The rest
51  // of the code iterates only through the valid states.
52  for (int i = 0; i < CNetScheduleAPI::eLastStatus; ++i) {
53  m_StatusStor.push_back(new TNSBitVector());
54  }
55 }
56 
57 
// NOTE(review): the signature line is not visible in this listing; this is
// presumably the destructor. Deletes the bit vector stored at every index in
// [0, CNetScheduleAPI::eLastStatus) of m_StatusStor.
59 {
60  for (int i = 0; i < CNetScheduleAPI::eLastStatus; ++i) {
61  delete m_StatusStor[i];
62  }
63 }
64 
65 
// Body of GetStatus (signature line not visible in this listing).
// Under a read-lock guard on m_Lock, walks the valid statuses in
// g_ValidJobStatuses order and returns the first status whose bit vector has
// the bit for job_id set.
// NOTE(review): the declaration of `bv` and the fall-through return statement
// are missing from this listing; the not-found result cannot be confirmed here.
67 {
68  CReadLockGuard guard(m_Lock);
69 
70  for (size_t k = 0; k < g_ValidJobStatusesSize; ++k) {
72 
73  if (bv.get_bit(job_id))
74  return g_ValidJobStatuses[k];
75  }
77 }
78 
79 
80 unsigned int CJobStatusTracker::CountStatus(TJobStatus status) const
81 {
82  CReadLockGuard guard(m_Lock);
83 
84  return m_StatusStor[(int)status]->count();
85 }
86 
87 
88 unsigned int
89 CJobStatusTracker::CountStatus(const vector<TJobStatus> & statuses) const
90 {
91  unsigned int cnt = 0;
92  CReadLockGuard guard(m_Lock);
93 
94  for (vector<TJobStatus>::const_iterator k = statuses.begin();
95  k != statuses.end(); ++k)
96  cnt += m_StatusStor[(int)(*k)]->count();
97 
98  return cnt;
99 }
100 
101 
102 vector<unsigned int>
103 CJobStatusTracker::GetJobCounters(const vector<TJobStatus> & statuses) const
104 {
105  vector<unsigned int> counters;
106  CReadLockGuard guard(m_Lock);
107 
108  for (vector<TJobStatus>::const_iterator k = statuses.begin();
109  k != statuses.end(); ++k)
110  counters.push_back(m_StatusStor[(int)(*k)]->count());
111 
112  return counters;
113 }
114 
115 
116 unsigned int CJobStatusTracker::Count(void) const
117 {
118  unsigned int cnt = 0;
119  CReadLockGuard guard(m_Lock);
120 
121  for (size_t k = 0; k < g_ValidJobStatusesSize; ++k)
122  cnt += m_StatusStor[g_ValidJobStatuses[k]]->count();
123 
124  return cnt;
125 }
126 
127 
// Returns the smallest first-set-bit value found across the bit vectors of
// the valid statuses, or 0 when every vector is empty. Runs under a
// read-lock guard on m_Lock.
// NOTE(review): the declaration of `bv` (original line 134) is missing from
// this listing; presumably it refers to the vector of g_ValidJobStatuses[k].
128 unsigned int CJobStatusTracker::GetMinJobID(void) const
129 {
130  unsigned int id = 0;
131  CReadLockGuard guard(m_Lock);
132 
133  for (size_t k = 0; k < g_ValidJobStatusesSize; ++k) {
// Empty vectors are skipped so that get_first() is only called when at
// least one bit is set.
135  if (!bv.any())
136  continue;
// id == 0 doubles as the "nothing found yet" marker.
137  if (id == 0)
138  id = bv.get_first();
139  else {
140  unsigned int first = bv.get_first();
141  if (first < id)
142  id = first;
143  }
144  }
145 
146  return id;
147 }
148 
149 
// Body of AnyJobs(void) (signature line not visible in this listing).
// Returns true as soon as the bit vector of any valid status reports any();
// false when all of them are empty. Runs under a read-lock guard on m_Lock.
151 {
152  CReadLockGuard guard(m_Lock);
153 
154  for (size_t k = 0; k < g_ValidJobStatusesSize; ++k)
155  if (m_StatusStor[g_ValidJobStatuses[k]]->any())
156  return true;
157  return false;
158 }
159 
160 
// NOTE(review): the signature line is not visible in this listing; presumably
// this is the single-status AnyJobs overload taking `status`.
// Returns any() of the bit vector stored at index `status`, under a
// read-lock guard on m_Lock.
162 {
163  CReadLockGuard guard(m_Lock);
164 
165  return m_StatusStor[(int)status]->any();
166 }
167 
168 
169 bool CJobStatusTracker::AnyJobs(const vector<TJobStatus> & statuses) const
170 {
171  CReadLockGuard guard(m_Lock);
172 
173  for (vector<TJobStatus>::const_iterator k = statuses.begin();
174  k != statuses.end(); ++k)
175  if (m_StatusStor[(int)(*k)]->any())
176  return true;
177 
178  return false;
179 }
180 
181 
182 
// Tail of StatusStatistics (the first signature line is not visible in this
// listing). Fills *st via calc_stat() of the bit vector stored at index
// `status`, under a read-lock guard on m_Lock.
// `st` must not be null: it is checked with _ASSERT only.
184  TNSBitVector::statistics * st) const
185 {
186  _ASSERT(st);
187  CReadLockGuard guard(m_Lock);
188  const TNSBitVector & bv = *m_StatusStor[(int)status];
189 
190  bv.calc_stat(st);
191 }
192 
193 
// Moves job_id to `status`: under a write-lock guard on m_Lock, visits the
// bit vector of every valid status, clears the job's bit where it is set for
// a different status and sets it in the vector of the requested status.
// Throws CNetScheduleException (eInternalError) if the job's bit is found
// set in more than one status vector.
// NOTE(review): the declarations of `old_status` (original line 196) and `bv`
// (original line 200) are missing from this listing.
194 void CJobStatusTracker::SetStatus(unsigned job_id, TJobStatus status)
195 {
197  CWriteLockGuard guard(m_Lock);
198 
199  for (size_t k = 0; k < g_ValidJobStatusesSize; ++k) {
201 
202  if (bv.get_bit(job_id)) {
// A second hit means the job is recorded in two statuses at once.
203  if (old_status != CNetScheduleAPI::eJobNotFound)
204  NCBI_THROW(CNetScheduleException, eInternalError,
205  "State matrix was damaged, more than one status "
206  "active for job " + to_string(job_id));
207  old_status = g_ValidJobStatuses[k];
208 
// Leave the bit alone when the job already has the requested status.
209  if (status != g_ValidJobStatuses[k])
210  bv.set_bit(job_id, false);
211  } else {
212  if (status == g_ValidJobStatuses[k])
213  bv.set_bit(job_id, true);
214  }
215  }
216 }
217 
218 
// Takes a write-lock guard on m_Lock and then performs one statement that is
// missing from this listing (original line 222).
// NOTE(review): presumably that statement sets the bit for job_id in the
// ePending vector - confirm against the real source.
219 void CJobStatusTracker::AddPendingJob(unsigned int job_id)
220 {
221  CWriteLockGuard guard(m_Lock);
223 }
224 
225 
// NOTE(review): the only body line (original line 228) is missing from this
// listing, so what Erase does with job_id cannot be stated from here.
226 void CJobStatusTracker::Erase(unsigned job_id)
227 {
229 }
230 
231 
// NOTE(review): the signature line (original 232) and the declaration of
// `bv1` (original 237) are missing from this listing; presumably `bv` is a
// TNSBitVector pointer parameter and `bv1` is the vector of
// g_ValidJobStatuses[k] - confirm.
// Under a write-lock guard on m_Lock, for every valid status: ORs bv1 into
// *bv and then empties bv1 with clear(true).
233 {
234  CWriteLockGuard guard(m_Lock);
235 
236  for (size_t k = 0; k < g_ValidJobStatusesSize; ++k) {
238 
239  *bv |= bv1;
240  bv1.clear(true);
241  }
242 }
243 
244 
// Body of ClearAll(void) (signature line not visible in this listing).
// Under a write-lock guard on m_Lock, calls clear(true) on the bit vector of
// every valid status.
246 {
247  CWriteLockGuard guard(m_Lock);
248 
249  for (size_t k = 0; k < g_ValidJobStatusesSize; ++k) {
250  m_StatusStor[g_ValidJobStatuses[k]]->clear(true);
251  }
252 }
253 
254 
// NOTE(review): the signature line (original 255) and two body lines
// (original 258 and 261) are missing from this listing; presumably this is
// the memory-optimization routine and the guarded statement optimizes the
// status's bit vector - confirm.
// What is visible: for every valid status a separate write-lock guard on
// m_Lock is created inside its own scope, so the lock is held per status
// iteration and not across the whole loop.
256 {
257  for (size_t k = 0; k < g_ValidJobStatusesSize; ++k) {
259  {{
260  CWriteLockGuard guard(m_Lock);
262  }}
263  }
264 }
265 
266 
// Tail of SetExactStatusNoLock (the first signature line is not visible in
// this listing). Sets (set_clear == true) or clears (set_clear == false) the
// bit for job_id in the bit vector stored at index `status` only; no other
// status vector is touched.
// No lock guard is created here.
268  TJobStatus status,
269  bool set_clear)
270 {
271  TNSBitVector & bv = *m_StatusStor[(int)status];
272  bv.set(job_id, set_clear);
273 }
274 
275 
276 void CJobStatusTracker::AddPendingBatch(unsigned job_id_from,
277  unsigned job_id_to)
278 {
279  CWriteLockGuard guard(m_Lock);
280  m_StatusStor[(int) CNetScheduleAPI::ePending]->set_range(job_id_from,
281  job_id_to);
282 }
283 
284 
// Single-status GetJobByStatus (the signature line carrying the function name
// and the `status` parameter, original 286, is not visible in this listing).
// Enumerates the set bits of the vector stored at index `status` and returns
// the first job id that is not in unwanted_jobs and, when `restricted` is
// true, is also in restrict_jobs. Returns 0 when no job qualifies.
// NOTE(review): the declaration of the enumerator `en` (original line 292)
// is missing from this listing.
285 unsigned int
287  const TNSBitVector & unwanted_jobs,
288  const TNSBitVector & restrict_jobs,
289  bool restricted) const
290 {
// The reference is bound before the read-lock guard is created; the vector's
// contents are only read after the guard exists.
291  TNSBitVector & bv = *m_StatusStor[(int)status];
293  CReadLockGuard guard(m_Lock);
294 
295  for (en = bv.first(); en.valid(); ++en) {
296  unsigned int job_id = *en;
297  if (unwanted_jobs.get_bit(job_id))
298  continue;
299  if (restricted) {
300  if (restrict_jobs.get_bit(job_id))
301  return job_id;
302  } else {
303  return job_id;
304  }
305  }
306  return 0;
307 }
308 
309 
// Multi-status variant: ORs the bit vectors of all listed statuses into a
// local copy, then returns the first job id in that union which is not in
// unwanted_jobs and, when `restricted` is true, is also in restrict_jobs.
// Returns 0 when no job qualifies. Runs under a read-lock guard on m_Lock.
// NOTE(review): the declaration of the enumerator `en` (original line 317)
// is missing from this listing.
310 unsigned int
311 CJobStatusTracker::GetJobByStatus(const vector<TJobStatus> & statuses,
312  const TNSBitVector & unwanted_jobs,
313  const TNSBitVector & restrict_jobs,
314  bool restricted) const
315 {
316  TNSBitVector jobs;
318  CReadLockGuard guard(m_Lock);
319 
320  for (vector<TJobStatus>::const_iterator k = statuses.begin();
321  k != statuses.end(); ++k)
322  jobs |= *m_StatusStor[(int)(*k)];
323 
324  for (en = jobs.first(); en.valid(); ++en) {
325  unsigned int job_id = *en;
326  if (unwanted_jobs.get_bit(job_id))
327  continue;
328  if (restricted) {
329  if (restrict_jobs.get_bit(job_id))
330  return job_id;
331  } else {
332  return job_id;
333  }
334  }
335  return 0;
336 }
337 
338 
339 void
340 CJobStatusTracker::GetJobs(const vector<TJobStatus> & statuses,
341  TNSBitVector & jobs) const
342 {
343  CReadLockGuard guard(m_Lock);
344 
345  for (vector<TJobStatus>::const_iterator k = statuses.begin();
346  k != statuses.end(); ++k)
347  jobs |= *m_StatusStor[(int)(*k)];
348 }
349 
350 
// Single-status GetJobs (the signature line carrying the function name and
// the `status` parameter, original 352, is not visible in this listing).
// Unlike the vector overload, this one assigns: `jobs` is overwritten with a
// copy of the bit vector stored at index `status`.
// Runs under a read-lock guard on m_Lock.
351 void
353  TNSBitVector & jobs) const
354 {
355  CReadLockGuard guard(m_Lock);
356  jobs = *m_StatusStor[(int)status];
357 }
358 
359 
// Tail of GetOutdatedPendingJobs (return type and first signature line are
// not visible in this listing; neither is the declaration of `result`,
// original line 368).
// Collects pending jobs whose submit time, as reported by gc_registry, is
// earlier than (now - timeout). At most kMaxCandidates pending jobs are
// visited per call. A timeout of kTimeZero returns `result` untouched.
362  const CJobGCRegistry & gc_registry) const
363 {
// Function-local statics: they persist across calls and are shared by every
// caller of this function.
// NOTE(review): they are written below while only a read-lock guard is
// held - confirm that concurrent callers cannot race on them.
364  static CNSPreciseTime s_LastTimeout = kTimeZero;
365  static unsigned int s_LastCheckedJobID = 0;
366  static const size_t kMaxCandidates = 100;
367 
369 
370  if (timeout == kTimeZero)
371  return result; // Not configured
372 
373  size_t count = 0;
374  const TNSBitVector & pending_jobs = *m_StatusStor[(int) CNetScheduleAPI::ePending];
375  CNSPreciseTime limit = CNSPreciseTime::Current() - timeout;
376  CReadLockGuard guard(m_Lock);
377 
// A changed timeout resets the remembered "already checked" job id.
378  if (s_LastTimeout != timeout) {
379  s_LastTimeout = timeout;
380  s_LastCheckedJobID = 0;
381  }
382 
383  TNSBitVector::enumerator en(pending_jobs.first());
384  for (; en.valid() && count < kMaxCandidates; ++en, ++count) {
385  unsigned int job_id = *en;
// Ids at or below the remembered one are accepted without consulting
// gc_registry again.
386  if (job_id <= s_LastCheckedJobID) {
387  result.set_bit(job_id, true);
388  continue;
389  }
390  if (gc_registry.GetPreciseSubmitTime(job_id) < limit) {
391  s_LastCheckedJobID = job_id;
392  result.set_bit(job_id, true);
393  continue;
394  }
395 
// The scan stops at the first job that is not older than the limit.
396  // No more old jobs
397  break;
398  }
399 
400  return result;
401 }
402 
403 
// Tail of GetOutdatedReadVacantJobs (return type and first signature line are
// not visible in this listing; neither is the declaration of `result`,
// original line 413).
// Builds a candidate set starting from the eDone vector, removes read_jobs
// from it, and collects candidates whose read-vacant time, as reported by
// gc_registry, is earlier than (now - timeout). At most kMaxCandidates
// candidates are visited per call. A timeout of kTimeZero returns `result`
// untouched.
406  const TNSBitVector & read_jobs,
407  const CJobGCRegistry & gc_registry) const
408 {
// Function-local statics: they persist across calls and are shared by every
// caller of this function.
// NOTE(review): they are written below while only a read-lock guard is
// held - confirm that concurrent callers cannot race on them.
409  static CNSPreciseTime s_LastTimeout = kTimeZero;
410  static unsigned int s_LastCheckedJobID = 0;
411  static const size_t kMaxCandidates = 100;
412 
414 
415  if (timeout == kTimeZero)
416  return result; // Not configured
417 
418  size_t count = 0;
419  CNSPreciseTime limit = CNSPreciseTime::Current() - timeout;
420  CReadLockGuard guard(m_Lock);
421  TNSBitVector candidates;
422 
// NOTE(review): the remaining operands of this union (original lines 424-425)
// are missing from this listing - confirm which other statuses are included.
423  candidates = *m_StatusStor[(int) CNetScheduleAPI::eDone] |
426 
427  // Exclude jobs which have been read or in a process of reading
428  candidates -= read_jobs;
429 
// A changed timeout resets the remembered "already checked" job id.
430  if (s_LastTimeout != timeout) {
431  s_LastTimeout = timeout;
432  s_LastCheckedJobID = 0;
433  }
434 
435  TNSBitVector::enumerator en(candidates.first());
436  for (; en.valid() && count < kMaxCandidates; ++en, ++count) {
437  unsigned int job_id = *en;
// Ids at or below the remembered one are accepted without consulting
// gc_registry again.
438  if (job_id <= s_LastCheckedJobID) {
439  result.set_bit(job_id, true);
440  continue;
441  }
442  if (gc_registry.GetPreciseReadVacantTime(job_id) < limit) {
443  s_LastCheckedJobID = job_id;
444  result.set_bit(job_id, true);
445  continue;
446  }
447 
// The scan stops at the first candidate that is not older than the limit.
448  // No more old jobs
449  break;
450  }
451 
452  return result;
453 }
454 
455 
// Body of AnyPending (signature line not visible in this listing).
// Returns bv.any() under a read-lock guard on m_Lock.
// NOTE(review): the declaration of `bv` (original line 458) is missing from
// this listing; presumably it refers to the ePending vector - confirm.
457 {
459  CReadLockGuard guard(m_Lock);
460 
461  return bv.any();
462 }
463 
464 
465 unsigned CJobStatusTracker::GetNext(TJobStatus status, unsigned job_id) const
466 {
467  const TNSBitVector & bv = *m_StatusStor[(int)status];
468  CReadLockGuard guard(m_Lock);
469 
470  return bv.get_next(job_id);
471 }
472 
473 
// Body of x_IncDoneJobs (signature line not visible in this listing).
// Increments m_DoneCnt; when it reaches 65535 * 2 the counter is reset to 0
// and two scoped blocks run.
// NOTE(review): the contents of both scoped blocks (original lines 480-481
// and 484-485) are missing from this listing; presumably they perform
// periodic maintenance on status bit vectors - confirm.
// No lock guard is visible around the m_DoneCnt update itself.
475 {
476  ++m_DoneCnt;
477  if (m_DoneCnt == 65535 * 2) {
478  m_DoneCnt = 0;
479  {{
482  }}
483  {{
486  }}
487  }
488 }
489 
491 
Algorithms for bvector<> (main include)
CNSPreciseTime GetPreciseReadVacantTime(unsigned int job_id) const
CNSPreciseTime GetPreciseSubmitTime(unsigned int job_id) const
unsigned GetNext(TJobStatus status, unsigned job_id) const
Definition: job_status.cpp:465
void SetStatus(unsigned int job_id, TJobStatus status)
Definition: job_status.cpp:194
TStatusStorage m_StatusStor
Definition: job_status.hpp:160
void Erase(unsigned job_id)
Definition: job_status.cpp:226
void GetJobs(const vector< TJobStatus > &statuses, TNSBitVector &jobs) const
Definition: job_status.cpp:340
void AddPendingJob(unsigned int job_id)
Definition: job_status.cpp:219
void StatusStatistics(TJobStatus status, TNSBitVector::statistics *st) const
Definition: job_status.cpp:183
bool AnyPending() const
Definition: job_status.cpp:456
vector< unsigned int > GetJobCounters(const vector< TJobStatus > &statuses) const
Definition: job_status.cpp:103
void SetExactStatusNoLock(unsigned int job_id, TJobStatus status, bool set_clear)
Definition: job_status.cpp:267
unsigned int GetMinJobID(void) const
Definition: job_status.cpp:128
unsigned int CountStatus(TJobStatus status) const
Definition: job_status.cpp:80
void x_IncDoneJobs(void)
Definition: job_status.cpp:474
bool AnyJobs(void) const
Definition: job_status.cpp:150
TJobStatus GetStatus(unsigned job_id) const
Definition: job_status.cpp:66
TNSBitVector GetOutdatedReadVacantJobs(CNSPreciseTime timeout, const TNSBitVector &read_jobs, const CJobGCRegistry &gc_registry) const
Definition: job_status.cpp:405
void AddPendingBatch(unsigned job_id_from, unsigned job_id_to)
Definition: job_status.cpp:276
unsigned int GetJobByStatus(TJobStatus status, const TNSBitVector &unwanted_jobs, const TNSBitVector &restrict_jobs, bool restricted) const
Definition: job_status.cpp:286
TNSBitVector GetOutdatedPendingJobs(CNSPreciseTime timeout, const CJobGCRegistry &gc_registry) const
Definition: job_status.cpp:361
unsigned int Count(void) const
Definition: job_status.cpp:116
void ClearAll(void)
Definition: job_status.cpp:245
static CNSPreciseTime Current(void)
NetSchedule internal exception.
Constant iterator designed to enumerate "ON" bits.
Definition: bm.h:603
bool valid() const noexcept
Checks if iterator is still valid.
Definition: bm.h:283
Bitvector Bit-vector container with runtime compression of bits.
Definition: bm.h:115
@ opt_free_01
Free unused 0 and 1 blocks.
Definition: bm.h:136
@ opt_free_0
Free unused 0 blocks.
Definition: bm.h:135
bool get_bit(size_type n) const noexcept
returns true if bit n is set and false is bit n is 0.
Definition: bm.h:3602
bool any() const noexcept
Returns true if any bits in this bitset are set, and otherwise returns false.
Definition: bm.h:2451
bvector< Alloc > & set(size_type n, bool val=true)
Sets bit n if val is true, clears bit n if val is false.
Definition: bm.h:4188
void optimize(bm::word_t *temp_block=0, optmode opt_mode=opt_compress, statistics *stat=0)
Optimize memory bitvector's memory allocation.
Definition: bm.h:3635
bool set_bit(size_type n, bool val=true)
Sets bit n.
Definition: bm.h:4227
enumerator first() const
Returns enumerator pointing on the first non-zero bit.
Definition: bm.h:1871
size_type get_next(size_type prev) const noexcept
Finds the number of the next bit ON.
Definition: bm.h:1609
void clear(const size_type *ids, size_type ids_size, bm::sort_order so=bm::BM_UNKNOWN)
clear list of bits in this bitset
Definition: bm.h:4149
void calc_stat(struct bm::bvector< Alloc >::statistics *st) const noexcept
Calculates bitvector statistics.
Definition: bm.h:3978
size_type get_first() const noexcept
find first 1 bit in vector. Function may return 0 and this requires an extra check if bit 0 is actual...
Definition: bm.h:1600
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
Definition: dlist.tmpl.h:46
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
EJobStatus
Job status codes.
@ eDone
Job is ready (computed successfully)
@ eCanceled
Explicitly canceled.
@ eJobNotFound
No such job.
@ eLastStatus
Fake status (do not use)
@ ePending
Waiting for execution.
@ eFailed
Failed to run (execution timeout)
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
void set_bit(unsigned *dest, unsigned bitpos) noexcept
Set 1 bit in a block.
Definition: bmfunc.h:3721
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
NetSchedule job status tracker.
const size_t g_ValidJobStatusesSize
Definition: job_status.hpp:64
const CNetScheduleAPI::EJobStatus g_ValidJobStatuses[]
Definition: job_status.hpp:55
int i
Compressed bitset (entry point to bm.h)
NetSchedule client specs.
NetSchedule garbage collection registry.
const CNSPreciseTime kTimeZero
bm::bvector TNSBitVector
Definition: ns_types.hpp:70
static unsigned cnt[256]
Statistical information about bitset's memory allocation details.
Definition: bm.h:125
#define _ASSERT
else result
Definition: token2.c:20
Modified on Thu Apr 11 15:13:51 2024 by modify_doxy.py rev. 669887