NCBI C++ ToolKit
remote_app_wn.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: remote_app_wn.cpp 95014 2021-09-27 21:04:25Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description: NetSchedule worker node sample
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "exec_helpers.hpp"
35 
37 
39 
40 
42 {
43 public:
44  CAppEnvHolder(const CRemoteAppLauncher& remote_app_launcher) :
45  m_RemoteAppLauncher(remote_app_launcher)
46  {}
47 
48  const char* const* GetEnv(const CWorkerNodeJobContext&);
49 
50 private:
52  list<string> m_EnvValues;
53  list<string> m_CtxEnvValues;
54  vector<const char*> m_Env;
55 };
56 
57 const char* const* CAppEnvHolder::GetEnv(
59 {
60  // If called for the first time
61  if (m_Env.empty()) {
62  const CRemoteAppLauncher::TEnvMap& added_env =
64 
65  ITERATE(CRemoteAppLauncher::TEnvMap, it, added_env) {
66  m_EnvValues.push_back(it->first + "=" +it->second);
67  }
68  list<string> names;
70  env.Enumerate(names);
71  ITERATE(list<string>, it, names) {
72  if (added_env.find(*it) == added_env.end())
73  m_EnvValues.push_back(*it + "=" + env.Get(*it));
74  }
75  } else {
76  m_Env.clear();
77  }
78 
79  // m_CtxEnvValues cannot be replaced with a local variable,
80  // as it holds actual values (m_Env holds only pointers to them)
81  m_CtxEnvValues.clear();
82  m_CtxEnvValues.push_back("NCBI_NS_SERVICE=" +
83  context.GetWorkerNode().GetServiceName());
84 
85  m_CtxEnvValues.push_back("NCBI_NS_QUEUE=" + context.GetQueueName());
86 
87  const CNetScheduleJob& job = context.GetJob();
88 
89  m_CtxEnvValues.push_back("NCBI_NS_JID=" + job.job_id);
90  m_CtxEnvValues.push_back("NCBI_JOB_AFFINITY=" + job.affinity);
91 
92  if (!job.client_ip.empty()) {
93  m_CtxEnvValues.push_back("NCBI_LOG_CLIENT_IP=" + job.client_ip);
94  }
95 
96  if (!job.session_id.empty()) {
97  m_CtxEnvValues.push_back("NCBI_LOG_SESSION_ID=" + job.session_id);
98  }
99 
100  if (!job.page_hit_id.empty()) {
101  m_CtxEnvValues.push_back("NCBI_LOG_HIT_ID=" + job.page_hit_id);
102  }
103 
104  ITERATE(list<string>, it, m_CtxEnvValues) {
105  m_Env.push_back(it->c_str());
106  }
107 
108  ITERATE(list<string>, it, m_EnvValues) {
109  m_Env.push_back(it->c_str());
110  }
111 
112  m_Env.push_back(NULL);
113  return &m_Env[0];
114 }
115 
116 ///////////////////////////////////////////////////////////////////////
117 
118 /// The remote_app NetSchedule job.
119 ///
120 /// This class performs demarshalling of the command line arguments
121 /// and then executes the specified program.
122 ///
124 {
125 public:
127  const CRemoteAppLauncher& remote_app_launcher);
128 
129  virtual ~CRemoteAppJob() {}
130 
132 
133 private:
137 };
138 
140 {
141  if (context.IsLogRequested()) {
142  LOG_POST(Note << context.GetJobKey() << " is received.");
143  }
144 
147 
148  try {
149  request.Deserialize(context.GetIStream());
150  }
151  catch (exception&) {
152  ERR_POST("Cannot deserialize remote_app job");
153  context.CommitJobWithFailure("Error while "
154  "unpacking remote_app arguments");
155  return -1;
156  }
157 
158  result.SetStdOutErrFileNames(request.GetStdOutFileName(),
159  request.GetStdErrFileName(),
160  request.GetStdOutErrStorageType());
161 
162  size_t output_size = context.GetWorkerNode().GetServerOutputSize();
163  if (output_size == 0) {
164  // NetSchedule internal storage is not supported; all
165  // output will be stored in NetCache anyway, so it can
166  // be put into one blob.
167  output_size = kMax_UInt;
168  } else {
169  // Empiric estimation of the maximum output size
170  // (reduction by 10%).
171  output_size = output_size - output_size / 10;
172  }
173  result.SetMaxInlineSize(output_size);
174 
175  if (context.IsLogRequested()) {
176  if (!request.GetInBlobIdOrData().empty()) {
177  LOG_POST(Note << context.GetJobKey()
178  << " Input data: " << request.GetInBlobIdOrData());
179  }
180  LOG_POST(Note << context.GetJobKey()
181  << " Args: " << request.GetCmdLine());
182  if (!request.GetStdOutFileName().empty()) {
183  LOG_POST(Note << context.GetJobKey()
184  << " StdOutFile: " << request.GetStdOutFileName());
185  }
186  if (!request.GetStdErrFileName().empty()) {
187  LOG_POST(Note << context.GetJobKey()
188  << " StdErrFile: " << request.GetStdErrFileName());
189  }
190  }
191 
192  vector<string> args;
193  TokenizeCmdLine(request.GetCmdLine(), args);
194 
195  int ret = -1;
196  bool finished_ok = m_RemoteAppLauncher.ExecRemoteApp(args,
197  request.GetStdInForRead(),
198  result.GetStdOutForWrite(),
199  result.GetStdErrForWrite(),
200  ret,
201  context,
202  request.GetAppRunTimeout(),
204 
205  result.SetRetCode(ret);
206  result.Serialize(context.GetOStream());
207 
208  m_RemoteAppLauncher.FinishJob(finished_ok, ret, context);
209 
210  if (context.IsLogRequested()) {
211  LOG_POST(Note << "Job " << context.GetJobKey() <<
212  " is " << context.GetCommitStatusDescription(
213  context.GetCommitStatus()) <<
214  ". Exit code: " << ret);
215  if (!result.GetErrBlobIdOrData().empty()) {
216  LOG_POST(Note << context.GetJobKey() << " Err data: " <<
217  result.GetErrBlobIdOrData());
218  }
219  }
220  request.Reset();
221  result.Reset();
222  return ret;
223 }
224 
226  const CRemoteAppLauncher& remote_app_launcher) :
227  m_NetCacheAPI(context.GetNetCacheAPI()),
228  m_RemoteAppLauncher(remote_app_launcher),
229  m_AppEnvHolder(remote_app_launcher)
230 {
232 }
233 
235 {
236 public:
238  : CRemoteAppBaseListener(launcher) {}
239 
240  virtual void OnInit(IWorkerNodeInitBaseContext* ctx);
241 };
242 
244 {
245  // In essence, this makes remote_app behave as if default value for "[log]merge_lines" was true
246  // (the parameter is read in SGridWorkerNodeImpl::Init).
247  if (!ctx->GetConfig().HasEntry("log", "merge_lines")) {
250  }
251 }
252 
253 
254 /////////////////////////////////////////////////////////////////////////////
255 
256 #define GRID_APP_NAME "remote_app"
257 extern const char kGridAppName[] = GRID_APP_NAME;
258 
260 
261 int main(int argc, const char* argv[])
262 {
264  return Main<TRemoteAppJobFactory, CRemoteAppListener>(argc, argv);
265 }
list< string > m_EnvValues
CAppEnvHolder(const CRemoteAppLauncher &remote_app_launcher)
const char *const * GetEnv(const CWorkerNodeJobContext &)
vector< const char * > m_Env
list< string > m_CtxEnvValues
const CRemoteAppLauncher & m_RemoteAppLauncher
void SetReuseJobObject(bool value)
static CGridGlobals & GetInstance()
CNcbiEnvironment –.
Definition: ncbienv.hpp:110
Client API for NetCache server.
unique_ptr< CRemoteAppLauncher > TLauncherPtr
The remote_app NetSchedule job.
virtual ~CRemoteAppJob()
int Do(CWorkerNodeJobContext &context)
Execute the job.
const CRemoteAppLauncher & m_RemoteAppLauncher
CAppEnvHolder m_AppEnvHolder
CNetCacheAPI m_NetCacheAPI
CRemoteAppJob(const IWorkerNodeInitContext &context, const CRemoteAppLauncher &remote_app_launcher)
const CNcbiEnvironment & GetLocalEnv() const
void FinishJob(bool finished_ok, int ret, CWorkerNodeJobContext &context) const
bool ExecRemoteApp(const vector< string > &args, CNcbiIstream &in, CNcbiOstream &out, CNcbiOstream &err, int &exit_value, CWorkerNodeJobContext &job_context, unsigned app_run_timeout, const char *const env[]) const
const TEnvMap & GetAddedEnv() const
CRemoteAppListener(const TLauncherPtr &launcher)
virtual void OnInit(IWorkerNodeInitBaseContext *ctx)
Perform the necessary pre-init checks.
Remote Application Request (both client side and application executor side)
Definition: remote_app.hpp:96
CNcbiIstream & GetStdInForRead()
Get the stdin stream of the remote application.
Definition: remote_app.hpp:131
const string & GetStdOutFileName() const
Definition: remote_app.hpp:149
const string & GetInBlobIdOrData() const
Definition: remote_app.hpp:154
void Deserialize(CNcbiIstream &is)
Definition: remote_app.hpp:161
EStdOutErrStorageType GetStdOutErrStorageType() const
Definition: remote_app.hpp:151
const string & GetCmdLine() const
Get the command line of the remote application.
Definition: remote_app.hpp:108
const string & GetStdErrFileName() const
Definition: remote_app.hpp:150
unsigned int GetAppRunTimeout() const
Definition: remote_app.hpp:111
Remote Application Result (both client side and application executor side)
Definition: remote_app.hpp:209
Worker Node job context.
Worker Node initialize context.
Worker Node Job interface.
const_iterator end() const
Definition: map.hpp:152
const_iterator find(const key_type &key) const
Definition: map.hpp:153
CS_CONTEXT * ctx
Definition: t0006.c:12
static const struct name_t names[]
static HENV env
Definition: transaction2.c:38
#define GRID_APP_CHECK_VERSION_ARGS()
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define NULL
Definition: ncbistd.hpp:225
void SetDiagPostFlag(EDiagPostFlag flag)
Set the specified flag (globally).
Definition: ncbidiag.cpp:6070
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define LOG_POST(message)
This macro is deprecated and it's strongly recomended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
@ eDPF_MergeLines
Escape EOLs.
Definition: ncbidiag.hpp:751
@ eDPF_PreMergeLines
Obsolete. Use eDPF_MergeLines.
Definition: ncbidiag.hpp:752
string job_id
Output job key.
#define kMax_UInt
Definition: ncbi_limits.h:185
where both of them are integers Note
void TokenizeCmdLine(const string &cmdline, vector< string > &args)
Definition: remote_app.cpp:381
const char kGridAppName[]
#define GRID_APP_NAME
int main(int argc, const char *argv[])
USING_NCBI_SCOPE
Job description.
else result
Definition: token2.c:20
static CS_CONTEXT * context
Definition: will_convert.c:21
Modified on Wed Apr 17 13:10:22 2024 by modify_doxy.py rev. 669887