NCBI C++ ToolKit
cass_blob_base.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: cass_blob_base.cpp 102540 2024-05-28 17:11:29Z grichenk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: base class for processors which retrieve blobs
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
35 #include <corelib/ncbidiag.hpp>
36 
37 #include "cass_fetch.hpp"
38 #include "psgs_request.hpp"
39 #include "psgs_reply.hpp"
40 #include "pubseq_gateway_utils.hpp"
42 #include "pubseq_gateway.hpp"
43 #include "cass_blob_base.hpp"
47 
48 using namespace std::placeholders;
49 
50 
52  m_LastModified(-1),
53  m_CollectSplitInfo(false),
54  m_SplitInfoGzipFlag(false)
55 {}
56 
57 
58 CPSGS_CassBlobBase::CPSGS_CassBlobBase(shared_ptr<CPSGS_Request> request,
59  shared_ptr<CPSGS_Reply> reply,
60  const string & processor_id,
61  TBlobPropsCB blob_props_cb,
62  TBlobChunkCB blob_chunk_cb,
63  TBlobErrorCB blob_error_cb) :
64  m_NeedToParseId2Info(true),
65  m_ProcessorId(processor_id),
66  m_LastModified(-1),
67  m_CollectSplitInfo(false),
68  m_SplitInfoGzipFlag(false),
69  m_BlobPropsCB(blob_props_cb),
70  m_BlobChunkCB(blob_chunk_cb),
71  m_BlobErrorCB(blob_error_cb),
72  m_NeedFallbackBlob(false),
73  m_FallbackBlobRequested(false)
74 {
75  // Detect if a fallback to the original blob is required if there were
76  // problems
78  switch (request->GetRequestType()) {
80  request_tse_option = request->GetRequest<SPSGS_AnnotRequest>().m_TSEOption;
81  break;
83  request_tse_option = request->GetRequest<SPSGS_BlobBySeqIdRequest>().m_TSEOption;
84  break;
86  request_tse_option = request->GetRequest<SPSGS_BlobBySatSatKeyRequest>().m_TSEOption;
87  break;
88  default:
89  // The m_RequestTSEOption is relevant only for the 3 requests above
90  break;
91  }
92 
93  if (request_tse_option == SPSGS_BlobRequestBase::ePSGS_SlimTSE ||
94  request_tse_option == SPSGS_BlobRequestBase::ePSGS_SmartTSE ||
95  request_tse_option == SPSGS_BlobRequestBase::ePSGS_WholeTSE) {
96  m_NeedFallbackBlob = true;
97  }
98 }
99 
100 
102 {}
103 
104 
105 void
107  CBlobRecord const & in_blob_prop,
108  bool is_found)
109 {
110  // Need a copy because the broken id2 info needs to be overwritten
111  CBlobRecord blob_prop = in_blob_prop;
112 
113  CRequestContextResetter context_resetter;
114  m_Request->SetRequestContext();
115 
116  if (m_Request->NeedTrace()) {
117  m_Reply->SendTrace("Blob prop callback; found: " + to_string(is_found),
118  m_Request->GetStartTimestamp());
119  }
120 
121  if (is_found) {
122  // The method could be called multiple times. First time it is called
123  // for the blob in the request (ID/getblob and ID/get). At this moment
124  // the blob id2info field should be parsed and memorized.
125  // Later the original blob id2info is used to decide if id2_chunk
126  // and id2_info should be present in the reply.
127 
128  if (m_LastModified == -1)
129  m_LastModified = blob_prop.GetModified();
130 
131  // If the split info chunks need to be collected then the zip flag
132  // needs to be memorized for the further deserialization when all the
133  // chunks have been received
134  if (m_CollectSplitInfo) {
135  if (fetch_details->GetBlobId() == m_InfoBlobId) {
137  }
138  }
139 
140  unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info>
141  parsed_id2_info = x_CheckId2Info(fetch_details, blob_prop);
142  x_PrepareBlobPropData(fetch_details, blob_prop);
143 
144  // Blob may be withdrawn or confidential
145  bool is_authorized = x_IsAuthorized(ePSGS_RetrieveBlobData,
146  fetch_details->GetBlobId(),
147  blob_prop, "");
148  if (!is_authorized) {
149  x_PrepareBlobPropCompletion(fetch_details);
150 
151  // Need to send 403 - forbidden
152  x_PrepareBlobMessage(fetch_details, "Blob retrieval is not authorized",
155  eDiag_Error);
156 
157  fetch_details->GetLoader()->ClearError();
158  fetch_details->SetReadFinished();
159  return;
160  }
161 
162  if (m_NeedToParseId2Info) {
163  m_NeedToParseId2Info = false;
164 
165  // Note: here the id2_info field is taken from the original
166  // incoming blob pros. This is because during checking the id2_info
167  // field above (x_CheckId2Info()) it could be reset if it is broken
168  if (!in_blob_prop.GetId2Info().empty()) {
169  m_Id2Info.reset(parsed_id2_info.release());
170 
171  if (m_Id2Info.get() == nullptr) {
172  if (m_NeedFallbackBlob) {
173  m_NeedFallbackBlob = false;
174 
175  string err_msg = "Falling back to retrieve "
176  "the original blob due to broken id2 info.";
177 
178  m_Reply->PrepareProcessorMessage(
179  m_Reply->GetItemId(), m_ProcessorId, err_msg,
182 
183  PSG_ERROR(err_msg);
184 
185  x_OnBlobPropOrigTSE(fetch_details, blob_prop);
186  return;
187  }
188 
189  // Here: basically it is a case of tse=orig or tse=none
190  // In both cases we just continue.
191  }
192  }
193  }
194 
195  if (fetch_details->GetTSEOption() != SPSGS_BlobRequestBase::ePSGS_UnknownTSE) {
196  // Memorize the fetch details and blob props for the case
197  // if a fallback to the original blob is requested. Only the
198  // initial blob props need to be saved and this is when a TSE
199  // option is known.
200  m_InitialBlobPropFetch = fetch_details;
201  m_InitialBlobProps = blob_prop;
202  }
203 
204  // Note: initially only blob_props are requested and at that moment the
205  // TSE option is 'known'. So the initial request should be
206  // finished, see m_FinishedRead = true
207  // In the further requests of the blob data regardless with blob
208  // props or not, the TSE option is set to unknown so the request
209  // will be finished at the moment when blob chunks are handled.
210  switch (fetch_details->GetTSEOption()) {
212  x_OnBlobPropNoneTSE(fetch_details);
213  break;
215  x_OnBlobPropSlimTSE(fetch_details, blob_prop);
216  break;
218  x_OnBlobPropSmartTSE(fetch_details, blob_prop);
219  break;
221  x_OnBlobPropWholeTSE(fetch_details, blob_prop);
222  break;
224  x_OnBlobPropOrigTSE(fetch_details, blob_prop);
225  break;
227  // Used when INFO blobs are asked; i.e. chunks have been
228  // requested as well, so only the prop completion message needs
229  // to be sent.
230  x_PrepareBlobPropCompletion(fetch_details);
231  break;
232  }
233  } else {
234  x_OnBlobPropNotFound(fetch_details);
235  }
236 }
237 
238 
239 void
241 {
242  // Nothing else to be sent
243  x_PrepareBlobPropCompletion(fetch_details);
244  fetch_details->GetLoader()->ClearError();
245  fetch_details->SetReadFinished();
246 }
247 
248 
249 void
251  CBlobRecord const & blob)
252 {
253  auto fetch_blob = fetch_details->GetBlobId();
254 
255  // The handler deals with all kind of blob requests:
256  // - by sat/sat_key
257  // - by seq_id/seq_id_type
258  // - by sat/sat_key after named annotation resolution
259  // So get the reference to the blob base request
260  auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
261  auto * app = CPubseqGatewayApp::GetInstance();
262 
263  unsigned int max_to_send = max(app->GetSendBlobIfSmall(),
264  blob_request.m_SendBlobIfSmall);
265 
266  fetch_details->GetLoader()->ClearError();
267  fetch_details->SetReadFinished();
268  if (blob.GetId2Info().empty()) {
269  x_PrepareBlobPropCompletion(fetch_details);
270 
271  // An original blob may be required if its size is small
272  if (blob.GetSize() <= max_to_send) {
273  // The blob is small so get it
274  x_RequestOriginalBlobChunks(fetch_details, blob);
275  } else {
276  // Nothing else to be sent, the original blob is big
277  }
278  return;
279  }
280 
281  // Not in the cache
282  if (blob.GetSize() <= max_to_send) {
283  // Request the split INFO blob and all split chunks
284  x_RequestID2BlobChunks(fetch_details, blob, false);
285  } else {
286  // Request the split INFO blob only
287  x_RequestID2BlobChunks(fetch_details, blob, true);
288  }
289 
290  // It is important to send completion after: there could be
291  // an error of converting/translating ID2 info
292  x_PrepareBlobPropCompletion(fetch_details);
293 }
294 
295 
296 void
298  CBlobRecord const & blob)
299 {
300  fetch_details->GetLoader()->ClearError();
301  fetch_details->SetReadFinished();
302  if (blob.GetId2Info().empty()) {
303  // Request original blob chunks
304  x_PrepareBlobPropCompletion(fetch_details);
305 
306  x_RequestOriginalBlobChunks(fetch_details, blob);
307  } else {
308  auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
309  auto * app = CPubseqGatewayApp::GetInstance();
310 
311  unsigned int max_to_send = max(app->GetSendBlobIfSmall(),
312  blob_request.m_SendBlobIfSmall);
313 
314  if (blob.GetSize() <= max_to_send) {
315  // Request the split INFO blob and all split chunks
316  x_RequestID2BlobChunks(fetch_details, blob, false);
317  } else {
318  // Request the split INFO blob only
319  x_RequestID2BlobChunks(fetch_details, blob, true);
320  }
321 
322  // It is important to send completion after: there could be
323  // an error of converting/translating ID2 info
324  x_PrepareBlobPropCompletion(fetch_details);
325  }
326 }
327 
328 
329 void
331  CBlobRecord const & blob)
332 {
333  fetch_details->GetLoader()->ClearError();
334  fetch_details->SetReadFinished();
335  if (blob.GetId2Info().empty()) {
336  // Request original blob chunks
337  x_PrepareBlobPropCompletion(fetch_details);
338  x_RequestOriginalBlobChunks(fetch_details, blob);
339  } else {
340  // Request the split INFO blob and all split chunks
341  x_RequestID2BlobChunks(fetch_details, blob, false);
342 
343  // It is important to send completion after: there could be
344  // an error of converting/translating ID2 info
345  x_PrepareBlobPropCompletion(fetch_details);
346  }
347 }
348 
349 
350 void
352  CBlobRecord const & blob)
353 {
354  fetch_details->GetLoader()->ClearError();
355  fetch_details->SetReadFinished();
356  // Request original blob chunks
357  x_PrepareBlobPropCompletion(fetch_details);
358  x_RequestOriginalBlobChunks(fetch_details, blob);
359 }
360 
361 
362 void
364  CBlobRecord const & blob)
365 {
366  auto trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
367  if (m_Request->NeedTrace())
369 
370  auto cass_blob_id = fetch_details->GetBlobId();
371 
372  shared_ptr<CCassConnection> cass_connection;
373  try {
374  if (cass_blob_id.m_IsSecureKeyspace.value()) {
375  cass_connection = cass_blob_id.m_Keyspace->GetSecureConnection(
376  m_UserName.value());
377  if (!cass_connection) {
380  return;
381  }
382  } else {
383  cass_connection = cass_blob_id.m_Keyspace->GetConnection();
384  }
385  } catch (const exception & exc) {
388  return;
389  } catch (...) {
392  return;
393  }
394 
395 
396  // eUnknownTSE is safe here; no blob prop call will happen anyway
397  // eUnknownUseCache is safe here; no further resolution required
399  orig_blob_request(SPSGS_BlobId(cass_blob_id.ToString()),
400  blob.GetModified(),
403  fetch_details->GetClientId(),
404  0, 0, m_Request->GetIncludeHUP(), trace_flag,
405  m_Request->NeedProcessorEvents(),
406  vector<string>(), vector<string>(),
407  psg_clock_t::now());
408 
409  // Create the cass async loader
410  unique_ptr<CBlobRecord> blob_record(new CBlobRecord(blob));
411  CCassBlobTaskLoadBlob * load_task =
412  new CCassBlobTaskLoadBlob(cass_connection,
413  cass_blob_id.m_Keyspace->keyspace,
414  move(blob_record),
415  true, nullptr);
416 
417  unique_ptr<CCassBlobFetch> cass_blob_fetch;
418  cass_blob_fetch.reset(new CCassBlobFetch(orig_blob_request, cass_blob_id));
419  cass_blob_fetch->SetLoader(load_task);
420 
421  // Blob props have already been received
422  cass_blob_fetch->SetBlobPropSent();
423 
424  if (x_CheckExcludeBlobCache(cass_blob_fetch.get(),
425  false) == ePSGS_SkipRetrieving)
426  return;
427 
428  load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
429  load_task->SetErrorCB(
430  CGetBlobErrorCallback(this, m_BlobErrorCB, cass_blob_fetch.get()));
431  load_task->SetPropsCallback(nullptr);
432  load_task->SetChunkCallback(
433  CBlobChunkCallback(this, m_BlobChunkCB, cass_blob_fetch.get()));
434 
435  if (m_Request->NeedTrace()) {
436  m_Reply->SendTrace(
437  "Cassandra request: " + ToJsonString(*load_task),
438  m_Request->GetStartTimestamp());
439  }
440 
441  m_FetchDetails.push_back(std::move(cass_blob_fetch));
442 
443  load_task->Wait();
444 }
445 
446 
447 void
449  CBlobRecord const & blob,
450  bool info_blob_only)
451 {
452  auto * app = CPubseqGatewayApp::GetInstance();
453 
454  // Translate sat to keyspace
455  SCass_BlobId info_blob_id(m_Id2Info->GetSat(), m_Id2Info->GetInfo()); // mandatory
456 
457  if (!info_blob_id.MapSatToKeyspace()) {
458  // Error: send it in the context of the blob props
459  string message = "Error mapping id2 info sat (" +
460  to_string(m_Id2Info->GetSat()) +
461  ") to a cassandra keyspace for the blob " +
462  fetch_details->GetBlobId().ToString();
463  x_PrepareBlobPropMessage(fetch_details, message,
466  app->GetCounters().Increment(this,
469  PSG_ERROR(message);
470  return;
471  }
472 
473  // Note: the blobs from a public keyspace may only point to the blobs in a
474  // public keyspace. A similar rule is about a secure keyspace: HUP blobs
475  // point only to HUP blobs. Thus if a keyspace is secure then the MyNCBI
476  // resultion has already happened and can be used here.
477 
478  shared_ptr<CCassConnection> cass_connection;
479 
480  try {
481  if (info_blob_id.m_IsSecureKeyspace.value()) {
482  cass_connection = info_blob_id.m_Keyspace->GetSecureConnection(
483  m_UserName.value());
484  if (!cass_connection) {
485  x_PrepareBlobPropMessage(fetch_details,
486  "id2 info sat connection unauthorized "
487  "for the blob " + fetch_details->GetBlobId().ToString(),
491  return;
492  }
493  } else {
494  cass_connection = info_blob_id.m_Keyspace->GetConnection();
495  }
496  } catch (const exception & exc) {
497  x_PrepareBlobPropMessage(fetch_details,
498  "id2 info sat connection authorization error: " +
499  string(exc.what()),
503  return;
504  } catch (...) {
505  x_PrepareBlobPropMessage(fetch_details,
506  "id2 info sat connection authorization unknown error",
510  return;
511  }
512 
513  auto trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
514  if (m_Request->NeedTrace())
516 
517  // Create the Id2Info requests.
518  // eUnknownTSE is treated in the blob prop handler as to do nothing (no
519  // sending completion message, no requesting other blobs)
520  // eUnknownUseCache is safe here; no further resolution
521  // empty client_id means that later on there will be no exclude blobs cache ops
522  string info_blob_id_as_str = info_blob_id.ToString();
524  info_blob_request(SPSGS_BlobId(info_blob_id_as_str),
525  INT64_MIN,
528  fetch_details->GetClientId(), 0, 0,
529  m_Request->GetIncludeHUP(), trace_flag,
530  m_Request->NeedProcessorEvents(),
531  vector<string>(), vector<string>(),
532  psg_clock_t::now());
533 
534  // Prepare Id2Info retrieval
535  unique_ptr<CCassBlobFetch> cass_blob_fetch;
536  cass_blob_fetch.reset(new CCassBlobFetch(info_blob_request, info_blob_id));
537  bool info_blob_requested = false;
538 
539  if (x_CheckExcludeBlobCache(cass_blob_fetch.get(),
540  true) == ePSGS_ProceedRetrieving) {
541  unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
542  CPSGCache psg_cache(m_Request, m_Reply);
543  auto blob_prop_cache_lookup_result =
544  psg_cache.LookupBlobProp(
545  this,
546  info_blob_id.m_Sat,
547  info_blob_id.m_SatKey,
548  info_blob_request.m_LastModified,
549  *blob_record.get());
550  CCassBlobTaskLoadBlob * load_task = nullptr;
551 
552 
553  if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
554  load_task = new CCassBlobTaskLoadBlob(
555  cass_connection,
556  info_blob_id.m_Keyspace->keyspace,
557  move(blob_record),
558  true, nullptr);
559  } else {
560  // The handler deals with both kind of blob requests:
561  // - by sat/sat_key
562  // - by seq_id/seq_id_type
563  // So get the reference to the blob base request
564  auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
565 
566  if (blob_request.m_UseCache == SPSGS_RequestBase::ePSGS_CacheOnly) {
567  // No need to continue; it is forbidded to look for blob props in
568  // the Cassandra DB
569  string message;
570 
571  if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
572  message = "Blob properties are not found";
574  x_PrepareBlobPropMessage(fetch_details, message,
577  } else {
578  message = "Blob properties are not found due to LMDB cache error";
580  x_PrepareBlobPropMessage(fetch_details, message,
583  }
584 
585  PSG_WARNING(message);
586  return;
587  }
588 
589  load_task = new CCassBlobTaskLoadBlob(
590  cass_connection,
591  info_blob_id.m_Keyspace->keyspace,
592  info_blob_id.m_SatKey,
593  true, nullptr);
594  }
595  cass_blob_fetch->SetLoader(load_task);
596 
597  load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
598  load_task->SetErrorCB(
601  this, _1, _2, _3, _4, _5),
602  cass_blob_fetch.get()));
603  load_task->SetPropsCallback(
604  CBlobPropCallback(this,
606  this, _1, _2, _3),
607  m_Request, m_Reply, cass_blob_fetch.get(),
608  blob_prop_cache_lookup_result != ePSGS_CacheHit));
609  load_task->SetChunkCallback(
610  CBlobChunkCallback(this,
612  this, _1, _2, _3, _4, _5),
613  cass_blob_fetch.get()));
614 
615  if (m_Request->NeedTrace()) {
616  m_Reply->SendTrace("Cassandra request: " +
617  ToJsonString(*load_task),
618  m_Request->GetStartTimestamp());
619  }
620 
621  m_RequestedID2BlobChunks.push_back(info_blob_id_as_str);
622  info_blob_requested = true;
623  cass_blob_fetch->SetNeedAddId2ChunkId2Info(true);
624  m_FetchDetails.push_back(move(cass_blob_fetch));
625  }
626 
627  auto to_init_iter = m_FetchDetails.end();
628  --to_init_iter;
629 
630  // We may need to request ID2 chunks
631  if (!info_blob_only) {
632  // Sat name is the same
633  x_RequestId2SplitBlobs(fetch_details);
634  } else {
635  // This is the case when only split INFO has been requested.
636  // So may be it is necessary to request some more chunks
637  x_DecideToRequestMoreChunksForSmartTSE(fetch_details, info_blob_id);
638  }
639 
640  // initiate retrieval: only those which were just created
641  if (!info_blob_requested) {
642  // If the info blob was not retrieved then the to_init_iter points to
643  // the previous fetch for which Wait() has been invoked before
644  ++to_init_iter;
645  }
646 
647  while (to_init_iter != m_FetchDetails.end()) {
648  if (*to_init_iter)
649  (*to_init_iter)->GetLoader()->Wait();
650  ++to_init_iter;
651  }
652 }
653 
654 
655 void
657 {
658  auto trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
659  if (m_Request->NeedTrace())
661 
662  for (int chunk_no = 1; chunk_no <= m_Id2Info->GetChunks(); ++chunk_no) {
663  SCass_BlobId chunks_blob_id(m_Id2Info->GetSat(),
664  m_Id2Info->GetInfo() - m_Id2Info->GetChunks() - 1 + chunk_no);
665  // Note: the mapping must be good becuse the same mapping was done
666  // before
667  chunks_blob_id.MapSatToKeyspace();
668 
669  // Note: the blobs from a public keyspace may only point to the blobs in a
670  // public keyspace. A similar rule is about a secure keyspace: HUP blobs
671  // point only to HUP blobs. Thus if a keyspace is secure then the MyNCBI
672  // resultion has already happened and can be used here.
673  shared_ptr<CCassConnection> cass_connection;
674 
675  try {
676  if (chunks_blob_id.m_IsSecureKeyspace.value()) {
677  cass_connection = chunks_blob_id.m_Keyspace->GetSecureConnection(
678  m_UserName.value());
679  if (!cass_connection) {
680  x_PrepareBlobPropMessage(fetch_details,
681  "id2 split chunk sat connection unauthorized "
682  "for the blob " + fetch_details->GetBlobId().ToString(),
686  return;
687  }
688  } else {
689  cass_connection = chunks_blob_id.m_Keyspace->GetConnection();
690  }
691  } catch (const exception & exc) {
692  x_PrepareBlobPropMessage(fetch_details,
693  "id2 split chunk sat connection authorization error: " +
694  string(exc.what()),
698  return;
699  } catch (...) {
700  x_PrepareBlobPropMessage(fetch_details,
701  "id2 split chunk sat connection authorization unknown error",
705  return;
706  }
707 
708  // eUnknownTSE is treated in the blob prop handler as to do nothing (no
709  // sending completion message, no requesting other blobs)
710  // eUnknownUseCache is safe here; no further resolution required
711  string chunks_blob_id_as_str = chunks_blob_id.ToString();
713  chunk_request(SPSGS_BlobId(chunks_blob_id_as_str),
714  INT64_MIN,
717  fetch_details->GetClientId(), 0, 0,
718  m_Request->GetIncludeHUP(),
719  trace_flag,
720  m_Request->NeedProcessorEvents(),
721  vector<string>(), vector<string>(),
722  psg_clock_t::now());
723 
724  unique_ptr<CCassBlobFetch> details;
725  details.reset(new CCassBlobFetch(chunk_request, chunks_blob_id));
726 
727  // Check the already sent cache
728  if (x_CheckExcludeBlobCache(details.get(),
729  true) == ePSGS_SkipRetrieving) {
730  continue;
731  }
732 
733 
734  unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
735  CPSGCache psg_cache(m_Request, m_Reply);
736  auto blob_prop_cache_lookup_result =
737  psg_cache.LookupBlobProp(
738  this,
739  chunks_blob_id.m_Sat,
740  chunks_blob_id.m_SatKey,
741  chunk_request.m_LastModified,
742  *blob_record.get());
743  CCassBlobTaskLoadBlob * load_task = nullptr;
744 
745  if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
746  load_task = new CCassBlobTaskLoadBlob(
747  cass_connection,
748  chunks_blob_id.m_Keyspace->keyspace,
749  move(blob_record),
750  true, nullptr);
751  details->SetLoader(load_task);
752  } else {
753  // The handler deals with both kind of blob requests:
754  // - by sat/sat_key
755  // - by seq_id/seq_id_type
756  // So get the reference to the blob base request
757  auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
758 
759  if (blob_request.m_UseCache == SPSGS_RequestBase::ePSGS_CacheOnly) {
760  // No need to create a request because the Cassandra DB access
761  // is forbidden
762  string message;
763  if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
764  message = "Blob properties are not found";
766  x_PrepareBlobPropMessage(details.get(), message,
769  } else {
770  message = "Blob properties are not found "
771  "due to a blob proc cache lookup error";
773  x_PrepareBlobPropMessage(details.get(), message,
776  }
777  PSG_WARNING(message);
778  continue;
779  }
780 
781  load_task = new CCassBlobTaskLoadBlob(
782  cass_connection,
783  chunks_blob_id.m_Keyspace->keyspace,
784  chunks_blob_id.m_SatKey,
785  true, nullptr);
786  details->SetLoader(load_task);
787  }
788 
789  load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
790  load_task->SetErrorCB(
793  this, _1, _2, _3, _4, _5),
794  details.get()));
795  load_task->SetPropsCallback(
796  CBlobPropCallback(this,
798  this, _1, _2, _3),
799  m_Request, m_Reply, details.get(),
800  blob_prop_cache_lookup_result != ePSGS_CacheHit));
801  load_task->SetChunkCallback(
802  CBlobChunkCallback(this,
804  this, _1, _2, _3, _4, _5),
805  details.get()));
806 
807  m_RequestedID2BlobChunks.push_back(chunks_blob_id_as_str);
808  details->SetNeedAddId2ChunkId2Info(true);
809  m_FetchDetails.push_back(move(details));
810  }
811 }
812 
813 
814 void
816  CCassBlobFetch * fetch_details,
817  SCass_BlobId const & info_blob_id)
818 {
819  // More chunks should be requested for a specific case:
820  // - ID/get request
821  // - the 'tse; request option is set to 'smart'
822  // - blob is splitted
823  // - blob has a large size so the only split INFO has been requested
824 
825  // The last two criterias have been checked before
826 
827  auto request_type = m_Request->GetRequestType();
828  if (request_type != CPSGS_Request::ePSGS_BlobBySeqIdRequest) {
829  if (m_Request->NeedTrace()) {
830  m_Reply->SendTrace("Decision: split info of " + info_blob_id.ToString() +
831  " will not be collected for further analysis "
832  "because it is not the ID/get request",
833  m_Request->GetStartTimestamp());
834  }
835  return;
836  }
837 
838  auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
839  if (blob_request.m_TSEOption != SPSGS_BlobRequestBase::ePSGS_SmartTSE) {
840  if (m_Request->NeedTrace()) {
841  m_Reply->SendTrace("Decision: split info of " + info_blob_id.ToString() +
842  " will not be collected for further analysis "
843  "because the ID/get request is not "
844  "with 'tse' option set to 'smart'",
845  m_Request->GetStartTimestamp());
846  }
847  return;
848  }
849 
851  if (m_Request->NeedTrace()) {
852  m_Reply->SendTrace("Decision: split info of " + info_blob_id.ToString() +
853  " will not be collected for further analysis "
854  "because of the failure to construct CSeq_id from the "
855  "resolved input seq_id (see an applog warning)",
856  m_Request->GetStartTimestamp());
857  }
858  return;
859  }
860 
861  // Here: the split INFO chunks must be fed to function which may tell that
862  // some more chunks need to be sent
863  m_InfoBlobId = info_blob_id;
864 
865  // Check the cached info first
866  auto * app = CPubseqGatewayApp::GetInstance();
867  auto cache_search_result = app->GetSplitInfoCache()->GetBlob(info_blob_id);
868  if (cache_search_result.has_value()) {
869  // The chunks are in cache
870  if (m_Request->NeedTrace()) {
871  m_Reply->SendTrace("Extra split info blob for " + info_blob_id.ToString() +
872  " is already in cache. Using the blob to request extra chunks",
873  m_Request->GetStartTimestamp());
874  }
875  vector<int> extra_chunks;
876  try {
877  extra_chunks = psg::GetBioseqChunks(m_ResolvedSeqID,
878  *cache_search_result.value());
879  } catch (const exception & exc) {
880  PSG_ERROR("Error getting bioseq chunks from split info: " +
881  string(exc.what()));
882  return;
883  } catch (...) {
884  PSG_ERROR("Unknown error of getting bioseq chunks from split info");
885  return;
886  }
887  x_RequestMoreChunksForSmartTSE(fetch_details, extra_chunks, false);
888  return;
889  }
890 
891  // Not in cache; collect the chunks
892  m_CollectSplitInfo = true;
893 
894  if (m_Request->NeedTrace()) {
895  m_Reply->SendTrace("Decision: split info of " + info_blob_id.ToString() +
896  " will be collected for further analysis",
897  m_Request->GetStartTimestamp());
898  }
899 }
900 
901 
903 {
904  if (m_Request->NeedTrace()) {
905  m_Reply->SendTrace("Deserializing collected split info from the buffer "
906  "for further analysis",
907  m_Request->GetStartTimestamp());
908  }
909 
910  // The Gzip flag has been memorized when the split info blob props were
911  // received
913 
914  // Add to cache
915  auto * app = CPubseqGatewayApp::GetInstance();
917 
918  try {
919  blob = m_CollectedSplitInfo.DeserializeSplitInfo();
920  } catch (const exception & exc) {
921  PSG_ERROR("Error deserializing split info: " + string(exc.what()));
922  return;
923  } catch (...) {
924  PSG_ERROR("Unknown error of deserializing split info");
925  return;
926  }
927 
928  app->GetSplitInfoCache()->AddBlob(m_InfoBlobId, blob);
929 
930  // Calculate the extra chunks
931  vector<int> extra_chunks;
932  try {
933  extra_chunks = psg::GetBioseqChunks(m_ResolvedSeqID, *blob);
934  } catch (const exception & exc) {
935  PSG_ERROR("Error getting bioseq chunks from split info: " +
936  string(exc.what()));
937  return;
938  } catch (...) {
939  PSG_ERROR("Unknown error of getting bioseq chunks from split info");
940  return;
941  }
942 
943  x_RequestMoreChunksForSmartTSE(fetch_details, extra_chunks, true);
944 }
945 
946 
948  const vector<int> & extra_chunks,
949  bool need_wait)
950 {
951  auto trace_flag = SPSGS_RequestBase::ePSGS_NoTracing;
952  if (m_Request->NeedTrace())
954 
955  for (auto chunk_no : extra_chunks) {
956  SCass_BlobId chunks_blob_id(m_Id2Info->GetSat(),
957  m_Id2Info->GetInfo() - m_Id2Info->GetChunks() - 1 + chunk_no);
958  chunks_blob_id.m_Keyspace = m_InfoBlobId.m_Keyspace;
960  string chunks_blob_id_as_str = chunks_blob_id.ToString();
961 
962  // eUnknownTSE is treated in the blob prop handler as to do nothing (no
963  // sending completion message, no requesting other blobs)
964  // eUnknownUseCache is safe here; no further resolution required
965  // client_id is "" (empty string) so the split blobs do not participate
966  // in the exclude blob cache
968  chunk_request(SPSGS_BlobId(chunks_blob_id_as_str),
969  INT64_MIN,
972  fetch_details->GetClientId(), 0, 0,
973  m_Request->GetIncludeHUP(),
974  trace_flag,
975  m_Request->NeedProcessorEvents(),
976  vector<string>(), vector<string>(),
977  psg_clock_t::now());
978 
979  unique_ptr<CCassBlobFetch> details;
980  details.reset(new CCassBlobFetch(chunk_request, chunks_blob_id));
981 
982  // Check the already sent cache
983  if (x_CheckExcludeBlobCache(details.get(),
984  true) == ePSGS_SkipRetrieving) {
985  continue;
986  }
987 
988  // Note: the blobs from a public keyspace may only point to the blobs in a
989  // public keyspace. A similar rule is about a secure keyspace: HUP blobs
990  // point only to HUP blobs. Thus if a keyspace is secure then the MyNCBI
991  // resultion has already happened and can be used here.
992  shared_ptr<CCassConnection> cass_connection;
993 
994  try {
995  if (chunks_blob_id.m_IsSecureKeyspace.value()) {
996  cass_connection = chunks_blob_id.m_Keyspace->GetSecureConnection(
997  m_UserName.value());
998  if (!cass_connection) {
999  x_PrepareBlobPropMessage(fetch_details,
1000  "id2 split chunk sat connection unauthorized "
1001  "for the blob " + fetch_details->GetBlobId().ToString(),
1005  return;
1006  }
1007  } else {
1008  cass_connection = chunks_blob_id.m_Keyspace->GetConnection();
1009  }
1010  } catch (const exception & exc) {
1011  x_PrepareBlobPropMessage(fetch_details,
1012  "id2 split chunk sat connection authorization error: " +
1013  string(exc.what()),
1017  return;
1018  } catch (...) {
1019  x_PrepareBlobPropMessage(fetch_details,
1020  "id2 split chunk sat connection authorization unknown error",
1024  return;
1025  }
1026 
1027  unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
1028  CPSGCache psg_cache(m_Request, m_Reply);
1029  auto blob_prop_cache_lookup_result =
1030  psg_cache.LookupBlobProp(
1031  this,
1032  chunks_blob_id.m_Sat,
1033  chunks_blob_id.m_SatKey,
1034  chunk_request.m_LastModified,
1035  *blob_record.get());
1036  CCassBlobTaskLoadBlob * load_task = nullptr;
1037 
1038  if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
1039  load_task = new CCassBlobTaskLoadBlob(
1040  cass_connection,
1041  chunks_blob_id.m_Keyspace->keyspace,
1042  move(blob_record),
1043  true, nullptr);
1044  details->SetLoader(load_task);
1045  } else {
1046  // The handler is only for the ID/get i.e. blob by seq_id/seq_id_type
1047  // Get the reference to the blob base request
1048  auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
1049 
1050  if (blob_request.m_UseCache == SPSGS_RequestBase::ePSGS_CacheOnly) {
1051  // No need to create a request because the Cassandra DB access
1052  // is forbidden
1053  string message;
1054  if (blob_prop_cache_lookup_result == ePSGS_CacheNotHit) {
1055  message = "Blob properties are not found";
1057  x_PrepareBlobPropMessage(details.get(), message,
1060  } else {
1061  message = "Blob properties are not found "
1062  "due to a blob proc cache lookup error";
1064  x_PrepareBlobPropMessage(details.get(), message,
1067  }
1068  PSG_WARNING(message);
1069  continue;
1070  }
1071 
1072  load_task = new CCassBlobTaskLoadBlob(
1073  cass_connection,
1074  chunks_blob_id.m_Keyspace->keyspace,
1075  chunks_blob_id.m_SatKey,
1076  true, nullptr);
1077  details->SetLoader(load_task);
1078  }
1079 
1080  load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
1081  load_task->SetErrorCB(
1082  CGetBlobErrorCallback(this,
1084  this, _1, _2, _3, _4, _5),
1085  details.get()));
1086  load_task->SetPropsCallback(
1087  CBlobPropCallback(this,
1089  this, _1, _2, _3),
1090  m_Request, m_Reply, details.get(),
1091  blob_prop_cache_lookup_result != ePSGS_CacheHit));
1092  load_task->SetChunkCallback(
1093  CBlobChunkCallback(this,
1095  this, _1, _2, _3, _4, _5),
1096  details.get()));
1097 
1098  if (m_Request->NeedTrace()) {
1099  m_Reply->SendTrace("Requesting extra chunk from INFO for the 'smart' tse option: " +
1100  chunks_blob_id.ToString(),
1101  m_Request->GetStartTimestamp());
1102  }
1103 
1104  m_RequestedID2BlobChunks.push_back(chunks_blob_id_as_str);
1105  details->SetNeedAddId2ChunkId2Info(true);
1106  m_FetchDetails.push_back(move(details));
1107 
1108  if (need_wait) {
1109  // Needed only when the method is called from the last chunk
1110  // receive context
1111  load_task->Wait();
1112  }
1113  }
1114 }
1115 
1116 
1119  bool need_add_id2_chunk_id2_info)
1120 {
1121  if (!fetch_details->IsBlobFetch())
1122  return ePSGS_ProceedRetrieving;
1123  if (fetch_details->GetClientId().empty())
1124  return ePSGS_ProceedRetrieving;
1125 
1126  bool completed = true;
1127  psg_time_point_t completed_time;
1128  auto cache_result = fetch_details->AddToExcludeBlobCache(
1129  completed, completed_time);
1130 
1131  auto request_type = m_Request->GetRequestType();
1132  if (request_type != CPSGS_Request::ePSGS_AnnotationRequest &&
1133  request_type != CPSGS_Request::ePSGS_BlobBySeqIdRequest) {
1134  // Only ID/get and ID/get_na may need to skip a blob
1135  return ePSGS_ProceedRetrieving;
1136  }
1137 
1138  if (cache_result == ePSGS_Added)
1139  return ePSGS_ProceedRetrieving;
1140 
1141  unsigned long resend_timeout_mks;
1142  if (request_type == CPSGS_Request::ePSGS_AnnotationRequest) {
1143  auto & annot_request = m_Request->GetRequest<SPSGS_AnnotRequest>();
1144  resend_timeout_mks = annot_request.m_ResendTimeoutMks;
1145  } else {
1146  // This is CPSGS_Request::ePSGS_BlobBySeqIdRequest request
1147  auto & blob_request = m_Request->GetRequest<SPSGS_BlobBySeqIdRequest>();
1148  resend_timeout_mks = blob_request.m_ResendTimeoutMks;
1149  }
1150 
1151  if (resend_timeout_mks == 0) {
1152  // Essentially it is the same as auto blob skipping is set to off
1153  return ePSGS_ProceedRetrieving;
1154  }
1155 
1156  // In case the blob is in process of sending the reply is the same for
1157  // ID/get and ID/get_na requests
1158  if (!completed) {
1160  need_add_id2_chunk_id2_info);
1161  return ePSGS_SkipRetrieving;
1162  }
1163 
1164  // Here: the blob is in case and has already been sent so the
1165  // resend_timeout needs to be respected when a decision send it or not is
1166  // made
1167  unsigned long sent_mks_ago = GetTimespanToNowMks(completed_time);
1168  if (sent_mks_ago < resend_timeout_mks) {
1169  // No sending the blob; it was sent recent enough
1170  x_PrepareBlobExcluded(fetch_details, sent_mks_ago,
1171  resend_timeout_mks - sent_mks_ago,
1172  need_add_id2_chunk_id2_info);
1173  return ePSGS_SkipRetrieving;
1174  }
1175 
1176  // Sending the blob anyway; it was longer than the resend
1177  // timeout or resend_timeout is 0.
1178  // Also need to do two more things:
1179  // - mark the blob in cache as in-progress again
1180  // - make a note in the fetch details that it needs to update
1181  // the cache as completed once blob is finished
1182  auto * app = CPubseqGatewayApp::GetInstance();
1183 
1184  // 'false' means not-completed, i.e. in-progress
1185  app->GetExcludeBlobCache()->SetCompleted(
1186  fetch_details->GetClientId(),
1187  SExcludeBlobId(fetch_details->GetBlobId().m_Sat,
1188  fetch_details->GetBlobId().m_SatKey),
1189  false);
1190  fetch_details->SetExcludeBlobCacheUpdated(true);
1191  return ePSGS_ProceedRetrieving;
1192 }
1193 
1194 
1195 void
1197  int code,
1198  EDiagSev severity,
1199  const string & message)
1200 {
1201  if (fetch_details->IsBlobPropStage()) {
1202  x_PrepareBlobPropMessage(fetch_details, message,
1204  code, severity);
1205  } else {
1206  x_PrepareBlobMessage(fetch_details, message,
1208  code, severity);
1209  }
1210 }
1211 
1212 
1213 void
1215  CRequestStatus::ECode status,
1216  int code,
1217  EDiagSev severity,
1218  const string & message)
1219 {
1220  CRequestContextResetter context_resetter;
1221  m_Request->SetRequestContext();
1222 
1223  // It could be a message or an error
1224  CountError(fetch_details, status, code, severity, message,
1226  bool is_error = IsError(severity);
1227 
1228  if (is_error) {
1229  PrepareServerErrorMessage(fetch_details, code, severity, message);
1230 
1231  // Remove from the already-sent cache if necessary
1232  fetch_details->RemoveFromExcludeBlobCache();
1233 
1234  // If it is an error then regardless what stage it was, props or
1235  // chunks, there will be no more activity
1236  fetch_details->GetLoader()->ClearError();
1237  fetch_details->SetReadFinished();
1238  } else {
1239  if (fetch_details->IsBlobPropStage())
1240  x_PrepareBlobPropMessage(fetch_details, message, status,
1241  code, severity);
1242  else
1243  x_PrepareBlobMessage(fetch_details, message, status,
1244  code, severity);
1245 
1246  // To avoid sending an error in Peek()
1247  fetch_details->GetLoader()->ClearError();
1248  }
1249 }
1250 
1251 
1252 void
1254  CCassBlobFetch * fetch_details,
1255  const unsigned char * chunk_data,
1256  unsigned int data_size,
1257  int chunk_no)
1258 {
1259  CRequestContextResetter context_resetter;
1260  m_Request->SetRequestContext();
1261 
1262  if (cancelled) {
1263  fetch_details->GetLoader()->Cancel();
1264  fetch_details->GetLoader()->ClearError();
1265  fetch_details->SetReadFinished();
1266  return;
1267  }
1268  if (m_Reply->IsFinished()) {
1270  this,
1272  PSG_ERROR("Unexpected data received "
1273  "while the output has finished, ignoring");
1274  return;
1275  }
1276 
1277  if (chunk_no >= 0) {
1278  if (m_Request->NeedTrace()) {
1279  m_Reply->SendTrace("Blob chunk " + to_string(chunk_no) + " callback",
1280  m_Request->GetStartTimestamp());
1281  }
1282 
1283  // A blob chunk; 0-length chunks are allowed too
1284  x_PrepareBlobData(fetch_details, chunk_data, data_size, chunk_no);
1285 
1286  // Collect split info for further analisys if needed
1287  if (m_CollectSplitInfo) {
1288  if (fetch_details->GetBlobId() == m_InfoBlobId) {
1289  if (m_Request->NeedTrace()) {
1290  m_Reply->SendTrace("Collecting split info in the buffer "
1291  "for further analysis. Chunk number: " +
1292  to_string(chunk_no) + " of size " +
1293  to_string(data_size),
1294  m_Request->GetStartTimestamp());
1295  }
1296  m_CollectedSplitInfo.AddDataChunk(chunk_data, data_size, chunk_no);
1297  }
1298  }
1299  } else {
1300  if (m_Request->NeedTrace()) {
1301  m_Reply->SendTrace("Blob chunk no-more-data callback",
1302  m_Request->GetStartTimestamp());
1303  }
1304 
1305  // End of the blob
1306  x_PrepareBlobCompletion(fetch_details);
1307  fetch_details->GetLoader()->ClearError();
1308  fetch_details->SetReadFinished();
1309 
1310  // Note: no need to set the blob completed in the exclude blob cache.
1311  // It will happen in Peek()
1312 
1313  // Analyse split info if needed
1314  if (m_CollectSplitInfo) {
1315  if (fetch_details->GetBlobId() == m_InfoBlobId) {
1316  x_DeserializeSplitInfo(fetch_details);
1317  }
1318  }
1319  }
1320 }
1321 
1322 
1323 void
1325 {
1326  // Not found, report 502 because it is data inconsistency
1327  // or 404 if it was requested via sat.sat_key
1328  auto * app = CPubseqGatewayApp::GetInstance();
1329  app->GetCounters().Increment(this,
1331 
1332  auto blob_id = fetch_details->GetBlobId();
1333  string message = "Blob " + blob_id.ToString() +
1334  " properties are not found (last modified: " +
1335  to_string(fetch_details->GetLoader()->GetModified()) + ")";
1336  if (fetch_details->GetFetchType() == ePSGS_BlobBySatSatKeyFetch) {
1337  // User requested wrong sat_key, so it is a client error
1339  x_PrepareBlobPropMessage(fetch_details, message,
1342  } else {
1343  // Server error, data inconsistency
1344  PSG_ERROR(message);
1346  x_PrepareBlobPropMessage(fetch_details, message,
1349  }
1350 
1351  // Remove from the already-sent cache if necessary
1352  fetch_details->RemoveFromExcludeBlobCache();
1353 
1354  fetch_details->GetLoader()->ClearError();
1355  fetch_details->SetReadFinished();
1356 }
1357 
1358 
1359 unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info>
1361  CBlobRecord & blob_prop)
1362 {
1363  string id2_info = blob_prop.GetId2Info();
1364  unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> parsed_id2_info;
1365 
1366  if (id2_info.empty())
1367  return parsed_id2_info;
1368 
1369  string err_msg;
1370  try {
1371  // Note: in case of an error it is already counted in the constructor
1372  // which parses id2 info field
1373  parsed_id2_info.reset(new CPSGS_SatInfoChunksVerFlavorId2Info(blob_prop.GetId2Info()));
1374  return parsed_id2_info;
1375  } catch (const exception & exc) {
1376  err_msg = "Error extracting id2 info for the blob " +
1377  fetch_details->GetBlobId().ToString() + ": " + exc.what();
1378  } catch (...) {
1379  err_msg = "Unknown error extracting id2 info for the blob " +
1380  fetch_details->GetBlobId().ToString() + ".";
1381  }
1382 
1383  err_msg += "\nThe broken id2 info field content will be discarded "
1384  "before sending to the client.";
1385  blob_prop.SetId2Info("");
1386 
1387  m_Reply->PrepareProcessorMessage(
1388  m_Reply->GetItemId(),
1391 
1392  PSG_ERROR(err_msg);
1393  return parsed_id2_info;
1394 }
1395 
1396 
1397 bool
1399 {
1400  auto & blob_request = m_Request->GetRequest<SPSGS_BlobRequestBase>();
1401  if (blob_request.m_TSEOption == SPSGS_BlobRequestBase::ePSGS_OrigTSE)
1402  return false;
1403 
1404  return m_Id2Info.get() != nullptr && m_NeedToParseId2Info == false;
1405 }
1406 
1407 
1408 int64_t
1410 {
1411  // Note: this member is called only when m_Id2Info is parsed successfully
1412 
1413  auto blob_key = fetch_details->GetBlobId().m_SatKey;
1414  auto orig_blob_info = m_Id2Info->GetInfo();
1415  if (orig_blob_info == blob_key) {
1416  // It is a split info chunk so use a special value
1417  return kSplitInfoChunk;
1418  }
1419 
1420  // Calculate the id2_chunk
1421  return blob_key - orig_blob_info + m_Id2Info->GetChunks() + 1;
1422 }
1423 
1424 
1425 bool
1427  const SCass_BlobId & blob_id,
1428  const CBlobRecord & blob,
1429  const TAuthToken & auth_token)
1430 {
1431  // Future extension: at the moment there is only one blob operation and
1432  // there are no authorization tokens. Later they may come to play
1433 
1434  // By some reasons the function deals not only with the authorization but
1435  // with withdrawal and blob publication date (confidentiality)
1436 
1437  // Note: the IsConfidential() needs to be checked only if it is not a
1438  // secure keyspace. For the secure keyspace the confidential blobs need to
1439  // be supplied anyway.
1440  bool is_secure_keyspace = false;
1441  if (blob_id.m_IsSecureKeyspace.has_value()) {
1442  is_secure_keyspace = blob_id.m_IsSecureKeyspace.value();
1443  }
1444 
1445  if (!is_secure_keyspace) {
1446  if (blob.IsConfidential()) {
1447  if (m_Request->NeedTrace()) {
1448  m_Reply->SendTrace(
1449  "Blob " + blob_id.ToString() + " is not authorized "
1450  "because it is confidential", m_Request->GetStartTimestamp());
1451  }
1452  return false;
1453  }
1454  }
1455 
1456  if (blob.GetFlag(EBlobFlags::eWithdrawn)) {
1457  if (m_Request->NeedTrace()) {
1458  m_Reply->SendTrace(
1459  "Blob " + blob_id.ToString() + " is not authorized "
1460  "because it is withdrawn", m_Request->GetStartTimestamp());
1461  }
1462  return false;
1463  }
1464 
1465  return true;
1466 }
1467 
1468 
1469 void
1471  CBlobRecord const & blob)
1472 {
1473  bool need_id2_identification = blob_fetch_details->GetNeedAddId2ChunkId2Info();
1474 
1475  // CXX-11547: may be public comments request is needed as well
1476  if (blob.GetFlag(EBlobFlags::eSuppress) ||
1478  // Request public comment
1479  auto app = CPubseqGatewayApp::GetInstance();
1480  unique_ptr<CCassPublicCommentFetch> comment_fetch_details;
1481  comment_fetch_details.reset(new CCassPublicCommentFetch());
1482  // Memorize the identification which will be used at the moment of
1483  // sending the comment to the client
1484  if (need_id2_identification) {
1485  comment_fetch_details->SetId2Identification(
1486  x_GetId2ChunkNumber(blob_fetch_details),
1487  m_Id2Info->Serialize());
1488  } else {
1489  comment_fetch_details->SetCassBlobIdentification(
1490  blob_fetch_details->GetBlobId(),
1491  m_LastModified);
1492  }
1493 
1494  // Note: the blobs from a public keyspace may only point to the blobs in a
1495  // public keyspace. A similar rule is about a secure keyspace: HUP blobs
1496  // point only to HUP blobs. Thus if a keyspace is secure then the MyNCBI
1497  // resultion has already happened and can be used here.
1498 
1499  shared_ptr<CCassConnection> cass_connection;
1500 
1501  try {
1502  if (blob_fetch_details->GetBlobId().m_IsSecureKeyspace.value()) {
1503  cass_connection = blob_fetch_details->GetBlobId().m_Keyspace->GetSecureConnection(
1504  m_UserName.value());
1505  if (!cass_connection) {
1507  return;
1508  }
1509  } else {
1510  cass_connection = blob_fetch_details->GetBlobId().m_Keyspace->GetConnection();
1511  }
1512  } catch (const exception & exc) {
1514  return;
1515  } catch (...) {
1517  return;
1518  }
1519 
1522  cass_connection,
1523  blob_fetch_details->GetBlobId().m_Keyspace->keyspace,
1524  blob, nullptr);
1525  comment_fetch_details->SetLoader(load_task);
1526  load_task->SetDataReadyCB(m_Reply->GetDataReadyCB());
1527  load_task->SetErrorCB(
1529  this,
1531  this, _1, _2, _3, _4, _5),
1532  comment_fetch_details.get()));
1533  load_task->SetCommentCallback(
1535  this,
1537  this, _1, _2, _3),
1538  comment_fetch_details.get()));
1539  load_task->SetMessages(app->GetPublicCommentsMapping());
1540 
1541  if (m_Request->NeedTrace()) {
1542  m_Reply->SendTrace(
1543  "Cassandra request: " +
1544  ToJsonString(*load_task),
1545  m_Request->GetStartTimestamp());
1546  }
1547 
1548  m_FetchDetails.push_back(move(comment_fetch_details));
1549  load_task->Wait(); // Initiate cassandra request
1550  }
1551 
1552 
1553  if (need_id2_identification) {
1554  m_Reply->PrepareTSEBlobPropData(
1555  blob_fetch_details, m_ProcessorId,
1556  x_GetId2ChunkNumber(blob_fetch_details), m_Id2Info->Serialize(),
1557  ToJsonString(blob));
1558  } else {
1559  // There is no id2info in the originally requested blob
1560  // so just send blob props without id2_chunk/id2_info
1561  m_Reply->PrepareBlobPropData(
1562  blob_fetch_details, m_ProcessorId,
1563  ToJsonString(blob), m_LastModified);
1564  }
1565 }
1566 
1567 
1568 void
1570 {
1571  if (fetch_details->GetNeedAddId2ChunkId2Info()) {
1572  m_Reply->PrepareTSEBlobPropCompletion(fetch_details, m_ProcessorId);
1573  } else {
1574  // There is no id2info in the originally requested blob
1575  // so just send blob prop completion without id2_chunk/id2_info
1576  m_Reply->PrepareBlobPropCompletion(fetch_details, m_ProcessorId);
1577  }
1578 }
1579 
1580 
1581 void
1583  const unsigned char * chunk_data,
1584  unsigned int data_size,
1585  int chunk_no)
1586 {
1587  if (fetch_details->GetNeedAddId2ChunkId2Info()) {
1588  m_Reply->PrepareTSEBlobData(
1589  fetch_details, m_ProcessorId,
1590  chunk_data, data_size, chunk_no,
1591  x_GetId2ChunkNumber(fetch_details), m_Id2Info->Serialize());
1592  } else {
1593  // There is no id2info in the originally requested blob
1594  // so just send blob prop completion without id2_chunk/id2_info
1595  m_Reply->PrepareBlobData(fetch_details, m_ProcessorId,
1596  chunk_data, data_size, chunk_no,
1597  m_LastModified);
1598  }
1599 }
1600 
1601 
1602 void
1604 {
1605  if (fetch_details->GetNeedAddId2ChunkId2Info()) {
1606  m_Reply->PrepareTSEBlobCompletion(fetch_details, m_ProcessorId);
1607  } else {
1608  // There is no id2info in the originally requested blob
1609  // so just send blob prop completion without id2_chunk/id2_info
1610  m_Reply->PrepareBlobCompletion(fetch_details, m_ProcessorId);
1611  }
1612 }
1613 
1614 
1615 void
1617  const string & message,
1618  CRequestStatus::ECode status,
1619  int err_code,
1620  EDiagSev severity)
1621 {
1622  if (fetch_details->GetNeedAddId2ChunkId2Info()) {
1623  m_Reply->PrepareTSEBlobPropMessage(
1624  fetch_details, m_ProcessorId,
1625  x_GetId2ChunkNumber(fetch_details), m_Id2Info->Serialize(),
1626  message, status, err_code, severity);
1627  } else {
1628  // There is no id2info in the originally requested blob
1629  // so just send blob prop completion without id2_chunk/id2_info
1630  m_Reply->PrepareBlobPropMessage(
1631  fetch_details, m_ProcessorId,
1632  message, status, err_code, severity);
1633  }
1634 
1635  x_PrepareBlobPropCompletion(fetch_details);
1636 }
1637 
1638 
1639 void
1641  const string & message,
1642  CRequestStatus::ECode status,
1643  int err_code,
1644  EDiagSev severity)
1645 {
1646  if (fetch_details->GetNeedAddId2ChunkId2Info()) {
1647  m_Reply->PrepareTSEBlobMessage(
1648  fetch_details, m_ProcessorId,
1649  x_GetId2ChunkNumber(fetch_details), m_Id2Info->Serialize(),
1650  message, status, err_code, severity);
1651  } else {
1652  // There is no id2info in the originally requested blob
1653  // so just send blob prop completion without id2_chunk/id2_info
1654  m_Reply->PrepareBlobMessage(
1655  fetch_details, m_ProcessorId,
1656  message, status, err_code, severity, m_LastModified);
1657  }
1658 
1659  x_PrepareBlobCompletion(fetch_details);
1660 }
1661 
1662 
1663 void
1665  EPSGS_BlobSkipReason skip_reason,
1666  bool need_add_id2_chunk_id2_info)
1667 {
1668  if (skip_reason == ePSGS_BlobExcluded || !need_add_id2_chunk_id2_info) {
1669  m_Reply->PrepareBlobExcluded(fetch_details->GetBlobId().ToString(),
1670  m_ProcessorId, skip_reason,
1671  m_LastModified);
1672  return;
1673  }
1674 
1675  // It is sent or in progress and need to add more info
1676  // NOTE: the blob id argument is temporary to satisfy the older clients
1677  m_Reply->PrepareTSEBlobExcluded(m_ProcessorId, skip_reason,
1678  fetch_details->GetBlobId().ToString(),
1679  x_GetId2ChunkNumber(fetch_details),
1680  m_Id2Info->Serialize());
1681 }
1682 
1683 
1684 void
1686  unsigned long sent_mks_ago,
1687  unsigned long until_resend_mks,
1688  bool need_add_id2_chunk_id2_info)
1689 {
1690  // Note: this version of the method is used only for the ID/get requests so
1691  // the additional resend related fields need to be supplied
1692 
1693  if (need_add_id2_chunk_id2_info) {
1694  // NOTE: the blob id argument is temporary to satisfy the older clients
1695  m_Reply->PrepareTSEBlobExcluded(fetch_details->GetBlobId().ToString(),
1696  x_GetId2ChunkNumber(fetch_details),
1697  m_Id2Info->Serialize(),
1698  m_ProcessorId, sent_mks_ago,
1699  until_resend_mks);
1700  } else {
1701  m_Reply->PrepareBlobExcluded(fetch_details->GetBlobId().ToString(),
1702  m_ProcessorId, sent_mks_ago, until_resend_mks,
1703  m_LastModified);
1704  }
1705 }
1706 
1707 
1708 void
1710  CCassPublicCommentFetch * fetch_details,
1711  CRequestStatus::ECode status,
1712  int code,
1713  EDiagSev severity,
1714  const string & message)
1715 {
1716  if (m_Canceled) {
1717  fetch_details->GetLoader()->Cancel();
1718  fetch_details->GetLoader()->ClearError();
1719  fetch_details->SetReadFinished();
1720  return;
1721  }
1722 
1723  CRequestContextResetter context_resetter;
1724  m_Request->SetRequestContext();
1725 
1726  // It could be a message or an error
1727  CountError(fetch_details, status, code, severity, message,
1729  bool is_error = IsError(severity);
1730 
1731  m_Reply->PrepareProcessorMessage(
1732  m_Reply->GetItemId(),
1733  m_ProcessorId, message, status, code, severity);
1734 
1735  // To avoid sending an error in Peek()
1736  fetch_details->GetLoader()->ClearError();
1737 
1738  if (is_error) {
1739  // If it is an error then there will be no more activity
1740  fetch_details->SetReadFinished();
1741  }
1742 
1743  // Note: is it necessary to call something like x_Peek() of the actual
1744  // processor class to send this immediately? It should work without
1745  // this call and at the moment x_Peek() is not available here
1746  // if (m_Reply->IsOutputReady())
1747  // x_Peek(false);
1748 }
1749 
1750 
1751 void
1753  CCassPublicCommentFetch * fetch_details,
1754  string comment,
1755  bool is_found)
1756 {
1757  CRequestContextResetter context_resetter;
1758  m_Request->SetRequestContext();
1759 
1760  fetch_details->GetLoader()->ClearError();
1761  fetch_details->SetReadFinished();
1762 
1763  if (m_Canceled) {
1764  fetch_details->GetLoader()->Cancel();
1765  return;
1766  }
1767 
1768  if (m_Request->NeedTrace()) {
1769  m_Reply->SendTrace(
1770  "Public comment callback; found: " + to_string(is_found),
1771  m_Request->GetStartTimestamp());
1772  }
1773 
1774  if (is_found) {
1775  if (fetch_details->GetIdentification() ==
1777  m_Reply->PreparePublicComment(
1778  m_ProcessorId, comment,
1779  fetch_details->GetId2Chunk(),
1780  fetch_details->GetId2Info());
1781  } else {
1782  // There is no id2info in the originally requested blob
1783  // so just send blob prop completion without id2_chunk/id2_info
1784  m_Reply->PreparePublicComment(
1785  m_ProcessorId, comment,
1786  fetch_details->GetBlobId().ToString(),
1787  fetch_details->GetLastModified());
1788  }
1789  }
1790 
1791  // Note: is it necessary to call something like x_Peek() of the actual
1792  // processor class to send this immediately? It should work without
1793  // this call and at the moment x_Peek() is not available here
1794  // if (m_Reply->IsOutputReady())
1795  // x_Peek(false);
1796 }
1797 
1798 
1799 void
1801  CBlobRecord const & blob,
1802  const unsigned char * chunk_data,
1803  unsigned int data_size,
1804  int chunk_no)
1805 {
1806  if (!m_NeedFallbackBlob) {
1807  // As usual; no need to request anything extra or skip
1808  m_BlobChunkCB(fetch_details, blob, chunk_data, data_size, chunk_no);
1809  return;
1810  }
1811 
1812  if (!m_FallbackBlobRequested) {
1813  // Chunk came when there were no previous problems; so as usual
1814  m_BlobChunkCB(fetch_details, blob, chunk_data, data_size, chunk_no);
1815  return;
1816  }
1817 
1818  // Here: chunk came when there was an error before and fallback blob has
1819  // been already requested. Check if it is an ID2 chunk.
1820  string blob_id_as_str = fetch_details->GetBlobId().ToString();
1821  if (find(m_RequestedID2BlobChunks.begin(),
1822  m_RequestedID2BlobChunks.end(), blob_id_as_str) ==
1823  m_RequestedID2BlobChunks.end()) {
1824  // It is not an ID2 chunk; proceed as usual
1825  m_BlobChunkCB(fetch_details, blob, chunk_data, data_size, chunk_no);
1826  return;
1827  }
1828 
1829  if (fetch_details->GetTotalSentBlobChunks() > 0) {
1830  // Some chunks of that blob have already been sent. To avoid breaking
1831  // the chunks consistency, e.g. the final closing PSG-Chunk, let's let
1832  // it be finished as usual
1833  m_BlobChunkCB(fetch_details, blob, chunk_data, data_size, chunk_no);
1834  return;
1835  }
1836 
1837  if (m_Request->NeedTrace()) {
1838  string msg = "Receiving an ID2 blob " + blob_id_as_str +
1839  " chunk " + to_string(chunk_no) +
1840  " when a fallback original blob has been requested. "
1841  "Ignore and continue.";
1842  m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
1843  }
1844 
1845  // Discarding the chunk.
1846  fetch_details->RemoveFromExcludeBlobCache();
1847  fetch_details->GetLoader()->Cancel();
1848  fetch_details->GetLoader()->ClearError();
1849  fetch_details->SetReadFinished();
1850 }
1851 
1852 
1853 void
1855  CBlobRecord const & blob,
1856  bool is_found)
1857 {
1858  if (!m_NeedFallbackBlob) {
1859  // As usual
1860  m_BlobPropsCB(fetch_details, blob, is_found);
1861  return;
1862  }
1863 
1864  string blob_id_as_str = fetch_details->GetBlobId().ToString();
1865  if (find(m_RequestedID2BlobChunks.begin(),
1866  m_RequestedID2BlobChunks.end(), blob_id_as_str) ==
1867  m_RequestedID2BlobChunks.end()) {
1868  // It is not an ID2 chunk; proceed as usual
1869  m_BlobPropsCB(fetch_details, blob, is_found);
1870  return;
1871  }
1872 
1873  string msg;
1874 
1875  if (!m_FallbackBlobRequested) {
1876  if (is_found) {
1877  // As usual; blob props are found and everything is going well
1878  m_BlobPropsCB(fetch_details, blob, is_found);
1879  return;
1880  }
1881 
1882  // Here: blob prop not found and falback has not been requested yet.
1883  m_FallbackBlobRequested = true;
1884  fetch_details->GetLoader()->Cancel();
1885  fetch_details->GetLoader()->ClearError();
1886  fetch_details->SetReadFinished();
1887 
1888  msg = "Blob " + blob_id_as_str + " properties are not found. "
1889  "Falling back to retrieve the original blob.";
1890 
1891  CRequestContextResetter context_resetter;
1892  m_Request->SetRequestContext();
1893 
1894  PSG_WARNING(msg);
1895 
1896  // Request original blob as a fallback
1898 
1899  // Send a processor message
1900  m_Reply->PrepareProcessorMessage(
1901  m_Reply->GetItemId(),
1904  eDiag_Warning);
1905 
1906  return;
1907  }
1908 
1909  // Here: fallback has already been requested. The blob props are found or
1910  // not.
1911 
1912  if (m_Request->NeedTrace()) {
1913  if (is_found) {
1914  msg = "Blob " + blob_id_as_str + " properties are received when "
1915  "a fallback to request the original blob "
1916  "has already been initiated before.";
1917  } else {
1918  msg = "Blob " + blob_id_as_str + " properties are not found. "
1919  "Fallback to request the original blob "
1920  "has already been initiated before.";
1921  }
1922 
1923  m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
1924  }
1925 
1926  // To prevent chunks coming, let's cancel the fetch
1927  fetch_details->GetLoader()->Cancel();
1928  fetch_details->GetLoader()->ClearError();
1929  fetch_details->SetReadFinished();
1930 }
1931 
1932 
1933 void
1935  CRequestStatus::ECode status,
1936  int code,
1937  EDiagSev severity,
1938  const string & message)
1939 {
1940  if (!m_NeedFallbackBlob) {
1941  // As usual
1942  m_BlobErrorCB(fetch_details, status, code, severity, message);
1943  return;
1944  }
1945 
1946  string blob_id_as_str = fetch_details->GetBlobId().ToString();
1947  if (find(m_RequestedID2BlobChunks.begin(),
1948  m_RequestedID2BlobChunks.end(), blob_id_as_str) ==
1949  m_RequestedID2BlobChunks.end()) {
1950  // It is not an ID2 chunk; proceed as usual
1951  m_BlobErrorCB(fetch_details, status, code, severity, message);
1952  return;
1953  }
1954 
1955  // Not found chunk is an error here regardless of the reported severity
1956  bool is_error = IsError(severity) || (status == CRequestStatus::e404_NotFound);
1957  if (!is_error) {
1958  // It is not an error; could be a warning or just information.
1959  // Proceed as usual.
1960  m_BlobErrorCB(fetch_details, status, code, severity, message);
1961  return;
1962  }
1963 
1964  // Remove from the already-sent cache if necessary
1965  fetch_details->RemoveFromExcludeBlobCache();
1966 
1967  // If it is an error then regardless what stage it was, props or
1968  // chunks, there will be no more activity
1969  fetch_details->GetLoader()->Cancel();
1970  fetch_details->GetLoader()->ClearError();
1971  fetch_details->SetReadFinished();
1972 
1973  string msg;
1974  if (!m_FallbackBlobRequested) {
1975  // It is an error and fallback has not been requested yet.
1976  // Let's initiate it.
1977 
1978  m_FallbackBlobRequested = true;
1979 
1980  msg = "Blob " + blob_id_as_str + " retrieval error. "
1981  "Fallback to request the original blob is initiated.\n"
1982  "Callback error message: " + message;
1983 
1984  CRequestContextResetter context_resetter;
1985  m_Request->SetRequestContext();
1986 
1987  PSG_WARNING(msg);
1988 
1989  // Request original blob as a fallback
1991 
1992  if (fetch_details->GetTotalSentBlobChunks() == 0) {
1993  m_Reply->PrepareProcessorMessage(
1994  m_Reply->GetItemId(), m_ProcessorId, msg,
1997  } else {
1998  x_PrepareBlobMessage(fetch_details,
2001  eDiag_Warning);
2002  }
2003  return;
2004  }
2005 
2006  // It's an error but a fallback has already been requested because of a
2007  // different event
2008  msg = "Blob " + blob_id_as_str + " retrieval error. "
2009  "Fallback to request the original blob has already been initiated before.\n"
2010  "Callback error message: " + message;
2011 
2012  if (m_Request->NeedTrace()) {
2013  m_Reply->SendTrace(msg, m_Request->GetStartTimestamp());
2014  }
2015 
2016  if (fetch_details->GetTotalSentBlobChunks() == 0) {
2017  m_Reply->PrepareProcessorMessage(
2018  m_Reply->GetItemId(), m_ProcessorId, msg,
2021  } else {
2022  x_PrepareBlobMessage(fetch_details,
2025  eDiag_Warning);
2026  }
2027 }
2028 
CBlobRecord & SetId2Info(string const &value)
bool GetFlag(EBlobFlags flag_value) const
TTimestamp GetModified() const
TSize GetSize() const
string GetId2Info() const
bool IsConfidential() const
bool GetNeedAddId2ChunkId2Info(void) const
Definition: cass_fetch.hpp:361
int32_t GetTotalSentBlobChunks(void) const
Definition: cass_fetch.hpp:341
CCassBlobTaskLoadBlob * GetLoader(void)
Definition: cass_fetch.hpp:367
SPSGS_BlobRequestBase::EPSGS_TSEOption GetTSEOption(void) const
Definition: cass_fetch.hpp:335
bool IsBlobPropStage(void) const
Definition: cass_fetch.hpp:355
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: load_blob.cpp:161
void SetPropsCallback(TBlobPropsCallback callback)
Definition: load_blob.cpp:151
void SetChunkCallback(TBlobChunkCallbackEx callback)
Definition: load_blob.cpp:146
CBlobRecord::TTimestamp GetModified() const
Definition: load_blob.hpp:151
void SetErrorCB(TDataErrorCallback error_cb)
virtual void Cancel()
EPSGS_CacheAddResult AddToExcludeBlobCache(bool &completed, psg_time_point_t &completed_time)
Definition: cass_fetch.cpp:61
void RemoveFromExcludeBlobCache(void)
Definition: cass_fetch.cpp:41
void SetReadFinished(void)
Definition: cass_fetch.hpp:89
bool IsBlobFetch(void) const
Definition: cass_fetch.hpp:117
string GetClientId(void) const
Definition: cass_fetch.hpp:127
SCass_BlobId GetBlobId(void) const
Definition: cass_fetch.hpp:124
void SetExcludeBlobCacheUpdated(bool value)
Definition: cass_fetch.hpp:134
EPSGS_DbFetchType GetFetchType(void) const
Definition: cass_fetch.hpp:92
string GetId2Info(void) const
Definition: cass_fetch.hpp:564
CCassStatusHistoryTaskGetPublicComment * GetLoader(void)
Definition: cass_fetch.hpp:570
EPSGS_Identification GetIdentification(void) const
Definition: cass_fetch.hpp:534
int64_t GetId2Chunk(void) const
Definition: cass_fetch.hpp:561
SCass_BlobId GetBlobId(void) const
Definition: cass_fetch.hpp:555
int64_t GetLastModified(void) const
Definition: cass_fetch.hpp:558
void SetCommentCallback(TCommentCallback callback)
void SetMessages(shared_ptr< CPSGMessages > messages)
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
EPSGS_CacheLookupResult LookupBlobProp(IPSGS_Processor *processor, int sat, int sat_key, int64_t &last_modified, CBlobRecord &blob_record)
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
void x_RequestMoreChunksForSmartTSE(CCassBlobFetch *fetch_details, const vector< int > &extra_chunks, bool need_wait)
unique_ptr< CPSGS_SatInfoChunksVerFlavorId2Info > x_CheckId2Info(CCassBlobFetch *fetch_details, CBlobRecord &blob)
bool NeedToAddId2CunkId2Info(void) const
EPSGS_BlobCacheCheckResult x_CheckExcludeBlobCache(CCassBlobFetch *fetch_details, bool need_add_id2_chunk_id2_info)
void OnGetBlobChunk(bool cancelled, CCassBlobFetch *fetch_details, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
void x_OnBlobPropOrigTSE(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
TBlobPropsCB m_BlobPropsCB
bool x_IsAuthorized(EPSGS_BlobOp blob_op, const SCass_BlobId &blob_id, const CBlobRecord &blob, const TAuthToken &auth_token)
SCass_BlobId m_InfoBlobId
TBlobErrorCB m_BlobErrorCB
void x_PrepareBlobPropMessage(CCassBlobFetch *fetch_details, const string &message, CRequestStatus::ECode status, int err_code, EDiagSev severity)
void x_RequestId2SplitBlobs(CCassBlobFetch *fetch_details)
void x_RequestOriginalBlobChunks(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
void x_OnBlobPropNotFound(CCassBlobFetch *fetch_details)
CBlobRecord::TTimestamp m_LastModified
void x_OnBlobPropWholeTSE(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
void x_PrepareBlobPropData(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
CCassBlobFetch * m_InitialBlobPropFetch
void x_PrepareBlobCompletion(CCassBlobFetch *fetch_details)
void x_DeserializeSplitInfo(CCassBlobFetch *fetch_details)
void OnGetBlobProp(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
void x_PrepareBlobData(CCassBlobFetch *fetch_details, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
void x_BlobChunkCallback(CCassBlobFetch *fetch_details, CBlobRecord const &blob, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
void x_BlobPropsCallback(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
void OnPublicComment(CCassPublicCommentFetch *fetch_details, string comment, bool is_found)
int64_t x_GetId2ChunkNumber(CCassBlobFetch *fetch_details)
void x_OnBlobPropSmartTSE(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
TBlobChunkCB m_BlobChunkCB
void x_RequestID2BlobChunks(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool info_blob_only)
void x_OnBlobPropNoneTSE(CCassBlobFetch *fetch_details)
void x_OnBlobPropSlimTSE(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
void x_PrepareBlobPropCompletion(CCassBlobFetch *fetch_details)
virtual ~CPSGS_CassBlobBase()
vector< string > m_RequestedID2BlobChunks
void x_PrepareBlobExcluded(CCassBlobFetch *fetch_details, EPSGS_BlobSkipReason skip_reason, bool need_add_id2_chunk_id2_info)
void OnPublicCommentError(CCassPublicCommentFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void PrepareServerErrorMessage(CCassBlobFetch *fetch_details, int code, EDiagSev severity, const string &message)
CBlobRecord m_InitialBlobProps
unique_ptr< CPSGS_SatInfoChunksVerFlavorId2Info > m_Id2Info
void x_PrepareBlobMessage(CCassBlobFetch *fetch_details, const string &message, CRequestStatus::ECode status, int err_code, EDiagSev severity)
void x_BlobErrorCallback(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void x_DecideToRequestMoreChunksForSmartTSE(CCassBlobFetch *fetch_details, SCass_BlobId const &info_blob_id)
void OnGetBlobError(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
psg::CDataChunkStream m_CollectedSplitInfo
CRequestStatus::ECode CountError(CCassFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message, EPSGS_LoggingFlag logging_flag, EPSGS_StatusUpdateFlag status_update_flag)
void UpdateOverallStatus(CRequestStatus::ECode status)
bool IsError(EDiagSev severity) const
void ReportSecureSatUnauthorized(const string &user_name)
list< unique_ptr< CCassFetch > > m_FetchDetails
@ ePSGS_BlobBySatSatKeyRequest
CPSGSCounters & GetCounters(void)
static CPubseqGatewayApp * GetInstance(void)
shared_ptr< CPSGS_Reply > m_Reply
shared_ptr< CPSGS_Request > m_Request
@ ePSGS_Added
#define true
Definition: bool.h:35
#define false
Definition: bool.h:36
Int8 int64_t
function< void(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)> TBlobErrorCB
function< void(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)> TBlobPropsCB
function< void(CCassBlobFetch *fetch_details, CBlobRecord const &blob, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)> TBlobChunkCB
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
@ eDiag_Warning
Warning message.
Definition: ncbidiag.hpp:652
E_Choice Which(void) const
Which variant is currently selected.
Definition: Seq_id_.hpp:746
@ e_not_set
No variant selected.
Definition: Seq_id_.hpp:94
const int64_t kSplitInfoChunk
Definition: id2info.hpp:42
Defines NCBI C++ diagnostic APIs, classes, and macros.
T max(T x_, T y_)
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
#define PSG_WARNING(message)
EPSGS_BlobSkipReason
@ ePSGS_BlobExcluded
@ ePSGS_BlobInProgress
@ ePSGS_CacheHit
@ ePSGS_CacheNotHit
@ ePSGS_NeedStatusUpdate
string TAuthToken
@ ePSGS_BlobBySatSatKeyFetch
@ ePSGS_BadID2Info
@ ePSGS_ID2ChunkErrorAfterFallbackRequested
@ ePSGS_ID2ChunkErrorWithFallback
@ ePSGS_SecureSatUnauthorized
@ ePSGS_NoBlobPropsError
@ ePSGS_NotFoundID2BlobPropWithFallback
@ ePSGS_BlobRetrievalIsNotAuthorized
@ ePSGS_CassConnectionError
psg_clock_t::time_point psg_time_point_t
@ ePSGS_NeedLogging
unsigned long GetTimespanToNowMks(const psg_time_point_t &t_point)
Defines CRequestStatus class for NCBI C++ diagnostic API.
vector< int > GetBioseqChunks(const CSeq_id &seq_id, const CBlobRecord &blob, const unsigned char *data, unsigned int size, int chunk_no)
#define INT64_MIN
Definition: stdint.h:184
string ToString(void) const
CBioseqInfoRecord::TSat m_Sat
optional< SSatInfoEntry > m_Keyspace
optional< bool > m_IsSecureKeyspace
bool MapSatToKeyspace(void)
CBioseqInfoRecord::TSatKey m_SatKey
unsigned long m_ResendTimeoutMks
CBlobRecord::TTimestamp m_LastModified
unsigned long m_ResendTimeoutMks
unsigned long m_SendBlobIfSmall
Definition: inftrees.h:24
Modified on Wed May 29 18:40:03 2024 by modify_doxy.py rev. 669887