NCBI C++ ToolKit
rpcbase.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: rpcbase.cpp 99295 2023-03-07 19:42:46Z ucko $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Aleksey Grichenko
27 *
28 * File Description:
29 * CRPCClient helpers
30 */
31 
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbistr.hpp>
34 #include <corelib/ncbi_cookies.hpp>
35 #include <serial/rpcbase_impl.hpp>
36 
38 
40 {
41 public:
42  CCounterGuard(int* counter)
43  : m_Counter(*counter)
44  {
45  m_Counter++;
46  }
47 
49  {
50  m_Counter--;
51  }
52 
53 private:
54  int& m_Counter;
55 };
56 
57 
58 static string s_GetConfigString(const string& service,
59  const string& variable)
60 {
61  if (service.empty() || variable.empty()) return kEmptyStr;
62 
63  string env_var = service + "__RPC_CLIENT__" + variable;
64  NStr::ToUpper(env_var);
65  const TXChar* str = NcbiSys_getenv(_T_XCSTRING(env_var.c_str()));
66 
67  if (str && *str) {
68  return _T_CSTRING(str);
69  }
70 
72  if (app && app->HasLoadedConfig()) {
73  return app->GetConfig().Get(service + ".rpc_client", variable);
74  }
75  return kEmptyStr;
76 }
77 
78 
79 static unsigned int s_GetTryLimit(const string& service)
80 {
81  string str = s_GetConfigString(service, "max_try");
82  if (!str.empty()) {
83  try {
84  unsigned int ret = NStr::StringToNumeric<unsigned int>(str);
85  return ret > 0 ? ret : 3;
86  }
87  catch (...) {
88  ERR_POST(Warning << "Bad " << service << "/max_try value: " << str);
89  }
90  }
91  return 3;
92 }
93 
94 
95 static CTimeSpan s_GetRetryDelay(const string& service)
96 {
97  CTimeSpan ret;
98  string str = s_GetConfigString(service, "retry_delay");
99  if (!str.empty()) {
100  try {
101  double sec = NStr::StringToNumeric<double>(str);
102  return CTimeSpan(sec > 0 ? sec : 0);
103  }
104  catch (...) {
105  ERR_POST(Warning << "Bad " << service << "/retry_delay value: " << str);
106  }
107  }
108  return ret;
109 }
110 
111 
112 CRPCClient_Base::CRPCClient_Base(const string& service,
114  : m_Format(format),
115  m_RetryDelay(s_GetRetryDelay(service)),
116  m_TryCount(0),
117  m_RecursionCount(0),
118  m_Service(service),
119  m_TryLimit(s_GetTryLimit(service))
120 {
121 }
122 
123 
124 CRPCClient_Base::CRPCClient_Base(const string& service,
126  unsigned int try_limit)
127  : m_Format(format),
128  m_RetryDelay(s_GetRetryDelay(service)),
129  m_TryCount(0),
130  m_RecursionCount(0),
131  m_Service(service),
132  m_TryLimit(try_limit > 0 ? try_limit : 3)
133 {
134 }
135 
136 
138 {
139  try {
140  x_Disconnect();
141  } STD_CATCH_ALL_XX(Serial_RPCClient, 2, "CRPCClient_Base::x_Disconnect()");
142 }
143 
144 
146 {
147  // Do not connect from recursive requests - this must be done
148  // by the main request only.
149  if (m_RecursionCount > 1) return;
150 
151  if (m_Stream.get() && m_Stream->good()) {
152  return; // already connected
153  }
154  CMutexGuard LOCK(m_Mutex);
155  // repeat test with mutex held to avoid races
156  if (m_Stream.get() && m_Stream->good()) {
157  return; // already connected
158  }
159  x_Connect();
161 }
162 
163 
165 {
166  CMutexGuard LOCK(m_Mutex);
167  if ( !m_Stream.get() || !m_Stream->good() ) {
168  // not connected -- don't call x_Disconnect, which might
169  // temporarily reconnect to send a fini!
170  return;
171  }
172  CCounterGuard recursion_guard(&m_RecursionCount);
173  try {
174  x_Disconnect();
175  } catch (CInvalidChoiceSelection&) {
176  }
177 }
178 
179 
181 {
182  CMutexGuard LOCK(m_Mutex);
183  if (m_Stream.get() && m_Stream->good()) {
184  x_Disconnect();
185  }
186  x_Connect();
187 }
188 
189 
190 void CRPCClient_Base::SetAffinity(const string& affinity)
191 {
192  if (m_Affinity != affinity) {
193  if (m_RecursionCount > 1) {
194  ERR_POST("Affinity cannot be changed on a recursive request");
195  return;
196  }
197  Disconnect();
198  m_Affinity = affinity;
199  }
200 }
201 
202 
204 {
205  if (m_Out.get() != nullptr) {
207  }
208  m_In.reset();
209  m_Out.reset();
210  m_Stream.reset();
211 }
212 
213 
215 {
216  if (m_Out.get() != nullptr) {
218  }
219  m_In .reset();
220  m_Out.reset();
221  m_Stream.reset(stream);
222  m_In .reset(CObjectIStream::Open(m_Format, *stream));
223  m_Out.reset(CObjectOStream::Open(m_Format, *stream));
224 }
225 
226 
228 {
229  CMutexGuard LOCK(m_Mutex);
230  if (m_RecursionCount == 0) {
231  m_TryCount = 0;
232  }
233  // Recursion counter needs to be decremented on both success and failure.
234  CCounterGuard recursion_guard(&m_RecursionCount);
235 
236  const string& request_name = request.GetThisTypeInfo() != NULL
237  ? ("("+request.GetThisTypeInfo()->GetName()+")")
238  : "(no_request_type)";
239 
240  // Reset headers from previous requests if any.
241  m_RetryCtx.Reset();
242  double max_span = m_RetryDelay.GetAsDouble()*m_TryLimit;
243  double span = max_span;
244  bool limit_by_time = !m_RetryDelay.IsEmpty();
245  // Retry context can be either the default one (m_RetryCtx), or provided
246  // through an exception.
247  for (;;) {
248  if ( IsCanceled() ) {
250  "Request canceled " + request_name);
251  }
254  diag_guard.SetSeverityCap(eDiag_Info);
255  try {
256  SetAffinity(x_GetAffinity(request));
257  Connect(); // No-op if already connected
262  }
263  }
264  else {
265  // by default re-send the original request
266  x_WriteRequest(*m_Out, request);
267  }
268  m_Stream->peek(); // send data, read response headers
269  if (!m_Stream->good() && !m_Stream->eof()) {
271  "Connection stream is in bad state " + request_name);
272  }
275  // store response content to send it with the next retry
279  }
280  else {
281  // read normal response
282  x_ReadReply(*m_In, reply);
283  }
284  // If reading reply succeeded and no retry was requested by the server, break.
285  if ( !m_RetryCtx.GetNeedRetry() ) {
287  break;
288  }
289  } catch (CException& e) {
290  // Some exceptions tend to correspond to transient glitches;
291  // the remainder, however, may as well get propagated immediately.
292  CRPCClientException* rpc_ex = dynamic_cast<CRPCClientException*>(&e);
293  if (rpc_ex && rpc_ex->GetErrCode() == CRPCClientException::eRetry) {
294  if ( rpc_ex->IsSetRetryContext() ) {
295  // Save information to the local retry context and proceed.
296  m_RetryCtx = rpc_ex->GetRetryContext();
297  }
298  // proceed to retry
299  }
300  else if ( !dynamic_cast<CSerialException*>(&e)
301  && !dynamic_cast<CIOException*>(&e) ) {
302  // Not a retry related exception, abort.
303  throw;
304  }
305  }
306  // No retries for recursive requests (e.g. AskInit called by Connect).
307  // Exit immediately, do not reset retry context - it may be used by
308  // the main request's retry loop.
309  if (m_RecursionCount > 1) {
311  return;
312  }
313 
314  // Retry request on exception or on explicit retry request from the server.
315 
316  // If using time limit, allow to make more than m_RetryLimit attempts
317  // if the server has set shorter delay.
318  if ((!limit_by_time && ++m_TryCount >= m_TryLimit) ||
322  "Failed to receive reply after "
324  + (m_TryCount == 1 ? " try " : " tries ")
325  + request_name );
326  }
327  if ( m_RetryCtx.IsSetStop() ) {
330  "Retrying request stopped by the server: "
331  + m_RetryCtx.GetStopReason() + ' ' + request_name);
332  }
333  CTimeSpan delay = x_GetRetryDelay(span);
334  if ( !delay.IsEmpty() ) {
335  SleepSec(delay.GetCompleteSeconds());
337  span -= delay.GetAsDouble();
338  if (limit_by_time && span <= 0) {
341  "Failed to receive reply in "
342  + CTimeSpan(max_span).AsSmartString()
343  + ' ' + request_name);
344  }
345  }
346  // Always reconnect on retry.
347  if ( IsCanceled() ) {
349  "Request canceled " + request_name);
350  }
351  try {
352  Reset();
353  } STD_CATCH_ALL_XX(Serial_RPCClient, 1,
354  "CRPCClient_Base::Reset() " + request_name);
355  }
356  // Reset retry context when done.
357  m_RetryCtx.Reset();
358  // If there were any retries, force disconnect to prevent using old
359  // retry url, args etc. with the next request.
360  if ( m_TryCount > 0 && m_RecursionCount <= 1 ) {
361  Disconnect();
362  }
363 }
364 
365 
366 bool CRPCClient_Base::x_ShouldRetry(unsigned int tries) /* NCBI_FAKE_WARNING */
367 {
368  _TRACE("CRPCClient_Base::x_ShouldRetry: retrying after " << tries
369  << " failure(s)");
370  return true;
371 }
372 
373 
375 {
376  // If not set by the server, use local delay.
377  if ( !m_RetryCtx.IsSetDelay() ) {
378  return m_RetryDelay;
379  }
380  // If local delay is not zero, we have to limit total retries time to max_delay.
381  if (!m_RetryDelay.IsEmpty() &&
382  m_RetryCtx.GetDelay().GetAsDouble() > max_delay) {
383  return CTimeSpan(max_delay);
384  }
385  return m_RetryCtx.GetDelay();
386 }
387 
388 
390 {
391  switch (format) {
392  case eSerial_None:
393  break;
394  case eSerial_AsnText:
395  return "Content-Type: x-ncbi-data/x-asn-text\r\n";
396  case eSerial_AsnBinary:
397  return "Content-Type: x-ncbi-data/x-asn-binary\r\n";
398  case eSerial_Xml:
399  return "Content-Type: application/xml\r\n";
400  case eSerial_Json:
401  return "Content-Type: application/json\r\n";
402  }
403  return NULL; // kEmptyCStr?
404 }
405 
406 
408 {
409  switch (GetErrCode()) {
410  case eRetry: return "eRetry";
411  case eFailed: return "eFailed";
412  case eArgs: return "eArgs";
413  case eOther: return "eOther";
414  default: return CException::GetErrCodeString();
415  }
416 }
417 
418 
~CCounterGuard(void)
Definition: rpcbase.cpp:48
int & m_Counter
Definition: rpcbase.cpp:54
CCounterGuard(int *counter)
Definition: rpcbase.cpp:42
Guard for collecting diag messages (affects the current thread only).
Definition: ncbidiag.hpp:1300
Thrown on an attempt to access wrong choice variant.
Definition: exception.hpp:102
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string Sample usage:
Definition: ncbistre.hpp:802
Root class for all serialization exceptions.
Definition: exception.hpp:50
Base class for all serializable objects.
Definition: serialbase.hpp:150
CTimeSpan.
Definition: ncbitime.hpp:1313
The NCBI C++ standard methods for dealing with std::string.
static const char * str(char *buf, int n)
Definition: stats.c:84
bool HasLoadedConfig(void) const
Check if the config file has been loaded.
static CNcbiApplicationGuard InstanceGuard(void)
Singleton method.
Definition: ncbiapp.cpp:133
const CNcbiRegistry & GetConfig(void) const
Get the application's cached configuration parameters (read-only).
#define NULL
Definition: ncbistd.hpp:225
#define _TRACE(message)
Definition: ncbidbg.hpp:122
void Release(void)
Release the guard.
Definition: ncbidiag.cpp:575
EDiagSev GetDiagPostLevel(void)
Get current threshold severity for posting the messages.
Definition: ncbidiag.cpp:6151
void SetSeverityCap(EDiagSev sev)
Set new severity cap for use in PrintCapped mode.
Definition: ncbidiag.hpp:1357
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
@ ePrintCapped
Print collected messages at reduced severity.
Definition: ncbidiag.hpp:1306
@ ePrint
Print all collected messages as is.
Definition: ncbidiag.hpp:1304
@ eDiag_Info
Informational message.
Definition: ncbidiag.hpp:651
@ eDiag_Critical
Critical error message.
Definition: ncbidiag.hpp:654
#define STD_CATCH_ALL_XX(err_name, err_subcode, message)
Standard handling of "exception"-derived exceptions; catches non-standard exceptions and generates "u...
Definition: ncbiexpt.hpp:640
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
virtual const char * GetErrCodeString(void) const
Get error code interpreted as text.
Definition: ncbiexpt.cpp:444
bool IsCanceled(void) const
void SetAffinity(const string &affinity)
Definition: rpcbase.cpp:190
CTimeSpan m_RetryDelay
CRPCClient_Base(const string &service, ESerialDataFormat format)
Definition: rpcbase.cpp:112
bool IsSetRetryContext(void) const
virtual void x_ReadReply(CObjectIStream &in, CSerialObject &reply)=0
unsigned int m_TryLimit
void x_SetStream(CNcbiIostream *stream)
Definition: rpcbase.cpp:214
virtual void x_Disconnect(void)
Disconnect as cleanly as possible.
Definition: rpcbase.cpp:203
static const char * GetContentTypeHeader(ESerialDataFormat format)
Definition: rpcbase.cpp:389
unique_ptr< CObjectIStream > m_In
ESerialDataFormat m_Format
CMutex m_Mutex
To allow sharing across threads.
void Disconnect(void)
Definition: rpcbase.cpp:164
virtual void x_WriteRequest(CObjectOStream &out, const CSerialObject &request)=0
CRetryContext & GetRetryContext(void)
Read retry related data.
unique_ptr< CObjectOStream > m_Out
virtual string x_GetAffinity(const CSerialObject &request) const =0
CTimeSpan x_GetRetryDelay(double max_delay) const
Definition: rpcbase.cpp:374
virtual const CTypeInfo * GetThisTypeInfo(void) const =0
virtual bool x_ShouldRetry(unsigned int tries)
Definition: rpcbase.cpp:366
virtual const char * GetErrCodeString(void) const override
Get error code interpreted as text.
Definition: rpcbase.cpp:407
virtual void x_Connect(void)=0
These run with m_Mutex already acquired.
virtual ~CRPCClient_Base(void)
Definition: rpcbase.cpp:137
unique_ptr< CNcbiIostream > m_Stream
void Connect(void)
Definition: rpcbase.cpp:145
CHttpRetryContext m_RetryCtx
void Reset(void)
Definition: rpcbase.cpp:180
ESerialDataFormat
Data file format.
Definition: serialdef.hpp:71
void x_Ask(const CSerialObject &request, CSerialObject &reply)
Definition: rpcbase.cpp:227
unsigned int m_TryCount
@ eArgs
Failed to send request arguments.
@ eRetry
Request failed, should be retried if possible.
@ eFailed
Request (or retry) failed.
@ eSerial_AsnText
ASN.1 text.
Definition: serialdef.hpp:73
@ eSerial_Xml
XML.
Definition: serialdef.hpp:75
@ eSerial_Json
JSON.
Definition: serialdef.hpp:76
@ eSerial_None
Definition: serialdef.hpp:72
@ eSerial_AsnBinary
ASN.1 binary.
Definition: serialdef.hpp:74
static CObjectOStream * Open(ESerialDataFormat format, CNcbiOstream &outStream, bool deleteOutStream)
Create serial object writer and attach it to an output stream.
Definition: objostr.cpp:126
static CObjectIStream * Open(ESerialDataFormat format, CNcbiIstream &inStream, bool deleteInStream)
Create serial object reader and attach it to an input stream.
Definition: objistr.cpp:195
virtual const string & Get(const string &section, const string &name, TFlags flags=0) const
Get the parameter value.
Definition: ncbireg.cpp:262
bool IsSetStop(void) const
Check if STOP flag is set.
Definition: retry_ctx.hpp:271
const CTimeSpan & GetDelay(void) const
Get retry delay.
Definition: retry_ctx.hpp:308
void ResetNeedReconnect(void)
Reset need-reconnect flag (e.g.
Definition: retry_ctx.hpp:155
bool IsSetContentOverride(void) const
Check if content source is set.
Definition: retry_ctx.hpp:415
const string & GetContent(void) const
Get retry content.
Definition: retry_ctx.hpp:456
bool IsSetDelay(void) const
Check if retry delay is set.
Definition: retry_ctx.hpp:301
const string & GetStopReason(void) const
Get STOP reason (or empty string).
Definition: retry_ctx.hpp:278
bool IsSetContent(void) const
Check if retry content is set and content-override flag assumes any content (eFromResponse or eData).
Definition: retry_ctx.hpp:447
EContentOverride GetContentOverride(void) const
Get content source.
Definition: retry_ctx.hpp:424
void SetContent(const string &content)
Set retry content.
Definition: retry_ctx.hpp:463
bool GetNeedRetry(void) const
Check if another retry attempt has been requested (any new headers except STOP received in the last r...
Definition: retry_ctx.hpp:236
void Reset(void)
Clear all options.
Definition: retry_ctx.hpp:257
@ eFromResponse
On retry send content from the response body.
Definition: retry_ctx.hpp:67
@ eNoContent
Do not send any content on retry.
Definition: retry_ctx.hpp:66
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
IO_PREFIX::iostream CNcbiIostream
Portable alias for iostream.
Definition: ncbistre.hpp:152
bool NcbiStreamCopy(CNcbiOstream &os, CNcbiIstream &is)
Copy the entire contents of stream "is" to stream "os".
Definition: ncbistre.cpp:211
#define kEmptyStr
Definition: ncbistr.hpp:123
char TXChar
Definition: ncbistr.hpp:172
#define _T_CSTRING(x)
Definition: ncbistr.hpp:182
#define _T_XCSTRING(x)
Definition: ncbistr.hpp:181
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
static string & ToUpper(string &str)
Convert string to upper case – string& version.
Definition: ncbistr.cpp:424
bool IsEmpty(void) const
Return TRUE is an object keep zero time span.
Definition: ncbitime.hpp:2580
long GetNanoSecondsAfterSecond(void) const
Get number of nanoseconds.
Definition: ncbitime.hpp:2563
double GetAsDouble(void) const
Return time span as number of seconds.
Definition: ncbitime.hpp:2566
long GetCompleteSeconds(void) const
Get number of complete seconds.
Definition: ncbitime.hpp:2560
const string & GetName(void) const
Get name of this type.
Definition: typeinfo.cpp:249
char * buf
void SleepMicroSec(unsigned long mc_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
void SleepSec(unsigned long sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Sleep.
static Format format
Definition: njn_ioutil.cpp:53
static string s_GetConfigString(const string &service, const string &variable)
Definition: rpcbase.cpp:58
static unsigned int s_GetTryLimit(const string &service)
Definition: rpcbase.cpp:79
static CTimeSpan s_GetRetryDelay(const string &service)
Definition: rpcbase.cpp:95
#define NcbiSys_getenv
Definition: ncbisys.hpp:90
Modified on Wed Apr 17 13:10:14 2024 by modify_doxy.py rev. 669887