93 const char*
data =
static_cast<const char*
>(
buffer);
182 static int error_rate =
NCBI_PARAM_TYPE(WGS_PROCESSOR, ERROR_RATE)::GetDefault();
183 if ( error_rate > 0 ) {
184 static int error_counter = 0;
185 if ( ++error_counter >= error_rate ) {
209 #define PARAM_VDB_CACHE_SIZE "vdb_cache_size"
210 #define PARAM_INDEX_UPDATE_TIME "index_update_time"
211 #define PARAM_FILE_REOPEN_TIME "file_reopen_time"
212 #define PARAM_FILE_RECHECK_TIME "file_recheck_time"
213 #define PARAM_COMPRESS_DATA "compress_data"
215 #define DEFAULT_VDB_CACHE_SIZE 100
216 #define DEFAULT_INDEX_UPDATE_TIME 600
217 #define DEFAULT_FILE_REOPEN_TIME 3600
218 #define DEFAULT_FILE_RECHECK_TIME 600
219 #define DEFAULT_COMPRESS_DATA SWGSProcessor_Config::eCompressData_some
224 m_Status(ePSGS_NotFound),
235 const shared_ptr<CWGSClient>&
client,
236 shared_ptr<ncbi::CThreadPool> thread_pool,
237 shared_ptr<CPSGS_Request> request,
238 shared_ptr<CPSGS_Reply> reply,
242 m_Status(ePSGS_InProgress),
247 m_ThreadPool(thread_pool)
282 min(3u, max_conn), max_conn)));
287 shared_ptr<CPSGS_Reply> )
const
292 return m_Client->CanProcessRequest(*request);
298 shared_ptr<CPSGS_Reply> reply,
304 if ( !
m_Client->CanProcessRequest(*request) )
return nullptr;
336 auto req_type =
GetRequest()->GetRequestType();
355 catch (exception& exc) {
356 x_SendError(
"Exception when handling a request: ", exc);
365 bool enabled = app->GetWGSProcessorsEnabled();
405 PSG_ERROR(
"Error parsing seq-id: " << (err.empty() ? resolve_request.
m_SeqId : err));
426 kWGSProcessorName +
" processor finished resolving seq-id " +
m_SeqId->AsFastaString() +
", waiting for other processors",
431 catch (exception& exc) {
476 catch (exception& exc) {
499 PSG_ERROR(
"Error parsing seq-id: " << (err.empty() ? get_request.
m_SeqId : err));
540 kWGSProcessorName +
" processor finished getting blob for seq-id " +
m_SeqId->AsFastaString() +
", waiting for other processors",
545 catch (exception& exc) {
593 catch (exception& exc) {
594 x_SendError(
"Exception when handling a request: ", exc);
634 catch (exception& exc) {
680 catch (exception& exc) {
681 x_SendError(
"Exception when handling a request: ", exc);
721 catch (exception& exc) {
777 catch (exception& exc) {
778 x_SendError(
"Exception when handling a request: ", exc);
821 if (
data.GetData_compression() ==
data.eData_compression_gzip ) {
834 GetTiming().Register(
this,
operation, status, start, blob_size);
842 size_t blob_size = 0;
843 for (
auto& chunk :
data.GetData() ) {
844 blob_size += chunk->size();
858 size_t item_id =
GetReply()->GetItemId();
859 GetReply()->PrepareBioseqData(item_id,
GetName(), data_to_send, output_format);
884 size_t item_id = reply.GetItemId();
886 reply.PrepareBlobPropData(item_id,
GetName(), psg_blob_id, data_to_send);
887 reply.PrepareBlobPropCompletion(item_id,
GetName(), 2);
894 size_t item_id = reply.GetItemId();
895 reply.PrepareBlobMessage(item_id,
GetName(),
897 "Blob retrieval is not authorized",
901 reply.PrepareBlobCompletion(item_id,
GetName(), 2);
907 size_t item_id =
GetReply()->GetItemId();
909 for (
auto& chunk :
data.GetData() ) {
911 (
const unsigned char*)chunk->data(), chunk->size(), chunk_no++);
918 const string& id2_info,
922 size_t item_id =
GetReply()->GetItemId();
924 GetReply()->PrepareTSEBlobPropData(item_id,
GetName(), chunk_id, id2_info, data_to_send);
930 const string& id2_info,
934 size_t item_id =
GetReply()->GetItemId();
936 for (
auto& chunk :
data.GetData() ) {
938 (
const unsigned char*)chunk->data(), chunk->size(), chunk_no++,
950 auto split_version =
m_WGSData->m_SplitVersion;
974 string main_blob_id =
m_WGSData->m_BlobId;
992 const string& psg_blob_id =
m_WGSData->m_BlobId;
1022 if (
m_WGSData->m_Data->GetMainObject().GetThisTypeInfo() == CID2S_Split_Info::GetTypeInfo() ) {
1037 auto split_version =
m_WGSData->m_SplitVersion;
1038 string id2_info =
GetPSGId2Info(id2_blob_id, split_version);
1056 COSSWriter writer(
data.SetData());
1147 reply->PrepareProcessorMessage(reply->GetItemId(),
"WGS",
msg,
1161 const string&
msg,
const exception& exc)
1180 s <<
'.' << blob_version <<
'.' << split_version;
1195 auto cache_result = app->GetExcludeBlobCache()->AddBlobId(
1197 completed, completed_time);
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
const string kCassandraProcessorEvent
virtual void Serialize(CObjectOStreamAsnBinary &out) const
CBlobRecord & SetNChunks(int32_t value)
CBlobRecord & SetId2Info(string const &value)
CBlobRecord & SetSuppress(bool value)
CBlobRecord & SetGzip(bool value)
CBlobRecord & SetWithdrawn(bool value)
CBlobRecord & SetDead(bool value)
CBlobRecord & SetModified(TTimestamp value)
list< TOctetString * > TOctetStringSequence
virtual ERW_Result Flush(void)
Flush pending data (if any) down to the output device.
COSSWriter(TOctetStringSequence &out)
TOctetStringSequence & m_Output
vector< char > TOctetString
virtual ERW_Result Write(const void *buffer, size_t count, size_t *written)
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
CObjectOStreamAsnBinary –.
@ ePSGS_BlobBySatSatKeyRequest
@ ePSGS_BlobBySeqIdRequest
TRequest & GetRequest(void)
void x_SendResult(const string &data_to_send, EOutputFormat output_format)
void x_SendChunkBlobData(const string &id2_info, TID2ChunkId chunk_id, const objects::CID2_Reply_Data &data)
void x_ProcessBlobBySatSatKeyRequest(void)
void OnGotBlobBySeqId(void)
void GetBlobBySeqId(void)
void x_ProcessResolveRequest(void)
void x_InitClient(void) const
static void x_SendError(shared_ptr< CPSGS_Reply > reply, const string &msg)
void x_SendBlobProps(const string &psg_blob_id, CBlobRecord &blob_props)
EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
EOutputFormat x_GetOutputFormat(void)
string GetName(void) const override
Tells the processor name (used in logging and tracing)
void Process(void) override
Main processing function.
bool x_CheckExcludedCache(void)
void x_ProcessTSEChunkRequest(void)
void x_Finish(EPSGS_Status status)
string GetGroupName(void) const override
Tells the processor group name.
void x_SendBlobData(const string &psg_blob_id, const objects::CID2_Reply_Data &data)
CRef< objects::CSeq_id > m_SeqId
unsigned long m_ResendTimeoutMks
void x_WriteData(objects::CID2_Reply_Data &data, const objects::CAsnBinData &obj, bool compress) const
void OnGotBlobByBlobId(void)
void x_UnlockRequest(void)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Tells if processor can process the given request.
shared_ptr< CWGSClient > m_Client
void x_RegisterTimingNotFound(EPSGOperation operation)
shared_ptr< ncbi::CThreadPool > m_ThreadPool
void OnResolvedSeqId(void)
static string GetPSGId2Info(const CID2_Blob_Id &tse_id, CWGSClient::TID2SplitVersion split_version)
shared_ptr< SWGSData > m_WGSData
void GetBlobByBlobId(void)
void x_ProcessBlobBySeqIdRequest(void)
void x_SendSplitInfo(void)
unsigned long m_SentMksAgo
void x_SendForbidden(void)
~CPSGS_WGSProcessor(void) override
IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
void x_SendBlobForbidden(const string &psg_blob_id)
void Cancel(void) override
The infrastructure request to cancel processing.
void x_SetExcludedCacheCompleted(void)
void x_RegisterTiming(psg_time_point_t start, EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
void x_WaitForOtherProcessors(void)
void x_RemoveFromExcludedCache(void)
bool m_AddedToExcludedCache
bool x_SignalStartProcessing()
void x_SendChunkBlobProps(const string &id2_info, TID2ChunkId chunk_id, CBlobRecord &blob_props)
void x_SendMainEntry(void)
void x_SendBioseqInfo(void)
shared_ptr< SWGSProcessor_Config > m_Config
EOutputFormat m_OutputFormat
void x_RegisterTimingFound(psg_time_point_t start, EPSGOperation operation, const objects::CID2_Reply_Data &data)
bool x_IsEnabled(CPSGS_Request &request) const
static CPubseqGatewayApp * GetInstance(void)
Abstract class for representing single task executing in pool of threads To use this class in applica...
Main class implementing functionality of pool of threads.
static bool IsOSGBlob(const CID2_Blob_Id &blob_id)
virtual EStatus Execute(void) override
Do the actual job.
CPSGS_WGSProcessor & m_Processor
CWGSThreadPoolTask_GetBlobByBlobId(CPSGS_WGSProcessor &processor)
CWGSThreadPoolTask_GetBlobBySeqId(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_GetChunk(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_ResolveSeqId(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
Writer-based output stream.
CZipStreamCompressor – zlib based compression stream processor.
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
EPSGS_SeqIdParsingResult ParseInputSeqId(objects::CSeq_id &seq_id, const string &request_seq_id, int request_seq_id_type, string *err_msg=nullptr)
Parse seq-id from a string and type representation.
TProcessorPriority m_Priority
A very basic data-write interface.
std::ofstream out("events_result.xml")
main entry point for tests
static const char * str(char *buf, int n)
@ eNoOwnership
No ownership is assumed.
@ eDiag_Error
Error message.
@ e500_InternalServerError
void Reset(void)
Reset reference object.
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
ERW_Result
Result codes for I/O operations.
@ eRW_Success
Everything is okay, I/O completed.
static bool EqualNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive equality of a substring with another string.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
EStatus
Status of the task.
@ eCompleted
executed successfully
#define DEFINE_STATIC_FAST_MUTEX(id)
Define static fast mutex and initialize it.
TVersion GetVersion(void) const
Get the Version member data.
TSat_key GetSat_key(void) const
Get the Sat_key member data.
bool IsSetVersion(void) const
Version of blob, optional in some requests. Check if a value has been assigned to Version data member.
TSub_sat GetSub_sat(void) const
Get the Sub_sat member data.
TSat GetSat(void) const
Get the Sat member data.
@ eData_format_asn_binary
@ eID2_Blob_State_suppressed
@ eID2_Blob_State_suppressed_temp
@ eID2_Blob_State_withdrawn
const int64_t kSplitInfoChunk
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
string ToBioseqProtobuf(const CBioseqInfoRecord &bioseq_info)
#define PSG_ERROR(message)
chrono::steady_clock psg_clock_t
@ ePSGS_BlobRetrievalIsNotAuthorized
psg_clock_t::time_point psg_time_point_t
unsigned long GetTimespanToNowMks(const psg_time_point_t &t_point)
Reader-writer based streams.
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static CNamedPipeClient * client
vector< string > m_ExcludeBlobs
unsigned long m_ResendTimeoutMks
EPSGS_TSEOption m_TSEOption
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
EPSGS_OutputFormat m_OutputFormat
Pool of generic task-executing threads.
#define DEFAULT_INDEX_UPDATE_TIME
#define DEFAULT_COMPRESS_DATA
#define DEFAULT_FILE_REOPEN_TIME
static bool s_SimulateError()
void s_SetBlobState(CBlobRecord &blob_props, int id2_blob_state)
#define PARAM_FILE_RECHECK_TIME
static void s_OnGotBlobBySeqId(void *data)
NCBI_PARAM_DEF(int, WGS_PROCESSOR, ERROR_RATE, 0)
#define DEFAULT_VDB_CACHE_SIZE
static const string kWGSProcessorSection
#define DEFAULT_FILE_RECHECK_TIME
#define PARAM_COMPRESS_DATA
#define PARAM_FILE_REOPEN_TIME
NCBI_PARAM_DECL(int, WGS_PROCESSOR, ERROR_RATE)
static void s_OnResolvedSeqId(void *data)
static const string kParamMaxConn
static const string kWGSProcessorGroupName
static const string kWGSProcessorName
static const int kDefaultMaxConn
#define PARAM_INDEX_UPDATE_TIME
static const char kSubSatSeparator
static void s_OnGotBlobByBlobId(void *data)
static void s_OnGotChunk(void *data)
#define PARAM_VDB_CACHE_SIZE
void s_SetBlobVersion(CBlobRecord &blob_props, const CID2_Blob_Id &blob_id)
void s_SetBlobDataProps(CBlobRecord &blob_props, const CID2_Reply_Data &data)
static void s_FormatBlobId(ostream &s, const CID2_Blob_Id &blob_id)
const string kWGSProcessorEvent