57 app->NotifyRequestFinished(request_id);
75 (*it)->GetHttpReply()->CancelPending();
104 (*it)->GetHttpReply()->CancelPending();
115 if (!chk_data_ready ||
116 it->GetHttpReply()->CheckResetDataTriggered()) {
117 it->GetHttpReply()->PeekPending();
152 shared_ptr<CPSGS_Reply> reply,
153 list<string> processor_names)
156 x_Start(request, reply, std::move(processor_names));
161 std::move(processor_names),
162 psg_clock_t::now()});
168 reply->PrepareReplyMessage(
"Too many pending requests",
174 reply->SetCompleted();
181 shared_ptr<CPSGS_Reply> reply,
182 list<string> processor_names)
184 auto http_reply = reply->GetHttpReply();
185 if (!http_reply->IsPostponed())
187 "Request has not been postponed");
190 "Request handling can not be started after connection was closed");
195 list<shared_ptr<IPSGS_Processor>> processors =
196 app->GetProcessorDispatcher()->DispatchRequest(request, reply, processor_names);
198 if (processors.empty()) {
204 request->SetConcurrentProcessorCount(processors.size());
205 for (
auto & processor : processors) {
206 reply->GetHttpReply()->AssignPendingReq(
207 unique_ptr<CPendingOperation>(
219 for (
auto req: http_reply->GetPendingReqs())
220 req->SendProcessorStartMessage();
223 app->GetProcessorDispatcher()->StartRequestTimer(reply->GetRequestId());
226 for (
auto req: http_reply->GetPendingReqs())
232 shared_ptr<CPSGS_Reply> reply,
233 list<string> processor_names)
235 auto http_reply = reply->GetHttpReply();
236 switch (http_reply->GetState()) {
238 if (http_reply->IsPostponed())
240 eRequestAlreadyPostponed,
241 "Request has already been postponed");
245 "Request that has already started "
246 "can't be postponed");
250 "Request has already been finished");
254 http_reply->SetPostponed();
263 if (!it->IsFinished()) {
264 auto http_reply = it->GetHttpReply();
265 http_reply->CancelPending();
266 http_reply->PeekPending();
275 size_t request_id = (*it)->GetRequestId();
286 (*it)->GetHttpReply()->ResetPendingRequest();
309 shared_ptr<CPSGS_Reply> reply = it->m_Reply;
311 reply->GetHttpReply()->CancelPending();
321 if ((*it)->IsCompleted()) {
339 shared_ptr<CPSGS_Request> request = backlog_front.m_Request;
340 shared_ptr<CPSGS_Reply> reply = backlog_front.m_Reply;
341 list<string> processor_names = backlog_front.m_PreliminaryDispatchedProcessors;
353 auto now = psg_clock_t::now();
354 mks = chrono::duration_cast<chrono::microseconds>(now - backlog_start).count();
356 request->SetBacklogTime(mks);
358 auto context = request->GetRequestContext();
361 request->SetRequestContext();
366 x_Start(request, reply, std::move(processor_names));
void RegisterBackloggedRequest(CPSGS_Request::EPSGS_Type request_type)
void UnregisterBackloggedRequest(CPSGS_Request::EPSGS_Type request_type)
bool IsClosed(void) const
list< SBacklogAttributes > m_BacklogRequests
void SetupMaintainTimer(uv_loop_t *tcp_worker_loop)
void x_MaintainBacklog(void)
typename list< shared_ptr< CPSGS_Reply > >::iterator running_list_iterator_t
void DoScheduledMaintain(void)
void PeekAsync(bool chk_data_ready)
void x_Start(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, list< string > processor_names)
list< shared_ptr< CPSGS_Reply > > m_RunningRequests
uv_timer_t m_ScheduledMaintainTimer
void ScheduleMaintain(void)
void x_MaintainFinished(void)
void x_UnregisterBacklog(backlog_list_iterator_t &it)
void x_RegisterPending(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, list< string > processor_names)
void x_UnregisterRunning(running_list_iterator_t &it)
typename list< SBacklogAttributes >::iterator backlog_list_iterator_t
void Postpone(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, list< string > processor_names)
void x_CancelBacklog(void)
@ ePSGS_BackloggedRequests
static CPubseqGatewayApp * GetInstance(void)
static DLIST_TYPE *DLIST_NAME() next(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
CDiagContext & GetDiagContext(void)
Get diag context instance.
CDiagContext_Extra Extra(void) const
Create a temporary CDiagContext_Extra object.
@ eDiag_Error
Error message.
@ e503_ServiceUnavailable
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
static void IncrementTooManyRequestsCounter(void)
void MaintanTimerCB(uv_timer_t *handle)
static void IncrementBackloggedCounter(void)
static void NotifyRequestFinished(size_t request_id)
psg_clock_t::time_point psg_time_point_t
static CS_CONTEXT * context