50 vector<SCassSizeEstimate> NormalizeSizeEstimates(vector<SCassSizeEstimate>
const &
input)
52 vector<SCassSizeEstimate>
output;
55 for (
auto const & estimate :
input) {
56 if (estimate.range_start > current_range_start) {
59 new_estimate.
range_end = estimate.range_start;
63 (estimate.range_end - estimate.range_start);
65 output.push_back(new_estimate);
67 current_range_start = estimate.range_end;
68 output.push_back(estimate);
78 (
last->range_end -
last->range_start);
80 output.push_back(new_estimate);
86 void VerifySizeEstimates(vector<SCassSizeEstimate>
const & estimates)
95 for (
auto itr = estimates.cbegin(); itr != estimates.cend(); ++itr) {
96 if (itr->range_start >= itr->range_end) {
98 << itr->range_start <<
":" << itr->range_end);
100 if (itr->partitions_count < 0) {
102 << itr->range_start <<
":" << itr->range_end <<
" - " << itr->partitions_count);
105 if (
next != estimates.cend() && itr->range_end !=
next->range_start) {
107 << itr->range_start <<
":" << itr->range_end
108 <<
" and " <<
next->range_start <<
":" <<
next->range_end);
156 ERR_POST(
Warning <<
"CCassandraFullscanPlanner::SetPartitionCountPerQueryLimit - wrong value ignored '" <<
value <<
"'");
170 string datacenter,
schema, schema_bytes;
171 int64_t peers_count{0}, partition_count{0};
174 query->SetSQL(
"SELECT data_center, schema_version, uuidAsBlob(schema_version) FROM system.local", 0);
175 query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE,
false,
false);
177 datacenter =
query->FieldGetStrValue(0);
179 schema_bytes =
query->FieldGetStrValue(2);
180 ERR_POST(
Trace <<
"CCassandraFullscanPlanner::GetTableRowsCountEstimate - Datacenter '" << datacenter <<
"'");
181 ERR_POST(
Trace <<
"CCassandraFullscanPlanner::GetTableRowsCountEstimate - Schema '" <<
schema <<
"'");
182 ERR_POST(
Trace <<
"CCassandraFullscanPlanner::GetTableRowsCountEstimate - Bytes size " << schema_bytes.size());
185 query->SetSQL(
"SELECT count(*) FROM system.peers WHERE data_center = ? and schema_version = ? ALLOW FILTERING", 2);
186 query->BindStr(0, datacenter);
187 query->BindBytes(1,
reinterpret_cast<const unsigned char*
>(schema_bytes.c_str()), schema_bytes.size());
188 query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE,
false,
false);
190 peers_count =
query->FieldGetInt64Value(0, 0);
191 ERR_POST(
Trace <<
"CCassandraFullscanPlanner::GetTableRowsCountEstimate - Peers count '" << peers_count <<
"'");
194 query->SetSQL(
"SELECT partitions_count FROM system.size_estimates WHERE table_name = ? AND keyspace_name = ?", 2);
197 query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE,
false,
false);
199 partition_count +=
query->FieldGetInt64Value(0);
202 ERR_POST(
Trace <<
"CCassandraFullscanPlanner::GetTableRowsCountEstimate - "
203 "Local rows estimate - '" << partition_count <<
"'");
204 ERR_POST(
Trace <<
"CCassandraFullscanPlanner::GetTableRowsCountEstimate - "
205 "Total rows estimate - '" << partition_count * (peers_count + 1) <<
"'");
207 return partition_count * (peers_count + 1);
212 shared_ptr<CCassQuery>
query;
236 ERR_POST(
Trace <<
"CCassandraFullscanPlan::SplitTokenRangesForLimits - "
237 "Local dc - '" << local_dc <<
"'");
239 ERR_POST(
Trace <<
"CCassandraFullscanPlan::SplitTokenRangesForLimits - "
240 "Local estimates size - '" << local_estimates.size() <<
"'");
241 auto estimates = NormalizeSizeEstimates(local_estimates);
244 VerifySizeEstimates(estimates);
246 auto search_start = estimates.begin();
249 auto range_start =
range.first;
250 auto range_end =
range.second;
253 auto itr = search_start;
254 while (itr != estimates.end() && itr->range_end <= range_start) {
261 while (itr != estimates.end() && itr->range_start < range_end) {
262 auto intersect_start =
max(itr->range_start, range_start);
263 auto intersect_end =
min(itr->range_end, range_end);
265 (1.0 * (intersect_end - intersect_start)) /
266 (itr->range_end - itr->range_start);
267 partitions_count += size_ratio * itr->partitions_count;
275 int64_t step = (range_end - range_start) / parts;
277 auto start = range_start;
278 while (start < range_end) {
279 auto end = (range_end - start) < step ? range_end : (start + step);
280 result_ranges.push_back(make_pair(start, end));
284 result_ranges.push_back(
range);
287 result_ranges.push_back(
range);
312 string partition =
NStr::Join(partition_fields,
",");
314 +
m_Keyspace +
"." +
m_Table +
" WHERE TOKEN(" + partition +
") > ? AND TOKEN(" + partition +
") <= ?";
#define BEGIN_IDBLOB_SCOPE
vector< pair< TTokenValue, TTokenValue > > TTokenRanges
void SplitTokenRangesForLimits()
CCassandraFullscanPlan & SetConnection(shared_ptr< CCassConnection > connection)
CCassandraFullscanPlan & SetWhereFilter(string const &where_filter)
size_t GetMinPartitionsForSubrangeScan()
TQueryPtr GetNextQuery() override
size_t m_MinPartitionsForSubrangeScan
size_t GetPartitionCountEstimate()
shared_ptr< CCassQuery > TQueryPtr
shared_ptr< CCassConnection > m_Connection
size_t GetQueryCount() const override
int64_t m_PartitionCountPerQueryLimit
CCassandraFullscanPlan & SetTable(string const &table)
CCassandraFullscanPlan & SetFieldList(vector< string > fields)
CCassandraFullscanPlan & SetMinPartitionsForSubrangeScan(size_t value)
vector< string > m_FieldList
CCassandraFullscanPlan & SetKeyspace(string const &keyspace)
CCassandraFullscanPlan & SetPartitionCountPerQueryLimit(int64_t value)
CCassConnection::TTokenRanges & GetTokenRanges()
CCassConnection::TTokenRanges m_TokenRanges
The NCBI C++ standard methods for dealing with std::string.
static DLIST_TYPE *DLIST_NAME() last(DLIST_LIST_TYPE *list)
static DLIST_TYPE *DLIST_NAME() next(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
static SQLCHAR output[256]
static const char * schema
void swap(NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair1, NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair2)
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void Trace(CExceptionArgs_Base &args)
void Warning(CExceptionArgs_Base &args)
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
static string Join(const TContainer &arr, const CTempString &delim)
Join strings using the specified delimiter.
<!DOCTYPE HTML >< html > n< header > n< title > PubSeq Gateway Help Page</title > n< style > n table
range(_Ty, _Ty) -> range< _Ty >
const GenericPointer< typename T::ValueType > T2 value
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
int64_t mean_partition_size