NCBI C++ ToolKit
cass_blob_op.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef CASSBLOBOP__HPP
2 #define CASSBLOBOP__HPP
3 
4 /* $Id: cass_blob_op.hpp 103120 2024-09-11 14:33:06Z saprykin $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Dmitri Dmitrienko
30  *
31  * File Description:
32  *
33  * BigData application layer
34  *
35  */
36 
38 #include <corelib/ncbidiag.hpp>
39 
40 #include <atomic>
41 #include <chrono>
42 #include <memory>
43 #include <string>
44 #include <utility>
45 #include <vector>
46 
47 #include "cass_driver.hpp"
48 #include "cass_exception.hpp"
49 #include "Key.hpp"
50 #include "IdCassScope.hpp"
51 #include "cass_util.hpp"
52 #include "blob_record.hpp"
53 #include "nannot/record.hpp"
54 #include "id2_split/record.hpp"
55 #include "acc_ver_hist/record.hpp"
56 
59 
60 using TBlobChunkCallback = function<void(const unsigned char * data, unsigned int size, int chunk_no)>;
61 using TPropsCallback = function<void(const SBlobStat& stat, bool isFound)>;
62 using TDataErrorCallback = function<void(CRequestStatus::ECode status, int code, EDiagSev severity, const string & message)>;
63 using TDataReadyCallback = void(*)(void*);
64 
66 {
67 protected:
69  : m_ErrorCb(other.m_ErrorCb)
70  , m_Conn(other.m_Conn)
72  , m_Async(other.m_Async)
73  , m_Keyspace(other.m_Keyspace)
74  , m_Key(other.m_Key)
75  , m_MaxRetries(other.m_MaxRetries)
76  {}
77 public:
81 
83  shared_ptr<CCassConnection> conn,
84  const string & keyspace,
85  int32_t key,
86  bool async,
87  TDataErrorCallback error_cb
88  )
89  : m_ErrorCb(std::move(error_cb))
90  , m_Conn(std::move(conn))
91  , m_Async(async)
92  , m_Keyspace(keyspace)
93  , m_Key(key)
94  {
95  if (m_Conn == nullptr) {
96  NCBI_THROW(CCassandraException, eFatal, "CCassBlobWaiter() Cassandra connection should not be nullptr");
97  }
98  }
99 
101  shared_ptr<CCassConnection> conn,
102  const string & keyspace,
103  bool async,
104  TDataErrorCallback error_cb
105  )
106  : m_ErrorCb(std::move(error_cb))
107  , m_Conn(std::move(conn))
108  , m_Async(async)
109  , m_Keyspace(keyspace)
110  {
111  if (m_Conn == nullptr) {
112  NCBI_THROW(CCassandraException, eFatal, "CCassBlobWaiter() Cassandra connection should not be nullptr");
113  }
114  }
115 
116  /// Experimental!!! May conflict with CCassConnection::SetQueryTimeoutRetry() when query timed out
117  /// and CCassQuery::RestartQuery() called to make another attempt
118  /// Currently required to test PubSeqGateway Cassandra timeout handling for multi-stage operations
119  /// (Resolution => Primary blob retrieval => ID2 split blob retrieval) when Cassandra timeout happens at
120  /// later stages of operation
121  ///
122  /// Setup individual operation timeout instead of using timeout configured for CCassConnection
123  virtual void SetQueryTimeout(std::chrono::milliseconds value);
124  virtual std::chrono::milliseconds GetQueryTimeout() const;
125 
127  {
128  CloseAll();
129  }
130 
131  bool Cancelled() const
132  {
133  return m_Cancelled;
134  }
135 
136  bool Finished() const noexcept
137  {
138  auto state = m_State.load();
139  return state == eDone || state == eError || m_Cancelled;
140  }
141 
142  /// Get internal state for debug purposes.
144  {
145  return m_State.load(std::memory_order_relaxed);
146  }
147 
148  virtual void Cancel()
149  {
150  if (m_State != eDone) {
151  m_Cancelled = true;
152  m_State = eError;
153  }
154  }
155 
156  bool Wait()
157  {
158  bool finished = Finished();
159  while (!finished) {
160  try {
161  Wait1();
162  } catch (const CCassandraException& e) {
163  // We will not re-throw here as CassandraException is not fatal
165  } catch (const exception& e) {
166  // See ID-6241 There is a requirement to catch all exceptions and continue here
168  } catch (...) {
169  // See ID-6241 There is a requirement to catch all exceptions and continue here
171  }
172  finished = Finished();
173  if (m_Async) {
174  break;
175  }
176  }
177  return finished;
178  }
179 
180  bool HasError() const
181  {
182  return !m_LastError.empty();
183  }
184 
185  string LastError() const
186  {
187  return m_LastError;
188  }
189 
190  void ClearError()
191  {
192  m_LastError.clear();
193  }
194 
195  string GetKeySpace() const
196  {
197  return m_Keyspace;
198  }
199 
200  void SetKeySpace(string const & keyspace) {
201  if (m_State != eInit) {
202  NCBI_THROW(CCassandraException, eSeqFailed, "CCassBlobWaiter: Cannot change keyspace for started task");
203  }
204  m_Keyspace = keyspace;
205  }
206 
207  int32_t GetKey() const
208  {
209  return m_Key;
210  }
211 
213  {
214  m_ErrorCb = std::move(error_cb);
215  }
216 
217  /// Set the query retry limit for this operation.
218  ///
219  /// @param value
220  /// Max number of query retries operation allows.
221  /// < 0 means not configured. Will use value provided by CCassConnection
222  /// 0 means no limit in auto-restart count,
223  /// 1 means no 2nd start -> no re-starts at all
224  /// n > 1 means n-1 restart allowed
226  {
227  m_MaxRetries = value < 0 ? -1 : value;
228  }
229 
230  int GetMaxRetries() const
231  {
232  return m_MaxRetries < 0 ? m_Conn->GetMaxRetries() : m_MaxRetries;
233  }
234 
235  void SetDataReadyCB3(shared_ptr<CCassDataCallbackReceiver> datareadycb3)
236  {
237  m_DataReadyCb3 = datareadycb3;
238  }
239 
241  {
243  }
244 
246  {
248  }
249 
250 protected:
252  eInit = 0,
253  eDone = 10000,
254  eError = -1
255  };
256  struct SQueryRec {
257  shared_ptr<CCassQuery> query;
258  unsigned int restart_count;
259  };
260 
261  void CloseAll(void)
262  {
263  for (auto & it : m_QueryArr) {
264  it.query->Close();
265  it.restart_count = 0;
266  }
267  }
268 
269  void SetupQueryCB3(shared_ptr<CCassQuery>& query)
270  {
271  auto DataReadyCb3 = m_DataReadyCb3.lock();
272  if (DataReadyCb3) {
273  query->SetOnData3(DataReadyCb3);
274  } else if (IsDataReadyCallbackExpired()) {
275  char msg[1024];
276  snprintf(msg, sizeof(msg), "Failed to setup data ready callback (expired)");
278  }
279  }
280 
281  // Returns true when m_DataReadyCb3 is non-empty (it was assigned from a receiver); after lock() has returned null this means the receiver expired
283  {
284  using wt = weak_ptr<CCassDataCallbackReceiver>;
285  return m_DataReadyCb3.owner_before(wt{}) || wt{}.owner_before(m_DataReadyCb3);
286  }
287 
289  int code,
290  EDiagSev severity,
291  const string & message)
292  {
293  assert(m_ErrorCb != nullptr);
294  m_State = eError;
295  m_LastError = message;
296  m_ErrorCb(status, code, severity, message);
297  }
298 
299  bool CanRestart(shared_ptr<CCassQuery> query, unsigned int restart_count) const
300  {
301  int max_retries = GetMaxRetries();
302  bool is_out_of_retries = (max_retries > 0) &&
303  (restart_count >= static_cast<unsigned int>(max_retries) - 1);
304  ERR_POST(Info << "CanRestartQ?" <<
305  " out_of_retries=" << is_out_of_retries <<
306  ", time=" << gettime() / 1000L <<
307  ", timeout=" << query->Timeout() << "ms");
308  return !is_out_of_retries && !m_Cancelled;
309  }
310 
311  // @ToDo Switch all child classes to use this method to produce CCassQuery
312  shared_ptr<CCassQuery> ProduceQuery() const;
313 
314  bool CanRestart(SQueryRec& it) const
315  {
316  return CanRestart(it.query, it.restart_count);
317  }
318 
319  bool CheckReady(shared_ptr<CCassQuery> qry, unsigned int restart_counter, bool& need_repeat)
320  {
321  need_repeat = false;
322  try {
323  if (m_Async && qry->WaitAsync(0) == ar_wait) {
324  return false;
325  }
326  return true;
327  } catch (const CCassandraException& e) {
328  if (
331  && CanRestart(qry, restart_counter))
332  {
333  need_repeat = true;
334  } else {
336  }
337  }
338 
339  return false;
340  }
341 
343  {
344  bool need_restart = false;
345  bool rv = CheckReady(it.query, it.restart_count, need_restart);
346  if (!rv && need_restart) {
347  try {
348  ++it.restart_count;
349  ERR_POST(Warning << "CCassBlobWaiter query retry: " + to_string(it.restart_count));
350  it.query->Restart();
351  } catch (const exception& ex) {
352  ERR_POST(NCBI_NS_NCBI::Error << "Failed to restart query (p2): " << ex.what());
353  throw;
354  }
355  }
356  return rv;
357  }
358 
360  {
361  return m_ReadConsistency;
362  }
363 
365  {
366  return m_WriteConsistency;
367  }
368 
369  bool CheckMaxActive();
370  virtual void Wait1() = 0;
371 
373  weak_ptr<CCassDataCallbackReceiver> m_DataReadyCb3;
374 
375  // @ToDo Make it private with protected accessor
376  shared_ptr<CCassConnection> m_Conn;
377 
378  std::chrono::milliseconds m_QueryTimeout{0};
379  atomic<int32_t> m_State{eInit};
380  string m_LastError;
381  bool m_Async{true};
382  atomic_bool m_Cancelled{false};
383  vector<SQueryRec> m_QueryArr;
384 
385 private:
386  string m_Keyspace;
388  int m_MaxRetries{-1};
389 
392 };
393 
394 class CCassBlobOp: public enable_shared_from_this<CCassBlobOp>
395 {
396 public:
397  enum EBlopOpFlag {
400  eFlagOpSet
401  };
402 
403 public:
404  CCassBlobOp(CCassBlobOp&&) = default;
406  CCassBlobOp(const CCassBlobOp&) = delete;
407  CCassBlobOp& operator=(const CCassBlobOp&) = delete;
408 
409  explicit CCassBlobOp(shared_ptr<CCassConnection> conn)
410  : m_Conn(conn)
411  {
412  m_Keyspace = m_Conn->Keyspace();
413  }
414 
415  virtual ~CCassBlobOp()
416  {
417  m_Conn = nullptr;
418  }
419 
420  NCBI_STD_DEPRECATED("GetBlobChunkSize() is deprecated and will be deleted after 08/01/2023")
421  void GetBlobChunkSize(unsigned int timeout_ms, const string & keyspace, int64_t * chunk_size);
422 
423  NCBI_STD_DEPRECATED("GetBigBlobSizeLimit() is deprecated and will be deleted after 08/01/2023")
424  void GetBigBlobSizeLimit(unsigned int timeout_ms, const string & keyspace, int64_t * value);
425 
426  /// It is unsafe to use this function.
427  /// Call should be protected in multi-threaded environment.
428  void SetKeyspace(const string & keyspace)
429  {
430  m_Keyspace = keyspace;
431  }
432 
433  string GetKeyspace() const
434  {
435  return m_Keyspace;
436  }
437 
438  NCBI_STD_DEPRECATED("GetSetting() is deprecated and will be deleted after 08/01/2023")
439  bool GetSetting(unsigned int op_timeout_ms, const string & domain, const string & name, string & value);
440 
441  shared_ptr<CCassConnection> GetConn()
442  {
443  if (!m_Conn) {
444  NCBI_THROW(CCassandraException, eSeqFailed, "CCassBlobOp instance is not initialized with DB connection");
445  }
446  return m_Conn;
447  }
448 
449 private:
450  shared_ptr<CCassConnection> m_Conn;
451  string m_Keyspace;
452 };
453 
455 
456 #endif // CASSBLOBOP__HPP
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
void(*)(void *) TDataReadyCallback
function< void(const SBlobStat &stat, bool isFound)> TPropsCallback
function< void(const unsigned char *data, unsigned int size, int chunk_no)> TBlobChunkCallback
function< void(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)> TDataErrorCallback
USING_NCBI_SCOPE
CassConsistency TCassConsistency
Definition: cass_driver.hpp:99
@ ar_wait
Definition: cass_driver.hpp:68
int64_t gettime(void)
Definition: cass_util.cpp:41
CCassBlobOp(const CCassBlobOp &)=delete
string GetKeyspace() const
virtual ~CCassBlobOp()
void SetKeyspace(const string &keyspace)
It is unsafe to use this function.
void GetBigBlobSizeLimit(unsigned int timeout_ms, const string &keyspace, int64_t *value)
CCassBlobOp(CCassBlobOp &&)=default
CCassBlobOp & operator=(const CCassBlobOp &)=delete
bool GetSetting(unsigned int op_timeout_ms, const string &domain, const string &name, string &value)
shared_ptr< CCassConnection > m_Conn
CCassBlobOp(shared_ptr< CCassConnection > conn)
void GetBlobChunkSize(unsigned int timeout_ms, const string &keyspace, int64_t *chunk_size)
shared_ptr< CCassConnection > GetConn()
string m_Keyspace
CCassBlobOp & operator=(CCassBlobOp &&)=default
bool CheckMaxActive()
CCassBlobWaiter.
void SetWriteConsistency(TCassConsistency value)
atomic< int32_t > m_State
bool Cancelled() const
void SetErrorCB(TDataErrorCallback error_cb)
CCassBlobWaiter(shared_ptr< CCassConnection > conn, const string &keyspace, int32_t key, bool async, TDataErrorCallback error_cb)
atomic_bool m_Cancelled
std::chrono::milliseconds m_QueryTimeout
void SetMaxRetries(int value)
Set connection point parameters.
void SetKeySpace(string const &keyspace)
shared_ptr< CCassConnection > m_Conn
string GetKeySpace() const
int32_t GetKey() const
CCassBlobWaiter & operator=(CCassBlobWaiter &&)=delete
string LastError() const
TCassConsistency GetWriteConsistency() const
int32_t GetStateDebug() const noexcept
Get internal state for debug purposes.
void SetDataReadyCB3(shared_ptr< CCassDataCallbackReceiver > datareadycb3)
TCassConsistency m_ReadConsistency
bool Finished() const noexcept
TCassConsistency GetReadConsistency() const
weak_ptr< CCassDataCallbackReceiver > m_DataReadyCb3
void SetupQueryCB3(shared_ptr< CCassQuery > &query)
CCassBlobWaiter(const CCassBlobWaiter &other)
bool CheckReady(SQueryRec &it)
bool CheckReady(shared_ptr< CCassQuery > qry, unsigned int restart_counter, bool &need_repeat)
TCassConsistency m_WriteConsistency
int GetMaxRetries() const
bool IsDataReadyCallbackExpired() const
void SetReadConsistency(TCassConsistency value)
void CloseAll(void)
CCassBlobWaiter & operator=(const CCassBlobWaiter &)=delete
bool HasError() const
vector< SQueryRec > m_QueryArr
bool CanRestart(shared_ptr< CCassQuery > query, unsigned int restart_count) const
shared_ptr< CCassQuery > ProduceQuery() const
virtual std::chrono::milliseconds GetQueryTimeout() const
virtual void SetQueryTimeout(std::chrono::milliseconds value)
Experimental!!! May conflict with CCassConnection::SetQueryTimeoutRetry() when query timed out and CC...
TDataErrorCallback m_ErrorCb
virtual void Cancel()
bool CanRestart(SQueryRec &it) const
virtual ~CCassBlobWaiter()
virtual void Wait1()=0
void Error(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
CCassBlobWaiter(CCassBlobWaiter &&)=delete
CCassBlobWaiter(shared_ptr< CCassConnection > conn, const string &keyspace, bool async, TDataErrorCallback error_cb)
static constexpr TCassConsistency kLocalQuorum
static const int chunk_size
@ eFatal
static CS_CONNECTION * conn
Definition: ct_dynamic.c:25
#define bool
Definition: bool.h:34
char data[12]
Definition: iconv.c:80
Int4 int32_t
Int8 int64_t
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
void Error(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1197
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
void Info(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1185
#define NCBI_STD_DEPRECATED(message)
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::KEY key
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
Defines NCBI C++ diagnostic APIs, classes, and macros.
Defines CRequestStatus class for NCBI C++ diagnostic API.
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
#define assert(x)
Definition: srv_diag.hpp:58
shared_ptr< CCassQuery > query
static string query
Definition: inftrees.h:24
#define const
Definition: zconf.h:232
Modified on Fri Sep 20 14:57:01 2024 by modify_doxy.py rev. 669887