1 #ifndef CONN_SERVICES___NETSCHEDULE_API_IMPL__HPP
2 #define CONN_SERVICES___NETSCHEDULE_API_IMPL__HPP
53 namespace netschedule {
58 static string Name() {
return "client node ID"; }
62 return isalnum(c) || c ==
'_' || c ==
'-' || c ==
'.' || c ==
':' || c ==
'@' || c ==
'|';
68 static string Name() {
return "client session ID"; }
72 return isalnum(c) || c ==
'_' || c ==
'-' || c ==
'.' || c ==
':' || c ==
'@' || c ==
'|';
78 static string Name() {
return "queue name"; }
84 if (s.front() ==
'_') {
95 static string Name() {
return "job group name"; }
102 static string Name() {
return "affinity token"; }
109 static string Name() {
return "security token"; }
116 template <
class TValue>
119 if (TValue::IsValidValue(
value))
return;
121 auto it = find_if_not(
value.begin(),
value.end(), TValue::IsValidChar);
123 if (it !=
value.end()) {
165 bool Transform(
const string&
prefix,
string& name)
const;
170 enum {
eOff, eImplicit, eExplicit } m_Mode;
180 m_SharedData(shared_data)
188 TPropCreator GetPropCreator()
const override;
194 void OnErrorImpl(
const string& err_msg,
CNetServer& server)
override;
195 void OnWarningImpl(
const string& warn_msg,
CNetServer& server)
override;
218 m_NotificationSemaphore(0, 1),
228 void InterruptWait();
230 void RegisterServer(
const string& ns_node);
232 bool GetNextNotification(
string* ns_node);
238 m_Interrupted =
false;
239 m_NotificationSemaphore.TryWait();
265 ENotificationType CheckNotification(
string* ns_node);
267 virtual void*
Main();
269 unsigned short GetPort()
const {
return m_Receiver.port; }
271 const string&
GetMessage()
const {
return m_Receiver.message; }
273 void CmdAppendPortAndTimeout(
string*
cmd,
unsigned remaining_seconds);
289 fWnCompatible = (0 << 0),
290 fNonWnCompatible = (1 << 0),
291 fConfigLoading = (1 << 1),
292 fWorkerNode = fWnCompatible,
293 fNetSchedule = fNonWnCompatible,
299 if (wn)
return fWorkerNode;
300 if (try_config)
return fNetSchedule | fConfigLoading;
307 const string& queue_name =
kEmptyStr,
bool wn =
false,
bool try_config =
true);
317 m_Service->m_Listener.GetPointer());
327 void GetQueueParams(
const string& queue_name,
TQueueParams& queue_params);
333 return m_Service.GetServer(nskey.
host, nskey.
port);
341 template <
class TJob>
344 auto server = GetServer(job);
345 auto retry_on_exception = (roe ==
eDefault) ? m_RetryOnException : (roe ==
eOn);
346 return server->ConnectAndExec(
cmd,
false, retry_on_exception).response;
349 bool GetServerByNode(
const string& ns_node,
CNetServer* server);
351 void AllocNotificationThread();
352 void StartNotificationThread();
359 void UpdateAuthString();
360 void UseOldStyleAuth();
361 void SetAuthParam(
const string& param_name,
const string& param_value);
365 string MakeAuthString();
392 constexpr
static long kAskMaxCount = 100;
393 } m_ServerParamsSync;
401 unsigned m_JobTtl = 0;
420 void FinalizeRead(
const char* cmd_start,
421 const string& job_id,
422 const string& auth_token,
423 const string& error_message);
426 unsigned wait_time, time_t* job_exptime =
NULL);
428 void AppendClientIPSessionIDHitID(
string&
cmd,
const string& job_group);
452 const string& affinity);
458 const string& prio_aff_list,
503 const string& group,
const string& affinity) :
504 m_Impl(ns_api_impl, group, affinity),
525 const string& group,
const string& affinity) :
532 limits::Check<limits::SJobGroup>(group);
533 limits::Check<limits::SAffinity>(affinity);
542 const string& prio_aff_list,
static CRef< CScope > m_Scope
CAtomicCounter_WithAutoInit –.
Pool of recycled CCompoundID objects.
Client API for NCBI NetSchedule server.
const SRegSynonyms & m_Sections
CSynRegistry & m_Registry
CNetScheduleGETCmdListener(SNetScheduleExecutorImpl *executor)
SNetScheduleExecutorImpl * m_Executor
virtual void OnExec(CNetServerConnection::TInstance conn_impl, const string &cmd)
CRef< SNetScheduleSharedData > m_SharedData
CNetScheduleServerListener(bool non_wn, SNetScheduleSharedData *shared_data)
void SetAuthString(const string &auth)
CTimeout – Timeout interval.
CNetServer WaitForNotifications(const CDeadline &deadline)
bool CheckEntry(SEntry &entry, const string &prio_aff_list, bool any_affinity, CNetScheduleJob &job, CNetScheduleAPI::EJobStatus *job_status)
void ReturnJob(CNetScheduleJob &job)
bool MoreJobs(const SEntry &entry)
CNetServer ReadNotifications()
CImpl(CNetScheduleAPI::TInstance ns_api_impl, const string &group, const string &affinity)
const_iterator end() const
int Main(int argc, const char *argv[])
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
EJobStatus
Job status codes.
CNetServer server
The server the job belongs to.
unsigned short port
TCP/IP port number.
EJobAffinityPreference
Affinity matching modes.
EReadNextJobResult
Possible outcomes of ReadNextJob() calls.
string job_id
Output job key.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
enum ENcbiSwitch ESwitch
Aux.
Definition of all error codes used in connect services library (xconnserv.lib and others).
void Check(const string &value)
void ThrowIllegalChar(const string &name, const string &value, char c)
NetSchedule client specs.
ENetScheduleQueuePauseMode
Defines whether the job queue is paused, and if so, defines the pause mode set by the administrator.
void copy(Njn::Matrix< S > *matrix_, const Njn::Matrix< T > &matrix0_)
static const TDS_WORD limits[]
static const char * prefix[]
Defines CRequestContext class for NCBI C++ diagnostic API.
vector< pair< string, string > > TAffinityLadder
Meaningful information encoded in the NetSchedule key.
CNetScheduleAPI::SServerParams m_ServerParams
CRef< SNetScheduleSharedData > m_SharedData
CNetScheduleAPI::TQueueParams TQueueParams
CNetScheduleGetJob::TAffinityLadder m_AffinityLadder
CCompoundIDPool GetCompoundIDPool()
CFastMutex m_NotificationThreadMutex
static TMode GetMode(bool wn, bool try_config)
void StartNotificationThread()
CNetServer GetServer(const CNetScheduleJob &job)
CNetScheduleServerListener * GetListener()
CAtomicCounter_WithAutoInit m_NotificationThreadStartStopCounter
bool m_UseEmbeddedStorage
CCompoundIDPool m_CompoundIDPool
string ExecOnJobServer(const TJob &job, const string &cmd, ESwitch roe=eDefault)
const CNetScheduleAPI::SServerParams & GetServerParams()
list< string > m_AffinityList
map< string, string > TAuthParams
CRef< SNetScheduleNotificationThread > m_NotificationThread
CNetServer GetServer(const string &job_key)
SNetScheduleAdminImpl(CNetScheduleAPI::TInstance ns_api_impl)
ESwitch retry_on_exception
SNetScheduleExecutorImpl(CNetScheduleAPI::TInstance ns_api_impl)
bool ExecGET(SNetServerImpl *server, const string &get_cmd, CNetScheduleJob &job)
CNetScheduleExecutor::EJobAffinityPreference m_AffinityPreference
CNetScheduleNotificationHandler m_NotificationHandler
int AppendAffinityTokens(string &cmd, const vector< string > *affs, EChangeAffAction action)
void ReturnJob(const CNetScheduleJob &job, bool blacklist=true)
CFastMutex m_PreferredAffMutex
set< string > m_PreferredAffinities
bool x_GetJobWithAffinityLadder(SNetServerImpl *server, const CDeadline &timeout, const string &prio_aff_list, bool any_affinity, CNetScheduleJob &job)
void ClaimNewPreferredAffinity(CNetServer orig_server, const string &affinity)
void x_StartNotificationThread()
CNetScheduleGetJobImpl< CImpl > m_Timeline
CNetScheduleJobReader::EReadNextJobResult ReadNextJob(CNetScheduleJob *job, CNetScheduleAPI::EJobStatus *job_status, const CTimeout *timeout)
SNetScheduleJobReaderImpl(CNetScheduleAPI::TInstance ns_api_impl, const string &group, const string &affinity)
SServerNotifications m_GetNotifications
unsigned short GetPort() const
SNetScheduleAPIImpl * m_API
const string & GetMessage() const
SNetScheduleNotificationReceiver m_Receiver
SServerNotifications m_ReadNotifications
SNetScheduleServerProperties()
map< string, SNetServerInPool * > TServerByNode
CFastMutex m_ServerByNodeMutex
CFastMutex m_AffinitySubmissionMutex
TServerByNode m_ServerByNode
SNetScheduleSubmitterImpl(CNetScheduleAPI::TInstance ns_api_impl)
void x_ClearInterruptFlag()
bool Wait(const CDeadline &deadline)
TReadyServers m_ReadyServers
set< string > TReadyServers
CSemaphore m_NotificationSemaphore
static bool IsValidValue(const string &s)
static bool IsValidChar(char c)
static bool IsValidValue(const string &)
static bool IsValidChar(char c)
static bool IsValidValue(const string &)
static bool IsValidChar(char c)
static bool IsValidValue(const string &)
static bool IsValidChar(char c)
static bool IsValidValue(const string &s)
static bool IsValidChar(char c)
static bool IsValidChar(char c)
static bool IsValidValue(const string &s)