NCBI C++ ToolKit
exec_helpers.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: exec_helpers.cpp 102566 2024-06-01 19:58:38Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Maxim Didenko, Dmitry Kazimirov, Rafael Sadyrov
27  *
28  * File Description: NetSchedule worker node sample
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include <sstream>
35 
36 #include <connect/ncbi_pipe.hpp>
37 
38 #include <corelib/rwstream.hpp>
39 #include <corelib/request_ctx.hpp>
40 
41 #if defined(NCBI_OS_UNIX)
42 #include <fcntl.h>
43 #endif
44 
45 #include "exec_helpers.hpp"
46 #include "async_task.hpp"
47 
48 #define PIPE_SIZE 64 * 1024
49 
51 
52 // A record for a child process
54 {
56 
58 
59  bool operator()(int current, int max_attempts);
60 };
61 
62 bool CRemoteAppReaperTask::operator()(int current, int max_attempts)
63 {
64  CProcess::CExitInfo exitinfo;
65  const bool first_attempt = current == 1;
66 
67  if (process.Wait(0, &exitinfo) != -1 || exitinfo.IsExited() || exitinfo.IsSignaled()) {
68  // Log a message for those that had failed to be killed before
69  if (!first_attempt) {
70  LOG_POST(Note << "Successfully waited for a process: " << process.GetHandle());
71  }
72 
73  return true;
74  }
75 
76  if (first_attempt) {
77  if (process.KillGroup()) return true;
78 
79  ERR_POST(Warning << "Failed to kill a process: " << process.GetHandle() << ", will wait for it");
80  return false;
81  }
82 
83  if (current > max_attempts) {
84  // Give up if there are too many attempts to wait for a process
85  ERR_POST("Gave up waiting for a process: " << process.GetHandle());
86  return true;
87  }
88 
89  return false;
90 }
91 
92 // This class is responsibe for the whole process of reaping child processes
93 class CRemoteAppReaper : public CAsyncTaskProcessor<CRemoteAppReaperTask>
94 {
96 };
97 
99 {
100  const string path;
101 
102  CRemoteAppRemoverTask(string p);
103 
104  bool operator()(int current, int max_attempts) const;
105 };
106 
108  path(std::move(p))
109 {
110  if (path.empty()) return;
111 
112  CDir dir(path);
113 
114  if (dir.Exists()) return;
115 
116  dir.CreatePath();
117 }
118 
119 bool CRemoteAppRemoverTask::operator()(int current, int max_attempts) const
120 {
121  if (path.empty()) return true;
122 
123  const bool first_attempt = current == 1;
124 
125  try {
127  // Log a message for those that had failed to be removed before
128  if (!first_attempt) {
129  LOG_POST(Note << "Successfully removed a path: " << path);
130  }
131 
132  return true;
133  }
134  }
135  catch (...) {
136  }
137 
138  if (current > max_attempts) {
139  // Give up if there are too many attempts to remove a path
140  ERR_POST("Gave up removing a path: " << path);
141  return true;
142  }
143 
144  if (first_attempt) {
145  ERR_POST(Warning << "Failed to remove a path: " << path << ", will try later");
146  return false;
147  }
148 
149  return false;
150 }
151 
152 // This class is responsibe for removing tmp directories
153 class CRemoteAppRemover : public CAsyncTaskProcessor<CRemoteAppRemoverTask>
154 {
155 public:
156  struct SGuard;
157 
159 };
160 
162 {
163  SGuard(CRemoteAppRemover* remover, CRemoteAppRemoverTask task, bool remove_tmp_dir) :
164  m_Scheduler(remover ? &remover->GetScheduler() : nullptr),
165  m_Task(std::move(task)),
166  m_RemoveTmpDir(remove_tmp_dir)
167  {}
168 
170  {
171  // Do not remove
172  if (!m_RemoveTmpDir) return;
173 
174  // Remove asynchronously
175  if (m_Scheduler && (*m_Scheduler)(m_Task)) return;
176 
177  // Remove synchronously
178  m_Task(1, 0);
179  }
180 
181 private:
182  CRemoteAppRemover::CScheduler* m_Scheduler;
185 };
186 
187 class CTimer
188 {
189 public:
190  CTimer(const CTimeout& timeout) :
191  m_Deadline(timeout),
192  m_Timeout(timeout)
193  {}
194 
196  bool IsExpired() const { return m_Deadline.IsExpired(); }
197  unsigned PresetSeconds() const { return (unsigned)m_Timeout.GetAsDouble(); }
198 
199 private:
202 };
203 
205 {
206 public:
207  struct SParams
208  {
209  string process_type;
211  CRemoteAppReaper::CScheduler& process_manager;
212 
213  SParams(string pt, const CTimeout& rt, CRemoteAppReaper::CScheduler& pm) :
214  process_type(std::move(pt)),
215  run_timeout(rt),
216  process_manager(pm)
217  {}
218  };
219 
221  : m_ProcessManager(p.process_manager),
222  m_ProcessType(p.process_type),
223  m_Deadline(p.run_timeout)
224  {
225  }
226 
228  {
229  if (m_Deadline.IsExpired()) {
230  ERR_POST(m_ProcessType << " run time exceeded "
232  <<" second(s), stopping the child: " << pid);
233  return m_ProcessManager(pid) ? eExit : eStop;
234  }
235 
236  return eContinue;
237  }
238 
239 protected:
240  CRemoteAppReaper::CScheduler& m_ProcessManager;
241  const string m_ProcessType;
243 };
244 
245 // This class is responsible for reporting app/cgi version run by this app
247 {
248 public:
249  CRemoteAppVersion(const string& app, const vector<string>& args)
250  : m_App(app), m_Args(args)
251  {}
252 
253  string Get(CTimedProcessWatcher::SParams& p, const string& v) const;
254 
255 private:
256  const string m_App;
257  const vector<string> m_Args;
258 };
259 
260 string CRemoteAppVersion::Get(CTimedProcessWatcher::SParams& p, const string& v) const
261 {
262  CTimedProcessWatcher wait_one_second(p);
263  istringstream in;
264  ostringstream out;
265  ostringstream err;
266  int exit_code;
267 
268  if (CPipe::ExecWait(m_App, m_Args, in, out, err, exit_code,
269  kEmptyStr, 0, &wait_one_second) == CPipe::eDone) {
270  // Restrict version string to 1024 chars
271  string app_ver(out.str(), 0, 1024);
272  return NStr::Sanitize(app_ver) + " / " + v;
273  }
274 
275  return v;
276 }
277 
278 // This class is responsible for reporting clients about apps timing out
280 {
281 public:
283 
284  void Report(CWorkerNodeJobContext& job_context, unsigned seconds)
285  {
286  if (m_Mode != eNever) {
287  job_context.PutProgressMessage("Job run time exceeded " +
288  NStr::UIntToString(seconds) + " seconds.", true,
289  m_Mode == eAlways);
290  }
291  }
292 
293 private:
295 
296  static EMode Get(const string& mode)
297  {
298  if (!NStr::CompareNocase(mode, "smart"))
299  return eSmart;
300 
301  else if (!NStr::CompareNocase(mode, "always"))
302  return eAlways;
303 
304  else if (!NStr::CompareNocase(mode, "never"))
305  return eNever;
306  else
307  ERR_POST("Unknown parameter value: "
308  "parameter \"progress_message_on_timeout\", "
309  "value: \"" << mode << "\". "
310  "Allowed values: smart, always, never");
311 
312  return eSmart;
313  }
314 
316 };
317 
318 /// Class representing (non overlapping) integer ranges.
319 ///
320 class CRanges
321 {
322 public:
323  /// Reads integer ranges from an input stream.
324  ///
325  /// The data must be in the following format (in ascending order):
326  /// [!] R1, N2, ..., Rn
327  ///
328  /// Where:
329  /// ! - negation, makes all provided ranges be excluded
330  /// (not included).
331  /// R1 ... Rn - integer closed ranges specified either as
332  /// FROM - TO (corresponds to [FROM, TO] range) or as
333  /// NUMBER (corresponds to [NUMBER, NUMBER] range).
334  /// Example:
335  /// 4, 6 - 9, 16 - 40, 64
336  ///
337  CRanges(istream& is);
338 
339  /// Checks whether provided number belongs to one of the ranges.
340  ///
341  bool Contain(int n) const;
342 
343 private:
344  vector<int> m_Ranges;
345 };
346 
347 CRanges::CRanges(istream& is)
348 {
349  char ch;
350 
351  if (!(is >> skipws >> ch)) return; // EoF
352 
353  // Char '!' makes provided ranges to be excluded (not included)
354  if (ch == '!')
355  m_Ranges.push_back(numeric_limits<int>::min());
356  else
357  is.putback(ch);
358 
359  bool interval = false;
360 
361  do {
362  int n;
363 
364  if (!(is >> n)) break;
365  if (!(is >> ch)) ch = ',';
366 
367  if (m_Ranges.size()) {
368  int previous = m_Ranges.back();
369 
370  if (n < previous) {
371  ostringstream err;
372  err << n << " is less or equal than previous number, "
373  "intervals must be sorted and not overlapping";
374  throw invalid_argument(err.str());
375  }
376  }
377 
378  if (ch == ',') {
379  // If it's only one number in interval, add both start and end of it
380  if (!interval) m_Ranges.push_back(n);
381 
382  m_Ranges.push_back(n + 1);
383  interval = false;
384 
385  } else if (ch == '-' && !interval) {
386  interval = true;
387  m_Ranges.push_back(n);
388 
389  } else {
390  ostringstream err;
391  err << "Unexpected char '" << ch << "'";
392  throw invalid_argument(err.str());
393  }
394  } while (is);
395 
396  if (interval) {
397  ostringstream err;
398  err << "Missing interval end";
399  throw invalid_argument(err.str());
400  }
401 
402  if (!is.eof()) {
403  ostringstream err;
404  err << "Not a number near: " << is.rdbuf();
405  throw invalid_argument(err.str());
406  }
407 }
408 
409 bool CRanges::Contain(int n) const
410 {
411  auto range = upper_bound(m_Ranges.begin(), m_Ranges.end(), n);
412 
413  // Every number starts a range, every second range is excluded
414  return (range - m_Ranges.begin()) % 2;
415 }
416 
417 CRanges* s_ReadRanges(const IRegistry& reg, const string& sec, string param)
418 {
419  if (reg.HasEntry(sec, param)) {
420  istringstream iss(reg.GetString(sec, param, kEmptyStr));
421 
422  try {
423  return new CRanges(iss);
424  }
425  catch (invalid_argument& ex) {
426  NCBI_THROW_FMT(CInvalidParamException, eInvalidCharacter,
427  "Parameter '" << param << "' parsing error: " << ex.what());
428  }
429  }
430 
431  return nullptr;
432 }
433 
434 CTimeout s_ToTimeout(unsigned sec)
435 {
436  // Zero counts as infinite timeout
437  return sec ? CTimeout(sec, 0) : CTimeout::eInfinite;
438 }
439 
440 struct SSection
441 {
442  const IRegistry& reg;
443  const string& name;
444 
445  SSection(const IRegistry& r, const string& n) : reg(r), name(n) {}
446 
447  int Get(const string& param, int def) const
448  {
449  return reg.GetInt(name, param, def, 0, IRegistry::eReturn);
450  }
451 
452  bool Get(const string& param, bool def) const
453  {
454  return reg.GetBool(name, param, def, 0, IRegistry::eReturn);
455  }
456 };
457 
458 //////////////////////////////////////////////////////////////////////////////
459 ///
461  const IRegistry& reg) :
462  m_NonZeroExitAction(eDoneOnNonZeroExit),
463  m_RemoveTempDir(true),
464  m_CacheStdOutErr(true)
465 {
466  const SSection sec(reg, sec_name);
467 
468  m_AppRunTimeout = s_ToTimeout(sec.Get("max_app_run_time", 0));
469  m_KeepAlivePeriod = s_ToTimeout(sec.Get("keep_alive_period", 0));
470 
471  if (reg.HasEntry(sec_name, "non_zero_exit_action") ) {
472  string val = reg.GetString(sec_name, "non_zero_exit_action", "");
473  if (NStr::CompareNocase(val, "fail") == 0 )
475  else if (NStr::CompareNocase(val, "return") == 0 )
477  else if (NStr::CompareNocase(val, "done") == 0 )
479  else {
480  ERR_POST("Unknown parameter value: "
481  "section [" << sec_name << "], "
482  "parameter \"non_zero_exit_action\", "
483  "value: \"" << val << "\". "
484  "Allowed values: fail, return, done");
485  }
486  } else if (sec.Get("fail_on_non_zero_exit", false))
488 
489  m_MustFailNoRetries.reset(
490  s_ReadRanges(reg, sec_name, "fail_no_retries_if_exit_code"));
491 
492  const string name = CNcbiApplication::Instance()->GetProgramDisplayName();
493 
494  if (sec.Get("run_in_separate_dir", false)) {
495  if (reg.HasEntry(sec_name, "tmp_dir"))
496  m_TempDir = reg.GetString(sec_name, "tmp_dir", "." );
497  else
498  m_TempDir = reg.GetString(sec_name, "tmp_path", "." );
499 
501  string tmp = CDir::GetCwd()
503  + m_TempDir;
505  }
506  if (reg.HasEntry(sec_name, "remove_tmp_dir"))
507  m_RemoveTempDir = sec.Get("remove_tmp_dir", true);
508  else
509  m_RemoveTempDir = sec.Get("remove_tmp_path", true);
510 
511  int sleep = sec.Get("sleep_between_remove_tmp_attempts", 60);
512  int max_attempts = sec.Get("max_remove_tmp_attempts", 60);
513  m_Remover.reset(new CRemoteAppRemover(sleep, max_attempts, name + "_rm"));
514 
515  m_CacheStdOutErr = sec.Get("cache_std_out_err", true);
516  }
517 
518  m_AppPath = reg.GetString(sec_name, "app_path", kEmptyStr);
519  if (m_AppPath.empty()) {
520  NCBI_THROW_FMT(CConfigException, eParameterMissing,
521  "Missing configuration parameter [" << sec_name <<
522  "].app_path");
523  }
525  string tmp = CDir::GetCwd()
527  + m_AppPath;
529  }
530 
531  m_MonitorAppPath = reg.GetString(sec_name, "monitor_app_path", kEmptyStr);
532  if (!m_MonitorAppPath.empty()) {
534  string tmp = CDir::GetCwd()
538  }
540  if (!f.Exists() || !CanExec(f)) {
541  ERR_POST("Can not execute \"" << m_MonitorAppPath
542  << "\". The Monitor application will not run!");
544  }
545  }
546 
547  m_MonitorRunTimeout = s_ToTimeout(sec.Get("max_monitor_running_time", 5));
548  m_MonitorPeriod = s_ToTimeout(sec.Get("monitor_period", 5));
549  m_KillTimeout.sec = sec.Get("kill_timeout", 1);
550  m_KillTimeout.usec = 0;
551 
552  m_ExcludeEnv.clear();
553  m_IncludeEnv.clear();
554  m_AddedEnv.clear();
555 
556  NStr::Split(reg.GetString("env_inherit", "exclude", "")," ;,", m_ExcludeEnv,
558  NStr::Split(reg.GetString("env_inherit", "include", "")," ;,", m_IncludeEnv,
560 
561  list<string> added_env;
562  reg.EnumerateEntries("env_set", &added_env);
563 
564  ITERATE(list<string>, it, added_env) {
565  const string& s = *it;
566  m_AddedEnv[s] = reg.GetString("env_set", s, "");
567  }
568 
569  int sleep = sec.Get("sleep_between_reap_attempts", 60);
570  int max_attempts = sec.Get("max_reap_attempts_after_kill", 60);
571  m_Reaper.reset(new CRemoteAppReaper(sleep, max_attempts, name + "_cl"));
572 
573  const string cmd = reg.GetString(sec_name, "version_cmd", m_AppPath);
574  const string args = reg.GetString(sec_name, "version_args", "-version");
575  vector<string> v;
576  m_Version.reset(new CRemoteAppVersion(cmd,
577  NStr::Split(args, " ", v)));
578 
579  const string mode = reg.GetString(sec_name, "progress_message_on_timeout",
580  "smart");
582 }
583 
584 // We need this explicit empty destructor,
585 // so it could destruct CRemoteAppReaper and CRemoteAppVersion instances.
586 // Otherwise, there would be implicit inline destructor
587 // that could be placed where these classes are incomplete.
589 {
590 }
591 
592 //////////////////////////////////////////////////////////////////////////////
593 ///
595 {
596  CDirEntry::TMode user_mode = 0;
597  if (!file.GetMode(&user_mode))
598  return false;
599  if (user_mode & CDirEntry::fExecute)
600  return true;
601  return false;
602 }
603 
604 //////////////////////////////////////////////////////////////////////////////
605 ///
607 {
608 public:
610  {
614 
616  string pt,
617  const CTimeout& rt,
618  const CTimeout& kap,
620  CRemoteAppReaper::CScheduler& pm)
621  : CTimedProcessWatcher::SParams(std::move(pt), rt, pm),
622  job_context(jc),
623  keep_alive_period(kap),
624  timeout_reporter(tr)
625  {}
626  };
627 
630  m_JobContext(p.job_context), m_KeepAlive(p.keep_alive_period),
631  m_TimeoutReporter(p.timeout_reporter)
632  {
633  }
634 
636  {
639  return eStop;
640  }
641 
642  LOG_POST(Note << "Child PID: " << NStr::UInt8ToString((Uint8) pid));
643 
644  return CTimedProcessWatcher::OnStart(pid);
645  }
646 
648  {
652  return eStop;
653  }
654 
655  EAction action = CTimedProcessWatcher::Watch(pid);
656 
657  if (action != eContinue) {
659  return action;
660  }
661 
662  if (m_KeepAlive.IsExpired()) {
665  }
666 
667  return eContinue;
668  }
669 
670 protected:
672 
673 private:
676 };
677 
678 //////////////////////////////////////////////////////////////////////////////
679 ///
681 {
682 public:
683  CMonitoredProcessWatcher(SParams& p, const string& job_wdir,
684  const string& path, const char* const* env,
685  CTimeout run_period, CTimeout run_timeout)
687  m_MonitorWatch(run_period),
688  m_JobWDir(job_wdir),
689  m_Path(path),
690  m_Env(env),
691  m_RunTimeout(run_timeout)
692  {
693  }
694 
696  {
698 
699  if (action != eContinue)
700  return action;
701 
702  if (m_MonitorWatch.IsExpired()) {
703  action = MonitorRun(pid);
705  }
706 
707  return action;
708  }
709 
710 private:
711  // The exit code of the monitor program is interpreted as follows
712  // (any exit code not listed below is treated as eInternalError)
713  enum EResult {
714  // The job is running as expected.
715  // The monitor's stdout is interpreted as a job progress message.
716  // The stderr goes to the log file if logging is enabled.
718  // The monitor detected an inconsistency with the job run;
719  // the job must be returned back to the queue.
720  // The monitor's stderr goes to the log file
721  // regardless of whether logging is enabled or not.
723  // The job must be failed.
724  // The monitor's stdout is interpreted as the error message;
725  // stderr goes to the log file regardless of whether
726  // logging is enabled or not.
728  // There's a problem with the monitor application itself.
729  // The job continues to run and the monitor's stderr goes
730  // to the log file unconditionally.
732  };
733 
735  {
738  CNcbiOstrstream err;
739  vector<string> args;
740  args.push_back("-pid");
741  args.push_back(NStr::UInt8ToString((Uint8) pid));
742  args.push_back("-jid");
743  args.push_back(m_JobContext.GetJobKey());
744  args.push_back("-jwdir");
745  args.push_back(m_JobWDir);
746 
748  CTimedProcessWatcher callback(params);
749  int exit_value = eInternalError;
750  try {
751  if (CPipe::eDone != CPipe::ExecWait(m_Path, args, in,
752  out, err, exit_value,
753  kEmptyStr, m_Env,
754  &callback,
755  NULL,
756  PIPE_SIZE)) {
757  exit_value = eInternalError;
758  }
759  }
760  catch (exception& ex) {
761  err << ex.what();
762  }
763  catch (...) {
764  err << "Unknown error";
765  }
766 
767  switch (exit_value) {
768  case eJobRunning:
769  {
770  bool non_empty_output = !IsOssEmpty(out);
771  if (non_empty_output) {
773  (CNcbiOstrstreamToString(out), true);
774  }
776  ( !non_empty_output || !IsOssEmpty(err) ))
777  x_Log("exited with zero return code", err);
778  }
779  return eContinue;
780 
781  case eJobToReturn:
783  x_Log("job is returned", err);
784  return eStop;
785 
786  case eJobFailed:
787  {
788  x_Log("job failed", err);
789  string errmsg;
790  if ( !IsOssEmpty(out) ) {
791  errmsg = CNcbiOstrstreamToString(out);
792  } else
793  errmsg = "Monitor requested job termination";
794  throw runtime_error(errmsg);
795  }
796  return eContinue;
797  }
798 
799  x_Log("internal error", err);
800  return eContinue;
801  }
802 
803  inline void x_Log(const string& what, CNcbiOstrstream& sstream)
804  {
805  if ( !IsOssEmpty(sstream) ) {
806  ERR_POST(m_JobContext.GetJobKey() << " (monitor) " << what <<
807  ": " << (string)CNcbiOstrstreamToString(sstream));
808  } else {
809  ERR_POST(m_JobContext.GetJobKey() << " (monitor) " << what << ".");
810  }
811  }
812 
814  string m_JobWDir;
815  string m_Path;
816  const char* const* m_Env;
818 };
819 
820 
821 //////////////////////////////////////////////////////////////////////////////
822 ///
823 
825 {
826 public:
827  CTmpStreamGuard(const string& tmp_dir,
828  const string& name,
829  CNcbiOstream& orig_stream,
830  bool cache_std_out_err) : m_OrigStream(orig_stream), m_Stream(NULL)
831  {
832  if (!tmp_dir.empty() && cache_std_out_err) {
833  m_Name = tmp_dir + CDirEntry::GetPathSeparator();
834  m_Name += name;
835  }
836  if (!m_Name.empty()) {
837  try {
840  } catch (CFileException& ex) {
841  ERR_POST("Could not create a temporary file " <<
842  m_Name << " :" << ex.what() << " the data will be "
843  "written directly to the original stream");
844  m_Name.erase();
846  return;
847  }
848 #if defined(NCBI_OS_UNIX)
849  // If the file is created on an NFS file system, the CLOEXEC
850  // flag needs to be set, otherwise deleting the temporary
851  // directory will not succeed.
852  TFileHandle fd = m_ReaderWriter->GetFileIO().GetFileHandle();
853  fcntl(fd, F_SETFD, fcntl(fd, F_GETFD, 0) | FD_CLOEXEC);
854 #endif
855  m_StreamGuard.reset(new CWStream(m_ReaderWriter.get()));
856  m_Stream = m_StreamGuard.get();
857  } else {
859  }
860  }
862  {
863  try {
864  Close();
865  }
866  catch (exception& ex) {
867  ERR_POST("CTmpStreamGuard::~CTmpStreamGuard(): " <<
868  m_Name << " --> " << ex.what());
869  }
870  }
871 
873 
874  void Close()
875  {
876  if (!m_Name.empty() && m_StreamGuard.get()) {
877  m_StreamGuard.reset();
878  m_ReaderWriter->Flush();
879  m_ReaderWriter->GetFileIO().SetFilePos(0, CFileIO_Base::eBegin);
880  {
881  CRStream rstm(m_ReaderWriter.get());
882  if (!rstm.good()
883  || !NcbiStreamCopy(m_OrigStream, rstm))
884  ERR_POST( "Cannot copy \"" << m_Name << "\" file.");
885  }
886  m_ReaderWriter.reset();
887  }
888  }
889 
890 private:
892  unique_ptr<CFileReaderWriter> m_ReaderWriter;
893  unique_ptr<CNcbiOstream> m_StreamGuard;
895  string m_Name;
896 };
897 
898 
899 bool CRemoteAppLauncher::ExecRemoteApp(const vector<string>& args,
901  int& exit_value,
902  CWorkerNodeJobContext& job_context,
903  unsigned app_run_timeout,
904  const char* const env[]) const
905 {
906  string tmp_path = m_TempDir;
907  if (!tmp_path.empty()) {
909  bool substitution_found = false;
910  size_t subst_pos;
911  while ((subst_pos = tmp_path.find('%')) != string::npos) {
912  if (subst_pos + 1 >= tmp_path.length())
913  break;
914  switch (tmp_path[subst_pos + 1]) {
915  case '%':
916  tmp_path.replace(subst_pos, 2, 1, '%');
917  continue;
918  case 'q':
919  tmp_path.replace(subst_pos, 2, job_context.GetQueueName());
920  break;
921  case 'j':
922  tmp_path.replace(subst_pos, 2, job_context.GetJobKey());
923  break;
924  case 'r':
925  tmp_path.replace(subst_pos, 2, NStr::UInt8ToString(
926  GetDiagContext().GetRequestContext().GetRequestID()));
927  break;
928  case 't':
929  tmp_path.replace(subst_pos, 2, NStr::UIntToString(
930  (unsigned) lt.GetLocalTime().GetTimeT()));
931  break;
932  default:
933  tmp_path.erase(subst_pos, 2);
934  }
935  substitution_found = true;
936  }
937  if (!substitution_found)
938  tmp_path += CDirEntry::GetPathSeparator() +
939  job_context.GetQueueName() + "_" +
940  job_context.GetJobKey() + "_" +
941  NStr::UIntToString((unsigned) lt.GetLocalTime().GetTimeT());
942  }
943 
944  CRemoteAppRemover::SGuard guard(m_Remover.get(), tmp_path, m_RemoveTempDir);
945  {
946  CTmpStreamGuard std_out_guard(tmp_path, "std.out", out,
948  CTmpStreamGuard std_err_guard(tmp_path, "std.err", err,
950 
951  CTimeout run_timeout = min(s_ToTimeout(app_run_timeout), m_AppRunTimeout);
952  string working_dir(tmp_path.empty() ? CDir::GetCwd() : tmp_path);
953 
954 #ifdef NCBI_OS_MSWIN
955  NStr::ReplaceInPlace(working_dir, "\\", "/");
956 #endif
957 
958  CJobContextProcessWatcher::SParams params(job_context,
959  "Job",
960  run_timeout,
963  m_Reaper->GetScheduler());
964 
965  bool monitor = !m_MonitorAppPath.empty() && m_MonitorPeriod.IsFinite();
966 
967  unique_ptr<CPipe::IProcessWatcher> watcher(monitor ?
968  new CMonitoredProcessWatcher(params, working_dir,
970  new CJobContextProcessWatcher(params));
971 
972  bool result = CPipe::ExecWait(GetAppPath(), args, in,
973  std_out_guard.GetOStream(),
974  std_err_guard.GetOStream(),
975  exit_value,
976  tmp_path, env, watcher.get(),
977  &m_KillTimeout,
979 
980  std_err_guard.Close();
981  std_out_guard.Close();
982 
983  return result;
984  }
985 }
986 
987 void CRemoteAppLauncher::FinishJob(bool finished_ok, int ret,
989 {
990  if (!finished_ok) {
991  if (!context.IsJobCommitted())
992  context.CommitJobWithFailure("Job has been canceled");
993  } else
994  // Check whether retries are disabled for the specified exit code.
995  if (m_MustFailNoRetries && m_MustFailNoRetries->Contain(ret))
996  context.CommitJobWithFailure(
997  "Exited with return code " + NStr::IntToString(ret) +
998  " - will not be rerun",
999  true /* no retries */);
1000  else if (ret == 0 || m_NonZeroExitAction == eDoneOnNonZeroExit)
1001  context.CommitJob();
1003  context.ReturnJob();
1004  else
1005  context.CommitJobWithFailure(
1006  "Exited with return code " + NStr::IntToString(ret));
1007 }
1008 
1009 string CRemoteAppLauncher::GetAppVersion(const string& v) const
1010 {
1011  CTimeout run_timeout(1.0);
1012  CTimedProcessWatcher::SParams params("Version", run_timeout, m_Reaper->GetScheduler());
1013  return m_Version->Get(params, v);
1014 }
1015 
1017 {
1018  m_Reaper->StartExecutor();
1019  if (m_Remover) m_Remover->StartExecutor();
1020 }
1021 
1023 {
1024  if (!m_AppCmd.empty())
1025  CExec::System(m_AppCmd.c_str());
1026 }
1027 
CConfigException –.
Definition: ncbi_config.hpp:53
CDeadline.
Definition: ncbitime.hpp:1830
CDir –.
Definition: ncbifile.hpp:1696
CFastLocalTime –.
Definition: ncbitime.hpp:1895
CFileException –.
Definition: ncbifile.hpp:136
CFile –.
Definition: ncbifile.hpp:1605
CInvalidParamException –.
Definition: ncbiexpt.hpp:1505
CJobContextProcessWatcher(SParams &p)
virtual EAction OnStart(TProcessHandle pid)
This method is called when the process has just been started by the ExecWait() method.
CWorkerNodeJobContext & m_JobContext
virtual EAction Watch(TProcessHandle pid)
This method is getting called periodically during the process execution by the ExecWait() method.
CRemoteAppTimeoutReporter & m_TimeoutReporter
void x_Log(const string &what, CNcbiOstrstream &sstream)
CMonitoredProcessWatcher(SParams &p, const string &job_wdir, const string &path, const char *const *env, CTimeout run_period, CTimeout run_timeout)
const char *const * m_Env
virtual EAction Watch(TProcessHandle pid)
This method is getting called periodically during the process execution by the ExecWait() method.
EAction MonitorRun(TProcessHandle pid)
static CNcbiApplication * Instance(void)
Singleton method.
Definition: ncbiapp.cpp:264
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string Sample usage:
Definition: ncbistre.hpp:802
@ eShutdownImmediate
Urgent shutdown was requested.
Callback interface for ExecWait()
Definition: ncbi_pipe.hpp:403
Extended exit information for waited process.
CProcess –.
Note about the "buf_size" parameter for streams in this API.
Definition: rwstream.hpp:122
Class representing (non overlapping) integer ranges.
vector< int > m_Ranges
bool Contain(int n) const
Checks whether provided number belongs to one of the ranges.
CRanges(istream &is)
Reads integer ranges from an input stream.
void Run(CWorkerNodeIdleTaskContext &)
Do the Idle task here.
list< string > m_IncludeEnv
static bool CanExec(const CFile &file)
unique_ptr< CRemoteAppTimeoutReporter > m_TimeoutReporter
unique_ptr< CRemoteAppVersion > m_Version
CRemoteAppLauncher(const string &sec_name, const IRegistry &)
unique_ptr< CRemoteAppRemover > m_Remover
void FinishJob(bool finished_ok, int ret, CWorkerNodeJobContext &context) const
bool ExecRemoteApp(const vector< string > &args, CNcbiIstream &in, CNcbiOstream &out, CNcbiOstream &err, int &exit_value, CWorkerNodeJobContext &job_context, unsigned app_run_timeout, const char *const env[]) const
string GetAppVersion(const string &) const
const string & GetAppPath() const
unique_ptr< CRanges > m_MustFailNoRetries
CTimeout m_MonitorRunTimeout
unique_ptr< CRemoteAppReaper > m_Reaper
CTimeout m_KeepAlivePeriod
ENonZeroExitAction m_NonZeroExitAction
list< string > m_ExcludeEnv
void Report(CWorkerNodeJobContext &job_context, unsigned seconds)
CRemoteAppTimeoutReporter(const string &mode)
static EMode Get(const string &mode)
string Get(CTimedProcessWatcher::SParams &p, const string &v) const
const vector< string > m_Args
CRemoteAppVersion(const string &app, const vector< string > &args)
const string m_App
virtual EAction Watch(TProcessHandle pid)
This method is getting called periodically during the process execution by the ExecWait() method.
const string m_ProcessType
CTimedProcessWatcher(SParams &p)
const CTimer m_Deadline
CRemoteAppReaper::CScheduler & m_ProcessManager
CTimeout – Timeout interval.
Definition: ncbitime.hpp:1693
CTimeout m_Timeout
unsigned PresetSeconds() const
CTimer(const CTimeout &timeout)
void Restart()
CDeadline m_Deadline
bool IsExpired() const
CTmpStreamGuard(const string &tmp_dir, const string &name, CNcbiOstream &orig_stream, bool cache_std_out_err)
CNcbiOstream & GetOStream()
CNcbiOstream & m_OrigStream
unique_ptr< CFileReaderWriter > m_ReaderWriter
unique_ptr< CNcbiOstream > m_StreamGuard
CNcbiOstream * m_Stream
Writer-based output stream.
Definition: rwstream.hpp:171
Worker Node Idle Task Context.
Worker Node job context.
IRegistry –.
Definition: ncbireg.hpp:73
void clear()
Definition: map.hpp:169
std::ofstream out("events_result.xml")
main entry point for tests
CRanges * s_ReadRanges(const IRegistry &reg, const string &sec, string param)
#define PIPE_SIZE
CTimeout s_ToTimeout(unsigned sec)
static CS_COMMAND * cmd
Definition: ct_dynamic.c:26
#define true
Definition: bool.h:35
static HENV env
Definition: transaction2.c:38
static char tmp[3200]
Definition: utf8.c:42
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
const string & GetProgramDisplayName(void) const
Get the application's "display" name.
#define NULL
Definition: ncbistd.hpp:225
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define LOG_POST(message)
This macro is deprecated and it's strongly recomended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
Definition: ncbiexpt.hpp:719
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
static TExitCode System(const char *cmdline)
Execute the specified command.
Definition: ncbiexec.cpp:505
static string NormalizePath(const string &path, EFollowLinks follow_links=eIgnoreLinks)
Normalize a path.
Definition: ncbifile.cpp:820
static bool IsAbsolutePath(const string &path)
Check if a "path" is absolute for the current OS.
Definition: ncbifile.cpp:508
unsigned int TMode
Bitwise OR of "EMode".
Definition: ncbifile.hpp:1173
bool CreatePath(TCreateFlags flags=fCreate_Default) const
Create the directory path recursively possibly more than one at a time.
Definition: ncbifile.cpp:4106
virtual bool Exists(void) const
Check if directory "dirname" exists.
Definition: ncbifile.hpp:4066
static char GetPathSeparator(void)
Get path separator symbol specific for the current platform.
Definition: ncbifile.cpp:433
HANDLE TFileHandle
Definition: ncbifile.hpp:115
static string GetCwd(void)
Get the current working directory.
Definition: ncbifile.cpp:3708
@ eCreate
Create a new file, or truncate an existing one.
Definition: ncbifile.hpp:3422
@ fExecute
Execute / List(directory) permission.
Definition: ncbifile.hpp:1152
@ eBegin
Absolute position from beginning of the file.
Definition: ncbifile.hpp:3462
@ eRecursiveIgnoreMissing
Same as eRecursive, but do not report an error for disappeared entries (e.g.
Definition: ncbifile.hpp:747
const string & GetQueueName() const
Get a name of a queue where this node is connected to.
CNetScheduleAdmin::EShutdownLevel GetShutdownLevel()
Check if job processing must be aborted.
void PutProgressMessage(const string &msg, bool send_immediately=false, bool overwrite=true)
Put progress message.
void ReturnJob()
Schedule the job for return.
bool IsLogRequested() const
Check if logging was requested in config file.
const string & GetJobKey() const
Get a job key.
void JobDelayExpiration(unsigned runtime_inc)
Increment job execution timeout.
static EFinish ExecWait(const string &cmd, const vector< string > &args, CNcbiIstream &in, CNcbiOstream &out, CNcbiOstream &err, int &exit_code, const string &current_dir=kEmptyStr, const char *const envp[]=0, IProcessWatcher *watcher=0, const STimeout *kill_timeout=0, size_t pipe_size=0)
Execute a command with a vector of arguments, and wait for its completion.
Definition: ncbi_pipe.cpp:2139
virtual EAction OnStart(TProcessHandle)
This method is called when the process has just been started by the ExecWait() method.
Definition: ncbi_pipe.hpp:422
EAction
An action which the ExecWait() method should take after the Watch() method has returned.
Definition: ncbi_pipe.hpp:407
@ eDone
Process finished normally.
Definition: ncbi_pipe.hpp:438
@ eContinue
Continue running.
Definition: ncbi_pipe.hpp:408
@ eStop
Kill the child process and exit.
Definition: ncbi_pipe.hpp:409
@ eExit
Exit without waiting for the child process.
Definition: ncbi_pipe.hpp:410
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
TProcessHandle GetHandle(void) const
Get stored process handle.
bool KillGroup(unsigned long timeout=kDefaultKillTimeout) const
Terminate a group of processes.
TPid TProcessHandle
bool IsExited(void) const
TRUE if the process terminated normally.
int Wait(unsigned long timeout=kInfiniteTimeoutMs, CExitInfo *info=0) const
Wait until process terminates.
bool IsSignaled(void) const
TRUE if the process terminated by a signal (UNIX only).
virtual bool GetBool(const string &section, const string &name, bool default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get boolean value of specified parameter name.
Definition: ncbireg.cpp:391
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
Definition: ncbireg.cpp:362
virtual bool HasEntry(const string &section, const string &name=kEmptyStr, TFlags flags=0) const
Definition: ncbireg.cpp:290
virtual void EnumerateEntries(const string &section, list< string > *entries, TFlags flags=fAllLayers) const
Enumerate parameter names for a specified section.
Definition: ncbireg.cpp:514
virtual string GetString(const string &section, const string &name, const string &default_value, TFlags flags=0) const
Get the parameter string value.
Definition: ncbireg.cpp:321
@ eReturn
Return default value.
Definition: ncbireg.hpp:203
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
bool IsOssEmpty(CNcbiOstrstream &oss)
Definition: ncbistre.hpp:831
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
IO_PREFIX::istream CNcbiIstream
Portable alias for istream.
Definition: ncbistre.hpp:146
bool NcbiStreamCopy(CNcbiOstream &os, CNcbiIstream &is)
Copy the entire contents of stream "is" to stream "os".
Definition: ncbistre.cpp:211
#define kEmptyStr
Definition: ncbistr.hpp:123
static int CompareNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive compare of a substring with another string.
Definition: ncbistr.cpp:219
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
Definition: ncbistr.cpp:3452
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
Definition: ncbistr.hpp:5086
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
Definition: ncbistr.hpp:5111
static string Sanitize(CTempString str, TSS_Flags flags=fSS_print)
Sanitize a string, allowing only specified classes of characters.
Definition: ncbistr.hpp:2878
static string & ReplaceInPlace(string &src, const string &search, const string &replace, SIZE_TYPE start_pos=0, SIZE_TYPE max_replace=0, SIZE_TYPE *num_replace=0)
Replace occurrences of a substring within a string.
Definition: ncbistr.cpp:3396
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
Definition: ncbistr.hpp:5170
@ fSplit_Truncate
Definition: ncbistr.hpp:2503
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
Definition: ncbistr.hpp:2500
bool IsExpired(void) const
Check if the deadline is expired.
Definition: ncbitime.hpp:1855
double GetAsDouble(void) const
Get as number of seconds (fractional value).
Definition: ncbitime.cpp:3512
bool IsFinite() const
Check if timeout holds a numeric value.
Definition: ncbitime.hpp:2735
@ eInfinite
Infinite timeout.
Definition: ncbitime.hpp:1699
unsigned int usec
microseconds (modulo 1,000,000)
Definition: ncbi_types.h:78
unsigned int sec
seconds
Definition: ncbi_types.h:77
where both of them are integers Note
FILE * file
yy_size_t n
range(_Ty, _Ty) -> range< _Ty >
mdb_mode_t mode
Definition: lmdb++.h:38
Portable class to work with a spawned process via pipes.
#define nullptr
Definition: ncbimisc.hpp:45
T min(T x_, T y_)
std::istream & in(std::istream &in_, double &x_)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
Defines CRequestContext class for NCBI C++ diagnostic API.
Reader-writer based streams.
SParams(CWorkerNodeJobContext &jc, string pt, const CTimeout &rt, const CTimeout &kap, CRemoteAppTimeoutReporter &tr, CRemoteAppReaper::CScheduler &pm)
CWorkerNodeJobContext & job_context
CRemoteAppTimeoutReporter & timeout_reporter
const CProcess process
CRemoteAppReaperTask(TProcessHandle handle)
bool operator()(int current, int max_attempts)
CRemoteAppRemoverTask(string p)
bool operator()(int current, int max_attempts) const
CRemoteAppRemoverTask m_Task
SGuard(CRemoteAppRemover *remover, CRemoteAppRemoverTask task, bool remove_tmp_dir)
CRemoteAppRemover::CScheduler * m_Scheduler
CRemoteAppReaper::CScheduler & process_manager
SParams(string pt, const CTimeout &rt, CRemoteAppReaper::CScheduler &pm)
const string & name
SSection(const IRegistry &r, const string &n)
const IRegistry & reg
int Get(const string &param, int def) const
bool Get(const string &param, bool def) const
else result
Definition: token2.c:20
static CS_CONTEXT * context
Definition: will_convert.c:21
Modified on Tue Jul 16 13:19:33 2024 by modify_doxy.py rev. 669887