NCBI C++ ToolKit
osg_mapper.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: osg_mapper.cpp 96512 2022-04-08 19:45:33Z vasilche $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Eugene Vasilchenko, Aaron Ucko
27 *
28 * File Description:
29 * PSG to OSG connection service mapper.
30 *
31 * ===========================================================================
32 */
33 
34 #include <ncbi_pch.hpp>
35 #include "osg_mapper.hpp"
36 
37 #include <corelib/ncbiapp.hpp>
38 #include <connect/ncbi_socket.hpp>
40 
41 #include <cmath>
42 
46 
// File-level default for the penalty decay rate. The cross-reference index
// lists sibling defaults (s_DefaultPositiveWeight, s_DefaultNegativeWeight,
// s_DefaultNormalizationInterval, s_DefaultInitialPenalty,
// s_DefaultsInitialized) on neighbouring lines not captured in this listing.
// NOTE(review): presumably assigned in InitDefaults() from
// -M_LN2 / penalty_half_life; the left-hand side of that assignment is not
// visible here -- confirm against the repository copy.
50 static double s_DefaultDecayRate;
53 
54 
// Body of the COSGServiceMapper(const IRegistry* registry = NULL) constructor;
// the cross-reference index places its definition at osg_mapper.cpp:55.
// NOTE(review): the signature (line 55) and the statements on lines 57-59 are
// absent from this listing -- consult the repository copy before editing.
56 {
60 }
61 
62 
// Body of the virtual ~COSGServiceMapper() destructor; the cross-reference
// index places its definition at osg_mapper.cpp:63. No statements appear
// between the braces in this listing.
64 {
65 }
66 
67 
// static void COSGServiceMapper::InitDefaults(IRWRegistry& reg)
// (signature taken from the cross-reference index, osg_mapper.cpp:68).
//
// Seeds the registry section named after the application with default tuning
// values, then reads the effective values back with GetDouble() and finally
// sets s_DefaultsInitialized.
// NOTE(review): several lines are missing from this listing: the statement
// that obtains `app` (line 70), the continuation lines of the first three
// Set() calls, and the left-hand sides of four of the five assignments
// (lines 83, 85, 87, 89).
69 {
     // `app` must be non-null; it supplies the registry section name.
71  _ASSERT(app);
72  string section = app->GetAppName();
73 
     // Install defaults. The two calls visible in full pass
     // IRegistry::fNoOverride ("cannot change existing value"), so values
     // already present in the registry are kept.
     // NOTE(review): presumably the first three calls pass the same flag on
     // their missing continuation lines -- confirm.
74  reg.Set(section, "positive_feedback_weight", "0.01",
76  reg.Set(section, "negative_feedback_weight", "0.5",
78  reg.Set(section, "penalty_normalization_interval", "10.0",
80  reg.Set(section, "penalty_half_life", "3600.0", IRegistry::fNoOverride);
81  reg.Set(section, "initial_penalty", "0.15", IRegistry::fNoOverride);
82 
     // Read the effective values back (fallback 0 if a key is absent).
84  = reg.GetDouble(section, "positive_feedback_weight", 0);
86  = reg.GetDouble(section, "negative_feedback_weight", 0);
88  = reg.GetDouble(section, "penalty_normalization_interval", 0);
     // The half-life is converted to an exponential rate: -ln(2) / half_life.
     // NOTE(review): a configured half-life of 0 makes this a division by
     // zero; nothing visible here guards against it.
90  = -M_LN2 / reg.GetDouble(section, "penalty_half_life", 0);
91  s_DefaultInitialPenalty = reg.GetDouble(section, "initial_penalty", 0);
92 
93  s_DefaultsInitialized = true;
94 }
95 
96 
// void COSGServiceMapper::Configure(const IRegistry* registry = NULL)
// (signature taken from the cross-reference index, osg_mapper.cpp:97).
//
// Loads the mapper's tuning parameters from the registry section named after
// the application, then fills m_MainServiceNameMap from the
// "<section>/main_service_name" section.
// NOTE(review): lines 99, 101, 109, the left-hand sides of the five
// assignments, and the GetDouble() default arguments are missing from this
// listing; the targets are presumably the m_* members listed in the index
// with the s_Default* values as fallbacks -- confirm.
98 {
100 
102  _ASSERT(app);
103  string section = app->GetAppName();
104 
     // With no explicit registry, fall back to the application's
     // configuration.
105  if (registry == NULL) {
106  registry = &app->GetConfig();
107  }
108 
110 
112  = registry->GetDouble(section, "positive_feedback_weight",
115  = registry->GetDouble(section, "negative_feedback_weight",
118  = registry->GetDouble(section, "penalty_normalization_interval",
     // As in InitDefaults(): half-life -> exponential rate, -ln(2)/half_life.
121  = -M_LN2 / registry->GetDouble(section, "penalty_half_life",
124  = registry->GetDouble(section, "initial_penalty",
126 
     // Each entry name in "<section>/main_service_name" becomes a key of
     // m_MainServiceNameMap, mapped to that entry's value. Other methods in
     // this file look a service up in this map and repeat their action for
     // the mapped name.
127  list<string> entries;
128  string msn_section = section + "/main_service_name";
129  registry->EnumerateEntries(msn_section, &entries);
130  for (const auto &it : entries) {
131  m_MainServiceNameMap[it] = registry->Get(msn_section, it);
132  }
133 }
134 
135 
// static string s_EndpointKeyName(TOSGEndpointKey k)
// (signature taken from the cross-reference index, osg_mapper.cpp:137).
// Used elsewhere in this file to render an endpoint key in trace and warning
// messages.
// NOTE(review): the signature line (137) and the body statement (139) are
// missing from this listing.
136 inline
138 {
140 }
141 
142 
// TSvrRef COSGServiceMapper::x_GetServer(const string& service,
//                                        const TTried* tried)
// (first signature line taken from the cross-reference index,
// osg_mapper.cpp:143).
//
// Picks a server for `service`. Candidates are temporarily excluded at random
// in proportion to their penalty, the choice among the rest is delegated to
// TParent::GetServer(), and the parent's exclusion list is then rebuilt from
// the entries flagged as excluded.
//
// `tried` may be NULL; otherwise it lists endpoint keys already attempted for
// the current request, which are avoided while an untried alternative exists.
//
// NOTE(review): lines 152, 155-156, 196, 216, 250 and 258 are missing from
// this listing. They presumably hold lock guards (the index lists
// m_AllServerRatingsLock, m_RandomMutex and m_DBLBExclusionsMutex), the
// declaration of `raw_ratings`, and parts of two message expressions.
144  const TTried* tried)
145 {
     // `ratings` is a local copy of the entries that may need excluding.
     // `min_penalty` is the lowest penalty among eligible servers.
     // `full_count` counts every known server for the service.
146  TServerRatings ratings;
147  double min_penalty = 1.0;
148  size_t full_count = 0, num_ratings = 0;
149 
150  _TRACE("Finding a server for " << service);
151 
153 
154  {{
157  = m_AllServerRatings.find(service);
158  if (raw_ratings == m_AllServerRatings.end()
159  || raw_ratings->second.empty()) {
     // Nothing known for this service yet. GetServersList() is called for
     // its side effect of registering ratings; the list is discarded.
160  LOCK.Release();
161  list<string> serv_list;
162  GetServersList(service, &serv_list);
163  } else {
164  double min_untried_penalty = 1.0;
165 
166  ITERATE (TServerRatings, it, raw_ratings->second) {
167  bool tried_it
168  = (tried != NULL
169  && (find(tried->begin(), tried->end(), it->first)
170  != tried->end()));
     // Copy only entries that could be excluded below: already tried,
     // flagged excluded, or carrying a positive penalty.
171  if (tried_it || it->second.excluded
172  || it->second.penalty > 0.0) {
173  _ASSERT(it->second.ref.NotEmpty());
174  ratings.insert(*it);
175  ++num_ratings;
176  }
     // Track the lowest penalty overall and among untried servers,
     // ignoring servers flagged excluded.
177  if ( !it->second.excluded ) {
178  if (it->second.penalty < min_penalty) {
179  min_penalty = it->second.penalty;
180  }
181  if ( !tried_it
182  && it->second.penalty < min_untried_penalty) {
183  min_untried_penalty = it->second.penalty;
184  }
185  }
186  ++full_count;
187  }
188 
189  if (tried != NULL && !tried->empty()) {
190  if (min_untried_penalty < 1.0) {
     // A usable untried server exists: flag every tried endpoint as
     // excluded in the local copy only.
191  min_penalty = min_untried_penalty;
192  ITERATE (TTried, it, *tried) {
193  TServerRatings::iterator rit = ratings.find(*it);
194  if (rit == ratings.end()) {
     // A tried key with no known rating is reported, not excluded.
195  string msg
197  << " unknown, but listed as tried.\n"
198  << CStackTrace());
199  PSG_WARNING("OSG: " << msg);
200  } else {
201  _TRACE("Skipping " << s_EndpointKeyName(*it)
202  << " (already tried for this request)");
203  rit->second.excluded = true;
204  }
205  }
206  } else {
     // No untried server with penalty below 1.0: tried ones stay eligible.
207  _TRACE("Re-allowing previously tried backends. "
208  "(Out of alternatives.)");
209  }
210  }
211  }
212  }}
213 
214  vector<TSvrRef> to_exclude;
215  if ( !ratings.empty() ) {
217  CRandom::TValue max_random = CRandom::GetMax();
218  to_exclude.reserve(num_ratings);
     // First pass: GetRand() is documented as uniform over [0..GetMax()],
     // so each candidate is dropped with probability of about its penalty.
     // Entries flagged excluded are always dropped.
219  ITERATE (TServerRatings, it, ratings) {
220  if (it->second.excluded
221  || (m_Random.GetRand() < it->second.penalty * max_random)) {
222  to_exclude.push_back(it->second.ref);
223  _TRACE("Temporarily excluding "
224  << s_EndpointKeyName(it->first) << " (penalty: "
225  << it->second.penalty << ')');
226  } else {
227  _TRACE("Considering " << s_EndpointKeyName(it->first)
228  << " (penalty: " << it->second.penalty << ')');
229  }
230  }
231  if (to_exclude.size() == full_count) {
232  // Apparently excluded everything; rescale and try again.
233  // (Not done right away, as this technique can introduce skew
234  // if somehow starting with an incomplete list.)
235  to_exclude.clear();
236  to_exclude.reserve(full_count);
     // If min_penalty is 1.0 or more, to_exclude stays empty and no
     // server is temporarily excluded.
237  if (min_penalty < 1.0) {
238  // Give the numerator a slight boost to keep roundoff
239  // error from yielding a (slim!) possibility of still
240  // excluding everything.
     // The lowest-penalty server scores max_random + 1, which GetRand()
     // cannot exceed, so it always survives this pass unless flagged.
241  double scale_factor = (max_random + 1.0) / (1.0 - min_penalty);
242  ITERATE (TServerRatings, it, ratings) {
243  double score = (1.0 - it->second.penalty) * scale_factor;
244  if (it->second.excluded || m_Random.GetRand() > score) {
245  to_exclude.push_back(it->second.ref);
246  } else {
247  _TRACE("Reconsidering "
248  << s_EndpointKeyName(it->first) << " (score: "
249  << NStr::UInt8ToString(score,
251  << ')');
252  }
253  }
254  }
255  }
256  }
257 
     // Replace the parent's exclusions with the temporary set and ask it
     // for a server. Then reset them to only the entries flagged excluded.
259  TParent::CleanExcluded(service);
260  ITERATE (vector<TSvrRef>, it, to_exclude) {
261  TParent::Exclude(service, *it);
262  }
263  TSvrRef result = TParent::GetServer(service);
264  TParent::CleanExcluded(service);
265  ITERATE (TServerRatings, it, ratings) {
266  if (it->second.excluded) {
267  TParent::Exclude(service, it->second.ref);
268  }
269  }
     // NOTE(review): `result` is dereferenced here without an emptiness
     // check; confirm TParent::GetServer() cannot return an empty reference
     // in builds where _TRACE is active.
270  _TRACE("Returning " << CSocketAPI::ntoa(result->GetHost())
271  << ", expiring "
272  << CTime(result->GetExpireTime()).ToLocalTime().AsString());
273  return result;
274 }
275 
276 
// Flags `server` as excluded for `service` in this mapper's own ratings,
// forwards the exclusion to TParent::Exclude(), and repeats both steps for
// the name `service` maps to in m_MainServiceNameMap, if any.
//
// NOTE(review): lines 280 and 294 are missing from this listing; each {{ }}
// scope presumably opens with a lock guard (the index lists
// m_AllServerRatingsLock and m_DBLBExclusionsMutex) -- confirm.
277 void COSGServiceMapper::Exclude(const string& service, const TSvrRef& server)
278 {
279  {{
     // x_SetRating() creates the rating if this endpoint was not yet known
     // and reports that via was_new.
281  bool was_new = false;
282  SServerRating& rating
283  = x_SetRating(m_AllServerRatings[service], server->GetHost(),
284  server->GetPort(), &was_new, server->GetName());
285  rating.excluded = true;
286  if (was_new) {
287  string msg
288  = "Excluding previously undiscovered " + service + " on "
289  + CSocketAPI::ntoa(server->GetHost()) + '.';
290  PSG_WARNING("OSG: " << msg);
291  }
292  }}
293  {{
295  TParent::Exclude(service, server);
296  }}
     // Recurse for the mapped main service name.
     // NOTE(review): a cycle in m_MainServiceNameMap would recurse without
     // bound; nothing visible here rules one out.
297  auto msn = m_MainServiceNameMap.find(service);
298  if (msn != m_MainServiceNameMap.end()) {
299  Exclude(msn->second, server);
300  }
301 }
302 
303 
// Clears the `excluded` flag on every rating stored for `service`, clears the
// parent's exclusion list via TParent::CleanExcluded(), and repeats both steps
// for the name `service` maps to in m_MainServiceNameMap, if any.
//
// NOTE(review): lines 307 and 314 are missing from this listing; each {{ }}
// scope presumably opens with a lock guard -- confirm.
304 void COSGServiceMapper::CleanExcluded(const string& service)
305 {
306  {{
     // operator[] creates an empty ratings entry if `service` is unknown.
308  TServerRatings& ratings = m_AllServerRatings[service];
309  NON_CONST_ITERATE (TServerRatings, it, ratings) {
310  it->second.excluded = false;
311  }
312  }}
313  {{
315  TParent::CleanExcluded(service);
316  }}
317  auto msn = m_MainServiceNameMap.find(service);
318  if (msn != m_MainServiceNameMap.end()) {
319  CleanExcluded(msn->second);
320  }
321 }
322 
323 
// Fills *serv_list with the parent's server list for `service` and, as a side
// effect, makes sure each listed endpoint has a rating in m_AllServerRatings.
// If `service` has an entry in m_MainServiceNameMap, each newly discovered
// endpoint also gets a rating under the mapped name.
//
// NOTE(review): line 337 is missing from this listing; it presumably acquires
// a lock on the ratings before the loop (x_GetServer calls LOCK.Release()
// before invoking this method) -- confirm.
324 void COSGServiceMapper::GetServersList(const string& service,
325  list<string>* serv_list) const
326 {
327  unique_ptr<set<string>> main_set;
328  TServerRatings* main_ratings = nullptr;
329  TParent::GetServersList(service, serv_list);
     // main_set holds the parent's server list for the mapped main service.
330  auto msn = m_MainServiceNameMap.find(service);
331  if (msn != m_MainServiceNameMap.end()) {
332  list<string> main_list;
333  TParent::GetServersList(msn->second, &main_list);
334  main_set.reset(new set<string>(main_list.begin(), main_list.end()));
335  }
336 
338  TServerRatings& ratings = m_AllServerRatings[service];
339  if (main_set.get() != nullptr) {
340  main_ratings = &m_AllServerRatings[msn->second];
341  }
342  ITERATE (list<string>, it, *serv_list) {
343  Uint4 host;
344  Uint2 port;
345  bool was_new = false;
     // NOTE(review): StringToHostPort() is documented to return NPOS on
     // error, but the result is ignored here; after a failed parse `host`
     // and `port` may be read uninitialized.
346  CSocketAPI::StringToHostPort(*it, &host, &port);
347  auto& rating = x_SetRating(ratings, host, port, &was_new);
348  if (was_new) {
349  _TRACE("Discovered " << service << " on " << *it);
350  if (main_ratings != nullptr) {
     // was_new is already true and x_SetRating() never resets it, so this
     // call cannot tell whether the main rating already existed. Either
     // branch below overwrites it.
351  auto& main_rating = x_SetRating(*main_ratings, host, port,
352  &was_new);
353  if (main_set->find(*it) == main_set->end()) {
     // Endpoint is not in the main service's list: give it the maximum
     // penalty and flag it excluded there.
354  main_rating.penalty = 1.0;
355  main_rating.excluded = true;
356  } else {
357  _TRACE("Discovered " << msn->second << " on " << *it);
     // Endpoint serves both names: copy the freshly created rating.
358  main_rating = rating;
359  }
360  }
361  }
362  }
363 }
364 
365 
// Fills *options with the parent's annotated server list for `service` and, as
// a side effect, makes sure each listed endpoint has a rating in
// m_AllServerRatings. It follows the same logic as GetServersList(), but
// works on host/port pairs and endpoint keys instead of "host:port" strings.
//
// NOTE(review): line 382 is missing from this listing; it presumably acquires
// a lock on the ratings before the loop -- confirm.
366 void COSGServiceMapper::GetServerOptions(const string& service,
367  TOptions* options)
368 {
369  set<TOSGEndpointKey> main_set;
370  TServerRatings* main_ratings = nullptr;
371  TParent::GetServerOptions(service, options);
     // main_set holds the endpoint keys the parent reports for the mapped
     // main service.
372  auto msn = m_MainServiceNameMap.find(service);
373  if (msn != m_MainServiceNameMap.end()) {
374  TOptions main_options;
375  TParent::GetServerOptions(msn->second, &main_options);
376  for (const auto &it : main_options) {
377  main_set.insert(g_OSG_MakeEndpointKey(it->GetHost(),
378  it->GetPort()));
379  }
380  }
381 
383  TServerRatings& ratings = m_AllServerRatings[service];
384  if (msn != m_MainServiceNameMap.end()) {
385  main_ratings = &m_AllServerRatings[msn->second];
386  }
387  for (const auto& it : *options) {
388  auto host = it->GetHost();
389  auto port = it->GetPort();
390  bool was_new = false;
391  auto& rating = x_SetRating(ratings, host, port, &was_new);
392  if (was_new) {
393  _TRACE("Discovered " << service << " on "
394  << CSocketAPI::HostPortToString(host, port));
395  if (main_ratings != nullptr) {
     // was_new is already true here, so this call cannot report whether
     // the main rating already existed. Either branch below overwrites it.
396  auto& main_rating = x_SetRating(*main_ratings, host, port,
397  &was_new);
398  auto key = g_OSG_MakeEndpointKey(host, port);
399  if (main_set.find(key) == main_set.end()) {
     // Endpoint is not offered for the main service: maximum penalty,
     // flagged excluded.
400  main_rating.penalty = 1.0;
401  main_rating.excluded = true;
402  } else {
403  _TRACE("Discovered " << msn->second << " on "
404  << CSocketAPI::HostPortToString(host, port));
405  main_rating = rating;
406  }
407  }
408  }
409  }
410 }
411 
412 
// Adjusts the penalty of the endpoint host:port for `service` according to
// `feedback`, then applies the same feedback under the name `service` maps to
// in m_MainServiceNameMap, if any.
//
// NOTE(review): lines 417-418 and 433 are missing from this listing. The
// first two presumably normalize penalties and declare the `LOCK` guard
// released below; line 433 is the continuation of the negative-feedback
// expression (presumably "+ m_NegativeFeedbackWeight;") -- confirm.
413 void COSGServiceMapper::AcceptFeedback(const string& service,
414  unsigned int host, unsigned short port,
415  EFeedback feedback)
416 {
419  bool was_new = false;
420  SServerRating& rating = x_SetRating(m_AllServerRatings[service],
421  host, port, &was_new);
422  double& penalty = rating.penalty;
     // old_penalty is declared through _DEBUG_ARG and read only by the
     // _TRACE below.
423  _DEBUG_ARG(double old_penalty = penalty);
424  if (was_new) {
425  string msg = ("Accepting feedback for previously undiscovered "
426  + service + " on " + rating.ref->GetName() + '.');
427  PSG_WARNING("OSG: " << msg);
428  }
429  if (feedback == ePositiveFeedback) {
     // Positive feedback scales the penalty down by the factor
     // (1 - m_PositiveFeedbackWeight).
430  penalty *= (1.0 - m_PositiveFeedbackWeight);
431  } else {
     // Any other feedback value takes this branch; the trace message below
     // describes it as increasing the penalty.
432  penalty = penalty * (1.0 - m_NegativeFeedbackWeight)
434  }
435  _TRACE(((feedback == ePositiveFeedback)
436  ? "Reducing" : "Increasing")
437  << " penalty for " << service << " on "
438  << CSocketAPI::HostPortToString(host, port)
439  << " from " << old_penalty << " to " << penalty);
     // LOCK is released before the recursive call below.
440  LOCK.Release();
441  auto msn = m_MainServiceNameMap.find(service);
442  if (msn != m_MainServiceNameMap.end()) {
443  AcceptFeedback(msn->second, host, port, feedback);
444  }
445 }
446 
447 
// void COSGServiceMapper::x_NormalizePenalties(void)
// (signature taken from the cross-reference index, osg_mapper.cpp:448).
//
// Once at least m_PenaltyNormalizationInterval seconds have elapsed on
// m_AllServerRatingsTimer, applies an exponential decay factor to every
// stored penalty's offset from m_InitialPenalty.
//
// NOTE(review): lines 451-453, 458, 468, 472 and 475 are missing from this
// listing. They presumably hold a guarded first elapsed-time check, a write
// lock, the outer loop over m_AllServerRatings (declaring `it`), the
// "+ m_InitialPenalty;" continuation, and a timer restart -- confirm.
449 {
     // First check; its condition is on a line missing from this listing.
450  {{
454  return;
455  }
456  }}
457 
459  // Recheck, as another thread could have just taken care of it.
460  double elapsed = m_AllServerRatingsTimer.Elapsed();
461  if (elapsed < m_PenaltyNormalizationInterval) {
462  return;
463  }
464 
     // m_PenaltyDecayRate is configured as -ln(2)/half_life (see Configure),
     // so `decay` halves for each half-life elapsed.
465  double decay = exp(m_PenaltyDecayRate * elapsed);
466  _TRACE("Decaying penalties by " << decay << " after " << elapsed
467  << " s");
469  NON_CONST_ITERATE (TServerRatings, it2, it->second) {
     // Scale the penalty's distance from m_InitialPenalty by `decay`.
470  it2->second.penalty
471  = (it2->second.penalty - m_InitialPenalty) * decay
473  }
474  }
476 }
477 
478 
// SServerRating& COSGServiceMapper::x_SetRating(TServerRatings& ratings,
//     Uint4 host, Uint2 port, bool* was_new, CTempString name = kEmptyStr) const
// (return type and name taken from the cross-reference index; the line
// carrying them is missing from this listing).
//
// Returns a reference to the rating for host:port inside `ratings`, inserting
// a fresh one if no entry with that key exists.
//
// was_new, when non-NULL, is set to true only on insertion and is never reset
// to false, so callers must initialize it themselves.
// name is the display name for a newly created server reference; when empty,
// the "host:port" string from HostPortToString() is used instead.
//
// NOTE(review): lines 483-484 and 490 are missing from this listing. They
// presumably build `key` with g_OSG_MakeEndpointKey(), obtain `it` with
// lower_bound() (used below as the insertion hint), and declare `node`
// -- confirm.
480 (TServerRatings& ratings, Uint4 host, Uint2 port, bool* was_new,
481  CTempString name) const
482 {
485 
486  if (it == ratings.end() || it->first != key) {
487  if (was_new != NULL) {
488  *was_new = true;
489  }
491  string name_str
492  = (name.empty() ? CSocketAPI::HostPortToString(host, port)
493  : string(name));
     // New entries start at the configured initial penalty, not excluded.
494  node.second.ref.Reset(new CDBServer(name_str, host, port, kMax_Auto));
495  node.second.penalty = m_InitialPenalty;
496  node.second.excluded = false;
497  it = ratings.insert(it, node);
498  }
499 
500  return it->second;
501 }
502 
503 
virtual void GetServerOptions(const string &service, TOptions *options)
Get an annotated list of all servers for the given service.
virtual void GetServersList(const string &service, list< string > *serv_list) const
Get list of all servers for the given service disregarding any exclusions.
virtual TSvrRef GetServer(const string &service)
Map a service to a server.
void ConfigureFromRegistry(const IRegistry *registry=NULL)
IDBServiceMapper.
Uint2 GetPort(void) const
const string & GetName(void) const
Uint4 GetHost(void) const
void Release()
Manually force the resource to be released.
Definition: guard.hpp:166
static CNcbiApplication * Instance(void)
Singleton method.
Definition: ncbiapp.cpp:264
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
CTime –.
Definition: ncbitime.hpp:296
virtual void CleanExcluded(const string &service)
Clean the list of excluded servers for the given service.
list< CRef< CDBServerOption > > TOptions
virtual void Exclude(const string &service, const TSvrRef &server)
Exclude a server from the mapping for a service.
IRWRegistry –.
Definition: ncbireg.hpp:407
IRegistry –.
Definition: ncbireg.hpp:73
container_type::iterator iterator
Definition: map.hpp:54
const_iterator end() const
Definition: map.hpp:152
const_iterator lower_bound(const key_type &key) const
Definition: map.hpp:154
iterator_bool insert(const value_type &val)
Definition: map.hpp:165
bool empty() const
Definition: map.hpp:149
container_type::value_type value_type
Definition: map.hpp:52
const_iterator find(const key_type &key) const
Definition: map.hpp:153
Definition: map.hpp:338
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
const_iterator find(const key_type &key) const
Definition: set.hpp:137
const_iterator end() const
Definition: set.hpp:136
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
static string GetAppName(EAppNameType name_type=eBaseName, int argc=0, const char *const *argv=NULL)
Definition: ncbiapp.cpp:1377
const CNcbiRegistry & GetConfig(void) const
Get the application's cached configuration parameters (read-only).
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
string
Definition: cgiapp.hpp:687
#define NULL
Definition: ncbistd.hpp:225
#define _TRACE(message)
Definition: ncbidbg.hpp:122
#define _DEBUG_ARG(arg)
Definition: ncbidbg.hpp:134
#define FORMAT(message)
Format message using iostreams library.
Definition: ncbiexpt.hpp:672
void GetServerOptions(const string &service, TOptions *options)
Get an annotated list of all servers for the given service.
Definition: osg_mapper.cpp:366
void Exclude(const string &service, const TSvrRef &server)
Exclude a server from the mapping for a service.
Definition: osg_mapper.cpp:277
void CleanExcluded(const string &service)
Clean the list of excluded servers for the given service.
Definition: osg_mapper.cpp:304
CFastRWLock m_AllServerRatingsLock
Definition: osg_mapper.hpp:131
CFastMutex m_RandomMutex
Definition: osg_mapper.hpp:135
double m_PenaltyDecayRate
Definition: osg_mapper.hpp:127
COSGServiceMapper(const IRegistry *registry=NULL)
Definition: osg_mapper.cpp:55
void x_NormalizePenalties(void)
Definition: osg_mapper.cpp:448
void GetServersList(const string &service, list< string > *serv_list) const
Get list of all servers for the given service disregarding any exclusions.
Definition: osg_mapper.cpp:324
Uint4 g_OSG_GetHost(TOSGEndpointKey key)
Definition: osg_mapper.hpp:61
void AcceptFeedback(const string &service, unsigned int host, unsigned short port, EFeedback feedback)
Definition: osg_mapper.cpp:413
double m_PositiveFeedbackWeight
Definition: osg_mapper.hpp:124
SServerRating & x_SetRating(TServerRatings &ratings, Uint4 host, Uint2 port, bool *was_new, CTempString name=kEmptyStr) const
Definition: osg_mapper.cpp:480
TOSGEndpointKey g_OSG_MakeEndpointKey(Uint4 host, Uint2 port)
Definition: osg_mapper.hpp:55
Uint2 g_OSG_GetPort(TOSGEndpointKey key)
Definition: osg_mapper.hpp:67
CFastMutex m_DBLBExclusionsMutex
Definition: osg_mapper.hpp:133
vector< TOSGEndpointKey > TTried
Definition: osg_mapper.hpp:90
Uint8 TOSGEndpointKey
Definition: osg_mapper.hpp:52
double m_PenaltyNormalizationInterval
Definition: osg_mapper.hpp:126
CStopWatch m_AllServerRatingsTimer
Definition: osg_mapper.hpp:132
TAllServerRatings m_AllServerRatings
Definition: osg_mapper.hpp:130
map< string, string > m_MainServiceNameMap
Definition: osg_mapper.hpp:123
void Configure(const IRegistry *registry=NULL)
Definition: osg_mapper.cpp:97
static void InitDefaults(IRWRegistry &reg)
Definition: osg_mapper.cpp:68
TSvrRef x_GetServer(const string &service, const TTried *tried)
Definition: osg_mapper.cpp:143
virtual ~COSGServiceMapper()
Definition: osg_mapper.cpp:63
double m_NegativeFeedbackWeight
Definition: osg_mapper.hpp:125
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
static const SAutoMax kMax_Auto
Generic stand-in for type-specific kMax_* constants from ncbi_limits.h, useful in any context with ex...
uint16_t Uint2
2-byte (16-bit) unsigned integer
Definition: ncbitype.h:101
Uint4 TValue
Type of the generated integer value and/or the seed value.
Definition: random_gen.hpp:69
TValue GetRand(void)
Get the next random number in the interval [0..GetMax()] (inclusive)
Definition: random_gen.hpp:238
static TValue GetMax(void)
The max. value GetRand() returns.
Definition: random_gen.hpp:295
void Randomize(void)
Re-initialize (re-seed) the generator using platform-specific randomization.
Definition: random_gen.cpp:267
virtual const string & Get(const string &section, const string &name, TFlags flags=0) const
Get the parameter value.
Definition: ncbireg.cpp:262
virtual double GetDouble(const string &section, const string &name, double default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get double value of specified parameter name.
Definition: ncbireg.cpp:420
virtual void EnumerateEntries(const string &section, list< string > *entries, TFlags flags=fAllLayers) const
Enumerate parameter names for a specified section.
Definition: ncbireg.cpp:514
bool Set(const string &section, const string &name, const string &value, TFlags flags=0, const string &comment=kEmptyStr)
Set the configuration parameter value.
Definition: ncbireg.cpp:826
@ fNoOverride
Cannot change existing value.
Definition: ncbireg.hpp:86
static SIZE_TYPE StringToHostPort(const string &str, unsigned int *host, unsigned short *port)
Return position past the end of the parsed portion, NPOS on error.
static string HostPortToString(unsigned int host, unsigned short port)
See SOCK_HostPortToString()
static string ntoa(unsigned int host)
BSD-like API. NB: when int, "host" must be in network byte order.
bool empty(void) const
Return true if the represented string is empty (i.e., the length is zero)
Definition: tempstr.hpp:334
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
Definition: ncbistr.hpp:5168
@ fWithCommas
Use commas as thousands separator.
Definition: ncbistr.hpp:254
CTime & ToLocalTime(void)
Convert the time into local time.
Definition: ncbitime.hpp:2465
double Restart(void)
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2817
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2776
string AsString(const CTimeFormat &format=kEmptyStr, TSeconds out_tz=eCurrentTimeZone) const
Transform time to string.
Definition: ncbitime.cpp:1511
void Start(void)
Start the timer.
Definition: ncbitime.hpp:2765
const struct ncbi::grid::netcache::search::fields::KEY key
Defines the CNcbiApplication and CAppException classes for creating NCBI applications.
static string s_EndpointKeyName(TOSGEndpointKey k)
Definition: osg_mapper.cpp:137
static double s_DefaultInitialPenalty
Definition: osg_mapper.cpp:51
static double s_DefaultNormalizationInterval
Definition: osg_mapper.cpp:49
static double s_DefaultNegativeWeight
Definition: osg_mapper.cpp:48
static double s_DefaultPositiveWeight
Definition: osg_mapper.cpp:47
static bool s_DefaultsInitialized
Definition: osg_mapper.cpp:52
END_NCBI_NAMESPACE
Definition: osg_mapper.cpp:506
BEGIN_NCBI_NAMESPACE
Definition: osg_mapper.cpp:43
static double s_DefaultDecayRate
Definition: osg_mapper.cpp:50
BEGIN_NAMESPACE(psg)
END_NAMESPACE(osg)
#define PSG_WARNING(message)
#define _ASSERT
else result
Definition: token2.c:20
static wxAcceleratorEntry entries[3]
Modified on Sun Apr 14 05:29:06 2024 by modify_doxy.py rev. 669887