41 #if defined(NCBI_OS_UNIX)
48 #define PIPE_SIZE 64 * 1024
59 bool operator()(
int current,
int max_attempts);
65 const bool first_attempt = current == 1;
83 if (current > max_attempts) {
104 bool operator()(
int current,
int max_attempts)
const;
110 if (
path.empty())
return;
121 if (
path.empty())
return true;
123 const bool first_attempt = current == 1;
128 if (!first_attempt) {
138 if (current > max_attempts) {
232 <<
" second(s), stopping the child: " << pid);
271 string app_ver(
out.str(), 0, 1024);
307 ERR_POST(
"Unknown parameter value: "
308 "parameter \"progress_message_on_timeout\", "
309 "value: \"" <<
mode <<
"\". "
310 "Allowed values: smart, always, never");
351 if (!(is >> skipws >> ch))
return;
359 bool interval =
false;
364 if (!(is >>
n))
break;
365 if (!(is >> ch)) ch =
',';
372 err <<
n <<
" is less or equal than previous number, "
373 "intervals must be sorted and not overlapping";
374 throw invalid_argument(err.str());
385 }
else if (ch ==
'-' && !interval) {
391 err <<
"Unexpected char '" << ch <<
"'";
392 throw invalid_argument(err.str());
398 err <<
"Missing interval end";
399 throw invalid_argument(err.str());
404 err <<
"Not a number near: " << is.rdbuf();
405 throw invalid_argument(err.str());
425 catch (invalid_argument& ex) {
427 "Parameter '" << param <<
"' parsing error: " << ex.
what());
447 int Get(
const string& param,
int def)
const
452 bool Get(
const string& param,
bool def)
const
462 m_NonZeroExitAction(eDoneOnNonZeroExit),
463 m_RemoveTempDir(
true),
464 m_CacheStdOutErr(
true)
471 if (reg.
HasEntry(sec_name,
"non_zero_exit_action") ) {
472 string val = reg.
GetString(sec_name,
"non_zero_exit_action",
"");
480 ERR_POST(
"Unknown parameter value: "
481 "section [" << sec_name <<
"], "
482 "parameter \"non_zero_exit_action\", "
483 "value: \"" <<
val <<
"\". "
484 "Allowed values: fail, return, done");
486 }
else if (sec.
Get(
"fail_on_non_zero_exit",
false))
490 s_ReadRanges(reg, sec_name,
"fail_no_retries_if_exit_code"));
494 if (sec.
Get(
"run_in_separate_dir",
false)) {
495 if (reg.
HasEntry(sec_name,
"tmp_dir"))
506 if (reg.
HasEntry(sec_name,
"remove_tmp_dir"))
511 int sleep = sec.
Get(
"sleep_between_remove_tmp_attempts", 60);
512 int max_attempts = sec.
Get(
"max_remove_tmp_attempts", 60);
521 "Missing configuration parameter [" << sec_name <<
542 <<
"\". The Monitor application will not run!");
561 list<string> added_env;
564 ITERATE(list<string>, it, added_env) {
565 const string& s = *it;
569 int sleep = sec.
Get(
"sleep_between_reap_attempts", 60);
570 int max_attempts = sec.
Get(
"max_reap_attempts_after_kill", 60);
574 const string args = reg.
GetString(sec_name,
"version_args",
"-version");
579 const string mode = reg.
GetString(sec_name,
"progress_message_on_timeout",
597 if (!
file.GetMode(&user_mode))
620 CRemoteAppReaper::CScheduler& pm)
684 const string& path,
const char*
const*
env,
740 args.push_back(
"-pid");
742 args.push_back(
"-jid");
744 args.push_back(
"-jwdir");
752 out, err, exit_value,
760 catch (exception& ex) {
764 err <<
"Unknown error";
767 switch (exit_value) {
771 if (non_empty_output) {
777 x_Log(
"exited with zero return code", err);
783 x_Log(
"job is returned", err);
788 x_Log(
"job failed", err);
793 errmsg =
"Monitor requested job termination";
794 throw runtime_error(errmsg);
799 x_Log(
"internal error", err);
832 if (!tmp_dir.empty() && cache_std_out_err) {
841 ERR_POST(
"Could not create a temporary file " <<
842 m_Name <<
" :" << ex.
what() <<
" the data will be "
843 "written directly to the original stream");
848 #if defined(NCBI_OS_UNIX)
853 fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
866 catch (exception& ex) {
867 ERR_POST(
"CTmpStreamGuard::~CTmpStreamGuard(): " <<
868 m_Name <<
" --> " << ex.what());
903 unsigned app_run_timeout,
904 const char*
const env[])
const
907 if (!tmp_path.empty()) {
909 bool substitution_found =
false;
911 while ((subst_pos = tmp_path.find(
'%')) != string::npos) {
912 if (subst_pos + 1 >= tmp_path.length())
914 switch (tmp_path[subst_pos + 1]) {
916 tmp_path.replace(subst_pos, 2, 1,
'%');
919 tmp_path.replace(subst_pos, 2, job_context.
GetQueueName());
922 tmp_path.replace(subst_pos, 2, job_context.
GetJobKey());
930 (
unsigned)
lt.GetLocalTime().GetTimeT()));
933 tmp_path.erase(subst_pos, 2);
935 substitution_found =
true;
937 if (!substitution_found)
952 string working_dir(tmp_path.empty() ?
CDir::GetCwd() : tmp_path);
967 unique_ptr<CPipe::IProcessWatcher> watcher(monitor ?
976 tmp_path,
env, watcher.get(),
980 std_err_guard.
Close();
981 std_out_guard.
Close();
998 " - will not be rerun",
CScheduler & GetScheduler()
CInvalidParamException –.
CJobContextProcessWatcher(SParams &p)
virtual EAction OnStart(TProcessHandle pid)
This method is called when the process has just been started by the ExecWait() method.
CWorkerNodeJobContext & m_JobContext
virtual EAction Watch(TProcessHandle pid)
This method is getting called periodically during the process execution by the ExecWait() method.
CRemoteAppTimeoutReporter & m_TimeoutReporter
void x_Log(const string &what, CNcbiOstrstream &sstream)
CMonitoredProcessWatcher(SParams &p, const string &job_wdir, const string &path, const char *const *env, CTimeout run_period, CTimeout run_timeout)
const char *const * m_Env
virtual EAction Watch(TProcessHandle pid)
This method is getting called periodically during the process execution by the ExecWait() method.
EAction MonitorRun(TProcessHandle pid)
static CNcbiApplication * Instance(void)
Singleton method.
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string. Sample usage:
@ eShutdownImmediate
Urgent shutdown was requested.
Callback interface for ExecWait()
Extended exit information for waited process.
Note about the "buf_size" parameter for streams in this API.
Class representing (non overlapping) integer ranges.
bool Contain(int n) const
Checks whether provided number belongs to one of the ranges.
CRanges(istream &is)
Reads integer ranges from an input stream.
void Run(CWorkerNodeIdleTaskContext &)
Do the Idle task here.
list< string > m_IncludeEnv
static bool CanExec(const CFile &file)
unique_ptr< CRemoteAppTimeoutReporter > m_TimeoutReporter
unique_ptr< CRemoteAppVersion > m_Version
CRemoteAppLauncher(const string &sec_name, const IRegistry &)
unique_ptr< CRemoteAppRemover > m_Remover
void FinishJob(bool finished_ok, int ret, CWorkerNodeJobContext &context) const
bool ExecRemoteApp(const vector< string > &args, CNcbiIstream &in, CNcbiOstream &out, CNcbiOstream &err, int &exit_value, CWorkerNodeJobContext &job_context, unsigned app_run_timeout, const char *const env[]) const
string GetAppVersion(const string &) const
const string & GetAppPath() const
unique_ptr< CRanges > m_MustFailNoRetries
CTimeout m_MonitorRunTimeout
unique_ptr< CRemoteAppReaper > m_Reaper
CTimeout m_KeepAlivePeriod
ENonZeroExitAction m_NonZeroExitAction
list< string > m_ExcludeEnv
void Report(CWorkerNodeJobContext &job_context, unsigned seconds)
CRemoteAppTimeoutReporter(const string &mode)
static EMode Get(const string &mode)
string Get(CTimedProcessWatcher::SParams &p, const string &v) const
const vector< string > m_Args
CRemoteAppVersion(const string &app, const vector< string > &args)
virtual EAction Watch(TProcessHandle pid)
This method is getting called periodically during the process execution by the ExecWait() method.
const string m_ProcessType
CTimedProcessWatcher(SParams &p)
CRemoteAppReaper::CScheduler & m_ProcessManager
CTimeout – Timeout interval.
unsigned PresetSeconds() const
CTimer(const CTimeout &timeout)
CTmpStreamGuard(const string &tmp_dir, const string &name, CNcbiOstream &orig_stream, bool cache_std_out_err)
CNcbiOstream & GetOStream()
CNcbiOstream & m_OrigStream
unique_ptr< CFileReaderWriter > m_ReaderWriter
unique_ptr< CNcbiOstream > m_StreamGuard
Writer-based output stream.
Worker Node Idle Task Context.
std::ofstream out("events_result.xml")
main entry point for tests
CRanges * s_ReadRanges(const IRegistry &reg, const string &sec, string param)
CTimeout s_ToTimeout(unsigned sec)
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
const string & GetProgramDisplayName(void) const
Get the application's "display" name.
CDiagContext & GetDiagContext(void)
Get diag context instance.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
#define LOG_POST(message)
This macro is deprecated and it's strongly recommended to move in all projects (except tests) to macro...
void Warning(CExceptionArgs_Base &args)
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
static TExitCode System(const char *cmdline)
Execute the specified command.
static string NormalizePath(const string &path, EFollowLinks follow_links=eIgnoreLinks)
Normalize a path.
static bool IsAbsolutePath(const string &path)
Check if a "path" is absolute for the current OS.
unsigned int TMode
Bitwise OR of "EMode".
bool CreatePath(TCreateFlags flags=fCreate_Default) const
Create the directory path recursively possibly more than one at a time.
virtual bool Exists(void) const
Check if directory "dirname" exists.
static char GetPathSeparator(void)
Get path separator symbol specific for the current platform.
static string GetCwd(void)
Get the current working directory.
@ eCreate
Create a new file, or truncate an existing one.
@ fExecute
Execute / List(directory) permission.
@ eBegin
Absolute position from beginning of the file.
@ eRecursiveIgnoreMissing
Same as eRecursive, but do not report an error for disappeared entries (e.g.
void CommitJob()
Confirm that a job is done and result is ready to be sent back to the client.
void CommitJobWithFailure(const string &err_msg, bool no_retries=false)
Confirm that a job is finished, but an error has happened during its execution.
const string & GetQueueName() const
Get a name of a queue where this node is connected to.
bool IsJobCommitted() const
CNetScheduleAdmin::EShutdownLevel GetShutdownLevel()
Check if job processing must be aborted.
void PutProgressMessage(const string &msg, bool send_immediately=false, bool overwrite=true)
Put progress message.
void ReturnJob()
Schedule the job for return.
bool IsLogRequested() const
Check if logging was requested in config file.
const string & GetJobKey() const
Get a job key.
void JobDelayExpiration(unsigned runtime_inc)
Increment job execution timeout.
virtual EAction OnStart(TProcessHandle)
This method is called when the process has just been started by the ExecWait() method.
EAction
An action which the ExecWait() method should take after the Watch() method has returned.
static EFinish ExecWait(const string &cmd, const vector< string > &args, CNcbiIstream &in, CNcbiOstream &out, CNcbiOstream &err, int &exit_code, const string &current_dir=kEmptyStr, const char *const env[]=0, IProcessWatcher *watcher=0, const STimeout *kill_timeout=0, size_t pipe_size=0)
Execute a command with a vector of arguments, and wait for its completion.
@ eDone
Process finished normally.
@ eContinue
Continue running.
@ eStop
Kill the child process and exit.
@ eExit
Exit without waiting for the child process.
uint64_t Uint8
8-byte (64-bit) unsigned integer
TProcessHandle GetHandle(void) const
Get stored process handle.
bool KillGroup(unsigned long timeout=kDefaultKillTimeout) const
Terminate a group of processes.
bool IsExited(void) const
TRUE if the process terminated normally.
int Wait(unsigned long timeout=kInfiniteTimeoutMs, CExitInfo *info=0) const
Wait until process terminates.
bool IsSignaled(void) const
TRUE if the process terminated by a signal (UNIX only).
virtual bool GetBool(const string &section, const string &name, bool default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get boolean value of specified parameter name.
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
virtual bool HasEntry(const string &section, const string &name=kEmptyStr, TFlags flags=0) const
virtual void EnumerateEntries(const string &section, list< string > *entries, TFlags flags=fAllLayers) const
Enumerate parameter names for a specified section.
virtual string GetString(const string &section, const string &name, const string &default_value, TFlags flags=0) const
Get the parameter string value.
@ eReturn
Return default value.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
bool IsOssEmpty(CNcbiOstrstream &oss)
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
IO_PREFIX::istream CNcbiIstream
Portable alias for istream.
bool NcbiStreamCopy(CNcbiOstream &os, CNcbiIstream &is)
Copy the entire contents of stream "is" to stream "os".
static int CompareNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive compare of a substring with another string.
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
static string Sanitize(CTempString str, TSS_Flags flags=fSS_print)
Sanitize a string, allowing only specified classes of characters.
static string & ReplaceInPlace(string &src, const string &search, const string &replace, SIZE_TYPE start_pos=0, SIZE_TYPE max_replace=0, SIZE_TYPE *num_replace=0)
Replace occurrences of a substring within a string.
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
bool IsExpired(void) const
Check if the deadline is expired.
double GetAsDouble(void) const
Get as number of seconds (fractional value).
bool IsFinite() const
Check if timeout holds a numeric value.
@ eInfinite
Infinite timeout.
unsigned int usec
microseconds (modulo 1,000,000)
where both of them are integers Note
range(_Ty, _Ty) -> range< _Ty >
Portable class to work with a spawned process via pipes.
std::istream & in(std::istream &in_, double &x_)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
double f(double x_, const double &y_)
Defines CRequestContext class for NCBI C++ diagnostic API.
Reader-writer based streams.
SParams(CWorkerNodeJobContext &jc, string pt, const CTimeout &rt, const CTimeout &kap, CRemoteAppTimeoutReporter &tr, CRemoteAppReaper::CScheduler &pm)
CWorkerNodeJobContext & job_context
const CTimeout & keep_alive_period
CRemoteAppTimeoutReporter & timeout_reporter
CRemoteAppReaperTask(TProcessHandle handle)
bool operator()(int current, int max_attempts)
CRemoteAppRemoverTask(string p)
bool operator()(int current, int max_attempts) const
CRemoteAppRemoverTask m_Task
SGuard(CRemoteAppRemover *remover, CRemoteAppRemoverTask task, bool remove_tmp_dir)
CRemoteAppRemover::CScheduler * m_Scheduler
CRemoteAppReaper::CScheduler & process_manager
const CTimeout & run_timeout
SParams(string pt, const CTimeout &rt, CRemoteAppReaper::CScheduler &pm)
SSection(const IRegistry &r, const string &n)
int Get(const string &param, int def) const
bool Get(const string &param, bool def) const