NCBI C++ ToolKit
async_resolve_base.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: async_resolve_base.cpp 103123 2024-09-11 18:57:02Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: base class for processors which need to resolve seq_id
29  * asynchronously
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
36 #include <corelib/ncbidiag.hpp>
37 
38 #include "pubseq_gateway.hpp"
39 #include "pubseq_gateway_utils.hpp"
41 #include "cass_fetch.hpp"
42 #include "psgs_request.hpp"
43 #include "psgs_reply.hpp"
44 #include "insdc_utils.hpp"
45 #include "async_resolve_base.hpp"
46 #include "insdc_utils.hpp"
49 #include "psgs_seq_id_utils.hpp"
50 
56 
57 using namespace std::placeholders;
58 
59 
61 {
63  err.m_ErrorMessage = msg;
64  err.m_ErrorCode = code;
65  m_Errors.push_back(err);
66 }
67 
68 
69 string CPSGSResolveErrors::GetCombinedErrorMessage(const list<SPSGSeqId> & seq_id_to_resolve) const
70 {
71  if (m_Errors.empty())
72  return "";
73 
74  size_t err_count = m_Errors.size();
75  string msg = "\n" + to_string(err_count) + " error";
76 
77  if (err_count > 1)
78  msg += "s";
79  msg += " encountered while resolving ";
80 
81  if (seq_id_to_resolve.size() == 1) {
82  msg += "the seq id:\n";
83  } else {
84  msg += "multiple seq ids:\n";
85  }
86 
87  for (size_t index=0; index < err_count; ++index) {
88  if (index != 0) {
89  msg += "\n";
90  }
91  msg += m_Errors[index].m_ErrorMessage;
92  }
93  return msg;
94 }
95 
96 
98 {
99  // The method is called only in case of errors including not found.
100  // If the resolve is plainly not done then there are no errors per se
101  // but the overall outcome is 404, so the initial value for the combined
102  // error code is 404 even if the list of errors is empty.
104  for (const auto & item : m_Errors) {
105  combined_code = max(combined_code, item.m_ErrorCode);
106  }
107  return combined_code;
108 }
109 
110 
112 {}
113 
114 
116  shared_ptr<CPSGS_Request> request,
117  shared_ptr<CPSGS_Reply> reply,
118  TContinueResolveCB continue_resolve_cb,
119  TSeqIdResolutionFinishedCB finished_cb,
120  TSeqIdResolutionErrorCB error_cb,
121  TSeqIdResolutionStartProcessingCB start_processing_cb) :
122  m_ContinueResolveCB(continue_resolve_cb),
123  m_FinishedCB(finished_cb),
124  m_ErrorCB(error_cb),
125  m_StartProcessingCB(start_processing_cb),
126  m_ResolveStage(eInit),
127  m_SecondaryIndex(0),
128  m_CurrentFetch(nullptr),
129  m_NoSeqIdTypeFetch(nullptr),
130  m_StartProcessingCalled(false)
131 {}
132 
133 
135 {}
136 
137 
138 void
140  int16_t effective_seq_id_type,
141  list<string> && secondary_id_list,
142  string && primary_seq_id,
143  bool composed_ok,
144  SBioseqResolution && bioseq_resolution)
145 {
146  m_ComposedOk = composed_ok;
147  m_PrimarySeqId = std::move(primary_seq_id);
148  m_EffectiveVersion = effective_version;
149  m_EffectiveSeqIdType = effective_seq_id_type;
150  m_SecondaryIdList = std::move(secondary_id_list);
151  m_BioseqResolution = std::move(bioseq_resolution);
152  m_AsyncCassResolutionStart = psg_clock_t::now();
153 
155  m_CurrentFetch = nullptr;
156  m_NoSeqIdTypeFetch = nullptr;
157 
158  x_Process();
159 }
160 
161 
162 int16_t
164 {
165  try {
166  if (text_seq_id == nullptr)
167  return -1;
168  if (text_seq_id->CanGetVersion())
169  return text_seq_id->GetVersion();
170  } catch (...) {
171  }
172  return -1;
173 }
174 
175 
177 {
178  string seq_ids;
179  bool need_comma = false;
180  for (const auto & item: m_SeqIdsToResolve) {
181  if (need_comma)
182  seq_ids += ", ";
183  seq_ids += item.seq_id;
184  need_comma = true;
185  }
186  return seq_ids;
187 }
188 
189 
191 {
192  m_SeqIdsToResolve.clear();
193  if (m_Request->GetRequestType() == CPSGS_Request::ePSGS_AnnotationRequest) {
194  // Special logic to select from seq_id/seq_id_type and other_seq_ids
195  string seq_id = x_GetRequestSeqId();
196 
197  if (!seq_id.empty()) {
198  m_SeqIdsToResolve.push_back(SPSGSeqId{x_GetRequestSeqIdType(), seq_id});
199  }
200 
201  SPSGS_AnnotRequest & annot_request = m_Request->GetRequest<SPSGS_AnnotRequest>();
202  for (const auto & item : annot_request.m_SeqIds) {
203  m_SeqIdsToResolve.push_back(SPSGSeqId{-1, item});
204  }
205 
206  if (m_SeqIdsToResolve.size() > 1) {
207  if (m_Request->NeedTrace()) {
208  m_Reply->SendTrace("The seq_ids to resolve list before sorting: " +
210  m_Request->GetStartTimestamp());
211  }
212  }
213 
214  // Sort the seq ids to resolve so that the most likely to resolve are
215  // in front
217 
218  if (m_SeqIdsToResolve.size() > 1) {
219  if (m_Request->NeedTrace()) {
220  m_Reply->SendTrace("The seq_ids to resolve list after sorting: " +
222  m_Request->GetStartTimestamp());
223  }
224  }
226  } else {
227  // Generic case: use what is coming from the user request
230  }
231 }
232 
233 
235  int16_t seq_id_type)
236 {
237  m_SeqIdsToResolve.clear();
238  m_SeqIdsToResolve.push_back(SPSGSeqId{seq_id_type, seq_id});
240 }
241 
242 
243 string
245 {
246  switch (m_Request->GetRequestType()) {
248  return m_Request->GetRequest<SPSGS_ResolveRequest>().m_SeqId;
250  return m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_SeqId;
252  return m_Request->GetRequest<SPSGS_AnnotRequest>().m_SeqId;
254  return m_Request->GetRequest<SPSGS_AccessionVersionHistoryRequest>().m_SeqId;
255  default:
256  break;
257  }
259  "Not handled request type " +
260  CPSGS_Request::TypeToString(m_Request->GetRequestType()));
261 }
262 
263 
264 int16_t
266 {
267  switch (m_Request->GetRequestType()) {
269  return m_Request->GetRequest<SPSGS_ResolveRequest>().m_SeqIdType;
271  return m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_SeqIdType;
273  return m_Request->GetRequest<SPSGS_AnnotRequest>().m_SeqIdType;
275  return m_Request->GetRequest<SPSGS_AccessionVersionHistoryRequest>().m_SeqIdType;
276  default:
277  break;
278  }
280  "Not handled request type " +
281  CPSGS_Request::TypeToString(m_Request->GetRequestType()));
282 }
283 
284 
287 {
288  if (m_Request->GetRequestType() == CPSGS_Request::ePSGS_ResolveRequest)
289  return m_Request->GetRequest<SPSGS_ResolveRequest>().m_IncludeDataFlags;
291 }
292 
293 
294 bool
296 {
297  return (GetBioseqInfoFields() &
299 }
300 
301 
302 // The method tells if the BIOSEQ_INFO record needs to be retrieved.
303 // It can be skipped under very specific conditions.
304 // It makes sense if the source of data is SI2CSI, i.e. only key fields are
305 // available.
306 bool
308  const CBioseqInfoRecord & bioseq_info_record)
309 {
310  if (m_Request->GetRequestType() != CPSGS_Request::ePSGS_ResolveRequest)
311  return false; // The get request supposes the full bioseq info
312 
314  return false; // In the resolve request more bioseq_info fields are requested
315 
316 
317  auto seq_id_type = bioseq_info_record.GetSeqIdType();
318  if (bioseq_info_record.GetVersion() > 0 && seq_id_type != CSeq_id::e_Gi)
319  return true; // This combination in data never requires accession adjustments
320 
321  auto include_flags = m_Request->GetRequest<SPSGS_ResolveRequest>().m_IncludeDataFlags;
322  if ((include_flags & ~SPSGS_ResolveRequest::fPSGS_Gi) == 0)
323  return true; // Only GI field or no fields are requested so no accession
324  // adjustments are required
325 
326  auto acc_subst = m_Request->GetRequest<SPSGS_ResolveRequest>().m_AccSubstOption;
328  return true; // No accession adjustments anyway so key fields are enough
329 
331  seq_id_type != CSeq_id::e_Gi)
332  return true; // No accession adjustments required
333 
334  return false;
335 }
336 
337 
340 {
341  // The substitution makes sense only for resolve/get/annot requests
342  switch (m_Request->GetRequestType()) {
344  return m_Request->GetRequest<SPSGS_ResolveRequest>().m_AccSubstOption;
346  return m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_AccSubstOption;
353  default:
354  break;
355  }
357  "Not handled request type " +
358  to_string(static_cast<int>(m_Request->GetRequestType())));
359 }
360 
361 
364  SBioseqResolution & bioseq_resolution)
365 {
366  if (CanSkipBioseqInfoRetrieval(bioseq_resolution.GetBioseqInfo())) {
367  if (m_Request->NeedTrace()) {
368  m_Reply->SendTrace("Accession adjustment is not required "
369  "(bioseq info is not provided)",
370  m_Request->GetStartTimestamp());
371  }
372  return ePSGS_NotRequired;
373  }
374 
375  auto acc_subst_option = GetAccessionSubstitutionOption();
376  if (acc_subst_option == SPSGS_RequestBase::ePSGS_NeverAccSubstitute) {
377  if (m_Request->NeedTrace()) {
378  m_Reply->SendTrace("Accession adjustment is not required "
379  "(substitute option is 'never')",
380  m_Request->GetStartTimestamp());
381  }
382  return ePSGS_NotRequired;
383  }
384 
385  auto seq_id_type = bioseq_resolution.GetBioseqInfo().GetSeqIdType();
386  auto version = bioseq_resolution.GetBioseqInfo().GetVersion();
387  if (version == 0) {
388  if (acc_subst_option == SPSGS_RequestBase::ePSGS_DefaultAccSubstitution) {
389  if (seq_id_type == CSeq_id::e_Pdb ||
390  seq_id_type == CSeq_id::e_Pir ||
391  seq_id_type == CSeq_id::e_Prf) {
392  // For them there is no substitution
393  if (m_Request->NeedTrace()) {
394  m_Reply->SendTrace("Accession adjustment is not required "
395  "(It is PDB, PIR or PRF with version == 0 "
396  "and substitute option is 'default')",
397  m_Request->GetStartTimestamp());
398  }
399  return ePSGS_NotRequired;
400  }
401  }
402  }
403 
404 
405  if (acc_subst_option == SPSGS_RequestBase::ePSGS_LimitedAccSubstitution &&
406  seq_id_type != CSeq_id::e_Gi) {
407  if (m_Request->NeedTrace()) {
408  m_Reply->SendTrace("Accession adjustment is not required "
409  "(substitute option is 'limited' and seq_id_type is not gi)",
410  m_Request->GetStartTimestamp());
411  }
412  return ePSGS_NotRequired;
413  }
414 
415  auto adj_result = bioseq_resolution.AdjustAccession(m_Request, m_Reply);
416  if (adj_result == ePSGS_LogicError ||
417  adj_result == ePSGS_SeqIdsEmpty) {
418  if (bioseq_resolution.m_ResolutionResult == ePSGS_BioseqCache)
419  PSG_WARNING("BIOSEQ_INFO cache error: " +
420  bioseq_resolution.m_AdjustmentError);
421  else
422  PSG_WARNING("BIOSEQ_INFO Cassandra error: " +
423  bioseq_resolution.m_AdjustmentError);
424  }
425  return adj_result;
426 }
427 
428 
429 // The method is called when:
430 // - resolution is initialized
431 // - there was no record found at any stage, i.e. a next try should be
432 // initiated
433 // NB: if a record was found, the method is not called. Instead, the pending
434 // operation class is called back directly
436 {
437  switch (m_ResolveStage) {
438  case eInit:
439  if (!m_ComposedOk) {
440  // The only thing to try is the AsIs resolution
442  x_Process();
443  break;
444  }
445 
446  if (m_PrimarySeqId.empty()) {
448  x_Process();
449  break;
450  }
451 
453  x_Process();
454  break;
455 
456  case ePrimaryBioseq:
458 
459  // true => with seq_id_type
461  m_EffectiveSeqIdType, -1, true);
462  break;
463 
464  case eSecondarySi2csi:
465  // loop over all secondary seq_id
466  if (m_SecondaryIndex >= m_SecondaryIdList.size()) {
468  x_Process();
469  break;
470  }
473  break;
474 
475  case eSecondaryAsIs:
478  break;
479 
480  case ePostSi2Csi:
481  // Really, there is no stage after that. This is post processing.
482  // What is done is defined in the found or error callbacks.
483  // true => with seq_id_type
489  true);
490  break;
491 
492  case eFinished:
493  default:
494  // 'not found' of PendingOperation
497 
499  }
500 }
501 
502 
503 void
505  const CBioseqInfoRecord::TAccession & seq_id,
507  CBioseqInfoRecord::TSeqIdType seq_id_type,
509  bool with_seq_id_type)
510 {
514  m_BioseqInfoRequestedSeqIdType = seq_id_type;
516 
517  unique_ptr<CCassBioseqInfoFetch> details;
518  details.reset(new CCassBioseqInfoFetch());
519 
520  CBioseqInfoFetchRequest bioseq_info_request;
521  bioseq_info_request.SetAccession(StripTrailingVerticalBars(seq_id));
522  if (version != -1)
523  bioseq_info_request.SetVersion(version);
524  if (with_seq_id_type) {
525  if (seq_id_type != -1)
526  bioseq_info_request.SetSeqIdType(seq_id_type);
527  }
528  if (gi != -1)
529  bioseq_info_request.SetGI(gi);
530 
531  auto bioseq_keyspace = CPubseqGatewayApp::GetInstance()->GetBioseqKeyspace();
532  CCassBioseqInfoTaskFetch * fetch_task =
533  new CCassBioseqInfoTaskFetch(bioseq_keyspace.connection,
534  bioseq_keyspace.keyspace,
535  bioseq_info_request,
536  nullptr, nullptr);
537  details->SetLoader(fetch_task);
538 
539  if (with_seq_id_type)
540  fetch_task->SetConsumeCallback(
541  std::bind(&CPSGS_AsyncResolveBase::x_OnBioseqInfo, this, _1));
542  else
543  fetch_task->SetConsumeCallback(
545 
546  fetch_task->SetErrorCB(
547  std::bind(&CPSGS_AsyncResolveBase::x_OnBioseqInfoError, this, _1, _2, _3, _4));
548  fetch_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
549 
550  m_BioseqInfoStart = psg_clock_t::now();
551  if (with_seq_id_type) {
552  m_CurrentFetch = details.release();
553  m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_CurrentFetch));
554  } else {
555  m_NoSeqIdTypeFetch = details.release();
556  m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_NoSeqIdTypeFetch));
557  }
558 
559  if (m_Request->NeedTrace()) {
560  if (with_seq_id_type)
561  m_Reply->SendTrace(
562  "Cassandra request: " +
563  ToJsonString(bioseq_info_request),
564  m_Request->GetStartTimestamp());
565  else
566  m_Reply->SendTrace(
567  "Cassandra request for INSDC types: " +
568  ToJsonString(bioseq_info_request),
569  m_Request->GetStartTimestamp());
570  }
571 
572  fetch_task->Wait();
573 }
574 
575 
// Prepares and launches an SI2CSI lookup for one secondary seq id.
//
// secondary_id          - the secondary seq id to look up; trailing vertical
//                         bars are stripped before it is put into the request
// effective_seq_id_type - the seq id type to put into the request; the value
//                         -1 means the type is not set in the request at all
//
// The found records are delivered to x_OnSi2csiRecord() and the errors to
// x_OnSi2csiError() via the callbacks bound below.
576 void CPSGS_AsyncResolveBase::x_PrepareSi2csiQuery(const string & secondary_id,
577  int16_t effective_seq_id_type)
578 {
580 
    // Fetch descriptor; it will hold the loader task created below
581  unique_ptr<CCassSi2csiFetch> details;
582  details.reset(new CCassSi2csiFetch());
583 
    // Compose the request: the secondary seq id is mandatory, the type is
    // optional
584  CSi2CsiFetchRequest si2csi_request;
585  si2csi_request.SetSecSeqId(StripTrailingVerticalBars(secondary_id));
586  if (effective_seq_id_type != -1)
587  si2csi_request.SetSecSeqIdType(effective_seq_id_type);
588 
    // The task works against the bioseq keyspace/connection of the application
589  auto bioseq_keyspace = CPubseqGatewayApp::GetInstance()->GetBioseqKeyspace();
590  CCassSI2CSITaskFetch * fetch_task =
591  new CCassSI2CSITaskFetch(bioseq_keyspace.connection,
592  bioseq_keyspace.keyspace,
593  si2csi_request,
594  nullptr, nullptr);
595 
    // NOTE(review): fetch_task comes from a raw 'new'; presumably SetLoader()
    // takes the ownership -- confirm against cass_fetch.hpp
596  details->SetLoader(fetch_task);
597 
    // Data, error and data-ready notifications
598  fetch_task->SetConsumeCallback(std::bind(&CPSGS_AsyncResolveBase::x_OnSi2csiRecord, this, _1));
599  fetch_task->SetErrorCB(std::bind(&CPSGS_AsyncResolveBase::x_OnSi2csiError, this, _1, _2, _3, _4));
600  fetch_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
601 
    // The descriptor is released here and wrapped into a unique_ptr again when
    // it is appended to m_FetchDetails below, so m_FetchDetails is the owner
    // and m_CurrentFetch is a non-owning pointer to the same object
602  m_CurrentFetch = details.release();
603 
    // The start timestamp is used for the timing registration when the
    // records come back (see x_OnSi2csiRecord())
604  m_Si2csiStart = psg_clock_t::now();
605  m_FetchDetails.push_back(unique_ptr<CCassFetch>(m_CurrentFetch));
606 
607  if (m_Request->NeedTrace())
608  m_Reply->SendTrace(
609  "Cassandra request: " +
610  ToJsonString(si2csi_request),
611  m_Request->GetStartTimestamp());
612 
    // NOTE(review): presumably Wait() starts the asynchronous task and the
    // results arrive via the callbacks set above -- confirm against the
    // loader (CCassBlobWaiter) documentation
613  fetch_task->Wait();
614 }
615 
616 
618 {
619  // Use m_SecondaryIndex, it was properly formed in the state machine
623 }
624 
625 
627 {
628  // Need to capitalize the seq_id before going to the tables.
629  // Capitalizing in place suits because the other tries are done via copies
630  // provided by OSLT
631  auto upper_request_seq_id = m_CurrentSeqIdToResolve->seq_id;
632  NStr::ToUpper(upper_request_seq_id);
633 
634  if (upper_request_seq_id == m_PrimarySeqId &&
636  // Such a request has already been made; it was because the primary id
637  // matches the one from URL
638  x_Process();
639  } else {
640  x_PrepareSi2csiQuery(upper_request_seq_id,
641  m_CurrentSeqIdToResolve->seq_id_type);
642  }
643 }
644 
645 
// Consume callback for a BIOSEQ_INFO fetch which was issued with a seq id type
// (it is bound in x_PreparePrimaryBioseqInfoQuery() when with_seq_id_type is
// true).
//
// records - the BIOSEQ_INFO records which matched the request
//
// Outline of the visible logic:
// - if more than one record came back, SelectBioseqInfoRecord() is asked to
//   pick one; a negative index means that none could be chosen
// - if there is no usable single record, the timing/counters are updated and
//   either another attempt is made or an error is reported (ePostSi2Csi stage)
// - otherwise the chosen record is stored in m_BioseqResolution and the
//   accession is adjusted if needed
646 void CPSGS_AsyncResolveBase::x_OnBioseqInfo(vector<CBioseqInfoRecord>&& records)
647 {
    // NB: record_count may be overwritten below with 1 when one of many
    // records is selected; records.size() always tells the real count
648  auto record_count = records.size();
649  auto app = CPubseqGatewayApp::GetInstance();
650 
653 
654  if (m_Request->NeedTrace()) {
655  string msg = to_string(records.size()) + " hit(s)";
656  for (const auto & item : records) {
658  }
659  m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
660  }
661 
    // Index 0 is right for the single record case; for many records the
    // index comes from SelectBioseqInfoRecord()
662  ssize_t index_to_pick = 0;
663  if (record_count > 1) {
664  index_to_pick = SelectBioseqInfoRecord(records);
665  if (index_to_pick < 0) {
        // No record could be chosen: record_count stays > 1 so the
        // 'record_count != 1' branch below handles it
666  if (m_Request->NeedTrace()) {
667  m_Reply->SendTrace(
668  to_string(records.size()) + " bioseq info records were "
669  "found however it was impossible to choose one of them. "
670  "So report as not found",
671  m_Request->GetStartTimestamp());
672  }
673  } else {
674  // Pretend there was exactly one record
675  record_count = 1;
676  }
677  }
678 
679  if (record_count != 1) {
680  // Nothing was found or none of many records could be selected.
     // Need more tries
681  if (record_count > 1) {
682  app->GetTiming().Register(this, eLookupCassBioseqInfo, eOpStatusFound,
684  app->GetCounters().Increment(this,
686  } else {
687  app->GetTiming().Register(this, eLookupCassBioseqInfo, eOpStatusNotFound,
689  app->GetCounters().Increment(this,
691  }
692 
     // Nothing found and the requested seq id type is an INSDC one.
     // NOTE(review): presumably the same query is repeated here without
     // the seq id type (the trailing 'false' argument) -- confirm
693  if (record_count == 0 && IsINSDCSeqIdType(m_BioseqInfoRequestedSeqIdType)) {
697  false);
698  return;
699  }
700 
701  if (m_ResolveStage == ePostSi2Csi) {
702  // Special case for post si2csi results; no next stage
703 
704  string msg = "Data inconsistency. ";
705  if (record_count > 1) {
706  msg += "More than one BIOSEQ_INFO table record is found for "
707  "accession " + m_BioseqResolution.GetBioseqInfo().GetAccession();
708  } else {
709  msg += "A BIOSEQ_INFO table record is not found for "
710  "accession " + m_BioseqResolution.GetBioseqInfo().GetAccession();
711  }
712 
714 
715  // May be there is more seq_id/seq_id_type to try
716  if (MoveToNextSeqId()) {
717  m_ContinueResolveCB(); // Call resolution again
718  return;
719  }
720 
     // No more seq ids to try => report the error to the upper level
721  m_ErrorCB(
727  return;
728  }
729 
     // Not the last stage: let the state machine make the next try
730  x_Process();
731  return;
732  }
733 
734  // Looking good data have appeared => inform the upper level
736 
737  if (m_Request->NeedTrace()) {
738  string prefix;
     // The real number of records is checked here (not record_count) so
     // the trace tells if a selection among many records took place
739  if (records.size() == 1)
740  prefix = "Selected record:\n";
741  else
742  prefix = "Record with max version (and max date changed if "
743  "more than one with max version) selected "
744  "(SEQ_STATE_LIFE records are checked first)\n";
745  m_Reply->SendTrace(
746  prefix +
747  ToJsonString(records[index_to_pick],
749  m_Request->GetStartTimestamp());
750  }
751 
753  m_BioseqResolution.SetBioseqInfo(records[index_to_pick]);
754 
755  // Adjust accession if needed
756  auto adj_result = AdjustBioseqAccession(m_BioseqResolution);
757  if (adj_result == ePSGS_LogicError || adj_result == ePSGS_SeqIdsEmpty) {
758  // The problem has already been logged
759 
760  string msg = "BIOSEQ_INFO Cassandra error: " +
763 
764  // May be there is more seq_id/seq_id_type to try
765  if (MoveToNextSeqId()) {
766  m_ContinueResolveCB(); // Call resolution again
767  return;
768  }
769 
     // No more seq ids to try => report the error to the upper level
770  m_ErrorCB(
776  return;
777  }
778 
779  // Everything is fine
780  app->GetTiming().Register(this, eLookupCassBioseqInfo, eOpStatusFound,
782  app->GetCounters().Increment(this,
784 
786 }
787 
788 
790  vector<CBioseqInfoRecord>&& records)
791 {
794 
795  auto app = CPubseqGatewayApp::GetInstance();
797 
798  if (m_Request->NeedTrace()) {
799  string msg = to_string(records.size()) +
800  " hit(s); decision status: " + to_string(decision.status);
801  for (const auto & item : records) {
803  }
804  m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
805  }
806 
807  switch (decision.status) {
809  // Looking good data have appeared => inform the upper level
811 
813 
814  app->GetTiming().Register(this, eLookupCassBioseqInfo, eOpStatusFound,
816  app->GetCounters().Increment(this,
818  m_BioseqResolution.SetBioseqInfo(records[decision.index]);
819 
820  // Data callback
822  break;
824  app->GetTiming().Register(this, eLookupCassBioseqInfo, eOpStatusNotFound,
826  app->GetCounters().Increment(this,
828  if (m_ResolveStage == ePostSi2Csi) {
829 
830  string msg = "Data inconsistency. A BIOSEQ_INFO table record "
831  "is not found for accession " +
834 
835  // May be there is more seq_id/seq_id_type to try
836  if (MoveToNextSeqId()) {
837  m_ContinueResolveCB(); // Call resolution again
838  return;
839  }
840 
841  m_ErrorCB(
847  } else {
848  // Move to the next stage
849  x_Process();
850  }
851  break;
853  app->GetTiming().Register(this, eLookupCassBioseqInfo, eOpStatusFound,
855  app->GetCounters().Increment(this,
857  if (m_ResolveStage == ePostSi2Csi) {
858  string msg = "Data inconsistency. More than one BIOSEQ_INFO "
859  "table record is found for accession " +
861 
863 
864  // May be there is more seq_id/seq_id_type to try
865  if (MoveToNextSeqId()) {
866  m_ContinueResolveCB(); // Call resolution again
867  return;
868  }
869 
870  m_ErrorCB(
876 
877  } else {
878  // Move to the next stage
879  x_Process();
880  }
881  break;
882  default:
883  // Impossible
884  {
885  string msg = "Unexpected decision code when a secondary INSCD "
886  "request results processed while resolving seq id asynchronously";
888 
889  // May be there is more seq_id/seq_id_type to try
890  if (MoveToNextSeqId()) {
891  m_ContinueResolveCB(); // Call resolution again
892  return;
893  }
894 
895  m_ErrorCB(
897  eDiag_Error,
901  }
902  }
903 }
904 
905 
907  EDiagSev severity, const string & message)
908 {
909  if (m_CurrentFetch) {
912  }
913  if (m_NoSeqIdTypeFetch) {
916  }
917 
918  if (!IsTimeoutError(code)) {
920  this,
922  }
923 
924  m_ResolveErrors.AppendError(message, status);
925 
926  // May be there is more seq_id/seq_id_type to try
927  if (MoveToNextSeqId()) {
928  m_ContinueResolveCB(); // Call resolution again
929  return;
930  }
931 
936 }
937 
938 
// Consume callback for an SI2CSI fetch (it is bound in x_PrepareSi2csiQuery()).
//
// records - the SI2CSI records which matched the request
//
// Only the case of exactly one record is treated as a hit: the key fields
// (accession, version, seq id type and GI) are copied into a bioseq info
// record which is stored in m_BioseqResolution. Zero or many records lead to
// the next try via x_Process().
939 void CPSGS_AsyncResolveBase::x_OnSi2csiRecord(vector<CSI2CSIRecord> && records)
940 {
941  auto record_count = records.size();
942  auto app = CPubseqGatewayApp::GetInstance();
943 
946 
947  if (m_Request->NeedTrace()) {
948  string msg = to_string(record_count) + " hit(s)";
949  for (const auto & item : records) {
950  msg += "\n" + ToJsonString(item);
951  }
952  if (record_count > 1)
953  msg += "\nMore than one record => may be more tries";
954 
955  m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
956  }
957 
958  if (record_count != 1) {
959  // Multiple records or did not find anything. Need more tries
     // The timing is registered as 'found' for many records and as
     // 'not found' for no records
960  if (record_count > 1) {
961  app->GetTiming().Register(this, eLookupCassSi2csi, eOpStatusFound,
962  m_Si2csiStart);
963  app->GetCounters().Increment(this,
965  } else {
966  app->GetTiming().Register(this, eLookupCassSi2csi, eOpStatusNotFound,
967  m_Si2csiStart);
968  app->GetCounters().Increment(this,
970  }
971 
972  x_Process();
973  return;
974  }
975 
976  // Looking good data have appeared
978 
979  app->GetTiming().Register(this, eLookupCassSi2csi, eOpStatusFound,
980  m_Si2csiStart);
981  app->GetCounters().Increment(this,
983 
    // SI2CSI provides only the key fields, so the bioseq info record built
    // here has nothing but them
984  CBioseqInfoRecord bioseq_info;
985  bioseq_info.SetAccession(records[0].GetAccession());
986  bioseq_info.SetVersion(records[0].GetVersion());
987  bioseq_info.SetSeqIdType(records[0].GetSeqIdType());
988  bioseq_info.SetGI(records[0].GetGI());
989 
990  m_BioseqResolution.SetBioseqInfo(bioseq_info);
992 
993  // Special case for the seq_id like gi|156232
996  x_Process();
997  return;
998  }
999 
1001 }
1002 
1003 
1005  EDiagSev severity, const string & message)
1006 {
1009 
1010  if (!IsTimeoutError(code)) {
1012  this,
1014  }
1015 
1016  m_ResolveErrors.AppendError(message, status);
1017 
1018  // May be there is more seq_id/seq_id_type to try
1019  if (MoveToNextSeqId()) {
1020  m_ContinueResolveCB(); // Call resolution again
1021  return;
1022  }
1023 
1028 }
1029 
1030 
1032 {
1034  return false;
1035 
1036  string current_seq_id = m_CurrentSeqIdToResolve->seq_id;
1038 
1040  return false;
1041  }
1042 
1043  if (m_Request->NeedTrace()) {
1044  m_Reply->SendTrace("Could not resolve seq_id " + current_seq_id +
1045  ". There are more seq_id to try, switching to the next one.",
1046  m_Request->GetStartTimestamp());
1047  }
1048 
1049  return true;
1050 }
1051 
1052 
1054 {
1055  string msg = "Could not resolve ";
1056 
1057  if (m_SeqIdsToResolve.size() == 1) {
1058  msg += "seq_id " + SanitizeInputValue(m_SeqIdsToResolve.begin()->seq_id);
1059  } else {
1060  msg += "any of the seq_ids: ";
1061  bool is_first = true;
1062  for (const auto & item : m_SeqIdsToResolve) {
1063  if (!is_first)
1064  msg += ", ";
1065  msg += SanitizeInputValue(item.seq_id);
1066  is_first = false;
1067  }
1068  }
1069 
1070  return msg;
1071 }
1072 
1073 
1074 void
1076  SBioseqResolution && async_bioseq_resolution)
1077 {
1078  auto app = CPubseqGatewayApp::GetInstance();
1079 
1080  if (async_bioseq_resolution.IsValid()) {
1081  // Just in case; the second call will be prevented anyway
1083 
1084  m_FinishedCB(std::move(async_bioseq_resolution));
1085  } else {
1086  // Could not resolve by some reasons.
1087  // May be there is more seq_id/seq_id_type to try
1088  if (MoveToNextSeqId()) {
1089  m_ContinueResolveCB(); // Call resolution again
1090  return;
1091  }
1092 
1093  app->GetCounters().Increment(this,
1095 
1096  string msg = GetCouldNotResolveMessage();
1097 
1098  if (async_bioseq_resolution.m_Error.HasError()) {
1099  m_ResolveErrors.AppendError(async_bioseq_resolution.m_Error.m_ErrorMessage,
1100  async_bioseq_resolution.m_Error.m_ErrorCode);
1101  }
1102 
1103  if (m_ResolveErrors.HasErrors()) {
1105  }
1106 
1109  }
1110 }
1111 
1112 
1113 
1115 {
1116  if (!m_StartProcessingCalled) {
1117  m_StartProcessingCalled = true;
1119  }
1120 }
1121 
USING_SCOPE(objects)
USING_IDBLOB_SCOPE
function< void(SBioseqResolution &&async_bioseq_resolution)> TSeqIdResolutionFinishedCB
function< void(void)> TSeqIdResolutionStartProcessingCB
function< void(void)> TContinueResolveCB
function< void(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message, EPSGS_LoggingFlag logging_flag)> TSeqIdResolutionErrorCB
ssize_t SelectBioseqInfoRecord(const vector< CBioseqInfoRecord > &records)
CBioseqInfoFetchRequest & SetVersion(CBioseqInfoRecord::TVersion value)
Definition: request.hpp:73
CBioseqInfoFetchRequest & SetGI(CBioseqInfoRecord::TGI value)
Definition: request.hpp:87
CBioseqInfoFetchRequest & SetSeqIdType(CBioseqInfoRecord::TSeqIdType value)
Definition: request.hpp:80
CBioseqInfoFetchRequest & SetAccession(CBioseqInfoRecord::TAccession const &value)
Definition: request.hpp:64
TSeqIdType GetSeqIdType() const
Definition: record.hpp:208
TVersion GetVersion() const
Definition: record.hpp:203
int16_t TVersion
Definition: record.hpp:53
int16_t TSeqIdType
Definition: record.hpp:54
CBioseqInfoRecord & SetSeqIdType(TSeqIdType value)
Definition: record.hpp:106
TGI GetGI() const
Definition: record.hpp:223
CBioseqInfoRecord & SetGI(TGI value)
Definition: record.hpp:124
TAccession const & GetAccession() const
Definition: record.hpp:198
CBioseqInfoRecord & SetVersion(TVersion value)
Definition: record.hpp:100
string TAccession
Definition: record.hpp:51
CBioseqInfoRecord & SetAccession(const TAccession &value)
Definition: record.hpp:94
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: fetch.cpp:92
void SetConsumeCallback(TBioseqInfoConsumeCallback callback)
Definition: fetch.cpp:87
void SetErrorCB(TDataErrorCallback error_cb)
void SetReadFinished(void)
Definition: cass_fetch.hpp:89
CCassBlobWaiter * GetLoader(void)
Definition: cass_fetch.hpp:86
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: fetch.cpp:70
void SetConsumeCallback(TSI2CSIConsumeCallback callback)
Definition: fetch.cpp:65
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
CRequestStatus::ECode GetCombinedErrorCode(void) const
bool HasErrors(void) const
void AppendError(const string &msg, CRequestStatus::ECode code)
string GetCombinedErrorMessage(const list< SPSGSeqId > &seq_id_to_resolve) const
string GetCouldNotResolveMessage(void) const
SPSGS_RequestBase::EPSGS_AccSubstitutioOption GetAccessionSubstitutionOption(void)
string x_GetSeqIdsToResolveList(void) const
psg_time_point_t m_BioseqInfoStart
void x_OnBioseqInfoWithoutSeqIdType(vector< CBioseqInfoRecord > &&records)
list< SPSGSeqId > m_SeqIdsToResolve
psg_time_point_t m_AsyncCassResolutionStart
void x_OnSi2csiError(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
TSeqIdResolutionErrorCB m_ErrorCB
psg_time_point_t m_Si2csiStart
CBioseqInfoRecord::TSeqIdType m_BioseqInfoRequestedSeqIdType
void x_PreparePrimaryBioseqInfoQuery(const CBioseqInfoRecord::TAccession &seq_id, CBioseqInfoRecord::TVersion version, CBioseqInfoRecord::TSeqIdType seq_id_type, CBioseqInfoRecord::TGI gi, bool with_seq_id_type)
CBioseqInfoRecord::TGI m_BioseqInfoRequestedGI
SBioseqResolution m_BioseqResolution
SPSGS_ResolveRequest::TPSGS_BioseqIncludeData GetBioseqInfoFields(void)
void x_OnBioseqInfoError(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
bool CanSkipBioseqInfoRetrieval(const CBioseqInfoRecord &bioseq_info_record)
CPSGSResolveErrors m_ResolveErrors
list< SPSGSeqId >::const_iterator m_CurrentSeqIdToResolve
bool NonKeyBioseqInfoFieldsRequested(void)
void x_OnBioseqInfo(vector< CBioseqInfoRecord > &&records)
void x_PrepareSecondaryAsIsSi2csiQuery(void)
TContinueResolveCB m_ContinueResolveCB
EPSGS_AccessionAdjustmentResult AdjustBioseqAccession(SBioseqResolution &bioseq_resolution)
TSeqIdResolutionStartProcessingCB m_StartProcessingCB
CBioseqInfoRecord::TVersion m_BioseqInfoRequestedVersion
TSeqIdResolutionFinishedCB m_FinishedCB
int16_t GetEffectiveVersion(const CTextseq_id *text_seq_id)
EPSGS_ResolveStage m_ResolveStage
void x_OnSi2csiRecord(vector< CSI2CSIRecord > &&records)
void x_OnSeqIdAsyncResolutionFinished(SBioseqResolution &&async_bioseq_resolution)
CBioseqInfoRecord::TAccession m_BioseqInfoRequestedAccession
void x_PrepareSi2csiQuery(const string &secondary_id, int16_t effective_seq_id_type)
list< unique_ptr< CCassFetch > > m_FetchDetails
bool IsTimeoutError(const string &msg) const
static string TypeToString(EPSGS_Type req_type)
@ ePSGS_AccessionVersionHistoryRequest
CPSGSCounters & GetCounters(void)
static CPubseqGatewayApp * GetInstance(void)
SSatInfoEntry GetBioseqKeyspace(void) const
CSi2CsiFetchRequest & SetSecSeqId(CSI2CSIRecord::TSecSeqId const &value)
Definition: request.hpp:311
CSi2CsiFetchRequest & SetSecSeqIdType(CSI2CSIRecord::TSecSeqIdType value)
Definition: request.hpp:320
virtual void Process(void)=0
Main processing function.
shared_ptr< CPSGS_Reply > m_Reply
shared_ptr< CPSGS_Request > m_Request
#define false
Definition: bool.h:36
static DLIST_TYPE *DLIST_NAME() next(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
Definition: dlist.tmpl.h:56
Int2 int16_t
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
static string & ToUpper(string &str)
Convert string to upper case – string& version.
Definition: ncbistr.cpp:424
TVersion GetVersion(void) const
Get the Version member data.
bool CanGetVersion(void) const
Check if it is safe to call GetVersion method.
@ e_Gi
GenInfo Integrated Database.
Definition: Seq_id_.hpp:106
@ e_Prf
PRF SEQDB.
Definition: Seq_id_.hpp:108
@ e_Pdb
PDB sequence.
Definition: Seq_id_.hpp:109
SINSDCDecision DecideINSDC(const vector< CBioseqInfoRecord > &records, CBioseqInfoRecord::TVersion version)
Definition: insdc_utils.cpp:81
bool IsINSDCSeqIdType(CBioseqInfoRecord::TSeqIdType seq_id_type)
Definition: insdc_utils.cpp:44
const string version
version string
Definition: variables.hpp:66
int ssize_t
Definition: ncbiconf_msvc.h:93
Defines NCBI C++ diagnostic APIs, classes, and macros.
#define nullptr
Definition: ncbimisc.hpp:45
T max(T x_, T y_)
const CConstRef< CSeq_id > GetAccession(const CSeq_id_Handle &id_handle)
string StripTrailingVerticalBars(const string &seq_id)
void PSGSortSeqIds(list< SPSGSeqId > &seq_ids, IPSGS_Processor *processor)
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_WARNING(message)
@ ePSGS_BioseqInfoAccessionAdjustmentError
@ ePSGS_ServerLogicError
@ ePSGS_NoBioseqInfoForGiError
@ ePSGS_UnresolvedSeqId
@ ePSGS_BioseqDB
@ ePSGS_BioseqCache
@ ePSGS_NotResolved
@ ePSGS_Si2csiDB
@ ePSGS_SkipLogging
@ ePSGS_NeedLogging
EPSGS_AccessionAdjustmentResult
@ ePSGS_NotRequired
@ ePSGS_LogicError
@ ePSGS_SeqIdsEmpty
string SanitizeInputValue(const string &input_val)
Defines CRequestStatus class for NCBI C++ diagnostic API.
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
EPSGS_AccessionAdjustmentResult AdjustAccession(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
EPSGS_ResolutionResult m_ResolutionResult
CBioseqInfoRecord & GetBioseqInfo(void)
void SetBioseqInfo(const CBioseqInfoRecord &record)
vector< string > m_SeqIds
CRequestStatus::ECode m_ErrorCode
Definition: inftrees.h:24
@ eOpStatusFound
Definition: timing.hpp:61
@ eOpStatusNotFound
Definition: timing.hpp:62
@ eLookupCassBioseqInfo
Definition: timing.hpp:71
@ eLookupCassSi2csi
Definition: timing.hpp:70
Modified on Fri Sep 20 14:56:58 2024 by modify_doxy.py rev. 669887