NCBI C++ ToolKit
osg_processor_base.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef PSGS_OSGPROCESSORBASE__HPP
2 #define PSGS_OSGPROCESSORBASE__HPP
3 
4 /* $Id: osg_processor_base.hpp 99336 2023-03-13 14:22:34Z vasilche $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Eugene Vasilchenko
30  *
31  * File Description: base class for processors which may generate os_gateway
32  * fetches
33  *
34  */
35 
37 #include "psgs_request.hpp"
38 #include "psgs_reply.hpp"
39 #include "ipsgs_processor.hpp"
40 #include "timing.hpp"
41 #include <thread>
42 
44 
45 class CRequestContext;
46 
48 
49 class CID2_Request;
50 class CID2_Reply;
51 
53 
56 
57 const string kOSGProcessorGroupName = "OSG";
58 
59 class COSGFetch;
60 class COSGConnectionPool;
61 class COSGConnection;
64 
74 };
75 
76 int GetDebugLevel();
77 void SetDebugLevel(int debug_level);
78 
80 void SetDiagSeverity(EDiagSev severity);
81 
83 
84 
86 {
87 public:
// Bind the reporter to a processor by storing the given pointer in
// m_ProcessorPtr. The constructor does nothing else; `explicit` keeps a
// bare processor pointer from converting to a reporter implicitly.
88  explicit COSGStateReporter(const CPSGS_OSGProcessorBase* processor)
89  : m_ProcessorPtr(processor)
90  {
91  }
93 private:
95 };
97 {
98  return state.Print(out);
99 }
100 
101 // actual OSG processor base class for communication with OSG
103 {
104 public:
106  fEnabledWGS = 1<<0,
107  fEnabledSNP = 1<<1,
108  fEnabledCDD = 1<<2,
111  };
112  typedef int TEnabledFlags;
113 
115  const CRef<COSGConnectionPool>& pool,
116  const shared_ptr<CPSGS_Request>& request,
117  const shared_ptr<CPSGS_Reply>& reply,
118  TProcessorPriority priority);
119  virtual ~CPSGS_OSGProcessorBase();
120 
// Overrides of virtual functions inherited from the base class
// (presumably IPSGS_Processor, declared in ipsgs_processor.hpp -- confirm).

// Create processor to fulfil PSG request using the data source.
121  virtual IPSGS_Processor* CreateProcessor(shared_ptr<CPSGS_Request> request,
122  shared_ptr<CPSGS_Reply> reply,
123  TProcessorPriority priority) const override;
// Main processing function.
124  virtual void Process(void) override;
// The infrastructure request to cancel processing.
125  virtual void Cancel(void) override;
// Tells the processor status (whether it has finished or is in progress).
126  virtual EPSGS_Status GetStatus(void) override;
127 
128  virtual void WaitForOtherProcessors();
129 
130  void WaitForCassandra();
131 
// Return the stored m_NeedTrace flag unchanged.
// NOTE(review): presumably callers check this before calling SendTrace() --
// confirm in the implementation file.
132  bool NeedTrace() const
133  {
134  return m_NeedTrace;
135  }
136  void SendTrace(const string& str);
137  virtual void SendError(const string& msg);
138 
// Return true exactly when m_Status equals ePSGS_Canceled; any other status
// value yields false.
139  bool IsCanceled() const
140  {
141  return m_Status == ePSGS_Canceled;
142  }
143 
145  {
146  return m_EnabledFlags;
147  }
148 
149  // notify processor about communication events
150  virtual void NotifyOSGCallStart();
151  virtual void NotifyOSGCallReply(const CID2_Reply& reply);
152  virtual void NotifyOSGCallEnd();
153 
154  typedef vector<CRef<COSGFetch>> TFetches;
// Read-only access to m_Fetches (TFetches is vector<CRef<COSGFetch>>).
// Returns a const reference to the member, so the vector is not copied.
155  const TFetches& GetFetches() const
156  {
157  return m_Fetches;
158  }
159 
160 protected:
162  return m_ConnectionPool.GetNCObject();
163  }
164 
165  void StopAsyncThread();
166  void SetFinalStatus(EPSGS_Status status);
167  void FinalizeResult(EPSGS_Status status);
168  void FinalizeResult();
169  // if canceled return false
170  // otherwise increment background processing counter and return true
172  // decrement background processing counter and notify dispatcher if zero
174 
175  void SignalStartOfUVLoop();
176  void SignalEndOfUVLoop();
177 
179  {
180  public:
182  : m_Processor(processor)
183  {
184  processor.SignalStartOfUVLoop();
185  }
187  {
189  }
190  CUVLoopGuard(const CUVLoopGuard& guard) = delete;
191  CUVLoopGuard& operator=(const CUVLoopGuard& guard) = delete;
192  private:
194  };
195  friend class CUVLoopGuard;
196 
197  class CBackgroundProcessingGuard : public CObject
198  {
199  public:
201  : m_ProcessorPtr(processor.SignalStartOfBackgroundProcessing()? &processor: 0)
202  {
203  }
205  {
206  if ( m_ProcessorPtr ) {
208  }
209  }
210 
213 
215  {
216  return m_ProcessorPtr;
217  }
218  private:
220  };
// Return true when the token holds a guard object and that guard reports a
// non-null guarded processor. The && short-circuits, so an empty token is
// never dereferenced.
224  static bool x_Valid(const TBGProcToken& token)
225  {
226  return token && token->GetGuardedProcessor();
227  }
229  {
230  return token->GetGuardedProcessor();
231  }
// Convert a token to an untyped pointer. release() hands ownership of the
// guard object to the caller, so the token no longer owns it afterwards.
// The returned pointer can be wrapped back into a token with
// x_BGProcTokenFromVoidP().
232  static void* x_BGProcTokenToVoidP(TBGProcToken& token)
233  {
234  return token.release();
235  }
237  {
238  return TBGProcToken(static_cast<CBackgroundProcessingGuard*>(ptr));
239  }
240  void x_SignalFinishProcessing(const char* from);
241 
242  void CallDoProcess();
243  void CallDoProcessSync();
244  void CallDoProcessAsync();
246  void DoProcess(TBGProcToken token);
247 
252  static void s_CallDoProcessRepliesUvCallback(void* proc);
253  void DoProcessReplies();
254 
255  friend class COSGCallDoProcessTask;
256 
257  static void s_CallFinalizeUvCallback(void *data);
258 
259  friend class COSGStateReporter;
261  {
262  return COSGStateReporter(this);
263  }
264 
265  void AddRequest(const CRef<CID2_Request>& req);
266 
267  // create ID2 requests for the PSG request
268  virtual void CreateRequests() = 0;
269  // process ID2 replies and issue PSG reply
270  virtual void ProcessReplies() = 0;
271  // reset processing state in case of ID2 communication failure before next attempt
272  virtual void ResetReplies();
273 
275  EPSGOperationStatus status,
276  size_t blob_size);
278 
279  static bool s_SimulateError();
280 
281 private:
292 };
293 
294 
298 
299 #endif // PSGS_OSGPROCESSORBASE__HPP
CID2_Reply –.
Definition: ID2_Reply.hpp:66
CID2_Request –.
Definition: ID2_Request.hpp:66
CMutex –.
Definition: ncbimtx.hpp:749
const CPSGS_OSGProcessorBase * m_ProcessorPtr
CNcbiOstream & Print(CNcbiOstream &out) const
COSGStateReporter(const CPSGS_OSGProcessorBase *processor)
CBackgroundProcessingGuard & operator=(const CBackgroundProcessingGuard &guard)=delete
CBackgroundProcessingGuard(const CBackgroundProcessingGuard &guard)=delete
CBackgroundProcessingGuard(CPSGS_OSGProcessorBase &processor)
CUVLoopGuard(const CUVLoopGuard &guard)=delete
CUVLoopGuard & operator=(const CUVLoopGuard &guard)=delete
CUVLoopGuard(CPSGS_OSGProcessorBase &processor)
void AddRequest(const CRef< CID2_Request > &req)
CRef< COSGConnection > m_Connection
virtual void NotifyOSGCallReply(const CID2_Reply &reply)
void x_SignalFinishProcessing(const char *from)
TEnabledFlags GetEnabledFlags() const
AutoPtr< CBackgroundProcessingGuard > TBGProcToken
void CallDoProcessCallback(TBGProcToken token)
static void s_CallFinalizeUvCallback(void *data)
COSGStateReporter State() const
void DoProcess(TBGProcToken token)
CPSGS_OSGProcessorBase(TEnabledFlags enabled_flags, const CRef< COSGConnectionPool > &pool, const shared_ptr< CPSGS_Request > &request, const shared_ptr< CPSGS_Reply > &reply, TProcessorPriority priority)
void SendTrace(const string &str)
virtual EPSGS_Status GetStatus(void) override
Tells the processor status (whether it has finished or is still in progress)
virtual void SendError(const string &msg)
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
void CallDoProcessRepliesCallback(TBGProcToken token)
CRef< CRequestContext > m_Context
virtual void CreateRequests()=0
static void s_CallDoProcessRepliesUvCallback(void *proc)
const TFetches & GetFetches() const
virtual void Cancel(void) override
The infrastructure request to cancel processing.
static bool x_Valid(const TBGProcToken &token)
void x_RegisterTimingNotFound(EPSGOperation operation)
void x_RegisterTiming(EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
void SetFinalStatus(EPSGS_Status status)
TBGProcToken x_CreateBGProcToken()
COSGConnectionPool & GetConnectionPool() const
void CallDoProcessRepliesAsync(TBGProcToken token)
static CPSGS_OSGProcessorBase * x_GetProcessor(const TBGProcToken &token)
void CallDoProcessReplies(TBGProcToken token)
static void * x_BGProcTokenToVoidP(TBGProcToken &token)
static TBGProcToken x_BGProcTokenFromVoidP(void *ptr)
virtual void WaitForOtherProcessors()
virtual void ProcessReplies()=0
virtual void Process(void) override
Main processing function.
CRef< COSGConnectionPool > m_ConnectionPool
vector< CRef< COSGFetch > > TFetches
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
EPSGS_Status
The GetStatus() method returns a processor current status.
Severity –.
Definition: ncbidiag.hpp:833
std::ofstream out("events_result.xml")
main entry point for tests
static const char * str(char *buf, int n)
Definition: stats.c:84
static const char * proc
Definition: stats.c:21
char data[12]
Definition: iconv.c:80
element_type * release(void)
Release will release ownership of pointer to caller.
Definition: ncbimisc.hpp:472
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
TObjectType & GetNCObject(void) const
Get object.
Definition: ncbiobj.hpp:1187
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
operation
Bit operations.
Definition: bmconst.h:191
USING_SCOPE(objects)
void SetDebugLevel(int debug_level)
void SetDiagSeverity(EDiagSev severity)
BEGIN_NAMESPACE(objects)
@ eDebug_raw
@ eDebug_none
@ eDebug_open
@ eDebugLevel_default
@ eDebug_asn
@ eDebug_exchange
@ eDebug_error
@ eDebug_blob
const string kOSGProcessorGroupName
Severity GetDiagSeverity()
int GetDebugLevel()
END_NCBI_NAMESPACE
CNcbiOstream & operator<<(CNcbiOstream &out, const COSGStateReporter &state)
END_NAMESPACE(objects)
BEGIN_NCBI_NAMESPACE
int TProcessorPriority
psg_clock_t::time_point psg_time_point_t
Defines CRequestStatus class for NCBI C++ diagnostic API.
EPSGOperationStatus
Definition: timing.hpp:60
EPSGOperation
Definition: timing.hpp:65
Modified on Sun Apr 14 05:25:52 2024 by modify_doxy.py rev. 669887