4 /* $Id: rpcbase.hpp 91079 2020-09-01 17:28:37Z grichenk $
5  * ===========================================================================
6  *
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Author: Aaron Ucko, NCBI
30  *
31  * File Description:
32  * Generic template class for ASN.1/XML RPC clients
33  *
34  */
36 #include <corelib/ncbimtx.hpp>
37 #include <corelib/ncbi_system.hpp>
39 #include <connect/ncbi_util.h>
40 #include <serial/objistr.hpp>
41 #include <serial/objostr.hpp>
42 #include <serial/serial.hpp>
43 #include <util/retry_ctx.hpp>
44 #include <serial/rpcbase_impl.hpp>
46 /** @addtogroup GenClassSupport
47  *
48  * @{
49  */
54 /// CRPCClient -- prototype client for ASN.1/XML-based RPC.
55 /// Normally connects automatically on the first real request and
56 /// disconnects automatically in the destructor, but allows both events
57 /// to occur explicitly.
/// @tparam TRequest  Request type; passed to WriteRequest() and GetAffinity().
/// @tparam TReply    Reply type; filled in by ReadReply().
///
/// NOTE(review): several lines of this class (parts of the constructor
/// parameter and initializer lists) are not visible in this excerpt.
59 template <class TRequest, class TReply>
60 class CRPCClient : public CObject,
61  public CRPCClient_Base,
62  protected CConnIniter
63 {
64 public:
 /// Construct a client for the named service.
 /// NOTE(review): the initializer list of this overload is not visible here.
65  CRPCClient(const string& service = kEmptyStr)
68  {}
 /// Construct a client, forwarding the service name and format to
 /// CRPCClient_Base.
69  CRPCClient(const string& service,
71  : CRPCClient_Base(service, format),
73  {}
 /// Construct a client, forwarding the service name, format and try limit
 /// to CRPCClient_Base.
74  CRPCClient(const string& service,
76  unsigned int try_limit)
77  : CRPCClient_Base(service, format, try_limit),
79  {}
 /// Frees the stored timeout unless it is one of the special values
 /// recognized by sx_IsSpecial(), which are never cloned.
80  virtual ~CRPCClient(void)
81  {
82  if ( !sx_IsSpecial(m_Timeout) ) {
83  delete const_cast<STimeout*>(m_Timeout);
84  }
85  }
 /// Forwards the request/reply pair to x_Ask(), which is not defined in
 /// this excerpt.
87  virtual void Ask(const TRequest& request, TReply& reply)
88  { x_Ask(request, reply); }
 /// Writes the request to the output object stream with operator<<.
90  virtual void WriteRequest(CObjectOStream& out, const TRequest& request)
91  { out << request; }
 /// Reads the reply from the input object stream with operator>>.
93  virtual void ReadReply(CObjectIStream& in, TReply& reply)
94  { in >> reply; }
 /// Saves the timeout in m_Timeout (cloning it unless special) and, when the
 /// current stream is a CConn_IOStream, applies it for the given direction.
96  EIO_Status SetTimeout(const STimeout* timeout,
97  EIO_Event direction = eIO_ReadWrite);
 /// Returns the current stream's timeout for the direction when the stream
 /// is a CConn_IOStream, otherwise the saved m_Timeout.
98  const STimeout* GetTimeout(EIO_Event direction = eIO_Read) const;
 /// Creates the service stream ahead of time and keeps it in m_AsyncStream.
 /// When handle_buf is non-null and a socket is available, the socket's OS
 /// handle is requested into handle_buf/handle_size.
100  EIO_Status AsyncConnect(void* handle_buf, size_t handle_size);
102 protected:
 /// Affinity string for a request; this default returns an empty string.
103  virtual string GetAffinity(const TRequest& /*request*/) const
104  {
105  return string();
106  }
 // The three x_* overrides below adapt the CSerialObject-typed virtuals to
 // the typed methods above. A reference dynamic_cast throws std::bad_cast
 // if the object is not of the expected type.
108  virtual void x_WriteRequest(CObjectOStream& out, const CSerialObject& request) override
109  {
110  WriteRequest(out, dynamic_cast<const TRequest&>(request));
111  }
113  virtual void x_ReadReply(CObjectIStream& in, CSerialObject& reply) override
114  {
115  ReadReply(in, dynamic_cast<TReply&>(reply));
116  }
118  virtual string x_GetAffinity(const CSerialObject& request) const override
119  {
120  return GetAffinity(dynamic_cast<const TRequest&>(request));
121  }
 /// Establishes the stream: uses a pending m_AsyncStream if there is one,
 /// else the retry context's URL if set, else the named service.
123  virtual void x_Connect(void) override;
125  /// Connect to a URL. (Discouraged; please establish and use a
126  /// suitable named service if possible.)
127  void x_ConnectURL(const string& url);
129  // CConn_HttpStream callback for parsing headers.
130  // 'user_data', when non-null, must point to a CHttpRetryContext (the
 // definition casts it to that type; callers in this file pass &m_RetryCtx).
131  static EHTTP_HeaderParse sx_ParseHeader(const char* http_header,
132  void* user_data,
133  int server_error);
 /// True if timeout is kDefaultTimeout or kInfiniteTimeout, i.e. a value
 /// that is stored as-is rather than cloned.
135  static bool sx_IsSpecial(const STimeout* timeout);
137  const STimeout* m_Timeout; ///< Saved timeout. Cloned if not special; a clone is deleted by SetTimeout() or the destructor.
139 private:
 /// Adds request arguments to net_info and, when x_extra is non-null,
 /// initializes it with the header-parsing callback and content-type header.
140  void x_FillConnNetInfo(SConnNetInfo& net_info, SSERVICE_Extra* x_extra);
 /// Stream created by AsyncConnect(); released to x_SetStream() by x_Connect().
142  unique_ptr<CConn_ServiceStream> m_AsyncStream;
143 };
146 ///////////////////////////////////////////////////////////////////////////
147 // Inline methods
/// Explicit specialization whose static Delete() releases an SConnNetInfo
/// with ConnNetInfo_Destroy().
/// NOTE(review): the line naming the specialized template is not visible in
/// this excerpt; presumably this is the deleter used by the
/// AutoPtr<SConnNetInfo> objects in the functions below - confirm.
149 template<>
151 {
152  static void Delete(SConnNetInfo* net_info)
153  { ConnNetInfo_Destroy(net_info); }
154 };
/// Populate connection parameters for a new stream.
///
/// NOTE(review): the line carrying the function name and first parameter is
/// not visible in this excerpt; the body matches the x_FillConnNetInfo()
/// declaration (net_info, x_extra). The statements that report each failure
/// are also only partly visible (only their message strings remain).
///
/// @param net_info  Connection info that receives the extra arguments and
///                  the user header.
/// @param x_extra   If non-null, zeroed and filled with the header-parsing
///                  callback; if null, only the arguments are added.
157 template<class TRequest, class TReply>
158 inline
160  SSERVICE_Extra* x_extra)
161 {
 // Additional request arguments, if any.
162  if ( !m_Args.empty() ) {
163  if ( !ConnNetInfo_AppendArg(&net_info, m_Args.c_str(), 0) ) {
165  "Error sending additional request arguments");
166  }
167  }
 // Arguments from the retry context take precedence over affinity: the
 // affinity is sent only when no retry arguments are set and x_extra was
 // supplied.
168  if ( m_RetryCtx.IsSetArgs() ) {
169  if ( !ConnNetInfo_AppendArg(&net_info, m_RetryCtx.GetArgs().c_str(),
170  0) ) {
172  "Error sending retry context arguments");
173  }
174  } else if (x_extra != nullptr && !m_Affinity.empty()) {
175  if ( !ConnNetInfo_PostOverrideArg(&net_info, m_Affinity.c_str(), 0) ) {
177  "Error sending request affinity");
178  }
179  }
 // Callers without an SSERVICE_Extra (x_ConnectURL passes nullptr) stop here.
180  if (x_extra == nullptr) {
181  return;
182  }
183  // Install callback for parsing headers.
184  memset(x_extra, 0, sizeof(*x_extra));
 // The retry context is the user data handed to sx_ParseHeader().
185  x_extra->data = &m_RetryCtx;
186  x_extra->parse_header = sx_ParseHeader;
187  x_extra->flags = fHTTP_NoAutoRetry;
 // Send the content-type header for the current format, if there is one.
188  const char* user_header = GetContentTypeHeader(GetFormat());
189  if (user_header != NULL && *user_header != '\0') {
190  if ( !ConnNetInfo_OverrideUserHeader(&net_info, user_header)) {
192  "Error sending user header");
193  }
194  }
195 }
/// Establish the stream used for requests.
///
/// NOTE(review): the line carrying the function name is not visible in this
/// excerpt; the body matches the x_Connect() declaration. The line that
/// begins the CConn_ServiceStream construction is also missing.
///
/// Order of preference:
///  1. a stream previously created by AsyncConnect() (m_AsyncStream);
///  2. the URL stored in the retry context, via x_ConnectURL();
///  3. a new service stream for m_Service.
197 template<class TRequest, class TReply>
198 inline
200 {
201  if (m_AsyncStream.get() != nullptr) {
 // Apply the saved timeout to the pre-created stream, then pass the
 // released pointer to x_SetStream().
202  m_AsyncStream->SetTimeout(eIO_Open, m_Timeout);
203  m_AsyncStream->SetTimeout(eIO_ReadWrite, m_Timeout);
204  x_SetStream(m_AsyncStream.release());
205  return;
206  } else if ( m_RetryCtx.IsSetUrl() ) {
207  x_ConnectURL(m_RetryCtx.GetUrl());
208  return;
209  }
210  _ASSERT( !m_Service.empty() );
 // x_extra is filled in by x_FillConnNetInfo() below.
211  SSERVICE_Extra x_extra;
212  AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(m_Service.c_str()));
213  x_FillConnNetInfo(*net_info, &x_extra);
 // Unlike AsyncConnect(), this passes fSERV_DelayOpen and the saved timeout.
215  unique_ptr<CConn_ServiceStream> stream
217  (m_Service, fSERV_Any | fSERV_DelayOpen, net_info.get(), &x_extra, m_Timeout));
218  if ( m_Canceler.NotNull() ) {
219  stream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
220  }
221  x_SetStream(stream.release());
222 }
/// Connect to an explicit URL with a CConn_HttpStream.
///
/// NOTE(review): the line carrying the function name and the `url` parameter
/// is not visible in this excerpt; the body matches the x_ConnectURL()
/// declaration. One constructor argument line is also missing.
///
/// Throws CCoreException (eInvalidArg) if the URL cannot be parsed.
225 template<class TRequest, class TReply>
226 inline
228 {
229  AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(nullptr));
230  if ( !ConnNetInfo_ParseURL(net_info.get(), url.c_str()) ) {
231  NCBI_THROW(CCoreException, eInvalidArg, "Error parsing URL " + url);
232  }
 // No SSERVICE_Extra here: the callback and user data are passed directly
 // to the stream constructor below.
233  x_FillConnNetInfo(*net_info, nullptr);
234  unique_ptr<CConn_HttpStream> stream(new CConn_HttpStream(net_info.get(),
235  GetContentTypeHeader(GetFormat()),
236  sx_ParseHeader, // callback
237  &m_RetryCtx, // user data for the callback
238  0, // adjust callback
239  0, // cleanup callback
241  m_Timeout));
242  if ( m_Canceler.NotNull() ) {
243  stream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
244  }
245  x_SetStream(stream.release());
246 }
/// Save a timeout for later use and apply it to the current stream.
///
/// NOTE(review): the line carrying the function name and the `timeout`
/// parameter is not visible in this excerpt; the body matches the
/// SetTimeout() declaration.
///
/// The saved value (m_Timeout) is replaced regardless of `direction`;
/// `direction` is only used when forwarding to an existing stream.
///
/// @return The stream's SetTimeout() result when the current stream is a
///         CConn_IOStream; eIO_Success when there is no stream;
///         eIO_NotSupported when the stream is of another type.
///
/// NOTE(review): if `timeout` is the same pointer as the previously stored
/// clone (e.g. a value obtained earlier from GetTimeout()), it is deleted in
/// the save step and then passed to conn_stream->SetTimeout() - looks like
/// a potential use-after-free; confirm.
249 template<class TRequest, class TReply>
250 inline
252  EIO_Event direction)
253 {
254  // save for future use, especially if there's no stream at present.
255  {{
256  const STimeout* old_timeout = m_Timeout;
 // Special values are stored as-is; anything else is cloned so the
 // caller's object need not outlive this call.
257  if (sx_IsSpecial(timeout)) {
258  m_Timeout = timeout;
259  } else { // make a copy
260  m_Timeout = new STimeout(*timeout);
261  }
 // Only a previously cloned value is owned and must be freed.
262  if ( !sx_IsSpecial(old_timeout) ) {
263  delete const_cast<STimeout*>(old_timeout);
264  }
265  }}
267  CConn_IOStream* conn_stream
268  = dynamic_cast<CConn_IOStream*>(m_Stream.get());
269  if (conn_stream) {
270  return conn_stream->SetTimeout(direction, timeout);
271  } else if ( !m_Stream.get() ) {
272  return eIO_Success; // we've saved it, which is the best we can do...
273  } else {
274  return eIO_NotSupported;
275  }
276 }
/// Return the timeout in effect for the given direction.
///
/// NOTE(review): the line carrying the function name and the `direction`
/// parameter is not visible in this excerpt; the body matches the
/// GetTimeout() declaration.
///
/// @return The current stream's timeout when the stream is a CConn_IOStream;
///         otherwise the saved m_Timeout, which remains owned by this object
///         when it is a clone.
279 template<class TRequest, class TReply>
280 inline
282  const
283 {
284  CConn_IOStream* conn_stream
285  = dynamic_cast<CConn_IOStream*>(m_Stream.get());
286  if (conn_stream) {
287  return conn_stream->GetTimeout(direction);
288  }
289  else {
290  return m_Timeout;
291  }
292 }
/// Create the service stream ahead of the first request.
///
/// NOTE(review): the line carrying the function name and the `handle_buf`
/// parameter is not visible in this excerpt; the body matches the
/// AsyncConnect() declaration.
///
/// The new stream is kept in m_AsyncStream, which x_Connect() picks up later.
///
/// @param handle_buf   If non-null, receives the OS handle of the stream's
///                     socket when one is available.
/// @param handle_size  Size passed to SOCK_GetOSHandle() for handle_buf.
/// @return The stream's status, or the result of CONN_GetSOCK() /
///         SOCK_GetOSHandle() when the handle was requested and a CONN
///         was available.
295 template<class TRequest, class TReply>
296 inline
298  size_t handle_size)
299 {
 // The stream is constructed with a zero timeout rather than m_Timeout.
300  static const STimeout kZeroTimeout = { 0, 0 };
301  _ASSERT( !m_Service.empty() );
302  SSERVICE_Extra x_extra;
303  AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(m_Service.c_str()));
304  x_FillConnNetInfo(*net_info, &x_extra);
 // Unlike x_Connect(), fSERV_DelayOpen is not passed here.
306  m_AsyncStream.reset(
307  new CConn_ServiceStream(m_Service, fSERV_Any, net_info.get(), &x_extra,
308  &kZeroTimeout));
 // A saved default timeout is replaced by the infinite one; x_Connect()
 // later applies m_Timeout to this stream.
309  if (m_Timeout == kDefaultTimeout) {
310  m_Timeout = kInfiniteTimeout;
311  }
312  EIO_Status status = m_AsyncStream->Status();
313  if ( m_Canceler.NotNull() ) {
314  m_AsyncStream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
315  }
 // Optionally hand the socket's OS handle back to the caller.
316  if (handle_buf != nullptr) {
317  CONN conn = m_AsyncStream->GetCONN();
318  if (conn != nullptr) {
319  SOCK sock = nullptr;
320  if ((status = CONN_GetSOCK(conn, &sock)) == eIO_Success
321  && sock != nullptr) {
322  status = SOCK_GetOSHandle(sock, handle_buf, handle_size);
323  }
324  }
325  }
326  return status;
327 }
/// HTTP header callback: feed the header to the retry context.
///
/// NOTE(review): the line carrying the function name and the `http_header`
/// parameter is not visible in this excerpt; the body matches the
/// sx_ParseHeader() declaration.
///
/// @param user_data  Null, or a pointer to a CHttpRetryContext (it is cast
///                   to that type without any check).
/// @return Always eHTTP_HeaderContinue.
330 template<class TRequest, class TReply>
331 inline
334  void* user_data,
335  int /*server_error*/)
336 {
 // Nothing to update without a retry context.
337  if ( !user_data ) {
338  return eHTTP_HeaderContinue;
339  }
340  CHttpRetryContext* retry_ctx = reinterpret_cast<CHttpRetryContext*>(user_data);
341  _ASSERT(retry_ctx);
342  retry_ctx->ParseHeader(http_header);
344  // Always read response body - normal content or error.
345  return eHTTP_HeaderContinue;
346 }
/// Tell whether a timeout pointer is one of the special values
/// (kDefaultTimeout or kInfiniteTimeout). The comparison is by pointer
/// value. Special timeouts are stored as-is by SetTimeout() and never
/// deleted.
///
/// NOTE(review): the line carrying the function name and the `timeout`
/// parameter is not visible in this excerpt; the body matches the
/// sx_IsSpecial() declaration.
349 template<class TRequest, class TReply>
350 inline
352 {
353  return timeout == kDefaultTimeout || timeout == kInfiniteTimeout;
354 }
360 /* @} */
362 #endif /* SERIAL___RPCBASE__HPP */
