NCBI C++ ToolKit
ncbi_pool_balancer.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: ncbi_pool_balancer.cpp 100872 2023-09-21 19:02:29Z ucko $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Aaron Ucko
27 *
28 * File Description:
29 * Help distribute connections within a pool across servers.
30 *
31 * ===========================================================================
32 */
33 
34 #include <ncbi_pch.hpp>
35 
38 #include <corelib/ncbierror.hpp>
39 #include <corelib/error_codes.hpp>
40 
41 #include <numeric>
42 #include <random>
43 
44 #define NCBI_USE_ERRCODE_X Corelib_Balancer
45 
47 
// File-local state for the shared random engine (s_RandomEngine) that the
// balancer draws from.  s_RandomMutex is handed to the guard class below.
48 DEFINE_STATIC_FAST_MUTEX(s_RandomMutex);
// Becomes true once the shared engine has been seeded; it is read and
// written only inside the guard's constructor body below.
50 static bool s_RandomnessSeeded;
51 
// Scoped guard to instantiate around any use of *s_RandomEngine.
// NOTE(review): the class-head, constructor-name, and member-declaration
// lines (listing lines 52, 55, 66) are absent from this listing; the page's
// member index shows the member as "CFastMutexGuard m_MutexGuard", so the
// guard presumably holds s_RandomMutex for its whole lifetime -- confirm
// against the real source.
53 {
54 public:
// Constructor: m_MutexGuard is initialized from s_RandomMutex before the
// body runs, so the one-time seeding below happens after that step.
56  : m_MutexGuard(s_RandomMutex)
57  {
// Seed the engine from random_device on first use only.
58  if ( !s_RandomnessSeeded ) {
59  random_device rdev;
60  s_RandomEngine->seed(rdev());
61  s_RandomnessSeeded = true;
62  }
63  }
64 
65 private:
67 };
68 
69 
// Build the endpoint table (m_Endpoints) and the ranking multiset
// (m_Rankings) from the options supplied for a service.
//
// service_name  Stored in m_ServiceName; options whose name equals it are
//               never passed through x_NameToKey.
// options       Server options; each one is registered as (or merged into)
//               an endpoint keyed by host and port.
// flags         Stored in m_Flags; fIgnoreRawIPs is also consulted here.
70 CPoolBalancer::CPoolBalancer(const string& service_name,
71  const IDBServiceMapper::TOptions& options,
72  TFlags flags)
73  : m_ServiceName(service_name), m_TotalCount(0U), m_Flags(flags)
74 {
// 'it' is deliberately a by-value copy of each list element: it may be
// re-pointed with Reset() below without touching the caller's list.
75  for (auto it : options) {
76  CTempString name = it->GetName();
77  CEndpointKey key(it->GetHost(), it->GetPort());
// No usable host/port on the option: try to derive a key from its name.
// x_NameToKey takes 'name' by non-const reference and may rewrite it.
78  if (key == 0 && name != service_name) {
79  key = x_NameToKey(name);
80  if (key != 0) {
// With fIgnoreRawIPs set, skip options whose name x_NameToKey left
// unchanged (presumably bare addresses -- confirm).
81  if ((flags & fIgnoreRawIPs) && name == it->GetName() ) {
82  continue;
83  }
// Replace the option with a copy carrying the resolved host and port.
84  it.Reset(new CDBServerOption(name, key.GetHost(),
85  key.GetPort(),
86  it->GetRanking(), it->GetState(),
87  it->GetExpireTime()));
88  }
89  }
90  _TRACE_X(1,
91  service_name << ": " << key << " -> " << name << " per DBLB");
92 
// Reuse an existing endpoint when one matches; otherwise add a new one.
// NOTE(review): the declaration of 'info' (listing line 95) is absent
// from this listing.
93  auto endpoint = x_FindEndpoint(key, name);
94  if (endpoint == m_Endpoints.end()) {
96  info.ref = it;
97  endpoint = m_Endpoints.emplace(key, info);
98  }
99 
100  double ranking = it->GetRanking();
101 
// Penalized and excluded states each scale the ranking down by
// epsilon and each raise the endpoint's penalty level by one.
102  if (it->IsPenalized()) {
103  ranking *= numeric_limits<double>::epsilon();
104  ++endpoint->second.penalty_level;
105  }
106  if (it->IsExcluded()) {
107  ranking *= numeric_limits<double>::epsilon();
108  ++endpoint->second.penalty_level;
109  }
110 
// NOTE(review): when the endpoint already existed, its previous
// effective_ranking is overwritten here but not erased from m_Rankings,
// so both values end up in the multiset -- confirm this is intended.
111  endpoint->second.ref = it;
112  endpoint->second.effective_ranking = ranking;
113  m_Rankings.insert(ranking);
114  }
115 }
116 
117 
119 {
// Body of what is presumably CPoolBalancer::LocallyPenalize(TSvrRef server);
// the signature line is absent from this listing.
// Lowers the effective ranking of the endpoint matching 'server' and raises
// its penalty level.  A null reference or an unknown endpoint is a no-op.
120  if (server.Empty()) {
121  return;
122  }
123  CEndpointKey key(server->GetHost(), server->GetPort());
124  auto it = x_FindEndpoint(key, server->GetName());
125  if (it == m_Endpoints.end()) {
126  return;
127  }
// Swap the old ranking for the reduced one in m_Rankings so the multiset
// keeps mirroring the endpoints' effective rankings.
// NOTE(review): this relies on the current effective_ranking being present
// in m_Rankings; if find() returned end(), erase() would be undefined.
128  m_Rankings.erase(m_Rankings.find(it->second.effective_ranking));
129  ++it->second.penalty_level;
130  it->second.effective_ranking *= numeric_limits<double>::epsilon();
131  m_Rankings.insert(it->second.effective_ranking);
132 }
133 
134 
136 {
// Body of what is presumably CPoolBalancer::x_InitFromCounts(const TCounts&
// counts); the signature line is absent from this listing.
// Resets the per-endpoint connection counts and repopulates them from
// 'counts' (iterated as name -> count pairs), creating placeholder endpoints
// for names that do not match a known endpoint.

// Start from a clean slate if counts were recorded previously.
137  if (m_TotalCount != 0) {
138  for (auto& it : m_Endpoints) {
139  it.second.actual_count = 0;
140  }
141  m_TotalCount = 0;
142  }
// Base ranking for placeholder endpoints; computed lazily on first need.
143  double legacy_base_ranking = 0.0;
144  for (const auto& cit : counts) {
145  CTempString name = cit.first;
146  auto key = x_NameToKey(name);
147  auto eit = m_Endpoints.lower_bound(key);
// NOTE(review): listing line 148 is absent here; it presumably declares
// the CTime variable 'exp' used below -- confirm its scope in the real
// source.
// Insert a placeholder endpoint when lower_bound did not land on an
// entry with this host (and this port, if a port was given), unless
// fIgnoreRawIPs is set and the name resolved to a nonzero key without
// being rewritten.
// NOTE(review): when the insert is skipped for that reason, 'eit' may
// still point at a non-matching neighbour, whose count is then
// incremented below -- confirm this is intended.
149  if ((eit == m_Endpoints.end()
150  || key.GetHost() != eit->first.GetHost()
151  || (key.GetPort() != 0
152  && key.GetPort() != eit->first.GetPort()))
153  && ( !(m_Flags & fIgnoreRawIPs) || key == 0
154  || name != cit.first)) {
155  _TRACE_X(2,
156  m_ServiceName << ": " << key << " -> " << name
157  << " per existing connection(s)");
158  eit = m_Endpoints.insert(eit, make_pair(key, SEndpointInfo()));
159  }
160  if ( eit != m_Endpoints.end() ) {
161  auto& endpoint = eit->second;
// An empty ref marks an endpoint with no server option yet (such as a
// placeholder inserted just above); synthesize one for it.
162  if (endpoint.ref.Empty()) {
163  if (legacy_base_ranking == 0.0) {
164  if (m_Rankings.empty()) {
165  legacy_base_ranking = 1.0; // arbitrary >0
166  } else {
// 1e-4 times the first (smallest) element of the ranking multiset.
167  legacy_base_ranking = 1e-4 * *m_Rankings.begin();
168  }
169  }
170  // scaled Poisson distribution peaking around 4 and 5,
171  // using a hardcoded approximation of log(5).
// std::exp is qualified because a local variable named 'exp' is in scope.
172  double ranking
173  = (std::exp(1.61 * cit.second - lgamma(cit.second + 1))
174  * legacy_base_ranking);
175 
// Synthesized options expire 10 seconds after 'exp' is first set.
176  if (exp.IsEmpty()) {
177  exp.SetCurrent();
178  exp.AddSecond(10);
179  }
// NOTE(review): listing line 183 (one constructor argument) is absent
// from this listing.
180  endpoint.ref.Reset(new CDBServerOption
181  (name, key.GetHost(), key.GetPort(),
182  ranking,
184  exp.GetTimeT()));
185  m_Rankings.insert(ranking);
186  endpoint.effective_ranking = ranking;
187  }
188  endpoint.actual_count += cit.second;
189  }
// The total includes counts that were not attributed to any endpoint.
190  m_TotalCount += cit.second;
191  }
192 }
193 
194 
196 {
// Body of what is presumably CPoolBalancer::x_GetServer(const void* params,
// IBalanceable** conn); the signature line is absent from this listing.
//
// Picks the server to use next.  When both 'conn' and 'params' are non-null,
// a pooled connection obtained via x_TryPool is considered first: it is
// either returned as-is ("spared") or marked for turnover.  Otherwise, or
// after marking, an endpoint is drawn at random with weights equal to how far
// each endpoint is below its ranking-proportional share of connections.
// Returns an empty reference when there are no endpoints, no positive total
// ranking, or no endpoint below its share.
197  TSvrRef result;
198  CEndpointKey conn_key(0u);
199 
200  // trivial if <= 1 endpoint
201  if (m_Endpoints.empty()) {
202  return result;
203  } else if (m_Endpoints.size() == 1) {
204  return TSvrRef(&*m_Endpoints.begin()->second.ref);
205  }
206 
207  double total_ranking = accumulate(m_Rankings.begin(), m_Rankings.end(),
208  0.0);
209  if (total_ranking <= 0.0) {
210  ERR_POST_X(3, Info << "No positive rankings found");
211  return result;
212  }
213 
// Phase 1: consider reusing a pooled connection.
214  if (/* m_TotalCount > 1 && */ conn != nullptr && params != nullptr) {
215  *conn = x_TryPool(params);
216  if (*conn != NULL) {
217  const string& server_name = (*conn)->ServerName();
218  Uint4 host = (*conn)->Host();
219  Uint2 port = (*conn)->Port();
// 'excess' is how many connections this endpoint holds beyond its
// ranking-proportional share; 'keep' decides whether to spare it.
220  double excess;
221  bool keep = true;
222  string penalty_note;
223  conn_key = CEndpointKey(host, port);
224  auto it = x_FindEndpoint(conn_key, server_name);
225  if (it == m_Endpoints.end()) {
// Unknown endpoint: its whole reported count is treated as excess, and
// a standalone CDBServer expiring in 10 seconds is the tentative result.
226  ERR_POST_X(4,
227  "Unrecognized endpoint for existing connection to "
228  << conn_key << " (" << server_name << ')');
229  excess = x_GetCount(params, server_name);
230  time_t t = CurrentTime(CTime::eUTC).GetTimeT() + 10;
231  result.Reset(new CDBServer(server_name, host, port, t));
232  } else {
233  double scale_factor = m_TotalCount / total_ranking;
234  excess = (it->second.actual_count
235  - it->second.effective_ranking * scale_factor);
236  result.Reset(&*it->second.ref);
// A penalized endpoint is not kept if any endpoint has a lower
// penalty level.
237  if (it->second.penalty_level > 0) {
238  auto min_penalty = it->second.penalty_level;
// NOTE(review): 'auto it2' copies every map entry on each iteration;
// 'const auto&' would avoid the copies without changing the result.
239  for (auto it2 : m_Endpoints) {
240  min_penalty = min(min_penalty,
241  it2.second.penalty_level);
242  }
243  if (min_penalty < it->second.penalty_level) {
244  keep = false;
245  penalty_note = FORMAT(
246  "; penalty level " << it->second.penalty_level
247  << " exceeds minimum value " << min_penalty);
248  }
249  }
250  }
251  _TRACE_X(5,
252  "Considering connection to " << conn_key << " ("
253  << server_name
254  << ") for turnover; projected excess count " << excess
255  << penalty_note);
// Randomized turnover: draw uniformly from [0, pool_max) and give up
// the connection when the draw does not exceed the excess, so a larger
// excess makes turnover more likely.
256  if (keep && excess > 0.0) {
257  unsigned int pool_max = x_GetPoolMax(params);
258  if (pool_max == 0u) {
259  pool_max = m_TotalCount * 2;
260  }
261  CRandomGuard rg;
262  uniform_real_distribution<double> urd(0.0, pool_max);
263  if (urd(*s_RandomEngine) <= excess) {
264  keep = false;
265  }
266  }
267  if (keep) {
268  _TRACE_X(6, "Sparing connection immediately");
269  return result;
270  } else if ( !x_NoPooling() ) {
271  // defer turnover (endpoint may be reselected!) but
272  // speculatively update counts
273  --m_TotalCount;
274  if (it != m_Endpoints.end()) {
275  --it->second.actual_count;
276  }
277  }
278  }
279  }
280 
// Phase 2: weighted random choice among endpoints below their ideal count.
// 'options' and 'weights' are parallel vectors: weights[i] is the shortfall
// of the endpoint options[i] points to.
281  vector<TEndpoints::value_type*> options;
282  vector<double> weights;
// The "+ 1" accounts for the connection about to be placed.
283  double scale_factor = (m_TotalCount + 1.0) / total_ranking;
284  _TRACE_X(7,
285  "Scale factor for new connection: " << (m_TotalCount + 1) << " / "
286  << total_ranking << " = " << scale_factor);
287 
288  for (auto& it : m_Endpoints) {
289  it.second.ideal_count = it.second.effective_ranking * scale_factor;
290  double d = it.second.ideal_count - it.second.actual_count;
291  _TRACE_X(8,
292  it.first << " (" << it.second.ref->GetName()
293  << "): current count " << it.second.actual_count
294  << ", ideal count " << it.second.ideal_count << ", delta "
295  << d << (d > 0 ? " > 0" : " <= 0"));
296  if (d > 0) {
297  options.push_back(&it);
298  weights.push_back(d);
299  }
300  }
301  if (weights.empty()) {
302  ERR_POST_X(9, "No positive deltas");
303  return result;
304  }
305 
// The guard stays in scope until the function returns, covering the draw.
306  CRandomGuard rg;
307 #if defined(NCBI_COMPILER_MSVC) && _MSC_VER < 1900
308  // Work around limitation in VS 2013's discrete_distribution<>
309  // mitigated by a non-standard initializer_list<> constructor.
310  discrete_distribution<> dd(
311  initializer_list<double>(
312  weights.data(), weights.data() + weights.size()));
313 #else
314  discrete_distribution<> dd(weights.begin(), weights.end());
315 #endif
316  auto i = dd(*s_RandomEngine);
317  _TRACE_X(10,
318  "Picked " << options[i]->first << " ("
319  << options[i]->second.ref->GetName() << ')');
// Resolve the deferred turnover from phase 1: if the draw landed on the
// pooled connection's own endpoint, keep the connection and restore the
// speculatively decremented counts; otherwise hand it to x_Discard and
// clear the caller's pointer.
320  if (conn != NULL && *conn != NULL) {
321  if (CDBServer((*conn)->ServerName(), conn_key.GetHost(),
322  conn_key.GetPort())
323  .Matches(*options[i]->second.ref, m_ServiceName)) {
324  _TRACE_X(11, "Sparing connection (endpoint reselected)");
325  if ( !x_NoPooling() ) {
326  ++options[i]->second.actual_count;
327  ++m_TotalCount;
328  }
329  } else {
330  _TRACE_X(12, "Proceeding to request turnover");
331  auto to_discard = *conn;
332  *conn = nullptr;
333  x_Discard(params, to_discard);
334  }
335  }
336  return TSvrRef(&*options[i]->second.ref);
337 }
338 
340 {
// Body of what is presumably CPoolBalancer::x_NameToKey(CTempString& name)
// const; the signature line is absent from this listing.
// Maps a server name to an endpoint key.  Three input shapes are handled:
//  - "<name>@<address>": 'name' is truncated to the part before '@' (the
//    caller sees this, as 'name' is assigned to) and the address is parsed;
//  - only digits, '.' and ':': the whole string is parsed as an address;
//  - anything else: the name is looked up among known endpoints, and 0 is
//    returned if no endpoint with a positive key carries that name.
341  _TRACE_X(13, name);
342  CTempString address = name;
// Position of the last character that cannot be part of a numeric address.
343  SIZE_TYPE pos = name.find_last_not_of("0123456789.:");
344  if (pos != NPOS) {
345  if (name[pos] == '@') {
346  address = name.substr(pos + 1);
347  name = name.substr(0, pos);
348  } else {
349  for (const auto& it : m_Endpoints) {
350  if (it.first > 0 && it.second.ref->GetName() == name) {
351  _TRACE_X(14, "Found at " << it.first);
352  return it.first;
353  }
354  }
355  return 0;
356  }
357  }
// NOTE(review): listing line 358, which declares 'key' (presumably by
// parsing 'address' without throwing), is absent from this listing.
// A zero key is treated as a parse failure and logged, but still returned.
359  if (key == 0) {
360  ERR_POST_X(15, "Error parsing " << address << ": "
361  << CNcbiError::GetLast().Extra());
362  }
363  _TRACE_X(16, key);
364  return key;
365 }
366 
367 
370 {
// Body of what is presumably CPoolBalancer::x_FindEndpoint(CEndpointKey key,
// CTempString name); the signature lines are absent from this listing.
// Looks up an endpoint via x_FindEndpointAsIs, falling back to progressively
// less specific keys when the exact key is not found:
//   1. (host, port) as given;
//   2. (host, 0)    -- only if a port was given;
//   3. (0, port)    -- only if a host was given;
//   4. (0, 0)       -- only if both were given.
// Returns m_Endpoints.end() if every attempt fails.
371  auto it = x_FindEndpointAsIs(key, name);
372  if (it == m_Endpoints.end()) {
373  auto host = key.GetHost();
374  auto port = key.GetPort();
375  if (port != 0U) {
376  it = x_FindEndpointAsIs(CEndpointKey(host, 0U), name);
377  }
378  if (it == m_Endpoints.end() && host != 0U) {
379  it = x_FindEndpointAsIs(CEndpointKey(0U, port), name);
380  }
381  if (it == m_Endpoints.end() && host != 0U && port != 0U) {
382  it = x_FindEndpointAsIs(CEndpointKey(0U, 0U), name);
383  }
384  }
385  return it;
386 }
387 
388 
391 {
// Body of what is presumably CPoolBalancer::x_FindEndpointAsIs(CEndpointKey
// key, CTempString name); the signature lines are absent from this listing.
// Exact-key lookup with no fallback.  For keys with a nonzero host the first
// entry with that key is returned regardless of name.  For host-less keys,
// all entries sharing the key are scanned and only one whose server name
// equals 'name' is accepted.  Returns m_Endpoints.end() when nothing matches.
392  auto it = m_Endpoints.lower_bound(key);
393  if (it == m_Endpoints.end() || it->first != key) {
394  return m_Endpoints.end();
395  } else if (key.GetHost() == 0U /* || key.GetPort() == 0U */) {
// Several entries can share a host-less key; tell them apart by name.
396  for (; it != m_Endpoints.end() && it->first == key; ++it) {
397  if (it->second.ref->GetName() == name) {
398  return it;
399  }
400  }
401  return m_Endpoints.end();
402  }
403  return it;
404 }
405 
CDBServerOption – CDBServer extended with additional information that helps maintain a balanced pool ...
@ fState_Normal
Fully available.
IDBServiceMapper.
bool Matches(const CDBServer &that, CTempString service) const
Uint2 GetPort(void) const
const string & GetName(void) const
Uint4 GetHost(void) const
Lightweight representation of just a host and a port.
Uint2 GetPort(void) const
Uint4 GetHost(void) const
virtual IBalanceable * x_TryPool(const void *)
void x_InitFromCounts(const TCounts &counts)
multiset< double > m_Rankings
unsigned int m_TotalCount
void LocallyPenalize(TSvrRef server)
CPoolBalancer(const string &service_name, const IDBServiceMapper::TOptions &options, TFlags flags)
virtual void x_Discard(const void *, IBalanceable *)
TEndpoints::iterator x_FindEndpointAsIs(CEndpointKey key, CTempString name)
TSvrRef x_GetServer(const void *params, IBalanceable **conn)
bool x_NoPooling(void) const
TEndpoints::iterator x_FindEndpoint(CEndpointKey key, CTempString name)
CEndpointKey x_NameToKey(CTempString &name) const
virtual unsigned int x_GetPoolMax(const void *)
virtual unsigned int x_GetCount(const void *, const string &)
CFastMutexGuard m_MutexGuard
CSafeStatic<>::
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
CTime –.
Definition: ncbitime.hpp:296
list< CRef< CDBServerOption > > TOptions
size_type size() const
Definition: map.hpp:288
const_iterator lower_bound(const key_type &key) const
Definition: map.hpp:294
const_iterator end() const
Definition: map.hpp:292
iterator insert(const value_type &val)
Definition: map.hpp:305
const_iterator begin() const
Definition: map.hpp:291
bool empty() const
Definition: map.hpp:289
const_iterator begin() const
Definition: set.hpp:286
iterator insert(const value_type &val)
Definition: set.hpp:300
const_iterator end() const
Definition: set.hpp:287
const_iterator find(const key_type &key) const
Definition: set.hpp:288
void erase(iterator pos)
Definition: set.hpp:302
bool empty() const
Definition: set.hpp:284
static CS_CONNECTION * conn
Definition: ct_dynamic.c:25
static uch flags
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
Definition: dlist.tmpl.h:46
#define NULL
Definition: ncbistd.hpp:225
#define _TRACE_X(err_subcode, message)
Definition: ncbidbg.hpp:125
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
Definition: ncbidiag.hpp:550
static const CNcbiError & GetLast(void)
Get the error that was last set (in the current thread)
Definition: ncbierror.cpp:72
#define FORMAT(message)
Format message using iostreams library.
Definition: ncbiexpt.hpp:672
void Info(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1185
const float epsilon
Definition: math.hpp:61
bool Empty(void) const THROWS_NONE
Check if CRef is empty – not pointing to any object, which means having a null value.
Definition: ncbiobj.hpp:719
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
uint16_t Uint2
2-byte (16-bit) unsigned integer
Definition: ncbitype.h:101
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
NCBI_NS_STD::string::size_type SIZE_TYPE
Definition: ncbistr.hpp:132
#define NPOS
Definition: ncbistr.hpp:133
size_type find_last_not_of(const CTempString match, size_type pos=npos) const
Find the last occurrence of any character not in the matching string within the current string,...
Definition: tempstr.hpp:610
CTempString substr(size_type pos) const
Obtain a substring from this string, beginning at a given offset.
Definition: tempstr.hpp:776
@ fConvErr_NoThrow
Do not throw an exception on error.
Definition: ncbistr.hpp:285
CTime & AddSecond(TSeconds seconds=1, EDaylight adl=eDaylightDefault)
Add specified seconds.
Definition: ncbitime.cpp:1879
bool IsEmpty(void) const
Is time object empty (date and time)?
Definition: ncbitime.hpp:2378
CTime & SetCurrent(void)
Make the time current in the presently active time zone.
Definition: ncbitime.hpp:2302
CTime CurrentTime(CTime::ETimeZone tz=CTime::eLocal, CTime::ETimeZonePrecision tzp=CTime::eTZPrecisionDefault)
Definition: ncbitime.hpp:2185
time_t GetTimeT(void) const
Get time in time_t format.
Definition: ncbitime.cpp:1395
@ eEmpty
Use "empty" time.
Definition: ncbitime.hpp:301
@ eUTC
UTC (Universal Coordinated Time)
Definition: ncbitime.hpp:307
Definition of all error codes used in corelib (xncbi.lib).
int i
static MDB_envinfo info
Definition: mdb_load.c:37
const struct ncbi::grid::netcache::search::fields::KEY key
CRef< CDBServer > TSvrRef
EIPRangeType t
Definition: ncbi_localip.c:101
static CSafeStatic< default_random_engine > s_RandomEngine
static bool s_RandomnessSeeded
DEFINE_STATIC_FAST_MUTEX(s_RandomMutex)
Help distribute connections within a pool across servers.
Static variables safety - create on demand, destroy on application termination.
Defines NCBI C++ Toolkit portable error codes.
T min(T x_, T y_)
else result
Definition: token2.c:20
Modified on Sat Dec 02 09:19:31 2023 by modify_doxy.py rev. 669887