NCBI C++ ToolKit
grid_client.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: grid_client.cpp 88604 2019-12-20 15:57:19Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "netschedule_api_impl.hpp"
35 #include "netcache_api_impl.hpp"
36 
39 
41 
42 //////////////////////////////////////////////////////////////////////////////
43 //
45  IBlobStorage& storage,
47  EProgressMsg progress_msg) :
48  CGridClient(ns_submitter, dynamic_cast<CBlobStorage_NetCache&>(storage).GetNetCacheAPI(), cleanup, progress_msg)
49 {
50 }
51 
53  CNetCacheAPI::TInstance nc_client,
55  EProgressMsg progress_msg) :
56  m_NetScheduleSubmitter(ns_submitter),
57  m_NetCacheAPI(nc_client),
58  m_JobBatchSubmitter(*this),
59  m_AutoCleanUp(cleanup == eAutomaticCleanup),
60  m_UseProgress(progress_msg == eProgressMsgOn)
61 {
62 }
63 
65 {
67  return m_JobBatchSubmitter;
68 }
69 
70 void CGridClient::CancelJob(const string& job_key)
71 {
73 }
74 void CGridClient::RemoveDataBlob(const string& data_key)
75 {
77  m_NetCacheAPI.Remove(data_key);
78 }
79 
81 {
84 }
85 
86 //////////////////////////////////////////////////////////////////////////////
87 //
88 void CGridClient::SetJobInput(const string& input)
89 {
90  m_Job.input = input;
91 }
92 
94 {
96 }
97 
99 {
100  m_GridWrite.Reset(true);
101 }
102 
103 string CGridClient::Submit(const string& affinity)
104 {
105  CloseStream();
106 
107  if (!affinity.empty() && m_Job.affinity.empty())
108  m_Job.affinity = affinity;
109  string job_key = GetNetScheduleSubmitter().SubmitJob(m_Job);
111  m_Job.Reset();
112  return job_key;
113 }
114 
116 {
117  CloseStream();
118  time_t job_exptime = 0;
120  m_NetScheduleSubmitter->SubmitJobAndWait(m_Job, wait_time, &job_exptime);
121 
122  return x_CheckAllJobBlobs(status, job_exptime);
123 }
124 
126  unsigned wait_time)
127 {
129  m_NetScheduleSubmitter.WaitForJob(job_key, wait_time);
130 
132 }
133 
134 //////////////////////////////////////////////////////////
136 {
137  if (m_HasBeenSubmitted)
138  NCBI_THROW(CGridClientException, eBatchAlreadySubmitted,
139  "The batch has been already submitted. "
140  "Use Reset() to start a new one");
141 }
142 
144 {
146 
147  if (m_Jobs.empty())
148  PrepareNextJob();
149 }
150 
152 {
154 
155  m_Jobs[m_JobIndex].input = input;
156 }
157 
159 {
162  m_Jobs[m_JobIndex].input);
163 }
164 
166 {
168 
169  m_Jobs[m_JobIndex].mask = mask;
170 }
171 
172 void CGridJobBatchSubmitter::SetJobAffinity(const string& affinity)
173 {
175 
176  m_Jobs[m_JobIndex].affinity = affinity;
177 }
178 
180 {
182  m_GridWrite.Reset();
183  if (!m_Jobs.empty())
184  ++m_JobIndex;
185  m_Jobs.push_back(CNetScheduleJob());
186 }
187 
188 void CGridJobBatchSubmitter::Submit(const string& job_group)
189 {
191  m_GridWrite.Reset();
192  if (!m_Jobs.empty()) {
194  job_group);
195  m_HasBeenSubmitted = true;
196  }
197 }
198 
200 {
201  m_GridWrite.Reset();
202  m_HasBeenSubmitted = false;
203  m_JobIndex = 0;
204  m_Jobs.clear();
205 }
206 
208  : m_GridClient(grid_client), m_JobIndex(0),
209  m_HasBeenSubmitted(false)
210 {
211 }
212 
213 static unsigned s_TimeToTtl(time_t time);
214 
215 //////////////////////////////////////////////////////////////////////////////
216 //
217 
219  CNetScheduleAPI::EJobStatus status, time_t job_exptime)
220 {
221  if (m_AutoCleanUp && (
222  status == CNetScheduleAPI::eDone ||
223  status == CNetScheduleAPI::eCanceled)) {
224  if (m_Job.input.length() > 1 &&
225  m_Job.input[0] == 'K' && m_Job.input[1] == ' ')
226  RemoveDataBlob(m_Job.input.c_str() + 2);
227  if (m_UseProgress) {
229  if (m_Job.progress_msg.length() > 1) {
230  if (m_Job.progress_msg[1] != ' ')
232  else if (m_Job.progress_msg[0] == 'K')
233  RemoveDataBlob(m_Job.progress_msg.c_str() + 2);
234  }
235  }
236  } else {
237  x_RenewAllJobBlobs(s_TimeToTtl(job_exptime));
238  }
239 
240  m_JobDetailsRead = true;
241  return status;
242 }
244 {
245  time_t job_exptime = 0;
246 
249 
250  return x_CheckAllJobBlobs(status, job_exptime);
251 }
252 
254 {
255  x_GetJobDetails();
257 }
258 
260 {
261  if (m_UseProgress) {
263 
264  if (m_Job.progress_msg.length() <= 1)
265  return m_Job.progress_msg;
266  else {
267  string progress_message_key;
268  if (m_Job.progress_msg[1] != ' ')
271  progress_message_key = m_Job.progress_msg;
272  else
273  return m_Job.progress_msg;
274  else if (m_Job.progress_msg[0] == 'K')
275  progress_message_key = m_Job.progress_msg.c_str() + 2;
276  else if (m_Job.progress_msg[0] == 'D')
277  return m_Job.progress_msg.c_str() + 2;
278  else
279  return m_Job.progress_msg;
280 
281  string buffer;
282  GetNetCacheAPI().ReadData(progress_message_key, buffer);
283  return buffer;
284  }
285  }
286  return string();
287 }
288 
289 void CGridClient::SetJobKey(const string& job_key)
290 {
291  m_Job.Reset();
292  m_Job.job_id = job_key;
293  m_GridRead.Reset();
294  m_BlobSize = 0;
295  m_JobDetailsRead = false;
296 }
297 
298 void CGridClient::x_ProlongBlobLifetime(const string& blob_key, unsigned ttl)
299 {
300  try {
301  m_NetCacheAPI.ProlongBlobLifetime(blob_key, ttl);
302  }
303  catch (CNetServiceException& e) {
304  ERR_POST(Warning << "Error while prolonging lifetime for " <<
305  blob_key << ": " << e.GetMsg());
306  }
307 }
308 
310  const string& job_field, unsigned ttl)
311 {
312  if (!NStr::StartsWith(job_field, "K "))
313  return false;
314 
315  x_ProlongBlobLifetime(string(job_field, 2), ttl);
316 
317  return true;
318 }
319 
/// Convert an absolute expiration time into a TTL relative to "now".
///
/// @param exptime
///   Absolute expiration time.
/// @return
///   0 if exptime is not later than the current time; otherwise the number
///   of seconds left plus one. If that value does not fit in an unsigned,
///   the largest unsigned value is returned instead of a truncated one.
static unsigned s_TimeToTtl(time_t exptime)
{
    const unsigned kMaxTtl = ~0U;

    const time_t current_time = time(NULL);
    if (exptime <= current_time)
        return 0;

    // Strictly positive here because exptime > current_time.
    const time_t remaining = exptime - current_time;
    const unsigned ttl = static_cast<unsigned>(remaining);

    // A failed round trip means the value was truncated by the narrowing
    // conversion; ttl == kMaxTtl means the "+ 1" below would wrap to 0.
    if (static_cast<time_t>(ttl) != remaining || ttl == kMaxTtl)
        return kMaxTtl;

    return ttl + 1;
}
325 
327 {
330  if (!m_Job.progress_msg.empty() &&
333  m_Job.progress_msg.length(), NULL,
336 }
337 
339 {
340  if (m_JobDetailsRead)
341  return;
342  time_t job_exptime = 0;
344  x_RenewAllJobBlobs(s_TimeToTtl(job_exptime));
345  m_JobDetailsRead = true;
346 }
347 
349 {
350  x_GetJobDetails();
351  return m_Job.output;
352 }
353 
355 {
356  x_GetJobDetails();
357  return m_Job.input;
358 }
360 {
361  x_GetJobDetails();
362  return m_Job.ret_code;
363 }
364 
366 {
367  x_GetJobDetails();
368  return m_Job.error_msg;
369 }
370 
371 
ncbi::TMaskedQueryRegions mask
CBlobStorage_NetCache – NetCache-based implementation of IBlobStorage.
Grid Client exception.
Grid Client (the submitter).
Grid Job Batch Submitter.
Definition: grid_client.hpp:60
Net Service exception.
Blob Storage interface.
static void cleanup(void)
Definition: ct_dynamic.c:30
#define false
Definition: bool.h:36
static unsigned s_TimeToTtl(time_t time)
NetSchedule Framework specs.
string
Definition: cgiapp.hpp:687
#define NULL
Definition: ncbistd.hpp:225
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
const string & GetMsg(void) const
Get message string.
Definition: ncbiexpt.cpp:461
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
CCompoundIDPool GetCompoundIDPool()
static bool IsValidKey(const char *key_str, size_t key_len, CCompoundIDPool::TInstance id_pool=NULL)
static bool ParseBlobKey(const char *key_str, size_t key_len, CNetCacheKey *key_obj, CCompoundIDPool::TInstance id_pool=NULL)
Parse blob key string into a CNetCacheKey structure.
void Remove(const string &blob_id, const CNamedParameterList *optional=NULL)
Remove BLOB by key.
void ProlongBlobLifetime(const string &blob_key, unsigned ttl, const CNamedParameterList *optional=NULL)
void ReadData(const string &key, string &buffer, const CNamedParameterList *optional=NULL)
Read the blob pointed to by "key" and store its contents in "buffer".
SGridRead m_GridRead
CNetCacheAPI m_NetCacheAPI
void x_GetJobDetails()
EJobStatus
Job status codes.
CNetScheduleAPI::EJobStatus SubmitAndWait(unsigned wait_time)
Submit a job to the queue.
void SetJobInput(const string &input)
Set a job's input. This string will be sent when the job is submitted.
Definition: grid_client.cpp:88
void Submit(const string &job_group=kEmptyStr)
Submit a batch to the queue.
CNetScheduleAPI::EJobStatus WaitForJob(const string &job_key, unsigned wait_time)
Wait for job to finish its execution (done/failed/canceled).
void SetJobMask(CNetScheduleAPI::TJobMask mask)
Set a job mask.
string output
Job result data.
const string & GetJobInput()
Get a job's input string.
bool m_JobDetailsRead
void RemoveDataBlob(const string &)
Definition: grid_client.cpp:74
void GetProgressMsg(CNetScheduleJob &job)
Get progress message.
void SetJobAffinity(const string &affinity)
Set a job affinity.
void SubmitJobBatch(vector< CNetScheduleJob > &jobs, const string &job_group=kEmptyStr)
Submit job batch.
CGridClient & m_GridClient
void SetJobKey(const string &job_key)
bool m_AutoCleanUp
const string & GetErrorMessage()
If something bad has happened this method will return an explanation.
CNetScheduleAPI::EJobStatus GetStatus()
Get a job status.
CGridJobBatchSubmitter(CGridClient &)
CGridClient(CNetScheduleSubmitter::TInstance ns_submitter, IBlobStorage &storage, ECleanUp cleanup, EProgressMsg progress_msg)
Constructor.
Definition: grid_client.cpp:44
CNetScheduleSubmitter GetNetScheduleSubmitter()
void CloseStream()
Definition: grid_client.cpp:98
CNcbiIstream & GetIStream()
Get a stream with a job's result.
string GetProgressMessage()
Get a job interim message.
int ret_code
Job return code.
CNetScheduleAPI::EJobStatus WaitForJob(const string &job_id, unsigned wait_time)
Wait for job to finish its execution (done/failed/canceled).
string input
Input data.
void CancelJob(const string &job_key)
Cancel Job.
Definition: grid_client.cpp:70
CNetScheduleAPI::EJobStatus GetJobDetails(CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get full information about the specified job.
string Submit(const string &affinity=kEmptyStr)
Submit a job to the queue.
CNetCacheAPI & GetNetCacheAPI()
int GetReturnCode()
Get a job's return code.
SGridWrite m_GridWrite
CGridJobBatchSubmitter & GetJobBatchSubmitter()
Get a job submitter.
Definition: grid_client.cpp:64
bool x_ProlongJobFieldLifetime(const string &job_field, unsigned ttl)
CNetScheduleJob m_Job
void CheckIfBatchSubmittedAndPrepareNextJob()
bool m_UseProgress
void x_ProlongBlobLifetime(const string &blob_key, unsigned ttl)
CNcbiOstream & GetOStream()
Get a stream where a client can write input data for the remote job.
Definition: grid_client.cpp:93
const string & GetJobOutput()
Get a job's output string.
vector< CNetScheduleJob > m_Jobs
CNetScheduleSubmitter m_NetScheduleSubmitter
CGridJobBatchSubmitter m_JobBatchSubmitter
void x_RenewAllJobBlobs(unsigned ttl)
CNetScheduleAPI::EJobStatus x_CheckAllJobBlobs(CNetScheduleAPI::EJobStatus status, time_t job_exptime)
string SubmitJob(CNetScheduleNewJob &job)
Submit job.
size_t GetMaxServerInputSize()
Definition: grid_client.cpp:80
void SetJobInput(const string &input)
Set a job's input. This string will be sent when the job is submitted.
string job_id
Output job key.
void CancelJob(const string &job_key)
Cancel job.
CNcbiOstream & GetOStream()
Get a stream where a client can write input data for the remote job.
size_t m_BlobSize
@ eDone
Job is ready (computed successfully)
@ eCanceled
Explicitly canceled.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
IO_PREFIX::istream CNcbiIstream
Portable alias for istream.
Definition: ncbistre.hpp:146
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5412
static int input()
static pcre_uint8 * buffer
Definition: pcretest.c:1051
Job description.
void Reset()
void Reset(bool flush=false)
const CNetScheduleAPI::SServerParams & GetServerParams()
CNetScheduleAPI::EJobStatus SubmitJobAndWait(CNetScheduleJob &job, unsigned wait_time, time_t *job_exptime=NULL)
Modified on Sat Apr 20 12:20:13 2024 by modify_doxy.py rev. 669887