NCBI C++ ToolKit
fetch_ipg_report.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /*****************************************************************************
2  * $Id: fetch_ipg_report.cpp 103120 2024-09-11 14:33:06Z saprykin $
3  * ===========================================================================
4  *
5  * PUBLIC DOMAIN NOTICE
6  * National Center for Biotechnology Information
7  *
8  * This software/database is a "United States Government Work" under the
9  * terms of the United States Copyright Act. It was written as part of
10  * the author's official duties as a United States Government employee and
11  * thus cannot be copyrighted. This software/database is freely available
12  * to the public for use. The National Library of Medicine and the U.S.
13  * Government have not placed any restriction on its use or reproduction.
14  *
15  * Although all reasonable efforts have been taken to ensure the accuracy
16  * and reliability of the software and data, the NLM and the U.S.
17  * Government do not and cannot warrant the performance or results that
18  * may be obtained by using this software or data. The NLM and the U.S.
19  * Government disclaim all warranties, express or implied, including
20  * warranties of performance, merchantability or fitness for any particular
21  * purpose.
22  *
23  * Please cite the author in any work or product based on this material.
24  *
25  * Retrieval part of IPG storage library for integration with PubseqGateway
26  *
27  *****************************************************************************/
28 
29 #include <ncbi_pch.hpp>
30 #include <corelib/ncbistd.hpp>
31 #include <corelib/ncbitime.hpp>
32 
35 
37 BEGIN_SCOPE(ipg)
38 
40 
41 inline static constexpr array<pair<const char*, const char *>, 22> ipg_report_fields{{
42  {"ipg", "ipg"},
43  {"accession", "accession"},
44  {"assembly", "assembly"},
45  {"product_name", "product_name"},
46  {"nuc_accession", "nuc_accession"},
47  {"length", "length"},
48  {"div", "div"},
49  {"src_db", "src_db"},
50  {"src_refseq", "src_refseq"},
51  {"taxid", "taxid"},
52  {"cds", "cds"},
53  {"strain", "strain"},
54  {"gb_state", "gb_state"},
55  {"weights", "weights"},
56  {"bioproject", "bioproject"},
57  {"compartment", "compartment"},
58  {"updated", "updated"},
59  {"created", "created"},
60  {"flags", "flags"},
61  {"pubmedids", "pubmedids"},
62  {"def_line", "def_line"},
63  {"write_time", "writetime(gb_state)"},
64 }};
65 
66 constexpr int FieldIndex(const char * field_name)
67 {
68  int i{0};
69  for (const auto& field : ipg_report_fields) {
70  if (field.first == field_name) {
71  return i;
72  }
73  ++i;
74  }
75  return -1;
76 }
77 
79 {
80  string field_list;
81  for (auto const& item : ipg_report_fields) {
82  field_list.append(item.second).append(", ");
83  }
84  if (!field_list.empty()) {
85  field_list.resize(field_list.size() - 2);
86  }
87  return field_list;
88 }
89 
90 void PopulateEntry(shared_ptr<CCassQuery>& query, CIpgStorageReportEntry& entry)
91 {
92  entry.SetIpg(query->FieldGetInt64Value(FieldIndex("ipg")));
93  entry
94  .SetAccession(query->FieldGetStrValue(FieldIndex("accession")))
95  .SetNucAccession(query->FieldGetStrValue(FieldIndex("nuc_accession")))
96  .SetGbState( query->FieldIsNull(FieldIndex("gb_state")) ?
98  query->FieldGetInt32Value(FieldIndex("gb_state"))
99  );
100 
101  entry.SetAssembly(query->FieldGetStrValueDef(FieldIndex("assembly"), ""));
102  entry.SetProductName(query->FieldGetStrValueDef(FieldIndex("product_name"), ""));
103  entry.SetDiv(query->FieldGetStrValueDef(FieldIndex("div"), ""));
104  entry.SetStrain(query->FieldGetStrValueDef(FieldIndex("strain"), ""));
105  entry.SetBioProject(query->FieldGetStrValueDef(FieldIndex("bioproject"), ""));
106  entry.SetGenome(
107  query->FieldIsNull(FieldIndex("compartment")) ?
108  objects::CBioSource::eGenome_unknown :
109  static_cast<objects::CBioSource::EGenome>(query->FieldGetInt32Value(FieldIndex("compartment")))
110  );
111 
112  entry.SetLength(query->FieldGetInt32Value(FieldIndex("length"), 0));
113  entry.SetSrcDb(query->FieldGetInt32Value(FieldIndex("src_db"), 0));
114  entry.SetTaxid(TAX_ID_FROM(int, query->FieldGetInt32Value(FieldIndex("taxid"), 0)));
115  entry.SetFlags(query->FieldGetInt32Value(FieldIndex("flags"), 0));
116  int64_t writetime_mc = query->FieldGetInt64Value(FieldIndex("write_time"));
117  CTime writetime(writetime_mc/1000000);
118  writetime.SetMicroSecond(writetime_mc % 1000000);
119  writetime.ToLocalTime();
120  entry.SetWriteTime(writetime);
121 
122  if (!query->FieldIsNull(FieldIndex("updated"))) {
123  entry.SetUpdated(CTime(query->FieldGetInt64Value(FieldIndex("updated"))/1000).ToLocalTime());
124  } else {
125  entry.SetUpdated(CTime());
126  }
127 
128  if (!query->FieldIsNull(FieldIndex("created"))) {
129  entry.SetCreated(CTime(query->FieldGetInt64Value(FieldIndex("created"))/1000).ToLocalTime());
130  } else {
131  entry.SetCreated(CTime());
132  }
133 
134  if (!query->FieldIsNull(FieldIndex("src_refseq"))) {
135  list<string> src_refseq;
136  query->FieldGetContainerValue(FieldIndex("src_refseq"), back_inserter(src_refseq));
137  entry.SetRefseq(std::move(src_refseq));
138  }
139 
140  if (!query->FieldIsNull(FieldIndex("weights"))) {
141  TIpgWeights weights;
142  query->FieldGetContainerValue(FieldIndex("weights"), back_inserter(weights));
143  entry.SetWeights(std::move(weights));
144  }
145  if (!query->FieldIsNull(FieldIndex("pubmedids"))) {
146  TPubMedIds ids;
147  query->FieldGetContainerValue(FieldIndex("pubmedids"), inserter(ids, ids.end()));
148  entry.SetPubMedIds(std::move(ids));
149  }
150 
151  if (!query->FieldIsNull(FieldIndex("cds"))) {
152  entry.SetCds(SIpgCds(
153  query->FieldGetTupleValue<tuple<TCdsValue, TCdsValue, TCdsValue>>(FieldIndex("cds"))
154  ));
155  }
156 
157  entry.SetDefLine(query->FieldGetStrValueDef(FieldIndex("def_line"), ""));
158 }
159 
160 END_SCOPE()
161 
163  shared_ptr<CCassConnection> connection,
164  const string & keyspace,
167  TDataErrorCallback data_error_cb,
168  bool async
169 )
170  : CCassBlobWaiter(std::move(connection), keyspace, async, std::move(data_error_cb))
171  , m_Request(request)
172  , m_ConsumeCallback(std::move(consume_callback))
173 {}
174 
176 
178 
180 {
181  m_ConsumeCallback = std::move(callback);
182 }
183 
185 {
187 }
188 
190 {
191  m_PageSize = value;
192 }
193 
194 void CPubseqGatewayFetchIpgReport::SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)
195 {
196  if (callback && m_State != eInit) {
197  NCBI_THROW(CCassandraException, eSeqFailed,
198  "CPubseqGatewayFetchIpgReport: DataReadyCB can't be assigned "
199  "after the loading process has started");
200  }
202 }
203 
205 {
206  using THugeHelper = CPubseqGatewayHugeIpgReportHelper;
207  bool restarted;
208  do {
209  restarted = false;
210  switch(m_State) {
211  case eError:
212  case eDone:
213  return;
214  case eInit:
216  m_QueryArr.resize(1);
217  m_QueryArr[0] = {ProduceQuery(), 0};
218  // If accession is provided instead of IPG, resolve IPG, then return
219  // to the 'new task' state.
221  m_QueryArr[0].query->SetSQL("SELECT ipg FROM " + GetKeySpace() + ".accession_to_ipg WHERE accession = ?", 1);
222  m_QueryArr[0].query->BindStr(0, m_Request.GetProtein());
224  m_QueryArr[0].query->Query(m_Consistency, m_Async);
226  } else if (THugeHelper::HugeIpgEnabled()) {
228  THugeHelper::PrepareHugeIpgConfigQuery(m_Request.GetIpgToFetchData(), GetKeySpace(), m_QueryArr[0].query.get(), m_Consistency, m_Async);
230  }
231  else {
232  restarted = true;
234  }
235  break;
236 
238  if (CheckReady(m_QueryArr[0])) {
239  m_QueryArr[0].query->NextRow();
240  if (!m_QueryArr[0].query->IsEOF()) {
241  m_Request.SetResolvedIpg(m_QueryArr[0].query->FieldGetInt64Value(0));
242  }
244  restarted = true;
245  m_State = eInit;
246  }
247  else {
248  restarted = true;
250  }
251  }
252  break;
254  if (CheckReady(m_QueryArr[0])) {
255  THugeHelper::FetchHugeIpgConfigResult(m_Subgroups, m_QueryArr[0].query.get(), m_Async);
258  }
259  if (m_Subgroups.IsReadable()) {
262  }
263  else {
265  }
266  restarted = true;
267  }
268  break;
269 
270  case eTaskFetchReport:
271  {
272  m_QueryArr[0] = {ProduceQuery(), 0};
273  size_t args = 1;
274  string protein_suffix;
275  if (m_Request.HasProtein()) {
276  args = 2;
277  protein_suffix = " AND accession = ?";
278  if (m_Request.HasNucleotide()) {
279  args = 3;
280  protein_suffix += " AND nuc_accession = ?";
281  }
282  }
283  m_QueryArr[0].query->SetSQL("SELECT " + GetSelectFieldList()
284  + " FROM " + GetKeySpace() + ".ipg_report WHERE ipg = ?" + protein_suffix, args);
285  m_QueryArr[0].query->BindInt64(0, m_Request.GetIpgToFetchData());
286  if (m_Request.HasProtein()) {
287  m_QueryArr[0].query->BindStr(1, m_Request.GetProtein());
288  if (m_Request.HasNucleotide()) {
289  m_QueryArr[0].query->BindStr(2, m_Request.GetNucleotide());
290  }
291  }
293  m_QueryArr[0].query->Query(m_Consistency, m_Async, true, m_PageSize);
294  m_LastAccession.clear();
295  m_LastNucAccession.clear();
297  }
298  break;
299 
301  if (m_SubgroupItr == m_Subgroups.subgroups.cend()) {
302  restarted = true;
304  }
305  else {
306  size_t args = 2;
307  string protein_suffix;
308  if (m_Request.HasProtein()) {
309  args = 3;
310  protein_suffix = " AND accession = ?";
311  if (m_Request.HasNucleotide()) {
312  args = 4;
313  protein_suffix += " AND nuc_accession = ?";
314  }
315  }
316  m_QueryArr[0] = {ProduceQuery(), 0};
317  m_QueryArr[0].query->SetSQL("SELECT " + GetSelectFieldList()
318  + " FROM " + GetKeySpace() + ".ipg_report_huge WHERE ipg = ? and subgroup = ?" + protein_suffix, args);
319  m_QueryArr[0].query->BindInt64(0, m_Request.GetIpgToFetchData());
320  m_QueryArr[0].query->BindInt32(1, *m_SubgroupItr);
321  if (m_Request.HasProtein()) {
322  m_QueryArr[0].query->BindStr(2, m_Request.GetProtein());
323  if (m_Request.HasNucleotide()) {
324  m_QueryArr[0].query->BindStr(3, m_Request.GetNucleotide());
325  }
326  }
328  m_QueryArr[0].query->Query(m_Consistency, m_Async, true, m_PageSize);
329  m_LastAccession.clear();
330  m_LastNucAccession.clear();
332  }
333  break;
334 
336  if (CheckReady(m_QueryArr[0])) {
337  while (m_QueryArr[0].query->NextRow() == ar_dataready) {
338  m_Container.resize(m_Container.size() + 1);
339  auto& new_item = *rbegin(m_Container);
340  PopulateEntry(m_QueryArr[0].query, new_item);
341  if (!new_item.IsValid()) {
342  m_Container.resize(m_Container.size() - 1);
343  }
344  }
345  // If query has been restarted we need to filter out whatever we already returned to client
346  if (!m_LastAccession.empty() || !m_LastNucAccession.empty()) {
347  m_Container.erase(
348  remove_if(begin(m_Container), end(m_Container),
349  [this](CIpgStorageReportEntry const& e) -> bool
350  {
351  return e.GetAccession() < m_LastAccession
353  }),
354  end(m_Container)
355  );
356  }
357  bool do_next{true};
358  if (!m_Container.empty()) {
359  m_LastAccession = rbegin(m_Container)->GetAccession();
360  m_LastNucAccession = rbegin(m_Container)->GetNucAccession();
361  do_next = m_ConsumeCallback(std::move(m_Container), false);
362  m_Container.clear();
363  }
364  if (!do_next) {
365  restarted = true;
367  }
368  else if (m_QueryArr[0].query->IsEOF()) {
369  if (m_Subgroups.IsReadable()) {
370  ++m_SubgroupItr;
372  }
373  else {
375  }
376  restarted = true;
377  }
378  }
379  break;
380  case eTaskCleanup:
381  if (m_State != eError) {
382  m_ConsumeCallback({}, true);
383  }
384  CloseAll();
385  m_State = eDone;
386  break;
387 
388  default: { // LCOV_EXCL_LINE
389  char msg[1024]; // LCOV_EXCL_LINE
390  string keyspace = GetKeySpace(); // LCOV_EXCL_LINE
391  snprintf(msg, sizeof(msg), // LCOV_EXCL_LINE
392  "Failed to fetch blob (key=%s.%d) unexpected state (%d)",
393  keyspace.c_str(), GetKey(), static_cast<int>(m_State));
394  Error(CRequestStatus::e502_BadGateway, // LCOV_EXCL_LINE
396  eDiag_Error, msg);
397  }
398  }
399  } while(restarted);
400 }
401 
402 END_SCOPE(ipg)
#define static
function< void(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)> TDataErrorCallback
@ ar_dataready
Definition: cass_driver.hpp:70
void remove_if(Container &c, Predicate *__pred)
Definition: chainer.hpp:69
atomic< int32_t > m_State
string GetKeySpace() const
int32_t GetKey() const
void SetDataReadyCB3(shared_ptr< CCassDataCallbackReceiver > datareadycb3)
void SetupQueryCB3(shared_ptr< CCassQuery > &query)
bool CheckReady(shared_ptr< CCassQuery > qry, unsigned int restart_counter, bool &need_repeat)
void CloseAll(void)
vector< SQueryRec > m_QueryArr
shared_ptr< CCassQuery > ProduceQuery() const
void Error(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
CPubseqGatewayFetchIpgReportRequest & SetResolvedIpg(TIpg value)
void SetConsumeCallback(CPubseqGatewayIpgReportConsumeCallback callback)
CPubseqGatewayIpgReportConsumeCallback m_ConsumeCallback
void SetPageSize(unsigned int value)
vector< CIpgStorageReportEntry > m_Container
static const size_t kReadBufferReserveDefault
CPubseqGatewayFetchIpgReportRequest m_Request
EIpgSubgroupsStatus m_SubgroupsStatusOverride
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
SIpgSubgroupsConfig::TCIterator m_SubgroupItr
void SetConsistency(CassConsistency value)
CTime –.
Definition: ncbitime.hpp:296
const_iterator end() const
Definition: set.hpp:136
Include a standard set of the NCBI C++ Toolkit most basic headers.
static constexpr array< pair< const char *, const char * >, 22 > ipg_report_fields
constexpr int FieldIndex(const char *field_name)
string GetSelectFieldList()
void PopulateEntry(shared_ptr< CCassQuery > &query, CIpgStorageReportEntry &entry)
Int8 int64_t
#define TAX_ID_FROM(T, value)
Definition: ncbimisc.hpp:1111
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define END_SCOPE(ns)
End the previously defined scope.
Definition: ncbistl.hpp:75
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define BEGIN_SCOPE(ns)
Define a new scope.
Definition: ncbistl.hpp:72
CTime & ToLocalTime(void)
Convert the time into local time.
Definition: ncbitime.hpp:2464
void SetMicroSecond(long microsecond)
Set microseconds.
Definition: ncbitime.cpp:1120
function< bool(vector< CIpgStorageReportEntry > &&, bool is_last)> CPubseqGatewayIpgReportConsumeCallback
vector< double > TIpgWeights
Definition: ipg_types.hpp:47
int i
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
Defines: CTimeFormat - storage class for time format.
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
CIpgStorageReportEntry & SetDefLine(string const &defline)
CIpgStorageReportEntry & SetDiv(string const &value)
CIpgStorageReportEntry & SetProductName(string const &value)
CIpgStorageReportEntry & SetIpg(TIpg value)
CIpgStorageReportEntry & SetGenome(objects::CBioSource::EGenome value)
CIpgStorageReportEntry & SetNucAccession(string const &value)
CIpgStorageReportEntry & SetBioProject(string const &value)
CIpgStorageReportEntry & SetWriteTime(CTime value)
CIpgStorageReportEntry & SetTaxid(TTaxId value)
CIpgStorageReportEntry & SetWeights(TIpgWeights &&value)
CIpgStorageReportEntry & SetUpdated(CTime value)
CIpgStorageReportEntry & SetAssembly(string const &value)
CIpgStorageReportEntry & SetStrain(string const &value)
CIpgStorageReportEntry & SetPubMedIds(TPubMedIds &&ids)
CIpgStorageReportEntry & SetFlags(Int4 value)
CIpgStorageReportEntry & SetCds(TIpgCds value)
string const & GetAccession() const
CIpgStorageReportEntry & SetAccession(string const &value)
CIpgStorageReportEntry & SetCreated(CTime value)
CIpgStorageReportEntry & SetRefseq(list< string > const &value)
CIpgStorageReportEntry & SetLength(Int4 value)
CIpgStorageReportEntry & SetGbState(TGbState value)
CIpgStorageReportEntry & SetSrcDb(Int4 value)
string const & GetNucAccession() const
EIpgSubgroupsStatus status
Definition: ipg_types.hpp:78
bool IsReadable() const
Definition: ipg_types.hpp:82
vector< int32_t > subgroups
Definition: ipg_types.hpp:80
static string query
@ NCBI_gb_state_eWGSGenBankMissing
Definition: wgsread.hpp:91
#define const
Definition: zconf.h:232
Modified on Fri Sep 20 14:57:43 2024 by modify_doxy.py rev. 669887