NCBI C++ ToolKit
wgs_processor.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef WGS_PROCESSOR__HPP
2 #define WGS_PROCESSOR__HPP
3 
4 /* $Id: wgs_processor.hpp 102541 2024-05-28 17:12:13Z grichenk $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Aleksey Grichenko, Eugene Vasilchenko
30  *
31  * File Description: processor for data from WGS
32  *
33  */
34 
35 #include "ipsgs_processor.hpp"
36 #include "psgs_request.hpp"
37 #include "psgs_reply.hpp"
38 #include "timing.hpp"
39 #include "wgs_client.hpp"
41 
42 
44 
45 class CThreadPool; // forward declaration; the class below names it as ncbi::CThreadPool
46 
48 class CID2_Blob_Id; // forward declaration; taken by const reference in GetPSGId2Info()
49 class CID2_Reply_Data; // forward declaration; the class below names it as objects::CID2_Reply_Data
50 class CWGSResolver; // forward declaration; not referenced by any declaration visible in this listing
51 class CSeq_id; // forward declaration; the class below names it as objects::CSeq_id
52 class CAsnBinData; // forward declaration; the class below names it as objects::CAsnBinData
54 
57 
58 
59 const string kWGSProcessorEvent = "WGS"; // namespace-scope const in a header: every including translation unit gets its own copy. NOTE(review): presumably the event label for this processor - confirm against its users.
60 
62 { // Body of class CPSGS_WGSProcessor (name taken from the constructors below). The `class` header line and several other source lines are absent from this numbered listing.
63 public:
64  CPSGS_WGSProcessor(void);
65  ~CPSGS_WGSProcessor(void) override;
66 
67  virtual bool CanProcess(shared_ptr<CPSGS_Request> request, // Tells if processor can process the given request.
68  shared_ptr<CPSGS_Reply> reply) const override;
69  IPSGS_Processor* CreateProcessor(shared_ptr<CPSGS_Request> request, // Create processor to fulfil PSG request using the data source.
70  shared_ptr<CPSGS_Reply> reply,
71  TProcessorPriority priority) const override;
72 
73  void Process(void) override; // Main processing function.
74  void Cancel(void) override; // The infrastructure request to cancel processing.
75  EPSGS_Status GetStatus(void) override; // Tells the processor status (if it has finished or in progress).
76  string GetName(void) const override; // Tells the processor name (used in logging and tracing).
77  string GetGroupName(void) const override; // Tells the processor group name.
78 
79  void ResolveSeqId(void); // NOTE(review): each X()/OnX() pair below looks like a work step plus its completion handler - confirm in the .cpp
80  void OnResolvedSeqId(void);
81 
82  void GetBlobBySeqId(void);
83  void OnGotBlobBySeqId(void);
84 
85  void GetBlobByBlobId(void);
86  void OnGotBlobByBlobId(void);
87 
88  void GetChunk(void);
89  void OnGotChunk(void);
90 
91 public:
92  static string GetPSGId2Info(const CID2_Blob_Id& tse_id, // static: callable without a processor instance
93  CWGSClient::TID2SplitVersion split_version);
94 
95 private:
96  CPSGS_WGSProcessor(const shared_ptr<CWGSClient>& client, // private constructor; NOTE(review): presumably used by CreateProcessor() to build the per-request instance - confirm
97  shared_ptr<ncbi::CThreadPool> thread_pool,
98  shared_ptr<CPSGS_Request> request,
99  shared_ptr<CPSGS_Reply> reply,
100  TProcessorPriority priority);
101 
102  void x_LoadConfig(void);
103  bool x_IsEnabled(CPSGS_Request& request) const;
104  void x_InitClient(void) const; // const member; m_Client below is declared mutable
105 
106  void x_ProcessResolveRequest(void);
107  void x_ProcessBlobBySeqIdRequest(void);
109  void x_ProcessTSEChunkRequest(void); // listing line 108 is absent here (the page index also lists x_ProcessBlobBySatSatKeyRequest)
110 
113  typedef int TID2ChunkId; // chunk-id type used by the x_SendChunk* declarations below
114  typedef vector<string> TBlobIds;
115 
118  EPSGOperationStatus status, // tail of x_RegisterTiming(psg_time_point_t start, EPSGOperation operation, ...); its opening lines are absent from this listing
119  size_t blob_size);
122  const objects::CID2_Reply_Data& data); // tail of x_RegisterTimingFound(psg_time_point_t start, EPSGOperation operation, ...); its opening lines are absent from this listing
125  void x_SendResult(const string& data_to_send, EOutputFormat output_format);
126  void x_SendBioseqInfo(void);
127  void x_SendBlobProps(const string& psg_blob_id, CBlobRecord& blob_props);
128  void x_SendBlobForbidden(const string& psg_blob_id);
129  void x_SendBlobData(const string& psg_blob_id, const objects::CID2_Reply_Data& data);
130  void x_SendChunkBlobProps(const string& id2_info,
131  TID2ChunkId chunk_id,
132  CBlobRecord& blob_props);
133  void x_SendChunkBlobData(const string& id2_info,
134  TID2ChunkId chunk_id,
135  const objects::CID2_Reply_Data& data);
136  void x_SendSplitInfo(void);
137  void x_SendMainEntry(void);
138  void x_SendForbidden(void);
139  void x_SendBlob(void);
140  void x_SendChunk(void);
141  void x_WriteData(objects::CID2_Reply_Data& data,
142  const objects::CAsnBinData& obj,
143  bool compress) const;
144 
145  static void x_SendError(shared_ptr<CPSGS_Reply> reply, // static overloads take the reply explicitly
146  const string& msg);
147  static void x_SendError(shared_ptr<CPSGS_Reply> reply,
148  const string& msg, const exception& exc);
149  void x_SendError(const string& msg); // non-static overloads take no reply argument
150  void x_SendError(const string& msg, const exception& exc);
151 
152  template<class C> static int x_GetBlobState(const C& obj) { // C must provide IsSetBlob_state() and GetBlob_state()
153  return obj.IsSetBlob_state() ? obj.GetBlob_state() : 0; // 0 when the blob state is not set
154  }
155 
156  void x_UnlockRequest(void);
157  void x_WaitForOtherProcessors(void);
158  void x_Finish(EPSGS_Status status);
159  bool x_IsCanceled();
161 
162  bool x_CheckExcludedCache(void);
163  void x_RemoveFromExcludedCache(void);
164  void x_SetExcludedCacheCompleted(void);
165 
167  shared_ptr<SWGSProcessor_Config> m_Config; // some member lines are absent from this listing (the page index also lists m_Start, m_Status and m_OutputFormat)
168  mutable shared_ptr<CWGSClient> m_Client; // mutable; NOTE(review): presumably so the const x_InitClient() can assign it - confirm
172  CRef<objects::CSeq_id> m_SeqId; // requested seq-id
173  string m_PSGBlobId; // requested blob-id
174  string m_Id2Info; // requested id2-info
175  int64_t m_ChunkId; // requested chunk-id
177  string m_ClientId;
178  unsigned long m_ResendTimeoutMks = 0; // NOTE(review): "Mks" suffix presumably means microseconds - confirm
179  unsigned long m_SentMksAgo = 0;
181  shared_ptr<SWGSData> m_WGSData;
185  shared_ptr<ncbi::CThreadPool> m_ThreadPool;
186 };
187 
188 
192 
193 #endif // WGS_PROCESSOR__HPP
CFastMutex –.
Definition: ncbimtx.hpp:667
CID2_Blob_Id –.
Definition: ID2_Blob_Id.hpp:66
void x_SendResult(const string &data_to_send, EOutputFormat output_format)
void x_SendChunkBlobData(const string &id2_info, TID2ChunkId chunk_id, const objects::CID2_Reply_Data &data)
void x_ProcessBlobBySatSatKeyRequest(void)
void OnGotBlobBySeqId(void)
void GetBlobBySeqId(void)
void x_ProcessResolveRequest(void)
vector< string > TBlobIds
void x_InitClient(void) const
static void x_SendError(shared_ptr< CPSGS_Reply > reply, const string &msg)
void x_SendBlobProps(const string &psg_blob_id, CBlobRecord &blob_props)
EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
EOutputFormat x_GetOutputFormat(void)
string GetName(void) const override
Tells the processor name (used in logging and tracing)
void Process(void) override
Main processing function.
bool x_CheckExcludedCache(void)
void x_ProcessTSEChunkRequest(void)
void x_Finish(EPSGS_Status status)
string GetGroupName(void) const override
Tells the processor group name.
void x_SendBlobData(const string &psg_blob_id, const objects::CID2_Reply_Data &data)
CRef< objects::CSeq_id > m_SeqId
unsigned long m_ResendTimeoutMks
void x_WriteData(objects::CID2_Reply_Data &data, const objects::CAsnBinData &obj, bool compress) const
void OnGotBlobByBlobId(void)
void x_UnlockRequest(void)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Tells if processor can process the given request.
SPSGS_ResolveRequest::EPSGS_OutputFormat EOutputFormat
static int x_GetBlobState(const C &obj)
SPSGS_ResolveRequest::TPSGS_BioseqIncludeData TBioseqInfoFlags
shared_ptr< CWGSClient > m_Client
void x_RegisterTimingNotFound(EPSGOperation operation)
shared_ptr< ncbi::CThreadPool > m_ThreadPool
void OnResolvedSeqId(void)
static string GetPSGId2Info(const CID2_Blob_Id &tse_id, CWGSClient::TID2SplitVersion split_version)
shared_ptr< SWGSData > m_WGSData
void GetBlobByBlobId(void)
void x_ProcessBlobBySeqIdRequest(void)
void x_SendSplitInfo(void)
unsigned long m_SentMksAgo
void x_SendForbidden(void)
psg_time_point_t m_Start
~CPSGS_WGSProcessor(void) override
IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
void x_SendBlobForbidden(const string &psg_blob_id)
void Cancel(void) override
The infrastructure request to cancel processing.
void x_SetExcludedCacheCompleted(void)
void x_RegisterTiming(psg_time_point_t start, EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
void x_WaitForOtherProcessors(void)
void x_RemoveFromExcludedCache(void)
void x_SendChunkBlobProps(const string &id2_info, TID2ChunkId chunk_id, CBlobRecord &blob_props)
void x_SendMainEntry(void)
void x_SendBioseqInfo(void)
shared_ptr< SWGSProcessor_Config > m_Config
EOutputFormat m_OutputFormat
void x_RegisterTimingFound(psg_time_point_t start, EPSGOperation operation, const objects::CID2_Reply_Data &data)
bool x_IsEnabled(CPSGS_Request &request) const
EPSGS_Status m_Status
Main class implementing functionality of pool of threads.
int TID2SplitVersion
Definition: wgs_client.hpp:192
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
EPSGS_Status
The GetStatus() method returns a processor current status.
#define C(s)
Definition: common.h:231
char data[12]
Definition: iconv.c:80
Int8 int64_t
operation
Bit operations.
Definition: bmconst.h:191
int TProcessorPriority
psg_clock_t::time_point psg_time_point_t
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static CNamedPipeClient * client
EPSGOperationStatus
Definition: timing.hpp:60
EPSGOperation
Definition: timing.hpp:65
const string kWGSProcessorEvent
BEGIN_NAMESPACE(objects)
END_NCBI_NAMESPACE
END_NAMESPACE(objects)
BEGIN_NCBI_NAMESPACE
#define compress
Definition: zconf_cf.h:39
Modified on Wed Sep 04 15:05:36 2024 by modify_doxy.py rev. 669887