96 const char* data =
static_cast<const char*
>(
buffer);
185 static int error_rate =
NCBI_PARAM_TYPE(WGS_PROCESSOR, ERROR_RATE)::GetDefault();
186 if ( error_rate > 0 ) {
187 static int error_counter = 0;
188 if ( ++error_counter >= error_rate ) {
202 #define PARAM_VDB_CACHE_SIZE "vdb_cache_size"
203 #define PARAM_INDEX_UPDATE_TIME "index_update_time"
204 #define PARAM_FILE_REOPEN_TIME "file_reopen_time"
205 #define PARAM_FILE_RECHECK_TIME "file_recheck_time"
206 #define PARAM_COMPRESS_DATA "compress_data"
208 #define DEFAULT_VDB_CACHE_SIZE 100
209 #define DEFAULT_INDEX_UPDATE_TIME 600
210 #define DEFAULT_FILE_REOPEN_TIME 3600
211 #define DEFAULT_FILE_RECHECK_TIME 600
212 #define DEFAULT_COMPRESS_DATA SWGSProcessor_Config::eCompressData_some
217 m_Status(ePSGS_NotFound),
228 const shared_ptr<CWGSClient>&
client,
229 shared_ptr<ncbi::CThreadPool> thread_pool,
230 shared_ptr<CPSGS_Request> request,
231 shared_ptr<CPSGS_Reply> reply,
235 m_Status(ePSGS_InProgress),
240 m_ThreadPool(thread_pool)
275 min(3u, max_conn), max_conn)));
280 shared_ptr<CPSGS_Reply> )
const
285 return m_Client->CanProcessRequest(*request);
291 shared_ptr<CPSGS_Reply> reply,
297 if ( !
m_Client->CanProcessRequest(*request) )
return nullptr;
329 auto req_type =
GetRequest()->GetRequestType();
348 catch (exception& exc) {
349 x_SendError(
"Exception when handling a request: ", exc);
358 bool enabled = app->GetWGSProcessorsEnabled();
398 PSG_ERROR(
"Error parsing seq-id: " << (err.empty() ? resolve_request.
m_SeqId : err));
414 catch (exception& exc) {
449 catch (exception& exc) {
471 PSG_ERROR(
"Error parsing seq-id: " << (err.empty() ? get_request.
m_SeqId : err));
494 catch (exception& exc) {
532 catch (exception& exc) {
533 x_SendError(
"Exception when handling a request: ", exc);
562 catch (exception& exc) {
598 catch (exception& exc) {
599 x_SendError(
"Exception when handling a request: ", exc);
629 catch (exception& exc) {
670 catch (exception& exc) {
671 x_SendError(
"Exception when handling a request: ", exc);
727 GetTiming().Register(
this,
operation, status, start, blob_size);
735 size_t blob_size = 0;
736 for (
auto& chunk : data.
GetData() ) {
737 blob_size += chunk->size();
751 size_t item_id =
GetReply()->GetItemId();
752 GetReply()->PrepareBioseqData(item_id,
GetName(), data_to_send, output_format);
777 size_t item_id = reply.GetItemId();
779 reply.PrepareBlobPropData(item_id,
GetName(), psg_blob_id, data_to_send);
780 reply.PrepareBlobPropCompletion(item_id,
GetName(), 2);
787 size_t item_id = reply.GetItemId();
788 reply.PrepareBlobMessage(item_id,
GetName(),
790 "Blob retrieval is not authorized",
794 reply.PrepareBlobCompletion(item_id,
GetName(), 2);
800 size_t item_id =
GetReply()->GetItemId();
802 for (
auto& chunk : data.
GetData() ) {
804 (
const unsigned char*)chunk->data(), chunk->size(), chunk_no++);
811 const string& id2_info,
815 size_t item_id =
GetReply()->GetItemId();
817 GetReply()->PrepareTSEBlobPropData(item_id,
GetName(), chunk_id, id2_info, data_to_send);
823 const string& id2_info,
827 size_t item_id =
GetReply()->GetItemId();
829 for (
auto& chunk : data.
GetData() ) {
831 (
const unsigned char*)chunk->data(), chunk->size(), chunk_no++,
843 auto split_version =
m_WGSData->m_SplitVersion;
845 string id2_info = osg::CPSGS_OSGGetBlobBase::GetPSGId2Info(id2_blob_id, split_version);
867 string main_blob_id =
m_WGSData->m_BlobId;
884 size_t item_id =
GetReply()->GetItemId();
892 const string& psg_blob_id =
m_WGSData->m_BlobId;
912 if (
m_WGSData->m_Data->GetMainObject().GetThisTypeInfo() == CID2S_Split_Info::GetTypeInfo() ) {
926 auto split_version =
m_WGSData->m_SplitVersion;
927 string id2_info = osg::CPSGS_OSGGetBlobBase::GetPSGId2Info(id2_blob_id, split_version);
945 COSSWriter writer(data.
SetData());
1032 reply->PrepareProcessorMessage(reply->GetItemId(),
"WGS", msg,
1046 const string& msg,
const exception& exc)
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
const string kCassandraProcessorEvent
virtual void Serialize(CObjectOStreamAsnBinary &out) const
CBlobRecord & SetNChunks(int32_t value)
CBlobRecord & SetId2Info(string const &value)
CBlobRecord & SetSuppress(bool value)
CBlobRecord & SetGzip(bool value)
CBlobRecord & SetWithdrawn(bool value)
CBlobRecord & SetDead(bool value)
CBlobRecord & SetModified(TTimestamp value)
list< TOctetString * > TOctetStringSequence
virtual ERW_Result Flush(void)
Flush pending data (if any) down to the output device.
COSSWriter(TOctetStringSequence &out)
TOctetStringSequence & m_Output
vector< char > TOctetString
virtual ERW_Result Write(const void *buffer, size_t count, size_t *written)
Write up to "count" bytes from the buffer pointed to by the "buffer" argument onto the output device.
CObjectOStreamAsnBinary –.
@ ePSGS_BlobBySatSatKeyRequest
@ ePSGS_BlobBySeqIdRequest
TRequest & GetRequest(void)
void x_SendResult(const string &data_to_send, EOutputFormat output_format)
void x_SendChunkBlobData(const string &id2_info, TID2ChunkId chunk_id, const objects::CID2_Reply_Data &data)
void x_ProcessBlobBySatSatKeyRequest(void)
void OnGotBlobBySeqId(void)
void GetBlobBySeqId(void)
void x_ProcessResolveRequest(void)
void x_InitClient(void) const
static void x_SendError(shared_ptr< CPSGS_Reply > reply, const string &msg)
void x_SendBlobProps(const string &psg_blob_id, CBlobRecord &blob_props)
EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
EOutputFormat x_GetOutputFormat(void)
string GetName(void) const override
Tells the processor name (used in logging and tracing)
void Process(void) override
Main processing function.
void x_ProcessTSEChunkRequest(void)
void x_Finish(EPSGS_Status status)
string GetGroupName(void) const override
Tells the processor group name.
void x_SendBlobData(const string &psg_blob_id, const objects::CID2_Reply_Data &data)
CRef< objects::CSeq_id > m_SeqId
void x_WriteData(objects::CID2_Reply_Data &data, const objects::CAsnBinData &obj, bool compress) const
void OnGotBlobByBlobId(void)
void x_UnlockRequest(void)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Tells if processor can process the given request.
shared_ptr< CWGSClient > m_Client
void x_RegisterTimingNotFound(EPSGOperation operation)
shared_ptr< ncbi::CThreadPool > m_ThreadPool
void OnResolvedSeqId(void)
shared_ptr< SWGSData > m_WGSData
void GetBlobByBlobId(void)
void x_ProcessBlobBySeqIdRequest(void)
void x_SendSplitInfo(void)
void x_SendForbidden(void)
~CPSGS_WGSProcessor(void) override
IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
void x_SendBlobForbidden(const string &psg_blob_id)
void Cancel(void) override
The infrastructure request to cancel processing.
void x_SendExcluded(void)
void x_RegisterTiming(psg_time_point_t start, EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
void x_WaitForOtherProcessors(void)
bool x_SignalStartProcessing()
void x_SendChunkBlobProps(const string &id2_info, TID2ChunkId chunk_id, CBlobRecord &blob_props)
void x_SendMainEntry(void)
void x_SendBioseqInfo(void)
shared_ptr< SWGSProcessor_Config > m_Config
EOutputFormat m_OutputFormat
void x_RegisterTimingFound(psg_time_point_t start, EPSGOperation operation, const objects::CID2_Reply_Data &data)
bool x_IsEnabled(CPSGS_Request &request) const
static CPubseqGatewayApp * GetInstance(void)
Abstract class for representing single task executing in pool of threads To use this class in applica...
Main class implementing functionality of pool of threads.
virtual EStatus Execute(void) override
Do the actual job.
CPSGS_WGSProcessor & m_Processor
CWGSThreadPoolTask_GetBlobByBlobId(CPSGS_WGSProcessor &processor)
CWGSThreadPoolTask_GetBlobBySeqId(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_GetChunk(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_ResolveSeqId(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
Writer-based output stream.
CZipStreamCompressor – zlib based compression stream processor.
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
EPSGS_SeqIdParsingResult ParseInputSeqId(objects::CSeq_id &seq_id, const string &request_seq_id, int request_seq_id_type, string *err_msg=nullptr)
Parse seq-id from a string and type representation.
TProcessorPriority m_Priority
A very basic data-write interface.
std::ofstream out("events_result.xml")
main entry point for tests
@ eNoOwnership
No ownership is assumed.
@ eDiag_Error
Error message.
@ e500_InternalServerError
void Reset(void)
Reset reference object.
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
ERW_Result
Result codes for I/O operations.
@ eRW_Success
Everything is okay, I/O completed.
static bool EqualNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive equality of a substring with another string.
EStatus
Status of the task.
@ eCompleted
executed successfully
#define DEFINE_STATIC_FAST_MUTEX(id)
Define static fast mutex and initialize it.
TVersion GetVersion(void) const
Get the Version member data.
TData_compression GetData_compression(void) const
Get the Data_compression member data.
const TData & GetData(void) const
Get the Data member data.
void SetData_compression(TData_compression value)
Assign a value to Data_compression data member.
bool IsSetVersion(void) const
Version of blob, optional in some requests. Check if a value has been assigned to Version data member.
TData & SetData(void)
Assign a value to Data data member.
void SetData_format(TData_format value)
Assign a value to Data_format data member.
@ eData_format_asn_binary
@ eID2_Blob_State_suppressed
@ eID2_Blob_State_suppressed_temp
@ eID2_Blob_State_withdrawn
const int64_t kSplitInfoChunk
static pcre_uint8 * buffer
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
string ToBioseqProtobuf(const CBioseqInfoRecord &bioseq_info)
#define PSG_ERROR(message)
chrono::steady_clock psg_clock_t
@ ePSGS_BlobRetrievalIsNotAuthorized
psg_clock_t::time_point psg_time_point_t
Reader-writer based streams.
static CNamedPipeClient * client
static const char * str(char *buf, int n)
vector< string > m_ExcludeBlobs
EPSGS_TSEOption m_TSEOption
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
EPSGS_OutputFormat m_OutputFormat
Pool of generic task-executing threads.
#define DEFAULT_INDEX_UPDATE_TIME
#define DEFAULT_COMPRESS_DATA
#define DEFAULT_FILE_REOPEN_TIME
static bool s_SimulateError()
void s_SetBlobState(CBlobRecord &blob_props, int id2_blob_state)
#define PARAM_FILE_RECHECK_TIME
static void s_OnGotBlobBySeqId(void *data)
NCBI_PARAM_DEF(int, WGS_PROCESSOR, ERROR_RATE, 0)
#define DEFAULT_VDB_CACHE_SIZE
static const string kWGSProcessorSection
#define DEFAULT_FILE_RECHECK_TIME
#define PARAM_COMPRESS_DATA
#define PARAM_FILE_REOPEN_TIME
NCBI_PARAM_DECL(int, WGS_PROCESSOR, ERROR_RATE)
static void s_OnResolvedSeqId(void *data)
static const string kParamMaxConn
static const string kWGSProcessorGroupName
static const string kWGSProcessorName
static const int kDefaultMaxConn
#define PARAM_INDEX_UPDATE_TIME
static void s_OnGotBlobByBlobId(void *data)
static void s_OnGotChunk(void *data)
#define PARAM_VDB_CACHE_SIZE
void s_SetBlobVersion(CBlobRecord &blob_props, const CID2_Blob_Id &blob_id)
void s_SetBlobDataProps(CBlobRecord &blob_props, const CID2_Reply_Data &data)
const string kWGSProcessorEvent