NCBI C++ ToolKit
psg_client_transport.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: psg_client_transport.cpp 101378 2023-12-07 15:24:53Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitri Dmitrienko, Rafael Sadyrov
27  *
28  */
29 
30 #include <ncbi_pch.hpp>
31 
33 
34 #ifdef HAVE_PSG_CLIENT
35 
36 #include <memory>
37 #include <string>
38 #include <sstream>
39 #include <list>
40 #include <vector>
41 #include <cassert>
42 #include <exception>
43 #include <thread>
44 #include <unordered_map>
45 #include <type_traits>
46 #include <utility>
47 #include <functional>
48 #include <numeric>
49 #include <cmath>
50 
51 #define __STDC_FORMAT_MACROS
52 
54 #include <corelib/ncbi_cookies.hpp>
55 
56 #include "psg_client_transport.hpp"
57 
59 
60 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, rd_buf_size, 64 * 1024, 1024 );
61 PSG_PARAM_VALUE_DEF_MIN(size_t, PSG, wr_buf_size, 64 * 1024, 1024 );
62 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, max_concurrent_streams, 100, 10 );
63 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, max_concurrent_submits, 150, 1 );
64 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, max_sessions, 40, 1 );
65 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, max_concurrent_requests_per_server, 500, 100 );
66 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, num_io, 6, 1 );
67 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, reader_timeout, 12, 1 );
68 PSG_PARAM_VALUE_DEF_MIN(double, PSG, rebalance_time, 10.0, 1.0 );
69 PSG_PARAM_VALUE_DEF_MIN(size_t, PSG, requests_per_io, 1, 1 );
70 PSG_PARAM_VALUE_DEF_MIN(double, PSG, io_timer_period, 1.0, 0.1 );
71 NCBI_PARAM_DEF(double, PSG, request_timeout, 10.0);
72 NCBI_PARAM_DEF(double, PSG, competitive_after, 0.0);
73 NCBI_PARAM_DEF(unsigned, PSG, request_retries, 2);
74 NCBI_PARAM_DEF(unsigned, PSG, refused_stream_retries, 2);
75 NCBI_PARAM_DEF(string, PSG, request_user_args, "");
76 NCBI_PARAM_DEF(bool, PSG, user_request_ids, false);
77 NCBI_PARAM_DEF(unsigned, PSG, localhost_preference, 1);
78 NCBI_PARAM_DEF(bool, PSG, fail_on_unknown_items, false);
79 NCBI_PARAM_DEF(bool, PSG, fail_on_unknown_chunks, false);
80 NCBI_PARAM_DEF(bool, PSG, https, false);
81 NCBI_PARAM_DEF(double, PSG, no_servers_retry_delay, 1.0);
82 NCBI_PARAM_DEF(bool, PSG, stats, false);
83 NCBI_PARAM_DEF(double, PSG, stats_period, 0.0);
84 NCBI_PARAM_DEF_EX(string, PSG, service, "PSG2", eParam_Default, NCBI_PSG_SERVICE);
85 NCBI_PARAM_DEF_EX(string, PSG, auth_token_name, "WebCubbyUser", eParam_Default, NCBI_PSG_AUTH_TOKEN_NAME);
86 NCBI_PARAM_DEF_EX(string, PSG, auth_token, "", eParam_Default, NCBI_PSG_AUTH_TOKEN);
87 
88 NCBI_PARAM_DEF(double, PSG, throttle_relaxation_period, 0.0);
89 NCBI_PARAM_DEF(unsigned, PSG, throttle_by_consecutive_connection_failures, 0);
90 NCBI_PARAM_DEF(bool, PSG, throttle_hold_until_active_in_lb, false);
91 NCBI_PARAM_DEF(string, PSG, throttle_by_connection_error_rate, "");
92 
94 {
95  { "none", EPSG_DebugPrintout::eNone },
96  { "some", EPSG_DebugPrintout::eSome },
97  { "all", EPSG_DebugPrintout::eAll }
98 };
100 
102 {
103  { "default", EPSG_UseCache::eDefault },
104  { "no", EPSG_UseCache::eNo },
105  { "yes", EPSG_UseCache::eYes }
106 };
108 
109 // Performance reporting/request IDs for psg_client app
110 NCBI_PARAM_ENUM_ARRAY(EPSG_PsgClientMode, PSG, internal_psg_client_mode)
111 {
112  { "off", EPSG_PsgClientMode::eOff },
113  { "performance", EPSG_PsgClientMode::ePerformance },
114 };
116 
118 {
119  if (value == "bioseq_info") return { SPSG_ArgsBase::eBioseqInfo, cref(value) };
120  if (value == "blob_prop") return { SPSG_ArgsBase::eBlobProp, cref(value) };
121  if (value == "blob") return { SPSG_ArgsBase::eBlob, cref(value) };
122  if (value == "reply") return { SPSG_ArgsBase::eReply, cref(value) };
123  if (value == "bioseq_na") return { SPSG_ArgsBase::eBioseqNa, cref(value) };
124  if (value == "na_status") return { SPSG_ArgsBase::eNaStatus, cref(value) };
125  if (value == "public_comment") return { SPSG_ArgsBase::ePublicComment, cref(value) };
126  if (value == "processor") return { SPSG_ArgsBase::eProcessor, cref(value) };
127  if (value == "ipg_info") return { SPSG_ArgsBase::eIpgInfo, cref(value) };
128  if (value.empty()) return { SPSG_ArgsBase::eReply, cref(value) };
129  return { SPSG_ArgsBase::eUnknownItem, cref(value) };
130 };
131 
133 {
134  if (value == "meta") return { SPSG_ArgsBase::eMeta, cref(value) };
135  if (value == "data") return { SPSG_ArgsBase::eData, cref(value) };
136  if (value == "message") return { SPSG_ArgsBase::eMessage, cref(value) };
137  if (value == "data_and_meta") return { SPSG_ArgsBase::eDataAndMeta, cref(value) };
138  if (value == "message_and_meta") return { SPSG_ArgsBase::eMessageAndMeta, cref(value) };
139  return { SPSG_ArgsBase::eUnknownChunk, cref(value) };
140 };
141 
142 unsigned SPSG_Params::s_GetRequestTimeout(double io_timer_period)
143 {
144  auto value = TPSG_RequestTimeout::GetDefault();
145 
146  if (value < io_timer_period) {
147  ERR_POST(Warning << "[PSG] request_timeout ('" << value << "')"
148  " was increased to the minimum allowed value ('" << io_timer_period << "')");
149  value = io_timer_period;
150  }
151 
152  return static_cast<unsigned>(value / io_timer_period);
153 }
154 
155 unsigned SPSG_Params::s_GetCompetitiveAfter(double io_timer_period, double timeout)
156 {
157  auto value = TPSG_CompetitiveAfter::GetDefault();
158  timeout *= io_timer_period;
159 
160  if ((value > 0.0) && (value < io_timer_period)) {
161  ERR_POST(Warning << "[PSG] competitive_after ('" << value << "')"
162  " was increased to the minimum allowed value ('" << io_timer_period << "')");
163  value = io_timer_period;
164  }
165 
166  if (value >= timeout) {
167  ERR_POST(Warning << "[PSG] competitive_after ('" << value << "') was disabled, "
168  "as it was greater or equal to request timeout ('" << timeout << "')");
169  } else if (value > 0.0) {
170  timeout = value;
171  }
172 
173  return static_cast<unsigned>(timeout / io_timer_period);
174 }
175 
177 {
179 
180  if (rv.empty()) {
181  CHttpCookies cookies;
182  cookies.Add(CHttpCookies::eHeader_Cookie, env.Get("HTTP_COOKIE"), nullptr);
183 
184  for (const auto& cookie : cookies) {
185  if (cookie.GetName() == auth_token_name.Get()) {
186  return NStr::URLDecode(cookie.GetValue());
187  }
188  }
189  }
190 
191  return rv;
192 }
193 
194 void SDebugPrintout::Print(SSocketAddress address, const string& path, const string& sid, const string& phid, const string& ip, SUv_Tcp::TPort port)
195 {
196  ostringstream os;
197 
198  if (!ip.empty()) os << ";IP=" << ip;
199  if (port) os << ";PORT=" << port;
200  if (m_Params.proxy) os << ";PROXY=" << m_Params.proxy;
201 
202  ERR_POST(Message << id << ": " << address << path << ";SID=" << sid << ";PHID=" << phid << os.str());
203 }
204 
205 void SDebugPrintout::Print(const SPSG_Args& args, const SPSG_Chunk& chunk)
206 {
207  ostringstream os;
208 
209  os << args.GetQueryString(CUrlArgs::eAmp_Char) << '\n';
210 
211  if ((m_Params.debug_printout == EPSG_DebugPrintout::eAll) ||
213  os << chunk;
214  } else {
215  os << "<BINARY DATA OF " << chunk.size() << " BYTES>";
216  }
217 
218  ERR_POST(Message << id << ": " << NStr::PrintableString(os.str()));
219 }
220 
222 {
223  ERR_POST(Message << id << ": Closed with status " << SUvNgHttp2_Error::NgHttp2Str(error_code));
224 }
225 
226 void SDebugPrintout::Print(unsigned retries, const SUvNgHttp2_Error& error)
227 {
// Logs a retry attempt for this request, including how many retries remain.
228  ERR_POST(Message << id << ": Retrying (" << retries << " retries remaining) after " << error);
229 }
230 
232 {
233  ERR_POST(Message << id << ": Gave up after " << error);
234 }
235 
237 {
238  if (IsPerf()) {
239  ostringstream os;
240 
241  for (const auto& event : m_Events) {
242  auto ms = get<0>(event);
243  auto type = get<1>(event);
244  auto thread_id = get<2>(event);
245  os << fixed << id << '\t' << ms << '\t' << type << '\t' << thread_id << '\n';
246  }
247 
248  cout << os.str() << flush;
249  }
250 }
251 
252 template <>
254 {
256  static constexpr size_t size = CPSG_Request::eChunk + 1;
257  static constexpr auto prefix = "\trequest\ttype=";
258 
259  static constexpr array<type, size> values = {
265  };
266 
267  static const char* ValueName(type value)
268  {
269  switch (value) {
270  case CPSG_Request::eBiodata: return "biodata";
271  case CPSG_Request::eResolve: return "resolve";
272  case CPSG_Request::eBlob: return "blob";
273  case CPSG_Request::eNamedAnnotInfo: return "named_annot_info";
274  case CPSG_Request::eChunk: return "chunk";
275  case CPSG_Request::eIpgResolve: return "ipg_resolve";
276  }
277 
278  // Should not happen
279  _TROUBLE;
280  return "unknown";
281  }
282 };
283 
284 template <>
286 {
288  static constexpr size_t size = CPSG_ReplyItem::eEndOfReply + 1;
289  static constexpr auto prefix = "\treply_item\ttype=";
290 
291  static constexpr array<type, size> values = {
302  };
303 
304  static const char* ValueName(type value)
305  {
306  switch (value) {
307  case CPSG_ReplyItem::eBlobData: return "blob_data";
308  case CPSG_ReplyItem::eBlobInfo: return "blob_info";
309  case CPSG_ReplyItem::eSkippedBlob: return "skipped_blob";
310  case CPSG_ReplyItem::eBioseqInfo: return "bioseq_info";
311  case CPSG_ReplyItem::eNamedAnnotInfo: return "named_annot_info";
312  case CPSG_ReplyItem::eNamedAnnotStatus: return "named_annot_status";
313  case CPSG_ReplyItem::ePublicComment: return "public_comment";
314  case CPSG_ReplyItem::eProcessor: return "processor";
315  case CPSG_ReplyItem::eIpgInfo: return "ipg_info";
316  case CPSG_ReplyItem::eEndOfReply: return "end_of_reply";
317  }
318 
319  // Should not happen
320  _TROUBLE;
321  return "unknown";
322  }
323 };
324 
325 template <>
327 {
329  static constexpr size_t size = CPSG_SkippedBlob::eUnknown + 1;
330  static constexpr auto prefix = "\tskipped_blob\treason=";
331 
332  static constexpr array<type, size> values = {
337  };
338 
339  static const char* ValueName(type value)
340  {
341  switch (value) {
342  case CPSG_SkippedBlob::eExcluded: return "excluded";
343  case CPSG_SkippedBlob::eInProgress: return "in_progress";
344  case CPSG_SkippedBlob::eSent: return "sent";
345  case CPSG_SkippedBlob::eUnknown: return "unknown";
346  }
347 
348  // Should not happen
349  _TROUBLE;
350  return "unknown";
351  }
352 };
353 
354 template <>
356 {
357  using type = EPSG_Status;
358  static constexpr size_t size = static_cast<size_t>(EPSG_Status::eError) + 1;
359  static constexpr auto prefix = "\treply_item_status\tstatus=";
360 
361  static constexpr array<type, size> values = {
368  };
369 
370  static const char* ValueName(type value)
371  {
372  switch (value) {
373  case EPSG_Status::eSuccess: return "success";
374  case EPSG_Status::eInProgress: return "in_progress";
375  case EPSG_Status::eNotFound: return "not_found";
376  case EPSG_Status::eCanceled: return "canceled";
377  case EPSG_Status::eForbidden: return "forbidden";
378  case EPSG_Status::eError: return "error";
379  }
380 
381  // Should not happen
382  _TROUBLE;
383  return "unknown";
384  }
385 };
386 
387 template <>
389 {
390  using type = EDiagSev;
391  static constexpr size_t size = EDiagSev::eDiag_Trace + 1;
392  static constexpr auto prefix = "\tmessage\tseverity=";
393 
394  static constexpr array<type, size> values = {
401  };
402 
403  static const char* ValueName(type value)
404  {
405  switch (value) {
406  case EDiagSev::eDiag_Info: return "info";
407  case EDiagSev::eDiag_Warning: return "warning";
408  case EDiagSev::eDiag_Error: return "error";
409  case EDiagSev::eDiag_Critical: return "critical";
410  case EDiagSev::eDiag_Fatal: return "fatal";
411  case EDiagSev::eDiag_Trace: return "trace";
412  }
413 
414  // Should not happen
415  _TROUBLE;
416  return "unknown";
417  }
418 };
419 
423 };
424 
425 template <>
427 {
429  static constexpr size_t size = ePSG_StatsCountersRetries_Timeout + 1;
430  static constexpr auto prefix = "\tretries\tevent=";
431 
432  static constexpr array<type, size> values = {
435  };
436 
437  static const char* ValueName(type value)
438  {
439  switch (value) {
440  case ePSG_StatsCountersRetries_Retry: return "retry";
441  case ePSG_StatsCountersRetries_Timeout: return "timeout";
442  }
443 
444  // Should not happen
445  _TROUBLE;
446  return "unknown";
447  }
448 };
449 
450 template <SPSG_Stats::EGroup group>
452 {
453  data.emplace_back(SGroup<group>::size);
454 
455  for (auto& counter : data.back()) {
456  counter = 0;
457  }
458 };
459 
460 template <SPSG_Stats::EGroup group>
461 void SPSG_StatsCounters::SReport::Func(const TData& data, const char* prefix, unsigned report)
462 {
463  using TGroup = SGroup<group>;
464 
465  _ASSERT(data.size() > group);
466  const auto& g = data[group];
467  _ASSERT(g.size() == TGroup::size);
468 
469  for (auto i : TGroup::values) {
470  auto n = g[static_cast<size_t>(i)].load();
471  if (n) ERR_POST(Note << prefix << report << TGroup::prefix << TGroup::ValueName(i) << "&count=" << n);
472  }
473 }
474 
476 {
477  Apply<SInit>(eRequest, m_Data);
478 }
479 
480 template <class TWhat, class... TArgs>
481 void SPSG_StatsCounters::Apply(EGroup start_with, TArgs&&... args)
482 {
483  // This method is always called with start_with == eRequest (so, all cases are run with one call, one by one).
484  // This switch usage however makes compilers warn if any enum value is missing/not handled
// NOTE: the absent 'break' statements are deliberate — each case falls through
// so TWhat::Func runs for every group from 'start_with' to the last one.
// NOTE(review): std::forward is applied to 'args' once per case; this is only
// safe while callers pass lvalues (as the current callers do) — confirm before
// ever passing rvalue arguments here.
485  switch (start_with) {
486  case eRequest: TWhat::template Func<eRequest> (std::forward<TArgs>(args)...);
487  case eReplyItem: TWhat::template Func<eReplyItem> (std::forward<TArgs>(args)...);
488  case eSkippedBlob: TWhat::template Func<eSkippedBlob> (std::forward<TArgs>(args)...);
489  case eReplyItemStatus: TWhat::template Func<eReplyItemStatus> (std::forward<TArgs>(args)...);
490  case eMessage: TWhat::template Func<eMessage> (std::forward<TArgs>(args)...);
491  case eRetries: TWhat::template Func<eRetries> (std::forward<TArgs>(args)...);
492  }
493 }
494 
495 template <class... TArgs>
497 {
498  Apply<SReport>(eRequest, m_Data, std::forward<TArgs>(args)...);
499 }
500 
502  m_Data(eTimeUntilResend + 1)
503 {
504 }
505 
507 {
508  switch (avg_time) {
509  case SPSG_Stats::eSentSecondsAgo: return "sent_seconds_ago";
510  case SPSG_Stats::eTimeUntilResend: return "time_until_resend";
511  }
512 
513  // Should not happen
514  _TROUBLE;
515  return "unknown";
516 }
517 
518 void SPSG_StatsAvgTime::Report(const char* prefix, unsigned report)
519 {
520  for (auto i : { eSentSecondsAgo, eTimeUntilResend }) {
521  _ASSERT(m_Data.size() > i);
522  const auto& data = m_Data[i];
523  auto v = data.first.load();
524  auto n = data.second.load();
525  if (n) ERR_POST(Note << prefix << report << '\t' << GetName(i) << "\taverage=" << double(v / n) / milli::den);
526  }
527 }
528 
529 template <class TDataId>
530 void SPSG_StatsData::SData<TDataId>::Report(const char* prefix, unsigned report, const char* data_prefix)
531 {
532  struct SLess {
533  static auto Tuple(const CPSG_BlobId& id) { return tie(id.GetId(), id.GetLastModified()); }
534  static auto Tuple(const CPSG_ChunkId& id) { return tuple<int, const string&>(id.GetId2Chunk(), id.GetId2Info()); }
535  bool operator()(const TDataId& lhs, const TDataId& rhs) const { return Tuple(lhs) < Tuple(rhs); }
536  };
537 
538  size_t total = 0;
540 
541  if (auto locked = m_Ids.GetLock()) {
542  total = locked->size();
543 
544  if (!total) return;
545 
546  for (const auto& data_id : *locked) {
547  auto created = unique_ids.emplace(data_id, 1);
548  if (!created.second) ++created.first->second;
549  }
550  }
551 
552  ERR_POST(Note << prefix << report << data_prefix << "\ttotal=" << total << "&unique=" << unique_ids.size());
553 
554  auto received = m_Received.load();
555  auto read = m_Read.load();
556  if (received) ERR_POST(Note << prefix << report << data_prefix << "_data\treceived=" << received << "&read=" << read);
557 
558  map<unsigned, unsigned> group_by_count;
559 
560  for (const auto& p : unique_ids) {
561  auto created = group_by_count.emplace(p.second, 1);
562  if (!created.second) ++created.first->second;
563  }
564 
565  for (const auto& p : group_by_count) {
566  ERR_POST(Note << prefix << report << data_prefix << "_retrievals\tnumber=" << p.first << "&unique_ids=" << p.second);
567  }
568 }
569 
570 void SPSG_StatsData::Report(const char* prefix, unsigned report)
571 {
572  m_Blobs.Report(prefix, report, "\tblob");
573  m_Chunks.Report(prefix, report, "\tchunk");
574  if (auto n = m_TSEs.GetLock()->size()) ERR_POST(Note << prefix << report << "\tchunk_tse\tunique=" << n);
575 }
576 
578 {
579  return SecondsToMs(TPSG_StatsPeriod::GetDefault());
580 }
581 
583  m_Timer(this, s_OnTimer, s_GetStatsPeriod(), s_GetStatsPeriod()),
584  m_Report(0),
585  m_Servers(servers)
586 {
587 }
588 
590 {
591  const auto prefix = "PSG_STATS\t";
592  const auto report = ++m_Report;
593 
597 
598  auto servers_locked = m_Servers.GetLock();
599 
600  for (const auto& server : *servers_locked) {
601  auto n = server.stats.load();
602  if (n) ERR_POST(Note << prefix << report << "\tserver\tname=" << server.address << "&requests_sent=" << n);
603  }
604 }
605 
607 {
608  for (auto& m : m_Messages) {
609  if (m && ((min_severity == eDiag_Trace) || ((m.severity != eDiag_Trace) && (m.severity >= min_severity)))) {
610  return exchange(m, SPSG_Message{});
611  }
612  }
613 
614  return {};
615 }
616 
618 {
619  m_InProgress.store(true);
620  m_Status.store(EPSG_Status::eSuccess);
621  m_Messages.clear();
622 }
623 
625 {
626  switch (status) {
634  default: return EPSG_Status::eError;
635  }
636 }
637 
639 {
640  chunks.clear();
641  args = SPSG_Args{};
642  expected = null;
643  received = 0;
644  state.Reset();
645 }
646 
648 {
649  // If it were 'more' (instead of 'less'), items would not be in progress then
650  const auto message = "Protocol error: received less than expected";
651  bool missing = false;
652 
653  if (auto items_locked = items.GetLock()) {
654  for (auto& item : *items_locked) {
655  if (item->state.InProgress()) {
656  item.GetLock()->state.AddError(message);
657  item->state.SetComplete();
658  missing = true;
659  }
660  }
661  }
662 
663  if (auto reply_item_locked = reply_item.GetLock()) {
664  if (missing || reply_item_locked->expected.Cmp<greater>(reply_item_locked->received)) {
665  reply_item_locked->state.AddError(message);
666  }
667 
668  reply_item_locked->state.SetComplete();
669  }
670 
672  queue->NotifyOne();
673 }
674 
675 void SPSG_Reply::SetFailed(string message, EPSG_Status status)
676 {
677  if (auto items_locked = items.GetLock()) {
678  for (auto& item : *items_locked) {
679  if (item->state.InProgress()) {
680  item.GetLock()->state.AddError(message);
681  item->state.SetComplete();
682  }
683  }
684  }
685 
686  if (auto reply_item_locked = reply_item.GetLock()) {
687  reply_item_locked->state.AddError(message, status);
688  reply_item_locked->state.SetComplete();
689  }
690 
692  queue->NotifyOne();
693 }
694 
// Returns the next unseen reply item, waiting up to 'deadline' for one to
// arrive. Three distinct outcomes:
//   - a valid pointer: a new item was dequeued;
//   - nullptr:         the reply is complete, no more items will arrive;
//   - nullopt:         the deadline expired while the reply was in progress.
695 optional<SPSG_Reply::SItem::TTS*> SPSG_Reply::GetNextItem(CDeadline deadline)
696 {
697  do {
// The in-progress flag is sampled BEFORE the queue is checked — apparently so
// that if the reply completes between the two reads, any item queued at
// completion time is still drained on this pass and 'no more items' is only
// reported once the queue has been seen empty.
698  bool was_in_progress = reply_item->state.InProgress();
699 
700  if (auto new_items_locked = new_items.GetLock()) {
701  if (!new_items_locked->empty()) {
// Hand out the oldest unseen item (FIFO)
702  auto rv = new_items_locked->front();
703  new_items_locked->pop_front();
704  return rv;
705  }
706  }
707 
708  // No more reply items
709  if (!was_in_progress) {
710  return nullptr;
711  }
712  }
// Block (up to the deadline) until the reply either gains items or completes
713  while (reply_item.WaitUntil(reply_item->state.InProgress(), deadline, false, true));
714 
// Deadline expired while the reply was still in progress
715  return nullopt;
716 }
717 
719 {
720  items.GetLock()->clear();
721  reply_item.GetLock()->Reset();
722 }
723 
724 shared_ptr<void> SPSG_Request::SContext::Set()
725 {
726  auto guard = m_ExistingGuard.lock();
727 
728  if (!guard) {
730  guard.reset(this, [](void*) { CDiagContext::SetRequestContext(nullptr); });
731  m_ExistingGuard = guard;
732  }
733 
734  return guard;
735 }
736 
737 SPSG_Request::SPSG_Request(string p, CPSG_Request::TFlags f, shared_ptr<SPSG_Reply> r, CRef<CRequestContext> c, const SPSG_Params& params) :
738  full_path(std::move(p)),
739  flags(f),
740  reply(r),
741  context(c),
742  m_State(&SPSG_Request::StatePrefix),
743  m_Retries(params)
744 {
745  _ASSERT(reply);
746 }
747 
749 {
750  // Reduce failure counter as well (retry counter is reduced in Retry() before returning eRetry)
752 
753  reply->Reset();
755  m_Buffer = SBuffer{};
756  m_ItemsByID.clear();
757 }
758 
760 {
761  static const string kPrefix = "\n\nPSG-Reply-Chunk: ";
762 
763  auto& index = m_Buffer.prefix_index;
764 
765  // Checking prefix
766  while (*data == kPrefix[index]) {
767  ++data;
768  --len;
769 
770  // Full prefix matched
771  if (++index == kPrefix.size()) {
773  return eContinue;
774  }
775 
776  if (!len) return eContinue;
777  }
778 
779  if (reply->raw && !index) {
782  return eContinue;
783  }
784 
785  // Check failed
786  const auto message = "Protocol error: prefix mismatch";
787 
788  if (Retry(message)) {
789  return eRetry;
790  }
791 
792  reply->reply_item.GetLock()->state.AddError(message);
793  return eStop;
794 }
795 
797 {
798  // Accumulating args
799  while (*data != '\n') {
800  m_Buffer.args_buffer.push_back(*data++);
801  if (!--len) return eContinue;
802  }
803 
804  ++data;
805  --len;
806 
808 
809  const auto& size_str = args.GetValue("size");
810  const auto size = size_str.empty() ? 0ul : stoul(size_str);
811 
812  m_Buffer.args = std::move(args);
813 
814  if (size) {
817  } else {
819  return Add();
820  }
821 
822  return eContinue;
823 }
824 
826 {
827  // Accumulating data
828  const auto data_size = min(m_Buffer.data_to_read, len);
829 
830  // Do not add an empty part
831  if (!data_size) return eContinue;
832 
833  auto& chunk = m_Buffer.chunk;
834  chunk.append(data, data_size);
835  data += data_size;
836  len -= data_size;
837  m_Buffer.data_to_read -= data_size;
838 
839  if (!m_Buffer.data_to_read) {
841  return Add();
842  }
843 
844  return eContinue;
845 }
846 
847 EDiagSev s_GetSeverity(const string& severity)
848 {
849  if (severity == "error") return eDiag_Error;
850  if (severity == "warning") return eDiag_Warning;
851  if (severity == "info") return eDiag_Info;
852  if (severity == "trace") return eDiag_Trace;
853  if (severity == "fatal") return eDiag_Fatal;
854  if (severity == "critical") return eDiag_Critical;
855 
856  // Should not happen
857  _TROUBLE;
858  return eDiag_Error;
859 }
860 
// Parses an optional numeric message code: an empty string yields an empty
// optional, anything else is converted with atoi (non-numeric input gives 0).
auto s_GetCode(const std::string& code)
{
    if (code.empty()) return std::optional<int>{};
    return std::optional<int>(atoi(code.c_str()));
}
865 
867 {
868  auto context_guard = context.Set();
869 
870  auto& args = m_Buffer.args;
871  reply->debug_printout << args << m_Buffer.chunk << endl;
872 
873  const auto item_type = args.GetValue<SPSG_Args::eItemType>().first;
874  auto& reply_item_ts = reply->reply_item;
875 
876  if (item_type == SPSG_Args::eReply) {
877  if (auto item_locked = reply_item_ts.GetLock()) {
878  if (auto update_result = UpdateItem(item_type, *item_locked, args); update_result == eRetry503) {
879  return eRetry;
880  } else if (update_result == eNewItem) {
881  // No retries after returning any data to users
882  m_Retries.Zero();
883  }
884  }
885 
886  // Item must be unlocked before notifying
887  reply_item_ts.NotifyOne();
888 
889  } else {
890  if (auto reply_item_locked = reply_item_ts.GetLock()) {
891  auto& reply_item = *reply_item_locked;
892  ++reply_item.received;
893 
894  if (reply_item.expected.Cmp<less>(reply_item.received)) {
895  reply_item.state.AddError("Protocol error: received more than expected");
896  }
897  }
898 
899  auto item_id = args.GetValue("item_id");
900  auto& item_by_id = m_ItemsByID[item_id];
901  bool to_create = !item_by_id;
902 
903  if (to_create) {
904  if (auto items_locked = reply->items.GetLock()) {
905  items_locked->emplace_back();
906  item_by_id = &items_locked->back();
907  }
908  }
909 
910  if (auto item_locked = item_by_id->GetLock()) {
911  auto update_result = UpdateItem(item_type, *item_locked, args);
912 
913  if (update_result == eRetry503) {
914  return eRetry;
915  }
916 
917  if (to_create) {
918  item_locked->args = std::move(args);
919  }
920 
921  if (update_result == eNewItem) {
922  // No retries after returning any data to users
923  m_Retries.Zero();
924 
925  reply->new_items.GetLock()->emplace_back(item_by_id);
926  }
927 
928  reply_item_ts.NotifyOne();
929  }
930 
931  // Item must be unlocked before notifying
932  item_by_id->NotifyOne();
933  }
934 
935  reply->queue->NotifyOne();
936  m_Buffer = SBuffer();
937  return eContinue;
938 }
939 
941 {
942  auto get_status = [&]() { return NStr::StringToInt(args.GetValue("status"), NStr::fConvErr_NoThrow); };
943  auto can_retry_503 = [&](auto s, auto m) { return (s == CRequestStatus::e503_ServiceUnavailable) && Retry(m); };
944 
945  ++item.received;
946 
947  auto chunk_type = args.GetValue<SPSG_Args::eChunkType>();
948  auto& chunk = m_Buffer.chunk;
949  auto rv = eSuccess;
950 
951  if (chunk_type.first & SPSG_Args::eMeta) {
952  auto n_chunks = args.GetValue("n_chunks");
953 
954  if (!n_chunks.empty()) {
955  auto expected = stoul(n_chunks);
956 
957  if (item.expected.Cmp<not_equal_to>(expected)) {
958  item.state.AddError("Protocol error: contradicting n_chunks");
959  } else {
960  item.expected = expected;
961  }
962  }
963 
964  if (const auto status = get_status(); can_retry_503(status, "Server returned a meta with status 503")) {
965  return eRetry503;
966  } else if (status) {
968  }
969 
970  if ((item_type != SPSG_Args::eBlob) || item.chunks.empty()) {
971  rv = eNewItem;
972  }
973 
974  } else if (chunk_type.first == SPSG_Args::eUnknownChunk) {
975  static atomic_bool reported(false);
976 
977  if (!reported.exchange(true)) {
978  ERR_POST("Received unknown chunk type: " << chunk_type.second.get());
979  }
980 
981  if (TPSG_FailOnUnknownChunks::GetDefault()) {
982  item.state.AddError("Protocol error: unknown chunk type '" + chunk_type.second + '\'');
983  }
984  }
985 
986  if (chunk_type.first & SPSG_Args::eMessage) {
987  auto severity = s_GetSeverity(args.GetValue("severity"));
988  auto code = s_GetCode(args.GetValue("code"));
989 
990  if (severity <= eDiag_Warning) {
991  item.state.AddMessage(std::move(chunk), severity, code);
992  } else if (severity == eDiag_Trace) {
993  _DEBUG_CODE(item.state.AddMessage(std::move(chunk), severity, code););
994  } else if (const auto status = get_status(); can_retry_503(status, chunk.c_str())) {
995  return eRetry503;
996  } else {
997  item.state.AddError(std::move(chunk), SPSG_Reply::SState::FromRequestStatus(status), severity, code);
998  }
999 
1000  if (auto stats = reply->stats.lock()) stats->IncCounter(SPSG_Stats::eMessage, severity);
1001 
1002  } else if (chunk_type.first & SPSG_Args::eData) {
1003  auto blob_chunk = args.GetValue("blob_chunk");
1004  auto index = blob_chunk.empty() ? 0 : stoul(blob_chunk);
1005 
1006  if (item_type == SPSG_Args::eBlob) {
1007  if (!index) {
1008  rv = eNewItem;
1009  }
1010 
1011  if (auto stats = reply->stats.lock()) {
1012  auto has_blob_id = !args.GetValue<SPSG_Args::eBlobId>().get().empty();
1013  stats->AddData(has_blob_id, SPSG_Stats::eReceived, chunk.size());
1014  }
1015  }
1016 
1017  if (item.chunks.size() <= index) item.chunks.resize(index + 1);
1018 
1019  item.chunks[index] = std::move(chunk);
1020  }
1021 
1022  const bool is_not_reply = item_type != SPSG_Args::eReply;
1023 
1024  if (item.expected.Cmp<less>(item.received)) {
1025  item.state.AddError("Protocol error: received more than expected");
1026 
1027  // If item is not a reply itself, add the error to its reply as well
1028  if (is_not_reply) {
1029  reply->reply_item.GetLock()->state.AddError("Protocol error: received more than expected");
1030  }
1031 
1032  // Set item complete if received everything. Reply is set complete when stream closes
1033  } else if (is_not_reply && item.expected.Cmp<equal_to>(item.received)) {
1034  item.state.SetComplete();
1035  }
1036 
1037  return rv;
1038 }
1039 
1040 
1041 #define HTTP_STATUS_HEADER ":status"
1042 
1043 
1044 /** SPSG_IoSession */
1045 
1046 template <class... TNgHttp2Cbs>
1047 SPSG_IoSession::SPSG_IoSession(SPSG_Server& s, const SPSG_Params& params, SPSG_AsyncQueue& queue, uv_loop_t* loop, TNgHttp2Cbs&&... callbacks) :
1049  loop,
1050  TAddrNCred{{s.address, SUvNgHttp2_Tls::TCred()}, params.proxy},
1051  TPSG_RdBufSize::GetDefault(),
1052  TPSG_WrBufSize::GetDefault(),
1053  TPSG_Https::GetDefault(),
1054  TPSG_MaxConcurrentStreams::GetDefault(),
1055  std::forward<TNgHttp2Cbs>(callbacks)...),
1056  server(s),
1057  m_Params(params),
1058  m_Headers{{
1059  { ":method", "GET" },
1060  { ":scheme", TPSG_Https::GetDefault() ? "https" : "http" },
1061  { ":authority", m_Authority },
1062  { ":path" },
1063  { "user-agent", SUvNgHttp2_UserAgent::Get() },
1064  { "http_ncbi_sid" },
1065  { "http_ncbi_phid" },
1066  { "cookie" },
1067  { "x-forwarded-for" }
1068  }},
1069  m_Queue(queue)
1070 {
1071 }
1072 
1073 int SPSG_IoSession::OnData(nghttp2_session*, uint8_t, int32_t stream_id, const uint8_t* data, size_t len)
1074 {
1075  PSG_IO_SESSION_TRACE(this << '/' << stream_id << " received: " << len);
1076 
1077  if (auto it = m_Requests.find(stream_id); it != m_Requests.end()) {
1078  if (auto [processor_id, req] = it->second.Get(); req) {
1079  auto result = req->OnReplyData(processor_id, (const char*)data, len);
1080 
1082  it->second.ResetTime();
1083  return 0;
1084 
1085  } else if (result == SPSG_Request::eRetry) {
1086  req->Reset();
1087  m_Queue.Emplace(req);
1088  m_Queue.Signal();
1089 
1090  } else {
1091  req->reply->SetComplete();
1092  }
1093 
1095  }
1096 
1097  m_Requests.erase(it);
1098  }
1099 
1100  return 0;
1101 }
1102 
1103 bool SPSG_Request::Retry(const SUvNgHttp2_Error& error, bool refused_stream)
1104 {
1105  auto context_guard = context.Set();
1106 
1107  if (auto retries = GetRetries(SPSG_Retries::eRetry, refused_stream)) {
1108  reply->debug_printout << retries << error << endl;
1109  return true;
1110  }
1111 
1112  return false;
1113 }
1114 
1115 bool SPSG_Request::Fail(SPSG_Processor::TId processor_id, const SUvNgHttp2_Error& error, bool refused_stream)
1116 {
1117  if (GetRetries(SPSG_Retries::eFail, refused_stream)) {
1118  return false;
1119  }
1120 
1121  auto context_guard = context.Set();
1122 
1123  reply->debug_printout << error << endl;
1124  OnReplyDone(processor_id)->SetFailed(error);
1125  return true;
1126 }
1127 
1129 {
1130  if (m_Buffer.args_buffer.empty() && !m_Buffer.chunk.empty()) {
1131  m_Buffer.args = "item_id=1&item_type=unknown&chunk_type=data_and_meta&n_chunks=1"s;
1132  Add();
1133  m_Buffer.args = "item_id=0&item_type=reply&chunk_type=meta&n_chunks=2"s;
1134  Add();
1135  }
1136 }
1137 
// Fails the given request on behalf of this session.
// Returns SPSG_Request::Fail's verdict: true when the request was actually
// marked failed (traced for throttling diagnostics), false otherwise.
bool SPSG_IoSession::Fail(SPSG_Processor::TId processor_id, shared_ptr<SPSG_Request> req, const SUvNgHttp2_Error& error, bool refused_stream)
{
    auto context_guard = req->context.Set();
    auto rv = req->Fail(processor_id, error, refused_stream);

    if (rv) {
        PSG_THROTTLING_TRACE("Server '" << GetId() << "' failed to process request '" <<
                req->reply->debug_printout.id << "', " << error);
    }

    return rv;
}
1152 
// nghttp2 callback: stream 'stream_id' has been closed, either cleanly or
// with an HTTP/2 error code. Completes, retries or fails the corresponding
// request and releases the stream slot.
int SPSG_IoSession::OnStreamClose(nghttp2_session*, int32_t stream_id, uint32_t error_code)
{
    PSG_IO_SESSION_TRACE(this << '/' << stream_id << " closed: " << error_code);

    // A stream slot has been freed; if the count was zero, wake all waiters
    if (!server.available_streams++) {
        PSG_IO_TRACE("Server '" << server.address << "' became available");
        m_Queue.SignalAll();
    }

    if (auto it = m_Requests.find(stream_id); it != m_Requests.end()) {
        if (auto [processor_id, req] = it->second.Get(); req) {
            auto context_guard = req->context.Set();
            auto& debug_printout = req->reply->debug_printout;
            debug_printout << error_code << endl;

            // If there is an error and the request is allowed to Retry
            if (error_code) {
                auto error(SUvNgHttp2_Error::FromNgHttp2(error_code, "on close"));

                // NGHTTP2_REFUSED_STREAM means the server did not process the
                // stream, so it is flagged for retry handling separately
                if (RetryFail(processor_id, req, error, error_code == NGHTTP2_REFUSED_STREAM)) {
                    ERR_POST("Request for " << GetId() << " failed with " << error);
                }
            } else {
                // Raw (non-PSG) replies get converted to PSG chunks before completion
                if (req->reply->raw) {
                    req->ConvertRaw();
                }

                req->OnReplyDone(processor_id)->SetComplete();
                PSG_THROTTLING_TRACE("Server '" << GetId() << "' processed request '" <<
                        debug_printout.id << "' successfully");
            }
        }

        EraseAndMoveToNext(it);
    }

    return 0;
}
1192 
1193 int SPSG_IoSession::OnHeader(nghttp2_session*, const nghttp2_frame* frame, const uint8_t* name,
1194  size_t namelen, const uint8_t* value, size_t, uint8_t)
1195 {
1196  if ((frame->hd.type == NGHTTP2_HEADERS) && (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) &&
1197  (namelen == sizeof(HTTP_STATUS_HEADER) - 1) && (strcmp((const char*)name, HTTP_STATUS_HEADER) == 0)) {
1198 
1199  auto stream_id = frame->hd.stream_id;
1200  auto status_str = reinterpret_cast<const char*>(value);
1201 
1202  PSG_IO_SESSION_TRACE(this << '/' << stream_id << " status: " << status_str);
1203 
1204  if (auto it = m_Requests.find(stream_id); it != m_Requests.end()) {
1205  const auto request_status = static_cast<CRequestStatus::ECode>(atoi(status_str));
1206  const auto status = SPSG_Reply::SState::FromRequestStatus(request_status);
1207 
1208  if (status != EPSG_Status::eSuccess) {
1209  if (auto [processor_id, req] = it->second.Get(); req) {
1210  const auto error = to_string(request_status) + ' ' + CRequestStatus::GetStdStatusMessage(request_status);
1211  req->OnReplyDone(processor_id)->SetFailed(error, status);
1212  } else {
1213  m_Requests.erase(it);
1214  }
1215  }
1216  }
1217  }
1218 
1219  return 0;
1220 }
1221 
// Submits the given request over this session: fills the per-request headers,
// opens an nghttp2 stream and starts sending.
// Returns false when submission failed (the request is then retried/failed,
// and the whole session may be reset if the server is throttled).
bool SPSG_IoSession::ProcessRequest(SPSG_TimedRequest timed_req, SPSG_Processor::TId processor_id, shared_ptr<SPSG_Request> req)
{
    PSG_IO_SESSION_TRACE(this << " processing requests");

    auto context_guard = req->context.Set();

    const auto& path = req->full_path;
    const auto& session_id = context.GetSessionID();
    const auto& sub_hit_id = context.GetNextSubHitID();
    const auto& cookie = m_Params.GetCookie(req->flags, [&]() { return context.GetProperty("auth_token"); });
    const auto& client_ip = context.GetClientIP();
    auto headers_size = m_Headers.size();

    m_Headers[ePath] = path;
    m_Headers[eSessionID] = session_id;
    m_Headers[eSubHitID] = sub_hit_id;
    m_Headers[eCookie] = cookie;

    if (!client_ip.empty()) {
        m_Headers[eClientIP] = client_ip;
    } else {
        // Send one header fewer — assumes eClientIP is the last header slot; confirm against m_Headers layout
        --headers_size;
    }

    auto stream_id = m_Session.Submit(m_Headers.data(), headers_size);

    if (stream_id < 0) {
        auto error(SUvNgHttp2_Error::FromNgHttp2(stream_id, "on submit"));

        // Do not reset all requests unless throttling has been activated
        if (RetryFail(processor_id, req, error) && server.throttling.Active()) {
            Reset(std::move(error));
        }

        return false;
    }

    // One stream slot consumed; note when the server's limit has been reached
    if (!--server.available_streams) {
        PSG_IO_TRACE("Server '" << server.address << "' reached request limit");
    }

    req->submitted_by.Set(GetInternalId());
    req->reply->debug_printout << server.address << path << session_id << sub_hit_id << client_ip << m_Tcp.GetLocalPort() << endl;
    PSG_IO_SESSION_TRACE(this << '/' << stream_id << " submitted");
    m_Requests.emplace(stream_id, std::move(timed_req));
    return Send();
}
1270 
1271 void SPSG_IoSession::EraseAndMoveToNext(TRequests::iterator& it)
1272 {
1273  if (IsFull()) {
1274  // Continue processing of requests in the IO thread queue on next UV loop iteration
1275  m_Queue.Signal();
1276  }
1277 
1278  it = m_Requests.erase(it);
1279 }
1280 
1281 template <class TOnRetry, class TOnFail>
1282 bool SPSG_TimedRequest::CheckExpiration(const SPSG_Params& params, const SUvNgHttp2_Error& error, TOnRetry on_retry, TOnFail on_fail)
1283 {
1284  auto [processor_id, req] = Get();
1285 
1286  // Remove competitive requests if one is already being processed
1287  if (!req) {
1288  return true;
1289  }
1290 
1291  auto time = AddTime();
1292 
1293  if (time == params.competitive_after) {
1294  if (req->Retry(error)) {
1295  if (auto stats = req->reply->stats.lock()) stats->IncCounter(SPSG_Stats::eRetries, ePSG_StatsCountersRetries_Retry);
1296  on_retry(req);
1297  }
1298  }
1299 
1300  if (time >= params.request_timeout) {
1301  if (auto stats = req->reply->stats.lock()) stats->IncCounter(SPSG_Stats::eRetries, ePSG_StatsCountersRetries_Timeout);
1302  on_fail(processor_id, req);
1303  return true;
1304  }
1305 
1306  return false;
1307 }
1308 
{
    // Ages every in-flight request on this session (called once per IO timer
    // tick); timed-out requests are failed, requests that just reached the
    // competitive threshold are re-queued to be tried on another server too.
    SUvNgHttp2_Error error("Request timeout for ");
    error << GetId();

    auto on_retry = [&](auto req) { m_Queue.Emplace(req); m_Queue.Signal(); };
    auto on_fail = [&](auto processor_id, auto req) { Fail(processor_id, req, error); };

    for (auto it = m_Requests.begin(); it != m_Requests.end(); ) {
        if (it->second.CheckExpiration(m_Params, error, on_retry, on_fail)) {
            EraseAndMoveToNext(it);
        } else {
            ++it;
        }
    }
}
1325 
{
    // Retries or fails every request tracked by this session with the given
    // error, then forgets them all.
    bool some_requests_failed = false;

    for (auto& pair : m_Requests) {
        if (auto [processor_id, req] = pair.second.Get(); req) {
            if (RetryFail(processor_id, req, error)) {
                some_requests_failed = true;
            }
        }
    }

    // One summary message instead of one line per failed request
    if (some_requests_failed) {
        ERR_POST("Some requests for " << GetId() << " failed with " << error);
    }

    m_Requests.clear();
}
1344 
1345 
1346 /** SPSG_ThrottleParams */
1347 
{
    // Parses an "N/D" error-rate specification (e.g. "5/100") into
    // numerator/denominator; on any parse problem the current values are kept.
    if (error_rate.empty()) return;

    string numerator_str, denominator_str;

    if (!NStr::SplitInTwo(error_rate, "/", numerator_str, denominator_str)) return;

    int n = NStr::StringToInt(numerator_str, flags);
    int d = NStr::StringToInt(denominator_str, flags);

    // Only sensible values are accepted (denominator of at least 2)
    if (n > 0) numerator = static_cast<size_t>(n);
    if (d > 1) denominator = static_cast<size_t>(d);

    if (denominator > kMaxDenominator) {
        // presumably clamps the values to kMaxDenominator here — TODO confirm (body not visible in this listing)
    }
}
1369 
    // All defaults come from the corresponding PSG_* configuration parameters
    period(SecondsToMs(TPSG_ThrottlePeriod::GetDefault())),
    max_failures(TPSG_ThrottleMaxFailures::eGetDefault),
    threshold(TPSG_ThrottleThreshold::GetDefault())
{
}
1377 
1378 
1379 /** SPSG_Throttling */
1380 
    // Per-server throttling state; both the timer and the async signal are
    // attached to loop 'l' (presumably the discovery thread's loop — see Adjust)
    m_Address(address),
    m_Stats(std::move(p)),
    m_Active(eOff),
    m_Timer(this, s_OnTimer, Configured(), 0)
{
    m_Timer.Init(l);
    m_Signal.Init(this, l, s_OnSignal);
}
1390 
{
    // First phase of shutdown: unreference the signal handle and close the timer
    m_Signal.Unref();
    m_Timer.Close();
}
1396 
{
    // Second phase of shutdown: now the signal handle itself is closed
    m_Signal.Ref();
    m_Signal.Close();
}
1402 
{
    // Records one request result for this server; when the failure thresholds
    // are crossed, throttling is activated. Returns true if throttling has
    // just been turned on.
    auto stats_locked = m_Stats.GetLock();

    if (stats_locked->Adjust(m_Address, result)) {
        m_Active.store(eOnTimer);

        // We cannot start throttle timer from any thread (it's not thread-safe),
        // so we use async signal to start timer in the discovery thread
        m_Signal.Signal();
        return true;
    }

    return false;
}
1418 
{
    // Updates failure tracking with one request result.
    // Returns true when the server should start being throttled: either too
    // many consecutive failures, or the failure count over the sliding window
    // of the last 'denominator' results reached 'numerator'.
    if (result) {
        failures = 0;

    } else if (params.max_failures && (++failures >= params.max_failures)) {
        ERR_POST(Warning << "Server '" << address <<
                "' reached the maximum number of failures in a row (" << params.max_failures << ')');
        Reset();
        return true;
    }

    // Sliding window: 'reg' is a bit register over recent results,
    // 'index' is the current write position within it
    if (params.threshold.numerator > 0) {
        auto& reg = threshold_reg.first;
        auto& index = threshold_reg.second;
        const auto failure = !result;

        // Only act when this slot's value actually changes
        if (reg[index] != failure) {
            reg[index] = failure;

            if (failure && (reg.count() >= params.threshold.numerator)) {
                ERR_POST(Warning << "Server '" << address << "' is considered bad/overloaded ("
                        << params.threshold.numerator << '/' << params.threshold.denominator << ')');
                Reset();
                return true;
            }
        }

        // Wrap the window position
        if (++index >= params.threshold.denominator) index = 0;
    }

    return false;
}
1452 
{
    // Clears all failure tracking state
    failures = 0;
    threshold_reg.first.reset();
}
1458 
1459 
1460 /** SPSG_IoImpl */
1461 
1462 void SPSG_IoImpl::OnShutdown(uv_async_t*)
1463 {
1464  m_Queue.Unref();
1465 
1466  for (auto& server : m_Sessions) {
1467  for (auto& session : server.sessions) {
1468  session.Reset("Shutdown is in process", SUv_Tcp::eNormalClose);
1469  }
1470  }
1471 }
1472 
1474 {
1475  auto servers_locked = m_Servers.GetLock();
1476  auto& servers = *servers_locked;
1477 
1478  for (auto& server : servers) {
1479  server.throttling.StartClose();
1480  }
1481 
1482  if (m_Stats) m_Stats->Stop();
1483 }
1484 
1486 {
1487  auto servers_locked = m_Servers.GetLock();
1488  auto& servers = *servers_locked;
1489 
1490  for (auto& server : servers) {
1491  server.throttling.FinishClose();
1492  }
1493 }
1494 
1495 void SPSG_IoImpl::AddNewServers(uv_async_t* handle)
1496 {
1497  // Add new session(s) if new server(s) have been added
1498  auto servers_locked = m_Servers.GetLock();
1499  auto& servers = *servers_locked;
1500 
1501  // m_Servers->size() can be used for new_servers only after locking m_Servers to avoid a race
1502  const auto servers_size = m_Servers->size();
1503  const auto sessions_size = m_Sessions.size();
1504 
1505  _ASSERT(servers_size >= sessions_size);
1506 
1507  for (auto new_servers = servers_size - sessions_size; new_servers; --new_servers) {
1508  auto& server = servers[servers_size - new_servers];
1509  m_Sessions.emplace_back().sessions.emplace_back(server, m_Params, m_Queue, handle->loop);
1510  PSG_IO_TRACE("Session for server '" << server.address << "' was added");
1511  }
1512 }
1513 
// IO queue handler: drains pending requests and distributes them among
// available servers. Server selection is probabilistic, weighted by each
// server's discovered rate; submission stops once the per-iteration submit
// limit is reached or no request/server remains.
void SPSG_IoImpl::OnQueue(uv_async_t* handle)
{
    // Snapshot per-server rates; only servers with a non-zero rate count
    auto available_servers = 0;

    for (auto& server : m_Sessions) {
        server.current_rate = server->rate.load();

        if (server.current_rate) {
            ++available_servers;
        }
    }

    auto remaining_submits = m_Params.max_concurrent_submits.Get();

    auto i = m_Sessions.begin();
    auto [timed_req, processor_id, req] = decltype(m_Queue.Pop()){};
    auto d = m_Random.first;
    auto request_rate = 0.0;
    auto target_rate = 0.0;
    _DEBUG_ARG(string req_id);

    // Pops the next request off the queue and rolls a fresh random target rate
    // used to pick the server for it.
    // Clang requires '&timed_req = timed_req, &processor_id = processor_id, &req = req'
    auto get_request = [&, &timed_req = timed_req, &processor_id = processor_id, &req = req]() {
        tie(timed_req, processor_id, req) = m_Queue.Pop();

        if (!req) {
            PSG_IO_TRACE("No [more] requests pending");
            return false;
        }

        request_rate = 0.0;
        target_rate = d(m_Random.second);
        _DEBUG_CODE(req_id = req->reply->debug_printout.id;);
        PSG_IO_TRACE("Ready to submit request '" << req_id << '\'');
        return true;
    };

    // Advances to the next server, wrapping around
    auto next_server = [&]() {
        if (++i == m_Sessions.end()) i = m_Sessions.begin();
    };

    // Excludes the current server from further selection by zeroing its rate
    // and narrowing the random distribution accordingly
    auto ignore_server = [&]() {
        if (--available_servers) {
            d = uniform_real_distribution<>(0.0, d.max() - i->current_rate);
            i->current_rate = 0.0;
            next_server();
        }
    };

    // Finds the first session of the current server that is not full
    auto find_session = [&]() {
        auto s = i->sessions.begin();
        for (; (s != i->sessions.end()) && s->IsFull(); ++s);
        return make_pair(s != i->sessions.end(), s);
    };

    while (available_servers && remaining_submits) {
        // Try to get a request if needed
        if (!req && !get_request()) {
            return;
        }

        // Select a server from available according to target rate
        for (; request_rate += i->current_rate, request_rate < target_rate; next_server());

        auto& server = *i;

        // If throttling has been activated (possibly in a different thread)
        if (server->throttling.Active()) {
            PSG_IO_TRACE("Server '" << server->address << "' is throttled, ignoring");
            ignore_server();

        // If server has reached its limit (possibly in a different thread)
        } else if (server->available_streams <= 0) {
            PSG_IO_TRACE("Server '" << server->address << "' is at request limit, ignoring");
            ignore_server();

        // If all server sessions are full
        } else if (auto [found, session] = find_session(); !found) {
            PSG_IO_TRACE("Server '" << server->address << "' has no sessions available, ignoring");
            ignore_server();

        // If this is a competitive stream, try a different server
        } else if (!session->CanProcessRequest(req)) {
            PSG_IO_TRACE("Server '" << session->GetId() << "' already working/worked on the request, trying to find another one");
            // Reset submitter ID and move to next server (the same server will be submitted to if it's the only one available)
            req->submitted_by.Reset();
            next_server();

        // If failed to submit
        } else if (!session->ProcessRequest(*std::move(timed_req), processor_id, std::move(req))) {
            PSG_IO_TRACE("Server '" << session->GetId() << "' failed to get request '" << req_id << "' with rate = " << target_rate);
            next_server();

        // Submitted successfully
        } else {
            PSG_IO_TRACE("Server '" << session->GetId() << "' got request '" << req_id << "' with rate = " << target_rate);
            --remaining_submits;
            ++server->stats;

            // Add new session if needed and allowed to
            if (session->IsFull() && (distance(session, server.sessions.end()) == 1)) {
                if (server.sessions.size() >= TPSG_MaxSessions::GetDefault()) {
                    PSG_IO_TRACE("Server '" << server->address << "' reached session limit");
                    ignore_server();
                } else {
                    server.sessions.emplace_back(*server, m_Params, m_Queue, handle->loop);
                    PSG_IO_TRACE("Additional session for server '" << server->address << "' was added");
                }
            }
        }
    }

    // A request was popped but could not be submitted: put it back
    if (req) {
        // Do not need to signal here
        m_Queue.Emplace(*std::move(timed_req));
    }

    if (!remaining_submits) {
        PSG_IO_TRACE("Max concurrent submits reached, submitted: " << m_Params.max_concurrent_submits);
        m_Queue.Signal();
    } else {
        PSG_IO_TRACE("No sessions available [anymore], submitted: " << m_Params.max_concurrent_submits - remaining_submits);
    }
}
1638 
// Periodic service discovery: re-queries the load balancer, adjusts the
// discovered per-server rates (applying localhost preference and
// regular-vs-standby logic), then updates/adds servers accordingly and wakes
// all IO queues.
void SPSG_DiscoveryImpl::OnTimer(uv_timer_t* handle)
{
    // Servers at or above kRegularRate are "regular"; those between
    // kStandbyRate and kRegularRate are "standby" (used as fallback)
    const auto kRegularRate = nextafter(0.009, 1.0);
    const auto kStandbyRate = 0.001;

    const auto& service_name = m_Service.GetServiceName();
    auto discovered = m_Service();

    auto total_preferred_regular_rate = 0.0;
    auto total_preferred_standby_rate = 0.0;
    auto total_regular_rate = 0.0;
    auto total_standby_rate = 0.0;

    // Accumulate totals
    for (auto& server : discovered) {
        const auto is_server_preferred = server.first.host == CSocketAPI::GetLocalHostAddress();

        if (server.second >= kRegularRate) {
            if (is_server_preferred) {
                total_preferred_regular_rate += server.second;
            }

            total_regular_rate += server.second;

        } else if (server.second >= kStandbyRate) {
            if (is_server_preferred) {
                total_preferred_standby_rate += server.second;
            }

            total_standby_rate += server.second;
        }
    }

    // Nothing discovered: possibly schedule a faster retry and/or start failing requests
    if (m_NoServers(total_regular_rate || total_standby_rate, SUv_Timer::GetThat<SUv_Timer>(handle))) {
        ERR_POST("No servers in service '" << service_name << '\'');
        return;
    }

    const auto localhost_preference = TPSG_LocalhostPreference::GetDefault();
    const auto total_preferred_standby_percentage = localhost_preference ? 1.0 - 1.0 / localhost_preference : 0.0;
    const auto have_any_regular_servers = total_regular_rate > 0.0;
    const auto have_nonpreferred_regular_servers = total_regular_rate > total_preferred_regular_rate;
    const auto have_nonpreferred_standby_servers = total_standby_rate > total_preferred_standby_rate;
    const auto regular_rate_adjustment =
        (localhost_preference <= 1) ||
        (total_preferred_regular_rate != 0.0) ||
        (total_preferred_standby_rate == 0.0) ||
        (!have_nonpreferred_regular_servers && !have_nonpreferred_standby_servers);
    auto rate_total = 0.0;

    // Adjust discovered rates
    for (auto& server : discovered) {
        const auto is_server_preferred = server.first.host == CSocketAPI::GetLocalHostAddress();
        const auto old_rate = server.second;

        if (server.second >= kRegularRate) {
            if (is_server_preferred) {
                if (regular_rate_adjustment) {
                    server.second *= localhost_preference;
                } else if (have_nonpreferred_regular_servers) {
                    server.second = 0.0;
                } else if (have_nonpreferred_standby_servers) {
                    server.second = 0.0;
                }
            } else {
                if (regular_rate_adjustment) {
                    // No adjustments
                } else if (have_nonpreferred_regular_servers) {
                    server.second *= (1 - total_preferred_standby_percentage) / total_regular_rate;
                } else if (have_nonpreferred_standby_servers) {
                    server.second = 0.0;
                }
            }
        } else if (server.second >= kStandbyRate) {
            if (is_server_preferred) {
                if (regular_rate_adjustment && have_any_regular_servers) {
                    server.second = 0.0;
                } else if (regular_rate_adjustment) {
                    server.second *= localhost_preference;
                } else if (have_nonpreferred_regular_servers) {
                    server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;
                } else if (have_nonpreferred_standby_servers) {
                    server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;
                }
            } else {
                if (regular_rate_adjustment && have_any_regular_servers && (localhost_preference || have_nonpreferred_regular_servers)) {
                    server.second = 0.0;
                } else if (regular_rate_adjustment) {
                    // No adjustments
                } else if (have_nonpreferred_regular_servers) {
                    server.second = 0.0;
                } else if (have_nonpreferred_standby_servers) {
                    server.second *= (1 - total_preferred_standby_percentage) / (total_standby_rate - total_preferred_standby_rate);
                }
            }
        }

        rate_total += server.second;

        if (old_rate != server.second) {
            PSG_DISCOVERY_TRACE("Rate for '" << server.first << "' adjusted from " << old_rate << " to " << server.second);
        }
    }

    auto servers_locked = m_Servers.GetLock();
    auto& servers = *servers_locked;

    // Update existing servers
    for (auto& server : servers) {
        auto address_same = [&](CServiceDiscovery::TServer& s) { return s.first == server.address; };
        auto it = find_if(discovered.begin(), discovered.end(), address_same);

        if ((it == discovered.end()) || (it->second <= numeric_limits<double>::epsilon())) {
            server.rate = 0.0;
            PSG_DISCOVERY_TRACE("Server '" << server.address << "' disabled in service '" << service_name << '\'');

        } else {
            server.throttling.Discovered();
            auto rate = it->second / rate_total;

            if (server.rate != rate) {
                // This has to be before the rate change for the condition to work (uses old rate)
                PSG_DISCOVERY_TRACE("Server '" << server.address <<
                        (server.rate ? "' updated in service '" : "' enabled in service '" ) <<
                        service_name << "' with rate = " << rate);

                server.rate = rate;
            }

            // Reset rate to avoid adding the server below
            it->second = 0.0;
        }
    }

    // Add new servers
    for (auto& server : discovered) {
        if (server.second > numeric_limits<double>::epsilon()) {
            auto rate = server.second / rate_total;
            servers.emplace_back(server.first, rate, m_Params.max_concurrent_requests_per_server, m_ThrottleParams, handle->loop);
            _DEBUG_CODE(server.first.GetHostName();); // To avoid splitting the trace message below by gethostbyaddr
            PSG_DISCOVERY_TRACE("Server '" << server.first << "' added to service '" <<
                    service_name << "' with rate = " << rate);
        }
    }

    // Let every IO thread pick up the changes
    m_QueuesRef.SignalAll();
}
1786 
1787 void SPSG_IoImpl::OnTimer(uv_timer_t*)
1788 {
1789  if (m_Servers->fail_requests) {
1790  FailRequests();
1791  } else {
1792  CheckRequestExpiration();
1793  }
1794 
1795  for (auto& server : m_Sessions) {
1796  for (auto& session : server.sessions) {
1797  session.CheckRequestExpiration();
1798  }
1799  }
1800 }
1801 
{
    // Ages requests still waiting in the IO queue (not yet submitted):
    // timed-out requests are failed; requests that reached the competitive
    // threshold are collected and re-appended for another attempt.
    auto queue_locked = m_Queue.GetLockedQueue();
    list<SPSG_TimedRequest> retries;
    SUvNgHttp2_Error error("Request timeout before submitting");

    auto on_retry = [&](auto req) { retries.emplace_back(req); m_Queue.Signal(); };
    auto on_fail = [&](auto processor_id, auto req) { req->Fail(processor_id, error); };

    for (auto it = queue_locked->begin(); it != queue_locked->end(); ) {
        if (it->CheckExpiration(m_Params, error, on_retry, on_fail)) {
            it = queue_locked->erase(it);
        } else {
            ++it;
        }
    }

    // Put retried requests back at the end of the queue
    queue_locked->splice(queue_locked->end(), retries);
}
1821 
{
    // Fails every request still waiting in the IO queue (used when no servers
    // have been discovered — see m_Servers->fail_requests)
    auto queue_locked = m_Queue.GetLockedQueue();
    SUvNgHttp2_Error error("No servers to process request");

    for (auto& timed_req : *queue_locked) {
        if (auto [processor_id, req] = timed_req.Get(); req) {
            auto context_guard = req->context.Set();
            auto& debug_printout = req->reply->debug_printout;
            debug_printout << error << endl;
            req->OnReplyDone(processor_id)->SetFailed(error);
            PSG_IO_TRACE("No servers to process request '" << debug_printout.id << '\'');
        }
    }

    queue_locked->clear();
}
1839 
// Per-thread initialization: binds the IO queue's async handle to this
// thread's UV loop so queued requests wake OnQueue
void SPSG_IoImpl::OnExecute(uv_loop_t& loop)
{
    m_Queue.Init(this, &loop, s_OnQueue);
}
1844 
1846 {
1847  m_Queue.Ref();
1848  m_Queue.Close();
1849  m_Sessions.clear();
1850 }
1851 
    // m_Timeout approximates the longest time (ms) a request may stay active:
    // initial attempt plus competitive retries, in io_timer_period ticks
    m_RetryDelay(SecondsToMs(TPSG_NoServersRetryDelay::GetDefault())),
    m_Timeout(SecondsToMs((params.request_timeout + params.competitive_after * params.request_retries) * params.io_timer_period)),
    m_FailRequests(const_cast<atomic_bool&>(servers->fail_requests))
{
}
1858 
{
    // Called after each discovery attempt; 'discovered' says whether any
    // usable servers were found. Adjusts the discovery timer and the global
    // fail-requests flag. Returns true when there are (still) no servers.

    // If there is a separate timer set for the case of no servers discovered
    if (m_RetryDelay) {
        if (discovered) {
            timer->ResetRepeat();
        } else {
            timer->SetRepeat(m_RetryDelay);
        }
    }

    // If there is a request timeout set
    if (m_Timeout) {
        const auto timeout_expired = m_Passed >= m_Timeout;
        m_FailRequests = timeout_expired;

        if (discovered) {
            m_Passed = 0;
        } else if (!timeout_expired) {
            // Passed is increased after fail flag is set, the flag would be set too early otherwise
            m_Passed += m_RetryDelay ? m_RetryDelay : timer->GetDefaultRepeat();
        }
    }

    return !discovered;
}
1885 
1886 
1887 /** SPSG_IoCoordinator */
1888 
1889 shared_ptr<SPSG_Stats> s_GetStats(SPSG_Servers::TTS& servers)
1890 {
1891  if (TPSG_Stats::GetDefault()) {
1892  return make_shared<SPSG_Stats>(servers);
1893  } else {
1894  return {};
1895  }
1896 }
1897 
{
    // A single fixed server needs no periodic re-discovery (repeat of 0)
    return service.IsSingleServer() ? 0 : SecondsToMs(TPSG_RebalanceTime::GetDefault());
}
1902 
    // Barrier counts: all IO threads plus discovery (+1), plus this
    // constructor for the start barrier (+2) — presumably; confirm with SPSG_Thread
    m_StartBarrier(TPSG_NumIo::GetDefault() + 2),
    m_StopBarrier(TPSG_NumIo::GetDefault() + 1),
    m_Discovery(m_StartBarrier, m_StopBarrier, 0, s_GetDiscoveryRepeat(service), service, stats, params, m_Servers, m_Queues),
    m_RequestCounter(0),
    m_RequestId(1)
{
    const auto io_timer_period = SecondsToMs(params.io_timer_period);

    // One IO thread (each with its own queue) per configured PSG_NUM_IO
    for (unsigned i = 0; i < TPSG_NumIo::GetDefault(); i++) {
        // This timing cannot be changed without changes in SPSG_IoSession::CheckRequestExpiration
        m_Io.emplace_back(new SPSG_Thread<SPSG_IoImpl>(m_StartBarrier, m_StopBarrier, io_timer_period, io_timer_period, params, m_Servers, m_Queues.emplace_back(m_Queues)));
    }

    // Wait until the discovery and IO threads have started
    m_StartBarrier.Wait();
}
1920 
{

    // Ask every IO thread to shut down
    for (auto& io : m_Io) {
        io->Shutdown();
    }
}
1929 
1930 bool SPSG_IoCoordinator::AddRequest(shared_ptr<SPSG_Request> req, const atomic_bool&, const CDeadline&)
1931 {
1932  if (m_Io.size() == 0) {
1933  ERR_POST(Fatal << "IO is not open");
1934  }
1935 
1936  const auto idx = (m_RequestCounter++ / params.requests_per_io) % m_Io.size();
1937  m_Queues[idx].Emplace(std::move(req));
1938  m_Queues[idx].Signal();
1939  return true;
1940 }
1941 
1942 
1944 
1945 #endif
CDeadline.
Definition: ncbitime.hpp:1830
CNcbiEnvironment –.
Definition: ncbienv.hpp:104
Blob unique ID.
Definition: psg_client.hpp:226
Chunk unique ID.
Definition: psg_client.hpp:265
@ eEndOfReply
No more items expected in the (overall!) reply.
Definition: psg_client.hpp:675
bool IsSingleServer() const
pair< SSocketAddress, double > TServer
size_type size() const
Definition: map.hpp:148
Definition: map.hpp:338
char value[7]
Definition: config.c:431
constexpr const tuple_element< _Index, ct_const_tuple< _Types... > >::type & get(const ct_const_tuple< _Types... > &_Tuple) noexcept
Definition: const_tuple.hpp:97
static uch flags
static const char ip[]
Definition: des.c:75
unsigned short TGroup
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
Definition: dlist.tmpl.h:46
@ eOff
Definition: ncbi_types.h:110
#define _DEBUG_CODE(code)
Definition: ncbidbg.hpp:136
#define _DEBUG_ARG(arg)
Definition: ncbidbg.hpp:134
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
Definition: ncbidiag.cpp:1907
string GetSessionID(void) const
Session ID.
const string & GetNextSubHitID(CTempString prefix=CTempString())
Get current hit id appended with auto-incremented sub-hit id.
static string GetStdStatusMessage(ECode code)
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
Definition: ncbidiag.cpp:1901
string GetClientIP(void) const
Client IP/hostname.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Trace
Trace message.
Definition: ncbidiag.hpp:657
@ eDiag_Info
Informational message.
Definition: ncbidiag.hpp:651
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
@ eDiag_Warning
Warning message.
Definition: ncbidiag.hpp:652
@ eDiag_Fatal
Fatal error – guarantees exit(or abort)
Definition: ncbidiag.hpp:655
@ eDiag_Critical
Critical error message.
Definition: ncbidiag.hpp:654
@ e451_Unavailable_For_Legal_Reasons
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
void Fatal(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1209
const float epsilon
Definition: math.hpp:61
void Add(const CHttpCookie &cookie)
Add a single cookie.
const CSeq_id & GetId(const CSeq_loc &loc, CScope *scope)
If all CSeq_ids embedded in CSeq_loc refer to the same CBioseq, returns the first CSeq_id found,...
@ eParam_Default
Default flags.
Definition: ncbi_param.hpp:416
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
static unsigned int GetLocalHostAddress(ESwitch reget=eDefault)
Local host address in network byte order (cached for faster retrieval)
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3949
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
Definition: ncbistr.cpp:630
static string URLDecode(const CTempString str, EUrlDecode flag=eUrlDec_All)
URL-decode string.
Definition: ncbistr.cpp:6210
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3550
@ fAllowTrailingSpaces
Ignore trailing space characters.
Definition: ncbistr.hpp:297
@ fConvErr_NoThrow
Do not throw an exception on error.
Definition: ncbistr.hpp:285
@ fAllowLeadingSpaces
Ignore leading spaces in converted string.
Definition: ncbistr.hpp:294
string GetQueryString(EAmpEncoding amp_enc, NStr::EUrlEncode encode) const
Construct and return complete query string.
Definition: ncbi_url.cpp:225
list< TArg > TArgs
Definition: ncbi_url.hpp:276
@ eAmp_Char
Use & to separate arguments.
Definition: ncbi_url.hpp:254
static string kPrefix
Definition: id2info.cpp:146
where both of them are integers Note
int i
yy_size_t n
int len
NCBI_PARAM_TYPE(PSG, throttle_by_connection_error_rate) TPSG_ThrottleThreshold
Definition: misc.hpp:368
EPSG_UseCache
Definition: misc.hpp:374
SPSG_ParamValue< NCBI_PARAM_TYPE(PSG, auth_token)> TPSG_AuthToken
Definition: misc.hpp:350
EPSG_DebugPrintout
Definition: misc.hpp:370
EPSG_PsgClientMode
Definition: misc.hpp:379
NCBI_PARAM_TYPE(PSG, throttle_relaxation_period) TPSG_ThrottlePeriod
Definition: misc.hpp:359
const TYPE & Get(const CNamedParameterList *param)
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::CREATED created
int strcmp(const char *str1, const char *str2)
Definition: odbc_utils.hpp:160
T max(T x_, T y_)
T min(T x_, T y_)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
static const char * expected[]
Definition: bcp.c:42
static bool less(const CSeq_feat *A, const CSeq_feat *B)
static const char * prefix[]
Definition: pcregrep.c:405
EPSG_Status
Retrieval result.
Definition: psg_client.hpp:626
@ eSuccess
Successfully retrieved.
@ eInProgress
Retrieval is not finalized yet, more info may come.
@ eForbidden
User is not authorized for the retrieval.
@ eCanceled
Request canceled.
@ eError
An error was encountered while trying to send request or to read and to process the reply.
@ eNotFound
Not found.
uint64_t s_GetStatsPeriod()
NCBI_PARAM_ENUM_DEF(EPSG_DebugPrintout, PSG, debug_printout, EPSG_DebugPrintout::eNone)
auto s_GetCode(const string &code)
NCBI_PARAM_DEF(double, PSG, request_timeout, 10.0)
#define HTTP_STATUS_HEADER
EPSG_StatsCountersRetries
@ ePSG_StatsCountersRetries_Retry
@ ePSG_StatsCountersRetries_Timeout
uint64_t s_GetDiscoveryRepeat(const CServiceDiscovery &service)
EDiagSev s_GetSeverity(const string &severity)
PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, rd_buf_size, 64 *1024, 1024)
shared_ptr< SPSG_Stats > s_GetStats(SPSG_Servers::TTS &servers)
SPSG_IoCoordinator.
NCBI_PARAM_DEF_EX(string, PSG, service, "PSG2", eParam_Default, NCBI_PSG_SERVICE)
NCBI_PARAM_ENUM_ARRAY(EPSG_DebugPrintout, PSG, debug_printout)
#define PSG_IO_SESSION_TRACE(message)
#define PSG_THROTTLING_TRACE(message)
#define PSG_IO_TRACE(message)
#define PSG_DISCOVERY_TRACE(message)
uint64_t SecondsToMs(double seconds)
Defines CRequestStatus class for NCBI C++ diagnostic API.
unsigned int uint32_t
Definition: stdint.h:126
signed int int32_t
Definition: stdint.h:123
unsigned char uint8_t
Definition: stdint.h:124
unsigned __int64 uint64_t
Definition: stdint.h:136
void Print(SSocketAddress address, const string &path, const string &sid, const string &phid, const string &ip, SUv_Tcp::TPort port)
int32_t Submit(const nghttp2_nv *nva, size_t nvlen, nghttp2_data_provider *data_prd=nullptr)
auto GetValue() const
void Emplace(TArgs &&... args)
void NotifyOne() volatile
Definition: misc.hpp:59
bool WaitUntil(TArgs &&... args) volatile
Definition: misc.hpp:63
SNoServers(const SPSG_Params &params, SPSG_Servers::TTS &servers)
bool operator()(bool discovered, SUv_Timer *timer)
void OnShutdown(uv_async_t *)
void OnTimer(uv_timer_t *handle)
SPSG_Servers::TTS & m_Servers
bool AddRequest(shared_ptr< SPSG_Request > req, const atomic_bool &stopped, const CDeadline &deadline)
SPSG_Servers::TTS m_Servers
SPSG_IoCoordinator(CServiceDiscovery service)
atomic< size_t > m_RequestCounter
SPSG_Thread< SPSG_DiscoveryImpl > m_Discovery
vector< unique_ptr< SPSG_Thread< SPSG_IoImpl > > > m_Io
void OnTimer(uv_timer_t *handle)
void OnQueue(uv_async_t *handle)
void OnShutdown(uv_async_t *handle)
SPSG_IoImpl.
void OnExecute(uv_loop_t &loop)
void AddNewServers(uv_async_t *handle)
int OnStreamClose(nghttp2_session *session, int32_t stream_id, uint32_t error_code)
bool Fail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
bool RetryFail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
void EraseAndMoveToNext(TRequests::iterator &it)
SPSG_AsyncQueue & m_Queue
SPSG_Submitter::TId GetInternalId() const
SPSG_IoSession(SPSG_Server &s, const SPSG_Params &params, SPSG_AsyncQueue &queue, uv_loop_t *loop, TNgHttp2Cbs &&... callbacks)
SPSG_IoSession.
bool ProcessRequest(SPSG_TimedRequest timed_req, SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req)
int OnHeader(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags)
void OnReset(SUvNgHttp2_Error error) override
array< SNgHttp2_Header< NGHTTP2_NV_FLAG_NO_COPY_NAME >, eSize > m_Headers
int OnData(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len)
bool Cmp(TValue s) const
TValue Get() const
Definition: misc.hpp:208
static string s_GetAuthToken(const CNcbiEnvironment &env, const TPSG_AuthTokenName &auth_token_name)
TPSG_RequestsPerIo requests_per_io
static unsigned s_GetCompetitiveAfter(double io_timer_period, double timeout)
string GetCookie(const CPSG_Request::TFlags &request_flags, TGet get)
const unsigned competitive_after
static unsigned s_GetRequestTimeout(double io_timer_period)
TPSG_IoTimerPeriod io_timer_period
const unsigned request_timeout
SPSG_Nullable< size_t > expected
vector< SPSG_Chunk > chunks
const volatile atomic_bool & InProgress() const volatile
void AddMessage(string message, EDiagSev severity, optional< int > code)
void SetStatus(EPSG_Status status, bool reset) volatile
void AddError(string message, EPSG_Status status=EPSG_Status::eError, EDiagSev severity=eDiag_Error, optional< int > code=nullopt)
deque< SPSG_Message > m_Messages
SPSG_Message GetMessage(EDiagSev min_severity)
static EPSG_Status FromRequestStatus(int status)
shared_ptr< TPSG_Queue > queue
void SetFailed(string message, EPSG_Status status=EPSG_Status::eError)
SThreadSafe< list< SItem::TTS * > > new_items
SThreadSafe< list< SItem::TTS > > items
optional< SItem::TTS * > GetNextItem(CDeadline deadline)
unsigned GetRetries(SPSG_Retries::EType type, bool refused_stream)
unordered_map< string, SPSG_Reply::SItem::TTS * > m_ItemsByID
EStateResult Add()
SPSG_Processor processed_by
SPSG_Retries m_Retries
bool Retry(const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StateData(const char *&data, size_t &len)
shared_ptr< SPSG_Reply > reply
SPSG_Request(string p, CPSG_Request::TFlags f, shared_ptr< SPSG_Reply > r, CRef< CRequestContext > c, const SPSG_Params &params)
bool Fail(SPSG_Processor::TId processor_id, const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StatePrefix(const char *&data, size_t &len)
auto & OnReplyDone(SPSG_Processor::TId processor_id)
EUpdateResult UpdateItem(SPSG_Args::EItemType item_type, SPSG_Reply::SItem &item, const SPSG_Args &args)
EStateResult StateArgs(const char *&data, size_t &len)
SPSG_Throttling throttling
atomic_int available_streams
const SSocketAddress address
vector< pair< atomic_uint64_t, atomic_uint > > m_Data
void Report(const char *prefix, unsigned report)
static const char * GetName(EAvgTime avg_time)
static void Func(TData &data)
static void Func(const TData &data, const char *prefix, unsigned report)
void Apply(EGroup start_with, TArgs &&... args)
void Report(TArgs &&... args)
vector< vector< atomic_uint > > TData
void Report(const char *prefix, unsigned report, const char *name)
SData< CPSG_BlobId > m_Blobs
void Report(const char *prefix, unsigned report)
SThreadSafe< unordered_set< string > > m_TSEs
SData< CPSG_ChunkId > m_Chunks
SPSG_Servers::TTS & m_Servers
SPSG_Stats(SPSG_Servers::TTS &servers)
SThreshold(string error_rate)
SPSG_ThrottleParams.
constexpr static size_t kMaxDenominator
TPSG_ThrottleUntilDiscovery until_discovery
const volatile uint64_t period
TPSG_ThrottleMaxFailures max_failures
pair< bitset< SPSG_ThrottleParams::SThreshold::kMaxDenominator >, size_t > threshold_reg
bool Adjust(const SSocketAddress &address, bool result)
SPSG_Throttling(const SSocketAddress &address, SPSG_ThrottleParams p, uv_loop_t *l)
SPSG_Throttling.
atomic< EThrottling > m_Active
SThreadSafe< SStats > m_Stats
static void s_OnSignal(uv_async_t *handle)
bool Adjust(bool result)
const SSocketAddress & m_Address
bool CheckExpiration(const SPSG_Params &params, const SUvNgHttp2_Error &error, TOnRetry on_retry, TOnFail on_fail)
SLock< TType > GetLock()
static SUvNgHttp2_Error FromNgHttp2(T e, const char *w)
static const char * NgHttp2Str(T e)
void Reset(SUvNgHttp2_Error error, SUv_Tcp::ECloseType close_type=SUv_Tcp::eCloseReset)
SNgHttp2_Session m_Session
pair< string, string > TCred
static const string & Get()
void Init(void *d, uv_loop_t *l, uv_async_cb cb)
TPort GetLocalPort() const
unsigned short TPort
uint64_t GetDefaultRepeat() const
void ResetRepeat()
void Init(uv_loop_t *l)
void SetRepeat(uint64_t r)
Definition: inftrees.h:24
Definition: type.c:6
static int failure
Definition: t0019.c:11
#define _TROUBLE
#define _ASSERT
int g(Seg_Gsm *spe, Seq_Mtf *psm, Thd_Gsm *tdg)
Definition: thrddgri.c:44
else result
Definition: token2.c:20
static HENV env
Definition: transaction2.c:38
Modified on Sat Dec 09 04:49:09 2023 by modify_doxy.py rev. 669887