48 template <
class TBase>
50 TBase(std::move(other)),
62 template <
class TBase>
67 case eStart: m_Start.~TStart();
break;
68 case eData: m_Data.~TH2S_Data();
break;
74 template <
class TBase>
79 case eData:
return "data ";
80 case eEof:
return "eof ";
81 case eError:
return "error ";
89 m_UpdateResponse(update_response),
90 m_ResponseQueue(std::move(response_queue))
94 Push(std::move(request));
107 const auto begin =
static_cast<const char*
>(
buf);
111 if (bytes_written) *bytes_written =
count;
169 memcpy(
buf,
data.data(), copied);
177 if (bytes_read) *bytes_read = copied;
193 if (queue_locked->empty()) {
199 queue_locked.Unlock();
200 return (this->*member)(incoming);
269 return NGHTTP2_ERR_DEFERRED;
273 *data_flags = NGHTTP2_DATA_FLAG_EOF;
279 auto copied =
min(length, chunk.size() -
sent);
281 memcpy(
buf, chunk.data() +
sent, copied);
296 template <
class... TNgHttp2Cbs>
305 std::forward<TNgHttp2Cbs>(callbacks)...,
307 m_SessionsByQueues(sessions_by_queues)
326 default:
return "UNKNOWN";
332 auto& request =
event.GetStart();
333 auto& url = request.url;
334 auto scheme = url.GetScheme();
335 auto query_string = url.GetOriginalArgsString();
336 auto abs_path_ref = url.GetPath();
338 if (!query_string.empty()) {
339 abs_path_ref +=
"?" + query_string;
342 vector<THeader> nghttp2_headers{{
344 {
":scheme", scheme },
346 {
":path", abs_path_ref },
351 nghttp2_headers.reserve(request.headers.size() + nghttp2_headers.size());
353 for (
const auto& p : request.headers) {
354 for (
const auto& v : p.second) {
355 nghttp2_headers.emplace_back(p.first, v);
359 auto& response_queue =
event.response_queue;
363 nghttp2_data_provider data_prd;
364 data_prd.source.ptr = &*it;
367 it->stream_id =
m_Session.
Submit(nghttp2_headers.data(), nghttp2_headers.size(), &data_prd);
369 if (it->stream_id < 0) {
382 template <
class TFunc>
385 auto& response_queue =
event.response_queue;
386 auto it =
Find(response_queue);
391 if (it->in_progress) {
396 it->in_progress =
true;
402 H2S_SESSION_TRACE(
this <<
'/' << response_queue <<
" fail to resume for " << event);
408 auto it =
Find(stream_id);
411 auto begin =
reinterpret_cast<const char*
>(
data);
425 auto it =
Find(stream_id);
428 auto response_queue = it->response_queue;
443 if ((frame->hd.type == NGHTTP2_HEADERS) && (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)) {
444 auto stream_id = frame->hd.stream_id;
445 auto it =
Find(stream_id);
448 string n(
reinterpret_cast<const char*
>(name), namelen);
449 string v(
reinterpret_cast<const char*
>(
value), valuelen);
450 auto& headers = it->headers;
451 auto hit = headers.find(
n);
453 if (hit == headers.end()) {
454 headers.emplace(piecewise_construct, forward_as_tuple(std::move(
n)), forward_as_tuple(1, std::move(v)));
456 hit->second.emplace_back(std::move(v));
467 auto& response_queue = stream.response_queue;
479 const bool is_headers_frame = (frame->hd.type == NGHTTP2_HEADERS) && (frame->headers.cat == NGHTTP2_HCAT_RESPONSE);
480 const bool is_data_frame = frame->hd.type == NGHTTP2_DATA;
481 const bool is_eof = frame->hd.flags & NGHTTP2_FLAG_END_STREAM;
483 if (is_headers_frame || (is_data_frame && is_eof)) {
484 auto stream_id = frame->hd.stream_id;
485 auto it =
Find(stream_id);
488 auto& response_queue = it->response_queue;
490 if (is_headers_frame) {
491 Push(response_queue, std::move(it->headers));
511 session.second.Reset(
"Shutdown is in progress");
524 auto queue_locked = request_queue.
GetLock();
526 if (queue_locked->empty()) {
532 queue_locked.Unlock();
534 auto response_queue = outgoing.response_queue;
542 auto& request = outgoing.
GetStart();
545 if (new_session->Request(std::move(outgoing))) {
550 H2S_IOC_TRACE(response_queue <<
" pop unexpected " << outgoing);
562 if (session->second.get().Event(outgoing,
l)) {
566 H2S_IOC_TRACE(response_queue <<
" pop unexpected " << outgoing);
577 if (session->second.get().Event(outgoing,
l)) {
581 H2S_IOC_TRACE(response_queue <<
" pop unexpected " << outgoing);
595 if (
auto queue = response_queue.lock()) {
598 queue->GetLock()->emplace(std::move(event));
607 const auto& url = request.
url;
609 auto port = url.GetPort();
612 if (scheme ==
"http") {
614 }
else if (scheme ==
"https") {
623 auto https = scheme ==
"https" || (scheme.empty() && (port ==
"443"));
626 for (
auto it =
range.first; it !=
range.second; ++it) {
627 if (!it->second.IsFull()) {
639 m_ApiLock(GetApiLock())
651 auto status = headers.
find(
":status");
653 if (status != headers.
end()) {
654 status_code = stoi(status->second.front());
655 headers.
erase(status);
671 auto response_queue = make_shared<TH2S_ResponseQueue>();
674 SUvNgHttp2_Tls::TCred cred(req_cred ? req_cred->GetCert() :
string(), req_cred ? req_cred->GetPKey() :
string());
679 unique_ptr<IReaderWriter> rw(
new SH2S_ReaderWriter(std::move(update_response), std::move(response_queue), std::move(request)));
bool x_Downgrade(CHttpResponse &resp, CHttpSession_Base::EProtocol &protocol) const override
static void UpdateResponse(CHttpRequest &req, CHttpHeaders::THeaders headers)
shared_ptr< void > TApiLock
Get an API lock.
void x_StartRequest(CHttpSession_Base::EProtocol protocol, CHttpRequest &req, bool use_form_data) override
static TApiLock GetApiLock()
HTTP session class, holding common data for multiple requests.
const_iterator end() const
const_iterator find(const key_type &key) const
const_iterator_pair equal_range(const key_type &key) const
void x_InitConnection2(shared_ptr< iostream > stream)
const THeaders & Get() const
int GetStatusCode(void) const
Get response status code.
void x_InitConnection(bool use_form_data)
void x_UpdateResponse(CHttpHeaders::THeaders headers, int status_code, string status_text)
CRef< CHttpHeaders > m_Headers
shared_ptr< CTlsCertCredentials > m_Credentials
void x_AdjustHeaders(bool use_form_data)
EProtocol
HTTP protocol version.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
ERW_Result
Result codes for I/O operations.
@ eRW_Eof
End of data, should be considered permanent.
@ eRW_Error
Unrecoverable error, no retry possible.
@ eRW_Success
Everything is okay, I/O completed.
const string & GetScheme(void) const
const TYPE & Get(const CNamedParameterList *param)
range(_Ty, _Ty) -> range< _Ty >
const GenericPointer< typename T::ValueType > T2 value
constexpr uint32_t kMaxStreams
THeader::SConvert s_GetMethodName(TReqMethod method)
constexpr size_t kReadBufSize
constexpr size_t kWriteBufSize
#define H2S_SESSION_TRACE(message)
#define H2S_IOC_TRACE(message)
SH2S_Event< SH2S_Request > TH2S_RequestEvent
#define H2S_RW_TRACE(message)
@ eError
An error was encountered while trying to send request or to read and to process the reply.
Reader-writer based streams.
static SLJIT_INLINE sljit_ins l(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
const char * GetTypeName() const
typename TBase::SStart TStart
SH2S_Event(TStart start, TArgs &&... args)
SH2S_Session * NewSession(const SH2S_Request::SStart &request)
void Process(TH2S_RequestQueue &request_queue)
multimap< SH2S_Session::TAddrNCred, SUvNgHttp2_Session< SH2S_Session > > m_Sessions
TH2S_SessionsByQueues m_SessionsByQueues
TH2S_WeakResponseQueue response_queue
queue< TH2S_Data > pending
ssize_t DataSourceRead(void *session, uint8_t *buf, size_t length, uint32_t *data_flags)
static shared_ptr< SH2S_Io > GetInstance()
TUpdateResponse m_UpdateResponse
ERW_Result Receive(ERW_Result(SH2S_ReaderWriter::*member)(TH2S_ResponseEvent &))
ERW_Result ReadImpl(void *buf, size_t count, size_t *bytes_read)
ERW_Result ReceiveResponse(TH2S_ResponseEvent &incoming)
ERW_Result Write(const void *buf, size_t count, size_t *bytes_written=0) override
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
ERW_Result Flush() override
Flush pending data (if any) down to the output device.
ERW_Result PendingCountImpl(size_t *count)
shared_ptr< TH2S_ResponseQueue > m_ResponseQueue
function< void(CHttpHeaders::THeaders)> TUpdateResponse
ERW_Result ReadFsm(function< ERW_Result()> impl)
void Push(TH2S_RequestEvent event)
SH2S_ReaderWriter(TUpdateResponse update_response, shared_ptr< TH2S_ResponseQueue > response_queue, TH2S_RequestEvent request)
ERW_Result ReceiveData(TH2S_ResponseEvent &incoming)
SStart(EReqMethod m, CUrl u, SUvNgHttp2_Tls::TCred c, CHttpHeaders::THeaders h)
SUvNgHttp2_Tls::TCred cred
void OnReset(SUvNgHttp2_Error error) override
static ssize_t s_DataSourceRead(nghttp2_session *, int32_t, uint8_t *buf, size_t length, uint32_t *data_flags, nghttp2_data_source *source, void *user_data)
int OnFrameRecv(nghttp2_session *session, const nghttp2_frame *frame)
TH2S_SessionsByQueues & m_SessionsByQueues
map< TH2S_WeakResponseQueue, TStreams::iterator, owner_less< weak_ptr< TH2S_ResponseQueue > > > m_StreamsByQueues
bool Event(TH2S_RequestEvent &event, TFunc f)
int OnStreamClose(nghttp2_session *session, int32_t stream_id, uint32_t error_code)
unordered_map< int32_t, TStreams::iterator > m_StreamsByIds
SH2S_Session(uv_loop_t *loop, const TAddrNCred &addr_n_cred, bool https, TH2S_SessionsByQueues &sessions_by_queues, TNgHttp2Cbs &&... callbacks)
bool Request(TH2S_RequestEvent request)
int OnData(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len)
int OnHeader(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags)
TStreams::iterator Find(int32_t stream_id)
void Push(TH2S_WeakResponseQueue &response_queue, TH2S_ResponseEvent event)
int32_t Submit(const nghttp2_nv *nva, size_t nvlen, nghttp2_data_provider *data_prd=nullptr)
int Resume(int32_t stream_id)
static const char * NgHttp2Str(T e)
SNgHttp2_Session m_Session
pair< string, string > TCred
static const string & Get()
void Run(uv_run_mode mode=UV_RUN_DEFAULT)