NCBI C++ ToolKit
cass_query_list.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 
2 #ifndef CASS_QUERY_LIST__HPP
3 #define CASS_QUERY_LIST__HPP
4 
5 #include <memory>
6 
9 
11 
13 
14 class CCassQueryList;
15 
16 using TCassQueryListTickCB = function<void()>;
17 
19 public:
25  virtual ~ICassQueryListConsumer() = default;
26  virtual bool Start(shared_ptr<CCassQuery> query, CCassQueryList& list, size_t query_idx) = 0;
27  virtual bool Finish(shared_ptr<CCassQuery>, CCassQueryList&, size_t /*query_idx*/) {
28  return true;
29  }
30  virtual bool ProcessRow(shared_ptr<CCassQuery> query, CCassQueryList& list, size_t query_idx) = 0;
31  virtual void Reset(shared_ptr<CCassQuery>, CCassQueryList&, size_t /*query_idx*/) {}
32  virtual void Failed(shared_ptr<CCassQuery> query, CCassQueryList& list, size_t query_idx, const exception* e) = 0;
33 };
34 
36 public:
37  static constexpr const unsigned int kDfltMaxQuery = 128;
38  static constexpr const uint64_t kReadyPushWaitTimeout = 500000;
39  static constexpr const uint64_t kReadyPopWaitTimeout = 500;
40  static constexpr const size_t kNotifyQueueLen = 2048;
41  static constexpr const unsigned int kResetRelaxTime = 10;
42  static shared_ptr<CCassQueryList> Create(shared_ptr<CCassConnection> cass_conn) noexcept;
43  virtual ~CCassQueryList();
44 
45  CCassQueryList& SetMaxQueries(size_t max_queries);
46  CCassQueryList& SetKeyspace(const string& keyspace);
48  string GetKeyspace() const;
49  size_t GetMaxQueries() const;
50  bool HasError() const;
51  size_t NumberOfActiveQueries() const;
52  size_t NumberOfBusySlots() const;
53  size_t NumberOfPendingSlots() const;
54  string ToString() const;
55 
56  void Finalize();
57  void Cancel(const exception* e = nullptr);
58  void Cancel(ICassQueryListConsumer* consumer, const exception* e = nullptr);
59  void Execute(unique_ptr<ICassQueryListConsumer> consumer, int retry_count, bool post_async = false);
60  bool HasEmptySlot();
61  void Yield(bool wait);
62  shared_ptr<CCassQuery> Extract(size_t slot_index);
63 protected:
70  {}
71 private:
78  ssLast = ssReleasing + 1
79  };
80  static string SQrySlotStateStr[ssLast];
81  struct SQrySlot {
82  unique_ptr<ICassQueryListConsumer> m_consumer;
83  shared_ptr<CCassQuery> m_qry;
84  size_t m_index;
87  };
89  public:
90  CQryNotification(shared_ptr<CCassQueryList> query_list, size_t index);
91  virtual void OnData() override;
92  private:
93  weak_ptr<CCassQueryList> m_query_list;
94  size_t m_index;
95  };
96  struct SPendingSlot {
97  unique_ptr<ICassQueryListConsumer> m_consumer;
99  };
100 
101  void Tick();
102  SQrySlot* CheckSlots(bool discard, bool wait = true);
103  SQrySlot* CheckSlot(size_t index, bool discard);
104  void CheckPending(SQrySlot* slot);
105  void AttachSlot(SQrySlot* slot, SPendingSlot&& pending_slot);
106  void DetachSlot(SQrySlot* slot);
107  void ReadRows(SQrySlot* slot);
108  void Release(SQrySlot* slot);
109  void CheckAccess();
110 
111  weak_ptr<CCassQueryList> m_self_weak;
112  shared_ptr<CCassConnection> m_cass_conn;
113  vector<SQrySlot> m_query_arr;
117  vector<SPendingSlot> m_pending_arr;
118  vector<shared_ptr<CQryNotification>> m_notification_arr;
120  atomic_bool m_yield_in_progress;
121  string m_keyspace;
122  atomic_size_t m_attached_slots;
123  atomic<thread::id> m_owning_thread;
124 };
125 
127 public:
128  CCassOneExecConsumer(function<bool(CCassQuery& query, CCassQueryList& list)> cb, function<void(CCassQuery& query, CCassQueryList& list, bool succeeded)> finish_cb = nullptr) :
129  m_cb(cb),
130  m_finish_cb(finish_cb),
134  {}
139  bool Start(shared_ptr<CCassQuery> query, CCassQueryList& list, size_t /*qry_index*/) override {
142  m_is_started = true;
143  return m_cb(*query, list);
144  }
145  bool Finish(shared_ptr<CCassQuery> query, CCassQueryList& list, size_t /*qry_index*/) override {
148  m_is_finished = true;
149  if (m_finish_cb)
150  m_finish_cb(*query, list, !m_is_failed);
151  return true;
152  }
153  bool ProcessRow(shared_ptr<CCassQuery>, CCassQueryList&, size_t /*qry_index*/) override {
154  assert(false);
155  return true;
156  }
157  void Reset(shared_ptr<CCassQuery>, CCassQueryList&, size_t /*qry_index*/) override {
158  }
159  void Failed(shared_ptr<CCassQuery>, CCassQueryList&, size_t /*qry_index*/, const exception*) override {
160  m_is_failed = true;
161  }
162 private:
164  function<void(CCassQuery& query, CCassQueryList& list, bool succeeded)> m_finish_cb;
168 };
169 
171 
172 #endif
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
function< void()> TCassQueryListTickCB
CCassOneExecConsumer(CCassOneExecConsumer &&)=delete
bool Start(shared_ptr< CCassQuery > query, CCassQueryList &list, size_t) override
CCassOneExecConsumer & operator=(const CCassOneExecConsumer &)=delete
void Reset(shared_ptr< CCassQuery >, CCassQueryList &, size_t) override
bool ProcessRow(shared_ptr< CCassQuery >, CCassQueryList &, size_t) override
CCassOneExecConsumer(function< bool(CCassQuery &query, CCassQueryList &list)> cb, function< void(CCassQuery &query, CCassQueryList &list, bool succeeded)> finish_cb=nullptr)
function< bool(CCassQuery &query, CCassQueryList &list)> m_cb
function< void(CCassQuery &query, CCassQueryList &list, bool succeeded)> m_finish_cb
void Failed(shared_ptr< CCassQuery >, CCassQueryList &, size_t, const exception *) override
CCassOneExecConsumer & operator=(CCassOneExecConsumer &&)=delete
bool Finish(shared_ptr< CCassQuery > query, CCassQueryList &list, size_t) override
CCassOneExecConsumer(const CCassOneExecConsumer &)=delete
weak_ptr< CCassQueryList > m_query_list
virtual void OnData() override
CQryNotification(shared_ptr< CCassQueryList > query_list, size_t index)
CCassQueryList::CQryNotification.
string ToString() const
vector< shared_ptr< CQryNotification > > m_notification_arr
void DetachSlot(SQrySlot *slot)
static constexpr const unsigned int kResetRelaxTime
CCassQueryList & SetKeyspace(const string &keyspace)
atomic< thread::id > m_owning_thread
SQrySlot * CheckSlot(size_t index, bool discard)
void AttachSlot(SQrySlot *slot, SPendingSlot &&pending_slot)
void Release(SQrySlot *slot)
static constexpr const size_t kNotifyQueueLen
atomic_bool m_yield_in_progress
static constexpr const uint64_t kReadyPushWaitTimeout
vector< SPendingSlot > m_pending_arr
virtual ~CCassQueryList()
atomic_size_t m_attached_slots
weak_ptr< CCassQueryList > m_self_weak
void Execute(unique_ptr< ICassQueryListConsumer > consumer, int retry_count, bool post_async=false)
static shared_ptr< CCassQueryList > Create(shared_ptr< CCassConnection > cass_conn) noexcept
size_t NumberOfActiveQueries() const
shared_ptr< CCassConnection > m_cass_conn
shared_ptr< CCassQuery > Extract(size_t slot_index)
size_t NumberOfBusySlots() const
vector< SQrySlot > m_query_arr
void CheckPending(SQrySlot *slot)
string GetKeyspace() const
size_t GetMaxQueries() const
void ReadRows(SQrySlot *slot)
TCassQueryListTickCB m_tick_cb
SQrySlot * CheckSlots(bool discard, bool wait=true)
size_t NumberOfPendingSlots() const
CCassQueryList & SetTickCB(TCassQueryListTickCB cb)
bool HasError() const
static string SQrySlotStateStr[ssLast]
CCassQueryList.
CCassQueryList & SetMaxQueries(size_t max_queries)
static constexpr const unsigned int kDfltMaxQuery
static constexpr const uint64_t kReadyPopWaitTimeout
void Yield(bool wait)
void Cancel(const exception *e=nullptr)
mpmc_bounded_queue_w< size_t, kNotifyQueueLen > m_ready
ICassQueryListConsumer & operator=(ICassQueryListConsumer &&)=delete
virtual bool Finish(shared_ptr< CCassQuery >, CCassQueryList &, size_t)
ICassQueryListConsumer & operator=(const ICassQueryListConsumer &)=delete
virtual bool Start(shared_ptr< CCassQuery > query, CCassQueryList &list, size_t query_idx)=0
virtual bool ProcessRow(shared_ptr< CCassQuery > query, CCassQueryList &list, size_t query_idx)=0
ICassQueryListConsumer(const ICassQueryListConsumer &)=delete
virtual ~ICassQueryListConsumer()=default
ICassQueryListConsumer(ICassQueryListConsumer &&)=delete
ICassQueryListConsumer()=default
virtual void Failed(shared_ptr< CCassQuery > query, CCassQueryList &list, size_t query_idx, const exception *e)=0
virtual void Reset(shared_ptr< CCassQuery >, CCassQueryList &, size_t)
#define false
Definition: bool.h:36
#define bool
Definition: bool.h:34
Uint8 uint64_t
#define assert(x)
Definition: srv_diag.hpp:58
unique_ptr< ICassQueryListConsumer > m_consumer
shared_ptr< CCassQuery > m_qry
unique_ptr< ICassQueryListConsumer > m_consumer
static string query
Modified on Fri Sep 20 14:57:05 2024 by modify_doxy.py rev. 669887