NCBI C++ ToolKit
netschedule_api.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: netschedule_api.cpp 88604 2019-12-20 15:57:19Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Anatoliy Kuznetsov, Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  * Implementation of NetSchedule API.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "netschedule_api_impl.hpp"
36 
37 #include <connect/ncbi_socket.hpp>
40 
41 #include <corelib/ncbi_system.hpp>
43 
44 #include <array>
45 #include <memory>
46 #include <stdio.h>
47 
48 
49 #define COMPATIBLE_NETSCHEDULE_VERSION "4.10.0"
50 
51 
53 
54 namespace grid {
55 namespace netschedule {
56 namespace limits {
57 
58 void ThrowIllegalChar(const string& name, const string& value, char c)
59 {
60  NCBI_THROW_FMT(CConfigException, eInvalidParameter,
61  "Invalid character '" << NStr::PrintableString(CTempString(&c, 1)) <<
62  "' in the " << name << " \"" << NStr::PrintableString(value) << "\".");
63 }
64 
65 }
66 }
67 }
68 
69 using namespace grid::netschedule;
70 
72  SNetScheduleAPIImpl* ns_api) :
73  m_API(ns_api),
74  m_StopThread(false)
75 {
76 }
77 
79 {
80  STimeout rto;
81  rto.sec = rto.usec = 0;
82  socket.SetDataLogging(TServConn_ConnDataLogging::GetDefault() ? eOn : eOff);
83  socket.SetTimeout(eIO_Read, &rto);
84 
85  EIO_Status status = socket.Bind(0);
86  if (status != eIO_Success) {
88  "Could not bind a UDP socket: " << IO_StatusStr(status));
89  }
90 
92 }
93 
95  string* cmd, unsigned remaining_seconds)
96 {
97  if (remaining_seconds > 0) {
98  *cmd += " port=";
100 
101  *cmd += " timeout=";
102  *cmd += NStr::UIntToString(remaining_seconds);
103  }
104 }
105 
107 {
108 }
109 
111 try {
112  return CUrlArgs(output);
113 }
114 catch (CUrlParserException&) {
115  return CUrlArgs();
116 }
117 
120 {
121 }
122 
123 const string& SNetScheduleOutputParser::operator()(const string& param) const
124 {
125  auto it = FindFirst(param);
126  return it != GetArgs().end() ? it->value : kEmptyStr;
127 }
128 
129 int g_ParseNSOutput(const string& attr_string, const char* const* attr_names,
130  string* attr_values, size_t attr_count)
131 {
132  try {
133  CUrlArgs attr_parser(attr_string);
134  const CUrlArgs::TArgs& attr_list = attr_parser.GetArgs();
135 
136  int found_attrs = 0;
137 
138  CUrlArgs::const_iterator attr_it;
139 
140  do {
141  if ((attr_it = attr_parser.FindFirst(*attr_names)) !=
142  attr_list.end()) {
143  *attr_values = attr_it->value;
144  ++found_attrs;
145  }
146  ++attr_names;
147  ++attr_values;
148  } while (--attr_count > 0);
149 
150  return found_attrs;
151  }
152  catch (CUrlParserException&) {
153  }
154 
155  return -1;
156 }
157 
160 {
161  _ASSERT(ns_node);
162 
164 
165  if (parser("queue") != m_API->m_Queue) return eNT_Unknown;
166 
167  *ns_node = parser("ns_node");
168 
169  const auto reason = parser("reason");
170 
171  if (reason.empty())
172  return eNT_GetNotification;
173  else if (NStr::CompareCase(reason, CTempString("get", 3)) == 0)
174  return eNT_GetNotification;
175  else if (NStr::CompareCase(reason, CTempString("read", 4)) == 0)
176  return eNT_ReadNotification;
177  else
178  return eNT_Unknown;
179 }
180 
182 {
183  CFastMutexGuard guard(m_Mutex);
184 
185  if (m_Interrupted)
187  else {
188  m_Interrupted = true;
189  if (!m_ReadyServers.empty())
191  }
192 
194 }
195 
196 void SServerNotifications::RegisterServer(const string& ns_node)
197 {
198  CFastMutexGuard guard(m_Mutex);
199 
200  if (!m_ReadyServers.empty())
201  m_Interrupted = false;
202  else {
205  }
206 
207  m_ReadyServers.insert(ns_node);
208 }
209 
211 {
212  CFastMutexGuard guard(m_Mutex);
213 
215 
216  if (m_ReadyServers.empty())
217  return false;
218 
220  *ns_node = *next_server;
221  m_ReadyServers.erase(next_server);
222 
223  if (m_ReadyServers.empty())
224  // Make sure the notification semaphore count is reset to zero.
226 
227  return true;
228 }
229 
231 {
233  (CNcbiApplication::Instance()->GetProgramDisplayName() +
234  "_nt").c_str());
235 
236  static const STimeout two_seconds = {2, 0};
237 
238  string server_host;
239 
240  while (!m_StopThread)
241  if (m_Receiver.socket.Wait(&two_seconds) == eIO_Success) {
242  if (m_StopThread)
243  break;
244 
245  if (m_Receiver(&server_host)) {
246  string ns_node;
247 
248  ENotificationType notif_type = CheckNotification(&ns_node);
249 
250  switch (notif_type) {
251  case eNT_GetNotification:
253  break;
256  break;
257  default:
258  break;
259  }
260  }
261  }
262  return NULL;
263 }
264 
266 {
267  array<char, 64 * 1024> buffer; // Enough to hold any UDP
268  size_t msg_len;
269 
270  if (socket.Recv(buffer.data(), buffer.size(), &msg_len,
271  server_host, NULL) != eIO_Success)
272  return false;
273 
274  _ASSERT(buffer.size() > msg_len);
275  buffer[msg_len] = '\0'; // Make it null-terminated in case it's not
276  message.assign(buffer.data()); // Ignore everything after the first null character
277 
278  return true;
279 }
280 
282 {
283  return m_Receiver(server_host);
284 }
285 
287  const CDeadline& deadline, string* server_host)
288 {
289  STimeout timeout;
290 
291  for (;;) {
292  deadline.GetRemainingTime().Get(&timeout.sec, &timeout.usec);
293 
294  if (timeout.sec == 0 && timeout.usec == 0)
295  return false;
296 
297  switch (m_Receiver.socket.Wait(&timeout)) {
298  case eIO_Timeout:
299  return false;
300 
301  case eIO_Success:
302  if (ReceiveNotification(server_host))
303  return true;
304  /* FALL THROUGH */
305 
306  default:
307  break;
308  }
309  }
310 
311  return false;
312 }
313 
315 {
316  printf("Using UDP port %hu\n", GetPort());
317 }
318 
320 {
322 
323  if (m_NotificationThread == NULL)
325 }
326 
328 {
331 }
332 
334 {
337 
338  if (m_NotificationThread != NULL) {
340  CDatagramSocket().Send("INTERRUPT", sizeof("INTERRUPT"),
341  "127.0.0.1", m_NotificationThread->m_Receiver.port);
343  }
344  }
345 }
346 
348 {
349  string cmd("CLRN");
351 
352  for (CNetServiceIterator it =
354  CNetServer server = *it;
355 
356  try {
357  CNetServer::SExecResult exec_result;
358  server->ConnectAndExec(cmd, false, exec_result);
359  } catch (CNetSrvConnException& e) {
360  if (m_Service.IsLoadBalanced()) {
362  ": " << e.what());
363  }
364  } catch (CNetServiceException& e) {
366  throw;
367  else {
369  ": " << e.what());
370  }
371  }
372  }
373 }
374 
376 {
377  return ns_conf ? "netschedule_conf_from_netschedule" : "netcache_conf_from_netschedule";
378 }
379 
381  m_Registry(registry), m_Sections(sections), m_NsConf(ns_conf), m_Mode(eImplicit)
382 {
383  sections.Insert(s_GetSection(m_NsConf));
384 
385  const auto param = "load_config_from_ns";
386 
387  if (m_Registry.Has(m_Sections, param)) {
388  m_Mode = m_Registry.Get(m_Sections, param, true) ? eExplicit : eOff;
389  }
390 }
391 
392 bool CNetScheduleConfigLoader::Transform(const string& prefix, string& name) const
393 {
394  if (m_NsConf) {
395  // If it's "service to queue" special case (we do not know queue name)
396  if (name == "queue_name") return true;
397 
398  // Queue parameter "timeout" determines the initial TTL of a submitted job.
399  // Since "timeout" is too generic, replaced it with "job_ttl" on client side.
400  if (name == "timeout") {
401  name = "job_ttl";
402  return true;
403  }
404  }
405 
406  // Do not load client_name from server
407  if (name == "client_name") return false;
408 
409  // Only params starting with provided prefix are used
410  if (NStr::StartsWith(name, prefix)) {
411  name.erase(0, prefix.size());
412  return true;
413  }
414 
415  return false;
416 }
417 
419 {
420  _ASSERT(impl);
421 
422  if (m_Mode == eOff) return false;
423 
424  // Turn off any subsequent attempts
425  const auto mode = m_Mode;
426  m_Mode = eOff;
427 
428  // Errors could happen when we try to load config from servers that either
429  // do not support "GETP2" command (introduced in 4.16.9)
430  // or, do not support "QINF2 service=name" command (introduced in 4.17.0)
431  // or, do not have "service to queue" mapping set
432  // or, are not actually NetSchedule servers but worker nodes
433  // or, are currently not reachable (behind some firewall)
434  // and need cross connectivity which is usually enabled later
435  //
436  // This guard is set to suppress errors and avoid retries if config loading is not enabled explicitly
437  const auto retry_mode = mode == eImplicit ?
440  auto retry_guard = impl->m_Service->CreateRetryGuard(retry_mode);
441 
442  CNetScheduleAPI::TQueueParams queue_params;
443 
444  try {
445  impl->GetQueueParams(kEmptyStr, queue_params);
446  }
447  catch (...) {
448  if (mode == eExplicit) throw;
449  return false;
450  }
451 
452  CRef<CMemoryRegistry> mem_registry(new CMemoryRegistry);
453  const string prefix = m_NsConf ? "ns." : "nc.";
454  const string section = s_GetSection(m_NsConf);
455 
456  for (auto& param : queue_params) {
457  auto name = param.first;
458 
459  if (Transform(prefix, name)) {
460  mem_registry->Set(section, name, param.second);
461  }
462  }
463 
464  if (mem_registry->Empty()) return false;
465 
466  m_Registry.Add(mem_registry.GetObject());
467  return true;
468 }
469 
471 {
472  string auth(m_Service->MakeAuthString());
473 
474  const CVersionAPI* version = nullptr;
475  string name;
476 
477  {{
479  version = &app->GetFullVersion();
480  name = app->GetProgramDisplayName();
481  }
482  }}
483 
484  if (version && m_ProgramVersion.empty()) {
485  m_ProgramVersion += name;
486  auto package_name = version->GetPackageName();
487 
488  if (!package_name.empty()) {
489  m_ProgramVersion += ": ";
490  m_ProgramVersion += package_name;
491  m_ProgramVersion += ' ';
492  m_ProgramVersion += version->GetPackageVersion().Print();
493  m_ProgramVersion += " built on ";
494  m_ProgramVersion += version->GetBuildInfo().date;
495  }
496  }
497 
498  if (!m_ProgramVersion.empty()) {
499  auth += " prog=\"";
500  auth += m_ProgramVersion;
501  auth += '\"';
502  }
503 
504  switch (m_ClientType) {
506  auth += " client_type=\"admin\"";
507  break;
508 
510  auth += " client_type=\"submitter\"";
511  break;
512 
514  auth += " client_type=\"worker node\"";
515  break;
516 
518  auth += " client_type=\"reader\"";
519 
520  default: /* eCT_Auto */
521  break;
522  }
523 
524  if (!m_ClientNode.empty()) {
525  auth += " client_node=\"";
526  auth += m_ClientNode;
527  auth += '\"';
528  }
529 
530  if (!m_ClientSession.empty()) {
531  auth += " client_session=\"";
532  auth += m_ClientSession;
533  auth += '\"';
534  }
535 
536  if (version) {
537  auth += " client_version=\"";
538  auth += version->GetVersionInfo().Print();
539  auth += '\"';
540  }
541 
543  auth += it->second;
544  }
545 
546  auth += " ns_compat_ver=\"" COMPATIBLE_NETSCHEDULE_VERSION "\""
547  "\r\n";
548 
549  auth += m_Queue;
550 
551  // Make the auth token look like a command to be able to
552  // check for potential authentication/initialization errors
553  // like the "queue not found" error.
554  if (m_Mode & fNonWnCompatible) {
555  auth += "\r\nVERSION";
556  }
557 
558  return auth;
559 }
560 
562 {
563  return [] { return new SNetScheduleServerProperties; };
564 }
565 
567 {
568  return new CNetScheduleServerListener(*this);
569 }
570 
572 {
574 
575  m_RetryOnException = registry.Get(sections, "enforce_retry_policy", false);
576 
577  if (!m_Queue.empty()) limits::Check<limits::SQueueName>(m_Queue);
578 
579  const string& user(GetDiagContext().GetUsername());
580  m_ClientNode =
581  m_Service->GetClientName() + "::" +
582  (user.empty() ? kEmptyStr : user + '@') +
584 
585  CNetScheduleConfigLoader loader(registry, sections);
586 
587  bool affinities_initialized = false;
588 
589  // There are two phases of Init in case we need to load config from server
590  // 1) Setup as much as possible and try to get config from server
591  // 2) Setup everything using received config from server
592  do {
593  if (m_Queue.empty()) {
594  m_Queue = registry.Get(sections, "queue_name", "");
595  if (!m_Queue.empty()) limits::Check<limits::SQueueName>(m_Queue);
596  }
597 
598  m_UseEmbeddedStorage = registry.Get(sections, { "use_embedded_storage", "use_embedded_input" }, true);
599  m_JobGroup = registry.Get(sections, "job_group", "");
600  m_JobTtl = registry.Get(sections, "job_ttl", 0);
601  m_ClientNode = registry.Get(sections, "client_node", m_ClientNode);
602  GetListener()->Scope() = registry.Get(sections, "scope", "");
603 
604  if (!affinities_initialized && registry.Get(sections, "use_affinities", false)) {
605  affinities_initialized = true;
606  InitAffinities(registry, sections);
607  }
608 
609  if (!m_ClientNode.empty()) {
612  NStr::NumericToString(GetFastLocalTime().GetTimeT()) + ':' +
614  }
615 
617 
618  // If not working in WN compatible mode
619  if (!(m_Mode & fConfigLoading)) break;
620  } while (loader(this));
621 }
622 
624 {
625  if (m_NonWn) {
626  string version_info(connection.Exec(m_Auth, false));
627 
628  CNetServerInfo server_info(new SNetServerInfoImpl(version_info));
629 
630  string attr_name, attr_value;
631  string ns_node, ns_session;
633 
634  while (server_info.GetNextAttribute(attr_name, attr_value))
635  if (attr_name == "ns_node")
636  ns_node = attr_value;
637  else if (attr_name == "ns_session")
638  ns_session = attr_value;
639  else if (attr_name == "server_version")
640  version = CVersionInfo(attr_value);
641 
642  // Usually, all attributes come together, so no need to check version
643  if (!ns_node.empty() && !ns_session.empty()) {
644  auto server_props = connection->m_Server->Get<SNetScheduleServerProperties>();
645 
646  // Version cannot change without session, so no need to compare, too
647  if (server_props->ns_node != ns_node ||
648  server_props->ns_session != ns_session) {
650  server_props->ns_node = ns_node;
651  server_props->ns_session = ns_session;
652  server_props->version = version;
653  m_SharedData->m_ServerByNode[ns_node] = connection->m_Server->m_ServerInPool;
654  server_props->affs_synced = false;
655  }
656  }
657 
658  if (!m_Scope.empty()) {
659  string cmd("SETSCOPE " + m_Scope);
661  connection.Exec(cmd, false);
662  }
663  } else
664  connection->WriteLine(m_Auth);
665 }
666 
668  const string& err_msg, CNetServer& server)
669 {
670  string code;
671  string msg;
672 
673  if (!NStr::SplitInTwo(err_msg, ":", code, msg)) {
674  if (err_msg == "Job not found") {
675  NCBI_THROW(CNetScheduleException, eJobNotFound, err_msg);
676  }
677  code = err_msg;
678  }
679 
680  // Map code into numeric value
682 
683  switch (err_code) {
685  NCBI_THROW(CNetServiceException, eCommunicationError, err_msg);
686 
690  // Convert these errors into warnings.
691  OnWarning(msg, server);
692  break;
693 
695  NCBI_THROW(CNetScheduleException, eJobNotFound, "Job not found");
696 
697  default:
698  NCBI_THROW(CNetScheduleException, EErrCode(err_code), !msg.empty() ?
700  }
701 }
702 
703 void CNetScheduleServerListener::OnWarningImpl(const string& warn_msg,
704  CNetServer& server)
705 {
707  ": " << warn_msg);
708 }
709 
710 const char* const kNetScheduleAPIDriverName = "netschedule_api";
711 
712 SNetScheduleAPIImpl::SNetScheduleAPIImpl(CSynRegistryBuilder registry_builder, const string& section,
713  const string& service_name, const string& client_name,
714  const string& queue_name, bool wn, bool try_config) :
715  m_Mode(GetMode(wn, try_config)),
716  m_SharedData(new SNetScheduleSharedData),
717  m_Queue(queue_name)
718 {
719  SRegSynonyms sections{ section, kNetScheduleAPIDriverName };
720  m_Service = SNetServiceImpl::Create("NetScheduleAPI", service_name, client_name,
722  registry_builder, sections);
723  Init(registry_builder, sections);
724 }
725 
727  SNetServerInPool* server, SNetScheduleAPIImpl* parent) :
728  m_Mode(parent->m_Mode),
729  m_SharedData(parent->m_SharedData),
730  m_RetryOnException(parent->m_RetryOnException),
731  m_Service(SNetServiceImpl::Clone(server, parent->m_Service)),
732  m_Queue(parent->m_Queue),
733  m_ProgramVersion(parent->m_ProgramVersion),
734  m_ClientNode(parent->m_ClientNode),
735  m_ClientSession(parent->m_ClientSession),
736  m_AffinityPreference(parent->m_AffinityPreference),
737  m_UseEmbeddedStorage(parent->m_UseEmbeddedStorage)
738 {
739 }
740 
742  const string& conf_section /* = kEmptyStr */) :
743  m_Impl(new SNetScheduleAPIImpl(nullptr, conf_section))
744 {
745 }
746 
748  const string& conf_section) :
749  m_Impl(new SNetScheduleAPIImpl(reg, conf_section))
750 {
751 }
752 
753 CNetScheduleAPI::CNetScheduleAPI(CConfig* conf, const string& conf_section) :
754  m_Impl(new SNetScheduleAPIImpl(conf, conf_section))
755 {
756 }
757 
758 CNetScheduleAPI::CNetScheduleAPI(const string& service_name,
759  const string& client_name, const string& queue_name) :
760  m_Impl(new SNetScheduleAPIImpl(nullptr, kEmptyStr, service_name, client_name, queue_name))
761 {
762 }
763 
765 {
766  m_Impl->m_ProgramVersion = pv;
767 
768  m_Impl->UpdateAuthString();
769 }
770 
772 {
773  return m_Impl->m_ProgramVersion;
774 }
775 
776 const string& CNetScheduleAPI::GetQueueName() const
777 {
778  return m_Impl->m_Queue;
779 }
780 
782 {
783  switch(status) {
784  case eJobNotFound: return "NotFound";
785  case ePending: return "Pending";
786  case eRunning: return "Running";
787  case eCanceled: return "Canceled";
788  case eFailed: return "Failed";
789  case eDone: return "Done";
790  case eReading: return "Reading";
791  case eConfirmed: return "Confirmed";
792  case eReadFailed: return "ReadFailed";
793  case eDeleted: return "Deleted";
794 
795  default: _ASSERT(0);
796  }
797  return kEmptyStr;
798 }
799 
802 {
803  if (NStr::CompareNocase(status_str, "Pending") == 0)
804  return ePending;
805  if (NStr::CompareNocase(status_str, "Running") == 0)
806  return eRunning;
807  if (NStr::CompareNocase(status_str, "Canceled") == 0)
808  return eCanceled;
809  if (NStr::CompareNocase(status_str, "Failed") == 0)
810  return eFailed;
811  if (NStr::CompareNocase(status_str, "Done") == 0)
812  return eDone;
813  if (NStr::CompareNocase(status_str, "Reading") == 0)
814  return eReading;
815  if (NStr::CompareNocase(status_str, "Confirmed") == 0)
816  return eConfirmed;
817  if (NStr::CompareNocase(status_str, "ReadFailed") == 0)
818  return eReadFailed;
819  if (NStr::CompareNocase(status_str, "Deleted") == 0)
820  return eDeleted;
821 
822  return eJobNotFound;
823 }
824 
825 #define EXTRACT_WARNING_TYPE(warning_type) \
826  if (NStr::StartsWith(warn_msg, "e" #warning_type ":")) { \
827  warn_msg.erase(0, sizeof("e" #warning_type ":") - 1); \
828  return eWarn##warning_type; \
829  }
830 
833 {
834  EXTRACT_WARNING_TYPE(AffinityNotFound);
835  EXTRACT_WARNING_TYPE(AffinityNotPreferred);
836  EXTRACT_WARNING_TYPE(AffinityAlreadyPreferred);
837  EXTRACT_WARNING_TYPE(GroupNotFound);
838  EXTRACT_WARNING_TYPE(JobNotFound);
839  EXTRACT_WARNING_TYPE(JobAlreadyCanceled);
840  EXTRACT_WARNING_TYPE(JobAlreadyDone);
841  EXTRACT_WARNING_TYPE(JobAlreadyFailed);
842  EXTRACT_WARNING_TYPE(JobPassportOnlyMatch);
843  EXTRACT_WARNING_TYPE(NoParametersChanged);
844  EXTRACT_WARNING_TYPE(ConfigFileNotChanged);
845  EXTRACT_WARNING_TYPE(AlertNotFound);
846  EXTRACT_WARNING_TYPE(AlertAlreadyAcknowledged);
847  EXTRACT_WARNING_TYPE(SubmitsDisabledForServer);
848  EXTRACT_WARNING_TYPE(QueueAlreadyPaused);
849  EXTRACT_WARNING_TYPE(QueueNotPaused);
850  EXTRACT_WARNING_TYPE(CommandObsolete);
851  EXTRACT_WARNING_TYPE(JobNotRead);
852  return eWarnUnknown;
853 }
854 
855 #define WARNING_TYPE_TO_STRING(warning_type) \
856  case eWarn##warning_type: \
857  return #warning_type;
858 
861 {
862  switch (warning_type) {
863  WARNING_TYPE_TO_STRING(AffinityNotFound);
864  WARNING_TYPE_TO_STRING(AffinityNotPreferred);
865  WARNING_TYPE_TO_STRING(AffinityAlreadyPreferred);
866  WARNING_TYPE_TO_STRING(GroupNotFound);
867  WARNING_TYPE_TO_STRING(JobNotFound);
868  WARNING_TYPE_TO_STRING(JobAlreadyCanceled);
869  WARNING_TYPE_TO_STRING(JobAlreadyDone);
870  WARNING_TYPE_TO_STRING(JobAlreadyFailed);
871  WARNING_TYPE_TO_STRING(JobPassportOnlyMatch);
872  WARNING_TYPE_TO_STRING(NoParametersChanged);
873  WARNING_TYPE_TO_STRING(ConfigFileNotChanged);
874  WARNING_TYPE_TO_STRING(AlertNotFound);
875  WARNING_TYPE_TO_STRING(AlertAlreadyAcknowledged);
876  WARNING_TYPE_TO_STRING(SubmitsDisabledForServer);
877  WARNING_TYPE_TO_STRING(QueueAlreadyPaused);
878  WARNING_TYPE_TO_STRING(QueueNotPaused);
879  WARNING_TYPE_TO_STRING(CommandObsolete);
880  WARNING_TYPE_TO_STRING(JobNotRead);
881  default:
882  return "eWarnUnknown";
883  }
884 }
885 
887 {
888  return new SNetScheduleSubmitterImpl(m_Impl);
889 }
890 
892 {
893  return new SNetScheduleExecutorImpl(m_Impl);
894 }
895 
897  const string& affinity)
898 {
899  m_Impl->AllocNotificationThread();
900  return new SNetScheduleJobReaderImpl(m_Impl, group, affinity);
901 }
902 
904 {
905  return new SNetScheduleAdminImpl(m_Impl);
906 }
907 
909 {
910  return m_Impl->m_Service;
911 }
912 
913 static void s_SetJobExpTime(time_t* job_exptime, const string& time_str)
914 {
915  if (job_exptime != NULL)
916  *job_exptime = (time_t) NStr::StringToUInt8(time_str,
918 }
919 
921  const string& mode_str)
922 {
923  if (pause_mode != NULL)
924  *pause_mode = mode_str.empty() ? eNSQ_NoPause :
925  mode_str == "pullback" ? eNSQ_WithPullback :
927 }
928 
930  CNetScheduleJob& job,
931  time_t* job_exptime,
932  ENetScheduleQueuePauseMode* pause_mode)
933 {
934  string cmd("STATUS2 " + job.job_id);
936  cmd += " need_progress_msg=1";
937  auto response = m_Impl->ExecOnJobServer(job, cmd);
938 
939  SNetScheduleOutputParser parser(response);
940 
941  const auto status = StringToStatus(parser("job_status"));
942 
943  s_SetJobExpTime(job_exptime, parser("job_exptime"));
944  s_SetPauseMode(pause_mode, parser("pause"));
945 
946  switch (status) {
947  case ePending:
948  case eRunning:
949  case eCanceled:
950  case eFailed:
951  case eDone:
952  case eReading:
953  case eConfirmed:
954  case eReadFailed:
955  job.input = parser("input");
956  job.output = parser("output");
957  job.ret_code = NStr::StringToInt(parser("ret_code"), NStr::fConvErr_NoThrow);
958  job.error_msg = parser("err_msg");
959  break;
960 
961  default:
962  job.input.erase();
963  job.ret_code = 0;
964  job.output.erase();
965  job.error_msg.erase();
966  }
967 
968  job.affinity.erase();
970  job.progress_msg = parser("msg");
971 
972  return status;
973 }
974 
976  const CNetScheduleJob& job, time_t* job_exptime,
977  ENetScheduleQueuePauseMode* pause_mode)
978 {
979  string response;
980 
981  try {
982  cmd += ' ';
983  cmd += job.job_id;
985  response = ExecOnJobServer(job, cmd);
986  }
987  catch (CNetScheduleException& e) {
989  throw;
990 
991  if (job_exptime != NULL)
992  *job_exptime = 0;
993 
995  }
996 
997  SNetScheduleOutputParser parser(response);
998 
999  s_SetJobExpTime(job_exptime, parser("job_exptime"));
1000  s_SetPauseMode(pause_mode, parser("pause"));
1001 
1002  return CNetScheduleAPI::StringToStatus(parser("job_status"));
1003 }
1004 
1005 bool SNetScheduleAPIImpl::GetServerByNode(const string& ns_node,
1006  CNetServer* server)
1007 {
1008  SNetServerInPool* known_server; /* NCBI_FAKE_WARNING */
1009 
1010  {{
1012 
1013  auto server_props_it = m_SharedData->m_ServerByNode.find(ns_node);
1014 
1015  if (server_props_it == m_SharedData->m_ServerByNode.end())
1016  return false;
1017 
1018  known_server = server_props_it->second;
1019  }}
1020 
1021  *server = new SNetServerImpl(m_Service,
1022  m_Service->m_ServerPool->ReturnServer(known_server));
1023 
1024  return true;
1025 }
1026 
1029 {
1031 
1032  if (m_AskCount-- > 0) return m_ServerParams;
1033 
1035 
1038 
1039  string cmd("QINF2 " + queue);
1041 
1042  CUrlArgs url_parser(service.FindServerAndExec(cmd, false).response);
1043 
1044  enum {
1045  eMaxInputSize,
1046  eMaxOutputSize,
1047  eNumberOfSizeParams
1048  };
1049 
1050  int field_bits = 0;
1051 
1052  ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
1053  if (field->name[0] == 'm') {
1054  if (field->name == "max_input_size") {
1055  field_bits |= (1 << eMaxInputSize);
1057  NStr::StringToInt(field->value);
1058  } else if (field->name == "max_output_size") {
1059  field_bits |= (1 << eMaxOutputSize);
1061  NStr::StringToInt(field->value);
1062  }
1063  }
1064  if (field_bits == (1 << eNumberOfSizeParams) - 1)
1065  break;
1066  }
1067 
1068  return m_ServerParams;
1069 }
1070 
1072 {
1073  return m_Impl->GetServerParams();
1074 }
1075 
1077  const string& queue_name, TQueueParams& queue_params)
1078 {
1079  string cmd("QINF2 ");
1080 
1081  if (!queue_name.empty()) {
1082  limits::Check<limits::SQueueName>(queue_name);
1083 
1084  cmd += queue_name;
1085  } else if (!m_Queue.empty()) {
1086  cmd += m_Queue;
1087  } else {
1088  cmd += "service=" + m_Service->m_ServiceName;
1089  }
1090 
1092 
1093  CUrlArgs url_parser(m_Service.FindServerAndExec(cmd, false).response);
1094 
1095  ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
1096  queue_params[field->name] = field->value;
1097  }
1098 }
1099 
1101  const string& queue_name, TQueueParams& queue_params)
1102 {
1103  return m_Impl->GetQueueParams(queue_name, queue_params);
1104 }
1105 
1107 {
1108  string cmd("GETP2");
1110 
1112  false).response);
1113 
1114  ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
1115  queue_params[field->name] = field->value;
1116  }
1117 }
1118 
1120 {
1121  return m_Impl->GetQueueParams(queue_params);
1122 }
1123 
1125 {
1126  string cmd("MGET " + job.job_id);
1128  auto response = m_Impl->ExecOnJobServer(job, cmd);
1129  job.progress_msg = NStr::ParseEscapes(response);
1130 }
1131 
1132 void CNetScheduleAPI::SetClientNode(const string& client_node)
1133 {
1134  // Cannot add this to limits::SClientNode due to CNetScheduleAPIExt allowing reset to empty values
1135  if (client_node.empty()) {
1136  NCBI_THROW_FMT(CConfigException, eParameterMissing,
1137  "'" << limits::SClientNode::Name() << "' cannot be empty");
1138  }
1139 
1140  limits::Check<limits::SClientNode>(client_node);
1141 
1142  m_Impl->m_ClientNode = client_node;
1143 
1144  m_Impl->UpdateAuthString();
1145 }
1146 
1147 void CNetScheduleAPI::SetClientSession(const string& client_session)
1148 {
1149  // Cannot add this to limits::SClientSession due to CNetScheduleAPIExt allowing reset to empty values
1150  if (client_session.empty()) {
1151  NCBI_THROW_FMT(CConfigException, eParameterMissing,
1152  "'" << limits::SClientSession::Name() << "' cannot be empty");
1153  }
1154 
1155  limits::Check<limits::SClientSession>(client_session);
1156 
1157  m_Impl->m_ClientSession = client_session;
1158 
1159  m_Impl->UpdateAuthString();
1160 }
1161 
1163 {
1165 
1167 }
1168 
1170 {
1171  m_Impl->m_ClientType = client_type;
1172 
1173  m_Impl->UpdateAuthString();
1174 }
1175 
1177 {
1179 
1180  UpdateAuthString();
1181 }
1182 
1183 void SNetScheduleAPIImpl::SetAuthParam(const string& param_name,
1184  const string& param_value)
1185 {
1186  if (!param_value.empty()) {
1187  string auth_param(' ' + param_name);
1188  auth_param += "=\"";
1189  auth_param += NStr::PrintableString(param_value);
1190  auth_param += '"';
1191  m_AuthParams[param_name] = auth_param;
1192  } else
1193  m_AuthParams.erase(param_name);
1194 
1195  UpdateAuthString();
1196 }
1197 
1199 {
1200  const bool claim_new_affinities = registry.Get(sections, "claim_new_affinities", false);
1201  const bool process_any_job = registry.Get(sections, "process_any_job", false);
1202  const string affinity_list = registry.Get(sections, "affinity_list", kEmptyStr);
1203  const string affinity_ladder = registry.Get(sections, "affinity_ladder", kEmptyStr);
1204 
1205  if (affinity_ladder.empty()) {
1206 
1207  if (claim_new_affinities) {
1209 
1210  } else if (process_any_job) {
1212 
1213  } else {
1215  }
1216 
1217  if (affinity_list.empty()) return;
1218 
1219  NStr::Split(affinity_list, ", ", m_AffinityList,
1221 
1222  for (auto& affinity : m_AffinityList) {
1223  limits::Check<limits::SAffinity>(affinity);
1224  }
1225 
1226  return;
1227  }
1228 
1229  // Sanity checks
1230  if (claim_new_affinities) {
1231  NCBI_THROW(CConfigException, eInvalidParameter,
1232  "'affinity_ladder' cannot be used with 'claim_new_affinities'");
1233  }
1234  if (!affinity_list.empty()) {
1235  NCBI_THROW(CConfigException, eInvalidParameter,
1236  "'affinity_ladder' cannot be used with 'affinity_list'");
1237  }
1238 
1239  if (!process_any_job) {
1241  }
1242 
1243  list<CTempString> affinities;
1244  NStr::Split(affinity_ladder, ", ", affinities,
1246 
1247  if (affinities.empty()) return;
1248 
1249  string affinity_step;
1250 
1251  for (auto& affinity : affinities) {
1252  limits::Check<limits::SAffinity>(affinity);
1253 
1254  if (!affinity_step.empty()) affinity_step += ',';
1255  affinity_step += affinity;
1256  m_AffinityLadder.emplace_back(affinity, affinity_step);
1257  }
1258 }
1259 
1260 ///////////////////////////////////////////////////////////////////////////////
1261 
1262 /// @internal
1263 class CNetScheduleAPICF : public IClassFactory<SNetScheduleAPIImpl>
1264 {
1265 public:
1266 
1273 
1274  /// Construction
1275  ///
1276  /// @param driver_name
1277  /// Driver name string
1278  /// @param patch_level
1279  /// Patch level implemented by the driver.
1280  /// By default corresponds to interface patch level.
1281  CNetScheduleAPICF(const string& driver_name = kNetScheduleAPIDriverName,
1282  int patch_level = -1)
1283  : m_DriverVersionInfo
1284  (ncbi::CInterfaceVersion<IFace>::eMajor,
1285  ncbi::CInterfaceVersion<IFace>::eMinor,
1286  patch_level >= 0 ?
1287  patch_level : ncbi::CInterfaceVersion<IFace>::ePatchLevel),
1288  m_DriverName(driver_name)
1289  {
1290  _ASSERT(!m_DriverName.empty());
1291  }
1292 
1293  /// Create instance of TDriver
1294  virtual TInterface*
1295  CreateInstance(const string& driver = kEmptyStr,
1297  const TPluginManagerParamTree* params = 0) const
1298  {
1299  if (params && (driver.empty() || driver == m_DriverName) &&
1302  CConfig config(params);
1303  return new SNetScheduleAPIImpl(&config, m_DriverName);
1304  }
1305  return NULL;
1306  }
1307 
1308  void GetDriverVersions(TDriverList& info_list) const
1309  {
1310  info_list.push_back(TDriverInfo(m_DriverName, m_DriverVersionInfo));
1311  }
1312 
1313 protected:
1316 };
1317 
1318 
1322 {
1324  NCBI_EntryPointImpl(info_list, method);
1325 
1326 }
1327 
1328 
1330 {
1331  string& client_node(m_Impl->m_ClientNode);
1332  client_node += ':';
1333  client_node += data;
1334  UpdateAuthString();
1335 }
1336 
1338 {
1339  m_Impl->UpdateAuthString();
1340 }
1341 
1343 {
1344  m_Impl->UseOldStyleAuth();
1345 }
1346 
1348 {
1349  return m_Impl->m_CompoundIDPool;
1350 }
1351 
1353 {
1354  return new SNetScheduleAPIImpl(server->m_ServerInPool, m_Impl);
1355 }
1356 
1357 void CNetScheduleAPIExt::ReSetClientNode(const string& client_node)
1358 {
1359  limits::Check<limits::SClientNode>(client_node);
1360  m_Impl->m_ClientNode = client_node;
1361  m_Impl->UpdateAuthString();
1362 }
1363 
1364 void CNetScheduleAPIExt::ReSetClientSession(const string& client_session)
1365 {
1366  limits::Check<limits::SClientSession>(client_session);
1367  m_Impl->m_ClientSession = client_session;
1368  m_Impl->UpdateAuthString();
1369 }
1370 
1372 CNetScheduleAPIExt::CreateWnCompat(const string& service_name,
1373  const string& client_name)
1374 {
1375  return new SNetScheduleAPIImpl(nullptr, kEmptyStr, service_name, client_name, kEmptyStr, true, false);
1376 }
1377 
1379 CNetScheduleAPIExt::CreateNoCfgLoad(const string& service_name,
1380  const string& client_name, const string& queue_name)
1381 {
1382  return new SNetScheduleAPIImpl(nullptr, kEmptyStr, service_name, client_name, queue_name, false, false);
1383 }
1384 
1385 
Pool of recycled CCompoundID objects.
CConfigException –.
Definition: ncbi_config.hpp:53
CDatagramSocket::
CDeadline.
Definition: ncbitime.hpp:1830
CInterfaceVersion<> –.
CMemoryRegistry –.
Definition: ncbireg.hpp:584
static CNcbiApplication * Instance(void)
Singleton method.
Definition: ncbiapp.cpp:264
IClassFactory< SNetScheduleAPIImpl > TParent
CVersionInfo m_DriverVersionInfo
virtual TInterface * CreateInstance(const string &driver=kEmptyStr, CVersionInfo version=NCBI_INTERFACE_VERSION(IFace), const TPluginManagerParamTree *params=0) const
Create instance of TDriver.
TParent::TDriverList TDriverList
SNetScheduleAPIImpl IFace
void GetDriverVersions(TDriverList &info_list) const
Versions of the interface exported by the factory.
TParent::SDriverInfo TDriverInfo
CNetScheduleAPICF(const string &driver_name=kNetScheduleAPIDriverName, int patch_level=-1)
Construction.
SNetScheduleAPIImpl TDriver
Client API for NCBI NetSchedule server.
CNetScheduleConfigLoader(CSynRegistry &registry, SRegSynonyms &sections, bool ns_conf=true)
enum CNetScheduleConfigLoader::@987 m_Mode
bool Transform(const string &prefix, string &name) const
bool operator()(SNetScheduleAPIImpl *impl)
const SRegSynonyms & m_Sections
NetSchedule internal exception.
Smart pointer to a part of the NetSchedule API that does job retrieval and processing on the worker n...
Smart pointer to a part of the NetSchedule API that allows to retrieve completed jobs.
bool WaitForNotification(const CDeadline &deadline, string *server_host=NULL)
SNetScheduleNotificationReceiver m_Receiver
bool ReceiveNotification(string *server_host=NULL)
CRef< SNetScheduleSharedData > m_SharedData
void OnConnected(CNetServerConnection &connection) override
void OnErrorImpl(const string &err_msg, CNetServer &server) override
void OnWarningImpl(const string &warn_msg, CNetServer &server) override
CNetScheduleServerListener(bool non_wn, SNetScheduleSharedData *shared_data)
TPropCreator GetPropCreator() const override
void SetAuthString(const string &auth)
INetServerConnectionListener * Clone() override
Smart pointer to the job submission part of the NetSchedule API.
string Exec(const string &cmd, bool multiline_output=false, const STimeout *timeout=NULL)
Execute remote command 'cmd', wait for the reply, check that it starts with 'OK:',...
bool GetNextAttribute(string &attr_name, string &attr_value)
Return the next attribute.
Net Service exception.
CNetServer::SExecResult FindServerAndExec(const string &cmd, bool multiline_output=false)
bool IsLoadBalanced() const
CNetServiceIterator Iterate(EIterationMode mode=eSortByLoad)
Net Service exception.
CPluginManager<> –.
CRef –.
Definition: ncbiobj.hpp:618
void Add(const IRegistry &registry)
TR< TType >::T Get(const SRegSynonyms &sections, SRegSynonyms names, TType default_value)
bool Has(const SRegSynonyms &sections, SRegSynonyms names)
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
definition of a Culling tree
Definition: ncbi_tree.hpp:100
CUrlArgs::
Definition: ncbi_url.hpp:240
CUrlParserException –.
Definition: ncbi_url.hpp:539
CVersionInfo –.
IClassFactory<> –.
IRegistry –.
Definition: ncbireg.hpp:73
void erase(iterator pos)
Definition: map.hpp:167
const_iterator end() const
Definition: map.hpp:152
const_iterator find(const key_type &key) const
Definition: map.hpp:153
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
const_iterator begin() const
Definition: set.hpp:135
parent_type::iterator iterator
Definition: set.hpp:80
bool empty() const
Definition: set.hpp:133
void erase(iterator pos)
Definition: set.hpp:151
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
struct config config
static CS_COMMAND * cmd
Definition: ct_dynamic.c:26
#define false
Definition: bool.h:36
static SQLCHAR output[256]
Definition: print.c:5
static const TDS_WORD limits[]
Definition: num_limits.h:85
char data[12]
Definition: iconv.c:80
static CNcbiApplicationGuard InstanceGuard(void)
Singleton method.
Definition: ncbiapp.cpp:133
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
@ eOn
Definition: ncbi_types.h:111
#define NULL
Definition: ncbistd.hpp:225
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
Definition: ncbicntr.hpp:278
const string & GetHost(void) const
Get host name.
Definition: ncbidiag.cpp:1766
string GetStringUID(TUID uid=0) const
Return string representation of UID.
Definition: ncbidiag.cpp:1691
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
static TPID GetPID(void)
Get cached PID (read real PID if not cached yet).
Definition: ncbidiag.cpp:1526
void SetDiagUserAndHost(TDiagUserAndHost flags=fDiag_AddUser|fDiag_AddHost)
Set username and hostname properties for the diag context.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
int TErrCode
Definition: ncbiexpt.hpp:889
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
Definition: ncbiexpt.hpp:719
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
@ eInvalid
To be used ONLY as a return value; please, NEVER throw an exception with this code.
Definition: ncbiexpt.hpp:885
EJobStatus
Job status codes.
CNetScheduleSubmitter GetSubmitter()
Create an instance of CNetScheduleSubmitter.
string output
Job result data.
void GetQueueParams(const string &queue_name, TQueueParams &queue_params)
static const char * GetErrCodeDescription(CException::TErrCode err_code)
EJobStatus GetJobDetails(CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get job details.
void SetClientSession(const string &client_session)
static ENetScheduleWarningType ExtractWarningType(string &warn_msg)
CNetScheduleJobReader GetJobReader(const string &group=kEmptyStr, const string &affinity=kEmptyStr)
Create an instance of CNetScheduleJobReader.
@ eNSQ_NoPause
@ eNSQ_WithPullback
@ eNSQ_WithoutPullback
CNetScheduleAPI::TJobMask mask
int ret_code
Job return code.
static string StatusToString(EJobStatus status)
Printable status type.
const string & GetProgramVersion() const
Get program version string.
string input
Input data.
CNetRef< SNetScheduleAPIImpl > m_Impl
const string & GetQueueName() const
Return Queue name.
static const char * WarningTypeToString(ENetScheduleWarningType warning_type)
void GetProgressMsg(CNetScheduleJob &job)
Update the progress_message field of the job structure.
EAppRegistry
Defines how this object must be initialized.
void SetClientType(EClientType client_type)
const SServerParams & GetServerParams()
CNetService GetService()
void SetProgramVersion(const string &pv)
Set program version (like: MyProgram v.
CNetScheduleExecutor GetExecutor()
Create an instance of CNetScheduleExecutor.
static EJobStatus StringToStatus(const CTempString &status_str)
Parse status string into enumerator value.
CNetScheduleAdmin GetAdmin()
static CException::TErrCode GetCode(const string &name)
string job_id
Output job key.
void SetClientNode(const string &client_node)
@ eDeleted
The job has been wiped out of the database.
@ eDone
Job is ready (computed successfully)
@ eConfirmed
Final state - read confirmed.
@ eReading
Job has its output been reading.
@ eCanceled
Explicitly canceled.
@ eRunning
Running on a worker node.
@ eJobNotFound
No such job.
@ ePending
Waiting for execution.
@ eReadFailed
Final state - read failed.
@ eFailed
Failed to run (execution timeout)
TObjectType & GetObject(void)
Get object.
Definition: ncbiobj.hpp:1011
bool Empty(void) const THROWS_NONE
Check if CRef is empty – not pointing to any object, which means having a null value.
Definition: ncbiobj.hpp:719
static void NCBI_EntryPointImpl(TDriverInfoList &info_list, EEntryPointRequest method)
Entry point implementation.
#define NCBI_INTERFACE_VERSION(iface)
Macro to construct CVersionInfo class using interface name (relies on CInterfaceVersion class)
virtual const string & Get(const string &section, const string &name, TFlags flags=0) const
Get the parameter value.
Definition: ncbireg.cpp:262
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
EIO_Status Wait(const STimeout *timeout=kInfiniteTimeout)
EIO_Status Bind(unsigned short port)
EIO_Status Recv(void *buf, size_t buflen, size_t *msglen=0, string *sender_host=0, unsigned short *sender_port=0, size_t maxmsglen=0)
ESwitch SetDataLogging(ESwitch log=eOn)
EIO_Status SetTimeout(EIO_Event event, const STimeout *timeout)
Set timeout for I/O in the specified direction.
EIO_Status Send(const void *data, size_t datalen, const string &host=string(), unsigned short port=0)
unsigned short GetLocalPort(ENH_ByteOrder byte_order, bool trueport=false) const
Get socket local port number.
@ eNH_HostByteOrder
Definition: ncbi_socket.h:193
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3953
#define kEmptyStr
Definition: ncbistr.hpp:123
static int CompareNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive compare of a substring with another string.
Definition: ncbistr.cpp:219
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
Definition: ncbistr.cpp:630
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
Definition: ncbistr.cpp:3461
static string ParseEscapes(const CTempString str, EEscSeqRange mode=eEscSeqRange_Standard, char user_char='?')
Parse C-style escape sequences in the specified string.
Definition: ncbistr.cpp:4793
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
Definition: ncbistr.hpp:5109
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5412
static Uint8 StringToUInt8(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to Uint8.
Definition: ncbistr.cpp:873
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3554
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
static int CompareCase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-sensitive compare of a substring with another string.
Definition: ncbistr.cpp:135
@ fConvErr_NoThrow
Do not throw an exception on error.
Definition: ncbistr.hpp:285
@ fSplit_Truncate
Definition: ncbistr.hpp:2501
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
Definition: ncbistr.hpp:2498
bool Run(TRunMode flags=fRunDefault)
Run the thread.
Definition: ncbithr.cpp:724
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
Definition: ncbithr.cpp:958
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
Definition: ncbimtx.cpp:1844
void Post(unsigned int count=1)
Increment the semaphore by "count".
Definition: ncbimtx.cpp:1971
void Join(void **exit_data=0)
Wait for the thread termination.
Definition: ncbithr.cpp:863
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
Definition: ncbitime.cpp:3859
CTime GetFastLocalTime(void)
Quick and dirty getter of local time.
Definition: ncbitime.cpp:4167
void Get(unsigned int *sec, unsigned int *microsec) const
Get timeout in seconds and microseconds.
Definition: ncbitime.cpp:3545
TArgs::const_iterator const_iterator
Definition: ncbi_url.hpp:278
const TArgs & GetArgs(void) const
Get the const list of arguments.
Definition: ncbi_url.hpp:300
list< TArg > TArgs
Definition: ncbi_url.hpp:276
iterator FindFirst(const string &name)
Find the first argument with the given name.
Definition: ncbi_url.hpp:667
EIO_Status
I/O status.
Definition: ncbi_core.h:132
unsigned int usec
microseconds (modulo 1,000,000)
Definition: ncbi_types.h:78
const char * IO_StatusStr(EIO_Status status)
Get the text form of an enum status value.
Definition: ncbi_core.c:56
unsigned int sec
seconds
Definition: ncbi_types.h:77
@ eIO_Timeout
timeout expired before any I/O succeeded
Definition: ncbi_core.h:134
@ eIO_Success
everything is fine, no error occurred
Definition: ncbi_core.h:133
@ eIO_Read
read
Definition: ncbi_core.h:120
@ eNonCompatible
major, minor does not match
#define NCBI_XCONNECT_EXPORT
static int version
Definition: mdb_load.c:29
void ThrowIllegalChar(const string &name, const string &value, char c)
mdb_mode_t mode
Definition: lmdb++.h:38
Magic spell ;-) needed for some weird compilers... very empiric.
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
#define nullptr
Definition: ncbimisc.hpp:45
static void s_SetJobExpTime(time_t *job_exptime, const string &time_str)
#define WARNING_TYPE_TO_STRING(warning_type)
#define EXTRACT_WARNING_TYPE(warning_type)
CTempString s_GetSection(bool ns_conf)
CUrlArgs s_CreateCUrlArgs(const string &output)
int g_ParseNSOutput(const string &attr_string, const char *const *attr_names, string *attr_values, size_t attr_count)
static void s_SetPauseMode(ENetScheduleQueuePauseMode *pause_mode, const string &mode_str)
void NCBI_EntryPoint_xnetscheduleapi(CPluginManager< SNetScheduleAPIImpl >::TDriverInfoList &info_list, CPluginManager< SNetScheduleAPIImpl >::EEntryPointRequest method)
#define COMPATIBLE_NETSCHEDULE_VERSION
const char *const kNetScheduleAPIDriverName
ENetScheduleQueuePauseMode
Defines whether the job queue is paused, and if so, defines the pause mode set by the administrator.
const unsigned int kNetScheduleMaxDBDataSize
void g_AppendClientIPSessionIDHitID(string &cmd)
static const char * prefix[]
Definition: pcregrep.c:405
static pcre_uint8 * buffer
Definition: pcretest.c:1051
Helper classes and templates to implement plugins.
void ReSetClientSession(const string &)
void AddToClientNode(const string &data)
void ReSetClientNode(const string &)
CCompoundIDPool GetCompoundIDPool()
static TInstance CreateWnCompat(const string &, const string &)
static TInstance CreateNoCfgLoad(const string &, const string &, const string &)
CNetScheduleAPI GetServer(CNetServer::TInstance server)
Extract one of the servers comprising this service as a separate NetSchedule API object.
Job description.
function< INetServerProperties *()> TPropCreator
void OnWarning(const string &warn_msg, CNetServer &server)
SCompoundIDObjectPool< SCompoundIDImpl, ECompoundIDClass > m_CompoundIDPool
const CNetScheduleAPI::SServerParams & operator()(CNetService &service, const string &queue)
CNetScheduleAPI::SServerParams m_ServerParams
CNetScheduleExecutor::EJobAffinityPreference m_AffinityPreference
CRef< SNetScheduleSharedData > m_SharedData
CNetScheduleGetJob::TAffinityLadder m_AffinityLadder
bool GetServerByNode(const string &ns_node, CNetServer *server)
CNetScheduleAPI::EClientType m_ClientType
CNetScheduleServerListener * GetListener()
CAtomicCounter_WithAutoInit m_NotificationThreadStartStopCounter
void SetAuthParam(const string &param_name, const string &param_value)
void Init(CSynRegistry &registry, SRegSynonyms &sections)
string ExecOnJobServer(const TJob &job, const string &cmd, ESwitch roe=eDefault)
void InitAffinities(CSynRegistry &registry, const SRegSynonyms &sections)
SNetScheduleAPIImpl(CSynRegistryBuilder registry_builder, const string &section, const string &service_name=kEmptyStr, const string &client_name=kEmptyStr, const string &queue_name=kEmptyStr, bool wn=false, bool try_config=true)
CNetScheduleAPI::EJobStatus GetJobStatus(string cmd, const CNetScheduleJob &job, time_t *job_exptime, ENetScheduleQueuePauseMode *pause_mode)
CRef< SNetScheduleNotificationThread > m_NotificationThread
void GetQueueParams(const string &queue_name, TQueueParams &queue_params)
bool operator()(string *server_host)
virtual void * Main()
Derived (user-created) class must provide a real thread function.
void CmdAppendPortAndTimeout(string *cmd, unsigned remaining_seconds)
SNetScheduleNotificationReceiver m_Receiver
ENotificationType CheckNotification(string *ns_node)
SNetScheduleNotificationThread(SNetScheduleAPIImpl *ns_api)
const string & operator()(const string &param) const
SNetScheduleOutputParser(const string &output)
void WriteLine(const string &line)
CRef< SNetServerInPool > m_ServerInPool
CRef< TProperties > Get()
CNetServer::SExecResult ConnectAndExec(const string &cmd, bool multiline_output, bool retry_on_exception=false)
CRef< SNetServerInPool > ReturnServer(SNetServerInPool *server_impl)
CNetServerPool m_ServerPool
static SNetServiceImpl * Create(const string &api_name, const string &service_name, const string &client_name, INetServerConnectionListener *listener, CSynRegistry &registry, SRegSynonyms &sections, const string &ns_client_name=kEmptyStr)
const string & GetClientName() const
void Insert(CTempString s)
bool GetNextNotification(string *ns_node)
void RegisterServer(const string &ns_node)
string AsString() const
Timeout structure.
Definition: ncbi_types.h:76
Definition: inftrees.h:24
#define _ASSERT
int g(Seg_Gsm *spe, Seq_Mtf *psm, Thd_Gsm *tdg)
Definition: thrddgri.c:44
Modified on Sun Jun 23 05:17:53 2024 by modify_doxy.py rev. 669887