48 using namespace std::placeholders;
53 m_CollectSplitInfo(
false),
54 m_SplitInfoGzipFlag(
false)
59 shared_ptr<CPSGS_Reply> reply,
60 const string & processor_id,
64 m_NeedToParseId2Info(
true),
65 m_ProcessorId(processor_id),
67 m_CollectSplitInfo(
false),
68 m_SplitInfoGzipFlag(
false),
69 m_BlobPropsCB(blob_props_cb),
70 m_BlobChunkCB(blob_chunk_cb),
71 m_BlobErrorCB(blob_error_cb),
72 m_NeedFallbackBlob(
false),
73 m_FallbackBlobRequested(
false)
78 switch (request->GetRequestType()) {
117 m_Reply->SendTrace(
"Blob prop callback; found: " + to_string(is_found),
140 unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info>
148 if (!is_authorized) {
169 m_Id2Info.reset(parsed_id2_info.release());
175 string err_msg =
"Falling back to retrieve "
176 "the original blob due to broken id2 info.";
178 m_Reply->PrepareProcessorMessage(
253 auto fetch_blob = fetch_details->
GetBlobId();
263 unsigned int max_to_send =
max(app->GetSendBlobIfSmall(),
272 if (blob.
GetSize() <= max_to_send) {
282 if (blob.
GetSize() <= max_to_send) {
311 unsigned int max_to_send =
max(app->GetSendBlobIfSmall(),
314 if (blob.
GetSize() <= max_to_send) {
370 auto cass_blob_id = fetch_details->
GetBlobId();
372 shared_ptr<CCassConnection> cass_connection;
374 if (cass_blob_id.m_IsSecureKeyspace.value()) {
375 cass_connection = cass_blob_id.
m_Keyspace->GetSecureConnection(
377 if (!cass_connection) {
383 cass_connection = cass_blob_id.m_Keyspace->GetConnection();
385 }
catch (
const exception & exc) {
399 orig_blob_request(
SPSGS_BlobId(cass_blob_id.ToString()),
404 0, 0,
m_Request->GetIncludeHUP(), trace_flag,
406 vector<string>(), vector<string>(),
410 unique_ptr<CBlobRecord> blob_record(
new CBlobRecord(blob));
413 cass_blob_id.m_Keyspace->keyspace,
414 std::move(blob_record),
417 unique_ptr<CCassBlobFetch> cass_blob_fetch;
418 cass_blob_fetch.reset(
new CCassBlobFetch(orig_blob_request, cass_blob_id));
419 cass_blob_fetch->SetLoader(load_task);
422 cass_blob_fetch->SetBlobPropSent();
459 string message =
"Error mapping id2 info sat (" +
461 ") to a cassandra keyspace for the blob " +
466 app->GetCounters().Increment(
this,
478 shared_ptr<CCassConnection> cass_connection;
482 cass_connection = info_blob_id.
m_Keyspace->GetSecureConnection(
484 if (!cass_connection) {
486 "id2 info sat connection unauthorized "
494 cass_connection = info_blob_id.
m_Keyspace->GetConnection();
496 }
catch (
const exception & exc) {
498 "id2 info sat connection authorization error: " +
506 "id2 info sat connection authorization unknown error",
522 string info_blob_id_as_str = info_blob_id.
ToString();
531 vector<string>(), vector<string>(),
535 unique_ptr<CCassBlobFetch> cass_blob_fetch;
536 cass_blob_fetch.reset(
new CCassBlobFetch(info_blob_request, info_blob_id));
537 bool info_blob_requested =
false;
541 unique_ptr<CBlobRecord> blob_record(
new CBlobRecord);
543 auto blob_prop_cache_lookup_result =
557 std::move(blob_record),
572 message =
"Blob properties are not found";
578 message =
"Blob properties are not found due to LMDB cache error";
595 cass_blob_fetch->SetLoader(load_task);
601 this, _1, _2, _3, _4, _5),
602 cass_blob_fetch.get()));
612 this, _1, _2, _3, _4, _5),
613 cass_blob_fetch.get()));
616 m_Reply->SendTrace(
"Cassandra request: " +
622 info_blob_requested =
true;
623 cass_blob_fetch->SetNeedAddId2ChunkId2Info(
true);
631 if (!info_blob_only) {
641 if (!info_blob_requested) {
649 (*to_init_iter)->GetLoader()->Wait();
662 for (
int chunk_no = 1; chunk_no <=
m_Id2Info->GetChunks(); ++chunk_no) {
673 shared_ptr<CCassConnection> cass_connection;
677 cass_connection = chunks_blob_id.
m_Keyspace->GetSecureConnection(
679 if (!cass_connection) {
681 "id2 split chunk sat connection unauthorized "
689 cass_connection = chunks_blob_id.
m_Keyspace->GetConnection();
691 }
catch (
const exception & exc) {
693 "id2 split chunk sat connection authorization error: " +
701 "id2 split chunk sat connection authorization unknown error",
711 string chunks_blob_id_as_str = chunks_blob_id.
ToString();
721 vector<string>(), vector<string>(),
724 unique_ptr<CCassBlobFetch> details;
734 unique_ptr<CBlobRecord> blob_record(
new CBlobRecord);
736 auto blob_prop_cache_lookup_result =
739 chunks_blob_id.
m_Sat,
749 std::move(blob_record),
751 details->SetLoader(load_task);
764 message =
"Blob properties are not found";
770 message =
"Blob properties are not found "
771 "due to a blob proc cache lookup error";
786 details->SetLoader(load_task);
793 this, _1, _2, _3, _4, _5),
804 this, _1, _2, _3, _4, _5),
808 details->SetNeedAddId2ChunkId2Info(
true);
827 auto request_type =
m_Request->GetRequestType();
830 m_Reply->SendTrace(
"Decision: split info of " + info_blob_id.
ToString() +
831 " will not be collected for further analysis "
832 "because it is not the ID/get request",
841 m_Reply->SendTrace(
"Decision: split info of " + info_blob_id.
ToString() +
842 " will not be collected for further analysis "
843 "because the ID/get request is not "
844 "with 'tse' option set to 'smart'",
852 m_Reply->SendTrace(
"Decision: split info of " + info_blob_id.
ToString() +
853 " will not be collected for further analysis "
854 "because of the failure to construct CSeq_id from the "
855 "resolved input seq_id (see an applog warning)",
867 auto cache_search_result = app->GetSplitInfoCache()->GetBlob(info_blob_id);
868 if (cache_search_result.has_value()) {
871 m_Reply->SendTrace(
"Extra split info blob for " + info_blob_id.
ToString() +
872 " is already in cache. Using the blob to request extra chunks",
875 vector<int> extra_chunks;
878 *cache_search_result.value());
879 }
catch (
const exception & exc) {
880 PSG_ERROR(
"Error getting bioseq chunks from split info: " +
884 PSG_ERROR(
"Unknown error of getting bioseq chunks from split info");
895 m_Reply->SendTrace(
"Decision: split info of " + info_blob_id.
ToString() +
896 " will be collected for further analysis",
905 m_Reply->SendTrace(
"Deserializing collected split info from the buffer "
906 "for further analysis",
920 }
catch (
const exception & exc) {
921 PSG_ERROR(
"Error deserializing split info: " +
string(exc.what()));
924 PSG_ERROR(
"Unknown error of deserializing split info");
931 vector<int> extra_chunks;
934 }
catch (
const exception & exc) {
935 PSG_ERROR(
"Error getting bioseq chunks from split info: " +
939 PSG_ERROR(
"Unknown error of getting bioseq chunks from split info");
948 const vector<int> & extra_chunks,
955 for (
auto chunk_no : extra_chunks) {
960 string chunks_blob_id_as_str = chunks_blob_id.
ToString();
976 vector<string>(), vector<string>(),
979 unique_ptr<CCassBlobFetch> details;
992 shared_ptr<CCassConnection> cass_connection;
996 cass_connection = chunks_blob_id.
m_Keyspace->GetSecureConnection(
998 if (!cass_connection) {
1000 "id2 split chunk sat connection unauthorized "
1008 cass_connection = chunks_blob_id.
m_Keyspace->GetConnection();
1010 }
catch (
const exception & exc) {
1012 "id2 split chunk sat connection authorization error: " +
1020 "id2 split chunk sat connection authorization unknown error",
1027 unique_ptr<CBlobRecord> blob_record(
new CBlobRecord);
1029 auto blob_prop_cache_lookup_result =
1032 chunks_blob_id.
m_Sat,
1035 *blob_record.get());
1042 std::move(blob_record),
1044 details->SetLoader(load_task);
1055 message =
"Blob properties are not found";
1061 message =
"Blob properties are not found "
1062 "due to a blob proc cache lookup error";
1077 details->SetLoader(load_task);
1084 this, _1, _2, _3, _4, _5),
1095 this, _1, _2, _3, _4, _5),
1099 m_Reply->SendTrace(
"Requesting extra chunk from INFO for the 'smart' tse option: " +
1105 details->SetNeedAddId2ChunkId2Info(
true);
1119 bool need_add_id2_chunk_id2_info)
1126 bool completed =
true;
1129 completed, completed_time);
1131 auto request_type =
m_Request->GetRequestType();
1141 unsigned long resend_timeout_mks;
1151 if (resend_timeout_mks == 0) {
1160 need_add_id2_chunk_id2_info);
1168 if (sent_mks_ago < resend_timeout_mks) {
1171 resend_timeout_mks - sent_mks_ago,
1172 need_add_id2_chunk_id2_info);
1185 app->GetExcludeBlobCache()->SetCompleted(
1199 const string & message)
1218 const string & message)
1226 bool is_error =
IsError(severity);
1255 const unsigned char * chunk_data,
1256 unsigned int data_size,
1273 "while the output has finished, ignoring");
1277 if (chunk_no >= 0) {
1279 m_Reply->SendTrace(
"Blob chunk " + to_string(chunk_no) +
" callback",
1290 m_Reply->SendTrace(
"Collecting split info in the buffer "
1291 "for further analysis. Chunk number: " +
1292 to_string(chunk_no) +
" of size " +
1293 to_string(data_size),
1301 m_Reply->SendTrace(
"Blob chunk no-more-data callback",
1329 app->GetCounters().Increment(
this,
1332 auto blob_id = fetch_details->
GetBlobId();
1333 string message =
"Blob " + blob_id.
ToString() +
1334 " properties are not found (last modified: " +
1359 unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info>
1364 unique_ptr<CPSGS_SatInfoChunksVerFlavorId2Info> parsed_id2_info;
1366 if (id2_info.empty())
1367 return parsed_id2_info;
1374 return parsed_id2_info;
1375 }
catch (
const exception & exc) {
1376 err_msg =
"Error extracting id2 info for the blob " +
1379 err_msg =
"Unknown error extracting id2 info for the blob " +
1383 err_msg +=
"\nThe broken id2 info field content will be discarded "
1384 "before sending to the client.";
1387 m_Reply->PrepareProcessorMessage(
1393 return parsed_id2_info;
1414 auto orig_blob_info =
m_Id2Info->GetInfo();
1415 if (orig_blob_info == blob_key) {
1421 return blob_key - orig_blob_info +
m_Id2Info->GetChunks() + 1;
1440 bool is_secure_keyspace =
false;
1445 if (!is_secure_keyspace) {
1449 "Blob " + blob_id.
ToString() +
" is not authorized "
1450 "because it is confidential",
m_Request->GetStartTimestamp());
1459 "Blob " + blob_id.
ToString() +
" is not authorized "
1460 "because it is withdrawn",
m_Request->GetStartTimestamp());
1480 unique_ptr<CCassPublicCommentFetch> comment_fetch_details;
1484 if (need_id2_identification) {
1485 comment_fetch_details->SetId2Identification(
1489 comment_fetch_details->SetCassBlobIdentification(
1499 shared_ptr<CCassConnection> cass_connection;
1505 if (!cass_connection) {
1512 }
catch (
const exception & exc) {
1525 comment_fetch_details->SetLoader(load_task);
1531 this, _1, _2, _3, _4, _5),
1532 comment_fetch_details.get()));
1538 comment_fetch_details.get()));
1539 load_task->
SetMessages(app->GetPublicCommentsMapping());
1543 "Cassandra request: " +
1553 if (need_id2_identification) {
1554 m_Reply->PrepareTSEBlobPropData(
1583 const unsigned char * chunk_data,
1584 unsigned int data_size,
1590 chunk_data, data_size, chunk_no,
1596 chunk_data, data_size, chunk_no,
1617 const string & message,
1623 m_Reply->PrepareTSEBlobPropMessage(
1626 message, status, err_code, severity);
1630 m_Reply->PrepareBlobPropMessage(
1632 message, status, err_code, severity);
1641 const string & message,
1647 m_Reply->PrepareTSEBlobMessage(
1650 message, status, err_code, severity);
1666 bool need_add_id2_chunk_id2_info)
1686 unsigned long sent_mks_ago,
1687 unsigned long until_resend_mks,
1688 bool need_add_id2_chunk_id2_info)
1693 if (need_add_id2_chunk_id2_info) {
1714 const string & message)
1729 bool is_error =
IsError(severity);
1731 m_Reply->PrepareProcessorMessage(
1770 "Public comment callback; found: " + to_string(is_found),
1777 m_Reply->PreparePublicComment(
1784 m_Reply->PreparePublicComment(
1802 const unsigned char * chunk_data,
1803 unsigned int data_size,
1808 m_BlobChunkCB(fetch_details, blob, chunk_data, data_size, chunk_no);
1814 m_BlobChunkCB(fetch_details, blob, chunk_data, data_size, chunk_no);
1825 m_BlobChunkCB(fetch_details, blob, chunk_data, data_size, chunk_no);
1833 m_BlobChunkCB(fetch_details, blob, chunk_data, data_size, chunk_no);
1838 string msg =
"Receiving an ID2 blob " + blob_id_as_str +
1839 " chunk " + to_string(chunk_no) +
1840 " when a fallback original blob has been requested. "
1841 "Ignore and continue.";
1888 msg =
"Blob " + blob_id_as_str +
" properties are not found. "
1889 "Falling back to retrieve the original blob.";
1900 m_Reply->PrepareProcessorMessage(
1914 msg =
"Blob " + blob_id_as_str +
" properties are received when "
1915 "a fallback to request the original blob "
1916 "has already been initiated before.";
1918 msg =
"Blob " + blob_id_as_str +
" properties are not found. "
1919 "Fallback to request the original blob "
1920 "has already been initiated before.";
1938 const string & message)
1980 msg =
"Blob " + blob_id_as_str +
" retrieval error. "
1981 "Fallback to request the original blob is initiated.\n"
1982 "Callback error message: " + message;
1993 m_Reply->PrepareProcessorMessage(
2008 msg =
"Blob " + blob_id_as_str +
" retrieval error. "
2009 "Fallback to request the original blob has already been initiated before.\n"
2010 "Callback error message: " + message;
2017 m_Reply->PrepareProcessorMessage(
CBlobRecord & SetId2Info(string const &value)
bool GetFlag(EBlobFlags flag_value) const
TTimestamp GetModified() const
string GetId2Info() const
bool IsConfidential() const
bool GetNeedAddId2ChunkId2Info(void) const
int32_t GetTotalSentBlobChunks(void) const
CCassBlobTaskLoadBlob * GetLoader(void)
SPSGS_BlobRequestBase::EPSGS_TSEOption GetTSEOption(void) const
bool IsBlobPropStage(void) const
void SetDataReadyCB(shared_ptr< CCassDataCallbackReceiver > callback)
void SetPropsCallback(TBlobPropsCallback callback)
void SetChunkCallback(TBlobChunkCallbackEx callback)
CBlobRecord::TTimestamp GetModified() const
void SetErrorCB(TDataErrorCallback error_cb)
EPSGS_CacheAddResult AddToExcludeBlobCache(bool &completed, psg_time_point_t &completed_time)
void RemoveFromExcludeBlobCache(void)
void SetReadFinished(void)
bool IsBlobFetch(void) const
string GetClientId(void) const
SCass_BlobId GetBlobId(void) const
void SetExcludeBlobCacheUpdated(bool value)
EPSGS_DbFetchType GetFetchType(void) const
EPSGS_CacheLookupResult LookupBlobProp(IPSGS_Processor *processor, int sat, int sat_key, int64_t &last_modified, CBlobRecord &blob_record)
@ ePSGS_ServerSatToSatNameError
@ ePSGS_BlobPropsNotFound
void Increment(IPSGS_Processor *processor, EPSGS_CounterType counter)
void x_RequestMoreChunksForSmartTSE(CCassBlobFetch *fetch_details, const vector< int > &extra_chunks, bool need_wait)
unique_ptr< CPSGS_SatInfoChunksVerFlavorId2Info > x_CheckId2Info(CCassBlobFetch *fetch_details, CBlobRecord &blob)
bool NeedToAddId2CunkId2Info(void) const
EPSGS_BlobCacheCheckResult x_CheckExcludeBlobCache(CCassBlobFetch *fetch_details, bool need_add_id2_chunk_id2_info)
void OnGetBlobChunk(bool cancelled, CCassBlobFetch *fetch_details, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
void x_OnBlobPropOrigTSE(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
TBlobPropsCB m_BlobPropsCB
bool x_IsAuthorized(EPSGS_BlobOp blob_op, const SCass_BlobId &blob_id, const CBlobRecord &blob, const TAuthToken &auth_token)
bool m_FallbackBlobRequested
EPSGS_BlobCacheCheckResult
@ ePSGS_ProceedRetrieving
SCass_BlobId m_InfoBlobId
TBlobErrorCB m_BlobErrorCB
void x_PrepareBlobPropMessage(CCassBlobFetch *fetch_details, const string &message, CRequestStatus::ECode status, int err_code, EDiagSev severity)
void x_RequestId2SplitBlobs(CCassBlobFetch *fetch_details)
bool m_NeedToParseId2Info
void x_RequestOriginalBlobChunks(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
void x_OnBlobPropNotFound(CCassBlobFetch *fetch_details)
CBlobRecord::TTimestamp m_LastModified
void x_OnBlobPropWholeTSE(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
void x_PrepareBlobPropData(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
CCassBlobFetch * m_InitialBlobPropFetch
void x_PrepareBlobCompletion(CCassBlobFetch *fetch_details)
void x_DeserializeSplitInfo(CCassBlobFetch *fetch_details)
void OnGetBlobProp(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
void x_PrepareBlobData(CCassBlobFetch *fetch_details, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
void x_BlobChunkCallback(CCassBlobFetch *fetch_details, CBlobRecord const &blob, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)
void x_BlobPropsCallback(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)
void OnPublicComment(CCassPublicCommentFetch *fetch_details, string comment, bool is_found)
int64_t x_GetId2ChunkNumber(CCassBlobFetch *fetch_details)
void x_OnBlobPropSmartTSE(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
TBlobChunkCB m_BlobChunkCB
void x_RequestID2BlobChunks(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool info_blob_only)
void x_OnBlobPropNoneTSE(CCassBlobFetch *fetch_details)
void x_OnBlobPropSlimTSE(CCassBlobFetch *fetch_details, CBlobRecord const &blob)
void x_PrepareBlobPropCompletion(CCassBlobFetch *fetch_details)
virtual ~CPSGS_CassBlobBase()
vector< string > m_RequestedID2BlobChunks
void x_PrepareBlobExcluded(CCassBlobFetch *fetch_details, EPSGS_BlobSkipReason skip_reason, bool need_add_id2_chunk_id2_info)
void OnPublicCommentError(CCassPublicCommentFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void PrepareServerErrorMessage(CCassBlobFetch *fetch_details, int code, EDiagSev severity, const string &message)
CBlobRecord m_InitialBlobProps
unique_ptr< CPSGS_SatInfoChunksVerFlavorId2Info > m_Id2Info
void x_PrepareBlobMessage(CCassBlobFetch *fetch_details, const string &message, CRequestStatus::ECode status, int err_code, EDiagSev severity)
void x_BlobErrorCallback(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
void x_DecideToRequestMoreChunksForSmartTSE(CCassBlobFetch *fetch_details, SCass_BlobId const &info_blob_id)
void OnGetBlobError(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)
psg::CDataChunkStream m_CollectedSplitInfo
void ReportFailureToGetCassConnection(void)
CRequestStatus::ECode CountError(CCassFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message, EPSGS_LoggingFlag logging_flag, EPSGS_StatusUpdateFlag status_update_flag)
void UpdateOverallStatus(CRequestStatus::ECode status)
bool IsError(EDiagSev severity) const
void ReportSecureSatUnauthorized(const string &user_name)
list< unique_ptr< CCassFetch > > m_FetchDetails
void SignalFinishProcessing(void)
optional< string > m_UserName
@ ePSGS_AnnotationRequest
@ ePSGS_BlobBySatSatKeyRequest
@ ePSGS_BlobBySeqIdRequest
CPSGSCounters & GetCounters(void)
static CPubseqGatewayApp * GetInstance(void)
shared_ptr< CPSGS_Reply > m_Reply
shared_ptr< CPSGS_Request > m_Request
function< void(CCassBlobFetch *fetch_details, CRequestStatus::ECode status, int code, EDiagSev severity, const string &message)> TBlobErrorCB
function< void(CCassBlobFetch *fetch_details, CBlobRecord const &blob, bool is_found)> TBlobPropsCB
function< void(CCassBlobFetch *fetch_details, CBlobRecord const &blob, const unsigned char *chunk_data, unsigned int data_size, int chunk_no)> TBlobChunkCB
EDiagSev
Severity level for the posted diagnostics.
@ eDiag_Error
Error message.
@ eDiag_Warning
Warning message.
@ e500_InternalServerError
E_Choice Which(void) const
Which variant is currently selected.
@ e_not_set
No variant selected.
const int64_t kSplitInfoChunk
Defines NCBI C++ diagnostic APIs, classes, and macros.
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
#define PSG_ERROR(message)
#define PSG_WARNING(message)
@ ePSGS_BlobBySatSatKeyFetch
@ ePSGS_ID2ChunkErrorAfterFallbackRequested
@ ePSGS_ID2ChunkErrorWithFallback
@ ePSGS_SecureSatUnauthorized
@ ePSGS_NotFoundID2BlobPropWithFallback
@ ePSGS_BlobRetrievalIsNotAuthorized
@ ePSGS_CassConnectionError
psg_clock_t::time_point psg_time_point_t
unsigned long GetTimespanToNowMks(const psg_time_point_t &t_point)
Defines CRequestStatus class for NCBI C++ diagnostic API.
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
vector< int > GetBioseqChunks(const CSeq_id &seq_id, const CBlobRecord &blob, const unsigned char *data, unsigned int size, int chunk_no)
string ToString(void) const
CBioseqInfoRecord::TSat m_Sat
optional< SSatInfoEntry > m_Keyspace
optional< bool > m_IsSecureKeyspace
bool MapSatToKeyspace(void)
CBioseqInfoRecord::TSatKey m_SatKey
unsigned long m_ResendTimeoutMks
CBlobRecord::TTimestamp m_LastModified
unsigned long m_ResendTimeoutMks
unsigned long m_SendBlobIfSmall