NCBI C++ ToolKit
tse_chunk_processor.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: tse_chunk_processor.cpp 102015 2024-03-19 13:18:47Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: get TSE chunk processor
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "tse_chunk_processor.hpp"
35 #include "pubseq_gateway.hpp"
38 #include "get_blob_callback.hpp"
40 
42 
43 using namespace std::placeholders;
44 
45 static const string kTSEChunkProcessorName = "Cassandra-gettsechunk";
46 
47 
49  m_TSEChunkRequest(nullptr)    // the typed request pointer starts out null
50 {}
51 
52 
54  shared_ptr<CPSGS_Request> request,
55  shared_ptr<CPSGS_Reply> reply,
56  TProcessorPriority priority,
57  shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> sat_info_chunk_ver_id2info,
58  shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> id_mod_ver_id2info) :
59  CPSGS_CassProcessorBase(request, reply, priority),
    // The blob properties, blob chunk and blob error handlers are bound to
    // this instance; the placeholders forward the callback arguments
61  bind(&CPSGS_TSEChunkProcessor::OnGetBlobProp,
62  this, _1, _2, _3),
63  bind(&CPSGS_TSEChunkProcessor::OnGetBlobChunk,
64  this, _1, _2, _3, _4, _5),
65  bind(&CPSGS_TSEChunkProcessor::OnGetBlobError,
66  this, _1, _2, _3, _4, _5)),
    // Both id2_info flavor objects are stored as passed in by the caller
67  m_SatInfoChunkVerId2Info(sat_info_chunk_ver_id2info),
68  m_IdModVerId2Info(id_mod_ver_id2info)
69 {
70  // Cache a pointer to the typed request to avoid calling
71  // m_Request->GetRequest<SPSGS_TSEChunkRequest>() everywhere
72  m_TSEChunkRequest = & request->GetRequest<SPSGS_TSEChunkRequest>();
73 }
74 
75 
77 {
79 }
80 
81 
// Tells whether this processor is able to serve the given request.
// Returns false if the Cassandra processor is not enabled for the request,
// if the request is not a TSE chunk request, if the requested chunk is 0,
// if the id2_info flavor check fails, or if the startup data state is not
// ePSGS_StartupDataOK (in which case a trace is sent when the request asks
// for tracing). Returns true otherwise.
82 bool
83 CPSGS_TSEChunkProcessor::CanProcess(shared_ptr<CPSGS_Request> request,
84  shared_ptr<CPSGS_Reply> reply) const
85 {
86  if (!IsCassandraProcessorEnabled(request))
87  return false;
88 
89  if (request->GetRequestType() != CPSGS_Request::ePSGS_TSEChunkRequest)
90  return false;
91 
92  auto tse_chunk_request = & request->GetRequest<SPSGS_TSEChunkRequest>();
93 
94  // CXX-11478: some VDB chunks start with 0 but in Cassandra they always
95  // start with 1. The non-negative condition is checked at the time when
96  // the request is received.
97  if (tse_chunk_request->m_Id2Chunk == 0)
98  return false;
99 
100  // Check parseability of the id2_info parameter
101  shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> sat_info_chunk_ver_id2info;
102  shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> id_mod_ver_id2info;
103  if (x_DetectId2InfoFlavor(tse_chunk_request->m_Id2Info,
104  sat_info_chunk_ver_id2info,
105  id_mod_ver_id2info) ==
107  return false;
108 
109  // Check the DB availability
110  auto * app = CPubseqGatewayApp::GetInstance();
111  auto startup_data_state = app->GetStartupDataState();
112  if (startup_data_state != ePSGS_StartupDataOK) {
113  if (request->NeedTrace()) {
114  reply->SendTrace(kTSEChunkProcessorName + " processor cannot process "
115  " request because Cassandra DB is not available.\n" +
116  GetCassStartupDataStateMessage(startup_data_state),
117  request->GetStartTimestamp());
118  }
119  return false;
120  }
121 
122  return true;
123 }
124 
125 
127 CPSGS_TSEChunkProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
128  shared_ptr<CPSGS_Reply> reply,
129  TProcessorPriority priority) const
130 {
    // Factory method: yields nullptr when CanProcess() rejects the request;
    // otherwise a newly allocated CPSGS_TSEChunkProcessor which receives
    // the request, reply, priority and both id2_info flavor objects.
131  if (!CanProcess(request, reply))
132  return nullptr;
133 
134  auto tse_chunk_request = & request->GetRequest<SPSGS_TSEChunkRequest>();
135  shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> sat_info_chunk_ver_id2info;
136  shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> id_mod_ver_id2info;
137 
138  // No need to check the return value. It has already been checked in
139  // CanProcess(). The call is repeated only to fill the two pointers.
140  x_DetectId2InfoFlavor(tse_chunk_request->m_Id2Info,
141  sat_info_chunk_ver_id2info,
142  id_mod_ver_id2info);
143 
144  return new CPSGS_TSEChunkProcessor(request, reply, priority,
145  sat_info_chunk_ver_id2info,
146  id_mod_ver_id2info);
147 }
148 
149 
152  const string & id2_info,
153  shared_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> & sat_info_chunk_ver_id2info,
154  shared_ptr<CPSGS_IdModifiedVerFlavorId2Info> & id_mod_ver_id2info) const
155 {
    // Tries to construct each id2_info flavor object from the given string.
    // An exception thrown by a constructor is deliberately swallowed: it
    // only means that the string is not of that flavor.
156  try {
157  // false -> do not count errors
158  sat_info_chunk_ver_id2info.reset(
159  new CPSGS_SatInfoChunksVerFlavorId2Info(id2_info, false));
161  } catch (...) {
162  // Parsing error: maybe it is another id2_info format
163  }
164 
165  try {
166  id_mod_ver_id2info.reset(
167  new CPSGS_IdModifiedVerFlavorId2Info(id2_info));
169  } catch (...) {
170  // Parsing error: maybe it is for another processor
171  }
173 }
174 
175 
177  const string & info,
178  unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> & id2_info,
179  const SCass_BlobId & blob_id,
180  bool need_finish)
181 {
    // Parses 'info' into 'id2_info'. Returns true on success. If the
    // constructor throws, an error message mentioning 'blob_id' is
    // composed, a counter is incremented and false is returned. When
    // 'need_finish' is false the message is only logged as a warning.
182  string err_msg;
183  try {
184  id2_info.reset(new CPSGS_SatInfoChunksVerFlavorId2Info(info));
185  return true;
186  } catch (const exception & exc) {
187  err_msg = "Error extracting id2 info for blob " +
188  blob_id.ToString() + ": " + exc.what();
189  } catch (...) {
190  err_msg = "Unknown error while extracting id2 info for blob " +
191  blob_id.ToString();
192  }
193 
194  auto * app = CPubseqGatewayApp::GetInstance();
195  app->GetCounters().Increment(this,
197  if (need_finish) {
202  } else {
203  PSG_WARNING(err_msg);
204  }
205  return false;
206 }
207 
208 
// Maps the satellite of the given blob id to a keyspace by calling
// blob_id.MapSatToKeyspace(). Returns true if the mapping succeeded.
// Otherwise a counter is incremented, an error message is composed and
// false is returned; when 'need_finish' is false the message is only
// logged as a warning.
209 bool
211  bool need_finish)
212 {
213  if (blob_id.MapSatToKeyspace())
214  return true;
215 
216  auto * app = CPubseqGatewayApp::GetInstance();
217  app->GetCounters().Increment(this,
219  string msg = "Unknown TSE chunk satellite number " +
220  to_string(blob_id.m_Sat) +
221  " for the blob " + blob_id.ToString();
222  if (need_finish) {
225 
226  // This method is used only in case of the TSE chunk requests.
227  // So in case of errors - synchronous or asynchronous - it is
228  // necessary to finish the reply anyway.
231  } else {
232  PSG_WARNING(msg);
233  }
234  return false;
235 }
236 
237 
239 {
240  // Lock the request for all the cassandra processors so that the other
241  // processors may wait on the event
243 
    // Dispatch on whichever id2_info flavor object is set. The
    // sat.info.chunks[.ver] flavor is checked first.
244  if (m_SatInfoChunkVerId2Info.get() != nullptr) {
246  return;
247  }
248  if (m_IdModVerId2Info.get() != nullptr) {
250  return;
251  }
252 
    // Neither flavor object is set: report it as a logic error
254  "Logic error: none of the id2_info options were initialized");
255 }
256 
257 
259 {
260  // This path handles the id2_info which came in the form of
261  // tse_id~~last_modified~~split_version
262 
263  CRequestContextResetter context_resetter;
264  IPSGS_Processor::m_Request->SetRequestContext();
265 
266  auto app = CPubseqGatewayApp::GetInstance();
267  string err_msg;
268 
    // The sat of the TSE id must be mapped to a Cassandra keyspace.
    // If it cannot be mapped, count the failure and stop here.
269  if (!m_IdModVerId2Info->GetTSEId().MapSatToKeyspace()) {
270  app->GetCounters().Increment(this,
272 
273  err_msg = kTSEChunkProcessorName + " failed to map sat " +
274  to_string(m_IdModVerId2Info->GetTSEId().m_Sat) +
275  " to a Cassandra keyspace";
280 
281  if (IPSGS_Processor::m_Reply->IsOutputReady())
282  x_Peek(false);
283  return;
284  }
285 
286  if (m_IdModVerId2Info->GetTSEId().m_IsSecureKeyspace.value()) {
287  // A secure keyspace needs the user name from MyNCBI. If it is not
    // available right now then there is nothing more to do here.
288  if (!x_GetMyNCBIUser()) {
289  return;
290  }
291  }
292 
294 }
295 
296 
298 {
299  auto app = CPubseqGatewayApp::GetInstance();
300  string err_msg;
301 
    // Pick the connection: a secure keyspace uses a connection obtained for
    // the user name, any other keyspace uses the regular connection. A null
    // secure connection or an exception ends the processing here.
302  shared_ptr<CCassConnection> cass_connection;
303  try {
304  if (m_IdModVerId2Info->GetTSEId().m_IsSecureKeyspace.value()) {
305  cass_connection = m_IdModVerId2Info->GetTSEId().m_Keyspace->GetSecureConnection(
306  m_UserName.value());
307  if (!cass_connection) {
310 
311  if (IPSGS_Processor::m_Reply->IsOutputReady())
312  x_Peek(false);
313  return;
314  }
315  } else {
316  cass_connection = m_IdModVerId2Info->GetTSEId().m_Keyspace->GetConnection();
317  }
318  } catch (const exception & exc) {
321 
322  if (IPSGS_Processor::m_Reply->IsOutputReady())
323  x_Peek(false);
324  return;
325  } catch (...) {
328 
329  if (IPSGS_Processor::m_Reply->IsOutputReady())
330  x_Peek(false);
331  return;
332  }
333 
334  // First, check the blob prop cache: maybe the cached split version
335  // matches the requested one
336  unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
339 
340  // CXX-11478: last modified is to be ignored for now
341  int64_t last_modified = INT64_MIN; // last modified is unknown
342  auto blob_prop_cache_lookup_result =
343  psg_cache.LookupBlobProp(this, m_IdModVerId2Info->GetTSEId().m_Sat,
344  m_IdModVerId2Info->GetTSEId().m_SatKey,
345  last_modified, *blob_record.get());
346  if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
    // Single-pass loop: each 'break' below abandons the cache path and
    // falls through to the split history request at the end of the method
347  do {
348  // Step 1: check the id2info presence
349  if (blob_record->GetId2Info().empty()) {
350  app->GetCounters().Increment(this,
352  PSG_WARNING("Blob " + m_IdModVerId2Info->GetTSEId().ToString() +
353  " properties id2info is empty in cache");
354  break; // Continue with cassandra
355  }
356 
357  // Step 2: check that the id2info is parsable
358  unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> cache_id2_info;
359  // false -> do not finish the request
360  if (!x_ParseTSEChunkId2Info(blob_record->GetId2Info(),
361  cache_id2_info,
362  m_IdModVerId2Info->GetTSEId(),
363  false)) {
364  app->GetCounters().Increment(this,
366  break; // Continue with cassandra
367  }
368 
369  // Step 3: check the split version in cache
370  if (cache_id2_info->GetSplitVersion() != m_IdModVerId2Info->GetSplitVersion()) {
371  app->GetCounters().Increment(this,
373  PSG_WARNING("Blob " + m_IdModVerId2Info->GetTSEId().ToString() +
374  " split version in cache does not match the requested one");
375  break; // Continue with cassandra
376  }
377 
378  app->GetCounters().Increment(this,
380 
381  // Step 4: validate the chunk number
383  cache_id2_info->GetChunks(), false)) {
384  break; // Continue with cassandra
385  }
386 
387  // Step 5: for the target chunk, calculate the chunk's blob id
388  // and map its sat to a keyspace
389  int64_t sat_key;
391  // Special case
392  sat_key = cache_id2_info->GetInfo();
393  } else {
394  // For the target chunk - convert sat to sat name chunk's blob id
395  sat_key = cache_id2_info->GetInfo() -
396  cache_id2_info->GetChunks() - 1 +
398  }
399  SCass_BlobId chunk_blob_id(cache_id2_info->GetSat(), sat_key);
400  if (!x_TSEChunkSatToKeyspace(chunk_blob_id, false)) {
401  break; // Continue with cassandra
402  }
403 
404  // Step 6: search in cache the TSE chunk properties.
    // Note: blob_record is reused, so on a hit it holds the chunk props
405  last_modified = INT64_MIN;
406  auto tse_blob_prop_cache_lookup_result = psg_cache.LookupBlobProp(
407  this,
408  chunk_blob_id.m_Sat, chunk_blob_id.m_SatKey,
409  last_modified, *blob_record.get());
410  if (tse_blob_prop_cache_lookup_result != ePSGS_CacheHit) {
411  err_msg = "TSE chunk blob " + chunk_blob_id.ToString() +
412  " properties are not found in cache";
413  if (tse_blob_prop_cache_lookup_result == ePSGS_CacheFailure)
414  err_msg += " due to LMDB error";
415  PSG_WARNING(err_msg);
416  break; // Continue with cassandra
417  }
418 
419  // Step 7: initiate the chunk request
421  if (IPSGS_Processor::m_Request->NeedTrace())
424  chunk_request(SPSGS_BlobId(chunk_blob_id.ToString()),
425  INT64_MIN,
428  "", 0, 0, IPSGS_Processor::m_Request->GetIncludeHUP(),
429  trace_flag,
430  IPSGS_Processor::m_Request->NeedProcessorEvents(),
431  vector<string>(), vector<string>(),
432  psg_clock_t::now());
433 
434  unique_ptr<CCassBlobFetch> fetch_details;
435  fetch_details.reset(new CCassBlobFetch(chunk_request, chunk_blob_id));
    // The blob props found in cache are moved into the load task
436  CCassBlobTaskLoadBlob * load_task =
437  new CCassBlobTaskLoadBlob(cass_connection,
438  chunk_blob_id.m_Keyspace->keyspace,
439  move(blob_record),
440  true, nullptr);
441  fetch_details->SetLoader(load_task);
442  load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
443  load_task->SetErrorCB(
446  this, _1, _2, _3, _4, _5),
447  fetch_details.get(),
449  load_task->SetPropsCallback(
450  CBlobPropCallback(this,
452  this, _1, _2, _3),
455  fetch_details.get(), false));
456  load_task->SetChunkCallback(
457  CBlobChunkCallback(this,
459  this, _1, _2, _3, _4, _5),
460  fetch_details.get(),
462 
463  if (IPSGS_Processor::m_Request->NeedTrace()) {
464  IPSGS_Processor::m_Reply->SendTrace(
465  "Cassandra request: " +
466  ToJsonString(*load_task),
467  IPSGS_Processor::m_Request->GetStartTimestamp());
468  }
469 
470  m_FetchDetails.push_back(move(fetch_details));
471  load_task->Wait(); // Initiate cassandra request
472  return;
473  } while (false);
474  } else {
475  if (psg_cache.IsAllowed()) {
476  err_msg = "Blob " + m_IdModVerId2Info->GetTSEId().ToString() +
477  " properties are not found in cache";
478  if (blob_prop_cache_lookup_result == ePSGS_CacheFailure) {
479  err_msg += " due to LMDB error";
480  PSG_WARNING(err_msg);
481  } else {
482  // This warning could be confusing for the following cases:
483  // - requested blob has been recently removed
484  // but the split history still has the record for it
485  // - so the cache has nothing but split history directs to the
486  // still hanging blob
487  // The data will be sent back successfully and a log file will
488  // have a confusing warning. So it was decided to comment the
489  // warning out.
490  // PSG_WARNING(err_msg);
491  }
492  }
493  }
494 
495 
496  // Here:
497  // - fallback to cassandra
498  // - cache is not allowed
499  // - not found in cache
500 
501  // Initiate the asynchronous split history request
502  unique_ptr<CCassSplitHistoryFetch> fetch_details;
503  fetch_details.reset(new CCassSplitHistoryFetch(*m_TSEChunkRequest,
504  m_IdModVerId2Info->GetTSEId(),
505  m_IdModVerId2Info->GetSplitVersion()));
506  CCassBlobTaskFetchSplitHistory * load_task =
507  new CCassBlobTaskFetchSplitHistory(cass_connection,
508  m_IdModVerId2Info->GetTSEId().m_Keyspace->keyspace,
509  m_IdModVerId2Info->GetTSEId().m_SatKey,
510  m_IdModVerId2Info->GetSplitVersion(),
511  nullptr, nullptr);
512  fetch_details->SetLoader(load_task);
513  load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
514  load_task->SetErrorCB(
516  this,
518  this, _1, _2, _3, _4, _5),
519  fetch_details.get()));
520  load_task->SetConsumeCallback(
522  this,
524  this, _1, _2),
525  fetch_details.get()));
526 
527  if (IPSGS_Processor::m_Request->NeedTrace()) {
528  IPSGS_Processor::m_Reply->SendTrace(
529  "Cassandra request: " +
530  ToJsonString(*load_task),
531  IPSGS_Processor::m_Request->GetStartTimestamp());
532  }
533 
534  m_FetchDetails.push_back(move(fetch_details));
535  load_task->Wait(); // Initiate cassandra request
536 }
537 
538 
540 {
    // Asks PopulateMyNCBIUser() for the user name and maps its result to a
    // bool: true when the user name has been populated, false in the other
    // handled cases (see the comment in each branch).
541  auto result = PopulateMyNCBIUser(
543  this, _1, _2),
545  this, _1, _2, _3, _4, _5));
546  switch (result) {
548  // The user name has been populated
549  return true;
553  if (IPSGS_Processor::m_Reply->IsOutputReady())
554  x_Peek(false);
555  return false;
558  // The error handlers have been called while checking the caches.
559  // The error handlers called SignalFinishProcessing()
560  return false;
563  // Wait for a callback which comes asynchronously from the cache or
564  // from the MyNCBI access wrapper
565  return false;
566  default:
567  break;
568  }
569 
570  // Cannot happen
571  return true;
572 }
573 
574 
576 {
577  // This path handles the id2_info which came in the form of
    // sat.info.chunks[.ver]
578 
579  string err_msg;
580 
581  // Note: the TSE id (blob id) is not used in the chunk retrieval.
582  // So there is no need to map its sat to keyspace. The TSE id
583  // will be sent to the client as is.
584  // The user-provided id2_info is used to calculate the chunk blob_id
585  // so the sat from id2_info will be mapped to the cassandra keyspace
586 
587  // Validate the chunk number
588  // true -> finish if failed
590  m_SatInfoChunkVerId2Info->GetChunks(),
591  true))
592  return;
593 
594 
595  int64_t sat_key;
597  // Special case
598  sat_key = m_SatInfoChunkVerId2Info->GetInfo();
599  } else {
600  // For the target chunk - convert sat to sat name chunk's blob id
601  sat_key = m_SatInfoChunkVerId2Info->GetInfo() -
603  }
607  return;
608 
610  // Need the user name from MyNCBI. If it is not available right now
    // then there is nothing more to do here.
611  if (!x_GetMyNCBIUser()) {
612  return;
613  }
614  }
615 
617 }
618 
619 
621 {
    // Obtains a connection for the keyspace of m_SatInfoChunkVerBlobId,
    // looks up the chunk blob props and starts a blob load task. A null
    // secure connection or an exception ends the processing here.
622  shared_ptr<CCassConnection> cass_connection;
623  try {
625  cass_connection = m_SatInfoChunkVerBlobId.m_Keyspace->GetSecureConnection(
626  m_UserName.value());
627  if (!cass_connection) {
630 
631  if (IPSGS_Processor::m_Reply->IsOutputReady())
632  x_Peek(false);
633  return;
634  }
635  } else {
636  cass_connection = m_SatInfoChunkVerBlobId.m_Keyspace->GetConnection();
637  }
638  } catch (const exception & exc) {
641 
642  if (IPSGS_Processor::m_Reply->IsOutputReady())
643  x_Peek(false);
644  return;
645  } catch (...) {
648 
649  if (IPSGS_Processor::m_Reply->IsOutputReady())
650  x_Peek(false);
651  return;
652  }
653 
654  // Search in cache the TSE chunk properties
657  int64_t last_modified = INT64_MIN;
658  unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
659  auto tse_blob_prop_cache_lookup_result =
661  last_modified, *blob_record.get());
662 
663 
664  // Initiate the chunk request
666  if (IPSGS_Processor::m_Request->NeedTrace())
670  INT64_MIN,
673  "", 0, 0, IPSGS_Processor::m_Request->GetIncludeHUP(),
674  trace_flag,
675  IPSGS_Processor::m_Request->NeedProcessorEvents(),
676  vector<string>(), vector<string>(),
677  psg_clock_t::now());
678 
679  unique_ptr<CCassBlobFetch> fetch_details;
680  fetch_details.reset(new CCassBlobFetch(chunk_request, m_SatInfoChunkVerBlobId));
681 
    // The load task is constructed differently depending on whether the
    // blob props have been found in cache
682  CCassBlobTaskLoadBlob * load_task = nullptr;
683  if (tse_blob_prop_cache_lookup_result != ePSGS_CacheHit) {
684  // Cassandra should look for blob props as well
685  load_task = new CCassBlobTaskLoadBlob(cass_connection,
688  true, nullptr);
689  } else {
690  // Blob props are already here
691  load_task = new CCassBlobTaskLoadBlob(cass_connection,
693  move(blob_record),
694  true, nullptr);
695  }
696 
697  fetch_details->SetLoader(load_task);
698  load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
699  load_task->SetErrorCB(
702  this, _1, _2, _3, _4, _5),
703  fetch_details.get(),
705  load_task->SetPropsCallback(
706  CBlobPropCallback(this,
708  this, _1, _2, _3),
711  fetch_details.get(), false));
712  load_task->SetChunkCallback(
713  CBlobChunkCallback(this,
715  this, _1, _2, _3, _4, _5),
716  fetch_details.get(),
718 
719  if (IPSGS_Processor::m_Request->NeedTrace()) {
720  IPSGS_Processor::m_Reply->SendTrace(
721  "Cassandra request: " +
722  ToJsonString(*load_task),
723  IPSGS_Processor::m_Request->GetStartTimestamp());
724  }
725 
726  m_FetchDetails.push_back(move(fetch_details));
727  load_task->Wait(); // Initiate cassandra request
728 }
729 
730 
732  CBlobRecord const & blob,
733  bool is_found)
734 {
    // Blob properties callback. Nothing is sent if the processor has been
    // canceled or if SignalStartProcessing() tells it to cancel.
    // Otherwise the reply receives either the blob props (found) or a
    // "not found" message; both branches complete the blob prop item.
735  if (m_Canceled) {
737  return;
738  }
739 
740  if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
742  return;
743  }
744 
745  // If the other processor waits then let it go but after sending the signal
746  // of the good data (it may cancel the other processors)
748 
749  // Note: cannot use CPSGS_CassBlobBase::OnGetBlobChunk() anymore because
750  // the reply has to have a few more fields for ID/get_tse_chunk request
    // NOTE(review): this is the blob prop callback, so the base class
    // method meant above is presumably OnGetBlobProp() - confirm
751  CRequestContextResetter context_resetter;
752  IPSGS_Processor::m_Request->SetRequestContext();
753 
754  if (IPSGS_Processor::m_Request->NeedTrace()) {
755  IPSGS_Processor::m_Reply->SendTrace(
756  "Blob prop callback; found: " + to_string(is_found),
757  IPSGS_Processor::m_Request->GetStartTimestamp());
758  }
759 
760  if (is_found) {
761  IPSGS_Processor::m_Reply->PrepareTSEBlobPropData(
762  fetch_details, kTSEChunkProcessorName,
765  ToJsonString(blob));
766  IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
767  fetch_details, kTSEChunkProcessorName);
768  } else {
769  // Not found; it is a user error, not a data inconsistency
770  auto * app = CPubseqGatewayApp::GetInstance();
771  app->GetCounters().Increment(this,
773 
774  auto blob_id = fetch_details->GetBlobId();
775  string message = "Blob " + blob_id.ToString() + " properties are not found";
776  PSG_WARNING(message);
778  IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
779  fetch_details, kTSEChunkProcessorName,
783  IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
784  fetch_details, kTSEChunkProcessorName);
    // There will be no more data for this fetch
785  fetch_details->GetLoader()->ClearError();
786  fetch_details->SetReadFinished();
787  }
788 
789  if (IPSGS_Processor::m_Reply->IsOutputReady())
790  x_Peek(false);
791 }
792 
793 
795  CRequestStatus::ECode status,
796  int code,
797  EDiagSev severity,
798  const string & message)
799 {
    // Blob error/message callback. Nothing is sent if the processor has
    // been canceled. Otherwise the message goes to the reply as a blob
    // prop message or as a blob message depending on the fetch stage, and
    // the corresponding item is completed.
800  if (m_Canceled) {
802  return;
803  }
804 
805  // Cannot use CPSGS_CassBlobBase::OnGetBlobError() anymore because
806  // the TSE messages have different parameters
807 
808  CRequestContextResetter context_resetter;
809  IPSGS_Processor::m_Request->SetRequestContext();
810 
811  // It could be a message or an error
812  CountError(fetch_details, status, code, severity, message,
814  bool is_error = IsError(severity);
815 
816  if (fetch_details->IsBlobPropStage()) {
817  IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
818  fetch_details, kTSEChunkProcessorName,
820  message, status, code, severity);
821  IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
822  fetch_details, kTSEChunkProcessorName);
823  } else {
824  IPSGS_Processor::m_Reply->PrepareTSEBlobMessage(
825  fetch_details, kTSEChunkProcessorName,
827  message, status, code, severity);
828  IPSGS_Processor::m_Reply->PrepareTSEBlobCompletion(
829  fetch_details, kTSEChunkProcessorName);
830  }
831 
832  // To avoid sending an error in Peek()
833  fetch_details->GetLoader()->ClearError();
834 
835  if (is_error) {
836  // If it is an error then regardless of what stage it was, props or
837  // chunks, there will be no more activity
838  fetch_details->SetReadFinished();
839  }
840 
841  if (IPSGS_Processor::m_Reply->IsOutputReady())
842  x_Peek(false);
843 }
844 
845 
847  CBlobRecord const & blob,
848  const unsigned char * chunk_data,
849  unsigned int data_size,
850  int chunk_no)
851 {
    // Blob chunk callback. A non-negative chunk_no delivers a data chunk;
    // a negative chunk_no means there is no more data and completes the
    // blob.
852  // Note: cannot use CPSGS_CassBlobBase::OnGetBlobChunk() anymore because
853  // the reply has to have a few more fields for TSE chunks
854  CRequestContextResetter context_resetter;
855  IPSGS_Processor::m_Request->SetRequestContext();
856 
    // A canceled processor cancels the loader and marks the fetch finished
857  if (m_Canceled) {
858  fetch_details->GetLoader()->Cancel();
859  fetch_details->GetLoader()->ClearError();
860  fetch_details->SetReadFinished();
861  if (IPSGS_Processor::m_Reply->IsOutputReady())
862  x_Peek(false);
863 
865  return;
866  }
    // Data which arrives after the reply has finished is logged and ignored
867  if (IPSGS_Processor::m_Reply->IsFinished()) {
869  this,
871  PSG_ERROR("Unexpected data received "
872  "while the output has finished, ignoring");
873  if (IPSGS_Processor::m_Reply->IsOutputReady())
874  x_Peek(false);
875  return;
876  }
877 
878  if (chunk_no >= 0) {
879  if (IPSGS_Processor::m_Request->NeedTrace()) {
880  IPSGS_Processor::m_Reply->SendTrace(
881  "Blob chunk " + to_string(chunk_no) + " callback",
882  IPSGS_Processor::m_Request->GetStartTimestamp());
883  }
884 
885  // A blob chunk; 0-length chunks are allowed too
886  IPSGS_Processor::m_Reply->PrepareTSEBlobData(
887  fetch_details, kTSEChunkProcessorName,
888  chunk_data, data_size, chunk_no,
891  } else {
892  if (IPSGS_Processor::m_Request->NeedTrace()) {
893  IPSGS_Processor::m_Reply->SendTrace(
894  "Blob chunk no-more-data callback",
895  IPSGS_Processor::m_Request->GetStartTimestamp());
896  }
897 
898  // End of the blob
899  IPSGS_Processor::m_Reply->PrepareTSEBlobCompletion(
900  fetch_details, kTSEChunkProcessorName);
901  fetch_details->GetLoader()->ClearError();
902  fetch_details->SetReadFinished();
903 
904  // Note: no need to set the blob completed in the exclude blob cache.
905  // It will happen in Peek()
906  }
907 
908  if (IPSGS_Processor::m_Reply->IsOutputReady())
909  x_Peek(false);
910 }
911 
912 
// Error/message callback of the split history fetch. Nothing is sent if
// the processor has been canceled. Otherwise the event is counted and
// forwarded to the reply as a processor message; if the severity is an
// error the fetch is marked as finished.
913 void
915  CCassSplitHistoryFetch * fetch_details,
916  CRequestStatus::ECode status,
917  int code,
918  EDiagSev severity,
919  const string & message)
920 {
921  if (m_Canceled) {
923  return;
924  }
925 
926  CRequestContextResetter context_resetter;
927  IPSGS_Processor::m_Request->SetRequestContext();
928 
929  // It could be a message or an error
930  CountError(fetch_details, status, code, severity, message,
932  bool is_error = IsError(severity);
933 
934  IPSGS_Processor::m_Reply->PrepareProcessorMessage(
935  IPSGS_Processor::m_Reply->GetItemId(),
936  kTSEChunkProcessorName, message, status, code, severity);
937 
938  // To avoid sending an error in Peek()
939  fetch_details->GetLoader()->ClearError();
940 
941  if (is_error) {
942  // If it is an error then there will be no more activity
943  fetch_details->SetReadFinished();
944  }
945 
946  if (IPSGS_Processor::m_Reply->IsOutputReady())
947  x_Peek(false);
948 }
949 
950 
// Consume callback of the split history fetch. The fetch is marked as
// finished right away. An empty result is reported to the client as a
// processor message; otherwise the first record is used to request the
// TSE chunk.
951 void
953  CCassSplitHistoryFetch * fetch_details,
954  vector<SSplitHistoryRecord> && result)
955 {
956  CRequestContextResetter context_resetter;
957  IPSGS_Processor::m_Request->SetRequestContext();
958 
959  fetch_details->GetLoader()->ClearError();
960  fetch_details->SetReadFinished();
961 
962  if (m_Canceled) {
963  fetch_details->GetLoader()->Cancel();
965  return;
966  }
967 
968  if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
970  return;
971  }
972 
973  // If the other processor waits then let it go but after sending the signal
974  // of the good data (it may cancel the other processors)
976 
977  if (IPSGS_Processor::m_Request->NeedTrace()) {
978  IPSGS_Processor::m_Reply->SendTrace(
979  "Split history callback; number of records: " + to_string(result.size()),
980  IPSGS_Processor::m_Request->GetStartTimestamp());
981  }
982 
983  auto * app = CPubseqGatewayApp::GetInstance();
984  if (result.empty()) {
985  // Split history is not found
986  app->GetCounters().Increment(this,
988 
989  string message = "Split history version " +
990  to_string(fetch_details->GetSplitVersion()) +
991  " is not found for the TSE id " +
992  fetch_details->GetTSEId().ToString();
993  PSG_WARNING(message);
995  IPSGS_Processor::m_Reply->PrepareProcessorMessage(
996  IPSGS_Processor::m_Reply->GetItemId(),
1000  } else {
1001  // Split history found.
1002  // Note: the request was issued so that there could be exactly one
1003  // split history record or none at all. So it is not checked whether
1004  // there is more than one record.
1005  x_RequestTSEChunk(result[0], fetch_details);
1006  }
1007 
1008  if (IPSGS_Processor::m_Reply->IsOutputReady())
1009  x_Peek(false);
1010 }
1011 
1012 
// Initiates the TSE chunk blob request using the id2_info taken from a
// split history record: parses the id2_info, validates the requested
// chunk number, calculates the chunk blob id, obtains a connection, looks
// up the blob props in cache and starts a blob load task. Each failed step
// returns early.
1013 void
1015  const SSplitHistoryRecord & split_record,
1016  CCassSplitHistoryFetch * fetch_details)
1017 {
1018  // Parse id2info
    // true -> finish the request if failed
1019  unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> id2_info;
1020  if (!x_ParseTSEChunkId2Info(split_record.id2_info,
1021  id2_info, fetch_details->GetTSEId(), true))
1022  return;
1023 
1024  // Check the requested chunk
1025  // true -> finish the request if failed
1026  if (!x_ValidateTSEChunkNumber(fetch_details->GetChunk(),
1027  id2_info->GetChunks(), true))
1028  return;
1029 
1030  // Calculate the sat key of the chunk blob
1031  int64_t sat_key;
1032  if (fetch_details->GetChunk() == kSplitInfoChunk) {
1033  // Special case: the info value is used as the sat key
1034  sat_key = id2_info->GetInfo();
1035  } else {
1036  sat_key = id2_info->GetInfo() - id2_info->GetChunks() - 1 +
1037  fetch_details->GetChunk();
1038  }
1039  SCass_BlobId chunk_blob_id(id2_info->GetSat(), sat_key);
1040  if (!x_TSEChunkSatToKeyspace(chunk_blob_id, true))
1041  return;
1042 
    // Pick the connection: a secure keyspace uses a connection obtained for
    // the user name, any other keyspace uses the regular connection
1043  shared_ptr<CCassConnection> cass_connection;
1044  try {
1045  if (chunk_blob_id.m_IsSecureKeyspace.value()) {
1046  cass_connection = chunk_blob_id.m_Keyspace->GetSecureConnection(
1047  m_UserName.value());
1048  if (!cass_connection) {
1051 
1052  if (IPSGS_Processor::m_Reply->IsOutputReady())
1053  x_Peek(false);
1054  return;
1055  }
1056  } else {
1057  cass_connection = chunk_blob_id.m_Keyspace->GetConnection();
1058  }
1059  } catch (const exception & exc) {
1062 
1063  if (IPSGS_Processor::m_Reply->IsOutputReady())
1064  x_Peek(false);
1065  return;
1066  } catch (...) {
1069 
1070  if (IPSGS_Processor::m_Reply->IsOutputReady())
1071  x_Peek(false);
1072  return;
1073  }
1074 
1075  // Look for the blob props
1076  // Form the chunk request with/without blob props
1077  unique_ptr<CBlobRecord> blob_record(new CBlobRecord);
1078  CPSGCache psg_cache(
1079  fetch_details->GetUseCache() != SPSGS_RequestBase::ePSGS_DbOnly,
1082  int64_t last_modified = INT64_MIN; // last modified is unknown
1083  auto blob_prop_cache_lookup_result =
1084  psg_cache.LookupBlobProp(this, chunk_blob_id.m_Sat,
1085  chunk_blob_id.m_SatKey,
1086  last_modified, *blob_record.get());
1087  if (blob_prop_cache_lookup_result != ePSGS_CacheHit &&
1088  fetch_details->GetUseCache() == SPSGS_RequestBase::ePSGS_CacheOnly) {
1089  // Cassandra is forbidden for the blob prop
1090  string err_msg = "TSE chunk blob " + chunk_blob_id.ToString() +
1091  " properties are not found in cache";
1092  if (blob_prop_cache_lookup_result == ePSGS_CacheFailure)
1093  err_msg += " due to LMDB error";
1098  return;
1099  }
1100 
1102  if (IPSGS_Processor::m_Request->NeedTrace())
1105  chunk_request(SPSGS_BlobId(chunk_blob_id.ToString()), INT64_MIN,
1108  "", 0, 0, IPSGS_Processor::m_Request->GetIncludeHUP(),
1109  trace_flag,
1110  IPSGS_Processor::m_Request->NeedProcessorEvents(),
1111  vector<string>(), vector<string>(),
1112  psg_clock_t::now());
1113  unique_ptr<CCassBlobFetch> cass_blob_fetch;
1114  cass_blob_fetch.reset(new CCassBlobFetch(chunk_request, chunk_blob_id));
1115 
1116  CCassBlobTaskLoadBlob * load_task = nullptr;
1117 
    // On a cache hit the found blob props are moved into the load task;
    // otherwise the task is constructed from the sat key
1118  if (blob_prop_cache_lookup_result == ePSGS_CacheHit) {
1119  load_task = new CCassBlobTaskLoadBlob(
1120  cass_connection,
1121  chunk_blob_id.m_Keyspace->keyspace,
1122  move(blob_record),
1123  true, nullptr);
1124  } else {
1125  load_task = new CCassBlobTaskLoadBlob(
1126  cass_connection,
1127  chunk_blob_id.m_Keyspace->keyspace,
1128  chunk_blob_id.m_SatKey,
1129  true, nullptr);
1130  }
1131  cass_blob_fetch->SetLoader(load_task);
1132 
1133  load_task->SetDataReadyCB(IPSGS_Processor::m_Reply->GetDataReadyCB());
1134  load_task->SetErrorCB(
1136  this,
1138  this, _1, _2, _3, _4, _5),
1139  cass_blob_fetch.get(),
1141  load_task->SetPropsCallback(
1143  this,
1145  this, _1, _2, _3),
1148  cass_blob_fetch.get(),
1149  blob_prop_cache_lookup_result != ePSGS_CacheHit));
1150  load_task->SetChunkCallback(
1152  this,
1154  this, _1, _2, _3, _4, _5),
1155  cass_blob_fetch.get(),
1157 
1158  if (IPSGS_Processor::m_Request->NeedTrace()) {
1159  IPSGS_Processor::m_Reply->SendTrace(
1160  "Cassandra request: " +
1161  ToJsonString(*load_task),
1162  IPSGS_Processor::m_Request->GetStartTimestamp());
1163  }
1164 
1165  m_FetchDetails.push_back(move(cass_blob_fetch));
1166  load_task->Wait(); // Initiate cassandra request
1167 }
1168 
1169 
// Reports 'msg' to the client as a processor message of the eDiag_Error
// severity and updates the overall status with 'status'. The message is
// also logged: as a warning or as an error depending on the status value.
// Nothing is done if the processor has been canceled.
1170 void
1172  CRequestStatus::ECode status,
1173  int code)
1174 {
1175  if (m_Canceled) {
1177  return;
1178  }
1179 
1180  CRequestContextResetter context_resetter;
1181  IPSGS_Processor::m_Request->SetRequestContext();
1182 
1183  IPSGS_Processor::m_Reply->PrepareProcessorMessage(
1184  IPSGS_Processor::m_Reply->GetItemId(),
1185  kTSEChunkProcessorName, msg, status, code, eDiag_Error);
1186  UpdateOverallStatus(status);
1187 
1188  if (status >= CRequestStatus::e400_BadRequest &&
1190  PSG_WARNING(msg);
1191  } else {
1192  PSG_ERROR(msg);
1193  }
1194 }
1195 
1196 
// x_ValidateTSEChunkNumber (name taken from the generated cross-reference).
// Checks a requested chunk number against the number of available chunks.
// Returns true when `requested_chunk` equals kSplitInfoChunk or does not
// exceed `total_chunks`; returns false otherwise.
// NOTE(review): listing lines 1198 (qualified function name) and 1200 (the
// `total_chunks` parameter) are missing from this extract.
1197 bool
1199  int64_t requested_chunk,
1201  bool need_finish)
1202 {
1203  if (requested_chunk == kSplitInfoChunk) {
1204  // Special value: the info chunk must be provided
1205  return true;
1206  }
1207 
1208  if (requested_chunk > total_chunks) {
1209  string msg = "Invalid chunk requested. "
1210  "The number of available chunks: " +
1211  to_string(total_chunks) + ", requested number: " +
1212  to_string(requested_chunk);
// With need_finish set, a counter is incremented; without it the problem
// is only logged as a warning.
// NOTE(review): listing lines 1216-1220 (the counter argument and whatever
// else the need_finish branch does with `msg`) are missing from this
// extract.
1213  if (need_finish) {
1214  auto * app = CPubseqGatewayApp::GetInstance();
1215  app->GetCounters().Increment(this,
1221  } else {
1222  PSG_WARNING(msg);
1223  }
1224  return false;
1225  }
1226  return true;
1227 }
1228 
1229 
// x_TSEChunkSatToKeyspace (name taken from the generated cross-reference).
// Returns true when blob_id.MapSatToKeyspace() succeeds. Otherwise a
// counter is incremented, an "Unknown TSE chunk satellite number" message
// is built, and false is returned.
// NOTE(review): listing line 1231 (qualified function name and the
// `blob_id` parameter) is missing from this extract.
1230 bool
1232 {
1233  if (blob_id.MapSatToKeyspace())
1234  return true;
1235 
// NOTE(review): the counter identifier (listing line 1238) is missing from
// this extract.
1236  auto * app = CPubseqGatewayApp::GetInstance();
1237  app->GetCounters().Increment(this,
1239 
1240  string msg = "Unknown TSE chunk satellite number " +
1241  to_string(blob_id.m_Sat) +
1242  " for the blob " + blob_id.ToString();
// NOTE(review): listing lines 1243-1244 (presumably where `msg` is
// reported) are missing from this extract.
1245 
1246  // This method is used only in case of the TSE chunk requests.
1247  // So in case of errors - synchronous or asynchronous - it is
1248  // necessary to finish the reply anyway.
// NOTE(review): listing lines 1249-1250 (the statements the comment above
// refers to) are missing from this extract.
1251  return false;
1252 }
1253 
1254 
// GetStatus(): reports the processor status. The value comes from
// CPSGS_CassProcessorBase::GetStatus() and is returned as is while it is
// ePSGS_InProgress.
// NOTE(review): the signature (listing line 1255) is missing from this
// extract; the name is taken from the generated cross-reference.
1256 {
1257  auto status = CPSGS_CassProcessorBase::GetStatus();
1258  if (status == IPSGS_Processor::ePSGS_InProgress)
1259  return status;
1260 
// NOTE(review): the statement controlled by this `if` (listing line 1262)
// is missing from this extract. The `return status;` further down is NOT
// the body of this `if`.
1261  if (m_Canceled)
1263 
1264  return status;
1265 }
1266 
1267 
// GetName(): returns the processor name, kTSEChunkProcessorName.
// NOTE(review): the signature (listing line 1268) is missing from this
// extract; the name is taken from the generated cross-reference.
1269 {
1270  return kTSEChunkProcessorName;
1271 }
1272 
1273 
// NOTE(review): both the signature (listing line 1274) and the single body
// line (listing line 1276) are missing from this extract. Presumably this
// is GetGroupName() returning kCassandraProcessorGroupName - confirm
// against the original source.
1275 {
1277 }
1278 
1279 
// ProcessEvent(): event entry point; runs the waiting variant of the peek
// pass over the pending fetches.
// NOTE(review): the signature (listing line 1280) is missing from this
// extract; the name is taken from the generated cross-reference.
1281 {
1282  x_Peek(true);
1283 }
1284 
1285 
// Peek pass over every entry in m_FetchDetails; `need_wait` is forwarded
// to the per-fetch x_Peek overload.
// NOTE(review): the signature (listing line 1286) is missing from this
// extract; presumably `void CPSGS_TSEChunkProcessor::x_Peek(bool need_wait)`
// judging by the x_Peek(true) / x_Peek(false) call sites - confirm.
1287 {
1288  if (m_Canceled) {
// NOTE(review): listing line 1289 (a statement executed before the early
// return) is missing from this extract.
1290  return;
1291  }
1292 
1293  // 1 -> call m_Loader->Wait1 to pick data
1294  // 2 -> check if we have ready-to-send buffers
1295  // 3 -> call reply->Send() to send what we have if it is ready
1296  bool overall_final_state = false;
1297 
// The scan is repeated for as long as m_FetchDetails changes size during a
// pass, i.e. until a full pass leaves the container size unchanged.
1298  while (true) {
1299  auto initial_size = m_FetchDetails.size();
1300 
1301  for (auto & details: m_FetchDetails) {
1302  if (details) {
// The InPeek flag makes the loop skip an entry that is already being
// peeked.
1303  if (details->InPeek()) {
1304  continue;
1305  }
1306  details->SetInPeek(true);
1307  overall_final_state |= x_Peek(details, need_wait);
1308  details->SetInPeek(false);
1309  }
1310  }
1311 
1312  if (initial_size == m_FetchDetails.size())
1313  break;
1314  }
1315 
1316  // TSE chunk: ready packets need to be sent right away
// NOTE(review): listing line 1317 (the statement the comment above
// describes) is missing from this extract, so it is not visible here how
// overall_final_state is consumed.
1318 
// NOTE(review): the action taken once all reads and the MyNCBI lookup are
// finished (listing line 1320) is missing from this extract.
1319  if (AreAllFinishedRead() && IsMyNCBIFinished()) {
1321  }
1322 }
1323 
1324 
// Peeks a single fetch.
// Returns true straight away when the fetch has no loader. Otherwise
// returns the result of the loader's Wait() when `need_wait` is set and the
// read has not finished yet, and false in all other cases.
// If the loader reports an error while the reply output is ready and the
// reply is not finished, the error is counted, logged and sent to the
// client, after which the loader error is cleared and the fetch is marked
// as read-finished.
1325 bool CPSGS_TSEChunkProcessor::x_Peek(unique_ptr<CCassFetch> & fetch_details,
1326  bool need_wait)
1327 {
1328  if (!fetch_details->GetLoader())
1329  return true;
1330 
1331  bool final_state = false;
1332  if (need_wait) {
1333  if (!fetch_details->ReadFinished()) {
1334  final_state = fetch_details->GetLoader()->Wait();
1335  }
1336  }
1337 
1338  if (fetch_details->GetLoader()->HasError() &&
1339  IPSGS_Processor::m_Reply->IsOutputReady() &&
1340  ! IPSGS_Processor::m_Reply->IsFinished()) {
1341  // Send an error
1342  string error = fetch_details->GetLoader()->LastError();
1343  auto * app = CPubseqGatewayApp::GetInstance();
1344 
1345  app->GetCounters().Increment(this, CPSGSCounters::ePSGS_ProcUnknownError);
1346  PSG_ERROR(error);
1347 
// NOTE(review): the unchecked static_cast assumes every fetch reaching this
// point is a CCassBlobFetch - confirm against the callers.
1348  CCassBlobFetch * blob_fetch = static_cast<CCassBlobFetch *>(fetch_details.get());
// The error goes out as a blob-properties message or a blob message,
// depending on the stage the fetch is in; each is followed by the matching
// completion.
// NOTE(review): the remaining arguments of both Prepare...Message calls
// (listing lines 1352-1354 and 1360-1362) are missing from this extract.
1349  if (blob_fetch->IsBlobPropStage()) {
1350  IPSGS_Processor::m_Reply->PrepareTSEBlobPropMessage(
1351  blob_fetch, kTSEChunkProcessorName,
1355  IPSGS_Processor::m_Reply->PrepareTSEBlobPropCompletion(
1356  blob_fetch, kTSEChunkProcessorName);
1357  } else {
1358  IPSGS_Processor::m_Reply->PrepareTSEBlobMessage(
1359  blob_fetch, kTSEChunkProcessorName,
1363  IPSGS_Processor::m_Reply->PrepareTSEBlobCompletion(
1364  blob_fetch, kTSEChunkProcessorName);
1365  }
1366 
1367  // Mark finished
// NOTE(review): listing lines 1368 and 1371 (statements before and after
// the two calls below) are missing from this extract.
1369  fetch_details->GetLoader()->ClearError();
1370  fetch_details->SetReadFinished();
1372  }
1373 
1374  return final_state;
1375 }
1376 
1377 
// Error callback for the MyNCBI user lookup.
// A 404 status is handled separately from every other status; other
// statuses are reported through ReportMyNCBIError(message). Afterwards a
// non-waiting peek pass is run if the reply output is ready.
// `cookie`, `code` and `severity` are not used by the lines visible here.
1378 void CPSGS_TSEChunkProcessor::x_OnMyNCBIError(const string & cookie,
1379  CRequestStatus::ECode status,
1380  int code,
1381  EDiagSev severity,
1382  const string & message)
1383 {
// NOTE(review): the handling of the not-found case (listing line 1385) is
// missing from this extract.
1384  if (status == CRequestStatus::e404_NotFound) {
1386  } else {
1387  ReportMyNCBIError(message);
1388  }
1389 
// NOTE(review): listing line 1390 (a statement executed for both branches)
// is missing from this extract.
1391 
1392  if (IPSGS_Processor::m_Reply->IsOutputReady())
1393  x_Peek(false);
1394 }
1395 
1396 
// Success callback for the MyNCBI user lookup.
// Stores the resolved user name in m_UserName and then continues along
// whichever id2-info flavor is set: m_SatInfoChunkVerId2Info is checked
// first, then m_IdModVerId2Info. Nothing further happens if both are null.
// NOTE(review): listing line 1398 (the `user_info` parameter) is missing
// from this extract.
1397 void CPSGS_TSEChunkProcessor::x_OnMyNCBIData(const string & cookie,
1399 {
1400  // All good, can proceed
1401  m_UserName = user_info.username;
1402 
// NOTE(review): the continuation call for this flavor (listing line 1404)
// is missing from this extract.
1403  if (m_SatInfoChunkVerId2Info.get() != nullptr) {
1405  return;
1406  }
// NOTE(review): the continuation call for this flavor (listing line 1408)
// is missing from this extract.
1407  if (m_IdModVerId2Info.get() != nullptr) {
1409  return;
1410  }
1411 }
1412 
const string kCassandraProcessorGroupName
const string kCassandraProcessorEvent
CCassBlobTaskLoadBlob * GetLoader(void)
Definition: cass_fetch.hpp:367
bool IsBlobPropStage(void) const
Definition: cass_fetch.hpp:355
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
void SetConsumeCallback(TConsumeCallback callback)
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
Definition: load_blob.cpp:161
void SetPropsCallback(TBlobPropsCallback callback)
Definition: load_blob.cpp:151
void SetChunkCallback(TBlobChunkCallbackEx callback)
Definition: load_blob.cpp:146
void SetErrorCB(TDataErrorCallback error_cb)
virtual void Cancel()
void SetReadFinished(void)
Definition: cass_fetch.hpp:89
SCass_BlobId GetBlobId(void) const
Definition: cass_fetch.hpp:124
CCassBlobTaskFetchSplitHistory * GetLoader(void)
Definition: cass_fetch.hpp:494
SCass_BlobId GetTSEId(void) const
Definition: cass_fetch.hpp:479
int64_t GetChunk(void) const
Definition: cass_fetch.hpp:482
SPSGS_RequestBase::EPSGS_CacheAndDbUse GetUseCache(void) const
Definition: cass_fetch.hpp:488
int64_t GetSplitVersion(void) const
Definition: cass_fetch.hpp:485
EPSGS_CacheLookupResult LookupBlobProp(IPSGS_Processor *processor, int sat, int sat_key, int64_t &last_modified, CBlobRecord &blob_record)
bool IsAllowed(void) const
@ ePSGS_TSEChunkSplitVersionCacheNotMatched
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
CRequestStatus::ECode CountError(CCassFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message, EPSGS_LoggingFlag logging_flag, EPSGS_StatusUpdateFlag status_update_flag)
void UpdateOverallStatus(CRequestStatus::ECode status)
bool IsCassandraProcessorEnabled(shared_ptr< CPSGS_Request > request) const
void ReportMyNCBIError(const string &my_ncbi_message)
bool IsError(EDiagSev severity) const
EPSGS_MyNCBILookupResult PopulateMyNCBIUser(TMyNCBIDataCB data_cb, TMyNCBIErrorCB error_cb)
list< unique_ptr< CCassFetch > > m_FetchDetails
bool AreAllFinishedRead(void) const
IPSGS_Processor::EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
@ ePSGS_SendAccumulated
Definition: psgs_reply.hpp:54
void x_SendProcessorError(const string &msg, CRequestStatus::ECode status, int code)
void OnGetBlobError(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
virtual void ProcessEvent(void)
Called when an event happened which may require to have some processing.
shared_ptr< CPSGS_IdModifiedVerFlavorId2Info > m_IdModVerId2Info
EPSGSId2InfoFlavor x_DetectId2InfoFlavor(const string &id2_info, shared_ptr< CPSGS_SatInfoChunksVerFlavorId2Info > &sat_info_chunk_ver_id2info, shared_ptr< CPSGS_IdModifiedVerFlavorId2Info > &id_mod_ver_id2info) const
shared_ptr< CPSGS_SatInfoChunksVerFlavorId2Info > m_SatInfoChunkVerId2Info
void OnGetSplitHistoryError(CCassSplitHistoryFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void OnGetSplitHistory(CCassSplitHistoryFetch *fetch_details, vector< SSplitHistoryRecord > &&result)
void OnGetBlobChunk(CCassBlobFetch *fetch_details, CBlobRecord const &blob, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
void x_OnMyNCBIError(const string &cookie, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
bool x_ValidateTSEChunkNumber(int64_t requested_chunk, CPSGS_SatInfoChunksVerFlavorId2Info::TChunks total_chunks, bool need_finish)
void x_ProcessSatInfoChunkVerId2InfoFinalStage(void)
virtual EPSGS_Status GetStatus(void)
Tells the processor status (if it has finished or in progress)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const
Tells if processor can process the given request.
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const
Create processor to fulfil PSG request using the data source.
bool x_TSEChunkSatToKeyspace(SCass_BlobId &blob_id)
virtual string GetName(void) const
Tells the processor name (used in logging and tracing)
void x_OnMyNCBIData(const string &cookie, CPSG_MyNCBIRequest_WhoAmI::SUserInfo user_info)
virtual string GetGroupName(void) const
Tells the processor group name.
virtual void Process(void)
Main processing function.
void x_RequestTSEChunk(const SSplitHistoryRecord &split_record, CCassSplitHistoryFetch *fetch_details)
bool x_ParseTSEChunkId2Info(const string &info, unique_ptr< CPSGS_SatInfoChunksVerFlavorId2Info > &id2_info, const SCass_BlobId &blob_id, bool need_finish)
void OnGetBlobProp(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
SPSGS_TSEChunkRequest * m_TSEChunkRequest
CPSGSCounters & GetCounters(void)
static CPubseqGatewayApp * GetInstance(void)
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
Int8 int64_t
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
const int64_t kSplitInfoChunk
Definition: id2info.hpp:42
EPSGSId2InfoFlavor
Definition: id2info.hpp:46
@ ePSGS_UnknownId2InfoFlavor
Definition: id2info.hpp:49
@ ePSGS_IdModifiedVerId2InfoFlavor
Definition: id2info.hpp:48
@ ePSGS_SatInfoChunksVerId2InfoFlavor
Definition: id2info.hpp:47
static MDB_envinfo info
Definition: mdb_load.c:37
#define nullptr
Definition: ncbimisc.hpp:45
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
#define PSG_WARNING(message)
@ ePSGS_StartupDataOK
@ ePSGS_CacheHit
@ ePSGS_CacheFailure
int TProcessorPriority
@ ePSGS_NeedStatusUpdate
@ ePSGS_UnknownError
@ ePSGS_NoBlobPropsError
@ ePSGS_MalformedParameter
@ ePSGS_UnknownResolvedSatellite
@ ePSGS_InvalidId2Info
@ ePSGS_NoSplitHistoryError
@ ePSGS_NeedLogging
string GetCassStartupDataStateMessage(EPSGS_StartupDataState state)
#define INT64_MIN
Definition: stdint.h:184
string ToString(void) const
CBioseqInfoRecord::TSat m_Sat
optional< SSatInfoEntry > m_Keyspace
optional< bool > m_IsSecureKeyspace
bool MapSatToKeyspace(void)
CBioseqInfoRecord::TSatKey m_SatKey
Definition: inftrees.h:24
@ eTseChunkRetrieve
Definition: timing.hpp:101
else result
Definition: token2.c:20
static const string kTSEChunkProcessorName
USING_NCBI_SCOPE
Modified on Tue Apr 23 07:39:04 2024 by modify_doxy.py rev. 669887