NCBI C++ ToolKit
accession_version_history_processor.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: accession_version_history_processor.cpp 103123 2024-09-11 18:57:02Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: accession version history processor
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 #include <util/xregexp/regexp.hpp>
34 
36 #include "pubseq_gateway.hpp"
40 #include "psgs_seq_id_utils.hpp"
41 
43 
44 using namespace std::placeholders;
45 
46 static const string kAccVerHistProcessorName = "Cassandra-accession-version-history";
47 
48 
50  m_RecordCount(0),
51  m_AccVerHistoryRequest(nullptr)
52 {}
53 
54 
56  shared_ptr<CPSGS_Request> request,
57  shared_ptr<CPSGS_Reply> reply,
58  TProcessorPriority priority) :
59  CPSGS_CassProcessorBase(request, reply, priority),
61  request, reply,
62  bind(&CPSGS_AccessionVersionHistoryProcessor::x_OnSeqIdResolveFinished,
63  this, _1),
64  bind(&CPSGS_AccessionVersionHistoryProcessor::x_OnSeqIdResolveError,
65  this, _1, _2, _3, _4),
66  bind(&CPSGS_AccessionVersionHistoryProcessor::x_OnResolutionGoodData,
67  this)),
68  m_RecordCount(0)
69 {
70  // Convenience to avoid calling
71  // m_Request->GetRequest<SPSGS_AccessionVersionHistoryRequest>() everywhere
73 }
74 
75 
77 {}
78 
79 
80 bool
81 CPSGS_AccessionVersionHistoryProcessor::CanProcess(shared_ptr<CPSGS_Request> request,
82  shared_ptr<CPSGS_Reply> reply) const
83 {
84  if (!IsCassandraProcessorEnabled(request))
85  return false;
86 
87  if (request->GetRequestType() != CPSGS_Request::ePSGS_AccessionVersionHistoryRequest)
88  return false;
89 
90  auto * app = CPubseqGatewayApp::GetInstance();
91  auto startup_data_state = app->GetStartupDataState();
92  if (startup_data_state != ePSGS_StartupDataOK) {
93  if (request->NeedTrace()) {
94  reply->SendTrace(kAccVerHistProcessorName + " processor cannot process "
95  "request because Cassandra DB is not available.\n" +
96  GetCassStartupDataStateMessage(startup_data_state),
97  request->GetStartTimestamp());
98  }
99  return false;
100  }
101 
102  return true;
103 }
104 
105 
108  shared_ptr<CPSGS_Request> request,
109  shared_ptr<CPSGS_Reply> reply,
110  TProcessorPriority priority) const
111 {
112  if (!CanProcess(request, reply))
113  return nullptr;
114 
115  return new CPSGS_AccessionVersionHistoryProcessor(request, reply, priority);
116 }
117 
118 
120 {
121  // Lock the request for all the cassandra processors so that the other
122  // processors may wait on the event
124 
125  // In both cases: sync or async resolution --> a callback will be called
127 }
128 
129 
130 // This callback is called in all cases when there is no valid resolution,
131 // i.e. a 404 or any kind of error
132 void
134  CRequestStatus::ECode status,
135  int code,
136  EDiagSev severity,
137  const string & message)
138 {
139  if (m_Canceled) {
141  return;
142  }
143 
144  CRequestContextResetter context_resetter;
145  IPSGS_Processor::m_Request->SetRequestContext();
146 
147  EPSGS_LoggingFlag logging_flag = ePSGS_NeedLogging;
148  if (status == CRequestStatus::e404_NotFound)
149  logging_flag = ePSGS_SkipLogging;
150  CountError(nullptr, status, code, severity, message,
151  logging_flag, ePSGS_NeedStatusUpdate);
152 
153  size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
154 
155  IPSGS_Processor::m_Reply->PrepareBioseqMessage(
156  item_id, kAccVerHistProcessorName,
157  message, status, code, severity);
158  IPSGS_Processor::m_Reply->PrepareBioseqCompletion(
159  item_id, kAccVerHistProcessorName, 2);
160 
162 }
163 
164 
165 // This callback is called only in case of a valid resolution
166 void
168  SBioseqResolution && bioseq_resolution)
169 {
170  CRequestContextResetter context_resetter;
171  IPSGS_Processor::m_Request->SetRequestContext();
172 
173  x_SendBioseqInfo(bioseq_resolution);
174 
175  // Note: there is no need to translate the keyspace from the bioseq info.
176  // It is the same keyspace in which the si2csi/bioseq_info tables reside
177 
178  // Initiate the accession history request
179  auto * app = CPubseqGatewayApp::GetInstance();
180  unique_ptr<CCassAccVerHistoryFetch> details;
181  details.reset(new CCassAccVerHistoryFetch(*m_AccVerHistoryRequest));
182 
183  // Note: part of the resolution process is an accession substitution.
184  // However the request must be done using the original accession and
185  // seq_id_type
186  string accession = StripTrailingVerticalBars(bioseq_resolution.GetOriginalAccession());
187  auto sat_info = app->GetBioseqKeyspace();
188  CCassAccVerHistoryTaskFetch * fetch_task =
189  new CCassAccVerHistoryTaskFetch(sat_info.connection,
190  sat_info.keyspace,
191  accession,
192  nullptr, nullptr,
193  0, // version is not used here
194  bioseq_resolution.GetOriginalSeqIdType());
195  details->SetLoader(fetch_task);
196 
197  fetch_task->SetConsumeCallback(
199  this,
201  this, _1, _2, _3),
202  details.get()));
203  fetch_task->SetErrorCB(
206  this, _1, _2, _3, _4, _5),
207  details.get()));
208  fetch_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
209 
210  if (IPSGS_Processor::m_Request->NeedTrace()) {
211  IPSGS_Processor::m_Reply->SendTrace("Cassandra request: " +
212  ToJsonString(*fetch_task),
213  IPSGS_Processor::m_Request->GetStartTimestamp());
214  }
215 
216  m_FetchDetails.push_back(std::move(details));
217  fetch_task->Wait();
218 }
219 
220 
221 void
223  SBioseqResolution & bioseq_resolution)
224 {
225  if (m_Canceled) {
227  return;
228  }
229 
230  if (bioseq_resolution.m_ResolutionResult == ePSGS_BioseqDB ||
231  bioseq_resolution.m_ResolutionResult == ePSGS_BioseqCache)
232  AdjustBioseqAccession(bioseq_resolution);
233 
234  size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
235  auto data_to_send = ToJsonString(bioseq_resolution.GetBioseqInfo(),
237 
238  IPSGS_Processor::m_Reply->PrepareBioseqData(
239  item_id, kAccVerHistProcessorName, data_to_send,
241  IPSGS_Processor::m_Reply->PrepareBioseqCompletion(
242  item_id, kAccVerHistProcessorName, 2);
243 }
244 
245 
246 bool
248  SAccVerHistRec && acc_ver_hist_record,
249  bool last,
250  CCassAccVerHistoryFetch * fetch_details)
251 {
252  CRequestContextResetter context_resetter;
253  IPSGS_Processor::m_Request->SetRequestContext();
254 
255  if (IPSGS_Processor::m_Request->NeedTrace()) {
256  if (last) {
257  IPSGS_Processor::m_Reply->SendTrace(
258  "Accession version history no-more-data callback",
259  IPSGS_Processor::m_Request->GetStartTimestamp());
260  } else {
261  IPSGS_Processor::m_Reply->SendTrace(
262  "Accession version history data received",
263  IPSGS_Processor::m_Request->GetStartTimestamp());
264  }
265  }
266 
267  if (m_Canceled) {
268  fetch_details->GetLoader()->Cancel();
269  fetch_details->GetLoader()->ClearError();
270  fetch_details->SetReadFinished();
271 
273  return false;
274  }
275 
276  if (IPSGS_Processor::m_Reply->IsFinished()) {
278  this,
280  PSG_ERROR("Unexpected data received "
281  "while the output has finished, ignoring");
282 
285  return false;
286  }
287 
288  if (last) {
289  fetch_details->GetLoader()->ClearError();
290  fetch_details->SetReadFinished();
291 
292  if (m_RecordCount == 0)
294 
296  return false;
297  }
298 
299  ++m_RecordCount;
300  IPSGS_Processor::m_Reply->PrepareAccVerHistoryData(
301  kAccVerHistProcessorName, ToJsonString(acc_ver_hist_record));
302 
303  x_Peek(false);
304  return true;
305 }
306 
307 
308 void
310  CCassAccVerHistoryFetch * fetch_details,
311  CRequestStatus::ECode status,
312  int code,
313  EDiagSev severity,
314  const string & message)
315 {
316  CRequestContextResetter context_resetter;
317  IPSGS_Processor::m_Request->SetRequestContext();
318 
319  // It could be a message or an error
320  CountError(fetch_details, status, code, severity, message,
322  bool is_error = IsError(severity);
323 
324  IPSGS_Processor::m_Reply->PrepareProcessorMessage(
325  IPSGS_Processor::m_Reply->GetItemId(),
326  kAccVerHistProcessorName, message, status, code, severity);
327 
328  // To avoid sending an error in Peek()
329  fetch_details->GetLoader()->ClearError();
330 
331  if (is_error) {
332  // There will be no more activity
333  fetch_details->SetReadFinished();
335  } else {
336  x_Peek(false);
337  }
338 }
339 
340 
342 {
343  auto status = CPSGS_CassProcessorBase::GetStatus();
344  if (status == IPSGS_Processor::ePSGS_InProgress)
345  return status;
346 
347  if (m_Canceled)
349 
350  return status;
351 }
352 
353 
355 {
357 }
358 
359 
361 {
363 }
364 
365 
367 {
368  x_Peek(true);
369 }
370 
371 
373 {
374  if (m_Canceled) {
376  return;
377  }
378 
379  // 1 -> call m_Loader->Wait1 to pick data
380  // 2 -> check if we have ready-to-send buffers
381  // 3 -> call reply->Send() to send what we have if it is ready
382  bool overall_final_state = false;
383 
384  while (true) {
385  auto initial_size = m_FetchDetails.size();
386 
387  for (auto & details: m_FetchDetails) {
388  if (details) {
389  if (details->InPeek()) {
390  continue;
391  }
392  details->SetInPeek(true);
393  overall_final_state |= x_Peek(details, need_wait);
394  details->SetInPeek(false);
395  }
396  }
397  if (initial_size == m_FetchDetails.size())
398  break;
399  }
400 
401  // Ready packets need to be sent only once, when everything is finished
402  if (overall_final_state) {
403  if (AreAllFinishedRead()) {
406  }
407  }
408 }
409 
410 
411 bool CPSGS_AccessionVersionHistoryProcessor::x_Peek(unique_ptr<CCassFetch> & fetch_details,
412  bool need_wait)
413 {
414  if (!fetch_details->GetLoader())
415  return true;
416 
417  bool final_state = false;
418  if (need_wait)
419  if (!fetch_details->ReadFinished()) {
420  final_state = fetch_details->GetLoader()->Wait();
421  }
422 
423  if (fetch_details->GetLoader()->HasError() &&
424  IPSGS_Processor::m_Reply->IsOutputReady() &&
425  ! IPSGS_Processor::m_Reply->IsFinished()) {
426  // Send an error
427  string error = fetch_details->GetLoader()->LastError();
428  auto * app = CPubseqGatewayApp::GetInstance();
429 
430  PSG_ERROR(error);
431 
432  // Last resort to detect if it was a timeout
433  CRequestStatus::ECode status;
434  if (IsTimeoutError(error)) {
436  app->GetCounters().Increment(this,
438  } else {
440  app->GetCounters().Increment(this,
442  }
443 
444  IPSGS_Processor::m_Reply->PrepareProcessorMessage(
445  IPSGS_Processor::m_Reply->GetItemId(),
448 
449  // Mark finished
450  UpdateOverallStatus(status);
451  fetch_details->GetLoader()->ClearError();
452  fetch_details->SetReadFinished();
454  }
455 
456  return final_state;
457 }
458 
459 
461 {
462  // The resolution process started to receive data which look good so
463  // the dispatcher should be notified that the other processors can be
464  // stopped
465  if (m_Canceled) {
467  return;
468  }
469 
470  if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
472  }
473 
474  // If the other processor waits then let it go but after sending the signal
475  // of the good data (it may cancel the other processors)
477 }
478 
static const string kAccVerHistProcessorName
const string kCassandraProcessorGroupName
const string kCassandraProcessorEvent
CCassAccVerHistoryTaskFetch * GetLoader(void)
Definition: cass_fetch.hpp:237
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: fetch.cpp:73
void SetConsumeCallback(TAccVerHistConsumeCallback callback)
Definition: fetch.cpp:68
void SetErrorCB(TDataErrorCallback error_cb)
virtual void Cancel()
void SetReadFinished(void)
Definition: cass_fetch.hpp:89
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
void x_OnAccVerHistError(CCassAccVerHistoryFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const
Tells if processor can process the given request.
virtual string GetGroupName(void) const
Tells the processor group name.
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const
Create processor to fulfil PSG request using the data source.
bool x_OnAccVerHistData(SAccVerHistRec &&acc_ver_hist_record, bool last, CCassAccVerHistoryFetch *fetch_details)
virtual string GetName(void) const
Tells the processor name (used in logging and tracing)
void x_OnSeqIdResolveFinished(SBioseqResolution &&bioseq_resolution)
void x_SendBioseqInfo(SBioseqResolution &bioseq_resolution)
virtual void Process(void)
Main processing function.
virtual void ProcessEvent(void)
Called when an event happened which may require to have some processing.
void x_OnSeqIdResolveError(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
virtual EPSGS_Status GetStatus(void)
Tells the processor status (if it has finished or in progress)
SPSGS_AccessionVersionHistoryRequest * m_AccVerHistoryRequest
EPSGS_AccessionAdjustmentResult AdjustBioseqAccession(SBioseqResolution &bioseq_resolution)
CRequestStatus::ECode CountError(CCassFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message, EPSGS_LoggingFlag logging_flag, EPSGS_StatusUpdateFlag status_update_flag)
void UpdateOverallStatus(CRequestStatus::ECode status)
bool IsCassandraProcessorEnabled(shared_ptr< CPSGS_Request > request) const
bool IsError(EDiagSev severity) const
list< unique_ptr< CCassFetch > > m_FetchDetails
bool AreAllFinishedRead(void) const
IPSGS_Processor::EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
bool IsTimeoutError(const string &msg) const
@ ePSGS_SendAccumulated
Definition: psgs_reply.hpp:54
@ ePSGS_AccessionVersionHistoryRequest
void ResolveInputSeqId(void)
CPSGSCounters & GetCounters(void)
static CPubseqGatewayApp * GetInstance(void)
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
static DLIST_TYPE *DLIST_NAME() last(DLIST_LIST_TYPE *list)
Definition: dlist.tmpl.h:51
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define nullptr
Definition: ncbimisc.hpp:45
string StripTrailingVerticalBars(const string &seq_id)
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
@ ePSGS_StartupDataOK
int TProcessorPriority
@ ePSGS_NeedStatusUpdate
@ ePSGS_UnknownError
@ ePSGS_BioseqDB
@ ePSGS_BioseqCache
@ ePSGS_SkipLogging
@ ePSGS_NeedLogging
string GetCassStartupDataStateMessage(EPSGS_StartupDataState state)
EPSGS_ResolutionResult m_ResolutionResult
CBioseqInfoRecord & GetBioseqInfo(void)
Definition: inftrees.h:24
C++ wrappers for the Perl-compatible regular expression (PCRE) library.
Modified on Fri Sep 20 14:57:57 2024 by modify_doxy.py rev. 669887