73 m_QueueName(queue_name),
100 m_Log(server->IsLog()),
101 m_LogBatchEachJob(server->IsLogBatchEachJob()),
102 m_RefuseSubmits(
false),
105 m_StatisticsCountersLastPrintedTimestamp(0.0),
106 m_NotificationsList(qdb, server->GetNodeID(), queue_name),
115 m_PauseStatus(eNoPause),
116 m_ClientRegistryTimeoutWorkerNode(
127 m_ShouldPerfLogTransitions(
false)
168 unsigned int interval_sec =
precision.Sec();
169 if (interval_sec < 1)
240 for (
unsigned n = 0;
n < nParams; ++
n) {
241 parameters.push_back(
249 unsigned int & max_input_size,
250 unsigned int & max_output_size,
270 linked_sections[k->first] = values;
278 bool reader_affinity,
280 bool exclusive_new_affinity,
282 bool affinity_may_change,
283 bool group_may_change)
286 if (!reader_affinity &&
288 !exclusive_new_affinity &&
295 vector<CNetScheduleAPI::EJobStatus> from_state;
298 string scope =
client.GetScope();
322 pending_running_jobs -= all_jobs_in_scopes;
323 other_jobs -= all_jobs_in_scopes;
324 reading_jobs -= all_jobs_in_scopes;
329 pending_running_jobs &= scope_jobs;
330 other_jobs &= scope_jobs;
331 reading_jobs &= scope_jobs;
334 if (group_ids.
any()) {
337 if (!group_may_change)
341 other_jobs |= reading_jobs;
344 other_jobs |= reading_jobs;
346 TNSBitVector candidates = pending_running_jobs | other_jobs;
348 if (!candidates.
any())
355 return !candidates.
any();
366 no_aff_jobs = candidates - all_aff_jobs;
367 if (exclusive_new_affinity && no_aff_jobs.
any())
370 if (exclusive_new_affinity)
371 suitable_affinities = all_aff - all_pref_affs;
375 suitable_affinities |= aff_ids;
379 suitable_affinities);
380 if (affinity_may_change)
381 candidates = pending_running_jobs |
382 (other_jobs & suitable_aff_jobs);
384 candidates &= suitable_aff_jobs;
385 return !candidates.
any();
401 const string & aff_token,
402 const string & group,
412 unsigned int aff_id = 0;
413 unsigned int group_id = 0;
422 event.SetNodeAddr(
client.GetAddress());
425 event.SetTimestamp(op_begin_time);
426 event.SetClientNode(
client.GetNode());
427 event.SetClientSession(
client.GetSession());
441 string scope =
client.GetScope();
445 if (!scope.empty()) {
451 "No available slots in the queue scope registry");
453 if (!group.empty()) {
459 "No available slots in the queue group registry");
461 if (!aff_token.empty()) {
467 "No available slots in the queue affinity registry");
471 if (!group.empty()) {
475 if (!aff_token.empty()) {
521 vector< pair<CJob, string> > & batch,
522 const string & group,
526 unsigned int batch_size = batch.size();
532 unsigned int job_id_cnt = job_id;
533 unsigned int group_id = 0;
534 vector<string> aff_tokens;
535 string scope =
client.GetScope();
538 for (
size_t k = 0; k < batch_size; ++k) {
539 const string & aff_token = batch[k].second;
540 if (!aff_token.empty())
541 aff_tokens.push_back(aff_token);
547 if (!scope.empty()) {
553 "No available slots in the queue scope registry");
555 if (!group.empty()) {
561 "No available slots in the queue group registry");
563 if (!aff_tokens.empty()) {
569 "No available slots in the queue affinity registry");
573 for (
size_t k = 0; k < batch_size; ++k) {
575 CJob & job = batch[k].first;
576 const string & aff_token = batch[k].second;
579 job.
SetId(job_id_cnt);
584 event.SetNodeAddr(
client.GetAddress());
587 event.SetTimestamp(curr_time);
588 event.SetClientNode(
client.GetNode());
589 event.SetClientSession(
client.GetSession());
591 if (!aff_token.empty()) {
593 ResolveAffinityToken(aff_token,
615 jobs.
set_range(job_id, job_id + batch_size - 1);
619 batch_size != aff_tokens.
size(),
628 for (
size_t k = 0; k < batch_size; ++k) {
630 batch[k].
first.GetId(), curr_time,
631 batch[k].first.GetAffinityId(), group_id,
632 batch[k].first.GetExpirationTime(
m_Timeout,
642 for (
size_t k = 0; k < batch_size; ++k)
653 const string & job_key,
655 const string & auth_token,
664 "Output is too long");
720 unsigned int timeout,
723 const list<string> * aff_list,
726 bool exclusive_new_affinity,
727 bool prioritized_aff,
729 const list<string> * group_list,
732 string & added_pref_aff)
741 vector<unsigned int> aff_ids;
744 bool has_groups =
false;
749 if (wnode_affinity) {
762 if (group_list !=
NULL) {
764 has_groups = !group_list->empty();
766 if (aff_list !=
NULL)
777 aff_ids_vector, aff_ids,
780 exclusive_new_affinity,
782 group_ids_vector, has_groups,
785 bool outdated_job =
false;
788 if (job_pick.
job_id == 0) {
789 if (exclusive_new_affinity)
794 if (job_pick.
job_id == 0) {
795 if (timeout != 0 && port > 0)
800 wnode_affinity, any_affinity,
801 exclusive_new_affinity,
802 new_format, group_ids_vector);
812 if (exclusive_new_affinity) {
820 if (outdated_pick.
job_id != 0) {
821 job_pick = outdated_pick;
831 outdated_job ==
false) {
842 UpdatePreferredAffinities(
845 added_pref_aff = aff_token;
848 if (outdated_job && job_pick.
aff_id != 0) {
855 UpdatePreferredAffinities(
858 added_pref_aff = aff_token;
912 "which does not wait anything (node: "
913 <<
client.GetNode() <<
" session: "
914 <<
client.GetSession() <<
")");
930 "which does not wait anything (node: "
931 <<
client.GetNode() <<
" session: "
932 <<
client.GetSession() <<
")");
938 const list<string> & aff_to_add,
939 const list<string> & aff_to_del,
944 if (cmd_group ==
eGet)
951 unsigned int client_id =
client.GetID();
957 bool any_to_add =
false;
958 bool any_to_del =
false;
961 for (list<string>::const_iterator k(aff_to_del.begin());
962 k != aff_to_del.end(); ++k) {
968 <<
"' deletes unknown affinity '"
969 << *k <<
"'. Ignored.");
970 msgs.push_back(
"eAffinityNotFound:"
971 "unknown affinity to delete: " + *k);
975 if (!current_affinities.
get_bit(aff_id)) {
979 <<
"' deletes affinity '" << *k
980 <<
"' which is not in the list of the "
981 "preferred client affinities. Ignored.");
982 msgs.push_back(
"eAffinityNotPreferred:not registered affinity "
1001 if (current_affinities.
count() + aff_to_add.size()
1002 - aff_id_to_del.
count() >
1005 "The client '" +
client.GetNode() +
1006 "' exceeds the limit (" +
1008 ") of the preferred affinities. Changed request ignored.");
1012 vector<string> already_added_affinities;
1018 for (list<string>::const_iterator k(aff_to_add.begin());
1019 k != aff_to_add.end(); ++k ) {
1020 unsigned int aff_id =
1025 if (current_affinities.
get_bit(aff_id)) {
1026 already_added_affinities.push_back(*k);
1030 aff_id_to_add.
set_bit(aff_id);
1036 for (vector<string>::const_iterator j(already_added_affinities.begin());
1037 j != already_added_affinities.end(); ++j) {
1040 <<
"' adds affinity '" << *j
1041 <<
"' which is already in the list of the "
1042 "preferred client affinities. Ignored.");
1043 msgs.push_back(
"eAffinityAlreadyPreferred:already registered "
1044 "affinity to add: " + *j);
1047 if (any_to_add || any_to_del)
1055 <<
"' has been garbage collected and tries to "
1056 "update its preferred affinities.");
1057 msgs.push_back(
"eClientGarbageCollected:the client had been "
1058 "garbage collected");
1065 const list<string> & aff,
1068 if (cmd_group ==
eGet)
1078 "The client '" +
client.GetNode() +
1079 "' exceeds the limit (" +
1081 ") of the preferred affinities. Set request ignored.");
1084 unsigned int client_id =
client.GetID();
1096 for (list<string>::const_iterator k(aff.begin());
1097 k != aff.end(); ++k ) {
1098 unsigned int aff_id =
1103 if (current_affinities.
get_bit(aff_id))
1104 already_added_aff_id.
set_bit(aff_id);
1106 aff_id_to_set.
set_bit(aff_id);
1115 const string & data,
int data_version)
1141 time_start = job_iter->second.GetLastEvent()->GetTimestamp();
1142 run_timeout = job_iter->second.GetRunTimeout();
1144 run_timeout = queue_run_timeout;
1146 if (time_start + run_timeout > curr + tm) {
1147 job = job_iter->second;
1152 job_iter->second.SetRunTimeout(curr + tm - time_start);
1153 job_iter->second.SetLastTouch(curr);
1159 exp_time = time_start + run_timeout;
1163 job = job_iter->second;
1188 time_start = job_iter->second.GetLastEvent()->GetTimestamp();
1189 read_timeout = job_iter->second.GetReadTimeout();
1191 read_timeout = queue_read_timeout;
1193 if (time_start + read_timeout > curr + tm) {
1194 job = job_iter->second;
1200 job_iter->second.SetReadTimeout(curr + tm - time_start);
1201 job_iter->second.SetLastTouch(curr);
1207 exp_time = time_start + read_timeout;
1211 job = job_iter->second;
1220 string & client_sid,
1221 string & client_phid,
1222 string & progress_msg,
1236 client_ip = job_iter->second.GetClientIP();
1237 client_sid = job_iter->second.GetClientSID();
1238 client_phid = job_iter->second.GetNCBIPHID();
1239 progress_msg = job_iter->second.GetProgressMsg();
1263 job_iter->second.SetLastTouch(curr);
1271 job = job_iter->second;
1278 unsigned int address,
1279 unsigned short port,
1282 bool need_progress_msg,
1283 size_t * last_event_index)
1294 *last_event_index = job_iter->second.GetLastEventIndex();
1295 status = job_iter->second.GetStatus();
1297 unsigned int old_listener_addr = job_iter->second.GetListenerNotifAddr();
1298 unsigned short old_listener_port = job_iter->second.GetListenerNotifPort();
1300 if (job_iter->second.GetNeedStolenNotif() &&
1301 old_listener_addr != 0 && old_listener_port != 0) {
1302 if (old_listener_addr != address || old_listener_port != port) {
1310 if (address == 0 || port == 0 || timeout ==
kTimeZero) {
1313 job_iter->second.SetListenerNotifAddr(0);
1314 job_iter->second.SetListenerNotifPort(0);
1315 job_iter->second.SetListenerNotifAbsTime(
kTimeZero);
1317 job_iter->second.SetListenerNotifAddr(address);
1318 job_iter->second.SetListenerNotifPort(port);
1319 job_iter->second.SetListenerNotifAbsTime(curr + timeout);
1322 job_iter->second.SetNeedLsnrProgressMsgNotif(need_progress_msg);
1323 job_iter->second.SetNeedStolenNotif(need_stolen);
1324 job_iter->second.SetLastTouch(curr);
1326 job = job_iter->second;
1342 job_iter->second.SetProgressMsg(msg);
1343 job_iter->second.SetLastTouch(curr);
1352 job = job_iter->second;
1358 unsigned int job_id,
1359 const string & job_key,
1361 const string & auth_token,
1376 if (!auth_token.empty()) {
1379 job_iter->second.CompareAuthToken(auth_token);
1382 "Invalid authorization token format");
1385 "Authorization token does not match");
1390 "passport matched.");
1391 warning =
"eJobPassportOnlyMatch:Only job passport matched. "
1392 "Command is ignored.";
1393 job = job_iter->second;
1399 unsigned int run_count = job_iter->second.
GetRunCount();
1400 CJobEvent *
event = job_iter->second.GetLastEvent();
1403 ERR_POST(
"No JobEvent for running job");
1405 event = &job_iter->second.AppendEvent();
1406 event->SetNodeAddr(
client.GetAddress());
1419 event->SetTimestamp(current_time);
1420 event->SetClientNode(
client.GetNode());
1421 event->SetClientSession(
client.GetSession());
1424 job_iter->second.SetRunCount(run_count - 1);
1427 job_iter->second.SetLastTouch(current_time);
1460 job = job_iter->second;
1466 unsigned int job_id,
1467 const string & job_key,
1468 const string & auth_token,
1469 const string & aff_token,
1470 const string & group,
1471 bool & auth_token_ok,
1477 unsigned int affinity_id = 0;
1478 unsigned int group_id = 0;
1479 unsigned int job_affinity_id;
1480 unsigned int job_group_id;
1486 if (!aff_token.empty() || !group.empty()) {
1487 if (!aff_token.empty())
1499 job_iter->second.CompareAuthToken(auth_token);
1503 "Invalid authorization token format");
1506 auth_token_ok =
false;
1507 job = job_iter->second;
1512 auth_token_ok =
true;
1517 job_group_id = job_iter->second.GetGroupId();
1520 job_iter->second.SetAffinityId(affinity_id);
1521 job_iter->second.SetGroupId(group_id);
1523 unsigned int run_count = job_iter->second.GetRunCount();
1524 CJobEvent *
event = job_iter->second.GetLastEvent();
1527 ERR_POST(
"No JobEvent for running job");
1529 event = &job_iter->second.AppendEvent();
1530 event->SetNodeAddr(
client.GetAddress());
1533 event->SetTimestamp(current_time);
1534 event->SetClientNode(
client.GetNode());
1535 event->SetClientSession(
client.GetSession());
1538 job_iter->second.SetRunCount(run_count - 1);
1541 job_iter->second.SetLastTouch(current_time);
1545 if (job_affinity_id != affinity_id) {
1546 if (job_affinity_id != 0)
1548 if (affinity_id != 0)
1551 if (job_group_id != group_id) {
1552 if (job_group_id != 0)
1557 if (job_affinity_id != affinity_id || job_group_id != group_id)
1580 job = job_iter->second;
1586 unsigned int job_id,
1587 const string & job_key,
1604 "Error fetching job");
1606 CJobEvent *
event = job_iter->second.GetLastEvent();
1608 ERR_POST(
"Inconsistency: a job has no events");
1610 event = &job_iter->second.AppendEvent();
1611 event->SetNodeAddr(
client.GetAddress());
1614 event->SetTimestamp(current_time);
1615 event->SetClientNode(
client.GetNode());
1616 event->SetClientSession(
client.GetSession());
1619 job_iter->second.SetLastTouch(current_time);
1635 job_iter->second.GetAffinityId(),
1640 job = job_iter->second;
1661 job_iter->second.SetLastTouch(curr);
1668 job = job_iter->second;
1674 unsigned int job_id,
1675 const string & job_key,
1677 bool is_ns_rollback)
1702 CJobEvent *
event = &job_iter->second.AppendEvent();
1710 event->SetTimestamp(current_time);
1711 event->SetClientNode(
client.GetNode());
1712 event->SetClientSession(
client.GetSession());
1715 job_iter->second.SetLastTouch(current_time);
1718 if (is_ns_rollback) {
1751 job = job_iter->second;
1759 vector<CNetScheduleAPI::EJobStatus> statuses;
1786 string scope =
client.GetScope();
1796 unsigned int count = 0;
1797 for (; en.
valid(); ++en) {
1798 unsigned int job_id = *en;
1804 " while cancelling jobs");
1808 CJobEvent *
event = &job_iter->second.AppendEvent();
1813 event->SetTimestamp(current_time);
1814 event->SetClientNode(
client.GetNode());
1815 event->SetClientSession(
client.GetSession());
1818 job_iter->second.SetLastTouch(current_time);
1852 .
Print(
"job_phid", job_iter->second.GetNCBIPHID());
1875 const string & group,
1876 const string & aff_token,
1877 const vector<TJobStatus> & job_statuses,
1879 vector<string> & warnings)
1881 if (group.empty() && aff_token.empty() && job_statuses.empty()) {
1888 vector<TJobStatus> statuses;
1890 if (job_statuses.empty()) {
1904 statuses = job_statuses;
1910 if (!group.empty()) {
1914 jobs_to_cancel.
clear();
1915 warnings.push_back(
"eGroupNotFound:job group " + group +
1919 "' is not found. No jobs are canceled.");
1923 if (!aff_token.empty()) {
1926 jobs_to_cancel.
clear();
1927 warnings.push_back(
"eAffinityNotFound:affinity " + aff_token +
1931 "' is not found. No jobs are canceled.");
1980 unsigned int start_index =
m_LastId;
2008 unsigned int timeout,
2009 const list<string> * aff_list,
2010 bool reader_affinity,
2012 bool exclusive_new_affinity,
2013 bool prioritized_aff,
2014 const list<string> * group_list,
2015 bool affinity_may_change,
2016 bool group_may_change,
2018 bool * no_more_jobs,
2020 string & added_pref_aff)
2024 bool has_groups =
false;
2026 vector<unsigned int> aff_ids;
2031 *no_more_jobs =
false;
2036 if (reader_affinity) {
2049 if (group_list !=
NULL) {
2051 has_groups = !group_list->empty();
2053 if (aff_list !=
NULL)
2064 aff_ids_vector, aff_ids,
2067 exclusive_new_affinity,
2069 group_ids_vector, has_groups,
2073 bool outdated_job =
false;
2077 if (job_pick.
job_id == 0) {
2078 if (exclusive_new_affinity)
2082 if (job_pick.
job_id == 0) {
2084 reader_affinity, any_affinity,
2085 exclusive_new_affinity,
2087 affinity_may_change,
2089 if (timeout != 0 && port > 0)
2092 reader_affinity, any_affinity,
2093 exclusive_new_affinity,
2097 outdated_job =
true;
2107 if (exclusive_new_affinity) {
2115 if (outdated_pick.
job_id != 0) {
2116 job_pick = outdated_pick;
2117 outdated_job =
true;
2126 outdated_job ==
false) {
2138 added_pref_aff = aff_token;
2142 if (outdated_job && job_pick.
aff_id != 0) {
2149 UpdatePreferredAffinities(
2152 added_pref_aff = aff_token;
2193 unsigned int job_id,
2194 const string & job_key,
2196 const string & auth_token)
2200 job, auth_token,
"",
2209 unsigned int job_id,
2210 const string & job_key,
2212 const string & auth_token,
2213 const string & err_msg,
2218 job, auth_token, err_msg,
2227 unsigned int job_id,
2228 const string & job_key,
2230 const string & auth_token,
2231 bool is_ns_rollback,
2237 job, auth_token,
"",
2241 if (is_ns_rollback || blacklist ==
false)
2250 unsigned int job_id,
2251 const string & job_key,
2282 "Error fetching job");
2284 const vector<CJobEvent>& job_events = job_iter->second.GetEvents();
2285 if (job_events.empty())
2287 "Inconsistency: a job has no events");
2289 state_before_read = job_iter->second.GetStatusBeforeReading();
2291 CJobEvent *
event = &job_iter->second.AppendEvent();
2293 event->SetStatus(state_before_read);
2295 event->SetTimestamp(current_time);
2296 event->SetClientNode(
client.GetNode());
2297 event->SetClientSession(
client.GetSession());
2299 job_iter->second.SetStatus(state_before_read);
2300 job_iter->second.SetLastTouch(current_time);
2327 job = job_iter->second;
2333 unsigned int job_id,
2334 const string & job_key,
2336 const string & auth_token,
2337 const string & err_msg,
2339 bool is_ns_rollback,
2358 if (is_ns_rollback ==
false) {
2360 job_iter->second.CompareAuthToken(auth_token);
2363 "Invalid authorization token format");
2366 "Authorization token does not match");
2372 "Internal inconsistency detected. The job state in memory is " +
2374 " while in database it is " +
2378 target_status = job_iter->second.GetStatusBeforeReading();
2382 CJobEvent &
event = job_iter->second.AppendEvent();
2384 event.SetNodeAddr(
client.GetAddress());
2385 event.SetClientNode(
client.GetNode());
2386 event.SetClientSession(
client.GetSession());
2387 event.SetErrorMsg(err_msg);
2389 if (is_ns_rollback) {
2391 job_iter->second.SetReadCount(job_iter->second.GetReadCount() - 1);
2393 switch (target_status) {
2398 job_iter->second.SetReadCount(job_iter->second.GetReadCount() - 1);
2422 event.SetStatus(target_status);
2423 job_iter->second.SetStatus(target_status);
2424 job_iter->second.SetLastTouch(current_time);
2456 job = job_iter->second;
2482 size_t job_count = job_ids.
count();
2503 const vector<unsigned int> & aff_ids,
2504 bool use_pref_affinity,
2506 bool exclusive_new_affinity,
2507 bool prioritized_aff,
2512 string scope =
client.GetScope();
2513 string virtual_scope =
client.GetVirtualScope();
2515 if (!virtual_scope.empty()) {
2518 aff_ids, use_pref_affinity,
2520 exclusive_new_affinity,
2522 group_ids, has_groups,
2523 cmd_group, virtual_scope);
2524 if (job_pick.
job_id != 0)
2531 any_affinity, exclusive_new_affinity,
2532 prioritized_aff, group_ids, has_groups,
2540 const vector<unsigned int> & aff_ids,
2541 bool use_pref_affinity,
2543 bool exclusive_new_affinity,
2544 bool prioritized_aff,
2548 const string & scope)
2550 bool explicit_aff = !aff_ids.empty();
2551 bool effective_use_pref_affinity = use_pref_affinity;
2563 if (use_pref_affinity)
2564 effective_use_pref_affinity = use_pref_affinity && pref_aff.
any();
2566 if (explicit_aff || effective_use_pref_affinity || exclusive_new_affinity) {
2570 if (cmd_group ==
eGet)
2591 if (cmd_group ==
eRead)
2594 if (prioritized_aff) {
2597 for (vector<unsigned int>::const_iterator k = aff_ids.begin();
2598 k != aff_ids.end(); ++k) {
2600 GetJobsWithAffinity(*k);
2602 if (candidates.
any()) {
2605 for (; en.
valid(); ++en) {
2608 job_id, running_jobs_per_client)) {
2615 if (vacant_jobs.
any()) {
2618 for (; en.
valid(); ++en) {
2621 job_id, running_jobs_per_client)) {
2633 if (exclusive_new_affinity)
2638 for (; en.
valid(); ++en) {
2639 unsigned int job_id = *en;
2642 if (aff_id != 0 && explicit_aff) {
2643 if (explicit_affs.
get_bit(aff_id)) {
2645 job_id, running_jobs_per_client)) {
2651 if (aff_id != 0 && effective_use_pref_affinity) {
2652 if (pref_aff.
get_bit(aff_id)) {
2653 if (explicit_aff ==
false) {
2655 job_id, running_jobs_per_client)) {
2660 pref_aff_candidate_jobs.
set_bit(job_id);
2665 if (exclusive_new_affinity) {
2666 if (aff_id == 0 || all_pref_aff.
get_bit(aff_id) ==
false) {
2667 if (explicit_aff ==
false &&
2668 effective_use_pref_affinity ==
false) {
2670 job_id, running_jobs_per_client)) {
2675 exclusive_aff_candidate_jobs.
set_bit(job_id);
2681 for (; en1.
valid(); ++en1) {
2688 for (; en2.
valid(); ++en2) {
2702 use_pref_affinity && !effective_use_pref_affinity &&
2703 !exclusive_new_affinity &&
2704 cmd_group ==
eGet)) {
2708 bool no_scope_only = scope.
empty() ||
2710 unsigned int job_id = 0;
2720 if (cmd_group ==
eGet) {
2729 if (no_scope_only) {
2733 for (; en.
valid(); ++en) {
2734 unsigned int candidate_job_id = *en;
2735 if (jobs_in_scope.
get_bit(candidate_job_id))
2737 if (!group_jobs.
get_bit(candidate_job_id))
2740 running_jobs_per_client)) {
2741 job_id = candidate_job_id;
2746 for (; en.
valid(); ++en) {
2747 unsigned int candidate_job_id = *en;
2748 if (jobs_in_scope.
get_bit(candidate_job_id))
2751 running_jobs_per_client)) {
2752 job_id = candidate_job_id;
2759 for (; en.
valid(); ++en) {
2760 unsigned int candidate_job_id = *en;
2761 if (jobs_in_scope.
get_bit(candidate_job_id))
2763 if (!restricted_jobs.
get_bit(candidate_job_id))
2766 running_jobs_per_client)) {
2767 job_id = candidate_job_id;
2773 if (no_scope_only) {
2802 restricted_jobs,
true);
2819 for (; en.
valid(); ++en) {
2822 string client_ip = job_iter->second.GetClientIP();
2823 auto iter = ret.
find(client_ip);
2824 if (iter == ret.
end()) {
2837 unsigned int job_id,
2840 if (jobs_per_client_ip.
empty())
2847 string client_ip = job_iter->second.GetClientIP();
2848 auto iter = jobs_per_client_ip.
find(client_ip);
2849 if (iter == jobs_per_client_ip.
end())
2857 unsigned int picked_earlier,
2863 string scope =
client.GetScope();
2864 string virtual_scope =
client.GetVirtualScope();
2866 if (!virtual_scope.empty()) {
2871 if (job_pick.
job_id != 0)
2884 unsigned int picked_earlier,
2886 const string & scope)
2892 if (picked_earlier != 0)
2893 outdated_pending.
set_bit(picked_earlier,
false);
2902 if (group_ids.
any())
2905 if (!outdated_pending.
any())
2919 unsigned int picked_earlier,
2925 string scope =
client.GetScope();
2926 string virtual_scope =
client.GetVirtualScope();
2928 if (!virtual_scope.empty()) {
2934 if (job_pick.
job_id != 0)
2947 unsigned int picked_earlier,
2949 const string & scope)
2955 if (picked_earlier != 0)
2956 outdated_read_jobs.
set_bit(picked_earlier,
false);
2959 outdated_read_jobs);
2966 if (group_ids.
any())
2969 if (!outdated_read_jobs.
any())
2972 unsigned int job_id = *outdated_read_jobs.
first();
2974 return x_SJobPick(job_id, aff_id != 0, aff_id);
2979 unsigned int job_id,
2980 const string & job_key,
2982 const string & auth_token,
2983 const string & err_msg,
2989 unsigned failed_retries;
2990 unsigned max_output_size;
2997 if (
output.size() > max_output_size) {
2999 "Output is too long");
3003 bool rescheduled =
false;
3024 "Error fetching job");
3026 if (!auth_token.empty()) {
3029 job_iter->second.CompareAuthToken(auth_token);
3032 "Invalid authorization token format");
3035 "Authorization token does not match");
3040 "passport matched.");
3041 warning =
"eJobPassportOnlyMatch:Only job passport "
3042 "matched. Command is ignored.";
3043 job = job_iter->second;
3049 CJobEvent *
event = job_iter->second.GetLastEvent();
3051 ERR_POST(
"No JobEvent for running job");
3053 event = &job_iter->second.AppendEvent();
3059 event->SetTimestamp(curr);
3060 event->SetErrorMsg(err_msg);
3061 event->SetRetCode(ret_code);
3062 event->SetNodeAddr(
client.GetAddress());
3063 event->SetClientNode(
client.GetNode());
3064 event->SetClientSession(
client.GetSession());
3069 rescheduled =
false;
3072 "unconditionally, no_retries = 1");
3074 unsigned run_count = job_iter->second.GetRunCount();
3075 if (run_count <= failed_retries) {
3086 rescheduled =
false;
3089 "max number of retries ("
3090 << failed_retries <<
")");
3094 job_iter->second.SetOutput(
output);
3095 job_iter->second.SetLastTouch(curr);
3137 job = job_iter->second;
3149 bool & client_was_found,
3150 string & old_session,
3151 bool & had_wn_pref_affs,
3152 bool & had_reader_pref_affs)
3163 client_was_found, old_session,
3164 had_wn_pref_affs, had_reader_pref_affs);
3168 if (running_jobs.
any())
3170 if (reading_jobs.
any())
3186 if (outdated_jobs.
any())
3200 if (outdated_jobs.
any())
3210 static size_t skip_limit = 0;
3211 static size_t skip_count;
3220 if (skip_count < skip_limit)
3263 string scope =
client.GetScope();
3268 else if (!scope.empty())
3281 string scope =
client.GetScope();
3286 else if (!scope.empty())
3316 for ( ;en.
valid(); ++en) {
3318 *en, curr, logging);
3325 unsigned int job_id,
3355 CJobEvent *
event = job_iter->second.GetLastEvent();
3356 time_start =
event->GetTimestamp();
3357 run_timeout = job_iter->second.GetRunTimeout();
3359 run_timeout = queue_run_timeout;
3366 read_timeout = job_iter->second.GetReadTimeout();
3368 read_timeout = queue_read_timeout;
3377 exp_time = time_start + run_timeout;
3379 exp_time = time_start + read_timeout;
3381 if (curr_time < exp_time) {
3398 new_status = job_iter->second.GetStatusBeforeReading();
3403 job_iter->second.SetStatus(new_status);
3404 job_iter->second.SetLastTouch(curr_time);
3406 event = &job_iter->second.AppendEvent();
3407 event->SetStatus(new_status);
3408 event->SetEvent(event_type);
3409 event->SetTimestamp(curr_time);
3477 purpose =
"execution";
3479 purpose =
"reading";
3482 .
Print(
"msg",
"Timeout expired, rescheduled for " + purpose)
3483 .
Print(
"msg_code",
"410")
3487 .
Print(
"run_counter", job_iter->second.GetRunCount())
3488 .
Print(
"read_counter", job_iter->second.GetReadCount())
3491 .
Print(
"run_timeout", run_timeout)
3492 .
Print(
"read_timeout", read_timeout);
3504 unsigned int last_job,
3509 unsigned int job_id;
3523 if (last_job != 0 && job_id >= last_job)
3553 if (
result.deleted > 0) {
3557 for (; en.
valid(); ++en) {
3558 unsigned int id = *en;
3616 unsigned int add_job_id,
3641 unsigned int del_rec = 0;
3645 while (en.
valid() && del_rec < max_deleted) {
3652 unsigned int job_id = *en;
3655 if (del_count > 0) {
3674 for (; en.
valid(); ++en) {
3708 unsigned int del_limit = aff_reg_settings.
high_removal;
3714 unsigned int candidates_size =
3717 if (candidates_size <
3744 unsigned int del_limit = group_reg_settings.
high_removal;
3745 if (group_dict_size <
3750 unsigned int candidates_size =
3753 if (candidates_size <
3802 unsigned int job_id,
3816 string scope =
client.GetScope();
3824 }
else if (!scope.empty()) {
3833 job_dump.reserve(2048);
3839 job_dump = job_iter->second.Print(dump_fields,
3843 job_dump.append(
"OK:GC erase time: ")
3846 if (dump_fields &
eScope)
3847 job_dump.append(
"OK:scope: '")
3859 const string & group,
3860 const string & aff_token,
3861 const vector<TJobStatus> & job_statuses,
3862 unsigned int start_after_job_id,
3871 vector<TJobStatus> statuses;
3874 if (job_statuses.empty()) {
3888 statuses = job_statuses;
3893 string scope =
client.GetScope();
3898 if (!group.empty()) {
3902 jobs_to_dump.
clear();
3905 "' is not found. No jobs to dump.");
3909 if (!aff_token.empty()) {
3912 jobs_to_dump.
clear();
3915 "' is not found. No jobs to dump.");
3923 }
else if (!scope.empty()) {
3929 return x_DumpJobs(jobs_to_dump, start_after_job_id, count,
3930 dump_fields, order_first);
3935 unsigned int start_after_job_id,
3940 if (!jobs_to_dump.
any())
3944 size_t skipped_jobs = 0;
3946 while (en.
valid() && *en <= start_after_job_id) {
3951 if (count > 0 && !order_first) {
3952 size_t total_jobs = jobs_to_dump.
count();
3953 size_t jobs_left = total_jobs - skipped_jobs;
3954 while (jobs_left > count) {
3970 size_t read_jobs = 0;
3971 size_t printed_count = 0;
3973 for ( ; en.
valid(); ) {
3980 buffer[read_jobs] = job_iter->second;
3985 if (printed_count >= count)
3993 one_job.reserve(2048);
3994 for (
size_t index = 0; index < read_jobs; ++index) {
4001 unsigned int job_id =
buffer[index].GetId();
4008 one_job.append(
"OK:GC erase time: ")
4012 if (dump_fields &
eScope)
4013 one_job.append(
"OK:scope: '")
4022 if (printed_count >= count)
4056 bool & client_was_found,
4057 bool & session_was_reset,
4058 string & old_session,
4059 bool & had_wn_pref_affs,
4060 bool & had_reader_pref_affs)
4073 client_was_found, session_was_reset,
4074 old_session, had_wn_pref_affs,
4075 had_reader_pref_affs);
4079 if (session_was_reset) {
4080 if (running_jobs.
any())
4082 if (reading_jobs.
any())
4112 unsigned int job_id,
4122 ERR_POST(
"Cannot fetch job to reset it due to " <<
4139 new_status = job_iter->second.GetStatusBeforeReading();
4144 job_iter->second.SetStatus(new_status);
4145 job_iter->second.SetLastTouch(current_time);
4147 CJobEvent *
event = &job_iter->second.AppendEvent();
4149 event->SetEvent(event_type);
4150 event->SetTimestamp(current_time);
4151 event->SetClientNode(
client.GetNode());
4152 event->SetClientSession(
client.GetSession());
4212 ERR_POST(
"Error resetting a running job when worker node is "
4228 ERR_POST(
"Error resetting a reading job when worker node is "
4244 ERR_POST(
"Error resetting a running job when worker node "
4260 ERR_POST(
"Error resetting a reading job when worker node "
4268 unsigned short port,
4269 unsigned int timeout,
4273 bool exclusive_new_affinity,
4280 exclusive_new_affinity, new_format,
4291 unsigned short port,
4292 unsigned int timeout,
4296 bool exclusive_new_affinity,
4301 reader_aff, any_aff,
4302 exclusive_new_affinity,
true,
4309 unsigned short port)
4340 ctx->SetRequestID();
4349 aff_count += affinities;
4352 extra.
Print(
"_type",
"statistics_thread")
4355 .
Print(
"affinities", affinities)
4380 vector<TJobStatus> statuses;
4407 .append(
"OK:garbage_jobs: ")
4410 .append(
"OK:affinity_registry_size: ")
4413 .append(
"OK:client_registry_size: ")
4421 const string & group_token,
4422 const string & aff_token,
4424 vector<string> & warnings)
const
4430 if (!group_token.empty()) {
4434 warnings.push_back(
"eGroupNotFound:job group " + group_token +
4438 if (!aff_token.empty()) {
4441 warnings.push_back(
"eAffinityNotFound:affinity " + aff_token +
4447 if (!warnings.empty())
4451 string scope =
client.GetScope();
4457 if (!group_token.empty())
4458 candidates &= group_jobs;
4459 if (!aff_token.empty())
4460 candidates &= aff_jobs;
4467 }
else if (!scope.empty()) {
4472 jobs[index] = candidates.
count();
4478 const string & group_token,
4479 const string & aff_token,
4480 vector<string> & warnings)
const
4490 if (warnings.empty()) {
4494 ": " + to_string(jobs_per_state[index]) +
"\n";
4495 total += jobs_per_state[index];
4497 result +=
"OK:Total: " + to_string(total) +
"\n";
4505 vector<CNetScheduleAPI::EJobStatus> statuses;
4517 bool need_notifications = (status ==
eNoPause &&
4521 if (need_notifications)
4529 unsigned short port,
4538 const string & auth_token,
4549 if (!auth_token.empty()) {
4552 job_iter->second.CompareAuthToken(auth_token);
4555 "Invalid authorization token format");
4558 "Authorization token does not match");
4563 "passport matched.");
4569 CJobEvent *
event = &job_iter->second.AppendEvent();
4572 event->SetTimestamp(curr);
4573 event->SetRetCode(ret_code);
4575 event->SetClientNode(
client.GetNode());
4576 event->SetClientSession(
client.GetSession());
4577 event->SetNodeAddr(
client.GetAddress());
4580 job_iter->second.SetOutput(
output);
4581 job_iter->second.SetLastTouch(curr);
4583 job = job_iter->second;
4591 unsigned int job_id,
4599 CJobEvent &
event = job_iter->second.AppendEvent();
4601 event.SetNodeAddr(
client.GetAddress());
4602 event.SetClientNode(
client.GetNode());
4603 event.SetClientSession(
client.GetSession());
4605 if (cmd_group ==
eGet) {
4613 job_iter->second.SetLastTouch(curr);
4614 if (cmd_group ==
eGet) {
4616 job_iter->second.SetRunTimeout(
kTimeZero);
4617 job_iter->second.SetRunCount(job_iter->second.GetRunCount() + 1);
4620 job_iter->second.SetReadTimeout(
kTimeZero);
4621 job_iter->second.SetReadCount(job_iter->second.GetReadCount() + 1);
4624 job = job_iter->second;
4632 vector<TJobStatus> statuses;
4651 if (!jobs_to_dump.
any())
4656 FILE * jobs_file =
NULL;
4665 jobs_file = fopen(jobs_file_name.c_str(),
"wb");
4666 if (jobs_file ==
NULL)
4667 throw runtime_error(
"Cannot open file " + jobs_file_name +
4671 setbuf(jobs_file,
NULL);
4675 header.
Write(jobs_file);
4678 for ( ; en.
valid(); ++en) {
4681 ERR_POST(
"Dump at SHUTDOWN: error fetching job " <<
4686 job_iter->second.Dump(jobs_file);
4688 }
catch (
const exception & ex) {
4689 if (jobs_file !=
NULL)
4692 throw runtime_error(
"Error dumping queue " +
m_QueueName +
4693 ": " +
string(ex.what()));
4707 if (access(jobs_file_name.c_str(), F_OK) != -1)
4708 remove(jobs_file_name.c_str());
4722 unsigned int recs = 0;
4724 FILE * jobs_file =
NULL;
4726 if (!
CDir(dump_dname).Exists())
4728 if (!
CFile(jobs_file_name).Exists())
4735 jobs_file = fopen(jobs_file_name.c_str(),
"rb");
4736 if (jobs_file ==
NULL)
4737 throw runtime_error(
"Cannot open file " + jobs_file_name +
4738 " to load dumped jobs");
4741 header.
Read(jobs_file);
4747 input_buf.
get(), output_buf.
get(),
4749 unsigned int job_id = job.
GetId();
4787 aff_id, group_id, expiration);
4798 }
catch (
const exception & ex) {
4799 if (jobs_file !=
NULL)
4803 throw runtime_error(
"Error loading queue " +
m_QueueName +
4804 " from its dump: " +
string(ex.what()));
4806 if (jobs_file !=
NULL)
4810 throw runtime_error(
"Unknown error loading queue " +
m_QueueName +
4839 const string & job_key,
4843 string notification;
4852 job, job_key, job_status, reason);
4862 if (notification.empty())
4864 job, job_key, job_status, reason);
static const struct attribute attributes[]
Algorithms for bvector<> (main include)
void Release()
Manually force the resource to be released.
static std::string EventToString(EJobEvent event)
void SetNodeAddr(unsigned int node_ip)
void SetStatus(TJobStatus status)
void SetTimestamp(const CNSPreciseTime &t)
CNSPreciseTime GetLifetime(unsigned int job_id) const
bool DeleteIfTimedOut(unsigned int job_id, const CNSPreciseTime &current_time, unsigned int *aff_id, unsigned int *group_id)
void UpdateReadVacantTime(unsigned int job_id, const CNSPreciseTime &read_vacant_time)
void RegisterJob(unsigned int job_id, const CNSPreciseTime &submit_time, unsigned int aff_id, unsigned int group_id, const CNSPreciseTime &life_time)
void UpdateLifetime(unsigned int job_id, const CNSPreciseTime &life_time)
unsigned int GetAffinityID(unsigned int job_id) const
bool IsOutdatedJob(unsigned int job_id, ECommandGroup cmd_group, const CNSPreciseTime &timeout) const
void ChangeAffinityAndGroup(unsigned int job_id, unsigned int aff_id, unsigned int group_id)
unsigned GetNext(TJobStatus status, unsigned job_id) const
void SetStatus(unsigned int job_id, TJobStatus status)
void Erase(unsigned job_id)
void GetJobs(const vector< TJobStatus > &statuses, TNSBitVector &jobs) const
void AddPendingJob(unsigned int job_id)
void StatusStatistics(TJobStatus status, TNSBitVector::statistics *st) const
vector< unsigned int > GetJobCounters(const vector< TJobStatus > &statuses) const
void SetExactStatusNoLock(unsigned int job_id, TJobStatus status, bool set_clear)
unsigned int CountStatus(TJobStatus status) const
TJobStatus GetStatus(unsigned job_id) const
TNSBitVector GetOutdatedReadVacantJobs(CNSPreciseTime timeout, const TNSBitVector &read_jobs, const CJobGCRegistry &gc_registry) const
void ClearAll(TNSBitVector *bv)
void AddPendingBatch(unsigned job_id_from, unsigned job_id_to)
unsigned int GetJobByStatus(TJobStatus status, const TNSBitVector &unwanted_jobs, const TNSBitVector &restrict_jobs, bool restricted) const
TNSBitVector GetOutdatedPendingJobs(CNSPreciseTime timeout, const CJobGCRegistry &gc_registry) const
void SetPassport(unsigned int passport)
unsigned GetRunCount() const
CNSPreciseTime GetTimeout() const
bool GetSubmNeedProgressMsgNotif() const
unsigned GetSubmAddr() const
bool ShouldNotifyListener(const CNSPreciseTime &current_time) const
void SetAffinityId(unsigned aff_id)
void SetLastTouch(const CNSPreciseTime &t)
unsigned int GetListenerNotifAddr() const
unsigned short GetSubmNotifPort() const
const string & GetInput() const
unsigned GetGroupId() const
CNSPreciseTime GetRunTimeout() const
void SetGroupId(unsigned id)
unsigned short GetListenerNotifPort() const
TJobStatus GetStatus() const
CJobEvent & AppendEvent()
CNSPreciseTime GetSubmitTime(void) const
bool GetLsnrNeedProgressMsgNotif() const
bool ShouldNotifySubmitter(const CNSPreciseTime &current_time) const
unsigned GetAffinityId() const
CNSPreciseTime GetExpirationTime(const CNSPreciseTime &queue_timeout, const CNSPreciseTime &queue_run_timeout, const CNSPreciseTime &queue_read_timeout, const CNSPreciseTime &queue_pending_timeout, const CNSPreciseTime &event_time) const
CNSPreciseTime GetLastTouch() const
CNSPreciseTime GetReadTimeout() const
bool LoadFromDump(FILE *jobs_file, char *input_buf, char *output_buf, const SJobDumpHeader &header)
void RemoveJobFromAffinity(unsigned int job_id, unsigned int aff_id)
unsigned int ResolveAffinity(const string &token)
TNSBitVector GetJobsWithAffinity(unsigned int aff_id) const
void Dump(const string &dump_dir_name, const string &queue_name) const
void FinalizeAffinityDictionaryLoading(void)
TNSBitVector GetJobsWithAffinities(const TNSBitVector &affs) const
void RemoveDump(const string &dump_dir_name, const string &queue_name) const
bool CanAccept(const string &aff_token, size_t max_records) const
string Print(const CQueue *queue, const CNSClientsRegistry &clients_registry, const TNSBitVector &scope_jobs, const string &scope, size_t batch_size, bool verbose) const
string GetTokenByID(unsigned int aff_id) const
void ResolveAffinities(const list< string > &tokens, TNSBitVector &resolved_affs, vector< unsigned int > &aff_ids)
TNSBitVector GetRegisteredAffinities(void) const
unsigned int CollectGarbage(unsigned int max_to_del)
void LoadFromDump(const string &dump_dir_name, const string &queue_name)
unsigned int GetIDByToken(const string &aff_token) const
unsigned int CheckRemoveCandidates(void)
unsigned int ResolveAffinityToken(const string &token, unsigned int job_id, unsigned int client_id, ECommandGroup command_group)
void AddJobToAffinity(unsigned int job_id, unsigned int aff_id)
void AppendType(const CNSClientId &client, unsigned int type_to_append)
bool IsPreferredByAny(unsigned int aff_id, ECommandGroup cmd_group) const
void RegisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void SetPreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_set, ECommandGroup cmd_group)
void SubtractBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
void Purge(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout_worker_node, unsigned int min_worker_nodes, const CNSPreciseTime &timeout_admin, unsigned int min_admins, const CNSPreciseTime &timeout_submitter, unsigned int min_submitters, const CNSPreciseTime &timeout_reader, unsigned int min_readers, const CNSPreciseTime &timeout_unknown, unsigned int min_unknowns, bool is_log)
void StaleNodes(const CNSPreciseTime &current_time, const CNSPreciseTime &wn_timeout, const CNSPreciseTime &reader_timeout, bool is_log)
void AddToSubmitted(const CNSClientId &client, size_t count)
string PrintClientsList(const CQueue *queue, size_t batch_size, bool verbose) const
void UnregisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
TNSBitVector GetAllPreferredAffinities(ECommandGroup cmd_group) const
void RegisterBlacklistedJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
bool CancelWaiting(CNSClient &client, ECommandGroup cmd_group, bool touch_notif_registry=true)
void SetBlacklistTimeouts(const CNSPreciseTime &blacklist_timeout, const CNSPreciseTime &read_blacklist_timeout)
void MoveJobToBlacklist(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void ClearClient(const CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
void GCBlacklistedJobs(const CJobStatusTracker &tracker, ECommandGroup cmd_group)
void UpdatePreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_add, const TNSBitVector &aff_to_del, ECommandGroup cmd_group)
void Touch(CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, bool &session_was_reset, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
bool GetAffinityReset(const CNSClientId &client, ECommandGroup cmd_group) const
void SetLastScope(const CNSClientId &client)
void MarkAsAdmin(const CNSClientId &client)
void RegisterSocketWriteError(const CNSClientId &client)
int SetClientData(const CNSClientId &client, const string &data, int data_version)
void SetRegistries(CNSAffinityRegistry *aff_registry, CNSNotificationList *notif_registry)
void SetNodeWaiting(const CNSClientId &client, unsigned short port, const TNSBitVector &aff_ids, ECommandGroup cmd_group)
TNSBitVector GetPreferredAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
bool WasGarbageCollected(const CNSClientId &client, ECommandGroup cmd_group) const
void AddBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
void RemoveJob(unsigned int group_id, unsigned int job_id)
void RemoveDump(const string &dump_dir_name, const string &queue_name) const
unsigned int CollectGarbage(unsigned int max_to_del)
unsigned int CheckRemoveCandidates(void)
TNSBitVector GetJobs(const string &group, bool allow_exception=true) const
unsigned int ResolveGroup(const string &group)
bool CanAccept(const string &group, size_t max_records) const
void AddJobToGroup(unsigned int group_id, unsigned int job_id)
unsigned int AddJob(const string &group, unsigned int job_id)
unsigned int AddJobs(unsigned int group_id, unsigned int first_job_id, unsigned int count)
void FinalizeGroupDictionaryLoading(void)
string Print(const CQueue *queue, const TNSBitVector &scope_jobs, const string &scope, size_t batch_size, bool verbose) const
void ResolveGroups(const list< string > &tokens, TNSBitVector &group_ids_vector)
void RestrictByGroup(const string &group, TNSBitVector &bv) const