39 #define NCBI_USE_ERRCODE_X ConnServ_NetSchedule
62 if (field->name ==
"job_key") {
63 job_bits |= (1 << eJobKey);
66 }
else if (field->name ==
"input") {
67 job_bits |= (1 << eJobInput);
68 job.
input = field->value;
70 }
else if (field->name ==
"auth_token") {
71 job_bits |= (1 << eJobAuthToken);
74 }
else if (field->name ==
"affinity") {
75 job_bits |= (1 << eJobAffinity);
78 }
else if (field->name ==
"client_ip") {
79 job_bits |= (1 << eClientIP);
82 }
else if (field->name ==
"client_sid") {
83 job_bits |= (1 << eClientSessionID);
86 }
else if (field->name ==
"mask") {
87 job_bits |= (1 << eJobMask);
88 job.
mask = atoi(field->value.c_str());
90 }
else if (field->name ==
"ncbi_phid") {
91 job_bits |= (1 << ePageHitID);
95 if (job_bits == (1 << eNumberOfJobBits) - 1)
98 return !job.
job_id.empty();
105 if (response.empty())
113 "Cannot parse server output for GET2:\n" + response);
119 unsigned runtime_inc)
125 m_Impl->m_API->ExecOnJobServer(job,
cmd,
eOn);
133 m_GetCmd(get_cmd), m_Job(job), m_Executor(executor)
147 return m_Executor->ExecGET(server, m_GetCmd, m_Job);
152 return m_Impl->m_API->GetServerParams();
157 m_Impl->m_API->x_ClearNode();
163 m_Impl->m_AffinityPreference = aff_pref;
168 m_Impl->m_JobGroup = job_group;
172 CNetServer orig_server,
const string& affinity)
180 if (m_PreferredAffinities.find(affinity) == m_PreferredAffinities.end()) {
181 m_PreferredAffinities.insert(affinity);
182 string new_preferred_aff_cmd =
"CHAFF add=" + affinity;
185 m_API->m_Service.ExcludeServer(orig_server); it; ++it)
187 (*it).ExecWithRetry(new_preferred_aff_cmd,
false);
190 ERR_POST(
"Error while notifying " <<
191 (*it).GetServerAddress() <<
192 " of a new affinity " << e);
194 CFastMutexGuard sync_guard(m_API->m_SharedData->m_AffinitySubmissionMutex);
206 string cmd(
"SETAFF aff=\"");
207 const char* sep =
"";
228 exec_result,
NULL, &get_cmd_listener);
244 exec_result,
NULL, &get_cmd_listener);
255 ClaimNewPreferredAffinity(server, job.
affinity);
266 const auto affinity_preference = any_affinity ? m_AffinityPreference :
270 const bool have_affinities = !prio_aff_list.empty();
272 if (have_affinities)
cmd +=
" aff=" + prio_aff_list;
274 m_NotificationHandler.CmdAppendTimeoutGroupAndClientInfo(
cmd,
275 &timeout, m_JobGroup);
277 if (have_affinities)
cmd +=
" prioritized_aff=1";
279 return ExecGET(server,
cmd, job);
283 const string& affinity_list,
287 m_Impl->m_AffinityPreference, affinity_list));
289 string cmd(base_cmd);
290 m_Impl->m_NotificationHandler.CmdAppendTimeoutGroupAndClientInfo(
291 cmd, deadline, m_Impl->m_JobGroup);
292 if (m_Impl->m_NotificationHandler.RequestJob(m_Impl, job,
cmd))
295 if (deadline ==
NULL)
298 while (m_Impl->m_NotificationHandler.WaitForNotification(*deadline)) {
300 if (m_Impl->m_NotificationHandler.CheckRequestJobNotification(m_Impl,
302 cmd.erase(base_cmd.length());
303 m_Impl->m_NotificationHandler.CmdAppendTimeoutGroupAndClientInfo(
304 cmd, deadline, m_Impl->m_JobGroup);
312 string cancel_wget_cmd(
"CWGET");
315 m_Impl->m_API->m_Service.ExcludeServer(server); it; ++it)
316 (*it).ExecWithRetry(cancel_wget_cmd,
false);
320 m_Impl->ClaimNewPreferredAffinity(server, job.
affinity);
332 const string& affinity_list)
335 return GetJob(job, affinity_list);
339 return GetJob(job, affinity_list, &deadline);
345 switch (affinity_preference) {
347 return "GET2 wnode_aff=1 any_aff=1";
350 return "GET2 wnode_aff=1 any_aff=0";
353 return "GET2 wnode_aff=1 any_aff=0 exclusive_new_aff=1";
356 return "GET2 wnode_aff=0 any_aff=1";
359 return "GET2 wnode_aff=0 any_aff=0";
365 const string& affinity_list)
369 if (!affinity_list.empty()) {
370 list<CTempString> affinity_tokens;
375 ITERATE(list<CTempString>, token, affinity_tokens) {
376 limits::Check<limits::SAffinity>(*token);
380 cmd += affinity_list;
387 string&
cmd,
const CDeadline* deadline,
const string& job_group)
389 if (deadline !=
NULL) {
390 unsigned remaining_seconds = (unsigned)
393 if (remaining_seconds > 0) {
402 if (!job_group.empty()) {
424 string cancel_wget_cmd(
"CWGET");
428 (*it).ExecWithRetry(cancel_wget_cmd,
false);
446 if (
output.length() > max_output_size) {
448 "Output data too long.");
455 m_Impl->m_API->GetServerParams().max_output_size);
457 string cmd(
"PUT2 job_key=" + job.
job_id);
459 limits::Check<limits::SAuthToken>(job.
auth_token);
460 cmd.append(
" auth_token=");
463 cmd.append(
" job_return_code=");
466 cmd.append(
" output=\"");
471 m_Impl->m_API->ExecOnJobServer(job,
cmd, m_Impl->retry_on_exception);
478 "Progress message too long");
485 m_Impl->m_API->ExecOnJobServer(job,
cmd, m_Impl->retry_on_exception);
490 m_Impl->m_API.GetProgressMsg(job);
497 m_Impl->m_API->GetServerParams().max_output_size);
501 "Error message too long");
504 string cmd(
"FPUT2 job_key=" + job.
job_id);
506 limits::Check<limits::SAuthToken>(job.
auth_token);
507 cmd.append(
" auth_token=");
510 cmd.append(
" err_msg=\"");
513 cmd.append(
"\" output=\"");
516 cmd.append(
"\" job_return_code=");
522 cmd.append(
" no_retries=1");
524 m_Impl->m_API->ExecOnJobServer(job,
cmd, m_Impl->retry_on_exception);
529 string cmd(
"RESCHEDULE job_key=" + job.
job_id);
531 limits::Check<limits::SAuthToken>(job.
auth_token);
532 cmd +=
" auth_token=";
537 limits::Check<limits::SAffinity>(job.
affinity);
542 if (!job.
group.empty()) {
544 limits::Check<limits::SJobGroup>(job.
group);
550 m_Impl->m_API->ExecOnJobServer(job,
cmd, m_Impl->retry_on_exception);
557 return m_Impl->m_API->GetJobStatus(
"WST2", job, job_exptime, pause_mode);
563 string cmd(
"RETURN2 job_key=" + job.
job_id);
565 limits::Check<limits::SAuthToken>(job.
auth_token);
566 cmd.append(
" auth_token=");
570 cmd.append(
" blacklist=0");
574 m_API->ExecOnJobServer(job,
cmd, retry_on_exception);
579 m_Impl->ReturnJob(job);
583 const vector<string>* affs,
586 if (affs ==
NULL || affs->empty())
589 const char* sep = action == eAddAffs ?
" add=\"" :
" del=\"";
591 ITERATE(vector<string>, aff, *affs) {
593 limits::Check<limits::SAffinity>(*aff);
601 if (action == eAddAffs)
602 ITERATE(vector<string>, aff, *affs) {
605 "Affinity '-' cannot be added as a preferred one.");
608 m_PreferredAffinities.insert(*aff);
611 ITERATE(vector<string>, aff, *affs) {
612 m_PreferredAffinities.erase(*aff);
619 const vector<string>* affs_to_add,
const vector<string>* affs_to_delete)
623 if (m_Impl->AppendAffinityTokens(
cmd, affs_to_add,
625 m_Impl->AppendAffinityTokens(
cmd, affs_to_delete,
629 m_Impl->m_API->m_Service.ExecOnAllServers(
cmd);
635 return m_Impl->m_API.GetQueueName();
640 return m_Impl->m_API->m_Service->GetClientName();
645 return m_Impl->m_API->m_Service.GetServiceName();
651 switch (m_Executor->m_AffinityPreference) {
658 CFastMutexGuard guard(m_Executor->m_API->m_SharedData->m_AffinitySubmissionMutex);
662 conn.Exec(m_Executor->MkSETAFFCmd(),
false);
virtual bool Consider(CNetServer server)
CGetJobCmdExecutor(const string &get_cmd, CNetScheduleJob &job, SNetScheduleExecutorImpl *executor)
SNetScheduleExecutorImpl * m_Executor
NetSchedule internal exception.
virtual void OnExec(CNetServerConnection::TInstance conn_impl, const string &cmd)
void CmdAppendTimeoutGroupAndClientInfo(string &cmd, const CDeadline *deadline, const string &job_group)
bool CheckRequestJobNotification(CNetScheduleExecutor::TInstance executor, CNetServer *server)
bool RequestJob(CNetScheduleExecutor::TInstance executor, CNetScheduleJob &job, const string &cmd)
static string MkBaseGETCmd(CNetScheduleExecutor::EJobAffinityPreference affinity_preference, const string &affinity_list)
SExecResult ExecWithRetry(const string &cmd, bool multiline_output=false)
Execute remote command 'cmd', wait for the reply, check if it starts with 'OK:', and return the remai...
CNetServiceIterator FindServer(INetServerFinder *finder, EIterationMode mode=eSortByLoad)
static CS_CONNECTION * conn
static SQLCHAR output[256]
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
TErrCode GetErrCode(void) const
Get error code.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
EJobStatus
Job status codes.
string output
Job result data.
CNetScheduleAPI::EJobStatus GetJobStatus(const CNetScheduleJob &job, time_t *job_exptime=NULL, ENetScheduleQueuePauseMode *pause_mode=NULL)
Get the current status of the specified job.
const string & GetQueueName()
Return Queue name.
const string & GetClientName()
const string & GetServiceName()
void ReturnJob(const CNetScheduleJob &job)
Switch the job back to the "Pending" status so that it can be run again on a different worker node.
void ClearNode()
Unregister client-listener.
void PutResult(const CNetScheduleJob &job)
Put job result (job should be received by GetJob() or WaitJob())
CNetScheduleAPI::TJobMask mask
int ret_code
Job return code.
const CNetScheduleAPI::SServerParams & GetServerParams()
Retrieve queue parameters from the server.
void SetJobGroup(const string &job_group)
Retrieve jobs from the specified group only.
const string & GetQueueName() const
Return Queue name.
CNetServer server
The server the job belongs to.
void JobDelayExpiration(const CNetScheduleJob &job, unsigned runtime_inc)
Increment job execution timeout.
EJobAffinityPreference
Affinity matching modes.
void SetAffinityPreference(EJobAffinityPreference aff_pref)
Set preferred method of requesting jobs with affinities.
void ChangePreferredAffinities(const vector< string > *affs_to_add, const vector< string > *affs_to_delete)
void PutProgressMsg(const CNetScheduleJob &job)
Put job interim (progress) message.
bool GetJob(CNetScheduleJob &job, const string &affinity_list=kEmptyStr, CDeadline *dealine=NULL)
Get a pending job.
void Reschedule(const CNetScheduleJob &job)
Reschedule a job with new affinity and/or group information.
void GetProgressMsg(CNetScheduleJob &job)
Get progress message.
void PutFailure(const CNetScheduleJob &job, bool no_retries=false)
Submit job failure diagnostics.
string job_id
Output job key.
@ eExplicitAffinitiesOnly
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
unsigned short GetPort() const
Get the listening port number back.
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
double GetAsDouble(void) const
Get as number of seconds (fractional value).
const TArgs & GetArgs(void) const
Get the const list of arguments.
ENetScheduleQueuePauseMode
Defines whether the job queue is paused, and if so, defines the pause mode set by the administrator.
bool s_DoParseGet2JobResponse(CNetScheduleJob &job, const string &response)
string s_GET2(CNetScheduleExecutor::EJobAffinityPreference affinity_preference)
bool s_ParseGetJobResponse(CNetScheduleJob &job, const string &response)
static void s_CheckOutputSize(const string &output, size_t max_output_size)
const unsigned int kNetScheduleMaxDBDataSize
const unsigned int kNetScheduleMaxDBErrSize
void g_AppendClientIPSessionIDHitID(string &cmd)
bool GetServerByNode(const string &ns_node, CNetServer *server)
bool ExecGET(SNetServerImpl *server, const string &get_cmd, CNetScheduleJob &job)
int AppendAffinityTokens(string &cmd, const vector< string > *affs, EChangeAffAction action)
void ReturnJob(const CNetScheduleJob &job, bool blacklist=true)
bool x_GetJobWithAffinityLadder(SNetServerImpl *server, const CDeadline &timeout, const string &prio_aff_list, bool any_affinity, CNetScheduleJob &job)
void ClaimNewPreferredAffinity(CNetServer orig_server, const string &affinity)
CRef< TProperties > Get()
CNetServer::SExecResult ConnectAndExec(const string &cmd, bool multiline_output, bool retry_on_exception=false)