NCBI C++ ToolKit
netschedule_api_impl.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef CONN_SERVICES___NETSCHEDULE_API_IMPL__HPP
2 #define CONN_SERVICES___NETSCHEDULE_API_IMPL__HPP
3 
4 /* $Id: netschedule_api_impl.hpp 87801 2019-10-04 15:52:36Z sadyrovr $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Anatoliy Kuznetsov, Maxim Didenko, Victor Joukov, Dmitry Kazimirov
30  *
31  * File Description:
32  * NetSchedule client API implementation details.
33  *
34  */
35 
36 #include "netservice_api_impl.hpp"
38 
39 #include <corelib/request_ctx.hpp>
40 
43 
44 #include <list>
45 #include <map>
46 #include <vector>
47 #include <algorithm>
48 
49 
51 
52 namespace grid {
53 namespace netschedule {
54 namespace limits {
55 
57 {
58  static string Name() { return "client node ID"; }
59  static bool IsValidValue(const string&) { return false; }
60  static bool IsValidChar(char c)
61  {
62  return isalnum(c) || c == '_' || c == '-' || c == '.' || c == ':' || c == '@' || c == '|';
63  }
64 };
65 
67 {
68  static string Name() { return "client session ID"; }
69  static bool IsValidValue(const string&) { return false; }
70  static bool IsValidChar(char c)
71  {
72  return isalnum(c) || c == '_' || c == '-' || c == '.' || c == ':' || c == '@' || c == '|';
73  }
74 };
75 
76 struct SQueueName
77 {
78  static string Name() { return "queue name"; }
79  static bool IsValidValue(const string& s)
80  {
81  if (s.empty()) {
82  NCBI_THROW(CConfigException, eParameterMissing, "Queue name cannot be empty.");
83  }
84  if (s.front() == '_') {
85  NCBI_THROW(CConfigException, eInvalidParameter, "Queue name cannot start with underscore character.");
86  }
87 
88  return false;
89  }
90  static bool IsValidChar(char c) { return isalnum(c) || c == '_' || c == '-'; }
91 };
92 
/// Validation policy for job group names (used with Check()).
struct SJobGroup
{
    /// Label used when reporting an invalid job group name.
    static string Name() { return "job group name"; }

    /// The single-character placeholder "-" is accepted as a whole,
    /// bypassing the per-character check.
    static bool IsValidValue(const string& s)
    {
        return s.size() == 1 && s[0] == '-';
    }

    /// Letters, digits, '_' and '.' are permitted.
    static bool IsValidChar(char c)
    {
        if (c == '_' || c == '.') {
            return true;
        }
        return isalnum(c) != 0;
    }
};
99 
/// Validation policy for affinity tokens (used with Check()).
struct SAffinity
{
    /// Label used when reporting an invalid affinity token.
    static string Name() { return "affinity token"; }

    /// "-" on its own is accepted without a per-character scan.
    static bool IsValidValue(const string& s)
    {
        return s.compare("-") == 0;
    }

    /// Letters, digits, '_' and '.' are permitted.
    static bool IsValidChar(char c)
    {
        switch (c) {
        case '_':
        case '.':
            return true;
        default:
            return isalnum(c) != 0;
        }
    }
};
106 
/// Validation policy for security tokens (used with Check()).
struct SAuthToken
{
    /// Label used when reporting an invalid security token.
    static string Name() { return "security token"; }

    /// No token is accepted wholesale; every character must pass
    /// IsValidChar().
    static bool IsValidValue(const string& /*value*/) { return false; }

    /// Letters, digits, '_' and '.' are permitted.
    static bool IsValidChar(char c)
    {
        const bool is_punct_ok = (c == '_') || (c == '.');
        return is_punct_ok || isalnum(c) != 0;
    }
};
113 
/// Reports an illegal character found while validating a value.
/// Arguments (as passed by Check()): the value's descriptive name, the
/// full value, and the first offending character. Defined elsewhere;
/// presumably throws (confirm against its definition).
void ThrowIllegalChar(const string&, const string&, char);

/// Validates `value` against the policy class TValue.
///
/// TValue must provide static Name(), IsValidValue(const string&) and
/// IsValidChar(char). If IsValidValue() accepts the value as a whole,
/// nothing else is checked (note that some policies' IsValidValue() may
/// itself throw). Otherwise the first character rejected by
/// IsValidChar() is reported via ThrowIllegalChar().
template <class TValue>
void Check(const string& value)
{
    // Wholesale-accepted values (e.g. a "-" placeholder) skip the scan.
    if (TValue::IsValidValue(value)) {
        return;
    }

    for (const char ch : value) {
        if (!TValue::IsValidChar(ch)) {
            // Only the first offending character is reported.
            ThrowIllegalChar(TValue::Name(), value, ch);
            return;
        }
    }
}
127 
128 }
129 }
130 }
131 
132 using namespace grid::netschedule;
133 
134 
135 ////////////////////////////////////////////////////////////////////////////////
136 //
137 
139 {
141  affs_synced(false)
142  {
143  }
144 
145  string ns_node;
146  string ns_session;
147 
148  // Warning:
149  // Version is not set until we execute a command on a server.
150  // Therefore, if that command is version dependent,
151  // old version of the command will be sent to the server at first.
153 
155 };
156 
158 {
159 public:
160  CNetScheduleConfigLoader(CSynRegistry& registry, SRegSynonyms& sections, bool ns_conf = true);
161 
162  bool operator()(SNetScheduleAPIImpl* impl);
163 
164 private:
165  bool Transform(const string& prefix, string& name) const;
166 
169  const bool m_NsConf;
170  enum { eOff, eImplicit, eExplicit } m_Mode;
171 };
172 
174 
176 {
177 public:
179  m_NonWn(non_wn),
180  m_SharedData(shared_data)
181  {
182  _ASSERT(shared_data);
183  }
184 
185  void SetAuthString(const string& auth) { m_Auth = auth; }
186  string& Scope() { return m_Scope; }
187 
188  TPropCreator GetPropCreator() const override;
189  INetServerConnectionListener* Clone() override;
190 
191  void OnConnected(CNetServerConnection& connection) override;
192 
193 private:
194  void OnErrorImpl(const string& err_msg, CNetServer& server) override;
195  void OnWarningImpl(const string& warn_msg, CNetServer& server) override;
196 
197  string m_Auth;
198  const bool m_NonWn;
199  string m_Scope;
201 };
202 
204 {
208 
209  // Make sure the worker node does not attempt to submit its
210  // preferred affinities from two threads.
212 };
213 
214 // Structure that governs NetSchedule server notifications.
216 {
218  m_NotificationSemaphore(0, 1),
219  m_Interrupted(false)
220  {
221  }
222 
223  bool Wait(const CDeadline& deadline)
224  {
225  return m_NotificationSemaphore.TryWait(deadline.GetRemainingTime());
226  }
227 
228  void InterruptWait();
229 
230  void RegisterServer(const string& ns_node);
231 
232  bool GetNextNotification(string* ns_node);
233 
234 private:
236  {
237  if (m_Interrupted) {
238  m_Interrupted = false;
239  m_NotificationSemaphore.TryWait();
240  }
241  }
242  // Semaphore that the worker node or the job reader can wait on.
243  // If count=0 then m_ReadyServers is empty and m_Interrupted=false;
244  // if count=1 then at least one server is ready or m_Interrupted=true.
246  // Protection against concurrent access to m_ReadyServers.
248  // A set of NetSchedule node IDs of the servers that are ready.
249  // (i.e. the servers have sent notifications).
252  // This flag is set when the wait must be interrupted.
254 };
255 
257 {
259 
264  };
265  ENotificationType CheckNotification(string* ns_node);
266 
267  virtual void* Main();
268 
269  unsigned short GetPort() const { return m_Receiver.port; }
270 
271  const string& GetMessage() const { return m_Receiver.message; }
272 
273  void CmdAppendPortAndTimeout(string* cmd, unsigned remaining_seconds);
274 
276 
278 
280 
283 };
284 
286 {
287 private:
288  enum EMode {
289  fWnCompatible = (0 << 0),
290  fNonWnCompatible = (1 << 0),
291  fConfigLoading = (1 << 1),
292  fWorkerNode = fWnCompatible,
293  fNetSchedule = fNonWnCompatible,
294  };
295  typedef int TMode;
296 
297  static TMode GetMode(bool wn, bool try_config)
298  {
299  if (wn) return fWorkerNode;
300  if (try_config) return fNetSchedule | fConfigLoading;
301  return fNetSchedule;
302  }
303 
304 public:
305  SNetScheduleAPIImpl(CSynRegistryBuilder registry_builder, const string& section,
306  const string& service_name = kEmptyStr, const string& client_name = kEmptyStr,
307  const string& queue_name = kEmptyStr, bool wn = false, bool try_config = true);
308 
309  // Special constructor for CNetScheduleAPI::GetServer().
311 
313 
315  {
316  return static_cast<CNetScheduleServerListener*>(
317  m_Service->m_Listener.GetPointer());
318  }
319 
320  CNetScheduleAPI::EJobStatus GetJobStatus(string cmd,
321  const CNetScheduleJob& job, time_t* job_exptime,
322  ENetScheduleQueuePauseMode* pause_mode);
323 
324  const CNetScheduleAPI::SServerParams& GetServerParams() { return m_ServerParamsSync(m_Service, m_Queue); }
325 
327  void GetQueueParams(const string& queue_name, TQueueParams& queue_params);
328  void GetQueueParams(TQueueParams& queue_params);
329 
330  CNetServer GetServer(const string& job_key)
331  {
332  CNetScheduleKey nskey(job_key, m_CompoundIDPool);
333  return m_Service.GetServer(nskey.host, nskey.port);
334  }
335 
337  {
338  return job.server != NULL ? job.server : GetServer(job.job_id);
339  }
340 
341  template <class TJob>
342  string ExecOnJobServer(const TJob& job, const string& cmd, ESwitch roe = eDefault)
343  {
344  auto server = GetServer(job);
345  auto retry_on_exception = (roe == eDefault) ? m_RetryOnException : (roe == eOn);
346  return server->ConnectAndExec(cmd, false, retry_on_exception).response;
347  }
348 
349  bool GetServerByNode(const string& ns_node, CNetServer* server);
350 
351  void AllocNotificationThread();
352  void StartNotificationThread();
353 
354  // Unregister client-listener. After this call, the
355  // server will not try to send any notification messages or
356  // maintain job affinity for the client.
357  void x_ClearNode();
358 
359  void UpdateAuthString();
360  void UseOldStyleAuth();
361  void SetAuthParam(const string& param_name, const string& param_value);
362  CCompoundIDPool GetCompoundIDPool() { return m_CompoundIDPool; }
363  void Init(CSynRegistry& registry, SRegSynonyms& sections);
364  void InitAffinities(CSynRegistry& registry, const SRegSynonyms& sections);
365  string MakeAuthString();
366 
367 private:
368  const TMode m_Mode;
369 
370 public:
375 
376  string m_Queue;
378  string m_ClientNode;
380 
383 
385  {
386  const CNetScheduleAPI::SServerParams& operator()(CNetService& service, const string& queue);
387 
388  private:
391  long m_AskCount = 0;
392  constexpr static long kAskMaxCount = 100;
393  } m_ServerParamsSync;
394 
396  list<string> m_AffinityList;
397 
399 
400  string m_JobGroup;
401  unsigned m_JobTtl = 0;
402 
404 
406 
410 };
411 
412 
414 {
416 
417  string SubmitJobImpl(CNetScheduleNewJob& job, unsigned short udp_port,
418  unsigned wait_time, CNetServer* server = NULL);
419 
420  void FinalizeRead(const char* cmd_start,
421  const string& job_id,
422  const string& auth_token,
423  const string& error_message);
424 
425  CNetScheduleAPI::EJobStatus SubmitJobAndWait(CNetScheduleJob& job,
426  unsigned wait_time, time_t* job_exptime = NULL);
427 
428  void AppendClientIPSessionIDHitID(string& cmd, const string& job_group);
429 
431 };
432 
434  CNetScheduleAPI::TInstance ns_api_impl) :
435  m_API(ns_api_impl)
436 {
437 }
438 
440 {
442  m_API(ns_api_impl),
444  m_JobGroup(ns_api_impl->m_JobGroup)
445  {
446  copy(ns_api_impl->m_AffinityList.begin(),
447  ns_api_impl->m_AffinityList.end(),
449  }
450 
451  void ClaimNewPreferredAffinity(CNetServer orig_server,
452  const string& affinity);
453  string MkSETAFFCmd();
454  bool ExecGET(SNetServerImpl* server,
455  const string& get_cmd, CNetScheduleJob& job);
457  const CDeadline& timeout,
458  const string& prio_aff_list,
459  bool any_affinity,
460  CNetScheduleJob& job);
461 
462  void ReturnJob(const CNetScheduleJob& job, bool blacklist = true);
463 
467  };
468  int AppendAffinityTokens(string& cmd,
469  const vector<string>* affs, EChangeAffAction action);
470 
472 
474 
476 
479 
480  string m_JobGroup;
482 };
483 
485 {
486 public:
488  m_Executor(executor)
489  {
490  }
491 
492  virtual void OnExec(CNetServerConnection::TInstance conn_impl,
493  const string& cmd);
494 
496 };
497 
498 const unsigned s_Timeout = 10;
499 
501 {
503  const string& group, const string& affinity) :
504  m_Impl(ns_api_impl, group, affinity),
506  {
507  }
508 
510  {
512  }
513 
515  CNetScheduleJob* job,
516  CNetScheduleAPI::EJobStatus* job_status,
517  const CTimeout* timeout);
518  void InterruptReading();
519 
520 private:
521  class CImpl : public CNetScheduleGetJob
522  {
523  public:
525  const string& group, const string& affinity) :
526  m_API(ns_api_impl),
528  m_JobGroup(group),
529  m_Affinity(affinity),
531  {
532  limits::Check<limits::SJobGroup>(group);
533  limits::Check<limits::SAffinity>(affinity);
534  }
535 
536  EState CheckState();
538  CNetServer WaitForNotifications(const CDeadline& deadline);
539  bool MoreJobs(const SEntry& entry);
540  bool CheckEntry(
541  SEntry& entry,
542  const string& prio_aff_list,
543  bool any_affinity,
544  CNetScheduleJob& job,
545  CNetScheduleAPI::EJobStatus* job_status);
546  void ReturnJob(CNetScheduleJob& job);
547 
549  const unsigned m_Timeout;
550  string m_JobGroup;
551  string m_Affinity;
552 
553  private:
555  };
556 
559 };
560 
562 {
564  m_API(ns_api_impl)
565  {
566  }
567 
569 };
570 
572 
573 
574 #endif /* CONN_SERVICES___NETSCHEDULE_API_IMPL__HPP */
static CRef< CScope > m_Scope
#define false
Definition: bool.h:36
CAtomicCounter_WithAutoInit –.
Definition: ncbicntr.hpp:120
Pool of recycled CCompoundID objects.
CConfigException –.
Definition: ncbi_config.hpp:53
CDeadline.
Definition: ncbitime.hpp:1830
CFastMutex –.
Definition: ncbimtx.hpp:667
Client API for NCBI NetSchedule server.
const SRegSynonyms & m_Sections
CNetScheduleGETCmdListener(SNetScheduleExecutorImpl *executor)
SNetScheduleExecutorImpl * m_Executor
virtual void OnExec(CNetServerConnection::TInstance conn_impl, const string &cmd)
CRef< SNetScheduleSharedData > m_SharedData
CNetScheduleServerListener(bool non_wn, SNetScheduleSharedData *shared_data)
void SetAuthString(const string &auth)
CObject –.
Definition: ncbiobj.hpp:180
CSemaphore –.
Definition: ncbimtx.hpp:1375
CTimeout – Timeout interval.
Definition: ncbitime.hpp:1693
CVersionInfo –.
CNetServer WaitForNotifications(const CDeadline &deadline)
bool CheckEntry(SEntry &entry, const string &prio_aff_list, bool any_affinity, CNetScheduleJob &job, CNetScheduleAPI::EJobStatus *job_status)
CImpl(CNetScheduleAPI::TInstance ns_api_impl, const string &group, const string &affinity)
const_iterator end() const
Definition: set.hpp:136
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
char value[7]
Definition: config.c:431
static CS_COMMAND * cmd
Definition: ct_dynamic.c:26
static void Init(void)
Definition: cursor6.c:76
int Main(int argc, const char *argv[])
@ eOff
Definition: ncbi_types.h:110
@ eDefault
Definition: ncbi_types.h:112
@ eOn
Definition: ncbi_types.h:111
#define NULL
Definition: ncbistd.hpp:225
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
EJobStatus
Job status codes.
CNetServer server
The server the job belongs to.
unsigned short port
TCP/IP port number.
EJobAffinityPreference
Affinity matching modes.
EReadNextJobResult
Possible outcomes of ReadNextJob() calls.
string host
Server name.
string job_id
Output job key.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define kEmptyStr
Definition: ncbistr.hpp:123
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
Definition: ncbitime.cpp:3858
enum ENcbiSwitch ESwitch
Aux.
Definition of all error codes used in connect services library (xconnserv.lib and others).
void Check(const string &value)
void ThrowIllegalChar(const string &name, const string &value, char c)
int isalnum(Uchar c)
Definition: ncbictype.hpp:62
NetSchedule client specs.
ENetScheduleQueuePauseMode
Defines whether the job queue is paused, and if so, defines the pause mode set by the administrator.
const unsigned s_Timeout
void copy(Njn::Matrix< S > *matrix_, const Njn::Matrix< T > &matrix0_)
Definition: njn_matrix.hpp:613
static const TDS_WORD limits[]
Definition: num_limits.h:85
static const char * prefix[]
Definition: pcregrep.c:405
Defines CRequestContext class for NCBI C++ diagnostic API.
vector< pair< string, string > > TAffinityLadder
Job description.
Meaningful information encoded in the NetSchedule key.
New job description.
CNetScheduleAPI::SServerParams m_ServerParams
CRef< SNetScheduleSharedData > m_SharedData
CNetScheduleAPI::TQueueParams TQueueParams
CNetScheduleGetJob::TAffinityLadder m_AffinityLadder
CCompoundIDPool GetCompoundIDPool()
static TMode GetMode(bool wn, bool try_config)
CNetServer GetServer(const CNetScheduleJob &job)
CNetScheduleServerListener * GetListener()
CAtomicCounter_WithAutoInit m_NotificationThreadStartStopCounter
CCompoundIDPool m_CompoundIDPool
string ExecOnJobServer(const TJob &job, const string &cmd, ESwitch roe=eDefault)
const CNetScheduleAPI::SServerParams & GetServerParams()
map< string, string > TAuthParams
CRef< SNetScheduleNotificationThread > m_NotificationThread
CNetServer GetServer(const string &job_key)
SNetScheduleAdminImpl(CNetScheduleAPI::TInstance ns_api_impl)
SNetScheduleExecutorImpl(CNetScheduleAPI::TInstance ns_api_impl)
bool ExecGET(SNetServerImpl *server, const string &get_cmd, CNetScheduleJob &job)
CNetScheduleExecutor::EJobAffinityPreference m_AffinityPreference
CNetScheduleNotificationHandler m_NotificationHandler
int AppendAffinityTokens(string &cmd, const vector< string > *affs, EChangeAffAction action)
void ReturnJob(const CNetScheduleJob &job, bool blacklist=true)
bool x_GetJobWithAffinityLadder(SNetServerImpl *server, const CDeadline &timeout, const string &prio_aff_list, bool any_affinity, CNetScheduleJob &job)
void ClaimNewPreferredAffinity(CNetServer orig_server, const string &affinity)
CNetScheduleGetJobImpl< CImpl > m_Timeline
CNetScheduleJobReader::EReadNextJobResult ReadNextJob(CNetScheduleJob *job, CNetScheduleAPI::EJobStatus *job_status, const CTimeout *timeout)
SNetScheduleJobReaderImpl(CNetScheduleAPI::TInstance ns_api_impl, const string &group, const string &affinity)
SNetScheduleNotificationReceiver m_Receiver
map< string, SNetServerInPool * > TServerByNode
SNetScheduleSubmitterImpl(CNetScheduleAPI::TInstance ns_api_impl)
bool Wait(const CDeadline &deadline)
static bool IsValidValue(const string &s)
static bool IsValidValue(const string &s)
static bool IsValidValue(const string &s)
#define _ASSERT
Modified on Thu Sep 28 03:34:11 2023 by modify_doxy.py rev. 669887