NCBI C++ ToolKit
fetch.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: fetch.cpp 103120 2024-09-11 14:33:06Z saprykin $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Evgueni Belyi based on Dmitrii Saprykin's code
27  *
28  * File Description:
29  *
30  * Cassandra fetch named annot record
31  *
32  */
33 
34 #include <ncbi_pch.hpp>
35 
36 #include <memory>
37 #include <string>
38 #include <tuple>
39 #include <utility>
40 #include <vector>
41 
44 
47 
49  shared_ptr<CCassConnection> connection,
50  const string & keyspace,
51  string accession,
52  TAccVerHistConsumeCallback consume_callback,
53  TDataErrorCallback data_error_cb,
55  int16_t seq_id_type
56 )
58  std::move(connection), keyspace,
59  true, // false, // m_Async
60  std::move(data_error_cb)
61  )
62  , m_Accession(std::move( accession))
63  , m_Version(version)
64  , m_SeqIdType(seq_id_type)
65  , m_Consume(std::move( consume_callback))
66 {}
67 
69 {
70  m_Consume = std::move(callback);
71 }
72 
74  shared_ptr<CCassDataCallbackReceiver> callback)
75 {
76  if( callback && m_State != eInit)
77  {
78  NCBI_THROW(CCassandraException, eSeqFailed,
79  "CCassAccVerHistoryTaskFetch: DataReadyCB can't be assigned "
80  "after the loading process has started");
81  }
83 }
84 
86 {
87  bool restarted = false;
88  do
89  {
90  restarted = false;
91  switch (m_State)
92  {
93  case eError:
94  case eDone:
95  return;
96 
97  case eInit:
98  {
99  m_QueryArr.resize(1);
100  m_QueryArr[0] = {ProduceQuery(), 0};
101 
102  string sql =
103  " SELECT"
104  " version, gi, date, chain, id_type, sat, sat_key"
105  " FROM " + GetKeySpace() + ".acc_ver_hist "
106  " WHERE"
107  " accession = ? ";
108  unsigned int params = 1;
109  if( m_Version > 0)
110  {
111  sql += " AND version = ?";
112  ++params;
113  }
114  if( m_SeqIdType > 0)
115  {
116  sql += " AND id_type = ? ALLOW FILTERING";
117  ++params;
118  }
119 
120  m_QueryArr[0].query->SetSQL(sql, params);
121  m_QueryArr[0].query->BindStr(0, m_Accession);
122 
123  unsigned int param = 1;
124  if( m_Version > 0)
125  {
126  m_QueryArr[0].query->BindInt16( param, m_Version); param++;
127  }
128  if( m_SeqIdType > 0)
129  {
130  m_QueryArr[0].query->BindInt16( param, m_SeqIdType);
131  }
132 
134  m_QueryArr[0].query->Query(GetReadConsistency(), m_Async, true, m_PageSize);
135  restarted = true;
137  break;
138  }
139 
140  case eFetchStarted:
141  {
142  if( CheckReady( m_QueryArr[0].query, m_RestartCounter, restarted))
143  {
144  bool do_next = true;
145  auto state = m_QueryArr[0].query->NextRow();
146  while( do_next && state == ar_dataready)
147  {
148  SAccVerHistRec record;
149  record.accession = m_Accession;
150  record.version = m_QueryArr[0].query->FieldGetInt16Value( 0, 0);
151  record.gi = m_QueryArr[0].query->FieldGetInt64Value( 1, 0);
152  record.date = m_QueryArr[0].query->FieldGetInt64Value( 2, 0);
153  record.chain = m_QueryArr[0].query->FieldGetInt64Value( 3, 0);
154  record.seq_id_type = m_QueryArr[0].query->FieldGetInt16Value( 4, 0);
155  record.sat = m_QueryArr[0].query->FieldGetInt16Value( 5, 0);
156  record.sat_key = m_QueryArr[0].query->FieldGetInt32Value( 6, 0);
157 
158  if( m_Consume)
159  {
160  do_next = m_Consume( std::move( record), false);
161  }
162  if( do_next)
163  {
164  state = m_QueryArr[0].query->NextRow();
165  }
166  }
167  if( !do_next || m_QueryArr[0].query->IsEOF())
168  {
169  if( m_Consume)
170  {
171  m_Consume( SAccVerHistRec(), true);
172  }
173  CloseAll();
174  m_State = eDone;
175  }
176  }
177  else if( restarted)
178  {
180  m_QueryArr[0].query->Close();
181  m_State = eInit;
182  }
183  break;
184  }
185 
186  default:
187  {
188  char msg[1024];
189  string keyspace = GetKeySpace();
190  snprintf(msg, sizeof(msg),
191  "Failed to fetch accession history (key=%s.%s|%hd|%hd) unexpected state (%d)",
192  keyspace.c_str(), m_Accession.c_str(), m_Version, m_SeqIdType,
193  static_cast<int>(m_State));
196  }
197  }
198  } while( restarted);
199 }
200 
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
Definition: fetch.cpp:46
function< bool(SAccVerHistRec &&, bool last)> TAccVerHistConsumeCallback
Definition: record.hpp:82
function< void(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)> TDataErrorCallback
@ ar_dataready
Definition: cass_driver.hpp:70
TAccVerHistConsumeCallback m_Consume
Definition: tasks.hpp:97
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: fetch.cpp:73
unsigned int m_PageSize
Definition: tasks.hpp:99
CCassAccVerHistoryTaskFetch(shared_ptr< CCassConnection > connection, const string &keyspace, string accession, TAccVerHistConsumeCallback consume_callback, TDataErrorCallback data_error_cb, int16_t version=0, int16_t seq_id_type=0)
Definition: fetch.cpp:48
void SetConsumeCallback(TAccVerHistConsumeCallback callback)
Definition: fetch.cpp:68
unsigned int m_RestartCounter
Definition: tasks.hpp:100
void Wait1() override
Definition: fetch.cpp:85
atomic< int32_t > m_State
string GetKeySpace() const
void SetDataReadyCB3(shared_ptr< CCassDataCallbackReceiver > datareadycb3)
TCassConsistency GetReadConsistency() const
void SetupQueryCB3(shared_ptr< CCassQuery > &query)
bool CheckReady(shared_ptr< CCassQuery > qry, unsigned int restart_counter, bool &need_repeat)
void CloseAll(void)
vector< SQueryRec > m_QueryArr
shared_ptr< CCassQuery > ProduceQuery() const
void Error(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
#define true
Definition: bool.h:35
static char sql[1024]
Definition: putdata.c:19
Int2 int16_t
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
const string version
version string
Definition: variables.hpp:66
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
int64_t date
Definition: record.hpp:56
int16_t sat
Definition: record.hpp:58
int16_t version
Definition: record.hpp:53
int64_t gi
Definition: record.hpp:51
int32_t sat_key
Definition: record.hpp:57
string accession
Definition: record.hpp:52
int64_t chain
Definition: record.hpp:59
int16_t seq_id_type
Definition: record.hpp:54
static string query
Modified on Fri Sep 20 14:56:57 2024 by modify_doxy.py rev. 669887