58 # define tLOG_POST(m) LOG_POST(m)
60 # define tLOG_POST(m) ((void)0)
63 #define SEND_TRACE(str) SendTrace(str)
64 #define SEND_TRACE_FMT(m) \
66 if ( NeedTrace() ) { \
69 SendTrace(str.str()); \
85 static int error_rate =
NCBI_PARAM_TYPE(OSG_PROCESSOR, ERROR_RATE)::GetDefault();
86 if ( error_rate > 0 ) {
87 static int error_counter = 0;
88 if ( ++error_counter >= error_rate ) {
99 const shared_ptr<CPSGS_Request>& request,
100 const shared_ptr<CPSGS_Reply>& reply,
102 : m_Context(request->GetRequestContext()),
103 m_ConnectionPool(pool),
104 m_EnabledFlags(enabled_flags),
107 m_BackgroundProcesing(0),
108 m_NeedTrace(request->NeedTrace())
110 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CPSGS_OSGProcessorBase()");
149 shared_ptr<CPSGS_Reply> reply,
160 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::~CPSGS_OSGProcessorBase() status: "<<
State());
164 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::~CPSGS_OSGProcessorBase() return: "<<
State());
175 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::WaitForCassandra() waiting: "<<
State());
176 SendTrace(
"OSG: waiting for Cassandra results");
178 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::WaitForCassandra() waited: "<<
State());
223 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::Process(): "<<
State());
225 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::Process() canceled: "<<
State());
238 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::Process() return: "<<
State());
255 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessSync() start: "<<
State());
257 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessSync() return: "<<
State());
267 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessCallback() canceled: "<<
State());
271 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessCallback() start: "<<
State());
273 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessCallback() return: "<<
State());
304 SEND_TRACE(
"OSG: switching Process() to background thread");
308 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessAsync(): canceled: "<<
State());
311 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessAsync(): starting: "<<
State());
314 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessAsync() started: "<<
State());
317 catch (exception& exc) {
318 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessAsync() failed: "<<exc.what()<<
": "<<
State());
319 PSG_ERROR(
"OSG: DoProcessAsync: failed to create thread: "<<exc.what());
329 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::DoProcess() start: "<<
State());
349 bool last_attempt = retry_count <= 1;
352 SEND_TRACE(
"OSG: DoProcess() allocating connection");
356 catch ( exception& exc ) {
357 if ( last_attempt ) {
358 PSG_ERROR(
"OSG: DoProcess() failed opening connection: "<<exc.what());
363 PSG_ERROR(
"OSG: DoProcess() retrying after failure opening connection: "<<exc.what());
373 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::DoProcess() canceled 1: "<<
State());
389 catch ( exception& exc ) {
390 if ( last_attempt ) {
391 PSG_ERROR(
"OSG: DoProcess() failed receiving replies: "<<exc.what());
396 PSG_ERROR(
"OSG: retrying after failure receiving replies: "<<exc.what());
417 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::DoProcess() got replies: "<<
State());
422 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::DoProcess() canceled: "<<
State());
428 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::Process() done: "<<
State());
430 catch ( exception& exc ) {
431 PSG_ERROR(
"OSG: DoProcess() failed: "<<exc.what());
432 SendError(
"Exception when handling a get request: " +
string(exc.what()));
440 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::DoProcessReplies() start: "<<
State());
443 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::DoProcessReplies() return: "<<
State());
461 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesSync() start: "<<
State());
463 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesSync() return: "<<
State());
474 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesCallback() canceled: "<<
State());
479 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesCallback() signal start: "<<
State());
480 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesCallback() start: "<<
State());
482 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesCallback() return: "<<
State());
484 catch ( exception& exc ) {
485 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesCallback() exc: "<<exc.what()<<
": "<<
State());
486 PSG_ERROR(
"OSG: ProcessReplies() failed: "<<exc.what());
501 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesAsync() start: "<<
State());
502 SEND_TRACE(
"OSG: scheduling ProcessReplies() to UV loop");
504 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesAsync() canceled: "<<
State());
508 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::CallDoProcessRepliesAsync() return: "<<
State());
517 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::Cancel(): before: "<<
State());
520 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::Cancel(): after: "<<
State());
560 param->SetName(
"hops");
561 param->SetValue().push_back(to_string(hops));
577 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::SetFinalStatus(): status: "<<
State());
581 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::SetFinalStatus(): return: "<<
State());
588 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::FinalizeResult(): state: "<<
State());
604 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::SignalStartOfBackgroundProcessing(): "<<
State());
606 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::SignalStartOfBackgroundProcessing(): return cancel: "<<
State());
619 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::"<<from<<
"()::x_SignalFinishProcessing(): signal: "<<
State());
621 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::"<<from<<
"()::x_SignalFinishProcessing(): return: "<<
State());
625 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::"<<from<<
"()::x_SignalFinishProcessing(): sending to uv-loop "<<
State());
627 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::"<<from<<
"()::x_SignalFinishProcessing(): sent to uv-loop "<<
State());
631 catch ( exception& exc ) {
632 ERR_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::"<<from<<
"()::x_SignalFinishProcessing(): exception: "<<exc.what());
646 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::SignalEndOfBackgroundProcessing(): "<<
State());
655 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::SignalStartOfUVLoop(): "<<
State());
665 tLOG_POST(
"CPSGS_OSGProcessorBase("<<
this<<
")::SignalEndOfUVLoop(): "<<
State());
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
const string kCassandraProcessorEvent
virtual EStatus Execute(void) override
Do the actual job.
CPSGS_OSGProcessorBase::TBGProcToken m_Token
COSGCallDoProcessTask(CPSGS_OSGProcessorBase::TBGProcToken token)
void WaitForReplies(CPSGS_OSGProcessorBase &processor)
size_t GetConnectionID() const
void SendRequest(CPSGS_OSGProcessorBase &processor)
const CID2_Request_Packet & GetRequestPacket() const
void AllocateConnection(const CRef< COSGConnectionPool > &connection_pool)
bool GetWaitBeforeOSG() const
size_t GetRetryCount() const
size_t GetMaxConnectionCount() const
bool GetAsyncProcessing() const
void Queue(CRef< CThreadPool_Task > task)
const CPSGS_OSGProcessorBase * m_ProcessorPtr
CNcbiOstream & Print(CNcbiOstream &out) const
void AddRequest(const CRef< CID2_Request > &req)
virtual void NotifyOSGCallStart()
virtual void NotifyOSGCallReply(const CID2_Reply &reply)
void x_SignalFinishProcessing(const char *from)
void SignalStartOfUVLoop()
AutoPtr< CBackgroundProcessingGuard > TBGProcToken
void CallDoProcessCallback(TBGProcToken token)
static void s_CallFinalizeUvCallback(void *data)
COSGStateReporter State() const
void DoProcess(TBGProcToken token)
CPSGS_OSGProcessorBase(TEnabledFlags enabled_flags, const CRef< COSGConnectionPool > &pool, const shared_ptr< CPSGS_Request > &request, const shared_ptr< CPSGS_Reply > &reply, TProcessorPriority priority)
static bool s_SimulateError()
bool SignalStartOfBackgroundProcessing()
void SendTrace(const string &str)
virtual EPSGS_Status GetStatus(void) override
Tells the processor status (whether it has finished or is still in progress)
virtual void SendError(const string &msg)
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
void CallDoProcessRepliesCallback(TBGProcToken token)
CRef< CRequestContext > m_Context
virtual void CreateRequests()=0
static void s_CallDoProcessRepliesUvCallback(void *proc)
const TFetches & GetFetches() const
virtual void Cancel(void) override
The infrastructure request to cancel processing.
virtual void ResetReplies()
static bool x_Valid(const TBGProcToken &token)
void x_RegisterTimingNotFound(EPSGOperation operation)
virtual ~CPSGS_OSGProcessorBase()
void x_RegisterTiming(EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
void SetFinalStatus(EPSGS_Status status)
friend class COSGCallDoProcessTask
void SignalEndOfBackgroundProcessing()
void CallDoProcessAsync()
TBGProcToken x_CreateBGProcToken()
void CallDoProcessRepliesAsync(TBGProcToken token)
static CPSGS_OSGProcessorBase * x_GetProcessor(const TBGProcToken &token)
void CallDoProcessReplies(TBGProcToken token)
friend class CBackgroundProcessingGuard
int m_BackgroundProcesing
static void * x_BGProcTokenToVoidP(TBGProcToken &token)
static TBGProcToken x_BGProcTokenFromVoidP(void *ptr)
virtual void WaitForOtherProcessors()
void CallDoProcessRepliesSync()
virtual void NotifyOSGCallEnd()
virtual void ProcessReplies()=0
virtual void Process(void) override
Main processing function.
CRef< COSGConnectionPool > m_ConnectionPool
static CPubseqGatewayApp * GetInstance(void)
Abstract class for representing a single task executing in a pool of threads. To use this class in applica...
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
virtual string GetName(void) const =0
Tells the processor name (used in logging and tracing)
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns the processor's current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
shared_ptr< CPSGS_Request > m_Request
TProcessorPriority m_Priority
std::ofstream out("events_result.xml")
main entry point for tests
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
EDiagSev
Severity level for the posted diagnostics.
@ eDiag_Trace
Trace message.
@ eDiag_Error
Error message.
@ e500_InternalServerError
CRef< C > Ref(C *object)
Helper functions to get CRef<> and CConstRef<> objects.
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
EStatus
Status of the task.
@ eCompleted
executed successfully
void SetParams(TParams &value)
Assign a value to Params data member.
const Tdata & Get(void) const
Get the member data.
constexpr bool empty(list< Ts... >) noexcept
double f(double x_, const double &y_)
NCBI_PARAM_DEF(int, OSG_PROCESSOR, ERROR_RATE, 0)
void SetDiagSeverity(EDiagSev severity)
static EDiagSev s_DiagSeverity
void SetDebugLevel(int level)
Severity GetDiagSeverity()
#define SEND_TRACE_FMT(m)
NCBI_PARAM_DECL(int, OSG_PROCESSOR, ERROR_RATE)
static thread_local bool m_UVLoop
#define PSG_ERROR(message)
chrono::steady_clock psg_clock_t
static const char * str(char *buf, int n)
Pool of generic task-executing threads.