NCBI C++ ToolKit
psg_client_transport.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: psg_client_transport.cpp 101788 2024-02-12 18:28:17Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitri Dmitrienko, Rafael Sadyrov
27  *
28  */
29 
30 #include <ncbi_pch.hpp>
31 
33 
34 #ifdef HAVE_PSG_CLIENT
35 
36 #include <memory>
37 #include <string>
38 #include <sstream>
39 #include <list>
40 #include <vector>
41 #include <cassert>
42 #include <exception>
43 #include <thread>
44 #include <unordered_map>
45 #include <type_traits>
46 #include <utility>
47 #include <functional>
48 #include <numeric>
49 #include <cmath>
50 
51 #define __STDC_FORMAT_MACROS
52 
54 
55 #include "psg_client_transport.hpp"
56 
58 
// Definitions of the "PSG" section configuration parameters used by this file.
// Each invocation names the value type, the section (PSG), the parameter name
// and its default value.
// NOTE(review): PSG_PARAM_VALUE_DEF_MIN appears to take an additional trailing
// "minimum allowed value" argument -- confirm against the macro definition.
59 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, rd_buf_size, 64 * 1024, 1024 );
60 PSG_PARAM_VALUE_DEF_MIN(size_t, PSG, wr_buf_size, 64 * 1024, 1024 );
61 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, max_concurrent_streams, 100, 10 );
62 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, max_concurrent_submits, 150, 1 );
63 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, max_sessions, 40, 1 );
64 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, max_concurrent_requests_per_server, 500, 100 );
65 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, num_io, 6, 1 );
66 PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, reader_timeout, 12, 1 );
67 PSG_PARAM_VALUE_DEF_MIN(double, PSG, rebalance_time, 10.0, 1.0 );
68 PSG_PARAM_VALUE_DEF_MIN(size_t, PSG, requests_per_io, 1, 1 );
69 PSG_PARAM_VALUE_DEF_MIN(double, PSG, io_timer_period, 1.0, 0.1 );
// Parameters defined without a minimum value.
70 NCBI_PARAM_DEF(double, PSG, request_timeout, 10.0);
71 NCBI_PARAM_DEF(double, PSG, competitive_after, 0.0);
72 NCBI_PARAM_DEF(unsigned, PSG, request_retries, 2);
73 NCBI_PARAM_DEF(unsigned, PSG, refused_stream_retries, 2);
74 NCBI_PARAM_DEF(string, PSG, request_user_args, "");
75 NCBI_PARAM_DEF(bool, PSG, user_request_ids, false);
76 NCBI_PARAM_DEF(unsigned, PSG, localhost_preference, 1);
77 NCBI_PARAM_DEF(bool, PSG, fail_on_unknown_items, false);
78 NCBI_PARAM_DEF(bool, PSG, fail_on_unknown_chunks, false);
79 NCBI_PARAM_DEF(bool, PSG, https, false);
80 NCBI_PARAM_DEF(double, PSG, no_servers_retry_delay, 1.0);
81 NCBI_PARAM_DEF(bool, PSG, stats, false);
82 NCBI_PARAM_DEF(double, PSG, stats_period, 0.0);
// Parameters defined with NCBI_PARAM_DEF_EX, which takes two extra arguments
// (here eParam_Default and an NCBI_PSG_* identifier).
// NOTE(review): the last argument looks like an environment variable name -- confirm.
83 NCBI_PARAM_DEF_EX(string, PSG, service, "PSG2", eParam_Default, NCBI_PSG_SERVICE);
84 NCBI_PARAM_DEF_EX(string, PSG, auth_token_name, "WebCubbyUser", eParam_Default, NCBI_PSG_AUTH_TOKEN_NAME);
85 NCBI_PARAM_DEF_EX(string, PSG, auth_token, "", eParam_Default, NCBI_PSG_AUTH_TOKEN);
86 NCBI_PARAM_DEF_EX(string, PSG, admin_auth_token_name, "AdminAuthToken", eParam_Default, NCBI_PSG_ADMIN_AUTH_TOKEN_NAME);
87 NCBI_PARAM_DEF_EX(string, PSG, admin_auth_token, "", eParam_Default, NCBI_PSG_ADMIN_AUTH_TOKEN);
88 
// Throttling-related parameters (all names share the "throttle_" prefix).
89 NCBI_PARAM_DEF(double, PSG, throttle_relaxation_period, 0.0);
90 NCBI_PARAM_DEF(unsigned, PSG, throttle_by_consecutive_connection_failures, 0);
91 NCBI_PARAM_DEF(bool, PSG, throttle_hold_until_active_in_lb, false);
92 NCBI_PARAM_DEF(string, PSG, throttle_by_connection_error_rate, "");
93 
95 {
96  { "none", EPSG_DebugPrintout::eNone },
97  { "some", EPSG_DebugPrintout::eSome },
98  { "all", EPSG_DebugPrintout::eAll }
99 };
101 
103 {
104  { "default", EPSG_UseCache::eDefault },
105  { "no", EPSG_UseCache::eNo },
106  { "yes", EPSG_UseCache::eYes }
107 };
109 
// Table mapping the textual values "off" and "performance" to the
// corresponding EPSG_PsgClientMode enumerators for the
// PSG/internal_psg_client_mode parameter.
110 // Performance reporting/request IDs for psg_client app
111 NCBI_PARAM_ENUM_ARRAY(EPSG_PsgClientMode, PSG, internal_psg_client_mode)
112 {
113  { "off", EPSG_PsgClientMode::eOff },
114  { "performance", EPSG_PsgClientMode::ePerformance },
115 };
117 
119 {
120  if (value == "bioseq_info") return { SPSG_ArgsBase::eBioseqInfo, cref(value) };
121  if (value == "blob_prop") return { SPSG_ArgsBase::eBlobProp, cref(value) };
122  if (value == "blob") return { SPSG_ArgsBase::eBlob, cref(value) };
123  if (value == "reply") return { SPSG_ArgsBase::eReply, cref(value) };
124  if (value == "bioseq_na") return { SPSG_ArgsBase::eBioseqNa, cref(value) };
125  if (value == "na_status") return { SPSG_ArgsBase::eNaStatus, cref(value) };
126  if (value == "public_comment") return { SPSG_ArgsBase::ePublicComment, cref(value) };
127  if (value == "processor") return { SPSG_ArgsBase::eProcessor, cref(value) };
128  if (value == "ipg_info") return { SPSG_ArgsBase::eIpgInfo, cref(value) };
129  if (value.empty()) return { SPSG_ArgsBase::eReply, cref(value) };
130  return { SPSG_ArgsBase::eUnknownItem, cref(value) };
131 };
132 
134 {
135  if (value == "meta") return { SPSG_ArgsBase::eMeta, cref(value) };
136  if (value == "data") return { SPSG_ArgsBase::eData, cref(value) };
137  if (value == "message") return { SPSG_ArgsBase::eMessage, cref(value) };
138  if (value == "data_and_meta") return { SPSG_ArgsBase::eDataAndMeta, cref(value) };
139  if (value == "message_and_meta") return { SPSG_ArgsBase::eMessageAndMeta, cref(value) };
140  return { SPSG_ArgsBase::eUnknownChunk, cref(value) };
141 };
142 
143 string SPSG_Env::GetCookie(const string& name) const
144 {
145  if (!m_Cookies) {
146  m_Cookies.emplace();
147  m_Cookies->Add(CHttpCookies::eHeader_Cookie, Get("HTTP_COOKIE"), nullptr);
148  }
149 
150  for (const auto& cookie : *m_Cookies) {
151  if (cookie.GetName() == name) {
152  return NStr::URLDecode(cookie.GetValue());
153  }
154  }
155 
156  return {};
157 }
158 
159 string SPSG_Params::GetCookie(function<string()> get_auth_token)
160 {
161  auto combine = [](auto p, auto v) { return v.empty()? v : p.Get() + '=' + NStr::URLEncode(v); };
162 
163  auto admin_cookie = combine(admin_auth_token_name, admin_auth_token.Get());
164  auto hup_cookie = combine(auth_token_name, auth_token.Get().empty() ? get_auth_token() : auth_token.Get());
165 
166  return admin_cookie.empty() ? hup_cookie : hup_cookie.empty() ? admin_cookie : admin_cookie + "; " + hup_cookie;
167 }
168 
169 unsigned SPSG_Params::s_GetRequestTimeout(double io_timer_period)
170 {
171  auto value = TPSG_RequestTimeout::GetDefault();
172 
173  if (value < io_timer_period) {
174  ERR_POST(Warning << "[PSG] request_timeout ('" << value << "')"
175  " was increased to the minimum allowed value ('" << io_timer_period << "')");
176  value = io_timer_period;
177  }
178 
179  return static_cast<unsigned>(value / io_timer_period);
180 }
181 
182 unsigned SPSG_Params::s_GetCompetitiveAfter(double io_timer_period, double timeout)
183 {
184  auto value = TPSG_CompetitiveAfter::GetDefault();
185  timeout *= io_timer_period;
186 
187  if ((value > 0.0) && (value < io_timer_period)) {
188  ERR_POST(Warning << "[PSG] competitive_after ('" << value << "')"
189  " was increased to the minimum allowed value ('" << io_timer_period << "')");
190  value = io_timer_period;
191  }
192 
193  if (value >= timeout) {
194  ERR_POST(Warning << "[PSG] competitive_after ('" << value << "') was disabled, "
195  "as it was greater or equal to request timeout ('" << timeout << "')");
196  } else if (value > 0.0) {
197  timeout = value;
198  }
199 
200  return static_cast<unsigned>(timeout / io_timer_period);
201 }
202 
203 void SDebugPrintout::Print(SSocketAddress address, const string& path, const string& sid, const string& phid, const string& ip, SUv_Tcp::TPort port)
204 {
205  ostringstream os;
206 
207  if (!ip.empty()) os << ";IP=" << ip;
208  if (port) os << ";PORT=" << port;
209  if (m_Params.proxy) os << ";PROXY=" << m_Params.proxy;
210 
211  ERR_POST(Message << id << ": " << address << path << ";SID=" << sid << ";PHID=" << phid << os.str());
212 }
213 
// Posts a message with the chunk arguments (as a query string) followed,
// on the next line, either by the chunk contents or by a
// "<BINARY DATA OF n BYTES>" placeholder. The whole text is passed through
// NStr::PrintableString() before being posted.
214 void SDebugPrintout::Print(const SPSG_Args& args, const SPSG_Chunk& chunk)
215 {
216  ostringstream os;
217 
218  os << args.GetQueryString(CUrlArgs::eAmp_Char) << '\n';
219 
  // The chunk itself is printed when debug_printout is eAll, or when the
  // second operand of the condition holds.
  // NOTE(review): the second operand (listing line 221) is not visible in
  // this listing -- confirm against the real source.
220  if ((m_Params.debug_printout == EPSG_DebugPrintout::eAll) ||
222  os << chunk;
223  } else {
224  os << "<BINARY DATA OF " << chunk.size() << " BYTES>";
225  }
226 
227  ERR_POST(Message << id << ": " << NStr::PrintableString(os.str()));
228 }
229 
231 {
232  ERR_POST(Message << id << ": Closed with status " << SUvNgHttp2_Error::NgHttp2Str(error_code));
233 }
234 
235 void SDebugPrintout::Print(unsigned retries, const SUvNgHttp2_Error& error)
236 {
237  ERR_POST(Message << id << ": Retrying (" << retries << " retries remaining) after " << error);
238 }
239 
241 {
242  ERR_POST(Message << id << ": Gave up after " << error);
243 }
244 
246 {
247  if (IsPerf()) {
248  ostringstream os;
249 
250  for (const auto& event : m_Events) {
251  auto ms = get<0>(event);
252  auto type = get<1>(event);
253  auto thread_id = get<2>(event);
254  os << fixed << id << '\t' << ms << '\t' << type << '\t' << thread_id << '\n';
255  }
256 
257  cout << os.str() << flush;
258  }
259 }
260 
261 template <>
263 {
265  static constexpr size_t size = CPSG_Request::eChunk + 1;
266  static constexpr auto prefix = "\trequest\ttype=";
267 
268  static constexpr array<type, size> values = {
274  };
275 
276  static const char* ValueName(type value)
277  {
278  switch (value) {
279  case CPSG_Request::eBiodata: return "biodata";
280  case CPSG_Request::eResolve: return "resolve";
281  case CPSG_Request::eBlob: return "blob";
282  case CPSG_Request::eNamedAnnotInfo: return "named_annot_info";
283  case CPSG_Request::eChunk: return "chunk";
284  case CPSG_Request::eIpgResolve: return "ipg_resolve";
285  }
286 
287  // Should not happen
288  _TROUBLE;
289  return "unknown";
290  }
291 };
292 
293 template <>
295 {
297  static constexpr size_t size = CPSG_ReplyItem::eEndOfReply + 1;
298  static constexpr auto prefix = "\treply_item\ttype=";
299 
300  static constexpr array<type, size> values = {
311  };
312 
313  static const char* ValueName(type value)
314  {
315  switch (value) {
316  case CPSG_ReplyItem::eBlobData: return "blob_data";
317  case CPSG_ReplyItem::eBlobInfo: return "blob_info";
318  case CPSG_ReplyItem::eSkippedBlob: return "skipped_blob";
319  case CPSG_ReplyItem::eBioseqInfo: return "bioseq_info";
320  case CPSG_ReplyItem::eNamedAnnotInfo: return "named_annot_info";
321  case CPSG_ReplyItem::eNamedAnnotStatus: return "named_annot_status";
322  case CPSG_ReplyItem::ePublicComment: return "public_comment";
323  case CPSG_ReplyItem::eProcessor: return "processor";
324  case CPSG_ReplyItem::eIpgInfo: return "ipg_info";
325  case CPSG_ReplyItem::eEndOfReply: return "end_of_reply";
326  }
327 
328  // Should not happen
329  _TROUBLE;
330  return "unknown";
331  }
332 };
333 
334 template <>
336 {
338  static constexpr size_t size = CPSG_SkippedBlob::eUnknown + 1;
339  static constexpr auto prefix = "\tskipped_blob\treason=";
340 
341  static constexpr array<type, size> values = {
346  };
347 
348  static const char* ValueName(type value)
349  {
350  switch (value) {
351  case CPSG_SkippedBlob::eExcluded: return "excluded";
352  case CPSG_SkippedBlob::eInProgress: return "in_progress";
353  case CPSG_SkippedBlob::eSent: return "sent";
354  case CPSG_SkippedBlob::eUnknown: return "unknown";
355  }
356 
357  // Should not happen
358  _TROUBLE;
359  return "unknown";
360  }
361 };
362 
363 template <>
365 {
366  using type = EPSG_Status;
367  static constexpr size_t size = static_cast<size_t>(EPSG_Status::eError) + 1;
368  static constexpr auto prefix = "\treply_item_status\tstatus=";
369 
370  static constexpr array<type, size> values = {
377  };
378 
379  static const char* ValueName(type value)
380  {
381  switch (value) {
382  case EPSG_Status::eSuccess: return "success";
383  case EPSG_Status::eInProgress: return "in_progress";
384  case EPSG_Status::eNotFound: return "not_found";
385  case EPSG_Status::eCanceled: return "canceled";
386  case EPSG_Status::eForbidden: return "forbidden";
387  case EPSG_Status::eError: return "error";
388  }
389 
390  // Should not happen
391  _TROUBLE;
392  return "unknown";
393  }
394 };
395 
396 template <>
398 {
399  using type = EDiagSev;
400  static constexpr size_t size = EDiagSev::eDiag_Trace + 1;
401  static constexpr auto prefix = "\tmessage\tseverity=";
402 
403  static constexpr array<type, size> values = {
410  };
411 
412  static const char* ValueName(type value)
413  {
414  switch (value) {
415  case EDiagSev::eDiag_Info: return "info";
416  case EDiagSev::eDiag_Warning: return "warning";
417  case EDiagSev::eDiag_Error: return "error";
418  case EDiagSev::eDiag_Critical: return "critical";
419  case EDiagSev::eDiag_Fatal: return "fatal";
420  case EDiagSev::eDiag_Trace: return "trace";
421  }
422 
423  // Should not happen
424  _TROUBLE;
425  return "unknown";
426  }
427 };
428 
432 };
433 
434 template <>
436 {
438  static constexpr size_t size = ePSG_StatsCountersRetries_Timeout + 1;
439  static constexpr auto prefix = "\tretries\tevent=";
440 
441  static constexpr array<type, size> values = {
444  };
445 
446  static const char* ValueName(type value)
447  {
448  switch (value) {
449  case ePSG_StatsCountersRetries_Retry: return "retry";
450  case ePSG_StatsCountersRetries_Timeout: return "timeout";
451  }
452 
453  // Should not happen
454  _TROUBLE;
455  return "unknown";
456  }
457 };
458 
459 template <SPSG_Stats::EGroup group>
461 {
462  data.emplace_back(SGroup<group>::size);
463 
464  for (auto& counter : data.back()) {
465  counter = 0;
466  }
467 };
468 
469 template <SPSG_Stats::EGroup group>
470 void SPSG_StatsCounters::SReport::Func(const TData& data, const char* prefix, unsigned report)
471 {
472  using TGroup = SGroup<group>;
473 
474  _ASSERT(data.size() > group);
475  const auto& g = data[group];
476  _ASSERT(g.size() == TGroup::size);
477 
478  for (auto i : TGroup::values) {
479  auto n = g[static_cast<size_t>(i)].load();
480  if (n) ERR_POST(Note << prefix << report << TGroup::prefix << TGroup::ValueName(i) << "&count=" << n);
481  }
482 }
483 
485 {
486  Apply<SInit>(eRequest, m_Data);
487 }
488 
489 template <class TWhat, class... TArgs>
490 void SPSG_StatsCounters::Apply(EGroup start_with, TArgs&&... args)
491 {
492  // This method is always called with start_with == eRequest (so, all cases are run with one call, one by one).
493  // This switch usage however makes compilers warn if any enum value is missing/not handled
494  switch (start_with) {
495  case eRequest: TWhat::template Func<eRequest> (std::forward<TArgs>(args)...);
496  case eReplyItem: TWhat::template Func<eReplyItem> (std::forward<TArgs>(args)...);
497  case eSkippedBlob: TWhat::template Func<eSkippedBlob> (std::forward<TArgs>(args)...);
498  case eReplyItemStatus: TWhat::template Func<eReplyItemStatus> (std::forward<TArgs>(args)...);
499  case eMessage: TWhat::template Func<eMessage> (std::forward<TArgs>(args)...);
500  case eRetries: TWhat::template Func<eRetries> (std::forward<TArgs>(args)...);
501  }
502 }
503 
504 template <class... TArgs>
506 {
507  Apply<SReport>(eRequest, m_Data, std::forward<TArgs>(args)...);
508 }
509 
511  m_Data(eTimeUntilResend + 1)
512 {
513 }
514 
516 {
517  switch (avg_time) {
518  case SPSG_Stats::eSentSecondsAgo: return "sent_seconds_ago";
519  case SPSG_Stats::eTimeUntilResend: return "time_until_resend";
520  }
521 
522  // Should not happen
523  _TROUBLE;
524  return "unknown";
525 }
526 
527 void SPSG_StatsAvgTime::Report(const char* prefix, unsigned report)
528 {
529  for (auto i : { eSentSecondsAgo, eTimeUntilResend }) {
530  _ASSERT(m_Data.size() > i);
531  const auto& data = m_Data[i];
532  auto v = data.first.load();
533  auto n = data.second.load();
534  if (n) ERR_POST(Note << prefix << report << '\t' << GetName(i) << "\taverage=" << double(v / n) / milli::den);
535  }
536 }
537 
// Reports statistics on the data ids collected in m_Ids:
//  - the total number of ids and the number of unique ones,
//  - the m_Received/m_Read counters (only if m_Received is non-zero),
//  - for every retrieval count, how many unique ids were seen that many times.
// Nothing is reported if no ids were collected.
//
// prefix       Text put in front of every posted line.
// report       Report number printed right after the prefix.
// data_prefix  Text identifying the kind of data (printed after the report number).
538 template <class TDataId>
539 void SPSG_StatsData::SData<TDataId>::Report(const char* prefix, unsigned report, const char* data_prefix)
540 {
  // Strict ordering of ids, comparing them as tuples of their components
541  struct SLess {
542  static auto Tuple(const CPSG_BlobId& id) { return tie(id.GetId(), id.GetLastModified()); }
543  static auto Tuple(const CPSG_ChunkId& id) { return tuple<int, const string&>(id.GetId2Chunk(), id.GetId2Info()); }
544  bool operator()(const TDataId& lhs, const TDataId& rhs) const { return Tuple(lhs) < Tuple(rhs); }
545  };
546 
547  size_t total = 0;
  // NOTE(review): the declaration of 'unique_ids' (listing line 548) is not
  // visible in this listing; presumably a map from TDataId to a count,
  // ordered with SLess -- confirm against the real source.
549 
  // Count occurrences of every id while holding the lock
550  if (auto locked = m_Ids.GetLock()) {
551  total = locked->size();
552 
553  if (!total) return;
554 
555  for (const auto& data_id : *locked) {
556  auto created = unique_ids.emplace(data_id, 1);
  // Id was already present: increment its count instead
557  if (!created.second) ++created.first->second;
558  }
559  }
560 
561  ERR_POST(Note << prefix << report << data_prefix << "\ttotal=" << total << "&unique=" << unique_ids.size());
562 
563  auto received = m_Received.load();
564  auto read = m_Read.load();
565  if (received) ERR_POST(Note << prefix << report << data_prefix << "_data\treceived=" << received << "&read=" << read);
566 
  // Retrieval count -> number of unique ids retrieved that many times
567  map<unsigned, unsigned> group_by_count;
568 
569  for (const auto& p : unique_ids) {
570  auto created = group_by_count.emplace(p.second, 1);
571  if (!created.second) ++created.first->second;
572  }
573 
574  for (const auto& p : group_by_count) {
575  ERR_POST(Note << prefix << report << data_prefix << "_retrievals\tnumber=" << p.first << "&unique_ids=" << p.second);
576  }
577 }
578 
579 void SPSG_StatsData::Report(const char* prefix, unsigned report)
580 {
581  m_Blobs.Report(prefix, report, "\tblob");
582  m_Chunks.Report(prefix, report, "\tchunk");
583  if (auto n = m_TSEs.GetLock()->size()) ERR_POST(Note << prefix << report << "\tchunk_tse\tunique=" << n);
584 }
585 
587 {
588  return SecondsToMs(TPSG_StatsPeriod::GetDefault());
589 }
590 
592  m_Timer(this, s_OnTimer, s_GetStatsPeriod(), s_GetStatsPeriod()),
593  m_Report(0),
594  m_Servers(servers)
595 {
596 }
597 
599 {
600  const auto prefix = "PSG_STATS\t";
601  const auto report = ++m_Report;
602 
606 
607  auto servers_locked = m_Servers.GetLock();
608 
609  for (const auto& server : *servers_locked) {
610  auto n = server.stats.load();
611  if (n) ERR_POST(Note << prefix << report << "\tserver\tname=" << server.address << "&requests_sent=" << n);
612  }
613 }
614 
616 {
617  for (auto& m : m_Messages) {
618  if (m && ((min_severity == eDiag_Trace) || ((m.severity != eDiag_Trace) && (m.severity >= min_severity)))) {
619  return exchange(m, SPSG_Message{});
620  }
621  }
622 
623  return {};
624 }
625 
627 {
628  m_InProgress.store(true);
629  m_Status.store(EPSG_Status::eSuccess);
630  m_Messages.clear();
631 }
632 
634 {
635  switch (status) {
643  default: return EPSG_Status::eError;
644  }
645 }
646 
648 {
649  chunks.clear();
650  args = SPSG_Args{};
651  expected = null;
652  received = 0;
653  state.Reset();
654 }
655 
657 {
658  // If it were 'more' (instead of 'less'), items would not be in progress then
659  const auto message = "Protocol error: received less than expected";
660  bool missing = false;
661 
662  if (auto items_locked = items.GetLock()) {
663  for (auto& item : *items_locked) {
664  if (item->state.InProgress()) {
665  item.GetLock()->state.AddError(message);
666  item->state.SetComplete();
667  missing = true;
668  }
669  }
670  }
671 
672  if (auto reply_item_locked = reply_item.GetLock()) {
673  if (missing || reply_item_locked->expected.Cmp<greater>(reply_item_locked->received)) {
674  reply_item_locked->state.AddError(message);
675  }
676 
677  reply_item_locked->state.SetComplete();
678  }
679 
681  queue->NotifyOne();
682 }
683 
// Marks the whole reply as failed.
//
// Every item that is still in progress gets the error message added and is
// set complete. Then the reply item itself gets the error (with the given
// status) and is set complete as well. Finally, the queue is notified.
//
// message  Error text added to the items and to the reply item.
// status   Status passed to AddError() for the reply item only.
684 void SPSG_Reply::SetFailed(string message, EPSG_Status status)
685 {
686  if (auto items_locked = items.GetLock()) {
687  for (auto& item : *items_locked) {
  // Items that are already complete are left untouched
688  if (item->state.InProgress()) {
689  item.GetLock()->state.AddError(message);
690  item->state.SetComplete();
691  }
692  }
693  }
694 
695  if (auto reply_item_locked = reply_item.GetLock()) {
696  reply_item_locked->state.AddError(message, status);
697  reply_item_locked->state.SetComplete();
698  }
699 
  // NOTE(review): listing line 700 is not visible in this listing --
  // confirm against the real source what happens before the queue is notified.
701  queue->NotifyOne();
702 }
703 
704 optional<SPSG_Reply::SItem::TTS*> SPSG_Reply::GetNextItem(CDeadline deadline)
705 {
706  do {
707  bool was_in_progress = reply_item->state.InProgress();
708 
709  if (auto new_items_locked = new_items.GetLock()) {
710  if (!new_items_locked->empty()) {
711  auto rv = new_items_locked->front();
712  new_items_locked->pop_front();
713  return rv;
714  }
715  }
716 
717  // No more reply items
718  if (!was_in_progress) {
719  return nullptr;
720  }
721  }
722  while (reply_item.WaitUntil(reply_item->state.InProgress(), deadline, false, true));
723 
724  return nullopt;
725 }
726 
728 {
729  items.GetLock()->clear();
730  reply_item.GetLock()->Reset();
731 }
732 
// Returns a guard object for this context.
//
// If a previously created guard is still alive (m_ExistingGuard can be
// locked), that same guard is returned. Otherwise a new one is created and
// remembered in m_ExistingGuard. When the last copy of the guard is
// destroyed, CDiagContext::SetRequestContext(nullptr) is called.
733 shared_ptr<void> SPSG_Request::SContext::Set()
734 {
735  auto guard = m_ExistingGuard.lock();
736 
737  if (!guard) {
  // NOTE(review): listing line 738 is not visible in this listing;
  // presumably it sets the request context that the deleter below
  // resets -- confirm against the real source.
  // The deleter ignores the pointer, so 'this' is not deleted by the guard.
739  guard.reset(this, [](void*) { CDiagContext::SetRequestContext(nullptr); });
740  m_ExistingGuard = guard;
741  }
742 
743  return guard;
744 }
745 
746 SPSG_Request::SPSG_Request(string p, shared_ptr<SPSG_Reply> r, CRef<CRequestContext> c, const SPSG_Params& params) :
747  full_path(std::move(p)),
748  reply(r),
749  context(c),
750  m_State(&SPSG_Request::StatePrefix),
751  m_Retries(params)
752 {
753  _ASSERT(reply);
754 }
755 
757 {
758  // Reduce failure counter as well (retry counter is reduced in Retry() before returning eRetry)
760 
761  reply->Reset();
763  m_Buffer = SBuffer{};
764  m_ItemsByID.clear();
765 }
766 
768 {
769  static const string kPrefix = "\n\nPSG-Reply-Chunk: ";
770 
771  auto& index = m_Buffer.prefix_index;
772 
773  // Checking prefix
774  while (*data == kPrefix[index]) {
775  ++data;
776  --len;
777 
778  // Full prefix matched
779  if (++index == kPrefix.size()) {
781  return eContinue;
782  }
783 
784  if (!len) return eContinue;
785  }
786 
787  if (reply->raw && !index) {
790  return eContinue;
791  }
792 
793  // Check failed
794  const auto message = "Protocol error: prefix mismatch";
795 
796  if (Retry(message)) {
797  return eRetry;
798  }
799 
800  reply->reply_item.GetLock()->state.AddError(message);
801  return eStop;
802 }
803 
805 {
806  // Accumulating args
807  while (*data != '\n') {
808  m_Buffer.args_buffer.push_back(*data++);
809  if (!--len) return eContinue;
810  }
811 
812  ++data;
813  --len;
814 
816 
817  const auto& size_str = args.GetValue("size");
818  const auto size = size_str.empty() ? 0ul : stoul(size_str);
819 
820  m_Buffer.args = std::move(args);
821 
822  if (size) {
825  } else {
827  return Add();
828  }
829 
830  return eContinue;
831 }
832 
834 {
835  // Accumulating data
836  const auto data_size = min(m_Buffer.data_to_read, len);
837 
838  // Do not add an empty part
839  if (!data_size) return eContinue;
840 
841  auto& chunk = m_Buffer.chunk;
842  chunk.append(data, data_size);
843  data += data_size;
844  len -= data_size;
845  m_Buffer.data_to_read -= data_size;
846 
847  if (!m_Buffer.data_to_read) {
849  return Add();
850  }
851 
852  return eContinue;
853 }
854 
855 EDiagSev s_GetSeverity(const string& severity)
856 {
857  if (severity == "error") return eDiag_Error;
858  if (severity == "warning") return eDiag_Warning;
859  if (severity == "info") return eDiag_Info;
860  if (severity == "trace") return eDiag_Trace;
861  if (severity == "fatal") return eDiag_Fatal;
862  if (severity == "critical") return eDiag_Critical;
863 
864  // Should not happen
865  _TROUBLE;
866  return eDiag_Error;
867 }
868 
/// Converts a textual code into an optional integer.
///
/// @param code  Text to convert.
/// @return      An empty optional if @p code is empty,
///              otherwise the result of atoi() on it.
auto s_GetCode(const string& code)
{
    optional<int> parsed;

    if (!code.empty()) {
        parsed = atoi(code.c_str());
    }

    return parsed;
}
873 
875 {
876  auto context_guard = context.Set();
877 
878  auto& args = m_Buffer.args;
879  reply->debug_printout << args << m_Buffer.chunk << endl;
880 
881  const auto item_type = args.GetValue<SPSG_Args::eItemType>().first;
882  auto& reply_item_ts = reply->reply_item;
883 
884  if (item_type == SPSG_Args::eReply) {
885  if (auto item_locked = reply_item_ts.GetLock()) {
886  if (auto update_result = UpdateItem(item_type, *item_locked, args); update_result == eRetry503) {
887  return eRetry;
888  } else if (update_result == eNewItem) {
889  // No retries after returning any data to users
890  m_Retries.Zero();
891  }
892  }
893 
894  // Item must be unlocked before notifying
895  reply_item_ts.NotifyOne();
896 
897  } else {
898  if (auto reply_item_locked = reply_item_ts.GetLock()) {
899  auto& reply_item = *reply_item_locked;
900  ++reply_item.received;
901 
902  if (reply_item.expected.Cmp<less>(reply_item.received)) {
903  reply_item.state.AddError("Protocol error: received more than expected");
904  }
905  }
906 
907  auto item_id = args.GetValue("item_id");
908  auto& item_by_id = m_ItemsByID[item_id];
909  bool to_create = !item_by_id;
910 
911  if (to_create) {
912  if (auto items_locked = reply->items.GetLock()) {
913  items_locked->emplace_back();
914  item_by_id = &items_locked->back();
915  }
916  }
917 
918  if (auto item_locked = item_by_id->GetLock()) {
919  auto update_result = UpdateItem(item_type, *item_locked, args);
920 
921  if (update_result == eRetry503) {
922  return eRetry;
923  }
924 
925  if (to_create) {
926  item_locked->args = std::move(args);
927  }
928 
929  if (update_result == eNewItem) {
930  // No retries after returning any data to users
931  m_Retries.Zero();
932 
933  reply->new_items.GetLock()->emplace_back(item_by_id);
934  }
935 
936  reply_item_ts.NotifyOne();
937  }
938 
939  // Item must be unlocked before notifying
940  item_by_id->NotifyOne();
941  }
942 
943  reply->queue->NotifyOne();
944  m_Buffer = SBuffer();
945  return eContinue;
946 }
947 
949 {
950  auto get_status = [&]() { return NStr::StringToInt(args.GetValue("status"), NStr::fConvErr_NoThrow); };
951  auto can_retry_503 = [&](auto s, auto m) { return (s == CRequestStatus::e503_ServiceUnavailable) && Retry(m); };
952 
953  ++item.received;
954 
955  auto chunk_type = args.GetValue<SPSG_Args::eChunkType>();
956  auto& chunk = m_Buffer.chunk;
957  auto rv = eSuccess;
958 
959  if (chunk_type.first & SPSG_Args::eMeta) {
960  auto n_chunks = args.GetValue("n_chunks");
961 
962  if (!n_chunks.empty()) {
963  auto expected = stoul(n_chunks);
964 
965  if (item.expected.Cmp<not_equal_to>(expected)) {
966  item.state.AddError("Protocol error: contradicting n_chunks");
967  } else {
968  item.expected = expected;
969  }
970  }
971 
972  if (const auto status = get_status(); can_retry_503(status, "Server returned a meta with status 503")) {
973  return eRetry503;
974  } else if (status) {
976  }
977 
978  if ((item_type != SPSG_Args::eBlob) || item.chunks.empty()) {
979  rv = eNewItem;
980  }
981 
982  } else if (chunk_type.first == SPSG_Args::eUnknownChunk) {
983  static atomic_bool reported(false);
984 
985  if (!reported.exchange(true)) {
986  ERR_POST("Received unknown chunk type: " << chunk_type.second.get());
987  }
988 
989  if (TPSG_FailOnUnknownChunks::GetDefault()) {
990  item.state.AddError("Protocol error: unknown chunk type '" + chunk_type.second + '\'');
991  }
992  }
993 
994  if (chunk_type.first & SPSG_Args::eMessage) {
995  auto severity = s_GetSeverity(args.GetValue("severity"));
996  auto code = s_GetCode(args.GetValue("code"));
997 
998  if (severity <= eDiag_Warning) {
999  item.state.AddMessage(std::move(chunk), severity, code);
1000  } else if (severity == eDiag_Trace) {
1001  _DEBUG_CODE(item.state.AddMessage(std::move(chunk), severity, code););
1002  } else if (const auto status = get_status(); can_retry_503(status, chunk.c_str())) {
1003  return eRetry503;
1004  } else {
1005  item.state.AddError(std::move(chunk), SPSG_Reply::SState::FromRequestStatus(status), severity, code);
1006  }
1007 
1008  if (auto stats = reply->stats.lock()) stats->IncCounter(SPSG_Stats::eMessage, severity);
1009 
1010  } else if (chunk_type.first & SPSG_Args::eData) {
1011  auto blob_chunk = args.GetValue("blob_chunk");
1012  auto index = blob_chunk.empty() ? 0 : stoul(blob_chunk);
1013 
1014  if (item_type == SPSG_Args::eBlob) {
1015  if (!index) {
1016  rv = eNewItem;
1017  }
1018 
1019  if (auto stats = reply->stats.lock()) {
1020  auto has_blob_id = !args.GetValue<SPSG_Args::eBlobId>().get().empty();
1021  stats->AddData(has_blob_id, SPSG_Stats::eReceived, chunk.size());
1022  }
1023  }
1024 
1025  if (item.chunks.size() <= index) item.chunks.resize(index + 1);
1026 
1027  item.chunks[index] = std::move(chunk);
1028  }
1029 
1030  const bool is_not_reply = item_type != SPSG_Args::eReply;
1031 
1032  if (item.expected.Cmp<less>(item.received)) {
1033  item.state.AddError("Protocol error: received more than expected");
1034 
1035  // If item is not a reply itself, add the error to its reply as well
1036  if (is_not_reply) {
1037  reply->reply_item.GetLock()->state.AddError("Protocol error: received more than expected");
1038  }
1039 
1040  // Set item complete if received everything. Reply is set complete when stream closes
1041  } else if (is_not_reply && item.expected.Cmp<equal_to>(item.received)) {
1042  item.state.SetComplete();
1043  }
1044 
1045  return rv;
1046 }
1047 
1048 
1049 #define HTTP_STATUS_HEADER ":status"
1050 
1051 
1052 /** SPSG_IoSession */
1053 
// Creates an I/O session for server 's'.
//
// The arguments listed first in the initializer (loop, address/credentials
// with optional proxy, read/write buffer sizes, HTTPS flag, maximum number
// of concurrent streams and the nghttp2 callbacks) are forwarded to the
// initializer that starts on listing line 1056.
// NOTE(review): listing line 1056 is not visible in this listing;
// presumably it names the base class being initialized -- confirm.
1054 template <class... TNgHttp2Cbs>
1055 SPSG_IoSession::SPSG_IoSession(SPSG_Server& s, const SPSG_Params& params, SPSG_AsyncQueue& queue, uv_loop_t* loop, TNgHttp2Cbs&&... callbacks) :
1057  loop,
1058  TAddrNCred{{s.address, SUvNgHttp2_Tls::TCred()}, params.proxy},
1059  TPSG_RdBufSize::GetDefault(),
1060  TPSG_WrBufSize::GetDefault(),
1061  TPSG_Https::GetDefault(),
1062  TPSG_MaxConcurrentStreams::GetDefault(),
1063  std::forward<TNgHttp2Cbs>(callbacks)...),
1064  server(s),
1065  m_Params(params),
  // Request header template: entries given with a name only have no value here
1066  m_Headers{{
1067  { ":method", "GET" },
1068  { ":scheme", TPSG_Https::GetDefault() ? "https" : "http" },
1069  { ":authority", m_Authority },
1070  { ":path" },
1071  { "user-agent", SUvNgHttp2_UserAgent::Get() },
1072  { "http_ncbi_sid" },
1073  { "http_ncbi_phid" },
1074  { "cookie" },
1075  { "x-forwarded-for" }
1076  }},
1077  m_Queue(queue),
1078  m_Requests(*this)
1079 {
1080 }
1081 
// Callback invoked with data received for stream 'stream_id'.
//
// The data is passed to the request registered for that stream. Depending
// on the result, the request either stays registered (with its time reset),
// is reset and put back into the queue for a retry, or has its reply set
// complete. In the latter two cases the request is removed from m_Requests.
// Always returns 0.
1082 int SPSG_IoSession::OnData(nghttp2_session*, uint8_t, int32_t stream_id, const uint8_t* data, size_t len)
1083 {
1084  PSG_IO_SESSION_TRACE(this << '/' << stream_id << " received: " << len);
1085 
  // Data for unknown streams is ignored
1086  if (auto it = m_Requests.find(stream_id); it != m_Requests.end()) {
1087  if (auto [processor_id, req] = it->second.Get(); req) {
1088  auto result = req->OnReplyData(processor_id, (const char*)data, len);
1089 
  // NOTE(review): listing line 1090 is not visible in this listing;
  // presumably it is the 'if' checking for the result that keeps the
  // request going -- confirm against the real source.
1091  it->second.ResetTime();
1092  return 0;
1093 
1094  } else if (result == SPSG_Request::eRetry) {
  // Start over: clear the request and resubmit it via the queue
1095  req->Reset();
1096  m_Queue.Emplace(req);
1097  m_Queue.Signal();
1098 
1099  } else {
1100  req->reply->SetComplete();
1101  }
1102 
  // NOTE(review): listing line 1103 is not visible in this listing.
1104  }
1105 
1106  m_Requests.erase(it);
1107  }
1108 
1109  return 0;
1110 }
1111 
1112 bool SPSG_Request::Retry(const SUvNgHttp2_Error& error, bool refused_stream)
1113 {
1114  auto context_guard = context.Set();
1115 
1116  if (auto retries = GetRetries(SPSG_Retries::eRetry, refused_stream)) {
1117  reply->debug_printout << retries << error << endl;
1118  return true;
1119  }
1120 
1121  return false;
1122 }
1123 
1124 bool SPSG_Request::Fail(SPSG_Processor::TId processor_id, const SUvNgHttp2_Error& error, bool refused_stream)
1125 {
1126  if (GetRetries(SPSG_Retries::eFail, refused_stream)) {
1127  return false;
1128  }
1129 
1130  auto context_guard = context.Set();
1131 
1132  reply->debug_printout << error << endl;
1133  OnReplyDone(processor_id)->SetFailed(error);
1134  return true;
1135 }
1136 
// NOTE(review): the signature (original line 1137) is missing from this listing; presumably
// this is the ConvertRaw() that OnStreamClose calls for raw replies - confirm.
// If no chunk arguments were buffered but chunk data is present, the data is added under
// synthetic arguments (item 1, data_and_meta), followed by a synthetic reply meta item (item 0).
1138 {
1139  if (m_Buffer.args_buffer.empty() && !m_Buffer.chunk.empty()) {
1140  m_Buffer.args = "item_id=1&item_type=unknown&chunk_type=data_and_meta&n_chunks=1"s;
1141  Add();
1142  m_Buffer.args = "item_id=0&item_type=reply&chunk_type=meta&n_chunks=2"s;
1143  Add();
1144  }
1145 }
1146 
// Session-level wrapper around SPSG_Request::Fail(): forwards the failure to `req` and
// returns its result, tracing when the request was actually failed.
1147 bool SPSG_IoSession::Fail(SPSG_Processor::TId processor_id, shared_ptr<SPSG_Request> req, const SUvNgHttp2_Error& error, bool refused_stream)
1148 {
1149  auto context_guard = req->context.Set();
1150  auto rv = req->Fail(processor_id, error, refused_stream);
1151 
      // NOTE(review): original line 1152 is missing from this listing.
1153 
      // rv is true only when the request was actually failed
1154  if (rv) {
1155  PSG_THROTTLING_TRACE("Server '" << GetId() << "' failed to process request '" <<
1156  req->reply->debug_printout.id << "', " << error);
1157  }
1158 
1159  return rv;
1160 }
1161 
// Stream-close callback: finalizes the request registered for `stream_id` (if any) and
// removes the stream from m_Requests. Always returns 0.
1162 int SPSG_IoSession::OnStreamClose(nghttp2_session*, int32_t stream_id, uint32_t error_code)
1163 {
1164  PSG_IO_SESSION_TRACE(this << '/' << stream_id << " closed: " << error_code);
1165 
1166  if (auto it = m_Requests.find(stream_id); it != m_Requests.end()) {
1167  if (auto [processor_id, req] = it->second.Get(); req) {
1168  auto context_guard = req->context.Set();
1169  auto& debug_printout = req->reply->debug_printout;
1170  debug_printout << error_code << endl;
1171 
1172  // If there is an error and the request is allowed to Retry
1173  if (error_code) {
1174  auto error(SUvNgHttp2_Error::FromNgHttp2(error_code, "on close"));
1175 
      // The error is logged only when RetryFail() returns true
1176  if (RetryFail(processor_id, req, error, error_code == NGHTTP2_REFUSED_STREAM)) {
1177  ERR_POST("Request for " << GetId() << " failed with " << error);
1178  }
1179  } else {
      // Closed without an error: convert raw replies first, then mark the reply complete
1180  if (req->reply->raw) {
1181  req->ConvertRaw();
1182  }
1183 
1184  req->OnReplyDone(processor_id)->SetComplete();
      // NOTE(review): original line 1185 is missing from this listing.
1186  PSG_THROTTLING_TRACE("Server '" << GetId() << "' processed request '" <<
1187  debug_printout.id << "' successfully");
1188  }
1189  }
1190 
1191  m_Requests.erase(it);
1192  }
1193 
1194  return 0;
1195 }
1196 
1197 int SPSG_IoSession::OnHeader(nghttp2_session*, const nghttp2_frame* frame, const uint8_t* name,
1198  size_t namelen, const uint8_t* value, size_t, uint8_t)
1199 {
1200  if ((frame->hd.type == NGHTTP2_HEADERS) && (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) &&
1201  (namelen == sizeof(HTTP_STATUS_HEADER) - 1) && (strcmp((const char*)name, HTTP_STATUS_HEADER) == 0)) {
1202 
1203  auto stream_id = frame->hd.stream_id;
1204  auto status_str = reinterpret_cast<const char*>(value);
1205 
1206  PSG_IO_SESSION_TRACE(this << '/' << stream_id << " status: " << status_str);
1207 
1208  if (auto it = m_Requests.find(stream_id); it != m_Requests.end()) {
1209  const auto request_status = static_cast<CRequestStatus::ECode>(atoi(status_str));
1210  const auto status = SPSG_Reply::SState::FromRequestStatus(request_status);
1211 
1212  if (status != EPSG_Status::eSuccess) {
1213  if (auto [processor_id, req] = it->second.Get(); req) {
1214  const auto error = to_string(request_status) + ' ' + CRequestStatus::GetStdStatusMessage(request_status);
1215  req->OnReplyDone(processor_id)->SetFailed(error, status);
1216  } else {
1217  m_Requests.erase(it);
1218  }
1219  }
1220  }
1221  }
1222 
1223  return 0;
1224 }
1225 
// Submits `req` on this session: fills the per-request headers, submits them to the
// HTTP/2 session and registers `timed_req` under the new stream id.
// Returns false if the submit failed, otherwise the result of Send().
1226 bool SPSG_IoSession::ProcessRequest(SPSG_TimedRequest timed_req, SPSG_Processor::TId processor_id, shared_ptr<SPSG_Request> req)
1227 {
1228  PSG_IO_SESSION_TRACE(this << " processing requests");
1229 
1230  auto context_guard = req->context.Set();
      // NOTE(review): original line 1231 is missing from this listing; presumably it declares
      // the `context` object used below - confirm.
1232 
1233  const auto& path = req->full_path;
1234  const auto& session_id = context.GetSessionID();
1235  const auto& sub_hit_id = context.GetNextSubHitID();
1236  const auto& cookie = m_Params.GetCookie([&]() { return context.GetProperty("auth_token"); });
1237  const auto& client_ip = context.GetClientIP();
1238  auto headers_size = m_Headers.size();
1239 
1240  m_Headers[ePath] = path;
1241  m_Headers[eSessionID] = session_id;
1242  m_Headers[eSubHitID] = sub_hit_id;
1243  m_Headers[eCookie] = cookie;
1244 
      // Without a client IP one header fewer is submitted
      // (presumably eClientIP is the last slot, "x-forwarded-for" - confirm)
1245  if (!client_ip.empty()) {
1246  m_Headers[eClientIP] = client_ip;
1247  } else {
1248  --headers_size;
1249  }
1250 
1251  auto stream_id = m_Session.Submit(m_Headers.data(), headers_size);
1252 
      // A negative stream id is treated as a submit error
1253  if (stream_id < 0) {
1254  auto error(SUvNgHttp2_Error::FromNgHttp2(stream_id, "on submit"));
1255 
1256  // Do not reset all requests unless throttling has been activated
1257  if (RetryFail(processor_id, req, error) && server.throttling.Active()) {
1258  Reset(std::move(error));
1259  }
1260 
1261  return false;
1262  }
1263 
1264  req->submitted_by.Set(GetInternalId());
1265  req->reply->debug_printout << server.address << path << session_id << sub_hit_id << client_ip << m_Tcp.GetLocalPort() << endl;
1266  PSG_IO_SESSION_TRACE(this << '/' << stream_id << " submitted");
1267  m_Requests.emplace(stream_id, std::move(timed_req));
1268  return Send();
1269 }
1270 
1271 template <class TOnRetry, class TOnFail>
1272 bool SPSG_TimedRequest::CheckExpiration(const SPSG_Params& params, const SUvNgHttp2_Error& error, TOnRetry on_retry, TOnFail on_fail)
1273 {
1274  auto [processor_id, req] = Get();
1275 
1276  // Remove competitive requests if one is already being processed
1277  if (!req) {
1278  return true;
1279  }
1280 
1281  auto time = AddTime();
1282 
1283  if (time == params.competitive_after) {
1284  if (req->Retry(error)) {
1285  if (auto stats = req->reply->stats.lock()) stats->IncCounter(SPSG_Stats::eRetries, ePSG_StatsCountersRetries_Retry);
1286  on_retry(req);
1287  }
1288  }
1289 
1290  if (time >= params.request_timeout) {
1291  if (auto stats = req->reply->stats.lock()) stats->IncCounter(SPSG_Stats::eRetries, ePSG_StatsCountersRetries_Timeout);
1292  on_fail(processor_id, req);
1293  return true;
1294  }
1295 
1296  return false;
1297 }
1298 
// NOTE(review): the signature (original line 1299) is missing from this listing; presumably
// this is the session-level CheckRequestExpiration() called from SPSG_IoImpl::OnTimer - confirm.
// Runs CheckExpiration() on every registered stream and erases the entries it reports as done.
1300 {
1301  SUvNgHttp2_Error error("Request timeout for ");
1302  error << GetId();
1303 
      // Retries go back to the queue; failures go through this session's Fail()
1304  auto on_retry = [&](auto req) { m_Queue.Emplace(req); m_Queue.Signal(); };
1305  auto on_fail = [&](auto processor_id, auto req) { Fail(processor_id, req, error); };
1306 
1307  for (auto it = m_Requests.begin(); it != m_Requests.end(); ) {
1308  if (it->second.CheckExpiration(m_Params, error, on_retry, on_fail)) {
1309  it = m_Requests.erase(it);
1310  } else {
1311  ++it;
1312  }
1313  }
1314 }
1315 
// NOTE(review): the signature (original line 1316) is missing from this listing; `error` is
// presumably its parameter - confirm.
// Calls RetryFail() with `error` for every still-available request of this session, logs once
// if any call returned true, then clears all registered streams.
1317 {
1318  bool some_requests_failed = false;
1319 
1320  for (auto& pair : m_Requests) {
1321  if (auto [processor_id, req] = pair.second.Get(); req) {
1322  if (RetryFail(processor_id, req, error)) {
1323  some_requests_failed = true;
1324  }
1325  }
1326  }
1327 
1328  if (some_requests_failed) {
1329  ERR_POST("Some requests for " << GetId() << " failed with " << error);
1330  }
1331 
1332  m_Requests.clear();
1333 }
1334 
1335 
1336 /** SPSG_ThrottleParams */
1337 
// NOTE(review): the signature (original line 1338) is missing from this listing; `error_rate`
// is presumably its string parameter - confirm.
// Parses a "numerator/denominator" string. Empty input or input that cannot be split on '/'
// leaves the members untouched.
1339 {
1340  if (error_rate.empty()) return;
1341 
1342  string numerator_str, denominator_str;
1343 
1344  if (!NStr::SplitInTwo(error_rate, "/", numerator_str, denominator_str)) return;
1345 
      // NOTE(review): original line 1346 is missing from this listing; presumably it defines
      // the `flags` used by the conversions below.
1347 
1348  int n = NStr::StringToInt(numerator_str, flags);
1349  int d = NStr::StringToInt(denominator_str, flags);
1350 
      // Only a positive numerator and a denominator greater than one are accepted
1351  if (n > 0) numerator = static_cast<size_t>(n);
1352  if (d > 1) denominator = static_cast<size_t>(d);
1353 
1354  if (denominator > kMaxDenominator) {
      // NOTE(review): original lines 1355-1356 (the body of this branch) are missing from this listing.
1357  }
1358 }
1359 
// NOTE(review): the constructor signature (original line 1360) and one initializer
// (original line 1363) are missing from this listing.
// Initializes the throttling parameters from the corresponding TPSG_Throttle* settings;
// the period is converted with SecondsToMs().
1361  period(SecondsToMs(TPSG_ThrottlePeriod::GetDefault())),
1362  max_failures(TPSG_ThrottleMaxFailures::eGetDefault),
1364  threshold(TPSG_ThrottleThreshold::GetDefault())
1365 {
1366 }
1367 
1368 
1369 /** SPSG_Throttling */
1370 
// NOTE(review): the constructor signature (original line 1371) is missing from this listing;
// `address`, `p` and `l` are presumably its parameters - confirm.
// Starts with throttling off (eOff) and initializes the timer and the async signal with `l`.
1372  m_Address(address),
1373  m_Stats(std::move(p)),
1374  m_Active(eOff),
1375  m_Timer(this, s_OnTimer, Configured(), 0)
1376 {
1377  m_Timer.Init(l);
1378  m_Signal.Init(this, l, s_OnSignal);
1379 }
1380 
// NOTE(review): the signature (original line 1381) is missing from this listing; presumably
// this is the StartClose() called from the discovery shutdown handler - confirm.
// Unrefs the async signal and closes the timer.
1382 {
1383  m_Signal.Unref();
1384  m_Timer.Close();
1385 }
1386 
// NOTE(review): the signature (original line 1387) is missing from this listing; presumably
// this is the FinishClose() called after the discovery loop has finished - confirm.
// Refs the async signal again and then closes it.
1388 {
1389  m_Signal.Ref();
1390  m_Signal.Close();
1391 }
1392 
// NOTE(review): the signature (original line 1393) is missing from this listing; `result`
// is presumably its parameter - confirm.
// Feeds `result` into the locked statistics. When they report that throttling must start,
// switches m_Active to eOnTimer, raises the async signal and returns true; otherwise false.
1394 {
1395  auto stats_locked = m_Stats.GetLock();
1396 
1397  if (stats_locked->Adjust(m_Address, result)) {
1398  m_Active.store(eOnTimer);
1399 
1400  // We cannot start throttle timer from any thread (it's not thread-safe),
1401  // so we use async signal to start timer in the discovery thread
1402  m_Signal.Signal();
1403  return true;
1404  }
1405 
1406  return false;
1407 }
1408 
// NOTE(review): the signature (original line 1409) is missing from this listing; `address`
// and `result` are presumably its parameters - confirm.
// Updates the failure statistics with one outcome (`result` true = success).
// Returns true (after Reset()) when either limit is hit:
//  - params.max_failures consecutive failures, or
//  - at least threshold.numerator failures recorded in the register.
1410 {
1411  if (result) {
      // A success ends the current run of consecutive failures
1412  failures = 0;
1413 
1414  } else if (params.max_failures && (++failures >= params.max_failures)) {
1415  ERR_POST(Warning << "Server '" << address <<
1416  "' reached the maximum number of failures in a row (" << params.max_failures << ')');
1417  Reset();
1418  return true;
1419  }
1420 
      // The threshold check is enabled only for a positive numerator
1421  if (params.threshold.numerator > 0) {
1422  auto& reg = threshold_reg.first;
1423  auto& index = threshold_reg.second;
1424  const auto failure = !result;
1425 
      // Store the outcome in the current slot; count failures only if the slot changed
1426  if (reg[index] != failure) {
1427  reg[index] = failure;
1428 
1429  if (failure && (reg.count() >= params.threshold.numerator)) {
1430  ERR_POST(Warning << "Server '" << address << "' is considered bad/overloaded ("
1431  << params.threshold.numerator << '/' << params.threshold.denominator << ')');
1432  Reset();
1433  return true;
1434  }
1435  }
1436 
      // Advance to the next slot, wrapping around at the denominator
1437  if (++index >= params.threshold.denominator) index = 0;
1438  }
1439 
1440  return false;
1441 }
1442 
// NOTE(review): the signature (original line 1443) is missing from this listing; presumably
// this is the Reset() called from the statistics update above - confirm.
// Clears the consecutive-failure counter and the threshold register (the index is left as is).
1444 {
1445  failures = 0;
1446  threshold_reg.first.reset();
1447 }
1448 
1449 
1450 /** SPSG_IoImpl */
1451 
1452 void SPSG_IoImpl::OnShutdown(uv_async_t*)
1453 {
1454  m_Queue.Unref();
1455 
1456  for (auto& server : m_Sessions) {
1457  for (auto& session : server.sessions) {
1458  session.Reset("Shutdown is in process", SUv_Tcp::eNormalClose);
1459  }
1460  }
1461 }
1462 
// NOTE(review): the signature (original line 1463) is missing from this listing; presumably
// this is the discovery thread's shutdown handler - confirm.
// Under the servers lock, starts closing the throttling of every server; then stops the
// statistics if they are enabled.
1464 {
1465  auto servers_locked = m_Servers.GetLock();
1466  auto& servers = *servers_locked;
1467 
1468  for (auto& server : servers) {
1469  server.throttling.StartClose();
1470  }
1471 
1472  if (m_Stats) m_Stats->Stop();
1473 }
1474 
// NOTE(review): the signature (original line 1475) is missing from this listing; presumably
// this runs after the discovery loop has finished - confirm.
// Under the servers lock, finishes closing the throttling of every server.
1476 {
1477  auto servers_locked = m_Servers.GetLock();
1478  auto& servers = *servers_locked;
1479 
1480  for (auto& server : servers) {
1481  server.throttling.FinishClose();
1482  }
1483 }
1484 
1485 void SPSG_IoImpl::AddNewServers(uv_async_t* handle)
1486 {
1487  // Add new session(s) if new server(s) have been added
1488  auto servers_locked = m_Servers.GetLock();
1489  auto& servers = *servers_locked;
1490 
1491  // m_Servers->size() can be used for new_servers only after locking m_Servers to avoid a race
1492  const auto servers_size = m_Servers->size();
1493  const auto sessions_size = m_Sessions.size();
1494 
1495  _ASSERT(servers_size >= sessions_size);
1496 
1497  for (auto new_servers = servers_size - sessions_size; new_servers; --new_servers) {
1498  auto& server = servers[servers_size - new_servers];
1499  m_Sessions.emplace_back().sessions.emplace_back(server, m_Params, m_Queue, handle->loop);
1500  PSG_IO_TRACE("Session for server '" << server.address << "' was added");
1501  }
1502 }
1503 
// Queue callback of an I/O thread: pops pending requests from m_Queue and submits them to
// server sessions. A server is chosen by drawing a random target rate and walking the
// servers while accumulating their current rates. At most max_concurrent_submits requests
// are submitted per call; a popped but unsubmitted request is put back on the queue.
1504 void SPSG_IoImpl::OnQueue(uv_async_t* handle)
1505 {
1506  auto available_servers = 0;
1507 
      // Take a snapshot of the server rates; servers with a zero rate are not available
1508  for (auto& server : m_Sessions) {
1509  server.current_rate = server->rate.load();
1510 
1511  if (server.current_rate) {
1512  ++available_servers;
1513  }
1514  }
1515 
1516  auto remaining_submits = m_Params.max_concurrent_submits.Get();
1517 
1518  auto i = m_Sessions.begin();
1519  auto [timed_req, processor_id, req] = decltype(m_Queue.Pop()){};
1520  auto d = m_Random.first;
1521  auto request_rate = 0.0;
1522  auto target_rate = 0.0;
1523  _DEBUG_ARG(string req_id);
1524 
1525  // Clang requires '&timed_req = timed_req, &processor_id = processor_id, &req = req'
      // Pops the next request and draws a new target rate for it; false if the queue is empty
1526  auto get_request = [&, &timed_req = timed_req, &processor_id = processor_id, &req = req]() {
1527  tie(timed_req, processor_id, req) = m_Queue.Pop();
1528 
1529  if (!req) {
1530  PSG_IO_TRACE("No [more] requests pending");
1531  return false;
1532  }
1533 
1534  request_rate = 0.0;
1535  target_rate = d(m_Random.second);
1536  _DEBUG_CODE(req_id = req->reply->debug_printout.id;);
1537  PSG_IO_TRACE("Ready to submit request '" << req_id << '\'');
1538  return true;
1539  };
1540 
      // Moves to the next server, wrapping around at the end
1541  auto next_server = [&]() {
1542  if (++i == m_Sessions.end()) i = m_Sessions.begin();
1543  };
1544 
      // Takes the current server out of this round: shrinks the random range by its rate
      // and zeroes its snapshot rate (unless it was the last available server)
1545  auto ignore_server = [&]() {
1546  if (--available_servers) {
1547  d = uniform_real_distribution<>(0.0, d.max() - i->current_rate);
1548  i->current_rate = 0.0;
1549  next_server();
1550  }
1551  };
1552 
      // Returns (found, iterator) for the first session of the current server that is not full
1553  auto find_session = [&]() {
1554  auto s = i->sessions.begin();
1555  for (; (s != i->sessions.end()) && s->IsFull(); ++s);
1556  return make_pair(s != i->sessions.end(), s);
1557  };
1558 
1559  while (available_servers && remaining_submits) {
1560  // Try to get a request if needed
1561  if (!req && !get_request()) {
1562  return;
1563  }
1564 
1565  // Select a server from available according to target rate
1566  for (; request_rate += i->current_rate, request_rate < target_rate; next_server());
1567 
1568  auto& server = *i;
1569 
1570  // If throttling has been activated (possibly in a different thread)
1571  if (server->throttling.Active()) {
1572  PSG_IO_TRACE("Server '" << server->address << "' is throttled, ignoring");
1573  ignore_server();
1574 
1575  // If server has reached its limit (possibly in a different thread)
1576  } else if (server->available_streams <= 0) {
1577  PSG_IO_TRACE("Server '" << server->address << "' is at request limit, ignoring");
1578  ignore_server();
1579 
1580  // If all server sessions are full
1581  } else if (auto [found, session] = find_session(); !found) {
1582  PSG_IO_TRACE("Server '" << server->address << "' has no sessions available, ignoring");
1583  ignore_server();
1584 
1585  // If this is a competitive stream, try a different server
1586  } else if (!session->CanProcessRequest(req)) {
1587  PSG_IO_TRACE("Server '" << session->GetId() << "' already working/worked on the request, trying to find another one");
1588  // Reset submitter ID and move to next server (the same server will be submitted to if it's the only one available)
1589  req->submitted_by.Reset();
1590  next_server();
1591 
1592  // If failed to submit
1593  } else if (!session->ProcessRequest(*std::move(timed_req), processor_id, std::move(req))) {
1594  PSG_IO_TRACE("Server '" << session->GetId() << "' failed to get request '" << req_id << "' with rate = " << target_rate);
1595  next_server();
1596 
1597  // Submitted successfully
1598  } else {
1599  PSG_IO_TRACE("Server '" << session->GetId() << "' got request '" << req_id << "' with rate = " << target_rate);
1600  --remaining_submits;
1601  ++server->stats;
1602 
1603  // Add new session if needed and allowed to
      // (only when the session just used is full and is the last one of this server)
1604  if (session->IsFull() && (distance(session, server.sessions.end()) == 1)) {
1605  if (server.sessions.size() >= TPSG_MaxSessions::GetDefault()) {
1606  PSG_IO_TRACE("Server '" << server->address << "' reached session limit");
1607  ignore_server();
1608  } else {
1609  server.sessions.emplace_back(*server, m_Params, m_Queue, handle->loop);
1610  PSG_IO_TRACE("Additional session for server '" << server->address << "' was added");
1611  }
1612  }
1613  }
1614  }
1615 
      // A request was popped but could not be submitted: return it to the queue
1616  if (req) {
1617  // Do not need to signal here
1618  m_Queue.Emplace(*std::move(timed_req));
1619  }
1620 
      // If the submit limit stopped the loop, signal the queue so this handler runs again
1621  if (!remaining_submits) {
1622  PSG_IO_TRACE("Max concurrent submits reached, submitted: " << m_Params.max_concurrent_submits);
1623  m_Queue.Signal();
1624  } else {
1625  PSG_IO_TRACE("No sessions available [anymore], submitted: " << m_Params.max_concurrent_submits - remaining_submits);
1626  }
1627 }
1628 
// Discovery timer callback: queries the service for servers and their rates, adjusts the
// rates according to the localhost preference, then (under the servers lock) updates the
// rates of known servers, appends newly discovered ones and signals all I/O queues.
// Servers whose host equals the local host address are treated as "preferred".
1629 void SPSG_DiscoveryImpl::OnTimer(uv_timer_t* handle)
1630 {
      // Rates of at least kRegularRate (the next double after 0.009) are "regular";
      // rates in [kStandbyRate, kRegularRate) are "standby"; lower rates are left as discovered
1631  const auto kRegularRate = nextafter(0.009, 1.0);
1632  const auto kStandbyRate = 0.001;
1633 
1634  const auto& service_name = m_Service.GetServiceName();
1635  auto discovered = m_Service();
1636 
1637  auto total_preferred_regular_rate = 0.0;
1638  auto total_preferred_standby_rate = 0.0;
1639  auto total_regular_rate = 0.0;
1640  auto total_standby_rate = 0.0;
1641 
1642  // Accumulate totals
1643  for (auto& server : discovered) {
1644  const auto is_server_preferred = server.first.host == CSocketAPI::GetLocalHostAddress();
1645 
1646  if (server.second >= kRegularRate) {
1647  if (is_server_preferred) {
1648  total_preferred_regular_rate += server.second;
1649  }
1650 
1651  total_regular_rate += server.second;
1652 
1653  } else if (server.second >= kStandbyRate) {
1654  if (is_server_preferred) {
1655  total_preferred_standby_rate += server.second;
1656  }
1657 
1658  total_standby_rate += server.second;
1659  }
1660  }
1661 
      // m_NoServers is told whether any regular/standby server was discovered and gets the
      // timer; a true result means there is nothing to work with in this round
1662  if (m_NoServers(total_regular_rate || total_standby_rate, SUv_Timer::GetThat<SUv_Timer>(handle))) {
1663  ERR_POST("No servers in service '" << service_name << '\'');
1664  return;
1665  }
1666 
1667  const auto localhost_preference = TPSG_LocalhostPreference::GetDefault();
1668  const auto total_preferred_standby_percentage = localhost_preference ? 1.0 - 1.0 / localhost_preference : 0.0;
1669  const auto have_any_regular_servers = total_regular_rate > 0.0;
1670  const auto have_nonpreferred_regular_servers = total_regular_rate > total_preferred_regular_rate;
1671  const auto have_nonpreferred_standby_servers = total_standby_rate > total_preferred_standby_rate;
      // True when any of the listed conditions holds; the branches below then use the
      // "regular" adjustment (multiplying preferred rates by the localhost preference)
1672  const auto regular_rate_adjustment =
1673  (localhost_preference <= 1) ||
1674  (total_preferred_regular_rate != 0.0) ||
1675  (total_preferred_standby_rate == 0.0) ||
1676  (!have_nonpreferred_regular_servers && !have_nonpreferred_standby_servers);
1677  auto rate_total = 0.0;
1678 
1679  // Adjust discovered rates
1680  for (auto& server : discovered) {
1681  const auto is_server_preferred = server.first.host == CSocketAPI::GetLocalHostAddress();
1682  const auto old_rate = server.second;
1683 
1684  if (server.second >= kRegularRate) {
1685  if (is_server_preferred) {
1686  if (regular_rate_adjustment) {
1687  server.second *= localhost_preference;
1688  } else if (have_nonpreferred_regular_servers) {
1689  server.second = 0.0;
1690  } else if (have_nonpreferred_standby_servers) {
1691  server.second = 0.0;
1692  }
1693  } else {
1694  if (regular_rate_adjustment) {
1695  // No adjustments
1696  } else if (have_nonpreferred_regular_servers) {
1697  server.second *= (1 - total_preferred_standby_percentage) / total_regular_rate;
1698  } else if (have_nonpreferred_standby_servers) {
1699  server.second = 0.0;
1700  }
1701  }
1702  } else if (server.second >= kStandbyRate) {
1703  if (is_server_preferred) {
1704  if (regular_rate_adjustment && have_any_regular_servers) {
1705  server.second = 0.0;
1706  } else if (regular_rate_adjustment) {
1707  server.second *= localhost_preference;
1708  } else if (have_nonpreferred_regular_servers) {
1709  server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;
1710  } else if (have_nonpreferred_standby_servers) {
1711  server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;
1712  }
1713  } else {
1714  if (regular_rate_adjustment && have_any_regular_servers && (localhost_preference || have_nonpreferred_regular_servers)) {
1715  server.second = 0.0;
1716  } else if (regular_rate_adjustment) {
1717  // No adjustments
1718  } else if (have_nonpreferred_regular_servers) {
1719  server.second = 0.0;
1720  } else if (have_nonpreferred_standby_servers) {
1721  server.second *= (1 - total_preferred_standby_percentage) / (total_standby_rate - total_preferred_standby_rate);
1722  }
1723  }
1724  }
1725 
      // Sum of the adjusted rates; used below to normalize each server's rate
1726  rate_total += server.second;
1727 
1728  if (old_rate != server.second) {
1729  PSG_DISCOVERY_TRACE("Rate for '" << server.first << "' adjusted from " << old_rate << " to " << server.second);
1730  }
1731  }
1732 
1733  auto servers_locked = m_Servers.GetLock();
1734  auto& servers = *servers_locked;
1735 
1736  // Update existing servers
1737  for (auto& server : servers) {
1738  auto address_same = [&](CServiceDiscovery::TServer& s) { return s.first == server.address; };
1739  auto it = find_if(discovered.begin(), discovered.end(), address_same);
1740 
      // Known server that was not discovered this time or has a (near) zero rate: disable it
1741  if ((it == discovered.end()) || (it->second <= numeric_limits<double>::epsilon())) {
1742  server.rate = 0.0;
1743  PSG_DISCOVERY_TRACE("Server '" << server.address << "' disabled in service '" << service_name << '\'');
1744 
1745  } else {
1746  server.throttling.Discovered();
1747  auto rate = it->second / rate_total;
1748 
1749  if (server.rate != rate) {
1750  // This has to be before the rate change for the condition to work (uses old rate)
1751  PSG_DISCOVERY_TRACE("Server '" << server.address <<
1752  (server.rate ? "' updated in service '" : "' enabled in service '" ) <<
1753  service_name << "' with rate = " << rate);
1754 
1755  server.rate = rate;
1756  }
1757 
1758  // Reset rate to avoid adding the server below
1759  it->second = 0.0;
1760  }
1761  }
1762 
1763  // Add new servers
1764  for (auto& server : discovered) {
1765  if (server.second > numeric_limits<double>::epsilon()) {
1766  auto rate = server.second / rate_total;
1767  servers.emplace_back(server.first, rate, m_Params.max_concurrent_requests_per_server, m_ThrottleParams, handle->loop);
1768  _DEBUG_CODE(server.first.GetHostName();); // To avoid splitting the trace message below by gethostbyaddr
1769  PSG_DISCOVERY_TRACE("Server '" << server.first << "' added to service '" <<
1770  service_name << "' with rate = " << rate);
1771  }
1772  }
1773 
      // Signal all queues (the servers and/or their rates may have changed)
1774  m_QueuesRef.SignalAll();
1775 }
1776 
1777 void SPSG_IoImpl::OnTimer(uv_timer_t*)
1778 {
1779  if (m_Servers->fail_requests) {
1780  FailRequests();
1781  } else {
1782  CheckRequestExpiration();
1783  }
1784 
1785  for (auto& server : m_Sessions) {
1786  for (auto& session : server.sessions) {
1787  session.CheckRequestExpiration();
1788  }
1789  }
1790 }
1791 
// NOTE(review): the signature (original line 1792) is missing from this listing; presumably
// this is the CheckRequestExpiration() called from SPSG_IoImpl::OnTimer - confirm.
// Runs CheckExpiration() on every request still waiting in the (locked) queue and erases
// the ones it reports as done.
1793 {
1794  auto queue_locked = m_Queue.GetLockedQueue();
1795  list<SPSG_TimedRequest> retries;
1796  SUvNgHttp2_Error error("Request timeout before submitting");
1797 
      // Retries are collected in a separate list while the queue is being iterated
1798  auto on_retry = [&](auto req) { retries.emplace_back(req); m_Queue.Signal(); };
1799  auto on_fail = [&](auto processor_id, auto req) { req->Fail(processor_id, error); };
1800 
1801  for (auto it = queue_locked->begin(); it != queue_locked->end(); ) {
1802  if (it->CheckExpiration(m_Params, error, on_retry, on_fail)) {
1803  it = queue_locked->erase(it);
1804  } else {
1805  ++it;
1806  }
1807  }
1808 
      // Append the collected retries to the end of the queue
1809  queue_locked->splice(queue_locked->end(), retries);
1810 }
1811 
// NOTE(review): the signature (original line 1812) is missing from this listing; presumably
// this is the FailRequests() called from SPSG_IoImpl::OnTimer - confirm.
// Fails every still-available request in the (locked) queue with a "no servers" error and
// then empties the queue.
1813 {
1814  auto queue_locked = m_Queue.GetLockedQueue();
1815  SUvNgHttp2_Error error("No servers to process request");
1816 
1817  for (auto& timed_req : *queue_locked) {
1818  if (auto [processor_id, req] = timed_req.Get(); req) {
1819  auto context_guard = req->context.Set();
1820  auto& debug_printout = req->reply->debug_printout;
1821  debug_printout << error << endl;
1822  req->OnReplyDone(processor_id)->SetFailed(error);
1823  PSG_IO_TRACE("No servers to process request '" << debug_printout.id << '\'');
1824  }
1825  }
1826 
1827  queue_locked->clear();
1828 }
1829 
1830 void SPSG_IoImpl::OnExecute(uv_loop_t& loop)
1831 {
1832  m_Queue.Init(this, &loop, s_OnQueue);
1833 }
1834 
// NOTE(review): the signature (original line 1835) is missing from this listing; presumably
// this runs after the I/O loop has finished - confirm.
// Refs the queue again, closes it and destroys all sessions.
1836 {
1837  m_Queue.Ref();
1838  m_Queue.Close();
1839  m_Sessions.clear();
1840 }
1841 
// NOTE(review): the constructor signature (original line 1842) is missing from this listing;
// `params` and `servers` are presumably its parameters - confirm.
// m_Timeout is derived from the request timeout plus one competitive interval per retry,
// multiplied by the I/O timer period, and converted with SecondsToMs().
// m_FailRequests refers to the servers' fail_requests flag (constness is cast away).
1843  m_RetryDelay(SecondsToMs(TPSG_NoServersRetryDelay::GetDefault())),
1844  m_Timeout(SecondsToMs((params.request_timeout + params.competitive_after * params.request_retries) * params.io_timer_period)),
1845  m_FailRequests(const_cast<atomic_bool&>(servers->fail_requests))
1846 {
1847 }
1848 
// NOTE(review): the signature (original line 1849) is missing from this listing; `discovered`
// and `timer` are presumably its parameters - confirm.
// Bookkeeping for the "no servers discovered" case: adjusts the timer repeat, maintains the
// time passed without servers and the fail-requests flag. Returns true when nothing was
// discovered.
1850 {
1851  // If there is a separate timer set for the case of no servers discovered
1852  if (m_RetryDelay) {
1853  if (discovered) {
1854  timer->ResetRepeat();
1855  } else {
1856  timer->SetRepeat(m_RetryDelay);
1857  }
1858  }
1859 
1860  // If there is a request timeout set
1861  if (m_Timeout) {
1862  const auto timeout_expired = m_Passed >= m_Timeout;
1863  m_FailRequests = timeout_expired;
1864 
1865  if (discovered) {
1866  m_Passed = 0;
1867  } else if (!timeout_expired) {
1868  // Passed is increased after fail flag is set, the flag would be set too early otherwise
1869  m_Passed += m_RetryDelay ? m_RetryDelay : timer->GetDefaultRepeat();
1870  }
1871  }
1872 
1873  return !discovered;
1874 }
1875 
1876 
1877 /** SPSG_IoCoordinator */
1878 
1879 shared_ptr<SPSG_Stats> s_GetStats(SPSG_Servers::TTS& servers)
1880 {
1881  if (TPSG_Stats::GetDefault()) {
1882  return make_shared<SPSG_Stats>(servers);
1883  } else {
1884  return {};
1885  }
1886 }
1887 
// NOTE(review): the signature (original line 1888) is missing from this listing; `service`
// is presumably its parameter - confirm.
// Returns 0 for a single-server service, otherwise the TPSG_RebalanceTime setting
// converted with SecondsToMs().
1889 {
1890  return service.IsSingleServer() ? 0 : SecondsToMs(TPSG_RebalanceTime::GetDefault());
1891 }
1892 
// NOTE(review): the constructor signature and first initializer(s) (original lines 1893-1894)
// are missing from this listing; `service`, `stats` and `params` are presumably declared there.
// Sets up the start/stop barriers and the discovery thread, creates TPSG_NumIo I/O threads
// (each with its own queue) and then waits on the start barrier.
1895  m_StartBarrier(TPSG_NumIo::GetDefault() + 2),
1896  m_StopBarrier(TPSG_NumIo::GetDefault() + 1),
1897  m_Discovery(m_StartBarrier, m_StopBarrier, 0, s_GetDiscoveryRepeat(service), service, stats, params, m_Servers, m_Queues),
1898  m_RequestCounter(0),
1899  m_RequestId(1)
1900 {
1901  const auto io_timer_period = SecondsToMs(params.io_timer_period);
1902 
1903  for (unsigned i = 0; i < TPSG_NumIo::GetDefault(); i++) {
1904  // This timing cannot be changed without changes in SPSG_IoSession::CheckRequestExpiration
1905  m_Io.emplace_back(new SPSG_Thread<SPSG_IoImpl>(m_StartBarrier, m_StopBarrier, io_timer_period, io_timer_period, params, m_Servers, m_Queues.emplace_back(m_Queues)));
1906  }
1907 
1908  m_StartBarrier.Wait();
1909 }
1910 
// NOTE(review): the signature (original line 1911) and the first statement (original line
// 1913) are missing from this listing; presumably this is the destructor - confirm.
// Requests shutdown of every I/O thread.
1912 {
1914 
1915  for (auto& io : m_Io) {
1916  io->Shutdown();
1917  }
1918 }
1919 
1920 bool SPSG_IoCoordinator::AddRequest(shared_ptr<SPSG_Request> req, const atomic_bool&, const CDeadline&)
1921 {
1922  if (m_Io.size() == 0) {
1923  ERR_POST(Fatal << "IO is not open");
1924  }
1925 
1926  const auto idx = (m_RequestCounter++ / params.requests_per_io) % m_Io.size();
1927  m_Queues[idx].Emplace(std::move(req));
1928  m_Queues[idx].Signal();
1929  return true;
1930 }
1931 
1932 
1934 
1935 #endif
CDeadline.
Definition: ncbitime.hpp:1830
Blob unique ID.
Definition: psg_client.hpp:226
Chunk unique ID.
Definition: psg_client.hpp:265
@ eEndOfReply
No more items expected in the (overall!) reply.
Definition: psg_client.hpp:675
bool IsSingleServer() const
pair< SSocketAddress, double > TServer
size_type size() const
Definition: map.hpp:148
Definition: map.hpp:338
static uch flags
static const char ip[]
Definition: des.c:75
unsigned short TGroup
static int failure
Definition: t0019.c:11
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
Definition: dlist.tmpl.h:46
static const char * expected[]
Definition: bcp.c:42
char data[12]
Definition: iconv.c:80
Uint8 uint64_t
Int4 int32_t
unsigned char uint8_t
Uint4 uint32_t
@ eOff
Definition: ncbi_types.h:110
#define _DEBUG_CODE(code)
Definition: ncbidbg.hpp:136
#define _DEBUG_ARG(arg)
Definition: ncbidbg.hpp:134
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
Definition: ncbidiag.cpp:1907
static string GetStdStatusMessage(ECode code)
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
Definition: ncbidiag.cpp:1901
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Trace
Trace message.
Definition: ncbidiag.hpp:657
@ eDiag_Info
Informational message.
Definition: ncbidiag.hpp:651
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
@ eDiag_Warning
Warning message.
Definition: ncbidiag.hpp:652
@ eDiag_Fatal
Fatal error – guarantees exit(or abort)
Definition: ncbidiag.hpp:655
@ eDiag_Critical
Critical error message.
Definition: ncbidiag.hpp:654
@ e451_Unavailable_For_Legal_Reasons
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
void Fatal(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1209
const float epsilon
Definition: math.hpp:61
const CSeq_id & GetId(const CSeq_loc &loc, CScope *scope)
If all CSeq_ids embedded in CSeq_loc refer to the same CBioseq, returns the first CSeq_id found,...
@ eParam_Default
Default flags.
Definition: ncbi_param.hpp:416
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
static unsigned int GetLocalHostAddress(ESwitch reget=eDefault)
Local host address in network byte order (cached for faster retrieval)
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3953
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
Definition: ncbistr.cpp:630
static string URLDecode(const CTempString str, EUrlDecode flag=eUrlDec_All)
URL-decode string.
Definition: ncbistr.cpp:6214
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3554
static string URLEncode(const CTempString str, EUrlEncode flag=eUrlEnc_SkipMarkChars)
URL-encode string.
Definition: ncbistr.cpp:6062
@ fAllowTrailingSpaces
Ignore trailing space characters.
Definition: ncbistr.hpp:297
@ fConvErr_NoThrow
Do not throw an exception on error.
Definition: ncbistr.hpp:285
@ fAllowLeadingSpaces
Ignore leading spaces in converted string.
Definition: ncbistr.hpp:294
string GetQueryString(EAmpEncoding amp_enc, NStr::EUrlEncode encode) const
Construct and return complete query string.
Definition: ncbi_url.cpp:225
list< TArg > TArgs
Definition: ncbi_url.hpp:276
@ eAmp_Char
Use & to separate arguments.
Definition: ncbi_url.hpp:254
static string kPrefix
Definition: id2info.cpp:146
where both of them are integers Note
int i
yy_size_t n
int len
NCBI_PARAM_TYPE(PSG, throttle_by_connection_error_rate) TPSG_ThrottleThreshold
Definition: misc.hpp:374
EPSG_UseCache
Definition: misc.hpp:380
EPSG_DebugPrintout
Definition: misc.hpp:376
EPSG_PsgClientMode
Definition: misc.hpp:385
NCBI_PARAM_TYPE(PSG, throttle_relaxation_period) TPSG_ThrottlePeriod
Definition: misc.hpp:365
const TYPE & Get(const CNamedParameterList *param)
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::CREATED created
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
int strcmp(const char *str1, const char *str2)
Definition: odbc_utils.hpp:160
T max(T x_, T y_)
T min(T x_, T y_)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
static bool less(const CSeq_feat *A, const CSeq_feat *B)
static const char * prefix[]
Definition: pcregrep.c:405
EPSG_Status
Retrieval result.
Definition: psg_client.hpp:626
@ eSuccess
Successfully retrieved.
@ eInProgress
Retrieval is not finalized yet, more info may come.
@ eForbidden
User is not authorized for the retrieval.
@ eCanceled
Request canceled.
@ eError
An error was encountered while trying to send request or to read and to process the reply.
@ eNotFound
Not found.
uint64_t s_GetStatsPeriod()
NCBI_PARAM_ENUM_DEF(EPSG_DebugPrintout, PSG, debug_printout, EPSG_DebugPrintout::eNone)
auto s_GetCode(const string &code)
NCBI_PARAM_DEF(double, PSG, request_timeout, 10.0)
#define HTTP_STATUS_HEADER
EPSG_StatsCountersRetries
@ ePSG_StatsCountersRetries_Retry
@ ePSG_StatsCountersRetries_Timeout
uint64_t s_GetDiscoveryRepeat(const CServiceDiscovery &service)
EDiagSev s_GetSeverity(const string &severity)
PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, rd_buf_size, 64 *1024, 1024)
shared_ptr< SPSG_Stats > s_GetStats(SPSG_Servers::TTS &servers)
SPSG_IoCoordinator.
NCBI_PARAM_DEF_EX(string, PSG, service, "PSG2", eParam_Default, NCBI_PSG_SERVICE)
NCBI_PARAM_ENUM_ARRAY(EPSG_DebugPrintout, PSG, debug_printout)
#define PSG_IO_SESSION_TRACE(message)
#define PSG_THROTTLING_TRACE(message)
#define PSG_IO_TRACE(message)
#define PSG_DISCOVERY_TRACE(message)
uint64_t SecondsToMs(double seconds)
Defines CRequestStatus class for NCBI C++ diagnostic API.
void Print(SSocketAddress address, const string &path, const string &sid, const string &phid, const string &ip, SUv_Tcp::TPort port)
int32_t Submit(const nghttp2_nv *nva, size_t nvlen, nghttp2_data_provider *data_prd=nullptr)
auto GetValue() const
void Emplace(TArgs &&... args)
void NotifyOne() volatile
Definition: misc.hpp:59
bool WaitUntil(TArgs &&... args) volatile
Definition: misc.hpp:63
SNoServers(const SPSG_Params &params, SPSG_Servers::TTS &servers)
bool operator()(bool discovered, SUv_Timer *timer)
void OnShutdown(uv_async_t *)
void OnTimer(uv_timer_t *handle)
SPSG_Servers::TTS & m_Servers
string GetCookie(const string &name) const
bool AddRequest(shared_ptr< SPSG_Request > req, const atomic_bool &stopped, const CDeadline &deadline)
SPSG_Servers::TTS m_Servers
SPSG_IoCoordinator(CServiceDiscovery service)
atomic< size_t > m_RequestCounter
SPSG_Thread< SPSG_DiscoveryImpl > m_Discovery
vector< unique_ptr< SPSG_Thread< SPSG_IoImpl > > > m_Io
void OnTimer(uv_timer_t *handle)
void OnQueue(uv_async_t *handle)
void OnShutdown(uv_async_t *handle)
SPSG_IoImpl.
void OnExecute(uv_loop_t &loop)
void AddNewServers(uv_async_t *handle)
int OnStreamClose(nghttp2_session *session, int32_t stream_id, uint32_t error_code)
bool Fail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
bool RetryFail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
SPSG_AsyncQueue & m_Queue
SPSG_Requests< SPSG_IoSession > m_Requests
SPSG_Submitter::TId GetInternalId() const
SPSG_IoSession(SPSG_Server &s, const SPSG_Params &params, SPSG_AsyncQueue &queue, uv_loop_t *loop, TNgHttp2Cbs &&... callbacks)
SPSG_IoSession.
bool ProcessRequest(SPSG_TimedRequest timed_req, SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req)
int OnHeader(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags)
void OnReset(SUvNgHttp2_Error error) override
array< SNgHttp2_Header< NGHTTP2_NV_FLAG_NO_COPY_NAME >, eSize > m_Headers
int OnData(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len)
bool Cmp(TValue s) const
TPSG_RequestsPerIo requests_per_io
string GetCookie(function< string()> get_auth_token)
static unsigned s_GetCompetitiveAfter(double io_timer_period, double timeout)
const unsigned competitive_after
static unsigned s_GetRequestTimeout(double io_timer_period)
TPSG_IoTimerPeriod io_timer_period
const unsigned request_timeout
SPSG_Nullable< size_t > expected
vector< SPSG_Chunk > chunks
const volatile atomic_bool & InProgress() const volatile
void AddMessage(string message, EDiagSev severity, optional< int > code)
void SetStatus(EPSG_Status status, bool reset) volatile
void AddError(string message, EPSG_Status status=EPSG_Status::eError, EDiagSev severity=eDiag_Error, optional< int > code=nullopt)
deque< SPSG_Message > m_Messages
SPSG_Message GetMessage(EDiagSev min_severity)
static EPSG_Status FromRequestStatus(int status)
shared_ptr< TPSG_Queue > queue
void SetFailed(string message, EPSG_Status status=EPSG_Status::eError)
SThreadSafe< list< SItem::TTS * > > new_items
SThreadSafe< list< SItem::TTS > > items
optional< SItem::TTS * > GetNextItem(CDeadline deadline)
unsigned GetRetries(SPSG_Retries::EType type, bool refused_stream)
unordered_map< string, SPSG_Reply::SItem::TTS * > m_ItemsByID
EStateResult Add()
SPSG_Processor processed_by
SPSG_Retries m_Retries
bool Retry(const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StateData(const char *&data, size_t &len)
shared_ptr< SPSG_Reply > reply
SPSG_Request(string p, shared_ptr< SPSG_Reply > r, CRef< CRequestContext > c, const SPSG_Params &params)
bool Fail(SPSG_Processor::TId processor_id, const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StatePrefix(const char *&data, size_t &len)
auto & OnReplyDone(SPSG_Processor::TId processor_id)
EUpdateResult UpdateItem(SPSG_Args::EItemType item_type, SPSG_Reply::SItem &item, const SPSG_Args &args)
EStateResult StateArgs(const char *&data, size_t &len)
auto erase(iterator it)
auto emplace(TArgs &&... args)
SPSG_Throttling throttling
const SSocketAddress address
vector< pair< atomic_uint64_t, atomic_uint > > m_Data
void Report(const char *prefix, unsigned report)
static const char * GetName(EAvgTime avg_time)
static void Func(TData &data)
static void Func(const TData &data, const char *prefix, unsigned report)
void Apply(EGroup start_with, TArgs &&... args)
void Report(TArgs &&... args)
vector< vector< atomic_uint > > TData
void Report(const char *prefix, unsigned report, const char *name)
SData< CPSG_BlobId > m_Blobs
void Report(const char *prefix, unsigned report)
SThreadSafe< unordered_set< string > > m_TSEs
SData< CPSG_ChunkId > m_Chunks
SPSG_Servers::TTS & m_Servers
SPSG_Stats(SPSG_Servers::TTS &servers)
SThreshold(string error_rate)
SPSG_ThrottleParams.
constexpr static size_t kMaxDenominator
TPSG_ThrottleUntilDiscovery until_discovery
const volatile uint64_t period
TPSG_ThrottleMaxFailures max_failures
pair< bitset< SPSG_ThrottleParams::SThreshold::kMaxDenominator >, size_t > threshold_reg
bool Adjust(const SSocketAddress &address, bool result)
SPSG_Throttling(const SSocketAddress &address, SPSG_ThrottleParams p, uv_loop_t *l)
SPSG_Throttling.
atomic< EThrottling > m_Active
SThreadSafe< SStats > m_Stats
static void s_OnSignal(uv_async_t *handle)
bool Adjust(bool result)
const SSocketAddress & m_Address
bool CheckExpiration(const SPSG_Params &params, const SUvNgHttp2_Error &error, TOnRetry on_retry, TOnFail on_fail)
SLock< TType > GetLock()
static SUvNgHttp2_Error FromNgHttp2(T e, const char *w)
static const char * NgHttp2Str(T e)
void Reset(SUvNgHttp2_Error error, SUv_Tcp::ECloseType close_type=SUv_Tcp::eCloseReset)
SNgHttp2_Session m_Session
pair< string, string > TCred
static const string & Get()
void Init(void *d, uv_loop_t *l, uv_async_cb cb)
TPort GetLocalPort() const
unsigned short TPort
uint64_t GetDefaultRepeat() const
void ResetRepeat()
void Init(uv_loop_t *l)
void SetRepeat(uint64_t r)
Definition: inftrees.h:24
Definition: type.c:6
#define _TROUBLE
#define _ASSERT
int g(Seg_Gsm *spe, Seq_Mtf *psm, Thd_Gsm *tdg)
Definition: thrddgri.c:44
else result
Definition: token2.c:20
static CS_CONTEXT * context
Definition: will_convert.c:21
Modified on Sun Apr 14 05:29:06 2024 by modify_doxy.py rev. 669887