NCBI C++ ToolKit
netschedule_api_admin.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: netschedule_api_admin.cpp 87146 2019-07-30 15:47:21Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Anatoliy Kuznetsov, Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  * Implementation of NetSchedule API.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "netschedule_api_impl.hpp"
36 
38 
39 
41 
42 using namespace grid::netschedule;
43 
45 {
46  string cmd(on_off != eOff ?
47  "REFUSESUBMITS mode=1" : "REFUSESUBMITS mode=0");
49  m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
50 }
51 
54 {
55  const auto die = level == eDie;
56  string cmd(die ? "SHUTDOWN SUICIDE" :
57  level == eShutdownImmediate ? "SHUTDOWN IMMEDIATE" :
58  level == eDrain ? "SHUTDOWN drain=1" : "SHUTDOWN");
60 
62  auto retry_guard = m_Impl->m_API->m_Service->CreateRetryGuard(retry_mode);
63 
64  try {
65  m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
66  }
67  catch (CNetSrvConnException& ex)
68  {
69  if ((ex.GetErrCode() != CNetSrvConnException::eConnClosedByServer) || !die) throw;
70  }
71 }
72 
73 
75 {
76  string cmd("RECO");
78  m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
79 }
80 
// Create a queue by building a "QCRE" command and handing it to
// m_Service.ExecOnAllServers().
//
//   qname       - name of the new queue; checked with
//                 limits::Check<limits::SQueueName> before anything is built.
//   qclass      - queue class; appended after the name, separated by a space.
//   description - optional text; when non-empty it is appended wrapped in
//                 double quotes. The text is inserted verbatim: no escaping
//                 of embedded quotes is performed in this function.
//
// NOTE(review): the listing numbering jumps from 95 to 97 here, so one
// statement between the description handling and the ExecOnAllServers() call
// may be missing from this copy - confirm against the repository version.
81 void CNetScheduleAdmin::CreateQueue(const string& qname, const string& qclass,
82  const string& description)
83 {
84  limits::Check<limits::SQueueName>(qname);
85 
86  string cmd = "QCRE " + qname;
87  cmd += ' ';
88  cmd += qclass;
89 
90  if (!description.empty()) {
91  cmd += " \"";
92  cmd += description;
93  cmd += '"';
94  }
95 
97 
98  m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
99 }
100 
// Delete the queue named `qname`.
//
// The name is first checked with limits::Check<limits::SQueueName>; the
// command "QDEL <qname>" is then passed to m_Service.ExecOnAllServers().
//
// NOTE(review): the listing numbering jumps from 105 to 107, so one statement
// between building the command and executing it may be missing from this
// copy - confirm against the repository version.
101 void CNetScheduleAdmin::DeleteQueue(const string& qname)
102 {
103  limits::Check<limits::SQueueName>(qname);
104 
105  string cmd("QDEL " + qname);
107  m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
108 }
109 
110 void CNetScheduleAdmin::DumpJob(CNcbiOstream& out, const string& job_key)
111 {
112  CNetServerMultilineCmdOutput output(DumpJob(job_key));
113 
114  string line;
115 
116  while (output.ReadLine(line))
117  out << line << "\n";
118 }
119 
121 {
122  string cmd("DUMP " + job_key);
124  return m_Impl->m_API->GetServer(job_key).ExecWithRetry(cmd, true);
125 }
126 
// Cancel jobs in the queue, optionally restricted by status.
//
//   job_statuses - when empty, the bare "CANCELQ" command is used; otherwise
//                  the command is "CANCEL status=<job_statuses>", with the
//                  argument appended verbatim (its format is not validated
//                  in this function).
//
// The resulting command is passed to m_Service.ExecOnAllServers().
//
// NOTE(review): the listing numbering jumps from 135 to 137, so one statement
// between building the command and executing it may be missing from this
// copy - confirm against the repository version.
127 void CNetScheduleAdmin::CancelAllJobs(const string& job_statuses)
128 {
129  string cmd;
130  if (job_statuses.empty()) {
131  cmd.assign("CANCELQ");
132  } else {
133  cmd.assign("CANCEL status=");
134  cmd.append(job_statuses);
135  }
137  m_Impl->m_API->m_Service.ExecOnAllServers(cmd);
138 }
139 
140 
142 {
143  string cmd("VERSION");
145  m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
146  output_stream, CNetService::eSingleLineOutput);
147 }
148 
149 
151  CNcbiOstream& output_stream,
152  const string& start_after_job,
153  size_t job_count,
154  const string& job_statuses,
155  const string& job_group)
156 {
157  string cmd("DUMP");
158  if (!job_statuses.empty()) {
159  cmd.append(" status=");
160  cmd.append(job_statuses);
161  }
162  if (!start_after_job.empty()) {
163  cmd.append(" start_after=");
164  cmd.append(start_after_job);
165  }
166  if (job_count > 0) {
167  cmd.append(" count=");
168  cmd.append(NStr::NumericToString(job_count));
169  }
170  if (!job_group.empty()) {
171  limits::Check<limits::SJobGroup>(job_group);
172  cmd.append(" group=");
173  cmd.append(job_group);
174  }
176  m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
177  output_stream, CNetService::eMultilineOutput);
178 }
179 
181  CNcbiOstream& output_stream,
182  const string& start_after_job,
183  size_t job_count,
185  const string& job_group)
186 {
187  string job_statuses(CNetScheduleAPI::StatusToString(status));
188 
189  // Must be a rare case
190  if (status == CNetScheduleAPI::eJobNotFound) {
191  job_statuses.clear();
192  }
193 
194  DumpQueue(output_stream, start_after_job, job_count, job_statuses, job_group);
195 }
196 
197 
// Build the command text used to request queue information:
// "QINF2 <queue_name>". The queue name is appended verbatim.
//
// NOTE(review): the listing numbering jumps from 200 to 202, so one statement
// between building the string and returning it may be missing from this
// copy - confirm against the repository version.
198 static string s_MkQINFCmd(const string& queue_name)
199 {
200  string qinf_cmd("QINF2 " + queue_name);
202  return qinf_cmd;
203 }
204 
205 static void s_ParseQueueInfo(const string& server_output,
206  CNetScheduleAdmin::TQueueInfo& queue_info)
207 {
208  CUrlArgs url_parser(server_output);
209 
210  ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
211  queue_info[field->name] = field->value;
212  }
213 }
214 
216  const string& queue_name, CNetScheduleAdmin::TQueueInfo& queue_info)
217 {
218  CNetServer::SExecResult exec_result;
219 
220  server->ConnectAndExec(s_MkQINFCmd(queue_name), false, exec_result);
221 
222  s_ParseQueueInfo(exec_result.response, queue_info);
223 }
224 
225 void CNetScheduleAdmin::GetQueueInfo(const string& queue_name,
226  CNetScheduleAdmin::TQueueInfo& queue_info)
227 {
228  GetQueueInfo(m_Impl->m_API->m_Service.Iterate().GetServer(),
229  queue_name, queue_info);
230 }
231 
233  CNetScheduleAdmin::TQueueInfo& queue_info)
234 {
235  GetQueueInfo(server, m_Impl->m_API->m_Queue, queue_info);
236 }
237 
239 {
240  GetQueueInfo(m_Impl->m_API->m_Queue, queue_info);
241 }
242 
243 void CNetScheduleAdmin::PrintQueueInfo(const string& queue_name,
244  CNcbiOstream& output_stream)
245 {
246  bool print_headers = m_Impl->m_API->m_Service.IsLoadBalanced();
247 
248  for (CNetServiceIterator it = m_Impl->m_API->m_Service.Iterate(
249  CNetService::eIncludePenalized); it; ++it) {
250  if (print_headers)
251  output_stream << '[' << (*it).GetServerAddress() << ']' << NcbiEndl;
252 
253  TQueueInfo queue_info;
254 
255  GetQueueInfo(*it, queue_name, queue_info);
256 
257  ITERATE(TQueueInfo, qi, queue_info) {
258  output_stream << qi->first << ": " << qi->second << NcbiEndl;
259  }
260 
261  if (print_headers)
262  output_stream << NcbiEndl;
263  }
264 }
265 
// Forward declaration only: the definition is not part of this listing.
// Takes the API object by value and a list of SWorkerNodeInfo by reference;
// presumably the list is filled by the callee - confirm at the definition.
266 void g_GetWorkerNodes(CNetScheduleAPI api, list<CNetScheduleAdmin::SWorkerNodeInfo>& worker_nodes);
267 
269  list<SWorkerNodeInfo>& worker_nodes)
270 {
271  g_GetWorkerNodes(m_Impl->m_API, worker_nodes);
272 }
273 
275 {
276  string cmd("GETCONF");
278  m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
279  output_stream, CNetService::eMultilineOutput);
280 }
281 
283  EStatisticsOptions opt)
284 {
285  string cmd(opt == eStatisticsBrief ? "STAT" :
286  opt == eStatisticsClients ? "STAT CLIENTS" : "STAT ALL");
288  m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
290 }
291 
293 {
294  string cmd("HEALTH");
296  m_Impl->m_API->m_Service.PrintCmdOutput(cmd,
297  output_stream, CNetService::eUrlEncodedOutput);
298 }
299 
301 {
302  string cmd("STAT QUEUES");
304 
305  string output_line;
306 
307  for (CNetServiceIterator it = m_Impl->m_API->m_Service.Iterate(
308  CNetService::eIncludePenalized); it; ++it) {
309  CNetServer server = *it;
310 
311  qlist.push_back(SServerQueueList(server));
312 
313  CNetServerMultilineCmdOutput cmd_output((*it).ExecWithRetry(cmd, true));
314  while (cmd_output.ReadLine(output_line))
315  if (NStr::StartsWith(output_line, "[queue ") &&
316  output_line.length() > sizeof("[queue "))
317  qlist.back().queues.push_back(output_line.substr(
318  sizeof("[queue ") - 1,
319  output_line.length() - sizeof("[queue ")));
320  }
321 }
322 
324  CNetScheduleAdmin::TStatusMap& status_map,
325  const string& affinity_token,
326  const string& job_group)
327 {
328  string cmd = "STAT JOBS";
329 
330  if (!affinity_token.empty()) {
331  limits::Check<limits::SAffinity>(affinity_token);
332  cmd.append(" aff=");
333  cmd.append(affinity_token);
334  }
335 
336  if (!job_group.empty()) {
337  limits::Check<limits::SJobGroup>(job_group);
338  cmd.append(" group=");
339  cmd.append(job_group);
340  }
341 
343 
344  string output_line;
345  CTempString st_str, cnt_str;
346 
347  try {
348  for (CNetServiceIterator it = m_Impl->m_API->m_Service.Iterate(
349  CNetService::eIncludePenalized); it; ++it) {
350  CNetServerMultilineCmdOutput cmd_output(
351  (*it).ExecWithRetry(cmd, true));
352 
353  while (cmd_output.ReadLine(output_line))
354  if (NStr::SplitInTwo(output_line, ":", st_str, cnt_str))
355  status_map[st_str] +=
357  (cnt_str, NStr::eTrunc_Begin));
358  }
359  }
360  catch (CStringException& ex)
361  {
362  NCBI_RETHROW(ex, CNetScheduleException, eProtocolSyntaxError,
363  "Error while parsing STAT JOBS response");
364  }
365 }
366 
Client API for NCBI NetSchedule server.
void GetQueueInfo(CNetServer server, const string &queue_name, TQueueInfo &queue_info)
void PrintServerStatistics(CNcbiOstream &output_stream, EStatisticsOptions opt=eStatisticsBrief)
void PrintServerVersion(CNcbiOstream &output_stream)
Print version string.
void DeleteQueue(const string &qname)
Delete queue. Applicable only to queues created through the CreateQueue method.
void GetQueueList(TQueueList &result)
void SwitchToDrainMode(ESwitch on_off)
Enable server drain mode.
void CreateQueue(const string &qname, const string &qclass, const string &description=kEmptyStr)
Create an instance of the given queue class.
void PrintConf(CNcbiOstream &output_stream)
EShutdownLevel
Shutdown level.
void DumpJob(CNcbiOstream &out, const string &job_key)
void ShutdownServer(EShutdownLevel level=eNormalShutdown)
Shutdown the server daemon.
void StatusSnapshot(TStatusMap &status_map, const string &affinity_token=kEmptyStr, const string &job_group=kEmptyStr)
Returns statuses for a given affinity token.
void CancelAllJobs(const string &job_statuses=kEmptyStr)
Cancel all jobs in the queue (optionally with particular statuses).
void DumpQueue(CNcbiOstream &output_stream, const string &start_after_job=kEmptyStr, size_t job_count=0, const string &job_statuses=kEmptyStr, const string &job_group=kEmptyStr)
void PrintHealth(CNcbiOstream &output_stream)
void GetWorkerNodes(list< SWorkerNodeInfo > &worker_nodes)
void PrintQueueInfo(const string &queue_name, CNcbiOstream &output_stream)
list< SServerQueueList > TQueueList
NetSchedule internal exception.
bool ReadLine(string &output)
@ eMultilineOutput_NetCacheStyle
Net Service exception.
CStringException –.
Definition: ncbistr.hpp:4505
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
CUrlArgs::
Definition: ncbi_url.hpp:240
static CS_COMMAND * cmd
Definition: ct_dynamic.c:26
std::ofstream out("events_result.xml")
main entry point for tests
@ eDie
Definition: grid_cli.hpp:244
@ eDrain
Definition: grid_cli.hpp:245
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
@ eOff
Definition: ncbi_types.h:110
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_RETHROW(prev_exception, exception_class, err_code, message)
Generic macro to re-throw an exception.
Definition: ncbiexpt.hpp:737
EJobStatus
Job status codes.
static string StatusToString(EJobStatus status)
Printable status type.
@ eJobNotFound
No such job.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define NcbiEndl
Definition: ncbistre.hpp:548
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
static CTempString TruncateSpaces_Unsafe(const CTempString str, ETrunc where=eTrunc_Both)
Truncate spaces in a string.
Definition: ncbistr.cpp:3187
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5411
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3550
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
Definition: ncbistr.cpp:642
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
@ eTrunc_Begin
Truncate leading spaces only.
Definition: ncbistr.hpp:2240
const TArgs & GetArgs(void) const
Get the const list of arguments.
Definition: ncbi_url.hpp:300
list< TArg > TArgs
Definition: ncbi_url.hpp:276
enum ENcbiSwitch ESwitch
Aux.
NetSchedule client specs.
static string s_MkQINFCmd(const string &queue_name)
void g_GetWorkerNodes(CNetScheduleAPI api, list< CNetScheduleAdmin::SWorkerNodeInfo > &worker_nodes)
static void s_ParseQueueInfo(const string &server_output, CNetScheduleAdmin::TQueueInfo &queue_info)
void g_AppendClientIPSessionIDHitID(string &cmd)
static SQLCHAR output[256]
Definition: print.c:5
CNetServer::SExecResult ConnectAndExec(const string &cmd, bool multiline_output, bool retry_on_exception=false)
Modified on Sat Dec 09 04:49:04 2023 by modify_doxy.py rev. 669887