NCBI C++ ToolKit
plan.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /*****************************************************************************
2  * $Id: plan.cpp 103120 2024-09-11 14:33:06Z saprykin $
3  * ===========================================================================
4  *
5  * PUBLIC DOMAIN NOTICE
6  * National Center for Biotechnology Information
7  *
8  * This software/database is a "United States Government Work" under the
9  * terms of the United States Copyright Act. It was written as part of
10  * the author's official duties as a United States Government employee and
11  * thus cannot be copyrighted. This software/database is freely available
12  * to the public for use. The National Library of Medicine and the U.S.
13  * Government have not placed any restriction on its use or reproduction.
14  *
15  * Although all reasonable efforts have been taken to ensure the accuracy
16  * and reliability of the software and data, the NLM and the U.S.
17  * Government do not and cannot warrant the performance or results that
18  * may be obtained by using this software or data. The NLM and the U.S.
19  * Government disclaim all warranties, express or implied, including
20  * warranties of performance, merchantability or fitness for any particular
21  * purpose.
22  *
23  * Please cite the author in any work or product based on this material.
24  *
25  * Db Cassandra: class generating execution plans for cassandra table fullscans.
26  *
27  */
28 
29 #include <ncbi_pch.hpp>
30 
32 
33 #include <vector>
34 #include <string>
35 #include <utility>
36 #include <cmath>
37 #include <memory>
38 #include <algorithm>
39 
40 #include <corelib/ncbistr.hpp>
41 
44 
45 namespace {
46 
47 // In multidc environment size estimates are incomplete
48 // (contain data for local primary ranges only).
49 // We will fill gaps using neighboring ranges data.
50 vector<SCassSizeEstimate> NormalizeSizeEstimates(vector<SCassSizeEstimate> const & input)
51 {
52  vector<SCassSizeEstimate> output;
53  output.reserve(input.size() * 3);
54  int64_t current_range_start = numeric_limits<int64_t>::min();
55  for (auto const & estimate : input) {
56  if (estimate.range_start > current_range_start) {
57  SCassSizeEstimate new_estimate;
58  new_estimate.range_start = current_range_start;
59  new_estimate.range_end = estimate.range_start;
60  new_estimate.mean_partition_size = estimate.mean_partition_size;
61  double size_ratio =
62  (1.0 * (new_estimate.range_end - new_estimate.range_start)) /
63  (estimate.range_end - estimate.range_start);
64  new_estimate.partitions_count = static_cast<int64_t>(size_ratio * estimate.partitions_count);
65  output.push_back(new_estimate);
66  }
67  current_range_start = estimate.range_end;
68  output.push_back(estimate);
69  }
70  if (current_range_start < numeric_limits<int64_t>::max() && !input.empty()) {
71  SCassSizeEstimate new_estimate;
72  new_estimate.range_start = current_range_start;
73  new_estimate.range_end = numeric_limits<int64_t>::max();
74  auto last = input.rbegin();
75  new_estimate.mean_partition_size = last->mean_partition_size;
76  double size_ratio =
77  (1.0 * (new_estimate.range_end - new_estimate.range_start)) /
78  (last->range_end - last->range_start);
79  new_estimate.partitions_count = static_cast<int64_t>(size_ratio * last->partitions_count);
80  output.push_back(new_estimate);
81  }
82  return output;
83 }
84 
85 // @todo in the future leave for Debug only
86 void VerifySizeEstimates(vector<SCassSizeEstimate> const & estimates)
87 {
88  if (
89  estimates.empty()
90  || estimates.cbegin()->range_start != numeric_limits<int64_t>::min()
91  || estimates.crbegin()->range_end != numeric_limits<int64_t>::max()
92  ) {
93  NCBI_THROW(CCassandraException, eFatal, "Token range size estimates empty or have wrong min/max");
94  }
95  for (auto itr = estimates.cbegin(); itr != estimates.cend(); ++itr) {
96  if (itr->range_start >= itr->range_end) {
97  NCBI_THROW_FMT(CCassandraException, eFatal, "Token range has wrong borders: "
98  << itr->range_start << ":" << itr->range_end);
99  }
100  if (itr->partitions_count < 0) {
101  NCBI_THROW_FMT(CCassandraException, eFatal, "Token range has wrong partitions_count: "
102  << itr->range_start << ":" << itr->range_end << " - " << itr->partitions_count);
103  }
104  auto next = itr + 1;
105  if (next != estimates.cend() && itr->range_end != next->range_start) {
106  NCBI_THROW_FMT(CCassandraException, eFatal, "Adjacent ranges have gap: "
107  << itr->range_start << ":" << itr->range_end
108  << " and " << next->range_start << ":" << next->range_end);
109  }
110  }
111 }
112 
113 }
114 
116 
117 CCassandraFullscanPlan& CCassandraFullscanPlan::SetConnection(shared_ptr<CCassConnection> connection)
118 {
119  swap(m_Connection, connection);
120  return *this;
121 }
122 
// Body of the field list setter: moves `fields` into m_FieldList and returns *this.
// NOTE(review): the signature line (plan.cpp:123) is absent from this listing; the symbol
// index at the end of the file gives it as
// `CCassandraFullscanPlan & SetFieldList(vector< string > fields)`.
124 {
125  m_FieldList = std::move(fields);
126  return *this;
127 }
128 
// Body of the where-filter setter: copies `where_filter` into m_WhereFilter and returns *this.
// NOTE(review): the signature line (plan.cpp:129) is absent from this listing; the symbol
// index at the end of the file gives it as
// `CCassandraFullscanPlan & SetWhereFilter(string const &where_filter)`.
130 {
131  m_WhereFilter = where_filter;
132  return *this;
133 }
134 
// Body of a chaining setter; only `return *this` is visible.
// NOTE(review): the signature (plan.cpp:135) and statement line 137 are absent from this
// listing. The symbol index gives the signature as
// `CCassandraFullscanPlan & SetMinPartitionsForSubrangeScan(size_t value)`; presumably the
// missing line stores `value` into m_MinPartitionsForSubrangeScan - confirm against the real file.
136 {
138  return *this;
139 }
140 
// Body of the keyspace setter: copies `keyspace` into m_Keyspace and returns *this.
// NOTE(review): the signature line (plan.cpp:141) is absent from this listing; the symbol
// index at the end of the file gives it as
// `CCassandraFullscanPlan & SetKeyspace(string const &keyspace)`.
142 {
143  m_Keyspace = keyspace;
144  return *this;
145 }
146 
// Body of the table setter: copies `table` into m_Table and returns *this.
// NOTE(review): the signature line (plan.cpp:147) is absent from this listing; the symbol
// index at the end of the file gives it as
// `CCassandraFullscanPlan & SetTable(string const &table)`.
148 {
149  m_Table = table;
150  return *this;
151 }
152 
// Body of the per-query partition limit setter; returns *this in every case.
// A negative `value` is not applied: a Warning is posted instead.
// NOTE(review): the signature (plan.cpp:153) and the statement of the else branch (line 158)
// are absent from this listing. The symbol index gives the signature as
// `CCassandraFullscanPlan & SetPartitionCountPerQueryLimit(int64_t value)`; presumably the
// missing line stores `value` into m_PartitionCountPerQueryLimit - confirm against the real file.
154 {
155  if (value < 0) {
156  ERR_POST(Warning << "CCassandraFullscanPlanner::SetPartitionCountPerQueryLimit - wrong value ignored '" << value << "'");
157  } else {
159  }
160  return *this;
161 }
162 
// NOTE(review): only the braces of this function survive in the listing; the signature
// (plan.cpp:163) and the body statement (line 165) are absent. The symbol index gives the
// signature as `size_t GetMinPartitionsForSubrangeScan()`; presumably it returns
// m_MinPartitionsForSubrangeScan - confirm against the real file.
164 {
166 }
167 
// Estimates the number of partitions in m_Keyspace.m_Table using Cassandra system tables.
// Returns (sum of local partitions_count) * (matching peers + 1).
// NOTE(review): the signature line (plan.cpp:168) is absent from this listing; the symbol
// index at the end of the file gives it as `size_t GetPartitionCountEstimate()`.
// NOTE(review): log messages below are prefixed with
// "CCassandraFullscanPlanner::GetTableRowsCountEstimate", which differs from the name in the index.
169 {
170  string datacenter, schema, schema_bytes;
171  int64_t peers_count{0}, partition_count{0};
172 
// Step 1: read datacenter name and schema version (as text and as blob) of the local node.
173  shared_ptr<CCassQuery> query = m_Connection->NewQuery();
174  query->SetSQL("SELECT data_center, schema_version, uuidAsBlob(schema_version) FROM system.local", 0);
175  query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
// NOTE(review): the NextRow() result is not checked here before fields are read.
176  query->NextRow();
177  datacenter = query->FieldGetStrValue(0);
178  schema = query->FieldGetStrValue(1);
179  schema_bytes = query->FieldGetStrValue(2);
180  ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Datacenter '" << datacenter << "'");
181  ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Schema '" << schema << "'");
182  ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Bytes size " << schema_bytes.size());
183 
// Step 2: count peers that report the same datacenter and the same schema version.
// The schema version is bound as raw bytes taken from the blob column read above.
184  query = m_Connection->NewQuery();
185  query->SetSQL("SELECT count(*) FROM system.peers WHERE data_center = ? and schema_version = ? ALLOW FILTERING", 2);
186  query->BindStr(0, datacenter);
187  query->BindBytes(1, reinterpret_cast<const unsigned char*>(schema_bytes.c_str()), schema_bytes.size());
188  query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
189  query->NextRow();
// Second argument is presumably a default used when the field has no value - TODO confirm.
190  peers_count = query->FieldGetInt64Value(0, 0);
191  ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - Peers count '" << peers_count << "'");
192 
// Step 3: sum partitions_count over all system.size_estimates rows of the table.
193  query = m_Connection->NewQuery();
194  query->SetSQL("SELECT partitions_count FROM system.size_estimates WHERE table_name = ? AND keyspace_name = ?", 2);
195  query->BindStr(0, m_Table);
196  query->BindStr(1, m_Keyspace);
197  query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
198  while (query->NextRow() == ar_dataready) {
199  partition_count += query->FieldGetInt64Value(0);
200  }
201 
202  ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - "
203  "Local rows estimate - '" << partition_count << "'");
204  ERR_POST(Trace << "CCassandraFullscanPlanner::GetTableRowsCountEstimate - "
205  "Total rows estimate - '" << partition_count * (peers_count + 1) << "'");
206 
// Extrapolation: the local sum is multiplied by the node count (counted peers plus this node).
207  return partition_count * (peers_count + 1);
208 }
209 
// Produces the query for the last pending token range and removes that range from m_TokenRanges.
// Returns nullptr when no ranges are left.
// NOTE(review): the signature line (plan.cpp:210) is absent from this listing; the symbol
// index at the end of the file gives it as `TQueryPtr GetNextQuery() override`.
211 {
212  shared_ptr<CCassQuery> query;
213  if (m_TokenRanges.empty()) {
214  return nullptr;
// A single (0, 0) range is the marker pushed by Generate() for its template without
// TOKEN() placeholders, so the query is created with no bound parameters.
215  } else if (m_TokenRanges.size() == 1 && m_TokenRanges[0].first == 0 && m_TokenRanges[0].second == 0) {
216  query = m_Connection->NewQuery();
217  query->SetSQL(m_SqlTemplate, 0);
218  } else {
// Regular case: bind borders of the last range to the two placeholders of the template.
219  query = m_Connection->NewQuery();
220  query->SetSQL(m_SqlTemplate, 2);
221  query->BindInt64(0, m_TokenRanges.back().first);
222  query->BindInt64(1, m_TokenRanges.back().second);
223  }
// The range has been consumed (ranges are handed out from the back of the vector).
224  m_TokenRanges.pop_back();
225  return query;
226 }
227 
// Returns the number of token ranges currently stored in m_TokenRanges
// (GetNextQuery() removes one range per produced query).
// NOTE(review): the signature line (plan.cpp:228) is absent from this listing; the symbol
// index at the end of the file gives it as `size_t GetQueryCount() const override`.
229 {
230  return m_TokenRanges.size();
231 }
232 
234 {
235  auto local_dc = m_Connection->GetDatacenterName();
236  ERR_POST(Trace << "CCassandraFullscanPlan::SplitTokenRangesForLimits - "
237  "Local dc - '" << local_dc << "'");
238  auto local_estimates = m_Connection->GetSizeEstimates(local_dc, m_Keyspace, m_Table);
239  ERR_POST(Trace << "CCassandraFullscanPlan::SplitTokenRangesForLimits - "
240  "Local estimates size - '" << local_estimates.size() << "'");
241  auto estimates = NormalizeSizeEstimates(local_estimates);
242 
243  // Additional verification of estimates data
244  VerifySizeEstimates(estimates);
245 
246  auto search_start = estimates.begin();
247  CCassConnection::TTokenRanges result_ranges;
248  for (auto const & range : m_TokenRanges) {
249  auto range_start = range.first;
250  auto range_end = range.second;
251 
252  // Search for first intersecting range
253  auto itr = search_start;
254  while (itr != estimates.end() && itr->range_end <= range_start) {
255  ++itr;
256  }
257  search_start = itr;
258 
259  // Counting total partitions from size estimates
260  int64_t partitions_count{0};
261  while (itr != estimates.end() && itr->range_start < range_end) {
262  auto intersect_start = max(itr->range_start, range_start);
263  auto intersect_end = min(itr->range_end, range_end);
264  double size_ratio =
265  (1.0 * (intersect_end - intersect_start)) /
266  (itr->range_end - itr->range_start);
267  partitions_count += size_ratio * itr->partitions_count;
268  ++itr;
269  }
270 
271  // Split token range if required
272  if (partitions_count > m_PartitionCountPerQueryLimit) {
273  int64_t parts = static_cast<int64_t>(ceil(1.0 * partitions_count / m_PartitionCountPerQueryLimit));
274  if (parts > 1) {
275  int64_t step = (range_end - range_start) / parts;
276  assert(step > 0);
277  auto start = range_start;
278  while (start < range_end) {
279  auto end = (range_end - start) < step ? range_end : (start + step);
280  result_ranges.push_back(make_pair(start, end));
281  start = end;
282  }
283  } else {
284  result_ranges.push_back(range);
285  }
286  } else {
287  result_ranges.push_back(range);
288  }
289  }
290  swap(result_ranges, m_TokenRanges);
291 }
292 
// Builds m_SqlTemplate and fills m_TokenRanges for the configured keyspace/table.
// Throws CCassandraException(eSeqFailed) when the connection is not set or keyspace/table
// name is empty.
// NOTE(review): the signature line (plan.cpp:293) is absent from this listing; the symbol
// index at the end of the file gives it as `void Generate() override`.
// NOTE(review): original lines 300 and 309-310 are also absent, so the condition that opens
// the first branch below and the statement closed by the `}` on line 311 are not visible.
// Presumably they compare GetPartitionCountEstimate() with m_MinPartitionsForSubrangeScan
// and call SplitTokenRangesForLimits() - confirm against the real file.
294 {
295  if (!m_Connection || m_Keyspace.empty() || m_Table.empty()) {
296  NCBI_THROW(CCassandraException, eSeqFailed, "Invalid sequence of operations, connection should be provided");
297  }
298 
299  m_TokenRanges.clear();
// Branch 1: single query without TOKEN() restrictions. The (0, 0) pair is the marker
// that GetNextQuery() checks to create a query with no bound parameters.
301  m_SqlTemplate = "SELECT " + NStr::Join(m_FieldList, ", ") + " FROM " + m_Keyspace + "." + m_Table;
302  if (!m_WhereFilter.empty()) {
303  m_SqlTemplate += " WHERE " + m_WhereFilter + " ALLOW FILTERING";
304  }
305  m_TokenRanges.push_back(make_pair(0, 0));
306  } else {
// Branch 2: one query per token range; the template gets two placeholders for the borders
// of a range: TOKEN(partition key) > ? AND TOKEN(partition key) <= ?
307  vector<string> partition_fields = m_Connection->GetPartitionKeyColumnNames(m_Keyspace, m_Table);
308  m_Connection->GetTokenRanges(m_TokenRanges);
311  }
312  string partition = NStr::Join(partition_fields, ",");
313  m_SqlTemplate = "SELECT " + NStr::Join(m_FieldList, ", ") + " FROM "
314  + m_Keyspace + "." + m_Table + " WHERE TOKEN(" + partition + ") > ? AND TOKEN(" + partition + ") <= ?";
315  if (!m_WhereFilter.empty()) {
316  m_SqlTemplate += " AND " + m_WhereFilter + " ALLOW FILTERING";
317  }
318  ERR_POST(Trace << "CCassandraFullscanPlanner::Generate - Sql template = '" << m_SqlTemplate << "'");
319  }
320 }
321 
// Returns m_TokenRanges itself (the index lists the return type as a reference,
// so callers get access to the member, not a copy).
// NOTE(review): the signature line (plan.cpp:322) is absent from this listing; the symbol
// index at the end of the file gives it as `CCassConnection::TTokenRanges & GetTokenRanges()`.
323 {
324  return m_TokenRanges;
325 }
326 
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
@ ar_dataready
Definition: cass_driver.hpp:70
vector< pair< TTokenValue, TTokenValue > > TTokenRanges
void SplitTokenRangesForLimits()
Definition: plan.cpp:233
CCassandraFullscanPlan & SetConnection(shared_ptr< CCassConnection > connection)
Definition: plan.cpp:117
CCassandraFullscanPlan & SetWhereFilter(string const &where_filter)
Definition: plan.cpp:129
void Generate() override
Definition: plan.cpp:293
size_t GetMinPartitionsForSubrangeScan()
Definition: plan.cpp:163
TQueryPtr GetNextQuery() override
Definition: plan.cpp:210
size_t m_MinPartitionsForSubrangeScan
Definition: plan.hpp:104
size_t GetPartitionCountEstimate()
Definition: plan.cpp:168
shared_ptr< CCassQuery > TQueryPtr
Definition: plan.hpp:59
shared_ptr< CCassConnection > m_Connection
Definition: plan.hpp:97
size_t GetQueryCount() const override
Definition: plan.cpp:228
int64_t m_PartitionCountPerQueryLimit
Definition: plan.hpp:105
CCassandraFullscanPlan & SetTable(string const &table)
Definition: plan.cpp:147
CCassandraFullscanPlan & SetFieldList(vector< string > fields)
Definition: plan.cpp:123
CCassandraFullscanPlan & SetMinPartitionsForSubrangeScan(size_t value)
Definition: plan.cpp:135
vector< string > m_FieldList
Definition: plan.hpp:98
CCassandraFullscanPlan & SetKeyspace(string const &keyspace)
Definition: plan.cpp:141
CCassandraFullscanPlan & SetPartitionCountPerQueryLimit(int64_t value)
Definition: plan.cpp:153
CCassConnection::TTokenRanges & GetTokenRanges()
Definition: plan.cpp:322
CCassConnection::TTokenRanges m_TokenRanges
Definition: plan.hpp:103
The NCBI C++ standard methods for dealing with std::string.
@ eFatal
static DLIST_TYPE *DLIST_NAME() last(DLIST_LIST_TYPE *list)
Definition: dlist.tmpl.h:51
static DLIST_TYPE *DLIST_NAME() next(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
Definition: dlist.tmpl.h:56
static SQLCHAR output[256]
Definition: print.c:5
static const char * schema
Definition: stats.c:20
Int8 int64_t
void swap(NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair1, NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair2)
Definition: ncbimisc.hpp:1508
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Trace(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1179
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
Definition: ncbiexpt.hpp:719
static string Join(const TContainer &arr, const CTempString &delim)
Join strings using the specified delimiter.
Definition: ncbistr.hpp:2699
<!DOCTYPE HTML >< html > n< header > n< title > PubSeq Gateway Help Page</title > n< style > n table
static int input()
range(_Ty, _Ty) -> range< _Ty >
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
T max(T x_, T y_)
T min(T x_, T y_)
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
Definition: plan.cpp:43
#define assert(x)
Definition: srv_diag.hpp:58
int64_t mean_partition_size
int64_t partitions_count
static string query
Modified on Fri Sep 20 14:57:53 2024 by modify_doxy.py rev. 669887