NCBI C++ ToolKit
ncbi_http2_session.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: ncbi_http2_session.cpp 101227 2023-11-16 19:48:12Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Rafael Sadyrov
27  *
28  */
29 
30 #include <ncbi_pch.hpp>
31 
33 
34 #include <corelib/rwstream.hpp>
35 
36 
38 
39 
41  method(m),
42  url(std::move(u)),
43  cred(std::move(c)),
44  headers(std::move(h))
45 {
46 }
47 
48 template <class TBase>
50  TBase(std::move(other)),
51  m_Type(other.m_Type)
52 {
53  switch (m_Type)
54  {
55  case eStart: new(&m_Start) TStart(std::move(other.m_Start)); break;
56  case eData: new(&m_Data) TH2S_Data(std::move(other.m_Data)); break;
57  case eEof: break;
58  case eError: break;
59  }
60 }
61 
62 template <class TBase>
64 {
65  switch (m_Type)
66  {
67  case eStart: m_Start.~TStart(); break;
68  case eData: m_Data.~TH2S_Data(); break;
69  case eEof: break;
70  case eError: break;
71  }
72 }
73 
74 template <class TBase>
75 const char* SH2S_Event<TBase>::GetTypeName() const
76 {
77  switch (m_Type) {
78  case eStart: return "";
79  case eData: return "data ";
80  case eEof: return "eof ";
81  case eError: return "error ";
82  }
83 
84  return nullptr;
85 }
86 
87 SH2S_ReaderWriter::SH2S_ReaderWriter(TUpdateResponse update_response, shared_ptr<TH2S_ResponseQueue> response_queue, TH2S_RequestEvent request) :
88  m_Io(SH2S_Io::GetInstance()),
89  m_UpdateResponse(update_response),
90  m_ResponseQueue(std::move(response_queue))
91 {
93 
94  Push(std::move(request));
95 
96  Process();
97 }
98 
99 ERW_Result SH2S_ReaderWriter::Write(const void* buf, size_t count, size_t* bytes_written)
100 {
101  if (m_State != eWriting) {
102  return eRW_Error;
103  }
104 
105  // No need to send empty data
106  if (count) {
107  const auto begin = static_cast<const char*>(buf);
108  m_OutgoingData.insert(m_OutgoingData.end(), begin, begin + count);
109  }
110 
111  if (bytes_written) *bytes_written = count;
112  return eRW_Success;
113 }
114 
116 {
117  if (m_State != eWriting) {
118  return eRW_Error;
119  }
120 
121  if (!m_OutgoingData.empty()) {
123 
124  Process();
125  }
126 
127  return eRW_Success;
128 }
129 
131 {
132  auto rv = eRW_Success;
133 
134  do {
135  switch (m_State) {
136  case eWriting:
138  m_State = eWaiting;
139  /* FALL THROUGH */
140 
141  case eWaiting:
143  break;
144 
145  case eReading:
146  if (m_IncomingData.empty()) {
148  break;
149  }
150 
151  return impl();
152 
153  case eEof:
154  return eRW_Eof;
155 
156  case eError:
157  return eRW_Error;
158  }
159  }
160  while (rv == eRW_Success);
161 
162  return rv;
163 }
164 
165 ERW_Result SH2S_ReaderWriter::ReadImpl(void* buf, size_t count, size_t* bytes_read)
166 {
167  auto& data = m_IncomingData;
168  const auto copied = min(count, data.size());
169  memcpy(buf, data.data(), copied);
170 
171  if (count < data.size()) {
172  data.erase(data.begin(), data.begin() + copied);
173  } else {
174  m_IncomingData.clear();
175  }
176 
177  if (bytes_read) *bytes_read = copied;
178  return eRW_Success;
179 }
180 
182 {
183  if (count) *count = m_IncomingData.size();
184  return eRW_Success;
185 }
186 
188 {
189  Process();
190 
191  auto queue_locked = m_ResponseQueue->GetLock();
192 
193  if (queue_locked->empty()) {
194  return eRW_Success;
195  }
196 
197  TH2S_ResponseEvent incoming(std::move(queue_locked->front()));
198  queue_locked->pop();
199  queue_locked.Unlock();
200  return (this->*member)(incoming);
201 }
202 
204 {
206 
207  switch (incoming.GetType()) {
209  H2S_RW_TRACE(m_ResponseQueue.get() << " pop " << incoming);
210  m_IncomingData = std::move(incoming.GetData());
211  return eRW_Success;
212 
214  H2S_RW_TRACE(m_ResponseQueue.get() << " pop unexpected " << incoming);
215  break;
216 
218  m_State = eEof;
219  H2S_RW_TRACE(m_ResponseQueue.get() << " pop " << incoming);
220  return eRW_Eof;
221 
223  H2S_RW_TRACE(m_ResponseQueue.get() << " pop " << incoming);
224  break;
225  }
226 
227  m_State = eError;
228  return eRW_Error;
229 }
230 
232 {
234 
235  switch (incoming.GetType()) {
237  H2S_RW_TRACE(m_ResponseQueue.get() << " pop " << incoming);
238  m_State = eReading;
239  m_UpdateResponse(std::move(incoming.GetStart()));
240  return eRW_Success;
241 
243  H2S_RW_TRACE(m_ResponseQueue.get() << " pop unexpected " << incoming);
244  break;
245 
247  H2S_RW_TRACE(m_ResponseQueue.get() << " pop unexpected " << incoming);
248  break;
249 
251  H2S_RW_TRACE(m_ResponseQueue.get() << " pop " << incoming);
252  break;
253  }
254 
255  m_State = eError;
256  return eRW_Error;
257 }
258 
259 ssize_t SH2S_IoStream::DataSourceRead(void* session, uint8_t* buf, size_t length, uint32_t* data_flags)
260 {
261  _ASSERT(buf);
262  _ASSERT(data_flags);
263 
264  for (;;) {
265  if (pending.empty()) {
266  if (!eof) {
267  H2S_SESSION_TRACE(session << '/' << response_queue << " outgoing data deferred");
268  in_progress = false;
269  return NGHTTP2_ERR_DEFERRED;
270  }
271 
272  H2S_SESSION_TRACE(session << '/' << response_queue << " outgoing data EOF");
273  *data_flags = NGHTTP2_DATA_FLAG_EOF;
274  return 0;
275  }
276 
277  if (pending.front().size() > sent) {
278  auto& chunk = pending.front();
279  auto copied = min(length, chunk.size() - sent);
280 
281  memcpy(buf, chunk.data() + sent, copied);
282  sent += copied;
283  H2S_SESSION_TRACE(session << '/' << response_queue << " outgoing data copy " << copied);
284  return copied;
285  }
286 
287  sent = 0;
288  pending.pop();
289  }
290 }
291 
292 constexpr size_t kReadBufSize = 64 * 1024;
293 constexpr size_t kWriteBufSize = 64 * 1024;
294 constexpr uint32_t kMaxStreams = 200;
295 
296 template <class... TNgHttp2Cbs>
297 SH2S_Session::SH2S_Session(uv_loop_t* loop, const TAddrNCred& addr_n_cred, bool https, TH2S_SessionsByQueues& sessions_by_queues, TNgHttp2Cbs&&... callbacks) :
299  loop,
300  addr_n_cred,
301  kReadBufSize,
303  https,
304  kMaxStreams,
305  std::forward<TNgHttp2Cbs>(callbacks)...,
306  s_OnFrameRecv),
307  m_SessionsByQueues(sessions_by_queues)
308 {
309 }
310 
312 
314 {
315  switch (method & ~eReqMethod_v1) {
316  case eReqMethod_Any: return "ANY";
317  case eReqMethod_Get: return "GET";
318  case eReqMethod_Post: return "POST";
319  case eReqMethod_Head: return "HEAD";
320  case eReqMethod_Connect: return "CONNECT";
321  case eReqMethod_Put: return "PUT";
322  case eReqMethod_Patch: return "PATCH";
323  case eReqMethod_Trace: return "TRACE";
324  case eReqMethod_Delete: return "DELETE";
325  case eReqMethod_Options: return "OPTIONS";
326  default: return "UNKNOWN";
327  }
328 }
329 
331 {
332  auto& request = event.GetStart();
333  auto& url = request.url;
334  auto scheme = url.GetScheme();
335  auto query_string = url.GetOriginalArgsString();
336  auto abs_path_ref = url.GetPath();
337 
338  if (!query_string.empty()) {
339  abs_path_ref += "?" + query_string;
340  }
341 
342  vector<THeader> nghttp2_headers{{
343  { ":method", s_GetMethodName(request.method) },
344  { ":scheme", scheme },
345  { ":authority", m_Authority },
346  { ":path", abs_path_ref },
347  { "user-agent", SUvNgHttp2_UserAgent::Get() },
348  }};
349 
350  // This is not precise but at least something
351  nghttp2_headers.reserve(request.headers.size() + nghttp2_headers.size());
352 
353  for (const auto& p : request.headers) {
354  for (const auto& v : p.second) {
355  nghttp2_headers.emplace_back(p.first, v);
356  }
357  }
358 
359  auto& response_queue = event.response_queue;
360  m_Streams.emplace_front(response_queue);
361  auto it = m_Streams.begin();
362 
363  nghttp2_data_provider data_prd;
364  data_prd.source.ptr = &*it;
365  data_prd.read_callback = s_DataSourceRead;
366 
367  it->stream_id = m_Session.Submit(nghttp2_headers.data(), nghttp2_headers.size(), &data_prd);
368 
369  if (it->stream_id < 0) {
370  m_Streams.pop_front();
371  H2S_SESSION_TRACE(this << '/' << response_queue << " fail to submit " << event);
372  return false;
373  }
374 
375  H2S_SESSION_TRACE(this << '/' << response_queue << " submit " << event);
376  m_StreamsByIds.emplace(it->stream_id, it);
377  m_StreamsByQueues.emplace(response_queue, it);
378  m_SessionsByQueues.emplace(std::move(response_queue), *this);
379  return Send();
380 }
381 
382 template <class TFunc>
384 {
385  auto& response_queue = event.response_queue;
386  auto it = Find(response_queue);
387 
388  if (it != m_Streams.end()) {
389  f(*it);
390 
391  if (it->in_progress) {
392  return true;
393  }
394 
395  if (!m_Session.Resume(it->stream_id)) {
396  it->in_progress = true;
397  H2S_SESSION_TRACE(this << '/' << response_queue << " resume for " << event);
398  return Send();
399  }
400  }
401 
402  H2S_SESSION_TRACE(this << '/' << response_queue << " fail to resume for " << event);
403  return false;
404 }
405 
406 int SH2S_Session::OnData(nghttp2_session*, uint8_t, int32_t stream_id, const uint8_t* data, size_t len)
407 {
408  auto it = Find(stream_id);
409 
410  if (it != m_Streams.end()) {
411  auto begin = reinterpret_cast<const char*>(data);
412  Push(it->response_queue, TH2S_Data(begin, begin + len));
413  }
414 
415  return 0;
416 }
417 
418 int SH2S_Session::OnStreamClose(nghttp2_session*, int32_t stream_id, uint32_t error_code)
419 {
420  // Everything is good, nothing to do (Eof is sent in OnFrameRecv)
421  if (!error_code) {
422  return 0;
423  }
424 
425  auto it = Find(stream_id);
426 
427  if (it != m_Streams.end()) {
428  auto response_queue = it->response_queue;
429  H2S_SESSION_TRACE(this << '/' << response_queue << " stream closed with " << SUvNgHttp2_Error::NgHttp2Str(error_code));
430  m_SessionsByQueues.erase(response_queue);
431  m_StreamsByQueues.erase(response_queue);
432  m_StreamsByIds.erase(stream_id);
433  m_Streams.erase(it);
434  Push(response_queue, TH2S_ResponseEvent::eError);
435  }
436 
437  return 0;
438 }
439 
440 int SH2S_Session::OnHeader(nghttp2_session*, const nghttp2_frame* frame, const uint8_t* name, size_t namelen,
441  const uint8_t* value, size_t valuelen, uint8_t)
442 {
443  if ((frame->hd.type == NGHTTP2_HEADERS) && (frame->headers.cat == NGHTTP2_HCAT_RESPONSE)) {
444  auto stream_id = frame->hd.stream_id;
445  auto it = Find(stream_id);
446 
447  if (it != m_Streams.end()) {
448  string n(reinterpret_cast<const char*>(name), namelen);
449  string v(reinterpret_cast<const char*>(value), valuelen);
450  auto& headers = it->headers;
451  auto hit = headers.find(n);
452 
453  if (hit == headers.end()) {
454  headers.emplace(piecewise_construct, forward_as_tuple(std::move(n)), forward_as_tuple(1, std::move(v)));
455  } else {
456  hit->second.emplace_back(std::move(v));
457  }
458  }
459  }
460 
461  return 0;
462 }
463 
465 {
466  for (auto& stream : m_Streams) {
467  auto& response_queue = stream.response_queue;
468  m_SessionsByQueues.erase(response_queue);
469  Push(response_queue, TH2S_ResponseEvent::eError);
470  }
471 
472  m_Streams.clear();
473  m_StreamsByIds.clear();
474  m_StreamsByQueues.clear();
475 }
476 
477 int SH2S_Session::OnFrameRecv(nghttp2_session*, const nghttp2_frame *frame)
478 {
479  const bool is_headers_frame = (frame->hd.type == NGHTTP2_HEADERS) && (frame->headers.cat == NGHTTP2_HCAT_RESPONSE);
480  const bool is_data_frame = frame->hd.type == NGHTTP2_DATA;
481  const bool is_eof = frame->hd.flags & NGHTTP2_FLAG_END_STREAM;
482 
483  if (is_headers_frame || (is_data_frame && is_eof)) {
484  auto stream_id = frame->hd.stream_id;
485  auto it = Find(stream_id);
486 
487  if (it != m_Streams.end()) {
488  auto& response_queue = it->response_queue;
489 
490  if (is_headers_frame) {
491  Push(response_queue, std::move(it->headers));
492  }
493 
494  if (is_eof) {
495  Push(response_queue, TH2S_ResponseEvent::eEof);
496  }
497  }
498  }
499 
500  return 0;
501 }
502 
504  m_Proxy(SSocketAddress::Parse(CNcbiEnvironment().Get("HTTP_PROXY"), SSocketAddress::SHost::EName::eOriginal))
505 {
506 }
507 
509 {
510  for (auto& session : m_Sessions) {
511  session.second.Reset("Shutdown is in progress");
512  }
513 
514  m_Loop.Run(UV_RUN_DEFAULT);
515  m_Sessions.clear();
516 }
517 
519 {
520  m_Loop.Run(UV_RUN_NOWAIT);
521 
522  // Retrieve all events from the queue
523  for (;;) {
524  auto queue_locked = request_queue.GetLock();
525 
526  if (queue_locked->empty()) {
527  break;
528  }
529 
530  TH2S_RequestEvent outgoing(std::move(queue_locked->front()));
531  queue_locked->pop();
532  queue_locked.Unlock();
533 
534  auto response_queue = outgoing.response_queue;
535  auto session = m_SessionsByQueues.find(response_queue);
536  const bool new_request = session == m_SessionsByQueues.end();
537 
538  switch (outgoing.GetType()) {
540  if (new_request) {
541  H2S_IOC_TRACE(response_queue << " pop " << outgoing);
542  auto& request = outgoing.GetStart();
543 
544  if (auto new_session = NewSession(request)) {
545  if (new_session->Request(std::move(outgoing))) {
546  continue;
547  }
548  }
549  } else {
550  H2S_IOC_TRACE(response_queue << " pop unexpected " << outgoing);
551  }
552 
553  // Report error
554  break;
555 
557  if (!new_request) {
558  H2S_IOC_TRACE(response_queue << " pop " << outgoing);
559 
560  auto l = [&](SH2S_IoStream& stream) { stream.pending.emplace(std::move(outgoing.GetData())); };
561 
562  if (session->second.get().Event(outgoing, l)) {
563  continue;
564  }
565  } else {
566  H2S_IOC_TRACE(response_queue << " pop unexpected " << outgoing);
567  }
568 
569  // Report error
570  break;
571 
573  if (!new_request) {
574  H2S_IOC_TRACE(response_queue << " pop " << outgoing);
575  auto l = [](SH2S_IoStream& stream) { stream.eof = true; };
576 
577  if (session->second.get().Event(outgoing, l)) {
578  continue;
579  }
580  } else {
581  H2S_IOC_TRACE(response_queue << " pop unexpected " << outgoing);
582  }
583 
584  // Report error
585  break;
586 
588  // No need to report incoming error back
589  H2S_IOC_TRACE(response_queue << " pop " << outgoing);
590  continue;
591  }
592 
593  // Cannot use outgoing here as it may already be empty (moved from)
594  // We can only report error if we still have corresponding queue
595  if (auto queue = response_queue.lock()) {
597  H2S_IOC_TRACE(response_queue << " push " << event);
598  queue->GetLock()->emplace(std::move(event));
599  }
600  }
601 
602  m_Loop.Run(UV_RUN_NOWAIT);
603 }
604 
606 {
607  const auto& url = request.url;
608  auto scheme = url.GetScheme();
609  auto port = url.GetPort();
610 
611  if (port.empty()) {
612  if (scheme == "http") {
613  port = "80";
614  } else if (scheme == "https") {
615  port = "443";
616  } else {
617  return nullptr;
618  }
619  }
620 
622  SH2S_Session::TAddrNCred addr_n_cred{{SSocketAddress(std::move(host), port), request.cred}, m_Proxy};
623  auto https = scheme == "https" || (scheme.empty() && (port == "443"));
624  auto range = m_Sessions.equal_range(addr_n_cred);
625 
626  for (auto it = range.first; it != range.second; ++it) {
627  if (!it->second.IsFull()) {
628  return &it->second;
629  }
630  }
631 
632  // No such sessions yet or all are full
633  auto it = m_Sessions.emplace(piecewise_construct, forward_as_tuple(addr_n_cred), forward_as_tuple(&m_Loop, addr_n_cred, https, m_SessionsByQueues));
634  return &it->second;
635 }
636 
639  m_ApiLock(GetApiLock())
640 {
641 }
642 
644 {
645  return SH2S_Io::GetInstance();
646 }
647 
649 {
650  int status_code = 0;
651  auto status = headers.find(":status");
652 
653  if (status != headers.end()) {
654  status_code = stoi(status->second.front());
655  headers.erase(status);
656  }
657 
658  req.x_UpdateResponse(std::move(headers), status_code, {});
659 }
660 
662 {
663  if (protocol <= CHttpSession_Base::eHTTP_11) {
664  req.x_InitConnection(use_form_data);
665  return;
666  }
667 
668  req.x_AdjustHeaders(use_form_data);
669 
670  auto update_response = [&](CHttpHeaders::THeaders headers) { UpdateResponse(req, std::move(headers)); };
671  auto response_queue = make_shared<TH2S_ResponseQueue>();
672 
673  const auto& req_cred = req.m_Credentials;
674  SUvNgHttp2_Tls::TCred cred(req_cred ? req_cred->GetCert() : string(), req_cred ? req_cred->GetPKey() : string());
675 
676  // Cannot just pass req itself (accessing private members here)
677  TH2S_RequestEvent request(SH2S_Request::SStart(req.m_Method, req.m_Url, std::move(cred), req.m_Headers->Get()), response_queue);
678 
679  unique_ptr<IReaderWriter> rw(new SH2S_ReaderWriter(std::move(update_response), std::move(response_queue), std::move(request)));
680  auto stream = make_shared<CRWStream>(rw.release(), 0, nullptr, CRWStreambuf::fOwnAll);
681 
682  req.x_InitConnection2(std::move(stream));
683 }
684 
686 {
687  if (resp.GetStatusCode() || (protocol <= CHttpSession_Base::eHTTP_11)) {
688  return false;
689  }
690 
691  protocol = CHttpSession_Base::eHTTP_11;
692  return true;
693 }
694 
695 
bool x_Downgrade(CHttpResponse &resp, CHttpSession_Base::EProtocol &protocol) const override
static void UpdateResponse(CHttpRequest &req, CHttpHeaders::THeaders headers)
shared_ptr< void > TApiLock
Get an API lock.
void x_StartRequest(CHttpSession_Base::EProtocol protocol, CHttpRequest &req, bool use_form_data) override
static TApiLock GetApiLock()
HTTP request.
HTTP response.
HTTP session class, holding common data for multiple requests.
CNcbiEnvironment –.
Definition: ncbienv.hpp:110
CUrl –.
Definition: ncbi_url.hpp:353
void erase(iterator pos)
Definition: map.hpp:167
const_iterator end() const
Definition: map.hpp:152
const_iterator find(const key_type &key) const
Definition: map.hpp:153
const_iterator_pair equal_range(const key_type &key) const
Definition: map.hpp:296
void clear()
Definition: map.hpp:309
static FILE * f
Definition: readconf.c:23
char data[12]
Definition: iconv.c:80
Int4 int32_t
unsigned char uint8_t
Uint4 uint32_t
void x_InitConnection2(shared_ptr< iostream > stream)
const THeaders & Get() const
int GetStatusCode(void) const
Get response status code.
void x_InitConnection(bool use_form_data)
void x_UpdateResponse(CHttpHeaders::THeaders headers, int status_code, string status_text)
CRef< CHttpHeaders > m_Headers
shared_ptr< CTlsCertCredentials > m_Credentials
void x_AdjustHeaders(bool use_form_data)
EProtocol
HTTP protocol version.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
ERW_Result
Result codes for I/O operations.
@ eRW_Eof
End of data, should be considered permanent.
@ eRW_Error
Unrecoverable error, no retry possible.
@ eRW_Success
Everything is okay, I/O completed.
const string & GetScheme(void) const
Definition: ncbi_url.hpp:398
unsigned TReqMethod
EReqMethod
@ eReqMethod_Options
@ eReqMethod_Trace
@ eReqMethod_Any
@ eReqMethod_v1
@ eReqMethod_Put
@ eReqMethod_Delete
@ eReqMethod_Patch
@ eReqMethod_Get
@ eReqMethod_Connect
@ eReqMethod_Head
@ eReqMethod_Post
char * buf
yy_size_t n
int len
const TYPE & Get(const CNamedParameterList *param)
range(_Ty, _Ty) -> range< _Ty >
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
constexpr uint32_t kMaxStreams
THeader::SConvert s_GetMethodName(TReqMethod method)
constexpr size_t kReadBufSize
constexpr size_t kWriteBufSize
#define H2S_SESSION_TRACE(message)
#define H2S_IOC_TRACE(message)
vector< char > TH2S_Data
SH2S_Event< SH2S_Request > TH2S_RequestEvent
#define H2S_RW_TRACE(message)
int ssize_t
Definition: ncbiconf_msvc.h:93
T min(T x_, T y_)
#define count
@ eError
An error was encountered while trying to send request or to read and to process the reply.
Reader-writer based streams.
static SLJIT_INLINE sljit_ins l(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
const char * GetTypeName() const
typename TBase::SStart TStart
TH2S_Data & GetData()
EType GetType() const
SH2S_Event(TStart start, TArgs &&... args)
SH2S_Session * NewSession(const SH2S_Request::SStart &request)
void Process(TH2S_RequestQueue &request_queue)
multimap< SH2S_Session::TAddrNCred, SUvNgHttp2_Session< SH2S_Session > > m_Sessions
TH2S_SessionsByQueues m_SessionsByQueues
TH2S_WeakResponseQueue response_queue
queue< TH2S_Data > pending
ssize_t DataSourceRead(void *session, uint8_t *buf, size_t length, uint32_t *data_flags)
static shared_ptr< SH2S_Io > GetInstance()
TUpdateResponse m_UpdateResponse
ERW_Result Receive(ERW_Result(SH2S_ReaderWriter::*member)(TH2S_ResponseEvent &))
ERW_Result ReadImpl(void *buf, size_t count, size_t *bytes_read)
ERW_Result ReceiveResponse(TH2S_ResponseEvent &incoming)
ERW_Result Write(const void *buf, size_t count, size_t *bytes_written=0) override
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
ERW_Result Flush() override
Flush pending data (if any) down to the output device.
ERW_Result PendingCountImpl(size_t *count)
shared_ptr< TH2S_ResponseQueue > m_ResponseQueue
function< void(CHttpHeaders::THeaders)> TUpdateResponse
ERW_Result ReadFsm(function< ERW_Result()> impl)
void Push(TH2S_RequestEvent event)
SH2S_ReaderWriter(TUpdateResponse update_response, shared_ptr< TH2S_ResponseQueue > response_queue, TH2S_RequestEvent request)
ERW_Result ReceiveData(TH2S_ResponseEvent &incoming)
SStart(EReqMethod m, CUrl u, SUvNgHttp2_Tls::TCred c, CHttpHeaders::THeaders h)
SUvNgHttp2_Tls::TCred cred
void OnReset(SUvNgHttp2_Error error) override
static ssize_t s_DataSourceRead(nghttp2_session *, int32_t, uint8_t *buf, size_t length, uint32_t *data_flags, nghttp2_data_source *source, void *user_data)
int OnFrameRecv(nghttp2_session *session, const nghttp2_frame *frame)
TH2S_SessionsByQueues & m_SessionsByQueues
map< TH2S_WeakResponseQueue, TStreams::iterator, owner_less< weak_ptr< TH2S_ResponseQueue > > > m_StreamsByQueues
bool Event(TH2S_RequestEvent &event, TFunc f)
int OnStreamClose(nghttp2_session *session, int32_t stream_id, uint32_t error_code)
unordered_map< int32_t, TStreams::iterator > m_StreamsByIds
SH2S_Session(uv_loop_t *loop, const TAddrNCred &addr_n_cred, bool https, TH2S_SessionsByQueues &sessions_by_queues, TNgHttp2Cbs &&... callbacks)
bool Request(TH2S_RequestEvent request)
int OnData(nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len)
int OnHeader(nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, uint8_t flags)
TStreams::iterator Find(int32_t stream_id)
void Push(TH2S_WeakResponseQueue &response_queue, TH2S_ResponseEvent event)
int32_t Submit(const nghttp2_nv *nva, size_t nvlen, nghttp2_data_provider *data_prd=nullptr)
int Resume(int32_t stream_id)
SLock< TType > GetLock()
static const char * NgHttp2Str(T e)
SNgHttp2_Session m_Session
pair< string, string > TCred
static const string & Get()
void Run(uv_run_mode mode=UV_RUN_DEFAULT)
#define _ASSERT
Modified on Fri Sep 20 14:57:26 2024 by modify_doxy.py rev. 669887