133 static int error_rate =
NCBI_PARAM_TYPE(CDD_PROCESSOR, ERROR_RATE)::GetDefault();
134 if ( error_rate > 0 ) {
135 static int error_counter = 0;
136 if ( ++error_counter >= error_rate ) {
147 m_Status(ePSGS_NotFound),
165 min(3u, max_conn), max_conn)));
169 shared_ptr<CCDDClientPool> client_pool,
170 shared_ptr<CThreadPool> thread_pool,
171 shared_ptr<CPSGS_Request> request,
172 shared_ptr<CPSGS_Reply> reply,
174 : m_ClientPool(client_pool),
176 m_Status(ePSGS_InProgress),
179 m_ThreadPool(thread_pool)
220 shared_ptr<CPSGS_Reply> reply)
const
223 vector<string> can_process;
227 for (
auto& name : annot_request.
m_Names ) {
229 can_process.push_back(name);
237 catch ( exception& exc ) {
238 x_SendError(reply,
"Exception in WhatCanProcess: ", exc);
245 shared_ptr<CPSGS_Reply> reply)
const
248 auto req_type = request->GetRequestType();
269 catch ( exception& exc ) {
270 x_SendError(reply,
"Exception in CanProcess: ", exc);
278 shared_ptr<CPSGS_Reply> reply,
282 auto req_type = request->GetRequestType();
303 catch ( exception& exc ) {
304 x_SendError(reply,
"Exception in CreateProcessor: ", exc);
339 const string&
msg,
const exception& exc)
370 auto req_type =
GetRequest()->GetRequestType();
383 catch (exception& exc) {
384 x_SendError(
"Exception when handling a request: ", exc);
413 for (
auto&
id : annot_request.
m_SeqIds) {
419 if (annot_request.
m_TSEOption == SPSGS_BlobRequestBase::EPSGS_TSEOption::ePSGS_SmartTSE ||
420 annot_request.
m_TSEOption == SPSGS_BlobRequestBase::EPSGS_TSEOption::ePSGS_WholeTSE ||
421 annot_request.
m_TSEOption == SPSGS_BlobRequestBase::EPSGS_TSEOption::ePSGS_OrigTSE) {
459 kCDDProcessorName +
" processor trying to get blob-id by seq-id " +
id.AsString(),
465 catch (exception& exc) {
468 kCDDProcessorName +
" processor failed to get blob-id by seq-id, exception: " + exc.what(),
471 m_Error =
"Exception when handling get_na request: " +
string(exc.what());
494 catch (exception& exc) {
497 kCDDProcessorName +
" processor failed to get blob by seq-id, exception: " + exc.what(),
500 m_Error =
"Exception when handling get_na request: " +
string(exc.what());
521 catch (exception& exc) {
524 kCDDProcessorName +
" processor failed to get blob by blob-id, exception: " + exc.what(),
527 m_Error =
"Exception when handling getblob request: " +
string(exc.what());
543 m_Error =
"simulated CDD processor error";
571 catch (exception& exc) {
572 m_Error =
"Exception when sending get_na reply: " +
string(exc.what());
590 m_Error =
"simulated CDD processor error";
619 catch (exception& exc) {
620 m_Error =
"Exception when sending get_na reply: " +
string(exc.what());
638 m_Error =
"simulated CDD processor error";
664 catch (exception& exc) {
665 m_Error =
"Exception when sending getblob reply: " +
string(exc.what());
696 kCDDProcessorName +
" processor stops sending annot-info because a higher priority processor has already sent it",
717 annot_info->SetFeat().push_back(feat_info);
721 annot_info->SetFeat().push_back(feat_info);
724 if ( annot_id.
IsGi() ) {
725 annot_info->SetSeq_loc().SetWhole_gi(annot_id.
GetGi());
728 annot_info->SetSeq_loc().SetWhole_seq_id().Assign(annot_id);
731 ostringstream annot_str;
746 ostringstream blob_str;
748 string blob_data = blob_str.str();
757 size_t item_id =
GetReply()->GetItemId();
770 (
const unsigned char*)blob_data.data(), blob_data.size(), 0);
823 kCDDProcessorName +
" processor stops processing request because a higher priority processor has already processed it",
844 if (!
id.IsGi() && !
id.GetTextseq_Id())
return false;
847 catch (exception& e) {
857 for (
const auto&
id: annot_request.
m_SeqIds) {
881 return blob_id && blob_id->GetSat() ==
kCDDSat;
887 for (
auto& name :
names ) {
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
static bool s_SimulateError()
static void s_OnGotBlobBySeqId(void *data)
static void s_OnGotBlobId(void *data)
static const string kCDDProcessorSection
static const string kCDDProcessorName
NCBI_PARAM_DEF(int, CDD_PROCESSOR, ERROR_RATE, 0)
NCBI_PARAM_DECL(int, CDD_PROCESSOR, ERROR_RATE)
static const string kCDDAnnotName
const CID2_Blob_Id::TSat kCDDSat
static const string kParamMaxConn
static const int kDefaultMaxConn
static void s_OnGotBlobByBlobId(void *data)
static const string kCDDProcessorGroupName
static const double kDefaultCDDBackendTimeout
static const string kParamCDDBackendTimeout
const string kCDDProcessorEvent
CBlobRecord & SetNChunks(int32_t value)
CBlobRecord & SetModified(TTimestamp value)
static CRef< TBlobId > StringToBlobId(const string &s)
static string BlobIdToString(const TBlobId &blob_id)
CCDDThreadPoolTask_GetBlobByBlobId(CPSGS_CDDProcessor &processor)
virtual EStatus Execute(void) override
Do the actual job.
CPSGS_CDDProcessor & m_Processor
CPSGS_CDDProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CCDDThreadPoolTask_GetBlobBySeqId(CPSGS_CDDProcessor &processor)
virtual EStatus Execute(void) override
Do the actual job.
CCDDThreadPoolTask_GetBlobId(CPSGS_CDDProcessor &processor)
CPSGS_CDDProcessor & m_Processor
CCDD_Reply_Get_Blob_Id –.
string Repr(TReprFlags flags=0) const
Return a string representation of this node.
void SetString(const string &key, const string &value)
Set a JSON object element to the specified string value.
void SetInteger(const string &key, Int8 value)
Set a JSON object element to the specified integer value.
static CJsonNode NewObjectNode()
Create a new JSON object node.
void x_UnlockRequest(void)
void x_SendAnnotInfo(const objects::CCDD_Reply_Get_Blob_Id &blob_info)
vector< objects::CSeq_id_Handle > m_SeqIds
void x_SendAnnot(const objects::CID2_Blob_Id &id2_blob_id, CRef< objects::CSeq_annot > &annot)
bool x_CanProcessAnnotRequestIds(SPSGS_AnnotRequest &annot_request) const
vector< string > WhatCanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Needs to be implemented only for the ID/get_na requests.
bool x_SignalStartProcessing()
void GetBlobByBlobId(void)
void OnGotBlobByBlobId(void)
void x_Finish(EPSGS_Status status)
objects::CCDDClientPool::SCDDBlob m_CDDBlob
void x_ProcessGetBlobRequest(void)
string GetGroupName(void) const override
Tells the processor group name.
bool x_CanProcessSeq_id(const string &psg_id) const
void x_RegisterTimingNotFound(EPSGOperation operation)
void Process(void) override
Main processing function.
~CPSGS_CDDProcessor(void) override
void GetBlobBySeqId(void)
bool x_NameIncluded(const vector< string > &names) const
IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
bool x_IsEnabled(CPSGS_Request &request) const
void Cancel(void) override
The infrastructure request to cancel processing.
bool x_CanProcessAnnotRequest(SPSGS_AnnotRequest &annot_request, TProcessorPriority priority) const
void x_ReportResultStatus(SPSGS_AnnotRequest::EPSGS_ResultStatus status)
void OnGotBlobBySeqId(void)
string GetName(void) const override
Tells the processor name (used in logging and tracing)
bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Tells if processor can process the given request.
EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
shared_ptr< ncbi::CThreadPool > m_ThreadPool
CRef< objects::CCDDClientPool::TBlobId > m_BlobId
static void x_SendError(shared_ptr< CPSGS_Reply > reply, const string &msg)
void x_RegisterTiming(EPSGOperation operation, EPSGOperationStatus status, size_t size)
bool x_CanProcessBlobRequest(SPSGS_BlobBySatSatKeyRequest &blob_request) const
shared_ptr< objects::CCDDClientPool > m_ClientPool
void x_ProcessResolveRequest(void)
@ ePSGS_AnnotationRequest
@ ePSGS_BlobBySatSatKeyRequest
TRequest & GetRequest(void)
bool GetCDDProcessorsEnabled() const
static CPubseqGatewayApp * GetInstance(void)
Abstract class for representing single task executing in pool of threads To use this class in applica...
Main class implementing functionality of pool of threads.
CTimeout – Timeout interval.
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
TProcessorPriority GetPriority(void) const
Provides the processor priority.
EPSGS_Status
The GetStatus() method returns a processor current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
TProcessorPriority m_Priority
static const struct name_t names[]
@ eDiag_Error
Error message.
@ e500_InternalServerError
#define MSerial_AsnBinary
static CSeq_id_Handle GetHandle(const CSeq_id &id)
Normal way of getting a handle, works for any seq-id.
void Reset(void)
Reset reference object.
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
virtual int GetInt(const string §ion, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
virtual double GetDouble(const string §ion, const string &name, double default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get double value of specified parameter name.
static string Base64Encode(const CTempString str, size_t line_len=0)
Base64-encode string.
static bool EqualNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive equality of a substring with another string.
EStatus
Status of the task.
@ eCompleted
executed successfully
@ eDefault
Default timeout (to be interpreted by the client code)
const TBlob_id & GetBlob_id(void) const
Get the Blob_id member data.
const TSeq_id & GetSeq_id(void) const
Get the Seq_id member data.
TVersion GetVersion(void) const
Get the Version member data.
bool IsSetVersion(void) const
version of blob, optional in some requests Check if a value has been assigned to Version data member.
@ e_Region
named region (globin locus)
TGi GetGi(void) const
Get the variant data.
bool IsGi(void) const
Check if variant Gi is selected.
TSet & SetSet(void)
Select the variant.
TSeq_set & SetSeq_set(void)
Assign a value to Seq_set data member.
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
chrono::steady_clock psg_clock_t
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
TProcessorPriority RegisterProcessedName(TProcessorPriority priority, const string &name)
vector< string > GetNotProcessedName(TProcessorPriority priority)
vector< string > m_SeqIds
void ReportResultStatus(const string &annot_name, EPSGS_ResultStatus rs)
EPSGS_TSEOption m_TSEOption
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
Pool of generic task-executing threads.