NCBI C++ ToolKit
netschedule_api_wn_info.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: netschedule_api_wn_info.cpp 95002 2021-09-27 16:36:44Z grichenk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Dmitry Kazimirov
27  *
28  */
29 
30 #include <ncbi_pch.hpp>
31 
32 #include "netschedule_api_impl.hpp"
33 
35 
36 #include <locale>
37 
39 
41 
42 /// Types of administrative information NetSchedule can return.
43 ///
45 
46 void g_GetWorkerNodes(CNetScheduleAPI api, list<SWorkerNodeInfo>& worker_nodes)
47 {
48  worker_nodes.clear();
49 
50  set<string> unique_sessions;
51 
52  for (CNetServiceIterator iter = api->m_Service.Iterate(
53  CNetService::eIncludePenalized); iter; ++iter) {
54 
55  CJsonNode client_info_array = g_GenericStatToJson(*iter,
56  eNetScheduleStatClients, /*verbose*/ false);
57 
58  for (CJsonIterator client_info_iter = client_info_array.Iterate();
59  client_info_iter; ++client_info_iter) {
60  CJsonNode client_info = *client_info_iter;
61  if (NStr::Find(client_info.GetString("type"),
62  "worker node") == NPOS)
63  continue;
64 
65  string session = client_info.GetString("session");
66 
67  if (unique_sessions.insert(session).second) {
68  worker_nodes.push_back(SWorkerNodeInfo());
69 
70  SWorkerNodeInfo& wn_info = worker_nodes.back();
71  wn_info.name = client_info.GetString("client_node");
72  string not_used;
73  NStr::SplitInTwo(wn_info.name, "::", wn_info.prog, not_used);
74  wn_info.session = session;
75  wn_info.host = client_info.GetString("client_host");
76  wn_info.port = (unsigned short)
77  client_info.GetInteger("worker_node_control_port");
78  wn_info.last_access = CTime(
79  client_info.GetString("last_access"), "M/D/Y h:m:s.r");
80  }
81  }
82  }
83 }
84 
86 {
88 
89  list<SWorkerNodeInfo> worker_nodes;
90  g_GetWorkerNodes(api, worker_nodes);
91 
92  ITERATE(list<CNetScheduleAdmin::SWorkerNodeInfo>, wn_info, worker_nodes) {
93  string wn_address = wn_info->host + ':' +
94  NStr::NumericToString(wn_info->port);
95  try {
96  CNetScheduleAPI wn_api(wn_address,
97  api->m_Service->GetClientName(),
98  kEmptyStr);
99 
100  result.SetByKey(wn_info->session, g_WorkerNodeInfoToJson(
101  wn_api.GetService().Iterate().GetServer()));
102  }
103  catch (CException& e) {
104  ERR_POST(e);
105  }
106  }
107 
108  return result;
109 }
110 
111 struct {
112  const char* command;
113  const char* record_prefix;
114  const char* entity_name;
116  // eNetScheduleStatJobGroups
117  {"STAT GROUPS", "GROUP: ", "group"},
118  // eNetScheduleStatClients
119  {"STAT CLIENTS", "CLIENT: ", "client_node"},
120  // eNetScheduleStatNotifications
121  {"STAT NOTIFICATIONS", "CLIENT: ", "client_node"},
122  // eNetScheduleStatAffinities
123  {"STAT AFFINITIES", "AFFINITY: ", "affinity_token"}
124 };
125 
127 {
128  return s_StatTopics[topic].command;
129 }
130 
131 // Return unquoted and unescaped version of 'str' if it
132 // starts with a quote. Otherwise, return 'str' itself.
133 //
135 {
136  switch (str[0]) {
137  case '"':
138  case '\'':
139  return NStr::ParseQuoted(str);
140  default:
141  return str;
142  }
143 }
144 
146 {
147  locale loc;
148  char* begin = const_cast<char*>(key.data());
149  char* end = begin + key.length();
150 
151  while (begin < end && !isalnum(*begin))
152  ++begin;
153 
154  while (begin < end && !isalnum(end[-1]))
155  --end;
156 
157  if (begin == end) {
158  key = "_";
159  return;
160  }
161 
162  key.assign(begin, end - begin);
163 
164  for (; begin < end; ++begin)
165  *begin = isalnum(*begin, loc) ? tolower(*begin, loc) : '_';
166 }
167 
168 // Detect older format of the worker node info line that contained
169 // executable name followed by PID.
170 // @return false if 'stat_info' already has the new format
171 // of the executable name; true if 'stat_info' was fixed.
172 //
173 bool g_FixMisplacedPID(CJsonNode& stat_info, CTempString& executable_path,
174  const char* pid_key)
175 {
176  SIZE_TYPE misplaced_pid = NStr::Find(executable_path, "; PID: ");
177  if (misplaced_pid == NPOS)
178  return false;
179 
180  SIZE_TYPE pos = misplaced_pid + sizeof("; PID: ") - 1;
181  stat_info.SetInteger(pid_key, NStr::StringToInt8(
182  CTempString(executable_path.data() + pos,
183  executable_path.length() - pos)));
184  executable_path.erase(misplaced_pid);
185  return true;
186 }
187 
188 // Return a JSON object that contains the requested infromation.
189 // The structure of the object depends on the topic.
190 //
192  ENetScheduleStatTopic topic, bool verbose)
193 {
194  string stat_cmd(s_StatTopics[topic].command);
197 
198  if (verbose)
199  stat_cmd.append(" VERBOSE");
200 
202 
203  CNetServerMultilineCmdOutput output(server.ExecWithRetry(stat_cmd, true));
204 
206  CJsonNode entity_info;
207  CJsonNode array_value;
208 
209  string line;
210 
211  while (output.ReadLine(line)) {
212  if (NStr::StartsWith(line, prefix)) {
213  if (entity_info)
214  entities.Append(entity_info);
215  entity_info = CJsonNode::NewObjectNode();
217  CTempString(line.data() + prefix.length(),
218  line.length() - prefix.length())));
219  } else if (entity_info && NStr::StartsWith(line, " ")) {
220  if (NStr::StartsWith(line, " ") && array_value) {
221  array_value.AppendString(g_UnquoteIfQuoted(
223  } else {
224  if (array_value)
225  array_value = NULL;
227  NStr::SplitInTwo(line, ":", key, value);
229  string key_norm(key);
231  if (value.empty())
232  entity_info.SetByKey(key_norm, array_value =
234  else {
235  const auto string_node = (topic == eNetScheduleStatClients) && (key_norm == "client_host");
236  auto node = string_node ? CJsonNode::NewStringNode(value) : CJsonNode::GuessType(value);
237  entity_info.SetByKey(key_norm, node);
238  }
239  }
240  }
241  }
242  if (entity_info)
243  entities.Append(entity_info);
244 
245  return entities;
246 }
247 
248 // Convert 'server_info' to a JSON object.
249 //
251  bool server_version_key)
252 {
253  CJsonNode server_info_node(CJsonNode::NewObjectNode());
254 
255  string attr_name, attr_value;
256 
257  ESwitch old_format = eDefault;
258 
259  while (server_info.GetNextAttribute(attr_name, attr_value)) {
260  switch (old_format) {
261  case eOn:
262  if (attr_name == "Build")
263  attr_name = "build_date";
264  else
265  NStr::ReplaceInPlace(NStr::ToLower(attr_name), " ", "_");
266  break;
267 
268  case eDefault:
269  if (NStr::EndsWith(attr_name, " version"))
270  {
271  old_format = eOn;
272  attr_name = server_version_key ? "server_version" : "version";
273  break;
274  } else
275  old_format = eOff;
276  /* FALL THROUGH */
277 
278  case eOff:
279  if (server_version_key && attr_name == "version")
280  attr_name = "server_version";
281  }
282 
283  server_info_node.SetString(attr_name, attr_value);
284  }
285 
286  return server_info_node;
287 }
288 
289 static bool s_ExtractKey(const CTempString& line,
290  string& key, CTempString& value)
291 {
292  locale loc;
293  key.erase(0, key.length());
294  SIZE_TYPE line_len = line.length();
295  key.reserve(line_len);
296 
297  for (SIZE_TYPE i = 0; i < line_len; ++i)
298  {
299  char c = line[i];
300  if (isalnum(c, loc))
301  key += tolower(c, loc);
302  else if (c == ' ' || c == '_' || c == '-')
303  key += '_';
304  else if (c != ':' || key.empty())
305  break;
306  else {
307  if (++i < line_len && line[i] == ' ')
308  ++i;
309  value.assign(line, i, line_len - i);
310  return true;
311  }
312  }
313  return false;
314 }
315 
317 {
318  CJsonNode array_node(CJsonNode::NewArrayNode());
319  list<CTempString> words;
320  NStr::Split(str, TEMP_STRING_CTOR(" "), words,
322  ITERATE(list<CTempString>, it, words) {
323  array_node.AppendString(*it);
324  }
325  return array_node;
326 }
327 
328 // Return a JSON object that contains status information reported
329 // by the specified worker node.
330 //
332 {
334  worker_node.ExecWithRetry("STAT", true));
335 
337 
338  string line;
339  string key;
341  CJsonNode job_counters(CJsonNode::NewObjectNode());
342  CJsonNode running_jobs(CJsonNode::NewArrayNode());
344  SIZE_TYPE pos;
345  int running_job_count = 0;
346  int free_worker_threads = 1;
347  bool suspended = false;
348  bool shutting_down = false;
349  bool exclusive_job = false;
350  bool working = false;
351 
352  while (output.ReadLine(line)) {
353  if (line.empty() || isspace(line[0]))
354  continue;
355 
356  if (running_job_count > 0) {
357  if ((pos = NStr::Find(CTempString(line),
358  TEMP_STRING_CTOR("running for "),
360  --running_job_count;
361 
362  pos += sizeof("running for ") - 1;
363  Uint8 run_time = NStr::StringToUInt8(
364  CTempString(line.data() + pos, line.length() - pos),
368 
369  if ((pos = NStr::Find(line, TEMP_STRING_CTOR(" "))) != NPOS) {
370  CJsonNode running_job(CJsonNode::NewObjectNode());
371 
372  running_job.SetString("key", line.substr(0, pos));
373  running_job.SetInteger("run_time", (Int8) run_time);
374  running_jobs.Append(running_job);
375  }
376  }
377  } else if (NStr::StartsWith(line, TEMP_STRING_CTOR("Jobs "))) {
378  if (s_ExtractKey(CTempString(line.data() + sizeof("Jobs ") - 1,
379  line.length() - (sizeof("Jobs ") - 1)), key, value)) {
380  job_counters.SetInteger(key, NStr::StringToInt8(value));
381  if (key == "running") {
382  running_job_count = NStr::StringToInt(value,
386  working = running_job_count > 0;
387  free_worker_threads -= running_job_count;
388  }
389  }
390  } else if (NStr::StartsWith(line, TEMP_STRING_CTOR("Alert_")) && s_ExtractKey(line, key, value)) {
391  alerts.SetString(key, value);
392  } else if (s_ExtractKey(line, key, value)) {
393  if (key == "host_name")
394  key = "hostname";
395  else if (key == "node_started_at")
396  key = "started";
397  else if (key == "executable_path")
398  g_FixMisplacedPID(wn_info, value, "pid");
399  else if (key == "maximum_job_threads") {
400  int maximum_job_threads = NStr::StringToInt(value,
404  free_worker_threads += maximum_job_threads - 1;
405  wn_info.SetInteger(key, maximum_job_threads);
406  continue;
407  } else if (key == "netschedule_servers") {
408  if (value != "N/A")
410  else
412  continue;
413  } else if (key == "preferred_affinities") {
415  continue;
416  }
418  } else if (wn_info.GetSize() == 0)
419  wn_info = g_ServerInfoToJson(g_ServerInfoFromString(line), false);
420  else if (NStr::Find(line, TEMP_STRING_CTOR("suspended")) != NPOS)
421  suspended = true;
422  else if (NStr::Find(line, TEMP_STRING_CTOR("shutting down")) != NPOS)
423  shutting_down = true;
424  else if (NStr::Find(line, TEMP_STRING_CTOR("exclusive job")) != NPOS)
425  exclusive_job = true;
426  }
427 
428  if (!wn_info.HasKey("maximum_job_threads"))
429  wn_info.SetInteger("maximum_job_threads", 1);
430  if (!wn_info.HasKey("version"))
431  wn_info.SetString("version", kEmptyStr);
432  if (!wn_info.HasKey("build_date"))
433  wn_info.SetString("build_date", kEmptyStr);
434 
435  wn_info.SetString("status",
436  shutting_down ? "shutting_down" :
437  suspended ? "suspended" :
438  exclusive_job ? "processing_exclusive_job" :
439  free_worker_threads == 0 ? "busy" :
440  working ? "working" : "ready");
441 
442  wn_info.SetByKey("job_counters", job_counters);
443  wn_info.SetByKey("running_jobs", running_jobs);
444  if (alerts.GetSize()) wn_info.SetByKey("alerts", alerts);
445 
446  return wn_info;
447 }
448 
Iterator for JSON arrays and objects.
JSON node abstraction.
size_t GetSize() const
For a container node (that is, either an array or an object), return the number of elements in the co...
static CJsonNode NewArrayNode()
Create a new JSON array node.
static CJsonNode GuessType(const CTempString &value)
Guess the type of a JSON scalar from the string representation of its value and initialize a new node...
bool HasKey(const string &key) const
Check if an object node has an element accessible by the specified key.
SJsonIteratorImpl * Iterate(EIterationMode mode=eNatural) const
For a container node (that is, either an array or an object), begin iteration over its elements.
void SetString(const string &key, const string &value)
Set a JSON object element to the specified string value.
Int8 GetInteger(const string &key) const
For a JSON object node, return the integer referred to by the specified key.
void AppendString(const string &value)
For an array node, add a string node at the end of the array.
string GetString(const string &key) const
For a JSON object node, return the string referred to by the specified key.
void SetInteger(const string &key, Int8 value)
Set a JSON object element to the specified integer value.
void SetByKey(const string &key, CJsonNode::TInstance value)
For a JSON object node, insert a new element or update an existing element.
static CJsonNode NewStringNode(const string &value)
Create a new JSON string node.
static CJsonNode NewObjectNode()
Create a new JSON object node.
void Append(CJsonNode::TInstance value)
For an array node, add a new element at the end of the array.
Client API for NCBI NetSchedule server.
bool GetNextAttribute(string &attr_name, string &attr_value)
Return the next attribute.
SExecResult ExecWithRetry(const string &cmd, bool multiline_output=false)
Execute remote command 'cmd', wait for the reply, check if it starts with 'OK:', and return the remai...
CNetServiceIterator Iterate(EIterationMode mode=eSortByLoad)
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
CTime –.
Definition: ncbitime.hpp:296
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
char value[7]
Definition: config.c:431
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
@ eOff
Definition: ncbi_types.h:110
@ eDefault
Definition: ncbi_types.h:112
@ eOn
Definition: ncbi_types.h:111
#define NULL
Definition: ncbistd.hpp:225
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
CNetService GetService()
int64_t Int8
8-byte (64-bit) signed integer
Definition: ncbitype.h:104
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
NCBI_NS_STD::string::size_type SIZE_TYPE
Definition: ncbistr.hpp:132
static CTempString TruncateSpaces_Unsafe(const CTempString str, ETrunc where=eTrunc_Both)
Truncate spaces in a string.
Definition: ncbistr.cpp:3187
#define kEmptyStr
Definition: ncbistr.hpp:123
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
Definition: ncbistr.cpp:630
static Int8 StringToInt8(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to Int8.
Definition: ncbistr.cpp:793
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
Definition: ncbistr.cpp:3457
static bool EndsWith(const CTempString str, const CTempString end, ECase use_case=eCase)
Check if a string ends with a specified suffix value.
Definition: ncbistr.hpp:5429
#define NPOS
Definition: ncbistr.hpp:133
static SIZE_TYPE Find(const CTempString str, const CTempString pattern, ECase use_case=eCase, EDirection direction=eForwardSearch, SIZE_TYPE occurrence=0)
Find the pattern in the string.
Definition: ncbistr.cpp:2887
const char * data(void) const
Return a pointer to the array represented.
Definition: tempstr.hpp:313
void erase(size_type pos=0)
Truncate the string at some specified position Note: basic_string<> supports additional erase() optio...
Definition: tempstr.hpp:514
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5411
static Uint8 StringToUInt8(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to Uint8.
Definition: ncbistr.cpp:873
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3550
size_type length(void) const
Return the length of the represented array.
Definition: tempstr.hpp:320
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
static string & ReplaceInPlace(string &src, const string &search, const string &replace, SIZE_TYPE start_pos=0, SIZE_TYPE max_replace=0, SIZE_TYPE *num_replace=0)
Replace occurrences of a substring within a string.
Definition: ncbistr.cpp:3401
static string TruncateSpaces(const string &str, ETrunc where=eTrunc_Both)
Truncate spaces in a string.
Definition: ncbistr.cpp:3182
static string & ToLower(string &str)
Convert string to lower case – string& version.
Definition: ncbistr.cpp:405
static string ParseQuoted(const CTempString str, size_t *n_read=NULL)
Discard C-style backslash escapes and extract a quoted string.
Definition: ncbistr.cpp:4923
@ fConvErr_NoThrow
Do not throw an exception on error.
Definition: ncbistr.hpp:285
@ fAllowTrailingSymbols
Ignore trailing non-numerics characters.
Definition: ncbistr.hpp:298
@ fAllowLeadingSpaces
Ignore leading spaces in converted string.
Definition: ncbistr.hpp:294
@ fSplit_Truncate
Definition: ncbistr.hpp:2501
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
Definition: ncbistr.hpp:2498
@ eReverseSearch
Search in a backward direction.
Definition: ncbistr.hpp:1947
@ eTrunc_Begin
Truncate leading spaces only.
Definition: ncbistr.hpp:2240
@ eNocase
Case insensitive compare.
Definition: ncbistr.hpp:1206
enum ENcbiSwitch ESwitch
Aux.
#define TEMP_STRING_CTOR(str)
Definition: util.hpp:79
int i
const struct ncbi::grid::netcache::search::fields::KEY key
int isspace(Uchar c)
Definition: ncbictype.hpp:69
int isalnum(Uchar c)
Definition: ncbictype.hpp:62
int tolower(Uchar c)
Definition: ncbictype.hpp:72
NetSchedule client specs.
ENetScheduleStatTopic
@ eNetScheduleStatClients
@ eNumberOfNetStheduleStatTopics
const char * entity_name
string g_GetNetScheduleStatCommand(ENetScheduleStatTopic topic)
static bool s_ExtractKey(const CTempString &line, string &key, CTempString &value)
string g_UnquoteIfQuoted(const CTempString &str)
CNetScheduleAdmin::SWorkerNodeInfo SWorkerNodeInfo
CJsonNode g_ServerInfoToJson(CNetServerInfo server_info, bool server_version_key)
static CJsonNode s_WordsToJsonArray(const CTempString &str)
bool g_FixMisplacedPID(CJsonNode &stat_info, CTempString &executable_path, const char *pid_key)
const char * command
CJsonNode g_GetWorkerNodeInfo(CNetScheduleAPI api)
const char * record_prefix
CJsonNode g_GenericStatToJson(CNetServer server, ENetScheduleStatTopic topic, bool verbose)
CJsonNode g_WorkerNodeInfoToJson(CNetServer worker_node)
void g_GetWorkerNodes(CNetScheduleAPI api, list< SWorkerNodeInfo > &worker_nodes)
struct @970 s_StatTopics[eNumberOfNetStheduleStatTopics]
static void NormalizeStatKeyName(CTempString &key)
void g_AppendClientIPSessionIDHitID(string &cmd)
static const char * prefix[]
Definition: pcregrep.c:405
static char * locale
Definition: pcregrep.c:149
static SQLCHAR output[256]
Definition: print.c:5
true_type verbose
Definition: processing.cpp:902
CNetServerInfo g_ServerInfoFromString(const string &server_info)
static const char * str(char *buf, int n)
Definition: stats.c:84
const string & GetClientName() const
else result
Definition: token2.c:20
Modified on Wed Dec 06 07:14:19 2023 by modify_doxy.py rev. 669887