NCBI C++ ToolKit
runner.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /*****************************************************************************
2  * $Id: runner.cpp 100970 2023-10-06 14:32:13Z saprykin $
3  * ===========================================================================
4  *
5  * PUBLIC DOMAIN NOTICE
6  * National Center for Biotechnology Information
7  *
8  * This software/database is a "United States Government Work" under the
9  * terms of the United States Copyright Act. It was written as part of
10  * the author's official duties as a United States Government employee and
11  * thus cannot be copyrighted. This software/database is freely available
12  * to the public for use. The National Library of Medicine and the U.S.
13  * Government have not placed any restriction on its use or reproduction.
14  *
15  * Although all reasonable efforts have been taken to ensure the accuracy
16  * and reliability of the software and data, the NLM and the U.S.
17  * Government do not and cannot warrant the performance or results that
18  * may be obtained by using this software or data. The NLM and the U.S.
19  * Government disclaim all warranties, express or implied, including
20  * warranties of performance, merchantability or fitness for any particular
21  * purpose.
22  *
23  * Please cite the author in any work or product based on this material.
24  *
25  * Db Cassandra: class managing Cassandra fullscans.
26  *
27  */
28 
29 #include <ncbi_pch.hpp>
30 
32 
33 #include <memory>
34 #include <utility>
35 #include <vector>
36 #include <string>
37 #include <thread>
38 
39 #include <corelib/ncbistr.hpp>
40 #include "worker.hpp"
41 
44 
45 const unsigned int CCassandraFullscanRunner::kPageSizeDefault = 4096;
48 
51 {
53  return *this;
54 }
55 
57 {
59  return *this;
60 }
61 
63 {
64  if (value > 0) {
65  m_PageSize = value;
66  }
67  return *this;
68 }
69 
71 {
72  if (value > 0) {
74  }
75  return *this;
76 }
77 
79  TCassandraFullscanConsumerFactory consumer_factory
80 )
81 {
82  m_ConsumerFactory = move(consumer_factory);
83  return *this;
84 }
85 
87 {
88  m_ConsumerCreationPolicy = policy;
89  return *this;
90 }
91 
93  unique_ptr<ICassandraFullscanPlan> plan
94 )
95 {
96  if (m_ExecutionPlan) {
97  NCBI_THROW(CCassandraException, eSeqFailed,
98  "Invalid sequence of operations, execution plan should not be overriden"
99  );
100  }
101  m_ExecutionPlan = move(plan);
102  return *this;
103 }
104 
106  unsigned int max_retry_count
107 )
108 {
109  m_MaxRetryCount = max_retry_count;
110  return *this;
111 }
112 
114 {
115  if (!m_ExecutionPlan) {
116  NCBI_THROW(CCassandraException, eSeqFailed,
117  "Invalid sequence of operations, execution plan should be provided"
118  );
119  }
120  m_ExecutionPlan->Generate();
122 
123  size_t thread_count = min(m_ThreadCount, plan->GetQueryCount());
124  thread_count = max(thread_count, 1UL);
125  if (thread_count > m_MaxActiveStatements) {
126  NCBI_THROW(CCassandraException, eSeqFailed,
127  "Invalid sequence of operations. Thread count is greater than max_active_statements total."
128  );
129  }
130  vector<thread> worker_threads;
131  vector<CCassandraFullscanWorker> workers;
132  CFastMutex plan_mutex;
133  worker_threads.reserve(thread_count - 1);
134  workers.reserve(thread_count);
135 
137  if (thread_count > 1) {
138  task_provider = [&plan_mutex, plan]() -> CCassandraFullscanPlan::TQueryPtr {
140  {
141  CFastMutexGuard _(plan_mutex);
142  query = plan->GetNextQuery();
143  }
144  return query;
145  };
146  } else {
147  task_provider = [plan]() -> CCassandraFullscanPlan::TQueryPtr {
148  return plan->GetNextQuery();
149  };
150  }
151 
152  auto consumer_factory_fn =
153  [this]() -> unique_ptr<ICassandraFullscanConsumer> {
154  lock_guard _(m_ConsumerFactoryMutex);
155  return m_ConsumerFactory();
156  };
157  // Creating workers
158  for (size_t i = 0; i < thread_count; ++i) {
160  worker
165  .SetConsumerFactory(consumer_factory_fn)
167  .SetTaskProvider(task_provider);
168  workers.emplace_back(move(worker));
169  }
170 
171  // Starting threads for all except the last worker
172  for (size_t i = 0; i < workers.size() - 1; ++i) {
173  worker_threads.emplace_back(
174  [i, &workers]() {
175  workers[i]();
176  }
177  );
178  }
179 
180  // Executing last worker in the current thread
181  workers[workers.size() - 1]();
182 
183  // Waiting for threads of all except the last worker
184  for (thread& t : worker_threads) {
185  t.join();
186  }
187  worker_threads.clear();
188 
189  bool all_finished = true;
190  for (size_t i = 0; i < workers.size(); ++i) {
191  if (workers[i].HadError()) {
193  "Fullscan failed: one of workers got exception with message - " + workers[i].GetFirstError()
194  );
195  }
196  if (!workers[i].IsFinished()) {
197  all_finished = false;
198  }
199  }
200  workers.clear();
201  return all_finished;
202 }
203 
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
shared_ptr< CCassQuery > TQueryPtr
Definition: plan.hpp:59
static const unsigned int kMaxRetryCountDefault
Definition: runner.hpp:52
CCassandraFullscanRunner & SetConsumerFactory(TCassandraFullscanConsumerFactory consumer_factory)
Definition: runner.cpp:78
unsigned int m_MaxActiveStatements
Definition: runner.hpp:74
unsigned int m_PageSize
Definition: runner.hpp:73
TCassandraFullscanConsumerFactory m_ConsumerFactory
Definition: runner.hpp:75
unsigned int m_MaxRetryCount
Definition: runner.hpp:78
CCassandraFullscanRunner & SetConsumerCreationPolicy(ECassandraFullscanConsumerPolicy policy)
Definition: runner.cpp:86
CCassandraFullscanRunner & SetConsistency(CassConsistency value)
Definition: runner.cpp:56
CCassandraFullscanRunner & SetMaxActiveStatements(unsigned int value)
Definition: runner.cpp:70
static const unsigned int kMaxActiveStatementsDefault
Definition: runner.hpp:51
static const unsigned int kPageSizeDefault
Definition: runner.hpp:50
CCassandraFullscanRunner & SetThreadCount(size_t value)
Definition: runner.cpp:50
CCassandraFullscanRunner & SetMaxRetryCount(unsigned int max_retry_count)
Definition: runner.cpp:105
CCassandraFullscanRunner & SetExecutionPlan(unique_ptr< ICassandraFullscanPlan > plan)
Definition: runner.cpp:92
ECassandraFullscanConsumerPolicy m_ConsumerCreationPolicy
Definition: runner.hpp:79
TCassConsistency m_Consistency
Definition: runner.hpp:72
unique_ptr< ICassandraFullscanPlan > m_ExecutionPlan
Definition: runner.hpp:77
CCassandraFullscanRunner & SetPageSize(unsigned int value)
Definition: runner.cpp:62
CCassandraFullscanWorker & SetMaxRetryCount(unsigned int max_retry_count)
Definition: worker.cpp:87
function< CCassandraFullscanPlan::TQueryPtr()> TTaskProvider
Definition: worker.hpp:103
CCassandraFullscanWorker & SetPageSize(unsigned int value)
Definition: worker.cpp:51
CCassandraFullscanWorker & SetMaxActiveStatements(unsigned int value)
Definition: worker.cpp:59
CCassandraFullscanWorker & SetConsumerCreationPolicy(ECassandraFullscanConsumerPolicy policy)
Definition: worker.cpp:75
CCassandraFullscanWorker & SetTaskProvider(TTaskProvider provider)
Definition: worker.cpp:81
CCassandraFullscanWorker & SetConsumerFactory(TCassandraFullscanConsumerFactory consumer_factory)
Definition: worker.cpp:67
CCassandraFullscanWorker & SetConsistency(CassConsistency value)
Definition: worker.cpp:45
CFastMutex –.
Definition: ncbimtx.hpp:667
virtual size_t GetQueryCount() const =0
virtual TQueryPtr GetNextQuery()=0
char value[7]
Definition: config.c:431
function< unique_ptr< ICassandraFullscanConsumer >()> TCassandraFullscanConsumerFactory
Definition: consumer.hpp:72
ECassandraFullscanConsumerPolicy
Definition: consumer.hpp:44
The NCBI C++ standard methods for dealing with std::string.
#define _(proto)
Definition: ct_nlmzip_i.h:78
@ eFatal
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
int i
EIPRangeType t
Definition: ncbi_localip.c:101
T max(T x_, T y_)
T min(T x_, T y_)
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
Definition: runner.cpp:43
static string query
Modified on Tue Dec 05 02:19:16 2023 by modify_doxy.py rev. 669887