NCBI C++ ToolKit
ns_cmds.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: ns_cmds.cpp 100724 2023-09-01 19:52:08Z lavr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitry Kazimirov
27  *
28  * File Description: NetSchedule-specific commands of the grid_cli application.
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "ns_cmd_impl.hpp"
35 #include "util.hpp"
36 
40 
41 #include <corelib/rwstream.hpp>
42 
43 #include <ctype.h>
44 
46 
49  CGridCommandLineInterfaceApp::EAdminCmdSeverity cmd_severity, bool require_queue)
50 {
51  if ((api_class == eNetScheduleSubmitter) || (api_class == eNetScheduleExecutor))
52  SetUp_NetCacheCmd(false, false, false);
53 
54  m_APIClass = api_class;
55 
56  string service = m_Opts.ns_service;
57  string queue = m_Opts.queue;
58 
59  const bool job_provided = IsOptionSet(eID) || IsOptionSet(eJobId);
60  const bool service_provided = IsOptionExplicitlySet(eNetSchedule);
61 
62  if (IsOptionSet(eWorkerNode)) {
64 
65  } else {
66  if (job_provided) {
68 
69  if (queue.empty())
70  queue = key.queue;
71 
72  if (!service_provided) {
73  key.host.push_back(':');
74  key.host.append(NStr::NumericToString(key.port));
75  service = key.host;
76  }
77  } else if (!IsOptionSet(eNetSchedule)) {
78  NCBI_THROW(CArgException, eNoValue, "'--" NETSCHEDULE_OPTION "' option is required.");
79 
80  } else if (!IsOptionSet(eQueue) && !IsOptionSet(eTargetQueueArg) && require_queue) {
81  NCBI_THROW(CArgException, eNoValue, "'--" QUEUE_OPTION "' option is required.");
82 
83  }
84 
86  }
87 
88  if (job_provided && service_provided) {
89  if (auto address = SSocketAddress::Parse(m_Opts.ns_service)) {
91  } else {
92  NCBI_THROW(CArgException, eInvalidArg,
93  "When job ID is given, '--" NETSCHEDULE_OPTION "' "
94  "must be a host:port server address.");
95  }
96  }
97 
98  if (m_AdminMode)
100 
101  auto warning_handler = [&](const string& m, CNetServer s) {
102  return OnWarning(m_APIClass != eWorkerNodeAdmin, m, s);
103  };
104 
105  m_NetScheduleAPI.GetService().SetWarningHandler(warning_handler);
106 
107  if (IsOptionSet(eCompatMode)) {
109  }
110 
111  // Specialize NetSchedule API.
112  switch (api_class) {
113  case eNetScheduleAPI:
114  break;
115  case eNetScheduleAdmin:
116  if (cmd_severity != eReadOnlyAdminCmd) {
117  if (!service_provided) {
119  "' must be explicitly specified.");
120  }
122  NCBI_THROW(CArgException, eNoValue, "'--" QUEUE_OPTION
123  "' must be specified explicitly (not via $"
124  LOGIN_TOKEN_ENV ").");
125  }
126  }
127  /* FALL THROUGH */
128  case eWorkerNodeAdmin:
130  break;
133 
134  if (!IsOptionSet(eLoginToken) &&
136  NCBI_THROW(CArgException, eNoArg, "client identification required "
137  "(see the '" LOGIN_COMMAND "' command).");
138  }
139 
140  if (IsOptionSet(eJobGroup))
142 
143  /* FALL THROUGH */
146 
150  break;
151  default:
152  _ASSERT(0);
153  break;
154  }
155 
156  if (IsOptionSet(eClientNode)) {
158  }
159 
162  }
163 }
164 
167 {
     // Print a job status to stdout in the currently selected output format.
     // The enum value is first converted to its textual name.
168  string job_status(CNetScheduleAPI::StatusToString(status));
169 
     // JSON output: a single object with one "status" key.
     // Any other output format: the bare status name on its own line.
170  if (m_Opts.output_format == eJSON)
171  printf("{\"status\": \"%s\"}\n", job_status.c_str());
172  else
173  PrintLine(job_status);
174 }
175 
177 {
179 
183  if (IsOptionSet(eStatusOnly)) {
184  JobInfo_PrintStatus(status);
185  return 0;
186  }
187  } else if (IsOptionSet(eStatusOnly)) {
188  CNetScheduleJob job;
189  job.job_id = m_Opts.id;
191  return 0;
192  }
193 
195  CNetScheduleJob job;
196  job.job_id = m_Opts.id;
198  if (m_Opts.output_format == eJSON)
199  printf("{\"progress_message\": \"%s\"}\n",
200  job.progress_msg.c_str());
201  else
202  if (!job.progress_msg.empty())
203  PrintLine(job.progress_msg);
204  return 0;
205  }
206 
207  switch (m_Opts.output_format) {
208  case eRaw:
209  if (IsOptionSet(eBrief)) {
210  fprintf(stderr, GRID_APP_NAME " " JOBINFO_COMMAND ": option '--"
211  BRIEF_OPTION "' cannot be used with '"
212  RAW_OUTPUT_FORMAT "' output format.\n");
213  return 2;
214  }
216  break;
217 
218  case eJSON:
219  {
220  CJobInfoToJSON job_info_to_json;
222  &job_info_to_json, !IsOptionSet(eBrief), m_CompoundIDPool);
223  g_PrintJSON(stdout, job_info_to_json.GetRootNode());
224  }
225  break;
226 
227  default:
228  {
229  CPrintJobInfo print_job_info;
231  &print_job_info, !IsOptionSet(eBrief), m_CompoundIDPool);
232  }
233  }
234 
235  return 0;
236 }
237 
239 {
240 public:
241  CBatchSubmitAttrParser(istream* input_stream) :
242  m_InputStream(input_stream),
243  m_LineNumber(0)
244  {
245  }
246  bool NextLine();
247  bool NextAttribute();
249  const string& GetAttributeValue() const {return m_JobAttributeValue;}
250  size_t GetLineNumber() const {return m_LineNumber;}
251 
252 private:
253  istream* m_InputStream;
254  size_t m_LineNumber;
255  string m_Line;
259 };
260 
262 {
263  if (m_InputStream == NULL)
264  return false;
265 
266  ++m_LineNumber;
267 
268  getline(*m_InputStream, m_Line);
269 
270  if (m_InputStream->fail()) {
272  return false;
273  }
274 
275  if (m_InputStream->eof()) {
277  if (m_Line.empty())
278  return false;
279  }
280 
282  return true;
283 }
284 
286 {
288 
289 #define ATTR_CHECK_SET(name, type) \
290  if (attr_name.length() == sizeof(name) - 1 && \
291  memcmp(attr_name.data(), name, sizeof(name) - 1) == 0) { \
292  m_JobAttribute = type; \
293  break; \
294  }
295 
296  CTempString attr_name;
297  size_t attr_column;
298 
299  CAttrListParser::ENextAttributeType next_attr_type =
300  m_AttrParser.NextAttribute(&attr_name,
301  &m_JobAttributeValue, &attr_column);
302 
303  if (next_attr_type == CAttrListParser::eNoMoreAttributes)
304  return false;
305 
306  switch (attr_name[0]) {
307  case 'i':
308  ATTR_CHECK_SET("input", eInput);
309  break;
310  case 'a':
312  ATTR_CHECK_SET("affinity", eAffinity);
313  break;
314  case 'e':
315  ATTR_CHECK_SET("exclusive", eExclusiveJob);
316  }
317 
318 #define ATTR_POS " at line " << m_LineNumber << ", column " << attr_column
319 
320  switch (m_JobAttribute) {
321  case eUntypedArg:
322  NCBI_THROW_FMT(CArgException, eInvalidArg,
323  "unknown attribute " << attr_name << ATTR_POS);
324 
325  case eExclusiveJob:
326  break;
327 
328  default:
329  if (next_attr_type != CAttrListParser::eAttributeWithValue) {
330  NCBI_THROW_FMT(CArgException, eInvalidArg,
331  "attribute " << attr_name << " requires a value" ATTR_POS);
332  }
333  }
334 
335  return true;
336 }
337 
// Format string handed to GetFastLocalTime().AsString() when a received
// notification is printed with a timestamp prefix.
338 static const string s_NotificationTimestampFormat("Y/M/D h:m:s.l");
339 
341  CNetScheduleNotificationHandler& submit_job_handler,
342  const string& job_key,
343  const string& server_host)
344 {
346  int last_event_index = -1;
347 
348  const char* format = "%s \"%s\" from %s [invalid]\n";
349 
350  if (submit_job_handler.CheckJobStatusNotification(job_key,
351  &job_status, &last_event_index))
352  format = "%s \"%s\" from %s [valid, "
353  "job_status=%s, last_event_index=%d]\n";
354 
355  printf(format, GetFastLocalTime().
356  AsString(s_NotificationTimestampFormat).c_str(),
357  submit_job_handler.GetMessage().c_str(),
358  server_host.c_str(),
359  CNetScheduleAPI::StatusToString(job_status).c_str(),
360  last_event_index);
361 }
362 
364  CNcbiOstream& job_input_ostream)
365 {
     // Verify that the job input stream is still usable after it has been
     // written to: if the stream reports fail(), throw CIOException (eWrite).
     // Nothing happens when the stream is in a good state.
366  if (job_input_ostream.fail()) {
367  NCBI_THROW(CIOException, eWrite, "Error while writing job input");
368  }
369 }
370 
372  size_t max_embedded_input_size,
373  const string& args,
374  CNcbiIstream& remote_app_stdin,
375  CNcbiOstream& job_input_ostream)
376 {
     // Build a remote_app request from a command line (args) plus optional
     // standard input (remote_app_stdin) and send it to job_input_ostream.
     // The request object is created on top of the grid client's NetCache API.
377  CRemoteAppRequest request(m_GridClient->GetNetCacheAPI());
378 
379  // Roughly estimate the maximum embedded input size.
     // A limit of 0 is mapped to the largest size_t value; otherwise one
     // tenth of the limit (integer division) is subtracted from it.
380  request.SetMaxInlineSize(max_embedded_input_size == 0 ?
381  numeric_limits<size_t>().max() :
382  max_embedded_input_size - max_embedded_input_size / 10);
383 
384  request.SetCmdLine(args);
385 
     // peek() sets the EOF flag on an empty stream, so the copy below is
     // skipped when there is no standard input data at all.
386  remote_app_stdin.peek();
387  if (!remote_app_stdin.eof())
388  NcbiStreamCopy(request.GetStdIn(), remote_app_stdin);
389 
390  request.Send(job_input_ostream);
391 }
392 
395 
396  string job_input;
400  string affinity;
402 
403  SBatchSubmitRecord(istream* input_stream) : attr_parser(input_stream) {}
404 
405  bool LoadNextRecord();
406 };
407 
409 {
410  if (!attr_parser.NextLine())
411  return false;
412 
417 
418  while (attr_parser.NextAttribute())
419  switch (attr_parser.GetAttributeType()) {
420  case eInput:
421  if (job_input_defined) {
422  NCBI_THROW_FMT(CArgException, eInvalidArg,
423  "More than one \"input\" attribute is defined "
424  "at line " << attr_parser.GetLineNumber());
425  }
427  job_input_defined = true;
428  break;
429  case eRemoteAppArgs:
431  NCBI_THROW_FMT(CArgException, eInvalidArg,
432  "More than one \"args\" attribute is defined "
433  "at line " << attr_parser.GetLineNumber());
434  }
437  break;
438  case eAffinity:
440  break;
441  case eExclusiveJob:
442  exclusive_job = true;
443  break;
444  default:
445  _ASSERT(0);
446  break;
447  }
448 
450  NCBI_THROW_FMT(CArgException, eInvalidArg, "\"input\" "
451  "(and/or \"args\") attribute is required "
452  "at line " << attr_parser.GetLineNumber());
453  }
454 
455  return true;
456 }
457 
459  size_t max_embedded_input_size, CNcbiOstream &job_input_ostream)
460 {
462  if (!IsOptionSet(eInput))
463  PrepareRemoteAppJobInput(max_embedded_input_size,
465  *m_Opts.input_stream, job_input_ostream);
466  else {
467  CNcbiStrstream remote_app_stdin;
468  remote_app_stdin.write(m_Opts.input.data(),
469  m_Opts.input.length());
470  PrepareRemoteAppJobInput(max_embedded_input_size,
472  remote_app_stdin, job_input_ostream);
473  }
474  } else if (IsOptionSet(eInput))
475  job_input_ostream.write(m_Opts.input.data(), m_Opts.input.length());
476  else
477  NcbiStreamCopy(job_input_ostream, *m_Opts.input_stream);
478 
479  CheckJobInputStream(job_input_ostream);
480 }
481 
483 {
484  SBatchSubmitRecord job_input_record(m_Opts.input_stream);
485 
486  size_t max_embedded_input_size = m_GridClient->GetMaxServerInputSize();
487 
488  if (m_Opts.batch_size <= 1) {
489  while (job_input_record.LoadNextRecord()) {
490  CNcbiOstream& job_input_ostream(m_GridClient->GetOStream());
491 
492  if (job_input_record.remote_app_args_defined) {
493  CNcbiStrstream remote_app_stdin;
494  remote_app_stdin.write(job_input_record.job_input.data(),
495  job_input_record.job_input.length());
496  PrepareRemoteAppJobInput(max_embedded_input_size,
497  job_input_record.remote_app_args,
498  remote_app_stdin, job_input_ostream);
499  } else {
500  job_input_ostream.write(job_input_record.job_input.data(),
501  job_input_record.job_input.length());
502  }
503 
504  CheckJobInputStream(job_input_ostream);
505 
506  if (!job_input_record.affinity.empty())
507  m_GridClient->SetJobAffinity(job_input_record.affinity);
508 
509  if (job_input_record.exclusive_job)
511 
512  if (IsOptionSet(eJobGroup))
513  m_GridClient->SetJobGroup(m_Opts.job_group);
514 
515  fprintf(m_Opts.output_stream,
516  "%s\n", m_GridClient->Submit(m_Opts.affinity).c_str());
517  }
518  } else {
519  CGridJobBatchSubmitter& batch_submitter(
520  m_GridClient->GetJobBatchSubmitter());
521  unsigned remaining_batch_size = m_Opts.batch_size;
522 
523  while (job_input_record.LoadNextRecord()) {
524  if (remaining_batch_size == 0) {
525  batch_submitter.Submit(m_Opts.job_group);
526  const vector<CNetScheduleJob>& jobs =
527  batch_submitter.GetBatch();
528  ITERATE(vector<CNetScheduleJob>, it, jobs)
529  fprintf(m_Opts.output_stream,
530  "%s\n", it->job_id.c_str());
531  batch_submitter.Reset();
532  remaining_batch_size = m_Opts.batch_size;
533  }
534  batch_submitter.PrepareNextJob();
535 
536  CNcbiOstream& job_input_ostream(batch_submitter.GetOStream());
537 
538  if (job_input_record.remote_app_args_defined) {
539  CNcbiStrstream remote_app_stdin;
540  remote_app_stdin.write(job_input_record.job_input.data(),
541  job_input_record.job_input.length());
542  PrepareRemoteAppJobInput(max_embedded_input_size,
543  job_input_record.remote_app_args,
544  remote_app_stdin, job_input_ostream);
545  } else
546  job_input_ostream.write(job_input_record.job_input.data(),
547  job_input_record.job_input.length());
548 
549  CheckJobInputStream(job_input_ostream);
550 
551  if (!job_input_record.affinity.empty())
552  batch_submitter.SetJobAffinity(job_input_record.affinity);
553 
554  if (job_input_record.exclusive_job)
556 
557  --remaining_batch_size;
558  }
559  if (remaining_batch_size < m_Opts.batch_size) {
560  batch_submitter.Submit(m_Opts.job_group);
561  const vector<CNetScheduleJob>& jobs =
562  batch_submitter.GetBatch();
563  ITERATE(vector<CNetScheduleJob>, it, jobs)
564  fprintf(m_Opts.output_stream, "%s\n", it->job_id.c_str());
565  batch_submitter.Reset();
566  }
567  }
568 }
569 
571 {
573 
574  if (IsOptionSet(eBatch)) {
575  if (IsOptionSet(eJobInputDir)) {
576  NCBI_THROW(CArgException, eInvalidArg, "'--" JOB_INPUT_DIR_OPTION
577  "' option is not supported in batch mode");
578  }
579  SubmitJob_Batch();
580  } else if (IsOptionSet(eJobInputDir)) {
581  CNetScheduleJob job;
582 
583  job.affinity = m_Opts.affinity;
584  job.group = m_Opts.job_group;
585 
587  job_key.AppendCurrentTime();
588  job_key.AppendRandom();
589  job.job_id = job_key.ToString();
590 
591  {{
592  CStringOrBlobStorageWriter job_input_writer(
593  numeric_limits<size_t>().max(), NULL, job.input);
594  CWStream job_input_ostream(&job_input_writer, 0, NULL);
595 
596  x_LoadJobInput(0, job_input_ostream);
597  }}
598 
599  CNetScheduleJobSerializer job_serializer(job, m_CompoundIDPool);
601 
602  PrintLine(job.job_id);
603  } else {
604  CNcbiOstream& job_input_ostream = m_GridClient->GetOStream();
605 
606  size_t max_embedded_input_size = m_GridClient->GetMaxServerInputSize();
607 
608  x_LoadJobInput(max_embedded_input_size, job_input_ostream);
609 
610  m_GridClient->SetJobGroup(m_Opts.job_group);
611  m_GridClient->SetJobAffinity(m_Opts.affinity);
612 
615 
617  PrintLine(m_GridClient->Submit());
618  else {
619  m_GridClient->CloseStream();
620 
621  CNetScheduleJob& job = m_GridClient->GetJob();
622 
623  CDeadline deadline(m_Opts.timeout, 0);
624 
625  CNetScheduleNotificationHandler submit_job_handler;
626 
627  submit_job_handler.SubmitJob(m_NetScheduleSubmitter,
628  job, m_Opts.timeout);
629 
630  PrintLine(job.job_id);
631 
634  submit_job_handler.WaitForJobCompletion
635  (job, deadline, m_NetScheduleAPI);
636 
638 
639  if (status == CNetScheduleAPI::eDone) {
643  }
644  } else {
645  submit_job_handler.PrintPortNumber();
646 
647  string server_host;
648 
649  if (submit_job_handler.WaitForNotification(deadline,
650  &server_host))
651  PrintJobStatusNotification(submit_job_handler,
652  job.job_id, server_host);
653  }
654  }
655  }
656 
657  return 0;
658 }
659 
661 {
663 
664  if (!IsOptionSet(eWaitTimeout)) {
665  fprintf(stderr, GRID_APP_NAME " " WATCHJOB_COMMAND
666  ": option '--" WAIT_TIMEOUT_OPTION "' is required.\n");
667  return 2;
668  }
669 
673 
674  CDeadline deadline(m_Opts.timeout, 0);
675 
676  CNetScheduleNotificationHandler submit_job_handler;
677 
678  CNetScheduleAPI::EJobStatus job_status;
679  int last_event_index = -1;
680 
681  tie(job_status, last_event_index, ignore) =
682  submit_job_handler.RequestJobWatching(m_NetScheduleAPI, m_Opts.id, deadline);
683 
684  if (job_status == CNetScheduleAPI::eJobNotFound) {
685  fprintf(stderr, GRID_APP_NAME ": unexpected error while "
686  "setting up a job event listener.\n");
687  return 3;
688  }
689 
691  if (last_event_index <= m_Opts.last_event_index &&
692  (m_Opts.job_status_mask & (1 << job_status)) == 0)
693  job_status = submit_job_handler.WaitForJobEvent(m_Opts.id,
695  m_Opts.last_event_index, &last_event_index);
696 
697  printf("%d\n%s\n", last_event_index,
698  CNetScheduleAPI::StatusToString(job_status).c_str());
699  } else {
700  if (last_event_index > m_Opts.last_event_index) {
701  fprintf(stderr, "Job event index (%d) has already "
702  "exceeded %d; won't wait.\n",
703  last_event_index, m_Opts.last_event_index);
704  return 6;
705  }
706  if ((m_Opts.job_status_mask & (1 << job_status)) != 0) {
707  fprintf(stderr, "Job is already '%s'; won't wait.\n",
708  CNetScheduleAPI::StatusToString(job_status).c_str());
709  return 6;
710  }
711 
712  submit_job_handler.PrintPortNumber();
713 
714  string server_host;
715 
716  while (submit_job_handler.WaitForNotification(deadline,
717  &server_host))
718  PrintJobStatusNotification(submit_job_handler,
719  m_Opts.id, server_host);
720  }
721 
722  return 0;
723 }
724 
725 static bool s_DumpStdStream(CNcbiIstream& std_stream, FILE* output_stream)
726 {
727  char buffer[IO_BUFFER_SIZE];
728  size_t bytes_read;
729 
730  std_stream.exceptions((ios::iostate) 0);
731 
732  while (!std_stream.eof()) {
733  std_stream.read(buffer, sizeof(buffer));
734  if (std_stream.fail() && !std_stream.eof())
735  return false;
736  bytes_read = (size_t) std_stream.gcount();
737 
738  // bytes_read could be zero due to EoF reported after read
739  if (bytes_read &&
740  fwrite(buffer, bytes_read, 1, output_stream) != 1)
741  return false;
742  }
743 
744  return true;
745 }
746 
748  const string& data_or_blob_id)
749 {
750  const auto kStreamFlags = CRWStreambuf::fOwnReader | CRWStreambuf::fLeakExceptions;
751  unique_ptr<IReader> reader;
752 
753  try {
754  reader.reset(new CStringOrBlobStorageReader(data_or_blob_id,
755  m_NetCacheAPI));
756  }
758  // Allow the special case of jobs submitted bypassing the Grid API.
759 
761  throw;
762 
766  throw;
767 
768  if (fwrite(data_or_blob_id.data(), data_or_blob_id.length(), 1,
769  m_Opts.output_stream) != 1)
770  goto Error;
771 
772  return 0;
773  }
774 
777 
778  try {
779  CRStream input_stream(reader.release(), 0, 0, kStreamFlags);
780 
781  request.Deserialize(input_stream);
782  }
783  catch (exception&) {
784  fprintf(stderr, GRID_APP_NAME
785  ": Cannot deserialize remote_app job input.\n");
786  return 3;
787  }
788 
790  goto Error;
791 
792  return 0;
793  }
794 
796  CRStream input_stream(reader.release(), 0, 0, kStreamFlags);
797 
798  CRemoteAppResult remote_app_result(m_NetCacheAPI);
799  remote_app_result.Receive(input_stream);
800 
801  CNcbiIstream& std_stream = IsOptionSet(eRemoteAppStdOut) ?
802  remote_app_result.GetStdOut() : remote_app_result.GetStdErr();
803 
804  if (!s_DumpStdStream(std_stream, m_Opts.output_stream))
805  goto Error;
806 
807  return 0;
808  }
809 
810  char buffer[IO_BUFFER_SIZE];
811  size_t bytes_read;
812 
813  while (reader->Read(buffer, sizeof(buffer), &bytes_read) != eRW_Eof)
814  if (fwrite(buffer, bytes_read, 1, m_Opts.output_stream) != 1)
815  goto Error;
816 
817  return 0;
818 
819 Error:
820  fprintf(stderr, GRID_APP_NAME ": I/O error.\n");
821  return 3;
822 }
823 
825  const CNetScheduleJob& job)
826 {
     // Print the attributes of a job, then dump its input.
     // First line: the job ID.
827  PrintLine(job.job_id);
     // Second line: the auth token (if any) followed by a space, then the
     // affinity in printable form (if any) and the word "exclusive" when the
     // eExclusiveJob bit is set in the job mask. The line is always
     // terminated with a newline, even when none of these are present.
828  if (!job.auth_token.empty())
829  printf("%s ", job.auth_token.c_str());
830  if (!job.affinity.empty()) {
831  string affinity(NStr::PrintableString(job.affinity));
832  printf(job.mask & CNetScheduleAPI::eExclusiveJob ?
833  "affinity=\"%s\" exclusive\n" : "affinity=\"%s\"\n",
834  affinity.c_str());
835  } else
836  printf(job.mask & CNetScheduleAPI::eExclusiveJob ?
837  "exclusive\n" : "\n");
     // The return value is whatever DumpJobInputOutput() reports for the
     // job input.
838  return DumpJobInputOutput(job.input);
839 }
840 
842 {
844 
845  CNetScheduleJob job;
846  job.job_id = m_Opts.id;
847 
849  fprintf(stderr, GRID_APP_NAME ": job %s has expired.\n",
850  job.job_id.c_str());
851  return 3;
852  }
853 
854  return DumpJobInputOutput(job.input);
855 }
856 
858 {
860 
861  CNetScheduleJob job;
862  job.job_id = m_Opts.id;
864 
865  switch (status) {
870  break;
871 
872  default:
873  fprintf(stderr, "Warning: job is in %s status.\n",
874  CNetScheduleAPI::StatusToString(status).c_str());
875  }
876 
877  return DumpJobInputOutput(job.output);
878 }
879 
881 {
883 
884  const bool reliable_first = IsOptionSet(eReliableRead);
885  const bool reliable_second = IsOptionSet(eConfirmRead) || IsOptionSet(eFailRead) || IsOptionSet(eRollbackRead);
886 
887  // Option eClientSession is also set when login token is specified
888  if ((reliable_first || reliable_second) && !IsOptionSet(eClientSession)) {
889  fprintf(stderr, GRID_APP_NAME " " READJOB_COMMAND
890  ": Either option '--" LOGIN_TOKEN_OPTION "' or '--" CLIENT_SESSION_OPTION
891  "' is required in reliable mode (same session must be used for both steps).\n");
892  return 2;
893  }
894 
895  if (!reliable_second) {
896  if (IsOptionSet(eJobId)) {
897  fprintf(stderr, GRID_APP_NAME " " READJOB_COMMAND
898  ": option '--" JOB_ID_OPTION "' cannot be used in %s.\n",
899  reliable_first ? "the first step of reliable mode" : "simple mode");
900  return 2;
901  }
902 
903  CNetScheduleJob job;
904  CNetScheduleAPI::EJobStatus job_status;
905 
908 
910 
912  rnj_result = job_reader.ReadNextJob(&job, &job_status);
913  else {
914  CTimeout timeout(m_Opts.timeout);
915  rnj_result = job_reader.ReadNextJob(&job, &job_status, &timeout);
916  }
917 
918  switch (rnj_result) {
920  PrintLine(job.job_id);
922 
923  if (reliable_first)
924  PrintLine(job.auth_token);
925  else {
926  if (job_status == CNetScheduleAPI::eDone) {
928  int ret_code = DumpJobInputOutput(job.output);
929  if (ret_code != 0)
930  return ret_code;
931  }
933  }
934  break;
935 
937  return 3;
938 
940  if (IsOptionSet(eWaitTimeout)) {
941  PrintLine("TIMEOUT");
942  }
943 
944  return 0;
945 
947  PrintLine("NOJOBS");
948  return 0;
949  }
950  } else {
951  if (!IsOptionSet(eJobId)) {
952  fprintf(stderr, GRID_APP_NAME " " READJOB_COMMAND
953  ": option '--" JOB_ID_OPTION "' is required.\n");
954  return 2;
955  }
956 
959  else if (IsOptionSet(eFailRead))
962  else
964  }
965 
966  return 0;
967 }
968 
970 {
971  if (IsOptionSet(eJobGroup)) {
973 
976  } else if (IsOptionSet(eAllJobs) || !m_Opts.job_statuses.empty()) {
978 
980  } else {
982 
984  }
985 
986  return 0;
987 }
988 
990 {
992 
995 
999  case OPTION_N(2) + OPTION_N(0):
1000  affinity_preference = CNetScheduleExecutor::ePreferredAffsOrAnyJob;
1001  break;
1002 
1003  case OPTION_N(1):
1004  case OPTION_N(1) + OPTION_N(0):
1005  affinity_preference = CNetScheduleExecutor::eClaimNewPreferredAffs;
1006  break;
1007 
1008  case OPTION_N(0):
1009  affinity_preference = CNetScheduleExecutor::ePreferredAffinities;
1010  break;
1011 
1012  case 0:
1014  break;
1015  /* FALL THROUGH */
1016 
1017  case OPTION_N(2):
1018  affinity_preference = CNetScheduleExecutor::eAnyJob;
1019  break;
1020 
1021  default:
1022  fprintf(stderr, GRID_APP_NAME ": options '--"
1024  "' are mutually exclusive.\n");
1025  return 2;
1026  }
1027 
1028  m_NetScheduleExecutor.SetAffinityPreference(affinity_preference);
1029 
1030  CNetScheduleJob job;
1031 
1034  return PrintJobAttrsAndDumpInput(job);
1035  } else {
1036  CDeadline deadline(m_Opts.timeout, 0);
1037 
1038  CNetScheduleNotificationHandler wait_job_handler;
1039 
1040  wait_job_handler.PrintPortNumber();
1041 
1043  affinity_preference, m_Opts.affinity));
1044 
1045  wait_job_handler.CmdAppendTimeoutGroupAndClientInfo(cmd,
1046  &deadline, m_Opts.job_group);
1047 
1048  if (wait_job_handler.RequestJob(m_NetScheduleExecutor, job, cmd)) {
1049  fprintf(stderr, "%s\nA job has been returned; won't wait.\n",
1050  job.job_id.c_str());
1051  return 6;
1052  }
1053 
1054  string server_host;
1055  CNetServer server;
1056  string server_address;
1057 
1058  while (wait_job_handler.WaitForNotification(deadline,
1059  &server_host)) {
1060  const char* format = "%s \"%s\" from %s [invalid]\n";
1061 
1062  if (wait_job_handler.CheckRequestJobNotification(
1063  m_NetScheduleExecutor, &server)) {
1064  server_address = server.GetServerAddress();
1065  format = "%s \"%s\" from %s [valid, server=%s]\n";
1066  }
1067 
1068  printf(format, GetFastLocalTime().AsString(
1070  wait_job_handler.GetMessage().c_str(),
1071  server_host.c_str(),
1072  server_address.c_str());
1073  }
1074  }
1075 
1076  return 0;
1077 }
1078 
1080 {
1082 
1083  CNetScheduleJob job;
1084 
1085  job.job_id = m_Opts.id;
1086  job.ret_code = m_Opts.return_code;
1088 
1090  job.output = "K " + m_Opts.job_output_blob;
1091  else {
1092  const auto kMaxOutputSize = m_NetScheduleAPI.GetServerParams().max_output_size;
1093  CStringOrBlobStorageWriter writer(kMaxOutputSize, m_NetCacheAPI, job.output);
1094 
1095  if (!IsOptionSet(eJobOutput)) {
1096  char buffer[IO_BUFFER_SIZE];
1097 
1098  do {
1099  m_Opts.input_stream->read(buffer, sizeof(buffer));
1100  if (m_Opts.input_stream->fail() &&
1101  !m_Opts.input_stream->eof()) {
1103  "Error while reading job output data");
1104  }
1105  if (writer.Write(buffer,
1106  (size_t) m_Opts.input_stream->gcount()) != eRW_Success)
1107  goto ErrorExit;
1108  } while (!m_Opts.input_stream->eof());
1109  } else
1110  if (writer.Write(m_Opts.job_output.data(),
1111  m_Opts.job_output.length()) != eRW_Success)
1112  goto ErrorExit;
1113  }
1114 
1115  if (!IsOptionSet(eFailJob))
1117  else {
1120  }
1121 
1122  return 0;
1123 
1124 ErrorExit:
1125  fprintf(stderr, GRID_APP_NAME ": error while sending job output.\n");
1126  return 3;
1127 }
1128 
1130 {
1132 
1133  CNetScheduleJob job;
1134  job.job_id = m_Opts.id;
1136 
1138 
1139  return 0;
1140 }
1141 
1143 {
1145 
1147 
1148  return 0;
1149 }
1150 
1152 {
1154 
1158 
1159  CNetScheduleJob job;
1160  job.job_id = m_Opts.id;
1161 
1163  executor.JobDelayExpiration(job,
1164  (unsigned) m_Opts.extend_lifetime_by);
1165 
1168  executor.PutProgressMsg(job);
1169  }
1170  }
1171 
1172  return 0;
1173 }
1174 
1176 {
1178 
1179  if (!IsOptionSet(eQueueClasses)) {
1180  if ((IsOptionSet(eQueueArg) ^ IsOptionSet(eAllQueues)) == 0) {
1181  fprintf(stderr, GRID_APP_NAME " " QUEUEINFO_COMMAND
1182  ": either the '" QUEUE_ARG "' argument or the '--"
1183  ALL_QUEUES_OPTION "' option must be specified.\n");
1184  return 1;
1185  }
1186  if (m_Opts.output_format == eJSON)
1189  else if (!IsOptionSet(eAllQueues))
1191  else {
1192  m_NetScheduleAPI.GetService().PrintCmdOutput("STAT QUEUES",
1194  }
1195  } else if (m_Opts.output_format == eJSON)
1197  else
1198  m_NetScheduleAPI.GetService().PrintCmdOutput("STAT QCLASSES",
1200 
1201  return 0;
1202 }
1203 
1205 {
1207 
1210 
1211  return 0;
1212 }
1213 
1215 {
1217 
1220 
1221  return 0;
1222 }
1223 
1225 {
1227 
1229 
1231 
1232  typedef set<string> TServerSet;
1233  typedef map<string, TServerSet> TQueueRegister;
1234 
1235  TQueueRegister queue_register;
1236 
1237  ITERATE (CNetScheduleAdmin::TQueueList, it, queues) {
1238  string server_address = it->server.GetServerAddress();
1239 
1240  ITERATE(std::list<std::string>, queue, it->queues) {
1241  queue_register[*queue].insert(server_address);
1242  }
1243  }
1244 
1245  ITERATE(TQueueRegister, it, queue_register) {
1246  NcbiCout << it->first;
1247  if (it->second.size() != queues.size()) {
1248  const char* sep = " (limited to ";
1249  ITERATE(TServerSet, server, it->second) {
1250  NcbiCout << sep << *server;
1251  sep = ", ";
1252  }
1253  NcbiCout << ")";
1254  }
1255  NcbiCout << NcbiEndl;
1256  }
1257 
1258  return 0;
1259 }
1260 
1262 {
1264 
1266 
1267  return 0;
1268 }
1269 
CJsonNode g_QueueClassInfoToJson(CNetScheduleAPI ns_api)
Definition: util.cpp:210
void g_ProcessJobInfo(CNetScheduleAPI ns_api, const string &job_key, IJobInfoProcessor *processor, bool verbose, CCompoundIDPool::TInstance id_pool)
Definition: util.cpp:572
void g_PrintJSON(FILE *output_stream, CJsonNode node, const char *indent)
Definition: util.cpp:738
CJsonNode g_QueueInfoToJson(CNetScheduleAPI ns_api, const string &queue_name)
Definition: util.cpp:194
#define GRID_APP_NAME
Definition: cgi2rcgi.cpp:63
CArgException –.
Definition: ncbiargs.hpp:120
ENextAttributeType NextAttribute(CTempString *attr_name, string *attr_value, size_t *attr_column)
void Reset(const char *position, const char *eol)
size_t GetLineNumber() const
Definition: ns_cmds.cpp:250
CBatchSubmitAttrParser(istream *input_stream)
Definition: ns_cmds.cpp:241
CAttrListParser m_AttrParser
Definition: ns_cmds.cpp:256
EOption GetAttributeType() const
Definition: ns_cmds.cpp:248
istream * m_InputStream
Definition: ns_cmds.cpp:253
const string & GetAttributeValue() const
Definition: ns_cmds.cpp:249
CCompoundID NewID(ECompoundIDClass new_id_class)
Create and return a new CCompoundID object.
Base64-encoded ID string that contains extractable typed fields.
void AppendRandom(Uint4 random_number)
Append an eCIT_Random field at the end of this compound ID.
void AppendCurrentTime()
Get the current time and append it as an eCIT_Timestamp field at the end of this compound ID.
string ToString()
Pack the ID and return its string representation.
CDeadline.
Definition: ncbitime.hpp:1830
Grid Client (the submitter).
static bool OnWarning(bool worker_node_admin, const string &warn_msg, CNetServer server)
Definition: grid_cli.cpp:1749
int DumpJobInputOutput(const string &data_or_blob_id)
Definition: ns_cmds.cpp:747
CCompoundIDPool m_CompoundIDPool
Definition: grid_cli.hpp:457
bool IsOptionSet(int option) const
Definition: grid_cli.hpp:421
unique_ptr< CGridClient > m_GridClient
Definition: grid_cli.hpp:453
void SetUp_NetScheduleCmd(EAPIClass api_class, EAdminCmdSeverity cmd_severity=eReadOnlyAdminCmd, bool require_queue=true)
Definition: ns_cmds.cpp:47
struct CGridCommandLineInterfaceApp::SOptions m_Opts
void PrepareRemoteAppJobInput(size_t max_embedded_input_size, const string &args, CNcbiIstream &remote_app_stdin, CNcbiOstream &job_input_ostream)
Definition: ns_cmds.cpp:371
CNetScheduleSubmitter m_NetScheduleSubmitter
Definition: grid_cli.hpp:451
void JobInfo_PrintStatus(CNetScheduleAPI::EJobStatus status)
Definition: ns_cmds.cpp:165
CNetScheduleAdmin m_NetScheduleAdmin
Definition: grid_cli.hpp:450
bool IsOptionAcceptedAndSetImplicitly(EOption option) const
Definition: grid_cli.hpp:411
CNetScheduleAPIExt m_NetScheduleAPI
Definition: grid_cli.hpp:449
void MarkOptionAsSet(int option)
Definition: grid_cli.hpp:416
void PrintJobStatusNotification(CNetScheduleNotificationHandler &submit_job_handler, const string &job_key, const string &server_host)
Definition: ns_cmds.cpp:340
enum CGridCommandLineInterfaceApp::EAPIClass m_APIClass
int PrintJobAttrsAndDumpInput(const CNetScheduleJob &job)
Definition: ns_cmds.cpp:824
static void PrintLine(const string &line)
Definition: grid_cli.cpp:1775
bool IsOptionExplicitlySet(int option) const
Definition: grid_cli.hpp:436
void x_LoadJobInput(size_t max_embedded_input_size, CNcbiOstream &job_input_ostream)
Definition: ns_cmds.cpp:458
CNetScheduleExecutor m_NetScheduleExecutor
Definition: grid_cli.hpp:452
void CheckJobInputStream(CNcbiOstream &job_input_ostream)
Definition: ns_cmds.cpp:363
Grid Job Batch Submitter.
Definition: grid_client.hpp:60
CJsonNode GetRootNode() const
Definition: util.hpp:89
void DeleteQueue(const string &qname)
Delete queue. Applicable only to queues created through the CreateQueue method.
void GetQueueList(TQueueList &result)
void CreateQueue(const string &qname, const string &qclass, const string &description=kEmptyStr)
Create an instance of the given queue class.
void DumpJob(CNcbiOstream &out, const string &job_key)
void CancelAllJobs(const string &job_statuses=kEmptyStr)
Cancel all jobs in the queue (optionally with particular statuses).
void DumpQueue(CNcbiOstream &output_stream, const string &start_after_job=kEmptyStr, size_t job_count=0, const string &job_statuses=kEmptyStr, const string &job_group=kEmptyStr)
void PrintQueueInfo(const string &queue_name, CNcbiOstream &output_stream)
list< SServerQueueList > TQueueList
Smart pointer to a part of the NetSchedule API that does job retrieval and processing on the worker n...
Smart pointer to a part of the NetSchedule API that allows retrieving completed jobs.
string SaveJobInput(const string &target_dir, CNetCacheAPI &nc_api)
CNetScheduleAPI::EJobStatus WaitForJobEvent(const string &job_key, CDeadline &deadline, CNetScheduleAPI ns_api, TJobStatusMask status_mask, int last_event_index=kMax_Int, int *new_event_index=NULL)
bool CheckJobStatusNotification(const string &job_id, CNetScheduleAPI::EJobStatus *job_status, int *last_event_index=NULL)
bool WaitForNotification(const CDeadline &deadline, string *server_host=NULL)
void CmdAppendTimeoutGroupAndClientInfo(string &cmd, const CDeadline *deadline, const string &job_group)
bool CheckRequestJobNotification(CNetScheduleExecutor::TInstance executor, CNetServer *server)
bool RequestJob(CNetScheduleExecutor::TInstance executor, CNetScheduleJob &job, const string &cmd)
TJobInfo RequestJobWatching(CNetScheduleAPI::TInstance ns_api, const string &job_id, const CDeadline &deadline)
void SubmitJob(CNetScheduleSubmitter::TInstance submitter, CNetScheduleJob &job, unsigned wait_time, CNetServer *server=NULL)
static string MkBaseGETCmd(CNetScheduleExecutor::EJobAffinityPreference affinity_preference, const string &affinity_list)
CNetScheduleAPI::EJobStatus WaitForJobCompletion(CNetScheduleJob &job, CDeadline &deadline, CNetScheduleAPI ns_api, time_t *job_exptime=NULL)
void StickToServer(SSocketAddress address)
string GetServerAddress() const
void PrintCmdOutput(const string &cmd, CNcbiOstream &output_stream, ECmdOutputStyle output_style, CNetService::EIterationMode=CNetService::eSortByLoad)
void SetWarningHandler(TEventHandler warning_handler)
CNetServerPool GetServerPool()
Note about the "buf_size" parameter for streams in this API.
Definition: rwstream.hpp:122
@ fLeakExceptions
Exceptions leaked out.
Definition: rwstreambuf.hpp:72
@ fOwnReader
Own the underlying reader.
Definition: rwstreambuf.hpp:66
Remote Application Request (both client side and application executor side)
Definition: remote_app.hpp:96
CNcbiIstream & GetStdInForRead()
Get the stdin stream of the remote application.
Definition: remote_app.hpp:131
void SetMaxInlineSize(size_t max_inline_size)
Definition: remote_app.hpp:156
void Send(CNcbiOstream &os)
Serialize a request to a given stream.
Definition: remote_app.cpp:164
void Deserialize(CNcbiIstream &is)
Definition: remote_app.hpp:161
CNcbiOstream & GetStdIn()
Get an output stream to write data to a remote application stdin.
Definition: remote_app.hpp:126
void SetCmdLine(const string &cmdline)
Set the command line for the remote application.
Definition: remote_app.hpp:106
Remote Application Result (both client side and application executor side)
Definition: remote_app.hpp:209
CNcbiIstream & GetStdErr()
Get a remote application stderr.
Definition: remote_app.hpp:243
CNcbiIstream & GetStdOut()
Get a remote application stdout.
Definition: remote_app.hpp:232
void Receive(CNcbiIstream &is)
Deserialize a request from a given stream.
Definition: remote_app.cpp:355
String or Blob Storage Reader.
String or Blob Storage Writer.
ERW_Result Write(const void *buf, size_t count, size_t *bytes_written=0) override
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
CTimeout – Timeout interval.
Definition: ncbitime.hpp:1693
Writer-based output stream.
Definition: rwstream.hpp:171
Definition: map.hpp:338
@ eCIC_GenericID
Definition: compound_id.hpp:51
static CS_COMMAND * cmd
Definition: ct_dynamic.c:26
#define RAW_OUTPUT_FORMAT
Definition: grid_cli.hpp:137
#define QUEUEINFO_COMMAND
Definition: grid_cli.hpp:133
#define LOGIN_COMMAND
Definition: grid_cli.hpp:129
#define JOBINFO_COMMAND
Definition: grid_cli.hpp:130
#define LOGIN_TOKEN_ENV
Definition: grid_cli.hpp:49
#define BRIEF_OPTION
Definition: grid_cli.hpp:101
#define QUEUE_ARG
Definition: grid_cli.hpp:110
#define LOGIN_TOKEN_OPTION
Definition: grid_cli.hpp:69
#define OPTION_N(number)
Definition: grid_cli.hpp:286
#define QUEUE_OPTION
Definition: grid_cli.hpp:87
#define WATCHJOB_COMMAND
Definition: grid_cli.hpp:132
#define JOB_ID_OPTION
Definition: grid_cli.hpp:100
#define ANY_AFFINITY_OPTION
Definition: grid_cli.hpp:91
#define IO_BUFFER_SIZE
Definition: grid_cli.hpp:145
#define CLIENT_SESSION_OPTION
Definition: grid_cli.hpp:125
#define NETSCHEDULE_OPTION
Definition: grid_cli.hpp:81
EOption
Definition: grid_cli.hpp:150
@ eID
Definition: grid_cli.hpp:153
@ eWaitForJobStatus
Definition: grid_cli.hpp:225
@ eFailRead
Definition: grid_cli.hpp:208
@ eRemoteAppArgs
Definition: grid_cli.hpp:165
@ eRemoteAppStdIn
Definition: grid_cli.hpp:166
@ eProgressMessageOnly
Definition: grid_cli.hpp:223
@ eClientSession
Definition: grid_cli.hpp:267
@ eReliableRead
Definition: grid_cli.hpp:205
@ eRollbackRead
Definition: grid_cli.hpp:207
@ eWaitTimeout
Definition: grid_cli.hpp:231
@ eDeferExpiration
Definition: grid_cli.hpp:224
@ eQueueArg
Definition: grid_cli.hpp:233
@ eDumpNSNotifications
Definition: grid_cli.hpp:272
@ eFailJob
Definition: grid_cli.hpp:232
@ eJobOutputBlob
Definition: grid_cli.hpp:201
@ eAnyAffinity
Definition: grid_cli.hpp:198
@ eAffinityList
Definition: grid_cli.hpp:195
@ eClaimNewAffinities
Definition: grid_cli.hpp:197
@ eLoginToken
Definition: grid_cli.hpp:161
@ eJobInputDir
Definition: grid_cli.hpp:247
@ eQueueClasses
Definition: grid_cli.hpp:235
@ eClientNode
Definition: grid_cli.hpp:266
@ eRemoteAppStdOut
Definition: grid_cli.hpp:167
@ eJobId
Definition: grid_cli.hpp:210
@ eAffinity
Definition: grid_cli.hpp:194
@ eCompatMode
Definition: grid_cli.hpp:246
@ eTargetQueueArg
Definition: grid_cli.hpp:236
@ eBrief
Definition: grid_cli.hpp:221
@ eNetSchedule
Definition: grid_cli.hpp:190
@ eInput
Definition: grid_cli.hpp:163
@ eRemoteAppStdErr
Definition: grid_cli.hpp:168
@ eConfirmRead
Definition: grid_cli.hpp:206
@ eUntypedArg
Definition: grid_cli.hpp:151
@ eExtendLifetime
Definition: grid_cli.hpp:227
@ eExclusiveJob
Definition: grid_cli.hpp:199
@ eQueue
Definition: grid_cli.hpp:191
@ eWaitForJobEventAfter
Definition: grid_cli.hpp:226
@ eAllQueues
Definition: grid_cli.hpp:234
@ eJobOutput
Definition: grid_cli.hpp:200
@ eUsePreferredAffinities
Definition: grid_cli.hpp:196
@ eWorkerNode
Definition: grid_cli.hpp:192
@ eBatch
Definition: grid_cli.hpp:193
@ eProgressMessage
Definition: grid_cli.hpp:228
@ eAllJobs
Definition: grid_cli.hpp:230
@ eJobGroup
Definition: grid_cli.hpp:229
@ eStatusOnly
Definition: grid_cli.hpp:222
#define ALL_QUEUES_OPTION
Definition: grid_cli.hpp:107
#define WAIT_TIMEOUT_OPTION
Definition: grid_cli.hpp:105
#define READJOB_COMMAND
Definition: grid_cli.hpp:131
#define CLAIM_NEW_AFFINITIES_OPTION
Definition: grid_cli.hpp:90
#define JOB_INPUT_DIR_OPTION
Definition: grid_cli.hpp:117
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define NULL
Definition: ncbistd.hpp:225
void Error(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1197
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
Definition: ncbiexpt.hpp:719
EJobStatus
Job status codes.
void ReadConfirm(const string &job_id, const string &auth_token)
Mark the specified job as successfully retrieved.
CNetScheduleSubmitter GetSubmitter()
Create an instance of CNetScheduleSubmitter.
void Submit(const string &job_group=kEmptyStr)
Submit a batch to the queue.
void SetJobMask(CNetScheduleAPI::TJobMask mask)
Set a job mask.
string output
Job result data.
void SetJobAffinity(const string &affinity)
Set a job affinity.
EJobStatus GetJobDetails(CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get job details.
CNetScheduleAPI::EJobStatus GetJobStatus(const CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get the current status of the specified job.
CNetScheduleJobReader GetJobReader(const string &group=kEmptyStr, const string &affinity=kEmptyStr)
Create an instance of CNetScheduleJobReader.
void ReturnJob(const CNetScheduleJob &job)
Switch the job back to the "Pending" status so that it can be run again on a different worker node.
void ClearNode()
Unregister client-listener.
void PutResult(const CNetScheduleJob &job)
Put job result (job should be received by GetJob() or WaitJob())
CNetScheduleAPI::TJobMask mask
int ret_code
Job return code.
static string StatusToString(EJobStatus status)
Printable status type.
string input
Input data.
EReadNextJobResult ReadNextJob(CNetScheduleJob *job, CNetScheduleAPI::EJobStatus *job_status, const CTimeout *timeout=NULL)
Wait and return the next completed job.
CNetScheduleAPI::EJobStatus GetJobDetails(CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get full information about the specified job.
void SetJobGroup(const string &job_group)
Retrieve jobs from the specified group only.
CNetScheduleAPI::EJobStatus GetJobStatus(const string &job_key, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get the current status of the specified job.
void JobDelayExpiration(const CNetScheduleJob &job, unsigned runtime_inc)
Increment job execution timeout.
EJobAffinityPreference
Affinity matching modes.
void CancelJobGroup(const string &job_group, const string &job_statuses=kEmptyStr)
Cancel job group.
void GetProgressMsg(CNetScheduleJob &job)
Update the progress_message field of the job structure.
void SetClientType(EClientType client_type)
void ReadRollback(const string &job_id, const string &auth_token)
Refuse to process the results of the specified job.
void SetAffinityPreference(EJobAffinityPreference aff_pref)
Set preferred method of requesting jobs with affinities.
const SServerParams & GetServerParams()
EReadNextJobResult
Possible outcomes of ReadNextJob() calls.
CNetService GetService()
void PutProgressMsg(const CNetScheduleJob &job)
Put job interim (progress) message.
bool GetJob(CNetScheduleJob &job, const string &affinity_list=kEmptyStr, CDeadline *dealine=NULL)
Get a pending job.
CNetScheduleExecutor GetExecutor()
Create an instance of CNetScheduleExecutor.
void ReadFail(const string &job_id, const string &auth_token, const string &error_message=kEmptyStr)
Refuse to process the results of the specified job and increase its counter of failed job result...
CNetScheduleAdmin GetAdmin()
void PutFailure(const CNetScheduleJob &job, bool no_retries=false)
Submit job failure diagnostics.
string job_id
Output job key.
void CancelJob(const string &job_key)
Cancel job.
const vector< CNetScheduleJob > & GetBatch() const
Definition: grid_client.hpp:93
CNcbiOstream & GetOStream()
Get a stream where a client can write input data for the remote job.
@ eDone
Job is ready (computed successfully)
@ eConfirmed
Final state - read confirmed.
@ eReading
Job's output is being read.
@ eRunning
Running on a worker node.
@ eJobNotFound
No such job.
@ ePending
Waiting for execution.
@ eReadFailed
Final state - read failed.
@ eExclusiveJob
Exclusive job - the node executes only this job, even if there are processor resources.
@ eRNJ_NotReady
No matching jobs are ready for reading.
@ eRNJ_JobReady
A job is returned.
@ eRNJ_NoMoreJobs
No matching jobs.
@ eRNJ_Interrupt
ReadNextJob() has been interrupted.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define NcbiEndl
Definition: ncbistre.hpp:548
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
#define NcbiCout
Definition: ncbistre.hpp:543
IO_PREFIX::istream CNcbiIstream
Portable alias for istream.
Definition: ncbistre.hpp:146
bool NcbiStreamCopy(CNcbiOstream &os, CNcbiIstream &is)
Copy the entire contents of stream "is" to stream "os".
Definition: ncbistre.cpp:211
@ eRW_Eof
End of data, should be considered permanent.
@ eRW_Success
Everything is okay, I/O completed.
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3953
#define kEmptyStr
Definition: ncbistr.hpp:123
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
CTime GetFastLocalTime(void)
Quick and dirty getter of local time.
Definition: ncbitime.cpp:4166
@ eRaw
Definition: types.hpp:81
const struct ncbi::grid::netcache::search::fields::KEY key
T max(T x_, T y_)
static Format format
Definition: njn_ioutil.cpp:53
static const string s_NotificationTimestampFormat("Y/M/D h:m:s.l")
#define ATTR_POS
#define ATTR_CHECK_SET(name, type)
static bool s_DumpStdStream(CNcbiIstream &std_stream, FILE *output_stream)
Definition: ns_cmds.cpp:725
@ eRead
Definition: ns_types.hpp:56
static pcre_uint8 * buffer
Definition: pcretest.c:1051
Reader-writer based streams.
void ReSetClientSession(const string &)
void ReSetClientNode(const string &)
static TInstance CreateWnCompat(const string &, const string &)
static TInstance CreateNoCfgLoad(const string &, const string &, const string &)
Job description.
Meaningful information encoded in the NetSchedule key.
bool LoadNextRecord()
Definition: ns_cmds.cpp:408
string remote_app_args
Definition: ns_cmds.cpp:398
SBatchSubmitRecord(istream *input_stream)
Definition: ns_cmds.cpp:403
CBatchSubmitAttrParser attr_parser
Definition: ns_cmds.cpp:394
bool remote_app_args_defined
Definition: ns_cmds.cpp:399
static SSocketAddress Parse(const string &address, SHost::EName name=SHost::EName::eResolved)
#define _ASSERT
Modified on Tue Apr 16 20:10:26 2024 by modify_doxy.py rev. 669887