34 #ifdef HAVE_PSG_CLIENT
44 #include <unordered_map>
45 #include <type_traits>
51 #define __STDC_FORMAT_MACROS
90 NCBI_PARAM_DEF(
unsigned, PSG, throttle_by_consecutive_connection_failures, 0);
150 for (
const auto& cookie : *m_Cookies) {
151 if (cookie.GetName() == name) {
161 auto combine = [](
auto p,
auto v) {
return v.empty()? v : p.Get() +
'=' +
NStr::URLEncode(v); };
163 auto admin_cookie = combine(admin_auth_token_name, admin_auth_token.Get());
164 auto hup_cookie = combine(auth_token_name, auth_token.Get().empty() ? get_auth_token() : auth_token.Get());
166 return admin_cookie.empty() ? hup_cookie : hup_cookie.empty() ? admin_cookie : admin_cookie +
"; " + hup_cookie;
171 auto value = TPSG_RequestTimeout::GetDefault();
173 if (
value < io_timer_period) {
175 " was increased to the minimum allowed value ('" << io_timer_period <<
"')");
176 value = io_timer_period;
179 return static_cast<unsigned>(
value / io_timer_period);
184 auto value = TPSG_CompetitiveAfter::GetDefault();
185 timeout *= io_timer_period;
187 if ((
value > 0.0) && (
value < io_timer_period)) {
189 " was increased to the minimum allowed value ('" << io_timer_period <<
"')");
190 value = io_timer_period;
193 if (
value >= timeout) {
195 "as it was greater or equal to request timeout ('" << timeout <<
"')");
196 }
else if (
value > 0.0) {
200 return static_cast<unsigned>(timeout / io_timer_period);
207 if (!
ip.empty()) os <<
";IP=" <<
ip;
208 if (port) os <<
";PORT=" << port;
209 if (m_Params.proxy) os <<
";PROXY=" << m_Params.proxy;
211 ERR_POST(Message <<
id <<
": " << address << path <<
";SID=" << sid <<
";PHID=" << phid << os.str());
224 os <<
"<BINARY DATA OF " << chunk.size() <<
" BYTES>";
237 ERR_POST(Message <<
id <<
": Retrying (" << retries <<
" retries remaining) after " <<
error);
250 for (
const auto& event : m_Events) {
251 auto ms = get<0>(event);
252 auto type = get<1>(event);
253 auto thread_id = get<2>(event);
254 os << fixed <<
id <<
'\t' <<
ms <<
'\t' <<
type <<
'\t' << thread_id <<
'\n';
257 cout << os.str() << flush;
266 static constexpr
auto prefix =
"\trequest\ttype=";
298 static constexpr
auto prefix =
"\treply_item\ttype=";
339 static constexpr
auto prefix =
"\tskipped_blob\treason=";
368 static constexpr
auto prefix =
"\treply_item_status\tstatus=";
401 static constexpr
auto prefix =
"\tmessage\tseverity=";
439 static constexpr
auto prefix =
"\tretries\tevent=";
459 template <SPSG_Stats::EGroup group>
464 for (
auto& counter :
data.back()) {
469 template <SPSG_Stats::EGroup group>
475 const auto&
g =
data[group];
478 for (
auto i : TGroup::values) {
479 auto n =
g[
static_cast<size_t>(
i)].load();
480 if (
n)
ERR_POST(
Note << prefix << report << TGroup::prefix << TGroup::ValueName(
i) <<
"&count=" <<
n);
486 Apply<SInit>(eRequest, m_Data);
489 template <
class TWhat,
class...
TArgs>
494 switch (start_with) {
495 case eRequest: TWhat::template Func<eRequest> (std::forward<TArgs>(args)...);
496 case eReplyItem: TWhat::template Func<eReplyItem> (std::forward<TArgs>(args)...);
497 case eSkippedBlob: TWhat::template Func<eSkippedBlob> (std::forward<TArgs>(args)...);
498 case eReplyItemStatus: TWhat::template Func<eReplyItemStatus> (std::forward<TArgs>(args)...);
499 case eMessage: TWhat::template Func<eMessage> (std::forward<TArgs>(args)...);
500 case eRetries: TWhat::template Func<eRetries> (std::forward<TArgs>(args)...);
504 template <
class...
TArgs>
507 Apply<SReport>(eRequest, m_Data, std::forward<TArgs>(args)...);
511 m_Data(eTimeUntilResend + 1)
532 auto v =
data.first.load();
533 auto n =
data.second.load();
534 if (
n)
ERR_POST(
Note << prefix << report <<
'\t' <<
GetName(
i) <<
"\taverage=" <<
double(v /
n) / milli::den);
538 template <
class TDataId>
542 static auto Tuple(
const CPSG_BlobId&
id) {
return tie(
id.
GetId(),
id.GetLastModified()); }
543 static auto Tuple(
const CPSG_ChunkId&
id) {
return tuple<int, const string&>(
id.GetId2Chunk(),
id.GetId2Info()); }
544 bool operator()(
const TDataId& lhs,
const TDataId& rhs)
const {
return Tuple(lhs) < Tuple(rhs); }
550 if (
auto locked = m_Ids.GetLock()) {
551 total = locked->
size();
555 for (
const auto& data_id : *locked) {
556 auto created = unique_ids.emplace(data_id, 1);
561 ERR_POST(
Note << prefix << report << data_prefix <<
"\ttotal=" << total <<
"&unique=" << unique_ids.
size());
563 auto received = m_Received.load();
564 auto read = m_Read.load();
565 if (received)
ERR_POST(
Note << prefix << report << data_prefix <<
"_data\treceived=" << received <<
"&read=" << read);
569 for (
const auto& p : unique_ids) {
570 auto created = group_by_count.emplace(p.second, 1);
574 for (
const auto& p : group_by_count) {
575 ERR_POST(
Note << prefix << report << data_prefix <<
"_retrievals\tnumber=" << p.first <<
"&unique_ids=" << p.second);
588 return SecondsToMs(TPSG_StatsPeriod::GetDefault());
600 const auto prefix =
"PSG_STATS\t";
609 for (
const auto& server : *servers_locked) {
610 auto n = server.stats.load();
611 if (
n)
ERR_POST(
Note << prefix << report <<
"\tserver\tname=" << server.address <<
"&requests_sent=" <<
n);
618 if (m && ((min_severity ==
eDiag_Trace) || ((m.severity !=
eDiag_Trace) && (m.severity >= min_severity)))) {
628 m_InProgress.store(
true);
659 const auto message =
"Protocol error: received less than expected";
660 bool missing =
false;
662 if (
auto items_locked =
items.GetLock()) {
663 for (
auto& item : *items_locked) {
673 if (missing || reply_item_locked->expected.Cmp<greater>(reply_item_locked->received)) {
674 reply_item_locked->state.AddError(message);
677 reply_item_locked->state.SetComplete();
686 if (
auto items_locked =
items.GetLock()) {
687 for (
auto& item : *items_locked) {
696 reply_item_locked->state.AddError(message, status);
697 reply_item_locked->state.SetComplete();
707 bool was_in_progress =
reply_item->state.InProgress();
709 if (
auto new_items_locked =
new_items.GetLock()) {
710 if (!new_items_locked->empty()) {
711 auto rv = new_items_locked->front();
712 new_items_locked->pop_front();
718 if (!was_in_progress) {
729 items.GetLock()->clear();
735 auto guard = m_ExistingGuard.lock();
740 m_ExistingGuard = guard;
747 full_path(std::move(p)),
769 static const string kPrefix =
"\n\nPSG-Reply-Chunk: ";
779 if (++index ==
kPrefix.size()) {
787 if (
reply->raw && !index) {
794 const auto message =
"Protocol error: prefix mismatch";
796 if (
Retry(message)) {
800 reply->reply_item.GetLock()->state.AddError(message);
807 while (*
data !=
'\n') {
817 const auto& size_str = args.
GetValue(
"size");
818 const auto size = size_str.empty() ? 0ul : stoul(size_str);
842 chunk.append(
data, data_size);
871 return code.empty() ? optional<int>{} : atoi(
code.c_str());
882 auto& reply_item_ts =
reply->reply_item;
885 if (
auto item_locked = reply_item_ts.GetLock()) {
886 if (
auto update_result =
UpdateItem(item_type, *item_locked, args); update_result ==
eRetry503) {
888 }
else if (update_result ==
eNewItem) {
895 reply_item_ts.NotifyOne();
898 if (
auto reply_item_locked = reply_item_ts.GetLock()) {
899 auto& reply_item = *reply_item_locked;
900 ++reply_item.received;
902 if (reply_item.expected.Cmp<
less>(reply_item.received)) {
903 reply_item.state.AddError(
"Protocol error: received more than expected");
907 auto item_id = args.
GetValue(
"item_id");
909 bool to_create = !item_by_id;
912 if (
auto items_locked =
reply->items.GetLock()) {
913 items_locked->emplace_back();
914 item_by_id = &items_locked->back();
918 if (
auto item_locked = item_by_id->GetLock()) {
919 auto update_result =
UpdateItem(item_type, *item_locked, args);
926 item_locked->args = std::move(args);
933 reply->new_items.GetLock()->emplace_back(item_by_id);
936 reply_item_ts.NotifyOne();
940 item_by_id->NotifyOne();
943 reply->queue->NotifyOne();
960 auto n_chunks = args.
GetValue(
"n_chunks");
962 if (!n_chunks.empty()) {
966 item.
state.
AddError(
"Protocol error: contradicting n_chunks");
972 if (
const auto status = get_status(); can_retry_503(status,
"Server returned a meta with status 503")) {
983 static atomic_bool reported(
false);
985 if (!reported.exchange(
true)) {
986 ERR_POST(
"Received unknown chunk type: " << chunk_type.second.get());
989 if (TPSG_FailOnUnknownChunks::GetDefault()) {
990 item.
state.
AddError(
"Protocol error: unknown chunk type '" + chunk_type.second +
'\'');
1002 }
else if (
const auto status = get_status(); can_retry_503(status, chunk.c_str())) {
1011 auto blob_chunk = args.
GetValue(
"blob_chunk");
1012 auto index = blob_chunk.empty() ? 0 : stoul(blob_chunk);
1025 if (item.
chunks.size() <= index) item.
chunks.resize(index + 1);
1027 item.
chunks[index] = std::move(chunk);
1033 item.
state.
AddError(
"Protocol error: received more than expected");
1037 reply->reply_item.GetLock()->state.AddError(
"Protocol error: received more than expected");
1049 #define HTTP_STATUS_HEADER ":status"
1054 template <
class... TNgHttp2Cbs>
1061 TPSG_Https::GetDefault(),
1063 std::forward<TNgHttp2Cbs>(callbacks)...),
1067 {
":method",
"GET" },
1068 {
":scheme", TPSG_Https::GetDefault() ?
"https" :
"http" },
1069 {
":authority", m_Authority },
1072 {
"http_ncbi_sid" },
1073 {
"http_ncbi_phid" },
1075 {
"x-forwarded-for" }
1087 if (
auto [processor_id, req] = it->second.Get(); req) {
1088 auto result = req->OnReplyData(processor_id, (
const char*)
data,
len);
1091 it->second.ResetTime();
1100 req->reply->SetComplete();
1117 reply->debug_printout << retries <<
error << endl;
1140 m_Buffer.
args =
"item_id=1&item_type=unknown&chunk_type=data_and_meta&n_chunks=1"s;
1142 m_Buffer.
args =
"item_id=0&item_type=reply&chunk_type=meta&n_chunks=2"s;
1149 auto context_guard = req->context.Set();
1150 auto rv = req->Fail(processor_id,
error, refused_stream);
1156 req->reply->debug_printout.id <<
"', " <<
error);
1167 if (
auto [processor_id, req] = it->second.Get(); req) {
1168 auto context_guard = req->context.Set();
1169 auto& debug_printout = req->reply->debug_printout;
1170 debug_printout << error_code << endl;
1176 if (
RetryFail(processor_id, req,
error, error_code == NGHTTP2_REFUSED_STREAM)) {
1180 if (req->reply->raw) {
1184 req->OnReplyDone(processor_id)->SetComplete();
1187 debug_printout.id <<
"' successfully");
1200 if ((frame->hd.type == NGHTTP2_HEADERS) && (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) &&
1203 auto stream_id = frame->hd.stream_id;
1204 auto status_str =
reinterpret_cast<const char*
>(
value);
1213 if (
auto [processor_id, req] = it->second.Get(); req) {
1215 req->OnReplyDone(processor_id)->SetFailed(
error, status);
1230 auto context_guard = req->context.Set();
1233 const auto& path = req->full_path;
1234 const auto& session_id =
context.GetSessionID();
1235 const auto& sub_hit_id =
context.GetNextSubHitID();
1237 const auto& client_ip =
context.GetClientIP();
1245 if (!client_ip.empty()) {
1253 if (stream_id < 0) {
1271 template <
class TOnRetry,
class TOnFail>
1274 auto [processor_id, req] =
Get();
1284 if (req->Retry(
error)) {
1292 on_fail(processor_id, req);
1305 auto on_fail = [&](
auto processor_id,
auto req) {
Fail(processor_id, req,
error); };
1308 if (it->second.CheckExpiration(
m_Params,
error, on_retry, on_fail)) {
1318 bool some_requests_failed =
false;
1321 if (
auto [processor_id, req] = pair.second.Get(); req) {
1323 some_requests_failed =
true;
1328 if (some_requests_failed) {
1340 if (error_rate.empty())
return;
1342 string numerator_str, denominator_str;
1344 if (!
NStr::SplitInTwo(error_rate,
"/", numerator_str, denominator_str))
return;
1373 m_Stats(std::move(p)),
1375 m_Timer(this, s_OnTimer, Configured(), 0)
1395 auto stats_locked =
m_Stats.GetLock();
1416 "' reached the maximum number of failures in a row (" <<
params.
max_failures <<
')');
1430 ERR_POST(
Warning <<
"Server '" << address <<
"' is considered bad/overloaded ("
1446 threshold_reg.first.reset();
1456 for (
auto& server : m_Sessions) {
1457 for (
auto& session : server.sessions) {
1465 auto servers_locked = m_Servers.GetLock();
1466 auto& servers = *servers_locked;
1468 for (
auto& server : servers) {
1469 server.throttling.StartClose();
1477 auto servers_locked = m_Servers.GetLock();
1478 auto& servers = *servers_locked;
1480 for (
auto& server : servers) {
1481 server.throttling.FinishClose();
1488 auto servers_locked = m_Servers.GetLock();
1489 auto& servers = *servers_locked;
1492 const auto servers_size = m_Servers->size();
1493 const auto sessions_size = m_Sessions.size();
1495 _ASSERT(servers_size >= sessions_size);
1497 for (
auto new_servers = servers_size - sessions_size; new_servers; --new_servers) {
1498 auto& server = servers[servers_size - new_servers];
1499 m_Sessions.emplace_back().sessions.emplace_back(server, m_Params, m_Queue, handle->loop);
1500 PSG_IO_TRACE(
"Session for server '" << server.address <<
"' was added");
1506 auto available_servers = 0;
1508 for (
auto& server : m_Sessions) {
1509 server.current_rate = server->rate.load();
1511 if (server.current_rate) {
1512 ++available_servers;
1516 auto remaining_submits = m_Params.max_concurrent_submits.Get();
1518 auto i = m_Sessions.begin();
1519 auto [timed_req, processor_id, req] = decltype(m_Queue.Pop()){};
1520 auto d = m_Random.first;
1521 auto request_rate = 0.0;
1522 auto target_rate = 0.0;
1526 auto get_request = [&, &timed_req = timed_req, &processor_id = processor_id, &req = req]() {
1527 tie(timed_req, processor_id, req) = m_Queue.Pop();
1535 target_rate = d(m_Random.second);
1536 _DEBUG_CODE(req_id = req->reply->debug_printout.id;);
1537 PSG_IO_TRACE(
"Ready to submit request '" << req_id <<
'\'');
1541 auto next_server = [&]() {
1542 if (++
i == m_Sessions.end())
i = m_Sessions.begin();
1545 auto ignore_server = [&]() {
1546 if (--available_servers) {
1547 d = uniform_real_distribution<>(0.0, d.max() -
i->current_rate);
1548 i->current_rate = 0.0;
1553 auto find_session = [&]() {
1554 auto s =
i->sessions.begin();
1555 for (; (s !=
i->sessions.end()) && s->IsFull(); ++s);
1556 return make_pair(s !=
i->sessions.end(), s);
1559 while (available_servers && remaining_submits) {
1561 if (!req && !get_request()) {
1566 for (; request_rate +=
i->current_rate, request_rate < target_rate; next_server());
1571 if (server->throttling.Active()) {
1572 PSG_IO_TRACE(
"Server '" << server->address <<
"' is throttled, ignoring");
1576 }
else if (server->available_streams <= 0) {
1577 PSG_IO_TRACE(
"Server '" << server->address <<
"' is at request limit, ignoring");
1581 }
else if (
auto [found, session] = find_session(); !found) {
1582 PSG_IO_TRACE(
"Server '" << server->address <<
"' has no sessions available, ignoring");
1586 }
else if (!session->CanProcessRequest(req)) {
1587 PSG_IO_TRACE(
"Server '" << session->GetId() <<
"' already working/worked on the request, trying to find another one");
1589 req->submitted_by.Reset();
1593 }
else if (!session->ProcessRequest(*std::move(timed_req), processor_id, std::move(req))) {
1594 PSG_IO_TRACE(
"Server '" << session->GetId() <<
"' failed to get request '" << req_id <<
"' with rate = " << target_rate);
1599 PSG_IO_TRACE(
"Server '" << session->GetId() <<
"' got request '" << req_id <<
"' with rate = " << target_rate);
1600 --remaining_submits;
1604 if (session->IsFull() && (distance(session, server.sessions.end()) == 1)) {
1606 PSG_IO_TRACE(
"Server '" << server->address <<
"' reached session limit");
1609 server.sessions.emplace_back(*server, m_Params, m_Queue, handle->loop);
1610 PSG_IO_TRACE(
"Additional session for server '" << server->address <<
"' was added");
1618 m_Queue.Emplace(*std::move(timed_req));
1621 if (!remaining_submits) {
1622 PSG_IO_TRACE(
"Max concurrent submits reached, submitted: " << m_Params.max_concurrent_submits);
1625 PSG_IO_TRACE(
"No sessions available [anymore], submitted: " << m_Params.max_concurrent_submits - remaining_submits);
1631 const auto kRegularRate = nextafter(0.009, 1.0);
1632 const auto kStandbyRate = 0.001;
1634 const auto& service_name = m_Service.GetServiceName();
1635 auto discovered = m_Service();
1637 auto total_preferred_regular_rate = 0.0;
1638 auto total_preferred_standby_rate = 0.0;
1639 auto total_regular_rate = 0.0;
1640 auto total_standby_rate = 0.0;
1643 for (
auto& server : discovered) {
1646 if (server.second >= kRegularRate) {
1647 if (is_server_preferred) {
1648 total_preferred_regular_rate += server.second;
1651 total_regular_rate += server.second;
1653 }
else if (server.second >= kStandbyRate) {
1654 if (is_server_preferred) {
1655 total_preferred_standby_rate += server.second;
1658 total_standby_rate += server.second;
1662 if (m_NoServers(total_regular_rate || total_standby_rate, SUv_Timer::GetThat<SUv_Timer>(handle))) {
1663 ERR_POST(
"No servers in service '" << service_name <<
'\'');
1667 const auto localhost_preference = TPSG_LocalhostPreference::GetDefault();
1668 const auto total_preferred_standby_percentage = localhost_preference ? 1.0 - 1.0 / localhost_preference : 0.0;
1669 const auto have_any_regular_servers = total_regular_rate > 0.0;
1670 const auto have_nonpreferred_regular_servers = total_regular_rate > total_preferred_regular_rate;
1671 const auto have_nonpreferred_standby_servers = total_standby_rate > total_preferred_standby_rate;
1672 const auto regular_rate_adjustment =
1673 (localhost_preference <= 1) ||
1674 (total_preferred_regular_rate != 0.0) ||
1675 (total_preferred_standby_rate == 0.0) ||
1676 (!have_nonpreferred_regular_servers && !have_nonpreferred_standby_servers);
1677 auto rate_total = 0.0;
1680 for (
auto& server : discovered) {
1682 const auto old_rate = server.second;
1684 if (server.second >= kRegularRate) {
1685 if (is_server_preferred) {
1686 if (regular_rate_adjustment) {
1687 server.second *= localhost_preference;
1688 }
else if (have_nonpreferred_regular_servers) {
1689 server.second = 0.0;
1690 }
else if (have_nonpreferred_standby_servers) {
1691 server.second = 0.0;
1694 if (regular_rate_adjustment) {
1696 }
else if (have_nonpreferred_regular_servers) {
1697 server.second *= (1 - total_preferred_standby_percentage) / total_regular_rate;
1698 }
else if (have_nonpreferred_standby_servers) {
1699 server.second = 0.0;
1702 }
else if (server.second >= kStandbyRate) {
1703 if (is_server_preferred) {
1704 if (regular_rate_adjustment && have_any_regular_servers) {
1705 server.second = 0.0;
1706 }
else if (regular_rate_adjustment) {
1707 server.second *= localhost_preference;
1708 }
else if (have_nonpreferred_regular_servers) {
1709 server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;
1710 }
else if (have_nonpreferred_standby_servers) {
1711 server.second *= total_preferred_standby_percentage / total_preferred_standby_rate;
1714 if (regular_rate_adjustment && have_any_regular_servers && (localhost_preference || have_nonpreferred_regular_servers)) {
1715 server.second = 0.0;
1716 }
else if (regular_rate_adjustment) {
1718 }
else if (have_nonpreferred_regular_servers) {
1719 server.second = 0.0;
1720 }
else if (have_nonpreferred_standby_servers) {
1721 server.second *= (1 - total_preferred_standby_percentage) / (total_standby_rate - total_preferred_standby_rate);
1726 rate_total += server.second;
1728 if (old_rate != server.second) {
1729 PSG_DISCOVERY_TRACE(
"Rate for '" << server.first <<
"' adjusted from " << old_rate <<
" to " << server.second);
1733 auto servers_locked = m_Servers.GetLock();
1734 auto& servers = *servers_locked;
1737 for (
auto& server : servers) {
1739 auto it = find_if(discovered.begin(), discovered.end(), address_same);
1743 PSG_DISCOVERY_TRACE(
"Server '" << server.address <<
"' disabled in service '" << service_name <<
'\'');
1746 server.throttling.Discovered();
1747 auto rate = it->second / rate_total;
1749 if (server.rate != rate) {
1752 (server.rate ?
"' updated in service '" :
"' enabled in service '" ) <<
1753 service_name <<
"' with rate = " << rate);
1764 for (
auto& server : discovered) {
1766 auto rate = server.second / rate_total;
1767 servers.emplace_back(server.first, rate, m_Params.max_concurrent_requests_per_server, m_ThrottleParams, handle->loop);
1770 service_name <<
"' with rate = " << rate);
1774 m_QueuesRef.SignalAll();
1779 if (m_Servers->fail_requests) {
1782 CheckRequestExpiration();
1785 for (
auto& server : m_Sessions) {
1786 for (
auto& session : server.sessions) {
1787 session.CheckRequestExpiration();
1794 auto queue_locked = m_Queue.GetLockedQueue();
1795 list<SPSG_TimedRequest> retries;
1798 auto on_retry = [&](
auto req) { retries.emplace_back(req); m_Queue.Signal(); };
1799 auto on_fail = [&](
auto processor_id,
auto req) { req->Fail(processor_id,
error); };
1801 for (
auto it = queue_locked->begin(); it != queue_locked->end(); ) {
1802 if (it->CheckExpiration(m_Params,
error, on_retry, on_fail)) {
1803 it = queue_locked->erase(it);
1809 queue_locked->splice(queue_locked->end(), retries);
1814 auto queue_locked = m_Queue.GetLockedQueue();
1817 for (
auto& timed_req : *queue_locked) {
1818 if (
auto [processor_id, req] = timed_req.Get(); req) {
1819 auto context_guard = req->context.Set();
1820 auto& debug_printout = req->reply->debug_printout;
1821 debug_printout <<
error << endl;
1822 req->OnReplyDone(processor_id)->SetFailed(
error);
1823 PSG_IO_TRACE(
"No servers to process request '" << debug_printout.id <<
'\'');
1827 queue_locked->clear();
1832 m_Queue.Init(
this, &loop, s_OnQueue);
1843 m_RetryDelay(
SecondsToMs(TPSG_NoServersRetryDelay::GetDefault())),
1844 m_Timeout(
SecondsToMs((params.request_timeout + params.competitive_after * params.request_retries) * params.io_timer_period)),
1845 m_FailRequests(const_cast<atomic_bool&>(servers->fail_requests))
1862 const auto timeout_expired = m_Passed >= m_Timeout;
1863 m_FailRequests = timeout_expired;
1867 }
else if (!timeout_expired) {
1881 if (TPSG_Stats::GetDefault()) {
1882 return make_shared<SPSG_Stats>(servers);
1895 m_StartBarrier(
TPSG_NumIo::GetDefault() + 2),
1898 m_RequestCounter(0),
1907 }
catch (
const std::system_error& e) {
1908 ERR_POST(
Fatal <<
"Failed to create I/O threads: " << e.what());
1919 for (
auto& io :
m_Io) {
1926 if (
m_Io.size() == 0) {
1931 m_Queues[idx].Emplace(std::move(req));
@ eEndOfReply
No more items expected in the (overall!) reply.
bool IsSingleServer() const
pair< SSocketAddress, double > TServer
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
static const char * expected[]
#define _DEBUG_CODE(code)
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
static string GetStdStatusMessage(ECode code)
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
EDiagSev
Severity level for the posted diagnostics.
@ eDiag_Trace
Trace message.
@ eDiag_Info
Informational message.
@ eDiag_Error
Error message.
@ eDiag_Warning
Warning message.
@ eDiag_Fatal
Fatal error – guarantees exit(or abort)
@ eDiag_Critical
Critical error message.
@ e503_ServiceUnavailable
@ e451_Unavailable_For_Legal_Reasons
void Warning(CExceptionArgs_Base &args)
void Fatal(CExceptionArgs_Base &args)
const CSeq_id & GetId(const CSeq_loc &loc, CScope *scope)
If all CSeq_ids embedded in CSeq_loc refer to the same CBioseq, returns the first CSeq_id found,...
@ eParam_Default
Default flags.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
static unsigned int GetLocalHostAddress(ESwitch reget=eDefault)
Local host address in network byte order (cached for faster retrieval)
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
static string URLDecode(const CTempString str, EUrlDecode flag=eUrlDec_All)
URL-decode string.
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
static string URLEncode(const CTempString str, EUrlEncode flag=eUrlEnc_SkipMarkChars)
URL-encode string.
@ fAllowTrailingSpaces
Ignore trailing whitespace characters.
@ fConvErr_NoThrow
Do not throw an exception on error.
@ fAllowLeadingSpaces
Ignore leading whitespace characters in converted string.
string GetQueryString(EAmpEncoding amp_enc, NStr::EUrlEncode encode) const
Construct and return complete query string.
@ eAmp_Char
Use & to separate arguments.
where both of them are integers Note
NCBI_PARAM_TYPE(PSG, throttle_by_connection_error_rate) TPSG_ThrottleThreshold
NCBI_PARAM_TYPE(PSG, throttle_relaxation_period) TPSG_ThrottlePeriod
const TYPE & Get(const CNamedParameterList *param)
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::CREATED created
const GenericPointer< typename T::ValueType > T2 value
int strcmp(const char *str1, const char *str2)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
static bool less(const CSeq_feat *A, const CSeq_feat *B)
EPSG_Status
Retrieval result.
@ eSuccess
Successfully retrieved.
@ eInProgress
Retrieval is not finalized yet, more info may come.
@ eForbidden
User is not authorized for the retrieval.
@ eCanceled
Request canceled.
@ eError
An error was encountered while trying to send request or to read and to process the reply.
uint64_t s_GetStatsPeriod()
NCBI_PARAM_ENUM_DEF(EPSG_DebugPrintout, PSG, debug_printout, EPSG_DebugPrintout::eNone)
auto s_GetCode(const string &code)
NCBI_PARAM_DEF(double, PSG, request_timeout, 10.0)
#define HTTP_STATUS_HEADER
EPSG_StatsCountersRetries
@ ePSG_StatsCountersRetries_Retry
@ ePSG_StatsCountersRetries_Timeout
uint64_t s_GetDiscoveryRepeat(const CServiceDiscovery &service)
EDiagSev s_GetSeverity(const string &severity)
PSG_PARAM_VALUE_DEF_MIN(unsigned, PSG, rd_buf_size, 64 *1024, 1024)
shared_ptr< SPSG_Stats > s_GetStats(SPSG_Servers::TTS &servers)
SPSG_IoCoordinator.
NCBI_PARAM_DEF_EX(string, PSG, service, "PSG2", eParam_Default, NCBI_PSG_SERVICE)
NCBI_PARAM_ENUM_ARRAY(EPSG_DebugPrintout, PSG, debug_printout)
#define PSG_IO_SESSION_TRACE(message)
#define PSG_THROTTLING_TRACE(message)
#define PSG_IO_TRACE(message)
#define PSG_DISCOVERY_TRACE(message)
uint64_t SecondsToMs(double seconds)
Defines CRequestStatus class for NCBI C++ diagnostic API.
static SLJIT_INLINE sljit_ins ms(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static SLJIT_INLINE sljit_ins l(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
void Print(SSocketAddress address, const string &path, const string &sid, const string &phid, const string &ip, SUv_Tcp::TPort port)
int32_t Submit(const nghttp2_nv *nva, size_t nvlen, nghttp2_data_provider *data_prd=nullptr)
void Emplace(TArgs &&... args)
void NotifyOne() volatile
bool WaitUntil(TArgs &&... args) volatile
SNoServers(const SPSG_Params ¶ms, SPSG_Servers::TTS &servers)
bool operator()(bool discovered, SUv_Timer *timer)
void OnShutdown(uv_async_t *)
void OnTimer(uv_timer_t *handle)
SPSG_Servers::TTS & m_Servers
string GetCookie(const string &name) const
bool AddRequest(shared_ptr< SPSG_Request > req, const atomic_bool &stopped, const CDeadline &deadline)
SPSG_Servers::TTS m_Servers
SUv_Barrier m_StartBarrier
SPSG_IoCoordinator(CServiceDiscovery service)
atomic< size_t > m_RequestCounter
SUv_Barrier m_StopBarrier
TPSG_AsyncQueues m_Queues
SPSG_Thread< SPSG_DiscoveryImpl > m_Discovery
vector< unique_ptr< SPSG_Thread< SPSG_IoImpl > > > m_Io
void OnTimer(uv_timer_t *handle)
void OnQueue(uv_async_t *handle)
void CheckRequestExpiration()
void OnShutdown(uv_async_t *handle)
SPSG_IoImpl.
void OnExecute(uv_loop_t &loop)
void AddNewServers(uv_async_t *handle)
int OnStreamClose(nghttp2_session *session, int32_t stream_id, uint32_t error_code)
bool Fail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
bool RetryFail(SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req, const SUvNgHttp2_Error &error, bool refused_stream=false)
SPSG_AsyncQueue & m_Queue
SPSG_Requests< SPSG_IoSession > m_Requests
SPSG_Submitter::TId GetInternalId() const
SPSG_IoSession(SPSG_Server &s, const SPSG_Params ¶ms, SPSG_AsyncQueue &queue, uv_loop_t *loop, TNgHttp2Cbs &&... callbacks)
SPSG_IoSession.
void CheckRequestExpiration()
bool ProcessRequest(SPSG_TimedRequest timed_req, SPSG_Processor::TId processor_id, shared_ptr< SPSG_Request > req)
int OnHeader(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags)
void OnReset(SUvNgHttp2_Error error) override
array< SNgHttp2_Header< NGHTTP2_NV_FLAG_NO_COPY_NAME >, eSize > m_Headers
int OnData(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len)
static TValue GetDefault()
TPSG_RequestsPerIo requests_per_io
string GetCookie(function< string()> get_auth_token)
static unsigned s_GetCompetitiveAfter(double io_timer_period, double timeout)
const unsigned competitive_after
static unsigned s_GetRequestTimeout(double io_timer_period)
TPSG_IoTimerPeriod io_timer_period
const unsigned request_timeout
SPSG_Nullable< size_t > expected
vector< SPSG_Chunk > chunks
void SetComplete() volatile
const volatile atomic_bool & InProgress() const volatile
void AddMessage(string message, EDiagSev severity, optional< int > code)
void SetStatus(EPSG_Status status, bool reset) volatile
void AddError(string message, EPSG_Status status=EPSG_Status::eError, EDiagSev severity=eDiag_Error, optional< int > code=nullopt)
deque< SPSG_Message > m_Messages
SPSG_Message GetMessage(EDiagSev min_severity)
static EPSG_Status FromRequestStatus(int status)
shared_ptr< TPSG_Queue > queue
void SetFailed(string message, EPSG_Status status=EPSG_Status::eError)
SThreadSafe< list< SItem::TTS * > > new_items
SThreadSafe< list< SItem::TTS > > items
optional< SItem::TTS * > GetNextItem(CDeadline deadline)
unsigned GetRetries(SPSG_Retries::EType type, bool refused_stream)
unordered_map< string, SPSG_Reply::SItem::TTS * > m_ItemsByID
SPSG_Processor processed_by
bool Retry(const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StateData(const char *&data, size_t &len)
shared_ptr< SPSG_Reply > reply
SPSG_Request(string p, shared_ptr< SPSG_Reply > r, CRef< CRequestContext > c, const SPSG_Params ¶ms)
bool Fail(SPSG_Processor::TId processor_id, const SUvNgHttp2_Error &error, bool refused_stream=false)
EStateResult StatePrefix(const char *&data, size_t &len)
auto & OnReplyDone(SPSG_Processor::TId processor_id)
EUpdateResult UpdateItem(SPSG_Args::EItemType item_type, SPSG_Reply::SItem &item, const SPSG_Args &args)
EStateResult StateArgs(const char *&data, size_t &len)
auto emplace(TArgs &&... args)
SPSG_Throttling throttling
const SSocketAddress address
vector< pair< atomic_uint64_t, atomic_uint > > m_Data
void Report(const char *prefix, unsigned report)
static const char * GetName(EAvgTime avg_time)
static const char * ValueName(type value)
static const char * ValueName(type value)
static const char * ValueName(type value)
static const char * ValueName(type value)
static const char * ValueName(type value)
static const char * ValueName(type value)
static void Func(TData &data)
static void Func(const TData &data, const char *prefix, unsigned report)
void Apply(EGroup start_with, TArgs &&... args)
void Report(TArgs &&... args)
vector< vector< atomic_uint > > TData
void Report(const char *prefix, unsigned report, const char *name)
SData< CPSG_BlobId > m_Blobs
void Report(const char *prefix, unsigned report)
SThreadSafe< unordered_set< string > > m_TSEs
SData< CPSG_ChunkId > m_Chunks
SPSG_Servers::TTS & m_Servers
SPSG_Stats(SPSG_Servers::TTS &servers)
SThreshold(string error_rate)
SPSG_ThrottleParams.
constexpr static size_t kMaxDenominator
TPSG_ThrottleUntilDiscovery until_discovery
const volatile uint64_t period
TPSG_ThrottleMaxFailures max_failures
pair< bitset< SPSG_ThrottleParams::SThreshold::kMaxDenominator >, size_t > threshold_reg
bool Adjust(const SSocketAddress &address, bool result)
SPSG_ThrottleParams params
SPSG_Throttling(const SSocketAddress &address, SPSG_ThrottleParams p, uv_loop_t *l)
SPSG_Throttling.
atomic< EThrottling > m_Active
SThreadSafe< SStats > m_Stats
static void s_OnSignal(uv_async_t *handle)
const SSocketAddress & m_Address
bool CheckExpiration(const SPSG_Params ¶ms, const SUvNgHttp2_Error &error, TOnRetry on_retry, TOnFail on_fail)
static SUvNgHttp2_Error FromNgHttp2(T e, const char *w)
static const char * NgHttp2Str(T e)
void Reset(SUvNgHttp2_Error error, SUv_Tcp::ECloseType close_type=SUv_Tcp::eCloseReset)
SNgHttp2_Session m_Session
pair< string, string > TCred
static const string & Get()
void Init(void *d, uv_loop_t *l, uv_async_cb cb)
TPort GetLocalPort() const
uint64_t GetDefaultRepeat() const
void SetRepeat(uint64_t r)
int g(Seg_Gsm *spe, Seq_Mtf *psm, Thd_Gsm *tdg)
static CS_CONTEXT * context