NCBI C++ ToolKit
ns_queue.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: ns_queue.cpp 102809 2024-07-19 12:05:33Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Victor Joukov
27  *
28  * File Description:
29  * NetSchedule queue structure and parameters
30  */
31 #include <ncbi_pch.hpp>
32 
33 #include <unistd.h>
34 
35 #include "ns_queue.hpp"
37 #include "background_host.hpp"
38 #include "ns_util.hpp"
39 #include "ns_server.hpp"
40 #include "ns_precise_time.hpp"
41 #include "ns_rollback.hpp"
42 #include "queue_database.hpp"
43 #include "ns_handler.hpp"
44 #include "ns_ini_params.hpp"
45 #include "ns_perf_logging.hpp"
46 #include "ns_restore_state.hpp"
47 #include "ns_db_dump.hpp"
48 
49 #include <corelib/ncbi_system.hpp> // SleepMilliSec
50 #include <corelib/request_ctx.hpp>
54 #include <util/bitset/bmalgo.h>
55 
56 
58 
59 
60 // Used together with m_SavedId. m_SavedId is saved in a DB and is used
61 // as the value to start from for the restarted NetSchedule.
62 // The s_ReserveDelta value is used to avoid too frequent DB updates
63 static const unsigned int s_ReserveDelta = 10000;
64 
65 
// Builds a queue object bound to the given server and queue database.
// Every tunable member is initialized to its compiled-in default_* value
// (kNetScheduleMaxDBDataSize for the input/output size limits), the run
// time line pointer starts as NULL, m_LastId starts at 0 and m_SavedId at
// s_ReserveDelta. The logging flags, the key generator and the notifications
// list are set up from 'server' (plus 'qdb' and 'queue_name').
// Asserts that queue_name is not empty.
// NOTE(review): this listing skips embedded lines 117 and 130-135, so the
// argument of m_ClientRegistryTimeoutWorkerNode and part of the constructor
// body are not visible here.
66 CQueue::CQueue(const string & queue_name,
67  TQueueKind queue_kind,
68  CNetScheduleServer * server,
69  CQueueDataBase & qdb) :
70  m_Server(server),
71  m_QueueDB(qdb),
72  m_RunTimeLine(NULL),
73  m_QueueName(queue_name),
74  m_Kind(queue_kind),
75 
76  m_LastId(0),
77  m_SavedId(s_ReserveDelta),
78 
79  m_JobsToDeleteOps(0),
80  m_ReadJobsOps(0),
81 
82  m_Timeout(default_timeout),
83  m_RunTimeout(default_run_timeout),
84  m_ReadTimeout(default_read_timeout),
85  m_FailedRetries(default_failed_retries),
86  m_ReadFailedRetries(default_failed_retries), // See CXX-5161, the same
87  // default as for
88  // failed_retries
89  m_MaxJobsPerClient(default_max_jobs_per_client),
90  m_BlacklistTime(default_blacklist_time),
91  m_ReadBlacklistTime(default_blacklist_time), // See CXX-4993, the same
92  // default as for
93  // blacklist_time
94  m_MaxInputSize(kNetScheduleMaxDBDataSize),
95  m_MaxOutputSize(kNetScheduleMaxDBDataSize),
96  m_WNodeTimeout(default_wnode_timeout),
97  m_ReaderTimeout(default_reader_timeout),
98  m_PendingTimeout(default_pending_timeout),
99  m_KeyGenerator(server->GetHost(), server->GetPort(), queue_name),
100  m_Log(server->IsLog()),
101  m_LogBatchEachJob(server->IsLogBatchEachJob()),
102  m_RefuseSubmits(false),
103  m_StatisticsCounters(CStatisticsCounters::eQueueCounters),
104  m_StatisticsCountersLastPrinted(CStatisticsCounters::eQueueCounters),
105  m_StatisticsCountersLastPrintedTimestamp(0.0),
106  m_NotificationsList(qdb, server->GetNodeID(), queue_name),
107  m_NotifHifreqInterval(default_notif_hifreq_interval), // 0.1 sec
108  m_NotifHifreqPeriod(default_notif_hifreq_period),
109  m_NotifLofreqMult(default_notif_lofreq_mult),
110  m_DumpBufferSize(default_dump_buffer_size),
111  m_DumpClientBufferSize(default_dump_client_buffer_size),
112  m_DumpAffBufferSize(default_dump_aff_buffer_size),
113  m_DumpGroupBufferSize(default_dump_group_buffer_size),
114  m_ScrambleJobKeys(default_scramble_job_keys),
115  m_PauseStatus(eNoPause),
116  m_ClientRegistryTimeoutWorkerNode(
    // NOTE(review): the initializer argument (embedded line 117) is missing
    // from this listing.
118  m_ClientRegistryMinWorkerNodes(default_client_registry_min_worker_nodes),
119  m_ClientRegistryTimeoutAdmin(default_client_registry_timeout_admin),
120  m_ClientRegistryMinAdmins(default_client_registry_min_admins),
121  m_ClientRegistryTimeoutSubmitter(default_client_registry_timeout_submitter),
122  m_ClientRegistryMinSubmitters(default_client_registry_min_submitters),
123  m_ClientRegistryTimeoutReader(default_client_registry_timeout_reader),
124  m_ClientRegistryMinReaders(default_client_registry_min_readers),
125  m_ClientRegistryTimeoutUnknown(default_client_registry_timeout_unknown),
126  m_ClientRegistryMinUnknowns(default_client_registry_min_unknowns),
127  m_ShouldPerfLogTransitions(false)
128 {
    // The queue name must be provided by the caller.
129  _ASSERT(!queue_name.empty());
132 
136 }
137 
138 
// NOTE(review): the signature line (embedded line 139) is missing from this
// listing; presumably this is the CQueue destructor -- confirm against the
// real file. The visible body releases the run time line object, which is
// created with 'new' elsewhere in this file.
140 {
141  delete m_RunTimeLine;
142 }
143 
144 
// Called once the queue has a database available (see the comment below).
// In the visible code, a saved id which is smaller than the last id is
// treated as a counter overflow and m_LastId is reset to 0.
// NOTE(review): embedded lines 148-149, 153 and 155 are missing from this
// listing, so the statements that read/store the counter are not visible.
145 void CQueue::Attach(void)
146 {
147  // Here we have a db, so we can read the counter value we should start from
150  if (m_SavedId < m_LastId) {
151  // Overflow
152  m_LastId = 0;
154  }
156 }
157 
158 
// NOTE(review): the signature line (embedded line 159) is missing from this
// listing; presumably this is the queue parameter setter taking 'params'.
// Visible behavior: copies the timeout, run timeout, read timeout and worker
// node timeout from 'params' into the queue members and, on the first call
// only, allocates the run time line.
// NOTE(review): most of the body (embedded lines 161, 167, 175-181, 183-197,
// 199-209, 211-212, 215, 217, 219-221, 223) is not visible here; 'precision'
// is declared on one of the missing lines.
160 {
162 
163  m_Timeout = params.timeout;
164  m_RunTimeout = params.run_timeout;
165  if (!m_RunTimeLine) {
166  // One time only. Precision can not be reset.
168  unsigned int interval_sec = precision.Sec();
    // The time line interval is never smaller than one second.
169  if (interval_sec < 1)
170  interval_sec = 1;
171  m_RunTimeLine = new CJobTimeLine(interval_sec, 0);
172  }
173 
174  m_ReadTimeout = params.read_timeout;
182  m_WNodeTimeout = params.wnode_timeout;
198 
210 
213 
214  // program version control
216  if (!params.program_name.empty()) {
218  }
222 
224 }
225 
226 
// Judging by the name, refreshes the queue's performance logging settings
// for the given queue class.
// NOTE(review): embedded line 229 (the beginning of the only statement) is
// missing from this listing; just its trailing 'qclass' argument is visible.
227 void CQueue::UpdatePerfLoggingSettings(const string & qclass)
228 {
230  qclass);
231 }
232 
233 
// NOTE(review): the signature line (embedded line 234) is missing from this
// listing; presumably this is the parameter list getter.
// Builds a TParameterList of (name, value) string pairs, one for each
// parameter exposed by a CQueueParamAccessor created on this queue, in the
// accessor's index order, and returns it by value.
235 {
236  TParameterList parameters;
237  CQueueParamAccessor qp(*this);
238  unsigned nParams = qp.GetNumParams();
239 
240  for (unsigned n = 0; n < nParams; ++n) {
241  parameters.push_back(
242  pair<string, string>(qp.GetParamName(n), qp.GetParamValue(n)));
243  }
244  return parameters;
245 }
246 
247 
// NOTE(review): the first signature line (embedded line 248) is missing from
// this listing.
// Output parameters:
//   max_input_size   - receives the value of the accessor's GetMaxInputSize()
//   max_output_size  - receives the value of the accessor's GetMaxOutputSize()
//   linked_sections  - filled by GetLinkedSections()
// The sizes are read through a CQueueParamAccessor created on this queue.
249  unsigned int & max_input_size,
250  unsigned int & max_output_size,
251  map< string, map<string, string> > & linked_sections) const
252 {
253  CQueueParamAccessor qp(*this);
254 
255  max_input_size = qp.GetMaxInputSize();
256  max_output_size = qp.GetMaxOutputSize();
257  GetLinkedSections(linked_sections);
258 }
259 
260 
// Walks over m_LinkedSections and stores each non-empty 'values' map into
// linked_sections under the key of the current m_LinkedSections entry.
// Entries with empty values are not added.
// NOTE(review): embedded lines 262, 265 and 267 are missing from this
// listing: the function name, the loop header start and the statement which
// produces 'values' are not visible here.
261 void
263  map<string, string> > & linked_sections) const
264 {
266  k != m_LinkedSections.end(); ++k) {
268 
269  if (!values.empty())
270  linked_sections[k->first] = values;
271  }
272 }
273 
274 
// Tells whether there are no more jobs which could be given for reading
// under the supplied constraints.
// NOTE(review): the first signature line (embedded line 276) is missing from
// this listing; 'client' used below is presumably declared there. Embedded
// lines 304, 315, 363-364 and 378 are missing as well.
// Parameters visible here:
//   aff_ids                - explicitly requested affinity ids
//   reader_affinity        - take the reader's preferred affinities into
//                            account
//   any_affinity           - any affinity is suitable
//   exclusive_new_affinity - accept affinities which are not preferred by
//                            anyone (all_aff - all_pref_affs)
//   group_ids              - restrict the candidates to these groups if any
//   affinity_may_change    - pending/running jobs are not filtered by affinity
//   group_may_change       - pending/running jobs are not filtered by group
// Returns true if no candidate job is left, false otherwise.
275 // It is called only if there was no job for reading
277  const TNSBitVector & aff_ids,
278  bool reader_affinity,
279  bool any_affinity,
280  bool exclusive_new_affinity,
281  const TNSBitVector & group_ids,
282  bool affinity_may_change,
283  bool group_may_change)
284 {
285  // This certain condition guarantees that there will be no job given
286  if (!reader_affinity &&
287  !aff_ids.any() &&
288  !exclusive_new_affinity &&
289  !any_affinity)
290  return true;
291 
292  // Used only in the GetJobForReadingOrWait().
293  // Operation lock has to be already taken.
294  // Provides true if there are no more jobs for reading.
295  vector<CNetScheduleAPI::EJobStatus> from_state;
296  TNSBitVector pending_running_jobs;
297  TNSBitVector other_jobs;
298  string scope = client.GetScope();
299 
300  from_state.push_back(CNetScheduleAPI::ePending);
301  from_state.push_back(CNetScheduleAPI::eRunning);
302  m_StatusTracker.GetJobs(from_state, pending_running_jobs);
303 
    // NOTE(review): the statement which fills other_jobs (embedded line 304)
    // is missing from this listing.
305 
306  // Remove those which have been read or in process of reading. This cannot
307  // affect the pending and running jobs
308  other_jobs -= m_ReadJobs;
309  // Add those which are in a process of reading.
310  // This needs to be done after '- m_ReadJobs' because that vector holds
311  // both jobs which have been read and jobs which are in a process of
312  // reading. When calculating 'no_more_jobs' the only already read jobs must
313  // be excluded.
314  TNSBitVector reading_jobs;
316 
317  // Apply scope limitations to all participant jobs
318  if (scope.empty() || scope == kNoScopeOnly) {
319  // Both these cases should consider only the non-scope jobs
320  TNSBitVector all_jobs_in_scopes(bm::BM_GAP);
321  all_jobs_in_scopes = m_ScopeRegistry.GetAllJobsInScopes();
322  pending_running_jobs -= all_jobs_in_scopes;
323  other_jobs -= all_jobs_in_scopes;
324  reading_jobs -= all_jobs_in_scopes;
325  } else {
326  // Consider only the jobs in the particular scope
327  TNSBitVector scope_jobs(bm::BM_GAP);
328  scope_jobs = m_ScopeRegistry.GetJobs(scope);
329  pending_running_jobs &= scope_jobs;
330  other_jobs &= scope_jobs;
331  reading_jobs &= scope_jobs;
332  }
333 
334  if (group_ids.any()) {
335  // The pending and running jobs may change their group and or affinity
336  // later via the RESCHEDULE command
337  if (!group_may_change)
338  m_GroupRegistry.RestrictByGroup(group_ids, pending_running_jobs);
339 
340  // The job group cannot be changed for the other job states
341  other_jobs |= reading_jobs;
342  m_GroupRegistry.RestrictByGroup(group_ids, other_jobs);
343  } else
344  other_jobs |= reading_jobs;
345 
346  TNSBitVector candidates = pending_running_jobs | other_jobs;
347 
348  if (!candidates.any())
349  return true;
350 
351 
352  // Deal with affinities
353  // The weakest condition is if any affinity is suitable
    // 'candidates' is known to be non-empty here because of the check above,
    // so this branch yields false.
354  if (any_affinity)
355  return !candidates.any();
356 
357  TNSBitVector suitable_affinities;
358  TNSBitVector all_aff;
359  TNSBitVector all_pref_affs;
360  TNSBitVector all_aff_jobs; // All jobs with an affinity
361  TNSBitVector no_aff_jobs; // Jobs without any affinity
362 
    // NOTE(review): the statements which fill all_aff and all_pref_affs
    // (embedded lines 363-364) are missing from this listing.
365  all_aff_jobs = m_AffinityRegistry.GetJobsWithAffinities(all_aff);
366  no_aff_jobs = candidates - all_aff_jobs;
    // A candidate without any affinity is enough to say that there are still
    // jobs when exclusive new affinities are requested.
367  if (exclusive_new_affinity && no_aff_jobs.any())
368  return false;
369 
370  if (exclusive_new_affinity)
371  suitable_affinities = all_aff - all_pref_affs;
372  if (reader_affinity)
373  suitable_affinities |= m_ClientsRegistry.
374  GetPreferredAffinities(client, eRead);
375  suitable_affinities |= aff_ids;
376 
377  TNSBitVector suitable_aff_jobs =
379  suitable_affinities);
    // If the affinity may change, the pending and running jobs stay in the
    // candidates regardless of their current affinity.
380  if (affinity_may_change)
381  candidates = pending_running_jobs |
382  (other_jobs & suitable_aff_jobs);
383  else
384  candidates &= suitable_aff_jobs;
385  return !candidates.any();
386 }
387 
388 
389 // Used to log a single job
// Prints the "job_key" value (built by MakeJobKey() from the job id) into
// 'extra' and flushes it.
// NOTE(review): embedded line 392, where 'extra' is created, is missing from
// this listing.
390 void CQueue::x_LogSubmit(const CJob & job)
391 {
393  .Print("job_key", MakeJobKey(job.GetId()));
394 
395  extra.Flush();
396 }
397 
398 
// Submits a single job to the queue.
//   client          - the submitter; its address, node, session and scope
//                     are used
//   job             - the job to submit; receives the id, passport, last
//                     touch time, a submit event and optionally group and
//                     affinity ids; a copy is stored in m_Jobs
//   aff_token       - affinity token, may be empty
//   group           - group name, may be empty
//   logging         - if true the submitted job is logged via x_LogSubmit()
//   rollback_action - set to a newly allocated CNSSubmitRollback
// Returns the id assigned to the job.
// Throws CNetScheduleException (eDataTooLong) if the job input exceeds
// m_MaxInputSize or if the scope, group or affinity registry cannot accept
// the new record.
// NOTE(review): this listing omits some original lines (gaps in the embedded
// numbering, e.g. 430, 442, 448, 456, 464, 483, 489, 493-496, 500, 502-503,
// 509, 511); the statements on them are not documented here.
399 unsigned int CQueue::Submit(const CNSClientId & client,
400  CJob & job,
401  const string & aff_token,
402  const string & group,
403  bool logging,
404  CNSRollbackInterface * & rollback_action)
405 {
406  // the only config parameter used here is the max input size so there is no
407  // need to have a safe parameters accessor.
408 
409  if (job.GetInput().size() > m_MaxInputSize)
410  NCBI_THROW(CNetScheduleException, eDataTooLong, "Input is too long");
411 
412  unsigned int aff_id = 0;
413  unsigned int group_id = 0;
414  CNSPreciseTime op_begin_time = CNSPreciseTime::Current();
415  unsigned int job_id = GetNextId();
416  CJobEvent & event = job.AppendEvent();
417 
418  job.SetId(job_id);
419  job.SetPassport(rand());
420  job.SetLastTouch(op_begin_time);
421 
    // Describe the submit event: who submitted the job and when
422  event.SetNodeAddr(client.GetAddress());
423  event.SetStatus(CNetScheduleAPI::ePending);
424  event.SetEvent(CJobEvent::eSubmit);
425  event.SetTimestamp(op_begin_time);
426  event.SetClientNode(client.GetNode());
427  event.SetClientSession(client.GetSession());
428 
429  // Special treatment for system job masks
431  {
432  // NOT IMPLEMENTED YET: put job id into OutOfOrder list.
433  // The idea is that there can be an urgent job, which
434  // should be executed before jobs which were submitted
435  // earlier, e.g. for some administrative purposes. See
436  // CNetScheduleAPI::EJobMask in file netschedule_api.hpp
437  }
438 
439  // Take the queue lock and start the operation
440  {{
441  string scope = client.GetScope();
443 
444 
    // All the registry limits are checked before any registry is modified
445  if (!scope.empty()) {
446  // Check the scope registry limits
447  SNSRegistryParameters params =
449  if (!m_ScopeRegistry.CanAccept(scope, params.max_records))
450  NCBI_THROW(CNetScheduleException, eDataTooLong,
451  "No available slots in the queue scope registry");
452  }
453  if (!group.empty()) {
454  // Check the group registry limits
455  SNSRegistryParameters params =
457  if (!m_GroupRegistry.CanAccept(group, params.max_records))
458  NCBI_THROW(CNetScheduleException, eDataTooLong,
459  "No available slots in the queue group registry");
460  }
461  if (!aff_token.empty()) {
462  // Check the affinity registry limits
463  SNSRegistryParameters params =
465  if (!m_AffinityRegistry.CanAccept(aff_token, params.max_records))
466  NCBI_THROW(CNetScheduleException, eDataTooLong,
467  "No available slots in the queue affinity registry");
468  }
469 
470 
471  if (!group.empty()) {
472  group_id = m_GroupRegistry.AddJob(group, job_id);
473  job.SetGroupId(group_id);
474  }
475  if (!aff_token.empty()) {
476  aff_id = m_AffinityRegistry.ResolveAffinityToken(aff_token,
477  job_id, 0, eUndefined);
478  job.SetAffinityId(aff_id);
479  }
480 
    // Store a copy of the job in the in-memory job container
481  m_Jobs[job_id] = job;
482 
484 
485  if (!scope.empty())
486  m_ScopeRegistry.AddJob(scope, job_id);
487 
488  // Register the job with the client
490 
491  // Make the decision whether to send or not a notification
492  if (m_PauseStatus == eNoPause)
497 
498  m_GCRegistry.RegisterJob(job_id, op_begin_time,
499  aff_id, group_id,
501  m_RunTimeout,
504  op_begin_time));
505  }}
506 
507  rollback_action = new CNSSubmitRollback(client, job_id,
508  op_begin_time,
510 
512  if (logging)
513  x_LogSubmit(job);
514 
515  return job_id;
516 }
517 
518 
// Submits a batch of jobs; all of them get consecutive ids and share the
// same group.
// NOTE(review): the signature line with the function name and the 'client'
// parameter (embedded line 520) is missing from this listing, as well as
// embedded lines 529 (presumably the declaration of 'curr_time'), 545, 550,
// 558, 566, 608, 620-625, 629 and 634-635.
//   batch           - pairs of (job, affinity token); each job is updated
//                     with its id, passport, group id, last touch time, a
//                     batch submit event and optionally an affinity id
//   group           - group name for the whole batch
//   logging         - each job is logged only if both this flag and
//                     m_LogBatchEachJob are set
//   rollback_action - set to a newly allocated CNSBatchSubmitRollback
// Returns the id of the first job in the batch.
// Throws CNetScheduleException (eDataTooLong) if the scope, group or
// affinity registry cannot accept the new records.
519 unsigned int
521  vector< pair<CJob, string> > & batch,
522  const string & group,
523  bool logging,
524  CNSRollbackInterface * & rollback_action)
525 {
526  unsigned int batch_size = batch.size();
527  unsigned int job_id = GetNextJobIdForBatch(batch_size);
528  TNSBitVector affinities;
530 
531  {{
    // job_id_cnt runs over the ids of the batch; job_id stays the first id
532  unsigned int job_id_cnt = job_id;
533  unsigned int group_id = 0;
534  vector<string> aff_tokens;
535  string scope = client.GetScope();
536 
537  // Count the number of affinities
538  for (size_t k = 0; k < batch_size; ++k) {
539  const string & aff_token = batch[k].second;
540  if (!aff_token.empty())
541  aff_tokens.push_back(aff_token);
542  }
543 
544 
546 
547  if (!scope.empty()) {
548  // Check the scope registry limits
549  SNSRegistryParameters params =
551  if (!m_ScopeRegistry.CanAccept(scope, params.max_records))
552  NCBI_THROW(CNetScheduleException, eDataTooLong,
553  "No available slots in the queue scope registry");
554  }
555  if (!group.empty()) {
556  // Check the group registry limits
557  SNSRegistryParameters params =
559  if (!m_GroupRegistry.CanAccept(group, params.max_records))
560  NCBI_THROW(CNetScheduleException, eDataTooLong,
561  "No available slots in the queue group registry");
562  }
563  if (!aff_tokens.empty()) {
564  // Check the affinity registry limits
565  SNSRegistryParameters params =
567  if (!m_AffinityRegistry.CanAccept(aff_tokens, params.max_records))
568  NCBI_THROW(CNetScheduleException, eDataTooLong,
569  "No available slots in the queue affinity registry");
570  }
571 
572  group_id = m_GroupRegistry.ResolveGroup(group);
573  for (size_t k = 0; k < batch_size; ++k) {
574 
575  CJob & job = batch[k].first;
576  const string & aff_token = batch[k].second;
577  CJobEvent & event = job.AppendEvent();
578 
579  job.SetId(job_id_cnt);
580  job.SetPassport(rand());
581  job.SetGroupId(group_id);
582  job.SetLastTouch(curr_time);
583 
584  event.SetNodeAddr(client.GetAddress());
585  event.SetStatus(CNetScheduleAPI::ePending);
586  event.SetEvent(CJobEvent::eBatchSubmit);
587  event.SetTimestamp(curr_time);
588  event.SetClientNode(client.GetNode());
589  event.SetClientSession(client.GetSession());
590 
591  if (!aff_token.empty()) {
592  unsigned int aff_id = m_AffinityRegistry.
593  ResolveAffinityToken(aff_token,
594  job_id_cnt,
595  0,
596  eUndefined);
597 
598  job.SetAffinityId(aff_id);
599  affinities.set_bit(aff_id);
600  }
601 
602  m_Jobs[job_id_cnt] = job;
603  ++job_id_cnt;
604  }
605 
606  m_GroupRegistry.AddJobs(group_id, job_id, batch_size);
607  m_StatusTracker.AddPendingBatch(job_id, job_id + batch_size - 1);
609 
610  if (!scope.empty())
611  m_ScopeRegistry.AddJobs(scope, job_id, batch_size);
612 
613  // Make a decision whether to notify clients or not
614  TNSBitVector jobs;
615  jobs.set_range(job_id, job_id + batch_size - 1);
616 
    // The third argument is true if at least one job in the batch was
    // submitted without an affinity token
617  if (m_PauseStatus == eNoPause)
618  m_NotificationsList.Notify(jobs, affinities,
619  batch_size != aff_tokens.size(),
626  eGet);
627 
628  for (size_t k = 0; k < batch_size; ++k) {
630  batch[k].first.GetId(), curr_time,
631  batch[k].first.GetAffinityId(), group_id,
632  batch[k].first.GetExpirationTime(m_Timeout,
633  m_RunTimeout,
636  curr_time));
637  }
638  }}
639 
640  m_StatisticsCounters.CountSubmit(batch_size);
641  if (m_LogBatchEachJob && logging)
642  for (size_t k = 0; k < batch_size; ++k)
643  x_LogSubmit(batch[k].first);
644 
645  rollback_action = new CNSBatchSubmitRollback(client, job_id, batch_size);
646  return job_id;
647 }
648 
649 
// Stores the result of a job execution.
// NOTE(review): the first signature line (embedded line 650) is missing from
// this listing; 'client' used below is presumably declared there. Embedded
// lines 666, 670-671, 683-685, 687, 689-690, 692-693 and 703-709 are missing
// as well.
//   curr       - the current time
//   job_id     - id of the job the result is for
//   job_key    - key of the job, used for the job change notification
//   job        - passed to x_UpdateDB_PutResultNoLock() and used afterwards
//                for perf logging and notifications
//   auth_token - passed to x_UpdateDB_PutResultNoLock()
//   ret_code   - job return code
//   output     - job output
// Returns the status the job had before the call. Nothing is updated in the
// visible code if the job was neither pending, running nor failed.
// Throws CNetScheduleException (eDataTooLong) if the output exceeds
// m_MaxOutputSize.
651  const CNSPreciseTime & curr,
652  unsigned int job_id,
653  const string & job_key,
654  CJob & job,
655  const string & auth_token,
656  int ret_code,
657  const string & output)
658 {
659  // The only one parameter (max output size) is required for the put
660  // operation so there is no need to use CQueueParamAccessor
661 
662  if (output.size() > m_MaxOutputSize)
663  NCBI_THROW(CNetScheduleException, eDataTooLong,
664  "Output is too long");
665 
667  TJobStatus old_status = GetJobStatus(job_id);
668 
669  if (old_status == CNetScheduleAPI::eDone) {
672  return old_status;
673  }
674 
675  if (old_status != CNetScheduleAPI::ePending &&
676  old_status != CNetScheduleAPI::eRunning &&
677  old_status != CNetScheduleAPI::eFailed)
678  return old_status;
679 
680  x_UpdateDB_PutResultNoLock(job_id, auth_token, curr, ret_code, output,
681  job, client);
682 
686  g_DoPerfLogging(*this, job, 200);
688 
691  m_RunTimeout,
694  curr));
695 
    // The job is not executed anymore so its run timeout is not tracked
696  TimeLineRemove(job_id);
697 
698  x_NotifyJobChanges(job, job_key, eStatusChanged, curr);
699 
700  // Notify the readers if the job has not been given for reading yet
701  if (!m_ReadJobs.get_bit(job_id)) {
702  m_GCRegistry.UpdateReadVacantTime(job_id, curr);
710  eRead);
711  }
712  return old_status;
713 }
714 
715 
// Picks a job for a worker node or, if there is no suitable job, optionally
// registers the client for a notification (WGET).
// NOTE(review): the signature line with the function name and the 'client'
// parameter (embedded line 717) is missing from this listing. Many other
// lines are missing too (e.g. embedded 736-737, 741, 752, 756, 771, 814,
// 818, 822, 837, 867, 869-870, 872-874, 877, 879, 883-884, 887, 894-895);
// 'curr' used below is presumably declared on one of them.
//   port, timeout          - the client is registered as a listener only if
//                            timeout != 0 and port > 0
//   aff_list, group_list   - optional (may be NULL) affinities and groups
//                            to pick a job from
//   wnode_affinity, any_affinity, exclusive_new_affinity, prioritized_aff
//                          - affinity selection flags passed to
//                            x_FindVacantJob()
//   new_format             - passed to x_RegisterGetListener()
//   new_job                - receives the picked job; dereferenced without
//                            a NULL check in the visible code
//   rollback_action        - set to a newly allocated CNSGetJobRollback when
//                            a job is given
//   added_pref_aff         - receives the affinity token which was added to
//                            the client's preferred affinities, if any
// Returns false only from the wnode_affinity checks at the beginning (their
// conditions are on missing lines); true otherwise, both when a job is
// given and when there is no job.
716 bool
718  unsigned short port, // Port the client
719  // will wait on
720  unsigned int timeout, // If timeout != 0 =>
721  // WGET
722  const list<string> * aff_list,
723  bool wnode_affinity,
724  bool any_affinity,
725  bool exclusive_new_affinity,
726  bool prioritized_aff,
727  bool new_format,
728  const list<string> * group_list,
729  CJob * new_job,
730  CNSRollbackInterface * & rollback_action,
731  string & added_pref_aff)
732 {
733  // We need exactly 1 parameter - m_RunTimeout, so we can access it without
734  // CQueueParamAccessor
735 
738 
739  // This is a worker node command, so mark the node type as a worker
740  // node
742 
743  vector<unsigned int> aff_ids;
744  TNSBitVector aff_ids_vector;
745  TNSBitVector group_ids_vector;
746  bool has_groups = false;
747 
748  {{
749 
750  if (wnode_affinity) {
751  // Check that the preferred affinities were not reset
753  return false;
754 
755  // Check that the client was garbage collected with preferred affs
757  return false;
758  }
759 
760  // Resolve affinities and groups. It is supposed that the client knows
761  // better what affinities and groups to expect i.e. even if they do not
762  // exist yet, they may appear soon.
763  if (group_list != NULL) {
764  m_GroupRegistry.ResolveGroups(*group_list, group_ids_vector);
765  has_groups = !group_list->empty();
766  }
767  if (aff_list != NULL)
768  m_AffinityRegistry.ResolveAffinities(*aff_list, aff_ids_vector,
769  aff_ids);
770 
772  }}
773 
    // The loop is left only via 'return'; 'continue' retries the job pick
774  for (;;) {
775  // Old comment:
776  // No lock here to make it possible to pick a job
777  // simultaneously from many threads
778  // Current state:
779  // The lock is taken at the beginning. Now there is not much of a
780  // concurrency so a bit of performance is not needed anymore.
781  // The rest is left untouched to simplify the changes
782  x_SJobPick job_pick = x_FindVacantJob(client,
783  aff_ids_vector, aff_ids,
784  wnode_affinity,
785  any_affinity,
786  exclusive_new_affinity,
787  prioritized_aff,
788  group_ids_vector, has_groups,
789  eGet);
790  {{
791  bool outdated_job = false;
792 
    // job_id == 0 means that no job was picked
793  if (job_pick.job_id == 0) {
794  if (exclusive_new_affinity)
795  // Second try only if exclusive new aff is set on
796  job_pick = x_FindOutdatedPendingJob(client, 0,
797  group_ids_vector);
798 
799  if (job_pick.job_id == 0) {
800  if (timeout != 0 && port > 0)
801  // WGET: // There is no job, so the client might need to
802  // be registered in the waiting list
803  x_RegisterGetListener(client, port, timeout,
804  aff_ids_vector,
805  wnode_affinity, any_affinity,
806  exclusive_new_affinity,
807  new_format, group_ids_vector);
808  return true;
809  }
810  outdated_job = true;
811  } else {
812  // Check that the job is still Pending; it could be
813  // grabbed by another WN or GC
815  continue; // Try to pick a job again
816 
817  if (exclusive_new_affinity) {
819  job_pick.job_id, eGet,
820  m_MaxPendingWaitTimeout) == false) {
821  x_SJobPick outdated_pick =
823  client, job_pick.job_id,
824  group_ids_vector);
    // An outdated job, if found, replaces the originally picked one
825  if (outdated_pick.job_id != 0) {
826  job_pick = outdated_pick;
827  outdated_job = true;
828  }
829  }
830  }
831  }
832 
833  // The job is still pending, check if it was received as
834  // with exclusive affinity
835  if (job_pick.exclusive && job_pick.aff_id != 0 &&
836  outdated_job == false) {
838  job_pick.aff_id, eGet))
839  continue; // Other WN grabbed this affinity already
840 
841  string aff_token = m_AffinityRegistry.GetTokenByID(
842  job_pick.aff_id);
843  // CXX-8843: The '-' affinity must not be added to the list of
844  // preferred affinities
845  if (aff_token != k_NoAffinityToken) {
846  bool added = m_ClientsRegistry.
847  UpdatePreferredAffinities(
848  client, job_pick.aff_id, 0, eGet);
849  if (added)
850  added_pref_aff = aff_token;
851  }
852  }
    // The affinity of an outdated job is added to the client's preferred
    // affinities in the same way
853  if (outdated_job && job_pick.aff_id != 0) {
854  string aff_token = m_AffinityRegistry.GetTokenByID(
855  job_pick.aff_id);
856  // CXX-8843: The '-' affinity must not be added to the list of
857  // preferred affinities
858  if (aff_token != k_NoAffinityToken) {
859  bool added = m_ClientsRegistry.
860  UpdatePreferredAffinities(
861  client, job_pick.aff_id, 0, eGet);
862  if (added)
863  added_pref_aff = aff_token;
864  }
865  }
866 
868  *new_job);
871 
875  g_DoPerfLogging(*this, *new_job, 200);
876  if (outdated_job)
878 
880  job_pick.job_id,
881  new_job->GetExpirationTime(m_Timeout,
882  m_RunTimeout,
885  curr));
    // Start tracking the run timeout of the job given for execution
886  TimeLineAdd(job_pick.job_id, curr + m_RunTimeout);
888 
889  x_NotifyJobChanges(*new_job, MakeJobKey(job_pick.job_id),
890  eStatusChanged, curr);
891 
892  // If there are no more pending jobs, let's clear the
893  // list of delayed exact notifications.
896 
897  rollback_action = new CNSGetJobRollback(client, job_pick.job_id);
898  return true;
899  }}
900  }
    // Not reached via any path visible in this listing: the loop above has
    // no visible 'break'
901  return true;
902 }
903 
904 
// NOTE(review): the signature line (embedded line 905) and the statements
// which take the lock and compute 'result' (embedded lines 910 and 912) are
// missing from this listing; judging by the warning text this cancels a
// pending WGET for 'client'.
// A warning is posted if 'result' is false, i.e. the client was not waiting.
906 {
907  bool result;
908 
909  {{
911 
913  }}
914 
    // The warning is posted outside of the inner scope above
915  if (result == false)
916  ERR_POST(Warning << "Attempt to cancel WGET for the client "
917  "which does not wait anything (node: "
918  << client.GetNode() << " session: "
919  << client.GetSession() << ")");
920 }
921 
922 
// NOTE(review): the signature line (embedded line 923) and the statements
// which take the lock and compute 'result' (embedded lines 928 and 930) are
// missing from this listing; judging by the warning text this cancels a
// pending waiting READ for 'client'.
// A warning is posted if 'result' is false, i.e. the client was not waiting.
924 {
925  bool result;
926 
927  {{
929 
931  }}
932 
    // The warning is posted outside of the inner scope above
933  if (result == false)
934  ERR_POST(Warning << "Attempt to cancel waiting READ for the client "
935  "which does not wait anything (node: "
936  << client.GetNode() << " session: "
937  << client.GetSession() << ")");
938 }
939 
940 
// Adds and removes preferred affinities of a client.
// NOTE(review): the signature line with the function name and the 'client'
// parameter (embedded line 942) is missing from this listing, as well as
// embedded lines 950, 952, 958, 1005, 1020, 1026 and 1053.
//   aff_to_add - affinity tokens to add to the preferred affinities
//   aff_to_del - affinity tokens to remove from the preferred affinities
//   cmd_group  - selects which set of preferred affinities is changed
//                (eGet or the other command group)
// Returns the list of warning messages to be sent to the client: unknown
// affinities to delete, affinities to delete which are not preferred,
// affinities to add which are already preferred, and a note if the client
// had been garbage collected.
// Throws CNetScheduleException (eTooManyPreferredAffinities) if the
// approximate resulting number of affinities exceeds the registry limit.
941 list<string>
943  const list<string> & aff_to_add,
944  const list<string> & aff_to_del,
945  ECommandGroup cmd_group)
946 {
947  // It is guaranteed here that the client is a new style one.
948  // I.e. it has both client_node and client_session.
949  if (cmd_group == eGet)
951  else
953 
954 
955  list<string> msgs; // Warning messages for the socket
956  unsigned int client_id = client.GetID();
957  TNSBitVector current_affinities =
959  cmd_group);
960  TNSBitVector aff_id_to_add;
961  TNSBitVector aff_id_to_del;
962  bool any_to_add = false;
963  bool any_to_del = false;
964 
965  // Identify the affinities which should be deleted
966  for (list<string>::const_iterator k(aff_to_del.begin());
967  k != aff_to_del.end(); ++k) {
968  unsigned int aff_id = m_AffinityRegistry.GetIDByToken(*k);
969 
970  if (aff_id == 0) {
971  // The affinity is not known for NS at all
972  ERR_POST(Warning << "Client '" << client.GetNode()
973  << "' deletes unknown affinity '"
974  << *k << "'. Ignored.");
975  msgs.push_back("eAffinityNotFound:"
976  "unknown affinity to delete: " + *k);
977  continue;
978  }
979 
980  if (!current_affinities.get_bit(aff_id)) {
981  // This is an attempt to delete something which has not been added
982  // or deleted before.
983  ERR_POST(Warning << "Client '" << client.GetNode()
984  << "' deletes affinity '" << *k
985  << "' which is not in the list of the "
986  "preferred client affinities. Ignored.");
987  msgs.push_back("eAffinityNotPreferred:not registered affinity "
988  "to delete: " + *k);
989  continue;
990  }
991 
992  // The affinity will really be deleted
993  aff_id_to_del.set_bit(aff_id);
994  any_to_del = true;
995  }
996 
997 
998  // Check that the update of the affinities list will not exceed the limit
999  // for the max number of affinities per client.
1000  // Note: this is not precise check. There could be non-unique affinities in
1001  // the add list or some of affinities to add could already be in the list.
1002  // The precise checking however requires more CPU and blocking so only an
1003  // approximate (but fast) checking is done.
1004  SNSRegistryParameters aff_reg_settings =
1006  if (current_affinities.count() + aff_to_add.size()
1007  - aff_id_to_del.count() >
1008  aff_reg_settings.max_records) {
1009  NCBI_THROW(CNetScheduleException, eTooManyPreferredAffinities,
1010  "The client '" + client.GetNode() +
1011  "' exceeds the limit (" +
1012  to_string(aff_reg_settings.max_records) +
1013  ") of the preferred affinities. Changed request ignored.");
1014  }
1015 
1016  // To avoid logging under the lock
1017  vector<string> already_added_affinities;
1018 
1019  {{
1021 
1022  // Convert the aff_to_add to the affinity IDs
1023  for (list<string>::const_iterator k(aff_to_add.begin());
1024  k != aff_to_add.end(); ++k ) {
1025  unsigned int aff_id =
1027  0, client_id,
1028  cmd_group);
1029 
    // Already preferred affinities are only remembered for the warnings
1030  if (current_affinities.get_bit(aff_id)) {
1031  already_added_affinities.push_back(*k);
1032  continue;
1033  }
1034 
1035  aff_id_to_add.set_bit(aff_id);
1036  any_to_add = true;
1037  }
1038  }}
1039 
1040  // Log the warnings and add it to the warning message
1041  for (vector<string>::const_iterator j(already_added_affinities.begin());
1042  j != already_added_affinities.end(); ++j) {
1043  // That was a try to add something which has already been added
1044  ERR_POST(Warning << "Client '" << client.GetNode()
1045  << "' adds affinity '" << *j
1046  << "' which is already in the list of the "
1047  "preferred client affinities. Ignored.");
1048  msgs.push_back("eAffinityAlreadyPreferred:already registered "
1049  "affinity to add: " + *j);
1050  }
1051 
    // The registry is touched only if there is something to change
1052  if (any_to_add || any_to_del)
1054  aff_id_to_add,
1055  aff_id_to_del,
1056  cmd_group);
1057 
1058  if (m_ClientsRegistry.WasGarbageCollected(client, cmd_group)) {
1059  ERR_POST(Warning << "Client '" << client.GetNode()
1060  << "' has been garbage collected and tries to "
1061  "update its preferred affinities.");
1062  msgs.push_back("eClientGarbageCollected:the client had been "
1063  "garbage collected");
1064  }
1065  return msgs;
1066 }
1067 
1068 
// Replaces the preferred affinities of a client with the given list.
// NOTE(review): the first signature line (embedded line 1069) is missing
// from this listing; 'client' used below is presumably declared there.
// Embedded lines 1074, 1076, 1079, 1095, 1098 and 1104 are missing as well.
//   aff       - affinity tokens which become the preferred affinities
//   cmd_group - selects which set of preferred affinities is replaced
// Throws CNetScheduleException (eTooManyPreferredAffinities) if the list is
// longer than the registry limit.
1070  const list<string> & aff,
1071  ECommandGroup cmd_group)
1072 {
1073  if (cmd_group == eGet)
1075  else
1077 
1078  SNSRegistryParameters aff_reg_settings =
1080 
1081  if (aff.size() > aff_reg_settings.max_records) {
1082  NCBI_THROW(CNetScheduleException, eTooManyPreferredAffinities,
1083  "The client '" + client.GetNode() +
1084  "' exceeds the limit (" +
1085  to_string(aff_reg_settings.max_records) +
1086  ") of the preferred affinities. Set request ignored.");
1087  }
1088 
1089  unsigned int client_id = client.GetID();
1090  TNSBitVector aff_id_to_set;
    // NOTE(review): already_added_aff_id is filled in the loop below but is
    // not read anywhere in the visible code.
1091  TNSBitVector already_added_aff_id;
1092 
1093 
1094  TNSBitVector current_affinities =
1096  cmd_group);
1097  {{
1099 
1100  // Convert the aff to the affinity IDs
1101  for (list<string>::const_iterator k(aff.begin());
1102  k != aff.end(); ++k ) {
1103  unsigned int aff_id =
1105  0, client_id,
1106  cmd_group);
1107 
1108  if (current_affinities.get_bit(aff_id))
1109  already_added_aff_id.set_bit(aff_id);
1110 
1111  aff_id_to_set.set_bit(aff_id);
1112  }
1113  }}
1114 
1115  m_ClientsRegistry.SetPreferredAffinities(client, aff_id_to_set, cmd_group);
1116 }
1117 
1118 
// NOTE(review): the first signature line (embedded line 1119) is missing
// from this listing; 'client' used below is presumably declared there.
// Forwards the client data and its version to the clients registry and
// returns whatever m_ClientsRegistry.SetClientData() returns.
1120  const string & data, int data_version)
1121 {
1122  return m_ClientsRegistry.SetClientData(client, data, data_version);
1123 }
1124 
1125 
// Extends the run timeout of a running job so that it covers at least 'tm'
// starting from the current time.
// NOTE(review): the first signature line (embedded line 1126, presumably
// declaring 'job_id') is missing from this listing, as well as embedded
// lines 1131, 1133 (presumably the lock and the declaration of 'curr'),
// 1144 (the action taken if the job is not found) and 1169 (the final
// return statement).
//   job - receives a copy of the stored job if the job is running and found
//   tm  - the requested timeout counted from the current time
// Returns the job status without any change if the job is not running.
// Returns eRunning without changing the timeout if the current timeout
// already covers the request.
1127  CJob & job,
1128  const CNSPreciseTime & tm)
1129 {
1130  CNSPreciseTime queue_run_timeout = GetRunTimeout();
1132 
1134  TJobStatus status = GetJobStatus(job_id);
1135 
1136  if (status != CNetScheduleAPI::eRunning)
1137  return status;
1138 
1139  CNSPreciseTime time_start = kTimeZero;
1140  CNSPreciseTime run_timeout = kTimeZero;
1141 
1142  auto job_iter = m_Jobs.find(job_id);
1143  if (job_iter == m_Jobs.end())
1145 
1146  time_start = job_iter->second.GetLastEvent()->GetTimestamp();
1147  run_timeout = job_iter->second.GetRunTimeout();
    // A zero job run timeout means that the queue run timeout is used
1148  if (run_timeout == kTimeZero)
1149  run_timeout = queue_run_timeout;
1150 
1151  if (time_start + run_timeout > curr + tm) {
1152  job = job_iter->second;
1153  return CNetScheduleAPI::eRunning; // Old timeout is enough to cover
1154  // this request, so keep it.
1155  }
1156 
    // The new timeout is counted from the last event timestamp so that it
    // ends at curr + tm
1157  job_iter->second.SetRunTimeout(curr + tm - time_start);
1158  job_iter->second.SetLastTouch(curr);
1159 
1160  // No need to update the GC registry because the running (and reading)
1161  // jobs are skipped by GC
    // exp_time is the previous expiration: the time line slot the job is
    // moved from
1162  CNSPreciseTime exp_time = kTimeZero;
1163  if (run_timeout != kTimeZero)
1164  exp_time = time_start + run_timeout;
1165 
1166  TimeLineMove(job_id, exp_time, curr + tm);
1167 
1168  job = job_iter->second;
1170 }
1171 
1172 
// Extends the read timeout of a job in the reading state so that it covers
// at least 'tm' starting from the current time.
// NOTE(review): the first signature line (embedded line 1173, presumably
// declaring 'job_id') is missing from this listing, as well as embedded
// lines 1178, 1180 (presumably the lock and the declaration of 'curr'),
// 1191 (the action taken if the job is not found) and 1217 (the final
// return statement).
//   job - receives a copy of the stored job if the job is being read and
//         found
//   tm  - the requested timeout counted from the current time
// Returns the job status without any change if the job is not in the
// reading state. Returns eReading without changing the timeout if the
// current timeout already covers the request.
1174  CJob & job,
1175  const CNSPreciseTime & tm)
1176 {
1177  CNSPreciseTime queue_read_timeout = GetReadTimeout();
1179 
1181  TJobStatus status = GetJobStatus(job_id);
1182 
1183  if (status != CNetScheduleAPI::eReading)
1184  return status;
1185 
1186  CNSPreciseTime time_start = kTimeZero;
1187  CNSPreciseTime read_timeout = kTimeZero;
1188 
1189  auto job_iter = m_Jobs.find(job_id);
1190  if (job_iter == m_Jobs.end())
1192 
1193  time_start = job_iter->second.GetLastEvent()->GetTimestamp();
1194  read_timeout = job_iter->second.GetReadTimeout();
    // A zero job read timeout means that the queue read timeout is used
1195  if (read_timeout == kTimeZero)
1196  read_timeout = queue_read_timeout;
1197 
1198  if (time_start + read_timeout > curr + tm) {
1199  job = job_iter->second;
1200  return CNetScheduleAPI::eReading; // Old timeout is enough to
1201  // cover this request, so
1202  // keep it.
1203  }
1204 
    // The new timeout is counted from the last event timestamp so that it
    // ends at curr + tm
1205  job_iter->second.SetReadTimeout(curr + tm - time_start);
1206  job_iter->second.SetLastTouch(curr);
1207 
1208  // No need to update the GC registry because the running (and reading)
1209  // jobs are skipped by GC
    // exp_time is the previous expiration: the time line slot the job is
    // moved from
1210  CNSPreciseTime exp_time = kTimeZero;
1211  if (read_timeout != kTimeZero)
1212  exp_time = time_start + read_timeout;
1213 
1214  TimeLineMove(job_id, exp_time, curr + tm);
1215 
1216  job = job_iter->second;
1218 }
1219 
1220 
1221 
// Reports the job status together with a few job attributes and the
// estimated lifetime; the job record is not modified by the visible code.
// NOTE(review): presumably CQueue::GetStatusAndLifetime -- the signature line
// (original line 1223) and original line 1230 are absent from this listing.
//
// Visible parameters (all are outputs):
//   client_ip, client_sid, client_phid, progress_msg - copied from the job
//   lifetime - receives the value of x_GetEstimatedJobLifetime(); it is
//              dereferenced without a null check
// Returns the tracked status; eJobNotFound is returned before any output
// parameter is written.
// Throws CNetScheduleException (eInternalError) if the status tracker knows
// the job but it is absent in m_Jobs.
1222 // This member is used for WST/WST2 which do not need to touch the job
1224  string & client_ip,
1225  string & client_sid,
1226  string & client_phid,
1227  string & progress_msg,
1228  CNSPreciseTime * lifetime)
1229 {
1231  TJobStatus status = GetJobStatus(job_id);
1232 
1233  if (status == CNetScheduleAPI::eJobNotFound)
1234  return status;
1235 
1236  auto job_iter = m_Jobs.find(job_id);
1237 
1238  if (job_iter == m_Jobs.end())
1239  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1240 
1241  client_ip = job_iter->second.GetClientIP();
1242  client_sid = job_iter->second.GetClientSID();
1243  client_phid = job_iter->second.GetNCBIPHID();
1244  progress_msg = job_iter->second.GetProgressMsg();
1245 
1246  *lifetime = x_GetEstimatedJobLifetime(job_id, status);
1247  return status;
1248 }
1249 
1250 
// Reports the job status and estimated lifetime and also refreshes the job's
// last touch time.
// NOTE(review): presumably CQueue::GetStatusAndLifetimeAndTouch -- the
// signature line (original line 1252) and original lines 1256, 1262 and 1270
// are absent from this listing, so the declaration of `curr` and the name of
// the call which receives (job_id, expiration time) are not visible.
//
// Visible parameters (outputs):
//   job      - receives a copy of the job record after the touch
//   lifetime - receives the value of x_GetEstimatedJobLifetime(); it is
//              dereferenced without a null check
// Returns the tracked status; eJobNotFound is returned before anything is
// modified.
// Throws CNetScheduleException (eInternalError) if the job is absent in m_Jobs.
1251 // This member is used for the SST/SST2 commands which also touch the job
1253  CJob & job,
1254  CNSPreciseTime * lifetime)
1255 {
1257  TJobStatus status = GetJobStatus(job_id);
1258 
1259  if (status == CNetScheduleAPI::eJobNotFound)
1260  return status;
1261 
1263  auto job_iter = m_Jobs.find(job_id);
1264 
1265  if (job_iter == m_Jobs.end())
1266  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1267 
1268  job_iter->second.SetLastTouch(curr);
1269 
// NOTE(review): the line which opens this call (original line 1270) is
// missing; only its argument list is visible. The expiration time is
// recalculated from the queue timeouts after the touch.
1271  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1272  m_ReadTimeout,
1273  m_PendingTimeout, curr));
1274 
1275  *lifetime = x_GetEstimatedJobLifetime(job_id, status);
1276  job = job_iter->second;
1277  return status;
1278 }
1279 
1280 
// Registers (or clears) the listener which receives notifications about a job.
// NOTE(review): presumably CQueue::SetJobListener -- the signature line
// (original line 1281) and original lines 1290-1292 are absent from this
// listing, so the declarations of `status` and `curr` are not visible.
//
// Visible parameters:
//   job               - receives a copy of the updated job record
//   address, port     - the new listener; 0 in either of them (or a zero
//                       timeout) switches the notifications off
//   timeout           - notification period; stored as curr + timeout
//   need_stolen       - stored via SetNeedStolenNotif()
//   need_progress_msg - stored via SetNeedLsnrProgressMsgNotif()
//   last_event_index  - output: index of the job's last event; dereferenced
//                       without a null check
// Returns the job status read from the job record, or the initial value of
// `status` (set on a line not shown) if the job is not in m_Jobs.
1282  CJob & job,
1283  unsigned int address,
1284  unsigned short port,
1285  const CNSPreciseTime & timeout,
1286  bool need_stolen,
1287  bool need_progress_msg,
1288  size_t * last_event_index)
1289 {
1293 
1294  auto job_iter = m_Jobs.find(job_id);
1295 
1296  if (job_iter == m_Jobs.end())
1297  return status;
1298 
1299  *last_event_index = job_iter->second.GetLastEventIndex();
1300  status = job_iter->second.GetStatus();
1301 
1302  unsigned int old_listener_addr = job_iter->second.GetListenerNotifAddr();
1303  unsigned short old_listener_port = job_iter->second.GetListenerNotifPort();
1304 
// The previous listener is notified before its settings are overwritten below
1305  if (job_iter->second.GetNeedStolenNotif() &&
1306  old_listener_addr != 0 && old_listener_port != 0) {
1307  if (old_listener_addr != address || old_listener_port != port) {
1308  // Send the stolen notification only if it is
1309  // really a new listener
1310  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
1311  eNotificationStolen, curr);
1312  }
1313  }
1314 
1315  if (address == 0 || port == 0 || timeout == kTimeZero) {
1316  // If at least one of the values is 0 => no notifications
1317  // So to make the job properly dumped put zeros everywhere.
1318  job_iter->second.SetListenerNotifAddr(0);
1319  job_iter->second.SetListenerNotifPort(0);
1320  job_iter->second.SetListenerNotifAbsTime(kTimeZero);
1321  } else {
1322  job_iter->second.SetListenerNotifAddr(address);
1323  job_iter->second.SetListenerNotifPort(port);
1324  job_iter->second.SetListenerNotifAbsTime(curr + timeout);
1325  }
1326 
1327  job_iter->second.SetNeedLsnrProgressMsgNotif(need_progress_msg);
1328  job_iter->second.SetNeedStolenNotif(need_stolen);
1329  job_iter->second.SetLastTouch(curr);
1330 
1331  job = job_iter->second;
1332  return status;
1333 }
1334 
1335 
// Stores a new progress message in the job, touches the job and notifies
// about the change (eProgressMessageChanged).
//
//   job_id - the job to update
//   job    - output: receives a copy of the updated job record
//   msg    - the new progress message
// Returns false if the job is not found in m_Jobs (nothing is changed then),
// true otherwise.
//
// NOTE(review): original lines 1340, 1341 and 1350 are absent from this
// listing (the embedded numbering skips them), so the declaration of `curr`
// and the name of the call which receives (job_id, expiration time) are not
// visible.
1336 bool CQueue::PutProgressMessage(unsigned int job_id,
1337  CJob & job,
1338  const string & msg)
1339 {
1342 
1343  auto job_iter = m_Jobs.find(job_id);
1344  if (job_iter == m_Jobs.end())
1345  return false;
1346 
1347  job_iter->second.SetProgressMsg(msg);
1348  job_iter->second.SetLastTouch(curr);
1349 
// NOTE(review): the line which opens this call (original line 1350) is
// missing; only its argument list is visible
1351  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1352  m_ReadTimeout,
1353  m_PendingTimeout, curr));
1354  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
1355  eProgressMessageChanged, curr);
1356 
1357  job = job_iter->second;
1358  return true;
1359 }
1360 
1361 
// Moves a Running job back to the Pending state on behalf of a client.
// NOTE(review): presumably CQueue::ReturnJob -- the signature line (original
// line 1362, which should also declare `client`) is absent from this listing.
// Original lines 1370, 1434, 1437-1438, 1441, 1444, 1449, 1451-1452, 1454,
// 1460 and 1462-1463 are absent too, so several statements below are visible
// only partially.
//
// Visible parameters:
//   job_id, job_key - the job to return
//   job             - output: receives a copy of the job record
//   auth_token      - checked against the job only if it is not empty
//   warning         - output: set when only the passport part of the token
//                     matched; the command is ignored in that case
//   how             - selects the event type recorded for the job
//                     (eReturn / eReturnNoBlacklist / eNSGetRollback)
// Returns the status the job had before the call; if it was not eRunning
// nothing is changed.
// Throws CNetScheduleException: eInternalError if the job is absent in m_Jobs,
// eInvalidAuthToken if the token is malformed or does not match.
1363  unsigned int job_id,
1364  const string & job_key,
1365  CJob & job,
1366  const string & auth_token,
1367  string & warning,
1368  TJobReturnOption how)
1369 {
1371  CNSPreciseTime current_time = CNSPreciseTime::Current();
1372  TJobStatus old_status = GetJobStatus(job_id);
1373 
1374  if (old_status != CNetScheduleAPI::eRunning)
1375  return old_status;
1376 
1377  auto job_iter = m_Jobs.find(job_id);
1378  if (job_iter == m_Jobs.end())
1379  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1380 
1381  if (!auth_token.empty()) {
1382  // Need to check authorization token first
1383  CJob::EAuthTokenCompareResult token_compare_result =
1384  job_iter->second.CompareAuthToken(auth_token);
1385  if (token_compare_result == CJob::eInvalidTokenFormat)
1386  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
1387  "Invalid authorization token format");
1388  if (token_compare_result == CJob::eNoMatch)
1389  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
1390  "Authorization token does not match");
1391  if (token_compare_result == CJob::ePassportOnlyMatch) {
1392  // That means the job has been given to another worker node
1393  // by whatever reason (expired/failed/returned before)
1394  ERR_POST(Warning << "Received RETURN2 with only "
1395  "passport matched.");
1396  warning = "eJobPassportOnlyMatch:Only job passport matched. "
1397  "Command is ignored.";
1398  job = job_iter->second;
1399  return old_status;
1400  }
1401  // Here: the authorization token is OK, we can continue
1402  }
1403 
1404  unsigned int run_count = job_iter->second.GetRunCount();
1405  CJobEvent * event = job_iter->second.GetLastEvent();
1406 
// A missing last event is only logged; `event` is re-pointed to the newly
// appended event right after that
1407  if (!event)
1408  ERR_POST("No JobEvent for running job");
1409 
1410  event = &job_iter->second.AppendEvent();
1411  event->SetNodeAddr(client.GetAddress());
1412  event->SetStatus(CNetScheduleAPI::ePending);
1413  switch (how) {
1414  case eWithBlacklist:
1415  event->SetEvent(CJobEvent::eReturn);
1416  break;
1417  case eWithoutBlacklist:
1418  event->SetEvent(CJobEvent::eReturnNoBlacklist);
1419  break;
1420  case eRollback:
1421  event->SetEvent(CJobEvent::eNSGetRollback);
1422  break;
1423  }
1424  event->SetTimestamp(current_time);
1425  event->SetClientNode(client.GetNode());
1426  event->SetClientSession(client.GetSession());
1427 
// The run count is decremented (never below zero)
1428  if (run_count)
1429  job_iter->second.SetRunCount(run_count - 1);
1430 
1431  job_iter->second.SetStatus(CNetScheduleAPI::ePending);
1432  job_iter->second.SetLastTouch(current_time);
1433 
// NOTE(review): the statements inside the cases of this switch (original
// lines 1437-1438, 1441 and 1444) and the statement before it (1434) are
// missing from this listing
1435  switch (how) {
1436  case eWithBlacklist:
1439  break;
1440  case eWithoutBlacklist:
1442  break;
1443  case eRollback:
1445  break;
1446  }
1447  g_DoPerfLogging(*this, job_iter->second, 200);
1448  TimeLineRemove(job_id);
// NOTE(review): the statement controlled by this 'if' and the opening of the
// following call (original lines 1449, 1451-1452, 1454) are missing
1450  if (how == eWithBlacklist)
1453  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1455  current_time));
1456 
1457  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
1458 
// NOTE(review): only a fragment of the call made when the queue is not paused
// is visible (original lines 1460 and 1462-1463 are missing)
1459  if (m_PauseStatus == eNoPause)
1461  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
1464 
1465  job = job_iter->second;
1466  return old_status;
1467 }
1468 
1469 
// Moves a Running job back to Pending and replaces its affinity and group.
// NOTE(review): presumably CQueue::RescheduleJob -- the signature line
// (original line 1470, which should also declare `client`) is absent from this
// listing. Original lines 1480, 1565-1566, 1570-1571, 1574, 1580 and 1582-1583
// are absent too, so several statements below are visible only partially.
//
// Visible parameters:
//   job_id, job_key - the job to reschedule
//   auth_token      - must completely match the job's token
//   aff_token       - new affinity; empty means affinity id 0
//   group           - new group; empty means group id 0
//   auth_token_ok   - output: false if the token did not match completely
//                     (the job is left untouched then); it is not assigned on
//                     the early return for a non-Running job
//   job             - output: receives a copy of the job record
// Returns the status the job had before the call; if it was not eRunning
// nothing is changed.
// Throws CNetScheduleException: eInternalError if the job is absent in m_Jobs,
// eInvalidAuthToken if the token is malformed.
1471  unsigned int job_id,
1472  const string & job_key,
1473  const string & auth_token,
1474  const string & aff_token,
1475  const string & group,
1476  bool & auth_token_ok,
1477  CJob & job)
1478 {
1479  CNSPreciseTime current_time = CNSPreciseTime::Current();
1481  TJobStatus old_status = GetJobStatus(job_id);
1482  unsigned int affinity_id = 0;
1483  unsigned int group_id = 0;
1484  unsigned int job_affinity_id;
1485  unsigned int job_group_id;
1486 
1487  if (old_status != CNetScheduleAPI::eRunning)
1488  return old_status;
1489 
1490  // Resolve affinity and group in a separate transaction
1491  if (!aff_token.empty() || !group.empty()) {
1492  if (!aff_token.empty())
1493  affinity_id = m_AffinityRegistry.ResolveAffinity(aff_token);
1494  if (!group.empty())
1495  group_id = m_GroupRegistry.ResolveGroup(group);
1496  }
1497 
1498  auto job_iter = m_Jobs.find(job_id);
1499  if (job_iter == m_Jobs.end())
1500  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1501 
1502  // Need to check authorization token first
1503  CJob::EAuthTokenCompareResult token_compare_result =
1504  job_iter->second.CompareAuthToken(auth_token);
1505 
1506  if (token_compare_result == CJob::eInvalidTokenFormat)
1507  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
1508  "Invalid authorization token format");
1509 
// Anything but a complete match is reported via auth_token_ok, not via an
// exception
1510  if (token_compare_result != CJob::eCompleteMatch) {
1511  auth_token_ok = false;
1512  job = job_iter->second;
1513  return old_status;
1514  }
1515 
1516  // Here: the authorization token is OK, we can continue
1517  auth_token_ok = true;
1518 
1519  // Memorize the job group and affinity for the proper updates after
1520  // the transaction is finished
1521  job_affinity_id = job_iter->second.GetAffinityId();
1522  job_group_id = job_iter->second.GetGroupId();
1523 
1524  // Update the job affinity and group
1525  job_iter->second.SetAffinityId(affinity_id);
1526  job_iter->second.SetGroupId(group_id);
1527 
1528  unsigned int run_count = job_iter->second.GetRunCount();
1529  CJobEvent * event = job_iter->second.GetLastEvent();
1530 
// A missing last event is only logged; `event` is re-pointed to the newly
// appended event right after that
1531  if (!event)
1532  ERR_POST("No JobEvent for running job");
1533 
1534  event = &job_iter->second.AppendEvent();
1535  event->SetNodeAddr(client.GetAddress());
1536  event->SetStatus(CNetScheduleAPI::ePending);
1537  event->SetEvent(CJobEvent::eReschedule);
1538  event->SetTimestamp(current_time);
1539  event->SetClientNode(client.GetNode());
1540  event->SetClientSession(client.GetSession());
1541 
// The run count is decremented (never below zero)
1542  if (run_count)
1543  job_iter->second.SetRunCount(run_count - 1);
1544 
1545  job_iter->second.SetStatus(CNetScheduleAPI::ePending);
1546  job_iter->second.SetLastTouch(current_time);
1547 
1548  // Job has been updated in the DB. Update the affinity and group
1549  // registries as needed.
1550  if (job_affinity_id != affinity_id) {
1551  if (job_affinity_id != 0)
1552  m_AffinityRegistry.RemoveJobFromAffinity(job_id, job_affinity_id);
1553  if (affinity_id != 0)
1554  m_AffinityRegistry.AddJobToAffinity(job_id, affinity_id);
1555  }
1556  if (job_group_id != group_id) {
1557  if (job_group_id != 0)
1558  m_GroupRegistry.RemoveJob(job_group_id, job_id);
1559  if (group_id != 0)
1560  m_GroupRegistry.AddJob(group_id, job_id);
1561  }
1562  if (job_affinity_id != affinity_id || job_group_id != group_id)
1563  m_GCRegistry.ChangeAffinityAndGroup(job_id, affinity_id, group_id);
1564 
// NOTE(review): original lines 1565-1566 are missing from this listing
1567  g_DoPerfLogging(*this, job_iter->second, 200);
1568 
1569  TimeLineRemove(job_id);
// NOTE(review): the opening of the following call (original lines 1570-1571)
// and one of its arguments (1574) are missing from this listing
1572  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1573  m_ReadTimeout,
1575  current_time));
1576 
1577  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
1578 
// NOTE(review): only a fragment of the call made when the queue is not paused
// is visible (original lines 1580 and 1582-1583 are missing)
1579  if (m_PauseStatus == eNoPause)
1581  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
1584 
1585  job = job_iter->second;
1586  return old_status;
1587 }
1588 
1589 
// Puts a job which is neither Pending, Running nor Reading back to Pending.
// NOTE(review): presumably CQueue::RedoJob -- the signature line (original
// line 1590, which should also declare `client`) is absent from this listing.
// Original lines 1596, 1626, 1630, 1633 and 1641-1644 are absent too, so
// several statements below are visible only partially.
//
// Visible parameters:
//   job_id, job_key - the job to redo
//   job             - output: receives a copy of the updated job record
// Returns the status the job had before the call. Nothing is changed if that
// status is eJobNotFound, ePending, eRunning or eReading.
// Throws CNetScheduleException (eInternalError) if the job is absent in m_Jobs.
1591  unsigned int job_id,
1592  const string & job_key,
1593  CJob & job)
1594 {
1595  CNSPreciseTime current_time = CNSPreciseTime::Current();
1597  TJobStatus old_status = GetJobStatus(job_id);
1598 
1599  if (old_status == CNetScheduleAPI::eJobNotFound ||
1600  old_status == CNetScheduleAPI::ePending ||
1601  old_status == CNetScheduleAPI::eRunning ||
1602  old_status == CNetScheduleAPI::eReading)
1603  return old_status;
1604 
1605  auto job_iter = m_Jobs.find(job_id);
1606 
1607  if (job_iter == m_Jobs.end())
1608  NCBI_THROW(CNetScheduleException, eInternalError,
1609  "Error fetching job");
1610 
// A job without events is only logged as an inconsistency; `event` is
// re-pointed to the newly appended event right after that
1611  CJobEvent * event = job_iter->second.GetLastEvent();
1612  if (!event)
1613  ERR_POST("Inconsistency: a job has no events");
1614 
1615  event = &job_iter->second.AppendEvent();
1616  event->SetNodeAddr(client.GetAddress());
1617  event->SetStatus(CNetScheduleAPI::ePending);
1618  event->SetEvent(CJobEvent::eRedo);
1619  event->SetTimestamp(current_time);
1620  event->SetClientNode(client.GetNode());
1621  event->SetClientSession(client.GetSession());
1622 
1623  job_iter->second.SetStatus(CNetScheduleAPI::ePending);
1624  job_iter->second.SetLastTouch(current_time);
1625 
// NOTE(review): original line 1626 is missing from this listing
1627  m_StatisticsCounters.CountRedo(old_status);
1628  g_DoPerfLogging(*this, job_iter->second, 200);
1629 
// NOTE(review): the opening of the following call (original line 1630) and
// one of its arguments (1633) are missing from this listing
1631  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1632  m_ReadTimeout,
1634  current_time));
1635 
1636  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
1637 
// The notification is sent only when the queue is not paused.
// NOTE(review): the remaining arguments of Notify() (original lines
// 1641-1644) are missing from this listing
1638  if (m_PauseStatus == eNoPause)
1639  m_NotificationsList.Notify(job_id,
1640  job_iter->second.GetAffinityId(),
1645  job = job_iter->second;
1646  return old_status;
1647 }
1648 
1649 
// Touches a job (refreshes its last touch time) and reports its status,
// record and estimated lifetime.
// NOTE(review): presumably CQueue::ReadAndTouchJob -- the signature line
// (original line 1650) and original lines 1654, 1660 and 1668 are absent from
// this listing, so the declaration of `curr` and the name of the call which
// receives (job_id, expiration time) are not visible.
//
// Visible parameters (outputs):
//   job      - receives a copy of the job record after the touch
//   lifetime - receives the value of x_GetEstimatedJobLifetime(); it is
//              dereferenced without a null check
// Returns the tracked status; eJobNotFound is returned before anything is
// modified.
// Throws CNetScheduleException (eInternalError) if the job is absent in m_Jobs.
1651  CJob & job,
1652  CNSPreciseTime * lifetime)
1653 {
1655  TJobStatus status = GetJobStatus(job_id);
1656 
1657  if (status == CNetScheduleAPI::eJobNotFound)
1658  return status;
1659 
1661  auto job_iter = m_Jobs.find(job_id);
1662 
1663  if (job_iter == m_Jobs.end())
1664  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1665 
1666  job_iter->second.SetLastTouch(curr);
1667 
// NOTE(review): the line which opens this call (original line 1668) is
// missing; only its argument list is visible
1669  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1670  m_ReadTimeout,
1671  m_PendingTimeout, curr));
1672  *lifetime = x_GetEstimatedJobLifetime(job_id, status);
1673  job = job_iter->second;
1674  return status;
1675 }
1676 
1677 
// Cancels a single job, either on a client request or as a rollback of a
// submit.
// NOTE(review): presumably CQueue::Cancel -- the signature line (original line
// 1678, which should also declare `client`) is absent from this listing.
// Original lines 1687, 1691, 1695, 1697-1700, 1705, 1722, 1724, 1726-1727,
// 1733, 1735, 1737, 1740, 1750 and 1752-1753 are absent too, so many
// statements below (including the bodies of several 'if' branches) are not
// visible.
//
// Visible parameters:
//   job_id, job_key - the job to cancel
//   job             - output: receives a copy of the updated job record
//   is_ns_rollback  - true selects the eNSSubmitRollback event instead of
//                     eCancel and suppresses the reader notification
// Returns the status the job had before the call (on the visible return path).
1679  unsigned int job_id,
1680  const string & job_key,
1681  CJob & job,
1682  bool is_ns_rollback)
1683 {
1684  TJobStatus old_status;
1685  CNSPreciseTime current_time = CNSPreciseTime::Current();
1686 
1688 
1689  old_status = m_StatusTracker.GetStatus(job_id);
// NOTE(review): the statement controlled by this 'if' (original line 1691)
// is missing from this listing
1690  if (old_status == CNetScheduleAPI::eJobNotFound)
1692 
// NOTE(review): both branches of the inner if/else (original lines 1695 and
// 1697-1700) are missing from this listing
1693  if (old_status == CNetScheduleAPI::eCanceled) {
1694  if (is_ns_rollback)
1696  else
1701  }
1702 
1703  auto job_iter = m_Jobs.find(job_id);
// NOTE(review): the statement controlled by this 'if' (original line 1705)
// is missing from this listing
1704  if (job_iter == m_Jobs.end())
1706 
1707  CJobEvent * event = &job_iter->second.AppendEvent();
1708 
1709  event->SetNodeAddr(client.GetAddress());
1710  event->SetStatus(CNetScheduleAPI::eCanceled);
1711  if (is_ns_rollback)
1712  event->SetEvent(CJobEvent::eNSSubmitRollback);
1713  else
1714  event->SetEvent(CJobEvent::eCancel);
1715  event->SetTimestamp(current_time);
1716  event->SetClientNode(client.GetNode());
1717  event->SetClientSession(client.GetSession());
1718 
1719  job_iter->second.SetStatus(CNetScheduleAPI::eCanceled);
1720  job_iter->second.SetLastTouch(current_time);
1721 
// Performance logging is done only for a real cancel, not for a rollback.
// NOTE(review): original lines 1722, 1724 and 1726-1727 are missing
1723  if (is_ns_rollback) {
1725  } else {
1728  g_DoPerfLogging(*this, job_iter->second, 200);
1729  }
1730 
1731  TimeLineRemove(job_id);
// NOTE(review): the statements controlled by these two conditions (original
// lines 1733 and 1735) are missing from this listing
1732  if (old_status == CNetScheduleAPI::eRunning)
1734  else if (old_status == CNetScheduleAPI::eReading)
1736 
// NOTE(review): the opening of the following call (original line 1737) and
// one of its arguments (1740) are missing from this listing
1738  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1739  m_ReadTimeout,
1741  current_time));
1742 
1743  x_NotifyJobChanges(job_iter->second, job_key,
1744  eStatusChanged, current_time);
1745 
1746  // Notify the readers if the job has not been given for reading yet
1747  // and it was not a rollback due to a socket write error
1748  if (!m_ReadJobs.get_bit(job_id) && is_ns_rollback == false) {
1749  m_GCRegistry.UpdateReadVacantTime(job_id, current_time);
// NOTE(review): only a fragment of the notification call is visible
// (original lines 1750 and 1752-1753 are missing)
1751  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
1754  }
1755 
1756  job = job_iter->second;
1757  return old_status;
1758 }
1759 
1760 
// Cancels every job of the queue which is not cancelled yet.
// NOTE(review): presumably CQueue::CancelAllJobs -- the signature line
// (original line 1761, which should also declare `client`) and original line
// 1776 are absent from this listing.
//
//   logging - forwarded to x_CancelJobs()
// Returns the value returned by x_CancelJobs().
1762  bool logging)
1763 {
1764  vector<CNetScheduleAPI::EJobStatus> statuses;
1765 
1766  // All except cancelled
1767  statuses.push_back(CNetScheduleAPI::ePending);
1768  statuses.push_back(CNetScheduleAPI::eRunning);
1769  statuses.push_back(CNetScheduleAPI::eFailed);
1770  statuses.push_back(CNetScheduleAPI::eDone);
1771  statuses.push_back(CNetScheduleAPI::eReading);
1772  statuses.push_back(CNetScheduleAPI::eConfirmed);
1773  statuses.push_back(CNetScheduleAPI::eReadFailed);
1774 
// Collect the ids of the jobs in the listed states and cancel them.
// Scope filtering is applied inside x_CancelJobs().
1775  TNSBitVector jobs;
1777  m_StatusTracker.GetJobs(statuses, jobs);
1778  return x_CancelJobs(client, jobs, logging);
1779 }
1780 
1781 
// Cancels all the jobs from the given candidate set which pass the client
// scope filter.
// NOTE(review): this is x_CancelJobs (the callers in this file use that name)
// but the signature line (original line 1782, which should also declare
// `client`) is absent from this listing. Original lines 1825-1827, 1832, 1834,
// 1836, 1839, 1848 and 1850-1851 are absent too, so several statements in the
// loop are visible only partially.
//
// Visible parameters:
//   candidates_to_cancel - ids of the jobs to consider; the set is copied and
//                          the copy is filtered by scope
//   logging              - if true, job_key and job_phid of each cancelled job
//                          are added to the diagnostics context
// Returns the number of jobs for which the cancel was carried out (jobs absent
// in m_Jobs are logged and skipped).
1783  const TNSBitVector & candidates_to_cancel,
1784  bool logging)
1785 {
1786  CJob job;
1787  CNSPreciseTime current_time = CNSPreciseTime::Current();
1788  TNSBitVector jobs_to_cancel = candidates_to_cancel;
1789 
1790  // Filter the jobs basing on scope if so
// NOTE(review): the condition below looks inverted. As written, any specific
// non-empty scope takes the first branch (non-scope jobs only), while the
// else branch is reached only when scope equals kNoScopeOnly -- which
// contradicts both branch comments ("both these cases" reads as: empty scope
// and kNoScopeOnly). Presumably `scope == kNoScopeOnly` was intended --
// TODO confirm against the other scope checks in this file.
1791  string scope = client.GetScope();
1792  if (scope.empty() || scope != kNoScopeOnly) {
1793  // Both these cases should consider only the non-scope jobs
1794  jobs_to_cancel -= m_ScopeRegistry.GetAllJobsInScopes();
1795  } else {
1796  // Consider only the jobs in the particular scope
1797  jobs_to_cancel &= m_ScopeRegistry.GetJobs(scope);
1798  }
1799 
1800  TNSBitVector::enumerator en(jobs_to_cancel.first());
1801  unsigned int count = 0;
1802  for (; en.valid(); ++en) {
1803  unsigned int job_id = *en;
1804  TJobStatus old_status = m_StatusTracker.GetStatus(job_id);
1805  auto job_iter = m_Jobs.find(job_id);
1806 
1807  if (job_iter == m_Jobs.end()) {
1808  ERR_POST("Cannot fetch job " << DecorateJob(job_id) <<
1809  " while cancelling jobs");
1810  continue;
1811  }
1812 
1813  CJobEvent * event = &job_iter->second.AppendEvent();
1814 
1815  event->SetNodeAddr(client.GetAddress());
1816  event->SetStatus(CNetScheduleAPI::eCanceled);
1817  event->SetEvent(CJobEvent::eCancel);
1818  event->SetTimestamp(current_time);
1819  event->SetClientNode(client.GetNode());
1820  event->SetClientSession(client.GetSession());
1821 
1822  job_iter->second.SetStatus(CNetScheduleAPI::eCanceled);
1823  job_iter->second.SetLastTouch(current_time);
1824 
// NOTE(review): original lines 1825-1827 are missing from this listing
1828  g_DoPerfLogging(*this, job_iter->second, 200);
1829 
1830  TimeLineRemove(job_id);
// NOTE(review): the statements controlled by these two conditions (original
// lines 1832 and 1834) are missing from this listing
1831  if (old_status == CNetScheduleAPI::eRunning)
1833  else if (old_status == CNetScheduleAPI::eReading)
1835 
// NOTE(review): the opening of the following call (original line 1836) and
// one of its arguments (1839) are missing from this listing
1837  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1838  m_ReadTimeout,
1840  current_time));
1841 
1842  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
1843  eStatusChanged, current_time);
1844 
1845  // Notify the readers if the job has not been given for reading yet
1846  if (!m_ReadJobs.get_bit(job_id)) {
1847  m_GCRegistry.UpdateReadVacantTime(job_id, current_time);
// NOTE(review): only a fragment of the notification call is visible
// (original lines 1848 and 1850-1851 are missing)
1849  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
1852  }
1853 
1854  if (logging)
1855  GetDiagContext().Extra()
1856  .Print("job_key", MakeJobKey(job_id))
1857  .Print("job_phid", job_iter->second.GetNCBIPHID());
1858 
1859  ++count;
1860  }
1861  return count;
1862 }
1863 
1864 
// Estimates the moment till which the job will be kept.
// NOTE(review): this is x_GetEstimatedJobLifetime (the callers in this file
// use that name) but the first lines of the signature (original lines
// 1867-1868, including the return type and the job_id parameter) are absent
// from this listing.
//
//   status - the current job status. For eRunning and eReading the estimate
//            is the current time plus the queue timeout; for any other status
//            the value stored in the GC registry is returned.
1865 // This function must be called under the operations lock.
1866 // If called for not existing job then an exception is generated
1869  TJobStatus status) const
1870 {
1871  if (status == CNetScheduleAPI::eRunning ||
1872  status == CNetScheduleAPI::eReading)
1873  return CNSPreciseTime::Current() + GetTimeout();
1874  return m_GCRegistry.GetLifetime(job_id);
1875 }
1876 
1877 
// Cancels the jobs selected by a combination of group, affinity and statuses.
// NOTE(review): presumably CQueue::CancelSelectedJobs -- the line with the
// function name (original line 1879, which should also declare `client`) and
// original line 1912 are absent from this listing.
//
// Visible parameters:
//   group        - job group name; empty means no group filter
//   aff_token    - affinity token; empty means no affinity filter
//   job_statuses - statuses to consider; empty means all statuses
//   logging      - if true, a warning is also posted to the log when the group
//                  or the affinity is not found; forwarded to x_CancelJobs()
//   warnings     - output: "eGroupNotFound:..." / "eAffinityNotFound:..."
//                  messages are appended here
// Returns 0 if no filter at all is given, otherwise the value returned by
// x_CancelJobs().
1878 unsigned int
1880  const string & group,
1881  const string & aff_token,
1882  const vector<TJobStatus> & job_statuses,
1883  bool logging,
1884  vector<string> & warnings)
1885 {
1886  if (group.empty() && aff_token.empty() && job_statuses.empty()) {
1887  // This possible if there was only 'Canceled' status and
1888  // it was filtered out. A warning for this case is already produced
1889  return 0;
1890  }
1891 
1892  TNSBitVector jobs_to_cancel;
1893  vector<TJobStatus> statuses;
1894 
1895  if (job_statuses.empty()) {
1896  // All statuses
1897  statuses.push_back(CNetScheduleAPI::ePending);
1898  statuses.push_back(CNetScheduleAPI::eRunning);
1899  statuses.push_back(CNetScheduleAPI::eCanceled);
1900  statuses.push_back(CNetScheduleAPI::eFailed);
1901  statuses.push_back(CNetScheduleAPI::eDone);
1902  statuses.push_back(CNetScheduleAPI::eReading);
1903  statuses.push_back(CNetScheduleAPI::eConfirmed);
1904  statuses.push_back(CNetScheduleAPI::eReadFailed);
1905  }
1906  else {
1907  // The user specified statuses explicitly
1908  // The list validity is checked by the caller.
1909  statuses = job_statuses;
1910  }
1911 
1913  m_StatusTracker.GetJobs(statuses, jobs_to_cancel);
1914 
// Any exception from the group lookup is treated as "group not found":
// the candidate set is emptied and a warning is produced
1915  if (!group.empty()) {
1916  try {
1917  jobs_to_cancel &= m_GroupRegistry.GetJobs(group);
1918  } catch (...) {
1919  jobs_to_cancel.clear();
1920  warnings.push_back("eGroupNotFound:job group " + group +
1921  " is not found");
1922  if (logging)
1923  ERR_POST(Warning << "Job group '" + group +
1924  "' is not found. No jobs are canceled.");
1925  }
1926  }
1927 
// An affinity id of 0 means the token is unknown: the candidate set is
// emptied and a warning is produced
1928  if (!aff_token.empty()) {
1929  unsigned int aff_id = m_AffinityRegistry.GetIDByToken(aff_token);
1930  if (aff_id == 0) {
1931  jobs_to_cancel.clear();
1932  warnings.push_back("eAffinityNotFound:affinity " + aff_token +
1933  " is not found");
1934  if (logging)
1935  ERR_POST(Warning << "Affinity '" + aff_token +
1936  "' is not found. No jobs are canceled.");
1937  }
1938  else
1939  jobs_to_cancel &= m_AffinityRegistry.GetJobsWithAffinity(aff_id);
1940  }
1941 
1942  return x_CancelJobs(client, jobs_to_cancel, logging);
1943 }
1944 
1945 
// Provides the status of the job as it is known to the status tracker.
// The call is forwarded to m_StatusTracker.GetStatus(); no lock is taken in
// this member itself. Other members in this file compare the result with
// CNetScheduleAPI::eJobNotFound to detect an unknown job.
1946 TJobStatus CQueue::GetJobStatus(unsigned int job_id) const
1947 {
1948  return m_StatusTracker.GetStatus(job_id);
1949 }
1950 
1951 
// Tells whether the queue has no jobs at all: true when the status tracker
// reports that there are no jobs.
// NOTE(review): original line 1954 is absent from this listing (the embedded
// numbering skips it).
1952 bool CQueue::IsEmpty() const
1953 {
1955  return !m_StatusTracker.AnyJobs();
1956 }
1957 
1958 
// Provides the next job id: m_LastId is incremented and returned.
// When the new value reaches m_SavedId some additional actions are taken
// which are only partially visible.
// NOTE(review): original lines 1961, 1967, 1971 and 1973 are absent from this
// listing (the embedded numbering skips them); presumably they take a lock and
// update/persist m_SavedId -- confirm against the repository.
1959 unsigned int CQueue::GetNextId()
1960 {
1962 
1963  // Job indexes are expected to start from 1,
1964  // the m_LastId is 0 at the very beginning
1965  ++m_LastId;
1966  if (m_LastId >= m_SavedId) {
// NOTE(review): the statement which changes m_SavedId (original line 1967)
// is missing; the check below detects that m_SavedId wrapped around
1968  if (m_SavedId < m_LastId) {
1969  // Overflow for the saved ID
1970  m_LastId = 1;
1972  }
1974  }
1975  return m_LastId;
1976 }
1977 
1978 
// Reserves `count` consecutive job ids and returns the first id of the
// reserved range (m_LastId - count + 1 after m_LastId has been advanced).
// If advancing m_LastId wraps around the unsigned range the numbering is
// restarted so that the reserved range is 1..count.
// NOTE(review): original lines 1982, 1991-1992, 1997, 2001 and 2003 are absent
// from this listing (the embedded numbering skips them); presumably they take
// a lock and update/persist m_SavedId -- confirm against the repository.
1979 // Reserves the given number of the job IDs
1980 unsigned int CQueue::GetNextJobIdForBatch(unsigned int count)
1981 {
1983 
1984  // Job indexes are expected to start from 1 and be monotonously growing
1985  unsigned int start_index = m_LastId;
1986 
1987  m_LastId += count;
// Unsigned wrap-around detection: the sum became smaller than the start value
1988  if (m_LastId < start_index ) {
1989  // Overflow
1990  m_LastId = count;
1993  }
1994 
1995  // There were no overflow, check the reserved value
1996  if (m_LastId >= m_SavedId) {
// NOTE(review): the statement which changes m_SavedId (original line 1997)
// is missing; the check below detects that m_SavedId wrapped around
1998  if (m_SavedId < m_LastId) {
1999  // Overflow for the saved ID
2000  m_LastId = count;
2002  }
2004  }
2005 
2006  return m_LastId - count + 1;
2007 }
2008 
2009 
// Picks a job for a reader or, if there is nothing to give, optionally
// registers the reader for a notification.
// NOTE(review): presumably CQueue::GetJobForReadingOrWait -- the line with the
// function name (original line 2011, which should also declare `client`) is
// absent from this listing. Original lines 2027-2028, 2035, 2043, 2047, 2062,
// 2117, 2121, 2136, 2144, 2169, 2171-2172, 2176, 2178-2179, 2182 and 2185 are
// absent too, so the declaration of `curr`, several conditions and several
// call openings are not visible.
//
// Visible parameters:
//   port, timeout          - if no job is found and timeout != 0 && port > 0
//                            the reader is registered as a listener
//   aff_list, group_list   - may be NULL; otherwise resolved into ids
//   reader_affinity, any_affinity, exclusive_new_affinity, prioritized_aff
//                          - affinity options forwarded to the job search
//   affinity_may_change, group_may_change
//                          - forwarded to x_NoMoreReadJobs()
//   job                    - output: filled with the provided job
//   no_more_jobs           - output: set to false at the start and to the
//                            value of x_NoMoreReadJobs() when no job is found
//   rollback_action        - output: set to a newly allocated
//                            CNSReadJobRollback when a job is provided
//                            (presumably owned by the caller -- TODO confirm)
//   added_pref_aff         - output: the affinity token added to the reader's
//                            preferred affinities, if any
// Returns false only from the two preferred affinity checks at the beginning;
// true otherwise (a job was provided or there was nothing to provide).
2010 bool
2012  unsigned int port,
2013  unsigned int timeout,
2014  const list<string> * aff_list,
2015  bool reader_affinity,
2016  bool any_affinity,
2017  bool exclusive_new_affinity,
2018  bool prioritized_aff,
2019  const list<string> * group_list,
2020  bool affinity_may_change,
2021  bool group_may_change,
2022  CJob * job,
2023  bool * no_more_jobs,
2024  CNSRollbackInterface * & rollback_action,
2025  string & added_pref_aff)
2026 {
2029  TNSBitVector group_ids_vector;
2030  bool has_groups = false;
2031  TNSBitVector aff_ids_vector;
2032  vector<unsigned int> aff_ids;
2033 
2034  // This is a reader command, so mark the node type as a reader
2036 
2037  *no_more_jobs = false;
2038 
2039  {{
2040 
// NOTE(review): the conditions of the two checks below (original lines 2043
// and 2047) are missing from this listing; only their 'return false' is visible
2041  if (reader_affinity) {
2042  // Check that the preferred affinities were not reset
2044  return false;
2045 
2046  // Check that the client was garbage collected with preferred affs
2048  return false;
2049  }
2050 
2051  // Resolve affinities and groups. It is supposed that the client knows
2052  // better what affinities and groups to expect i.e. even if they do not
2053  // exist yet, they may appear soon.
2054  if (group_list != NULL) {
2055  m_GroupRegistry.ResolveGroups(*group_list, group_ids_vector);
2056  has_groups = !group_list->empty();
2057  }
2058  if (aff_list != NULL)
2059  m_AffinityRegistry.ResolveAffinities(*aff_list, aff_ids_vector,
2060  aff_ids);
2061 
2063  }}
2064 
// The loop repeats (via 'continue') until a job which is still in an
// acceptable state is picked or there is nothing to pick
2065  for (;;) {
2066  // Old comment:
2067  // No lock here to make it possible to pick a job
2068  // simultaneously from many threads
2069  // Current state:
2070  // The lock is taken at the beginning. Now there is not much of a
2071  // concurrency so a bit of performance is not needed anymore.
2072  // The rest is left untouched to simplify the changes
2073  x_SJobPick job_pick = x_FindVacantJob(client,
2074  aff_ids_vector, aff_ids,
2075  reader_affinity,
2076  any_affinity,
2077  exclusive_new_affinity,
2078  prioritized_aff,
2079  group_ids_vector, has_groups,
2080  eRead);
2081 
2082  {{
2083  bool outdated_job = false;
2084  TJobStatus old_status;
2085 
// job_id == 0 means that no vacant job was found
2086  if (job_pick.job_id == 0) {
2087  if (exclusive_new_affinity)
2088  job_pick = x_FindOutdatedJobForReading(client, 0,
2089  group_ids_vector);
2090 
2091  if (job_pick.job_id == 0) {
2092  *no_more_jobs = x_NoMoreReadJobs(client, aff_ids_vector,
2093  reader_affinity, any_affinity,
2094  exclusive_new_affinity,
2095  group_ids_vector,
2096  affinity_may_change,
2097  group_may_change);
2098  if (timeout != 0 && port > 0)
2099  x_RegisterReadListener(client, port, timeout,
2100  aff_ids_vector,
2101  reader_affinity, any_affinity,
2102  exclusive_new_affinity,
2103  group_ids_vector);
2104  return true;
2105  }
2106  outdated_job = true;
2107  } else {
2108  // Check that the job is still Done/Failed/Canceled
2109  // it could be grabbed by another reader or GC
2110  old_status = GetJobStatus(job_pick.job_id);
2111  if (old_status != CNetScheduleAPI::eDone &&
2112  old_status != CNetScheduleAPI::eFailed &&
2113  old_status != CNetScheduleAPI::eCanceled)
2114  continue; // try to pick another job
2115 
// NOTE(review): the opening of the condition below and the callee name of
// the following initialization (original lines 2117 and 2121) are missing
2116  if (exclusive_new_affinity) {
2118  job_pick.job_id, eRead,
2119  m_MaxPendingReadWaitTimeout) == false) {
2120  x_SJobPick outdated_pick =
2122  client, job_pick.job_id,
2123  group_ids_vector);
2124  if (outdated_pick.job_id != 0) {
2125  job_pick = outdated_pick;
2126  outdated_job = true;
2127  }
2128  }
2129  }
2130  }
2131 
2132  // The job is still in acceptable state. Check if it was received
2133  // with exclusive affinity
2134  if (job_pick.exclusive && job_pick.aff_id != 0 &&
2135  outdated_job == false) {
// NOTE(review): the condition which leads to this 'continue' (original line
// 2136) is missing from this listing
2137  continue; // Other reader grabbed this affinity already
2138 
2139  string aff_token = m_AffinityRegistry.GetTokenByID(
2140  job_pick.aff_id);
2141  // CXX-8843: The '-' affinity must not be added to the list of
2142  // preferred affinities
2143  if (aff_token != k_NoAffinityToken) {
// NOTE(review): the declaration of `added` (original line 2144) is missing
2145  client, job_pick.aff_id, 0, eRead);
2146  if (added)
2147  added_pref_aff = aff_token;
2148  }
2149  }
2150 
2151  if (outdated_job && job_pick.aff_id != 0) {
2152  string aff_token = m_AffinityRegistry.GetTokenByID(
2153  job_pick.aff_id);
2154  // CXX-8843: The '-' affinity must not be added to the list of
2155  // preferred affinities
2156  if (aff_token != k_NoAffinityToken) {
2157  bool added = m_ClientsRegistry.
2158  UpdatePreferredAffinities(
2159  client, job_pick.aff_id, 0, eRead);
2160  if (added)
2161  added_pref_aff = aff_token;
2162  }
2163  }
2164 
// The status before reading is memorized for the rollback action created below
2165  old_status = GetJobStatus(job_pick.job_id);
2166  x_UpdateDB_ProvideJobNoLock(client, curr, job_pick.job_id,
2167  eRead, *job);
// NOTE(review): the second argument of SetStatus() and the following
// statements (original lines 2169, 2171-2172) are missing from this listing
2168  m_StatusTracker.SetStatus(job_pick.job_id,
2170 
2173  g_DoPerfLogging(*this, *job, 200);
2174 
// NOTE(review): the statement controlled by this 'if' (original line 2176)
// and the opening of the following call (2178-2179, 2182) are missing
2175  if (outdated_job)
2177 
2180  m_RunTimeout,
2181  m_ReadTimeout,
2183  curr));
2184  TimeLineAdd(job_pick.job_id, curr + m_ReadTimeout);
2186 
2187  x_NotifyJobChanges(*job, MakeJobKey(job_pick.job_id),
2188  eStatusChanged, curr);
2189 
2190  rollback_action = new CNSReadJobRollback(client, job_pick.job_id,
2191  old_status);
2192  m_ReadJobs.set_bit(job_pick.job_id);
2193  ++m_ReadJobsOps;
2194  return true;
2195  }}
2196  }
2197  return true; // unreachable
2198 }
2199 
2200 
// Finishes reading of a job; a thin wrapper around x_ChangeReadingStatus()
// called with an empty error message, is_ns_rollback == false and
// no_retries == false.
// NOTE(review): presumably CQueue::ConfirmReadingJob -- the signature line
// (original line 2201, which should also declare `client`), the target status
// argument (2210) and original line 2212 are absent from this listing.
//
//   job_id, job_key - the job being read
//   job             - output: forwarded to x_ChangeReadingStatus()
//   auth_token      - forwarded to x_ChangeReadingStatus()
// Returns the value returned by x_ChangeReadingStatus().
2202  unsigned int job_id,
2203  const string & job_key,
2204  CJob & job,
2205  const string & auth_token)
2206 {
2207  TJobStatus old_status = x_ChangeReadingStatus(
2208  client, job_id, job_key,
2209  job, auth_token, "",
2211  false, false);
2213  return old_status;
2214 }
2215 
2216 
// Reports a failure of reading a job; a thin wrapper around
// x_ChangeReadingStatus() called with is_ns_rollback == false.
// NOTE(review): presumably CQueue::FailReadingJob -- the signature line
// (original line 2217, which should also declare `client`), the target status
// argument (2228) and original line 2230 are absent from this listing.
//
//   job_id, job_key - the job being read
//   job             - output: forwarded to x_ChangeReadingStatus()
//   auth_token      - forwarded to x_ChangeReadingStatus()
//   err_msg         - error message forwarded to x_ChangeReadingStatus()
//   no_retries      - forwarded to x_ChangeReadingStatus()
// Returns the value returned by x_ChangeReadingStatus().
2218  unsigned int job_id,
2219  const string & job_key,
2220  CJob & job,
2221  const string & auth_token,
2222  const string & err_msg,
2223  bool no_retries)
2224 {
2225  TJobStatus old_status = x_ChangeReadingStatus(
2226  client, job_id, job_key,
2227  job, auth_token, err_msg,
2229  false, no_retries);
2231  return old_status;
2232 }
2233 
2234 
// Returns a job which was given for reading; a wrapper around
// x_ChangeReadingStatus() called with an empty error message and
// no_retries == false.
// NOTE(review): presumably CQueue::ReturnReadingJob -- the signature line
// (original line 2235, which should also declare `client`) and the statements
// of both branches of the final if/else (2251 and 2253) are absent from this
// listing.
//
//   job_id, job_key - the job being read
//   job             - output: forwarded to x_ChangeReadingStatus()
//   auth_token      - forwarded to x_ChangeReadingStatus()
//   is_ns_rollback  - forwarded to x_ChangeReadingStatus(); also selects the
//                     first branch of the final if/else
//   blacklist       - false selects the first branch of the final if/else
//   target_status   - the status the job should get, forwarded to
//                     x_ChangeReadingStatus()
// Returns the value returned by x_ChangeReadingStatus().
2236  unsigned int job_id,
2237  const string & job_key,
2238  CJob & job,
2239  const string & auth_token,
2240  bool is_ns_rollback,
2241  bool blacklist,
2242  TJobStatus target_status)
2243 {
2244  TJobStatus old_status = x_ChangeReadingStatus(
2245  client, job_id, job_key,
2246  job, auth_token, "",
2247  target_status,
2248  is_ns_rollback,
2249  false);
// NOTE(review): the statements of both branches (original lines 2251 and
// 2253) are missing from this listing
2250  if (is_ns_rollback || blacklist == false)
2252  else
2254  return old_status;
2255 }
2256 
2257 
// Makes an already read job available for reading again by restoring the
// status the job had before it was read.
// NOTE(review): presumably CQueue::RereadJob -- the signature line (original
// line 2258, which should also declare `client`) is absent from this listing.
// Original lines 2265, 2315, 2318 and 2325-2330 are absent too, so a few
// statements below are visible only partially.
//
// Visible parameters:
//   job_id, job_key - the job to reread
//   job             - output: receives a copy of the updated job record
//   no_op           - output: set to true when nothing had to be done (the
//                     job is Failed or Done, or it has not been read yet);
//                     it is not assigned on the other paths
// Returns the status the job had before the call. Nothing is changed if that
// status is eJobNotFound, ePending, eRunning or eReading.
// Throws CNetScheduleException (eInternalError) if the job is absent in m_Jobs
// or has no events.
2259  unsigned int job_id,
2260  const string & job_key,
2261  CJob & job,
2262  bool & no_op)
2263 {
2264  CNSPreciseTime current_time = CNSPreciseTime::Current();
2266  TJobStatus old_status = GetJobStatus(job_id);
2267 
2268  if (old_status == CNetScheduleAPI::eJobNotFound ||
2269  old_status == CNetScheduleAPI::ePending ||
2270  old_status == CNetScheduleAPI::eRunning ||
2271  old_status == CNetScheduleAPI::eReading)
2272  return old_status;
2273 
2274  if (old_status == CNetScheduleAPI::eFailed ||
2275  old_status == CNetScheduleAPI::eDone) {
2276  no_op = true;
2277  return old_status;
2278  }
2279 
2280  // Check that the job has been read already
2281  if (!m_ReadJobs.get_bit(job_id)) {
2282  no_op = true;
2283  return old_status;
2284  }
2285 
2286  TJobStatus state_before_read = CNetScheduleAPI::eJobNotFound;
2287  auto job_iter = m_Jobs.find(job_id);
2288 
2289  if (job_iter == m_Jobs.end())
2290  NCBI_THROW(CNetScheduleException, eInternalError,
2291  "Error fetching job");
2292 
2293  const vector<CJobEvent>& job_events = job_iter->second.GetEvents();
2294  if (job_events.empty())
2295  NCBI_THROW(CNetScheduleException, eInternalError,
2296  "Inconsistency: a job has no events");
2297 
// The job is moved back to the status it had before reading
2298  state_before_read = job_iter->second.GetStatusBeforeReading();
2299 
2300  CJobEvent * event = &job_iter->second.AppendEvent();
2301  event->SetNodeAddr(client.GetAddress());
2302  event->SetStatus(state_before_read);
2303  event->SetEvent(CJobEvent::eReread);
2304  event->SetTimestamp(current_time);
2305  event->SetClientNode(client.GetNode());
2306  event->SetClientSession(client.GetSession());
2307 
2308  job_iter->second.SetStatus(state_before_read);
2309  job_iter->second.SetLastTouch(current_time);
2310 
2311  m_StatusTracker.SetStatus(job_id, state_before_read);
2312  m_StatisticsCounters.CountReread(old_status, state_before_read);
2313  g_DoPerfLogging(*this, job_iter->second, 200);
2314 
// NOTE(review): the opening of the following call (original line 2315) and
// one of its arguments (2318) are missing from this listing
2316  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
2317  m_ReadTimeout,
2319  current_time));
2320 
2321  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
2322 
2323  // Notify the readers
// NOTE(review): the middle arguments of Notify() (original lines 2325-2330)
// are missing from this listing
2324  m_NotificationsList.Notify(job_id, job_iter->second.GetAffinityId(),
2331  eRead);
2332 
// Clear the "has been read" mark so that the job can be given for reading again
2333  m_ReadJobs.set_bit(job_id, false);
2334  ++m_ReadJobsOps;
2335 
2336  job = job_iter->second;
2337  return old_status;
2338 }
2339 
2340 
2342  unsigned int job_id,
2343  const string & job_key,
2344  CJob & job,
2345  const string & auth_token,
2346  const string & err_msg,
2347  TJobStatus target_status,
2348  bool is_ns_rollback,
2349  bool no_retries)
2350 {
2351  CNSPreciseTime current_time =
2356  TJobStatus old_status =
2357  GetJobStatus(job_id);
2358 
2359  if (old_status != CNetScheduleAPI::eReading)
2360  return old_status;
2361 
2362  auto job_iter = m_Jobs.find(job_id);
2363  if (job_iter == m_Jobs.end())
2364  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
2365 
2366  // Check that authorization token matches
2367  if (is_ns_rollback == false) {
2368  CJob::EAuthTokenCompareResult token_compare_result =
2369  job_iter->second.CompareAuthToken(auth_token);
2370  if (token_compare_result == CJob::eInvalidTokenFormat)
2371  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
2372  "Invalid authorization token format");
2373  if (token_compare_result == CJob::eNoMatch)
2374  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
2375  "Authorization token does not match");
2376  }
2377 
2378  // Sanity check of the current job state
2379  if (job_iter->second.GetStatus() != CNetScheduleAPI::eReading)
2380  NCBI_THROW(CNetScheduleException, eInternalError,
2381  "Internal inconsistency detected. The job state in memory is " +
2383  " while in database it is " +
2384  CNetScheduleAPI::StatusToString(job_iter->second.GetStatus()));
2385 
2386  if (target_status == CNetScheduleAPI::eJobNotFound)
2387  target_status = job_iter->second.GetStatusBeforeReading();
2388 
2389 
2390  // Add an event
2391  CJobEvent & event = job_iter->second.AppendEvent();
2392  event.SetTimestamp(current_time);
2393  event.SetNodeAddr(client.GetAddress());
2394  event.SetClientNode(client.GetNode());
2395  event.SetClientSession(client.GetSession());
2396  event.SetErrorMsg(err_msg);
2397 
2398  if (is_ns_rollback) {
2399  event.SetEvent(CJobEvent::eNSReadRollback);
2400  job_iter->second.SetReadCount(job_iter->second.GetReadCount() - 1);
2401  } else {
2402  switch (target_status) {
2406  event.SetEvent(CJobEvent::eReadRollback);
2407  job_iter->second.SetReadCount(job_iter->second.GetReadCount() - 1);
2408  break;
2410  if (no_retries) {
2411  event.SetEvent(CJobEvent::eReadFinalFail);
2412  } else {
2413  event.SetEvent(CJobEvent::eReadFail);
2414  // Check the number of tries first
2415  if (job_iter->second.GetReadCount() <= m_ReadFailedRetries) {
2416  // The job needs to be re-scheduled for reading
2417  target_status = CNetScheduleAPI::eDone;
2418  path_option = CStatisticsCounters::eFail;
2419  }
2420  }
2421  break;
2423  event.SetEvent(CJobEvent::eReadDone);
2424  break;
2425  default:
2426  _ASSERT(0);
2427  break;
2428  }
2429  }
2430 
2431  event.SetStatus(target_status);
2432  job_iter->second.SetStatus(target_status);
2433  job_iter->second.SetLastTouch(current_time);
2434 
2435  if (target_status != CNetScheduleAPI::eConfirmed &&
2436  target_status != CNetScheduleAPI::eReadFailed) {
2437  m_ReadJobs.set_bit(job_id, false);
2438  ++m_ReadJobsOps;
2439 
2440  m_GCRegistry.UpdateReadVacantTime(job_id, current_time);
2441 
2442  // Notify the readers
2444  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
2447  }
2448 
2449  TimeLineRemove(job_id);
2450 
2451  m_StatusTracker.SetStatus(job_id, target_status);
2453  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
2455  current_time));
2456  if (is_ns_rollback)
2458  else
2460  target_status,
2461  path_option);
2462  g_DoPerfLogging(*this, job_iter->second, 200);
2463  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
2464 
2465  job = job_iter->second;
2467 }
2468 
2469 
2470 // This function is called from places where the operations lock has been
2471 // already taken. So there is no lock around memory status tracker
//
// Erases a single job: removes it from the in-memory status tracker, sets its
// bit in m_JobsToDelete (a request for a delayed record delete) and removes
// it from the run time line.
//   job_id - the job to erase
//   status - not referenced by any statement visible in this listing
// NOTE(review): listing lines 2478, 2481 and 2484 are absent from this
// extract, so the inner scope and the function tail may do more than what is
// described here (presumably a guard for m_JobsToDelete and some counters)
// -- confirm against the full source.
2472 void CQueue::EraseJob(unsigned int job_id, TJobStatus status)
2473 {
2474  m_StatusTracker.Erase(job_id);
2475 
2476  {{
2477  // Request delayed record delete
2479 
2480  m_JobsToDelete.set_bit(job_id);
2482  }}
 // TimeLineRemove() does nothing if the run time line is not created
2483  TimeLineRemove(job_id);
2485 }
2486 
2487 
// Bulk variant of the delayed delete request: merges the given job ids into
// m_JobsToDelete and advances m_JobsToDeleteOps by the number of ids.
// job_ids - the jobs to be marked for deletion; an empty vector is a no-op
2488 // status - the job status from which the job was deleted
// NOTE(review): listing lines 2495 and 2499 are absent from this extract, so
// the use of 'status' and any locking are not visible here -- confirm against
// the full source.
2489 void CQueue::x_Erase(const TNSBitVector & job_ids, TJobStatus status)
2490 {
2491  size_t job_count = job_ids.count();
 // job_count is unsigned, so this condition is true only for 0
2492  if (job_count <= 0)
2493  return;
2494 
2496 
2497  m_JobsToDelete |= job_ids;
2498  m_JobsToDeleteOps += job_count;
2500 }
2501 
2502 
2504 {
2506 }
2507 
2508 
2511  const TNSBitVector & explicit_affs,
2512  const vector<unsigned int> & aff_ids,
2513  bool use_pref_affinity,
2514  bool any_affinity,
2515  bool exclusive_new_affinity,
2516  bool prioritized_aff,
2517  const TNSBitVector & group_ids,
2518  bool has_groups,
2519  ECommandGroup cmd_group)
2520 {
2521  string scope = client.GetScope();
2522  string virtual_scope = client.GetVirtualScope();
2523 
2524  if (!virtual_scope.empty()) {
2525  // Try first this scope: see CXX-5324
2526  x_SJobPick job_pick = x_FindVacantJob(client, explicit_affs,
2527  aff_ids, use_pref_affinity,
2528  any_affinity,
2529  exclusive_new_affinity,
2530  prioritized_aff,
2531  group_ids, has_groups,
2532  cmd_group, virtual_scope);
2533  if (job_pick.job_id != 0)
2534  return job_pick;
2535 
2536  // Fallback to a regular pick
2537  }
2538 
2539  return x_FindVacantJob(client, explicit_affs, aff_ids, use_pref_affinity,
2540  any_affinity, exclusive_new_affinity,
2541  prioritized_aff, group_ids, has_groups,
2542  cmd_group, scope);
2543 }
2544 
2545 
2548  const TNSBitVector & explicit_affs,
2549  const vector<unsigned int> & aff_ids,
2550  bool use_pref_affinity,
2551  bool any_affinity,
2552  bool exclusive_new_affinity,
2553  bool prioritized_aff,
2554  const TNSBitVector & group_ids,
2555  bool has_groups,
2556  ECommandGroup cmd_group,
2557  const string & scope)
2558 {
2559  bool explicit_aff = !aff_ids.empty();
2560  bool effective_use_pref_affinity = use_pref_affinity;
2561  TNSBitVector pref_aff_candidate_jobs;
2562  TNSBitVector exclusive_aff_candidate_jobs;
2563 
2564  // Jobs per client support: CXX-11138 (only for GET)
2565  map<string, size_t> running_jobs_per_client;
2566  if (m_MaxJobsPerClient > 0 && cmd_group == eGet) {
2567  running_jobs_per_client = x_GetRunningJobsPerClientIP();
2568  }
2569 
2571  client, cmd_group);
2572  if (use_pref_affinity)
2573  effective_use_pref_affinity = use_pref_affinity && pref_aff.any();
2574 
2575  if (explicit_aff || effective_use_pref_affinity || exclusive_new_affinity) {
2576  // Check all vacant jobs: pending jobs for eGet,
2577  // done/failed/cancel jobs for eRead
2578  TNSBitVector vacant_jobs;
2579  if (cmd_group == eGet)
2581  else
2582  m_StatusTracker.GetJobs(m_StatesForRead, vacant_jobs);
2583 
2584  if (scope.empty() || scope == kNoScopeOnly) {
2585  // Both these cases should consider only the non-scope jobs
2586  vacant_jobs -= m_ScopeRegistry.GetAllJobsInScopes();
2587  } else {
2588  // Consider only the jobs in the particular scope
2589  vacant_jobs &= m_ScopeRegistry.GetJobs(scope);
2590  }
2591 
2592  // Exclude blacklisted jobs
2594  vacant_jobs);
2595  // Keep only the group jobs if the groups are provided
2596  if (has_groups)
2597  m_GroupRegistry.RestrictByGroup(group_ids, vacant_jobs);
2598 
2599  // Exclude jobs which have been read or in a process of reading
2600  if (cmd_group == eRead)
2601  vacant_jobs -= m_ReadJobs;
2602 
2603  if (prioritized_aff) {
2604  // The criteria here is a list of explicit affinities
2605  // (respecting their order) which may be followed by any affinity
2606  for (vector<unsigned int>::const_iterator k = aff_ids.begin();
2607  k != aff_ids.end(); ++k) {
2608  TNSBitVector aff_jobs = m_AffinityRegistry.
2609  GetJobsWithAffinity(*k);
2610  TNSBitVector candidates = vacant_jobs & aff_jobs;
2611  if (candidates.any()) {
2612  // Need to check the running jobs per client ip
2613  TNSBitVector::enumerator en(candidates.first());
2614  for (; en.valid(); ++en) {
2615  auto job_id = *en;
2617  job_id, running_jobs_per_client)) {
2618  return x_SJobPick(job_id, false, *k);
2619  }
2620  }
2621  }
2622  }
2623  if (any_affinity) {
2624  if (vacant_jobs.any()) {
2625  // Need to check the running jobs per client ip
2626  TNSBitVector::enumerator en(vacant_jobs.first());
2627  for (; en.valid(); ++en) {
2628  auto job_id = *en;
2630  job_id, running_jobs_per_client)) {
2631  return x_SJobPick(job_id, false,
2632  m_GCRegistry.GetAffinityID(job_id));
2633  }
2634  }
2635  }
2636  }
2637  return x_SJobPick();
2638  }
2639 
2640  // HERE: no prioritized affinities
2641  TNSBitVector all_pref_aff;
2642  if (exclusive_new_affinity)
2644  cmd_group);
2645 
2646  TNSBitVector::enumerator en(vacant_jobs.first());
2647  for (; en.valid(); ++en) {
2648  unsigned int job_id = *en;
2649 
2650  unsigned int aff_id = m_GCRegistry.GetAffinityID(job_id);
2651  if (aff_id != 0 && explicit_aff) {
2652  if (explicit_affs.get_bit(aff_id)) {
2654  job_id, running_jobs_per_client)) {
2655  return x_SJobPick(job_id, false, aff_id);
2656  }
2657  }
2658  }
2659 
2660  if (aff_id != 0 && effective_use_pref_affinity) {
2661  if (pref_aff.get_bit(aff_id)) {
2662  if (explicit_aff == false) {
2664  job_id, running_jobs_per_client)) {
2665  return x_SJobPick(job_id, false, aff_id);
2666  }
2667  }
2668 
2669  pref_aff_candidate_jobs.set_bit(job_id);
2670  continue;
2671  }
2672  }
2673 
2674  if (exclusive_new_affinity) {
2675  if (aff_id == 0 || all_pref_aff.get_bit(aff_id) == false) {
2676  if (explicit_aff == false &&
2677  effective_use_pref_affinity == false) {
2679  job_id, running_jobs_per_client)) {
2680  return x_SJobPick(job_id, true, aff_id);
2681  }
2682  }
2683 
2684  exclusive_aff_candidate_jobs.set_bit(job_id);
2685  }
2686  }
2687  } // end for
2688 
2689  TNSBitVector::enumerator en1(pref_aff_candidate_jobs.first());
2690  for (; en1.valid(); ++en1) {
2691  if (x_ValidateMaxJobsPerClientIP(*en1, running_jobs_per_client)) {
2692  return x_SJobPick(*en1, false, 0);
2693  }
2694  }
2695 
2696  TNSBitVector::enumerator en2(exclusive_aff_candidate_jobs.first());
2697  for (; en2.valid(); ++en2) {
2698  if (x_ValidateMaxJobsPerClientIP(*en2, running_jobs_per_client)) {
2699  return x_SJobPick(*en2, true, m_GCRegistry.GetAffinityID(*en2));
2700  }
2701  }
2702  }
2703 
2704  // The second condition looks strange and it covers a very specific
2705  // scenario: some (older) worker nodes may originally come with the only
2706  // flag set - use preferred affinities - while they have nothing in the
2707  // list of preferred affinities yet. In this case a first pending job
2708  // should be provided.
2709  if (any_affinity ||
2710  (!explicit_aff &&
2711  use_pref_affinity && !effective_use_pref_affinity &&
2712  !exclusive_new_affinity &&
2713  cmd_group == eGet)) {
2714 
2715  TNSBitVector jobs_in_scope;
2716  TNSBitVector restricted_jobs;
2717  bool no_scope_only = scope.empty() ||
2718  scope == kNoScopeOnly;
2719  unsigned int job_id = 0;
2720 
2721  if (no_scope_only)
2722  jobs_in_scope = m_ScopeRegistry.GetAllJobsInScopes();
2723  else {
2724  restricted_jobs = m_ScopeRegistry.GetJobs(scope);
2725  if (has_groups)
2726  m_GroupRegistry.RestrictByGroup(group_ids, restricted_jobs);
2727  }
2728 
2729  if (cmd_group == eGet) {
2730  // NOTE: this only to avoid an expensive temporary bvector
2732  jobs_in_scope);
2733 
2734  TNSBitVector pending_jobs;
2736  TNSBitVector::enumerator en = pending_jobs.first();
2737 
2738  if (no_scope_only) {
2739  // only the jobs which are not in the scope
2740  if (has_groups) {
2741  TNSBitVector group_jobs = m_GroupRegistry.GetJobs(group_ids);
2742  for (; en.valid(); ++en) {
2743  unsigned int candidate_job_id = *en;
2744  if (jobs_in_scope.get_bit(candidate_job_id))
2745  continue;
2746  if (!group_jobs.get_bit(candidate_job_id))
2747  continue;
2748  if (x_ValidateMaxJobsPerClientIP(candidate_job_id,
2749  running_jobs_per_client)) {
2750  job_id = candidate_job_id;
2751  break;
2752  }
2753  }
2754  } else {
2755  for (; en.valid(); ++en) {
2756  unsigned int candidate_job_id = *en;
2757  if (jobs_in_scope.get_bit(candidate_job_id))
2758  continue;
2759  if (x_ValidateMaxJobsPerClientIP(candidate_job_id,
2760  running_jobs_per_client)) {
2761  job_id = candidate_job_id;
2762  break;
2763  }
2764  }
2765  }
2766  } else {
2767  // only the specific scope jobs
2768  for (; en.valid(); ++en) {
2769  unsigned int candidate_job_id = *en;
2770  if (jobs_in_scope.get_bit(candidate_job_id))
2771  continue;
2772  if (!restricted_jobs.get_bit(candidate_job_id))
2773  continue;
2774  if (x_ValidateMaxJobsPerClientIP(candidate_job_id,
2775  running_jobs_per_client)) {
2776  job_id = candidate_job_id;
2777  break;
2778  }
2779  }
2780  }
2781  } else {
2782  if (no_scope_only) {
2783  // only the jobs which are not in the scope
2784 
2785  // NOTE: this only to avoid an expensive temporary bvector
2786  jobs_in_scope |= m_ReadJobs;
2788  jobs_in_scope);
2789  if (has_groups)
2792  jobs_in_scope,
2793  m_GroupRegistry.GetJobs(group_ids),
2794  has_groups);
2795  else
2798  jobs_in_scope,
2800  false);
2801  } else {
2802  // only the specific scope jobs
2803 
2804  // NOTE: this only to avoid an expensive temporary bvector
2805  jobs_in_scope = m_ReadJobs;
2807  jobs_in_scope);
2810  jobs_in_scope,
2811  restricted_jobs, true);
2812  }
2813  }
2814  return x_SJobPick(job_id, false, 0);
2815  }
2816 
2817  return x_SJobPick();
2818 }
2819 
2820 // Provides a map between the client IP and the number of running jobs
2822 {
2823  map<string, size_t> ret;
2824  TNSBitVector running_jobs;
2825 
2827  TNSBitVector::enumerator en(running_jobs.first());
2828  for (; en.valid(); ++en) {
2829  auto job_iter = m_Jobs.find(*en);
2830  if (job_iter != m_Jobs.end()) {
2831  string client_ip = job_iter->second.GetClientIP();
2832  auto iter = ret.find(client_ip);
2833  if (iter == ret.end()) {
2834  ret[client_ip] = 1;
2835  } else {
2836  iter->second += 1;
2837  }
2838  }
2839  }
2840  return ret;
2841 }
2842 
2843 
// Tells whether the given job may be picked with respect to the limit of
// jobs per client IP (m_MaxJobsPerClient).
//   job_id             - the candidate job
//   jobs_per_client_ip - client IP -> number of jobs counted for that IP
// Returns true when the job may be given out:
//   - the map is empty (nothing to compare against);
//   - the job is not found in m_Jobs;
//   - the client IP stored in the job has no entry in the map;
//   - the counter for that client IP is below m_MaxJobsPerClient.
// Returns false only if the counter has reached m_MaxJobsPerClient.
// NOTE(review): the listing line with the function name (2845) is absent from
// this extract; the calls elsewhere in this file with the same argument types
// use the name x_ValidateMaxJobsPerClientIP -- confirm against the full source.
2844 bool
2846  unsigned int job_id,
2847  const map<string, size_t> & jobs_per_client_ip) const
2848 {
2849  if (jobs_per_client_ip.empty())
2850  return true;
2851 
2852  auto job_iter = m_Jobs.find(job_id);
 // An unknown job is not restricted here
2853  if (job_iter == m_Jobs.end())
2854  return true;
2855 
2856  string client_ip = job_iter->second.GetClientIP();
2857  auto iter = jobs_per_client_ip.find(client_ip);
 // No counted jobs for this client IP so far
2858  if (iter == jobs_per_client_ip.end())
2859  return true;
2860  return iter->second < m_MaxJobsPerClient;
2861 }
2862 
2863 
2866  unsigned int picked_earlier,
2867  const TNSBitVector & group_ids)
2868 {
2870  return x_SJobPick(); // Not configured
2871 
2872  string scope = client.GetScope();
2873  string virtual_scope = client.GetVirtualScope();
2874 
2875  if (!virtual_scope.empty()) {
2876  // Try first this scope: see CXX-5324
2877  x_SJobPick job_pick = x_FindOutdatedPendingJob(client, picked_earlier,
2878  group_ids,
2879  virtual_scope);
2880  if (job_pick.job_id != 0)
2881  return job_pick;
2882 
2883  // Fallback to a regular outdated pick
2884  }
2885 
2886  return x_FindOutdatedPendingJob(client, picked_earlier,
2887  group_ids, scope);
2888 }
2889 
2890 
2893  unsigned int picked_earlier,
2894  const TNSBitVector & group_ids,
2895  const string & scope)
2896 {
2897  TNSBitVector outdated_pending =
2900  m_GCRegistry);
2901  if (picked_earlier != 0)
2902  outdated_pending.set_bit(picked_earlier, false);
2903 
2905 
2906  if (scope.empty() || scope == kNoScopeOnly)
2907  outdated_pending -= m_ScopeRegistry.GetAllJobsInScopes();
2908  else
2909  outdated_pending &= m_ScopeRegistry.GetJobs(scope);
2910 
2911  if (group_ids.any())
2912  m_GroupRegistry.RestrictByGroup(group_ids, outdated_pending);
2913 
2914  if (!outdated_pending.any())
2915  return x_SJobPick();
2916 
2917 
2918  x_SJobPick job_pick;
2919  job_pick.job_id = *outdated_pending.first();
2920  job_pick.aff_id = m_GCRegistry.GetAffinityID(job_pick.job_id);
2921  job_pick.exclusive = job_pick.aff_id != 0;
2922  return job_pick;
2923 }
2924 
2925 
2928  unsigned int picked_earlier,
2929  const TNSBitVector & group_ids)
2930 {
2932  return x_SJobPick(); // Not configured
2933 
2934  string scope = client.GetScope();
2935  string virtual_scope = client.GetVirtualScope();
2936 
2937  if (!virtual_scope.empty()) {
2938  // Try first this scope: see CXX-5324
2940  picked_earlier,
2941  group_ids,
2942  virtual_scope);
2943  if (job_pick.job_id != 0)
2944  return job_pick;
2945 
2946  // Fallback to a regular outdated pick
2947  }
2948 
2949  return x_FindOutdatedJobForReading(client, picked_earlier,
2950  group_ids, scope);
2951 }
2952 
2953 
2956  unsigned int picked_earlier,
2957  const TNSBitVector & group_ids,
2958  const string & scope)
2959 {
2960  TNSBitVector outdated_read_jobs =
2964  if (picked_earlier != 0)
2965  outdated_read_jobs.set_bit(picked_earlier, false);
2966 
2968  outdated_read_jobs);
2969 
2970  if (scope.empty() || scope == kNoScopeOnly)
2971  outdated_read_jobs -= m_ScopeRegistry.GetAllJobsInScopes();
2972  else
2973  outdated_read_jobs &= m_ScopeRegistry.GetJobs(scope);
2974 
2975  if (group_ids.any())
2976  m_GroupRegistry.RestrictByGroup(group_ids, outdated_read_jobs);
2977 
2978  if (!outdated_read_jobs.any())
2979  return x_SJobPick();
2980 
2981  unsigned int job_id = *outdated_read_jobs.first();
2982  unsigned int aff_id = m_GCRegistry.GetAffinityID(job_id);
2983  return x_SJobPick(job_id, aff_id != 0, aff_id);
2984 }
2985 
2986 
2988  unsigned int job_id,
2989  const string & job_key,
2990  CJob & job,
2991  const string & auth_token,
2992  const string & err_msg,
2993  const string & output,
2994  int ret_code,
2995  bool no_retries,
2996  string warning)
2997 {
2998  unsigned failed_retries;
2999  unsigned max_output_size;
3000  {{
3001  CQueueParamAccessor qp(*this);
3002  failed_retries = qp.GetFailedRetries();
3003  max_output_size = qp.GetMaxOutputSize();
3004  }}
3005 
3006  if (output.size() > max_output_size) {
3007  NCBI_THROW(CNetScheduleException, eDataTooLong,
3008  "Output is too long");
3009  }
3010 
3012  bool rescheduled = false;
3013  TJobStatus old_status;
3014 
3016  TJobStatus new_status = CNetScheduleAPI::eFailed;
3017 
3018  old_status = GetJobStatus(job_id);
3019  if (old_status == CNetScheduleAPI::eFailed) {
3022  return old_status;
3023  }
3024 
3025  if (old_status != CNetScheduleAPI::eRunning) {
3026  // No job state change
3027  return old_status;
3028  }
3029 
3030  auto job_iter = m_Jobs.find(job_id);
3031  if (job_iter == m_Jobs.end())
3032  NCBI_THROW(CNetScheduleException, eInternalError,
3033  "Error fetching job");
3034 
3035  if (!auth_token.empty()) {
3036  // Need to check authorization token first
3037  CJob::EAuthTokenCompareResult token_compare_result =
3038  job_iter->second.CompareAuthToken(auth_token);
3039  if (token_compare_result == CJob::eInvalidTokenFormat)
3040  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
3041  "Invalid authorization token format");
3042  if (token_compare_result == CJob::eNoMatch)
3043  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
3044  "Authorization token does not match");
3045  if (token_compare_result == CJob::ePassportOnlyMatch) {
3046  // That means the job has been given to another worker node
3047  // by whatever reason (expired/failed/returned before)
3048  ERR_POST(Warning << "Received FPUT2 with only "
3049  "passport matched.");
3050  warning = "eJobPassportOnlyMatch:Only job passport "
3051  "matched. Command is ignored.";
3052  job = job_iter->second;
3053  return old_status;
3054  }
3055  // Here: the authorization token is OK, we can continue
3056  }
3057 
3058  CJobEvent * event = job_iter->second.GetLastEvent();
3059  if (!event)
3060  ERR_POST("No JobEvent for running job");
3061 
3062  event = &job_iter->second.AppendEvent();
3063  if (no_retries)
3064  event->SetEvent(CJobEvent::eFinalFail);
3065  else
3066  event->SetEvent(CJobEvent::eFail);
3067  event->SetStatus(CNetScheduleAPI::eFailed);
3068  event->SetTimestamp(curr);
3069  event->SetErrorMsg(err_msg);
3070  event->SetRetCode(ret_code);
3071  event->SetNodeAddr(client.GetAddress());
3072  event->SetClientNode(client.GetNode());
3073  event->SetClientSession(client.GetSession());
3074 
3075  if (no_retries) {
3076  job_iter->second.SetStatus(CNetScheduleAPI::eFailed);
3077  event->SetStatus(CNetScheduleAPI::eFailed);
3078  rescheduled = false;
3079  if (m_Log)
3080  ERR_POST(Warning << "Job failed "
3081  "unconditionally, no_retries = 1");
3082  } else {
3083  unsigned run_count = job_iter->second.GetRunCount();
3084  if (run_count <= failed_retries) {
3085  job_iter->second.SetStatus(CNetScheduleAPI::ePending);
3086  event->SetStatus(CNetScheduleAPI::ePending);
3087 
3088  new_status = CNetScheduleAPI::ePending;
3089 
3090  rescheduled = true;
3091  } else {
3092  job_iter->second.SetStatus(CNetScheduleAPI::eFailed);
3093  event->SetStatus(CNetScheduleAPI::eFailed);
3094  new_status = CNetScheduleAPI::eFailed;
3095  rescheduled = false;
3096  if (m_Log)
3097  ERR_POST(Warning << "Job failed, exceeded "
3098  "max number of retries ("
3099  << failed_retries << ")");
3100  }
3101  }
3102 
3103  job_iter->second.SetOutput(output);
3104  job_iter->second.SetLastTouch(curr);
3105 
3106  m_StatusTracker.SetStatus(job_id, new_status);
3107  if (new_status == CNetScheduleAPI::ePending)
3109  new_status,
3111  else
3113  new_status,
3115  g_DoPerfLogging(*this, job_iter->second, 200);
3116 
3117  TimeLineRemove(job_id);
3118 
3119  // Replace it with ClearExecuting(client, job_id) when all clients
3120  // provide their credentials and job passport is checked strictly
3123 
3125  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
3126  m_ReadTimeout,
3127  m_PendingTimeout, curr));
3128 
3129  if (rescheduled && m_PauseStatus == eNoPause)
3131  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
3134 
3135  if (new_status == CNetScheduleAPI::eFailed)
3136  if (!m_ReadJobs.get_bit(job_id)) {
3137  m_GCRegistry.UpdateReadVacantTime(job_id, curr);
3139  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
3142  }
3143 
3144  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, curr);
3145 
3146  job = job_iter->second;
3147  return old_status;
3148 }
3149 
3150 
3151 string CQueue::GetAffinityTokenByID(unsigned int aff_id) const
3152 {
3153  return m_AffinityRegistry.GetTokenByID(aff_id);
3154 }
3155 
3156 
3158  bool & client_was_found,
3159  string & old_session,
3160  bool & had_wn_pref_affs,
3161  bool & had_reader_pref_affs)
3162 {
3163  // Get the running and reading jobs and move them to the corresponding
3164  // states (pending and done)
3165 
3166  TNSBitVector running_jobs;
3167  TNSBitVector reading_jobs;
3168 
3169  {{
3171  m_ClientsRegistry.ClearClient(client, running_jobs, reading_jobs,
3172  client_was_found, old_session,
3173  had_wn_pref_affs, had_reader_pref_affs);
3174 
3175  }}
3176 
3177  if (running_jobs.any())
3178  x_ResetRunningDueToClear(client, running_jobs);
3179  if (reading_jobs.any())
3180  x_ResetReadingDueToClear(client, reading_jobs);
3181  return;
3182 }
3183 
3184 
3185 // Triggered from a notification thread only
3187 {
3189  // Pending outdated timeout is configured, so check outdated jobs
3191  TNSBitVector outdated_jobs =
3194  m_GCRegistry);
3195  if (outdated_jobs.any())
3196  m_NotificationsList.CheckOutdatedJobs(outdated_jobs,
3199  eGet);
3200  }
3201 
3203  // Read pending timeout is configured, so check read outdated jobs
3205  TNSBitVector outdated_jobs =
3209  if (outdated_jobs.any())
3210  m_NotificationsList.CheckOutdatedJobs(outdated_jobs,
3213  eRead);
3214  }
3215 
3216 
3217  // Check the configured notification interval
3218  static CNSPreciseTime last_notif_timeout = kTimeNever;
3219  static size_t skip_limit = 0;
3220  static size_t skip_count;
3221 
3222  if (m_NotifHifreqInterval != last_notif_timeout) {
3223  last_notif_timeout = m_NotifHifreqInterval;
3224  skip_count = 0;
3225  skip_limit = size_t(m_NotifHifreqInterval/0.1);
3226  }
3227 
3228  ++skip_count;
3229  if (skip_count < skip_limit)
3230  return;
3231 
3232  skip_count = 0;
3233 
3234  // The NotifyPeriodically() and CheckTimeout() calls may need to modify
3235  // the clients and affinity registry so it is safer to take the queue lock.
3241  else
3243 }
3244 
3245 
3247 {
3249 }
3250 
3251 
3253 {
3255  return m_ClientsRegistry.PrintClientsList(this,
3257 }
3258 
3259 
3261 {
3265 }
3266 
3267 
3269  bool verbose) const
3270 {
3271  TNSBitVector scope_jobs;
3272  string scope = client.GetScope();
3274 
3275  if (scope == kNoScopeOnly)
3276  scope_jobs = m_ScopeRegistry.GetAllJobsInScopes();
3277  else if (!scope.empty())
3278  scope_jobs = m_ScopeRegistry.GetJobs(scope);
3279 
3281  scope_jobs, scope,
3283 }
3284 
3285 
3287  bool verbose) const
3288 {
3289  TNSBitVector scope_jobs;
3290  string scope = client.GetScope();
3292 
3293  if (scope == kNoScopeOnly)
3294  scope_jobs = m_ScopeRegistry.GetAllJobsInScopes();
3295  else if (!scope.empty())
3296  scope_jobs = m_ScopeRegistry.GetJobs(scope);
3297 
3298  return m_GroupRegistry.Print(this, scope_jobs, scope,
3300 }
3301 
3302 
3304 {
3306  return m_ScopeRegistry.Print(this, 100, verbose);
3307 }
3308 
3309 
3311 {
3312  if (!m_RunTimeLine)
3313  return;
3314 
3315  CNSPreciseTime queue_run_timeout = GetRunTimeout();
3316  CNSPreciseTime queue_read_timeout = GetReadTimeout();
3318  TNSBitVector bv;
3319  {{
3321  m_RunTimeLine->ExtractObjects(curr.Sec(), &bv);
3322  }}
3323 
3325  for ( ;en.valid(); ++en) {
3326  x_CheckExecutionTimeout(queue_run_timeout, queue_read_timeout,
3327  *en, curr, logging);
3328  }
3329 }
3330 
3331 
// Handles a possible run/read timeout of one job.
// Acts only on jobs whose current status is Running or Reading; any other
// status makes the function return without changes.
//   queue_run_timeout  - used when the job's own run timeout is kTimeZero
//   queue_read_timeout - used when the job's own read timeout is kTimeZero
//   job_id             - the job to check
//   curr_time          - compared against the computed expiration time and
//                        stamped on the job and on the appended event
//   logging            - when true a diagnostics Extra() record is printed
//                        after the job has been moved to its new status
// If the timeout has not expired yet the job is registered in the time line
// with its expiration time. Otherwise the job is moved to Pending or Failed
// (it was Running) or to its status before reading or ReadFailed (it was
// Reading), an event is appended and the change is announced via
// x_NotifyJobChanges().
// NOTE(review): this listing lacks a number of lines (e.g. 3345, 3348, 3421,
// 3424, 3430-3440, 3444-3456, 3462-3466, 3473-3476), so the declaration of
// job_iter, whatever opens the {{ }} scope (presumably a lock guard) and the
// heads of several calls are not visible here -- confirm against the full
// source.
3332 void CQueue::x_CheckExecutionTimeout(const CNSPreciseTime & queue_run_timeout,
3333  const CNSPreciseTime & queue_read_timeout,
3334  unsigned int job_id,
3335  const CNSPreciseTime & curr_time,
3336  bool logging)
3337 {
3338  CNSPreciseTime time_start = kTimeZero;
3339  CNSPreciseTime run_timeout = kTimeZero;
3340  CNSPreciseTime read_timeout = kTimeZero;
3341  CNSPreciseTime exp_time = kTimeZero;
3342  TJobStatus status;
3343  TJobStatus new_status;
3344  CJobEvent::EJobEvent event_type;
3346 
3347  {{
3349 
 // Preset the target status and the event type for the plain timeout
 // case; the target status may be changed below if retries are exhausted
3350  status = GetJobStatus(job_id);
3351  if (status == CNetScheduleAPI::eRunning) {
3352  new_status = CNetScheduleAPI::ePending;
3353  event_type = CJobEvent::eTimeout;
3354  } else if (status == CNetScheduleAPI::eReading) {
3355  new_status = CNetScheduleAPI::eDone;
3356  event_type = CJobEvent::eReadTimeout;
3357  } else
3358  return; // Execution timeout is for Running and Reading jobs only
3359 
3360  job_iter = m_Jobs.find(job_id);
3361  if (job_iter == m_Jobs.end())
3362  return;
3363 
 // The timeout is counted from the timestamp of the last job event.
 // NOTE(review): the GetLastEvent() result is dereferenced without a null
 // check while another place in this file tests it for null before use
 // -- confirm a Running/Reading job always has at least one event.
3364  CJobEvent * event = job_iter->second.GetLastEvent();
3365  time_start = event->GetTimestamp();
 // A job specific timeout takes precedence; kTimeZero falls back to the
 // queue wide value
3366  run_timeout = job_iter->second.GetRunTimeout();
3367  if (run_timeout == kTimeZero)
3368  run_timeout = queue_run_timeout;
3369 
3370  if (status == CNetScheduleAPI::eRunning &&
3371  run_timeout == kTimeZero)
3372  // 0 timeout means the job never fails
3373  return;
3374 
3375  read_timeout = job_iter->second.GetReadTimeout();
3376  if (read_timeout == kTimeZero)
3377  read_timeout = queue_read_timeout;
3378 
3379  if (status == CNetScheduleAPI::eReading &&
3380  read_timeout == kTimeZero)
3381  // 0 timeout means the job never fails
3382  return;
3383 
3384  // Calculate the expiration time
3385  if (status == CNetScheduleAPI::eRunning)
3386  exp_time = time_start + run_timeout;
3387  else
3388  exp_time = time_start + read_timeout;
3389 
3390  if (curr_time < exp_time) {
3391  // we need to register job in time line
3392  TimeLineAdd(job_id, exp_time);
3393  return;
3394  }
3395 
3396  // The job timeout (running or reading) is expired.
3397  // Check the try counter, we may need to fail the job.
3398  if (status == CNetScheduleAPI::eRunning) {
3399  // Running state
3400  if (job_iter->second.GetRunCount() > m_FailedRetries)
3401  new_status = CNetScheduleAPI::eFailed;
3402  } else {
3403  // Reading state
3404  if (job_iter->second.GetReadCount() > m_ReadFailedRetries)
3405  new_status = CNetScheduleAPI::eReadFailed;
3406  else
3407  new_status = job_iter->second.GetStatusBeforeReading();
 // The if/else above has no braces, so the two statements below are
 // executed for both outcomes of the reading branch
3408  m_ReadJobs.set_bit(job_id, false);
3409  ++m_ReadJobsOps;
3410  }
3411 
3412  job_iter->second.SetStatus(new_status);
3413  job_iter->second.SetLastTouch(curr_time);
3414 
 // The 'event' pointer is reused for the newly appended event
3415  event = &job_iter->second.AppendEvent();
3416  event->SetStatus(new_status);
3417  event->SetEvent(event_type);
3418  event->SetTimestamp(curr_time);
3419 
3420  m_StatusTracker.SetStatus(job_id, new_status);
3422  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
3423  m_ReadTimeout,
3425  curr_time));
3426 
3427  if (status == CNetScheduleAPI::eRunning) {
3428  if (new_status == CNetScheduleAPI::ePending) {
3429  // Timeout and reschedule, put to blacklist as well
3433  new_status,
3435  } else {
3441  }
3442  } else {
3443  if (new_status == CNetScheduleAPI::eReadFailed) {
3447  new_status,
3449  } else {
3450  // The target status could be Done, Failed, Canceled.
3451  // The job could be read again by another reader.
3455  new_status,
3457  }
3458  }
3459  g_DoPerfLogging(*this, job_iter->second, 200);
3460 
3461  if (new_status == CNetScheduleAPI::ePending &&
3464  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
3467 
 // For the Done/Failed/Canceled target statuses the read vacant time is
 // refreshed unless the job bit is set in m_ReadJobs
3468  if (new_status == CNetScheduleAPI::eDone ||
3469  new_status == CNetScheduleAPI::eFailed ||
3470  new_status == CNetScheduleAPI::eCanceled)
3471  if (!m_ReadJobs.get_bit(job_id)) {
3472  m_GCRegistry.UpdateReadVacantTime(job_id, curr_time);
3474  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
3477  }
3478  }}
3479 
3480  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
3481  eStatusChanged, curr_time);
3482 
3483  if (logging) {
3484  string purpose;
3485  if (status == CNetScheduleAPI::eRunning)
3486  purpose = "execution";
3487  else
3488  purpose = "reading";
3489 
 // NOTE(review): the message says "rescheduled" regardless of new_status,
 // i.e. also when the job has been moved to Failed or ReadFailed
3490  GetDiagContext().Extra()
3491  .Print("msg", "Timeout expired, rescheduled for " + purpose)
3492  .Print("msg_code", "410") // The code is for
3493  // searching in applog
3494  .Print("job_key", MakeJobKey(job_id))
3495  .Print("queue", m_QueueName)
3496  .Print("run_counter", job_iter->second.GetRunCount())
3497  .Print("read_counter", job_iter->second.GetReadCount())
3498  .Print("time_start", NS_FormatPreciseTime(time_start))
3499  .Print("exp_time", NS_FormatPreciseTime(exp_time))
3500  .Print("run_timeout", run_timeout)
3501  .Print("read_timeout", read_timeout);
3502  }
3503 }
3504 
3505 
3506 // Checks up to the given number of jobs in the given status for expiration
3507 // and marks up to the given number of them for deletion.
3508 // Returns the number of scans performed, the number of jobs marked for
3509 // deletion and the id of the last scanned job.
3513  unsigned int last_job,
3514  TJobStatus status)
3515 {
3516  TNSBitVector job_ids;
3518  unsigned int job_id;
3519  unsigned int aff;
3520  unsigned int group;
3521 
3522  result.job_id = attributes.job_id;
3523  result.deleted = 0;
3524  {{
3526 
3527  for (result.scans = 0;
3528  result.scans < attributes.scans; ++result.scans) {
3529  job_id = m_StatusTracker.GetNext(status, result.job_id);
3530  if (job_id == 0)
3531  break; // No more jobs in the state
3532  if (last_job != 0 && job_id >= last_job)
3533  break; // The job in the state is above the limit
3534 
3535  result.job_id = job_id;
3536 
3537  if (m_GCRegistry.DeleteIfTimedOut(job_id, current_time,
3538  &aff, &group))
3539  {
3540  // The job is expired and needs to be marked for deletion
3541  m_StatusTracker.Erase(job_id);
3542  job_ids.set_bit(job_id);
3543  ++result.deleted;
3544 
3545  // check if the affinity should also be updated
3546  if (aff != 0)
3548 
3549  // Check if the group registry should also be updated
3550  if (group != 0)
3551  m_GroupRegistry.RemoveJob(group, job_id);
3552 
3553  // Remove the job from the scope registry if so
3554  m_ScopeRegistry.RemoveJob(job_id);
3555 
3556  if (result.deleted >= attributes.deleted)
3557  break;
3558  }
3559  }
3560  }}
3561 
3562  if (result.deleted > 0) {
3563  TNSBitVector::enumerator en(job_ids.first());
3565 
3566  for (; en.valid(); ++en) {
3567  unsigned int id = *en;
3568  auto job_iter = m_Jobs.find(id);
3569 
3570  // if the job is deleted from the pending state then a performance
3571  // record should be produced. A job full information is also
3572  // required if the listener expects job notifications.
3573  if (job_iter != m_Jobs.end()) {
3574  x_NotifyJobChanges(job_iter->second, MakeJobKey(id),
3575  eJobDeleted, current_time);
3576  if (status == CNetScheduleAPI::ePending) {
3577  g_DoErasePerfLogging(*this, job_iter->second);
3578  }
3579  }
3580  }
3581 
3582  if (!m_StatusTracker.AnyPending())
3584  }
3585 
3586  x_Erase(job_ids, status);
3587  return result;
3588 }
3589 
3590 
// Moves the job within the run time line from the old_time slot to the
// new_time slot. Nothing is done if job_id is 0 or there is no time line.
// NOTE(review): source line 3598 is missing from this listing; presumably
// a time line lock guard - confirm.
3591 void CQueue::TimeLineMove(unsigned int job_id,
3592  const CNSPreciseTime & old_time,
3593  const CNSPreciseTime & new_time)
3594 {
3595  if (!job_id || !m_RunTimeLine)
3596  return;
3597 
// The time line is addressed by the Sec() value of the timestamps
3599  m_RunTimeLine->MoveObject(old_time.Sec(), new_time.Sec(), job_id);
3600 }
3601 
3602 
// Adds the job to the run time line at job_time.Sec().
// Nothing is done if job_id is 0, there is no time line or job_time
// evaluates to false.
// NOTE(review): source line 3609 is missing from this listing; presumably
// a time line lock guard - confirm.
3603 void CQueue::TimeLineAdd(unsigned int job_id,
3604  const CNSPreciseTime & job_time)
3605 {
3606  if (!job_id || !m_RunTimeLine || !job_time)
3607  return;
3608 
3610  m_RunTimeLine->AddObject(job_time.Sec(), job_id);
3611 }
3612 
3613 
// Removes the job from the run time line. Nothing is done if there is no
// time line. Unlike TimeLineMove/TimeLineAdd the job id is not checked for 0.
// NOTE(review): source line 3619 is missing from this listing; presumably
// a time line lock guard - confirm.
3614 void CQueue::TimeLineRemove(unsigned int job_id)
3615 {
3616  if (!m_RunTimeLine)
3617  return;
3618 
3620  m_RunTimeLine->RemoveObject(job_id);
3621 }
3622 
3623 
// Removes one job from the run time line and adds another one at
// new_time.Sec(). A job id of 0 disables the corresponding half of the
// operation. Nothing is done if there is no time line.
// NOTE(review): source line 3631 is missing from this listing; presumably
// a time line lock guard covering both steps - confirm.
3624 void CQueue::TimeLineExchange(unsigned int remove_job_id,
3625  unsigned int add_job_id,
3626  const CNSPreciseTime & new_time)
3627 {
3628  if (!m_RunTimeLine)
3629  return;
3630 
3632  if (remove_job_id)
3633  m_RunTimeLine->RemoveObject(remove_job_id);
3634  if (add_job_id)
3635  m_RunTimeLine->AddObject(new_time.Sec(), add_job_id);
3636 }
3637 
3638 
// Erases up to max_deleted jobs, taken from m_JobsToDelete, out of m_Jobs.
// Returns the number of jobs which were really erased from m_Jobs.
// NOTE(review): source lines 3645, 3656, 3678, 3682, 3685, 3690, 3694 and
// 3697 are missing from this listing (presumably lock guards, operation
// counter increments and bit vector optimization calls) - confirm.
3639 unsigned int CQueue::DeleteBatch(unsigned int max_deleted)
3640 {
3641  // Copy the vector with deleted jobs
3642  TNSBitVector jobs_to_delete;
3643 
3644  {{
3646  jobs_to_delete = m_JobsToDelete;
3647  }}
3648 
// The jobs are processed in portions of at most chunk_size ids; each
// portion is handled in its own {{ }} scope.
3649  static const size_t chunk_size = 100;
3650  unsigned int del_rec = 0;
3651  TNSBitVector::enumerator en = jobs_to_delete.first();
// Only the ids which were really found in m_Jobs and erased
3652  TNSBitVector deleted_jobs;
3653 
3654  while (en.valid() && del_rec < max_deleted) {
3655  {{
3657 
3658  for (size_t n = 0;
3659  en.valid() && n < chunk_size && del_rec < max_deleted;
3660  ++en, ++n) {
3661  unsigned int job_id = *en;
3662  size_t del_count = m_Jobs.erase(job_id);
3663 
3664  if (del_count > 0) {
3665  ++del_rec;
3666  deleted_jobs.set_bit(job_id);
3667  }
3668 
3669  // The job might be the one which was given for reading
3670  // so the garbage should be collected
3671  m_ReadJobs.set_bit(job_id, false);
3672  ++m_ReadJobsOps;
3673  }
3674  }}
3675  }
3676 
3677  if (del_rec > 0) {
3679 
3680  {{
// Clear the 'to delete' marks only for the jobs erased above; ids which
// were not found in m_Jobs stay marked in m_JobsToDelete.
3681  TNSBitVector::enumerator en = deleted_jobs.first();
3683  for (; en.valid(); ++en) {
3684  m_JobsToDelete.set_bit(*en, false);
3686  }
3687 
// The operation counters are reset once they reach 1000000
3688  if (m_JobsToDeleteOps >= 1000000) {
3689  m_JobsToDeleteOps = 0;
3691  }
3692  }}
3693 
3695  if (m_ReadJobsOps >= 1000000) {
3696  m_ReadJobsOps = 0;
3698  }
3699  }
3700  return del_rec;
3701 }
3702 
3703 
3704 // See CXX-2838 for the description of how affinities garbage collection is
3705 // going to work.
// Returns what m_AffinityRegistry.CollectGarbage() returns, or 0 if the
// collection was not attempted.
// NOTE(review): source lines 3710 and 3724 (the initializers of
// aff_reg_settings and candidates_size) are missing from this listing.
3706 unsigned int CQueue::PurgeAffinities(void)
3707 {
3708  unsigned int aff_dict_size = m_AffinityRegistry.size();
3709  SNSRegistryParameters aff_reg_settings =
3711 
// The marks are percentages of max_records
3712  if (aff_dict_size < (aff_reg_settings.low_mark_percentage / 100.0) *
3713  aff_reg_settings.max_records)
3714  // Did not reach the dictionary low mark
3715  return 0;
3716 
// At or above the high mark the high_removal limit is used unconditionally
3717  unsigned int del_limit = aff_reg_settings.high_removal;
3718  if (aff_dict_size <
3719  (aff_reg_settings.high_mark_percentage / 100.0) *
3720  aff_reg_settings.max_records) {
3721  // Here: check the percentage of the affinities that have no references
3722  // to them
3723  unsigned int candidates_size =
3725 
3726  if (candidates_size <
3727  (aff_reg_settings.dirt_percentage / 100.0) *
3728  aff_reg_settings.max_records)
3729  // The number of candidates to be deleted is low
3730  return 0;
3731 
// Between the low and the high marks the low_removal limit is used
3732  del_limit = aff_reg_settings.low_removal;
3733  }
3734 
3735 
3736  // Here: need to delete affinities from the memory
3737  return m_AffinityRegistry.CollectGarbage(del_limit);
3738 }
3739 
3740 
3741 // See CQueue::PurgeAffinities - this one works similar
// Returns what m_GroupRegistry.CollectGarbage() returns, or 0 if the
// collection was not attempted.
// NOTE(review): source lines 3746 and 3760 (the initializers of
// group_reg_settings and candidates_size) are missing from this listing.
3742 unsigned int CQueue::PurgeGroups(void)
3743 {
3744  unsigned int group_dict_size = m_GroupRegistry.size();
3745  SNSRegistryParameters group_reg_settings =
3747 
// The marks are percentages of max_records
3748  if (group_dict_size < (group_reg_settings.low_mark_percentage / 100.0) *
3749  group_reg_settings.max_records)
3750  // Did not reach the dictionary low mark
3751  return 0;
3752 
// At or above the high mark the high_removal limit is used unconditionally
3753  unsigned int del_limit = group_reg_settings.high_removal;
3754  if (group_dict_size <
3755  (group_reg_settings.high_mark_percentage / 100.0) *
3756  group_reg_settings.max_records) {
3757  // Here: check the percentage of the groups that have no references
3758  // to them
3759  unsigned int candidates_size =
3761 
3762  if (candidates_size <
3763  (group_reg_settings.dirt_percentage / 100.0) *
3764  group_reg_settings.max_records)
3765  // The number of candidates to be deleted is low
3766  return 0;
3767 
// Between the low and the high marks the low_removal limit is used
3768  del_limit = group_reg_settings.low_removal;
3769  }
3770 
3771  // Here: need to delete groups from the memory
3772  return m_GroupRegistry.CollectGarbage(del_limit);
3773 }
3774 
3775 
// Delegates the detection of stale nodes to the clients registry.
// NOTE(review): source lines 3780 and 3782 are missing from this listing,
// so the remaining arguments of the call below are not visible - confirm.
3776 void CQueue::StaleNodes(const CNSPreciseTime & current_time)
3777 {
3778  // Clears the worker nodes affinities if the workers are inactive for
3779  // the configured timeout
3781  m_ClientsRegistry.StaleNodes(current_time,
3783 }
3784 
3785 
3787 {
3790 }
3791 
3792 
3794 {
3796  m_ClientsRegistry.Purge(current_time,
3807 }
3808 
3809 
// Produces a textual dump of a single job. An empty string is returned if
// the job is marked for deletion, is not visible in the client scope or is
// not found in m_Jobs.
// NOTE(review): the first signature line (source line 3810) and source
// lines 3814, 3820, 3827 and 3853 are missing from this listing; the type
// of 'client' and the locking are therefore not visible - confirm.
3811  unsigned int job_id,
3812  TDumpFields dump_fields)
3813 {
3815 
3816  string job_dump;
3817 
3818  // Check first that the job has not been deleted yet
3819  {{
3821  if (m_JobsToDelete.get_bit(job_id))
3822  return job_dump;
3823  }}
3824 
3825  string scope = client.GetScope();
3826  {{
3828 
3829  // Check the scope restrictions
// kNoScopeOnly: the job must not belong to any scope.
// Any other non-empty scope: the job must belong to exactly that scope.
// Empty scope: no restrictions.
3830  if (scope == kNoScopeOnly) {
3831  if (m_ScopeRegistry.GetAllJobsInScopes()[job_id] == true)
3832  return job_dump;
3833  } else if (!scope.empty()) {
3834  if (m_ScopeRegistry.GetJobs(scope)[job_id] == false)
3835  return job_dump;
3836  }
3837 
3838  auto job_iter = m_Jobs.find(job_id);
3839  if (job_iter == m_Jobs.end())
3840  return job_dump;
3841 
3842  job_dump.reserve(2048);
3843  try {
3844  // GC can remove the job from its registry while the
3845  // DUMP is in process. If so the job should not be dumped
3846  // and the exception from m_GCRegistry.GetLifetime() should
3847  // be suppressed.
3848  job_dump = job_iter->second.Print(dump_fields,
3849  *this, m_AffinityRegistry,
3850  m_GroupRegistry);
3851  if (dump_fields & eGCEraseTime)
3852  job_dump.append("OK:GC erase time: ")
3854  .append(kNewLine);
3855  if (dump_fields & eScope)
3856  job_dump.append("OK:scope: '")
3857  .append(m_ScopeRegistry.GetJobScope(job_id))
3858  .append(1, '\'')
3859  .append(kNewLine);
// NOTE(review): any exception is swallowed here; if it is thrown after the
// assignment above then a partially formed dump is returned - confirm this
// is intended.
3860  } catch (...) {}
3861 
3862  return job_dump;
3863  }}
3864 }
3865 
3866 
// Selects the jobs matching the given statuses, group, affinity and the
// client scope and passes them to x_DumpJobs() for printing.
// NOTE(review): the first signature line (source line 3867) and source
// lines 3877 and 3903 are missing from this listing; the type of 'client'
// and the locking are therefore not visible - confirm.
3868  const string & group,
3869  const string & aff_token,
3870  const vector<TJobStatus> & job_statuses,
3871  unsigned int start_after_job_id,
3872  unsigned int count,
3873  bool order_first,
3874  TDumpFields dump_fields,
3875  bool logging)
3876 {
3878 
3879  // Form a bit vector of all jobs to dump
3880  vector<TJobStatus> statuses;
3881  TNSBitVector jobs_to_dump;
3882 
// An empty list of statuses means the eight statuses listed below
3883  if (job_statuses.empty()) {
3884  // All statuses
3885  statuses.push_back(CNetScheduleAPI::ePending);
3886  statuses.push_back(CNetScheduleAPI::eRunning);
3887  statuses.push_back(CNetScheduleAPI::eCanceled);
3888  statuses.push_back(CNetScheduleAPI::eFailed);
3889  statuses.push_back(CNetScheduleAPI::eDone);
3890  statuses.push_back(CNetScheduleAPI::eReading);
3891  statuses.push_back(CNetScheduleAPI::eConfirmed);
3892  statuses.push_back(CNetScheduleAPI::eReadFailed);
3893  }
3894  else {
3895  // The user specified statuses explicitly
3896  // The list validity is checked by the caller.
3897  statuses = job_statuses;
3898  }
3899 
3900 
3901  {{
3902  string scope = client.GetScope();
3904  m_StatusTracker.GetJobs(statuses, jobs_to_dump);
3905 
3906  // Check if a certain group has been specified
3907  if (!group.empty()) {
3908  try {
3909  jobs_to_dump &= m_GroupRegistry.GetJobs(group);
3910  } catch (...) {
// Any exception from the group lookup is treated as 'group is not found'
3911  jobs_to_dump.clear();
3912  if (logging)
3913  ERR_POST(Warning << "Job group '" + group +
3914  "' is not found. No jobs to dump.");
3915  }
3916  }
3917 
3918  if (!aff_token.empty()) {
3919  unsigned int aff_id = m_AffinityRegistry.GetIDByToken(aff_token);
// An affinity id of 0 is treated as 'affinity is not found'
3920  if (aff_id == 0) {
3921  jobs_to_dump.clear();
3922  if (logging)
3923  ERR_POST(Warning << "Affinity '" + aff_token +
3924  "' is not found. No jobs to dump.");
3925  } else
3926  jobs_to_dump &= m_AffinityRegistry.GetJobsWithAffinity(aff_id);
3927  }
3928 
3929  // Apply the scope limits
3930  if (scope == kNoScopeOnly) {
3931  jobs_to_dump -= m_ScopeRegistry.GetAllJobsInScopes();
3932  } else if (!scope.empty()) {
3933  // This is a specific scope
3934  jobs_to_dump &= m_ScopeRegistry.GetJobs(scope);
3935  }
3936  }}
3937 
// The printing is done outside of the {{ }} scope above
3938  return x_DumpJobs(jobs_to_dump, start_after_job_id, count,
3939  dump_fields, order_first);
3940 }
3941 
3942 
// Prints the jobs from the given bit vector.
//   jobs_to_dump       - ids of the candidate jobs
//   start_after_job_id - ids less than or equal to this value are skipped
//   count              - max number of jobs to print; 0 means no limit
//   dump_fields        - what to print for each job
//   order_first        - if false and count > 0 then the last 'count'
//                        candidates are printed instead of the first ones
// Returns the concatenated dumps or kEmptyStr if there are no candidates.
// NOTE(review): source lines 3984 and 4014 are missing from this listing
// (presumably a lock guard and the affinity registry argument of Print())
// - confirm.
3943 string CQueue::x_DumpJobs(const TNSBitVector & jobs_to_dump,
3944  unsigned int start_after_job_id,
3945  unsigned int count,
3946  TDumpFields dump_fields,
3947  bool order_first)
3948 {
3949  if (!jobs_to_dump.any())
3950  return kEmptyStr;
3951 
3952  // Skip the jobs which should not be dumped
3953  size_t skipped_jobs = 0;
3954  TNSBitVector::enumerator en(jobs_to_dump.first());
3955  while (en.valid() && *en <= start_after_job_id) {
3956  ++en;
3957  ++skipped_jobs;
3958  }
3959 
// Printing of the last 'count' jobs: advance the enumerator until no more
// than 'count' candidates are left
3960  if (count > 0 && !order_first) {
3961  size_t total_jobs = jobs_to_dump.count();
3962  size_t jobs_left = total_jobs - skipped_jobs;
3963  while (jobs_left > count) {
3964  ++en;
3965  --jobs_left;
3966  }
3967  }
3968 
3969  // Identify the required buffer size for jobs
3970  size_t buffer_size = m_DumpBufferSize;
3971  if (count != 0 && count < buffer_size)
3972  buffer_size = count;
3973 
// 2048 bytes per job is the same estimate as the one used for one_job below
3974  string result;
3975  result.reserve(2048*buffer_size);
3976 
3977  {{
3978  vector<CJob> buffer(buffer_size);
3979  size_t read_jobs = 0;
3980  size_t printed_count = 0;
3981 
// Each iteration copies up to buffer_size jobs out of m_Jobs within the
// inner {{ }} scope and prints the copies after that scope is left.
3982  for ( ; en.valid(); ) {
3983  {{
3985 
3986  for ( ; en.valid() && read_jobs < buffer_size; ++en ) {
3987  auto job_iter = m_Jobs.find(*en);
// Candidates which are not in m_Jobs any more are silently skipped
3988  if (job_iter != m_Jobs.end()) {
3989  buffer[read_jobs] = job_iter->second;
3990  ++read_jobs;
// printed_count counts the copied jobs, i.e. it is incremented before the
// job is actually printed
3991  ++printed_count;
3992 
3993  if (count != 0)
3994  if (printed_count >= count)
3995  break;
3996  }
3997  }
3998  }}
3999 
4000  // Print what was read
4001  string one_job;
4002  one_job.reserve(2048);
4003  for (size_t index = 0; index < read_jobs; ++index) {
4004  one_job.clear();
4005  try {
4006  // GC can remove the job from its registry while the
4007  // DUMP is in process. If so the job should not be dumped
4008  // and the exception from m_GCRegistry.GetLifetime() should
4009  // be suppressed.
4010  unsigned int job_id = buffer[index].GetId();
4011  one_job.append(kNewLine)
4012  .append(buffer[index].Print(dump_fields,
4013  *this,
4015  m_GroupRegistry));
4016  if (dump_fields & eGCEraseTime)
4017  one_job.append("OK:GC erase time: ")
4018  .append(NS_FormatPreciseTime(
4019  m_GCRegistry.GetLifetime(job_id)))
4020  .append(kNewLine);
4021  if (dump_fields & eScope)
4022  one_job.append("OK:scope: '")
4023  .append(m_ScopeRegistry.GetJobScope(job_id))
4024  .append(1, '\'')
4025  .append(kNewLine);
// The job text reaches the result only if nothing above has thrown
4026  result.append(one_job);
4027  } catch (...) {}
4028  }
4029 
4030  if (count != 0)
4031  if (printed_count >= count)
4032  break;
4033 
// Reuse the buffer for the next portion
4034  read_jobs = 0;
4035  }
4036  }}
4037 
4038  return result;
4039 }
4040 
4041 
4043 {
4044  return m_StatusTracker.CountStatus(st);
4045 }
4046 
4047 
4050 {
4052 }
4053 
4054 
// Builds the string job key for the given job id. If m_ScrambleJobKeys is
// set then the compound ID form of the key is generated, otherwise the
// plain form is used.
// NOTE(review): source line 4059 (the rest of the GenerateCompoundID()
// arguments) is missing from this listing.
4055 string CQueue::MakeJobKey(unsigned int job_id) const
4056 {
4057  if (m_ScrambleJobKeys)
4058  return m_KeyGenerator.GenerateCompoundID(job_id,
4060  return m_KeyGenerator.Generate(job_id);
4061 }
4062 
4063 
// Touches the client in the clients registry. If the client session was
// reset then the jobs reported by the registry as running/reading are
// reset as well. The output arguments are passed through to
// m_ClientsRegistry.Touch() unchanged.
// NOTE(review): the first signature line (source line 4064) and source
// line 4080 (presumably the declaration of 'guard') are missing from this
// listing - confirm.
4065  bool & client_was_found,
4066  bool & session_was_reset,
4067  string & old_session,
4068  bool & had_wn_pref_affs,
4069  bool & had_reader_pref_affs)
4070 {
4071  TNSBitVector running_jobs;
4072  TNSBitVector reading_jobs;
4073 
4074  {{
4075  // The client registry may need to make changes in the notification
4076  // registry, i.e. two mutexes are to be locked. The other threads may
4077  // visit notifications first and then a client registry i.e. the very
4078  // same mutexes are locked in a reverse order.
4079  // To prevent it the operation lock is locked here.
4081  m_ClientsRegistry.Touch(client, running_jobs, reading_jobs,
4082  client_was_found, session_was_reset,
4083  old_session, had_wn_pref_affs,
4084  had_reader_pref_affs);
4085  guard.Release();
4086  }}
4087 
// The resetting is done after the guard above has been released
4088  if (session_was_reset) {
4089  if (running_jobs.any())
4090  x_ResetRunningDueToNewSession(client, running_jobs);
4091  if (reading_jobs.any())
4092  x_ResetReadingDueToNewSession(client, reading_jobs);
4093  }
4094 }
4095 
4096 
4098 {
4100 }
4101 
4102 
4104 {
4107 }
4108 
4109 
4111 {
4112  // Memorize the last client scope
4115 }
4116 
4117 
4118 // Moves the job to Pending/Failed or to Done/ReadFailed
4119 // when event event_type has come
// Returns the new job status.
// NOTE(review): the first signature line (source line 4120) and source
// lines 4127, 4134, 4169, 4173, 4176, 4179, 4187, 4189-4191 and 4200-4202
// are missing from this listing; in particular what happens after the
// 'cannot fetch job' error is not visible - confirm.
4121  unsigned int job_id,
4122  const CNSPreciseTime & current_time,
4123  TJobStatus status_from,
4124  CJobEvent::EJobEvent event_type)
4125 {
4126  TJobStatus new_status;
4128  auto job_iter = m_Jobs.find(job_id);
4129 
4130  if (job_iter == m_Jobs.end()) {
4131  ERR_POST("Cannot fetch job to reset it due to " <<
4132  CJobEvent::EventToString(event_type) <<
4133  ". Job: " << DecorateJob(job_id));
4135  }
4136 
// Any status_from other than eRunning is handled as 'the job was reading'
4137  if (status_from == CNetScheduleAPI::eRunning) {
4138  // The job was running
// The job fails if the run count exceeds the allowed number of retries
4139  if (job_iter->second.GetRunCount() > m_FailedRetries)
4140  new_status = CNetScheduleAPI::eFailed;
4141  else
4142  new_status = CNetScheduleAPI::ePending;
4143  } else {
4144  // The job was reading
4145  if (job_iter->second.GetReadCount() > m_ReadFailedRetries)
4146  new_status = CNetScheduleAPI::eReadFailed;
4147  else
4148  new_status = job_iter->second.GetStatusBeforeReading();
// The job is not in the reading state any more
4149  m_ReadJobs.set_bit(job_id, false);
4150  ++m_ReadJobsOps;
4151  }
4152 
4153  job_iter->second.SetStatus(new_status);
4154  job_iter->second.SetLastTouch(current_time);
4155 
// Record the transition in the job events
4156  CJobEvent * event = &job_iter->second.AppendEvent();
4157  event->SetStatus(new_status);
4158  event->SetEvent(event_type);
4159  event->SetTimestamp(current_time);
4160  event->SetClientNode(client.GetNode());
4161  event->SetClientSession(client.GetSession());
4162 
4163  // Update the memory map
4164  m_StatusTracker.SetStatus(job_id, new_status);
4165 
4166  // Count the transition and do a performance logging
4167  if (event_type == CJobEvent::eClear)
4168  m_StatisticsCounters.CountTransition(status_from, new_status,
4170  else
4171  // It is a new session case
4172  m_StatisticsCounters.CountTransition(status_from, new_status,
4174  g_DoPerfLogging(*this, job_iter->second, 200);
4175 
4177  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
4178  m_ReadTimeout,
4180  current_time));
4181 
4182  // remove the job from the time line
4183  TimeLineRemove(job_id);
4184 
4185  // Notify those who wait for the jobs if needed
4186  if (new_status == CNetScheduleAPI::ePending &&
4188  m_NotificationsList.Notify(job_id, job_iter->second.GetAffinityId(),
4192  eGet);
4193  // Notify readers if they wait for jobs
4194  if (new_status == CNetScheduleAPI::eDone ||
4195  new_status == CNetScheduleAPI::eFailed ||
4196  new_status == CNetScheduleAPI::eCanceled)
// m_ReadJobs has a bit set for the jobs which were given for reading
// (see DeleteBatch()); those jobs are not announced to the readers again
4197  if (!m_ReadJobs.get_bit(job_id)) {
4198  m_GCRegistry.UpdateReadVacantTime(job_id, current_time);
4199  m_NotificationsList.Notify(job_id, job_iter->second.GetAffinityId(),
4203  eRead);
4204  }
4205 
4206  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
4207  eStatusChanged, current_time);
4208  return new_status;
4209 }
4210 
4211 
// Calls x_ResetDueTo() for each job from the given bit vector using one
// timestamp for all of them. A failure for one job is logged and does not
// stop the processing of the rest of the jobs.
// NOTE(review): the first signature line (source line 4212) and source
// line 4219 (the rest of the x_ResetDueTo() arguments) are missing from
// this listing.
4213  const TNSBitVector & jobs)
4214 {
4215  CNSPreciseTime current_time = CNSPreciseTime::Current();
4216  for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) {
4217  try {
4218  x_ResetDueTo(client, *en, current_time,
4220  } catch (...) {
4221  ERR_POST("Error resetting a running job when worker node is "
4222  "cleared. Job: " << DecorateJob(*en));
4223  }
4224  }
4225 }
4226 
4227 
// Calls x_ResetDueTo() for each job from the given bit vector using one
// timestamp for all of them. A failure for one job is logged and does not
// stop the processing of the rest of the jobs.
// NOTE(review): the first signature line (source line 4228) and source
// line 4235 (the rest of the x_ResetDueTo() arguments) are missing from
// this listing.
4229  const TNSBitVector & jobs)
4230 {
4231  CNSPreciseTime current_time = CNSPreciseTime::Current();
4232  for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) {
4233  try {
4234  x_ResetDueTo(client, *en, current_time,
4236  } catch (...) {
4237  ERR_POST("Error resetting a reading job when worker node is "
4238  "cleared. Job: " << DecorateJob(*en));
4239  }
4240  }
4241 }
4242 
4243 
// Calls x_ResetDueTo() for each job from the given bit vector using one
// timestamp for all of them. A failure for one job is logged and does not
// stop the processing of the rest of the jobs.
// NOTE(review): the first signature line (source line 4244) and source
// line 4251 (the rest of the x_ResetDueTo() arguments) are missing from
// this listing.
4245  const TNSBitVector & jobs)
4246 {
4247  CNSPreciseTime current_time = CNSPreciseTime::Current();
4248  for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) {
4249  try {
4250  x_ResetDueTo(client, *en, current_time,
4252  } catch (...) {
4253  ERR_POST("Error resetting a running job when worker node "
4254  "changed session. Job: " << DecorateJob(*en));
4255  }
4256  }
4257 }
4258 
4259 
// Calls x_ResetDueTo() for each job from the given bit vector using one
// timestamp for all of them. A failure for one job is logged and does not
// stop the processing of the rest of the jobs.
// NOTE(review): the first signature line (source line 4260) and source
// line 4267 (the rest of the x_ResetDueTo() arguments) are missing from
// this listing.
4261  const TNSBitVector & jobs)
4262 {
4263  CNSPreciseTime current_time = CNSPreciseTime::Current();
4264  for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) {
4265  try {
4266  x_ResetDueTo(client, *en, current_time,
4268  } catch (...) {
4269  ERR_POST("Error resetting a reading job when worker node "
4270  "changed session. Job: " << DecorateJob(*en));
4271  }
4272  }
4273 }
4274 
4275 
// Registers a listener for the eGet command group.
// NOTE(review): the first signature line (source line 4276) and source
// lines 4287 and 4292 (the beginnings of both calls) are missing from this
// listing, so the called members are not visible - confirm.
4277  unsigned short port,
4278  unsigned int timeout,
4279  const TNSBitVector & aff_ids,
4280  bool wnode_aff,
4281  bool any_aff,
4282  bool exclusive_new_affinity,
4283  bool new_format,
4284  const TNSBitVector & group_ids)
4285 {
4286  // Add to the notification list and save the wait port
4288  wnode_aff, any_aff,
4289  exclusive_new_affinity, new_format,
4290  group_ids, eGet);
// The second step is done only for the clients with IsComplete() == true
4291  if (client.IsComplete())
4293  aff_ids, eGet);
4294  return;
4295 }
4296 
4297 
// Registers a listener for the eRead command group. Differently from the
// eGet variant 'true' is always passed where the eGet variant passes its
// new_format argument, and the node is marked as waiting without checking
// client.IsComplete().
// NOTE(review): source lines 4299 (the function name) and 4309 (the
// beginning of the first call) are missing from this listing - confirm.
4298 void
4300  unsigned short port,
4301  unsigned int timeout,
4302  const TNSBitVector & aff_ids,
4303  bool reader_aff,
4304  bool any_aff,
4305  bool exclusive_new_affinity,
4306  const TNSBitVector & group_ids)
4307 {
4308  // Add to the notification list and save the wait port
4310  reader_aff, any_aff,
4311  exclusive_new_affinity, true,
4312  group_ids, eRead);
4313  m_ClientsRegistry.SetNodeWaiting(client, port, aff_ids, eRead);
4314 }
4315 
4316 
// Returns true if the given port is greater than 0, false otherwise.
// NOTE(review): the first signature line (source line 4317) and the
// statements at source lines 4321 and 4324 are missing from this listing,
// so what is actually unregistered is not visible - confirm.
4318  unsigned short port)
4319 {
4320  if (client.IsComplete())
4322 
4323  if (port > 0) {
4325  return true;
4326  }
4327  return false;
4328 }
4329 
4330 
// Logs the queue statistics (transition counters and their delta since the
// previous call) within a dedicated request context. The very first call
// only memorizes the counters and logs nothing.
//   aff_count - in/out: the size of the queue affinity registry is added
//               to it (not on the very first call)
// NOTE(review): source lines 4336, 4340, 4345, 4347, 4365-4372 and 4383
// are missing from this listing (the declarations of 'delta' and 'ctx',
// the timestamp updates and some printed fields) - confirm.
4331 void CQueue::PrintStatistics(size_t & aff_count) const
4332 {
// Work with a copy of the counters taken at the beginning of the call
4333  CStatisticsCounters counters_copy = m_StatisticsCounters;
4334 
4335  // Do not print the server wide statistics the very first time
4337 
4338  if (double(m_StatisticsCountersLastPrintedTimestamp) == 0.0) {
4339  m_StatisticsCountersLastPrinted = counters_copy;
4341  return;
4342  }
4343 
4344  // Calculate the delta since the last time
4346 
4348  ctx.Reset(new CRequestContext());
4349  ctx->SetRequestID();
4350 
4351 
4352  CDiagContext & diag_context = GetDiagContext();
4353 
4354  diag_context.SetRequestContext(ctx);
4355  CDiagContext_Extra extra = diag_context.PrintRequestStart();
4356 
4357  size_t affinities = m_AffinityRegistry.size();
4358  aff_count += affinities;
4359 
4360  // The member is called only if there is a request context
4361  extra.Print("_type", "statistics_thread")
4362  .Print("_queue", GetQueueName())
4363  .Print("time_interval", NS_FormatPreciseTimeAsSec(delta))
4364  .Print("affinities", affinities)
4373  counters_copy.PrintTransitions(extra);
4374  counters_copy.PrintDelta(extra, m_StatisticsCountersLastPrinted);
4375  extra.Flush();
4376 
// Close the request and detach the request context from the diag context
4377  ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
4378  diag_context.PrintRequestStop();
4379  ctx.Reset();
4380  diag_context.SetRequestContext(NULL);
4381 
// Memorize the printed counters for the next delta calculation
4382  m_StatisticsCountersLastPrinted = counters_copy;
4384 }
4385 
4386 
// Collects the number of jobs in each of the eight statuses listed below
// and passes the statuses together with the counters to g_DoPerfLogging().
// NOTE(review): the signature (source line 4387) is missing from this
// listing.
4388 {
4389  vector<TJobStatus> statuses;
4390  statuses.push_back(CNetScheduleAPI::ePending);
4391  statuses.push_back(CNetScheduleAPI::eRunning);
4392  statuses.push_back(CNetScheduleAPI::eCanceled);
4393  statuses.push_back(CNetScheduleAPI::eFailed);
4394  statuses.push_back(CNetScheduleAPI::eDone);
4395  statuses.push_back(CNetScheduleAPI::eReading);
4396  statuses.push_back(CNetScheduleAPI::eConfirmed);
4397  statuses.push_back(CNetScheduleAPI::eReadFailed);
4398 
4399  vector<unsigned int> counters = m_StatusTracker.GetJobCounters(statuses);
4400  g_DoPerfLogging(*this, statuses, counters);
4401 }
4402 
4403 
// Returns the number of bits set in m_JobsToDelete, i.e. the number of
// jobs marked for deletion but not deleted yet.
// NOTE(review): source line 4406 is missing from this listing; presumably
// a lock guard - confirm.
4404 unsigned int CQueue::GetJobsToDeleteCount(void) const
4405 {
4407  return m_JobsToDelete.count();
4408 }
4409 
4410 
// Builds a multi-line "OK:<name>: <value>" report which includes the
// number of garbage jobs and the sizes of the affinity and clients
// registries.
// NOTE(review): the signature (source line 4411) and source line 4415
// (the beginning of the append chain) are missing from this listing -
// confirm what else is printed.
4412 {
4413  string output;
4414  output.reserve(4096);
4416  .append("OK:garbage_jobs: ")
4417  .append(to_string(GetJobsToDeleteCount()))
4418  .append(kNewLine)
4419  .append("OK:affinity_registry_size: ")
4420  .append(to_string(m_AffinityRegistry.size()))
4421  .append(kNewLine)
4422  .append("OK:client_registry_size: ")
4423  .append(to_string(m_ClientsRegistry.size()))
4424  .append(kNewLine);
4425  return output;
4426 }
4427 
4428 
// Counts the jobs per status, limited by the optional group, the optional
// affinity and the client scope.
//   group_token - group to limit the jobs to; empty means no limit
//   aff_token   - affinity to limit the jobs to; empty means no limit
//   jobs        - output array; g_ValidJobStatusesSize elements are
//                 written, in the order of g_ValidJobStatuses
//   warnings    - messages are appended if the group or the affinity is
//                 not found
// If 'warnings' is not empty after the lookups (including messages which
// were there before the call) then 'jobs' is left untouched.
// NOTE(review): the first signature line (source line 4429) and source
// line 4437 are missing from this listing; the type of 'client' and the
// locking are therefore not visible - confirm.
4430  const string & group_token,
4431  const string & aff_token,
4432  size_t * jobs,
4433  vector<string> & warnings) const
4434 {
4435  TNSBitVector group_jobs;
4436  TNSBitVector aff_jobs;
4438 
4439  if (!group_token.empty()) {
4440  try {
4441  group_jobs = m_GroupRegistry.GetJobs(group_token);
4442  } catch (...) {
// Any exception from the group lookup is reported as 'group is not found'
4443  warnings.push_back("eGroupNotFound:job group " + group_token +
4444  " is not found");
4445  }
4446  }
4447  if (!aff_token.empty()) {
4448  unsigned int aff_id = m_AffinityRegistry.GetIDByToken(aff_token);
4449  if (aff_id == 0)
4450  warnings.push_back("eAffinityNotFound:affinity " + aff_token +
4451  " is not found");
4452  else
4453  aff_jobs = m_AffinityRegistry.GetJobsWithAffinity(aff_id);
4454  }
4455 
4456  if (!warnings.empty())
4457  return;
4458 
4459 
4460  string scope = client.GetScope();
// One bit vector is reused for all the statuses
4461  TNSBitVector candidates;
4462  for (size_t index(0); index < g_ValidJobStatusesSize; ++index) {
4463  candidates.clear();
4464  m_StatusTracker.GetJobs(g_ValidJobStatuses[index], candidates);
4465 
4466  if (!group_token.empty())
4467  candidates &= group_jobs;
4468  if (!aff_token.empty())
4469  candidates &= aff_jobs;
4470 
4471  // Apply the scope limitations. Empty scope means that all the jobs
4472  // must be provided
4473  if (scope == kNoScopeOnly) {
4474  // Exclude all scoped jobs
4475  candidates -= m_ScopeRegistry.GetAllJobsInScopes();
4476  } else if (!scope.empty()) {
4477  // Specific scope
4478  candidates &= m_ScopeRegistry.GetJobs(scope);
4479  }
4480 
4481  jobs[index] = candidates.count();
4482  }
4483 }
4484 
4485 
// Formats the output of GetJobsPerState() as "OK:<...>: <count>\n" lines
// followed by an "OK:Total: <sum>\n" line. An empty string is returned if
// 'warnings' is not empty after the GetJobsPerState() call.
// NOTE(review): the first signature line (source line 4486) and source
// line 4502 (the middle part of the per status line, presumably the status
// name) are missing from this listing - confirm.
4487  const string & group_token,
4488  const string & aff_token,
4489  vector<string> & warnings) const
4490 {
4491  size_t total = 0;
4492  string result;
// Not initialized here: GetJobsPerState() fills it only if there are no
// warnings, and it is read below only in that case
4493  size_t jobs_per_state[g_ValidJobStatusesSize];
4494 
4495  GetJobsPerState(client, group_token, aff_token, jobs_per_state, warnings);
4496 
4497  // Warnings could be about non existing affinity or group. If so there are
4498  // no jobs to be printed.
4499  if (warnings.empty()) {
4500  for (size_t index(0); index < g_ValidJobStatusesSize; ++index) {
4501  result += "OK:" +
4503  ": " + to_string(jobs_per_state[index]) + "\n";
4504  total += jobs_per_state[index];
4505  }
4506  result += "OK:Total: " + to_string(total) + "\n";
4507  }
4508  return result;
4509 }
4510 
4511 
// Returns the number of jobs which are in the pending or running status
// as reported by the status tracker.
4512 unsigned int CQueue::CountActiveJobs(void) const
4513 {
4514  vector<CNetScheduleAPI::EJobStatus> statuses;
4515 
4516  statuses.push_back(CNetScheduleAPI::ePending);
4517  statuses.push_back(CNetScheduleAPI::eRunning);
4518  return m_StatusTracker.CountStatus(statuses);
4519 }
4520 
4521 
// Stores the new pause status in m_PauseStatus.
// NOTE(review): the signature (source line 4522) and source lines 4524,
// 4531 and 4533 are missing from this listing; the action taken when the
// queue is resumed and the final statement are not visible - confirm.
4523 {
4525 
// True only when a paused queue (any status other than eNoPause) is
// switched to eNoPause; must be calculated before m_PauseStatus is updated
4526  bool need_notifications = (status == eNoPause &&
4527  m_PauseStatus != eNoPause);
4528 
4529  m_PauseStatus = status;
4530  if (need_notifications)
4532 
4534 }
4535 
4536 
// NOTE(review): the first signature line (source line 4537) and the
// beginning of the only statement (source line 4541) are missing from this
// listing; only the trailing 'new_format' argument of a call is visible,
// so the purpose of the member cannot be stated from here.
4538  unsigned short port,
4539  bool new_format)
4540 {
4542  new_format);
4543 }
4544 
4545 
// Marks the job as done: checks the authorization token (if provided),
// appends an eDone event, stores the output and copies the updated job
// into 'job'.
// Throws CNetScheduleException:
//   eInternalError    - the job is not found in m_Jobs
//   eInvalidAuthToken - bad token format or the token does not match
// NOTE(review): the first signature line (source line 4546) and source
// line 4579 (presumably the event status setting) are missing from this
// listing - confirm.
4547  const string & auth_token,
4548  const CNSPreciseTime & curr,
4549  int ret_code,
4550  const string & output,
4551  CJob & job,
4552  const CNSClientId & client)
4553 {
4554  auto job_iter = m_Jobs.find(job_id);
4555  if (job_iter == m_Jobs.end())
4556  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
4557 
// An empty token disables the authorization check altogether
4558  if (!auth_token.empty()) {
4559  // Need to check authorization token first
4560  CJob::EAuthTokenCompareResult token_compare_result =
4561  job_iter->second.CompareAuthToken(auth_token);
4562  if (token_compare_result == CJob::eInvalidTokenFormat)
4563  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
4564  "Invalid authorization token format");
4565  if (token_compare_result == CJob::eNoMatch)
4566  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
4567  "Authorization token does not match");
4568  if (token_compare_result == CJob::ePassportOnlyMatch) {
4569  // That means that the job has been executing by another worker
4570  // node at the moment, but we can accept the results anyway
4571  ERR_POST(Warning << "Received PUT2 with only "
4572  "passport matched.");
4573  }
4574  // Here: the authorization token is OK, we can continue
4575  }
4576 
4577  // Append the event
4578  CJobEvent * event = &job_iter->second.AppendEvent();
4580  event->SetEvent(CJobEvent::eDone);
4581  event->SetTimestamp(curr);
4582  event->SetRetCode(ret_code);
4583 
4584  event->SetClientNode(client.GetNode());
4585  event->SetClientSession(client.GetSession());
4586  event->SetNodeAddr(client.GetAddress());
4587 
4588  job_iter->second.SetStatus(CNetScheduleAPI::eDone);
4589  job_iter->second.SetOutput(output);
4590  job_iter->second.SetLastTouch(curr);
4591 
// Provide the caller with a copy of the updated job
4592  job = job_iter->second;
4593 }
4594 
4595 
4596 // If the job.job_id != 0 => the job has been read successfully
4597 // Exception => DB errors
// Switches the job to the running (cmd_group == eGet) or to the reading
// (any other cmd_group) status, appends the corresponding event and copies
// the updated job into 'job'.
// Throws CNetScheduleException (eInternalError) if the job is not found.
// NOTE(review): the first signature line (source line 4598) is missing
// from this listing.
4599  const CNSPreciseTime & curr,
4600  unsigned int job_id,
4601  ECommandGroup cmd_group,
4602  CJob & job)
4603 {
4604  auto job_iter = m_Jobs.find(job_id);
4605  if (job_iter == m_Jobs.end())
4606  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
4607 
4608  CJobEvent & event = job_iter->second.AppendEvent();
4609  event.SetTimestamp(curr);
4610  event.SetNodeAddr(client.GetAddress());
4611  event.SetClientNode(client.GetNode());
4612  event.SetClientSession(client.GetSession());
4613 
4614  if (cmd_group == eGet) {
4615  event.SetStatus(CNetScheduleAPI::eRunning);
4616  event.SetEvent(CJobEvent::eRequest);
4617  } else {
4618  event.SetStatus(CNetScheduleAPI::eReading);
4619  event.SetEvent(CJobEvent::eRead);
4620  }
4621 
// The job itself: new status, the individual timeout is reset to
// kTimeZero and the run/read attempts counter is incremented
4622  job_iter->second.SetLastTouch(curr);
4623  if (cmd_group == eGet) {
4624  job_iter->second.SetStatus(CNetScheduleAPI::eRunning);
4625  job_iter->second.SetRunTimeout(kTimeZero);
4626  job_iter->second.SetRunCount(job_iter->second.GetRunCount() + 1);
4627  } else {
4628  job_iter->second.SetStatus(CNetScheduleAPI::eReading);
4629  job_iter->second.SetReadTimeout(kTimeZero);
4630  job_iter->second.SetReadCount(job_iter->second.GetReadCount() + 1);
4631  }
4632 
// Provide the caller with a copy of the updated job
4633  job = job_iter->second;
4634 }
4635 
4636 
4637 // Dumps all the jobs into a flat file at the time of shutdown
//   dump_dname - the directory name prefix used to build the dump file
//                names (see x_GetJobsDumpFileName())
// The jobs which belong to a scope are not dumped. If there is nothing to
// dump then no files are written at all (the registries included).
// Throws runtime_error if anything derived from std::exception is thrown
// while dumping; the partially written dump is removed in this case.
// NOTE(review): exceptions which are not derived from std::exception are
// not caught here, so jobs_file would stay open and the partial dump would
// stay on disk. LoadFromDump() has a catch (...) branch for that; consider
// adding one here as well.
// NOTE(review): the result of the final fclose() is not checked.
4638 void CQueue::Dump(const string & dump_dname)
4639 {
4640  // Form a bit vector of all jobs to dump
4641  vector<TJobStatus> statuses;
4642  TNSBitVector jobs_to_dump;
4643 
4644  // All statuses
4645  statuses.push_back(CNetScheduleAPI::ePending);
4646  statuses.push_back(CNetScheduleAPI::eRunning);
4647  statuses.push_back(CNetScheduleAPI::eCanceled);
4648  statuses.push_back(CNetScheduleAPI::eFailed);
4649  statuses.push_back(CNetScheduleAPI::eDone);
4650  statuses.push_back(CNetScheduleAPI::eReading);
4651  statuses.push_back(CNetScheduleAPI::eConfirmed);
4652  statuses.push_back(CNetScheduleAPI::eReadFailed);
4653 
4654  m_StatusTracker.GetJobs(statuses, jobs_to_dump);
4655 
4656  // Exclude all the jobs which belong to a certain scope. There is no
4657  // need to save them
4658  jobs_to_dump -= m_ScopeRegistry.GetAllJobsInScopes();
4659 
4660  if (!jobs_to_dump.any())
4661  return; // Nothing to dump
4662 
4663 
4664  string jobs_file_name = x_GetJobsDumpFileName(dump_dname);
// Stays NULL until fopen() succeeds; the catch block relies on that
4665  FILE * jobs_file = NULL;
4666 
4667  try {
4668  // Dump the affinity registry
4669  m_AffinityRegistry.Dump(dump_dname, m_QueueName);
4670 
4671  // Dump the group registry
4672  m_GroupRegistry.Dump(dump_dname, m_QueueName);
4673 
4674  jobs_file = fopen(jobs_file_name.c_str(), "wb");
4675  if (jobs_file == NULL)
4676  throw runtime_error("Cannot open file " + jobs_file_name +
4677  " to dump jobs");
4678 
4679  // Disable buffering to detect errors right away
4680  setbuf(jobs_file, NULL);
4681 
4682  // Write a header
4683  SJobDumpHeader header;
4684  header.Write(jobs_file);
4685 
4686  TNSBitVector::enumerator en(jobs_to_dump.first());
4687  for ( ; en.valid(); ++en) {
4688  auto job_iter = m_Jobs.find(*en);
// A job known to the status tracker but absent in m_Jobs is logged and
// skipped; it does not fail the whole dump
4689  if (job_iter == m_Jobs.end()) {
4690  ERR_POST("Dump at SHUTDOWN: error fetching job " <<
4691  DecorateJob(*en) << ". Skip and continue.");
4692  continue;
4693  }
4694 
4695  job_iter->second.Dump(jobs_file);
4696  }
4697  } catch (const exception & ex) {
// Close the file first and then remove whatever has been written so far
4698  if (jobs_file != NULL)
4699  fclose(jobs_file);
4700  RemoveDump(dump_dname);
4701  throw runtime_error("Error dumping queue " + m_QueueName +
4702  ": " + string(ex.what()));
4703  }
4704 
4705  fclose(jobs_file);
4706 }
4707 
4708 
// Removes the dump files of the queue: the group registry dump and the
// jobs file (if it exists).
// NOTE(review): source line 4711 is missing from this listing; presumably
// the removal of the affinity registry dump - confirm.
// NOTE(review): the result of remove() is not checked.
4709 void CQueue::RemoveDump(const string & dump_dname)
4710 {
4712  m_GroupRegistry.RemoveDump(dump_dname, m_QueueName);
4713 
4714  string jobs_file_name = x_GetJobsDumpFileName(dump_dname);
4715 
// access(..., F_OK) is used to test that the file exists
4716  if (access(jobs_file_name.c_str(), F_OK) != -1)
4717  remove(jobs_file_name.c_str());
4718 }
4719 
4720 
// Builds the jobs dump file name:
//     <dump_dname><kJobsFileName>.<QUEUE NAME IN UPPER CASE>
// No path separator is inserted between dump_dname and kJobsFileName here.
4721 string CQueue::x_GetJobsDumpFileName(const string & dump_dname) const
4722 {
4723  string upper_queue_name = m_QueueName;
4724  NStr::ToUpper(upper_queue_name);
4725  return dump_dname + kJobsFileName + "." + upper_queue_name;
4726 }
4727 
4728 
// Loads the jobs of the queue from the dump created by Dump() and
// registers each job in the status tracker, the affinity and group
// registries and the GC registry.
// Returns the number of loaded jobs; 0 if the dump directory or the jobs
// file does not exist.
// Throws runtime_error on any failure; the queue is cleared with
// x_ClearQueue() before throwing.
// NOTE(review): source lines 4741, 4742, 4775, 4793, 4794, 4802 and 4806
// are missing from this listing (presumably the registries loading, the
// time line update, two expiration time arguments and the registries
// finalization) - confirm.
4729 unsigned int CQueue::LoadFromDump(const string & dump_dname)
4730 {
4731  unsigned int recs = 0;
4732  string jobs_file_name = x_GetJobsDumpFileName(dump_dname);
// Stays NULL until fopen() succeeds; the catch blocks rely on that
4733  FILE * jobs_file = NULL;
4734 
4735  if (!CDir(dump_dname).Exists())
4736  return 0;
4737  if (!CFile(jobs_file_name).Exists())
4738  return 0;
4739 
4740  try {
4743 
4744  jobs_file = fopen(jobs_file_name.c_str(), "rb");
4745  if (jobs_file == NULL)
4746  throw runtime_error("Cannot open file " + jobs_file_name +
4747  " to load dumped jobs");
4748 
4749  SJobDumpHeader header;
4750  header.Read(jobs_file);
4751 
// One job object and two buffers of kNetScheduleMaxOverflowSize bytes are
// reused for all the loaded records
4752  CJob job;
4753  AutoArray<char> input_buf(new char[kNetScheduleMaxOverflowSize]);
4754  AutoArray<char> output_buf(new char[kNetScheduleMaxOverflowSize]);
4755  while (job.LoadFromDump(jobs_file,
4756  input_buf.get(), output_buf.get(),
4757  header)) {
4758  unsigned int job_id = job.GetId();
4759  unsigned int group_id = job.GetGroupId();
4760  unsigned int aff_id = job.GetAffinityId();
4761  TJobStatus status = job.GetStatus();
4762 
4763  m_Jobs[job_id] = job;
4764  m_StatusTracker.SetExactStatusNoLock(job_id, status, true);
4765 
4766  if ((status == CNetScheduleAPI::eRunning ||
4767  status == CNetScheduleAPI::eReading) &&
4768  m_RunTimeLine) {
4769  // Add object to the first available slot;
4770  // it is going to be rescheduled or dropped
4771  // in the background control thread
4772  // We can use time line without lock here because
4773  // the queue is still in single-use mode while
4774  // being loaded.
4776  }
4777 
4778  // Register the job for the affinity if so
4779  if (aff_id != 0)
4780  m_AffinityRegistry.AddJobToAffinity(job_id, aff_id);
4781 
4782  // Register the job in the group registry
4783  if (group_id != 0)
4784  m_GroupRegistry.AddJobToGroup(group_id, job_id);
4785 
4786  // Register the loaded job with the garbage collector
4787  CNSPreciseTime submit_time = job.GetSubmitTime();
4788  CNSPreciseTime expiration =
4789  GetJobExpirationTime(job.GetLastTouch(), status,
4790  submit_time, job.GetTimeout(),
4791  job.GetRunTimeout(),
4792  job.GetReadTimeout(),
4795  m_GCRegistry.RegisterJob(job_id, job.GetSubmitTime(),
4796  aff_id, group_id, expiration);
4797  ++recs;
4798  }
4799 
4800  // Make sure that there are no affinity IDs in the registry for which
4801  // there are no jobs and initialize the next affinity ID counter.
4803 
4804  // Make sure that there are no group IDs in the registry for which there
4805  // are no jobs and initialize the next group ID counter.
4807  } catch (const exception & ex) {
// On any failure: close the file, drop everything loaded so far and
// report the problem as a runtime_error
4808  if (jobs_file != NULL)
4809  fclose(jobs_file);
4810 
4811  x_ClearQueue();
4812  throw runtime_error("Error loading queue " + m_QueueName +
4813  " from its dump: " + string(ex.what()));
4814  } catch (...) {
4815  if (jobs_file != NULL)
4816  fclose(jobs_file);
4817 
4818  x_ClearQueue();
4819  throw runtime_error("Unknown error loading queue " + m_QueueName +
4820  " from its dump");
4821  }
4822 
4823  fclose(jobs_file);
4824  return recs;
4825 }
4826 
4827 
4828 // The member does not grab the operational lock.
4829 // The member is used at the time of loading jobs from dump and at that time
4830 // there is no concurrent access.
// Resets the in-memory state of the queue: the run time line, the bit
// vectors of the jobs to delete and of the read jobs, the GC registry and
// the jobs container.
// NOTE(review): the signature (source line 4831) and source lines 4833,
// 4838, 4839 and 4841 are missing from this listing (presumably the
// clearing of the other registries) - confirm.
// NOTE(review): m_RunTimeLine is dereferenced without a NULL check in the
// visible lines while the other members check it; source line 4833 may
// hold that check - confirm.
4832 {
4834  m_RunTimeLine->ReInit();
4835  m_JobsToDelete.clear(true);
4836  m_ReadJobs.clear(true);
4837 
4840  m_GCRegistry.Clear();
4842 
4843  m_Jobs.clear();
4844 }
4845 
4846 
// Notifies the job listener and the job submitter about a change of the
// job, subject to the checks below.
//   job_key      - the string key of the job
//   reason       - what has changed; eJobDeleted makes the reported status
//                  CNetScheduleAPI::eDeleted regardless of the job status
//   current_time - passed to the ShouldNotify...() checks of the job
// NOTE(review): the first signature line (source line 4847) and source
// lines 4860, 4862, 4872 and 4874 (the beginnings of the statements which
// build and send the notification) are missing from this listing -
// confirm.
4848  const string & job_key,
4849  ENotificationReason reason,
4850  const CNSPreciseTime & current_time)
4851 {
4852  string notification;
4853  TJobStatus job_status = job.GetStatus();
4854 
4855  if (reason == eJobDeleted)
4856  job_status = CNetScheduleAPI::eDeleted;
4857 
// Listener: progress message changes are reported only if the listener
// asked for them
4858  if (reason != eProgressMessageChanged || job.GetLsnrNeedProgressMsgNotif()) {
4859  if (job.ShouldNotifyListener(current_time)) {
4861  job, job_key, job_status, reason);
4863  job.GetListenerNotifPort(),
4864  notification);
4865  }
4866  }
4867 
// Submitter: never notified for eNotificationStolen; progress message
// changes are reported only if the submitter asked for them
4868  if (reason != eNotificationStolen) {
4869  if (reason != eProgressMessageChanged || job.GetSubmNeedProgressMsgNotif()) {
4870  if (job.ShouldNotifySubmitter(current_time)) {
// The text built for the listener (if any) is reused
4871  if (notification.empty())
4873  job, job_key, job_status, reason);
4875  job.GetSubmNotifPort(),
4876  notification);
4877  }
4878  }
4879  }
4880 }
4881 
4882 
Algorithms for bvector<> (main include)
Temporary object for holding extra message arguments.
Definition: ncbidiag.hpp:1828
CDir –.
Definition: ncbifile.hpp:1696
CFile –.
Definition: ncbifile.hpp:1605
void Release()
Manually force the resource to be released.
Definition: guard.hpp:166
EJobEvent
Definition: job.hpp:60
@ eReturn
Definition: job.hpp:67
@ eRequest
Definition: job.hpp:65
@ eNSSubmitRollback
Definition: job.hpp:82
@ eCancel
Definition: job.hpp:77
@ eClear
Definition: job.hpp:75
@ eDone
Definition: job.hpp:66
@ eTimeout
Definition: job.hpp:78
@ eRedo
Definition: job.hpp:94
@ eReadTimeout
Definition: job.hpp:79
@ eFail
Definition: job.hpp:68
@ eBatchSubmit
Definition: job.hpp:64
@ eReadFail
Definition: job.hpp:71
@ eNSGetRollback
Definition: job.hpp:85
@ eReadRollback
Definition: job.hpp:74
@ eReadFinalFail
Definition: job.hpp:72
@ eReread
Definition: job.hpp:95
@ eReschedule
Definition: job.hpp:93
@ eReadDone
Definition: job.hpp:73
@ eReturnNoBlacklist
Definition: job.hpp:91
@ eRead
Definition: job.hpp:70
@ eSubmit
Definition: job.hpp:63
@ eSessionChanged
Definition: job.hpp:80
@ eNSReadRollback
Definition: job.hpp:88
@ eFinalFail
Definition: job.hpp:69
static std::string EventToString(EJobEvent event)
Definition: job.cpp:82
void SetNodeAddr(unsigned int node_ip)
Definition: job.hpp:134
void SetStatus(TJobStatus status)
Definition: job.hpp:128
void SetTimestamp(const CNSPreciseTime &t)
Definition: job.hpp:132
CNSPreciseTime GetLifetime(unsigned int job_id) const
bool DeleteIfTimedOut(unsigned int job_id, const CNSPreciseTime &current_time, unsigned int *aff_id, unsigned int *group_id)
void UpdateReadVacantTime(unsigned int job_id, const CNSPreciseTime &read_vacant_time)
void RegisterJob(unsigned int job_id, const CNSPreciseTime &submit_time, unsigned int aff_id, unsigned int group_id, const CNSPreciseTime &life_time)
void UpdateLifetime(unsigned int job_id, const CNSPreciseTime &life_time)
unsigned int GetAffinityID(unsigned int job_id) const
bool IsOutdatedJob(unsigned int job_id, ECommandGroup cmd_group, const CNSPreciseTime &timeout) const
void ChangeAffinityAndGroup(unsigned int job_id, unsigned int aff_id, unsigned int group_id)
unsigned GetNext(TJobStatus status, unsigned job_id) const
Definition: job_status.cpp:465
void SetStatus(unsigned int job_id, TJobStatus status)
Definition: job_status.cpp:194
void Erase(unsigned job_id)
Definition: job_status.cpp:226
void GetJobs(const vector< TJobStatus > &statuses, TNSBitVector &jobs) const
Definition: job_status.cpp:340
void AddPendingJob(unsigned int job_id)
Definition: job_status.cpp:219
void StatusStatistics(TJobStatus status, TNSBitVector::statistics *st) const
Definition: job_status.cpp:183
bool AnyPending() const
Definition: job_status.cpp:456
vector< unsigned int > GetJobCounters(const vector< TJobStatus > &statuses) const
Definition: job_status.cpp:103
void SetExactStatusNoLock(unsigned int job_id, TJobStatus status, bool set_clear)
Definition: job_status.cpp:267
unsigned int CountStatus(TJobStatus status) const
Definition: job_status.cpp:80
bool AnyJobs(void) const
Definition: job_status.cpp:150
TJobStatus GetStatus(unsigned job_id) const
Definition: job_status.cpp:66
TNSBitVector GetOutdatedReadVacantJobs(CNSPreciseTime timeout, const TNSBitVector &read_jobs, const CJobGCRegistry &gc_registry) const
Definition: job_status.cpp:405
void ClearAll(TNSBitVector *bv)
Definition: job_status.cpp:232
void AddPendingBatch(unsigned job_id_from, unsigned job_id_to)
Definition: job_status.cpp:276
unsigned int GetJobByStatus(TJobStatus status, const TNSBitVector &unwanted_jobs, const TNSBitVector &restrict_jobs, bool restricted) const
Definition: job_status.cpp:286
TNSBitVector GetOutdatedPendingJobs(CNSPreciseTime timeout, const CJobGCRegistry &gc_registry) const
Definition: job_status.cpp:361
Definition: job.hpp:183
void SetPassport(unsigned int passport)
Definition: job.hpp:274
unsigned GetRunCount() const
Definition: job.hpp:228
CNSPreciseTime GetTimeout() const
Definition: job.hpp:209
bool GetSubmNeedProgressMsgNotif() const
Definition: job.hpp:250
unsigned GetSubmAddr() const
Definition: job.hpp:216
bool ShouldNotifyListener(const CNSPreciseTime &current_time) const
Definition: job.cpp:291
void SetAffinityId(unsigned aff_id)
Definition: job.hpp:303
void SetLastTouch(const CNSPreciseTime &t)
Definition: job.hpp:309
unsigned int GetListenerNotifAddr() const
Definition: job.hpp:223
unsigned short GetSubmNotifPort() const
Definition: job.hpp:218
const string & GetInput() const
Definition: job.hpp:260
unsigned GetGroupId() const
Definition: job.hpp:240
CNSPreciseTime GetRunTimeout() const
Definition: job.hpp:211
void SetGroupId(unsigned id)
Definition: job.hpp:307
unsigned short GetListenerNotifPort() const
Definition: job.hpp:225
TJobStatus GetStatus() const
Definition: job.hpp:207
CJobEvent & AppendEvent()
Definition: job.cpp:236
CNSPreciseTime GetSubmitTime(void) const
Definition: job.hpp:336
bool GetLsnrNeedProgressMsgNotif() const
Definition: job.hpp:252
unsigned GetMask() const
Definition: job.hpp:238
bool ShouldNotifySubmitter(const CNSPreciseTime &current_time) const
Definition: job.cpp:281
unsigned GetAffinityId() const
Definition: job.hpp:235
unsigned GetId() const
Definition: job.hpp:201
void SetId(unsigned id)
Definition: job.hpp:272
CNSPreciseTime GetExpirationTime(const CNSPreciseTime &queue_timeout, const CNSPreciseTime &queue_run_timeout, const CNSPreciseTime &queue_read_timeout, const CNSPreciseTime &queue_pending_timeout, const CNSPreciseTime &event_time) const
Definition: job.hpp:341
EAuthTokenCompareResult
Definition: job.hpp:185
@ eNoMatch
Definition: job.hpp:188
@ eInvalidTokenFormat
Definition: job.hpp:190
@ ePassportOnlyMatch
Definition: job.hpp:187
@ eCompleteMatch
Definition: job.hpp:186
CNSPreciseTime GetLastTouch() const
Definition: job.hpp:242
CNSPreciseTime GetReadTimeout() const
Definition: job.hpp:213
bool LoadFromDump(FILE *jobs_file, char *input_buf, char *output_buf, const SJobDumpHeader &header)
Definition: job.cpp:843
void RemoveJobFromAffinity(unsigned int job_id, unsigned int aff_id)
size_t size(void) const
unsigned int ResolveAffinity(const string &token)
TNSBitVector GetJobsWithAffinity(unsigned int aff_id) const
void Dump(const string &dump_dir_name, const string &queue_name) const
void FinalizeAffinityDictionaryLoading(void)
TNSBitVector GetJobsWithAffinities(const TNSBitVector &affs) const
void RemoveDump(const string &dump_dir_name, const string &queue_name) const
bool CanAccept(const string &aff_token, size_t max_records) const
string Print(const CQueue *queue, const CNSClientsRegistry &clients_registry, const TNSBitVector &scope_jobs, const string &scope, size_t batch_size, bool verbose) const
string GetTokenByID(unsigned int aff_id) const
void ResolveAffinities(const list< string > &tokens, TNSBitVector &resolved_affs, vector< unsigned int > &aff_ids)
TNSBitVector GetRegisteredAffinities(void) const
unsigned int CollectGarbage(unsigned int max_to_del)
void LoadFromDump(const string &dump_dir_name, const string &queue_name)
unsigned int GetIDByToken(const string &aff_token) const
unsigned int CheckRemoveCandidates(void)
unsigned int ResolveAffinityToken(const string &token, unsigned int job_id, unsigned int client_id, ECommandGroup command_group)
void AddJobToAffinity(unsigned int job_id, unsigned int aff_id)
void AppendType(const CNSClientId &client, unsigned int type_to_append)
bool IsPreferredByAny(unsigned int aff_id, ECommandGroup cmd_group) const
void RegisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void SetPreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_set, ECommandGroup cmd_group)
void SubtractBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
void Purge(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout_worker_node, unsigned int min_worker_nodes, const CNSPreciseTime &timeout_admin, unsigned int min_admins, const CNSPreciseTime &timeout_submitter, unsigned int min_submitters, const CNSPreciseTime &timeout_reader, unsigned int min_readers, const CNSPreciseTime &timeout_unknown, unsigned int min_unknowns, bool is_log)
void StaleNodes(const CNSPreciseTime &current_time, const CNSPreciseTime &wn_timeout, const CNSPreciseTime &reader_timeout, bool is_log)
void AddToSubmitted(const CNSClientId &client, size_t count)
string PrintClientsList(const CQueue *queue, size_t batch_size, bool verbose) const
void UnregisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
TNSBitVector GetAllPreferredAffinities(ECommandGroup cmd_group) const
void RegisterBlacklistedJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
size_t size(void) const
bool CancelWaiting(CNSClient &client, ECommandGroup cmd_group, bool touch_notif_registry=true)
void SetBlacklistTimeouts(const CNSPreciseTime &blacklist_timeout, const CNSPreciseTime &read_blacklist_timeout)
void MoveJobToBlacklist(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void ClearClient(const CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
void GCBlacklistedJobs(const CJobStatusTracker &tracker, ECommandGroup cmd_group)
void UpdatePreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_add, const TNSBitVector &aff_to_del, ECommandGroup cmd_group)
void Touch(CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, bool &session_was_reset, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
bool GetAffinityReset(const CNSClientId &client, ECommandGroup cmd_group) const
void SetLastScope(const CNSClientId &client)
void MarkAsAdmin(const CNSClientId &client)
void RegisterSocketWriteError(const CNSClientId &client)
int SetClientData(const CNSClientId &client, const string &data, int data_version)
void SetRegistries(CNSAffinityRegistry *aff_registry, CNSNotificationList *notif_registry)
void SetNodeWaiting(const CNSClientId &client, unsigned short port, const TNSBitVector &aff_ids, ECommandGroup cmd_group)
TNSBitVector GetPreferredAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
bool WasGarbageCollected(const CNSClientId &client, ECommandGroup cmd_group) const
void AddBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
void RemoveJob(unsigned int group_id, unsigned int job_id)
Definition: ns_group.cpp:304
void Clear(void)
Definition: ns_group.cpp:546
void RemoveDump(const string &dump_dir_name, const string &queue_name) const
Definition: ns_group.cpp:650
unsigned int CollectGarbage(unsigned int max_to_del)
Definition: ns_group.cpp:380
size_t size(void) const
Definition: ns_group.cpp:87
unsigned int CheckRemoveCandidates(void)
Definition: ns_group.cpp:407
TNSBitVector GetJobs(const string &group, bool allow_exception=true) const
Definition: ns_group.cpp:105
unsigned int ResolveGroup(const string &group)
Definition: ns_group.cpp:180
bool CanAccept(const string &group, size_t max_records) const
Definition: ns_group.cpp:94
void AddJobToGroup(unsigned int group_id, unsigned int job_id)
Definition: ns_group.cpp:290
unsigned int AddJob(const string &group, unsigned int job_id)
Definition: ns_group.cpp:245
unsigned int AddJobs(unsigned int group_id, unsigned int first_job_id, unsigned int count)
Definition: ns_group.cpp:223
void FinalizeGroupDictionaryLoading(void)
Definition: ns_group.cpp:357
string Print(const CQueue *queue, const TNSBitVector &scope_jobs, const string &scope, size_t batch_size, bool verbose) const
Definition: ns_group.cpp:324
void ResolveGroups(const list< string > &tokens, TNSBitVector &group_ids_vector)
Definition: ns_group.cpp:210
void RestrictByGroup(const string &group, TNSBitVector &bv) const
Definition: ns_group.cpp:139
void Dump(const string &dump_dir_name, const string &queue_name) const
Definition: ns_group.cpp:589
void LoadFromDump(const string &dump_dir_name, const string &queue_name)
Definition: ns_group.cpp:676
string BuildJobChangedNotification(const CJob &job, const string &job_key, TJobStatus job_status, ENotificationReason reason)
void onQueueResumed(bool any_pending)
void Notify(unsigned int job_id, unsigned int aff_id, CNSClientsRegistry &clients_registry, CNSAffinityRegistry &aff_registry, CNSGroupsRegistry &group_registry, CNSScopeRegistry &scope_registry, const CNSPreciseTime &notif_highfreq_period, const CNSPreciseTime &notif_handicap, ECommandGroup cmd_group)
void AddToQueueResumedNotifications(unsigned int address, unsigned short port, bool new_format)
void NotifyJobChanges(unsigned int address, unsigned short port, const string &notification)
void UnregisterListener(const CNSClientId &client, unsigned short port, ECommandGroup cmd_group)
void CheckOutdatedJobs(const TNSBitVector &outdated_jobs, CNSClientsRegistry &clients_registry, const CNSPreciseTime &notif_highfreq_period, ECommandGroup cmd_group)
void ClearExactGetNotifications(void)
void CheckTimeout(const CNSPreciseTime &current_time, CNSClientsRegistry &clients_registry, ECommandGroup cmd_group)
void NotifyPeriodically(const CNSPreciseTime &current_time, unsigned int notif_lofreq_mult, CNSClientsRegistry &clients_registry)
void RegisterListener(const CNSClientId &client, unsigned short port, unsigned int timeout, bool wnode_aff, bool any_job, bool exclusive_new_affinity, bool new_format, const TNSBitVector &groups, ECommandGroup cmd_group)
string Print(const CNSClientsRegistry &clients_registry, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry, bool verbose) const
CNSPreciseTime NotifyExactListeners(void)
time_t & Sec(void)
static CNSPreciseTime Current(void)
TNSBitVector GetAllJobsInScopes(void) const
Definition: ns_scope.cpp:86
void Clear(void)
Definition: ns_scope.cpp:233
void AddJobs(const string &scope, unsigned int first_job_id, unsigned int count)
Definition: ns_scope.cpp:117
string Print(const CQueue *queue, size_t batch_size, bool verbose) const
Definition: ns_scope.cpp:156
bool CanAccept(const string &scope, size_t max_records) const
Definition: ns_scope.cpp:60
TNSBitVector GetJobs(const string &scope) const
Definition: ns_scope.cpp:74
string GetJobScope(unsigned int job_id) const
Definition: ns_scope.cpp:254
void RemoveJob(unsigned int job_id)
Definition: ns_scope.cpp:139
void AddJob(const string &scope, unsigned int job_id)
Definition: ns_scope.cpp:93
CJsonNode SetHosts(const string &host_names)
Definition: access_list.cpp:54
NetSchedule internal exception.
@ eStatus_OK
Command is ok and execution is good.
Definition: ns_handler.hpp:97
NetScheduler threaded server.
Definition: ns_server.hpp:57
void SetJobsStartID(const string &qname, unsigned int value)
Definition: ns_server.hpp:135
unsigned int GetJobsStartID(const string &qname)
Definition: ns_server.hpp:137
SNSRegistryParameters GetAffRegistrySettings(void) const
Definition: ns_server.hpp:112
bool ShouldPerfLogTransitions(const string &queue_name, const string &class_name) const
Definition: ns_server.cpp:587
CCompoundIDPool GetCompoundIDPool(void) const
Definition: ns_server.hpp:133
SNSRegistryParameters GetScopeRegistrySettings(void) const
Definition: ns_server.hpp:116
SNSRegistryParameters GetGroupRegistrySettings(void) const
Definition: ns_server.hpp:114
void AddClientInfo(const CQueueClientInfo &cinfo)
Definition: queue_vc.hpp:74
map< string, string > GetLinkedSection(const string &section_name) const
unsigned int GetMaxOutputSize() const
unsigned int GetFailedRetries() const
string GetParamValue(unsigned int n) const
unsigned int GetMaxInputSize() const
string GetParamName(unsigned int n) const
int TJobReturnOption
Definition: ns_queue.hpp:102
CFastMutex m_OperationLock
Definition: ns_queue.hpp:648
void x_ResetReadingDueToClear(const CNSClientId &client, const TNSBitVector &jobs)
Definition: ns_queue.cpp:4228
void SetPauseStatus(const CNSClientId &client, TPauseStatus status)
Definition: ns_queue.cpp:4522
CFastMutex m_JobsToDeleteLock
Definition: ns_queue.hpp:663
unsigned int CountActiveJobs(void) const
Definition: ns_queue.cpp:4512
@ eNoPause
Definition: ns_queue.hpp:90
bool m_ScrambleJobKeys
Definition: ns_queue.hpp:723
map< string, size_t > x_GetRunningJobsPerClientIP(void)
Definition: ns_queue.cpp:2821
CNSScopeRegistry m_ScopeRegistry
Definition: ns_queue.hpp:746
void x_ResetRunningDueToNewSession(const CNSClientId &client, const TNSBitVector &jobs)
Definition: ns_queue.cpp:4244
CNSPreciseTime m_ClientRegistryTimeoutSubmitter
Definition: ns_queue.hpp:739
TJobStatus ConfirmReadingJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token)
Definition: ns_queue.cpp:2201
string DecorateJob(unsigned int job_id) const
Definition: ns_queue.hpp:154
void TimeLineRemove(unsigned int job_id)
Definition: ns_queue.cpp:3614
void PurgeClientRegistry(const CNSPreciseTime &current_time)
Definition: ns_queue.cpp:3793
map< unsigned int, CJob > m_Jobs
Definition: ns_queue.hpp:638
void TimeLineMove(unsigned int job_id, const CNSPreciseTime &old_time, const CNSPreciseTime &new_time)
Definition: ns_queue.cpp:3591
unsigned int m_DumpAffBufferSize
Definition: ns_queue.hpp:721
int SetClientData(const CNSClientId &client, const string &data, int data_version)
Definition: ns_queue.cpp:1119
unsigned m_ReadFailedRetries
Definition: ns_queue.hpp:681
TPauseStatus m_PauseStatus
Definition: ns_queue.hpp:732
unsigned int GetNextId()
Definition: ns_queue.cpp:1959
string m_QueueName
Definition: ns_queue.hpp:644
CNetScheduleServer * m_Server
Definition: ns_queue.hpp:634
TNSBitVector m_ReadJobs
Definition: ns_queue.hpp:671
TJobStatus x_ChangeReadingStatus(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, const string &err_msg, TJobStatus target_status, bool is_ns_rollback, bool no_retries)
Definition: ns_queue.cpp:2341
unsigned CountStatus(TJobStatus) const
Definition: ns_queue.cpp:4042
CQueueDataBase & m_QueueDB
Definition: ns_queue.hpp:636
CNSPreciseTime m_PendingTimeout
Definition: ns_queue.hpp:689
CJobStatusTracker m_StatusTracker
Definition: ns_queue.hpp:635
unsigned m_MaxJobsPerClient
Definition: ns_queue.hpp:682
void x_RegisterGetListener(const CNSClientId &client, unsigned short port, unsigned int timeout, const TNSBitVector &aff_ids, bool wnode_aff, bool any_aff, bool exclusive_new_affinity, bool new_format, const TNSBitVector &group_ids)
Definition: ns_queue.cpp:4276
void x_CheckExecutionTimeout(const CNSPreciseTime &queue_run_timeout, const CNSPreciseTime &queue_read_timeout, unsigned job_id, const CNSPreciseTime &curr_time, bool logging)
Definition: ns_queue.cpp:3332
unsigned int CancelAllJobs(const CNSClientId &client, bool logging)
Definition: ns_queue.cpp:1761
string PrintNotificationsList(bool verbose) const
Definition: ns_queue.cpp:3260
unsigned int m_ClientRegistryMinWorkerNodes
Definition: ns_queue.hpp:736
void SetAffinity(const CNSClientId &client, const list< string > &aff, ECommandGroup cmd_group)
Definition: ns_queue.cpp:1069
const string & GetQueueName() const
Definition: ns_queue.hpp:150
void PrintStatistics(size_t &aff_count) const
Definition: ns_queue.cpp:4331
void StatusStatistics(TJobStatus status, TNSBitVector::statistics *st) const
Definition: ns_queue.cpp:4048
unsigned int LoadFromDump(const string &dump_dir_name)
Definition: ns_queue.cpp:4729
string PrintClientsList(bool verbose) const
Definition: ns_queue.cpp:3252
x_SJobPick x_FindOutdatedPendingJob(const CNSClientId &client, unsigned int picked_earlier, const TNSBitVector &group_ids)
Definition: ns_queue.cpp:2865
void NotifyListenersPeriodically(const CNSPreciseTime &current_time)
Definition: ns_queue.cpp:3186
CNSPreciseTime m_ReadTimeout
Definition: ns_queue.hpp:678
CStatisticsCounters m_StatisticsCountersLastPrinted
Definition: ns_queue.hpp:709
bool PutProgressMessage(unsigned int job_id, CJob &job, const string &msg)
Definition: ns_queue.cpp:1336
unsigned int CancelSelectedJobs(const CNSClientId &client, const string &group, const string &aff_token, const vector< TJobStatus > &statuses, bool logging, vector< string > &warnings)
Definition: ns_queue.cpp:1879
void CheckExecutionTimeout(bool logging)
Definition: ns_queue.cpp:3310
unsigned SubmitBatch(const CNSClientId &client, vector< pair< CJob, string > > &batch, const string &group, bool logging, CNSRollbackInterface *&rollback_action)
Definition: ns_queue.cpp:520
TJobStatus ReadAndTouchJob(unsigned int job_id, CJob &job, CNSPreciseTime *lifetime)
Definition: ns_queue.cpp:1650
CNSPreciseTime m_ClientRegistryTimeoutReader
Definition: ns_queue.hpp:741
unsigned int m_ClientRegistryMinUnknowns
Definition: ns_queue.hpp:744
void TimeLineAdd(unsigned int job_id, const CNSPreciseTime &job_time)
Definition: ns_queue.cpp:3603
unsigned int GetJobsToDeleteCount(void) const
Definition: ns_queue.cpp:4404
CNSPreciseTime m_ReadBlacklistTime
Definition: ns_queue.hpp:684
unsigned m_FailedRetries
Definition: ns_queue.hpp:680
TJobStatus FailReadingJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, const string &err_msg, bool no_retries)
Definition: ns_queue.cpp:2217
TJobStatus SetJobListener(unsigned int job_id, CJob &job, unsigned int address, unsigned short port, const CNSPreciseTime &timeout, bool need_stolen, bool need_progress_msg, size_t *last_event_index)
Definition: ns_queue.cpp:1281
void ClearWorkerNode(const CNSClientId &client, bool &client_was_found, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
Definition: ns_queue.cpp:3157
CNSPreciseTime m_ReaderTimeout
Definition: ns_queue.hpp:688
void RegisterQueueResumeNotification(unsigned int address, unsigned short port, bool new_format)
Definition: ns_queue.cpp:4537
string GetAffinityTokenByID(unsigned int aff_id) const
Definition: ns_queue.cpp:3151
CJobTimeLine * m_RunTimeLine
Definition: ns_queue.hpp:641
void Dump(const string &dump_dir_name)
Definition: ns_queue.cpp:4638
bool GetJobForReadingOrWait(const CNSClientId &client, unsigned int port, unsigned int timeout, const list< string > *aff_list, bool reader_affinity, bool any_affinity, bool exclusive_new_affinity, bool prioritized_aff, const list< string > *group_list, bool affinity_may_change, bool group_may_change, CJob *job, bool *no_more_jobs, CNSRollbackInterface *&rollback_action, string &added_pref_aff)
Definition: ns_queue.cpp:2011
unsigned m_MaxOutputSize
Definition: ns_queue.hpp:686
unsigned int PurgeAffinities(void)
Definition: ns_queue.cpp:3706
void UpdatePerfLoggingSettings(const string &qclass)
Definition: ns_queue.cpp:227
CNSPreciseTime m_Timeout
Definition: ns_queue.hpp:676
void SetParameters(const SQueueParameters &params)
Definition: ns_queue.cpp:159
CNetScheduleKeyGenerator m_KeyGenerator
Definition: ns_queue.hpp:701
TJobStatus JobDelayExpiration(unsigned int job_id, CJob &job, const CNSPreciseTime &tm)
Definition: ns_queue.cpp:1126
void SetClientScope(const CNSClientId &client)
Definition: ns_queue.cpp:4110
CQueueClientInfoList m_ProgramVersionList
Definition: ns_queue.hpp:693
unsigned int x_CancelJobs(const CNSClientId &client, const TNSBitVector &jobs_to_cancel, bool logging)
Definition: ns_queue.cpp:1782
string PrintTransitionCounters(void) const
Definition: ns_queue.cpp:4411
string PrintScopesList(bool verbose) const
Definition: ns_queue.cpp:3303
CNSPreciseTime m_BlacklistTime
Definition: ns_queue.hpp:683
CNSPreciseTime NotifyExactListeners(void)
Definition: ns_queue.cpp:3246
void CancelWaitRead(const CNSClientId &client)
Definition: ns_queue.cpp:923
CNetScheduleAccessList m_SubmHosts
Definition: ns_queue.hpp:695
TJobStatus FailJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, const string &err_msg, const string &output, int ret_code, bool no_retries, string warning)
Definition: ns_queue.cpp:2987
TJobStatus ReturnReadingJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, bool is_ns_rollback, bool blacklist, TJobStatus target_status)
Definition: ns_queue.cpp:2235
unsigned int GetNextJobIdForBatch(unsigned count)
Definition: ns_queue.cpp:1980
TParameterList GetParameters() const
Definition: ns_queue.cpp:234
void MarkClientAsAdmin(const CNSClientId &client)
Definition: ns_queue.cpp:4097
bool x_UnregisterGetListener(const CNSClientId &client, unsigned short port)
Definition: ns_queue.cpp:4317
CNSClientsRegistry m_ClientsRegistry
Definition: ns_queue.hpp:651
string x_GetJobsDumpFileName(const string &dump_dname) const
Definition: ns_queue.cpp:4721
~CQueue()
Definition: ns_queue.cpp:139
TJobStatus JobDelayReadExpiration(unsigned int job_id, CJob &job, const CNSPreciseTime &tm)
Definition: ns_queue.cpp:1173
string MakeJobKey(unsigned int job_id) const
Definition: ns_queue.cpp:4055
string PrintJobDbStat(const CNSClientId &client, unsigned int job_id, TDumpFields dump_fields)
Definition: ns_queue.cpp:3810
TJobStatus PutResult(const CNSClientId &client, const CNSPreciseTime &curr, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, int ret_code, const string &output)
Definition: ns_queue.cpp:650
unsigned int m_JobsToDeleteOps
Definition: ns_queue.hpp:667
CFastMutex m_LastIdLock
Definition: ns_queue.hpp:660
unsigned int Submit(const CNSClientId &client, CJob &job, const string &aff_token, const string &group, bool logging, CNSRollbackInterface *&rollback_action)
Definition: ns_queue.cpp:399
TJobStatus GetStatusAndLifetime(unsigned int job_id, string &client_ip, string &client_sid, string &client_phid, string &progress_msg, CNSPreciseTime *lifetime)
Definition: ns_queue.cpp:1223
void GetJobsPerState(const CNSClientId &client, const string &group_token, const string &aff_token, size_t *jobs, vector< string > &warnings) const
Definition: ns_queue.cpp:4429
void RemoveDump(const string &dump_dir_name)
Definition: ns_queue.cpp:4709
@ eRollback
Definition: ns_queue.hpp:100
@ eWithoutBlacklist
Definition: ns_queue.hpp:99
@ eWithBlacklist
Definition: ns_queue.hpp:98
TJobStatus x_ResetDueTo(const CNSClientId &client, unsigned int job_id, const CNSPreciseTime &current_time, TJobStatus status_from, CJobEvent::EJobEvent event_type)
Definition: ns_queue.cpp:4120
CNSPreciseTime m_ClientRegistryTimeoutAdmin
Definition: ns_queue.hpp:737
void Attach(void)
Definition: ns_queue.cpp:145
SPurgeAttributes CheckJobsExpiry(const CNSPreciseTime &current_time, SPurgeAttributes attributes, unsigned int last_job, TJobStatus status)
Definition: ns_queue.cpp:3511
void x_LogSubmit(const CJob &job)
Definition: ns_queue.cpp:390
void x_Erase(const TNSBitVector &job_ids, TJobStatus status)
Erase jobs from all structures, request delayed db deletion.
Definition: ns_queue.cpp:2489
unsigned int m_DumpGroupBufferSize
Definition: ns_queue.hpp:722
int TQueueKind
Definition: ns_queue.hpp:86
x_SJobPick x_FindOutdatedJobForReading(const CNSClientId &client, unsigned int picked_earlier, const TNSBitVector &group_ids)
Definition: ns_queue.cpp:2927
CNSPreciseTime m_WNodeTimeout
Definition: ns_queue.hpp:687
list< pair< string, string > > TParameterList
Definition: ns_queue.hpp:116
CNetScheduleAccessList m_WnodeHosts
Definition: ns_queue.hpp:697
TJobStatus ReturnJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, const string &auth_token, string &warning, TJobReturnOption how)
Definition: ns_queue.cpp:1362
unsigned int m_ClientRegistryMinSubmitters
Definition: ns_queue.hpp:740
int TPauseStatus
Definition: ns_queue.hpp:94
void TimeLineExchange(unsigned int remove_job_id, unsigned int add_job_id, const CNSPreciseTime &new_time)
Definition: ns_queue.cpp:3624
void x_ResetReadingDueToNewSession(const CNSClientId &client, const TNSBitVector &jobs)
Definition: ns_queue.cpp:4260
CNSPreciseTime m_ClientRegistryTimeoutWorkerNode
Definition: ns_queue.hpp:735
string PrintGroupsList(const CNSClientId &client, bool verbose) const
Definition: ns_queue.cpp:3286
TJobStatus GetJobStatus(unsigned job_id) const
Definition: ns_queue.cpp:1946
void x_ClearQueue(void)
Definition: ns_queue.cpp:4831
void GetLinkedSections(map< string, map< string, string > > &linked_sections) const
Definition: ns_queue.cpp:262
unsigned int m_ReadJobsOps
Definition: ns_queue.hpp:672
TNSBitVector m_JobsToDelete
Definition: ns_queue.hpp:666
string PrintAllJobDbStat(const CNSClientId &client, const string &group, const string &aff_token, const vector< TJobStatus > &job_statuses, unsigned int start_after_job_id, unsigned int count, bool order_first, TDumpFields dump_fields, bool logging)
Definition: ns_queue.cpp:3867
bool IsEmpty() const
Definition: ns_queue.cpp:1952
TJobStatus RescheduleJob(const CNSClientId &client, unsigned int job_id, const string &job_key, const string &auth_token, const string &aff_token, const string &group, bool &auth_token_ok, CJob &job)
Definition: ns_queue.cpp:1470
CNSPreciseTime GetTimeout() const
Definition: ns_queue.hpp:760
CNSPreciseTime GetReadTimeout() const
Definition: ns_queue.hpp:768
string PrintAffinitiesList(const CNSClientId &client, bool verbose) const
Definition: ns_queue.cpp:3268
string x_DumpJobs(const TNSBitVector &jobs_to_dump, unsigned int start_after_job_id, unsigned int count, TDumpFields dump_fields, bool order_first)
Definition: ns_queue.cpp:3943
unsigned int m_ClientRegistryMinReaders
Definition: ns_queue.hpp:742
CNSNotificationList m_NotificationsList
Definition: ns_queue.hpp:713
bool GetJobOrWait(const CNSClientId &client, unsigned short port, unsigned int timeout, const list< string > *aff_list, bool wnode_affinity, bool any_affinity, bool exclusive_new_affinity, bool prioritized_aff, bool new_format, const list< string > *group_list, CJob *new_job, CNSRollbackInterface *&rollback_action, string &added_pref_aff)
Definition: ns_queue.cpp:717
CQueue(const string &queue_name, TQueueKind queue_kind, CNetScheduleServer *server, CQueueDataBase &qdb)
Definition: ns_queue.cpp:66
void RegisterSocketWriteError(const CNSClientId &client)
Definition: ns_queue.cpp:4103
unsigned int m_ClientRegistryMinAdmins
Definition: ns_queue.hpp:738
void x_UpdateDB_ProvideJobNoLock(const CNSClientId &client, const CNSPreciseTime &curr, unsigned int job_id, ECommandGroup cmd_group, CJob &job)
Definition: ns_queue.cpp:4598
CNSPreciseTime m_MaxPendingReadWaitTimeout
Definition: ns_queue.hpp:691
CNSPreciseTime m_ClientRegistryTimeoutUnknown
Definition: ns_queue.hpp:743
unsigned int PurgeGroups(void)
Definition: ns_queue.cpp:3742
CStatisticsCounters m_StatisticsCounters
Definition: ns_queue.hpp:708
bool x_NoMoreReadJobs(const CNSClientId &client, const TNSBitVector &aff_list, bool reader_affinity, bool any_affinity, bool exclusive_new_affinity, const TNSBitVector &group_list, bool affinity_may_change, bool group_may_change)
Definition: ns_queue.cpp:276
unsigned m_MaxInputSize
Definition: ns_queue.hpp:685
CNSAffinityRegistry m_AffinityRegistry
Definition: ns_queue.hpp:654
unsigned int m_DumpBufferSize
Definition: ns_queue.hpp:719
const bool & m_LogBatchEachJob
Definition: ns_queue.hpp:704
x_SJobPick x_FindVacantJob(const CNSClientId &client, const TNSBitVector &explicit_affs, const vector< unsigned int > &aff_ids, bool use_pref_affinity, bool any_affinity, bool exclusive_new_affinity, bool prioritized_aff, const TNSBitVector &group_ids, bool has_groups, ECommandGroup cmd_group)
Definition: ns_queue.cpp:2510
CNetScheduleAccessList m_ReaderHosts
Definition: ns_queue.hpp:699
CNSPreciseTime x_GetEstimatedJobLifetime(unsigned int job_id, TJobStatus status) const
Definition: ns_queue.cpp:1868
const bool & m_Log
Definition: ns_queue.hpp:703
map< string, string > m_LinkedSections
Definition: ns_queue.hpp:724
void PrintJobCounters(void) const
Definition: ns_queue.cpp:4387
unsigned int m_SavedId
Definition: ns_queue.hpp:658
TJobStatus RedoJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job)
Definition: ns_queue.cpp:1590
TJobStatus RereadJob(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, bool &no_op)
Definition: ns_queue.cpp:2258
CNSGroupsRegistry m_GroupRegistry
Definition: ns_queue.hpp:727
void x_NotifyJobChanges(const CJob &job, const string &job_key, ENotificationReason reason, const CNSPreciseTime &current_time)
Definition: ns_queue.cpp:4847
CNSPreciseTime GetRunTimeout() const
Definition: ns_queue.hpp:764
CNSPreciseTime m_NotifHifreqPeriod
Definition: ns_queue.hpp:715
string PrintJobsStat(const CNSClientId &client, const string &group_token, const string &aff_token, vector< string > &warnings) const
Definition: ns_queue.cpp:4486
void GetMaxIOSizesAndLinkedSections(unsigned int &max_input_size, unsigned int &max_output_size, map< string, map< string, string > > &linked_sections) const
Definition: ns_queue.cpp:248
unsigned int m_DumpClientBufferSize
Definition: ns_queue.hpp:720
void StaleNodes(const CNSPreciseTime &current_time)
Definition: ns_queue.cpp:3776
CFastMutex m_ParamLock
Definition: ns_queue.hpp:675
void x_ResetRunningDueToClear(const CNSClientId &client, const TNSBitVector &jobs)
Definition: ns_queue.cpp:4212
CJobGCRegistry m_GCRegistry
Definition: ns_queue.hpp:730
void OptimizeMem()
Definition: ns_queue.cpp:2503
void PurgeBlacklistedJobs(void)
Definition: ns_queue.cpp:3786
bool x_ValidateMaxJobsPerClientIP(unsigned int job_id, const map< string, size_t > &jobs_per_client_ip) const
Definition: ns_queue.cpp:2845
TJobStatus GetStatusAndLifetimeAndTouch(unsigned int job_id, CJob &job, CNSPreciseTime *lifetime)
Definition: ns_queue.cpp:1252
void TouchClientsRegistry(CNSClientId &client, bool &client_was_found, bool &session_was_reset, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
Definition: ns_queue.cpp:4064
void EraseJob(unsigned job_id, TJobStatus status)
Definition: ns_queue.cpp:2472
void CancelWaitGet(const CNSClientId &client)
Definition: ns_queue.cpp:905
CNSPreciseTime m_MaxPendingWaitTimeout
Definition: ns_queue.hpp:690
list< string > ChangeAffinity(const CNSClientId &client, const list< string > &aff_to_add, const list< string > &aff_to_del, ECommandGroup cmd_group)
Definition: ns_queue.cpp:942
void x_UpdateDB_PutResultNoLock(unsigned job_id, const string &auth_token, const CNSPreciseTime &curr, int ret_code, const string &output, CJob &job, const CNSClientId &client)
Definition: ns_queue.cpp:4546
bool m_ShouldPerfLogTransitions
Definition: ns_queue.hpp:748
CNSPreciseTime m_RunTimeout
Definition: ns_queue.hpp:677
void x_RegisterReadListener(const CNSClientId &client, unsigned short port, unsigned int timeout, const TNSBitVector &aff_ids, bool reader_aff, bool any_aff, bool exclusive_new_affinity, const TNSBitVector &group_ids)
Definition: ns_queue.cpp:4299
unsigned int m_NotifLofreqMult
Definition: ns_queue.hpp:716
unsigned int DeleteBatch(unsigned int max_deleted)
Definition: ns_queue.cpp:3639
CNSPreciseTime m_StatisticsCountersLastPrintedTimestamp
Definition: ns_queue.hpp:710
CNSPreciseTime m_HandicapTimeout
Definition: ns_queue.hpp:717
CRWLock m_RunTimeLineLock
Definition: ns_queue.hpp:642
vector< CNetScheduleAPI::EJobStatus > m_StatesForRead
Definition: ns_queue.hpp:752
unsigned int m_LastId
Definition: ns_queue.hpp:657
TJobStatus Cancel(const CNSClientId &client, unsigned int job_id, const string &job_key, CJob &job, bool is_ns_rollback=false)
Definition: ns_queue.cpp:1678
CNSPreciseTime m_NotifHifreqInterval
Definition: ns_queue.hpp:714
void CountNSGetRollback(size_t count)
void CountOutdatedPick(ECommandGroup cmd_group)
void CountDBDeletion(size_t count)
void CountRedo(CNetScheduleAPI::EJobStatus from)
void CountToPendingRescheduled(size_t count)
void CountReread(CNetScheduleAPI::EJobStatus from, CNetScheduleAPI::EJobStatus to)
void CountTransitionToDeleted(CNetScheduleAPI::EJobStatus from, size_t count)
void CountTransition(CNetScheduleAPI::EJobStatus from, CNetScheduleAPI::EJobStatus to, ETransitionPathOption path_option=eNone)
void CountNSSubmitRollback(size_t count)
void PrintDelta(CDiagContext_Extra &extra, const CStatisticsCounters &prev) const
void CountToPendingWithoutBlacklist(size_t count)
void CountNSReadRollback(size_t count)
void PrintTransitions(CDiagContext_Extra &extra) const
void CountSubmit(size_t count)
void MoveObject(time_t old_time, time_t new_time, unsigned object_id)
Move object from one time slot to another.
Definition: time_line.hpp:260
void AddObject(time_t tm, unsigned object_id)
Add object to the timeline.
Definition: time_line.hpp:178
bool RemoveObject(time_t object_time, unsigned object_id)
Remove object from the time line, object_time defines time slot.
Definition: time_line.hpp:214
void ExtractObjects(time_t tm, TBitVector *objects)
Extracts all objects up to 'tm' and puts them into 'objects' vector.
Definition: time_line.hpp:287
time_t GetHead() const
Return head of the timeline.
Definition: time_line.hpp:116
void ReInit(time_t tm=0)
Definition: time_line.hpp:162
Constant iterator designed to enumerate "ON" bits.
Definition: bm.h:603
bool valid() const noexcept
Checks if iterator is still valid.
Definition: bm.h:283
Bitvector Bit-vector container with runtime compression of bits.
Definition: bm.h:115
@ opt_free_0
Free unused 0 blocks.
Definition: bm.h:135
bool get_bit(size_type n) const noexcept
Returns true if bit n is set and false if bit n is 0.
Definition: bm.h:3602
bool any() const noexcept
Returns true if any bits in this bitset are set, and otherwise returns false.
Definition: bm.h:2451
bool empty() const noexcept
Returns true if the set is empty (no bits are set), otherwise returns false. Please note that this is ...
Definition: bm.h:1562
size_type size() const noexcept
Returns bvector's capacity (number of bits it can store)
Definition: bm.h:1300
void optimize(bm::word_t *temp_block=0, optmode opt_mode=opt_compress, statistics *stat=0)
Optimize the bitvector's memory allocation.
Definition: bm.h:3635
bool set_bit(size_type n, bool val=true)
Sets bit n.
Definition: bm.h:4227
enumerator first() const
Returns enumerator pointing on the first non-zero bit.
Definition: bm.h:1871
bvector< Alloc > & set_range(size_type left, size_type right, bool value=true)
Sets all bits in the specified closed interval [left,right]. Interval must be inside the bvector's siz...
Definition: bm.h:2368
void clear(const size_type *ids, size_type ids_size, bm::sort_order so=bm::BM_UNKNOWN)
clear list of bits in this bitset
Definition: bm.h:4149
size_type count() const noexcept
population count (count of ON bits)
Definition: bm.h:2401
void erase(iterator pos)
Definition: map.hpp:167
container_type::const_iterator const_iterator
Definition: map.hpp:53
const_iterator begin() const
Definition: map.hpp:151
const_iterator end() const
Definition: map.hpp:152
bool empty() const
Definition: map.hpp:149
void clear()
Definition: map.hpp:169
const_iterator find(const key_type &key) const
Definition: map.hpp:153
void Print(const CCompactSAMApplication::AlignInfo &ai)
static const int chunk_size
CS_CONTEXT * ctx
Definition: t0006.c:12
#define false
Definition: bool.h:36
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
Definition: dlist.tmpl.h:46
static void DLIST_NAME() remove(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
Definition: dlist.tmpl.h:90
static const struct attribute attributes[]
Definition: attributes.c:165
static char precision
Definition: genparams.c:28
static SQLCHAR output[256]
Definition: print.c:5
char data[12]
Definition: iconv.c:80
element_type * get(void) const
Get pointer.
Definition: ncbimisc.hpp:581
#define NULL
Definition: ncbistd.hpp:225
void PrintRequestStop(void)
Print request stop message (for request-driven applications)
Definition: ncbidiag.cpp:2778
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
Definition: ncbidiag.cpp:2622
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
Definition: ncbidiag.cpp:1907
void PrintRequestStart(const string &message)
Print request start message (for request-driven applications)
Definition: ncbidiag.cpp:2762
CDiagContext_Extra Extra(void) const
Create a temporary CDiagContext_Extra object.
Definition: ncbidiag.hpp:2095
void Flush(void)
Print the message and reset object.
Definition: ncbidiag.cpp:2332
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
EJobStatus
Job status codes.
static string StatusToString(EJobStatus status)
Printable status type.
string Generate(unsigned id) const
string GenerateCompoundID(unsigned id, CCompoundIDPool id_pool) const
@ eDeleted
The job has been wiped out of the database.
@ eDone
Job is ready (computed successfully)
@ eConfirmed
Final state - read confirmed.
@ eReading
Job output is being read.
@ eCanceled
Explicitly canceled.
@ eRunning
Running on a worker node.
@ eJobNotFound
No such job.
@ ePending
Waiting for execution.
@ eReadFailed
Final state - read failed.
@ eFailed
Failed to run (execution timeout)
@ eOutOfOrder
This job comes to the node before every regular job.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define kEmptyStr
Definition: ncbistr.hpp:123
static string & ToUpper(string &str)
Convert string to upper case – string& version.
Definition: ncbistr.cpp:424
unsigned short GetPort() const
Get the listening port number back.
@ BM_GAP
GAP compression is ON.
Definition: bmconst.h:148
CNSPreciseTime GetJobExpirationTime(const CNSPreciseTime &last_touch, TJobStatus status, const CNSPreciseTime &job_submit_time, const CNSPreciseTime &job_timeout, const CNSPreciseTime &job_run_timeout, const CNSPreciseTime &job_read_timeout, const CNSPreciseTime &queue_timeout, const CNSPreciseTime &queue_run_timeout, const CNSPreciseTime &queue_read_timeout, const CNSPreciseTime &queue_pending_timeout, const CNSPreciseTime &event_time)
Definition: job.cpp:101
const size_t g_ValidJobStatusesSize
Definition: job_status.hpp:64
const CNetScheduleAPI::EJobStatus g_ValidJobStatuses[]
Definition: job_status.hpp:55
yy_size_t n
Uint4 GetHost(TEndpointKey key)
const unsigned int kNetScheduleMaxDBDataSize
Int4 delta(size_t dimension_, const Int4 *score_)
const string k_NoAffinityToken
Definition: ns_affinity.hpp:52
long TDumpFields
@ eGCEraseTime
const CNSPreciseTime default_client_registry_timeout_unknown
const unsigned int default_dump_group_buffer_size
const CNSPreciseTime default_run_timeout(3600, 0)
const unsigned int default_max_jobs_per_client
const CNSPreciseTime default_wnode_timeout(40, 0)
const unsigned int default_notif_lofreq_mult
const unsigned int default_dump_aff_buffer_size
const CNSPreciseTime default_notif_hifreq_interval(0, kNSecsPerSecond/10)
const unsigned int default_dump_client_buffer_size
const unsigned int default_client_registry_min_submitters
const unsigned int default_client_registry_min_unknowns
const CNSPreciseTime default_pending_timeout(604800, 0)
const CNSPreciseTime default_reader_timeout(40, 0)
const CNSPreciseTime default_read_timeout(10, 0)
const unsigned int default_client_registry_min_admins
const CNSPreciseTime default_timeout(3600, 0)
const CNSPreciseTime default_client_registry_timeout_worker_node
const unsigned int default_dump_buffer_size
const unsigned int default_client_registry_min_worker_nodes
const unsigned int default_failed_retries
const CNSPreciseTime default_client_registry_timeout_admin
const CNSPreciseTime default_notif_hifreq_period(5, 0)
const CNSPreciseTime default_client_registry_timeout_reader
const CNSPreciseTime default_client_registry_timeout_submitter
const CNSPreciseTime default_blacklist_time
const unsigned int default_client_registry_min_readers
const bool default_scramble_job_keys
void g_DoErasePerfLogging(const CQueue &queue, const CJob &job)
void g_DoPerfLogging(const CQueue &queue, const CJob &job, int status)
const CNSPreciseTime kTimeZero
const CNSPreciseTime kTimeNever
string NS_FormatPreciseTimeAsSec(const CNSPreciseTime &t)
string NS_FormatPreciseTime(const CNSPreciseTime &t)
static const unsigned int s_ReserveDelta
Definition: ns_queue.cpp:63
CTimeLine< TNSBitVector > CJobTimeLine
Definition: ns_queue.hpp:69
void SerializePauseState(CNetScheduleServer *server)
const string kNoScopeOnly
Definition: ns_scope.hpp:55
const TNSBitVector kEmptyBitVector
Definition: ns_types.hpp:105
ENotificationReason
Definition: ns_types.hpp:62
@ eProgressMessageChanged
Definition: ns_types.hpp:65
@ eJobDeleted
Definition: ns_types.hpp:66
@ eStatusChanged
Definition: ns_types.hpp:63
@ eNotificationStolen
Definition: ns_types.hpp:64
ECommandGroup
Definition: ns_types.hpp:54
@ eGet
Definition: ns_types.hpp:55
@ eRead
Definition: ns_types.hpp:56
const string kJobsFileName("jobs.dump")
static string kNewLine("\n")
const unsigned kNetScheduleMaxOverflowSize
Definition: ns_types.hpp:118
#define count
static uint8_t * buffer
Definition: pcre2test.c:1016
true_type verbose
Definition: processing.cpp:878
Query parser execution implementations.
Query string parsing components.
Defines CRequestContext class for NCBI C++ diagnostic API.
static SLJIT_INLINE sljit_ins st(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static CNamedPipeClient * client
unsigned int job_id
Definition: ns_queue.hpp:487
unsigned int aff_id
Definition: ns_queue.hpp:489
int Read(FILE *f)
Definition: ns_db_dump.cpp:80
void Write(FILE *f)
Definition: ns_db_dump.cpp:72
unsigned int low_mark_percentage
unsigned int high_mark_percentage
unsigned int dump_buffer_size
unsigned int client_registry_min_submitters
CNSPreciseTime blacklist_time
CNSPreciseTime timeout
map< string, string > linked_sections
CNSPreciseTime read_timeout
unsigned int client_registry_min_unknowns
CNSPreciseTime notif_hifreq_interval
unsigned int dump_aff_buffer_size
CNSPreciseTime notif_hifreq_period
unsigned int max_output_size
CNSPreciseTime read_blacklist_time
CNSPreciseTime pending_timeout
unsigned int max_input_size
CNSPreciseTime notif_handicap
unsigned int max_jobs_per_client
CNSPreciseTime client_registry_timeout_submitter
unsigned int read_failed_retries
CNSPreciseTime client_registry_timeout_unknown
CNSPreciseTime CalculateRuntimePrecision(void) const
unsigned int notif_lofreq_mult
unsigned int dump_client_buffer_size
CNSPreciseTime reader_timeout
unsigned int failed_retries
CNSPreciseTime max_pending_read_wait_timeout
unsigned int dump_group_buffer_size
unsigned int client_registry_min_worker_nodes
CNSPreciseTime max_pending_wait_timeout
CNSPreciseTime client_registry_timeout_admin
CNSPreciseTime client_registry_timeout_reader
CNSPreciseTime run_timeout
CNSPreciseTime client_registry_timeout_worker_node
CNSPreciseTime wnode_timeout
unsigned int client_registry_min_admins
unsigned int client_registry_min_readers
Statistical information about bitset's memory allocation details.
Definition: bm.h:125
#define _ASSERT
else result
Definition: token2.c:20
Modified on Fri Sep 20 14:57:35 2024 by modify_doxy.py rev. 669887