NCBI C++ ToolKit
rpcbase.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef SERIAL___RPCBASE__HPP
2 #define SERIAL___RPCBASE__HPP
3 
4 /* $Id: rpcbase.hpp 91079 2020-09-01 17:28:37Z grichenk $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Author: Aaron Ucko, NCBI
30  *
31  * File Description:
32  * Generic template class for ASN.1/XML RPC clients
33  *
34  */
35 
36 #include <corelib/ncbimtx.hpp>
37 #include <corelib/ncbi_system.hpp>
39 #include <connect/ncbi_util.h>
40 #include <serial/objistr.hpp>
41 #include <serial/objostr.hpp>
42 #include <serial/serial.hpp>
43 #include <util/retry_ctx.hpp>
44 #include <serial/rpcbase_impl.hpp>
45 
46 /** @addtogroup GenClassSupport
47  *
48  * @{
49  */
50 
51 
53 
54 /// CRPCClient -- prototype client for ASN.1/XML-based RPC.
55 /// Normally connects automatically on the first real request and
56 /// disconnects automatically in the destructor, but allows both events
57 /// to occur explicitly.
58 
/// Typed front end over CRPCClient_Base: the public methods work with
/// TRequest/TReply, while the x_* overrides below convert from the
/// CSerialObject-based hooks of the base class by dynamic_cast.
/// Also keeps the timeout (m_Timeout) that is handed to the streams this
/// class creates.
59 template <class TRequest, class TReply>
60 class CRPCClient : public CObject,
61  public CRPCClient_Base,
62  protected CConnIniter
63 {
64 public:
 // NOTE(review): parts of the three constructors (listing lines 66-67, 70,
 // 72, 75 and 78) are missing from this listing; only the fragments below
 // are visible.
65  CRPCClient(const string& service = kEmptyStr)
68  {}
69  CRPCClient(const string& service,
71  : CRPCClient_Base(service, format),
73  {}
74  CRPCClient(const string& service,
76  unsigned int try_limit)
77  : CRPCClient_Base(service, format, try_limit),
79  {}
 /// Frees m_Timeout unless sx_IsSpecial() reports it as one of the special
 /// (non-cloned) timeout values.
80  virtual ~CRPCClient(void)
81  {
82  if ( !sx_IsSpecial(m_Timeout) ) {
83  delete const_cast<STimeout*>(m_Timeout);
84  }
85  }
86 
 /// Send a request and fill in the reply; forwards to x_Ask().
87  virtual void Ask(const TRequest& request, TReply& reply)
88  { x_Ask(request, reply); }
89 
 /// Serialize the request into the object output stream.
90  virtual void WriteRequest(CObjectOStream& out, const TRequest& request)
91  { out << request; }
92 
 /// Deserialize the reply from the object input stream.
93  virtual void ReadReply(CObjectIStream& in, TReply& reply)
94  { in >> reply; }
95 
 /// Store a timeout for later connections and apply it to the current
 /// connection stream, if there is one.
96  EIO_Status SetTimeout(const STimeout* timeout,
97  EIO_Event direction = eIO_ReadWrite);
 /// Timeout of the current connection stream for the given direction, or
 /// the stored timeout when there is no connection stream.
98  const STimeout* GetTimeout(EIO_Event direction = eIO_Read) const;
99 
 /// Create the service stream ahead of the first request and optionally
 /// report the OS handle of its socket through handle_buf.
100  EIO_Status AsyncConnect(void* handle_buf, size_t handle_size);
101 
102 protected:
 /// Affinity string for a request; this default returns an empty string.
103  virtual string GetAffinity(const TRequest& /*request*/) const
104  {
105  return string();
106  }
107 
 // The three overrides below adapt the untyped base-class hooks to the
 // typed virtual methods above. The dynamic_cast is to a reference, so a
 // request/reply of an unexpected type results in std::bad_cast.
108  virtual void x_WriteRequest(CObjectOStream& out, const CSerialObject& request) override
109  {
110  WriteRequest(out, dynamic_cast<const TRequest&>(request));
111  }
112 
113  virtual void x_ReadReply(CObjectIStream& in, CSerialObject& reply) override
114  {
115  ReadReply(in, dynamic_cast<TReply&>(reply));
116  }
117 
118  virtual string x_GetAffinity(const CSerialObject& request) const override
119  {
120  return GetAffinity(dynamic_cast<const TRequest&>(request));
121  }
122 
 /// Establish the connection stream: adopts the stream prepared by
 /// AsyncConnect() if present, otherwise connects to the retry-context URL
 /// if one is set, otherwise to the named service.
123  virtual void x_Connect(void) override;
124 
125  /// Connect to a URL. (Discouraged; please establish and use a
126  /// suitable named service if possible.)
127  void x_ConnectURL(const string& url);
128 
129  // CConn_HttpStream callback for parsing headers.
130  // 'user_data' must point to a CHttpRetryContext (callers pass &m_RetryCtx).
131  static EHTTP_HeaderParse sx_ParseHeader(const char* http_header,
132  void* user_data,
133  int server_error);
134 
 /// True for kDefaultTimeout and kInfiniteTimeout, which are stored as-is
 /// rather than cloned.
135  static bool sx_IsSpecial(const STimeout* timeout);
136 
137  const STimeout* m_Timeout; ///< Cloned if not special.
138 
139 private:
 /// Add request arguments to net_info and, when x_extra is non-null,
 /// initialize it with the header-parsing callback and content-type header.
140  void x_FillConnNetInfo(SConnNetInfo& net_info, SSERVICE_Extra* x_extra);
141 
 /// Stream created by AsyncConnect(); x_Connect() takes it over when set.
142  unique_ptr<CConn_ServiceStream> m_AsyncStream;
143 };
144 
145 
146 ///////////////////////////////////////////////////////////////////////////
147 // Inline methods
148 
// Explicit specialization whose Delete() disposes of an SConnNetInfo with
// ConnNetInfo_Destroy().
// NOTE(review): the specialization header (listing line 150) is missing from
// this listing; presumably this is Deleter<SConnNetInfo>, used by the
// AutoPtr<SConnNetInfo> objects below -- confirm against the real header.
149 template<>
151 {
152  static void Delete(SConnNetInfo* net_info)
153  { ConnNetInfo_Destroy(net_info); }
154 };
155 
156 
// Fill net_info with request arguments and, if x_extra is supplied, set up
// the SSERVICE_Extra block.
//
// net_info : connection info to modify in place.
// x_extra  : optional; when null, only the argument handling is done and the
//            function returns before the callback/header setup.
//
// NOTE(review): the signature line (listing line 159) and the first line of
// each throw statement (164, 171, 176, 191) are missing from this listing, so
// the exception type raised on failure is not visible here.
157 template<class TRequest, class TReply>
158 inline
160  SSERVICE_Extra* x_extra)
161 {
 // Extra arguments configured on the client.
162  if ( !m_Args.empty() ) {
163  if ( !ConnNetInfo_AppendArg(&net_info, m_Args.c_str(), 0) ) {
165  "Error sending additional request arguments");
166  }
167  }
 // Retry-context arguments win over affinity: affinity is only added when
 // the retry context has no arguments and x_extra was supplied.
168  if ( m_RetryCtx.IsSetArgs() ) {
169  if ( !ConnNetInfo_AppendArg(&net_info, m_RetryCtx.GetArgs().c_str(),
170  0) ) {
172  "Error sending retry context arguments");
173  }
174  } else if (x_extra != nullptr && !m_Affinity.empty()) {
175  if ( !ConnNetInfo_PostOverrideArg(&net_info, m_Affinity.c_str(), 0) ) {
177  "Error sending request affinity");
178  }
179  }
 // Without x_extra there is nothing more to set up (x_ConnectURL passes
 // the callback and content-type header to the stream constructor itself).
180  if (x_extra == nullptr) {
181  return;
182  }
183  // Install callback for parsing headers.
184  memset(x_extra, 0, sizeof(*x_extra));
185  x_extra->data = &m_RetryCtx;
186  x_extra->parse_header = sx_ParseHeader;
187  x_extra->flags = fHTTP_NoAutoRetry;
 // Override the user header with the content type for the current
 // serialization format, when that header is non-empty.
188  const char* user_header = GetContentTypeHeader(GetFormat());
189  if (user_header != NULL && *user_header != '\0') {
190  if ( !ConnNetInfo_OverrideUserHeader(&net_info, user_header)) {
192  "Error sending user header");
193  }
194  }
195 }
196 
// Establish the connection stream and hand it to x_SetStream().
// Order of preference:
//   1. a stream already prepared by AsyncConnect() (m_AsyncStream);
//   2. the URL stored in the retry context, via x_ConnectURL();
//   3. a new CConn_ServiceStream for m_Service.
//
// NOTE(review): the signature line (listing line 199) and the line that
// begins the stream construction (216) are missing from this listing.
197 template<class TRequest, class TReply>
198 inline
200 {
201  if (m_AsyncStream.get() != nullptr) {
 // Apply the stored timeout to the pre-made stream, then transfer
 // ownership of it out of m_AsyncStream.
202  m_AsyncStream->SetTimeout(eIO_Open, m_Timeout);
203  m_AsyncStream->SetTimeout(eIO_ReadWrite, m_Timeout);
204  x_SetStream(m_AsyncStream.release());
205  return;
206  } else if ( m_RetryCtx.IsSetUrl() ) {
207  x_ConnectURL(m_RetryCtx.GetUrl());
208  return;
209  }
210  _ASSERT( !m_Service.empty() );
211  SSERVICE_Extra x_extra;
 // NOTE(review): net_info is dereferenced on the next line without a null
 // check -- confirm that ConnNetInfo_Create() cannot return null here.
212  AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(m_Service.c_str()));
213  x_FillConnNetInfo(*net_info, &x_extra);
214 
 // The stream is held by unique_ptr until x_SetStream() takes it, so it is
 // destroyed if installing the canceled-callback throws.
215  unique_ptr<CConn_ServiceStream> stream
217  (m_Service, fSERV_Any | fSERV_DelayOpen, net_info.get(), &x_extra, m_Timeout));
218  if ( m_Canceler.NotNull() ) {
219  stream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
220  }
221  x_SetStream(stream.release());
222 }
223 
224 
// Connect directly to 'url' with a CConn_HttpStream and hand the stream to
// x_SetStream(). Throws CCoreException (eInvalidArg) if the URL cannot be
// parsed.
//
// NOTE(review): the signature line (listing line 227) and one constructor
// argument line (240) are missing from this listing.
225 template<class TRequest, class TReply>
226 inline
228 {
229  AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(nullptr));
230  if ( !ConnNetInfo_ParseURL(net_info.get(), url.c_str()) ) {
231  NCBI_THROW(CCoreException, eInvalidArg, "Error parsing URL " + url);
232  }
 // Null x_extra: only request arguments are added to net_info; the header
 // callback and content type are passed to the stream constructor below.
233  x_FillConnNetInfo(*net_info, nullptr);
234  unique_ptr<CConn_HttpStream> stream(new CConn_HttpStream(net_info.get(),
235  GetContentTypeHeader(GetFormat()),
236  sx_ParseHeader, // callback
237  &m_RetryCtx, // user data for the callback
238  0, // adjust callback
239  0, // cleanup callback
241  m_Timeout));
242  if ( m_Canceler.NotNull() ) {
243  stream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
244  }
245  x_SetStream(stream.release());
246 }
247 
248 
249 template<class TRequest, class TReply>
250 inline
252  EIO_Event direction)
253 {
254  // save for future use, especially if there's no stream at present.
255  {{
256  const STimeout* old_timeout = m_Timeout;
257  if (sx_IsSpecial(timeout)) {
258  m_Timeout = timeout;
259  } else { // make a copy
260  m_Timeout = new STimeout(*timeout);
261  }
262  if ( !sx_IsSpecial(old_timeout) ) {
263  delete const_cast<STimeout*>(old_timeout);
264  }
265  }}
266 
267  CConn_IOStream* conn_stream
268  = dynamic_cast<CConn_IOStream*>(m_Stream.get());
269  if (conn_stream) {
270  return conn_stream->SetTimeout(direction, timeout);
271  } else if ( !m_Stream.get() ) {
272  return eIO_Success; // we've saved it, which is the best we can do...
273  } else {
274  return eIO_NotSupported;
275  }
276 }
277 
278 
// Return the timeout for 'direction' from the current stream when it is a
// CConn_IOStream; otherwise return the stored m_Timeout (which may be one of
// the special values kDefaultTimeout / kInfiniteTimeout).
// In the fallback case the returned pointer is m_Timeout itself, not a copy.
//
// NOTE(review): the signature line (listing line 281) is missing from this
// listing.
279 template<class TRequest, class TReply>
280 inline
282  const
283 {
284  CConn_IOStream* conn_stream
285  = dynamic_cast<CConn_IOStream*>(m_Stream.get());
286  if (conn_stream) {
287  return conn_stream->GetTimeout(direction);
288  }
289  else {
290  return m_Timeout;
291  }
292 }
293 
294 
// Create the service stream ahead of time and keep it in m_AsyncStream, from
// where x_Connect() later adopts it.
//
// handle_buf / handle_size : optional output buffer; when handle_buf is
//   non-null the OS handle of the stream's underlying socket is requested
//   into it via SOCK_GetOSHandle().
//
// Returns the stream's Status(), replaced by the result of CONN_GetSOCK() /
// SOCK_GetOSHandle() when handle_buf is given and the stream has a CONN.
//
// NOTE(review): the signature line (listing line 297) is missing from this
// listing.
295 template<class TRequest, class TReply>
296 inline
298  size_t handle_size)
299 {
300  static const STimeout kZeroTimeout = { 0, 0 };
301  _ASSERT( !m_Service.empty() );
302  SSERVICE_Extra x_extra;
303  AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(m_Service.c_str()));
304  x_FillConnNetInfo(*net_info, &x_extra);
305 
 // Unlike x_Connect(), the stream is created without fSERV_DelayOpen and
 // with a zero timeout.
306  m_AsyncStream.reset(
307  new CConn_ServiceStream(m_Service, fSERV_Any, net_info.get(), &x_extra,
308  &kZeroTimeout));
 // A default stored timeout is switched to infinite; x_Connect() applies
 // m_Timeout to this stream when it adopts it.
309  if (m_Timeout == kDefaultTimeout) {
310  m_Timeout = kInfiniteTimeout;
311  }
312  EIO_Status status = m_AsyncStream->Status();
313  if ( m_Canceler.NotNull() ) {
314  m_AsyncStream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
315  }
316  if (handle_buf != nullptr) {
317  CONN conn = m_AsyncStream->GetCONN();
318  if (conn != nullptr) {
319  SOCK sock = nullptr;
 // NOTE(review): if CONN_GetSOCK() succeeds but yields a null sock,
 // eIO_Success is returned without handle_buf being filled in.
320  if ((status = CONN_GetSOCK(conn, &sock)) == eIO_Success
321  && sock != nullptr) {
322  status = SOCK_GetOSHandle(sock, handle_buf, handle_size);
323  }
324  }
325  }
326  return status;
327 }
328 
329 
// HTTP header callback: forwards the header text to the CHttpRetryContext
// passed as user_data (callers in this file pass &m_RetryCtx).
// A null user_data skips parsing. The server_error argument is ignored.
// Always returns eHTTP_HeaderContinue.
//
// NOTE(review): the lines carrying the return type and function name
// (listing lines 332-333) are missing from this listing.
330 template<class TRequest, class TReply>
331 inline
334  void* user_data,
335  int /*server_error*/)
336 {
337  if ( !user_data ) {
338  return eHTTP_HeaderContinue;
339  }
340  CHttpRetryContext* retry_ctx = reinterpret_cast<CHttpRetryContext*>(user_data);
341  _ASSERT(retry_ctx);
342  retry_ctx->ParseHeader(http_header);
343 
344  // Always read response body - normal content or error.
345  return eHTTP_HeaderContinue;
346 }
347 
348 
// True when 'timeout' is one of the two sentinel pointer values
// (kDefaultTimeout or kInfiniteTimeout). The comparison is on the pointer
// only; the pointee is never read. Such values are stored in m_Timeout as-is
// and must not be deleted.
//
// NOTE(review): the signature line (listing line 351) is missing from this
// listing.
349 template<class TRequest, class TReply>
350 inline
352 {
353  return timeout == kDefaultTimeout || timeout == kInfiniteTimeout;
354 }
355 
356 
358 
359 
360 /* @} */
361 
362 #endif /* SERIAL___RPCBASE__HPP */
AutoPtr –.
Definition: ncbimisc.hpp:401
Helper hook-up class that installs default logging/registry/locking (but only if they have not yet be...
This stream exchanges data with an HTTP server located at the URL: http[s]://host[:port]/path[?...
Base class, inherited from "std::iostream", does both input and output, using the specified CONNECTOR...
This stream exchanges data with a named service, in a constraint that the service is implemented as o...
CCoreException –.
Definition: ncbiexpt.hpp:1476
HTTP-specific retry context implementation.
Definition: retry_ctx.hpp:191
CObjectIStream –.
Definition: objistr.hpp:93
CObjectOStream –.
Definition: objostr.hpp:83
CObject –.
Definition: ncbiobj.hpp:180
Base class for CRPCClient template - defines methods independent of request and response types.
CRPCClient – prototype client for ASN.1/XML-based RPC.
Definition: rpcbase.hpp:63
Base class for all serializable objects.
Definition: serialbase.hpp:150
std::ofstream out("events_result.xml")
main entry point for tests
static CS_CONNECTION * conn
Definition: ct_dynamic.c:25
element_type * get(void) const
Get pointer.
Definition: ncbimisc.hpp:469
string
Definition: cgiapp.hpp:690
#define NULL
Definition: ncbistd.hpp:225
const STimeout * GetTimeout(EIO_Event direction) const
EIO_Status SetTimeout(EIO_Event direction, const STimeout *timeout) const
Set connection timeout for "direction".
FHTTP_ParseHeader parse_header
EHTTP_HeaderParse
The extended version HTTP_CreateConnectorEx() is able to track the HTTP response chain and also chang...
EIO_Status CONN_GetSOCK(CONN conn, SOCK *sock)
Get an underlying SOCK handle for connection that is implemented as a socket.
@ fHTTP_NoAutoRetry
No auto-retries allowed.
@ fHTTP_AutoReconnect
See HTTP_CreateConnectorEx()
@ eHTTP_HeaderContinue
Parse succeeded, continue with body.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
CRPCClient(const string &service=kEmptyStr)
Definition: rpcbase.hpp:65
EIO_Status AsyncConnect(void *handle_buf, size_t handle_size)
Definition: rpcbase.hpp:297
virtual void x_WriteRequest(CObjectOStream &out, const CSerialObject &request) override
Definition: rpcbase.hpp:108
void x_ConnectURL(const string &url)
Connect to a URL.
Definition: rpcbase.hpp:227
CRPCClient(const string &service, ESerialDataFormat format)
Definition: rpcbase.hpp:69
virtual void ReadReply(CObjectIStream &in, TReply &reply)
Definition: rpcbase.hpp:93
static bool sx_IsSpecial(const STimeout *timeout)
Definition: rpcbase.hpp:351
CRPCClient(const string &service, ESerialDataFormat format, unsigned int try_limit)
Definition: rpcbase.hpp:74
static EHTTP_HeaderParse sx_ParseHeader(const char *http_header, void *user_data, int server_error)
Definition: rpcbase.hpp:333
virtual ~CRPCClient(void)
Definition: rpcbase.hpp:80
virtual void Ask(const TRequest &request, TReply &reply)
Definition: rpcbase.hpp:87
virtual void x_Connect(void) override
These run with m_Mutex already acquired.
Definition: rpcbase.hpp:199
virtual void WriteRequest(CObjectOStream &out, const TRequest &request)
Definition: rpcbase.hpp:90
const STimeout * GetTimeout(EIO_Event direction=eIO_Read) const
Definition: rpcbase.hpp:281
virtual void x_ReadReply(CObjectIStream &in, CSerialObject &reply) override
Definition: rpcbase.hpp:113
void x_FillConnNetInfo(SConnNetInfo &net_info, SSERVICE_Extra *x_extra)
Definition: rpcbase.hpp:159
static void Delete(SConnNetInfo *net_info)
Definition: rpcbase.hpp:152
virtual string GetAffinity(const TRequest &) const
Definition: rpcbase.hpp:103
unique_ptr< CConn_ServiceStream > m_AsyncStream
Definition: rpcbase.hpp:142
EIO_Status SetTimeout(const STimeout *timeout, EIO_Event direction=eIO_ReadWrite)
Definition: rpcbase.hpp:251
ESerialDataFormat
Data file format.
Definition: serialdef.hpp:71
const STimeout * m_Timeout
Cloned if not special.
Definition: rpcbase.hpp:137
void x_Ask(const CSerialObject &request, CSerialObject &reply)
Definition: rpcbase.cpp:227
virtual string x_GetAffinity(const CSerialObject &request) const override
Definition: rpcbase.hpp:118
@ eSerial_AsnBinary
ASN.1 binary.
Definition: serialdef.hpp:74
void ParseHeader(const char *http_header)
Definition: retry_ctx.cpp:67
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
@ fSERV_Any
Definition: ncbi_service.h:79
@ fSERV_DelayOpen
Don't open service until use.
Definition: ncbi_service.h:83
EIO_Status SOCK_GetOSHandle(SOCK sock, void *handle_buf, size_t handle_size)
Same as SOCK_GetOSHandleEx(sock, handle_buf, handle_size, eNoOwnership).
Definition: ncbi_socket.c:7745
#define kEmptyStr
Definition: ncbistr.hpp:123
#define kInfiniteTimeout
Definition: ncbi_types.h:82
int ConnNetInfo_OverrideUserHeader(SConnNetInfo *net_info, const char *header)
int ConnNetInfo_ParseURL(SConnNetInfo *net_info, const char *url)
int ConnNetInfo_AppendArg(SConnNetInfo *net_info, const char *arg, const char *val)
EIO_Status
I/O status.
Definition: ncbi_core.h:132
int ConnNetInfo_PostOverrideArg(SConnNetInfo *net_info, const char *arg, const char *val)
SConnNetInfo * ConnNetInfo_Create(const char *service)
struct STimeoutTag STimeout
Timeout structure.
EIO_Event
I/O event (or direction).
Definition: ncbi_core.h:118
#define kDefaultTimeout
Definition: ncbi_types.h:81
void ConnNetInfo_Destroy(SConnNetInfo *net_info)
@ eIO_NotSupported
operation is not supported or is not available
Definition: ncbi_core.h:138
@ eIO_Success
everything is fine, no error occurred
Definition: ncbi_core.h:133
@ eIO_ReadWrite
eIO_Read | eIO_Write (also, eCONN_OnFlush)
Definition: ncbi_core.h:122
@ eIO_Open
also serves as no-event indicator in SOCK_Poll
Definition: ncbi_core.h:119
@ eIO_Read
read
Definition: ncbi_core.h:120
static const STimeout kZeroTimeout
Multi-threading – mutexes; rw-locks; semaphore.
static Format format
Definition: njn_ioutil.cpp:53
std::istream & in(std::istream &in_, double &x_)
Retry context class.
ESerialDataFormat GetFormat(const string &name)
Functor template for deleting object.
Definition: ncbimisc.hpp:349
Timeout structure.
Definition: ncbi_types.h:76
#define _ASSERT
Modified on Fri Sep 20 14:57:07 2024 by modify_doxy.py rev. 669887