NCBI C++ ToolKit
fetch.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: fetch.cpp 101970 2024-03-13 14:25:04Z saprykin $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitrii Saprykin
27  *
28  * File Description:
29  *
30  * Cassandra fetch named annot record
31  *
32  */
33 
34 #include <ncbi_pch.hpp>
35 
37 
38 #include <memory>
39 #include <string>
40 #include <tuple>
41 #include <utility>
42 #include <vector>
43 
47 
50 
/// Construct a task that fetches named-annotation records for one
/// accession key, restricted to the annot names in `annot_names`.
///
/// NOTE(review): this rendered listing omits source line 51 (the
/// constructor name) and line 55; per the cross-reference the full
/// signature also takes `int16_t version` right after `accession`.
///
/// @param connection        Cassandra connection; moved into the
///                          CCassBlobWaiter base.
/// @param keyspace          Keyspace name; passed to the CCassBlobWaiter base.
/// @param accession         Accession key; moved into m_Accession.
/// @param seq_id_type       Seq-id type key; stored in m_SeqIdType.
/// @param annot_names       Annot names to fetch; copied into the owning
///                          m_AnnotNames container.
/// @param consume_callback  Record consumer invoked by Wait1(); moved into
///                          m_Consume.
/// @param data_error_cb     Error callback; moved into the base class.
52  shared_ptr<CCassConnection> connection,
53  const string & keyspace,
54  string accession,
56  int16_t seq_id_type,
57  const vector<string> & annot_names,
58  TNAnnotConsumeCallback consume_callback,
59  TDataErrorCallback data_error_cb
60 )
// `true` is a CCassBlobWaiter constructor flag; its meaning is defined in
// the base class, which is not visible here.
61  : CCassBlobWaiter(move(connection), keyspace, true, move(data_error_cb))
62  , m_Accession(move(accession))
63  , m_Version(version)
64  , m_SeqIdType(seq_id_type)
65  , m_AnnotNames(annot_names)
66  , m_Consume(move(consume_callback))
67 {}
68 
/// Construct a task that fetches named-annotation records for one
/// accession key, restricted to names given as CTempString views.
///
/// NOTE(review): this rendered listing omits source lines 69 (the
/// constructor name) and 73 (presumably `int16_t version`, as in the
/// sibling overloads — confirm against fetch.cpp).
///
/// @param connection        Cassandra connection; moved into the
///                          CCassBlobWaiter base.
/// @param keyspace          Keyspace name; passed to the CCassBlobWaiter base.
/// @param accession         Accession key; moved into m_Accession.
/// @param seq_id_type       Seq-id type key; stored in m_SeqIdType.
/// @param annot_names       Annot names to fetch; the CTempString views are
///                          copied into m_AnnotNamesTemp. CTempString does
///                          not own its buffer, so the character data these
///                          views point at presumably must stay alive for
///                          the lifetime of this task — confirm with callers.
/// @param consume_callback  Record consumer invoked by Wait1(); moved into
///                          m_Consume.
/// @param data_error_cb     Error callback; moved into the base class.
70  shared_ptr<CCassConnection> connection,
71  const string & keyspace,
72  string accession,
74  int16_t seq_id_type,
75  const vector<CTempString> & annot_names,
76  TNAnnotConsumeCallback consume_callback,
77  TDataErrorCallback data_error_cb
78 )
// `true` is a CCassBlobWaiter constructor flag; its meaning is defined in
// the base class, which is not visible here.
79  : CCassBlobWaiter(move(connection), keyspace, true, move(data_error_cb))
80  , m_Accession(move(accession))
81  , m_Version(version)
82  , m_SeqIdType(seq_id_type)
83  , m_AnnotNamesTemp(annot_names)
84  , m_Consume(move(consume_callback))
85 {}
86 
/// Construct a task that fetches all named-annotation records for one
/// accession key (no annot-name filter).
///
/// Both name containers stay empty, so x_AnnotNamesSize() returns 0 and
/// Wait1() builds its query without an `annot_name in (...)` list. It adds
/// `annot_name > ?` only when m_LastConsumedAnnot is non-empty.
///
/// NOTE(review): this rendered listing omits source lines 87 (the
/// constructor name) and 91 (presumably `int16_t version`, as in the
/// sibling overloads — confirm against fetch.cpp).
///
/// @param connection        Cassandra connection; moved into the
///                          CCassBlobWaiter base.
/// @param keyspace          Keyspace name; passed to the CCassBlobWaiter base.
/// @param accession         Accession key; moved into m_Accession.
/// @param seq_id_type       Seq-id type key; stored in m_SeqIdType.
/// @param consume_callback  Record consumer invoked by Wait1(); moved into
///                          m_Consume.
/// @param data_error_cb     Error callback; moved into the base class.
88  shared_ptr<CCassConnection> connection,
89  const string & keyspace,
90  string accession,
92  int16_t seq_id_type,
93  TNAnnotConsumeCallback consume_callback,
94  TDataErrorCallback data_error_cb
95 )
// `true` is a CCassBlobWaiter constructor flag; its meaning is defined in
// the base class, which is not visible here.
96  : CCassBlobWaiter(move(connection), keyspace, true, move(data_error_cb))
97  , m_Accession(move(accession))
98  , m_Version(version)
99  , m_SeqIdType(seq_id_type)
100  , m_Consume(move(consume_callback))
101 {}
102 
/// SetConsumeCallback (its signature, source line 103, is not shown in this
/// rendered listing): replace the record consumer stored in m_Consume.
/// An empty callback is tolerated: Wait1() checks `if (m_Consume)` before
/// every call.
104 {
105  m_Consume = move(callback);
106 }
107 
/// Register a data-ready receiver (CCassDataCallbackReceiver) for this task.
///
/// A non-null callback may only be assigned while m_State is still eInit.
/// The error text describes this as "before the loading process has
/// started". Otherwise a CCassandraException with code eSeqFailed is
/// thrown. A null callback is never rejected by this check.
///
/// NOTE(review): source line 115 is missing from this rendered listing.
/// The cross-reference lists SetDataReadyCB3(), which is presumably what
/// that line calls to store the receiver — confirm against fetch.cpp.
108 void CCassNAnnotTaskFetch::SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)
109 {
110  if (callback && m_State != eInit) {
111  NCBI_THROW(CCassandraException, eSeqFailed,
112  "CCassNAnnotTaskFetch: DataReadyCB can't be assigned "
113  "after the loading process has started");
114  }
116 }
117 
/// x_AnnotNamesSize (its signature, source line 118, is not shown in this
/// rendered listing): number of annot names configured for this task.
///
/// The owning m_AnnotNames is used when non-empty. Otherwise the size of
/// the CTempString-based m_AnnotNamesTemp is returned. At most one of the
/// two containers is expected to be populated (asserted). Returns 0 when
/// the task was built without a name list.
119 {
120  if (m_AnnotNames.empty()) {
121  return m_AnnotNamesTemp.size();
122  } else {
123  assert(m_AnnotNamesTemp.empty());
124  return m_AnnotNames.size();
125  }
126 }
127 
/// Count the configured annot names that compare strictly greater than
/// `more`.
///
/// Wait1() calls this with m_LastConsumedAnnot to decide how many
/// `annot_name in (...)` placeholders the (re)built query needs. Names at
/// or before the last consumed one are therefore not requested again.
/// Reads m_AnnotNames when it is non-empty, otherwise m_AnnotNamesTemp.
/// At most one of the two is expected to be populated (asserted).
///
/// @param more  Exclusive lower bound for the names to count.
/// @return      Number of configured names greater than `more`.
128 size_t CCassNAnnotTaskFetch::x_AnnotNamesCount(string const & more) const
129 {
130  if (m_AnnotNames.empty()) {
// Wrap `more` so each element is compared CTempString-to-CTempString.
131  CTempString t(more);
132  return count_if(m_AnnotNamesTemp.begin(), m_AnnotNamesTemp.end(),
133  [&t](CTempString const & val) {
134  return val > t;
135  }
136  );
137  } else {
138  assert(m_AnnotNamesTemp.empty());
139  return count_if(m_AnnotNames.begin(), m_AnnotNames.end(),
140  [&more](string const & val) {
141  return val > more;
142  }
143  );
144  }
145 }
146 
/// Bind every configured annot name that compares strictly greater than
/// `more` as a string parameter. Binding starts at index `first` and follows
/// container order.
///
/// This uses the same `> more` filter as x_AnnotNamesCount(). When called
/// with the same `more`, the number of values bound equals that count,
/// which Wait1() uses to generate the `annot_name in (?, ...)` placeholders.
/// `first` is a by-value copy and is advanced locally for each bound name.
///
/// NOTE(review): source line 147 (the function name) is missing from this
/// rendered listing. Per the cross-reference, the full signature is
/// `void x_AnnotNamesBind(shared_ptr<CCassQuery>& query,
/// string const& more, unsigned int first) const`.
148  shared_ptr<CCassQuery>& query, string const & more, unsigned int first
149 ) const
150 {
151  if (m_AnnotNames.empty()) {
152  CTempString t(more);
153  for (CTempString const & val : m_AnnotNamesTemp) {
154  if (val > t) {
155  query->BindStr(first++, val);
156  }
157  }
158  } else {
159  assert(m_AnnotNamesTemp.empty());
160  for (string const & val : m_AnnotNames) {
161  if (val > more) {
162  query->BindStr(first++, val);
163  }
164  }
165  }
166 }
167 
/// Advance the fetch state machine. The do/while repeats only when
/// CheckReady() sets `restarted`.
///
///  - eError / eDone: return immediately.
///  - eInit: build a SELECT over <keyspace>.bioseq_na for
///    (accession, version, seq_id_type). With a configured name list, it is
///    narrowed to the names still greater than m_LastConsumedAnnot.
///    Without one, it adds `annot_name > m_LastConsumedAnnot` when that is
///    non-empty. The key and filter values are bound, and Query() is issued
///    with CASS_CONSISTENCY_LOCAL_QUORUM and m_PageSize.
///  - eFetchStarted: turn each ready row into a CNAnnotRecord and pass alive
///    records to m_Consume. Finish (final empty record with last=true) when
///    the consumer returns false or the result is exhausted.
///
/// NOTE(review): this rendered listing omits these source lines:
///  - 168: the signature; `void Wait1() override` per the cross-reference;
///  - 210, 217, 219, 230-232, 265 and 277.
/// The comments below do not describe what those lines do.
169 {
170  bool restarted;
171  do {
172  restarted = false;
173  switch (m_State) {
174  case eError:
175  case eDone:
176  return;
177
178  case eInit: {
179  m_QueryArr.resize(1);
180  m_QueryArr[0] = {ProduceQuery(), 0};
181
182  string sql =
183  " SELECT "
184  " annot_name, sat_key, last_modified, start, stop, seq_annot_info, annot_info_modified, state, writetime(stop) "
185  " FROM " + GetKeySpace() + ".bioseq_na "
186  " WHERE"
187  " accession = ? AND version = ? AND seq_id_type = ?";
// Resume logic: only annot names greater than m_LastConsumedAnnot are
// requested, so a rebuilt query continues after the last record handed to
// m_Consume.
// NOTE(review): this assumes rows arrive in ascending annot_name order.
// That is presumably the clustering order of bioseq_na — confirm against
// the schema.
188  unsigned int params = 3, names_count = 0;
189  if (x_AnnotNamesSize()) {
190  names_count = x_AnnotNamesCount(m_LastConsumedAnnot);
191  if (names_count > 0) {
192  sql += " AND annot_name in (" + NStr::Join(vector<string>(names_count, "?"), ",") + ")";
193  } else {
// NOTE(review): this exit does not call m_Consume(CNAnnotRecord(), true),
// unlike the completion path in eFetchStarted. It is reached when no
// configured name is greater than m_LastConsumedAnnot, e.g. on a restart
// after the greatest name was already consumed. A consumer waiting for
// last=true would then never receive it — confirm whether this is intended.
194  CloseAll();
195  m_State = eDone;
196  break;
197  }
198  } else {
199  if (!m_LastConsumedAnnot.empty()) {
200  sql += " AND annot_name > ?";
201  ++params;
202  }
203  }
// Placeholder count: 3 key columns, plus either names_count IN-list values
// or (when params == 4) one `annot_name > ?` bound.
204  m_QueryArr[0].query->SetSQL(sql, params + names_count);
205  m_QueryArr[0].query->BindStr(0, m_Accession);
206  m_QueryArr[0].query->BindInt16(1, m_Version);
207  m_QueryArr[0].query->BindInt16(2, m_SeqIdType);
208
209  if (names_count > 0) {
// Source line 210 is not shown in this render. It presumably binds the
// IN-list names from index 3, e.g. via x_AnnotNamesBind() — confirm.
211  } else {
212  if (!m_LastConsumedAnnot.empty()) {
213  m_QueryArr[0].query->BindStr(3, m_LastConsumedAnnot);
214  }
215  }
216
218  m_QueryArr[0].query->Query(CASS_CONSISTENCY_LOCAL_QUORUM, m_Async, true, m_PageSize);
220  break;
221  }
222
223  case eFetchStarted: {
224  if (CheckReady(m_QueryArr[0].query, m_RestartCounter, restarted)) {
225  bool do_next = true;
226  auto state = m_QueryArr[0].query->NextRow();
227  while (do_next && state == ar_dataready) {
// Field indices follow the SELECT column order: 0 annot_name, 1 sat_key,
// 2 last_modified, 3 start, 4 stop, 5 seq_annot_info (read as a raw blob
// below), 6 annot_info_modified, 7 state, 8 writetime(stop).
// Source lines 230-232 of the setter chain are not shown in this render.
228  CNAnnotRecord record;
229  record
233  .SetAnnotName(m_QueryArr[0].query->FieldGetStrValueDef(0, ""))
234  .SetSatKey(m_QueryArr[0].query->FieldGetInt32Value(1, 0))
235  .SetModified(m_QueryArr[0].query->FieldGetInt64Value(2, 0))
236  .SetStart(m_QueryArr[0].query->FieldGetInt32Value(3, 0))
237  .SetStop(m_QueryArr[0].query->FieldGetInt32Value(4, 0))
238  .SetAnnotInfoModified(m_QueryArr[0].query->FieldGetInt64Value(6, 0))
239  .SetState(m_QueryArr[0].query->FieldGetInt8Value(7, CNAnnotRecord::eStateAlive))
240  .SetWritetime(m_QueryArr[0].query->FieldGetInt64Value(8, 0));
241
242  const unsigned char * rawdata = nullptr;
243  int64_t len = m_QueryArr[0].query->FieldGetBlobRaw(5, &rawdata);
244  if (len > 0) {
245  record.SetSeqAnnotInfo(string(reinterpret_cast<const char*>(rawdata), len));
246  }
247
// Only alive records reach the consumer and advance m_LastConsumedAnnot.
// The name is copied first because `record` is moved into the callback.
248  if (m_Consume && record.GetState() == CNAnnotRecord::eStateAlive) {
249  string annot_name = record.GetAnnotName();
250  do_next = m_Consume(move(record), false);
251  m_LastConsumedAnnot = move(annot_name);
252  }
253  if (do_next) {
254  state = m_QueryArr[0].query->NextRow();
255  }
256  }
// Done when the consumer asked to stop or the result set is exhausted.
// The consumer then gets one final call with an empty record and last=true.
257  if (!do_next || m_QueryArr[0].query->IsEOF()) {
258  if (m_Consume) {
259  m_Consume(CNAnnotRecord(), true);
260  }
261  CloseAll();
262  m_State = eDone;
263  }
264  } else if (restarted) {
// CheckReady() asked for a retry: close this query and return to eInit.
// The do/while repeats because `restarted` is true, rebuilding the query
// from m_LastConsumedAnnot. Source line 265 is not shown in this render.
266  m_QueryArr[0].query->Close();
267  m_State = eInit;
268  }
269  break;
270  }
271
272  default: {
// Unexpected state: format a diagnostic message. Source line 277, which
// presumably reports `msg` (e.g. via Error()), is not shown in this render.
273  char msg[1024];
274  string keyspace = GetKeySpace();
275  snprintf(msg, sizeof(msg), "Failed to fetch named annot (key=%s.%s|%hd|%hd) unexpected state (%d)",
276  keyspace.c_str(), m_Accession.c_str(), m_Version, m_SeqIdType, static_cast<int>(m_State));
278  }
279  }
280  } while(restarted);
281 }
282 
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
Definition: fetch.cpp:46
function< void(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)> TDataErrorCallback
@ ar_dataready
Definition: cass_driver.hpp:70
atomic< int32_t > m_State
string GetKeySpace() const
void SetDataReadyCB3(shared_ptr< CCassDataCallbackReceiver > datareadycb3)
void SetupQueryCB3(shared_ptr< CCassQuery > &query)
bool CheckReady(shared_ptr< CCassQuery > qry, unsigned int restart_counter, bool &need_repeat)
void CloseAll(void)
vector< SQueryRec > m_QueryArr
shared_ptr< CCassQuery > ProduceQuery() const
void Error(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
TNAnnotConsumeCallback m_Consume
Definition: fetch.hpp:143
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: fetch.cpp:108
vector< CTempString > m_AnnotNamesTemp
Definition: fetch.hpp:142
void x_AnnotNamesBind(shared_ptr< CCassQuery > &query, string const &more, unsigned int first) const
Definition: fetch.cpp:147
size_t x_AnnotNamesCount(string const &more) const
Definition: fetch.cpp:128
void Wait1() override
Definition: fetch.cpp:168
CCassNAnnotTaskFetch(shared_ptr< CCassConnection > connection, const string &keyspace, string accession, int16_t version, int16_t seq_id_type, const vector< string > &annot_names, TNAnnotConsumeCallback consume_callback, TDataErrorCallback data_error_cb)
Definition: fetch.cpp:51
string m_LastConsumedAnnot
Definition: fetch.hpp:144
unsigned int m_RestartCounter
Definition: fetch.hpp:147
void SetConsumeCallback(TNAnnotConsumeCallback callback)
Definition: fetch.cpp:103
int16_t m_SeqIdType
Definition: fetch.hpp:140
vector< string > m_AnnotNames
Definition: fetch.hpp:141
unsigned int m_PageSize
Definition: fetch.hpp:146
size_t x_AnnotNamesSize() const
Definition: fetch.cpp:118
CNAnnotRecord & SetAccession(string value)
Definition: record.hpp:68
CNAnnotRecord & SetModified(TTimestamp value)
Definition: record.hpp:92
CNAnnotRecord & SetAnnotName(string value)
Definition: record.hpp:104
CNAnnotRecord & SetStop(TCoord value)
Definition: record.hpp:116
string const & GetAnnotName() const
Definition: record.hpp:187
CNAnnotRecord & SetSeqAnnotInfo(TAnnotInfo const &value)
Definition: record.hpp:122
CNAnnotRecord & SetStart(TCoord value)
Definition: record.hpp:110
CNAnnotRecord & SetSeqIdType(int16_t value)
Definition: record.hpp:80
CNAnnotRecord & SetSatKey(TSatKey value)
Definition: record.hpp:86
TState GetState() const
Definition: record.hpp:202
CNAnnotRecord & SetState(TState value)
Definition: record.hpp:140
CNAnnotRecord & SetVersion(int16_t value)
Definition: record.hpp:74
CNAnnotRecord & SetWritetime(TWritetime value)
Definition: record.hpp:134
CNAnnotRecord & SetAnnotInfoModified(TTimestamp value)
Definition: record.hpp:98
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
#define true
Definition: bool.h:35
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
Definition: dlist.tmpl.h:46
static char sql[1024]
Definition: putdata.c:19
Int2 int16_t
Int8 int64_t
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
static string Join(const TContainer &arr, const CTempString &delim)
Join strings using the specified delimiter.
Definition: ncbistr.hpp:2697
int len
static int version
Definition: mdb_load.c:29
function< bool(CNAnnotRecord &&, bool last)> TNAnnotConsumeCallback
Definition: record.hpp:224
EIPRangeType t
Definition: ncbi_localip.c:101
#define assert(x)
Definition: srv_diag.hpp:58
static string query
Modified on Wed Apr 24 14:20:03 2024 by modify_doxy.py rev. 669887