NCBI C++ ToolKit
getblob_processor.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: getblob_processor.cpp 103123 2024-09-11 18:57:02Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: get blob processor
29  *
30  */
31 #include <ncbi_pch.hpp>
32 
33 #include "getblob_processor.hpp"
34 #include "pubseq_gateway.hpp"
37 #include "get_blob_callback.hpp"
38 #include "my_ncbi_cache.hpp"
39 
41 
42 using namespace std::placeholders;
43 
// Name of this processor. It is used in this file as the prefix/identifier
// of trace messages, warnings and blob property messages sent to the client.
44 static const string kGetblobProcessorName = "Cassandra-getblob";
45 
46 
// Member initializer of a constructor whose header line is not part of this
// listing (presumably the default constructor -- confirm against the header
// file). No blob request is attached: m_BlobRequest starts as a null pointer.
48  m_BlobRequest(nullptr)
49 {}
50 
51 
// Parameter list and body of the constructor which binds the processor to a
// particular request (the line with the constructor name is not part of this
// listing).
//   request  - the user request; forwarded to CPSGS_CassProcessorBase and
//              used to obtain the SPSGS_BlobBySatSatKeyRequest
//   reply    - the reply object; forwarded to CPSGS_CassProcessorBase
//   priority - the processor priority; forwarded to CPSGS_CassProcessorBase
//   blob_id  - the blob identifier; copied into m_BlobId
53  shared_ptr<CPSGS_Request> request,
54  shared_ptr<CPSGS_Reply> reply,
55  TProcessorPriority priority,
56  const SCass_BlobId & blob_id) :
57  CPSGS_CassProcessorBase(request, reply, priority),
// The three member functions below (blob props, blob chunk and error
// handlers) are bound to this instance and passed to an initializer whose
// name is on a line not shown here.
// NOTE(review): presumably it is the CPSGS_CassBlobBase base class -- confirm.
59  bind(&CPSGS_GetBlobProcessor::OnGetBlobProp,
60  this, _1, _2, _3),
61  bind(&CPSGS_GetBlobProcessor::OnGetBlobChunk,
62  this, _1, _2, _3, _4, _5),
63  bind(&CPSGS_GetBlobProcessor::OnGetBlobError,
64  this, _1, _2, _3, _4, _5))
65 {
66  m_BlobId = blob_id;
67 
68  // Convenience to avoid calling
69  // m_Request->GetRequest<SPSGS_BlobBySatSatKeyRequest>() everywhere.
// The stored pointer refers to an object reached via the request.
// NOTE(review): presumably it stays valid as long as the request is alive
// -- confirm against CPSGS_Request.
70  m_BlobRequest = & request->GetRequest<SPSGS_BlobBySatSatKeyRequest>();
71 }
72 
73 
75 {
77 }
78 
79 
80 bool
81 CPSGS_GetBlobProcessor::CanProcess(shared_ptr<CPSGS_Request> request,
82  shared_ptr<CPSGS_Reply> reply) const
83 {
84  if (!IsCassandraProcessorEnabled(request))
85  return false;
86 
87  if (request->GetRequestType() != CPSGS_Request::ePSGS_BlobBySatSatKeyRequest)
88  return false;
89 
90  auto blob_request = & request->GetRequest<SPSGS_BlobBySatSatKeyRequest>();
91  SCass_BlobId blob_id(blob_request->m_BlobId.GetId());
92  if (!blob_id.IsValid())
93  return false;
94 
95  auto * app = CPubseqGatewayApp::GetInstance();
96  auto startup_data_state = app->GetStartupDataState();
97  if (startup_data_state != ePSGS_StartupDataOK) {
98  if (request->NeedTrace()) {
99  reply->SendTrace(kGetblobProcessorName + " processor cannot process "
100  "request because Cassandra DB is not available.\n" +
101  GetCassStartupDataStateMessage(startup_data_state),
102  request->GetStartTimestamp());
103  }
104  return false;
105  }
106 
107  return true;
108 }
109 
110 
112 CPSGS_GetBlobProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
113  shared_ptr<CPSGS_Reply> reply,
114  TProcessorPriority priority) const
115 {
116  if (!CanProcess(request, reply))
117  return nullptr;
118 
119  auto blob_request = & request->GetRequest<SPSGS_BlobBySatSatKeyRequest>();
120  SCass_BlobId blob_id(blob_request->m_BlobId.GetId());
121  return new CPSGS_GetBlobProcessor(request, reply, priority, blob_id);
122 }
123 
124 
// Body of the processing entry point (the signature line is not part of this
// listing; presumably it is Process() -- confirm).
// Steps visible here:
//  - map the blob sat to a cassandra keyspace; on failure a warning message
//    is prepared and logged and the function returns;
//  - for a secure keyspace populate the MyNCBI user first; depending on the
//    result the function continues, returns, or waits for a callback;
//  - otherwise (or when the user name is already known) call x_Process().
126 {
127  auto app = CPubseqGatewayApp::GetInstance();
128 
129  if (!m_BlobId.MapSatToKeyspace()) {
// A counter is incremented here; its identifier is on a line not shown.
130  app->GetCounters().Increment(this,
132 
133  string err_msg = kGetblobProcessorName + " processor failed to map sat " +
134  to_string(m_BlobId.m_Sat) +
135  " to a Cassandra keyspace";
136  IPSGS_Processor::m_Reply->PrepareProcessorMessage(
141  PSG_WARNING(err_msg);
142 
144 
// Flush whatever has been prepared if the output can accept it
145  if (IPSGS_Processor::m_Reply->IsOutputReady())
146  x_Peek(false);
147  return;
148  }
149 
150  // Lock the request for all the cassandra processors so that the other
151  // processors may wait on the event
153 
// m_IsSecureKeyspace is an optional; value() is called without a check.
// NOTE(review): presumably it is always set after a successful
// MapSatToKeyspace() -- confirm.
154  if (m_BlobId.m_IsSecureKeyspace.value()) {
155 
// The data and error callbacks are bound to this instance; the names of the
// bound member functions are on lines not shown here.
156  auto populate_result = PopulateMyNCBIUser(
158  this, _1, _2),
160  this, _1, _2, _3, _4, _5));
// NOTE(review): the case labels of this switch are not part of this listing;
// the comments below describe each (invisible) case.
161  switch (populate_result) {
163  // The user name has been populated so just continue
164  break;
168  if (IPSGS_Processor::m_Reply->IsOutputReady())
169  x_Peek(false);
170  return;
173  // The error handlers have been called while checking the caches.
174  // The error handlers called SignalFinishProcessing() so just
175  // return.
176  return;
179  // Wait for a callback which comes from cache or from the my
180  // ncbi access wrapper asynchronously
181  return;
182  default:
183  break; // Cannot happen
184  }
185  }
186 
187  x_Process();
188 }
189 
190 
// Body of the function which initiates the blob retrieval (the signature
// line is not part of this listing; x_Process() is what the callers in this
// file invoke -- confirm).
// Steps visible here:
//  - obtain a cassandra connection: a secure one requested with the user
//    name for a secure keyspace, a regular one otherwise;
//  - create the fetch details and an empty blob record;
//  - for a non-secure keyspace look up the blob properties in the cache;
//  - create a CCassBlobTaskLoadBlob and attach it to the fetch details;
//  - set the data ready, error and blob props callbacks;
//  - store the fetch details in m_FetchDetails and start the load task.
// NOTE(review): a number of lines are missing in this listing, in particular
// some of the conditions which open the branches further down, so the
// visible brace structure is incomplete.
192 {
193  // The user name check is done when a connection is requested
194  shared_ptr<CCassConnection> cass_connection;
195  try {
196  if (m_BlobId.m_IsSecureKeyspace.value()) {
197  cass_connection = m_BlobId.m_Keyspace->GetSecureConnection(
198  m_UserName.value());
// A null connection ends the processing here; the reporting statements are
// on lines not shown.
199  if (!cass_connection) {
202 
203  if (IPSGS_Processor::m_Reply->IsOutputReady())
204  x_Peek(false);
205  return;
206  }
207  } else {
208  cass_connection = m_BlobId.m_Keyspace->GetConnection();
209  }
// Any exception while getting a connection ends the processing; the
// reporting statements of both handlers are on lines not shown.
210  } catch (const exception & exc) {
213 
214  if (IPSGS_Processor::m_Reply->IsOutputReady())
215  x_Peek(false);
216  return;
217  } catch (...) {
220 
221  if (IPSGS_Processor::m_Reply->IsOutputReady())
222  x_Peek(false);
223  return;
224  }
225 
226  unique_ptr<CCassBlobFetch> fetch_details;
227  fetch_details.reset(new CCassBlobFetch(*m_BlobRequest, m_BlobId));
228 
229  unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
// The default is 'not hit': it stays so for a secure keyspace because the
// lookup below is skipped in that case
232  EPSGS_CacheLookupResult blob_prop_cache_lookup_result = ePSGS_CacheNotHit;
233 
234  if (!m_BlobId.m_IsSecureKeyspace.value()) {
235  // The secure sat blob props are not cached
236  blob_prop_cache_lookup_result = psg_cache.LookupBlobProp(
237  this,
238  m_BlobId.m_Sat,
241  *blob_record.get());
242  }
243 
// The load task is created with a raw new and handed to the fetch details.
// NOTE(review): presumably SetLoader() takes the ownership -- confirm.
244  CCassBlobTaskLoadBlob * load_task = nullptr;
245  if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
// Cache hit: the load task is given the blob record filled by the lookup
246  load_task = new CCassBlobTaskLoadBlob(cass_connection,
247  m_BlobId.m_Keyspace->keyspace,
248  std::move(blob_record),
249  false, nullptr);
250  fetch_details->SetLoader(load_task);
251  } else {
// NOTE(review): the condition which guards the block below is on a line not
// shown here.
253  // No data in cache and not going to the DB
254  size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
255  auto ret_status = CRequestStatus::e404_NotFound;
// Both branches reply with a 'not found' status; only the message text
// differs between a plain cache miss and a cache lookup failure
256  if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
257  IPSGS_Processor::m_Reply->PrepareBlobPropMessage(
258  item_id, kGetblobProcessorName,
259  "Blob properties are not found",
260  ret_status, ePSGS_NoBlobPropsError,
261  eDiag_Error);
262  } else {
264  IPSGS_Processor::m_Reply->PrepareBlobPropMessage(
265  item_id, kGetblobProcessorName,
266  "Blob properties are not found due to a cache lookup error",
267  ret_status, ePSGS_NoBlobPropsError,
268  eDiag_Error);
269  }
270  IPSGS_Processor::m_Reply->PrepareBlobPropCompletion(
271  item_id, kGetblobProcessorName, 2);
272  fetch_details->RemoveFromExcludeBlobCache();
273 
274  // Finished without reaching cassandra
275  UpdateOverallStatus(ret_status);
277  return;
278  }
279 
// NOTE(review): the condition which selects between the two load task
// constructions below and one of their arguments are on lines not shown.
281  load_task = new CCassBlobTaskLoadBlob(cass_connection,
282  m_BlobId.m_Keyspace->keyspace,
284  false, nullptr);
285  fetch_details->SetLoader(load_task);
286  } else {
287  load_task = new CCassBlobTaskLoadBlob(cass_connection,
288  m_BlobId.m_Keyspace->keyspace,
291  false, nullptr);
292  fetch_details->SetLoader(load_task);
293  }
294  }
295 
// The callbacks receive the raw fetch details pointer; it is taken before
// the unique_ptr is moved into m_FetchDetails below
296  load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
297  load_task->SetErrorCB(
300  this, _1, _2, _3, _4, _5),
301  fetch_details.get()));
// The last argument is true when the blob props did not come from the cache
302  load_task->SetPropsCallback(
303  CBlobPropCallback(this,
305  this, _1, _2, _3),
308  fetch_details.get(),
309  blob_prop_cache_lookup_result != ePSGS_CacheHit));
310 
311  if (IPSGS_Processor::m_Request->NeedTrace()) {
312  IPSGS_Processor::m_Reply->SendTrace(
313  "Cassandra request: " +
314  ToJsonString(*load_task),
315  IPSGS_Processor::m_Request->GetStartTimestamp());
316  }
317 
// After this move the local fetch_details is empty; only load_task is used
318  m_FetchDetails.push_back(std::move(fetch_details));
319 
320  // Initiate cassandra request
321  load_task->Wait();
322 }
323 
324 
// Blob properties handler (the line with the function name and the first
// parameter is not part of this listing; the first parameter is used below
// as fetch_details).
//   blob     - the blob properties record
//   is_found - true if the blob properties have been found
// The call is ignored if the processor has been canceled. If the properties
// are found the processor signals the start of processing and stops when
// told to cancel. Otherwise the work is delegated to
// CPSGS_CassBlobBase::OnGetBlobProp() and the fetches are peeked if the
// output is ready.
326  CBlobRecord const & blob,
327  bool is_found)
328 {
// NOTE(review): a statement before each early return below is on a line not
// shown here.
329  if (m_Canceled) {
331  return;
332  }
333 
334  if (is_found) {
335  if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
337  return;
338  }
339  }
340 
341  // NOTE: getblob processor should unlock waiting processors regardless if
342  // the blob properties are found or not.
343  // - if found => the other processors will be canceled
344  // - if not found => the blob will not be retrieved anyway without the blob
345  // props so the other processors may continue
347 
348  CPSGS_CassBlobBase::OnGetBlobProp(fetch_details, blob, is_found);
349 
350  if (IPSGS_Processor::m_Reply->IsOutputReady())
351  x_Peek(false);
352 }
353 
354 
// Blob retrieval error handler (the line with the function name and the
// first parameter is not part of this listing; the first parameter is used
// below as fetch_details).
//   status   - the request status associated with the error
//   code     - the error code
//   severity - the severity of the error
//   message  - the error message
// The call is ignored if the processor has been canceled. Otherwise the
// error is passed to CPSGS_CassBlobBase::OnGetBlobError() and the fetches
// are peeked if the output is ready.
356  CRequestStatus::ECode status,
357  int code,
358  EDiagSev severity,
359  const string & message)
360 {
// NOTE(review): a statement before the early return is on a line not shown.
361  if (m_Canceled) {
363  return;
364  }
365 
366  CPSGS_CassBlobBase::OnGetBlobError(fetch_details, status, code,
367  severity, message);
368 
369  if (IPSGS_Processor::m_Reply->IsOutputReady())
370  x_Peek(false);
371 }
372 
373 
// Blob chunk handler (the line with the function name and the first
// parameter is not part of this listing).
//   blob       - the blob properties record
//   chunk_data - pointer to the chunk bytes
//   data_size  - number of bytes in the chunk
//   chunk_no   - the chunk number
// NOTE(review): the beginning of the call which receives the chunk arguments
// is on a line not shown here -- presumably
// CPSGS_CassBlobBase::OnGetBlobChunk(); confirm.
375  CBlobRecord const & blob,
376  const unsigned char * chunk_data,
377  unsigned int data_size,
378  int chunk_no)
379 {
381  chunk_data, data_size, chunk_no);
382 
// Send what has been prepared if the output can accept it
383  if (IPSGS_Processor::m_Reply->IsOutputReady())
384  x_Peek(false);
385 }
386 
387 
// Body of the status getter (the signature line is not part of this listing;
// presumably GetStatus() -- confirm).
// The status comes from CPSGS_CassProcessorBase::GetStatus(); an in-progress
// status is returned as it is.
389 {
390  auto status = CPSGS_CassProcessorBase::GetStatus();
391  if (status == IPSGS_Processor::ePSGS_InProgress)
392  return status;
393 
// NOTE(review): the statement guarded by m_Canceled is on a line not shown
// here -- presumably it reports a canceled status; confirm.
394  if (m_Canceled)
396 
397  return status;
398 }
399 
400 
// Body of a getter which returns the processor name constant (the signature
// line is not part of this listing; presumably GetName() -- confirm).
402 {
403  return kGetblobProcessorName;
404 }
405 
406 
408 {
410 }
411 
412 
// Body of a function which peeks all the fetches with waiting enabled (the
// signature line is not part of this listing; presumably ProcessEvent() --
// confirm).
414 {
415  x_Peek(true);
416 }
417 
418 
// Peeks every fetch stored in m_FetchDetails and then handles the blob
// specific follow-up.
//   need_wait - passed through to the per-fetch x_Peek() overload
// Nothing is peeked if the processor has been canceled.
419 void CPSGS_GetBlobProcessor::x_Peek(bool need_wait)
420 {
// NOTE(review): a statement before the early return is on a line not shown.
421  if (m_Canceled) {
423  return;
424  }
425 
426  // 1 -> call m_Loader->Wait1 to pick data
427  // 2 -> check if we have ready-to-send buffers
428  // 3 -> call reply->Send() to send what we have if it is ready
429  /* bool overall_final_state = false; */
430 
// The pass over the fetches is repeated for as long as a pass changes the
// size of m_FetchDetails, i.e. until a pass leaves the size untouched
431  while (true) {
432  auto initial_size = m_FetchDetails.size();
433 
434  for (auto & details: m_FetchDetails) {
435  if (details) {
// The InPeek flag makes a fetch which is already being peeked be skipped
436  if (details->InPeek()) {
437  continue;
438  }
439  details->SetInPeek(true);
440  /* overall_final_state |= */ x_Peek(details, need_wait);
441  details->SetInPeek(false);
442  }
443  }
444 
445  if (initial_size == m_FetchDetails.size()) {
446  break;
447  }
448  }
449 
450  // Blob specific: ready packets need to be sent right away
452 
453  // Blob specific: deal with exclude blob cache
// NOTE(review): the condition which opens the block closed at the end of the
// function is on a line not shown here.
455  for (auto & details: m_FetchDetails) {
456  if (details) {
457  // Update the cache records where needed
458  details->SetExcludeBlobCacheCompleted();
459  }
460  }
462  }
463 }
464 
465 
// Peeks a single fetch.
//   fetch_details - the fetch to be peeked
//   need_wait     - if true and the fetch has not finished reading then the
//                   loader Wait() is called
// Returns true if the fetch has no loader; otherwise the value returned by
// the loader Wait() or false if Wait() has not been called.
466 bool CPSGS_GetBlobProcessor::x_Peek(unique_ptr<CCassFetch> & fetch_details,
467  bool need_wait)
468 {
469  if (!fetch_details->GetLoader())
470  return true;
471 
472  bool final_state = false;
473  if (need_wait)
474  if (!fetch_details->ReadFinished())
475  final_state = fetch_details->GetLoader()->Wait();
476 
// A loader error is handled only if the output is ready and the reply has
// not been finished yet
477  if (fetch_details->GetLoader()->HasError() &&
478  IPSGS_Processor::m_Reply->IsOutputReady() &&
479  ! IPSGS_Processor::m_Reply->IsFinished()) {
480  // Send an error
481  string error = fetch_details->GetLoader()->LastError();
482  auto * app = CPubseqGatewayApp::GetInstance();
483 
// A counter is incremented here; its identifier is on a line not shown.
484  app->GetCounters().Increment(this,
486  PSG_ERROR(error);
487 
// NOTE(review): the static_cast assumes that the fetch is a CCassBlobFetch;
// the statement which uses blob_fetch is on a line not shown here.
488  CCassBlobFetch * blob_fetch = static_cast<CCassBlobFetch *>(fetch_details.get());
490 
491  // Mark finished
493  fetch_details->GetLoader()->ClearError();
494  fetch_details->SetReadFinished();
496  }
497 
498  return final_state;
499 }
500 
501 
// MyNCBI error callback; it is the counterpart of x_OnMyNCBIData().
//   cookie   - not used in the visible part of the body
//   status   - the request status; e404_NotFound is handled separately
//   code     - not used in the visible part of the body
//   severity - not used in the visible part of the body
//   message  - the error message; passed to ReportMyNCBIError()
502 void CPSGS_GetBlobProcessor::x_OnMyNCBIError(const string & cookie,
503  CRequestStatus::ECode status,
504  int code,
505  EDiagSev severity,
506  const string & message)
507 {
// NOTE(review): the statement of the 'not found' branch is on a line not
// shown here.
508  if (status == CRequestStatus::e404_NotFound) {
510  } else {
511  ReportMyNCBIError(status, message);
512  }
513 
515 
// Send what has been prepared if the output can accept it
516  if (IPSGS_Processor::m_Reply->IsOutputReady())
517  x_Peek(false);
518 }
519 
520 
// MyNCBI data callback: stores the user name and continues the processing.
//   cookie - not used in the body
// NOTE(review): the line with the second parameter is not part of this
// listing; the body refers to it as user_info and reads its username field.
521 void CPSGS_GetBlobProcessor::x_OnMyNCBIData(const string & cookie,
523 {
524  // All good, can proceed
525  m_UserName = user_info.username;
526  x_Process();
527 }
528 
const string kCassandraProcessorGroupName
const string kCassandraProcessorEvent
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: load_blob.cpp:161
void SetPropsCallback(TBlobPropsCallback callback)
Definition: load_blob.cpp:151
void SetErrorCB(TDataErrorCallback error_cb)
EPSGS_CacheLookupResult LookupBlobProp(IPSGS_Processor *processor, int sat, int sat_key, int64_t &last_modified, CBlobRecord &blob_record)
void OnGetBlobChunk(bool cancelled, CCassBlobFetch *fetch_details, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
void OnGetBlobProp(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
void PrepareServerErrorMessage(CCassBlobFetch *fetch_details, int code, EDiagSev severity, const string &message)
void OnGetBlobError(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void UpdateOverallStatus(CRequestStatus::ECode status)
bool IsCassandraProcessorEnabled(shared_ptr< CPSGS_Request > request) const
EPSGS_MyNCBILookupResult PopulateMyNCBIUser(TMyNCBIDataCB data_cb, TMyNCBIErrorCB error_cb)
void ReportSecureSatUnauthorized(const string &user_name)
list< unique_ptr< CCassFetch > > m_FetchDetails
bool AreAllFinishedRead(void) const
IPSGS_Processor::EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
void ReportMyNCBIError(CRequestStatus::ECode status, const string &my_ncbi_message)
virtual string GetGroupName(void) const
Tells the processor group name.
virtual void Process(void)
Main processing function.
SPSGS_BlobBySatSatKeyRequest * m_BlobRequest
void x_OnMyNCBIError(const string &cookie, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
virtual EPSGS_Status GetStatus(void)
Tells the processor status (if it has finished or in progress)
void OnGetBlobChunk(CCassBlobFetch *fetch_details, CBlobRecord const &blob, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
virtual void ProcessEvent(void)
Called when an event happened which may require to have some processing.
void x_Peek(bool need_wait)
void x_OnMyNCBIData(const string &cookie, CPSG_MyNCBIRequest_WhoAmI::SUserInfo info)
void OnGetBlobProp(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
void OnGetBlobError(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
virtual string GetName(void) const
Tells the processor name (used in logging and tracing)
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const
Create processor to fulfil PSG request using the data source.
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const
Tells if processor can process the given request.
@ ePSGS_SendAccumulated
Definition: psgs_reply.hpp:54
@ ePSGS_BlobBySatSatKeyRequest
static CPubseqGatewayApp * GetInstance(void)
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
static const string kGetblobProcessorName
USING_NCBI_SCOPE
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define nullptr
Definition: ncbimisc.hpp:45
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
#define PSG_WARNING(message)
@ ePSGS_StartupDataOK
EPSGS_CacheLookupResult
@ ePSGS_CacheHit
@ ePSGS_CacheNotHit
int TProcessorPriority
@ ePSGS_UnknownError
@ ePSGS_NoBlobPropsError
@ ePSGS_UnknownResolvedSatellite
string GetCassStartupDataStateMessage(EPSGS_StartupDataState state)
#define INT64_MIN
Definition: stdint.h:184
CBioseqInfoRecord::TSat m_Sat
optional< SSatInfoEntry > m_Keyspace
optional< bool > m_IsSecureKeyspace
bool MapSatToKeyspace(void)
CBioseqInfoRecord::TSatKey m_SatKey
CBlobRecord::TTimestamp m_LastModified
EPSGS_CacheAndDbUse m_UseCache
Definition: inftrees.h:24
Modified on Fri Sep 20 14:57:24 2024 by modify_doxy.py rev. 669887