NCBI C++ ToolKit
fetch.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: fetch.cpp 97228 2022-06-28 14:10:54Z saprykin $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description:
29  *
30  * Cassandra fetch si2csi record
31  *
32  */
33 
34 #include <ncbi_pch.hpp>
35 
37 
38 #include <memory>
39 #include <set>
40 #include <string>
41 #include <tuple>
42 #include <utility>
43 #include <vector>
44 
47 
49 
51  const CassConsistency kBioSeqInfoConsistency = CassConsistency::CASS_CONSISTENCY_LOCAL_QUORUM;
52 
53  // Select field numbers; must be in sync with the select statements
54  const int fnVersion = 0;
55  const int fnSeqIdType = 1;
57  const int fnHash = 3;
58  const int fnGI = 4;
59  const int fnLength = 5;
60  const int fnMol = 6;
61  const int fnSat = 7;
62  const int fnSatKey = 8;
63  const int fnSeqIds = 9;
64  const int fnSeqState = 10;
65  const int fnState = 11;
66  const int fnTaxId = 12;
67  const int fnName = 13;
68  const int fnWritetime = 14;
69 
70  const string kSelectStatement = "SELECT version, seq_id_type, "
71  "date_changed, hash, gi, length, mol, sat, "
72  "sat_key, seq_ids, seq_state, state, tax_id, name, writetime(sat_key) FROM ";
73 
74  using TField = CBioseqInfoFetchRequest::EFields;
75 END_SCOPE()
76 
// CCassBioseqInfoTaskFetch constructor (remaining parameters): stores the
// request, caches its accession and keeps the consume callback.
78  shared_ptr<CCassConnection> connection,
79  const string & keyspace,
81  TBioseqInfoConsumeCallback consume_callback,
82  TDataErrorCallback data_error_cb) :
// The base waiter receives the connection and data-error callback by move.
// NOTE(review): the meaning of the literal `true` argument is defined by the
// CCassBlobWaiter constructor, which is not visible here - confirm.
83  CCassBlobWaiter(move(connection), keyspace, true, move(data_error_cb)),
84  m_Request(request),
// Cached separately because the query builders bind m_Accession directly.
85  m_Accession(request.GetAccession()),
86  m_ConsumeCallback(move(consume_callback))
87 {}
88 
// Replaces m_ConsumeCallback (a callable taking vector<CBioseqInfoRecord>&&).
// Unlike SetDataReadyCB, no task-state check is performed here.
90 {
91  m_ConsumeCallback = move(callback);
92 }
93 
// Registers a data-ready callback receiver for this task.
// Throws CCassandraException (eSeqFailed) when a non-null callback is supplied
// after the task has left eInit; a null callback passes the guard.
// NOTE(review): the statement that actually stores `callback` is not visible
// in this listing - confirm it against the full source.
94 void CCassBioseqInfoTaskFetch::SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)
95 {
96  if (callback && m_State != eInit) {
97  NCBI_THROW(CCassandraException, eSeqFailed,
98  "CCassBioseqInfoTaskFetch: DataReadyCB can't be assigned "
99  "after the loading process has started");
100  }
102 }
103 
// AllowInheritance(bool value).
// NOTE(review): the body statement is not visible here. x_InheritanceRequired()
// only scans records when m_InheritanceAllowed is set; presumably this stores
// `value` there - confirm.
105 {
107 }
108 
// Builds the bioseq_info SELECT for the current request and binds its
// parameters. The WHERE clause is narrowed step by step (accession, then
// + version, + seq_id_type, + gi). Exactly as many parameters are bound, in
// that order, as the chosen clause contains. The version-only variant is used
// when the request has eVersion. The conditions selecting the two most
// specific variants are not visible in this listing.
110 {
// Function-local statics: each WHERE variant extends the previous one.
111  static const string s_Where_1 = ".bioseq_info WHERE accession = ?";
112  static const string s_Where_2 = s_Where_1 + " AND version = ?";
113  static const string s_Where_3 = s_Where_2 + " AND seq_id_type = ?";
114  static const string s_Where_4 = s_Where_3 + " AND gi = ?";
115 
// kSelectStatement ends with "FROM "; keyspace and table/WHERE follow.
116  string sql = kSelectStatement;
117  sql.append(GetKeySpace());
118  if (
122  {
// accession + version + seq_id_type + gi
123  sql.append(s_Where_4);
124  m_QueryArr[0].query->SetSQL(sql, 4);
125  m_QueryArr[0].query->BindStr(0, m_Accession);
126  m_QueryArr[0].query->BindInt16(1, m_Request.GetVersion());
127  m_QueryArr[0].query->BindInt16(2, m_Request.GetSeqIdType());
128  m_QueryArr[0].query->BindInt64(3, m_Request.GetGI());
// accession + version + seq_id_type
130  sql.append(s_Where_3);
131  m_QueryArr[0].query->SetSQL(sql, 3);
132  m_QueryArr[0].query->BindStr(0, m_Accession);
133  m_QueryArr[0].query->BindInt16(1, m_Request.GetVersion());
134  m_QueryArr[0].query->BindInt16(2, m_Request.GetSeqIdType());
135  } else if (m_Request.HasField(TField::eVersion)) {
// accession + version
136  sql.append(s_Where_2);
137  m_QueryArr[0].query->SetSQL(sql, 2);
138  m_QueryArr[0].query->BindStr(0, m_Accession);
139  m_QueryArr[0].query->BindInt16(1, m_Request.GetVersion());
140  } else {
// accession only
141  sql.append(s_Where_1);
142  m_QueryArr[0].query->SetSQL(sql, 1);
143  m_QueryArr[0].query->BindStr(0, m_Accession);
144  }
145 }
146 
148 {
149  string sql = kSelectStatement + GetKeySpace() + ".bioseq_info WHERE accession = ?";
150  m_QueryArr[0].query->SetSQL(sql, 1);
151  m_QueryArr[0].query->BindStr(0, m_Accession);
152 }
153 
// Executes the query in slot 0 with kBioSeqInfoConsistency (LOCAL_QUORUM),
// the task's m_Async flag and m_PageSize.
// NOTE(review): the meaning of the literal `true` argument is defined by
// CCassQuery::Query, which is not visible here - confirm.
155 {
157  m_QueryArr[0].query->Query(kBioSeqInfoConsistency, m_Async, true, m_PageSize);
158 }
159 
// Returns true when m_InheritanceRequired is non-empty.
// When inheritance is allowed, it first scans m_Records for entries whose state
// is not kStateAlive.
// NOTE(review): the statement inside the inner branch is not visible;
// presumably it records index `i` in m_InheritanceRequired - confirm.
// m_InheritanceRequired is not cleared in the visible lines.
161 {
162  if (m_InheritanceAllowed) {
163  for (size_t i = 0; i < m_Records.size(); ++i) {
164  if (m_Records[i].GetState() != CBioseqInfoRecord::kStateAlive) {
166  }
167  }
168  }
169 
170  return !m_InheritanceRequired.empty();
171 }
172 
// Checks whether the current result row satisfies the request.
// It reads version, seq_id_type and gi from the row and compares each one
// against the corresponding request value.
// NOTE(review): the guard conditions for the three comparisons are not visible
// here; presumably each applies only when the request has that field - confirm.
174 {
175  int version = m_QueryArr[0].query->FieldGetInt16Value(fnVersion);
176  int seq_id_type = m_QueryArr[0].query->FieldGetInt16Value(fnSeqIdType);
177  CBioseqInfoRecord::TGI gi = m_QueryArr[0].query->FieldGetInt64Value(fnGI);
178 
179  bool acceptable = true;
181  acceptable = acceptable && (gi == m_Request.GetGI());
182  }
184  acceptable = acceptable && (version == m_Request.GetVersion());
185  }
187  acceptable = acceptable && (seq_id_type == m_Request.GetSeqIdType());
188  }
189  return acceptable;
190 }
191 
193  CBioseqInfoRecord const& old_record, CBioseqInfoRecord const& new_record)
194 {
195  return
196  (
197  new_record.GetVersion() > old_record.GetVersion()
198  || (new_record.GetVersion() == old_record.GetVersion() && new_record.GetGI() > old_record.GetGI())
199  )
200  && new_record.GetSeqIdType() == old_record.GetSeqIdType();
201 }
202 
204 {
205  record.SetAccession(m_Accession)
206  .SetVersion(m_QueryArr[0].query->FieldGetInt16Value(fnVersion))
207  .SetSeqIdType(m_QueryArr[0].query->FieldGetInt16Value(fnSeqIdType))
208  .SetDateChanged(m_QueryArr[0].query->FieldGetInt64Value(fnDateChanged))
209  .SetHash(m_QueryArr[0].query->FieldGetInt32Value(fnHash))
210  .SetGI(m_QueryArr[0].query->FieldGetInt64Value(fnGI))
211  .SetLength(m_QueryArr[0].query->FieldGetInt32Value(fnLength))
212  .SetMol(m_QueryArr[0].query->FieldGetInt8Value(fnMol))
213  .SetSat(m_QueryArr[0].query->FieldGetInt16Value(fnSat))
214  .SetSatKey(m_QueryArr[0].query->FieldGetInt32Value(fnSatKey))
215  .SetSeqState(m_QueryArr[0].query->FieldGetInt8Value(fnSeqState))
216  .SetState(m_QueryArr[0].query->FieldGetInt8Value(fnState))
217  .SetTaxId(m_QueryArr[0].query->FieldGetInt32Value(fnTaxId))
218  .SetName(m_QueryArr[0].query->FieldGetStrValueDef(fnName, ""))
219  .SetWritetime(m_QueryArr[0].query->FieldGetInt64Value(fnWritetime));
220  m_QueryArr[0].query->FieldGetContainerValue(
221  fnSeqIds, inserter(record.GetSeqIds(), record.GetSeqIds().end()));
222 }
223 
225 {
226  while (m_QueryArr[0].query->NextRow() == ar_dataready) {
227  if (x_IsMatchingRecord()) {
228  m_Records.resize(m_Records.size() + 1);
229  x_PopulateRecord(m_Records[m_Records.size() - 1]);
230  }
231  }
232 }
233 
235 {
236  if (source.GetState() == CBioseqInfoRecord::kStateAlive) {
237  set<int16_t> seq_id_types;
238  for (auto const & seq_id : target.GetSeqIds()) {
239  seq_id_types.insert(get<0>(seq_id));
240  }
241 
242  for (auto const & seq_id : source.GetSeqIds()) {
243  if (seq_id_types.count(get<0>(seq_id)) == 0) {
244  target.GetSeqIds().insert(seq_id);
245  }
246  }
247  }
248 }
249 
250 
// Wait1(): advances the fetch state machine.
// The switch is re-run while CheckReady reports that a restart is needed
// (`restarted`). Some state transitions and the handling of the consume
// callback are on lines not visible in this listing.
252 {
253  bool restarted;
254  do {
255  restarted = false;
256  switch (m_State) {
// Terminal states: nothing left to do.
257  case eError:
258  case eDone:
259  return;
260 
// (Re)start: drop collected records and issue a fresh query in slot 0.
261  case eInit:
262  {
263  m_Records.clear();
265  m_QueryArr.resize(1);
266  m_QueryArr[0] = {m_Conn->NewQuery(), 0};
268  x_StartQuery();
270  break;
271  }
272 
// Collect matching rows. At EOF, either finish or, when some records still
// need inheritance, close this query and start another one.
273  case eFetchStarted:
274  {
275  if (CheckReady(m_QueryArr[0].query, m_RestartCounter, restarted)) {
276  x_ReadingLoop();
277  if (m_QueryArr[0].query->IsEOF()) {
278  bool final_state = true;
279  if (m_ConsumeCallback) {
280  if (m_Records.empty()) {
282  } else {
283  if (x_InheritanceRequired()) {
284  final_state = false;
285  m_QueryArr[0].query->Close();
287  x_StartQuery();
289  } else {
291  }
292  }
293  }
294  if (final_state) {
295  CloseAll();
296  m_State = eDone;
297  }
298  }
299  } else if (restarted) {
// Restart requested: drop the query and go back to eInit; the do/while loop
// re-runs the switch immediately.
301  m_QueryArr[0].query->Close();
302  m_State = eInit;
303  }
304  break;
305  }
306  // We can assume that getting into this state we have m_ConsumeCallback
// For each fetched row, merge its seq_ids into every pending record it is a
// valid newer candidate for (x_IsMatchingNewRecord). Those records are then
// removed from m_InheritanceRequired.
308  {
309  if (CheckReady(m_QueryArr[0])) {
310  while (
311  m_QueryArr[0].query->NextRow() == ar_dataready
313  ) {
314  CBioseqInfoRecord record;
315  x_PopulateRecord(record);
316  auto itr = m_InheritanceRequired.begin();
317  while (itr != m_InheritanceRequired.end()) {
318  if (x_IsMatchingNewRecord(m_Records[*itr], record)) {
319  x_MergeSeqIds(m_Records[*itr], record);
320  itr = m_InheritanceRequired.erase(itr);
321  } else {
322  ++itr;
323  }
324  }
325  }
// Finished once the result set is exhausted or nothing is pending anymore.
326  if (m_QueryArr[0].query->IsEOF() || m_InheritanceRequired.empty()) {
328  CloseAll();
329  m_State = eDone;
330  }
331  }
332  break;
333  }
// Any other state is unexpected: format a diagnostic with keyspace, accession,
// version, seq_id_type (-1 when the request lacks it) and the state value.
334  default: {
335  char msg[1024];
336  string keyspace = GetKeySpace();
338  auto seq_id_type = m_Request.HasField(TField::eSeqIdType) ? m_Request.GetSeqIdType() : -1;
339  snprintf(msg, sizeof(msg), "Failed to fetch bioseq info (key=%s.%s.%d.%d) unexpected state (%d)",
340  keyspace.c_str(), m_Accession.c_str(),
341  static_cast<int>(version), static_cast<int>(seq_id_type),
342  static_cast<int>(m_State));
344  }
345  }
346  } while(restarted);
347 }
348 
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
function< void(vector< CBioseqInfoRecord > &&)> TBioseqInfoConsumeCallback
Definition: record.hpp:305
const int fnSeqIdType
Definition: fetch.cpp:55
const int fnSatKey
Definition: fetch.cpp:62
const string kSelectStatement
Definition: fetch.cpp:70
const int fnGI
Definition: fetch.cpp:58
BEGIN_IDBLOB_SCOPE const CassConsistency kBioSeqInfoConsistency
Definition: fetch.cpp:51
const int fnMol
Definition: fetch.cpp:60
const int fnDateChanged
Definition: fetch.cpp:56
const int fnHash
Definition: fetch.cpp:57
const int fnName
Definition: fetch.cpp:67
const int fnState
Definition: fetch.cpp:65
const int fnLength
Definition: fetch.cpp:59
const int fnSeqState
Definition: fetch.cpp:64
const int fnTaxId
Definition: fetch.cpp:66
const int fnVersion
Definition: fetch.cpp:54
const int fnWritetime
Definition: fetch.cpp:68
const int fnSeqIds
Definition: fetch.cpp:63
const int fnSat
Definition: fetch.cpp:61
#define true
Definition: bool.h:35
function< void(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)> TDataErrorCallback
@ ar_dataready
Definition: cass_driver.hpp:70
CBioseqInfoRecord::TSeqIdType GetSeqIdType() const
Definition: request.hpp:118
CBioseqInfoRecord::TVersion GetVersion() const
Definition: request.hpp:110
CBioseqInfoRecord::TGI GetGI() const
Definition: request.hpp:102
bool HasField(EFields field) const
Definition: request.hpp:126
TSeqIdType GetSeqIdType() const
Definition: record.hpp:208
TVersion GetVersion() const
Definition: record.hpp:203
CBioseqInfoRecord & SetWritetime(TWritetime value)
Definition: record.hpp:190
CBioseqInfoRecord & SetDateChanged(TDateChanged value)
Definition: record.hpp:112
CBioseqInfoRecord & SetSat(TSat value)
Definition: record.hpp:142
TSeqIds & GetSeqIds()
Definition: record.hpp:248
CBioseqInfoRecord & SetName(TName value)
Definition: record.hpp:184
CBioseqInfoRecord & SetLength(TLength value)
Definition: record.hpp:130
CBioseqInfoRecord & SetTaxId(TTaxId value)
Definition: record.hpp:178
CBioseqInfoRecord & SetSeqIdType(TSeqIdType value)
Definition: record.hpp:106
TGI GetGI() const
Definition: record.hpp:223
CBioseqInfoRecord & SetSatKey(TSatKey value)
Definition: record.hpp:148
CBioseqInfoRecord & SetState(TState value)
Definition: record.hpp:172
CBioseqInfoRecord & SetGI(TGI value)
Definition: record.hpp:124
CBioseqInfoRecord & SetMol(TMol value)
Definition: record.hpp:136
CBioseqInfoRecord & SetSeqState(TSeqState value)
Definition: record.hpp:166
static const TState kStateAlive
Definition: record.hpp:68
CBioseqInfoRecord & SetVersion(TVersion value)
Definition: record.hpp:100
CBioseqInfoRecord & SetHash(THash value)
Definition: record.hpp:118
CBioseqInfoRecord & SetAccession(const TAccession &value)
Definition: record.hpp:94
void x_InitializeAliveRecordQuery()
Definition: fetch.cpp:147
void Wait1() override
Definition: fetch.cpp:251
void x_PopulateRecord(CBioseqInfoRecord &record) const
Definition: fetch.cpp:203
unsigned int m_RestartCounter
Definition: fetch.hpp:98
vector< CBioseqInfoRecord > m_Records
Definition: fetch.hpp:92
void AllowInheritance(bool value)
Definition: fetch.cpp:104
bool x_InheritanceRequired()
Definition: fetch.cpp:160
CBioseqInfoFetchRequest m_Request
Definition: fetch.hpp:89
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: fetch.cpp:94
TBioseqInfoConsumeCallback m_ConsumeCallback
Definition: fetch.hpp:91
void SetConsumeCallback(TBioseqInfoConsumeCallback callback)
Definition: fetch.cpp:89
set< size_t > m_InheritanceRequired
Definition: fetch.hpp:93
void x_MergeSeqIds(CBioseqInfoRecord &target, CBioseqInfoRecord const &source)
Definition: fetch.cpp:234
unsigned int m_PageSize
Definition: fetch.hpp:97
bool x_IsMatchingNewRecord(CBioseqInfoRecord const &old_record, CBioseqInfoRecord const &new_record)
Definition: fetch.cpp:192
atomic< int32_t > m_State
shared_ptr< CCassConnection > m_Conn
string GetKeySpace() const
void SetDataReadyCB3(shared_ptr< CCassDataCallbackReceiver > datareadycb3)
void SetupQueryCB3(shared_ptr< CCassQuery > &query)
bool CheckReady(shared_ptr< CCassQuery > qry, unsigned int restart_counter, bool &need_repeat)
void CloseAll(void)
vector< SQueryRec > m_QueryArr
void Error(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
Definition: set.hpp:45
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
const_iterator begin() const
Definition: set.hpp:135
void clear()
Definition: set.hpp:153
bool empty() const
Definition: set.hpp:133
void erase(iterator pos)
Definition: set.hpp:151
const_iterator end() const
Definition: set.hpp:136
char value[7]
Definition: config.c:431
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
#define END_SCOPE(ns)
End the previously defined scope.
Definition: ncbistl.hpp:75
#define BEGIN_SCOPE(ns)
Define a new scope.
Definition: ncbistl.hpp:72
int i
static int version
Definition: mdb_load.c:29
const CharType(& source)[N]
Definition: pointer.h:1149
const CConstRef< CSeq_id > GetAccession(const CSeq_id_Handle &id_handle)
static char sql[1024]
Definition: putdata.c:19
static string query
Definition: _hash_fun.h:40
#define const
Definition: zconf.h:230
Modified on Thu Nov 30 04:57:27 2023 by modify_doxy.py rev. 669887