34 #ifdef HAVE_PSG_CLIENT
44 #include <unordered_map>
45 #include <type_traits>
51 #define __STDC_FORMAT_MACROS
89 NCBI_PARAM_DEF(
unsigned, PSG, throttle_by_consecutive_connection_failures, 0);
144 auto value = TPSG_RequestTimeout::GetDefault();
146 if (
value < io_timer_period) {
148 " was increased to the minimum allowed value ('" << io_timer_period <<
"')");
149 value = io_timer_period;
152 return static_cast<unsigned>(
value / io_timer_period);
157 auto value = TPSG_CompetitiveAfter::GetDefault();
158 timeout *= io_timer_period;
160 if ((
value > 0.0) && (
value < io_timer_period)) {
162 " was increased to the minimum allowed value ('" << io_timer_period <<
"')");
163 value = io_timer_period;
166 if (
value >= timeout) {
168 "as it was greater or equal to request timeout ('" << timeout <<
"')");
169 }
else if (
value > 0.0) {
173 return static_cast<unsigned>(timeout / io_timer_period);
184 for (
const auto& cookie : cookies) {
185 if (cookie.GetName() == auth_token_name.
Get()) {
198 if (!
ip.empty()) os <<
";IP=" <<
ip;
199 if (port) os <<
";PORT=" << port;
200 if (m_Params.proxy) os <<
";PROXY=" << m_Params.proxy;
202 ERR_POST(Message <<
id <<
": " << address << path <<
";SID=" << sid <<
";PHID=" << phid << os.str());
215 os <<
"<BINARY DATA OF " << chunk.size() <<
" BYTES>";
228 ERR_POST(Message <<
id <<
": Retrying (" << retries <<
" retries remaining) after " <<
error);
241 for (
const auto& event : m_Events) {
242 auto ms = get<0>(event);
243 auto type = get<1>(event);
244 auto thread_id = get<2>(event);
245 os << fixed <<
id <<
'\t' << ms <<
'\t' <<
type <<
'\t' << thread_id <<
'\n';
248 cout << os.str() << flush;
257 static constexpr
auto prefix =
"\trequest\ttype=";
289 static constexpr
auto prefix =
"\treply_item\ttype=";
330 static constexpr
auto prefix =
"\tskipped_blob\treason=";
359 static constexpr
auto prefix =
"\treply_item_status\tstatus=";
392 static constexpr
auto prefix =
"\tmessage\tseverity=";
430 static constexpr
auto prefix =
"\tretries\tevent=";
450 template <SPSG_Stats::EGroup group>
455 for (
auto& counter : data.back()) {
460 template <SPSG_Stats::EGroup group>
466 const auto&
g = data[group];
469 for (
auto i : TGroup::values) {
470 auto n =
g[
static_cast<size_t>(
i)].load();
477 Apply<SInit>(eRequest, m_Data);
480 template <
class TWhat,
class...
TArgs>
485 switch (start_with) {
486 case eRequest: TWhat::template Func<eRequest> (std::forward<TArgs>(args)...);
487 case eReplyItem: TWhat::template Func<eReplyItem> (std::forward<TArgs>(args)...);
488 case eSkippedBlob: TWhat::template Func<eSkippedBlob> (std::forward<TArgs>(args)...);
489 case eReplyItemStatus: TWhat::template Func<eReplyItemStatus> (std::forward<TArgs>(args)...);
490 case eMessage: TWhat::template Func<eMessage> (std::forward<TArgs>(args)...);
491 case eRetries: TWhat::template Func<eRetries> (std::forward<TArgs>(args)...);
495 template <
class...
TArgs>
498 Apply<SReport>(eRequest, m_Data, std::forward<TArgs>(args)...);
502 m_Data(eTimeUntilResend + 1)
523 auto v = data.first.load();
524 auto n = data.second.load();
529 template <
class TDataId>
533 static auto Tuple(
const CPSG_BlobId&
id) {
return tie(
id.
GetId(),
id.GetLastModified()); }
534 static auto Tuple(
const CPSG_ChunkId&
id) {
return tuple<int, const string&>(
id.GetId2Chunk(),
id.GetId2Info()); }
535 bool operator()(
const TDataId& lhs,
const TDataId& rhs)
const {
return Tuple(lhs) < Tuple(rhs); }
541 if (
auto locked = m_Ids.GetLock()) {
542 total = locked->
size();
546 for (
const auto& data_id : *locked) {
547 auto created = unique_ids.emplace(data_id, 1);
552 ERR_POST(
Note <<
prefix << report << data_prefix <<
"\ttotal=" << total <<
"&unique=" << unique_ids.
size());
554 auto received = m_Received.load();
555 auto read = m_Read.load();
556 if (received)
ERR_POST(
Note <<
prefix << report << data_prefix <<
"_data\treceived=" << received <<
"&read=" << read);
560 for (
const auto& p : unique_ids) {
561 auto created = group_by_count.emplace(p.second, 1);
565 for (
const auto& p : group_by_count) {
566 ERR_POST(
Note <<
prefix << report << data_prefix <<
"_retrievals\tnumber=" << p.first <<
"&unique_ids=" << p.second);
579 return SecondsToMs(TPSG_StatsPeriod::GetDefault());
591 const auto prefix =
"PSG_STATS\t";
600 for (
const auto& server : *servers_locked) {
601 auto n = server.stats.load();
602 if (
n)
ERR_POST(
Note <<
prefix << report <<
"\tserver\tname=" << server.address <<
"&requests_sent=" <<
n);
609 if (m && ((min_severity ==
eDiag_Trace) || ((m.severity !=
eDiag_Trace) && (m.severity >= min_severity)))) {
619 m_InProgress.store(
true);
650 const auto message =
"Protocol error: received less than expected";
651 bool missing =
false;
653 if (
auto items_locked =
items.GetLock()) {
654 for (
auto& item : *items_locked) {
664 if (missing || reply_item_locked->expected.Cmp<greater>(reply_item_locked->received)) {
665 reply_item_locked->state.AddError(message);
668 reply_item_locked->state.SetComplete();
677 if (
auto items_locked =
items.GetLock()) {
678 for (
auto& item : *items_locked) {
687 reply_item_locked->state.AddError(message, status);
688 reply_item_locked->state.SetComplete();
698 bool was_in_progress =
reply_item->state.InProgress();
700 if (
auto new_items_locked =
new_items.GetLock()) {
701 if (!new_items_locked->empty()) {
702 auto rv = new_items_locked->front();
703 new_items_locked->pop_front();
709 if (!was_in_progress) {
720 items.GetLock()->clear();
726 auto guard = m_ExistingGuard.lock();
731 m_ExistingGuard = guard;
738 full_path(std::move(p)),
761 static const string kPrefix =
"\n\nPSG-Reply-Chunk: ";
766 while (*data ==
kPrefix[index]) {
771 if (++index ==
kPrefix.size()) {
779 if (
reply->raw && !index) {
786 const auto message =
"Protocol error: prefix mismatch";
788 if (
Retry(message)) {
792 reply->reply_item.GetLock()->state.AddError(message);
799 while (*data !=
'\n') {
809 const auto& size_str = args.
GetValue(
"size");
810 const auto size = size_str.empty() ? 0ul : stoul(size_str);
834 chunk.append(data, data_size);
863 return code.empty() ? optional<int>{} : atoi(
code.c_str());
874 auto& reply_item_ts =
reply->reply_item;
877 if (
auto item_locked = reply_item_ts.GetLock()) {
878 if (
auto update_result =
UpdateItem(item_type, *item_locked, args); update_result ==
eRetry503) {
880 }
else if (update_result ==
eNewItem) {
887 reply_item_ts.NotifyOne();
890 if (
auto reply_item_locked = reply_item_ts.GetLock()) {
891 auto& reply_item = *reply_item_locked;
892 ++reply_item.received;
894 if (reply_item.expected.Cmp<
less>(reply_item.received)) {
895 reply_item.state.AddError(
"Protocol error: received more than expected");
899 auto item_id = args.
GetValue(
"item_id");
901 bool to_create = !item_by_id;
904 if (
auto items_locked =
reply->items.GetLock()) {
905 items_locked->emplace_back();
906 item_by_id = &items_locked->back();
910 if (
auto item_locked = item_by_id->GetLock()) {
911 auto update_result =
UpdateItem(item_type, *item_locked, args);
918 item_locked->args = std::move(args);
925 reply->new_items.GetLock()->emplace_back(item_by_id);
928 reply_item_ts.NotifyOne();
932 item_by_id->NotifyOne();
935 reply->queue->NotifyOne();
952 auto n_chunks = args.
GetValue(
"n_chunks");
954 if (!n_chunks.empty()) {
958 item.
state.
AddError(
"Protocol error: contradicting n_chunks");
964 if (
const auto status = get_status(); can_retry_503(status,
"Server returned a meta with status 503")) {
975 static atomic_bool reported(
false);
977 if (!reported.exchange(
true)) {
978 ERR_POST(
"Received unknown chunk type: " << chunk_type.second.get());
981 if (TPSG_FailOnUnknownChunks::GetDefault()) {
982 item.
state.
AddError(
"Protocol error: unknown chunk type '" + chunk_type.second +
'\'');
994 }
else if (
const auto status = get_status(); can_retry_503(status, chunk.c_str())) {
1003 auto blob_chunk = args.
GetValue(
"blob_chunk");
1004 auto index = blob_chunk.empty() ? 0 : stoul(blob_chunk);
1017 if (item.
chunks.size() <= index) item.
chunks.resize(index + 1);
1019 item.
chunks[index] = std::move(chunk);
1025 item.
state.
AddError(
"Protocol error: received more than expected");
1029 reply->reply_item.GetLock()->state.AddError(
"Protocol error: received more than expected");
1041 #define HTTP_STATUS_HEADER ":status"
1046 template <
class... TNgHttp2Cbs>
1051 TPSG_RdBufSize::GetDefault(),
1052 TPSG_WrBufSize::GetDefault(),
1053 TPSG_Https::GetDefault(),
1054 TPSG_MaxConcurrentStreams::GetDefault(),
1055 std::forward<TNgHttp2Cbs>(callbacks)...),
1059 {
":method",
"GET" },
1060 {
":scheme", TPSG_Https::GetDefault() ?
"https" :
"http" },
1061 {
":authority", m_Authority },
1064 {
"http_ncbi_sid" },
1065 {
"http_ncbi_phid" },
1067 {
"x-forwarded-for" }
1078 if (
auto [processor_id, req] = it->second.Get(); req) {
1079 auto result = req->OnReplyData(processor_id, (
const char*)data,
len);
1082 it->second.ResetTime();
1091 req->reply->SetComplete();
1108 reply->debug_printout << retries <<
error << endl;
1131 m_Buffer.
args =
"item_id=1&item_type=unknown&chunk_type=data_and_meta&n_chunks=1"s;
1133 m_Buffer.
args =
"item_id=0&item_type=reply&chunk_type=meta&n_chunks=2"s;
1140 auto context_guard = req->context.Set();
1141 auto rv = req->Fail(processor_id,
error, refused_stream);
1147 req->reply->debug_printout.id <<
"', " <<
error);
1163 if (
auto [processor_id, req] = it->second.Get(); req) {
1164 auto context_guard = req->context.Set();
1165 auto& debug_printout = req->reply->debug_printout;
1166 debug_printout << error_code << endl;
1172 if (
RetryFail(processor_id, req,
error, error_code == NGHTTP2_REFUSED_STREAM)) {
1176 if (req->reply->raw) {
1180 req->OnReplyDone(processor_id)->SetComplete();
1183 debug_printout.id <<
"' successfully");
1196 if ((frame->hd.type == NGHTTP2_HEADERS) && (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) &&
1199 auto stream_id = frame->hd.stream_id;
1200 auto status_str =
reinterpret_cast<const char*
>(
value);
1209 if (
auto [processor_id, req] = it->second.Get(); req) {
1211 req->OnReplyDone(processor_id)->SetFailed(
error, status);
1226 auto context_guard = req->context.Set();
1229 const auto& path = req->full_path;
1232 const auto& cookie =
m_Params.
GetCookie(req->flags, [&]() { return context.GetProperty(
"auth_token"); });
1241 if (!client_ip.empty()) {
1249 if (stream_id < 0) {
1267 m_Requests.emplace(stream_id, std::move(timed_req));
1281 template <
class TOnRetry,
class TOnFail>
1284 auto [processor_id, req] =
Get();
1294 if (req->Retry(
error)) {
1302 on_fail(processor_id, req);
1315 auto on_fail = [&](
auto processor_id,
auto req) {
Fail(processor_id, req,
error); };
1318 if (it->second.CheckExpiration(
m_Params,
error, on_retry, on_fail)) {
1328 bool some_requests_failed =
false;
1331 if (
auto [processor_id, req] = pair.second.Get(); req) {
1333 some_requests_failed =
true;
1338 if (some_requests_failed) {
1350 if (error_rate.empty())
return;
1352 string numerator_str, denominator_str;
1354 if (!
NStr::SplitInTwo(error_rate,
"/", numerator_str, denominator_str))
return;
1383 m_Stats(std::move(p)),
1385 m_Timer(this, s_OnTimer, Configured(), 0)
1405 auto stats_locked =
m_Stats.GetLock();
1426 "' reached the maximum number of failures in a row (" <<
params.
max_failures <<
')');
1440 ERR_POST(
Warning <<
"Server '" << address <<
"' is considered bad/overloaded ("
1456 threshold_reg.first.reset();
1466 for (
auto& server : m_Sessions) {
1467 for (
auto& session : server.sessions) {
1475 auto servers_locked = m_Servers.GetLock();
1476 auto& servers = *servers_locked;
1478 for (
auto& server : servers) {
1479 server.throttling.StartClose();
1487 auto servers_locked = m_Servers.GetLock();
1488 auto& servers = *servers_locked;
1490 for (
auto& server : servers) {
1491 server.throttling.FinishClose();
1498 auto servers_locked = m_Servers.GetLock();
1499 auto& servers = *servers_locked;
1502 const auto servers_size = m_Servers->size();
1503 const auto sessions_size = m_Sessions.size();
1505 _ASSERT(servers_size >= sessions_size);
1507 for (
auto new_servers = servers_size - sessions_size; new_servers; --new_servers) {
1508 auto& server = servers[servers_size - new_servers];
1509 m_Sessions.emplace_back().sessions.emplace_back(server, m_Params, m_Queue, handle->loop);
1510 PSG_IO_TRACE(
"Session for server '" << server.address <<
"' was added");
1516 auto available_servers = 0;
1518 for (
auto& server : m_Sessions) {
1519 server.current_rate = server->rate.load();
1521 if (server.current_rate) {
1522 ++available_servers;
1526 auto remaining_submits = m_Params.max_concurrent_submits.Get();
1528 auto i = m_Sessions.begin();
1529 auto [timed_req, processor_id, req] = decltype(m_Queue.Pop()){};
1530 auto d = m_Random.first;
1531 auto request_rate = 0.0;
1532 auto target_rate = 0.0;
1536 auto get_request = [&, &timed_req = timed_req, &processor_id = processor_id, &req = req]() {
1537 tie(timed_req, processor_id, req) = m_Queue.Pop();
1545 target_rate = d(m_Random.second);
1546 _DEBUG_CODE(req_id = req->reply->debug_printout.id;);
1547 PSG_IO_TRACE(
"Ready to submit request '" << req_id <<
'\'');
1551 auto next_server = [&]() {
1552 if (++
i == m_Sessions.end())
i = m_Sessions.begin();
1555 auto ignore_server = [&]() {
1556 if (--available_servers) {
1557 d = uniform_real_distribution<>(0.0, d.max() -
i->current_rate);
1558 i->current_rate = 0.0;
1563 auto find_session = [&]() {
1564 auto s =
i->sessions.begin();
1565 for (; (s !=
i->sessions.end()) && s->IsFull(); ++s);
1566 return make_pair(s !=
i->sessions.end(), s);
1569 while (available_servers && remaining_submits) {
1571 if (!req && !get_request()) {
1576 for (; request_rate +=
i->current_rate, request_rate < target_rate; next_server());
1581 if (server->throttling.Active()) {
1582 PSG_IO_TRACE(
"Server '" << server->address <<
"' is throttled, ignoring");
1586 }
else if (server->available_streams <= 0) {
1587 PSG_IO_TRACE(
"Server '" << server->address <<
"' is at request limit, ignoring");
1591 }
else if (
auto [found, session] = find_session(); !found) {
1592 PSG_IO_TRACE(
"Server '" << server->address <<
"' has no sessions available, ignoring");
1596 }
else if (!session->CanProcessRequest(req)) {
1597 PSG_IO_TRACE(
"Server '" << session->GetId() <<
"' already working/worked on the request, trying to find another one");
1599 req->submitted_by.Reset();
1603 }
else if (!session->ProcessRequest(*std::move(timed_req), processor_id, std::move(req))) {
1604 PSG_IO_TRACE(
"Server '" << session->GetId() <<
"' failed to get request '" << req_id <<
"' with rate = " << target_rate);
1609 PSG_IO_TRACE(
"Server '" << session->GetId() <<
"' got request '" << req_id <<
"' with rate = " << target_rate);
1610 --remaining_submits;
1614 if (session->IsFull() && (distance(session, server.sessions.end()) == 1)) {
1615 if (server.sessions.size() >= TPSG_MaxSessions::GetDefault()) {
1616 PSG_IO_TRACE(
"Server '" << server->address <<
"' reached session limit");
1619 server.sessions.emplace_back(*server, m_Params, m_Queue, handle->loop);
1620 PSG_IO_TRACE(
"Additional session for server '" << server->address <<
"' was added");
1628 m_Queue.Emplace(*std::move(timed_req));
1631 if (!remaining_submits) {
1632 PSG_IO_TRACE(
"Max concurrent submits reached, submitted: " << m_Params.max_concurrent_submits);
1635 PSG_IO_TRACE(
"No sessions available [anymore], submitted: " << m_Params.max_concurrent_submits - remaining_submits);
1641 const auto kRegularRate = nextafter(0.009, 1.0);
1642 const auto kStandbyRate = 0.001;
1644 const auto& service_name = m_Service.GetServiceName();
1645 auto discovered = m_Service();
1647 auto total_preferred_regular_rate = 0.0;
1648 auto total_preferred_standby_rate = 0.0;
1649 auto total_regular_rate = 0.0;
1650 auto total_standby_rate = 0.0;
1653 for (
auto& server : discovered) {
1656 if (server.second >= kRegularRate) {
1657 if (is_server_preferred) {
1658 total_preferred_regular_rate += server.second;
1661 total_regular_rate += server.second;
1663 }
else if (server.second >= kStandbyRate) {
1664 if (is_server_preferred) {
1665 total_preferred_standby_rate += server.second;
1668 total_standby_rate += server.second;
1672 if (m_NoServers(total_regular_rate || total_standby_rate, SUv_Timer::GetThat<SUv_Timer>(handle))) {
1673 ERR_POST(
"No servers in service '" << service_name <<
'\'');
1677 const auto localhost_preference = TPSG_LocalhostPreference::GetDefault();
1678 const auto total_preferred_standby_percentage = localhost_preference ? 1.0 - 1.0 / localhost_preference : 0.0;
1679 const auto have_any_regular_servers = total_regular_rate > 0.0;
1680 const auto have_nonpreferred_regular_servers = total_regular_rate > total_preferred_regular_rate;
1681 const auto have_nonpreferred_standby_servers = total_standby_rate > total_preferred_standby_rate;
1682 const auto regular_rate_adjustment =
1683 (localhost_preference <= 1) ||
1684 (total_preferred_regular_rate != 0.0) ||
1685 (total_preferred_standby_rate == 0.0) ||
1686 (!have_nonpreferred_regular_servers && !have_nonpreferred_standby_servers);
1687 auto rate_total = 0.0;
1690 for (
auto& server : discovered) {
1692 const auto old_rate = server.second;
1694 if (server.second >= kRegularRate) {
1695 if (is_server_preferred) {
1696 if (regular_rate_adjustment) {
1697 server.second *= localhost_preference;
1698 }
else if (have_nonpreferred_regular_servers) {
1699 server.second = 0.0;
1700 }
else if (have_nonpreferred_standby_servers) {
1701 server.second = 0.0;
1704 if (regular_rate_adjustment) {
1706 }
else if (have_nonpreferred_regular_servers) {
1707 server.second *= (1 - total_preferred_standby_percentage) / total_regular_rate;
1708 }
else if (have_nonpreferred_standby_servers) {
1709 server.second = 0.0;
1712 }
else if (server.second >= kStandbyRate) {
1713 if (is_server_preferred) {
1714 if (regular_rate_adjustment && have_any_regular_servers) {
1715 server.second = 0.0;
1716 }
else if (regular_rate_adjustment) {
1717 server.second *= localhost_preference;
1718 }
else if (have_nonpreferred_regular_servers) {
1719 server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;
1720 }
else if (have_nonpreferred_standby_servers) {
1721 server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;
1724 if (regular_rate_adjustment && have_any_regular_servers && (localhost_preference || have_nonpreferred_regular_servers)) {
1725 server.second = 0.0;
1726 }
else if (regular_rate_adjustment) {
1728 }
else if (have_nonpreferred_regular_servers) {
1729 server.second = 0.0;
1730 }
else if (have_nonpreferred_standby_servers) {
1731 server.second *= (1 - total_preferred_standby_percentage) / (total_standby_rate - total_preferred_standby_rate);
1736 rate_total += server.second;
1738 if (old_rate != server.second) {
1739 PSG_DISCOVERY_TRACE(
"Rate for '" << server.first <<
"' adjusted from " << old_rate <<
" to " << server.second);
1743 auto servers_locked = m_Servers.GetLock();
1744 auto& servers = *servers_locked;
1747 for (
auto& server : servers) {
1749 auto it = find_if(discovered.begin(), discovered.end(), address_same);
1753 PSG_DISCOVERY_TRACE(
"Server '" << server.address <<
"' disabled in service '" << service_name <<
'\'');
1756 server.throttling.Discovered();
1757 auto rate = it->second / rate_total;
1759 if (server.rate != rate) {
1762 (server.rate ?
"' updated in service '" :
"' enabled in service '" ) <<
1763 service_name <<
"' with rate = " << rate);
1774 for (
auto& server : discovered) {
1776 auto rate = server.second / rate_total;
1777 servers.emplace_back(server.first, rate, m_Params.max_concurrent_requests_per_server, m_ThrottleParams, handle->loop);
1780 service_name <<
"' with rate = " << rate);
1784 m_QueuesRef.SignalAll();
1789 if (m_Servers->fail_requests) {
1792 CheckRequestExpiration();
1795 for (
auto& server : m_Sessions) {
1796 for (
auto& session : server.sessions) {
1797 session.CheckRequestExpiration();
1804 auto queue_locked = m_Queue.GetLockedQueue();
1805 list<SPSG_TimedRequest> retries;
1808 auto on_retry = [&](
auto req) { retries.emplace_back(req); m_Queue.Signal(); };
1809 auto on_fail = [&](
auto processor_id,
auto req) { req->Fail(processor_id,
error); };
1811 for (
auto it = queue_locked->begin(); it != queue_locked->end(); ) {
1812 if (it->CheckExpiration(m_Params,
error, on_retry, on_fail)) {
1813 it = queue_locked->erase(it);
1819 queue_locked->splice(queue_locked->end(), retries);
1824 auto queue_locked = m_Queue.GetLockedQueue();
1827 for (
auto& timed_req : *queue_locked) {
1828 if (
auto [processor_id, req] = timed_req.Get(); req) {
1829 auto context_guard = req->context.Set();
1830 auto& debug_printout = req->reply->debug_printout;
1831 debug_printout <<
error << endl;
1832 req->OnReplyDone(processor_id)->SetFailed(
error);
1833 PSG_IO_TRACE(
"No servers to process request '" << debug_printout.id <<
'\'');
1837 queue_locked->clear();
1842 m_Queue.Init(
this, &loop, s_OnQueue);
1853 m_RetryDelay(
SecondsToMs(TPSG_NoServersRetryDelay::GetDefault())),
1854 m_Timeout(
SecondsToMs((params.request_timeout + params.competitive_after * params.request_retries) * params.io_timer_period)),
1855 m_FailRequests(const_cast<atomic_bool&>(servers->fail_requests))
1872 const auto timeout_expired = m_Passed >= m_Timeout;
1873 m_FailRequests = timeout_expired;
1877 }
else if (!timeout_expired) {
1891 if (TPSG_Stats::GetDefault()) {
1892 return make_shared<SPSG_Stats>(servers);
1905 m_StartBarrier(TPSG_NumIo::GetDefault() + 2),
1906 m_StopBarrier(TPSG_NumIo::GetDefault() + 1),
1908 m_RequestCounter(0),
1913 for (
unsigned i = 0;
i < TPSG_NumIo::GetDefault();
i++) {
1925 for (
auto& io :
m_Io) {
1932 if (
m_Io.size() == 0) {
1937 m_Queues[idx].Emplace(std::move(req));
@ eEndOfReply
No more items expected in the (overall!) reply.
bool IsSingleServer() const
pair< SSocketAddress, double > TServer
constexpr const tuple_element< _Index, ct_const_tuple< _Types... > >::type & get(const ct_const_tuple< _Types... > &_Tuple) noexcept
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
#define _DEBUG_CODE(code)
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
string GetSessionID(void) const
Session ID.
const string & GetNextSubHitID(CTempString prefix=CTempString())
Get current hit id appended with auto-incremented sub-hit id.
static string GetStdStatusMessage(ECode code)
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
string GetClientIP(void) const
Client IP/hostname.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
EDiagSev
Severity level for the posted diagnostics.
@ eDiag_Trace
Trace message.
@ eDiag_Info
Informational message.
@ eDiag_Error
Error message.
@ eDiag_Warning
Warning message.
@ eDiag_Fatal
Fatal error – guarantees exit(or abort)
@ eDiag_Critical
Critical error message.
@ e503_ServiceUnavailable
@ e451_Unavailable_For_Legal_Reasons
void Warning(CExceptionArgs_Base &args)
void Fatal(CExceptionArgs_Base &args)
void Add(const CHttpCookie &cookie)
Add a single cookie.
const CSeq_id & GetId(const CSeq_loc &loc, CScope *scope)
If all CSeq_ids embedded in CSeq_loc refer to the same CBioseq, returns the first CSeq_id found,...
@ eParam_Default
Default flags.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
static unsigned int GetLocalHostAddress(ESwitch reget=eDefault)
Local host address in network byte order (cached for faster retrieval)
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
static string URLDecode(const CTempString str, EUrlDecode flag=eUrlDec_All)
URL-decode string.
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
@ fAllowTrailingSpaces
Ignore trailing space characters.
@ fConvErr_NoThrow
Do not throw an exception on error.
@ fAllowLeadingSpaces
Ignore leading spaces in converted string.
string GetQueryString(EAmpEncoding amp_enc, NStr::EUrlEncode encode) const
Construct and return complete query string.
@ eAmp_Char
Use & to separate arguments.
where both of them are integers Note
NCBI_PARAM_TYPE(PSG, throttle_by_connection_error_rate) TPSG_ThrottleThreshold
SPSG_ParamValue< NCBI_PARAM_TYPE(PSG, auth_token)> TPSG_AuthToken
NCBI_PARAM_TYPE(PSG, throttle_relaxation_period) TPSG_ThrottlePeriod
const TYPE & Get(const CNamedParameterList *param)
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::CREATED created
int strcmp(const char *str1, const char *str2)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
double f(double x_, const double &y_)
static const char * expected[]
static bool less(const CSeq_feat *A, const CSeq_feat *B)
static const char * prefix[]
EPSG_Status
Retrieval result.
@ eSuccess
Successfully retrieved.
@ eInProgress
Retrieval is not finalized yet, more info may come.
@ eForbidden
User is not authorized for the retrieval.
@ eCanceled
Request canceled.
@ eError
An error was encountered while trying to send request or to read and to process the reply.
uint64_t s_GetStatsPeriod()
NCBI_PARAM_ENUM_DEF(EPSG_DebugPrintout, PSG, debug_printout, EPSG_DebugPrintout::eNone)
auto s_GetCode(const string &code)
NCBI_PARAM_DEF(double, PSG, request_timeout, 10.0)
#define HTTP_STATUS_HEADER
EPSG_StatsCountersRetries
@ ePSG_StatsCountersRetries_Retry
@ ePSG_StatsCountersRetries_Timeout
uint64_t s_GetDiscoveryRepeat(const CServiceDiscovery &service)
EDiagSev s_GetSeverity(const string &severity)
PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, rd_buf_size, 64 *1024, 1024)
shared_ptr< SPSG_Stats > s_GetStats(SPSG_Servers::TTS &servers)
SPSG_IoCoordinator.
NCBI_PARAM_DEF_EX(string, PSG, service, "PSG2", eParam_Default, NCBI_PSG_SERVICE)
NCBI_PARAM_ENUM_ARRAY(EPSG_DebugPrintout, PSG, debug_printout)
#define PSG_IO_SESSION_TRACE(message)
#define PSG_THROTTLING_TRACE(message)
#define PSG_IO_TRACE(message)
#define PSG_DISCOVERY_TRACE(message)
uint64_t SecondsToMs(double seconds)
Defines CRequestStatus class for NCBI C++ diagnostic API.
unsigned __int64 uint64_t
void Print(SSocketAddress address, const string &path, const string &sid, const string &phid, const string &ip, SUv_Tcp::TPort port)
int32_t Submit(const nghttp2_nv *nva, size_t nvlen, nghttp2_data_provider *data_prd=nullptr)
void Emplace(TArgs &&... args)
void NotifyOne() volatile
bool WaitUntil(TArgs &&... args) volatile
SNoServers(const SPSG_Params ¶ms, SPSG_Servers::TTS &servers)
bool operator()(bool discovered, SUv_Timer *timer)
void OnShutdown(uv_async_t *)
void OnTimer(uv_timer_t *handle)
SPSG_Servers::TTS & m_Servers
bool AddRequest(shared_ptr< SPSG_Request > req, const atomic_bool &stopped, const CDeadline &deadline)
SPSG_Servers::TTS m_Servers
SUv_Barrier m_StartBarrier
SPSG_IoCoordinator(CServiceDiscovery service)
atomic< size_t > m_RequestCounter
SUv_Barrier m_StopBarrier
TPSG_AsyncQueues m_Queues
SPSG_Thread< SPSG_DiscoveryImpl > m_Discovery
vector< unique_ptr< SPSG_Thread< SPSG_IoImpl > > > m_Io
void OnTimer(uv_timer_t *handle)
void OnQueue(uv_async_t *handle)
void CheckRequestExpiration()
void OnShutdown(uv_async_t *handle)
SPSG_IoImpl.
void OnExecute(uv_loop_t &loop)
void AddNewServers(uv_async_t *handle)
int OnStreamClose(nghttp2_session *session, int32_t stream_id, uint32_t error_code)
bool Fail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
bool RetryFail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
void EraseAndMoveToNext(TRequests::iterator &it)
SPSG_AsyncQueue & m_Queue
SPSG_Submitter::TId GetInternalId() const
SPSG_IoSession(SPSG_Server &s, const SPSG_Params ¶ms, SPSG_AsyncQueue &queue, uv_loop_t *loop, TNgHttp2Cbs &&... callbacks)
SPSG_IoSession.
void CheckRequestExpiration()
bool ProcessRequest(SPSG_TimedRequest timed_req, SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req)
int OnHeader(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags)
void OnReset(SUvNgHttp2_Error error) override
array< SNgHttp2_Header< NGHTTP2_NV_FLAG_NO_COPY_NAME >, eSize > m_Headers
int OnData(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len)
static string s_GetAuthToken(const CNcbiEnvironment &env, const TPSG_AuthTokenName &auth_token_name)
TPSG_RequestsPerIo requests_per_io
static unsigned s_GetCompetitiveAfter(double io_timer_period, double timeout)
string GetCookie(const CPSG_Request::TFlags &request_flags, TGet get)
const unsigned competitive_after
static unsigned s_GetRequestTimeout(double io_timer_period)
TPSG_IoTimerPeriod io_timer_period
const unsigned request_timeout
SPSG_Nullable< size_t > expected
vector< SPSG_Chunk > chunks
void SetComplete() volatile
const volatile atomic_bool & InProgress() const volatile
void AddMessage(string message, EDiagSev severity, optional< int > code)
void SetStatus(EPSG_Status status, bool reset) volatile
void AddError(string message, EPSG_Status status=EPSG_Status::eError, EDiagSev severity=eDiag_Error, optional< int > code=nullopt)
deque< SPSG_Message > m_Messages
SPSG_Message GetMessage(EDiagSev min_severity)
static EPSG_Status FromRequestStatus(int status)
shared_ptr< TPSG_Queue > queue
void SetFailed(string message, EPSG_Status status=EPSG_Status::eError)
SThreadSafe< list< SItem::TTS * > > new_items
SThreadSafe< list< SItem::TTS > > items
optional< SItem::TTS * > GetNextItem(CDeadline deadline)
unsigned GetRetries(SPSG_Retries::EType type, bool refused_stream)
unordered_map< string, SPSG_Reply::SItem::TTS * > m_ItemsByID
SPSG_Processor processed_by
bool Retry(const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StateData(const char *&data, size_t &len)
shared_ptr< SPSG_Reply > reply
SPSG_Request(string p, CPSG_Request::TFlags f, shared_ptr< SPSG_Reply > r, CRef< CRequestContext > c, const SPSG_Params ¶ms)
bool Fail(SPSG_Processor::TId processor_id, const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StatePrefix(const char *&data, size_t &len)
auto & OnReplyDone(SPSG_Processor::TId processor_id)
EUpdateResult UpdateItem(SPSG_Args::EItemType item_type, SPSG_Reply::SItem &item, const SPSG_Args &args)
EStateResult StateArgs(const char *&data, size_t &len)
SPSG_Throttling throttling
atomic_int available_streams
const SSocketAddress address
vector< pair< atomic_uint64_t, atomic_uint > > m_Data
void Report(const char *prefix, unsigned report)
static const char * GetName(EAvgTime avg_time)
static const char * ValueName(type value)
static const char * ValueName(type value)
static const char * ValueName(type value)
static const char * ValueName(type value)
static const char * ValueName(type value)
static const char * ValueName(type value)
static void Func(TData &data)
static void Func(const TData &data, const char *prefix, unsigned report)
void Apply(EGroup start_with, TArgs &&... args)
void Report(TArgs &&... args)
vector< vector< atomic_uint > > TData
void Report(const char *prefix, unsigned report, const char *name)
SData< CPSG_BlobId > m_Blobs
void Report(const char *prefix, unsigned report)
SThreadSafe< unordered_set< string > > m_TSEs
SData< CPSG_ChunkId > m_Chunks
SPSG_Servers::TTS & m_Servers
SPSG_Stats(SPSG_Servers::TTS &servers)
SThreshold(string error_rate)
SPSG_ThrottleParams.
constexpr static size_t kMaxDenominator
TPSG_ThrottleUntilDiscovery until_discovery
const volatile uint64_t period
TPSG_ThrottleMaxFailures max_failures
pair< bitset< SPSG_ThrottleParams::SThreshold::kMaxDenominator >, size_t > threshold_reg
bool Adjust(const SSocketAddress &address, bool result)
SPSG_ThrottleParams params
SPSG_Throttling(const SSocketAddress &address, SPSG_ThrottleParams p, uv_loop_t *l)
SPSG_Throttling.
atomic< EThrottling > m_Active
SThreadSafe< SStats > m_Stats
static void s_OnSignal(uv_async_t *handle)
const SSocketAddress & m_Address
bool CheckExpiration(const SPSG_Params ¶ms, const SUvNgHttp2_Error &error, TOnRetry on_retry, TOnFail on_fail)
static SUvNgHttp2_Error FromNgHttp2(T e, const char *w)
static const char * NgHttp2Str(T e)
void Reset(SUvNgHttp2_Error error, SUv_Tcp::ECloseType close_type=SUv_Tcp::eCloseReset)
SNgHttp2_Session m_Session
pair< string, string > TCred
static const string & Get()
void Init(void *d, uv_loop_t *l, uv_async_cb cb)
TPort GetLocalPort() const
uint64_t GetDefaultRepeat() const
void SetRepeat(uint64_t r)
int g(Seg_Gsm *spe, Seq_Mtf *psm, Thd_Gsm *tdg)