NCBI C++ ToolKit
osg_processor_base.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: osg_processor_base.cpp 99336 2023-03-13 14:22:34Z vasilche $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Eugene Vasilchenko
27  *
28  * File Description: base class for processors which may generate os_gateway
29  * fetches
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "osg_processor_base.hpp"
36 
37 #include "osg_fetch.hpp"
38 #include "osg_caller.hpp"
39 #include "osg_connection.hpp"
40 #include "cass_processor_base.hpp"
41 
47 #include <util/thread_pool.hpp>
48 #include "pubseq_gateway.hpp"
49 #include <thread>
50 
51 
55 
56 
57 #if 0
58 # define tLOG_POST(m) LOG_POST(m)
59 #else
60 # define tLOG_POST(m) ((void)0)
61 #endif
62 
63 #define SEND_TRACE(str) SendTrace(str)
64 #define SEND_TRACE_FMT(m) \
65  do { \
66  if ( NeedTrace() ) { \
67  ostringstream str; \
68  str << m; \
69  SendTrace(str.str()); \
70  } \
71  } while(0)
72 
73 
74 /////////////////////////////////////////////////////////////////////////////
75 // CPSGS_OSGProcessorBase
76 /////////////////////////////////////////////////////////////////////////////
77 
78 static thread_local bool m_UVLoop;
79 
80 NCBI_PARAM_DECL(int, OSG_PROCESSOR, ERROR_RATE);
81 NCBI_PARAM_DEF(int, OSG_PROCESSOR, ERROR_RATE, 0);
82 
84 {
85  static int error_rate = NCBI_PARAM_TYPE(OSG_PROCESSOR, ERROR_RATE)::GetDefault();
86  if ( error_rate > 0 ) {
87  static int error_counter = 0;
88  if ( ++error_counter >= error_rate ) {
89  error_counter = 0;
90  return true;
91  }
92  }
93  return false;
94 }
95 
96 
98  const CRef<COSGConnectionPool>& pool,
99  const shared_ptr<CPSGS_Request>& request,
100  const shared_ptr<CPSGS_Reply>& reply,
101  TProcessorPriority priority)
102  : m_Context(request->GetRequestContext()),
103  m_ConnectionPool(pool),
104  m_EnabledFlags(enabled_flags),
105  m_Start(psg_clock_t::now()),
106  m_Status(IPSGS_Processor::ePSGS_InProgress),
107  m_BackgroundProcesing(0),
108  m_NeedTrace(request->NeedTrace())
109 {
110  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CPSGS_OSGProcessorBase()");
111  m_Request = request;
112  m_Reply = reply;
113  m_Priority = priority;
116  _ASSERT(m_Reply);
117 }
118 
119 
122 
123 
125 {
126  return s_DebugLevel;
127 }
128 
129 
130 void SetDebugLevel(int level)
131 {
132  s_DebugLevel = level;
133 }
134 
135 
137 {
138  return Severity(s_DiagSeverity);
139 }
140 
141 
142 void SetDiagSeverity(EDiagSev severity)
143 {
144  s_DiagSeverity = severity;
145 }
146 
147 
148 IPSGS_Processor* CPSGS_OSGProcessorBase::CreateProcessor(shared_ptr<CPSGS_Request> request,
149  shared_ptr<CPSGS_Reply> reply,
150  TProcessorPriority priority) const
151 {
152  return nullptr;
153 }
154 
155 
157 {
158  StopAsyncThread();
159  CMutexGuard guard(m_StatusMutex);
160  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::~CPSGS_OSGProcessorBase() status: "<<State());
164  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::~CPSGS_OSGProcessorBase() return: "<<State());
165 }
166 
167 
169 {
170 }
171 
172 
174 {
175  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::WaitForCassandra() waiting: "<<State());
176  SendTrace("OSG: waiting for Cassandra results");
178  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::WaitForCassandra() waited: "<<State());
179 }
180 
181 
183 {
184  if ( NeedTrace() ) {
185  CMutexGuard guard(m_StatusMutex);
186  if ( !IsCanceled() ) {
187  m_Reply->SendTrace(str, m_Request->GetStartTimestamp());
188  }
189  }
190 }
191 
192 
193 void CPSGS_OSGProcessorBase::SendError(const string& msg)
194 {
195  m_Reply->PrepareProcessorMessage(m_Reply->GetItemId(), GetName(), msg,
198  eDiag_Error);
199 }
200 
201 
203 {
204  /*
205  if ( m_Thread ) {
206  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::~CPSGS_OSGProcessorBase() joining status: "<<m_Status);
207  m_Thread->join();
208  m_Thread.reset();
209  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::~CPSGS_OSGProcessorBase() joined status: "<<m_Status);
210  }
211  */
212 }
213 
214 
216 {
217  CRequestContextResetter context_resetter;
218  GetRequest()->SetRequestContext();
219  CUVLoopGuard uv_loop_guard(*this);
220 
221  SEND_TRACE("OSG: Process() called");
222 
223  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::Process(): "<<State());
224  if ( IsCanceled() ) {
225  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::Process() canceled: "<<State());
226  return;
227  }
228  if ( GetFetches().empty() ) {
229  CreateRequests();
230  if ( m_Context ) {
231  for ( auto& f : m_Fetches ) {
232  f->SetContext(*m_Context);
233  }
234  }
235  }
236  _ASSERT(!GetFetches().empty());
237  CallDoProcess();
238  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::Process() return: "<<State());
239 }
240 
241 
243 {
246  }
247  else {
249  }
250 }
251 
252 
254 {
255  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessSync() start: "<<State());
256  DoProcess(nullptr);
257  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessSync() return: "<<State());
258 }
259 
260 
262 {
263  CRequestContextResetter context_resetter;
264  GetRequest()->SetRequestContext();
265 
266  if ( !x_Valid(token) ) {
267  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessCallback() canceled: "<<State());
268  return;
269  }
270  _ASSERT(x_GetProcessor(token) == this);
271  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessCallback() start: "<<State());
272  DoProcess(token);
273  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessCallback() return: "<<State());
274 }
275 
276 
278 {
279  return TBGProcToken(new CBackgroundProcessingGuard(*this));
280 }
281 
282 
284 {
285 public:
287  : m_Token(token)
288  {
289  }
290 
291  virtual EStatus Execute(void) override
292  {
294  return eCompleted;
295  }
296 
297 protected:
299 };
300 
301 
303 {
304  SEND_TRACE("OSG: switching Process() to background thread");
305  try {
307  if ( !x_Valid(token) ) {
308  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessAsync(): canceled: "<<State());
309  return;
310  }
311  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessAsync(): starting: "<<State());
313  //thread(bind(&CPSGS_OSGProcessorBase::CallDoProcessCallback, this, token)).detach();
314  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessAsync() started: "<<State());
315  //SleepMilliSec(10);
316  }
317  catch (exception& exc) {
318  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessAsync() failed: "<<exc.what()<<": "<<State());
319  PSG_ERROR("OSG: DoProcessAsync: failed to create thread: "<<exc.what());
321  }
322 }
323 
324 
326 {
327  SEND_TRACE("OSG: DoProcess() started");
328  try {
329  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::DoProcess() start: "<<State());
330  for ( double retry_count = m_ConnectionPool->GetRetryCount(); retry_count > 0; ) {
331  if ( IsCanceled() ) {
332  SEND_TRACE("OSG: DoProcess() canceled 1");
333  return;
334  }
335 
336  // We need to distinguish different kinds of communication failures with different
337  // effect on retry logic.
338  // 1. stale/disconnected connection failure - there maybe multiple in active connection pool
339  // 2. multiple simultaneous failures from concurrent incoming requests
340  // 3. repeated failure of specific request at OSG server
341  // In the first case we shouldn't account all such failures in the same retry counter -
342  // it will overflow easily, and quite unnecessary.
343  // In the first case we shouldn't increase wait time too much -
344  // the failures should be treated as single failure for the sake of waiting before
345  // next connection attempt.
346  // In the third case we should make sure we abandon the failing request when retry limit
347  // is reached. It should be detected no matter of concurrent successful requests.
348 
349  bool last_attempt = retry_count <= 1;
350  COSGCaller caller;
351  try {
352  SEND_TRACE("OSG: DoProcess() allocating connection");
354  SEND_TRACE_FMT("OSG: DoProcess() allocated connection: "<<caller.GetConnectionID());
355  }
356  catch ( exception& exc ) {
357  if ( last_attempt ) {
358  PSG_ERROR("OSG: DoProcess() failed opening connection: "<<exc.what());
359  throw;
360  }
361  else {
362  // failed new connection - consume full retry
363  PSG_ERROR("OSG: DoProcess() retrying after failure opening connection: "<<exc.what());
364  retry_count -= 1;
365  continue;
366  }
367  }
368 
372  if ( IsCanceled() ) {
373  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::DoProcess() canceled 1: "<<State());
374  caller.ReleaseConnection();
375  return;
376  }
377  }
378 
379  if ( IsCanceled() ) {
380  SEND_TRACE_FMT("OSG: DoProcess() canceled 2, releasing connection: "<<caller.GetConnectionID());
381  caller.ReleaseConnection();
382  return;
383  }
384 
385  try {
386  caller.SendRequest(*this);
387  caller.WaitForReplies(*this);
388  }
389  catch ( exception& exc ) {
390  if ( last_attempt ) {
391  PSG_ERROR("OSG: DoProcess() failed receiving replies: "<<exc.what());
392  throw;
393  }
394  else {
395  // this may be failure of old connection
396  PSG_ERROR("OSG: retrying after failure receiving replies: "<<exc.what());
397  if ( caller.GetRequestPacket().Get().front()->GetSerial_number() <= 1 ) {
398  // new connection - consume full retry
399  retry_count -= 1;
400  }
401  else {
402  // old connection from pool - consume part of retry
403  retry_count -= 1./m_ConnectionPool->GetMaxConnectionCount();
404  }
405  continue;
406  }
407  }
408 
409  // successful
410  break;
411  }
412 
413  if ( IsCanceled() ) {
414  SEND_TRACE("OSG: DoProcess() canceled 3");
415  return;
416  }
417  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::DoProcess() got replies: "<<State());
418 
421  if ( IsCanceled() ) {
422  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::DoProcess() canceled: "<<State());
423  return;
424  }
425  }
426 
427  CallDoProcessReplies(token);
428  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::Process() done: "<<State());
429  }
430  catch ( exception& exc ) {
431  PSG_ERROR("OSG: DoProcess() failed: "<<exc.what());
432  SendError("Exception when handling a get request: " + string(exc.what()));
434  }
435 }
436 
437 
439 {
440  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::DoProcessReplies() start: "<<State());
441  SEND_TRACE("OSG: processing replies");
442  ProcessReplies();
443  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::DoProcessReplies() return: "<<State());
445 }
446 
447 
449 {
450  if ( x_Valid(token) ) {
452  }
453  else {
455  }
456 }
457 
458 
460 {
461  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesSync() start: "<<State());
463  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesSync() return: "<<State());
464 }
465 
466 
468 {
469  CRequestContextResetter context_resetter;
470  GetRequest()->SetRequestContext();
471  CUVLoopGuard uv_loop_guard(*this);
472 
473  if ( !x_Valid(token) ) {
474  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesCallback() canceled: "<<State());
475  return;
476  }
477  _ASSERT(x_GetProcessor(token) == this);
478  try {
479  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesCallback() signal start: "<<State());
480  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesCallback() start: "<<State());
482  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesCallback() return: "<<State());
483  }
484  catch ( exception& exc ) {
485  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesCallback() exc: "<<exc.what()<<": "<<State());
486  PSG_ERROR("OSG: ProcessReplies() failed: "<<exc.what());
488  }
489 }
490 
491 
493 {
494  TBGProcToken token = x_BGProcTokenFromVoidP(data);
496 }
497 
498 
500 {
501  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesAsync() start: "<<State());
502  SEND_TRACE("OSG: scheduling ProcessReplies() to UV loop");
503  if ( !x_Valid(token) ) {
504  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesAsync() canceled: "<<State());
505  return;
506  }
508  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::CallDoProcessRepliesAsync() return: "<<State());
509  //SleepMilliSec(10);
510 }
511 
512 
514 {
515  CBackgroundProcessingGuard bg_guard(*this);
516  CMutexGuard guard(m_StatusMutex);
517  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::Cancel(): before: "<<State());
519  FinalizeResult();
520  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::Cancel(): after: "<<State());
521 }
522 
523 
525 {
526  return out << "ST=" << m_ProcessorPtr->m_Status
528  << " UV=" << m_UVLoop
529  << " FS=" << m_ProcessorPtr->m_FinishSignalled;
530 }
531 
532 
534 {
535 }
536 
537 
539 {
540 }
541 
542 
544 {
545 }
546 
547 
549 {
550 }
551 
552 
554 {
555  CRef<CID2_Request> req = req0;
556  if ( 1 ) {
557  // set hops
558  auto hops = GetRequest()->GetHops() + 1;
559  CRef<CID2_Param> param(new CID2_Param);
560  param->SetName("hops");
561  param->SetValue().push_back(to_string(hops));
562  req->SetParams().Set().push_back(param);
563  }
564  m_Fetches.push_back(Ref(new COSGFetch(req)));
565 }
566 
567 
569 {
570  //tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::GetStatus(): "<<State());
571  return m_Status;
572 }
573 
574 
576 {
577  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::SetFinalStatus(): status: "<<State());
578  _ASSERT(m_Status == ePSGS_InProgress || status == m_Status ||
579  m_Status == ePSGS_Canceled || status == ePSGS_Canceled);
580  m_Status = status;
581  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::SetFinalStatus(): return: "<<State());
582 }
583 
584 
586 {
587  CMutexGuard guard(m_StatusMutex);
588  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::FinalizeResult(): state: "<<State());
590  x_SignalFinishProcessing("FinalizeResult");
591 }
592 
593 
595 {
596  SetFinalStatus(status);
597  FinalizeResult();
598 }
599 
600 
602 {
603  CMutexGuard guard(m_StatusMutex);
604  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::SignalStartOfBackgroundProcessing(): "<<State());
606  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::SignalStartOfBackgroundProcessing(): return cancel: "<<State());
607  return false;
608  }
610  return true;
611 }
612 
613 
615 {
616  try {
618  if ( (m_UVLoop && m_BackgroundProcesing == 0) || !IsUVThreadAssigned() ) {
619  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::"<<from<<"()::x_SignalFinishProcessing(): signal: "<<State());
621  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::"<<from<<"()::x_SignalFinishProcessing(): return: "<<State());
623  }
624  else {
625  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::"<<from<<"()::x_SignalFinishProcessing(): sending to uv-loop "<<State());
627  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::"<<from<<"()::x_SignalFinishProcessing(): sent to uv-loop "<<State());
628  }
629  }
630  }
631  catch ( exception& exc ) {
632  ERR_POST("CPSGS_OSGProcessorBase("<<this<<")::"<<from<<"()::x_SignalFinishProcessing(): exception: "<<exc.what());
633  }
634 }
635 
636 
638 {
639  CUVLoopGuard uv_loop_guard(*static_cast<CPSGS_OSGProcessorBase*>(data));
640 }
641 
642 
644 {
645  CMutexGuard guard(m_StatusMutex);
646  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::SignalEndOfBackgroundProcessing(): "<<State());
649  x_SignalFinishProcessing("SignalEndOfBackgroundProcessing");
650 }
651 
652 
654 {
655  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::SignalStartOfUVLoop(): "<<State());
656  _ASSERT(!m_UVLoop);
657  m_UVLoop = true;
658  CMutexGuard guard(m_StatusMutex);
659  x_SignalFinishProcessing("SignalStartOfUVLoop");
660 }
661 
662 
664 {
665  tLOG_POST("CPSGS_OSGProcessorBase("<<this<<")::SignalEndOfUVLoop(): "<<State());
666  _ASSERT(m_UVLoop);
667  m_UVLoop = false;
668 }
669 
670 
672  EPSGOperationStatus status,
673  size_t blob_size)
674 {
676  GetTiming().Register(this, operation, status, m_Start, blob_size);
677 }
678 
679 
681 {
683 }
684 
685 
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
const string kCassandraProcessorEvent
CID2_Param –.
Definition: ID2_Param.hpp:66
CID2_Reply –.
Definition: ID2_Reply.hpp:66
virtual EStatus Execute(void) override
Do the actual job.
CPSGS_OSGProcessorBase::TBGProcToken m_Token
COSGCallDoProcessTask(CPSGS_OSGProcessorBase::TBGProcToken token)
void WaitForReplies(CPSGS_OSGProcessorBase &processor)
Definition: osg_caller.cpp:193
void ReleaseConnection()
Definition: osg_caller.cpp:157
size_t GetConnectionID() const
Definition: osg_caller.cpp:142
void SendRequest(CPSGS_OSGProcessorBase &processor)
Definition: osg_caller.cpp:167
const CID2_Request_Packet & GetRequestPacket() const
Definition: osg_caller.hpp:78
void AllocateConnection(const CRef< COSGConnectionPool > &connection_pool)
Definition: osg_caller.cpp:148
bool GetWaitBeforeOSG() const
size_t GetRetryCount() const
size_t GetMaxConnectionCount() const
bool GetAsyncProcessing() const
void Queue(CRef< CThreadPool_Task > task)
const CPSGS_OSGProcessorBase * m_ProcessorPtr
CNcbiOstream & Print(CNcbiOstream &out) const
void AddRequest(const CRef< CID2_Request > &req)
virtual void NotifyOSGCallReply(const CID2_Reply &reply)
void x_SignalFinishProcessing(const char *from)
AutoPtr< CBackgroundProcessingGuard > TBGProcToken
void CallDoProcessCallback(TBGProcToken token)
static void s_CallFinalizeUvCallback(void *data)
COSGStateReporter State() const
void DoProcess(TBGProcToken token)
CPSGS_OSGProcessorBase(TEnabledFlags enabled_flags, const CRef< COSGConnectionPool > &pool, const shared_ptr< CPSGS_Request > &request, const shared_ptr< CPSGS_Reply > &reply, TProcessorPriority priority)
void SendTrace(const string &str)
virtual EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
virtual void SendError(const string &msg)
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
void CallDoProcessRepliesCallback(TBGProcToken token)
CRef< CRequestContext > m_Context
virtual void CreateRequests()=0
static void s_CallDoProcessRepliesUvCallback(void *proc)
const TFetches & GetFetches() const
virtual void Cancel(void) override
The infrastructure request to cancel processing.
static bool x_Valid(const TBGProcToken &token)
void x_RegisterTimingNotFound(EPSGOperation operation)
void x_RegisterTiming(EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
void SetFinalStatus(EPSGS_Status status)
TBGProcToken x_CreateBGProcToken()
void CallDoProcessRepliesAsync(TBGProcToken token)
static CPSGS_OSGProcessorBase * x_GetProcessor(const TBGProcToken &token)
void CallDoProcessReplies(TBGProcToken token)
friend class CBackgroundProcessingGuard
static void * x_BGProcTokenToVoidP(TBGProcToken &token)
static TBGProcToken x_BGProcTokenFromVoidP(void *ptr)
virtual void WaitForOtherProcessors()
virtual void ProcessReplies()=0
virtual void Process(void) override
Main processing function.
CRef< COSGConnectionPool > m_ConnectionPool
static CPubseqGatewayApp * GetInstance(void)
Abstract class for representing single task executing in pool of threads To use this class in applica...
Definition: thread_pool.hpp:76
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
virtual string GetName(void) const =0
Tells the processor name (used in logging and tracing)
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
shared_ptr< CPSGS_Request > m_Request
TProcessorPriority m_Priority
Severity –.
Definition: ncbidiag.hpp:833
std::ofstream out("events_result.xml")
main entry point for tests
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Trace
Trace message.
Definition: ncbidiag.hpp:657
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
CRef< C > Ref(C *object)
Helper functions to get CRef<> and CConstRef<> objects.
Definition: ncbiobj.hpp:2015
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
Definition: ncbi_param.hpp:149
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
EStatus
Status of the task.
Definition: thread_pool.hpp:79
@ eCompleted
executed successfully
Definition: thread_pool.hpp:83
operation
Bit operations.
Definition: bmconst.h:191
void SetParams(TParams &value)
Assign a value to Params data member.
const Tdata & Get(void) const
Get the member data.
constexpr bool empty(list< Ts... >) noexcept
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
NCBI_PARAM_DEF(int, OSG_PROCESSOR, ERROR_RATE, 0)
void SetDiagSeverity(EDiagSev severity)
static EDiagSev s_DiagSeverity
void SetDebugLevel(int level)
static int s_DebugLevel
Severity GetDiagSeverity()
int GetDebugLevel()
#define SEND_TRACE_FMT(m)
#define SEND_TRACE(str)
END_NCBI_NAMESPACE
#define tLOG_POST(m)
BEGIN_NCBI_NAMESPACE
NCBI_PARAM_DECL(int, OSG_PROCESSOR, ERROR_RATE)
BEGIN_NAMESPACE(psg)
static thread_local bool m_UVLoop
END_NAMESPACE(osg)
@ eDebugLevel_default
#define PSG_ERROR(message)
chrono::steady_clock psg_clock_t
int TProcessorPriority
@ ePSGS_UnknownError
static const char * str(char *buf, int n)
Definition: stats.c:84
#define _ASSERT
Pool of generic task-executing threads.
EPSGOperationStatus
Definition: timing.hpp:59
@ eOpStatusNotFound
Definition: timing.hpp:61
EPSGOperation
Definition: timing.hpp:64
Modified on Tue Nov 28 02:22:50 2023 by modify_doxy.py rev. 669887