NCBI C++ ToolKit
resolve_processor.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: resolve_processor.cpp 102015 2024-03-19 13:18:47Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: resolve processor
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "resolve_processor.hpp"
35 #include "pubseq_gateway.hpp"
38 
40 
// Brings the _1 .. _4 placeholders into scope; they are used by the bind()
// calls in the constructor below.
41 using namespace std::placeholders;
42 
// Name under which this processor identifies itself: it is returned by
// GetName() and passed to the trace/reply calls made in this file.
43 static const string kResolveProcessorName = "Cassandra-resolve";
44 
46  m_ResolveRequest(nullptr)
47 {}
48 
49 
51  shared_ptr<CPSGS_Request> request,
52  shared_ptr<CPSGS_Reply> reply,
53  TProcessorPriority priority) :
54  CPSGS_CassProcessorBase(request, reply, priority),
 // The resolve base receives three callbacks bound to this object:
 // - x_OnSeqIdResolveFinished (one argument)
 // - x_OnSeqIdResolveError (four arguments)
 // - x_OnResolutionGoodData (no arguments)
55  CPSGS_ResolveBase(request, reply,
56  bind(&CPSGS_ResolveProcessor::x_OnSeqIdResolveFinished,
57  this, _1),
58  bind(&CPSGS_ResolveProcessor::x_OnSeqIdResolveError,
59  this, _1, _2, _3, _4),
60  bind(&CPSGS_ResolveProcessor::x_OnResolutionGoodData,
61  this))
62 {
63  // Cache a pointer to the typed resolve request once, as a convenience,
64  // to avoid calling m_Request->GetRequest<SPSGS_ResolveRequest>() everywhere.
 // NOTE(review): the pointer refers to data reached through 'request';
 // presumably the request object outlives this processor -- confirm.
65  m_ResolveRequest = & request->GetRequest<SPSGS_ResolveRequest>();
66 }
67 
68 
70 {}
71 
72 
// Tells whether this processor can serve the given request.
//
// Returns false when:
//  - IsCassandraProcessorEnabled() rejects the request, or
//  - the request type is not CPSGS_Request::ePSGS_ResolveRequest, or
//  - the application startup data state is not ePSGS_StartupDataOK
//    (in this case a trace message is sent through 'reply' if the request
//    asks for tracing).
// Returns true otherwise.
73 bool CPSGS_ResolveProcessor::CanProcess(shared_ptr<CPSGS_Request> request,
74  shared_ptr<CPSGS_Reply> reply) const
75 {
76  if (!IsCassandraProcessorEnabled(request))
77  return false;
78 
79  if (request->GetRequestType() != CPSGS_Request::ePSGS_ResolveRequest)
80  return false;
81 
 // The startup data state is checked last; only this rejection is traced.
82  auto * app = CPubseqGatewayApp::GetInstance();
83  auto startup_data_state = app->GetStartupDataState();
84  if (startup_data_state != ePSGS_StartupDataOK) {
85  if (request->NeedTrace()) {
 // The trace text includes the state-specific message and is
 // stamped with the request start timestamp.
86  reply->SendTrace(kResolveProcessorName + " processor cannot process "
87  "request because Cassandra DB is not available.\n" +
88  GetCassStartupDataStateMessage(startup_data_state),
89  request->GetStartTimestamp());
90  }
91  return false;
92  }
93  return true;
94 }
95 
96 
// Factory method: creates a resolve processor for the given request.
//
// Returns nullptr when CanProcess() rejects the request; otherwise returns a
// newly allocated CPSGS_ResolveProcessor built from the same arguments.
// NOTE(review): the object is returned as a raw pointer created with 'new';
// presumably the caller takes ownership -- confirm against the dispatcher.
98 CPSGS_ResolveProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
99  shared_ptr<CPSGS_Reply> reply,
100  TProcessorPriority priority) const
101 {
102  if (!CanProcess(request, reply))
103  return nullptr;
104 
105  return new CPSGS_ResolveProcessor(request, reply, priority);
106 }
107 
108 
110 {
111  // Lock the request for all the cassandra processors so that the other
112  // processors may wait on the event
114 
115  // In both cases: sync or async resolution --> a callback will be called
117 }
118 
119 
120 // This callback is called in all cases when there is no valid resolution, i.e.
121 // 404, or any kind of errors
//
// Parameters:
//  status   - request status describing the failure
//  code     - error code forwarded to the error counter and to the reply
//  severity - diagnostic severity forwarded the same way
//  message  - text forwarded to the error counter and sent to the client
122 void
124  CRequestStatus::ECode status,
125  int code,
126  EDiagSev severity,
127  const string & message)
128 {
 // Nothing is counted or sent for a canceled processor.
129  if (m_Canceled) {
131  return;
132  }
133 
134  CRequestContextResetter context_resetter;
135  IPSGS_Processor::m_Request->SetRequestContext();
136 
 // A 404 status is counted with ePSGS_SkipLogging; any other status is
 // counted with ePSGS_NeedLogging.
137  EPSGS_LoggingFlag logging_flag = ePSGS_NeedLogging;
138  if (status == CRequestStatus::e404_NotFound)
139  logging_flag = ePSGS_SkipLogging;
140  CountError(nullptr, status, code, severity, message,
141  logging_flag, ePSGS_NeedStatusUpdate);
142 
 // Report the failure as a bioseq message item followed by the completion
 // of the same item.
 // NOTE(review): the literal 2 is presumably the chunk count of the item
 // (message + completion) -- confirm against PrepareBioseqCompletion().
143  size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
144  IPSGS_Processor::m_Reply->PrepareBioseqMessage(item_id, kResolveProcessorName,
145  message, status, code,
146  severity);
147  IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id, kResolveProcessorName, 2);
148 
150 }
151 
152 
153 // This callback is called only in case of a valid resolution
//
// Parameters:
//  bioseq_resolution - the resolution result; it is passed on to
//                      x_SendBioseqInfo() to be sent to the client
154 void
156  SBioseqResolution && bioseq_resolution)
157 {
 // Nothing is sent for a canceled processor.
158  if (m_Canceled) {
160  return;
161  }
162 
163  CRequestContextResetter context_resetter;
164  IPSGS_Processor::m_Request->SetRequestContext();
165 
166  x_SendBioseqInfo(bioseq_resolution);
167 
169 }
170 
171 
// Serializes the bioseq info of the given resolution and prepares it in the
// reply as a bioseq data item followed by the item completion.
172 void
174 {
 // The native and unknown output formats are both served as JSON.
175  auto effective_output_format = m_ResolveRequest->m_OutputFormat;
176  if (effective_output_format == SPSGS_ResolveRequest::ePSGS_NativeFormat ||
177  effective_output_format == SPSGS_ResolveRequest::ePSGS_UnknownFormat) {
178  effective_output_format = SPSGS_ResolveRequest::ePSGS_JsonFormat;
179  }
180 
 // The accession is adjusted only when the resolution result is
 // ePSGS_BioseqDB or ePSGS_BioseqCache.
181  if (bioseq_resolution.m_ResolutionResult == ePSGS_BioseqDB ||
182  bioseq_resolution.m_ResolutionResult == ePSGS_BioseqCache)
183  AdjustBioseqAccession(bioseq_resolution);
184 
 // JSON output goes through ToJsonString(); any other effective format
 // goes through ToBioseqProtobuf().
185  string data_to_send;
186  if (effective_output_format == SPSGS_ResolveRequest::ePSGS_JsonFormat) {
187  data_to_send = ToJsonString(bioseq_resolution.GetBioseqInfo(),
189  } else {
190  data_to_send = ToBioseqProtobuf(bioseq_resolution.GetBioseqInfo());
191  }
192 
 // The data and the completion share the same item id.
193  size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
194  IPSGS_Processor::m_Reply->PrepareBioseqData(item_id, kResolveProcessorName,
195  data_to_send,
196  effective_output_format);
197  IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id,
199 }
200 
201 
203 {
 // Start from the status reported by CPSGS_CassProcessorBase; an
 // in-progress status is returned unchanged.
204  auto status = CPSGS_CassProcessorBase::GetStatus();
205  if (status == IPSGS_Processor::ePSGS_InProgress) {
206  return status;
207  }
208 
 // The processor is not in progress; the canceled case gets its own
 // handling before the base status is returned.
209  if (m_Canceled) {
211  }
212 
213  return status;
214 }
215 
216 
218 {
 // The reported name is the file-level kResolveProcessorName constant.
219  return kResolveProcessorName;
220 }
221 
222 
224 {
226 }
227 
228 
230 {
 // Delegates to x_Peek() with need_wait set to true.
231  x_Peek(true);
232 }
233 
234 
// Peeks every fetch in m_FetchDetails and, once a final state has been
// reported and all reads are finished, performs the final step guarded by
// AreAllFinishedRead().
//
// Parameters:
//  need_wait - forwarded to the per-fetch x_Peek() overload
235 void CPSGS_ResolveProcessor::x_Peek(bool need_wait)
236 {
 // A canceled processor does not peek.
237  if (m_Canceled) {
239  return;
240  }
241 
242  // 1 -> call m_Loader->Wait1 to pick data
243  // 2 -> check if we have ready-to-send buffers
244  // 3 -> call reply->Send() to send what we have if it is ready
245  bool overall_final_state = false;
246 
 // One pass over all fetches; the pass is repeated for as long as the size
 // of m_FetchDetails differs from its size at the start of the pass.
247  while (true) {
248  auto initial_size = m_FetchDetails.size();
249 
250  for (auto & details: m_FetchDetails) {
251  if (details) {
 // Entries already flagged as being peeked are skipped; the flag
 // is set only for the duration of the per-fetch call
 // (presumably a re-entrancy guard -- confirm).
252  if (details->InPeek()) {
253  continue;
254  }
255  details->SetInPeek(true);
256  overall_final_state |= x_Peek(details, need_wait);
257  details->SetInPeek(false);
258  }
259  }
260 
261  if (initial_size == m_FetchDetails.size())
262  break;
263  }
264 
265  // Ready packets need to be sent only once when everything is finished
266  if (overall_final_state) {
267  if (AreAllFinishedRead()) {
270  }
271  }
272 }
273 
274 
// Peeks a single fetch.
//
// Parameters:
//  fetch_details - the fetch to peek; it is dereferenced unconditionally, so
//                  the caller must pass a non-null pointer
//  need_wait     - when true and the read is not finished yet, the loader's
//                  Wait() is called
//
// Returns true when the fetch has no loader; otherwise returns the value
// obtained from the loader's Wait(), or false if Wait() was not called.
275 bool CPSGS_ResolveProcessor::x_Peek(unique_ptr<CCassFetch> & fetch_details,
276  bool need_wait)
277 {
278  if (!fetch_details->GetLoader())
279  return true;
280 
281  bool final_state = false;
 // The nested 'if' below belongs to 'if (need_wait)': the loader is waited
 // on only when both conditions hold. A true result from Wait() marks the
 // read as finished.
282  if (need_wait)
283  if (!fetch_details->ReadFinished()) {
284  final_state = fetch_details->GetLoader()->Wait();
285  if (final_state) {
286  fetch_details->SetReadFinished();
287  }
288  }
289 
 // A loader error is reported only while the reply output is ready and the
 // reply is not finished yet.
290  if (fetch_details->GetLoader()->HasError() &&
291  IPSGS_Processor::m_Reply->IsOutputReady() &&
292  ! IPSGS_Processor::m_Reply->IsFinished()) {
293  // Send an error
294  string error = fetch_details->GetLoader()->LastError();
295  auto * app = CPubseqGatewayApp::GetInstance();
296 
 // The error is counted, logged via PSG_ERROR and sent to the client
 // as a processor message.
297  app->GetCounters().Increment(this,
299  PSG_ERROR(error);
300 
301  IPSGS_Processor::m_Reply->PrepareProcessorMessage(
302  IPSGS_Processor::m_Reply->GetItemId(),
306 
307  // Mark finished
 // The loader error is cleared and the read is flagged as finished.
 // NOTE(review): none of the statements in this branch assigns
 // final_state, so it appears to keep its earlier value -- confirm.
309  fetch_details->GetLoader()->ClearError();
310  fetch_details->SetReadFinished();
312  }
313 
314  return final_state;
315 }
316 
317 
319 {
320  // The resolution process has started to receive data that looks good, so
321  // the dispatcher should be notified that the other processors can be
322  // stopped
323  if (m_Canceled) {
325  return;
326  }
327 
 // SignalStartProcessing() may answer with ePSGS_Cancel; that answer is
 // handled in the branch below.
328  if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
330  }
331 
332  // If the other processor waits then let it go, but only after sending the
333  // signal of the good data (it may cancel the other processors)
335 }
336 
const string kCassandraProcessorGroupName
const string kCassandraProcessorEvent
EPSGS_AccessionAdjustmentResult AdjustBioseqAccession(SBioseqResolution &bioseq_resolution)
CRequestStatus::ECode CountError(CCassFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message, EPSGS_LoggingFlag logging_flag, EPSGS_StatusUpdateFlag status_update_flag)
void UpdateOverallStatus(CRequestStatus::ECode status)
bool IsCassandraProcessorEnabled(shared_ptr< CPSGS_Request > request) const
list< unique_ptr< CCassFetch > > m_FetchDetails
bool AreAllFinishedRead(void) const
IPSGS_Processor::EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
@ ePSGS_SendAccumulated
Definition: psgs_reply.hpp:54
void ResolveInputSeqId(void)
virtual string GetGroupName(void) const
Tells the processor group name.
void x_OnSeqIdResolveFinished(SBioseqResolution &&bioseq_resolution)
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const
Create processor to fulfil PSG request using the data source.
SPSGS_ResolveRequest * m_ResolveRequest
virtual void Process(void)
Main processing function.
virtual EPSGS_Status GetStatus(void)
Tells the processor status (if it has finished or in progress)
virtual string GetName(void) const
Tells the processor name (used in logging and tracing)
void x_SendBioseqInfo(SBioseqResolution &bioseq_resolution)
void x_OnSeqIdResolveError(CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void x_Peek(bool need_wait)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const
Tells if processor can process the given request.
virtual void ProcessEvent(void)
Called when an event happened which may require to have some processing.
static CPubseqGatewayApp * GetInstance(void)
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define nullptr
Definition: ncbimisc.hpp:45
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
string ToBioseqProtobuf(const CBioseqInfoRecord &bioseq_info)
#define PSG_ERROR(message)
@ ePSGS_StartupDataOK
int TProcessorPriority
@ ePSGS_NeedStatusUpdate
@ ePSGS_UnknownError
@ ePSGS_BioseqDB
@ ePSGS_BioseqCache
@ ePSGS_SkipLogging
@ ePSGS_NeedLogging
string GetCassStartupDataStateMessage(EPSGS_StartupDataState state)
static const string kResolveProcessorName
USING_NCBI_SCOPE
EPSGS_ResolutionResult m_ResolutionResult
CBioseqInfoRecord & GetBioseqInfo(void)
TPSGS_BioseqIncludeData m_IncludeDataFlags
EPSGS_OutputFormat m_OutputFormat
Definition: inftrees.h:24
Modified on Mon Apr 22 04:04:50 2024 by modify_doxy.py rev. 669887