NCBI C++ ToolKit
cass_blob_op.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef CASSBLOBOP__HPP
2 #define CASSBLOBOP__HPP
3 
4 /* $Id: cass_blob_op.hpp 99865 2023-05-17 18:57:22Z saprykin $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Dmitri Dmitrienko
30  *
31  * File Description:
32  *
33  * BigData application layer
34  *
35  */
36 
38 #include <corelib/ncbidiag.hpp>
39 
40 #include <atomic>
41 #include <vector>
42 #include <utility>
43 #include <string>
44 #include <memory>
45 
46 #include "cass_driver.hpp"
47 #include "cass_exception.hpp"
48 #include "Key.hpp"
49 #include "IdCassScope.hpp"
50 #include "cass_util.hpp"
51 #include "blob_record.hpp"
52 #include "nannot/record.hpp"
53 #include "id2_split/record.hpp"
54 #include "acc_ver_hist/record.hpp"
55 
58 
59 using TBlobChunkCallback = function<void(const unsigned char * data, unsigned int size, int chunk_no)>;
60 using TPropsCallback = function<void(const SBlobStat& stat, bool isFound)>;
61 using TDataErrorCallback = function<void(CRequestStatus::ECode status, int code, EDiagSev severity, const string & message)>;
62 using TDataReadyCallback = void(*)(void*);
63 
65 {
66  public:
67  CCassBlobWaiter(const CCassBlobWaiter&) = delete;
71 
73  shared_ptr<CCassConnection> conn,
74  const string & keyspace,
75  int32_t key,
76  bool async,
77  TDataErrorCallback error_cb
78  )
79  : m_ErrorCb(move(error_cb))
80  , m_Conn(move(conn))
81  , m_Async(async)
82  , m_Keyspace(keyspace)
83  , m_Key(key)
84  {
85  if (m_Conn == nullptr) {
86  NCBI_THROW(CCassandraException, eFatal, "CCassBlobWaiter() Cassandra connection should not be nullptr");
87  }
88  }
89 
91  shared_ptr<CCassConnection> conn,
92  const string & keyspace,
93  bool async,
94  TDataErrorCallback error_cb
95  )
96  : m_ErrorCb(move(error_cb))
97  , m_Conn(move(conn))
98  , m_Async(async)
99  , m_Keyspace(keyspace)
100  {
101  if (m_Conn == nullptr) {
102  NCBI_THROW(CCassandraException, eFatal, "CCassBlobWaiter() Cassandra connection should not be nullptr");
103  }
104  }
105 
107  {
108  CloseAll();
109  }
110 
111  bool Cancelled() const
112  {
113  return m_Cancelled;
114  }
115 
116  virtual void Cancel()
117  {
118  if (m_State != eDone) {
119  m_Cancelled = true;
120  m_State = eError;
121  }
122  }
123 
124  bool Wait()
125  {
126  while (m_State != eDone && m_State != eError && !m_Cancelled) {
127  try {
128  Wait1();
129  } catch (const CCassandraException& e) {
130  // We will not re-throw here as CassandraException is not fatal
132  } catch (const exception& e) {
133  // See ID-6241 There is a requirement to catch all exceptions and continue here
135  } catch (...) {
136  // See ID-6241 There is a requirement to catch all exceptions and continue here
138  }
139  if (m_Async) {
140  break;
141  }
142  }
143  return (m_State == eDone || m_State == eError || m_Cancelled);
144  }
145 
146  bool HasError() const
147  {
148  return !m_LastError.empty();
149  }
150 
151  string LastError() const
152  {
153  return m_LastError;
154  }
155 
156  void ClearError()
157  {
158  m_LastError.clear();
159  }
160 
161  string GetKeySpace() const
162  {
163  return m_Keyspace;
164  }
165 
166  void SetKeySpace(string const & keyspace) {
167  if (m_State != eInit) {
168  NCBI_THROW(CCassandraException, eSeqFailed, "CCassBlobWaiter: Cannot change keyspace for started task");
169  }
170  m_Keyspace = keyspace;
171  }
172 
173  int32_t GetKey() const
174  {
175  return m_Key;
176  }
177 
179  {
180  m_ErrorCb = std::move(error_cb);
181  }
182 
183  /// Set connection point parameters.
184  ///
185  /// @param value
186  /// Max number of query retries operation allows.
187  /// < 0 means not configured. Will use value provided by CCassConnection
188  /// 0 means no limit in auto-restart count,
189  /// 1 means no 2nd start -> no re-starts at all
190  /// n > 1 means n-1 restart allowed
192  {
193  m_MaxRetries = value < 0 ? -1 : value;
194  }
195 
196  int GetMaxRetries() const
197  {
198  return m_MaxRetries < 0 ? m_Conn->GetMaxRetries() : m_MaxRetries;
199  }
200 
201  void SetDataReadyCB3(shared_ptr<CCassDataCallbackReceiver> datareadycb3)
202  {
203  m_DataReadyCb3 = datareadycb3;
204  }
205 
206  protected:
208  eInit = 0,
209  eDone = 10000,
210  eError = -1
211  };
212  struct SQueryRec {
213  shared_ptr<CCassQuery> query;
214  unsigned int restart_count;
215  };
216 
217  void CloseAll(void)
218  {
219  for (auto & it : m_QueryArr) {
220  it.query->Close();
221  it.restart_count = 0;
222  }
223  }
224 
225  void SetupQueryCB3(shared_ptr<CCassQuery>& query)
226  {
227  auto DataReadyCb3 = m_DataReadyCb3.lock();
228  if (DataReadyCb3) {
229  query->SetOnData3(DataReadyCb3);
230  } else if (IsDataReadyCallbackExpired()) {
231  char msg[1024];
232  snprintf(msg, sizeof(msg), "Failed to setup data ready callback (expired)");
234  }
235  }
236 
237  // Returns true for expired non empty weak pointers
239  {
240  using wt = weak_ptr<CCassDataCallbackReceiver>;
241  return m_DataReadyCb3.owner_before(wt{}) || wt{}.owner_before(m_DataReadyCb3);
242  }
243 
245  int code,
246  EDiagSev severity,
247  const string & message)
248  {
249  assert(m_ErrorCb != nullptr);
250  m_State = eError;
251  m_LastError = message;
252  m_ErrorCb(status, code, severity, message);
253  }
254 
255  bool CanRestart(shared_ptr<CCassQuery> query, unsigned int restart_count) const
256  {
257  int max_retries = GetMaxRetries();
258  bool is_out_of_retries = (max_retries > 0) &&
259  (restart_count >= static_cast<unsigned int>(max_retries) - 1);
260  ERR_POST(Info << "CanRestartQ?" <<
261  " out_of_retries=" << is_out_of_retries <<
262  ", time=" << gettime() / 1000L <<
263  ", timeout=" << query->Timeout() << "ms");
264  return !is_out_of_retries && !m_Cancelled;
265  }
266 
267  bool CanRestart(SQueryRec& it) const
268  {
269  return CanRestart(it.query, it.restart_count);
270  }
271 
272  bool CheckReady(shared_ptr<CCassQuery> qry, unsigned int restart_counter, bool& need_repeat)
273  {
274  need_repeat = false;
275  try {
276  if (m_Async && qry->WaitAsync(0) == ar_wait) {
277  return false;
278  }
279  return true;
280  } catch (const CCassandraException& e) {
281  if (
284  && CanRestart(qry, restart_counter))
285  {
286  need_repeat = true;
287  } else {
289  }
290  }
291 
292  return false;
293  }
294 
296  {
297  bool need_restart = false;
298  bool rv = CheckReady(it.query, it.restart_count, need_restart);
299  if (!rv && need_restart) {
300  try {
301  ++it.restart_count;
302  ERR_POST(Warning << "CCassBlobWaiter query retry: " + to_string(it.restart_count));
303  it.query->Restart();
304  } catch (const exception& ex) {
305  ERR_POST(NCBI_NS_NCBI::Error << "Failed to restart query (p2): " << ex.what());
306  throw;
307  }
308  }
309  return rv;
310  }
311 
312  CassConsistency GetQueryConsistency(void)
313  {
314  return CASS_CONSISTENCY_LOCAL_QUORUM;
315  }
316 
317  bool CheckMaxActive();
318  virtual void Wait1() = 0;
319 
321  weak_ptr<CCassDataCallbackReceiver> m_DataReadyCb3;
322  shared_ptr<CCassConnection> m_Conn;
323  atomic<int32_t> m_State{eInit};
324  string m_LastError;
325  bool m_Async;
326  atomic_bool m_Cancelled{false};
327  vector<SQueryRec> m_QueryArr;
328 
329  private:
330  string m_Keyspace;
332  int m_MaxRetries{-1};
333 };
334 
335 class CCassBlobOp: public enable_shared_from_this<CCassBlobOp>
336 {
337  public:
338  enum EBlopOpFlag {
341  eFlagOpSet
342  };
343 
344  public:
345  CCassBlobOp(CCassBlobOp&&) = default;
347  CCassBlobOp(const CCassBlobOp&) = delete;
348  CCassBlobOp& operator=(const CCassBlobOp&) = delete;
349 
350  explicit CCassBlobOp(shared_ptr<CCassConnection> conn)
351  : m_Conn(conn)
352  {
353  m_Keyspace = m_Conn->Keyspace();
354  }
355 
356  virtual ~CCassBlobOp()
357  {
358  m_Conn = nullptr;
359  }
360 
361  NCBI_STD_DEPRECATED("GetBlobChunkSize() is deprecated and will be deleted after 08/01/2023")
362  void GetBlobChunkSize(unsigned int timeout_ms, const string & keyspace, int64_t * chunk_size);
363 
364  NCBI_STD_DEPRECATED("GetBigBlobSizeLimit() is deprecated and will be deleted after 08/01/2023")
365  void GetBigBlobSizeLimit(unsigned int timeout_ms, const string & keyspace, int64_t * value);
366 
367  /// It is unsafe to use this function.
368  /// Call should be protected in multi-threaded environment.
369  void SetKeyspace(const string & keyspace)
370  {
371  m_Keyspace = keyspace;
372  }
373 
374  string GetKeyspace() const
375  {
376  return m_Keyspace;
377  }
378 
379  NCBI_STD_DEPRECATED("GetSetting() is deprecated and will be deleted after 08/01/2023")
380  bool GetSetting(unsigned int op_timeout_ms, const string & domain, const string & name, string & value);
381 
382  shared_ptr<CCassConnection> GetConn()
383  {
384  if (!m_Conn) {
385  NCBI_THROW(CCassandraException, eSeqFailed, "CCassBlobOp instance is not initialized with DB connection");
386  }
387  return m_Conn;
388  }
389 
390  private:
391  shared_ptr<CCassConnection> m_Conn;
392  string m_Keyspace;
393 };
394 
396 
397 #endif
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
#define bool
Definition: bool.h:34
void(*)(void *) TDataReadyCallback
function< void(const SBlobStat &stat, bool isFound)> TPropsCallback
function< void(const unsigned char *data, unsigned int size, int chunk_no)> TBlobChunkCallback
function< void(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)> TDataErrorCallback
USING_NCBI_SCOPE
@ ar_wait
Definition: cass_driver.hpp:68
int64_t gettime(void)
Definition: cass_util.cpp:41
CCassBlobOp(const CCassBlobOp &)=delete
string GetKeyspace() const
virtual ~CCassBlobOp()
void SetKeyspace(const string &keyspace)
It is unsafe to use this function.
void GetBigBlobSizeLimit(unsigned int timeout_ms, const string &keyspace, int64_t *value)
CCassBlobOp(CCassBlobOp &&)=default
CCassBlobOp & operator=(const CCassBlobOp &)=delete
bool GetSetting(unsigned int op_timeout_ms, const string &domain, const string &name, string &value)
shared_ptr< CCassConnection > m_Conn
CCassBlobOp(shared_ptr< CCassConnection > conn)
void GetBlobChunkSize(unsigned int timeout_ms, const string &keyspace, int64_t *chunk_size)
shared_ptr< CCassConnection > GetConn()
string m_Keyspace
CCassBlobOp & operator=(CCassBlobOp &&)=default
bool CheckMaxActive()
CCassBlobWaiter.
atomic< int32_t > m_State
bool Cancelled() const
void SetErrorCB(TDataErrorCallback error_cb)
CCassBlobWaiter(shared_ptr< CCassConnection > conn, const string &keyspace, int32_t key, bool async, TDataErrorCallback error_cb)
atomic_bool m_Cancelled
void SetMaxRetries(int value)
Set connection point parameters.
void SetKeySpace(string const &keyspace)
shared_ptr< CCassConnection > m_Conn
string GetKeySpace() const
int32_t GetKey() const
CCassBlobWaiter & operator=(CCassBlobWaiter &&)=delete
string LastError() const
void SetDataReadyCB3(shared_ptr< CCassDataCallbackReceiver > datareadycb3)
CCassBlobWaiter(const CCassBlobWaiter &)=delete
weak_ptr< CCassDataCallbackReceiver > m_DataReadyCb3
CassConsistency GetQueryConsistency(void)
void SetupQueryCB3(shared_ptr< CCassQuery > &query)
bool CheckReady(SQueryRec &it)
bool CheckReady(shared_ptr< CCassQuery > qry, unsigned int restart_counter, bool &need_repeat)
int GetMaxRetries() const
bool IsDataReadyCallbackExpired() const
void CloseAll(void)
CCassBlobWaiter & operator=(const CCassBlobWaiter &)=delete
bool HasError() const
vector< SQueryRec > m_QueryArr
bool CanRestart(shared_ptr< CCassQuery > query, unsigned int restart_count) const
TDataErrorCallback m_ErrorCb
virtual void Cancel()
bool CanRestart(SQueryRec &it) const
virtual ~CCassBlobWaiter()
virtual void Wait1()=0
void Error(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
CCassBlobWaiter(CCassBlobWaiter &&)=delete
CCassBlobWaiter(shared_ptr< CCassConnection > conn, const string &keyspace, bool async, TDataErrorCallback error_cb)
static const int chunk_size
char value[7]
Definition: config.c:431
static CS_CONNECTION * conn
Definition: ct_dynamic.c:25
@ eFatal
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
void Error(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1197
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
void Info(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1185
#define NCBI_STD_DEPRECATED(message)
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::KEY key
Defines NCBI C++ diagnostic APIs, classes, and macros.
Defines CRequestStatus class for NCBI C++ diagnostic API.
#define assert(x)
Definition: srv_diag.hpp:58
signed __int64 int64_t
Definition: stdint.h:135
signed int int32_t
Definition: stdint.h:123
shared_ptr< CCassQuery > query
static string query
Definition: inftrees.h:24
#define const
Definition: zconf.h:230
Modified on Thu Nov 30 04:56:55 2023 by modify_doxy.py rev. 669887