NCBI C++ ToolKit
netscheduled.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: netscheduled.cpp 86066 2019-04-03 17:53:33Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Anatoliy Kuznetsov, Victor Joukov
27  *
28  * File Description: Network scheduler daemon
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 #include <stdio.h>
34 #include <corelib/ncbiapp.hpp>
35 #include <corelib/ncbienv.hpp>
36 #include <corelib/ncbireg.hpp>
37 #include <corelib/ncbiargs.hpp>
38 #include <corelib/ncbistr.hpp>
39 #include <corelib/ncbimisc.hpp>
40 #include <corelib/ncbi_config.hpp>
41 #include <corelib/ncbimtx.hpp>
42 #include <corelib/ncbidiag.hpp>
43 
45 
47 #include <connect/server.hpp>
48 #include <connect/ncbi_socket.hpp>
50 
51 #include "queue_database.hpp"
52 #include "ns_types.hpp"
53 #include "ns_server_params.hpp"
54 #include "ns_handler.hpp"
55 #include "ns_server.hpp"
56 #include "netschedule_version.hpp"
57 #include "ns_application.hpp"
58 #include "ns_util.hpp"
59 
60 #include <corelib/ncbi_process.hpp>
61 #include <signal.h>
62 #include <unistd.h>
63 
64 
66 
67 
68 static const string kPidFileArgName = "pidfile";
69 static const string kReinitArgName = "reinit";
70 static const string kNodaemonArgName = "nodaemon";
71 
72 
73 /// @internal
75 {
77 
79  if (server &&
80  (!server->ShutdownRequested()) ) {
81  server->SetShutdownFlag(signum);
82  }
83 }
84 
85 
87  : CNcbiApplication(), m_Reinit(false)
88 {
92  CRef<CVersion> full_version(new CVersion(version));
93 
94  full_version->AddComponentVersion("Storage",
98  full_version->AddComponentVersion("Protocol",
102 
104  SetFullVersion(full_version);
105 }
106 
107 
109 {
110  // Convert multi-line diagnostic messages into one-line ones by default.
111  // The two diagnostics flags meka difference for the local logs only.
112  // When the code is run on production hosts the logs are saved in /logs and
113  // in this case the diagnostics switches the flags unconditionally.
114  // SetDiagPostFlag(eDPF_PreMergeLines);
115  // SetDiagPostFlag(eDPF_MergeLines);
116 
117 
118  // Create command-line argument descriptions class
119  unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
120 
121  // Specify USAGE context
122  arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
123  "Network job queue server");
124  arg_desc->SetMiscFlags(CArgDescriptions::fNoUsage |
126 
127  arg_desc->AddOptionalKey(kPidFileArgName, "File_Name",
128  "File to save NetSchedule process PID",
130  arg_desc->AddFlag(kReinitArgName, "Recreate the storage directory.");
131  arg_desc->AddFlag(kNodaemonArgName,
132  "Turn off daemonization of NetSchedule at the start.");
133 
134  SetupArgDescriptions(arg_desc.release());
135 
137 }
138 
139 
141 {
143  .Print("_type", "startup")
144  .Print("info", "versions")
145  .Print("server", NETSCHEDULED_VERSION)
148  .Print("build", NETSCHEDULED_BUILD_DATE);
149 
150  const CArgs & args = GetArgs();
151  const CNcbiRegistry & reg = GetConfig();
152 
153  x_SaveSection(reg, "log", m_OrigLogSection);
154  x_SaveSection(reg, "diag", m_OrigDiagSection);
155 
156  CNcbiOstrstream ostr;
157  reg.Write(ostr);
159  .Print("_type", "startup")
160  .Print("info", "configuration")
161  .Print("content", (string)CNcbiOstrstreamToString(ostr));
162 
163  // attempt to get server gracefully shutdown on signal
164  signal(SIGINT, Threaded_Server_SignalHandler);
165  signal(SIGTERM, Threaded_Server_SignalHandler);
166 
167  vector<string> config_warnings;
168  bool admin_decrypt_error(false);
169  // true -> throw exception if there is a port problem
170  NS_ValidateConfigFile(reg, config_warnings, true, admin_decrypt_error);
171 
172  // [server] section
173  SNS_Parameters params;
174  params.Read(reg);
175 
179 
181  unique_ptr<CNetScheduleServer> server(new CNetScheduleServer(params.path,
182  params.diskless));
183  server->SetCustomThreadSuffix("_h");
184  server->SetNSParameters(params, false);
185  server->SetAnybodyCanReconfigure(admin_decrypt_error);
186 
187  // Use port passed through parameters
188  server->AddDefaultListener(new CNetScheduleConnectionFactory(&*server));
189  server->StartListening();
190 
191  m_Reinit = args[kReinitArgName];
192  unique_ptr<CQueueDataBase> qdb(new CQueueDataBase(server.get(),
193  params.path,
194  params.max_queues,
195  params.diskless,
196  m_Reinit));
197 
198  if (!args[kNodaemonArgName]) {
200  .Print("_type", "startup")
201  .Print("info", "entering UNIX daemon mode");
202  // Here's workaround for SQLite3 bug: if stdin is closed in forked
203  // process then 0 file descriptor is returned to SQLite after open().
204  // But there's assert there which prevents fd to be equal to 0. So
205  // we keep descriptors 0, 1 and 2 in child process open. Latter two -
206  // just in case somebody will try to write to them.
207  bool is_good = CCurrentProcess::Daemonize(
208  kEmptyCStr,
212  if (!is_good)
213  NCBI_THROW(CNetScheduleException, eInternalError,
214  "Error during daemonization.");
215  }
216  else
218  .Print("_type", "startup")
219  .Print("info", "operating in non-daemon mode");
220 
221  // [queue_*], [qclass_*] and [queues] sections
222  // Scan and mount queues
223  CJsonNode unused_diff = CJsonNode::NewObjectNode();
224  CNSPreciseTime min_run_timeout = qdb->Configure(reg, unused_diff);
225 
226  if (min_run_timeout < CNSPreciseTime(0.2))
227  min_run_timeout = CNSPreciseTime(0.2);
228 
230  .Print("_type", "startup")
231  .Print("info", "check running jobs expiration")
232  .Print("timeout", NS_FormatPreciseTimeAsSec(min_run_timeout));
233 
234  // Save the process PID if PID is given
235  x_WritePid(server.get());
236 
237  if (!config_warnings.empty()) {
238  string msg;
239  for (vector<string>::const_iterator k = config_warnings.begin();
240  k != config_warnings.end(); ++k) {
241  ERR_POST(*k);
242  if (!msg.empty())
243  msg += "\n";
244  msg += *k;
245  }
246  server->RegisterAlert(eStartupConfig, msg);
247  }
248 
249  // Calculate the config file checksum and memorize it
250  vector<string> config_checksum_warnings;
251  string config_checksum = NS_GetConfigFileChecksum(
252  GetConfigPath(), config_checksum_warnings);
253  if (config_checksum_warnings.empty()) {
254  server->SetRAMConfigFileChecksum(config_checksum);
255  server->SetDiskConfigFileChecksum(config_checksum);
256  } else {
257  for (vector<string>::const_iterator
258  k = config_checksum_warnings.begin();
259  k != config_checksum_warnings.end(); ++k)
260  ERR_POST(*k);
261  }
262 
263  qdb->RunExecutionWatcherThread(min_run_timeout);
264  qdb->RunPurgeThread();
265  qdb->RunNotifThread();
266  qdb->RunServiceThread();
267 
268  server->SetQueueDB(qdb.release());
269  server->ReadServicesConfig(reg);
270 
271  if (args[kNodaemonArgName])
272  NcbiCout << "Server started" << NcbiEndl;
273 
274  try {
275  server->Run();
276  }
277  catch (CThreadException& ex) {
278  ERR_POST(Critical << ex);
279  }
280 
281  if (args[kNodaemonArgName])
282  NcbiCout << "Server stopped" << NcbiEndl;
283 
284  return 0;
285 }
286 
287 
289  const string & section,
290  map<string, string> & storage)
291 {
292  storage.clear();
293 
294  if (reg.HasEntry(section)) {
295  list<string> entries;
296  reg.EnumerateEntries(section, &entries);
297 
298  for (list<string>::const_iterator k = entries.begin();
299  k != entries.end(); ++k)
300  storage[*k] = reg.GetString(section, *k, kEmptyStr);
301  }
302 }
303 
304 
305 // true if the PID is written successfully
307 {
308  const CArgs & args = GetArgs();
309 
310  // Check that the pidfile argument is really a file
311  if (args[kPidFileArgName]) {
312 
313  string pid_file = args[kPidFileArgName].AsString();
314 
315  if (pid_file == "-") {
316  string msg = "pid file cannot be standard output and only "
317  "file name is accepted";
318  ERR_POST(Warning << msg << ". Ignore and continue.");
319  server->RegisterAlert(ePidFile, msg);
320  return;
321  }
322 
323  // Test writeability
324  if (access(pid_file.c_str(), F_OK) == 0) {
325  // File exists
326  if (access(pid_file.c_str(), W_OK) != 0) {
327  string msg = "pid file is not writable";
328  ERR_POST(Warning << msg << ". Ignore and continue.");
329  server->RegisterAlert(ePidFile, msg);
330  return;
331  }
332  }
333 
334  // File does not exist or write access is granted
335  FILE * f = fopen(pid_file.c_str(), "w");
336  if (f == NULL) {
337  string msg = "error opening pid file for writing";
338  ERR_POST(Warning << msg << ". Ignore and continue.");
339  server->RegisterAlert(ePidFile, msg);
340  return;
341  }
342  fprintf(f, "%d", (unsigned int) CDiagContext::GetPID());
343  fclose(f);
344  }
345 }
346 
347 
348 int main(int argc, const char* argv[])
349 {
355  return CNetScheduleDApp().AppMain(argc, argv, NULL, eDS_ToStdlog);
356 }
357 
#define false
Definition: bool.h:36
CArgDescriptions –.
Definition: ncbiargs.hpp:541
CArgs –.
Definition: ncbiargs.hpp:379
JSON node abstraction.
static CJsonNode NewObjectNode()
Create a new JSON object node.
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string Sample usage:
Definition: ncbistre.hpp:802
CNcbiRegistry –.
Definition: ncbireg.hpp:913
CNetScheduleConnectionFactory::
STimeout m_ServerAcceptTimeout
map< string, string > m_OrigLogSection
int Run(void)
Run the application.
void x_SaveSection(const CNcbiRegistry &reg, const string &section, map< string, string > &storage)
void Init(void)
Initialize the application.
void x_WritePid(CNetScheduleServer *server) const
map< string, string > m_OrigDiagSection
NetSchedule internal exception.
NetScheduler threaded server.
Definition: ns_server.hpp:57
static CNetScheduleServer * GetInstance(void)
Definition: ns_server.cpp:690
void RegisterAlert(EAlertType alert_type, const string &message)
Definition: ns_server.cpp:643
void SetShutdownFlag(int signum=0, bool db_was_drained=false)
Definition: ns_server.cpp:382
virtual bool ShutdownRequested(void)
Runs synchronously between iterations.
Definition: ns_server.cpp:369
CRef –.
Definition: ncbiobj.hpp:618
CVersionInfo –.
void clear()
Definition: map.hpp:169
The NCBI C++ standard methods for dealing with std::string.
void SetFullVersion(CRef< CVersionAPI > version)
Set version data for the program.
Definition: ncbiapp.cpp:1154
const CNcbiRegistry & GetConfig(void) const
Get the application's cached configuration parameters (read-only).
virtual const CArgs & GetArgs(void) const
Get parsed command line arguments.
Definition: ncbiapp.cpp:285
int AppMain(int argc, const char *const *argv, const char *const *envp=0, EAppDiagStream diag=eDS_Default, const char *conf=NcbiEmptyCStr, const string &name=NcbiEmptyString)
Main function (entry point) for the NCBI application.
Definition: ncbiapp.cpp:799
virtual void SetupArgDescriptions(CArgDescriptions *arg_desc)
Setup the command line argument descriptions.
Definition: ncbiapp.cpp:1175
const string & GetConfigPath(void) const
Get the full path to the configuration file (if any) we ended up using.
const CNcbiArguments & GetArguments(void) const
Get the application's cached unprocessed command-line arguments.
void SetVersion(const CVersionInfo &version)
Set the version number for the program.
Definition: ncbiapp.cpp:1135
@ fNoUsage
Do not print USAGE on argument error.
Definition: ncbiargs.hpp:1028
@ fDupErrToCerr
Print arg error to both log and cerr.
Definition: ncbiargs.hpp:1032
@ eOutputFile
Name of file (must be writable)
Definition: ncbiargs.hpp:596
#define NULL
Definition: ncbistd.hpp:225
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
Definition: ncbidiag.cpp:2622
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
CDiagContext_Extra Extra(void) const
Create a temporary CDiagContext_Extra object.
Definition: ncbidiag.hpp:2095
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
Definition: ncbidiag.cpp:1901
static TPID GetPID(void)
Get cached PID (read real PID if not cached yet).
Definition: ncbidiag.cpp:1526
void SetAutoIncRequestIDOnPost(bool enable)
Auto-increment request ID with every posted message.
static void SetAllowedSessionIDFormat(ESessionIDFormat fmt)
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
static void SetDefaultAutoIncRequestIDOnPost(bool enable)
Set default auto-increment flag used for each default request context.
static void SetOldPostFormat(bool value)
Set old/new format flag.
Definition: ncbidiag.cpp:3352
void g_Diag_Use_RWLock(bool enable=true)
Use RW-lock for synchronization rather than mutex.
Definition: ncbidiag.cpp:88
@ eSID_Other
Any other format.
@ eDS_ToStdlog
Try standard log file (app.name + ".log") in /log/ and current directory, use stderr if both fail.
Definition: ncbidiag.hpp:1783
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
static TPid Daemonize(const char *logfile=0, TDaemonFlags flags=0)
Go daemon.
@ fDF_KeepCWD
Don't change CWD to "/".
@ fDF_KeepStdout
Keep stdout open as "/dev/null" (WO)
@ fDF_KeepStdin
Keep stdin open as "/dev/null" (RO)
virtual bool HasEntry(const string &section, const string &name=kEmptyStr, TFlags flags=0) const
Definition: ncbireg.cpp:290
virtual void EnumerateEntries(const string &section, list< string > *entries, TFlags flags=fAllLayers) const
Enumerate parameter names for a specified section.
Definition: ncbireg.cpp:514
virtual string GetString(const string &section, const string &name, const string &default_value, TFlags flags=0) const
Get the parameter string value.
Definition: ncbireg.cpp:321
bool Write(CNcbiOstream &os, TFlags flags=0) const
Write the registry content to output stream.
Definition: ncbireg.cpp:196
EIO_Status SOCK_InitializeAPI(void)
Initialize all internal/system data & resources to be used by the SOCK API.
Definition: ncbi_socket.c:991
ESOCK_IOWaitSysAPI SOCK_SetIOWaitSysAPI(ESOCK_IOWaitSysAPI api)
This is a helper call that can improve I/O performance (ignored for MSVC).
Definition: ncbi_socket.c:1086
@ eSOCK_IOWaitSysAPIPoll
always use poll()
Definition: ncbi_socket.h:386
#define NcbiEndl
Definition: ncbistre.hpp:548
#define NcbiCout
Definition: ncbistre.hpp:543
#define kEmptyStr
Definition: ncbistr.hpp:123
const char *const kEmptyCStr
Empty "C" string (points to a '\0').
Definition: ncbistr.cpp:68
const STimeout * accept_timeout
Maximum t between exit checks.
Definition: server.hpp:440
unsigned int usec
microseconds (modulo 1,000,000)
Definition: ncbi_types.h:78
unsigned int sec
seconds
Definition: ncbi_types.h:77
#define CVersion
static int version
Definition: mdb_load.c:29
Compressed bitset (entry point to bm.h)
Parameters initialization model.
#define NCBI_PACKAGE_VERSION_PATCH
#define NCBI_PACKAGE_VERSION_MINOR
#define NCBI_PACKAGE_VERSION_MAJOR
Defines process management classes.
Defines the CNcbiApplication and CAppException classes for creating NCBI applications.
Defines command line argument related classes.
Defines NCBI C++ diagnostic APIs, classes, and macros.
Defines unified interface to application:
Miscellaneous common-use basic types and functionality.
Multi-threading – mutexes; rw-locks; semaphore.
Process information in the NCBI Registry, including working with configuration files.
#define NETSCHEDULED_VERSION
#define NETSCHEDULED_PROTOCOL_VERSION_MINOR
#define NETSCHEDULED_STORAGE_VERSION_MAJOR
#define NETSCHEDULED_STORAGE_VERSION_PATCH
#define NETSCHEDULED_STORAGE_VERSION
#define NETSCHEDULED_PROTOCOL_VERSION_PATCH
#define NETSCHEDULED_PROTOCOL_VERSION_MAJOR
#define NETSCHEDULED_PROTOCOL_VERSION
#define NETSCHEDULED_BUILD_DATE
#define NETSCHEDULED_STORAGE_VERSION_MINOR
static const string kReinitArgName
static const string kNodaemonArgName
int main(int argc, const char *argv[])
USING_NCBI_SCOPE
void Threaded_Server_SignalHandler(int signum)
static const string kPidFileArgName
T signum(T x_)
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
@ eStartupConfig
Definition: ns_alert.hpp:48
@ ePidFile
Definition: ns_alert.hpp:50
string NS_FormatPreciseTimeAsSec(const CNSPreciseTime &t)
void NS_ValidateConfigFile(const IRegistry &reg, vector< string > &warnings, bool throw_port_exception, bool &decrypting_error)
Definition: ns_util.cpp:190
string NS_GetConfigFileChecksum(const string &file_name, vector< string > &warnings)
Definition: ns_util.cpp:605
Framework to create multithreaded network servers with thread-per-request scheduling.
unsigned int max_queues
void Read(const IRegistry &reg)
static wxAcceleratorEntry entries[3]
Modified on Sat Dec 02 09:19:20 2023 by modify_doxy.py rev. 669887