NCBI C++ ToolKit
async_bioseq_info_base.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: async_bioseq_info_base.cpp 100131 2023-06-22 15:40:36Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: base class for processors which need to retrieve bioseq
29  * info asynchronously
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "pubseq_gateway.hpp"
36 #include "insdc_utils.hpp"
40 #include "psgs_seq_id_utils.hpp"
41 
42 using namespace std::placeholders;
43 
44 
46 {}
47 
48 
50  shared_ptr<CPSGS_Request> request,
51  shared_ptr<CPSGS_Reply> reply,
52  TSeqIdResolutionFinishedCB finished_cb,
53  TSeqIdResolutionErrorCB error_cb) :
54  m_FinishedCB(finished_cb),
55  m_ErrorCB(error_cb),
56  m_NeedTrace(request->NeedTrace()),
57  m_Fetch(nullptr),
58  m_NoSeqIdTypeFetch(nullptr),
59  m_WithSeqIdType(true)
60 {}
61 
62 
64 {}
65 
66 
67 void
69 {
70  m_BioseqResolution = move(bioseq_resolution);
71  x_MakeRequest();
72 }
73 
74 
75 void
77 {
78  unique_ptr<CCassBioseqInfoFetch> details;
79  details.reset(new CCassBioseqInfoFetch());
80 
81  string accession = StripTrailingVerticalBars(
83  CBioseqInfoFetchRequest bioseq_info_request;
84  bioseq_info_request.SetAccession(accession);
85 
88 
89  if (version != -1)
90  bioseq_info_request.SetVersion(version);
91  if (m_WithSeqIdType) {
92  auto seq_id_type = m_BioseqResolution.GetBioseqInfo().GetSeqIdType();
93  if (seq_id_type != -1)
94  bioseq_info_request.SetSeqIdType(seq_id_type);
95  }
96  if (gi != -1)
97  bioseq_info_request.SetGI(gi);
98 
99  auto sat_info_entry = CPubseqGatewayApp::GetInstance()->GetBioseqKeyspace();
100  CCassBioseqInfoTaskFetch * fetch_task =
102  sat_info_entry.connection,
103  sat_info_entry.keyspace,
104  bioseq_info_request,
105  nullptr, nullptr);
106  details->SetLoader(fetch_task);
107 
108  if (m_WithSeqIdType)
109  fetch_task->SetConsumeCallback(
111  this, _1));
112  else
113  fetch_task->SetConsumeCallback(
115  this, _1));
116 
117  fetch_task->SetErrorCB(
119  this, _1, _2, _3, _4));
120  fetch_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
121 
122 
123  m_BioseqRequestStart = psg_clock_t::now();
124  if (m_WithSeqIdType) {
125  m_Fetch = details.release();
126  m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_Fetch));
127  } else {
128  m_NoSeqIdTypeFetch = details.release();
129  m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_NoSeqIdTypeFetch));
130  }
131 
132  if (m_NeedTrace) {
133  if (m_WithSeqIdType)
134  m_Reply->SendTrace(
135  "Cassandra request: " +
136  ToJsonString(bioseq_info_request),
137  m_Request->GetStartTimestamp());
138  else
139  m_Reply->SendTrace(
140  "Cassandra request for INSDC types: " +
141  ToJsonString(bioseq_info_request),
142  m_Request->GetStartTimestamp());
143  }
144 
145  fetch_task->Wait();
146 }
147 
148 
149 void
150 CPSGS_AsyncBioseqInfoBase::x_OnBioseqInfo(vector<CBioseqInfoRecord>&& records)
151 {
152  auto app = CPubseqGatewayApp::GetInstance();
153 
156 
157  if (m_NeedTrace) {
158  string msg = to_string(records.size()) + " hit(s)";
159  for (const auto & item : records) {
160  msg += "\n" +
162  }
163  m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
164  }
165 
166  if (records.empty()) {
167  // Nothing was found
168  app->GetTiming().Register(this, eLookupCassBioseqInfo,
170  app->GetCounters().Increment(this, CPSGSCounters::ePSGS_BioseqInfoNotFound);
171 
173  // Second try without seq_id_type
174  m_WithSeqIdType = false;
175  x_MakeRequest();
176  return;
177  }
178 
179  if (m_NeedTrace)
180  m_Reply->SendTrace("Report not found",
181  m_Request->GetStartTimestamp());
182 
184 
185  // An empty message tells the upper level that this is the generic
186  // case in which a seq_id could not be resolved.
189  return;
190  }
191 
192  if (records.size() == 1) {
193  // Exactly one match; no complications
194  if (m_NeedTrace) {
195  m_Reply->SendTrace("Report found", m_Request->GetStartTimestamp());
196  }
197 
199 
200  app->GetTiming().Register(this, eLookupCassBioseqInfo, eOpStatusFound,
202  app->GetCounters().Increment(this, CPSGSCounters::ePSGS_BioseqInfoFoundOne);
203  m_BioseqResolution.SetBioseqInfo(records[0]);
205  return;
206  }
207  // Here: there is more than one record, so try to select one of them.
208  // The selection may fail (negative index); that case is handled below.
209  ssize_t index = SelectBioseqInfoRecord(records);
210  if (index < 0) {
211  // More than one and it was impossible to make a choice
212  app->GetTiming().Register(this, eLookupCassBioseqInfo,
214  app->GetCounters().Increment(this, CPSGSCounters::ePSGS_BioseqInfoNotFound);
215 
216  if (m_NeedTrace)
217  m_Reply->SendTrace(
218  to_string(records.size()) + " bioseq info records were found however "
219  "it was impossible to choose one of them. So report as not found",
220  m_Request->GetStartTimestamp());
221 
224  eDiag_Error, "Many bioseq info records found and not able to "
225  "choose one while resolving " + m_BioseqResolution.GetBioseqInfo().GetAccession(),
227  return;
228  }
229 
230  if (m_NeedTrace) {
231  m_Reply->SendTrace(
232  "Record with max version (and max date changed if "
233  "more than one with max version) selected "
234  "(SEQ_STATE_LIFE records are checked first)\n" +
235  ToJsonString(records[index],
237  "\nReport found",
238  m_Request->GetStartTimestamp());
239  }
240 
242 
243  app->GetTiming().Register(this, eLookupCassBioseqInfo, eOpStatusFound,
245  app->GetCounters().Increment(this, CPSGSCounters::ePSGS_BioseqInfoFoundOne);
246  m_BioseqResolution.SetBioseqInfo(records[index]);
248 }
249 
250 
251 void
253  vector<CBioseqInfoRecord>&& records)
254 {
257 
258  auto app = CPubseqGatewayApp::GetInstance();
259  auto request_version = m_BioseqResolution.GetBioseqInfo().GetVersion();
260  SINSDCDecision decision = DecideINSDC(records, request_version);
261 
262  if (m_NeedTrace) {
263  string msg = to_string(records.size()) +
264  " hit(s); decision status: " + to_string(decision.status);
265  for (const auto & item : records) {
267  }
268  m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
269  }
270 
271  switch (decision.status) {
273  if (m_NeedTrace)
274  m_Reply->SendTrace("Report found",
275  m_Request->GetStartTimestamp());
276 
278 
279  app->GetTiming().Register(this, eLookupCassBioseqInfo,
281  app->GetCounters().Increment(this, CPSGSCounters::ePSGS_BioseqInfoFoundOne);
282  m_BioseqResolution.SetBioseqInfo(records[decision.index]);
283 
284  // Data callback
286  break;
288  if (m_NeedTrace)
289  m_Reply->SendTrace("Report not found",
290  m_Request->GetStartTimestamp());
291 
293 
294  app->GetTiming().Register(this, eLookupCassBioseqInfo,
296  app->GetCounters().Increment(this, CPSGSCounters::ePSGS_BioseqInfoNotFound);
297 
298  // Data Callback
299  // An empty message tells the upper level that this is the
300  // generic case in which a seq_id could not be resolved
303  break;
305  if (m_NeedTrace)
306  m_Reply->SendTrace("Report not found",
307  m_Request->GetStartTimestamp());
308 
310 
311  app->GetTiming().Register(this, eLookupCassBioseqInfo,
313  app->GetCounters().Increment(this, CPSGSCounters::ePSGS_BioseqInfoFoundMany);
314 
315  // Error callback
318  decision.message, ePSGS_NeedLogging);
319  break;
320  default:
321  // Impossible
322  m_ErrorCB(
324  eDiag_Error, "Unexpected decision code when a secondary INSCD "
325  "request results processed while retrieving bioseq info",
327  }
328 }
329 
330 
331 void
333  int code,
334  EDiagSev severity,
335  const string & message)
336 {
337  if (m_Fetch) {
340  }
341  if (m_NoSeqIdTypeFetch) {
344  }
345 
347  this,
349 
350  m_ErrorCB(status, code, severity, message, ePSGS_NeedLogging);
351 }
352 
function< void(SBioseqResolution &&async_bioseq_resolution)> TSeqIdResolutionFinishedCB
function< void(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message, EPSGS_LoggingFlag logging_flag)> TSeqIdResolutionErrorCB
ssize_t SelectBioseqInfoRecord(const vector< CBioseqInfoRecord > &records)
CBioseqInfoFetchRequest & SetVersion(CBioseqInfoRecord::TVersion value)
Definition: request.hpp:73
CBioseqInfoFetchRequest & SetGI(CBioseqInfoRecord::TGI value)
Definition: request.hpp:87
CBioseqInfoFetchRequest & SetSeqIdType(CBioseqInfoRecord::TSeqIdType value)
Definition: request.hpp:80
CBioseqInfoFetchRequest & SetAccession(CBioseqInfoRecord::TAccession const &value)
Definition: request.hpp:64
TSeqIdType GetSeqIdType() const
Definition: record.hpp:208
TVersion GetVersion() const
Definition: record.hpp:203
TGI GetGI() const
Definition: record.hpp:223
TAccession const & GetAccession() const
Definition: record.hpp:198
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: fetch.cpp:92
void SetConsumeCallback(TBioseqInfoConsumeCallback callback)
Definition: fetch.cpp:87
void SetErrorCB(TDataErrorCallback error_cb)
void SetReadFinished(void)
Definition: cass_fetch.hpp:89
CCassBlobWaiter * GetLoader(void)
Definition: cass_fetch.hpp:86
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
void x_OnBioseqInfoWithoutSeqIdType(vector< CBioseqInfoRecord > &&records)
TSeqIdResolutionErrorCB m_ErrorCB
void x_OnBioseqInfo(vector< CBioseqInfoRecord > &&records)
void x_OnBioseqInfoError(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void MakeRequest(SBioseqResolution &&bioseq_resolution)
TSeqIdResolutionFinishedCB m_FinishedCB
list< unique_ptr< CCassFetch > > m_FetchDetails
CPSGSCounters & GetCounters(void)
static CPubseqGatewayApp * GetInstance(void)
SSatInfoEntry GetBioseqKeyspace(void) const
shared_ptr< CPSGS_Reply > m_Reply
shared_ptr< CPSGS_Request > m_Request
#define true
Definition: bool.h:35
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
SINSDCDecision DecideINSDC(const vector< CBioseqInfoRecord > &records, CBioseqInfoRecord::TVersion version)
Definition: insdc_utils.cpp:81
bool IsINSDCSeqIdType(CBioseqInfoRecord::TSeqIdType seq_id_type)
Definition: insdc_utils.cpp:44
static int version
Definition: mdb_load.c:29
int ssize_t
Definition: ncbiconf_msvc.h:93
#define nullptr
Definition: ncbimisc.hpp:45
string StripTrailingVerticalBars(const string &seq_id)
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
@ ePSGS_ServerLogicError
@ ePSGS_UnresolvedSeqId
@ ePSGS_BioseqInfoMultipleRecords
@ ePSGS_BioseqDB
@ ePSGS_NotResolved
@ ePSGS_SkipLogging
@ ePSGS_NeedLogging
EPSGS_ResolutionResult m_ResolutionResult
CBioseqInfoRecord & GetBioseqInfo(void)
void SetBioseqInfo(const CBioseqInfoRecord &record)
Definition: inftrees.h:24
@ eOpStatusFound
Definition: timing.hpp:61
@ eOpStatusNotFound
Definition: timing.hpp:62
@ eLookupCassBioseqInfo
Definition: timing.hpp:71
Modified on Fri Jul 12 16:24:25 2024 by modify_doxy.py rev. 669887