44 #define NCBI_USE_ERRCODE_X Corelib_Balancer
73 : m_ServiceName(service_name), m_TotalCount(0U), m_Flags(
flags)
75 for (
auto it : options) {
78 if (
key == 0 && name != service_name) {
86 it->GetRanking(), it->GetState(),
87 it->GetExpireTime()));
91 service_name <<
": " <<
key <<
" -> " << name <<
" per DBLB");
100 double ranking = it->GetRanking();
102 if (it->IsPenalized()) {
104 ++endpoint->second.penalty_level;
106 if (it->IsExcluded()) {
108 ++endpoint->second.penalty_level;
111 endpoint->second.ref = it;
112 endpoint->second.effective_ranking = ranking;
120 if (server.
Empty()) {
129 ++it->second.penalty_level;
139 it.second.actual_count = 0;
143 double legacy_base_ranking = 0.0;
144 for (
const auto& cit : counts) {
150 ||
key.GetHost() != eit->first.GetHost()
151 || (
key.GetPort() != 0
152 &&
key.GetPort() != eit->first.GetPort()))
154 || name != cit.first)) {
157 <<
" per existing connection(s)");
161 auto& endpoint = eit->second;
162 if (endpoint.ref.Empty()) {
163 if (legacy_base_ranking == 0.0) {
165 legacy_base_ranking = 1.0;
173 = (std::exp(1.61 * cit.second - lgamma(cit.second + 1))
174 * legacy_base_ranking);
181 (name,
key.GetHost(),
key.GetPort(),
186 endpoint.effective_ranking = ranking;
188 endpoint.actual_count += cit.second;
209 if (total_ranking <= 0.0) {
214 if (
conn !=
nullptr && params !=
nullptr) {
217 const string& server_name = (*conn)->ServerName();
218 Uint4 host = (*conn)->Host();
219 Uint2 port = (*conn)->Port();
227 "Unrecognized endpoint for existing connection to "
228 << conn_key <<
" (" << server_name <<
')');
234 excess = (it->second.actual_count
235 - it->second.effective_ranking * scale_factor);
236 result.Reset(&*it->second.ref);
237 if (it->second.penalty_level > 0) {
238 auto min_penalty = it->second.penalty_level;
240 min_penalty =
min(min_penalty,
241 it2.second.penalty_level);
243 if (min_penalty < it->second.penalty_level) {
246 "; penalty level " << it->second.penalty_level
247 <<
" exceeds minimum value " << min_penalty);
252 "Considering connection to " << conn_key <<
" ("
254 <<
") for turnover; projected excess count " << excess
256 if (keep && excess > 0.0) {
258 if (pool_max == 0u) {
262 uniform_real_distribution<double> urd(0.0, pool_max);
268 _TRACE_X(6,
"Sparing connection immediately");
275 --it->second.actual_count;
281 vector<TEndpoints::value_type*> options;
282 vector<double> weights;
283 double scale_factor = (
m_TotalCount + 1.0) / total_ranking;
285 "Scale factor for new connection: " << (
m_TotalCount + 1) <<
" / "
286 << total_ranking <<
" = " << scale_factor);
289 it.second.ideal_count = it.second.effective_ranking * scale_factor;
290 double d = it.second.ideal_count - it.second.actual_count;
292 it.first <<
" (" << it.second.ref->GetName()
293 <<
"): current count " << it.second.actual_count
294 <<
", ideal count " << it.second.ideal_count <<
", delta "
295 << d << (d > 0 ?
" > 0" :
" <= 0"));
297 options.push_back(&it);
298 weights.push_back(d);
301 if (weights.empty()) {
307 #if defined(NCBI_COMPILER_MSVC) && _MSC_VER < 1900
310 discrete_distribution<> dd(
311 initializer_list<double>(
312 weights.data(), weights.data() + weights.size()));
314 discrete_distribution<> dd(weights.begin(), weights.end());
318 "Picked " << options[
i]->
first <<
" ("
319 << options[
i]->second.ref->GetName() <<
')');
324 _TRACE_X(11,
"Sparing connection (endpoint reselected)");
326 ++options[
i]->second.actual_count;
330 _TRACE_X(12,
"Proceeding to request turnover");
331 auto to_discard = *
conn;
336 return TSvrRef(&*options[
i]->second.ref);
345 if (name[pos] ==
'@') {
346 address = name.
substr(pos + 1);
347 name = name.
substr(0, pos);
350 if (it.first > 0 && it.second.ref->GetName() == name) {
351 _TRACE_X(14,
"Found at " << it.first);
360 ERR_POST_X(15,
"Error parsing " << address <<
": "
373 auto host =
key.GetHost();
374 auto port =
key.GetPort();
395 }
else if (
key.GetHost() == 0U ) {
397 if (it->second.ref->GetName() == name) {
CDBServerOption – CDBServer extended with additional information that helps maintain a balanced pool ...
@ fState_Normal
Fully available.
bool Matches(const CDBServer &that, CTempString service) const
Uint2 GetPort(void) const
const string & GetName(void) const
Uint4 GetHost(void) const
Lightweight representation of just a host and a port.
Uint2 GetPort(void) const
Uint4 GetHost(void) const
virtual IBalanceable * x_TryPool(const void *)
void x_InitFromCounts(const TCounts &counts)
multiset< double > m_Rankings
unsigned int m_TotalCount
void LocallyPenalize(TSvrRef server)
CPoolBalancer(const string &service_name, const IDBServiceMapper::TOptions &options, TFlags flags)
virtual void x_Discard(const void *, IBalanceable *)
TEndpoints::iterator x_FindEndpointAsIs(CEndpointKey key, CTempString name)
TSvrRef x_GetServer(const void *params, IBalanceable **conn)
bool x_NoPooling(void) const
TEndpoints::iterator x_FindEndpoint(CEndpointKey key, CTempString name)
CEndpointKey x_NameToKey(CTempString &name) const
virtual unsigned int x_GetPoolMax(const void *)
virtual unsigned int x_GetCount(const void *, const string &)
CFastMutexGuard m_MutexGuard
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
list< CRef< CDBServerOption > > TOptions
const_iterator lower_bound(const key_type &key) const
const_iterator end() const
iterator insert(const value_type &val)
const_iterator begin() const
container_type::iterator iterator
const_iterator begin() const
iterator insert(const value_type &val)
const_iterator end() const
const_iterator find(const key_type &key) const
static CS_CONNECTION * conn
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
#define _TRACE_X(err_subcode, message)
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
static const CNcbiError & GetLast(void)
Get the error that was last set (in the current thread)
#define FORMAT(message)
Format message using iostreams library.
void Info(CExceptionArgs_Base &args)
bool Empty(void) const THROWS_NONE
Check if CRef is empty – not pointing to any object, which means having a null value.
uint32_t Uint4
4-byte (32-bit) unsigned integer
uint16_t Uint2
2-byte (16-bit) unsigned integer
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
NCBI_NS_STD::string::size_type SIZE_TYPE
size_type find_last_not_of(const CTempString match, size_type pos=npos) const
Find the last occurrence of any character not in the matching string within the current string,...
CTempString substr(size_type pos) const
Obtain a substring from this string, beginning at a given offset.
@ fConvErr_NoThrow
Do not throw an exception on error.
CTime & AddSecond(TSeconds seconds=1, EDaylight adl=eDaylightDefault)
Add specified seconds.
bool IsEmpty(void) const
Is time object empty (date and time)?
CTime & SetCurrent(void)
Make the time current in the presently active time zone.
CTime CurrentTime(CTime::ETimeZone tz=CTime::eLocal, CTime::ETimeZonePrecision tzp=CTime::eTZPrecisionDefault)
time_t GetTimeT(void) const
Get time in time_t format.
@ eEmpty
Use "empty" time.
@ eUTC
UTC (Universal Coordinated Time)
Definition of all error codes used in corelib (xncbi.lib).
const struct ncbi::grid::netcache::search::fields::KEY key
CRef< CDBServer > TSvrRef
static CSafeStatic< default_random_engine > s_RandomEngine
static bool s_RandomnessSeeded
DEFINE_STATIC_FAST_MUTEX(s_RandomMutex)
Help distribute connections within a pool across servers.
Static variables safety - create on demand, destroy on application termination.
Defines NCBI C++ Toolkit portable error codes.