44 using namespace std::placeholders;
56 shared_ptr<CPSGS_Request> request,
57 shared_ptr<CPSGS_Reply> reply,
65 this, _1, _2, _3, _4),
82 shared_ptr<CPSGS_Reply> reply)
const
91 auto startup_data_state = app->GetStartupDataState();
93 if (request->NeedTrace()) {
95 "request because Cassandra DB is not available.\n" +
97 request->GetStartTimestamp());
108 shared_ptr<CPSGS_Request> request,
109 shared_ptr<CPSGS_Reply> reply,
137 const string & message)
157 message, status,
code, severity);
180 unique_ptr<CCassAccVerHistoryFetch> details;
187 auto sat_info = app->GetBioseqKeyspace();
194 bioseq_resolution.GetOriginalSeqIdType());
195 details->SetLoader(fetch_task);
206 this, _1, _2, _3, _4, _5),
258 "Accession version history no-more-data callback",
262 "Accession version history data received",
281 "while the output has finished, ignoring");
314 const string & message)
322 bool is_error =
IsError(severity);
382 bool overall_final_state =
false;
389 if (details->InPeek()) {
392 details->SetInPeek(
true);
393 overall_final_state |=
x_Peek(details, need_wait);
394 details->SetInPeek(
false);
402 if (overall_final_state) {
414 if (!fetch_details->GetLoader())
417 bool final_state =
false;
419 if (!fetch_details->ReadFinished()) {
420 final_state = fetch_details->GetLoader()->Wait();
423 if (fetch_details->GetLoader()->HasError() &&
427 string error = fetch_details->GetLoader()->LastError();
436 app->GetCounters().Increment(
this,
440 app->GetCounters().Increment(
this,
451 fetch_details->GetLoader()->ClearError();
452 fetch_details->SetReadFinished();
static const string kAccVerHistProcessorName
const string kCassandraProcessorGroupName
const string kCassandraProcessorEvent
CCassAccVerHistoryTaskFetch * GetLoader(void)
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
void SetConsumeCallback(TAccVerHistConsumeCallback callback)
void SetErrorCB(TDataErrorCallback error_cb)
void SetReadFinished(void)
@ ePSGS_CassQueryTimeoutError
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
void x_OnAccVerHistError(CCassAccVerHistoryFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const
Tells if processor can process the given request.
virtual string GetGroupName(void) const
Tells the processor group name.
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const
Create processor to fulfil PSG request using the data source.
bool x_OnAccVerHistData(SAccVerHistRec &&acc_ver_hist_record, bool last, CCassAccVerHistoryFetch *fetch_details)
void x_OnResolutionGoodData(void)
virtual string GetName(void) const
Tells the processor name (used in logging and tracing)
void x_OnSeqIdResolveFinished(SBioseqResolution &&bioseq_resolution)
virtual ~CPSGS_AccessionVersionHistoryProcessor()
void x_SendBioseqInfo(SBioseqResolution &bioseq_resolution)
virtual void Process(void)
Main processing function.
virtual void ProcessEvent(void)
Called when an event happened which may require to have some processing.
void x_OnSeqIdResolveError(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
CPSGS_AccessionVersionHistoryProcessor()
virtual EPSGS_Status GetStatus(void)
Tells the processor status (if it has finished or in progress)
void x_Peek(bool need_wait)
SPSGS_AccessionVersionHistoryRequest * m_AccVerHistoryRequest
EPSGS_AccessionAdjustmentResult AdjustBioseqAccession(SBioseqResolution &bioseq_resolution)
CRequestStatus::ECode CountError(CCassFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message, EPSGS_LoggingFlag logging_flag, EPSGS_StatusUpdateFlag status_update_flag)
void UpdateOverallStatus(CRequestStatus::ECode status)
bool IsCassandraProcessorEnabled(shared_ptr< CPSGS_Request > request) const
bool IsError(EDiagSev severity) const
list< unique_ptr< CCassFetch > > m_FetchDetails
bool AreAllFinishedRead(void) const
void UnlockWaitingProcessor(void)
IPSGS_Processor::EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
void SignalFinishProcessing(void)
bool IsTimeoutError(const string &msg) const
@ ePSGS_AccessionVersionHistoryRequest
void ResolveInputSeqId(void)
CPSGSCounters & GetCounters(void)
static CPubseqGatewayApp * GetInstance(void)
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
static DLIST_TYPE *DLIST_NAME() last(DLIST_LIST_TYPE *list)
EDiagSev
Severity level for the posted diagnostics.
@ eDiag_Error
Error message.
@ e500_InternalServerError
string StripTrailingVerticalBars(const string &seq_id)
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
string GetCassStartupDataStateMessage(EPSGS_StartupDataState state)
EPSGS_ResolutionResult m_ResolutionResult
CBioseqInfoRecord & GetBioseqInfo(void)
C++ wrappers for the Perl-compatible regular expression (PCRE) library.