1 #ifndef PSGS_DISPATCHER__HPP
2 #define PSGS_DISPATCHER__HPP
42 #define MAX_PROCESSOR_GROUPS 16
43 #define PROC_BUCKETS 100
125 PSG_ERROR(
"The request timer (request id: " +
127 ") must be stopped and its handle closed before the "
128 "processor group is destroyed");
150 "Request timer cannot be started twice (request id: " +
178 string(uv_strerror(ret)));
236 void AddProcessor(unique_ptr<IPSGS_Processor> processor);
246 shared_ptr<CPSGS_Reply> reply);
250 list<shared_ptr<IPSGS_Processor>>
252 shared_ptr<CPSGS_Reply> reply,
253 const list<string> & processor_names);
290 shared_ptr<CPSGS_Reply> reply,
291 bool low_level_close);
294 shared_ptr<CPSGS_Reply> reply,
295 vector<IPSGS_Processor::EPSGS_Status> proc_statuses,
296 bool low_level_close);
298 shared_ptr<CPSGS_Request> request,
299 shared_ptr<CPSGS_Reply> reply);
302 shared_ptr<CPSGS_Request> request,
303 shared_ptr<CPSGS_Reply> reply);
322 unordered_map<size_t,
372 size_t * limit_reached)
Based on various attributes of the request: {{seq_id}}; NA name; {{blob_id}}; etc (or a combination t...
void SignalFinishProcessing(IPSGS_Processor *processor, EPSGS_SignalSource source)
The processor signals that it finished one way or another, including when a processor is canceled.
bool IsGroupAlive(size_t request_id)
map< string, size_t > GetConcurrentCounters(void)
void OnRequestTimer(size_t request_id)
void AddProcessor(unique_ptr< IPSGS_Processor > processor)
Register processor (one to serve as a processor factory)
vector< string > m_RegisteredProcessorGroups
void PopulateStatus(CJsonNode &status)
CRequestStatus::ECode x_ConcludeRequestStatus(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, vector< IPSGS_Processor::EPSGS_Status > proc_statuses, bool low_level_close)
uint64_t m_RequestTimeoutMillisec
map< string, size_t > GetProcessorGroupToIndexMap(void) const
Provides a map between a processor group name and a unique zero-based index of the group.
SProcessorConcurrency m_ProcessorConcurrency[16]
void NotifyRequestFinished(size_t request_id)
void OnRequestTimerClose(size_t request_id)
CRequestStatus::ECode x_MapProcessorFinishToStatus(IPSGS_Processor::EPSGS_Status status) const
void OnLibh2oFinished(size_t request_id)
CRequestStatus::ECode x_ConcludeIDGetNARequestStatus(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, bool low_level_close)
void x_PrintRequestStop(shared_ptr< CPSGS_Request > request, CRequestStatus::ECode status, size_t bytes_sent)
void StartRequestTimer(size_t request_id)
void x_SendProgressMessage(IPSGS_Processor::EPSGS_Status finish_status, IPSGS_Processor *processor, shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
static string SignalSourceToString(EPSGS_SignalSource source)
void x_SendTrace(const string &msg, shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
list< string > PreliminaryDispatchRequest(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
Return list of processor names which reported that they can process the request.
void x_DecrementConcurrencyCounter(IPSGS_Processor *processor)
CPSGS_Dispatcher(double request_timeout)
static string ProcessorStatusToString(EPSGS_ProcessorStatus st)
void EraseProcessorGroup(size_t request_id)
size_t x_GetBucketIndex(size_t request_id) const
unordered_map< size_t, unique_ptr< SProcessorGroup > > m_ProcessorGroups[100]
void RegisterProcessorsForMomentousCounters(void)
void SignalConnectionCanceled(size_t request_id)
An HTTP connection can be canceled; this method is invoked in that case.
list< shared_ptr< IPSGS_Processor > > DispatchRequest(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, const list< string > &processor_names)
Return list of processors which can be used to process the request.
IPSGS_Processor::EPSGS_StartProcessing SignalStartProcessing(IPSGS_Processor *processor)
The processor signals that it is going to provide data to the client.
list< unique_ptr< IPSGS_Processor > > m_RegisteredProcessors
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
EPSGS_StartProcessing
Tells whether to continue or not after a processor has called the SignalStartProcessing() method.
EPSGS_Status
The GetStatus() method returns a processor current status.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
const CharType(& source)[N]
#define MAX_PROCESSOR_GROUPS
void request_timer_cb(uv_timer_t *handle)
void request_timer_close_cb(uv_handle_t *handle)
#define PSG_ERROR(message)
static SLJIT_INLINE sljit_ins st(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
size_t GetCurrentCount(void) const
atomic< bool > m_CountLock
void IncrementLimitReachedCount(void)
void IncrementCurrentCount(void)
void DecrementCurrentCount(void)
void GetCurrentAndLimitReachedCounts(size_t *current, size_t *limit_reached)
size_t m_LimitReachedCount
EPSGS_ProcessorStatus m_DispatchStatus
IPSGS_Processor::EPSGS_Status m_LastReportedTraceStatus
bool m_DoneStatusRegistered
bool m_ProcPerformanceRegistered
SProcessorData(shared_ptr< IPSGS_Processor > processor, EPSGS_ProcessorStatus dispatch_status, IPSGS_Processor::EPSGS_Status finish_status)
IPSGS_Processor::EPSGS_Status m_FinishStatus
shared_ptr< IPSGS_Processor > m_Processor
vector< SProcessorData > m_Processors
SProcessorGroup(size_t request_id)
IPSGS_Processor * m_StartedProcessing
bool m_RequestStopPrinted
void RestartTimer(uint64_t timer_millisec)
void StopRequestTimer(void)
uv_timer_t m_RequestTimer
bool m_AllProcessorsFinished
void StartRequestTimer(uv_loop_t *uv_loop, uint64_t timer_millisec)
bool IsSafeToDelete(void) const