NCBI C++ ToolKit
ns_queue.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: ns_queue.cpp 97381 2022-07-12 14:45:56Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Victor Joukov
27  *
28  * File Description:
29  * NetSchedule queue structure and parameters
30  */
31 #include <ncbi_pch.hpp>
32 
33 #include <unistd.h>
34 
35 #include "ns_queue.hpp"
37 #include "background_host.hpp"
38 #include "ns_util.hpp"
39 #include "ns_server.hpp"
40 #include "ns_precise_time.hpp"
41 #include "ns_rollback.hpp"
42 #include "queue_database.hpp"
43 #include "ns_handler.hpp"
44 #include "ns_ini_params.hpp"
45 #include "ns_perf_logging.hpp"
46 #include "ns_restore_state.hpp"
47 #include "ns_db_dump.hpp"
48 
49 #include <corelib/ncbi_system.hpp> // SleepMilliSec
50 #include <corelib/request_ctx.hpp>
54 #include <util/bitset/bmalgo.h>
55 
56 
58 
59 
60 // Used together with m_SavedId. m_SavedId is saved in a DB and is used
61 // as the start-from value for a restarted NetSchedule.
62 // The s_ReserveDelta value is used to avoid too frequent DB updates.
63 static const unsigned int s_ReserveDelta = 10000;
64 
65 
// Creates a queue with the given name and kind for the given server and
// queue database.
// The members are initialized from the constructor arguments, from values
// asked from the server (host, port, node id, logging flags) or from the
// default_* / kNetScheduleMaxDBDataSize constants. The job id counter
// m_LastId starts from 0 and m_SavedId starts from s_ReserveDelta.
// m_RunTimeLine is left NULL here.
// NOTE(review): some lines of this listing are missing (the argument of the
// m_ClientRegistryTimeoutWorkerNode initializer and a part of the
// constructor body), so the comments cover the visible code only.
66 CQueue::CQueue(const string & queue_name,
67  TQueueKind queue_kind,
68  CNetScheduleServer * server,
69  CQueueDataBase & qdb) :
70  m_Server(server),
71  m_QueueDB(qdb),
72  m_RunTimeLine(NULL),
73  m_QueueName(queue_name),
74  m_Kind(queue_kind),
75 
76  m_LastId(0),
77  m_SavedId(s_ReserveDelta),
78 
79  m_JobsToDeleteOps(0),
80  m_ReadJobsOps(0),
81 
82  m_Timeout(default_timeout),
83  m_RunTimeout(default_run_timeout),
84  m_ReadTimeout(default_read_timeout),
85  m_FailedRetries(default_failed_retries),
86  m_ReadFailedRetries(default_failed_retries), // See CXX-5161, the same
87  // default as for
88  // failed_retries
89  m_MaxJobsPerClient(default_max_jobs_per_client),
90  m_BlacklistTime(default_blacklist_time),
91  m_ReadBlacklistTime(default_blacklist_time), // See CXX-4993, the same
92  // default as for
93  // blacklist_time
94  m_MaxInputSize(kNetScheduleMaxDBDataSize),
95  m_MaxOutputSize(kNetScheduleMaxDBDataSize),
96  m_WNodeTimeout(default_wnode_timeout),
97  m_ReaderTimeout(default_reader_timeout),
98  m_PendingTimeout(default_pending_timeout),
99  m_KeyGenerator(server->GetHost(), server->GetPort(), queue_name),
100  m_Log(server->IsLog()),
101  m_LogBatchEachJob(server->IsLogBatchEachJob()),
102  m_RefuseSubmits(false),
103  m_StatisticsCounters(CStatisticsCounters::eQueueCounters),
104  m_StatisticsCountersLastPrinted(CStatisticsCounters::eQueueCounters),
105  m_StatisticsCountersLastPrintedTimestamp(0.0),
106  m_NotificationsList(qdb, server->GetNodeID(), queue_name),
107  m_NotifHifreqInterval(default_notif_hifreq_interval), // 0.1 sec
108  m_NotifHifreqPeriod(default_notif_hifreq_period),
109  m_NotifLofreqMult(default_notif_lofreq_mult),
110  m_DumpBufferSize(default_dump_buffer_size),
111  m_DumpClientBufferSize(default_dump_client_buffer_size),
112  m_DumpAffBufferSize(default_dump_aff_buffer_size),
113  m_DumpGroupBufferSize(default_dump_group_buffer_size),
114  m_ScrambleJobKeys(default_scramble_job_keys),
115  m_PauseStatus(eNoPause),
116  m_ClientRegistryTimeoutWorkerNode(
     // NOTE(review): listing line 117 (the initializer argument and the
     // closing parenthesis) is missing from this extraction
118  m_ClientRegistryMinWorkerNodes(default_client_registry_min_worker_nodes),
119  m_ClientRegistryTimeoutAdmin(default_client_registry_timeout_admin),
120  m_ClientRegistryMinAdmins(default_client_registry_min_admins),
121  m_ClientRegistryTimeoutSubmitter(default_client_registry_timeout_submitter),
122  m_ClientRegistryMinSubmitters(default_client_registry_min_submitters),
123  m_ClientRegistryTimeoutReader(default_client_registry_timeout_reader),
124  m_ClientRegistryMinReaders(default_client_registry_min_readers),
125  m_ClientRegistryTimeoutUnknown(default_client_registry_timeout_unknown),
126  m_ClientRegistryMinUnknowns(default_client_registry_min_unknowns),
127  m_ShouldPerfLogTransitions(false)
128 {
     // The queue name must not be empty
129  _ASSERT(!queue_name.empty());
132 
136 }
137 
138 
// Deletes the object pointed to by m_RunTimeLine.
// NOTE(review): the signature line is missing from this listing; presumably
// this is the body of the CQueue destructor -- confirm.
140 {
141  delete m_RunTimeLine;
142 }
143 
144 
// Prepares the job id counters (m_SavedId, m_LastId).
// NOTE(review): the statements which read the stored counter value and the
// ones which follow the overflow handling are missing from this listing.
145 void CQueue::Attach(void)
146 {
147  // Here we have a db, so we can read the counter value we should start from
     // m_SavedId being less than m_LastId is treated as a counter overflow:
     // the job id counter is restarted from 0
150  if (m_SavedId < m_LastId) {
151  // Overflow
152  m_LastId = 0;
154  }
156 }
157 
158 
// Copies the values from 'params' to the corresponding queue members.
// NOTE(review): the signature line and most of the assignments are missing
// from this listing (presumably this is the body of a SetParameters-like
// member -- confirm); only the visible statements are commented.
160 {
162 
163  m_Timeout = params.timeout;
164  m_RunTimeout = params.run_timeout;
     // The time line is created on the first call only; later calls keep the
     // existing object
165  if (!m_RunTimeLine) {
166  // One time only. Precision can not be reset.
     // 'precision' is introduced on a listing line which is missing here.
     // The time line interval is at least 1 second.
168  unsigned int interval_sec = precision.Sec();
169  if (interval_sec < 1)
170  interval_sec = 1;
171  m_RunTimeLine = new CJobTimeLine(interval_sec, 0);
172  }
173 
174  m_ReadTimeout = params.read_timeout;
182  m_WNodeTimeout = params.wnode_timeout;
198 
210 
213 
214  // program version control
     // NOTE(review): the body of this 'if' is missing from the listing
216  if (!params.program_name.empty()) {
218  }
222 
224 }
225 
226 
// Updates the performance logging settings using the given queue class name.
// NOTE(review): the beginning of the only statement (listing line 229) is
// missing from this extraction; just its last argument, qclass, is visible.
227 void CQueue::UpdatePerfLoggingSettings(const string & qclass)
228 {
230  qclass);
231 }
232 
233 
// Provides the queue parameters as a list of (name, value) string pairs in
// the order reported by CQueueParamAccessor.
// NOTE(review): the signature line is missing from this listing.
235 {
236  TParameterList parameters;
237  CQueueParamAccessor qp(*this);
238  unsigned nParams = qp.GetNumParams();
239 
     // One list item per parameter: its name and its value
240  for (unsigned n = 0; n < nParams; ++n) {
241  parameters.push_back(
242  pair<string, string>(qp.GetParamName(n), qp.GetParamValue(n)));
243  }
244  return parameters;
245 }
246 
247 
// Output parameters:
//  max_input_size  - receives the value of qp.GetMaxInputSize()
//  max_output_size - receives the value of qp.GetMaxOutputSize()
//  linked_sections - filled by GetLinkedSections()
// NOTE(review): the first line of the signature (return type and member
// name) is missing from this listing.
249  unsigned int & max_input_size,
250  unsigned int & max_output_size,
251  map< string, map<string, string> > & linked_sections) const
252 {
     // The sizes are read via the parameters accessor
253  CQueueParamAccessor qp(*this);
254 
255  max_input_size = qp.GetMaxInputSize();
256  max_output_size = qp.GetMaxOutputSize();
257  GetLinkedSections(linked_sections);
258 }
259 
260 
// Fills 'linked_sections' from m_LinkedSections: an item is stored under the
// key k->first only if the values collected for it are not empty.
// NOTE(review): the lines with the member name, with the loop header
// beginning and with the 'values' initialization are missing from this
// listing.
261 void
263  map<string, string> > & linked_sections) const
264 {
266  k != m_LinkedSections.end(); ++k) {
268 
     // The sections without values are skipped
269  if (!values.empty())
270  linked_sections[k->first] = values;
271  }
272 }
273 
274 
// Provides true if there are no more jobs for reading which match the
// client scope and the given affinity and group restrictions; false
// otherwise.
// NOTE(review): the first signature line (return type, member name and the
// 'client' parameter) and a few statements are missing from this listing:
// the ones which fill other_jobs, reading_jobs, all_aff and all_pref_affs
// and the beginning of the suitable_aff_jobs initialization.
275 // It is called only if there was no job for reading
277  const TNSBitVector & aff_ids,
278  bool reader_affinity,
279  bool any_affinity,
280  bool exclusive_new_affinity,
281  const TNSBitVector & group_ids,
282  bool affinity_may_change,
283  bool group_may_change)
284 {
285  // This certain condition guarantees that there will be no job given
286  if (!reader_affinity &&
287  !aff_ids.any() &&
288  !exclusive_new_affinity &&
289  !any_affinity)
290  return true;
291 
292  // Used only in the GetJobForReadingOrWait().
293  // Operation lock has to be already taken.
294  // Provides true if there are no more jobs for reading.
295  vector<CNetScheduleAPI::EJobStatus> from_state;
296  TNSBitVector pending_running_jobs;
297  TNSBitVector other_jobs;
298  string scope = client.GetScope();
299 
     // Collect the jobs which are in the pending and running states
300  from_state.push_back(CNetScheduleAPI::ePending);
301  from_state.push_back(CNetScheduleAPI::eRunning);
302  m_StatusTracker.GetJobs(from_state, pending_running_jobs);
303 
305 
306  // Remove those which have been read or in process of reading. This cannot
307  // affect the pending and running jobs
308  other_jobs -= m_ReadJobs;
309  // Add those which are in a process of reading.
310  // This needs to be done after '- m_ReadJobs' because that vector holds
311  // both jobs which have been read and jobs which are in a process of
312  // reading. When calculating 'no_more_jobs' the only already read jobs must
313  // be excluded.
314  TNSBitVector reading_jobs;
316 
317  // Apply scope limitations to all participant jobs
318  if (scope.empty() || scope == kNoScopeOnly) {
319  // Both these cases should consider only the non-scope jobs
320  TNSBitVector all_jobs_in_scopes(bm::BM_GAP);
321  all_jobs_in_scopes = m_ScopeRegistry.GetAllJobsInScopes();
322  pending_running_jobs -= all_jobs_in_scopes;
323  other_jobs -= all_jobs_in_scopes;
324  reading_jobs -= all_jobs_in_scopes;
325  } else {
326  // Consider only the jobs in the particular scope
327  TNSBitVector scope_jobs(bm::BM_GAP);
328  scope_jobs = m_ScopeRegistry.GetJobs(scope);
329  pending_running_jobs &= scope_jobs;
330  other_jobs &= scope_jobs;
331  reading_jobs &= scope_jobs;
332  }
333 
     // In both branches below the reading jobs are merged into other_jobs;
     // with a group restriction the merge is done before the restriction so
     // the reading jobs are restricted by group as well
334  if (group_ids.any()) {
335  // The pending and running jobs may change their group and or affinity
336  // later via the RESCHEDULE command
337  if (!group_may_change)
338  m_GroupRegistry.RestrictByGroup(group_ids, pending_running_jobs);
339 
340  // The job group cannot be changed for the other job states
341  other_jobs |= reading_jobs;
342  m_GroupRegistry.RestrictByGroup(group_ids, other_jobs);
343  } else
344  other_jobs |= reading_jobs;
345 
346  TNSBitVector candidates = pending_running_jobs | other_jobs;
347 
348  if (!candidates.any())
349  return true;
350 
351 
352  // Deal with affinities
353  // The weakest condition is if any affinity is suitable
     // 'candidates' is not empty here (see the check above) so false is
     // returned if any affinity is suitable
354  if (any_affinity)
355  return !candidates.any();
356 
357  TNSBitVector suitable_affinities;
358  TNSBitVector all_aff;
359  TNSBitVector all_pref_affs;
360  TNSBitVector all_aff_jobs; // All jobs with an affinity
361  TNSBitVector no_aff_jobs; // Jobs without any affinity
362 
365  all_aff_jobs = m_AffinityRegistry.GetJobsWithAffinities(all_aff);
366  no_aff_jobs = candidates - all_aff_jobs;
     // With exclusive_new_affinity set a candidate job without an affinity
     // means that there are still jobs
367  if (exclusive_new_affinity && no_aff_jobs.any())
368  return false;
369 
     // Build the set of the suitable affinities: the affinities which are
     // in all_aff but not in all_pref_affs (exclusive_new_affinity), the
     // client read preferred affinities (reader_affinity) and the explicit
     // aff_ids
370  if (exclusive_new_affinity)
371  suitable_affinities = all_aff - all_pref_affs;
372  if (reader_affinity)
373  suitable_affinities |= m_ClientsRegistry.
374  GetPreferredAffinities(client, eRead);
375  suitable_affinities |= aff_ids;
376 
377  TNSBitVector suitable_aff_jobs =
379  suitable_affinities);
     // If the affinity may change then the pending and running jobs are
     // kept regardless of their current affinity; the affinity restriction
     // is applied only to the other jobs
380  if (affinity_may_change)
381  candidates = pending_running_jobs |
382  (other_jobs & suitable_aff_jobs);
383  else
384  candidates &= suitable_aff_jobs;
385  return !candidates.any();
386 }
387 
388 
389 // Used to log a single job
// The job key built from the job id is printed as the "job_key" field and
// then the record is flushed.
// NOTE(review): the line which introduces 'extra' (listing line 392) is
// missing from this extraction.
390 void CQueue::x_LogSubmit(const CJob & job)
391 {
393  .Print("job_key", MakeJobKey(job.GetId()));
394 
395  extra.Flush();
396 }
397 
398 
// Submits a single job into the queue.
//  client          - the submitter; its address, node, session and scope
//                    are used
//  job             - the job to submit; it is updated in place (id,
//                    passport, last touch, submit event, group and
//                    affinity ids) and a copy is stored in m_Jobs
//  aff_token       - the job affinity token, may be empty
//  group           - the job group name, may be empty
//  logging         - if true then the submit is logged via x_LogSubmit()
//  rollback_action - receives a newly allocated CNSSubmitRollback object
//                    (presumably owned by the caller -- confirm)
// Provides the id assigned to the job.
// Throws CNetScheduleException (eDataTooLong) if the job input is longer
// than m_MaxInputSize or if the scope, group or affinity registry cannot
// accept the new record.
// NOTE(review): a number of lines are missing from this listing (e.g. the
// lock guard, the registry settings retrieval, the status tracker and the
// client registry updates, the notification call), so the comments cover
// the visible code only.
399 unsigned int CQueue::Submit(const CNSClientId & client,
400  CJob & job,
401  const string & aff_token,
402  const string & group,
403  bool logging,
404  CNSRollbackInterface * & rollback_action)
405 {
406  // the only config parameter used here is the max input size so there is no
407  // need to have a safe parameters accessor.
408 
409  if (job.GetInput().size() > m_MaxInputSize)
410  NCBI_THROW(CNetScheduleException, eDataTooLong, "Input is too long");
411 
412  unsigned int aff_id = 0;
413  unsigned int group_id = 0;
414  CNSPreciseTime op_begin_time = CNSPreciseTime::Current();
     // NOTE(review): the id is taken before the registry limits are checked
     // below, so presumably the id stays unused if a check throws
415  unsigned int job_id = GetNextId();
416  CJobEvent & event = job.AppendEvent();
417 
418  job.SetId(job_id);
419  job.SetPassport(rand());
420  job.SetLastTouch(op_begin_time);
421 
     // Fill the submit event: the job is in the pending state
422  event.SetNodeAddr(client.GetAddress());
423  event.SetStatus(CNetScheduleAPI::ePending);
424  event.SetEvent(CJobEvent::eSubmit);
425  event.SetTimestamp(op_begin_time);
426  event.SetClientNode(client.GetNode());
427  event.SetClientSession(client.GetSession());
428 
429  // Special treatment for system job masks
431  {
432  // NOT IMPLEMENTED YET: put job id into OutOfOrder list.
433  // The idea is that there can be an urgent job, which
434  // should be executed before jobs which were submitted
435  // earlier, e.g. for some administrative purposes. See
436  // CNetScheduleAPI::EJobMask in file netschedule_api.hpp
437  }
438 
439  // Take the queue lock and start the operation
440  {{
441  string scope = client.GetScope();
443 
444 
     // All the limits are checked before any registry is modified
445  if (!scope.empty()) {
446  // Check the scope registry limits
447  SNSRegistryParameters params =
449  if (!m_ScopeRegistry.CanAccept(scope, params.max_records))
450  NCBI_THROW(CNetScheduleException, eDataTooLong,
451  "No available slots in the queue scope registry");
452  }
453  if (!group.empty()) {
454  // Check the group registry limits
455  SNSRegistryParameters params =
457  if (!m_GroupRegistry.CanAccept(group, params.max_records))
458  NCBI_THROW(CNetScheduleException, eDataTooLong,
459  "No available slots in the queue group registry");
460  }
461  if (!aff_token.empty()) {
462  // Check the affinity registry limits
463  SNSRegistryParameters params =
465  if (!m_AffinityRegistry.CanAccept(aff_token, params.max_records))
466  NCBI_THROW(CNetScheduleException, eDataTooLong,
467  "No available slots in the queue affinity registry");
468  }
469 
470 
     // Register the job in the group and affinity registries; the ids stay
     // 0 if the group or the affinity token is empty
471  if (!group.empty()) {
472  group_id = m_GroupRegistry.AddJob(group, job_id);
473  job.SetGroupId(group_id);
474  }
475  if (!aff_token.empty()) {
476  aff_id = m_AffinityRegistry.ResolveAffinityToken(aff_token,
477  job_id, 0, eUndefined);
478  job.SetAffinityId(aff_id);
479  }
480 
     // Store a copy of the job in the jobs container
481  m_Jobs[job_id] = job;
482 
484 
485  if (!scope.empty())
486  m_ScopeRegistry.AddJob(scope, job_id);
487 
488  // Register the job with the client
490 
491  // Make the decision whether to send or not a notification
     // NOTE(review): the statement controlled by this 'if' is on listing
     // lines which are missing from this extraction
492  if (m_PauseStatus == eNoPause)
497 
498  m_GCRegistry.RegisterJob(job_id, op_begin_time,
499  aff_id, group_id,
501  m_RunTimeout,
504  op_begin_time));
505  }}
506 
507  rollback_action = new CNSSubmitRollback(client, job_id,
508  op_begin_time,
510 
512  if (logging)
513  x_LogSubmit(job);
514 
515  return job_id;
516 }
517 
518 
// Submits a batch of jobs into the queue.
//  batch           - pairs of a job and its affinity token (may be empty);
//                    the jobs are updated in place and copies are stored in
//                    m_Jobs
//  group           - the group name for all the jobs in the batch
//  logging         - together with m_LogBatchEachJob tells if each job
//                    should be logged via x_LogSubmit()
//  rollback_action - receives a newly allocated CNSBatchSubmitRollback
//                    object (presumably owned by the caller -- confirm)
// Provides the id of the first job in the batch; the jobs get consecutive
// ids starting from it.
// Throws CNetScheduleException (eDataTooLong) if the scope, group or
// affinity registry cannot accept the new records.
// NOTE(review): the signature line with the member name and the 'client'
// parameter, the 'curr_time' initialization, the lock guard and a few
// other lines are missing from this listing.
519 unsigned int
521  vector< pair<CJob, string> > & batch,
522  const string & group,
523  bool logging,
524  CNSRollbackInterface * & rollback_action)
525 {
526  unsigned int batch_size = batch.size();
527  unsigned int job_id = GetNextJobIdForBatch(batch_size);
528  TNSBitVector affinities;
530 
531  {{
532  unsigned int job_id_cnt = job_id;
533  unsigned int group_id = 0;
534  vector<string> aff_tokens;
535  string scope = client.GetScope();
536 
537  // Count the number of affinities
     // Only the non-empty affinity tokens are collected
538  for (size_t k = 0; k < batch_size; ++k) {
539  const string & aff_token = batch[k].second;
540  if (!aff_token.empty())
541  aff_tokens.push_back(aff_token);
542  }
543 
544 
546 
     // All the limits are checked before any registry is modified
547  if (!scope.empty()) {
548  // Check the scope registry limits
549  SNSRegistryParameters params =
551  if (!m_ScopeRegistry.CanAccept(scope, params.max_records))
552  NCBI_THROW(CNetScheduleException, eDataTooLong,
553  "No available slots in the queue scope registry");
554  }
555  if (!group.empty()) {
556  // Check the group registry limits
557  SNSRegistryParameters params =
559  if (!m_GroupRegistry.CanAccept(group, params.max_records))
560  NCBI_THROW(CNetScheduleException, eDataTooLong,
561  "No available slots in the queue group registry");
562  }
563  if (!aff_tokens.empty()) {
564  // Check the affinity registry limits
565  SNSRegistryParameters params =
567  if (!m_AffinityRegistry.CanAccept(aff_tokens, params.max_records))
568  NCBI_THROW(CNetScheduleException, eDataTooLong,
569  "No available slots in the queue affinity registry");
570  }
571 
572  group_id = m_GroupRegistry.ResolveGroup(group);
     // Prepare each job and store its copy under the consecutive ids
573  for (size_t k = 0; k < batch_size; ++k) {
574 
575  CJob & job = batch[k].first;
576  const string & aff_token = batch[k].second;
577  CJobEvent & event = job.AppendEvent();
578 
579  job.SetId(job_id_cnt);
580  job.SetPassport(rand());
581  job.SetGroupId(group_id);
582  job.SetLastTouch(curr_time);
583 
584  event.SetNodeAddr(client.GetAddress());
585  event.SetStatus(CNetScheduleAPI::ePending);
586  event.SetEvent(CJobEvent::eBatchSubmit);
587  event.SetTimestamp(curr_time);
588  event.SetClientNode(client.GetNode());
589  event.SetClientSession(client.GetSession());
590 
591  if (!aff_token.empty()) {
592  unsigned int aff_id = m_AffinityRegistry.
593  ResolveAffinityToken(aff_token,
594  job_id_cnt,
595  0,
596  eUndefined);
597 
598  job.SetAffinityId(aff_id);
599  affinities.set_bit(aff_id);
600  }
601 
602  m_Jobs[job_id_cnt] = job;
603  ++job_id_cnt;
604  }
605 
     // NOTE(review): 'job_id + batch_size - 1' below assumes a non-empty
     // batch -- confirm that the callers never pass an empty one
606  m_GroupRegistry.AddJobs(group_id, job_id, batch_size);
607  m_StatusTracker.AddPendingBatch(job_id, job_id + batch_size - 1);
609 
610  if (!scope.empty())
611  m_ScopeRegistry.AddJobs(scope, job_id, batch_size);
612 
613  // Make a decision whether to notify clients or not
614  TNSBitVector jobs;
615  jobs.set_range(job_id, job_id + batch_size - 1);
616 
     // The third argument is true if at least one job in the batch has no
     // affinity token
617  if (m_PauseStatus == eNoPause)
618  m_NotificationsList.Notify(jobs, affinities,
619  batch_size != aff_tokens.size(),
626  eGet);
627 
628  for (size_t k = 0; k < batch_size; ++k) {
630  batch[k].first.GetId(), curr_time,
631  batch[k].first.GetAffinityId(), group_id,
632  batch[k].first.GetExpirationTime(m_Timeout,
633  m_RunTimeout,
636  curr_time));
637  }
638  }}
639 
640  m_StatisticsCounters.CountSubmit(batch_size);
641  if (m_LogBatchEachJob && logging)
642  for (size_t k = 0; k < batch_size; ++k)
643  x_LogSubmit(batch[k].first);
644 
645  rollback_action = new CNSBatchSubmitRollback(client, job_id, batch_size);
646  return job_id;
647 }
648 
649 
// Stores the result (return code and output) of a job.
// Provides the status the job had when the call started, in all the visible
// return paths.
// The job is updated only if it was in the pending, running or failed
// state; for any other state the status is returned without an update.
// Throws CNetScheduleException (eDataTooLong) if the output is longer than
// m_MaxOutputSize.
// NOTE(review): the first signature line (return type, member name and the
// 'client' parameter), the lock guard and a number of statements are
// missing from this listing.
651  const CNSPreciseTime & curr,
652  unsigned int job_id,
653  const string & job_key,
654  CJob & job,
655  const string & auth_token,
656  int ret_code,
657  const string & output)
658 {
659  // The only one parameter (max output size) is required for the put
660  // operation so there is no need to use CQueueParamAccessor
661 
662  if (output.size() > m_MaxOutputSize)
663  NCBI_THROW(CNetScheduleException, eDataTooLong,
664  "Output is too long");
665 
667  TJobStatus old_status = GetJobStatus(job_id);
668 
     // The job is already done: the result is not stored again.
     // NOTE(review): two lines of this branch are missing from the listing
669  if (old_status == CNetScheduleAPI::eDone) {
672  return old_status;
673  }
674 
     // The result is accepted only from these three states
675  if (old_status != CNetScheduleAPI::ePending &&
676  old_status != CNetScheduleAPI::eRunning &&
677  old_status != CNetScheduleAPI::eFailed)
678  return old_status;
679 
680  x_UpdateDB_PutResultNoLock(job_id, auth_token, curr, ret_code, output,
681  job, client);
682 
686  g_DoPerfLogging(*this, job, 200);
688 
691  m_RunTimeout,
694  curr));
695 
696  TimeLineRemove(job_id);
697 
698  x_NotifyJobChanges(job, job_key, eStatusChanged, curr);
699 
700  // Notify the readers if the job has not been given for reading yet
701  if (!m_ReadJobs.get_bit(job_id)) {
702  m_GCRegistry.UpdateReadVacantTime(job_id, curr);
710  eRead);
711  }
712  return old_status;
713 }
714 
715 
// Picks a job for a worker node or, if there is no suitable job, optionally
// registers the client for a notification (WGET).
// Provides false only in the wnode_affinity checks at the beginning (see
// the comments there); true otherwise. If a job was picked then it is
// provided via 'new_job', 'rollback_action' receives a newly allocated
// CNSGetJobRollback object and 'added_pref_aff' receives the affinity token
// if it was added to the client preferred affinities.
// NOTE(review): the signature line with the member name and the 'client'
// parameter, the lock guards, the conditions of a few 'if' statements and a
// number of other statements are missing from this listing, so the comments
// cover the visible code only.
716 bool
718  unsigned short port, // Port the client
719  // will wait on
720  unsigned int timeout, // If timeout != 0 =>
721  // WGET
722  const CNSPreciseTime & curr,
723  const list<string> * aff_list,
724  bool wnode_affinity,
725  bool any_affinity,
726  bool exclusive_new_affinity,
727  bool prioritized_aff,
728  bool new_format,
729  const list<string> * group_list,
730  CJob * new_job,
731  CNSRollbackInterface * & rollback_action,
732  string & added_pref_aff)
733 {
734  // We need exactly 1 parameter - m_RunTimeout, so we can access it without
735  // CQueueParamAccessor
736 
737  // This is a worker node command, so mark the node type as a worker
738  // node
740 
741  vector<unsigned int> aff_ids;
742  TNSBitVector aff_ids_vector;
743  TNSBitVector group_ids_vector;
744  bool has_groups = false;
745 
746  {{
748 
     // NOTE(review): the conditions of the two checks below are on listing
     // lines which are missing from this extraction
749  if (wnode_affinity) {
750  // Check that the preferred affinities were not reset
752  return false;
753 
754  // Check that the client was garbage collected with preferred affs
756  return false;
757  }
758 
759  // Resolve affinities and groups. It is supposed that the client knows
760  // better what affinities and groups to expect i.e. even if they do not
761  // exist yet, they may appear soon.
762  if (group_list != NULL) {
763  m_GroupRegistry.ResolveGroups(*group_list, group_ids_vector);
764  has_groups = !group_list->empty();
765  }
766  if (aff_list != NULL)
767  m_AffinityRegistry.ResolveAffinities(*aff_list, aff_ids_vector,
768  aff_ids);
769 
771  }}
772 
     // The loop is repeated (via 'continue') when a picked job cannot be
     // used any more; all the visible exits from it are 'return' statements
773  for (;;) {
774  // No lock here to make it possible to pick a job
775  // simultaneously from many threads
776  x_SJobPick job_pick = x_FindVacantJob(client,
777  aff_ids_vector, aff_ids,
778  wnode_affinity,
779  any_affinity,
780  exclusive_new_affinity,
781  prioritized_aff,
782  group_ids_vector, has_groups,
783  eGet);
784  {{
785  bool outdated_job = false;
787 
     // Job id 0 means that no job was found
788  if (job_pick.job_id == 0) {
789  if (exclusive_new_affinity)
790  // Second try only if exclusive new aff is set on
791  job_pick = x_FindOutdatedPendingJob(client, 0,
792  group_ids_vector);
793 
794  if (job_pick.job_id == 0) {
795  if (timeout != 0 && port > 0)
796  // WGET: // There is no job, so the client might need to
797  // be registered in the waiting list
798  x_RegisterGetListener(client, port, timeout,
799  aff_ids_vector,
800  wnode_affinity, any_affinity,
801  exclusive_new_affinity,
802  new_format, group_ids_vector);
803  return true;
804  }
805  outdated_job = true;
806  } else {
807  // Check that the job is still Pending; it could be
808  // grabbed by another WN or GC
809  continue; // Try to pick a job again
811 
     // With exclusive_new_affinity set an outdated pending job, if there
     // is one, replaces the picked job
812  if (exclusive_new_affinity) {
814  job_pick.job_id, eGet,
815  m_MaxPendingWaitTimeout) == false) {
816  x_SJobPick outdated_pick =
818  client, job_pick.job_id,
819  group_ids_vector);
820  if (outdated_pick.job_id != 0) {
821  job_pick = outdated_pick;
822  outdated_job = true;
823  }
824  }
825  }
826  }
827 
828  // The job is still pending, check if it was received as
829  // with exclusive affinity
830  if (job_pick.exclusive && job_pick.aff_id != 0 &&
831  outdated_job == false) {
833  job_pick.aff_id, eGet))
834  continue; // Other WN grabbed this affinity already
835 
836  string aff_token = m_AffinityRegistry.GetTokenByID(
837  job_pick.aff_id);
838  // CXX-8843: The '-' affinity must not be added to the list of
839  // preferred affinities
840  if (aff_token != k_NoAffinityToken) {
841  bool added = m_ClientsRegistry.
842  UpdatePreferredAffinities(
843  client, job_pick.aff_id, 0, eGet);
844  if (added)
845  added_pref_aff = aff_token;
846  }
847  }
     // The affinity of an outdated job is added to the client preferred
     // affinities the same way as above
848  if (outdated_job && job_pick.aff_id != 0) {
849  string aff_token = m_AffinityRegistry.GetTokenByID(
850  job_pick.aff_id);
851  // CXX-8843: The '-' affinity must not be added to the list of
852  // preferred affinities
853  if (aff_token != k_NoAffinityToken) {
854  bool added = m_ClientsRegistry.
855  UpdatePreferredAffinities(
856  client, job_pick.aff_id, 0, eGet);
857  if (added)
858  added_pref_aff = aff_token;
859  }
860  }
861 
863  *new_job);
866 
870  g_DoPerfLogging(*this, *new_job, 200);
871  if (outdated_job)
873 
875  job_pick.job_id,
876  new_job->GetExpirationTime(m_Timeout,
877  m_RunTimeout,
880  curr));
     // The job is put on the time line with the queue run timeout counted
     // from 'curr'
881  TimeLineAdd(job_pick.job_id, curr + m_RunTimeout);
883 
884  x_NotifyJobChanges(*new_job, MakeJobKey(job_pick.job_id),
885  eStatusChanged, curr);
886 
887  // If there are no more pending jobs, let's clear the
888  // list of delayed exact notifications.
891 
892  rollback_action = new CNSGetJobRollback(client, job_pick.job_id);
893  return true;
894  }}
895  }
     // Appears to be unreachable in the visible code: no 'break' out of the
     // loop above is visible
896  return true;
897 }
898 
899 
// Cancels a waiting WGET for the client (judging by the warning message
// below). A warning is posted if 'result' is false, i.e. the client was not
// waiting for anything.
// NOTE(review): the signature line, the lock guard and the statement which
// sets 'result' are missing from this listing.
901 {
902  bool result;
903 
904  {{
906 
908  }}
909 
     // The message is posted after the inner scope is closed
910  if (result == false)
911  ERR_POST(Warning << "Attempt to cancel WGET for the client "
912  "which does not wait anything (node: "
913  << client.GetNode() << " session: "
914  << client.GetSession() << ")");
915 }
916 
917 
// Cancels a waiting READ for the client (judging by the warning message
// below). A warning is posted if 'result' is false, i.e. the client was not
// waiting for anything.
// NOTE(review): the signature line, the lock guard and the statement which
// sets 'result' are missing from this listing.
919 {
920  bool result;
921 
922  {{
924 
926  }}
927 
     // The message is posted after the inner scope is closed
928  if (result == false)
929  ERR_POST(Warning << "Attempt to cancel waiting READ for the client "
930  "which does not wait anything (node: "
931  << client.GetNode() << " session: "
932  << client.GetSession() << ")");
933 }
934 
935 
// Adds and deletes client preferred affinities.
//  aff_to_add - the affinity tokens to add
//  aff_to_del - the affinity tokens to delete
//  cmd_group  - the command group (e.g. eGet) the affinities belong to
// Provides the list of the warning messages to be sent to the client. A
// warning is produced (and the item is ignored) for an unknown affinity to
// delete, for a not registered affinity to delete and for an already
// registered affinity to add; one more warning is produced if the client
// has been garbage collected.
// Throws CNetScheduleException (eTooManyPreferredAffinities) if the
// approximate size of the updated list exceeds the registry limit.
// NOTE(review): the signature line with the member name and the 'client'
// parameter, the lock guard and a few other statements are missing from
// this listing.
936 list<string>
938  const list<string> & aff_to_add,
939  const list<string> & aff_to_del,
940  ECommandGroup cmd_group)
941 {
942  // It is guaranteed here that the client is a new style one.
943  // I.e. it has both client_node and client_session.
     // NOTE(review): both branch statements are on missing listing lines
944  if (cmd_group == eGet)
946  else
948 
949 
950  list<string> msgs; // Warning messages for the socket
951  unsigned int client_id = client.GetID();
952  TNSBitVector current_affinities =
954  cmd_group);
955  TNSBitVector aff_id_to_add;
956  TNSBitVector aff_id_to_del;
957  bool any_to_add = false;
958  bool any_to_del = false;
959 
960  // Identify the affinities which should be deleted
961  for (list<string>::const_iterator k(aff_to_del.begin());
962  k != aff_to_del.end(); ++k) {
963  unsigned int aff_id = m_AffinityRegistry.GetIDByToken(*k);
964 
965  if (aff_id == 0) {
966  // The affinity is not known for NS at all
967  ERR_POST(Warning << "Client '" << client.GetNode()
968  << "' deletes unknown affinity '"
969  << *k << "'. Ignored.");
970  msgs.push_back("eAffinityNotFound:"
971  "unknown affinity to delete: " + *k);
972  continue;
973  }
974 
975  if (!current_affinities.get_bit(aff_id)) {
976  // This a try to delete something which has not been added or
977  // deleted before.
978  ERR_POST(Warning << "Client '" << client.GetNode()
979  << "' deletes affinity '" << *k
980  << "' which is not in the list of the "
981  "preferred client affinities. Ignored.");
982  msgs.push_back("eAffinityNotPreferred:not registered affinity "
983  "to delete: " + *k);
984  continue;
985  }
986 
987  // The affinity will really be deleted
988  aff_id_to_del.set_bit(aff_id);
989  any_to_del = true;
990  }
991 
992 
993  // Check that the update of the affinities list will not exceed the limit
994  // for the max number of affinities per client.
995  // Note: this is not precise check. There could be non-unique affinities in
996  // the add list or some of affinities to add could already be in the list.
997  // The precise checking however requires more CPU and blocking so only an
998  // approximate (but fast) checking is done.
999  SNSRegistryParameters aff_reg_settings =
1001  if (current_affinities.count() + aff_to_add.size()
1002  - aff_id_to_del.count() >
1003  aff_reg_settings.max_records) {
1004  NCBI_THROW(CNetScheduleException, eTooManyPreferredAffinities,
1005  "The client '" + client.GetNode() +
1006  "' exceeds the limit (" +
1007  to_string(aff_reg_settings.max_records) +
1008  ") of the preferred affinities. Changed request ignored.");
1009  }
1010 
1011  // To avoid logging under the lock
1012  vector<string> already_added_affinities;
1013 
1014  {{
1016 
1017  // Convert the aff_to_add to the affinity IDs
1018  for (list<string>::const_iterator k(aff_to_add.begin());
1019  k != aff_to_add.end(); ++k ) {
1020  unsigned int aff_id =
1022  0, client_id,
1023  cmd_group);
1024 
      // The already preferred affinities are only remembered here; the
      // warnings for them are produced after this scope
1025  if (current_affinities.get_bit(aff_id)) {
1026  already_added_affinities.push_back(*k);
1027  continue;
1028  }
1029 
1030  aff_id_to_add.set_bit(aff_id);
1031  any_to_add = true;
1032  }
1033  }}
1034 
1035  // Log the warnings and add it to the warning message
1036  for (vector<string>::const_iterator j(already_added_affinities.begin());
1037  j != already_added_affinities.end(); ++j) {
1038  // That was a try to add something which has already been added
1039  ERR_POST(Warning << "Client '" << client.GetNode()
1040  << "' adds affinity '" << *j
1041  << "' which is already in the list of the "
1042  "preferred client affinities. Ignored.");
1043  msgs.push_back("eAffinityAlreadyPreferred:already registered "
1044  "affinity to add: " + *j);
1045  }
1046 
      // The update is requested only if there is something to add or delete.
      // NOTE(review): the beginning of the call is on a missing listing line
1047  if (any_to_add || any_to_del)
1049  aff_id_to_add,
1050  aff_id_to_del,
1051  cmd_group);
1052 
1053  if (m_ClientsRegistry.WasGarbageCollected(client, cmd_group)) {
1054  ERR_POST(Warning << "Client '" << client.GetNode()
1055  << "' has been garbage collected and tries to "
1056  "update its preferred affinities.");
1057  msgs.push_back("eClientGarbageCollected:the client had been "
1058  "garbage collected");
1059  }
1060  return msgs;
1061 }
1062 
1063 
// Sets the client preferred affinities to the given list.
//  aff       - the affinity tokens to be set
//  cmd_group - the command group (e.g. eGet) the affinities belong to
// Throws CNetScheduleException (eTooManyPreferredAffinities) if the list is
// longer than the registry limit.
// NOTE(review): the first signature line (return type, member name and the
// 'client' parameter), the lock guard and a few other statements are
// missing from this listing.
1065  const list<string> & aff,
1066  ECommandGroup cmd_group)
1067 {
      // NOTE(review): both branch statements are on missing listing lines
1068  if (cmd_group == eGet)
1070  else
1072 
1073  SNSRegistryParameters aff_reg_settings =
1075 
1076  if (aff.size() > aff_reg_settings.max_records) {
1077  NCBI_THROW(CNetScheduleException, eTooManyPreferredAffinities,
1078  "The client '" + client.GetNode() +
1079  "' exceeds the limit (" +
1080  to_string(aff_reg_settings.max_records) +
1081  ") of the preferred affinities. Set request ignored.");
1082  }
1083 
1084  unsigned int client_id = client.GetID();
1085  TNSBitVector aff_id_to_set;
      // NOTE(review): this vector is filled below but it is not read
      // afterwards in this function
1086  TNSBitVector already_added_aff_id;
1087 
1088 
1089  TNSBitVector current_affinities =
1091  cmd_group);
1092  {{
1094 
1095  // Convert the aff to the affinity IDs
1096  for (list<string>::const_iterator k(aff.begin());
1097  k != aff.end(); ++k ) {
1098  unsigned int aff_id =
1100  0, client_id,
1101  cmd_group);
1102 
1103  if (current_affinities.get_bit(aff_id))
1104  already_added_aff_id.set_bit(aff_id);
1105 
1106  aff_id_to_set.set_bit(aff_id);
1107  }
1108  }}
1109 
1110  m_ClientsRegistry.SetPreferredAffinities(client, aff_id_to_set, cmd_group);
1111 }
1112 
1113 
// Passes the client data and its version to the clients registry and
// provides whatever m_ClientsRegistry.SetClientData() returns.
// NOTE(review): the first signature line (return type, member name and the
// 'client' parameter) is missing from this listing.
1115  const string & data, int data_version)
1116 {
1117  return m_ClientsRegistry.SetClientData(client, data, data_version);
1118 }
1119 
1120 
// Prolongs the run timeout of a running job so that the job expires not
// earlier than 'tm' after the current time.
//  job - receives a copy of the job if the job is found in the running state
//  tm  - the requested time interval counted from 'curr'
// If the job is not in the running state then its status is provided and
// nothing is changed.
// NOTE(review): the first signature line, the 'curr' initialization, the
// lock guard, the handling of a job missing in m_Jobs and the final return
// statement are missing from this listing.
1122  CJob & job,
1123  const CNSPreciseTime & tm)
1124 {
1125  CNSPreciseTime queue_run_timeout = GetRunTimeout();
1127 
1129  TJobStatus status = GetJobStatus(job_id);
1130 
1131  if (status != CNetScheduleAPI::eRunning)
1132  return status;
1133 
1134  CNSPreciseTime time_start = kTimeZero;
1135  CNSPreciseTime run_timeout = kTimeZero;
1136 
1137  auto job_iter = m_Jobs.find(job_id);
1138  if (job_iter == m_Jobs.end())
1140 
      // The run timeout is counted from the last job event; a zero job run
      // timeout means that the queue run timeout is used
1141  time_start = job_iter->second.GetLastEvent()->GetTimestamp();
1142  run_timeout = job_iter->second.GetRunTimeout();
1143  if (run_timeout == kTimeZero)
1144  run_timeout = queue_run_timeout;
1145 
1146  if (time_start + run_timeout > curr + tm) {
1147  job = job_iter->second;
1148  return CNetScheduleAPI::eRunning; // Old timeout is enough to cover
1149  // this request, so keep it.
1150  }
1151 
      // The new run timeout makes the job expire at curr + tm
1152  job_iter->second.SetRunTimeout(curr + tm - time_start);
1153  job_iter->second.SetLastTouch(curr);
1154 
1155  // No need to update the GC registry because the running (and reading)
1156  // jobs are skipped by GC
      // exp_time is the expiration time calculated from the previous run
      // timeout; it stays zero if that timeout is zero
1157  CNSPreciseTime exp_time = kTimeZero;
1158  if (run_timeout != kTimeZero)
1159  exp_time = time_start + run_timeout;
1160 
1161  TimeLineMove(job_id, exp_time, curr + tm);
1162 
1163  job = job_iter->second;
1165 }
1166 
1167 
// Prolongs the read timeout of a job which is being read so that the job
// expires not earlier than 'tm' after the current time.
//  job - receives a copy of the job if the job is found in the reading state
//  tm  - the requested time interval counted from 'curr'
// If the job is not in the reading state then its status is provided and
// nothing is changed.
// NOTE(review): the first signature line, the 'curr' initialization, the
// lock guard, the handling of a job missing in m_Jobs and the final return
// statement are missing from this listing.
1169  CJob & job,
1170  const CNSPreciseTime & tm)
1171 {
1172  CNSPreciseTime queue_read_timeout = GetReadTimeout();
1174 
1176  TJobStatus status = GetJobStatus(job_id);
1177 
1178  if (status != CNetScheduleAPI::eReading)
1179  return status;
1180 
1181  CNSPreciseTime time_start = kTimeZero;
1182  CNSPreciseTime read_timeout = kTimeZero;
1183 
1184  auto job_iter = m_Jobs.find(job_id);
1185  if (job_iter == m_Jobs.end())
1187 
      // The read timeout is counted from the last job event; a zero job read
      // timeout means that the queue read timeout is used
1188  time_start = job_iter->second.GetLastEvent()->GetTimestamp();
1189  read_timeout = job_iter->second.GetReadTimeout();
1190  if (read_timeout == kTimeZero)
1191  read_timeout = queue_read_timeout;
1192 
1193  if (time_start + read_timeout > curr + tm) {
1194  job = job_iter->second;
1195  return CNetScheduleAPI::eReading; // Old timeout is enough to
1196  // cover this request, so
1197  // keep it.
1198  }
1199 
      // The new read timeout makes the job expire at curr + tm
1200  job_iter->second.SetReadTimeout(curr + tm - time_start);
1201  job_iter->second.SetLastTouch(curr);
1202 
1203  // No need to update the GC registry because the running (and reading)
1204  // jobs are skipped by GC
      // exp_time is the expiration time calculated from the previous read
      // timeout; it stays zero if that timeout is zero
1205  CNSPreciseTime exp_time = kTimeZero;
1206  if (read_timeout != kTimeZero)
1207  exp_time = time_start + read_timeout;
1208 
1209  TimeLineMove(job_id, exp_time, curr + tm);
1210 
1211  job = job_iter->second;
1213 }
1214 
1215 
1216 
// Reports the job status together with the submitter related attributes and
// the estimated lifetime; the job record itself is not modified.
// NOTE(review): the line with the function name and the 'job_id' parameter
// is absent from this listing (numbering jumps from 1217 to 1219), and so is
// line 1225.
//   client_ip, client_sid, client_phid, progress_msg - out: copied from the
//       stored job
//   lifetime - out: must point to a valid object, it is written
//       unconditionally when the job is found
// Returns eJobNotFound right away when the job is not known; throws
// CNetScheduleException (eInternalError) if the status is known but the job
// is missing in m_Jobs.
1217 // This member is used for WST/WST2 which do not need to touch the job
1219  string & client_ip,
1220  string & client_sid,
1221  string & client_phid,
1222  string & progress_msg,
1223  CNSPreciseTime * lifetime)
1224 {
1226  TJobStatus status = GetJobStatus(job_id);
1227 
1228  if (status == CNetScheduleAPI::eJobNotFound)
1229  return status;
1230 
1231  auto job_iter = m_Jobs.find(job_id);
1232 
1233  if (job_iter == m_Jobs.end())
1234  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1235 
1236  client_ip = job_iter->second.GetClientIP();
1237  client_sid = job_iter->second.GetClientSID();
1238  client_phid = job_iter->second.GetNCBIPHID();
1239  progress_msg = job_iter->second.GetProgressMsg();
1240 
1241  *lifetime = x_GetEstimatedJobLifetime(job_id, status);
1242  return status;
1243 }
1244 
1245 
// Reports the job status and estimated lifetime and marks the job as touched
// at 'curr'.
// NOTE(review): the line with the function name and the 'job_id' parameter,
// the declaration of 'curr' and the head of the call that receives the
// GetExpirationTime(...) result (line 1265) are absent from this listing --
// confirm against the complete file.
//   job      - out: copy of the stored job after the touch
//   lifetime - out: must point to a valid object
// Returns eJobNotFound right away when the job is not known; throws
// CNetScheduleException (eInternalError) if the status is known but the job
// is missing in m_Jobs.
1246 // This member is used for the SST/SST2 commands which also touch the job
1248  CJob & job,
1249  CNSPreciseTime * lifetime)
1250 {
1252  TJobStatus status = GetJobStatus(job_id);
1253 
1254  if (status == CNetScheduleAPI::eJobNotFound)
1255  return status;
1256 
1258  auto job_iter = m_Jobs.find(job_id);
1259 
1260  if (job_iter == m_Jobs.end())
1261  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1262 
1263  job_iter->second.SetLastTouch(curr);
1264 
1266  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1267  m_ReadTimeout,
1268  m_PendingTimeout, curr));
1269 
1270  *lifetime = x_GetEstimatedJobLifetime(job_id, status);
1271  job = job_iter->second;
1272  return status;
1273 }
1274 
1275 
// Registers (or resets) the listener which is notified about the job changes.
// NOTE(review): the line with the function name and the 'job_id' parameter
// and lines 1285-1287 (where 'curr' and 'status' are presumably declared) are
// absent from this listing -- confirm against the complete file.
//   job               - out: copy of the stored job after the update
//   address, port     - listener endpoint; a zero in either one disables
//                       the notifications
//   timeout           - notification period counted from 'curr'; kTimeZero
//                       disables the notifications
//   need_stolen       - stored via SetNeedStolenNotif()
//   need_progress_msg - stored via SetNeedLsnrProgressMsgNotif()
//   last_event_index  - out: index of the last job event; must be non-null
// Returns the job status read from the stored job, or the initial value of
// 'status' when the job is not found in m_Jobs.
1277  CJob & job,
1278  unsigned int address,
1279  unsigned short port,
1280  const CNSPreciseTime & timeout,
1281  bool need_stolen,
1282  bool need_progress_msg,
1283  size_t * last_event_index)
1284 {
1288 
1289  auto job_iter = m_Jobs.find(job_id);
1290 
1291  if (job_iter == m_Jobs.end())
1292  return status;
1293 
1294  *last_event_index = job_iter->second.GetLastEventIndex();
1295  status = job_iter->second.GetStatus();
1296 
1297  unsigned int old_listener_addr = job_iter->second.GetListenerNotifAddr();
1298  unsigned short old_listener_port = job_iter->second.GetListenerNotifPort();
1299 
  // The previous listener is told that the job was taken over only if it
  // asked for that and it had a valid endpoint
1300  if (job_iter->second.GetNeedStolenNotif() &&
1301  old_listener_addr != 0 && old_listener_port != 0) {
1302  if (old_listener_addr != address || old_listener_port != port) {
1303  // Send the stolen notification only if it is
1304  // really a new listener
1305  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
1306  eNotificationStolen, curr);
1307  }
1308  }
1309 
1310  if (address == 0 || port == 0 || timeout == kTimeZero) {
1311  // If at least one of the values is 0 => no notifications
1312  // So to make the job properly dumped put zeros everywhere.
1313  job_iter->second.SetListenerNotifAddr(0);
1314  job_iter->second.SetListenerNotifPort(0);
1315  job_iter->second.SetListenerNotifAbsTime(kTimeZero);
1316  } else {
1317  job_iter->second.SetListenerNotifAddr(address);
1318  job_iter->second.SetListenerNotifPort(port);
1319  job_iter->second.SetListenerNotifAbsTime(curr + timeout);
1320  }
1321 
1322  job_iter->second.SetNeedLsnrProgressMsgNotif(need_progress_msg);
1323  job_iter->second.SetNeedStolenNotif(need_stolen);
1324  job_iter->second.SetLastTouch(curr);
1325 
1326  job = job_iter->second;
1327  return status;
1328 }
1329 
1330 
// Stores a new progress message in the job, touches the job and sends the
// eProgressMessageChanged notification.
// NOTE(review): lines 1335-1336 (where 'curr' is presumably declared) and
// line 1345 (the head of the call that receives the GetExpirationTime(...)
// result) are absent from this listing -- confirm against the complete file.
//   job_id - the job to update
//   job    - out: copy of the stored job after the update
//   msg    - the new progress message
// Returns false if the job is not found in m_Jobs, true otherwise.
1331 bool CQueue::PutProgressMessage(unsigned int job_id,
1332  CJob & job,
1333  const string & msg)
1334 {
1337 
1338  auto job_iter = m_Jobs.find(job_id);
1339  if (job_iter == m_Jobs.end())
1340  return false;
1341 
1342  job_iter->second.SetProgressMsg(msg);
1343  job_iter->second.SetLastTouch(curr);
1344 
1346  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1347  m_ReadTimeout,
1348  m_PendingTimeout, curr));
1349  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
1350  eProgressMessageChanged, curr);
1351 
1352  job = job_iter->second;
1353  return true;
1354 }
1355 
1356 
// Moves a Running job back to the Pending state.
// NOTE(review): this listing omits several source lines (see the numbering
// gaps): the line with the function name and the 'client' parameter, line
// 1366, the status tracker / statistics / clients registry updates around
// lines 1429-1449 and the tail of the notification call at lines 1455-1458.
// Confirm against the complete file.
//   job_id, job_key - identify the job
//   job        - out: copy of the stored job
//   auth_token - when not empty it is checked against the job first
//   warning    - out: filled when only the passport part of the token matches
//   how        - selects the event type recorded in the job history
// Returns the status the job had at the moment of the call; any status other
// than eRunning is returned without changes to the job.
// Throws CNetScheduleException: eInternalError if the job is not in m_Jobs,
// eInvalidAuthToken if the token is malformed or does not match.
1358  unsigned int job_id,
1359  const string & job_key,
1360  CJob & job,
1361  const string & auth_token,
1362  string & warning,
1363  TJobReturnOption how)
1364 {
1365  CNSPreciseTime current_time = CNSPreciseTime::Current();
1367  TJobStatus old_status = GetJobStatus(job_id);
1368 
1369  if (old_status != CNetScheduleAPI::eRunning)
1370  return old_status;
1371 
1372  auto job_iter = m_Jobs.find(job_id);
1373  if (job_iter == m_Jobs.end())
1374  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1375 
1376  if (!auth_token.empty()) {
1377  // Need to check authorization token first
1378  CJob::EAuthTokenCompareResult token_compare_result =
1379  job_iter->second.CompareAuthToken(auth_token);
1380  if (token_compare_result == CJob::eInvalidTokenFormat)
1381  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
1382  "Invalid authorization token format");
1383  if (token_compare_result == CJob::eNoMatch)
1384  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
1385  "Authorization token does not match");
1386  if (token_compare_result == CJob::ePassportOnlyMatch) {
1387  // That means the job has been given to another worker node
1388  // by whatever reason (expired/failed/returned before)
1389  ERR_POST(Warning << "Received RETURN2 with only "
1390  "passport matched.");
1391  warning = "eJobPassportOnlyMatch:Only job passport matched. "
1392  "Command is ignored.";
1393  job = job_iter->second;
1394  return old_status;
1395  }
1396  // Here: the authorization token is OK, we can continue
1397  }
1398 
1399  unsigned int run_count = job_iter->second.GetRunCount();
1400  CJobEvent * event = job_iter->second.GetLastEvent();
1401 
  // A missing last event is only reported; the processing continues
1402  if (!event)
1403  ERR_POST("No JobEvent for running job");
1404 
1405  event = &job_iter->second.AppendEvent();
1406  event->SetNodeAddr(client.GetAddress());
1407  event->SetStatus(CNetScheduleAPI::ePending);
1408  switch (how) {
1409  case eWithBlacklist:
1410  event->SetEvent(CJobEvent::eReturn);
1411  break;
1412  case eWithoutBlacklist:
1413  event->SetEvent(CJobEvent::eReturnNoBlacklist);
1414  break;
1415  case eRollback:
1416  event->SetEvent(CJobEvent::eNSGetRollback);
1417  break;
1418  }
1419  event->SetTimestamp(current_time);
1420  event->SetClientNode(client.GetNode());
1421  event->SetClientSession(client.GetSession());
1422 
  // The run count is decremented (never below zero) for the returned job
1423  if (run_count)
1424  job_iter->second.SetRunCount(run_count - 1);
1425 
1426  job_iter->second.SetStatus(CNetScheduleAPI::ePending);
1427  job_iter->second.SetLastTouch(current_time);
1428 
1430  switch (how) {
1431  case eWithBlacklist:
1434  break;
1435  case eWithoutBlacklist:
1437  break;
1438  case eRollback:
1440  break;
1441  }
1442  g_DoPerfLogging(*this, job_iter->second, 200);
1443  TimeLineRemove(job_id);
1445  if (how == eWithBlacklist)
1448  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1450  current_time));
1451 
1452  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
1453 
1454  if (m_PauseStatus == eNoPause)
1456  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
1459 
1460  job = job_iter->second;
1461  return old_status;
1462 }
1463 
1464 
// Moves a Running job back to the Pending state, replacing its affinity and
// group with the given ones.
// NOTE(review): this listing omits several source lines (see the numbering
// gaps): the line with the function name and the 'client' parameter, line
// 1475, the updates at lines 1560-1561 and 1565-1566, and the tail of the
// notification call at lines 1575-1578. Confirm against the complete file.
//   job_id, job_key - identify the job
//   auth_token    - must match the job completely for the job to be changed
//   aff_token     - new affinity; empty means no affinity (id 0)
//   group         - new group; empty means no group (id 0)
//   auth_token_ok - out: false if the token is not a complete match (the job
//                   is left as is), true otherwise
//   job           - out: copy of the stored job
// Returns the status the job had at the moment of the call; any status other
// than eRunning is returned without changes to the job.
// Throws CNetScheduleException: eInternalError if the job is not in m_Jobs,
// eInvalidAuthToken if the token is malformed.
1466  unsigned int job_id,
1467  const string & job_key,
1468  const string & auth_token,
1469  const string & aff_token,
1470  const string & group,
1471  bool & auth_token_ok,
1472  CJob & job)
1473 {
1474  CNSPreciseTime current_time = CNSPreciseTime::Current();
1476  TJobStatus old_status = GetJobStatus(job_id);
1477  unsigned int affinity_id = 0;
1478  unsigned int group_id = 0;
1479  unsigned int job_affinity_id;
1480  unsigned int job_group_id;
1481 
1482  if (old_status != CNetScheduleAPI::eRunning)
1483  return old_status;
1484 
1485  // Resolve affinity and group in a separate transaction
1486  if (!aff_token.empty() || !group.empty()) {
1487  if (!aff_token.empty())
1488  affinity_id = m_AffinityRegistry.ResolveAffinity(aff_token);
1489  if (!group.empty())
1490  group_id = m_GroupRegistry.ResolveGroup(group);
1491  }
1492 
1493  auto job_iter = m_Jobs.find(job_id);
1494  if (job_iter == m_Jobs.end())
1495  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1496 
1497  // Need to check authorization token first
1498  CJob::EAuthTokenCompareResult token_compare_result =
1499  job_iter->second.CompareAuthToken(auth_token);
1500 
1501  if (token_compare_result == CJob::eInvalidTokenFormat)
1502  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
1503  "Invalid authorization token format");
1504 
  // Unlike the malformed token case, a mismatch is reported via the flag
  // and not via an exception
1505  if (token_compare_result != CJob::eCompleteMatch) {
1506  auth_token_ok = false;
1507  job = job_iter->second;
1508  return old_status;
1509  }
1510 
1511  // Here: the authorization token is OK, we can continue
1512  auth_token_ok = true;
1513 
1514  // Memorize the job group and affinity for the proper updates after
1515  // the transaction is finished
1516  job_affinity_id = job_iter->second.GetAffinityId();
1517  job_group_id = job_iter->second.GetGroupId();
1518 
1519  // Update the job affinity and group
1520  job_iter->second.SetAffinityId(affinity_id);
1521  job_iter->second.SetGroupId(group_id);
1522 
1523  unsigned int run_count = job_iter->second.GetRunCount();
1524  CJobEvent * event = job_iter->second.GetLastEvent();
1525 
  // A missing last event is only reported; the processing continues
1526  if (!event)
1527  ERR_POST("No JobEvent for running job");
1528 
1529  event = &job_iter->second.AppendEvent();
1530  event->SetNodeAddr(client.GetAddress());
1531  event->SetStatus(CNetScheduleAPI::ePending);
1532  event->SetEvent(CJobEvent::eReschedule);
1533  event->SetTimestamp(current_time);
1534  event->SetClientNode(client.GetNode());
1535  event->SetClientSession(client.GetSession());
1536 
  // The run count is decremented (never below zero) for the rescheduled job
1537  if (run_count)
1538  job_iter->second.SetRunCount(run_count - 1);
1539 
1540  job_iter->second.SetStatus(CNetScheduleAPI::ePending);
1541  job_iter->second.SetLastTouch(current_time);
1542 
1543  // Job has been updated in the DB. Update the affinity and group
1544  // registries as needed.
1545  if (job_affinity_id != affinity_id) {
1546  if (job_affinity_id != 0)
1547  m_AffinityRegistry.RemoveJobFromAffinity(job_id, job_affinity_id);
1548  if (affinity_id != 0)
1549  m_AffinityRegistry.AddJobToAffinity(job_id, affinity_id);
1550  }
1551  if (job_group_id != group_id) {
1552  if (job_group_id != 0)
1553  m_GroupRegistry.RemoveJob(job_group_id, job_id);
1554  if (group_id != 0)
1555  m_GroupRegistry.AddJob(group_id, job_id);
1556  }
1557  if (job_affinity_id != affinity_id || job_group_id != group_id)
1558  m_GCRegistry.ChangeAffinityAndGroup(job_id, affinity_id, group_id);
1559 
1562  g_DoPerfLogging(*this, job_iter->second, 200);
1563 
1564  TimeLineRemove(job_id);
1567  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1568  m_ReadTimeout,
1570  current_time));
1571 
1572  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
1573 
1574  if (m_PauseStatus == eNoPause)
1576  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
1579 
1580  job = job_iter->second;
1581  return old_status;
1582 }
1583 
1584 
// Moves a job back to the Pending state, recording an eRedo event.
// NOTE(review): this listing omits several source lines (see the numbering
// gaps): the line with the function name and the 'client' parameter, lines
// 1591, 1621, 1625 and 1628, and the tail of the Notify() call at lines
// 1636-1639. Confirm against the complete file.
//   job_id, job_key - identify the job
//   job - out: copy of the stored job after the change
// Returns the status the job had at the moment of the call. Jobs which are
// not found, Pending, Running or Reading are left as is.
// Throws CNetScheduleException (eInternalError) if the job is not in m_Jobs.
1586  unsigned int job_id,
1587  const string & job_key,
1588  CJob & job)
1589 {
1590  CNSPreciseTime current_time = CNSPreciseTime::Current();
1592  TJobStatus old_status = GetJobStatus(job_id);
1593 
1594  if (old_status == CNetScheduleAPI::eJobNotFound ||
1595  old_status == CNetScheduleAPI::ePending ||
1596  old_status == CNetScheduleAPI::eRunning ||
1597  old_status == CNetScheduleAPI::eReading)
1598  return old_status;
1599 
1600  auto job_iter = m_Jobs.find(job_id);
1601 
1602  if (job_iter == m_Jobs.end())
1603  NCBI_THROW(CNetScheduleException, eInternalError,
1604  "Error fetching job");
1605 
  // A missing last event is only reported; the processing continues
1606  CJobEvent * event = job_iter->second.GetLastEvent();
1607  if (!event)
1608  ERR_POST("Inconsistency: a job has no events");
1609 
1610  event = &job_iter->second.AppendEvent();
1611  event->SetNodeAddr(client.GetAddress());
1612  event->SetStatus(CNetScheduleAPI::ePending);
1613  event->SetEvent(CJobEvent::eRedo);
1614  event->SetTimestamp(current_time);
1615  event->SetClientNode(client.GetNode());
1616  event->SetClientSession(client.GetSession());
1617 
1618  job_iter->second.SetStatus(CNetScheduleAPI::ePending);
1619  job_iter->second.SetLastTouch(current_time);
1620 
1622  m_StatisticsCounters.CountRedo(old_status);
1623  g_DoPerfLogging(*this, job_iter->second, 200);
1624 
1626  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1627  m_ReadTimeout,
1629  current_time));
1630 
1631  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
1632 
  // Waiting clients are notified only when the queue is not paused
1633  if (m_PauseStatus == eNoPause)
1634  m_NotificationsList.Notify(job_id,
1635  job_iter->second.GetAffinityId(),
1640  job = job_iter->second;
1641  return old_status;
1642 }
1643 
1644 
// Provides a copy of the job together with its estimated lifetime and marks
// the job as touched at 'curr'.
// NOTE(review): the line with the function name and the 'job_id' parameter,
// the declaration of 'curr' and the head of the call that receives the
// GetExpirationTime(...) result (line 1663) are absent from this listing --
// confirm against the complete file.
//   job      - out: copy of the stored job after the touch
//   lifetime - out: must point to a valid object
// Returns eJobNotFound right away when the job is not known; throws
// CNetScheduleException (eInternalError) if the status is known but the job
// is missing in m_Jobs.
1646  CJob & job,
1647  CNSPreciseTime * lifetime)
1648 {
1650  TJobStatus status = GetJobStatus(job_id);
1651 
1652  if (status == CNetScheduleAPI::eJobNotFound)
1653  return status;
1654 
1656  auto job_iter = m_Jobs.find(job_id);
1657 
1658  if (job_iter == m_Jobs.end())
1659  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
1660 
1661  job_iter->second.SetLastTouch(curr);
1662 
1664  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1665  m_ReadTimeout,
1666  m_PendingTimeout, curr));
1667  *lifetime = x_GetEstimatedJobLifetime(job_id, status);
1668  job = job_iter->second;
1669  return status;
1670 }
1671 
1672 
// Cancels a single job.
// NOTE(review): this listing omits many source lines of this function (see
// the numbering gaps): the line with the function name and the 'client'
// parameter, the statements executed for the "not found" / "already
// canceled" / "not in m_Jobs" cases, the status tracker and statistics
// updates, the clients registry calls for the Running/Reading cases and the
// head/tail of the lifetime update and Notify() calls. Confirm against the
// complete file.
//   job_id, job_key - identify the job
//   job            - out: copy of the stored job after the change
//   is_ns_rollback - true when the cancel is a server side rollback of a
//                    submit; selects the eNSSubmitRollback event instead of
//                    eCancel and suppresses the notification of the readers
// Returns the status the job had at the moment of the call.
1674  unsigned int job_id,
1675  const string & job_key,
1676  CJob & job,
1677  bool is_ns_rollback)
1678 {
1679  TJobStatus old_status;
1680  CNSPreciseTime current_time = CNSPreciseTime::Current();
1681 
1683 
1684  old_status = m_StatusTracker.GetStatus(job_id);
1685  if (old_status == CNetScheduleAPI::eJobNotFound)
1687 
1688  if (old_status == CNetScheduleAPI::eCanceled) {
1689  if (is_ns_rollback)
1691  else
1696  }
1697 
1698  auto job_iter = m_Jobs.find(job_id);
1699  if (job_iter == m_Jobs.end())
1701 
1702  CJobEvent * event = &job_iter->second.AppendEvent();
1703 
1704  event->SetNodeAddr(client.GetAddress());
1705  event->SetStatus(CNetScheduleAPI::eCanceled);
1706  if (is_ns_rollback)
1707  event->SetEvent(CJobEvent::eNSSubmitRollback);
1708  else
1709  event->SetEvent(CJobEvent::eCancel);
1710  event->SetTimestamp(current_time);
1711  event->SetClientNode(client.GetNode());
1712  event->SetClientSession(client.GetSession());
1713 
1714  job_iter->second.SetStatus(CNetScheduleAPI::eCanceled);
1715  job_iter->second.SetLastTouch(current_time);
1716 
1718  if (is_ns_rollback) {
1720  } else {
1723  g_DoPerfLogging(*this, job_iter->second, 200);
1724  }
1725 
1726  TimeLineRemove(job_id);
1727  if (old_status == CNetScheduleAPI::eRunning)
1729  else if (old_status == CNetScheduleAPI::eReading)
1731 
1733  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1734  m_ReadTimeout,
1736  current_time));
1737 
1738  x_NotifyJobChanges(job_iter->second, job_key,
1739  eStatusChanged, current_time);
1740 
1741  // Notify the readers if the job has not been given for reading yet
1742  // and it was not a rollback due to a socket write error
1743  if (!m_ReadJobs.get_bit(job_id) && is_ns_rollback == false) {
1744  m_GCRegistry.UpdateReadVacantTime(job_id, current_time);
1746  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
1749  }
1750 
1751  job = job_iter->second;
1752  return old_status;
1753 }
1754 
1755 
// Cancels all the jobs which are not canceled yet.
// NOTE(review): the line with the function name and the 'client' parameter
// and line 1771 are absent from this listing -- confirm against the complete
// file.
//   logging - passed through to x_CancelJobs()
// Returns whatever x_CancelJobs() returns for the collected jobs.
1757  bool logging)
1758 {
1759  vector<CNetScheduleAPI::EJobStatus> statuses;
1760 
1761  // All except cancelled
1762  statuses.push_back(CNetScheduleAPI::ePending);
1763  statuses.push_back(CNetScheduleAPI::eRunning);
1764  statuses.push_back(CNetScheduleAPI::eFailed);
1765  statuses.push_back(CNetScheduleAPI::eDone);
1766  statuses.push_back(CNetScheduleAPI::eReading);
1767  statuses.push_back(CNetScheduleAPI::eConfirmed);
1768  statuses.push_back(CNetScheduleAPI::eReadFailed);
1769 
  // Collect the jobs in the listed states and cancel them in one go
1770  TNSBitVector jobs;
1772  m_StatusTracker.GetJobs(statuses, jobs);
1773  return x_CancelJobs(client, jobs, logging);
1774 }
1775 
1776 
1778  const TNSBitVector & candidates_to_cancel,
1779  bool logging)
1780 {
1781  CJob job;
1782  CNSPreciseTime current_time = CNSPreciseTime::Current();
1783  TNSBitVector jobs_to_cancel = candidates_to_cancel;
1784 
1785  // Filter the jobs basing on scope if so
1786  string scope = client.GetScope();
1787  if (scope.empty() || scope != kNoScopeOnly) {
1788  // Both these cases should consider only the non-scope jobs
1789  jobs_to_cancel -= m_ScopeRegistry.GetAllJobsInScopes();
1790  } else {
1791  // Consider only the jobs in the particular scope
1792  jobs_to_cancel &= m_ScopeRegistry.GetJobs(scope);
1793  }
1794 
1795  TNSBitVector::enumerator en(jobs_to_cancel.first());
1796  unsigned int count = 0;
1797  for (; en.valid(); ++en) {
1798  unsigned int job_id = *en;
1799  TJobStatus old_status = m_StatusTracker.GetStatus(job_id);
1800  auto job_iter = m_Jobs.find(job_id);
1801 
1802  if (job_iter == m_Jobs.end()) {
1803  ERR_POST("Cannot fetch job " << DecorateJob(job_id) <<
1804  " while cancelling jobs");
1805  continue;
1806  }
1807 
1808  CJobEvent * event = &job_iter->second.AppendEvent();
1809 
1810  event->SetNodeAddr(client.GetAddress());
1811  event->SetStatus(CNetScheduleAPI::eCanceled);
1812  event->SetEvent(CJobEvent::eCancel);
1813  event->SetTimestamp(current_time);
1814  event->SetClientNode(client.GetNode());
1815  event->SetClientSession(client.GetSession());
1816 
1817  job_iter->second.SetStatus(CNetScheduleAPI::eCanceled);
1818  job_iter->second.SetLastTouch(current_time);
1819 
1823  g_DoPerfLogging(*this, job_iter->second, 200);
1824 
1825  TimeLineRemove(job_id);
1826  if (old_status == CNetScheduleAPI::eRunning)
1828  else if (old_status == CNetScheduleAPI::eReading)
1830 
1832  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
1833  m_ReadTimeout,
1835  current_time));
1836 
1837  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
1838  eStatusChanged, current_time);
1839 
1840  // Notify the readers if the job has not been given for reading yet
1841  if (!m_ReadJobs.get_bit(job_id)) {
1842  m_GCRegistry.UpdateReadVacantTime(job_id, current_time);
1844  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
1847  }
1848 
1849  if (logging)
1850  GetDiagContext().Extra()
1851  .Print("job_key", MakeJobKey(job_id))
1852  .Print("job_phid", job_iter->second.GetNCBIPHID());
1853 
1854  ++count;
1855  }
1856  return count;
1857 }
1858 
1859 
// Estimates the moment till which the job is going to be kept.
// NOTE(review): the lines with the return type, the function name and the
// 'job_id' parameter (1862-1863) are absent from this listing.
//   status - the current job status as known by the caller
// For the Running and Reading jobs the estimate is the current time plus the
// queue timeout (GetTimeout()); for all the other states the value is taken
// from the GC registry.
1860 // This function must be called under the operations lock.
1861 // If called for not existing job then an exception is generated
1864  TJobStatus status) const
1865 {
1866  if (status == CNetScheduleAPI::eRunning ||
1867  status == CNetScheduleAPI::eReading)
1868  return CNSPreciseTime::Current() + GetTimeout();
1869  return m_GCRegistry.GetLifetime(job_id);
1870 }
1871 
1872 
// Cancels the jobs selected by a group, an affinity and/or a list of
// statuses; the criteria are combined with a logical AND.
// NOTE(review): the line with the function name and the 'client' parameter
// (1874) and line 1907 are absent from this listing -- confirm against the
// complete file.
//   group        - job group name; empty means "do not filter by group"
//   aff_token    - affinity token; empty means "do not filter by affinity"
//   job_statuses - statuses to consider; empty means all the statuses
//   logging      - when true, the "not found" warnings are also posted to
//                  the log; the flag is passed to x_CancelJobs() as well
//   warnings     - out: a warning is appended when the group or the
//                  affinity is not found
// Returns 0 if no criteria are given at all, otherwise whatever
// x_CancelJobs() returns for the selected jobs.
1873 unsigned int
1875  const string & group,
1876  const string & aff_token,
1877  const vector<TJobStatus> & job_statuses,
1878  bool logging,
1879  vector<string> & warnings)
1880 {
1881  if (group.empty() && aff_token.empty() && job_statuses.empty()) {
1882  // This is possible if there was only the 'Canceled' status and
1883  // it was filtered out. A warning for this case is already produced
1884  return 0;
1885  }
1886 
1887  TNSBitVector jobs_to_cancel;
1888  vector<TJobStatus> statuses;
1889 
1890  if (job_statuses.empty()) {
1891  // All statuses
1892  statuses.push_back(CNetScheduleAPI::ePending);
1893  statuses.push_back(CNetScheduleAPI::eRunning);
1894  statuses.push_back(CNetScheduleAPI::eCanceled);
1895  statuses.push_back(CNetScheduleAPI::eFailed);
1896  statuses.push_back(CNetScheduleAPI::eDone);
1897  statuses.push_back(CNetScheduleAPI::eReading);
1898  statuses.push_back(CNetScheduleAPI::eConfirmed);
1899  statuses.push_back(CNetScheduleAPI::eReadFailed);
1900  }
1901  else {
1902  // The user specified statuses explicitly
1903  // The list validity is checked by the caller.
1904  statuses = job_statuses;
1905  }
1906 
1908  m_StatusTracker.GetJobs(statuses, jobs_to_cancel);
1909 
  // Any exception from the group lookup is treated as "group not found":
  // nothing is canceled and a warning is produced
1910  if (!group.empty()) {
1911  try {
1912  jobs_to_cancel &= m_GroupRegistry.GetJobs(group);
1913  } catch (...) {
1914  jobs_to_cancel.clear();
1915  warnings.push_back("eGroupNotFound:job group " + group +
1916  " is not found");
1917  if (logging)
1918  ERR_POST(Warning << "Job group '" + group +
1919  "' is not found. No jobs are canceled.");
1920  }
1921  }
1922 
  // A zero affinity id means the token is not known to the registry
1923  if (!aff_token.empty()) {
1924  unsigned int aff_id = m_AffinityRegistry.GetIDByToken(aff_token);
1925  if (aff_id == 0) {
1926  jobs_to_cancel.clear();
1927  warnings.push_back("eAffinityNotFound:affinity " + aff_token +
1928  " is not found");
1929  if (logging)
1930  ERR_POST(Warning << "Affinity '" + aff_token +
1931  "' is not found. No jobs are canceled.");
1932  }
1933  else
1934  jobs_to_cancel &= m_AffinityRegistry.GetJobsWithAffinity(aff_id);
1935  }
1936 
1937  return x_CancelJobs(client, jobs_to_cancel, logging);
1938 }
1939 
1940 
// Provides the current status of the job as known by the status tracker.
//   job_id - the job to look up
// The result of m_StatusTracker.GetStatus() is returned as is; callers in
// this file compare it with CNetScheduleAPI::eJobNotFound to detect unknown
// jobs.
1941 TJobStatus CQueue::GetJobStatus(unsigned int job_id) const
1942 {
1943  return m_StatusTracker.GetStatus(job_id);
1944 }
1945 
1946 
// Tells whether the queue has no jobs at all, i.e. the status tracker
// reports no jobs in any state.
// NOTE(review): line 1949 is absent from this listing -- confirm against the
// complete file.
1947 bool CQueue::IsEmpty() const
1948 {
1950  return !m_StatusTracker.AnyJobs();
1951 }
1952 
1953 
// Provides the next job ID.
// NOTE(review): lines 1956, 1962, 1966 and 1968 are absent from this listing
// (presumably a lock guard and the m_SavedId bookkeeping) -- confirm against
// the complete file.
// Returns the incremented m_LastId; it is reset to 1 when the reserved ID
// (m_SavedId) turns out to be less than m_LastId, i.e. the counter wrapped.
1954 unsigned int CQueue::GetNextId()
1955 {
1957 
1958  // Job indexes are expected to start from 1,
1959  // the m_LastId is 0 at the very beginning
1960  ++m_LastId;
1961  if (m_LastId >= m_SavedId) {
1963  if (m_SavedId < m_LastId) {
1964  // Overflow for the saved ID
1965  m_LastId = 1;
1967  }
1969  }
1970  return m_LastId;
1971 }
1972 
1973 
// Reserves a contiguous range of job IDs for a batch submit.
// NOTE(review): lines 1977, 1987, 1992 and 1998 are absent from this listing
// (presumably a lock guard and the m_SavedId bookkeeping) -- confirm against
// the complete file.
//   count - how many IDs to reserve
// Returns the first ID of the reserved range, i.e. the range is
// [result, result + count - 1] and m_LastId is left at its last ID.
1974 // Reserves the given number of the job IDs
1975 unsigned int CQueue::GetNextJobIdForBatch(unsigned int count)
1976 {
1978 
1979  // Job indexes are expected to start from 1 and be monotonically growing
1980  unsigned int start_index = m_LastId;
1981 
  // Unsigned wrap-around is detected by the sum being less than the
  // starting value; the numbering is then restarted so that the batch
  // occupies the IDs 1..count
1982  m_LastId += count;
1983  if (m_LastId < start_index ) {
1984  // Overflow
1985  m_LastId = count;
1986  m_SavedId = count + s_ReserveDelta;
1988  }
1989 
1990  // There was no overflow, check the reserved value
1991  if (m_LastId >= m_SavedId) {
1993  if (m_SavedId < m_LastId) {
1994  // Overflow for the saved ID
1995  m_LastId = count;
1996  m_SavedId = count + s_ReserveDelta;
1997  }
1999  }
2000 
2001  return m_LastId - count + 1;
2002 }
2003 
2004 
// Picks a job for reading for the given client or, if there is nothing to
// give, optionally registers the client as a waiting reader.
// NOTE(review): this listing omits many source lines of this function (see
// the numbering gaps): the line with the function name and the 'client'
// parameter, the declaration of 'curr', the lock guards, the conditions in
// front of the two early 'return false' statements, several clients registry
// calls and the status/statistics/GC updates after the job is provided.
// Confirm against the complete file.
//   port, timeout          - the reader is registered for notifications only
//                            if both are non-zero
//   aff_list, group_list   - explicit affinities / groups; may be NULL
//   reader_affinity, any_affinity, exclusive_new_affinity, prioritized_aff
//                          - job selection options passed to
//                            x_FindVacantJob()
//   affinity_may_change, group_may_change
//                          - passed to x_NoMoreReadJobs()
//   job             - out: the provided job; written only when a job is given
//   no_more_jobs    - out: set to false at the start and to the result of
//                     x_NoMoreReadJobs() when no job could be picked
//   rollback_action - out: a newly allocated CNSReadJobRollback when a job
//                     is given; NOTE(review): presumably owned by the caller
//   added_pref_aff  - out: the affinity token which was added to the
//                     preferred affinities of the client, if any
// Returns false on the two early exits in the reader_affinity branch, true
// otherwise (regardless of whether a job was given).
2005 bool
2007  unsigned int port,
2008  unsigned int timeout,
2009  const list<string> * aff_list,
2010  bool reader_affinity,
2011  bool any_affinity,
2012  bool exclusive_new_affinity,
2013  bool prioritized_aff,
2014  const list<string> * group_list,
2015  bool affinity_may_change,
2016  bool group_may_change,
2017  CJob * job,
2018  bool * no_more_jobs,
2019  CNSRollbackInterface * & rollback_action,
2020  string & added_pref_aff)
2021 {
2023  TNSBitVector group_ids_vector;
2024  bool has_groups = false;
2025  TNSBitVector aff_ids_vector;
2026  vector<unsigned int> aff_ids;
2027 
2028  // This is a reader command, so mark the node type as a reader
2030 
2031  *no_more_jobs = false;
2032 
2033  {{
2035 
2036  if (reader_affinity) {
2037  // Check that the preferred affinities were not reset
2039  return false;
2040 
2041  // Check that the client was garbage collected with preferred affs
2043  return false;
2044  }
2045 
2046  // Resolve affinities and groups. It is supposed that the client knows
2047  // better what affinities and groups to expect i.e. even if they do not
2048  // exist yet, they may appear soon.
2049  if (group_list != NULL) {
2050  m_GroupRegistry.ResolveGroups(*group_list, group_ids_vector);
2051  has_groups = !group_list->empty();
2052  }
2053  if (aff_list != NULL)
2054  m_AffinityRegistry.ResolveAffinities(*aff_list, aff_ids_vector,
2055  aff_ids);
2056 
2058  }}
2059 
  // The loop is repeated ('continue') whenever the picked candidate turns
  // out to be unusable by the time it is re-checked
2060  for (;;) {
2061  // No lock here to make it possible to pick a job
2062  // simultaneously from many threads
2063  x_SJobPick job_pick = x_FindVacantJob(client,
2064  aff_ids_vector, aff_ids,
2065  reader_affinity,
2066  any_affinity,
2067  exclusive_new_affinity,
2068  prioritized_aff,
2069  group_ids_vector, has_groups,
2070  eRead);
2071 
2072  {{
2073  bool outdated_job = false;
2074  TJobStatus old_status;
2076 
  // job_id == 0 means that no suitable job was found
2077  if (job_pick.job_id == 0) {
2078  if (exclusive_new_affinity)
2079  job_pick = x_FindOutdatedJobForReading(client, 0,
2080  group_ids_vector);
2081 
2082  if (job_pick.job_id == 0) {
2083  *no_more_jobs = x_NoMoreReadJobs(client, aff_ids_vector,
2084  reader_affinity, any_affinity,
2085  exclusive_new_affinity,
2086  group_ids_vector,
2087  affinity_may_change,
2088  group_may_change);
2089  if (timeout != 0 && port > 0)
2090  x_RegisterReadListener(client, port, timeout,
2091  aff_ids_vector,
2092  reader_affinity, any_affinity,
2093  exclusive_new_affinity,
2094  group_ids_vector);
2095  return true;
2096  }
2097  outdated_job = true;
2098  } else {
2099  // Check that the job is still Done/Failed/Canceled
2100  // it could be grabbed by another reader or GC
2101  old_status = GetJobStatus(job_pick.job_id);
2102  if (old_status != CNetScheduleAPI::eDone &&
2103  old_status != CNetScheduleAPI::eFailed &&
2104  old_status != CNetScheduleAPI::eCanceled)
2105  continue; // try to pick another job
2106 
2107  if (exclusive_new_affinity) {
2109  job_pick.job_id, eRead,
2110  m_MaxPendingReadWaitTimeout) == false) {
2111  x_SJobPick outdated_pick =
2113  client, job_pick.job_id,
2114  group_ids_vector);
2115  if (outdated_pick.job_id != 0) {
2116  job_pick = outdated_pick;
2117  outdated_job = true;
2118  }
2119  }
2120  }
2121  }
2122 
2123  // The job is still in acceptable state. Check if it was received
2124  // with exclusive affinity
2125  if (job_pick.exclusive && job_pick.aff_id != 0 &&
2126  outdated_job == false) {
2128  continue; // Other reader grabbed this affinity already
2129 
2130  string aff_token = m_AffinityRegistry.GetTokenByID(
2131  job_pick.aff_id);
2132  // CXX-8843: The '-' affinity must not be added to the list of
2133  // preferred affinities
2134  if (aff_token != k_NoAffinityToken) {
2136  client, job_pick.aff_id, 0, eRead);
2137  if (added)
2138  added_pref_aff = aff_token;
2139  }
2140  }
2141 
2142  if (outdated_job && job_pick.aff_id != 0) {
2143  string aff_token = m_AffinityRegistry.GetTokenByID(
2144  job_pick.aff_id);
2145  // CXX-8843: The '-' affinity must not be added to the list of
2146  // preferred affinities
2147  if (aff_token != k_NoAffinityToken) {
2148  bool added = m_ClientsRegistry.
2149  UpdatePreferredAffinities(
2150  client, job_pick.aff_id, 0, eRead);
2151  if (added)
2152  added_pref_aff = aff_token;
2153  }
2154  }
2155 
  // The status is read again because the job could have been replaced by
  // an outdated one above; it is what the rollback action memorizes
2156  old_status = GetJobStatus(job_pick.job_id);
2157  x_UpdateDB_ProvideJobNoLock(client, curr, job_pick.job_id,
2158  eRead, *job);
2159  m_StatusTracker.SetStatus(job_pick.job_id,
2161 
2164  g_DoPerfLogging(*this, *job, 200);
2165 
2166  if (outdated_job)
2168 
2171  m_RunTimeout,
2172  m_ReadTimeout,
2174  curr));
2175  TimeLineAdd(job_pick.job_id, curr + m_ReadTimeout);
2177 
2178  x_NotifyJobChanges(*job, MakeJobKey(job_pick.job_id),
2179  eStatusChanged, curr);
2180 
2181  rollback_action = new CNSReadJobRollback(client, job_pick.job_id,
2182  old_status);
2183  m_ReadJobs.set_bit(job_pick.job_id);
2184  ++m_ReadJobsOps;
2185  return true;
2186  }}
2187  }
2188  return true; // unreachable
2189 }
2190 
2191 
// Finishes reading of a job by delegating to x_ChangeReadingStatus() with an
// empty error message.
// NOTE(review): the line with the function name and the 'client' parameter
// (2192), the target status argument (line 2201) and line 2203 are absent
// from this listing -- confirm against the complete file.
//   job_id, job_key - identify the job
//   job        - out: filled by x_ChangeReadingStatus()
//   auth_token - authorization token passed to x_ChangeReadingStatus()
// Returns the status reported by x_ChangeReadingStatus().
2193  unsigned int job_id,
2194  const string & job_key,
2195  CJob & job,
2196  const string & auth_token)
2197 {
2198  TJobStatus old_status = x_ChangeReadingStatus(
2199  client, job_id, job_key,
2200  job, auth_token, "",
2202  false, false);
2204  return old_status;
2205 }
2206 
2207 
// Reports a failure of reading a job by delegating to
// x_ChangeReadingStatus() with the given error message.
// NOTE(review): the line with the function name and the 'client' parameter
// (2208), the target status argument (line 2219) and line 2221 are absent
// from this listing -- confirm against the complete file.
//   job_id, job_key - identify the job
//   job        - out: filled by x_ChangeReadingStatus()
//   auth_token - authorization token passed to x_ChangeReadingStatus()
//   err_msg    - error message stored in the job event
//   no_retries - passed through to x_ChangeReadingStatus()
// Returns the status reported by x_ChangeReadingStatus().
2209  unsigned int job_id,
2210  const string & job_key,
2211  CJob & job,
2212  const string & auth_token,
2213  const string & err_msg,
2214  bool no_retries)
2215 {
2216  TJobStatus old_status = x_ChangeReadingStatus(
2217  client, job_id, job_key,
2218  job, auth_token, err_msg,
2220  false, no_retries);
2222  return old_status;
2223 }
2224 
2225 
// Returns a job which was given for reading by delegating to
// x_ChangeReadingStatus() with an empty error message and no_retries set to
// false.
// NOTE(review): the line with the function name and the 'client' parameter
// (2226) and the statements of the two branches after the delegation (lines
// 2242 and 2244) are absent from this listing -- confirm against the
// complete file.
//   job_id, job_key - identify the job
//   job            - out: filled by x_ChangeReadingStatus()
//   auth_token     - authorization token passed to x_ChangeReadingStatus()
//   is_ns_rollback - true when this is a server side rollback
//   blacklist      - together with is_ns_rollback selects which of the two
//                    (not shown) follow-up statements is executed
//   target_status  - the status to move the job to
// Returns the status reported by x_ChangeReadingStatus().
2227  unsigned int job_id,
2228  const string & job_key,
2229  CJob & job,
2230  const string & auth_token,
2231  bool is_ns_rollback,
2232  bool blacklist,
2233  TJobStatus target_status)
2234 {
2235  TJobStatus old_status = x_ChangeReadingStatus(
2236  client, job_id, job_key,
2237  job, auth_token, "",
2238  target_status,
2239  is_ns_rollback,
2240  false);
2241  if (is_ns_rollback || blacklist == false)
2243  else
2245  return old_status;
2246 }
2247 
2248 
// Makes an already read job available for reading again by moving it back to
// the state it had before the reading.
// NOTE(review): the line with the function name and the 'client' parameter
// (2249), lines 2256, 2306 and 2309 and the middle arguments of the Notify()
// call (lines 2316-2321) are absent from this listing -- confirm against the
// complete file.
//   job_id, job_key - identify the job
//   job   - out: copy of the stored job; written only when the job is changed
//   no_op - out: set to true when nothing had to be done (the job is Failed
//           or Done, or it has not been read yet); it is NOT reset to false
//           on the other paths, so the caller has to initialize it
// Returns the status the job had at the moment of the call. Jobs which are
// not found, Pending, Running or Reading are left as is.
// Throws CNetScheduleException (eInternalError) if the job is not in m_Jobs
// or has no events.
2250  unsigned int job_id,
2251  const string & job_key,
2252  CJob & job,
2253  bool & no_op)
2254 {
2255  CNSPreciseTime current_time = CNSPreciseTime::Current();
2257  TJobStatus old_status = GetJobStatus(job_id);
2258 
2259  if (old_status == CNetScheduleAPI::eJobNotFound ||
2260  old_status == CNetScheduleAPI::ePending ||
2261  old_status == CNetScheduleAPI::eRunning ||
2262  old_status == CNetScheduleAPI::eReading)
2263  return old_status;
2264 
2265  if (old_status == CNetScheduleAPI::eFailed ||
2266  old_status == CNetScheduleAPI::eDone) {
2267  no_op = true;
2268  return old_status;
2269  }
2270 
2271  // Check that the job has been read already
2272  if (!m_ReadJobs.get_bit(job_id)) {
2273  no_op = true;
2274  return old_status;
2275  }
2276 
2277  TJobStatus state_before_read = CNetScheduleAPI::eJobNotFound;
2278  auto job_iter = m_Jobs.find(job_id);
2279 
2280  if (job_iter == m_Jobs.end())
2281  NCBI_THROW(CNetScheduleException, eInternalError,
2282  "Error fetching job");
2283 
2284  const vector<CJobEvent>& job_events = job_iter->second.GetEvents();
2285  if (job_events.empty())
2286  NCBI_THROW(CNetScheduleException, eInternalError,
2287  "Inconsistency: a job has no events");
2288 
2289  state_before_read = job_iter->second.GetStatusBeforeReading();
2290 
2291  CJobEvent * event = &job_iter->second.AppendEvent();
2292  event->SetNodeAddr(client.GetAddress());
2293  event->SetStatus(state_before_read);
2294  event->SetEvent(CJobEvent::eReread);
2295  event->SetTimestamp(current_time);
2296  event->SetClientNode(client.GetNode());
2297  event->SetClientSession(client.GetSession());
2298 
2299  job_iter->second.SetStatus(state_before_read);
2300  job_iter->second.SetLastTouch(current_time);
2301 
2302  m_StatusTracker.SetStatus(job_id, state_before_read);
2303  m_StatisticsCounters.CountReread(old_status, state_before_read);
2304  g_DoPerfLogging(*this, job_iter->second, 200);
2305 
2307  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
2308  m_ReadTimeout,
2310  current_time));
2311 
2312  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
2313 
2314  // Notify the readers
2315  m_NotificationsList.Notify(job_id, job_iter->second.GetAffinityId(),
2322  eRead);
2323 
  // The job is not considered as read any more
2324  m_ReadJobs.set_bit(job_id, false);
2325  ++m_ReadJobsOps;
2326 
2327  job = job_iter->second;
2328  return old_status;
2329 }
2330 
2331 
2333  unsigned int job_id,
2334  const string & job_key,
2335  CJob & job,
2336  const string & auth_token,
2337  const string & err_msg,
2338  TJobStatus target_status,
2339  bool is_ns_rollback,
2340  bool no_retries)
2341 {
2342  CNSPreciseTime current_time =
2347  TJobStatus old_status =
2348  GetJobStatus(job_id);
2349 
2350  if (old_status != CNetScheduleAPI::eReading)
2351  return old_status;
2352 
2353  auto job_iter = m_Jobs.find(job_id);
2354  if (job_iter == m_Jobs.end())
2355  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
2356 
2357  // Check that authorization token matches
2358  if (is_ns_rollback == false) {
2359  CJob::EAuthTokenCompareResult token_compare_result =
2360  job_iter->second.CompareAuthToken(auth_token);
2361  if (token_compare_result == CJob::eInvalidTokenFormat)
2362  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
2363  "Invalid authorization token format");
2364  if (token_compare_result == CJob::eNoMatch)
2365  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
2366  "Authorization token does not match");
2367  }
2368 
2369  // Sanity check of the current job state
2370  if (job_iter->second.GetStatus() != CNetScheduleAPI::eReading)
2371  NCBI_THROW(CNetScheduleException, eInternalError,
2372  "Internal inconsistency detected. The job state in memory is " +
2374  " while in database it is " +
2375  CNetScheduleAPI::StatusToString(job_iter->second.GetStatus()));
2376 
2377  if (target_status == CNetScheduleAPI::eJobNotFound)
2378  target_status = job_iter->second.GetStatusBeforeReading();
2379 
2380 
2381  // Add an event
2382  CJobEvent & event = job_iter->second.AppendEvent();
2383  event.SetTimestamp(current_time);
2384  event.SetNodeAddr(client.GetAddress());
2385  event.SetClientNode(client.GetNode());
2386  event.SetClientSession(client.GetSession());
2387  event.SetErrorMsg(err_msg);
2388 
2389  if (is_ns_rollback) {
2390  event.SetEvent(CJobEvent::eNSReadRollback);
2391  job_iter->second.SetReadCount(job_iter->second.GetReadCount() - 1);
2392  } else {
2393  switch (target_status) {
2397  event.SetEvent(CJobEvent::eReadRollback);
2398  job_iter->second.SetReadCount(job_iter->second.GetReadCount() - 1);
2399  break;
2401  if (no_retries) {
2402  event.SetEvent(CJobEvent::eReadFinalFail);
2403  } else {
2404  event.SetEvent(CJobEvent::eReadFail);
2405  // Check the number of tries first
2406  if (job_iter->second.GetReadCount() <= m_ReadFailedRetries) {
2407  // The job needs to be re-scheduled for reading
2408  target_status = CNetScheduleAPI::eDone;
2409  path_option = CStatisticsCounters::eFail;
2410  }
2411  }
2412  break;
2414  event.SetEvent(CJobEvent::eReadDone);
2415  break;
2416  default:
2417  _ASSERT(0);
2418  break;
2419  }
2420  }
2421 
2422  event.SetStatus(target_status);
2423  job_iter->second.SetStatus(target_status);
2424  job_iter->second.SetLastTouch(current_time);
2425 
2426  if (target_status != CNetScheduleAPI::eConfirmed &&
2427  target_status != CNetScheduleAPI::eReadFailed) {
2428  m_ReadJobs.set_bit(job_id, false);
2429  ++m_ReadJobsOps;
2430 
2431  m_GCRegistry.UpdateReadVacantTime(job_id, current_time);
2432 
2433  // Notify the readers
2435  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
2438  }
2439 
2440  TimeLineRemove(job_id);
2441 
2442  m_StatusTracker.SetStatus(job_id, target_status);
2444  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
2446  current_time));
2447  if (is_ns_rollback)
2449  else
2451  target_status,
2452  path_option);
2453  g_DoPerfLogging(*this, job_iter->second, 200);
2454  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, current_time);
2455 
2456  job = job_iter->second;
2458 }
2459 
2460 
2461 // This function is called from places where the operations lock has been
2462 // already taken. So there is no lock around memory status tracker
// Removes a single job from the in-memory status tracker, flags its record
// for delayed deletion and takes the job off the run time line.
//
// job_id - identifier of the job to erase
// status - not referenced by the statements visible in this block;
//          presumably the state the job is erased from -- TODO confirm
2463 void CQueue::EraseJob(unsigned int job_id, TJobStatus status)
2464 {
      // Forget the job's current state first
2465  m_StatusTracker.Erase(job_id);
2466 
2467  {{
2468  // Request delayed record delete
2470 
      // The job record is not erased from m_Jobs here; only its bit is
      // raised in the vector of jobs waiting to be deleted
2471  m_JobsToDelete.set_bit(job_id);
2473  }}
      // Drop the job from the run time line as well
2474  TimeLineRemove(job_id);
2476 }
2477 
2478 
2479 // status - the job status from which the job was deleted
// Bulk counterpart of the single job erase: schedules every job whose bit is
// set in job_ids for delayed deletion.
//
// job_ids - bit vector of the job identifiers to be deleted
// status  - see the comment above; not referenced by the statements
//           visible in this block
2480 void CQueue::x_Erase(const TNSBitVector & job_ids, TJobStatus status)
2481 {
      // job_count is unsigned, so the check below is effectively
      // "the vector is empty": nothing to schedule in that case
2482  size_t job_count = job_ids.count();
2483  if (job_count <= 0)
2484  return;
2485 
2487 
      // Merge the ids into the pending deletions and account for as many
      // operations as there are jobs in the request
2488  m_JobsToDelete |= job_ids;
2489  m_JobsToDeleteOps += job_count;
2491 }
2492 
2493 
2495 {
2497 }
2498 
2499 
2502  const TNSBitVector & explicit_affs,
2503  const vector<unsigned int> & aff_ids,
2504  bool use_pref_affinity,
2505  bool any_affinity,
2506  bool exclusive_new_affinity,
2507  bool prioritized_aff,
2508  const TNSBitVector & group_ids,
2509  bool has_groups,
2510  ECommandGroup cmd_group)
2511 {
2512  string scope = client.GetScope();
2513  string virtual_scope = client.GetVirtualScope();
2514 
2515  if (!virtual_scope.empty()) {
2516  // Try first this scope: see CXX-5324
2517  x_SJobPick job_pick = x_FindVacantJob(client, explicit_affs,
2518  aff_ids, use_pref_affinity,
2519  any_affinity,
2520  exclusive_new_affinity,
2521  prioritized_aff,
2522  group_ids, has_groups,
2523  cmd_group, virtual_scope);
2524  if (job_pick.job_id != 0)
2525  return job_pick;
2526 
2527  // Fallback to a regular pick
2528  }
2529 
2530  return x_FindVacantJob(client, explicit_affs, aff_ids, use_pref_affinity,
2531  any_affinity, exclusive_new_affinity,
2532  prioritized_aff, group_ids, has_groups,
2533  cmd_group, scope);
2534 }
2535 
2536 
2539  const TNSBitVector & explicit_affs,
2540  const vector<unsigned int> & aff_ids,
2541  bool use_pref_affinity,
2542  bool any_affinity,
2543  bool exclusive_new_affinity,
2544  bool prioritized_aff,
2545  const TNSBitVector & group_ids,
2546  bool has_groups,
2547  ECommandGroup cmd_group,
2548  const string & scope)
2549 {
2550  bool explicit_aff = !aff_ids.empty();
2551  bool effective_use_pref_affinity = use_pref_affinity;
2552  TNSBitVector pref_aff_candidate_jobs;
2553  TNSBitVector exclusive_aff_candidate_jobs;
2554 
2555  // Jobs per client support: CXX-11138 (only for GET)
2556  map<string, size_t> running_jobs_per_client;
2557  if (m_MaxJobsPerClient > 0 && cmd_group == eGet) {
2558  running_jobs_per_client = x_GetRunningJobsPerClientIP();
2559  }
2560 
2562  client, cmd_group);
2563  if (use_pref_affinity)
2564  effective_use_pref_affinity = use_pref_affinity && pref_aff.any();
2565 
2566  if (explicit_aff || effective_use_pref_affinity || exclusive_new_affinity) {
2567  // Check all vacant jobs: pending jobs for eGet,
2568  // done/failed/cancel jobs for eRead
2569  TNSBitVector vacant_jobs;
2570  if (cmd_group == eGet)
2572  else
2573  m_StatusTracker.GetJobs(m_StatesForRead, vacant_jobs);
2574 
2575  if (scope.empty() || scope == kNoScopeOnly) {
2576  // Both these cases should consider only the non-scope jobs
2577  vacant_jobs -= m_ScopeRegistry.GetAllJobsInScopes();
2578  } else {
2579  // Consider only the jobs in the particular scope
2580  vacant_jobs &= m_ScopeRegistry.GetJobs(scope);
2581  }
2582 
2583  // Exclude blacklisted jobs
2585  vacant_jobs);
2586  // Keep only the group jobs if the groups are provided
2587  if (has_groups)
2588  m_GroupRegistry.RestrictByGroup(group_ids, vacant_jobs);
2589 
2590  // Exclude jobs which have been read or in a process of reading
2591  if (cmd_group == eRead)
2592  vacant_jobs -= m_ReadJobs;
2593 
2594  if (prioritized_aff) {
2595  // The criteria here is a list of explicit affinities
2596  // (respecting their order) which may be followed by any affinity
2597  for (vector<unsigned int>::const_iterator k = aff_ids.begin();
2598  k != aff_ids.end(); ++k) {
2599  TNSBitVector aff_jobs = m_AffinityRegistry.
2600  GetJobsWithAffinity(*k);
2601  TNSBitVector candidates = vacant_jobs & aff_jobs;
2602  if (candidates.any()) {
2603  // Need to check the running jobs per client ip
2604  TNSBitVector::enumerator en(candidates.first());
2605  for (; en.valid(); ++en) {
2606  auto job_id = *en;
2608  job_id, running_jobs_per_client)) {
2609  return x_SJobPick(job_id, false, *k);
2610  }
2611  }
2612  }
2613  }
2614  if (any_affinity) {
2615  if (vacant_jobs.any()) {
2616  // Need to check the running jobs per client ip
2617  TNSBitVector::enumerator en(vacant_jobs.first());
2618  for (; en.valid(); ++en) {
2619  auto job_id = *en;
2621  job_id, running_jobs_per_client)) {
2622  return x_SJobPick(job_id, false,
2623  m_GCRegistry.GetAffinityID(job_id));
2624  }
2625  }
2626  }
2627  }
2628  return x_SJobPick();
2629  }
2630 
2631  // HERE: no prioritized affinities
2632  TNSBitVector all_pref_aff;
2633  if (exclusive_new_affinity)
2635  cmd_group);
2636 
2637  TNSBitVector::enumerator en(vacant_jobs.first());
2638  for (; en.valid(); ++en) {
2639  unsigned int job_id = *en;
2640 
2641  unsigned int aff_id = m_GCRegistry.GetAffinityID(job_id);
2642  if (aff_id != 0 && explicit_aff) {
2643  if (explicit_affs.get_bit(aff_id)) {
2645  job_id, running_jobs_per_client)) {
2646  return x_SJobPick(job_id, false, aff_id);
2647  }
2648  }
2649  }
2650 
2651  if (aff_id != 0 && effective_use_pref_affinity) {
2652  if (pref_aff.get_bit(aff_id)) {
2653  if (explicit_aff == false) {
2655  job_id, running_jobs_per_client)) {
2656  return x_SJobPick(job_id, false, aff_id);
2657  }
2658  }
2659 
2660  pref_aff_candidate_jobs.set_bit(job_id);
2661  continue;
2662  }
2663  }
2664 
2665  if (exclusive_new_affinity) {
2666  if (aff_id == 0 || all_pref_aff.get_bit(aff_id) == false) {
2667  if (explicit_aff == false &&
2668  effective_use_pref_affinity == false) {
2670  job_id, running_jobs_per_client)) {
2671  return x_SJobPick(job_id, true, aff_id);
2672  }
2673  }
2674 
2675  exclusive_aff_candidate_jobs.set_bit(job_id);
2676  }
2677  }
2678  } // end for
2679 
2680  TNSBitVector::enumerator en1(pref_aff_candidate_jobs.first());
2681  for (; en1.valid(); ++en1) {
2682  if (x_ValidateMaxJobsPerClientIP(*en1, running_jobs_per_client)) {
2683  return x_SJobPick(*en1, false, 0);
2684  }
2685  }
2686 
2687  TNSBitVector::enumerator en2(exclusive_aff_candidate_jobs.first());
2688  for (; en2.valid(); ++en2) {
2689  if (x_ValidateMaxJobsPerClientIP(*en2, running_jobs_per_client)) {
2690  return x_SJobPick(*en2, true, m_GCRegistry.GetAffinityID(*en2));
2691  }
2692  }
2693  }
2694 
2695  // The second condition looks strange and it covers a very specific
2696  // scenario: some (older) worker nodes may originally come with the only
2697  // flag set - use preferred affinities - while they have nothing in the
2698  // list of preferred affinities yet. In this case a first pending job
2699  // should be provided.
2700  if (any_affinity ||
2701  (!explicit_aff &&
2702  use_pref_affinity && !effective_use_pref_affinity &&
2703  !exclusive_new_affinity &&
2704  cmd_group == eGet)) {
2705 
2706  TNSBitVector jobs_in_scope;
2707  TNSBitVector restricted_jobs;
2708  bool no_scope_only = scope.empty() ||
2709  scope == kNoScopeOnly;
2710  unsigned int job_id = 0;
2711 
2712  if (no_scope_only)
2713  jobs_in_scope = m_ScopeRegistry.GetAllJobsInScopes();
2714  else {
2715  restricted_jobs = m_ScopeRegistry.GetJobs(scope);
2716  if (has_groups)
2717  m_GroupRegistry.RestrictByGroup(group_ids, restricted_jobs);
2718  }
2719 
2720  if (cmd_group == eGet) {
2721  // NOTE: this only to avoid an expensive temporary bvector
2723  jobs_in_scope);
2724 
2725  TNSBitVector pending_jobs;
2727  TNSBitVector::enumerator en = pending_jobs.first();
2728 
2729  if (no_scope_only) {
2730  // only the jobs which are not in the scope
2731  if (has_groups) {
2732  TNSBitVector group_jobs = m_GroupRegistry.GetJobs(group_ids);
2733  for (; en.valid(); ++en) {
2734  unsigned int candidate_job_id = *en;
2735  if (jobs_in_scope.get_bit(candidate_job_id))
2736  continue;
2737  if (!group_jobs.get_bit(candidate_job_id))
2738  continue;
2739  if (x_ValidateMaxJobsPerClientIP(candidate_job_id,
2740  running_jobs_per_client)) {
2741  job_id = candidate_job_id;
2742  break;
2743  }
2744  }
2745  } else {
2746  for (; en.valid(); ++en) {
2747  unsigned int candidate_job_id = *en;
2748  if (jobs_in_scope.get_bit(candidate_job_id))
2749  continue;
2750  if (x_ValidateMaxJobsPerClientIP(candidate_job_id,
2751  running_jobs_per_client)) {
2752  job_id = candidate_job_id;
2753  break;
2754  }
2755  }
2756  }
2757  } else {
2758  // only the specific scope jobs
2759  for (; en.valid(); ++en) {
2760  unsigned int candidate_job_id = *en;
2761  if (jobs_in_scope.get_bit(candidate_job_id))
2762  continue;
2763  if (!restricted_jobs.get_bit(candidate_job_id))
2764  continue;
2765  if (x_ValidateMaxJobsPerClientIP(candidate_job_id,
2766  running_jobs_per_client)) {
2767  job_id = candidate_job_id;
2768  break;
2769  }
2770  }
2771  }
2772  } else {
2773  if (no_scope_only) {
2774  // only the jobs which are not in the scope
2775 
2776  // NOTE: this only to avoid an expensive temporary bvector
2777  jobs_in_scope |= m_ReadJobs;
2779  jobs_in_scope);
2780  if (has_groups)
2783  jobs_in_scope,
2784  m_GroupRegistry.GetJobs(group_ids),
2785  has_groups);
2786  else
2789  jobs_in_scope,
2791  false);
2792  } else {
2793  // only the specific scope jobs
2794 
2795  // NOTE: this only to avoid an expensive temporary bvector
2796  jobs_in_scope = m_ReadJobs;
2798  jobs_in_scope);
2801  jobs_in_scope,
2802  restricted_jobs, true);
2803  }
2804  }
2805  return x_SJobPick(job_id, false, 0);
2806  }
2807 
2808  return x_SJobPick();
2809 }
2810 
2811 // Provides a map between the client IP and the number of running jobs
2813 {
2814  map<string, size_t> ret;
2815  TNSBitVector running_jobs;
2816 
2818  TNSBitVector::enumerator en(running_jobs.first());
2819  for (; en.valid(); ++en) {
2820  auto job_iter = m_Jobs.find(*en);
2821  if (job_iter != m_Jobs.end()) {
2822  string client_ip = job_iter->second.GetClientIP();
2823  auto iter = ret.find(client_ip);
2824  if (iter == ret.end()) {
2825  ret[client_ip] = 1;
2826  } else {
2827  iter->second += 1;
2828  }
2829  }
2830  }
2831  return ret;
2832 }
2833 
2834 
// Tells whether a job may be picked with respect to the limit on the number
// of jobs per client IP.
//
// job_id             - the candidate job
// jobs_per_client_ip - client IP -> number of jobs counted for that IP
//
// Returns true if the job passes the check. The check passes trivially when
// the map is empty, when the job is not found in m_Jobs or when the job's
// client IP has no entry in the map. Otherwise the job passes only while the
// counter for its client IP is strictly below m_MaxJobsPerClient.
2835 bool
2837  unsigned int job_id,
2838  const map<string, size_t> & jobs_per_client_ip) const
2839 {
      // An empty map means there is nothing to compare against
2840  if (jobs_per_client_ip.empty())
2841  return true;
2842 
      // An unknown job is not rejected here
2843  auto job_iter = m_Jobs.find(job_id);
2844  if (job_iter == m_Jobs.end())
2845  return true;
2846 
      // The limit is applied per client IP stored in the job
2847  string client_ip = job_iter->second.GetClientIP();
2848  auto iter = jobs_per_client_ip.find(client_ip);
2849  if (iter == jobs_per_client_ip.end())
2850  return true;
2851  return iter->second < m_MaxJobsPerClient;
2852 }
2853 
2854 
2857  unsigned int picked_earlier,
2858  const TNSBitVector & group_ids)
2859 {
2861  return x_SJobPick(); // Not configured
2862 
2863  string scope = client.GetScope();
2864  string virtual_scope = client.GetVirtualScope();
2865 
2866  if (!virtual_scope.empty()) {
2867  // Try first this scope: see CXX-5324
2868  x_SJobPick job_pick = x_FindOutdatedPendingJob(client, picked_earlier,
2869  group_ids,
2870  virtual_scope);
2871  if (job_pick.job_id != 0)
2872  return job_pick;
2873 
2874  // Fallback to a regular outdated pick
2875  }
2876 
2877  return x_FindOutdatedPendingJob(client, picked_earlier,
2878  group_ids, scope);
2879 }
2880 
2881 
2884  unsigned int picked_earlier,
2885  const TNSBitVector & group_ids,
2886  const string & scope)
2887 {
2888  TNSBitVector outdated_pending =
2891  m_GCRegistry);
2892  if (picked_earlier != 0)
2893  outdated_pending.set_bit(picked_earlier, false);
2894 
2896 
2897  if (scope.empty() || scope == kNoScopeOnly)
2898  outdated_pending -= m_ScopeRegistry.GetAllJobsInScopes();
2899  else
2900  outdated_pending &= m_ScopeRegistry.GetJobs(scope);
2901 
2902  if (group_ids.any())
2903  m_GroupRegistry.RestrictByGroup(group_ids, outdated_pending);
2904 
2905  if (!outdated_pending.any())
2906  return x_SJobPick();
2907 
2908 
2909  x_SJobPick job_pick;
2910  job_pick.job_id = *outdated_pending.first();
2911  job_pick.aff_id = m_GCRegistry.GetAffinityID(job_pick.job_id);
2912  job_pick.exclusive = job_pick.aff_id != 0;
2913  return job_pick;
2914 }
2915 
2916 
2919  unsigned int picked_earlier,
2920  const TNSBitVector & group_ids)
2921 {
2923  return x_SJobPick(); // Not configured
2924 
2925  string scope = client.GetScope();
2926  string virtual_scope = client.GetVirtualScope();
2927 
2928  if (!virtual_scope.empty()) {
2929  // Try first this scope: see CXX-5324
2931  picked_earlier,
2932  group_ids,
2933  virtual_scope);
2934  if (job_pick.job_id != 0)
2935  return job_pick;
2936 
2937  // Fallback to a regular outdated pick
2938  }
2939 
2940  return x_FindOutdatedJobForReading(client, picked_earlier,
2941  group_ids, scope);
2942 }
2943 
2944 
2947  unsigned int picked_earlier,
2948  const TNSBitVector & group_ids,
2949  const string & scope)
2950 {
2951  TNSBitVector outdated_read_jobs =
2955  if (picked_earlier != 0)
2956  outdated_read_jobs.set_bit(picked_earlier, false);
2957 
2959  outdated_read_jobs);
2960 
2961  if (scope.empty() || scope == kNoScopeOnly)
2962  outdated_read_jobs -= m_ScopeRegistry.GetAllJobsInScopes();
2963  else
2964  outdated_read_jobs &= m_ScopeRegistry.GetJobs(scope);
2965 
2966  if (group_ids.any())
2967  m_GroupRegistry.RestrictByGroup(group_ids, outdated_read_jobs);
2968 
2969  if (!outdated_read_jobs.any())
2970  return x_SJobPick();
2971 
2972  unsigned int job_id = *outdated_read_jobs.first();
2973  unsigned int aff_id = m_GCRegistry.GetAffinityID(job_id);
2974  return x_SJobPick(job_id, aff_id != 0, aff_id);
2975 }
2976 
2977 
2979  unsigned int job_id,
2980  const string & job_key,
2981  CJob & job,
2982  const string & auth_token,
2983  const string & err_msg,
2984  const string & output,
2985  int ret_code,
2986  bool no_retries,
2987  string warning)
2988 {
2989  unsigned failed_retries;
2990  unsigned max_output_size;
2991  {{
2992  CQueueParamAccessor qp(*this);
2993  failed_retries = qp.GetFailedRetries();
2994  max_output_size = qp.GetMaxOutputSize();
2995  }}
2996 
2997  if (output.size() > max_output_size) {
2998  NCBI_THROW(CNetScheduleException, eDataTooLong,
2999  "Output is too long");
3000  }
3001 
3003  bool rescheduled = false;
3004  TJobStatus old_status;
3005 
3007  TJobStatus new_status = CNetScheduleAPI::eFailed;
3008 
3009  old_status = GetJobStatus(job_id);
3010  if (old_status == CNetScheduleAPI::eFailed) {
3013  return old_status;
3014  }
3015 
3016  if (old_status != CNetScheduleAPI::eRunning) {
3017  // No job state change
3018  return old_status;
3019  }
3020 
3021  auto job_iter = m_Jobs.find(job_id);
3022  if (job_iter == m_Jobs.end())
3023  NCBI_THROW(CNetScheduleException, eInternalError,
3024  "Error fetching job");
3025 
3026  if (!auth_token.empty()) {
3027  // Need to check authorization token first
3028  CJob::EAuthTokenCompareResult token_compare_result =
3029  job_iter->second.CompareAuthToken(auth_token);
3030  if (token_compare_result == CJob::eInvalidTokenFormat)
3031  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
3032  "Invalid authorization token format");
3033  if (token_compare_result == CJob::eNoMatch)
3034  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
3035  "Authorization token does not match");
3036  if (token_compare_result == CJob::ePassportOnlyMatch) {
3037  // That means the job has been given to another worker node
3038  // by whatever reason (expired/failed/returned before)
3039  ERR_POST(Warning << "Received FPUT2 with only "
3040  "passport matched.");
3041  warning = "eJobPassportOnlyMatch:Only job passport "
3042  "matched. Command is ignored.";
3043  job = job_iter->second;
3044  return old_status;
3045  }
3046  // Here: the authorization token is OK, we can continue
3047  }
3048 
3049  CJobEvent * event = job_iter->second.GetLastEvent();
3050  if (!event)
3051  ERR_POST("No JobEvent for running job");
3052 
3053  event = &job_iter->second.AppendEvent();
3054  if (no_retries)
3055  event->SetEvent(CJobEvent::eFinalFail);
3056  else
3057  event->SetEvent(CJobEvent::eFail);
3058  event->SetStatus(CNetScheduleAPI::eFailed);
3059  event->SetTimestamp(curr);
3060  event->SetErrorMsg(err_msg);
3061  event->SetRetCode(ret_code);
3062  event->SetNodeAddr(client.GetAddress());
3063  event->SetClientNode(client.GetNode());
3064  event->SetClientSession(client.GetSession());
3065 
3066  if (no_retries) {
3067  job_iter->second.SetStatus(CNetScheduleAPI::eFailed);
3068  event->SetStatus(CNetScheduleAPI::eFailed);
3069  rescheduled = false;
3070  if (m_Log)
3071  ERR_POST(Warning << "Job failed "
3072  "unconditionally, no_retries = 1");
3073  } else {
3074  unsigned run_count = job_iter->second.GetRunCount();
3075  if (run_count <= failed_retries) {
3076  job_iter->second.SetStatus(CNetScheduleAPI::ePending);
3077  event->SetStatus(CNetScheduleAPI::ePending);
3078 
3079  new_status = CNetScheduleAPI::ePending;
3080 
3081  rescheduled = true;
3082  } else {
3083  job_iter->second.SetStatus(CNetScheduleAPI::eFailed);
3084  event->SetStatus(CNetScheduleAPI::eFailed);
3085  new_status = CNetScheduleAPI::eFailed;
3086  rescheduled = false;
3087  if (m_Log)
3088  ERR_POST(Warning << "Job failed, exceeded "
3089  "max number of retries ("
3090  << failed_retries << ")");
3091  }
3092  }
3093 
3094  job_iter->second.SetOutput(output);
3095  job_iter->second.SetLastTouch(curr);
3096 
3097  m_StatusTracker.SetStatus(job_id, new_status);
3098  if (new_status == CNetScheduleAPI::ePending)
3100  new_status,
3102  else
3104  new_status,
3106  g_DoPerfLogging(*this, job_iter->second, 200);
3107 
3108  TimeLineRemove(job_id);
3109 
3110  // Replace it with ClearExecuting(client, job_id) when all clients
3111  // provide their credentials and job passport is checked strictly
3114 
3116  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
3117  m_ReadTimeout,
3118  m_PendingTimeout, curr));
3119 
3120  if (rescheduled && m_PauseStatus == eNoPause)
3122  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
3125 
3126  if (new_status == CNetScheduleAPI::eFailed)
3127  if (!m_ReadJobs.get_bit(job_id)) {
3128  m_GCRegistry.UpdateReadVacantTime(job_id, curr);
3130  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
3133  }
3134 
3135  x_NotifyJobChanges(job_iter->second, job_key, eStatusChanged, curr);
3136 
3137  job = job_iter->second;
3138  return old_status;
3139 }
3140 
3141 
// Provides the affinity token which corresponds to the given affinity id.
// The lookup is delegated to the affinity registry and its result is
// returned as is.
//
// aff_id - the affinity identifier to resolve
3142 string CQueue::GetAffinityTokenByID(unsigned int aff_id) const
3143 {
3144  return m_AffinityRegistry.GetTokenByID(aff_id);
3145 }
3146 
3147 
3149  bool & client_was_found,
3150  string & old_session,
3151  bool & had_wn_pref_affs,
3152  bool & had_reader_pref_affs)
3153 {
3154  // Get the running and reading jobs and move them to the corresponding
3155  // states (pending and done)
3156 
3157  TNSBitVector running_jobs;
3158  TNSBitVector reading_jobs;
3159 
3160  {{
3162  m_ClientsRegistry.ClearClient(client, running_jobs, reading_jobs,
3163  client_was_found, old_session,
3164  had_wn_pref_affs, had_reader_pref_affs);
3165 
3166  }}
3167 
3168  if (running_jobs.any())
3169  x_ResetRunningDueToClear(client, running_jobs);
3170  if (reading_jobs.any())
3171  x_ResetReadingDueToClear(client, reading_jobs);
3172  return;
3173 }
3174 
3175 
3176 // Triggered from a notification thread only
3178 {
3180  // Pending outdated timeout is configured, so check outdated jobs
3182  TNSBitVector outdated_jobs =
3185  m_GCRegistry);
3186  if (outdated_jobs.any())
3187  m_NotificationsList.CheckOutdatedJobs(outdated_jobs,
3190  eGet);
3191  }
3192 
3194  // Read pending timeout is configured, so check read outdated jobs
3196  TNSBitVector outdated_jobs =
3200  if (outdated_jobs.any())
3201  m_NotificationsList.CheckOutdatedJobs(outdated_jobs,
3204  eRead);
3205  }
3206 
3207 
3208  // Check the configured notification interval
3209  static CNSPreciseTime last_notif_timeout = kTimeNever;
3210  static size_t skip_limit = 0;
3211  static size_t skip_count;
3212 
3213  if (m_NotifHifreqInterval != last_notif_timeout) {
3214  last_notif_timeout = m_NotifHifreqInterval;
3215  skip_count = 0;
3216  skip_limit = size_t(m_NotifHifreqInterval/0.1);
3217  }
3218 
3219  ++skip_count;
3220  if (skip_count < skip_limit)
3221  return;
3222 
3223  skip_count = 0;
3224 
3225  // The NotifyPeriodically() and CheckTimeout() calls may need to modify
3226  // the clients and affinity registry so it is safer to take the queue lock.
3232  else
3234 }
3235 
3236 
3238 {
3240 }
3241 
3242 
3244 {
3246  return m_ClientsRegistry.PrintClientsList(this,
3248 }
3249 
3250 
3252 {
3256 }
3257 
3258 
3260  bool verbose) const
3261 {
3262  TNSBitVector scope_jobs;
3263  string scope = client.GetScope();
3265 
3266  if (scope == kNoScopeOnly)
3267  scope_jobs = m_ScopeRegistry.GetAllJobsInScopes();
3268  else if (!scope.empty())
3269  scope_jobs = m_ScopeRegistry.GetJobs(scope);
3270 
3272  scope_jobs, scope,
3274 }
3275 
3276 
3278  bool verbose) const
3279 {
3280  TNSBitVector scope_jobs;
3281  string scope = client.GetScope();
3283 
3284  if (scope == kNoScopeOnly)
3285  scope_jobs = m_ScopeRegistry.GetAllJobsInScopes();
3286  else if (!scope.empty())
3287  scope_jobs = m_ScopeRegistry.GetJobs(scope);
3288 
3289  return m_GroupRegistry.Print(this, scope_jobs, scope,
3291 }
3292 
3293 
3295 {
3297  return m_ScopeRegistry.Print(this, 100, verbose);
3298 }
3299 
3300 
3302 {
3303  if (!m_RunTimeLine)
3304  return;
3305 
3306  CNSPreciseTime queue_run_timeout = GetRunTimeout();
3307  CNSPreciseTime queue_read_timeout = GetReadTimeout();
3309  TNSBitVector bv;
3310  {{
3312  m_RunTimeLine->ExtractObjects(curr.Sec(), &bv);
3313  }}
3314 
3316  for ( ;en.valid(); ++en) {
3317  x_CheckExecutionTimeout(queue_run_timeout, queue_read_timeout,
3318  *en, curr, logging);
3319  }
3320 }
3321 
3322 
// Applies the execution (Running) or reading (Reading) timeout to one job.
//
// Nothing is changed if the job is neither Running nor Reading, if it is not
// found in m_Jobs, or if the effective timeout for its state is zero.
// If the timeout has not elapsed yet the job is registered in the time line
// at its expiration time. Otherwise the job is moved to a new state, a
// timeout event is appended to it and the change is reported via
// x_NotifyJobChanges().
//
// queue_run_timeout  - used when the job's own run timeout is zero
// queue_read_timeout - used when the job's own read timeout is zero
// job_id             - the job to check
// curr_time          - the moment the check is done for; it also becomes the
//                      timestamp of the appended event and the last touch
// logging            - if true then an extra diagnostics record is produced
//                      after the job state has been changed
3323 void CQueue::x_CheckExecutionTimeout(const CNSPreciseTime & queue_run_timeout,
3324  const CNSPreciseTime & queue_read_timeout,
3325  unsigned int job_id,
3326  const CNSPreciseTime & curr_time,
3327  bool logging)
3328 {
3329  CNSPreciseTime time_start = kTimeZero;
3330  CNSPreciseTime run_timeout = kTimeZero;
3331  CNSPreciseTime read_timeout = kTimeZero;
3332  CNSPreciseTime exp_time = kTimeZero;
3333  TJobStatus status;
3334  TJobStatus new_status;
3335  CJobEvent::EJobEvent event_type;
3337 
3338  {{
3340 
      // Default targets: Running -> Pending, Reading -> Done.
      // They may be replaced below when the retry counters are checked.
3341  status = GetJobStatus(job_id);
3342  if (status == CNetScheduleAPI::eRunning) {
3343  new_status = CNetScheduleAPI::ePending;
3344  event_type = CJobEvent::eTimeout;
3345  } else if (status == CNetScheduleAPI::eReading) {
3346  new_status = CNetScheduleAPI::eDone;
3347  event_type = CJobEvent::eReadTimeout;
3348  } else
3349  return; // Execution timeout is for Running and Reading jobs only
3350 
3351  job_iter = m_Jobs.find(job_id);
3352  if (job_iter == m_Jobs.end())
3353  return;
3354 
      // The timeout is counted from the timestamp of the job's last event.
      // NOTE(review): the pointer is dereferenced without a null check --
      // confirm that a Running/Reading job always has at least one event.
3355  CJobEvent * event = job_iter->second.GetLastEvent();
3356  time_start = event->GetTimestamp();
      // The job's own timeout has priority; the queue value is a fallback
3357  run_timeout = job_iter->second.GetRunTimeout();
3358  if (run_timeout == kTimeZero)
3359  run_timeout = queue_run_timeout;
3360 
3361  if (status == CNetScheduleAPI::eRunning &&
3362  run_timeout == kTimeZero)
3363  // 0 timeout means the job never fails
3364  return;
3365 
3366  read_timeout = job_iter->second.GetReadTimeout();
3367  if (read_timeout == kTimeZero)
3368  read_timeout = queue_read_timeout;
3369 
3370  if (status == CNetScheduleAPI::eReading &&
3371  read_timeout == kTimeZero)
3372  // 0 timeout means the job never fails
3373  return;
3374 
3375  // Calculate the expiration time
3376  if (status == CNetScheduleAPI::eRunning)
3377  exp_time = time_start + run_timeout;
3378  else
3379  exp_time = time_start + read_timeout;
3380 
3381  if (curr_time < exp_time) {
3382  // we need to register job in time line
3383  TimeLineAdd(job_id, exp_time);
3384  return;
3385  }
3386 
3387  // The job timeout (running or reading) is expired.
3388  // Check the try counter, we may need to fail the job.
3389  if (status == CNetScheduleAPI::eRunning) {
3390  // Running state
3391  if (job_iter->second.GetRunCount() > m_FailedRetries)
3392  new_status = CNetScheduleAPI::eFailed;
3393  } else {
3394  // Reading state
3395  if (job_iter->second.GetReadCount() > m_ReadFailedRetries)
3396  new_status = CNetScheduleAPI::eReadFailed;
3397  else
3398  new_status = job_iter->second.GetStatusBeforeReading();
      // In both reading outcomes the job's bit in m_ReadJobs is cleared
3399  m_ReadJobs.set_bit(job_id, false);
3400  ++m_ReadJobsOps;
3401  }
3402 
3403  job_iter->second.SetStatus(new_status);
3404  job_iter->second.SetLastTouch(curr_time);
3405 
      // Record the timeout in the job's event history
3406  event = &job_iter->second.AppendEvent();
3407  event->SetStatus(new_status);
3408  event->SetEvent(event_type);
3409  event->SetTimestamp(curr_time);
3410 
3411  m_StatusTracker.SetStatus(job_id, new_status);
3413  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
3414  m_ReadTimeout,
3416  curr_time));
3417 
3418  if (status == CNetScheduleAPI::eRunning) {
3419  if (new_status == CNetScheduleAPI::ePending) {
3420  // Timeout and reschedule, put to blacklist as well
3424  new_status,
3426  } else {
3432  }
3433  } else {
3434  if (new_status == CNetScheduleAPI::eReadFailed) {
3438  new_status,
3440  } else {
3441  // The target status could be Done, Failed, Canceled.
3442  // The job could be read again by another reader.
3446  new_status,
3448  }
3449  }
3450  g_DoPerfLogging(*this, job_iter->second, 200);
3451 
3452  if (new_status == CNetScheduleAPI::ePending &&
3455  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
3458 
3459  if (new_status == CNetScheduleAPI::eDone ||
3460  new_status == CNetScheduleAPI::eFailed ||
3461  new_status == CNetScheduleAPI::eCanceled)
3462  if (!m_ReadJobs.get_bit(job_id)) {
3463  m_GCRegistry.UpdateReadVacantTime(job_id, curr_time);
3465  job_id, job_iter->second.GetAffinityId(), m_ClientsRegistry,
3468  }
3469  }}
3470 
      // NOTE(review): job_iter is still used below, after the inner {{ }}
      // scope has been closed. If a lock guard is declared inside that scope
      // (its declaration is not visible in this block) these accesses are
      // made without it -- confirm this is safe.
3471  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
3472  eStatusChanged, curr_time);
3473 
3474  if (logging) {
3475  string purpose;
3476  if (status == CNetScheduleAPI::eRunning)
3477  purpose = "execution";
3478  else
3479  purpose = "reading";
3480 
      // NOTE(review): the message says "rescheduled" regardless of
      // new_status, i.e. also when the job has been moved to eFailed or
      // eReadFailed -- confirm the wording is intended.
3481  GetDiagContext().Extra()
3482  .Print("msg", "Timeout expired, rescheduled for " + purpose)
3483  .Print("msg_code", "410") // The code is for
3484  // searching in applog
3485  .Print("job_key", MakeJobKey(job_id))
3486  .Print("queue", m_QueueName)
3487  .Print("run_counter", job_iter->second.GetRunCount())
3488  .Print("read_counter", job_iter->second.GetReadCount())
3489  .Print("time_start", NS_FormatPreciseTime(time_start))
3490  .Print("exp_time", NS_FormatPreciseTime(exp_time))
3491  .Print("run_timeout", run_timeout)
3492  .Print("read_timeout", read_timeout);
3493  }
3494 }
3495 
3496 
3497 // Checks up to given # of jobs at the given status for expiration and
3498 // marks up to given # of jobs for deletion.
3499 // Returns the # of performed scans, the # of jobs marked for deletion and
3500 // the last scanned job id.
3504  unsigned int last_job,
3505  TJobStatus status)
3506 {
3507  TNSBitVector job_ids;
3509  unsigned int job_id;
3510  unsigned int aff;
3511  unsigned int group;
3512 
3513  result.job_id = attributes.job_id;
3514  result.deleted = 0;
3515  {{
3517 
3518  for (result.scans = 0;
3519  result.scans < attributes.scans; ++result.scans) {
3520  job_id = m_StatusTracker.GetNext(status, result.job_id);
3521  if (job_id == 0)
3522  break; // No more jobs in the state
3523  if (last_job != 0 && job_id >= last_job)
3524  break; // The job in the state is above the limit
3525 
3526  result.job_id = job_id;
3527 
3528  if (m_GCRegistry.DeleteIfTimedOut(job_id, current_time,
3529  &aff, &group))
3530  {
3531  // The job is expired and needs to be marked for deletion
3532  m_StatusTracker.Erase(job_id);
3533  job_ids.set_bit(job_id);
3534  ++result.deleted;
3535 
3536  // check if the affinity should also be updated
3537  if (aff != 0)
3539 
3540  // Check if the group registry should also be updated
3541  if (group != 0)
3542  m_GroupRegistry.RemoveJob(group, job_id);
3543 
3544  // Remove the job from the scope registry if so
3545  m_ScopeRegistry.RemoveJob(job_id);
3546 
3547  if (result.deleted >= attributes.deleted)
3548  break;
3549  }
3550  }
3551  }}
3552 
3553  if (result.deleted > 0) {
3554  TNSBitVector::enumerator en(job_ids.first());
3556 
3557  for (; en.valid(); ++en) {
3558  unsigned int id = *en;
3559  auto job_iter = m_Jobs.find(id);
3560 
3561  // if the job is deleted from the pending state then a performance
3562  // record should be produced. A job full information is also
3563  // required if the listener expects job notifications.
3564  if (job_iter != m_Jobs.end()) {
3565  x_NotifyJobChanges(job_iter->second, MakeJobKey(id),
3566  eJobDeleted, current_time);
3567  if (status == CNetScheduleAPI::ePending) {
3568  g_DoErasePerfLogging(*this, job_iter->second);
3569  }
3570  }
3571  }
3572 
3573  if (!m_StatusTracker.AnyPending())
3575  }
3576 
3577  x_Erase(job_ids, status);
3578  return result;
3579 }
3580 
3581 
// Moves a job within the run time line: MoveObject() is called with the old
// and the new time expressed through CNSPreciseTime::Sec().
// The call is a no-op when job_id is 0 or when m_RunTimeLine is not set.
3582 void CQueue::TimeLineMove(unsigned int job_id,
3583  const CNSPreciseTime & old_time,
3584  const CNSPreciseTime & new_time)
3585 {
3586  if (!job_id || !m_RunTimeLine)
3587  return;
3588 
     // NOTE(review): listing line 3589 is absent from this extract -
     // presumably the guard protecting the time line; confirm against the
     // repository copy before relying on this text.
3590  m_RunTimeLine->MoveObject(old_time.Sec(), new_time.Sec(), job_id);
3591 }
3592 
3593 
// Registers a job in the run time line at job_time.Sec().
// The call is a no-op when job_id is 0, when m_RunTimeLine is not set or
// when job_time evaluates to false (operator! on CNSPreciseTime).
3594 void CQueue::TimeLineAdd(unsigned int job_id,
3595  const CNSPreciseTime & job_time)
3596 {
3597  if (!job_id || !m_RunTimeLine || !job_time)
3598  return;
3599 
     // NOTE(review): listing line 3600 is absent from this extract -
     // presumably the guard protecting the time line; confirm.
3601  m_RunTimeLine->AddObject(job_time.Sec(), job_id);
3602 }
3603 
3604 
// Removes a job from the run time line.
// The call is a no-op when m_RunTimeLine is not set. Unlike TimeLineMove()
// and TimeLineAdd(), job_id is not checked against 0 here.
3605 void CQueue::TimeLineRemove(unsigned int job_id)
3606 {
3607  if (!m_RunTimeLine)
3608  return;
3609 
     // NOTE(review): listing line 3610 is absent from this extract -
     // presumably the guard protecting the time line; confirm.
3611  m_RunTimeLine->RemoveObject(job_id);
3612 }
3613 
3614 
// Replaces one job with another in the run time line.
//   remove_job_id - job to take out of the time line; skipped if 0
//   add_job_id    - job to put into the time line; skipped if 0
//   new_time      - time for add_job_id; passed to AddObject() as Sec()
// The call is a no-op when m_RunTimeLine is not set.
3615 void CQueue::TimeLineExchange(unsigned int remove_job_id,
3616  unsigned int add_job_id,
3617  const CNSPreciseTime & new_time)
3618 {
3619  if (!m_RunTimeLine)
3620  return;
3621 
     // NOTE(review): listing line 3622 is absent from this extract -
     // presumably the guard which makes the remove/add pair atomic; confirm.
3623  if (remove_job_id)
3624  m_RunTimeLine->RemoveObject(remove_job_id);
3625  if (add_job_id)
3626  m_RunTimeLine->AddObject(new_time.Sec(), add_job_id);
3627 }
3628 
3629 
// Erases from m_Jobs the jobs which are marked in m_JobsToDelete.
//   max_deleted - upper limit of the records erased by one call
// Returns the number of jobs which were really erased from m_Jobs.
// The work is done on a copy of m_JobsToDelete, in portions of chunk_size
// jobs; the ids which were erased are then reset in m_JobsToDelete.
// NOTE(review): listing lines 3636, 3647, 3669, 3673, 3676, 3681, 3685 and
// 3688 are absent from this extract - presumably lock guards, operation
// counter increments and bit vector optimization calls; confirm against the
// repository copy.
3630 unsigned int CQueue::DeleteBatch(unsigned int max_deleted)
3631 {
3632  // Copy the vector with deleted jobs
3633  TNSBitVector jobs_to_delete;
3634 
3635  {{
3637  jobs_to_delete = m_JobsToDelete;
3638  }}
3639 
3640  static const size_t chunk_size = 100;
3641  unsigned int del_rec = 0;
3642  TNSBitVector::enumerator en = jobs_to_delete.first();
3643  TNSBitVector deleted_jobs;
3644 
     // Each pass of the outer loop handles at most chunk_size ids inside
     // its own {{ }} scope
3645  while (en.valid() && del_rec < max_deleted) {
3646  {{
3648 
3649  for (size_t n = 0;
3650  en.valid() && n < chunk_size && del_rec < max_deleted;
3651  ++en, ++n) {
3652  unsigned int job_id = *en;
3653  size_t del_count = m_Jobs.erase(job_id);
3654 
     // Only the ids which were found in m_Jobs are counted and remembered
3655  if (del_count > 0) {
3656  ++del_rec;
3657  deleted_jobs.set_bit(job_id);
3658  }
3659 
3660  // The job might be the one which was given for reading
3661  // so the garbage should be collected
3662  m_ReadJobs.set_bit(job_id, false);
3663  ++m_ReadJobsOps;
3664  }
3665  }}
3666  }
3667 
3668  if (del_rec > 0) {
3670 
3671  {{
3672  TNSBitVector::enumerator en = deleted_jobs.first();
3674  for (; en.valid(); ++en) {
3675  m_JobsToDelete.set_bit(*en, false);
3677  }
3678 
     // The operations counter is reset as soon as it reaches 1000000
3679  if (m_JobsToDeleteOps >= 1000000) {
3680  m_JobsToDeleteOps = 0;
3682  }
3683  }}
3684 
     // The same threshold is applied to the read jobs operations counter
3686  if (m_ReadJobsOps >= 1000000) {
3687  m_ReadJobsOps = 0;
3689  }
3690  }
3691  return del_rec;
3692 }
3693 
3694 
3695 // See CXX-2838 for the description of how affinities garbage collection is
3696 // going to work.
// Decides whether the affinity registry needs garbage collection and, if so,
// runs it. The thresholds are percentages of aff_reg_settings.max_records:
//   - size below the low mark: nothing is done, 0 is returned;
//   - size between the low and the high mark: collection runs only if the
//     number of candidates reaches the dirt percentage, and the removal
//     limit is aff_reg_settings.low_removal;
//   - size at or above the high mark: collection runs with the removal limit
//     aff_reg_settings.high_removal.
// Returns 0 or the value returned by m_AffinityRegistry.CollectGarbage().
// NOTE(review): listing lines 3701 and 3715 (the initializers of
// aff_reg_settings and candidates_size) are absent from this extract.
3697 unsigned int CQueue::PurgeAffinities(void)
3698 {
3699  unsigned int aff_dict_size = m_AffinityRegistry.size();
3700  SNSRegistryParameters aff_reg_settings =
3702 
3703  if (aff_dict_size < (aff_reg_settings.low_mark_percentage / 100.0) *
3704  aff_reg_settings.max_records)
3705  // Did not reach the dictionary low mark
3706  return 0;
3707 
     // The high removal limit is the default; it is lowered below if the
     // registry size has not reached the high mark yet
3708  unsigned int del_limit = aff_reg_settings.high_removal;
3709  if (aff_dict_size <
3710  (aff_reg_settings.high_mark_percentage / 100.0) *
3711  aff_reg_settings.max_records) {
3712  // Here: check the percentage of the affinities that have no references
3713  // to them
3714  unsigned int candidates_size =
3716 
3717  if (candidates_size <
3718  (aff_reg_settings.dirt_percentage / 100.0) *
3719  aff_reg_settings.max_records)
3720  // The number of candidates to be deleted is low
3721  return 0;
3722 
3723  del_limit = aff_reg_settings.low_removal;
3724  }
3725 
3726 
3727  // Here: need to delete affinities from the memory
3728  return m_AffinityRegistry.CollectGarbage(del_limit);
3729 }
3730 
3731 
3732 // See CQueue::PurgeAffinities - this one works similar
// Decides whether the group registry needs garbage collection and, if so,
// runs it. The thresholds are percentages of group_reg_settings.max_records:
//   - size below the low mark: nothing is done, 0 is returned;
//   - size between the low and the high mark: collection runs only if the
//     number of candidates reaches the dirt percentage, and the removal
//     limit is group_reg_settings.low_removal;
//   - size at or above the high mark: collection runs with the removal limit
//     group_reg_settings.high_removal.
// Returns 0 or the value returned by m_GroupRegistry.CollectGarbage().
// NOTE(review): listing lines 3737 and 3751 (the initializers of
// group_reg_settings and candidates_size) are absent from this extract.
3733 unsigned int CQueue::PurgeGroups(void)
3734 {
3735  unsigned int group_dict_size = m_GroupRegistry.size();
3736  SNSRegistryParameters group_reg_settings =
3738 
3739  if (group_dict_size < (group_reg_settings.low_mark_percentage / 100.0) *
3740  group_reg_settings.max_records)
3741  // Did not reach the dictionary low mark
3742  return 0;
3743 
     // The high removal limit is the default; it is lowered below if the
     // registry size has not reached the high mark yet
3744  unsigned int del_limit = group_reg_settings.high_removal;
3745  if (group_dict_size <
3746  (group_reg_settings.high_mark_percentage / 100.0) *
3747  group_reg_settings.max_records) {
3748  // Here: check the percentage of the groups that have no references
3749  // to them
3750  unsigned int candidates_size =
3752 
3753  if (candidates_size <
3754  (group_reg_settings.dirt_percentage / 100.0) *
3755  group_reg_settings.max_records)
3756  // The number of candidates to be deleted is low
3757  return 0;
3758 
3759  del_limit = group_reg_settings.low_removal;
3760  }
3761 
3762  // Here: need to delete groups from the memory
3763  return m_GroupRegistry.CollectGarbage(del_limit);
3764 }
3765 
3766 
// Forwards the current time to m_ClientsRegistry.StaleNodes().
// NOTE(review): listing lines 3771 and 3773 are absent from this extract, so
// the remaining arguments of the call below are not visible here; confirm
// against the repository copy.
3767 void CQueue::StaleNodes(const CNSPreciseTime & current_time)
3768 {
3769  // Clears the worker nodes affinities if the workers are inactive for
3770  // the configured timeout
3772  m_ClientsRegistry.StaleNodes(current_time,
3774 }
3775 
3776 
3778 {
3781 }
3782 
3783 
3785 {
3787  m_ClientsRegistry.Purge(current_time,
3798 }
3799 
3800 
3802  unsigned int job_id,
3803  TDumpFields dump_fields)
3804 {
3806 
3807  string job_dump;
3808 
3809  // Check first that the job has not been deleted yet
3810  {{
3812  if (m_JobsToDelete.get_bit(job_id))
3813  return job_dump;
3814  }}
3815 
3816  string scope = client.GetScope();
3817  {{
3819 
3820  // Check the scope restrictions
3821  if (scope == kNoScopeOnly) {
3822  if (m_ScopeRegistry.GetAllJobsInScopes()[job_id] == true)
3823  return job_dump;
3824  } else if (!scope.empty()) {
3825  if (m_ScopeRegistry.GetJobs(scope)[job_id] == false)
3826  return job_dump;
3827  }
3828 
3829  auto job_iter = m_Jobs.find(job_id);
3830  if (job_iter == m_Jobs.end())
3831  return job_dump;
3832 
3833  job_dump.reserve(2048);
3834  try {
3835  // GC can remove the job from its registry while the
3836  // DUMP is in process. If so the job should not be dumped
3837  // and the exception from m_GCRegistry.GetLifetime() should
3838  // be suppressed.
3839  job_dump = job_iter->second.Print(dump_fields,
3840  *this, m_AffinityRegistry,
3841  m_GroupRegistry);
3842  if (dump_fields & eGCEraseTime)
3843  job_dump.append("OK:GC erase time: ")
3845  .append(kNewLine);
3846  if (dump_fields & eScope)
3847  job_dump.append("OK:scope: '")
3848  .append(m_ScopeRegistry.GetJobScope(job_id))
3849  .append(1, '\'')
3850  .append(kNewLine);
3851  } catch (...) {}
3852 
3853  return job_dump;
3854  }}
3855 }
3856 
3857 
3859  const string & group,
3860  const string & aff_token,
3861  const vector<TJobStatus> & job_statuses,
3862  unsigned int start_after_job_id,
3863  unsigned int count,
3864  bool order_first,
3865  TDumpFields dump_fields,
3866  bool logging)
3867 {
3869 
3870  // Form a bit vector of all jobs to dump
3871  vector<TJobStatus> statuses;
3872  TNSBitVector jobs_to_dump;
3873 
3874  if (job_statuses.empty()) {
3875  // All statuses
3876  statuses.push_back(CNetScheduleAPI::ePending);
3877  statuses.push_back(CNetScheduleAPI::eRunning);
3878  statuses.push_back(CNetScheduleAPI::eCanceled);
3879  statuses.push_back(CNetScheduleAPI::eFailed);
3880  statuses.push_back(CNetScheduleAPI::eDone);
3881  statuses.push_back(CNetScheduleAPI::eReading);
3882  statuses.push_back(CNetScheduleAPI::eConfirmed);
3883  statuses.push_back(CNetScheduleAPI::eReadFailed);
3884  }
3885  else {
3886  // The user specified statuses explicitly
3887  // The list validity is checked by the caller.
3888  statuses = job_statuses;
3889  }
3890 
3891 
3892  {{
3893  string scope = client.GetScope();
3895  m_StatusTracker.GetJobs(statuses, jobs_to_dump);
3896 
3897  // Check if a certain group has been specified
3898  if (!group.empty()) {
3899  try {
3900  jobs_to_dump &= m_GroupRegistry.GetJobs(group);
3901  } catch (...) {
3902  jobs_to_dump.clear();
3903  if (logging)
3904  ERR_POST(Warning << "Job group '" + group +
3905  "' is not found. No jobs to dump.");
3906  }
3907  }
3908 
3909  if (!aff_token.empty()) {
3910  unsigned int aff_id = m_AffinityRegistry.GetIDByToken(aff_token);
3911  if (aff_id == 0) {
3912  jobs_to_dump.clear();
3913  if (logging)
3914  ERR_POST(Warning << "Affinity '" + aff_token +
3915  "' is not found. No jobs to dump.");
3916  } else
3917  jobs_to_dump &= m_AffinityRegistry.GetJobsWithAffinity(aff_id);
3918  }
3919 
3920  // Apply the scope limits
3921  if (scope == kNoScopeOnly) {
3922  jobs_to_dump -= m_ScopeRegistry.GetAllJobsInScopes();
3923  } else if (!scope.empty()) {
3924  // This is a specific scope
3925  jobs_to_dump &= m_ScopeRegistry.GetJobs(scope);
3926  }
3927  }}
3928 
3929  return x_DumpJobs(jobs_to_dump, start_after_job_id, count,
3930  dump_fields, order_first);
3931 }
3932 
3933 
// Produces the textual dump of the jobs listed in a bit vector.
//   jobs_to_dump       - ids of the jobs to print
//   start_after_job_id - ids less than or equal to this value are skipped
//   count              - max number of jobs to print; 0 means no limit
//   dump_fields        - bit mask of the fields to print; eGCEraseTime and
//                        eScope add the corresponding extra lines
//   order_first        - if false and count > 0 then the jobs at the end of
//                        the remaining range are printed instead of the ones
//                        at its beginning
// Returns the accumulated dump; kEmptyStr if jobs_to_dump has no bits set.
// The jobs are copied from m_Jobs into a buffer in portions and printed
// from the copies.
// NOTE(review): listing lines 3975 and 4005 are absent from this extract -
// presumably the lock guard for the copying scope and the affinity registry
// argument of Print(); confirm against the repository copy.
3934 string CQueue::x_DumpJobs(const TNSBitVector & jobs_to_dump,
3935  unsigned int start_after_job_id,
3936  unsigned int count,
3937  TDumpFields dump_fields,
3938  bool order_first)
3939 {
3940  if (!jobs_to_dump.any())
3941  return kEmptyStr;
3942 
3943  // Skip the jobs which should not be dumped
3944  size_t skipped_jobs = 0;
3945  TNSBitVector::enumerator en(jobs_to_dump.first());
3946  while (en.valid() && *en <= start_after_job_id) {
3947  ++en;
3948  ++skipped_jobs;
3949  }
3950 
     // Tail mode: advance the enumerator till no more than count ids are
     // left in front of it
3951  if (count > 0 && !order_first) {
3952  size_t total_jobs = jobs_to_dump.count();
3953  size_t jobs_left = total_jobs - skipped_jobs;
3954  while (jobs_left > count) {
3955  ++en;
3956  --jobs_left;
3957  }
3958  }
3959 
3960  // Identify the required buffer size for jobs
3961  size_t buffer_size = m_DumpBufferSize;
3962  if (count != 0 && count < buffer_size)
3963  buffer_size = count;
3964 
3965  string result;
3966  result.reserve(2048*buffer_size);
3967 
3968  {{
3969  vector<CJob> buffer(buffer_size);
3970  size_t read_jobs = 0;
3971  size_t printed_count = 0;
3972 
     // Each pass: fill the buffer with job copies, then print them
3973  for ( ; en.valid(); ) {
3974  {{
3976 
3977  for ( ; en.valid() && read_jobs < buffer_size; ++en ) {
3978  auto job_iter = m_Jobs.find(*en);
     // The ids which are not found in m_Jobs are silently skipped
3979  if (job_iter != m_Jobs.end()) {
3980  buffer[read_jobs] = job_iter->second;
3981  ++read_jobs;
3982  ++printed_count;
3983 
3984  if (count != 0)
3985  if (printed_count >= count)
3986  break;
3987  }
3988  }
3989  }}
3990 
3991  // Print what was read
3992  string one_job;
3993  one_job.reserve(2048);
3994  for (size_t index = 0; index < read_jobs; ++index) {
3995  one_job.clear();
3996  try {
3997  // GC can remove the job from its registry while the
3998  // DUMP is in process. If so the job should not be dumped
3999  // and the exception from m_GCRegistry.GetLifetime() should
4000  // be suppressed.
4001  unsigned int job_id = buffer[index].GetId();
4002  one_job.append(kNewLine)
4003  .append(buffer[index].Print(dump_fields,
4004  *this,
4006  m_GroupRegistry));
4007  if (dump_fields & eGCEraseTime)
4008  one_job.append("OK:GC erase time: ")
4009  .append(NS_FormatPreciseTime(
4010  m_GCRegistry.GetLifetime(job_id)))
4011  .append(kNewLine);
4012  if (dump_fields & eScope)
4013  one_job.append("OK:scope: '")
4014  .append(m_ScopeRegistry.GetJobScope(job_id))
4015  .append(1, '\'')
4016  .append(kNewLine);
     // one_job reaches the result only if nothing above has thrown
4017  result.append(one_job);
4018  } catch (...) {}
4019  }
4020 
     // The limit has been reached while filling the buffer: done
4021  if (count != 0)
4022  if (printed_count >= count)
4023  break;
4024 
4025  read_jobs = 0;
4026  }
4027  }}
4028 
4029  return result;
4030 }
4031 
4032 
4034 {
4035  return m_StatusTracker.CountStatus(st);
4036 }
4037 
4038 
4040  TNSBitVector::statistics* st) const
4041 {
4042  m_StatusTracker.StatusStatistics(status, st);
4043 }
4044 
4045 
// Builds the string key for a job id using m_KeyGenerator:
// GenerateCompoundID() if m_ScrambleJobKeys is set, Generate() otherwise.
// NOTE(review): listing line 4050 (the rest of the GenerateCompoundID()
// argument list) is absent from this extract; confirm against the repository.
4046 string CQueue::MakeJobKey(unsigned int job_id) const
4047 {
4048  if (m_ScrambleJobKeys)
4049  return m_KeyGenerator.GenerateCompoundID(job_id,
4051  return m_KeyGenerator.Generate(job_id);
4052 }
4053 
4054 
4056  bool & client_was_found,
4057  bool & session_was_reset,
4058  string & old_session,
4059  bool & had_wn_pref_affs,
4060  bool & had_reader_pref_affs)
4061 {
4062  TNSBitVector running_jobs;
4063  TNSBitVector reading_jobs;
4064 
4065  {{
4066  // The client registry may need to make changes in the notification
4067  // registry, i.e. two mutexes are to be locked. The other threads may
4068  // visit notifications first and then a client registry i.e. the very
4069  // same mutexes are locked in a reverse order.
4070  // To prevent it the operation lock is locked here.
4072  m_ClientsRegistry.Touch(client, running_jobs, reading_jobs,
4073  client_was_found, session_was_reset,
4074  old_session, had_wn_pref_affs,
4075  had_reader_pref_affs);
4076  guard.Release();
4077  }}
4078 
4079  if (session_was_reset) {
4080  if (running_jobs.any())
4081  x_ResetRunningDueToNewSession(client, running_jobs);
4082  if (reading_jobs.any())
4083  x_ResetReadingDueToNewSession(client, reading_jobs);
4084  }
4085 }
4086 
4087 
4089 {
4091 }
4092 
4093 
4095 {
4098 }
4099 
4100 
4102 {
4103  // Memorize the last client scope
4106 }
4107 
4108 
4109 // Moves the job to Pending/Failed or to Done/ReadFailed
4110 // when event event_type has come
4112  unsigned int job_id,
4113  const CNSPreciseTime & current_time,
4114  TJobStatus status_from,
4115  CJobEvent::EJobEvent event_type)
4116 {
4117  TJobStatus new_status;
4119  auto job_iter = m_Jobs.find(job_id);
4120 
4121  if (job_iter == m_Jobs.end()) {
4122  ERR_POST("Cannot fetch job to reset it due to " <<
4123  CJobEvent::EventToString(event_type) <<
4124  ". Job: " << DecorateJob(job_id));
4126  }
4127 
4128  if (status_from == CNetScheduleAPI::eRunning) {
4129  // The job was running
4130  if (job_iter->second.GetRunCount() > m_FailedRetries)
4131  new_status = CNetScheduleAPI::eFailed;
4132  else
4133  new_status = CNetScheduleAPI::ePending;
4134  } else {
4135  // The job was reading
4136  if (job_iter->second.GetReadCount() > m_ReadFailedRetries)
4137  new_status = CNetScheduleAPI::eReadFailed;
4138  else
4139  new_status = job_iter->second.GetStatusBeforeReading();
4140  m_ReadJobs.set_bit(job_id, false);
4141  ++m_ReadJobsOps;
4142  }
4143 
4144  job_iter->second.SetStatus(new_status);
4145  job_iter->second.SetLastTouch(current_time);
4146 
4147  CJobEvent * event = &job_iter->second.AppendEvent();
4148  event->SetStatus(new_status);
4149  event->SetEvent(event_type);
4150  event->SetTimestamp(current_time);
4151  event->SetClientNode(client.GetNode());
4152  event->SetClientSession(client.GetSession());
4153 
4154  // Update the memory map
4155  m_StatusTracker.SetStatus(job_id, new_status);
4156 
4157  // Count the transition and do a performance logging
4158  if (event_type == CJobEvent::eClear)
4159  m_StatisticsCounters.CountTransition(status_from, new_status,
4161  else
4162  // It is a new session case
4163  m_StatisticsCounters.CountTransition(status_from, new_status,
4165  g_DoPerfLogging(*this, job_iter->second, 200);
4166 
4168  job_id, job_iter->second.GetExpirationTime(m_Timeout, m_RunTimeout,
4169  m_ReadTimeout,
4171  current_time));
4172 
4173  // remove the job from the time line
4174  TimeLineRemove(job_id);
4175 
4176  // Notify those who wait for the jobs if needed
4177  if (new_status == CNetScheduleAPI::ePending &&
4179  m_NotificationsList.Notify(job_id, job_iter->second.GetAffinityId(),
4183  eGet);
4184  // Notify readers if they wait for jobs
4185  if (new_status == CNetScheduleAPI::eDone ||
4186  new_status == CNetScheduleAPI::eFailed ||
4187  new_status == CNetScheduleAPI::eCanceled)
4188  if (!m_ReadJobs.get_bit(job_id)) {
4189  m_GCRegistry.UpdateReadVacantTime(job_id, current_time);
4190  m_NotificationsList.Notify(job_id, job_iter->second.GetAffinityId(),
4194  eRead);
4195  }
4196 
4197  x_NotifyJobChanges(job_iter->second, MakeJobKey(job_id),
4198  eStatusChanged, current_time);
4199  return new_status;
4200 }
4201 
4202 
4204  const TNSBitVector & jobs)
4205 {
4206  CNSPreciseTime current_time = CNSPreciseTime::Current();
4207  for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) {
4208  try {
4209  x_ResetDueTo(client, *en, current_time,
4211  } catch (...) {
4212  ERR_POST("Error resetting a running job when worker node is "
4213  "cleared. Job: " << DecorateJob(*en));
4214  }
4215  }
4216 }
4217 
4218 
4220  const TNSBitVector & jobs)
4221 {
4222  CNSPreciseTime current_time = CNSPreciseTime::Current();
4223  for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) {
4224  try {
4225  x_ResetDueTo(client, *en, current_time,
4227  } catch (...) {
4228  ERR_POST("Error resetting a reading job when worker node is "
4229  "cleared. Job: " << DecorateJob(*en));
4230  }
4231  }
4232 }
4233 
4234 
4236  const TNSBitVector & jobs)
4237 {
4238  CNSPreciseTime current_time = CNSPreciseTime::Current();
4239  for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) {
4240  try {
4241  x_ResetDueTo(client, *en, current_time,
4243  } catch (...) {
4244  ERR_POST("Error resetting a running job when worker node "
4245  "changed session. Job: " << DecorateJob(*en));
4246  }
4247  }
4248 }
4249 
4250 
4252  const TNSBitVector & jobs)
4253 {
4254  CNSPreciseTime current_time = CNSPreciseTime::Current();
4255  for (TNSBitVector::enumerator en(jobs.first()); en.valid(); ++en) {
4256  try {
4257  x_ResetDueTo(client, *en, current_time,
4259  } catch (...) {
4260  ERR_POST("Error resetting a reading job when worker node "
4261  "changed session. Job: " << DecorateJob(*en));
4262  }
4263  }
4264 }
4265 
4266 
4268  unsigned short port,
4269  unsigned int timeout,
4270  const TNSBitVector & aff_ids,
4271  bool wnode_aff,
4272  bool any_aff,
4273  bool exclusive_new_affinity,
4274  bool new_format,
4275  const TNSBitVector & group_ids)
4276 {
4277  // Add to the notification list and save the wait port
4279  wnode_aff, any_aff,
4280  exclusive_new_affinity, new_format,
4281  group_ids, eGet);
4282  if (client.IsComplete())
4284  aff_ids, eGet);
4285  return;
4286 }
4287 
4288 
4289 void
4291  unsigned short port,
4292  unsigned int timeout,
4293  const TNSBitVector & aff_ids,
4294  bool reader_aff,
4295  bool any_aff,
4296  bool exclusive_new_affinity,
4297  const TNSBitVector & group_ids)
4298 {
4299  // Add to the notification list and save the wait port
4301  reader_aff, any_aff,
4302  exclusive_new_affinity, true,
4303  group_ids, eRead);
4304  m_ClientsRegistry.SetNodeWaiting(client, port, aff_ids, eRead);
4305 }
4306 
4307 
4309  unsigned short port)
4310 {
4311  if (client.IsComplete())
4313 
4314  if (port > 0) {
4316  return true;
4317  }
4318  return false;
4319 }
4320 
4321 
// Logs the queue statistics as a separate request (request start, extra
// record, request stop) and remembers the printed counters so that the next
// call can print the delta.
//   aff_count - in/out accumulator; the size of the queue affinity registry
//               is added to it (only when the statistics are printed)
// On the very first call (the last printed timestamp is 0.0) nothing is
// logged: only the counters snapshot is memorized.
// NOTE(review): listing lines 4327, 4331, 4336, 4338, 4356-4363 and 4374 are
// absent from this extract - presumably the current time, the timestamp
// updates, the declarations of delta and ctx and the rest of the Print()
// chain; confirm against the repository copy.
4322 void CQueue::PrintStatistics(size_t & aff_count) const
4323 {
4324  CStatisticsCounters counters_copy = m_StatisticsCounters;
4325 
4326  // Do not print the server wide statistics the very first time
4328 
4329  if (double(m_StatisticsCountersLastPrintedTimestamp) == 0.0) {
4330  m_StatisticsCountersLastPrinted = counters_copy;
4332  return;
4333  }
4334 
4335  // Calculate the delta since the last time
4337 
4339  ctx.Reset(new CRequestContext());
4340  ctx->SetRequestID();
4341 
4342 
4343  CDiagContext & diag_context = GetDiagContext();
4344 
     // The new context is installed for the duration of the logging and is
     // reset at the end of the function
4345  diag_context.SetRequestContext(ctx);
4346  CDiagContext_Extra extra = diag_context.PrintRequestStart();
4347 
4348  size_t affinities = m_AffinityRegistry.size();
4349  aff_count += affinities;
4350 
4351  // The member is called only if there is a request context
4352  extra.Print("_type", "statistics_thread")
4353  .Print("_queue", GetQueueName())
4354  .Print("time_interval", NS_FormatPreciseTimeAsSec(delta))
4355  .Print("affinities", affinities)
4364  counters_copy.PrintTransitions(extra);
4365  counters_copy.PrintDelta(extra, m_StatisticsCountersLastPrinted);
4366  extra.Flush();
4367 
4368  ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
4369  diag_context.PrintRequestStop();
4370  ctx.Reset();
4371  diag_context.SetRequestContext(NULL);
4372 
     // The snapshot becomes the base for the next delta
4373  m_StatisticsCountersLastPrinted = counters_copy;
4375 }
4377 
4379 {
4380  vector<TJobStatus> statuses;
4381  statuses.push_back(CNetScheduleAPI::ePending);
4382  statuses.push_back(CNetScheduleAPI::eRunning);
4383  statuses.push_back(CNetScheduleAPI::eCanceled);
4384  statuses.push_back(CNetScheduleAPI::eFailed);
4385  statuses.push_back(CNetScheduleAPI::eDone);
4386  statuses.push_back(CNetScheduleAPI::eReading);
4387  statuses.push_back(CNetScheduleAPI::eConfirmed);
4388  statuses.push_back(CNetScheduleAPI::eReadFailed);
4389 
4390  vector<unsigned int> counters = m_StatusTracker.GetJobCounters(statuses);
4391  g_DoPerfLogging(*this, statuses, counters);
4392 }
4393 
4394 
// Returns the number of bits set in m_JobsToDelete, i.e. the number of jobs
// which are marked for deletion but have not been erased yet.
// NOTE(review): listing line 4397 is absent from this extract - presumably
// the guard protecting m_JobsToDelete; confirm against the repository copy.
4395 unsigned int CQueue::GetJobsToDeleteCount(void) const
4396 {
4398  return m_JobsToDelete.count();
4399 }
4400 
4401 
4403 {
4404  string output;
4405  output.reserve(4096);
4407  .append("OK:garbage_jobs: ")
4408  .append(to_string(GetJobsToDeleteCount()))
4409  .append(kNewLine)
4410  .append("OK:affinity_registry_size: ")
4411  .append(to_string(m_AffinityRegistry.size()))
4412  .append(kNewLine)
4413  .append("OK:client_registry_size: ")
4414  .append(to_string(m_ClientsRegistry.size()))
4415  .append(kNewLine);
4416  return output;
4417 }
4418 
4419 
4421  const string & group_token,
4422  const string & aff_token,
4423  size_t * jobs,
4424  vector<string> & warnings) const
4425 {
4426  TNSBitVector group_jobs;
4427  TNSBitVector aff_jobs;
4429 
4430  if (!group_token.empty()) {
4431  try {
4432  group_jobs = m_GroupRegistry.GetJobs(group_token);
4433  } catch (...) {
4434  warnings.push_back("eGroupNotFound:job group " + group_token +
4435  " is not found");
4436  }
4437  }
4438  if (!aff_token.empty()) {
4439  unsigned int aff_id = m_AffinityRegistry.GetIDByToken(aff_token);
4440  if (aff_id == 0)
4441  warnings.push_back("eAffinityNotFound:affinity " + aff_token +
4442  " is not found");
4443  else
4444  aff_jobs = m_AffinityRegistry.GetJobsWithAffinity(aff_id);
4445  }
4446 
4447  if (!warnings.empty())
4448  return;
4449 
4450 
4451  string scope = client.GetScope();
4452  TNSBitVector candidates;
4453  for (size_t index(0); index < g_ValidJobStatusesSize; ++index) {
4454  candidates.clear();
4455  m_StatusTracker.GetJobs(g_ValidJobStatuses[index], candidates);
4456 
4457  if (!group_token.empty())
4458  candidates &= group_jobs;
4459  if (!aff_token.empty())
4460  candidates &= aff_jobs;
4461 
4462  // Apply the scope limitations. Empty scope means that all the jobs
4463  // must be provided
4464  if (scope == kNoScopeOnly) {
4465  // Exclude all scoped jobs
4466  candidates -= m_ScopeRegistry.GetAllJobsInScopes();
4467  } else if (!scope.empty()) {
4468  // Specific scope
4469  candidates &= m_ScopeRegistry.GetJobs(scope);
4470  }
4471 
4472  jobs[index] = candidates.count();
4473  }
4474 }
4475 
4476 
4478  const string & group_token,
4479  const string & aff_token,
4480  vector<string> & warnings) const
4481 {
4482  size_t total = 0;
4483  string result;
4484  size_t jobs_per_state[g_ValidJobStatusesSize];
4485 
4486  GetJobsPerState(client, group_token, aff_token, jobs_per_state, warnings);
4487 
4488  // Warnings could be about non existing affinity or group. If so there are
4489  // no jobs to be printed.
4490  if (warnings.empty()) {
4491  for (size_t index(0); index < g_ValidJobStatusesSize; ++index) {
4492  result += "OK:" +
4494  ": " + to_string(jobs_per_state[index]) + "\n";
4495  total += jobs_per_state[index];
4496  }
4497  result += "OK:Total: " + to_string(total) + "\n";
4498  }
4499  return result;
4500 }
4501 
4502 
4503 unsigned int CQueue::CountActiveJobs(void) const
4504 {
4505  vector<CNetScheduleAPI::EJobStatus> statuses;
4506 
4507  statuses.push_back(CNetScheduleAPI::ePending);
4508  statuses.push_back(CNetScheduleAPI::eRunning);
4509  return m_StatusTracker.CountStatus(statuses);
4510 }
4511 
4512 
4514 {
4516 
4517  bool need_notifications = (status == eNoPause &&
4518  m_PauseStatus != eNoPause);
4519 
4520  m_PauseStatus = status;
4521  if (need_notifications)
4523 
4525 }
4526 
4527 
4529  unsigned short port,
4530  bool new_format)
4531 {
4533  new_format);
4534 }
4535 
4536 
4538  const string & auth_token,
4539  const CNSPreciseTime & curr,
4540  int ret_code,
4541  const string & output,
4542  CJob & job,
4543  const CNSClientId & client)
4544 {
4545  auto job_iter = m_Jobs.find(job_id);
4546  if (job_iter == m_Jobs.end())
4547  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
4548 
4549  if (!auth_token.empty()) {
4550  // Need to check authorization token first
4551  CJob::EAuthTokenCompareResult token_compare_result =
4552  job_iter->second.CompareAuthToken(auth_token);
4553  if (token_compare_result == CJob::eInvalidTokenFormat)
4554  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
4555  "Invalid authorization token format");
4556  if (token_compare_result == CJob::eNoMatch)
4557  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
4558  "Authorization token does not match");
4559  if (token_compare_result == CJob::ePassportOnlyMatch) {
4560  // That means that the job has been executing by another worker
4561  // node at the moment, but we can accept the results anyway
4562  ERR_POST(Warning << "Received PUT2 with only "
4563  "passport matched.");
4564  }
4565  // Here: the authorization token is OK, we can continue
4566  }
4567 
4568  // Append the event
4569  CJobEvent * event = &job_iter->second.AppendEvent();
4571  event->SetEvent(CJobEvent::eDone);
4572  event->SetTimestamp(curr);
4573  event->SetRetCode(ret_code);
4574 
4575  event->SetClientNode(client.GetNode());
4576  event->SetClientSession(client.GetSession());
4577  event->SetNodeAddr(client.GetAddress());
4578 
4579  job_iter->second.SetStatus(CNetScheduleAPI::eDone);
4580  job_iter->second.SetOutput(output);
4581  job_iter->second.SetLastTouch(curr);
4582 
4583  job = job_iter->second;
4584 }
4585 
4586 
4587 // If the job.job_id != 0 => the job has been read successfully
4588 // Exception => DB errors
4590  const CNSPreciseTime & curr,
4591  unsigned int job_id,
4592  ECommandGroup cmd_group,
4593  CJob & job)
4594 {
4595  auto job_iter = m_Jobs.find(job_id);
4596  if (job_iter == m_Jobs.end())
4597  NCBI_THROW(CNetScheduleException, eInternalError, "Error fetching job");
4598 
4599  CJobEvent & event = job_iter->second.AppendEvent();
4600  event.SetTimestamp(curr);
4601  event.SetNodeAddr(client.GetAddress());
4602  event.SetClientNode(client.GetNode());
4603  event.SetClientSession(client.GetSession());
4604 
4605  if (cmd_group == eGet) {
4606  event.SetStatus(CNetScheduleAPI::eRunning);
4607  event.SetEvent(CJobEvent::eRequest);
4608  } else {
4609  event.SetStatus(CNetScheduleAPI::eReading);
4610  event.SetEvent(CJobEvent::eRead);
4611  }
4612 
4613  job_iter->second.SetLastTouch(curr);
4614  if (cmd_group == eGet) {
4615  job_iter->second.SetStatus(CNetScheduleAPI::eRunning);
4616  job_iter->second.SetRunTimeout(kTimeZero);
4617  job_iter->second.SetRunCount(job_iter->second.GetRunCount() + 1);
4618  } else {
4619  job_iter->second.SetStatus(CNetScheduleAPI::eReading);
4620  job_iter->second.SetReadTimeout(kTimeZero);
4621  job_iter->second.SetReadCount(job_iter->second.GetReadCount() + 1);
4622  }
4623 
4624  job = job_iter->second;
4625 }
4626 
4627 
4628 // Dumps all the jobs into a flat file at the time of shutdown
4629 void CQueue::Dump(const string & dump_dname)
4630 {
4631  // Form a bit vector of all jobs to dump
4632  vector<TJobStatus> statuses;
4633  TNSBitVector jobs_to_dump;
4634 
4635  // All statuses
4636  statuses.push_back(CNetScheduleAPI::ePending);
4637  statuses.push_back(CNetScheduleAPI::eRunning);
4638  statuses.push_back(CNetScheduleAPI::eCanceled);
4639  statuses.push_back(CNetScheduleAPI::eFailed);
4640  statuses.push_back(CNetScheduleAPI::eDone);
4641  statuses.push_back(CNetScheduleAPI::eReading);
4642  statuses.push_back(CNetScheduleAPI::eConfirmed);
4643  statuses.push_back(CNetScheduleAPI::eReadFailed);
4644 
4645  m_StatusTracker.GetJobs(statuses, jobs_to_dump);
4646 
4647  // Exclude all the jobs which belong to a certain scope. There is no
4648  // need to save them
4649  jobs_to_dump -= m_ScopeRegistry.GetAllJobsInScopes();
4650 
4651  if (!jobs_to_dump.any())
4652  return; // Nothing to dump
4653 
4654 
4655  string jobs_file_name = x_GetJobsDumpFileName(dump_dname);
4656  FILE * jobs_file = NULL;
4657 
4658  try {
4659  // Dump the affinity registry
4660  m_AffinityRegistry.Dump(dump_dname, m_QueueName);
4661 
4662  // Dump the group registry
4663  m_GroupRegistry.Dump(dump_dname, m_QueueName);
4664 
4665  jobs_file = fopen(jobs_file_name.c_str(), "wb");
4666  if (jobs_file == NULL)
4667  throw runtime_error("Cannot open file " + jobs_file_name +
4668  " to dump jobs");
4669 
4670  // Disable buffering to detect errors right away
4671  setbuf(jobs_file, NULL);
4672 
4673  // Write a header
4674  SJobDumpHeader header;
4675  header.Write(jobs_file);
4676 
4677  TNSBitVector::enumerator en(jobs_to_dump.first());
4678  for ( ; en.valid(); ++en) {
4679  auto job_iter = m_Jobs.find(*en);
4680  if (job_iter == m_Jobs.end()) {
4681  ERR_POST("Dump at SHUTDOWN: error fetching job " <<
4682  DecorateJob(*en) << ". Skip and continue.");
4683  continue;
4684  }
4685 
4686  job_iter->second.Dump(jobs_file);
4687  }
4688  } catch (const exception & ex) {
4689  if (jobs_file != NULL)
4690  fclose(jobs_file);
4691  RemoveDump(dump_dname);
4692  throw runtime_error("Error dumping queue " + m_QueueName +
4693  ": " + string(ex.what()));
4694  }
4695 
4696  fclose(jobs_file);
4697 }
4698 
4699 
// Deletes the dump files of this queue at the given location: the group
// registry dump is removed via m_GroupRegistry.RemoveDump() and the jobs
// file is removed if access() reports that it exists.
// The result of remove() is not checked.
// NOTE(review): listing line 4702 is absent from this extract - presumably
// the removal of the affinity registry dump; confirm against the repository.
4700 void CQueue::RemoveDump(const string & dump_dname)
4701 {
4703  m_GroupRegistry.RemoveDump(dump_dname, m_QueueName);
4704 
4705  string jobs_file_name = x_GetJobsDumpFileName(dump_dname);
4706 
4707  if (access(jobs_file_name.c_str(), F_OK) != -1)
4708  remove(jobs_file_name.c_str());
4709 }
4710 
4711 
4712 string CQueue::x_GetJobsDumpFileName(const string & dump_dname) const
4713 {
4714  string upper_queue_name = m_QueueName;
4715  NStr::ToUpper(upper_queue_name);
4716  return dump_dname + kJobsFileName + "." + upper_queue_name;
4717 }
4718 
4719 
// Restores the queue jobs from a dump produced by CQueue::Dump().
//   dump_dname - location of the dump files
// Returns the number of loaded job records; 0 if the dump directory or the
// jobs file does not exist.
// Each loaded job is stored in m_Jobs and registered in the status tracker,
// in the affinity and group registries (for non-zero ids) and in the GC
// registry.
// Throws runtime_error on any failure; before that the jobs file is closed
// (if it was opened) and x_ClearQueue() is called.
// NOTE(review): listing lines 4732-4733, 4766, 4784-4785, 4793 and 4797 are
// absent from this extract - presumably the loading of the affinity and group
// registries, the time line registration, the rest of the
// GetJobExpirationTime() arguments and the registries finalization; confirm
// against the repository copy.
4720 unsigned int CQueue::LoadFromDump(const string & dump_dname)
4721 {
4722  unsigned int recs = 0;
4723  string jobs_file_name = x_GetJobsDumpFileName(dump_dname);
4724  FILE * jobs_file = NULL;
4725 
4726  if (!CDir(dump_dname).Exists())
4727  return 0;
4728  if (!CFile(jobs_file_name).Exists())
4729  return 0;
4730 
4731  try {
4734 
4735  jobs_file = fopen(jobs_file_name.c_str(), "rb");
4736  if (jobs_file == NULL)
4737  throw runtime_error("Cannot open file " + jobs_file_name +
4738  " to load dumped jobs");
4739 
4740  SJobDumpHeader header;
4741  header.Read(jobs_file);
4742 
     // One job object and two buffers are reused for all the records
4743  CJob job;
4744  AutoArray<char> input_buf(new char[kNetScheduleMaxOverflowSize]);
4745  AutoArray<char> output_buf(new char[kNetScheduleMaxOverflowSize]);
4746  while (job.LoadFromDump(jobs_file,
4747  input_buf.get(), output_buf.get(),
4748  header)) {
4749  unsigned int job_id = job.GetId();
4750  unsigned int group_id = job.GetGroupId();
4751  unsigned int aff_id = job.GetAffinityId();
4752  TJobStatus status = job.GetStatus();
4753 
4754  m_Jobs[job_id] = job;
4755  m_StatusTracker.SetExactStatusNoLock(job_id, status, true);
4756 
4757  if ((status == CNetScheduleAPI::eRunning ||
4758  status == CNetScheduleAPI::eReading) &&
4759  m_RunTimeLine) {
4760  // Add object to the first available slot;
4761  // it is going to be rescheduled or dropped
4762  // in the background control thread
4763  // We can use time line without lock here because
4764  // the queue is still in single-use mode while
4765  // being loaded.
4767  }
4768 
4769  // Register the job for the affinity if so
4770  if (aff_id != 0)
4771  m_AffinityRegistry.AddJobToAffinity(job_id, aff_id);
4772 
4773  // Register the job in the group registry
4774  if (group_id != 0)
4775  m_GroupRegistry.AddJobToGroup(group_id, job_id);
4776 
4777  // Register the loaded job with the garbage collector
4778  CNSPreciseTime submit_time = job.GetSubmitTime();
4779  CNSPreciseTime expiration =
4780  GetJobExpirationTime(job.GetLastTouch(), status,
4781  submit_time, job.GetTimeout(),
4782  job.GetRunTimeout(),
4783  job.GetReadTimeout(),
4786  m_GCRegistry.RegisterJob(job_id, job.GetSubmitTime(),
4787  aff_id, group_id, expiration);
4788  ++recs;
4789  }
4790 
4791  // Make sure that there are no affinity IDs in the registry for which
4792  // there are no jobs and initialize the next affinity ID counter.
4794 
4795  // Make sure that there are no group IDs in the registry for which there
4796  // are no jobs and initialize the next group ID counter.
4798  } catch (const exception & ex) {
4799  if (jobs_file != NULL)
4800  fclose(jobs_file);
4801 
     // Whatever has been loaded so far is dropped before reporting
4802  x_ClearQueue();
4803  throw runtime_error("Error loading queue " + m_QueueName +
4804  " from its dump: " + string(ex.what()));
4805  } catch (...) {
4806  if (jobs_file != NULL)
4807  fclose(jobs_file);
4808 
4809  x_ClearQueue();
4810  throw runtime_error("Unknown error loading queue " + m_QueueName +
4811  " from its dump");
4812  }
4813 
4814  fclose(jobs_file);
4815  return recs;
4816 }
4817 
4818 
4819 // The member does not grab the operational lock.
4820 // The member is used at the time of loading jobs from dump and at that time
4821 // there is no concurrent access.
4823 {
4825  m_RunTimeLine->ReInit();
4826  m_JobsToDelete.clear(true);
4827  m_ReadJobs.clear(true);
4828 
4831  m_GCRegistry.Clear();
4833 
4834  m_Jobs.clear();
4835 }
4836 
4837 
4839  const string & job_key,
4840  ENotificationReason reason,
4841  const CNSPreciseTime & current_time)
4842 {
4843  string notification;
4844  TJobStatus job_status = job.GetStatus();
4845 
4846  if (reason == eJobDeleted)
4847  job_status = CNetScheduleAPI::eDeleted;
4848 
4849  if (reason != eProgressMessageChanged || job.GetLsnrNeedProgressMsgNotif()) {
4850  if (job.ShouldNotifyListener(current_time)) {
4852  job, job_key, job_status, reason);
4854  job.GetListenerNotifPort(),
4855  notification);
4856  }
4857  }
4858 
4859  if (reason != eNotificationStolen) {
4860  if (reason != eProgressMessageChanged || job.GetSubmNeedProgressMsgNotif()) {
4861  if (job.ShouldNotifySubmitter(current_time)) {
4862  if (notification.empty())
4864  job, job_key, job_status, reason);
4866  job.GetSubmNotifPort(),
4867  notification);
4868  }
4869  }
4870  }
4871 }
4872 
4873 
static const struct attribute attributes[]
Definition: attributes.c:165
Algorithms for bvector<> (main include)
#define false
Definition: bool.h:36
Temporary object for holding extra message arguments.
Definition: ncbidiag.hpp:1828
CDir –.
Definition: ncbifile.hpp:1695
CFile –.
Definition: ncbifile.hpp:1604
void Release()
Manually force the resource to be released.
Definition: guard.hpp:166
EJobEvent
Definition: job.hpp:60
@ eReturn
Definition: job.hpp:67
@ eRequest
Definition: job.hpp:65
@ eNSSubmitRollback
Definition: job.hpp:82
@ eCancel
Definition: job.hpp:77
@ eClear
Definition: job.hpp:75
@ eDone
Definition: job.hpp:66
@ eTimeout
Definition: job.hpp:78
@ eRedo
Definition: job.hpp:94
@ eReadTimeout
Definition: job.hpp:79
@ eFail
Definition: job.hpp:68
@ eBatchSubmit
Definition: job.hpp:64
@ eReadFail
Definition: job.hpp:71
@ eNSGetRollback
Definition: job.hpp:85
@ eReadRollback
Definition: job.hpp:74
@ eReadFinalFail
Definition: job.hpp:72
@ eReread
Definition: job.hpp:95
@ eReschedule
Definition: job.hpp:93
@ eReadDone
Definition: job.hpp:73
@ eReturnNoBlacklist
Definition: job.hpp:91
@ eRead
Definition: job.hpp:70
@ eSubmit
Definition: job.hpp:63
@ eSessionChanged
Definition: job.hpp:80
@ eNSReadRollback
Definition: job.hpp:88
@ eFinalFail
Definition: job.hpp:69
static std::string EventToString(EJobEvent event)
Definition: job.cpp:82
void SetNodeAddr(unsigned int node_ip)
Definition: job.hpp:134
void SetStatus(TJobStatus status)
Definition: job.hpp:128
void SetTimestamp(const CNSPreciseTime &t)
Definition: job.hpp:132
CNSPreciseTime GetLifetime(unsigned int job_id) const
bool DeleteIfTimedOut(unsigned int job_id, const CNSPreciseTime &current_time, unsigned int *aff_id, unsigned int *group_id)
void UpdateReadVacantTime(unsigned int job_id, const CNSPreciseTime &read_vacant_time)
void RegisterJob(unsigned int job_id, const CNSPreciseTime &submit_time, unsigned int aff_id, unsigned int group_id, const CNSPreciseTime &life_time)
void UpdateLifetime(unsigned int job_id, const CNSPreciseTime &life_time)
unsigned int GetAffinityID(unsigned int job_id) const
bool IsOutdatedJob(unsigned int job_id, ECommandGroup cmd_group, const CNSPreciseTime &timeout) const
void ChangeAffinityAndGroup(unsigned int job_id, unsigned int aff_id, unsigned int group_id)
unsigned GetNext(TJobStatus status, unsigned job_id) const
Definition: job_status.cpp:465
void SetStatus(unsigned int job_id, TJobStatus status)
Definition: job_status.cpp:194
void Erase(unsigned job_id)
Definition: job_status.cpp:226
void GetJobs(const vector< TJobStatus > &statuses, TNSBitVector &jobs) const
Definition: job_status.cpp:340
void AddPendingJob(unsigned int job_id)
Definition: job_status.cpp:219
void StatusStatistics(TJobStatus status, TNSBitVector::statistics *st) const
Definition: job_status.cpp:183
bool AnyPending() const
Definition: job_status.cpp:456
vector< unsigned int > GetJobCounters(const vector< TJobStatus > &statuses) const
Definition: job_status.cpp:103
void SetExactStatusNoLock(unsigned int job_id, TJobStatus status, bool set_clear)
Definition: job_status.cpp:267
unsigned int CountStatus(TJobStatus status) const
Definition: job_status.cpp:80
bool AnyJobs(void) const
Definition: job_status.cpp:150
TJobStatus GetStatus(unsigned job_id) const
Definition: job_status.cpp:66
TNSBitVector GetOutdatedReadVacantJobs(CNSPreciseTime timeout, const TNSBitVector &read_jobs, const CJobGCRegistry &gc_registry) const
Definition: job_status.cpp:405
void ClearAll(TNSBitVector *bv)
Definition: job_status.cpp:232
void AddPendingBatch(unsigned job_id_from, unsigned job_id_to)
Definition: job_status.cpp:276
unsigned int GetJobByStatus(TJobStatus status, const TNSBitVector &unwanted_jobs, const TNSBitVector &restrict_jobs, bool restricted) const
Definition: job_status.cpp:286
TNSBitVector GetOutdatedPendingJobs(CNSPreciseTime timeout, const CJobGCRegistry &gc_registry) const
Definition: job_status.cpp:361
Definition: job.hpp:183
void SetPassport(unsigned int passport)
Definition: job.hpp:274
unsigned GetRunCount() const
Definition: job.hpp:228
CNSPreciseTime GetTimeout() const
Definition: job.hpp:209
bool GetSubmNeedProgressMsgNotif() const
Definition: job.hpp:250
unsigned GetSubmAddr() const
Definition: job.hpp:216
bool ShouldNotifyListener(const CNSPreciseTime &current_time) const
Definition: job.cpp:291
void SetAffinityId(unsigned aff_id)
Definition: job.hpp:303
void SetLastTouch(const CNSPreciseTime &t)
Definition: job.hpp:309
unsigned int GetListenerNotifAddr() const
Definition: job.hpp:223
unsigned short GetSubmNotifPort() const
Definition: job.hpp:218
const string & GetInput() const
Definition: job.hpp:260
unsigned GetGroupId() const
Definition: job.hpp:240
CNSPreciseTime GetRunTimeout() const
Definition: job.hpp:211
void SetGroupId(unsigned id)
Definition: job.hpp:307
unsigned short GetListenerNotifPort() const
Definition: job.hpp:225
TJobStatus GetStatus() const
Definition: job.hpp:207
CJobEvent & AppendEvent()
Definition: job.cpp:236
CNSPreciseTime GetSubmitTime(void) const
Definition: job.hpp:336
bool GetLsnrNeedProgressMsgNotif() const
Definition: job.hpp:252
unsigned GetMask() const
Definition: job.hpp:238
bool ShouldNotifySubmitter(const CNSPreciseTime &current_time) const
Definition: job.cpp:281
unsigned GetAffinityId() const
Definition: job.hpp:235
unsigned GetId() const
Definition: job.hpp:201
void SetId(unsigned id)
Definition: job.hpp:272
CNSPreciseTime GetExpirationTime(const CNSPreciseTime &queue_timeout, const CNSPreciseTime &queue_run_timeout, const CNSPreciseTime &queue_read_timeout, const CNSPreciseTime &queue_pending_timeout, const CNSPreciseTime &event_time) const
Definition: job.hpp:341
EAuthTokenCompareResult
Definition: job.hpp:185
@ eNoMatch
Definition: job.hpp:188
@ eInvalidTokenFormat
Definition: job.hpp:190
@ ePassportOnlyMatch
Definition: job.hpp:187
@ eCompleteMatch
Definition: job.hpp:186
CNSPreciseTime GetLastTouch() const
Definition: job.hpp:242
CNSPreciseTime GetReadTimeout() const
Definition: job.hpp:213
bool LoadFromDump(FILE *jobs_file, char *input_buf, char *output_buf, const SJobDumpHeader &header)
Definition: job.cpp:843
void RemoveJobFromAffinity(unsigned int job_id, unsigned int aff_id)
size_t size(void) const
unsigned int ResolveAffinity(const string &token)
TNSBitVector GetJobsWithAffinity(unsigned int aff_id) const
void Dump(const string &dump_dir_name, const string &queue_name) const
void FinalizeAffinityDictionaryLoading(void)
TNSBitVector GetJobsWithAffinities(const TNSBitVector &affs) const
void RemoveDump(const string &dump_dir_name, const string &queue_name) const
bool CanAccept(const string &aff_token, size_t max_records) const
string Print(const CQueue *queue, const CNSClientsRegistry &clients_registry, const TNSBitVector &scope_jobs, const string &scope, size_t batch_size, bool verbose) const
string GetTokenByID(unsigned int aff_id) const
void ResolveAffinities(const list< string > &tokens, TNSBitVector &resolved_affs, vector< unsigned int > &aff_ids)
TNSBitVector GetRegisteredAffinities(void) const
unsigned int CollectGarbage(unsigned int max_to_del)
void LoadFromDump(const string &dump_dir_name, const string &queue_name)
unsigned int GetIDByToken(const string &aff_token) const
unsigned int CheckRemoveCandidates(void)
unsigned int ResolveAffinityToken(const string &token, unsigned int job_id, unsigned int client_id, ECommandGroup command_group)
void AddJobToAffinity(unsigned int job_id, unsigned int aff_id)
void AppendType(const CNSClientId &client, unsigned int type_to_append)
bool IsPreferredByAny(unsigned int aff_id, ECommandGroup cmd_group) const
void RegisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void SetPreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_set, ECommandGroup cmd_group)
void SubtractBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
void Purge(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout_worker_node, unsigned int min_worker_nodes, const CNSPreciseTime &timeout_admin, unsigned int min_admins, const CNSPreciseTime &timeout_submitter, unsigned int min_submitters, const CNSPreciseTime &timeout_reader, unsigned int min_readers, const CNSPreciseTime &timeout_unknown, unsigned int min_unknowns, bool is_log)
void StaleNodes(const CNSPreciseTime &current_time, const CNSPreciseTime &wn_timeout, const CNSPreciseTime &reader_timeout, bool is_log)
void AddToSubmitted(const CNSClientId &client, size_t count)
string PrintClientsList(const CQueue *queue, size_t batch_size, bool verbose) const
void UnregisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
TNSBitVector GetAllPreferredAffinities(ECommandGroup cmd_group) const
void RegisterBlacklistedJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
size_t size(void) const
bool CancelWaiting(CNSClient &client, ECommandGroup cmd_group, bool touch_notif_registry=true)
void SetBlacklistTimeouts(const CNSPreciseTime &blacklist_timeout, const CNSPreciseTime &read_blacklist_timeout)
void MoveJobToBlacklist(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void ClearClient(const CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
void GCBlacklistedJobs(const CJobStatusTracker &tracker, ECommandGroup cmd_group)
void UpdatePreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_add, const TNSBitVector &aff_to_del, ECommandGroup cmd_group)
void Touch(CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, bool &session_was_reset, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
bool GetAffinityReset(const CNSClientId &client, ECommandGroup cmd_group) const
void SetLastScope(const CNSClientId &client)
void MarkAsAdmin(const CNSClientId &client)
void RegisterSocketWriteError(const CNSClientId &client)
int SetClientData(const CNSClientId &client, const string &data, int data_version)
void SetRegistries(CNSAffinityRegistry *aff_registry, CNSNotificationList *notif_registry)
void SetNodeWaiting(const CNSClientId &client, unsigned short port, const TNSBitVector &aff_ids, ECommandGroup cmd_group)
TNSBitVector GetPreferredAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
bool WasGarbageCollected(const CNSClientId &client, ECommandGroup cmd_group) const
void AddBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
void RemoveJob(unsigned int group_id, unsigned int job_id)
Definition: ns_group.cpp:304
void Clear(void)
Definition: ns_group.cpp:546
void RemoveDump(const string &dump_dir_name, const string &queue_name) const
Definition: ns_group.cpp:650
unsigned int CollectGarbage(unsigned int max_to_del)
Definition: ns_group.cpp:380
size_t size(void) const
Definition: ns_group.cpp:87
unsigned int CheckRemoveCandidates(void)
Definition: ns_group.cpp:407
TNSBitVector GetJobs(const string &group, bool allow_exception=true) const
Definition: ns_group.cpp:105
unsigned int ResolveGroup(const string &group)
Definition: ns_group.cpp:180
bool CanAccept(const string &group, size_t max_records) const
Definition: ns_group.cpp:94
void AddJobToGroup(unsigned int group_id, unsigned int job_id)
Definition: ns_group.cpp:290
unsigned int AddJob(const string &group, unsigned int job_id)
Definition: ns_group.cpp:245
unsigned int AddJobs(unsigned int group_id, unsigned int first_job_id, unsigned int count)
Definition: ns_group.cpp:223
void FinalizeGroupDictionaryLoading(void)
Definition: ns_group.cpp:357
string Print(const CQueue *queue, const TNSBitVector &scope_jobs, const string &scope, size_t batch_size, bool verbose) const
Definition: ns_group.cpp:324
void ResolveGroups(const list< string > &tokens, TNSBitVector &group_ids_vector)
Definition: ns_group.cpp:210
void RestrictByGroup(const string &group, TNSBitVector &bv) const
Definition: