NCBI C++ ToolKit
http_connection.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: http_connection.cpp 103123 2024-09-11 18:57:02Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitri Dmitrienko
27  *
28  * File Description:
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "http_connection.hpp"
35 #include "http_reply.hpp"
36 #include "pubseq_gateway.hpp"
37 #include "backlog_per_request.hpp"
38 
39 
40 static void IncrementBackloggedCounter(void)
41 {
42  auto * app = CPubseqGatewayApp::GetInstance();
43  app->GetCounters().Increment(nullptr, CPSGSCounters::ePSGS_BackloggedRequests);
44 }
45 
46 
48 {
49  auto * app = CPubseqGatewayApp::GetInstance();
50  app->GetCounters().Increment(nullptr, CPSGSCounters::ePSGS_TooManyRequests);
51 }
52 
53 
54 static void NotifyRequestFinished(size_t request_id)
55 {
56  auto * app = CPubseqGatewayApp::GetInstance();
57  app->NotifyRequestFinished(request_id);
58 }
59 
60 
61 
63 {
64  // Note: this should not happened that there are records in the pending
65  // and backlog lists. The reason is that the CHttpConnection instances
66  // are reused and each time the are recycled the ResetForReuse() is
67  // called. So the check below is rather a sanity check.
68  while (!m_BacklogRequests.empty()) {
71  }
72 
73  while (!m_RunningRequests.empty()) {
74  auto it = m_RunningRequests.begin();
75  (*it)->GetHttpReply()->CancelPending();
77  }
78 }
79 
80 
81 void CHttpConnection::SetupMaintainTimer(uv_loop_t * tcp_worker_loop)
82 {
83  uv_timer_init(tcp_worker_loop, &m_ScheduledMaintainTimer);
84  m_ScheduledMaintainTimer.data = (void *)(this);
85 }
86 
87 
89 {
90  uv_timer_stop(&m_ScheduledMaintainTimer);
91  uv_close(reinterpret_cast<uv_handle_t*>(&m_ScheduledMaintainTimer), nullptr);
92 }
93 
94 
96 {
97  while (!m_BacklogRequests.empty()) {
100  }
101 
102  while (!m_RunningRequests.empty()) {
103  auto it = m_RunningRequests.begin();
104  (*it)->GetHttpReply()->CancelPending();
106  }
107 
108  m_IsClosed = false;
109 }
110 
111 
112 void CHttpConnection::PeekAsync(bool chk_data_ready)
113 {
114  for (auto & it: m_RunningRequests) {
115  if (!chk_data_ready ||
116  it->GetHttpReply()->CheckResetDataTriggered()) {
117  it->GetHttpReply()->PeekPending();
118  }
119  }
120 
123 }
124 
125 
127 {
130 }
131 
132 
133 void MaintanTimerCB(uv_timer_t * handle)
134 {
135  CHttpConnection * http_connection = (CHttpConnection *)(handle->data);
136  http_connection->DoScheduledMaintain();
137 }
138 
140 {
141  // Send one time async event to initiate maintain
142  if (uv_is_active((uv_handle_t*)(&m_ScheduledMaintainTimer))) {
143  return; // has already been activated
144  }
145 
146  m_ScheduledMaintainTimer.data = (void *)(this);
147  uv_timer_start(&m_ScheduledMaintainTimer, MaintanTimerCB, 0, 0);
148 }
149 
150 
151 void CHttpConnection::x_RegisterPending(shared_ptr<CPSGS_Request> request,
152  shared_ptr<CPSGS_Reply> reply,
153  list<string> processor_names)
154 {
155  if (m_RunningRequests.size() < m_HttpMaxRunning) {
156  x_Start(request, reply, std::move(processor_names));
157  } else if (m_BacklogRequests.size() < m_HttpMaxBacklog) {
158  RegisterBackloggedRequest(request->GetRequestType());
159  m_BacklogRequests.push_back(
160  SBacklogAttributes{request, reply,
161  std::move(processor_names),
162  psg_clock_t::now()});
164  } else {
166 
167  reply->SetContentType(ePSGS_PSGMime);
168  reply->PrepareReplyMessage("Too many pending requests",
171  reply->PrepareReplyCompletion(CRequestStatus::e503_ServiceUnavailable,
172  psg_clock_t::now());
173  reply->Flush(CPSGS_Reply::ePSGS_SendAndFinish);
174  reply->SetCompleted();
175  }
176 }
177 
178 
179 void
180 CHttpConnection::x_Start(shared_ptr<CPSGS_Request> request,
181  shared_ptr<CPSGS_Reply> reply,
182  list<string> processor_names)
183 {
184  auto http_reply = reply->GetHttpReply();
185  if (!http_reply->IsPostponed())
186  NCBI_THROW(CPubseqGatewayException, eRequestNotPostponed,
187  "Request has not been postponed");
188  if (IsClosed())
189  NCBI_THROW(CPubseqGatewayException, eConnectionClosed,
190  "Request handling can not be started after connection was closed");
191 
192  // Try to instantiate the processors in accordance to the pre-dispatched
193  // names.
194  auto * app = CPubseqGatewayApp::GetInstance();
195  list<shared_ptr<IPSGS_Processor>> processors =
196  app->GetProcessorDispatcher()->DispatchRequest(request, reply, processor_names);
197 
198  if (processors.empty()) {
199  // No processors were actually instantiated
200  // The reply is completed by the dispatcher with all necessary messages.
201  return;
202  }
203 
204  request->SetConcurrentProcessorCount(processors.size());
205  for (auto & processor : processors) {
206  reply->GetHttpReply()->AssignPendingReq(
207  unique_ptr<CPendingOperation>(
208  new CPendingOperation(request, reply, processor)));
209  }
210 
211  // Add the reply to the list of running replies
212  m_RunningRequests.push_back(reply);
213 
214  // To avoid a possibility to have cancel->start in progress messages in the
215  // reply in case of multiple processors due to the first one may do things
216  // synchronously and call Cancel() for the other processors right away: the
217  // sending of the start message will be done for all of them before the
218  // actual Start() call
219  for (auto req: http_reply->GetPendingReqs())
220  req->SendProcessorStartMessage();
221 
222  // Start the request timer
223  app->GetProcessorDispatcher()->StartRequestTimer(reply->GetRequestId());
224 
225  // Now start the processors
226  for (auto req: http_reply->GetPendingReqs())
227  req->Start();
228 }
229 
230 
231 void CHttpConnection::Postpone(shared_ptr<CPSGS_Request> request,
232  shared_ptr<CPSGS_Reply> reply,
233  list<string> processor_names)
234 {
235  auto http_reply = reply->GetHttpReply();
236  switch (http_reply->GetState()) {
238  if (http_reply->IsPostponed())
240  eRequestAlreadyPostponed,
241  "Request has already been postponed");
242  break;
244  NCBI_THROW(CPubseqGatewayException, eRequestCannotBePostponed,
245  "Request that has already started "
246  "can't be postponed");
247  break;
248  default:
249  NCBI_THROW(CPubseqGatewayException, eRequestAlreadyFinished,
250  "Request has already been finished");
251  break;
252  }
253 
254  http_reply->SetPostponed();
255  x_RegisterPending(request, reply, std::move(processor_names));
256 }
257 
258 
260 {
261  x_CancelBacklog();
262  for (auto & it: m_RunningRequests) {
263  if (!it->IsFinished()) {
264  auto http_reply = it->GetHttpReply();
265  http_reply->CancelPending();
266  http_reply->PeekPending();
267  }
268  }
270 }
271 
272 
274 {
275  size_t request_id = (*it)->GetRequestId();
276 
277  // Note: without this call there will be memory leaks.
278  // The infrastructure holds a shared_ptr to the reply, the pending
279  // operation instance also holds a shared_ptr to the very same reply
280  // and the reply holds a shared_ptr to the pending operation instance.
281  // All together it forms a loop which needs to be broken for a correct
282  // memory management.
283  // The call below resets a shared_ptr to the pending operation. It is
284  // safe to do it here because this point is reached only when all
285  // activity on processing a request is over.
286  (*it)->GetHttpReply()->ResetPendingRequest();
287 
288  m_RunningRequests.erase(it);
289  NotifyRequestFinished(request_id);
290 }
291 
292 
294 {
295  // The backlogged requests have not started processing yet. They do not
296  // have any created processors. So there is no need to notify a dispatcher
297  // that the request is finished.
298  // They do not have assigned pending requests either. So, just remove the
299  // record from the list.
300  UnregisterBackloggedRequest(it->m_Request->GetRequestType());
301  m_BacklogRequests.erase(it);
302 }
303 
304 
306 {
307  while (!m_BacklogRequests.empty()) {
308  auto it = m_BacklogRequests.begin();
309  shared_ptr<CPSGS_Reply> reply = it->m_Reply;
310 
311  reply->GetHttpReply()->CancelPending();
313  }
314 }
315 
316 
318 {
320  while (it != m_RunningRequests.end()) {
321  if ((*it)->IsCompleted()) {
322  auto next = it;
323  ++next;
325  it = next;
326  } else {
327  ++it;
328  }
329  }
330 }
331 
332 
334 {
335  while (m_RunningRequests.size() < m_HttpMaxRunning &&
336  !m_BacklogRequests.empty()) {
337  auto & backlog_front = m_BacklogRequests.front();
338 
339  shared_ptr<CPSGS_Request> request = backlog_front.m_Request;
340  shared_ptr<CPSGS_Reply> reply = backlog_front.m_Reply;
341  list<string> processor_names = backlog_front.m_PreliminaryDispatchedProcessors;
342  psg_time_point_t backlog_start = backlog_front.m_BacklogStart;
343 
344  m_BacklogRequests.pop_front();
345  UnregisterBackloggedRequest(request->GetRequestType());
346 
347  auto * app = CPubseqGatewayApp::GetInstance();
348  uint64_t mks = app->GetTiming().Register(nullptr, eBacklog,
350  backlog_start);
351  if (mks == 0) {
352  // The timing was disabled
353  auto now = psg_clock_t::now();
354  mks = chrono::duration_cast<chrono::microseconds>(now - backlog_start).count();
355  }
356  request->SetBacklogTime(mks);
357 
358  auto context = request->GetRequestContext();
359  if (context.NotNull()) {
360  CRequestContextResetter context_resetter;
361  request->SetRequestContext();
362 
363  GetDiagContext().Extra().Print("backlog_time_mks", mks);
364  }
365 
366  x_Start(request, reply, std::move(processor_names));
367  }
368 }
369 
void RegisterBackloggedRequest(CPSGS_Request::EPSGS_Type request_type)
void UnregisterBackloggedRequest(CPSGS_Request::EPSGS_Type request_type)
bool IsClosed(void) const
list< SBacklogAttributes > m_BacklogRequests
void SetupMaintainTimer(uv_loop_t *tcp_worker_loop)
void x_MaintainBacklog(void)
typename list< shared_ptr< CPSGS_Reply > >::iterator running_list_iterator_t
void DoScheduledMaintain(void)
volatile bool m_IsClosed
void PeekAsync(bool chk_data_ready)
void x_Start(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, list< string > processor_names)
list< shared_ptr< CPSGS_Reply > > m_RunningRequests
uv_timer_t m_ScheduledMaintainTimer
void ScheduleMaintain(void)
void x_MaintainFinished(void)
void x_UnregisterBacklog(backlog_list_iterator_t &it)
void x_RegisterPending(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, list< string > processor_names)
void x_UnregisterRunning(running_list_iterator_t &it)
void x_CancelAll(void)
typename list< SBacklogAttributes >::iterator backlog_list_iterator_t
void CleanupToStop(void)
void Postpone(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, list< string > processor_names)
void x_CancelBacklog(void)
void ResetForReuse(void)
@ eReplyInitialized
Definition: http_reply.hpp:76
@ ePSGS_SendAndFinish
Definition: psgs_reply.hpp:55
static CPubseqGatewayApp * GetInstance(void)
static DLIST_TYPE *DLIST_NAME() next(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
Definition: dlist.tmpl.h:56
Uint8 uint64_t
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
Definition: ncbidiag.cpp:2622
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
CDiagContext_Extra Extra(void) const
Create a temporary CDiagContext_Extra object.
Definition: ncbidiag.hpp:2095
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
static void IncrementTooManyRequestsCounter(void)
void MaintanTimerCB(uv_timer_t *handle)
static void IncrementBackloggedCounter(void)
static void NotifyRequestFinished(size_t request_id)
@ ePSGS_TooManyRequests
psg_clock_t::time_point psg_time_point_t
@ eOpStatusFound
Definition: timing.hpp:61
@ eBacklog
Definition: timing.hpp:88
static CS_CONTEXT * context
Definition: will_convert.c:21
Modified on Fri Sep 20 14:58:05 2024 by modify_doxy.py rev. 669887