NCBI C++ ToolKit
netservice_api.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: netservice_api.cpp 100701 2023-08-31 19:21:12Z lavr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  */
30 
31 #include <ncbi_pch.hpp>
32 
33 #include "../ncbi_comm.h"
34 #include "../ncbi_lbsmd.h"
35 #include "../ncbi_servicep.h"
36 
37 #include "netservice_api_impl.hpp"
38 
43 
47 #include <connect/ncbi_localip.hpp>
48 
49 #include <corelib/ncbi_config.hpp>
50 #include <corelib/ncbi_message.hpp>
51 #include <corelib/ncbi_system.hpp>
52 
53 #include <util/random_gen.hpp>
54 #include <util/checksum.hpp>
55 
56 #include <deque>
57 
58 #define NCBI_USE_ERRCODE_X ConnServ_Connection
59 
60 #define LBSMD_PENALIZED_RATE_BOUNDARY -0.01
61 
63 
64 // The purpose of these classes is to execute commands suppressing possible errors and avoiding retries
66 {
68  m_Service(service)
69  {
71 
73  }
74 
76  {
78  }
79 
80 protected:
82 
83 private:
84  unsigned m_MaxRetries = 0;
85 };
86 
88 {
90  SNoRetry(service)
91  {
92  Set([](const string&, CNetServer) { return true; });
93  }
94 
96  {
97  Set(nullptr);
98  }
99 
100 private:
101  void Set(CNetService::TEventHandler error_handler)
102  {
103  m_Service->m_Listener->SetErrorHandler(error_handler);
104  }
105 };
106 
108 {
109  CNetService service(m_Service);
110 
111  if (!service)
112  return;
113 
114  // Before resetting the m_Service pointer, verify that no other object
115  // has acquired a reference to this server group object yet (between
116  // the time the reference counter went to zero, and the current moment
117  // when m_Service is about to be reset).
118  CFastMutexGuard discovery_mutex_lock(service->m_DiscoveryMutex);
119 
120  service = NULL;
121 
122  if (!Referenced() && m_Service) {
123  if (m_Service->m_DiscoveredServers != this) {
126  }
127  m_Service = NULL;
128  }
129 }
130 
132 {
133  return m_Impl->GetServer();
134 }
135 
137 {
138  auto& service = m_ServerGroup->m_Service;
140  return new SNetServerImpl(service, service->m_ServerPool->ReturnServer(m_Position->first));
141 }
142 
144 {
145  if (m_Impl->Next())
146  return true;
147 
148  m_Impl.Reset(NULL);
149  return false;
150 }
151 
153 {
154  if (m_Impl->Prev())
155  return true;
156 
157  m_Impl.Reset(NULL);
158  return false;
159 }
160 
162 {
163  return m_Impl->GetRate();
164 }
165 
167 {
168  return ++m_Position != m_ServerGroup->m_Servers.end();
169 }
170 
172 {
173  if (m_Position == m_ServerGroup->m_Servers.begin())
174  return false;
175  --m_Position;
176  return true;
177 }
178 
180 {
182 }
183 
185 static struct R : CRandom { R() { Randomize(); } } s_RandomIteratorGen;
186 
187 static CRandom::TValue
189 {
190  CFastMutexGuard guard(s_RndLock);
191  return s_RandomIteratorGen.GetRand(0, max_value);
192 }
193 
195  SDiscoveredServers* server_group_impl) :
196  SNetServiceIteratorImpl(server_group_impl,
197  server_group_impl->m_Servers.begin() + s_GetRand(
198  CRandom::TValue((server_group_impl->m_SuppressedBegin -
199  server_group_impl->m_Servers.begin()) - 1)))
200 {
201 }
202 
204 {
205  if (m_RandomIterators.empty()) {
206  TNetServerList::const_iterator it = m_ServerGroup->m_Servers.begin();
207  size_t number_of_servers = m_ServerGroup->m_SuppressedBegin - it;
208  if (number_of_servers <= 1)
209  return false; // There are no servers to advance to.
210  m_RandomIterators.reserve(number_of_servers);
211  m_RandomIterators.push_back(m_Position);
212  --number_of_servers;
213  do {
214  if (it != m_Position) {
215  m_RandomIterators.push_back(it);
216  --number_of_servers;
217  }
218  ++it;
219  } while (number_of_servers > 0);
220  // Shuffle m_RandomIterators starting from the element with index '1'.
221  if (m_RandomIterators.size() > 2) {
222  TRandomIterators::iterator rnt_it = m_RandomIterators.begin();
223  while (++rnt_it != m_RandomIterators.end())
225  CRandom::TValue(m_RandomIterators.size() - 1))]);
226  }
229  } else
230  if (++m_RandomIterator == m_RandomIterators.end())
231  return false;
232 
234 
235  return true;
236 }
237 
239 {
240  if (m_RandomIterators.empty() ||
242  return false;
243 
245 
246  return true;
247 }
248 
250 {
251  if (++m_Position == m_ServerGroup->m_Servers.end())
253  return m_Position != m_Pivot;
254 }
255 
257 {
258  if (m_Position == m_Pivot)
259  return false;
260  if (m_Position == m_ServerGroup->m_Servers.begin())
262  --m_Position;
263  return true;
264 }
265 
267  SDiscoveredServers* server_group_impl, Uint4 key_crc32) :
268  SNetServiceIteratorImpl(server_group_impl),
269  m_KeyCRC32(key_crc32)
270 {
271  TNetServerList::const_iterator server_list_iter(m_Position);
272 
273  if ((m_SingleServer =
274  (++server_list_iter == server_group_impl->m_SuppressedBegin)))
275  // Nothing to do if there's only one server.
276  return;
277 
278  // Find the server with the highest rank.
279  SServerRank highest_rank(x_GetServerRank(m_Position));
280 
281  do {
282  SServerRank server_rank(x_GetServerRank(server_list_iter));
283  if (highest_rank < server_rank)
284  highest_rank = server_rank;
285  // To avoid unnecessary memory allocations, do not save
286  // the calculated server ranks in hope that Next()
287  // will be called very rarely for this type of iterators.
288  } while (++server_list_iter != server_group_impl->m_SuppressedBegin);
289 
290  m_Position = highest_rank.m_ServerListIter;
291 }
292 
294 {
295  if (m_SingleServer)
296  return false;
297 
298  if (m_ServerRanks.empty()) {
299  TNetServerList::const_iterator server_list_iter(
300  m_ServerGroup->m_Servers.begin());
301  do
302  m_ServerRanks.push_back(x_GetServerRank(server_list_iter));
303  while (++server_list_iter != m_ServerGroup->m_SuppressedBegin);
304 
305  // Sort the ranks in *reverse* order.
306  sort(m_ServerRanks.rbegin(), m_ServerRanks.rend());
307 
308  // Skip the server with the highest rank, which was the first
309  // server returned by this iterator object.
310  m_CurrentServerRank = m_ServerRanks.begin() + 1;
311  } else if (++m_CurrentServerRank == m_ServerRanks.end())
312  return false;
313 
314  m_Position = m_CurrentServerRank->m_ServerListIter;
315  return true;
316 }
317 
319 {
320  if (m_SingleServer)
321  return false;
322 
323  _ASSERT(!m_ServerRanks.empty());
324 
325  if (m_CurrentServerRank == m_ServerRanks.begin())
326  return false;
327 
328  m_Position = (--m_CurrentServerRank)->m_ServerListIter;
329  return true;
330 }
331 
333  m_PropCreator(listener->GetPropCreator()),
334  m_EnforcedServer(0, 0),
335  m_MaxTotalTime(CTimeout::eInfinite),
336  m_UseOldStyleAuth(false)
337 {
338 }
339 
340 SNetServiceImpl::SNetServiceImpl(const string& api_name, const string& service_name, const string& client_name,
341  INetServerConnectionListener* listener, CSynRegistry& registry, const SRegSynonyms& sections) :
342  m_Listener(listener),
343  m_ServerPool(new SNetServerPoolImpl(listener)),
344  m_ServiceName(service_name),
345  m_RebalanceStrategy(registry, sections),
346  m_RoundRobin(0),
347  m_APIName(api_name),
348  m_ClientName(client_name)
349 {
350 }
351 
353  m_Listener(prototype->m_Listener->Clone()),
354  m_ServerPool(prototype->m_ServerPool),
355  m_ServiceName(server->m_Address.AsString()),
356  m_RebalanceStrategy(prototype->m_RebalanceStrategy),
357  m_RoundRobin(prototype->m_RoundRobin.load()),
358  m_APIName(prototype->m_APIName),
359  m_ClientName(prototype->m_ClientName),
360  m_UseSmartRetries(prototype->m_UseSmartRetries),
361  m_ConnectionMaxRetries(prototype->m_ConnectionMaxRetries),
362  m_ConnectionRetryDelay(prototype->m_ConnectionRetryDelay),
363  m_NetInfo(prototype->m_NetInfo)
364 {
365  Construct(server);
366 }
367 
368 SNetServiceImpl::SNetServiceImpl(const string& service_name, SNetServiceImpl* prototype) :
369  m_Listener(prototype->m_Listener->Clone()),
370  m_ServerPool(prototype->m_ServerPool),
371  m_ServiceName(service_name),
372  m_RebalanceStrategy(prototype->m_RebalanceStrategy),
373  m_RoundRobin(prototype->m_RoundRobin.load()),
374  m_APIName(prototype->m_APIName),
375  m_ClientName(prototype->m_ClientName),
376  m_UseSmartRetries(prototype->m_UseSmartRetries),
377  m_ConnectionMaxRetries(prototype->m_ConnectionMaxRetries),
378  m_ConnectionRetryDelay(prototype->m_ConnectionRetryDelay),
379  m_NetInfo(prototype->m_NetInfo)
380 {
381  Construct();
382 }
383 
385 {
388  CFastMutexGuard server_mutex_lock(m_ServerPool->m_ServerMutex);
389  m_DiscoveredServers->m_Servers.push_back(TServerRate(server, 1));
392 }
393 
395 {
396  if (!m_ServiceName.empty()) {
397  if (auto address = SSocketAddress::Parse(m_ServiceName)) {
398  Construct(m_ServerPool->FindOrCreateServerImpl(std::move(address)));
399  } else {
401  }
402  }
403 }
404 
406  const string& api_name, const string& service_name, const string& client_name,
408  CSynRegistry& registry, SRegSynonyms& sections, const string& ns_client_name)
409 {
410  CNetRef<SNetServiceImpl> rv(new SNetServiceImpl(api_name, service_name, client_name, listener, registry, sections));
411  rv->Init(registry, sections, ns_client_name);
412  return rv.Release();
413 }
414 
416 {
417  return new SNetServiceImpl(server, prototype);
418 }
419 
420 SNetServiceImpl* SNetServiceImpl::Clone(const string& service_name, SNetServiceImpl* prototype)
421 {
422  return new SNetServiceImpl(service_name, prototype);
423 }
424 
425 #ifdef NCBI_GRID_XSITE_CONN_SUPPORT
427 {
428  SNetServiceXSiteAPI::AllowXSiteConnections();
429 }
430 
432 {
433  return SNetServiceXSiteAPI::IsUsingXSiteProxy();
434 }
435 
436 static const char kXSiteFwd[] = "XSITEFWD";
437 
438 void SNetServiceXSiteAPI::AllowXSiteConnections()
439 {
440  const auto local_ip = CSocketAPI::GetLocalHostAddress();
441  const auto local_domain = GetDomain(local_ip);
442 
443  if (!local_domain) {
444  NCBI_THROW(CNetSrvConnException, eLBNameNotFound, "Cannot determine local domain");
445  }
446 
447  m_LocalDomain.store(local_domain);
448  m_AllowXSiteConnections.store(true);
449 }
450 
451 bool SNetServiceXSiteAPI::IsUsingXSiteProxy()
452 {
453  return m_AllowXSiteConnections.load();
454 }
455 
457 {
458  if (registry.Get({ "netservice_api", sections }, "allow_xsite_conn", false)) {
459  AllowXSiteConnections();
460  }
461 }
462 
465  const SSocketAddress& original, const string& service)
466 {
467  SSocketAddress actual(original);
468  _ASSERT(actual.port);
469  ticket_t ticket = 0;
470 
471  if (IsForeignAddr(actual.host)) {
472  union {
473  SFWDRequestReply rr;
474  char buf[FWD_RR_MAX_SIZE + 1];
475  };
476  memset(&rr, 0, sizeof(rr));
477 
478  rr.host = actual.host;
479  rr.port = SOCK_HostToNetShort(actual.port);
481 
482  auto text_max = sizeof(buf)-1 - offsetof(SFWDRequestReply,text);
483  auto text_len = service.size() ? min(service.size() + 1, text_max) : 0;
484  memcpy(rr.text, service.c_str(), text_len);
485 
486  size_t len = 0;
487 
488  CConn_ServiceStream svc(kXSiteFwd);
489  svc.rdbuf()->PUBSETBUF(0, 0); // quick way to make stream unbuffered
490  if (svc.write((const char*) &rr.ticket/*0*/, sizeof(rr.ticket)) &&
491  svc.write(buf, offsetof(SFWDRequestReply,text) + text_len)) {
492  svc.read(buf, sizeof(buf)-1);
493  len = (size_t) svc.gcount();
494  _ASSERT(len < sizeof(buf));
495  }
496 
497  memset(buf + len, 0, sizeof(buf) - len); // NB: terminates "text" field
498 
500  || (rr.flag & FWD_RR_ERRORMASK) || !rr.port) {
501  const char* err;
502  if (len == 0)
503  err = "Connection refused";
504  else if (len < offsetof(SFWDRequestReply,text))
505  err = "Short response received";
506  else if (!(rr.flag & FWD_RR_ERRORMASK))
507  err = rr.flag & FWD_RR_REJECTMASK
508  ? "Client rejected" : "Unknown error";
509  else if (memcmp(buf, "NCBI", 4) != 0)
510  err = rr.text[0] ? rr.text : "Unspecified error";
511  else
512  err = buf;
513  NCBI_THROW_FMT(CNetSrvConnException, eConnectionFailure,
514  "Error while acquiring auth ticket from"
515  " cross-site connection proxy "
516  << kXSiteFwd << ": " << err);
517  }
518 
519  if (rr.ticket) {
520  ticket = rr.ticket;
521  actual.host = rr.host;
522  actual.port = SOCK_NetToHostShort(rr.port);
523  } else {
524  SOCK sock;
525  EIO_Status io_st = CONN_GetSOCK(svc.GetCONN(), &sock);
526  if (sock)
527  io_st = SOCK_CreateOnTop(sock, 0, &sock);
528  _ASSERT(!sock == !(io_st == eIO_Success));
529  if (sock) {
530  // excess read data to return into sock
531  text_len = strlen(rr.text) + 1/*'\0'-terminated*/;
532  if (text_len > text_max)
533  text_len = text_max;
534  text_len += offsetof(SFWDRequestReply,text);
535  _ASSERT(text_len <= len);
536  io_st = SOCK_Pushback(sock, buf + text_len, len - text_len);
537  }
538  if (io_st != eIO_Success) {
539  SOCK_Destroy(sock);
540  const char* err = IO_StatusStr(io_st);
541  NCBI_THROW_FMT(CNetSrvConnException, eConnectionFailure,
542  "Error while tunneling through proxy "
543  << kXSiteFwd << ": " << err);
544  }
546  actual.port = 0;
547  }
548  }
549 
550  if (actual.port) {
551  SNetServerImpl::ConnectImpl(socket, deadline, actual, original);
552  }
553 
554  if (ticket && socket.Write(&ticket, sizeof(ticket)) != eIO_Success) {
555  NCBI_THROW(CNetSrvConnException, eConnectionFailure,
556  "Error while sending proxy auth ticket");
557  }
558 }
559 
560 int SNetServiceXSiteAPI::GetDomain(unsigned int ip)
561 {
562  TNCBI_IPv6Addr addr;
563  NcbiIPv4ToIPv6(&addr, ip, 0);
564 
566  NcbiIsLocalIPEx(&addr, &info);
567 
568  return info.num;
569 }
570 
571 bool SNetServiceXSiteAPI::IsForeignAddr(unsigned int ip)
572 {
573  if (!IsUsingXSiteProxy()) return false;
574 
575  const auto d = GetDomain(ip);
576  return d && (d != m_LocalDomain);
577 }
578 
579 atomic<int> SNetServiceXSiteAPI::m_LocalDomain{0};
580 atomic<bool> SNetServiceXSiteAPI::m_AllowXSiteConnections{false};
581 
582 #else
583 
585 {
586 }
587 
590  const SSocketAddress& original, const string&)
591 {
592  SNetServerImpl::ConnectImpl(socket, deadline, original, original);
593 }
594 
595 #endif
596 
597 void SNetServiceImpl::Init(CSynRegistry& registry, SRegSynonyms& sections, const string& ns_client_name)
598 {
599  // Initialize the connect library and LBSM structures
600  // used in DiscoverServersIfNeeded().
601  {
602  class CInPlaceConnIniter : protected CConnIniter
603  {
604  public:
605  void NoOp() {}
606  } conn_initer;
607  conn_initer.NoOp();
608  }
609 
611 
612  // TODO:
613  // Do not check for emptiness and always read values (client, service, etc) from registry
614  // after values provided in ctors get into an underlying memory registry.
615 
616  // Do not override explicitly set client name
617  if (m_ClientName.empty()) m_ClientName = registry.Get(sections, { "client_name", "client" }, "");
618 
619  // Use client name from NetSchedule API if it's not provided for NetCache API
620  if (m_ClientName.empty()) m_ClientName = ns_client_name;
621 
622  if (m_ServiceName.empty()) {
623  m_ServiceName = registry.Get(sections, { "service", "service_name" }, "");
624 
625  if (m_ServiceName.empty()) {
626  string host = registry.Get(sections, { "server", "host" }, "");
627  string port = registry.Get(sections, "port", "");
628 
629  if (!host.empty() && !port.empty()) m_ServiceName = host + ":" + port;
630  }
631  }
632 
633  InitXSite(registry, sections);
634 
635  m_UseSmartRetries = registry.Get(sections, "smart_service_retries", true);
636 
637  int max_retries = registry.Get({ sections, "netservice_api" }, "connection_max_retries", CONNECTION_MAX_RETRIES);
638  m_ConnectionMaxRetries = max_retries >= 0 ? max_retries : CONNECTION_MAX_RETRIES;
639 
640  double retry_delay = registry.Get({ sections, "netservice_api" }, "retry_delay", RETRY_DELAY_DEFAULT);
641  if (retry_delay < 0) retry_delay = RETRY_DELAY_DEFAULT;
642  m_ConnectionRetryDelay = static_cast<unsigned long>(retry_delay * kMilliSecondsPerSecond);
643 
644  if (m_ClientName.empty() || m_ClientName == "noname" ||
645  NStr::FindNoCase(m_ClientName, "unknown") != NPOS) {
647  if (!app) {
648  NCBI_THROW_FMT(CArgException, eNoValue,
649  m_APIName << ": client name is not set");
650  }
652  }
653 
654  m_ServerPool->Init(registry, sections);
655 
656  Construct();
657 }
658 
660 {
661  const auto kConnTimeoutDefault = 2.0;
662  const auto kCommTimeoutDefault = 12.0;
663  const auto kFirstServerTimeoutDefault = 0.0;
664 
665  m_LBSMAffinity.first = registry.Get(sections, "use_lbsm_affinity", "");
666 
667  // Get affinity value from the local LBSM configuration file.
668  if (!m_LBSMAffinity.first.empty()) {
670  }
671 
672  double conn_timeout = registry.Get(sections, "connection_timeout", kConnTimeoutDefault);
673  g_CTimeoutToSTimeout(CTimeout(conn_timeout > 0 ? conn_timeout : kConnTimeoutDefault), m_ConnTimeout);
674 
675  double comm_timeout = registry.Get({ sections, "netservice_api" }, "communication_timeout", kCommTimeoutDefault);
676  g_CTimeoutToSTimeout(CTimeout(comm_timeout > 0 ? comm_timeout : kCommTimeoutDefault), m_CommTimeout);
677 
678  double first_srv_timeout = registry.Get(sections, "first_server_timeout", kFirstServerTimeoutDefault);
679  g_CTimeoutToSTimeout(CTimeout(first_srv_timeout > 0 ? first_srv_timeout : kFirstServerTimeoutDefault), m_FirstServerTimeout);
680 
681  double max_total_time = registry.Get(sections, "max_connection_time", 0.0);
682  if (max_total_time > 0) m_MaxTotalTime = CTimeout(max_total_time);
683 
684  m_ThrottleParams.Init(registry, sections);
685 }
686 
688  unsigned discovery_iteration)
689 {
690  if (m_ServerGroupPool == NULL)
691  return new SDiscoveredServers(discovery_iteration);
692  else {
693  SDiscoveredServers* server_group = m_ServerGroupPool;
694  m_ServerGroupPool = server_group->m_NextGroupInPool;
695 
696  server_group->Reset(discovery_iteration);
697 
698  return server_group;
699  }
700 }
701 
703 {
704  string auth;
705  auth.reserve(256);
706 
707  auth += "client=\"";
709  auth += '\"';
710 
713  auth += " svc=\"";
715  auth += '\"';
716  }
717 
719  if (app) {
720  auth += " client_path=\"";
722  auth += '\"';
723  }
724  }
725 
726  return auth;
727 }
728 
729 const string& CNetService::GetServiceName() const
730 {
731  return m_Impl->m_ServiceName;
732 }
733 
735 {
736  return m_Impl->m_ServerPool;
737 }
738 
740 {
741  return m_Impl->IsLoadBalanced();
742 }
743 
745 {
746  m_Impl->m_EnforcedServer = std::move(address);
747 }
748 
749 void CNetService::PrintCmdOutput(const string& cmd,
750  CNcbiOstream& output_stream, CNetService::ECmdOutputStyle output_style,
751  CNetService::EIterationMode iteration_mode)
752 {
753  bool load_balanced = IsLoadBalanced() ?
754  output_style != eMultilineOutput_NoHeaders : false;
755 
756  for (CNetServiceIterator it = Iterate(iteration_mode); it; ++it) {
757  if (load_balanced)
758  output_stream << '[' << (*it).GetServerAddress() << ']' << endl;
759 
760  switch (output_style) {
761  case eSingleLineOutput:
762  output_stream << (*it).ExecWithRetry(cmd, false).response << endl;
763  break;
764 
765  case eUrlEncodedOutput:
766  {
767  CUrlArgs url_parser((*it).ExecWithRetry(cmd, false).response);
768 
769  ITERATE(CUrlArgs::TArgs, field, url_parser.GetArgs()) {
770  output_stream << field->name <<
771  ": " << field->value << endl;
772  }
773  }
774  break;
775 
776  default:
777  {
779  (*it).ExecWithRetry(cmd, true));
780 
781  if (output_style == eMultilineOutput_NetCacheStyle)
782  output->SetNetCacheCompatMode();
783 
784  string line;
785 
786  while (output.ReadLine(line))
787  output_stream << line << endl;
788  }
789  }
790 
791  if (load_balanced)
792  output_stream << endl;
793  }
794 }
795 
797  SSocketAddress server_address)
798 {
799  pair<TNetServerByAddress::iterator, bool> loc(m_Servers.insert(
800  TNetServerByAddress::value_type(server_address,
801  (SNetServerInPool*) NULL)));
802 
803  if (!loc.second)
804  return loc.first->second;
805 
806  auto* server = new SNetServerInPool(std::move(server_address), m_PropCreator(), m_ThrottleParams);
807 
808  loc.first->second = server;
809 
810  return server;
811 }
812 
814  SNetServerInPool* server_impl)
815 {
816  CFastMutexGuard server_mutex_lock(m_ServerMutex);
817 
818  server_impl->m_ServerPool = this;
819  return CRef<SNetServerInPool>(server_impl);
820 }
821 
823 {
824  CFastMutexGuard server_mutex_lock(m_ServerMutex);
825 
826  auto* server = FindOrCreateServerImpl(m_EnforcedServer.host == 0 ? std::move(server_address) : m_EnforcedServer);
827  server->m_ServerPool = this;
828 
829  return new SNetServerImpl(service, server);
830 }
831 
833 {
835  return m_ServerPool->GetServer(this, std::move(server_address));
836 }
837 
839  unsigned short port)
840 {
841  return m_Impl->GetServer(SSocketAddress(host, port));
842 }
843 
844 CNetServer CNetService::GetServer(unsigned host, unsigned short port)
845 {
846  return m_Impl->GetServer(SSocketAddress(host, port));
847 }
848 
850 {
851 public:
853  m_Service(service)
854  {
855  }
856 
857  virtual CNetServer BeginIteration();
858  virtual CNetServer NextServer();
859 
860 private:
863 };
864 
866 {
868 }
869 
871 {
872  return ++m_Iterator ? *m_Iterator : CNetServer();
873 }
874 
876  bool multiline_output)
877 {
878  switch (m_ServiceType) {
879  default: // eServiceNotDefined
880  NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
881  m_APIName << ": service name is not set");
882 
884  {
885  CNetServer::SExecResult exec_result;
886 
887  SRandomServiceTraversal random_traversal(this);
888 
889  IterateUntilExecOK(cmd, multiline_output,
890  exec_result, &random_traversal,
892 
893  return exec_result;
894  }
895 
897  {
898  CNetServer server(new SNetServerImpl(this,
900  m_DiscoveredServers->m_Servers.front().first)));
901 
902  return server.ExecWithRetry(cmd, multiline_output);
903  }
904  }
905 }
906 
908  bool multiline_output)
909 {
910  return m_Impl->FindServerAndExec(cmd, multiline_output);
911 }
912 
914 {
915  for (CNetServiceIterator it = Iterate(eIncludePenalized); it; ++it)
916  (*it).ExecWithRetry(cmd, false);
917 }
918 
920 {
922  NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
923  m_APIName << ": service name is not set");
924  }
925 
927  // The service is load-balanced, check if rebalancing is required.
931 
932  if (m_DiscoveredServers == NULL ||
935  // The requested server group either does not exist or
936  // does not contain up-to-date server list, thus it needs
937  // to be created anew.
938 
941 
943  TServConn_MaxFineLBNameRetries::GetDefault(), m_ConnectionRetryDelay);
944 
945  SDiscoveredServers* server_group = m_DiscoveredServers;
946 
947  if (server_group != NULL && !server_group->m_Service)
948  server_group->Reset(m_LatestDiscoveryIteration);
949  else
950  // Either the group does not exist or it has been
951  // "issued" to the outside callers; allocate a new one.
952  server_group = m_DiscoveredServers =
954 
955  CFastMutexGuard server_mutex_lock(m_ServerPool->m_ServerMutex);
956 
957  TNetServerList& servers = server_group->m_Servers;
958  TNetServerList::size_type number_of_regular_servers = 0;
959  TNetServerList::size_type number_of_standby_servers = 0;
960  double max_standby_rate = LBSMD_PENALIZED_RATE_BOUNDARY;
961 
962  // Fill the 'servers' array in accordance with the layout
963  // described above the SDiscoveredServers::m_Servers declaration.
964  for (const auto& d : discovered) {
966  server->m_ThrottleStats.Discover();
967 
968  TServerRate server_rate(server, d.second);
969 
970  if (d.second > 0)
971  servers.insert(servers.begin() +
972  number_of_regular_servers++, server_rate);
973  else if (d.second < max_standby_rate ||
974  d.second <= LBSMD_PENALIZED_RATE_BOUNDARY)
975  servers.push_back(server_rate);
976  else {
977  servers.insert(servers.begin() +
978  number_of_regular_servers, server_rate);
979  if (d.second == max_standby_rate)
980  ++number_of_standby_servers;
981  else {
982  max_standby_rate = d.second;
983  number_of_standby_servers = 1;
984  }
985  }
986  }
987 
988  server_group->m_SuppressedBegin = servers.begin() +
989  (number_of_regular_servers > 0 ?
990  number_of_regular_servers : number_of_standby_servers);
991 
992  server_mutex_lock.Release();
993  }
994  }
995 }
996 
998  CRef<SDiscoveredServers>& discovered_servers)
999 {
1000  CFastMutexGuard discovery_mutex_lock(m_DiscoveryMutex);
1002  discovered_servers = m_DiscoveredServers;
1003  discovered_servers->m_Service = this;
1004 }
1005 
1007 {
1008  CRef<SDiscoveredServers> servers;
1009  GetDiscoveredServers(servers);
1010 
1011  // Find the requested server among the discovered servers.
1012  ITERATE(TNetServerList, it, servers->m_Servers) {
1013  if (it->first == server->m_ServerInPool)
1014  return true;
1015  }
1016 
1017  return false;
1018 }
1019 
1020 struct SFailOnlyWarnings : deque<pair<string, CNetServer>>
1021 {
1024 
1026  {
1027  for (auto& w : *this) {
1028  m_Listener->OnWarning(w.first, w.second);
1029  }
1030 
1031  clear();
1032  }
1033 
1034 private:
1036 };
1037 
1039  bool multiline_output,
1040  CNetServer::SExecResult& exec_result,
1041  IServiceTraversal* service_traversal,
1043 {
1044  int retry_count = m_ConnectionMaxRetries;
1045  const CTimeout& max_total_time = m_ServerPool->m_MaxTotalTime;
1046  CDeadline deadline(max_total_time);
1047 
1048  enum EIterationMode {
1049  eInitialIteration,
1050  eRetry
1051  } iteration_mode = eInitialIteration;
1052  CNetServer server = service_traversal->BeginIteration();
1053 
1054  vector<CNetServer> servers_to_retry;
1055  unsigned current_server = 0;
1056 
1057  bool skip_server;
1058 
1059  unsigned number_of_servers = 0;
1060  unsigned ns_with_submits_disabled = 0;
1061  unsigned servers_throttled = 0;
1062  bool blob_not_found = false;
1063 
1064  const auto& fst = m_ServerPool->m_FirstServerTimeout;
1065  const bool use_fst = (fst.sec || fst.usec) && (retry_count > 0 || m_UseSmartRetries);
1066  const STimeout* timeout = use_fst ? &fst : nullptr;
1067 
1068  SFailOnlyWarnings fail_only_warnings(m_Listener);
1069 
1070  for (;;) {
1071  skip_server = false;
1072 
1073  try {
1074  server->ConnectAndExec(cmd, multiline_output, exec_result,
1075  timeout);
1076  fail_only_warnings.clear();
1077  return;
1078  }
1079  catch (CNetCacheBlobTooOldException& /*ex rethrown*/) {
1080  throw;
1081  }
1082  catch (CNetCacheException& ex) {
1083  if (retry_count <= 0 && !m_UseSmartRetries)
1084  throw;
1085  if (error_handling == eRethrowAllServerErrors)
1086  throw;
1088  blob_not_found = true;
1089  skip_server = true;
1090  } else if (error_handling == eRethrowServerErrors)
1091  throw;
1092  else
1093  m_Listener->OnWarning(ex.GetMsg(), server);
1094  }
1095  catch (CNetScheduleException& ex) {
1096  if (retry_count <= 0 && !m_UseSmartRetries)
1097  throw;
1098  if (error_handling == eRethrowAllServerErrors)
1099  throw;
1101  ++ns_with_submits_disabled;
1102  skip_server = true;
1103  fail_only_warnings.emplace_back(ex.GetMsg(), server);
1104  } else if (error_handling == eRethrowServerErrors)
1105  throw;
1106  else
1107  m_Listener->OnWarning(ex.GetMsg(), server);
1108  }
1109  catch (CNetSrvConnException& ex) {
1110  if (retry_count <= 0 && !m_UseSmartRetries)
1111  throw;
1112  switch (ex.GetErrCode()) {
1115  m_Listener->OnWarning(ex.GetMsg(), server);
1116  break;
1117 
1119  ++servers_throttled;
1120  fail_only_warnings.emplace_back(ex.GetMsg(), server);
1121  break;
1122 
1123  default:
1124  throw;
1125  }
1126  }
1127 
1128  ++number_of_servers;
1129 
1130  if (iteration_mode == eInitialIteration) {
1131  if (!skip_server)
1132  servers_to_retry.push_back(server);
1133  server = service_traversal->NextServer();
1134  } else {
1135  if (!skip_server)
1136  ++current_server;
1137  else
1138  servers_to_retry.erase(servers_to_retry.begin() +
1139  current_server);
1140 
1141  if (current_server < servers_to_retry.size())
1142  server = servers_to_retry[current_server];
1143  else
1144  server = NULL;
1145  }
1146 
1147  if (!blob_not_found && !deadline.IsInfinite() &&
1148  deadline.GetRemainingTime().GetAsMilliSeconds() <= (server ? 0 : m_ConnectionRetryDelay)) {
1149  NCBI_THROW_FMT(CNetSrvConnException, eReadTimeout, "Exceeded max_connection_time=" <<
1150  max_total_time.GetAsMilliSeconds() << "; cmd=[" << cmd << "]");
1151  }
1152 
1153  if (!server) {
1154  if (number_of_servers == ns_with_submits_disabled) {
1155  NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
1156  "Cannot execute [" << cmd <<
1157  "]: all NetSchedule servers are "
1158  "in REFUSESUBMITS mode for the " + m_ServiceName + " service.");
1159  }
1160 
1161  if (number_of_servers == servers_throttled) {
1162  NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
1163  "Cannot execute [" << cmd <<
1164  "]: all servers are throttled for the " + m_ServiceName + " service.");
1165  }
1166 
1167  if (retry_count <= 0 || servers_to_retry.empty()) {
1168  if (blob_not_found) {
1169  NCBI_THROW_FMT(CNetCacheException, eBlobNotFound,
1170  "Cannot execute [" << cmd << "]: blob not found.");
1171  }
1172  NCBI_THROW_FMT(CNetSrvConnException, eSrvListEmpty,
1173  "Unable to execute [" << cmd <<
1174  "] on any of the discovered servers for the " + m_ServiceName + " service.");
1175  }
1176 
1177  fail_only_warnings.IssueAndClear();
1178  ERR_POST(Warning << "Unable to send [" << cmd << "] to any "
1179  "of the discovered servers; will retry after delay.");
1180 
1182 
1183  number_of_servers = 0;
1184  ns_with_submits_disabled = 0;
1185  servers_throttled = 0;
1186 
1187  iteration_mode = eRetry;
1188  server = servers_to_retry[current_server = 0];
1189  }
1190 
1191  --retry_count;
1192 
1193  timeout = NULL;
1194  }
1195 }
1196 
1198 {
1199  switch (type)
1200  {
1201  case SRetry::eNoRetryNoErrors: return make_shared<SNoRetryNoErrors>(this);
1202  case SRetry::eNoRetry: return make_shared<SNoRetry>(this);
1203  default: return nullptr;
1204  }
1205 }
1206 
1208 {
1209  CFastMutexGuard server_mutex_lock(m_ServerMutex);
1210 
1212  it->second->m_CurrentConnectionGeneration.Add(1);
1213  }
1214 }
1215 
1217 {
1218  // Clean up m_Servers
1220  delete it->second;
1221  }
1222 
1223  if (m_LBSMAffinity.second) free(const_cast<char*>(m_LBSMAffinity.second));
1224 }
1225 
1227 {
1228  delete m_DiscoveredServers;
1229 
1230  // Clean up m_ServerGroupPool
1231  SDiscoveredServers* server_group = m_ServerGroupPool;
1232  while (server_group != NULL) {
1233  SDiscoveredServers* next_group = server_group->m_NextGroupInPool;
1234  delete server_group;
1235  server_group = next_group;
1236  }
1237 }
1238 
1240 {
1241  m_Impl->m_CommTimeout = to;
1242 }
1244 {
1245  return m_Impl->m_CommTimeout;
1246 }
1247 
1249 {
1250  CRef<SDiscoveredServers> servers;
1251  m_Impl->GetDiscoveredServers(servers);
1252 
1253  if (mode != eIncludePenalized) {
1254  if (servers->m_Servers.begin() < servers->m_SuppressedBegin) {
1255  if (mode == eSortByLoad)
1256  return new SNetServiceIterator_OmitPenalized(servers);
1257  else if (mode == eRoundRobin) {
1258  auto begin = servers->m_Servers.begin() + m_Impl->m_RoundRobin++ % servers->m_Servers.size();
1259  return new SNetServiceIterator_Circular(servers, begin);
1260  } else
1261  return new SNetServiceIterator_RandomPivot(servers);
1262  }
1263  } else
1264  if (!servers->m_Servers.empty())
1265  return new SNetServiceIteratorImpl(servers);
1266 
1267  NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
1268  "Couldn't find any available servers for the " +
1269  m_Impl->m_ServiceName + " service.");
1270 }
1271 
1273 {
1274  return m_Impl->Iterate(priority_server);
1275 }
1276 
1278 {
1279  CRef<SDiscoveredServers> servers;
1280  GetDiscoveredServers(servers);
1281 
1282  // Find the requested server among the discovered servers.
1283  ITERATE(TNetServerList, it, servers->m_Servers) {
1284  if (it->first == priority_server->m_ServerInPool)
1285  return new SNetServiceIterator_Circular(servers, it);
1286  }
1287 
1288  if (!servers->m_Servers.empty())
1289  // The requested server is not found in this service,
1290  // however there are servers, so return them.
1291  return new SNetServiceIteratorImpl(servers);
1292 
1293  NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
1294  "Couldn't find any available servers for the " +
1295  m_ServiceName + " service.");
1296 }
1297 
1299 {
1300  CRef<SDiscoveredServers> servers;
1301  m_Impl->GetDiscoveredServers(servers);
1302 
1303  if (servers->m_Servers.begin() == servers->m_SuppressedBegin) {
1304  NCBI_THROW(CNetSrvConnException, eSrvListEmpty,
1305  "Couldn't find any available servers for the " +
1306  m_Impl->m_ServiceName + " service.");
1307  }
1308 
1309  CChecksum key_crc32(CChecksum::eCRC32);
1310 
1311  key_crc32.AddChars(key.data(), key.length());
1312 
1313  return new SNetServiceIterator_Weighted(servers, key_crc32.GetChecksum());
1314 }
1315 
1317 {
1318  CRef<SDiscoveredServers> servers;
1319  m_Impl->GetDiscoveredServers(servers);
1320 
1321  // Find the requested server among the discovered servers.
1322  ITERATE(TNetServerList, it, servers->m_Servers) {
1323  if (it->first == server->m_ServerInPool) {
1324  // The server is found. Make an iterator and
1325  // skip to the next server (the iterator may become NULL).
1326  CNetServiceIterator circular_iter(
1327  new SNetServiceIterator_Circular(servers, it));
1328  return ++circular_iter;
1329  }
1330  }
1331 
1332  // The requested server is not found in this service, so
1333  // return the rest of servers or NULL if there are none.
1334  return !servers->m_Servers.empty() ?
1335  new SNetServiceIteratorImpl(servers) : NULL;
1336 }
1337 
1340 {
1341  string error_messages;
1342 
1344 
1345  for (; it; ++it) {
1346  try {
1347  if (finder->Consider(*it))
1348  break;
1349  }
1350  catch (CNetServiceException& ex) {
1352  throw;
1353 
1354  if (!error_messages.empty())
1355  error_messages += '\n';
1356 
1357  error_messages += (*it)->m_ServerInPool->m_Address.AsString();
1358  error_messages += ": ";
1359  error_messages += ex.what();
1360  }
1361  catch (CIO_Exception& ex) {
1362  if (!error_messages.empty())
1363  error_messages += '\n';
1364 
1365  error_messages += (*it)->m_ServerInPool->m_Address.AsString();
1366  error_messages += ": ";
1367  error_messages += ex.what();
1368  }
1369  }
1370 
1371  if (!error_messages.empty()) {
1372  LOG_POST(error_messages);
1373  }
1374 
1375  return it;
1376 }
1377 
1378 CNetService CNetService::Clone(const string& name)
1379 {
1380  _ASSERT(m_Impl);
1381  return name == m_Impl->m_ServiceName ? m_Impl :
1383 }
1384 
1386 {
1387  m_Impl->m_Listener->SetErrorHandler(error_handler);
1388 }
1389 
1391 {
1392  m_Impl->m_Listener->SetWarningHandler(warning_handler);
1393 }
1394 
1396  SNetServiceImpl* prototype)
1397 {
1399  return GetServiceByNameImpl(service_name, prototype);
1400 }
1401 
1403  SNetServiceImpl* prototype)
1404 {
1405  pair<TNetServiceByName::iterator, bool> loc(m_ServiceByName.insert(
1406  TNetServiceByName::value_type(service_name, CNetService())));
1407 
1408  return !loc.second ? loc.first->second :
1409  (loc.first->second =
1410  SNetServiceImpl::Clone(service_name, prototype));
1411 }
1412 
1413 bool SNetServiceMap::IsAllowed(const string& service_name) const
1414 {
1415  // Not restricted or found
1416  return !m_Restricted || m_Allowed.count(service_name);
1417 }
1418 
1420  SNetServiceImpl* prototype)
1421 {
1422  if (!m_Restricted) return true;
1423 
1425 
1426  // Check if server belongs to some allowed service
1427  for (auto& service_name: m_Allowed) {
1428  CNetService service(GetServiceByNameImpl(service_name, prototype));
1429 
1430  if (service->IsInService(server)) return true;
1431  }
1432 
1433  return false;
1434 }
1435 
1436 void SNetServiceMap::AddToAllowed(const string& service_name)
1437 {
1438  m_Allowed.insert(service_name);
1439 }
1440 
1442  CNetService::EIterationMode iteration_mode)
1443 {
1444  if (!service.IsLoadBalanced())
1445  return exec_to_json.ExecOn(service.Iterate().GetServer());
1446 
1448 
1449  for (CNetServiceIterator it = service.Iterate(iteration_mode); it; ++it)
1450  result.SetByKey((*it).GetServerAddress(), exec_to_json.ExecOn(*it));
1451 
1452  return result;
1453 }
1454 
1455 CNetService CNetService::Create(const string& api_name, const string& service_name, const string& client_name)
1456 {
1457  struct SNoOpConnectionListener : public INetServerConnectionListener
1458  {
1459  INetServerConnectionListener* Clone() override { return new SNoOpConnectionListener(*this); }
1460  void OnConnected(CNetServerConnection&) override {}
1461 
1462  private:
1463  void OnErrorImpl(const string&, CNetServer&) override {}
1464  void OnWarningImpl(const string&, CNetServer&) override {}
1465  };
1466 
1467  CSynRegistryBuilder registry_builder;
1468  SRegSynonyms sections(api_name);
1469 
1470  return SNetServiceImpl::Create(api_name, service_name, client_name, new SNoOpConnectionListener,
1471  registry_builder, sections);
1472 }
1473 
1475 {
1478  g_AppendHitID(cmd, req);
1479 }
1480 
Checksum and hash calculation classes.
CArgException –.
Definition: ncbiargs.hpp:120
CChecksum – Checksum calculator.
Definition: checksum.hpp:302
Helper hook-up class that installs default logging/registry/locking (but only if they have not yet be...
This stream exchanges data with a named service, in a constraint that the service is implemented as o...
CDeadline.
Definition: ncbitime.hpp:1830
void Release()
Manually force the resource to be released.
Definition: guard.hpp:166
I/O exception.
JSON node abstraction.
static CJsonNode NewObjectNode()
Create a new JSON object node.
Exception thrown when the requested blob is older than the requested age.
NetCache internal exception.
NetSchedule internal exception.
void StickToServer(SSocketAddress address)
CNetRef< SNetServerPoolImpl > m_Impl
void SetCommunicationTimeout(const STimeout &to)
const STimeout & GetCommunicationTimeout() const
SExecResult ExecWithRetry(const string &cmd, bool multiline_output=false)
Execute remote command 'cmd', wait for the reply, check if it starts with 'OK:', and return the remai...
Net Service exception.
double GetRate() const
CNetRef< SNetServiceIteratorImpl > m_Impl
function< bool(const string &, CNetServer)> TEventHandler
void SetErrorHandler(TEventHandler error_handler)
CNetRef< SNetServiceImpl > m_Impl
CNetServiceIterator FindServer(INetServerFinder *finder, EIterationMode mode=eSortByLoad)
const string & GetServiceName() const
static bool IsUsingXSiteProxy()
CNetServer GetServer(unsigned host, unsigned short port)
void PrintCmdOutput(const string &cmd, CNcbiOstream &output_stream, ECmdOutputStyle output_style, CNetService::EIterationMode=CNetService::eSortByLoad)
CNetServiceIterator ExcludeServer(CNetServer::TInstance server)
Start iteration excluding 'server' (return the next server after 'server' or NULL).
@ eMultilineOutput_NetCacheStyle
void ExecOnAllServers(const string &cmd)
CNetServiceIterator IterateByWeight(const string &key)
static CNetService Create(const string &api_name, const string &service_name, const string &client_name)
CNetServer::SExecResult FindServerAndExec(const string &cmd, bool multiline_output=false)
void SetWarningHandler(TEventHandler warning_handler)
bool IsLoadBalanced() const
CNetServerPool GetServerPool()
CNetServiceIterator Iterate(EIterationMode mode=eSortByLoad)
CNetService Clone(const string &name)
static void AllowXSiteConnections()
Net Service exception.
CRandom::
Definition: random_gen.hpp:66
CRef –.
Definition: ncbiobj.hpp:618
static TServers DiscoverImpl(const string &, unsigned, shared_ptr< void > &, pair< string, const char * >, int, unsigned long)
CSocket::
CTimeout – Timeout interval.
Definition: ncbitime.hpp:1693
CUrlArgs::
Definition: ncbi_url.hpp:240
This class is for use by the grid_cli utility only.
virtual CJsonNode ExecOn(CNetServer server)=0
virtual bool Consider(CNetServer server)=0
virtual CNetServer BeginIteration()=0
virtual CNetServer NextServer()=0
SRandomServiceTraversal(CNetService::TInstance service)
virtual CNetServer NextServer()
CNetServiceIterator m_Iterator
virtual CNetServer BeginIteration()
iterator_bool insert(const value_type &val)
Definition: map.hpp:165
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
static const char ip[]
Definition: des.c:75
static CS_COMMAND * cmd
Definition: ct_dynamic.c:26
#define false
Definition: bool.h:36
static SQLCHAR output[256]
Definition: print.c:5
static const struct type types[]
Definition: type.c:22
const string & GetProgramExecutablePath(EFollowLinks follow_links=eIgnoreLinks) const
Get the application's executable path.
static CNcbiApplicationGuard InstanceGuard(void)
Singleton method.
Definition: ncbiapp.cpp:133
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
const string & GetProgramDisplayName(void) const
Get the application's "display" name.
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
void swap(NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair1, NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair2)
Definition: ncbimisc.hpp:1508
@ eTakeOwnership
An object can take ownership of another.
Definition: ncbi_types.h:136
#define NULL
Definition: ncbistd.hpp:225
Uint4 GetChecksum(void) const
Return calculated checksum.
Definition: checksum.hpp:341
void AddChars(const char *str, size_t len)
Update current control sum with data provided.
Definition: checksum.hpp:602
EIO_Status CONN_GetSOCK(CONN conn, SOCK *sock)
Get an underlying SOCK handle for connection that is implemented as a socket.
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
Definition: ncbidiag.cpp:1901
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define LOG_POST(message)
This macro is deprecated and it's strongly recomended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
const string & GetMsg(void) const
Get message string.
Definition: ncbiexpt.cpp:461
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
Definition: ncbiexpt.hpp:719
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
@ eBlobNotFound
Access denied.
bool Referenced(void) const THROWS_NONE
Check if object is referenced.
Definition: ncbiobj.hpp:468
TObjectType * Release(void)
Release a reference to the object and return a pointer to the object.
Definition: ncbiobj.hpp:846
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
Uint4 TValue
Type of the generated integer value and/or the seed value.
Definition: random_gen.hpp:69
TValue GetRand(void)
Get the next random number in the interval [0..GetMax()] (inclusive)
Definition: random_gen.hpp:238
void Randomize(void)
Re-initialize (re-seed) the generator using platform-specific randomization.
Definition: random_gen.cpp:267
virtual const string & Get(const string &section, const string &name, TFlags flags=0) const
Get the parameter value.
Definition: ncbireg.cpp:262
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
unsigned int TSERV_Type
Bitwise OR of ESERV_Type[Special].
Definition: ncbi_service.h:94
#define SERV_LOCALHOST
Special values for the "preferred_host" parameter.
Definition: ncbi_service.h:54
@ fSERV_Standalone
@ fSERV_IncludeSuppressed
Definition: ncbi_service.h:89
@ fSERV_IncludeStandby
Definition: ncbi_service.h:87
@ fSERV_IncludeReserved
Definition: ncbi_service.h:88
#define SOCK_Destroy(s)
Definition: ncbi_socket.h:875
#define SOCK_NetToHostShort
Definition: ncbi_socket.h:2106
unsigned short SOCK_HostToNetShort(unsigned short value)
See man for the BSDisms, htonl() and htons().
Definition: ncbi_socket.c:8742
void Reset(SOCK sock, EOwnership if_to_own, ECopyTimeout whence)
Close the current underlying "SOCK" (if any, and if owned), and from now on use "sock" as the underly...
EIO_Status SOCK_Pushback(SOCK sock, const void *data, size_t size)
Push the specified data back to the socket's input queue (in the socket's internal read buffer).
Definition: ncbi_socket.c:7439
EIO_Status SOCK_CreateOnTop(const void *handle, size_t handle_size, SOCK *sock)
[SERVER-side] Create a socket on top of a "handle".
Definition: ncbi_socket.c:6730
EIO_Status Write(const void *buf, size_t size, size_t *n_written=0, EIO_WriteMethod how=eIO_WritePersist)
Write to socket.
static unsigned int GetLocalHostAddress(ESwitch reget=eDefault)
Local host address in network byte order (cached for faster retrieval)
@ eCopyTimeoutsToSOCK
Definition: ncbi_socket.hpp:53
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3953
static SIZE_TYPE FindNoCase(const CTempString str, const CTempString pattern, SIZE_TYPE start, SIZE_TYPE end, EOccurrence which=eFirst)
Find the pattern in the specified range of a string using a case insensitive search.
Definition: ncbistr.cpp:2993
#define NPOS
Definition: ncbistr.hpp:133
static void TruncateSpacesInPlace(string &str, ETrunc where=eTrunc_Both)
Truncate spaces in a string (in-place)
Definition: ncbistr.cpp:3201
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
Definition: ncbitime.cpp:3859
const long kMilliSecondsPerSecond
Number milliseconds in one second.
Definition: ncbitime.hpp:96
unsigned long GetAsMilliSeconds(void) const
Get as number of milliseconds.
Definition: ncbitime.cpp:3490
bool IsInfinite(void) const
Check if the deadline is infinite.
Definition: ncbitime.hpp:1852
const TArgs & GetArgs(void) const
Get the const list of arguments.
Definition: ncbi_url.hpp:300
list< TArg > TArgs
Definition: ncbi_url.hpp:276
const STimeout * g_CTimeoutToSTimeout(const CTimeout &cto, STimeout &sto)
CTimeout/STimeout adapters.
EIO_Status
I/O status.
Definition: ncbi_core.h:132
const char * IO_StatusStr(EIO_Status status)
Get the text form of an enum status value.
Definition: ncbi_core.c:56
@ eIO_Success
everything is fine, no error occurred
Definition: ncbi_core.h:133
Definition of all error codes used in connect services library (xconnserv.lib and others).
char * buf
int len
static void text(MDB_val *v)
Definition: mdb_dump.c:62
static MDB_envinfo info
Definition: mdb_load.c:37
constexpr auto sort(_Init &&init)
mdb_mode_t mode
Definition: lmdb++.h:38
const struct ncbi::grid::netcache::search::fields::KEY key
#define FWD_RR_KEEPALIVE
Try to reuse the connection.
Definition: ncbi_comm.h:126
#define FWD_RR_FIREWALL
FWDaemon request / reply.
Definition: ncbi_comm.h:125
#define FWD_RR_REJECTMASK
Definition: ncbi_comm.h:137
#define FWD_RR_ERRORMASK
Definition: ncbi_comm.h:136
#define FWD_RR_MAX_SIZE
Maximal accepted request/reply size.
Definition: ncbi_comm.h:150
unsigned int ticket_t
Definition: ncbi_comm.h:66
Parameters initialization model.
int NcbiIPv4ToIPv6(TNCBI_IPv6Addr *addr, unsigned int ipv4, size_t pfxlen)
Embed a passed network byte order IPv4 address into an IPv6 address using the specified prefix length...
Definition: ncbi_ipv6.c:142
const char * LBSMD_GetHostParameter(unsigned int addr, const char *name)
Definition: ncbi_lbsmd.c:1148
#define NcbiIsLocalIPEx
IMessage/IMessageListener interfaces and basic implementations.
#define offsetof(T, F)
Definition: ncbi_socket.c:700
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
NetCache API exception declarations.
CJsonNode g_ExecToJson(IExecToJson &exec_to_json, CNetService service, CNetService::EIterationMode iteration_mode)
This function is for use by the grid_cli utility only.
R s_RandomIteratorGen
void g_AppendClientIPSessionIDHitID(string &cmd)
#define LBSMD_PENALIZED_RATE_BOUNDARY
DEFINE_STATIC_FAST_MUTEX(s_RndLock)
static CRandom::TValue s_GetRand(CRandom::TValue max_value)
void g_AppendClientIPAndSessionID(string &cmd, const CRequestContext &req)
pair< SNetServerInPool *, double > TServerRate
vector< TServerRate > TNetServerList
void g_AppendHitID(string &cmd, CRequestContext &req)
#define RETRY_DELAY_DEFAULT
#define CONNECTION_MAX_RETRIES
T min(T x_, T y_)
static CMiniMutex s_RndLock
void OnWarning(const string &warn_msg, CNetServer &server)
virtual void DeleteThis()
Virtual method "deleting" this object.
TNetServerList::const_iterator m_SuppressedBegin
SDiscoveredServers * m_NextGroupInPool
void Reset(unsigned discovery_iteration)
CRef< INetServerConnectionListener > m_Listener
SFailOnlyWarnings(CRef< INetServerConnectionListener > listener)
CRef< SNetServerInPool > m_ServerInPool
static void ConnectImpl(CSocket &, SConnectDeadline &, const SSocketAddress &, const SSocketAddress &)
CNetServer::SExecResult ConnectAndExec(const string &cmd, bool multiline_output, bool retry_on_exception=false)
SThrottleStats m_ThrottleStats
CNetServerPool m_ServerPool
CNetServer GetServer(SNetServiceImpl *service, SSocketAddress server_address)
CRef< SNetServerInPool > ReturnServer(SNetServerInPool *server_impl)
void Init(CSynRegistry &registry, const SRegSynonyms &sections)
SNetServerPoolImpl(INetServerConnectionListener *listener)
SSocketAddress m_EnforcedServer
SNetServerInPool * FindOrCreateServerImpl(SSocketAddress server_address)
SThrottleParams m_ThrottleParams
INetServerConnectionListener::TPropCreator m_PropCreator
TNetServerByAddress m_Servers
void Swap(SNetServiceImpl &impl, unsigned &n)
EServiceType m_ServiceType
CRef< INetServerConnectionListener > m_Listener
CNetServerPool m_ServerPool
void DiscoverServersIfNeeded()
CNetServer GetServer(SSocketAddress server_address)
static SNetServiceImpl * Create(const string &api_name, const string &service_name, const string &client_name, INetServerConnectionListener *listener, CSynRegistry &registry, SRegSynonyms &sections, const string &ns_client_name=kEmptyStr)
shared_ptr< void > m_NetInfo
void Init(CSynRegistry &registry, SRegSynonyms &sections, const string &ns_client_name)
unsigned m_LatestDiscoveryIteration
CNetServer::SExecResult FindServerAndExec(const string &cmd, bool multiline_output)
CSimpleRebalanceStrategy m_RebalanceStrategy
SNetServiceImpl(const string &api_name, const string &service_name, const string &client_name, INetServerConnectionListener *listener, CSynRegistry &registry, const SRegSynonyms &sections)
SNetServiceIteratorImpl * Iterate(CNetServer::TInstance priority_server)
unsigned long m_ConnectionRetryDelay
void GetDiscoveredServers(CRef< SDiscoveredServers > &discovered_servers)
void IterateUntilExecOK(const string &cmd, bool multiline_output, CNetServer::SExecResult &exec_result, IServiceTraversal *service_traversal, EServerErrorHandling error_handling)
shared_ptr< void > CreateRetryGuard(SRetry::EType type)
SDiscoveredServers * m_DiscoveredServers
SDiscoveredServers * m_ServerGroupPool
static SNetServiceImpl * Clone(SNetServerInPool *server, SNetServiceImpl *prototype)
virtual ~SNetServiceImpl()
bool IsInService(CNetServer::TInstance server)
SDiscoveredServers * AllocServerGroup(unsigned discovery_iteration)
TNetServerList::const_iterator m_Position
CRef< SDiscoveredServers > m_ServerGroup
TNetServerList::const_iterator m_Pivot
SNetServiceIterator_RandomPivot(SDiscoveredServers *server_group_impl, TNetServerList::const_iterator pivot)
TRandomIterators::const_iterator m_RandomIterator
TNetServerList::const_iterator m_ServerListIter
SNetServiceIterator_Weighted(SDiscoveredServers *server_group_impl, Uint4 key_crc32)
vector< SServerRank >::const_iterator m_CurrentServerRank
SServerRank x_GetServerRank(TNetServerList::const_iterator server) const
vector< SServerRank > m_ServerRanks
bool IsAllowed(const string &service_name) const
void AddToAllowed(const string &service_name)
set< string, PNocase > m_Allowed
TNetServiceByName m_ServiceByName
CNetService GetServiceByNameImpl(const string &, SNetServiceImpl *)
CFastMutex m_ServiceMapMutex
CNetService GetServiceByName(const string &service_name, SNetServiceImpl *prototype)
static void ConnectXSite(CSocket &, SNetServerImpl::SConnectDeadline &, const SSocketAddress &, const string &)
static void InitXSite(CSynRegistry &registry, const SRegSynonyms &sections)
SNoRetryNoErrors(SNetServiceImpl *service)
void Set(CNetService::TEventHandler error_handler)
CNetRef< SNetServiceImpl > m_Service
unsigned m_MaxRetries
SNoRetry(SNetServiceImpl *service)
static SSocketAddress Parse(const string &address, SHost::EName name=SHost::EName::eResolved)
void Init(CSynRegistry &registry, const SRegSynonyms &sections)
Timeout structure.
Definition: ncbi_types.h:76
Definition: type.c:6
#define _ASSERT
else result
Definition: token2.c:20
void free(voidpf ptr)
Modified on Sat Jun 15 11:50:40 2024 by modify_doxy.py rev. 669887