NCBI C++ ToolKit
grid_control_thread.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: grid_control_thread.cpp 94591 2021-08-19 19:07:39Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  * NetSchedule Worker Node implementation
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "netschedule_api_impl.hpp"
35 #include "grid_worker_impl.hpp"
36 #include "grid_control_thread.hpp"
37 
40 
41 #include <corelib/ncbistre.hpp>
42 #include <corelib/ncbi_process.hpp>
43 
44 #include <math.h>
45 
46 
47 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
48 
50 
51 /////////////////////////////////////////////////////////////////////////////
52 //
53 ///@internal
54 
56 {
57 public:
58  virtual void Process(const string& /*request*/,
59  CNcbiOstream& os,
60  CWorkerNodeControlServer* control_server)
61  {
62  CGridWorkerNode node(control_server->GetWorkerNode());
64 
65  os << "OK:version=" << NStr::URLEncode(version.first) <<
66  "&build_date=" << NStr::URLEncode(version.second.date) <<
67  "&build_tag=" << NStr::URLEncode(version.second.tag) << "\n";
68  }
69 };
70 
72 {
73 public:
74  virtual bool Authenticate(const string& host,
75  const string& /*auth*/,
76  const string& /*queue*/,
77  CNcbiOstream& os,
78  CWorkerNodeControlServer* control_server)
79  {
80  m_Host = host;
81  size_t pos = m_Host.find_first_of(':');
82  if (pos != string::npos) {
83  m_Host = m_Host.substr(0, pos);
84  }
85  if (control_server->GetWorkerNode().IsHostInAdminHostsList(m_Host)) {
86  return true;
87  }
88  os << "ERR:Shutdown access denied.\n";
89  LOG_POST_X(10, Warning << "Shutdown access denied for host " << m_Host);
90  return false;
91  }
92 
93 protected:
94  string m_Host;
95 };
96 
98 {
99 public:
100  virtual void Process(const string& request,
101  CNcbiOstream& os,
102  CWorkerNodeControlServer* /*control_server*/)
103  {
104  if (request.find("SUICIDE") != NPOS) {
105  LOG_POST_X(11, Warning <<
106  "Shutdown request has been received from host: " << m_Host);
107  LOG_POST_X(12, Warning << "Server is shutting down");
109  } else {
112  if (request.find("IMMEDIATE") != NPOS)
114  os << "OK:\n";
116  LOG_POST_X(13, "Shutdown request has been received from host " <<
117  m_Host);
118  }
119  }
120 };
121 
123 {
124 public:
125  virtual void Process(const string& request, CNcbiOstream& os,
126  CWorkerNodeControlServer* control_server)
127  {
128  bool pullback = NStr::Find(request.c_str(), "pullback") != NPOS;
129 
130  unsigned timeout = 0;
131  SIZE_TYPE timeout_str = NStr::Find(request.c_str(), "timeout=");
132  if (timeout_str != NPOS)
133  timeout = NStr::StringToUInt(request.c_str() +
134  timeout_str + sizeof("timeout=") - 1,
136 
137  LOG_POST("Received SUSPEND request from " << m_Host <<
138  " (pullback=" << (pullback ? "ON" : "OFF") <<
139  ", timeout=" << timeout << ')');
140 
141  control_server->GetWorkerNode().Suspend(pullback, timeout);
142 
143  os << "OK:\n";
144  }
145 };
146 
148 {
149 public:
150  virtual void Process(const string& /*request*/, CNcbiOstream& os,
151  CWorkerNodeControlServer* control_server)
152  {
153  control_server->GetWorkerNode().Resume();
154  LOG_POST("Received RESUME request from " << m_Host);
155  os << "OK:\n";
156  }
157 };
158 
161 {
162 public:
163  virtual void Process(const string& /*request*/,
164  CNcbiOstream& os,
165  CWorkerNodeControlServer* control_server)
166  {
167  CGridWorkerNode node(control_server->GetWorkerNode());
169 
170  os << "OK:Application: " << node.GetAppName() <<
171  "\nVersion: " << version.first <<
172  "\nBuild date: " << version.second.date <<
173  "\nBuild tag: " << version.second.tag << "\n";
174 
175  for (const auto& p : version.second.m_extra) {
176  os << SBuildInfo::ExtraName(p.first) << ": " << p.second << '\n';
177  }
178 
179  {{
181  if (app)
182  os << "Executable path: " << app->GetProgramExecutablePath()
183  << "\nPID: " << CCurrentProcess::GetPid() << "\n";
184  }}
185 
186  CNetScheduleAPI ns_api(node.GetNetScheduleAPI());
187 
188  os << "Host name: " << CSocketAPI::gethostname() <<
189  "\nControl port: " << control_server->GetControlPort() <<
190  "\nUser name: " << GetDiagContext().GetUsername() <<
191  "\nNetCache client name: " << node.GetNetCacheAPI().
192  GetService()->GetClientName() <<
193  "\nNetSchedule client name: " << node->GetClientName() <<
194  "\nQueue name: " << node->GetQueueName() <<
195  "\nNode ID: " << ns_api->m_ClientNode <<
196  "\nNode session: " << ns_api->m_ClientSession <<
197  "\nMaximum job threads: " << node.GetMaxThreads() << "\n";
198 
199  if (node.IsSuspended())
200  os << "The node is suspended\n";
201 
202  if (CGridGlobals::GetInstance().IsShuttingDown())
203  os << "The node is shutting down\n";
204 
205  if (node->IsExclusiveMode())
206  os << "The node is processing an exclusive job\n";
207 
209 
210  os << "NetSchedule service: " <<
211  ns_api.GetService().GetServiceName() << "\n";
212 
213  os << "NetSchedule servers:";
214  try {
215  for (CNetServiceIterator it = ns_api.GetService().
216  Iterate(CNetService::eIncludePenalized); it; ++it)
217  os << ' ' << (*it).GetServerAddress();
218  os << "\n";
219  }
220  catch (CNetSrvConnException&) {
221  os << " N/A\n";
222  }
223 
224  os << "Preferred affinities:";
225  CNetScheduleExecutor ns_executor(node.GetNSExecutor());
226  CFastMutexGuard guard(ns_executor->m_PreferredAffMutex);
227  ITERATE(set<string>, aff, ns_executor->m_PreferredAffinities) {
228  os << ' ' << *aff;
229  }
230  os << "\n";
231 
232  auto registry = node->m_SynRegistry;
233  _ASSERT(registry);
234 
235  registry->Alerts(os);
236 
237  os << "OK:END\n";
238  }
239 };
240 
242 {
243 public:
244  virtual bool Authenticate(const string& /*host*/,
245  const string& auth,
246  const string& queue,
247  CNcbiOstream& os,
248  CWorkerNodeControlServer* control_server)
249  {
250  CGridWorkerNode node(control_server->GetWorkerNode());
251 
252  if (NStr::FindCase(auth, node->GetClientName()) == NPOS) {
253  os <<"ERR:Wrong client name. Required: " <<
254  node->GetClientName() << "\n";
255  return false;
256  }
257 
258  CTempString qname, connection_info;
259  NStr::SplitInTwo(queue, ";", qname, connection_info);
260  if (qname != node->GetQueueName()) {
261  os << "ERR:Wrong queue name. Required: " <<
262  node->GetQueueName() << "\n";
263  return false;
264  }
265 
266  return true;
267  }
268 
269  virtual void Process(const string& /*request*/,
270  CNcbiOstream& os,
271  CWorkerNodeControlServer* control_server)
272  {
273  int load = control_server->GetWorkerNode().GetMaxThreads() -
275  os << "OK:" << load << "\n";
276  }
277 };
278 
280 {
281 public:
282  void Process(const string& request, CNcbiOstream& reply, CWorkerNodeControlServer* control_server) override;
283 };
284 
285 void CGetConfProcessor::Process(const string&, CNcbiOstream& reply, CWorkerNodeControlServer* control_server)
286 {
287  _ASSERT(control_server);
288 
289  auto node = static_cast<SGridWorkerNodeImpl*>(control_server->GetWorkerNode());
290  _ASSERT(node);
291 
292  auto registry = node->m_SynRegistry;
293  _ASSERT(registry);
294 
295  registry->Report(reply);
296  reply << "OK:END\n";
297 }
298 
300 {
301 public:
302  void Process(const string& request, CNcbiOstream& reply, CWorkerNodeControlServer* control_server) override;
303 };
304 
305 void CAckAlertProcessor::Process(const string& request, CNcbiOstream& reply, CWorkerNodeControlServer* control_server)
306 {
307  _ASSERT(control_server);
308 
309  auto node = static_cast<SGridWorkerNodeImpl*>(control_server->GetWorkerNode());
310  _ASSERT(node);
311 
312  auto registry = node->m_SynRegistry;
313  _ASSERT(registry);
314 
315  const string kAlertIDPrefix = " alert_";
316  auto pos = NStr::Find(request, kAlertIDPrefix, NStr::eNocase);
317 
318  if (pos == NPOS) {
319  reply << "ERR:Alert ID is required\n";
320  return;
321  }
322 
324  auto id = NStr::StringToUInt(request.c_str() + pos + kAlertIDPrefix.size(), kFlags);
325 
326  if (!registry->AckAlert(id)) {
327  reply << "ERR:Failed to find an alert with such ID (" << id << ")\n";
328  } else {
329  reply << "OK:\n";
330  }
331 }
332 
334 {
335 public:
336  virtual void Process(const string& request,
337  CNcbiOstream& os,
338  CWorkerNodeControlServer* /*control_server*/)
339  {
340  os << "ERR:Unknown command -- " << request << "\n";
341  }
342 };
343 
344 /////////////////////////////////////////////////////////////////////////////
345 //
346 ///@internal
347 
348 /* static */
351 {
352  if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("VERSION")))
353  return new CGetVersionProcessor;
354 
355  if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("STAT")))
356  return new CGetStatisticsProcessor;
357 
358  if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("SHUTDOWN")))
359  return new CShutdownProcessor;
360 
361  if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("SUSPEND")))
362  return new CSuspendProcessor;
363 
364  if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("RESUME")))
365  return new CResumeProcessor;
366 
367  if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("GETLOAD")))
368  return new CGetLoadProcessor;
369 
370  if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("GETCONF")))
371  return new CGetConfProcessor;
372 
373  if (NStr::StartsWith(cmd, TEMP_STRING_CTOR("ACKALERT")))
374  return new CAckAlertProcessor;
375 
376  return new CUnknownProcessor;
377 }
378 
380 {
381 public:
383  unsigned short& start_port, unsigned short end_port)
384  : m_Server(server), m_Port(start_port), m_EndPort(end_port)
385  {}
387  return new CWNCTConnectionHandler(m_Server);
388  }
389  virtual EListenAction OnFailure(unsigned short* port)
390  {
391  if (*port >= m_EndPort)
392  return eLAFail;
393  m_Port = ++(*port);
394  return eLARetry;
395  }
396 
397 private:
399  unsigned short& m_Port;
400  unsigned short m_EndPort;
401 };
402 
403 static STimeout kAcceptTimeout = {1,0};
404 
406  SGridWorkerNodeImpl* worker_node,
407  unsigned short start_port,
408  unsigned short end_port) :
409  m_WorkerNode(worker_node),
410  m_ShutdownRequested(false),
411  m_Port(start_port)
412 {
413  SServer_Parameters params;
414  params.init_threads = 1;
415  params.max_threads = 3;
416  params.accept_timeout = &kAcceptTimeout;
417  SetParameters(params);
418  AddListener(new CWNCTConnectionFactory(*this, m_Port, end_port), m_Port);
419 }
420 
422 {
423  LOG_POST_X(14, Info << "Control server stopped.");
424 }
426 {
427  return m_ShutdownRequested;
428 }
429 
431 {
433 
434  // If shutting down, request closing control server/port as well
435  if (CGridGlobals::GetInstance().IsShuttingDown()) {
436  RequestShutdown();
437  }
438 }
439 
440 
441 
442 ////////////////////////////////////////////////
443 static string s_ReadStrFromBUF(BUF buf)
444 {
445  size_t size = BUF_Size(buf);
446  string ret(size, '\0');
447  if (size > 0)
448  BUF_Read(buf, &ret[0], size);
449  return ret;
450 }
451 
453  : m_Server(server)
454 {}
455 
457 {}
458 
460 {
461  CSocket& socket = GetSocket();
462  socket.DisableOSSendDelay();
464 
465 }
466 
467 static void s_HandleError(CSocket& socket, const string& msg)
468 {
469  ERR_POST_X(15, "Exception in the control server: " << msg);
470  string err = "ERR:" + NStr::PrintableString(msg);
471  socket.Write(&err[0], err.size());
472 }
474 {
475  try {
476  (this->*m_ProcessMessage)(buffer);
477  } catch(exception& ex) {
478  s_HandleError(GetSocket(), ex.what());
479  } catch(...) {
480  s_HandleError(GetSocket(), "Unknown Error");
481  }
482 }
483 
485 {
488 }
490 {
493 }
495 {
496  string request = s_ReadStrFromBUF(buffer);
497 
498  CSocket& socket = GetSocket();
499  string host = socket.GetPeerAddress();
500 
501  CNcbiOstrstream os;
502 
503  unique_ptr<CWorkerNodeControlServer::IRequestProcessor>
504  processor(m_Server.MakeProcessor(request));
505 
506  if (processor->Authenticate(host, m_Auth, m_Queue, os, &m_Server))
507  processor->Process(request, os, &m_Server);
508 
509  string s = CNcbiOstrstreamToString(os);
510  socket.Write(s.data(), s.size());
511 }
512 
static const NStr::TNumToStringFlags kFlags
void Process(const string &request, CNcbiOstream &reply, CWorkerNodeControlServer *control_server) override
virtual bool Authenticate(const string &host, const string &, const string &, CNcbiOstream &os, CWorkerNodeControlServer *control_server)
void Process(const string &request, CNcbiOstream &reply, CWorkerNodeControlServer *control_server) override
virtual bool Authenticate(const string &, const string &auth, const string &queue, CNcbiOstream &os, CWorkerNodeControlServer *control_server)
virtual void Process(const string &, CNcbiOstream &os, CWorkerNodeControlServer *control_server)
virtual void Process(const string &, CNcbiOstream &os, CWorkerNodeControlServer *control_server)
virtual void Process(const string &, CNcbiOstream &os, CWorkerNodeControlServer *control_server)
CWNJobWatcher & GetJobWatcher()
void RequestShutdown(CNetScheduleAdmin::EShutdownLevel level)
Request node shutdown.
static CGridGlobals & GetInstance()
Grid Worker Node.
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string Sample usage:
Definition: ncbistre.hpp:802
Client API for NCBI NetSchedule server.
EShutdownLevel
Shutdown level.
@ eNormalShutdown
Normal shutdown was requested.
@ eShutdownImmediate
Urgent shutdown was requested.
Smart pointer to a part of the NetSchedule API that does job retrieval and processing on the worker n...
const string & GetServiceName() const
Net Service exception.
virtual void Process(const string &, CNcbiOstream &os, CWorkerNodeControlServer *control_server)
virtual void Process(const string &request, CNcbiOstream &os, CWorkerNodeControlServer *)
CSocket::
virtual void Process(const string &request, CNcbiOstream &os, CWorkerNodeControlServer *control_server)
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
virtual void Process(const string &request, CNcbiOstream &os, CWorkerNodeControlServer *)
CWNCTConnectionFactory(CWorkerNodeControlServer &server, unsigned short &start_port, unsigned short end_port)
virtual EListenAction OnFailure(unsigned short *port)
Return desired action if the port, mentioned in AddListener is busy.
virtual IServer_ConnectionHandler * Create(void)
CWorkerNodeControlServer & m_Server
virtual void OnOpen(void)
Runs in response to an external event [asynchronous].
void x_ProcessRequest(BUF buffer)
CWorkerNodeControlServer & m_Server
void(CWNCTConnectionHandler::* m_ProcessMessage)(BUF buffer)
CWNCTConnectionHandler(CWorkerNodeControlServer &server)
virtual void OnMessage(BUF buffer)
void CheckForInfiniteLoop()
void Print(CNcbiOstream &os) const
unsigned GetJobsRunningNumber() const
virtual void ProcessTimeout(void)
Runs synchronously when no socket activity has occurred in a while (as determined by m_AcceptTimeout)...
unsigned short GetControlPort() const
virtual bool ShutdownRequested(void)
Runs synchronously between iterations.
static IRequestProcessor * MakeProcessor(const string &cmd)
CWorkerNodeControlServer(SGridWorkerNodeImpl *worker_node, unsigned short start_port, unsigned short end_port)
IServer_ConnectionFactory::
Definition: server.hpp:388
IServer_ConnectionHandler::
Definition: server.hpp:226
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
static CS_COMMAND * cmd
Definition: ct_dynamic.c:26
#define false
Definition: bool.h:36
static string s_ReadStrFromBUF(BUF buf)
static void s_HandleError(CSocket &socket, const string &msg)
static STimeout kAcceptTimeout
Grid Framework specs.
const string & GetProgramExecutablePath(EFollowLinks follow_links=eIgnoreLinks) const
Get the application's executable path.
static CNcbiApplicationGuard InstanceGuard(void)
Singleton method.
Definition: ncbiapp.cpp:133
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
size_t BUF_Size(BUF buf)
Definition: ncbi_buffer.c:84
size_t BUF_Read(BUF buf, void *data, size_t size)
Definition: ncbi_buffer.c:414
#define LOG_POST_X(err_subcode, message)
Definition: ncbidiag.hpp:553
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
Definition: ncbidiag.hpp:550
#define LOG_POST(message)
This macro is deprecated and it's strongly recommended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
const string & GetUsername(void) const
Get username.
Definition: ncbidiag.cpp:1754
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
void Info(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1185
bool IsHostInAdminHostsList(const string &host) const
TVersion GetAppVersion() const
CNetCacheAPI GetNetCacheAPI() const
pair< string, SBuildInfo > TVersion
unsigned int GetMaxThreads() const
Get the maximum threads running simultaneously.
CNetScheduleExecutor GetNSExecutor() const
CNetService GetService()
bool IsSuspended() const
string GetAppName() const
CNetScheduleAPI GetNetScheduleAPI() const
void Suspend(bool pullback, unsigned timeout)
static TPid GetPid(void)
Get process identifier (pid) for the current process.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
static string gethostname(ESwitch log=eOff)
Return empty string on error.
void DisableOSSendDelay(bool on_off=true)
void GetPeerAddress(unsigned int *host, unsigned short *port, ENH_ByteOrder byte_order) const
Get peer address.
EIO_Status Write(const void *buf, size_t size, size_t *n_written=0, EIO_WriteMethod how=eIO_WritePersist)
Write to socket.
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
NCBI_NS_STD::string::size_type SIZE_TYPE
Definition: ncbistr.hpp:132
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3953
#define NPOS
Definition: ncbistr.hpp:133
static SIZE_TYPE Find(const CTempString str, const CTempString pattern, ECase use_case=eCase, EDirection direction=eForwardSearch, SIZE_TYPE occurrence=0)
Find the pattern in the string.
Definition: ncbistr.cpp:2891
static SIZE_TYPE FindCase(const CTempString str, const CTempString pattern, SIZE_TYPE start, SIZE_TYPE end, EOccurrence which=eFirst)
Find the pattern in the specified range of a string using a case sensitive search.
Definition: ncbistr.hpp:5490
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5412
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3554
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
Definition: ncbistr.cpp:642
static string URLEncode(const CTempString str, EUrlEncode flag=eUrlEnc_SkipMarkChars)
URL-encode string.
Definition: ncbistr.cpp:6062
@ fConvErr_NoThrow
Do not throw an exception on error.
Definition: ncbistr.hpp:285
@ fAllowTrailingSymbols
Ignore trailing non-numerics characters.
Definition: ncbistr.hpp:298
@ eNocase
Case insensitive compare.
Definition: ncbistr.hpp:1206
EListenAction
What to do if the port is busy.
Definition: server.hpp:391
const STimeout * accept_timeout
Maximum t between exit checks.
Definition: server.hpp:440
void AddListener(IServer_ConnectionFactory *factory, unsigned short port)
Register a listener.
Definition: server.cpp:605
unsigned short m_Port
TCP port to listen on.
void SetParameters(const SServer_Parameters &new_params)
Definition: server.cpp:618
unsigned int max_threads
Maximum simultaneous threads.
Definition: server.hpp:447
CSocket & GetSocket(void)
Get underlying socket.
Definition: server.hpp:294
unsigned int init_threads
Number of initial threads.
Definition: server.hpp:446
static string ExtraName(EExtra key)
Definition: version.cpp:480
#define TEMP_STRING_CTOR(str)
Definition: util.hpp:79
char * buf
static int version
Definition: mdb_load.c:29
const struct ncbi::grid::netcache::search::fields::SIZE size
Defines process management classes.
NCBI C++ stream class wrappers for triggering between "new" and "old" C++ stream libraries.
static pcre_uint8 * buffer
Definition: pcretest.c:1051
CSynRegistry::TPtr m_SynRegistry
bool IsExclusiveMode() const
const string & GetQueueName() const
const string & GetClientName() const
SServer_Parameters::
Definition: server.hpp:434
Timeout structure.
Definition: ncbi_types.h:76
#define _ASSERT
Modified on Mon May 27 04:40:07 2024 by modify_doxy.py rev. 669887