NCBI C++ ToolKit
netschedule_client_sample.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: netschedule_client_sample.cpp 90033 2020-05-05 16:14:33Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Anatoliy Kuznetsov
27  *
28  * File Description: NetSchedule client test
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbiapp.hpp>
34 #include <corelib/ncbiargs.hpp>
35 #include <corelib/ncbienv.hpp>
36 #include <corelib/ncbireg.hpp>
37 #include <corelib/ncbi_system.hpp>
38 #include <corelib/ncbimisc.hpp>
39 
41 #include <connect/ncbi_socket.hpp>
43 #include <connect/ncbi_types.h>
44 
45 
47 
48 
49 ///////////////////////////////////////////////////////////////////////
50 /// Sample application
51 ///
52 /// @internal
53 ///
54 
56 {
57 public:
58  virtual void Init(void);
59  virtual int Run(void);
60 };
61 
62 
64 {
67 
68  // Setup command line arguments and parameters
69 
70  // Create command-line argument descriptions class
71  unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
72 
73  // Specify USAGE context
74  arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
75  "NetSchedule client");
76 
77  arg_desc->AddPositional("service",
78  "NetSchedule service name",
80 
81  arg_desc->AddPositional("queue",
82  "NetSchedule queue name (like: noname).",
84 
85 
86  arg_desc->AddOptionalKey("jcount",
87  "jcount",
88  "Number of jobs to submit",
90 
91  // Setup arg.descriptions for this application
92  SetupArgDescriptions(arg_desc.release());
93 }
94 
95 
97 {
98  const CArgs& args = GetArgs();
99  const string& service = args["service"].AsString();
100  const string& queue_name = args["queue"].AsString();
101 
102  unsigned jcount = 100;
103  if (args["jcount"]) {
104  jcount = args["jcount"].AsInteger();
105  }
107  CNetScheduleAPI cl(service, "client_sample", queue_name);
108 
109  CNetScheduleSubmitter submitter = cl.GetSubmitter();
110 
111  const string input = "Hello " + queue_name;
112 
113  CNetScheduleJob job(input);
114  submitter.SubmitJob(job);
115  cout << job.job_id << endl;
116 
117  vector<string> jobs;
118  {{
120 
121  cout << "Submit " << jcount << " jobs..." << endl;
122 
123  for (unsigned i = 0; i < jcount; ++i) {
124  CNetScheduleJob job(input);
125  submitter.SubmitJob(job);
126  jobs.push_back(job.job_id);
127  if (i % 1000 == 0) {
128  cout << "." << flush;
129  }
130  }
131  double elapsed = sw.Elapsed();
132  cout << endl << "Done." << endl;
133  double avg = elapsed / jcount;
134  cout.setf(IOS_BASE::fixed, IOS_BASE::floatfield);
135  cout << "Avg time:" << avg << " sec." << endl;
136  }}
137 
138  // Waiting for jobs to be done
139 
140  cout << "Waiting for jobs..." << jobs.size() << endl;
141  unsigned cnt = 0;
142  SleepMilliSec(5000);
143 
144  CNetScheduleAdmin admin = cl.GetAdmin();
145  /*
146  CNetScheduleKeys keys;
147  admin.RetrieveKeys("status=pending", keys);
148 
149  for (CNetScheduleKeys::const_iterator it = keys.begin();
150  it != keys.end(); ++it) {
151  cout << string(*it) << endl;
152  }
153  */
154 
155  size_t last_jobs = 0;
156  size_t no_jobs_executes_cnt = 0;
157 
158  while (jobs.size()) {
159  for (auto it = jobs.begin(); it != jobs.end(); ++it) {
160  CNetScheduleJob job;
161  job.job_id = *it;
162  status = submitter.GetJobDetails(job);
163 
164  if (status == CNetScheduleAPI::eDone) {
165  string expected_output = "DONE " + queue_name;
166  if (job.output != expected_output || job.ret_code != 0) {
167  ERR_POST("Unexpected output or return code:" + job.output);
168  }
169  jobs.erase(it);
170  ++cnt;
171  break;
172  } else
173  if (status != CNetScheduleAPI::ePending) {
174  if (status == CNetScheduleAPI::eJobNotFound) {
175  cerr << "Job lost:" << job.job_id << endl;
176  }
177  jobs.erase(it);
178  ++cnt;
179  break;
180  }
181 
182  ++cnt;
183  if (cnt % 1000 == 0) {
184  cout << "Waiting for " << jobs.size() << " jobs." << endl;
185  // it is necessary to give system a rest periodically
186  SleepMilliSec(2000);
187  // check status of only first 1000 jobs
188  // since the JS queue execution priority is FIFO
189  break;
190  }
191  }
192 
193  // check if worker node picks up jobs, otherwise stop
194  // trying after 10 attempts.
195 
196  if (jobs.size() == last_jobs) {
197  ++no_jobs_executes_cnt;
198  if (no_jobs_executes_cnt == 3) {
199  cout << "No progress in job execution. Stopping..." << endl;
200  break;
201  } else {
202  last_jobs = jobs.size();
203  }
204  }
205 
206  } // while
207 
208  cout << endl << "Done." << endl;
209  if (jobs.size()) {
210  cout << "Remaining job count = " << jobs.size() << endl;
211  }
212  return 0;
213 }
214 
215 
216 int NcbiSys_main(int argc, ncbi::TXChar* argv[])
217 {
218  return CSampleNetScheduleClient().AppMain(argc, argv);
219 }
CArgDescriptions –.
Definition: ncbiargs.hpp:541
CArgs –.
Definition: ncbiargs.hpp:379
Client API for NCBI NetSchedule server.
Smart pointer to the job submission part of the NetSchedule API.
virtual int Run(void)
Run the application.
virtual void Init(void)
Initialize the application.
CStopWatch –.
Definition: ncbitime.hpp:1937
virtual const CArgs & GetArgs(void) const
Get parsed command line arguments.
Definition: ncbiapp.cpp:305
int AppMain(int argc, const char *const *argv, const char *const *envp=0, EAppDiagStream diag=eDS_Default, const char *conf=NcbiEmptyCStr, const string &name=NcbiEmptyString)
Main function (entry point) for the NCBI application.
Definition: ncbiapp.cpp:819
virtual void SetupArgDescriptions(CArgDescriptions *arg_desc)
Setup the command line argument descriptions.
Definition: ncbiapp.cpp:1195
const CNcbiArguments & GetArguments(void) const
Get the application's cached unprocessed command-line arguments.
@ eString
An arbitrary string.
Definition: ncbiargs.hpp:589
@ eInteger
Convertible into an integer number (int or Int8)
Definition: ncbiargs.hpp:592
void SetDiagPostFlag(EDiagPostFlag flag)
Set the specified flag (globally).
Definition: ncbidiag.cpp:6070
EDiagSev SetDiagPostLevel(EDiagSev post_sev=eDiag_Error)
Set the threshold severity for posting the messages.
Definition: ncbidiag.cpp:6129
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
@ eDPF_Trace
Default flags to use when tracing.
Definition: ncbidiag.hpp:722
@ eDiag_Info
Informational message.
Definition: ncbidiag.hpp:651
EJobStatus
Job status codes.
CNetScheduleSubmitter GetSubmitter()
Create an instance of CNetScheduleSubmitter.
string output
Job result data.
int ret_code
Job return code.
CNetScheduleAPI::EJobStatus GetJobDetails(CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get full information about the specified job.
CNetScheduleAdmin GetAdmin()
string SubmitJob(CNetScheduleNewJob &job)
Submit job.
string job_id
Output job key.
@ eDone
Job is ready (computed successfully)
@ eJobNotFound
No such job.
@ ePending
Waiting for execution.
char TXChar
Definition: ncbistr.hpp:172
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2775
@ eStart
Start timer immediately after creating.
Definition: ncbitime.hpp:1941
static CStopWatch sw
static int input()
int i
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Defines the CNcbiApplication and CAppException classes for creating NCBI applications.
Defines command line argument related classes.
Defines unified interface to application:
Miscellaneous common-use basic types and functionality.
Process information in the NCBI Registry, including working with configuration files.
NetSchedule client specs.
int NcbiSys_main(int argc, ncbi::TXChar *argv[])
static unsigned cnt[256]
Job description.
Modified on Mon Jun 24 05:19:52 2024 by modify_doxy.py rev. 669887