NCBI C++ ToolKit
wgs_processor.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: wgs_processor.cpp 100818 2023-09-14 17:33:33Z vasilche $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Aleksey Grichenko, Eugene Vasilchenko
27  *
28  * File Description: processor for data from WGS
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "wgs_processor.hpp"
35 #include "wgs_client.hpp"
36 #include "pubseq_gateway.hpp"
40 #include "osg_getblob_base.hpp"
41 #include "osg_resolve_base.hpp"
42 #include "id2info.hpp"
43 #include "cass_processor_base.hpp"
44 #include "psgs_thread_pool.hpp"
47 #include <corelib/rwstream.hpp>
48 #include <serial/serial.hpp>
49 #include <serial/objostrasnb.hpp>
50 #include <util/compress/zlib.hpp>
51 #include <util/thread_pool.hpp>
59 
60 
64 
66 
// Processor name: compared (case-insensitively) against the request's
// enabled/disabled processor lists in x_IsEnabled().
static const string kWGSProcessorName("WGS");
// Processor group name reported for this processor.
static const string kWGSProcessorGroupName("WGS");
// Configuration section name for WGS processor settings
// (presumably the registry section read by x_LoadConfig() -- confirm).
static const string kWGSProcessorSection("WGS_PROCESSOR");

// Key of the max-connections setting (presumably looked up in the section
// above), and the value substituted when the configured value is 0.
static const string kParamMaxConn("maxconn");
static const int kDefaultMaxConn(64);
73 
74 
75 /////////////////////////////////////////////////////////////////////////////
76 // Helper classes
77 /////////////////////////////////////////////////////////////////////////////
78 
80 
81 class COSSWriter : public IWriter
82 {
83 public:
84  typedef vector<char> TOctetString;
85  typedef list<TOctetString*> TOctetStringSequence;
86 
88  : m_Output(out)
89  {
90  }
91 
92  virtual ERW_Result Write(const void* buffer,
93  size_t count,
94  size_t* written)
95  {
96  const char* data = static_cast<const char*>(buffer);
97  m_Output.push_back(new TOctetString(data, data+count));
98  if ( written ) {
99  *written = count;
100  }
101  return eRW_Success;
102  }
103  virtual ERW_Result Flush(void)
104  {
105  return eRW_Success;
106  }
107 
108 private:
110 };
111 
112 
114 {
115 public:
117 
118  virtual EStatus Execute(void) override
119  {
121  return eCompleted;
122  }
123 
124 private:
126 };
127 
128 
130 {
131 public:
133 
134  virtual EStatus Execute(void) override
135  {
137  return eCompleted;
138  }
139 
140 private:
142 };
143 
144 
146 {
147 public:
149 
150  virtual EStatus Execute(void) override
151  {
153  return eCompleted;
154  }
155 
156 private:
158 };
159 
160 
162 {
163 public:
165 
166  virtual EStatus Execute(void) override
167  {
169  return eCompleted;
170  }
171 
172 private:
174 };
175 
176 
178 
179 
180 NCBI_PARAM_DECL(int, WGS_PROCESSOR, ERROR_RATE);
181 NCBI_PARAM_DEF(int, WGS_PROCESSOR, ERROR_RATE, 0);
182 
183 static bool s_SimulateError()
184 {
185  static int error_rate = NCBI_PARAM_TYPE(WGS_PROCESSOR, ERROR_RATE)::GetDefault();
186  if ( error_rate > 0 ) {
187  static int error_counter = 0;
188  if ( ++error_counter >= error_rate ) {
189  error_counter = 0;
190  return true;
191  }
192  }
193  return false;
194 }
195 
196 
197 /////////////////////////////////////////////////////////////////////////////
198 // CPSGS_WGSProcessor
199 /////////////////////////////////////////////////////////////////////////////
200 
201 
// Configuration parameter names for the WGS processor, each followed by the
// default value associated with it. These are presumably consumed by
// x_LoadConfig() when reading the registry -- confirm there. The units of the
// *_TIME values are not visible in this file.
#define PARAM_VDB_CACHE_SIZE "vdb_cache_size"
#define DEFAULT_VDB_CACHE_SIZE 100

#define PARAM_INDEX_UPDATE_TIME "index_update_time"
#define DEFAULT_INDEX_UPDATE_TIME 600

#define PARAM_FILE_REOPEN_TIME "file_reopen_time"
#define DEFAULT_FILE_REOPEN_TIME 3600

#define PARAM_FILE_RECHECK_TIME "file_recheck_time"
#define DEFAULT_FILE_RECHECK_TIME 600

#define PARAM_COMPRESS_DATA "compress_data"
#define DEFAULT_COMPRESS_DATA SWGSProcessor_Config::eCompressData_some
213 
214 
216  : m_Config(new SWGSProcessor_Config),
217  m_Status(ePSGS_NotFound),
218  m_Canceled(false),
219  m_ChunkId(0),
220  m_OutputFormat(SPSGS_ResolveRequest::ePSGS_NativeFormat),
221  m_Unlocked(true)
222 {
223  x_LoadConfig();
224 }
225 
226 
228  const shared_ptr<CWGSClient>& client,
229  shared_ptr<ncbi::CThreadPool> thread_pool,
230  shared_ptr<CPSGS_Request> request,
231  shared_ptr<CPSGS_Reply> reply,
232  TProcessorPriority priority)
233  : m_Client(client),
234  m_Start(psg_clock_t::now()),
235  m_Status(ePSGS_InProgress),
236  m_Canceled(false),
237  m_ChunkId(0),
238  m_OutputFormat(SPSGS_ResolveRequest::ePSGS_NativeFormat),
239  m_Unlocked(true),
240  m_ThreadPool(thread_pool)
241 {
242  m_Request = request;
243  m_Reply = reply;
244  m_Priority = priority;
245 }
246 
247 
249 {
251  x_UnlockRequest();
252 }
253 
254 
256 {
257  const CNcbiRegistry& registry = CPubseqGatewayApp::GetInstance()->GetConfig();
259 
261  if ( compress_data >= SWGSProcessor_Config::eCompressData_never &&
262  compress_data <= SWGSProcessor_Config::eCompressData_always ) {
263  m_Config->m_CompressData = SWGSProcessor_Config::ECompressData(compress_data);
264  }
268 
270  if (max_conn == 0) {
271  max_conn = kDefaultMaxConn;
272  }
275  min(3u, max_conn), max_conn)));
276 }
277 
278 
279 bool CPSGS_WGSProcessor::CanProcess(shared_ptr<CPSGS_Request> request,
280  shared_ptr<CPSGS_Reply> /*reply*/) const
281 {
282  if ( !x_IsEnabled(*request) ) return false;
283  x_InitClient();
284  _ASSERT(m_Client);
285  return m_Client->CanProcessRequest(*request);
286 }
287 
288 
290 CPSGS_WGSProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
291  shared_ptr<CPSGS_Reply> reply,
292  TProcessorPriority priority) const
293 {
294  if ( !x_IsEnabled(*request) ) return nullptr;
295  x_InitClient();
296  _ASSERT(m_Client);
297  if ( !m_Client->CanProcessRequest(*request) ) return nullptr;
298 
299  return new CPSGS_WGSProcessor(m_Client, m_ThreadPool, request, reply, priority);
300 }
301 
302 
304 {
305  return kWGSProcessorName;
306 }
307 
308 
310 {
311  return kWGSProcessorGroupName;
312 }
313 
314 
316 {
317  CRequestContextResetter context_resetter;
318  GetRequest()->SetRequestContext();
319  if ( x_IsCanceled() ) {
320  return;
321  }
322 
323  try {
324  {
325  CFastMutexGuard guard(m_Mutex);
326  m_Unlocked = false;
327  }
329  auto req_type = GetRequest()->GetRequestType();
330  switch (req_type) {
333  break;
336  break;
339  break;
342  break;
343  default:
345  break;
346  }
347  }
348  catch (exception& exc) {
349  x_SendError("Exception when handling a request: ", exc);
351  }
352 }
353 
354 
356 {
357  auto app = CPubseqGatewayApp::GetInstance();
358  bool enabled = app->GetWGSProcessorsEnabled();
359  if ( enabled ) {
360  for (const auto& name : request.GetRequest<SPSGS_RequestBase>().m_DisabledProcessors ) {
361  if ( NStr::EqualNocase(name, kWGSProcessorName) ) return false;
362  }
363  }
364  else {
365  for (const auto& name : request.GetRequest<SPSGS_RequestBase>().m_EnabledProcessors ) {
366  if ( NStr::EqualNocase(name, kWGSProcessorName) ) return true;
367  }
368  }
369  return enabled;
370 }
371 
372 
373 inline
375 {
376  DEFINE_STATIC_FAST_MUTEX(s_ClientMutex);
377  if ( m_Client ) return;
378  CFastMutexGuard guard(s_ClientMutex);
379  if ( !m_Client ) {
380  m_Client = make_shared<CWGSClient>(*m_Config);
381  }
382 }
383 
384 
385 static void s_OnResolvedSeqId(void* data)
386 {
387  static_cast<CPSGS_WGSProcessor*>(data)->OnResolvedSeqId();
388 }
389 
390 
392 {
393  SPSGS_ResolveRequest& resolve_request = GetRequest()->GetRequest<SPSGS_ResolveRequest>();
394  m_OutputFormat = resolve_request.m_OutputFormat;
395  m_SeqId.Reset(new CSeq_id());
396  string err;
397  if (ParseInputSeqId(*m_SeqId, resolve_request.m_SeqId, resolve_request.m_SeqIdType, &err) != ePSGS_ParsedOK) {
398  PSG_ERROR("Error parsing seq-id: " << (err.empty() ? resolve_request.m_SeqId : err));
400  return;
401  }
402  m_ThreadPool->AddTask(new CWGSThreadPoolTask_ResolveSeqId(*this));
403 }
404 
405 
407 {
408  CRequestContextResetter context_resetter;
409  GetRequest()->SetRequestContext();
410  try {
411  m_WGSData = m_Client->ResolveSeqId(*m_SeqId);
413  }
414  catch (exception& exc) {
415  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
416  m_WGSData.reset();
417  }
419 }
420 
421 
423 {
424  CRequestContextResetter context_resetter;
425  GetRequest()->SetRequestContext();
426  if ( x_IsCanceled() ) {
427  return;
428  }
429  if ( s_SimulateError() ) {
430  m_WGSDataError = "simulated WGS processor error";
431  m_WGSData.reset();
432  }
433  if ( !m_WGSData || !m_WGSData->m_BioseqInfo ) {
434  if ( m_WGSDataError.empty() ) {
436  }
437  else {
440  }
441  return;
442  }
443  if ( !x_SignalStartProcessing() ) {
444  return;
445  }
446  try {
448  }
449  catch (exception& exc) {
450  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
453  return;
454  }
456 }
457 
458 
459 static void s_OnGotBlobBySeqId(void* data)
460 {
461  static_cast<CPSGS_WGSProcessor*>(data)->OnGotBlobBySeqId();
462 }
463 
464 
466 {
467  SPSGS_BlobBySeqIdRequest& get_request = GetRequest()->GetRequest<SPSGS_BlobBySeqIdRequest>();
468  m_SeqId.Reset(new CSeq_id());
469  string err;
470  if (ParseInputSeqId(*m_SeqId, get_request.m_SeqId, get_request.m_SeqIdType, &err) != ePSGS_ParsedOK) {
471  PSG_ERROR("Error parsing seq-id: " << (err.empty() ? get_request.m_SeqId : err));
473  return;
474  }
475 
477  m_ThreadPool->AddTask(new CWGSThreadPoolTask_ResolveSeqId(*this));
478  }
479  else {
480  m_ExcludedBlobs = get_request.m_ExcludeBlobs;
481  m_ThreadPool->AddTask(new CWGSThreadPoolTask_GetBlobBySeqId(*this));
482  }
483 }
484 
485 
487 {
488  CRequestContextResetter context_resetter;
489  GetRequest()->SetRequestContext();
490  try {
491  m_WGSData = m_Client->GetBlobBySeqId(*m_SeqId, m_ExcludedBlobs);
493  }
494  catch (exception& exc) {
495  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
496  m_WGSData.reset();
497  }
499 }
500 
501 
503 {
504  CRequestContextResetter context_resetter;
505  GetRequest()->SetRequestContext();
506  if ( x_IsCanceled() ) {
507  return;
508  }
509  if ( s_SimulateError() ) {
510  m_WGSDataError = "simulated WGS processor error";
511  m_WGSData.reset();
512  }
513  // NOTE: m_Data may be null if the blob was excluded.
514  if ( !m_WGSData || !m_WGSData->m_BioseqInfo ) {
515  if ( m_WGSDataError.empty() ) {
518  }
519  else {
522  }
523  return;
524  }
525  if ( !x_SignalStartProcessing() ) {
526  return;
527  }
528  try {
530  x_SendBlob();
531  }
532  catch (exception& exc) {
533  x_SendError("Exception when handling a request: ", exc);
535  return;
536  }
538 }
539 
540 
541 static void s_OnGotBlobByBlobId(void* data)
542 {
543  static_cast<CPSGS_WGSProcessor*>(data)->OnGotBlobByBlobId();
544 }
545 
546 
548 {
550  m_PSGBlobId = blob_request.m_BlobId.GetId();
552 }
553 
554 
556 {
557  CRequestContextResetter context_resetter;
558  GetRequest()->SetRequestContext();
559  try {
560  m_WGSData = m_Client->GetBlobByBlobId(m_PSGBlobId);
561  }
562  catch (exception& exc) {
563  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
564  m_WGSData.reset();
565  }
567 }
568 
569 
571 {
572  CRequestContextResetter context_resetter;
573  GetRequest()->SetRequestContext();
574  if ( x_IsCanceled() ) {
575  return;
576  }
577  if ( s_SimulateError() ) {
578  m_WGSDataError = "simulated WGS processor error";
579  m_WGSData.reset();
580  }
581  if ( !m_WGSData ) {
582  if ( m_WGSDataError.empty() ) {
585  }
586  else {
589  }
590  return;
591  }
592  if ( !x_SignalStartProcessing() ) {
593  return;
594  }
595  try {
596  x_SendBlob();
597  }
598  catch (exception& exc) {
599  x_SendError("Exception when handling a request: ", exc);
601  return;
602  }
604 }
605 
606 
607 static void s_OnGotChunk(void* data)
608 {
609  static_cast<CPSGS_WGSProcessor*>(data)->OnGotChunk();
610 }
611 
612 
614 {
615  SPSGS_TSEChunkRequest& chunk_request = GetRequest()->GetRequest<SPSGS_TSEChunkRequest>();
616  m_Id2Info = chunk_request.m_Id2Info;
617  m_ChunkId = chunk_request.m_Id2Chunk;
618  m_ThreadPool->AddTask(new CWGSThreadPoolTask_GetChunk(*this));
619 }
620 
621 
623 {
624  CRequestContextResetter context_resetter;
625  GetRequest()->SetRequestContext();
626  try {
627  m_WGSData = m_Client->GetChunk(m_Id2Info, m_ChunkId);
628  }
629  catch (exception& exc) {
630  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
631  m_WGSData.reset();
632  }
634 }
635 
636 
638 {
639  CRequestContextResetter context_resetter;
640  GetRequest()->SetRequestContext();
641  if ( x_IsCanceled() ) {
642  return;
643  }
644  if ( s_SimulateError() ) {
645  m_WGSDataError = "simulated WGS processor error";
646  m_WGSData.reset();
647  }
648  if ( !m_WGSData ) {
649  if ( m_WGSDataError.empty() ) {
652  }
653  else {
656  }
657  return;
658  }
659  if ( !x_SignalStartProcessing() ) {
660  return;
661  }
662  try {
663  if ( m_WGSData->IsForbidden() ) {
664  x_SendForbidden();
665  }
666  else {
667  x_SendChunk();
668  }
669  }
670  catch (exception& exc) {
671  x_SendError("Exception when handling a request: ", exc);
673  return;
674  }
676 }
677 
678 
680 {
684  }
685  return m_OutputFormat;
686 }
687 
688 
689 void s_SetBlobVersion(CBlobRecord& blob_props, const CID2_Blob_Id& blob_id)
690 {
691  if ( blob_id.IsSetVersion() ) {
692  blob_props.SetModified(int64_t(blob_id.GetVersion())*60000);
693  }
694 }
695 
696 
697 void s_SetBlobState(CBlobRecord& blob_props, int id2_blob_state)
698 {
699  if ( id2_blob_state & (1 << eID2_Blob_State_withdrawn) ) {
700  blob_props.SetWithdrawn(true);
701  }
702  if ( id2_blob_state & ((1 << eID2_Blob_State_suppressed) |
704  blob_props.SetSuppress(true);
705  }
706  if ( id2_blob_state & (1 << eID2_Blob_State_dead) ) {
707  blob_props.SetDead(true);
708  }
709 }
710 
711 
712 void s_SetBlobDataProps(CBlobRecord& blob_props, const CID2_Reply_Data& data)
713 {
714  if ( data.GetData_compression() == data.eData_compression_gzip ) {
715  blob_props.SetGzip(true);
716  }
717  blob_props.SetNChunks(data.GetData().size());
718 }
719 
720 
723  EPSGOperationStatus status,
724  size_t blob_size)
725 {
727  GetTiming().Register(this, operation, status, start, blob_size);
728 }
729 
730 
733  const CID2_Reply_Data& data)
734 {
735  size_t blob_size = 0;
736  for ( auto& chunk : data.GetData() ) {
737  blob_size += chunk->size();
738  }
739  x_RegisterTiming(start, operation, eOpStatusFound, blob_size);
740 }
741 
742 
744 {
746 }
747 
748 
749 void CPSGS_WGSProcessor::x_SendResult(const string& data_to_send, EOutputFormat output_format)
750 {
751  size_t item_id = GetReply()->GetItemId();
752  GetReply()->PrepareBioseqData(item_id, GetName(), data_to_send, output_format);
753  GetReply()->PrepareBioseqCompletion(item_id, GetName(), 2);
754 }
755 
756 
758 {
759  EOutputFormat output_format = x_GetOutputFormat();
760  string data_to_send;
761  if ( output_format == SPSGS_ResolveRequest::ePSGS_JsonFormat ) {
762  data_to_send = ToJsonString(
763  *m_WGSData->m_BioseqInfo,
764  m_WGSData->m_BioseqInfoFlags,
765  m_WGSData->m_BlobId);
766  } else {
767  data_to_send = ToBioseqProtobuf(*m_WGSData->m_BioseqInfo);
768  }
769 
770  x_SendResult(data_to_send, output_format);
771 }
772 
773 
774 void CPSGS_WGSProcessor::x_SendBlobProps(const string& psg_blob_id, CBlobRecord& blob_props)
775 {
776  auto& reply = *GetReply();
777  size_t item_id = reply.GetItemId();
778  string data_to_send = ToJsonString(blob_props);
779  reply.PrepareBlobPropData(item_id, GetName(), psg_blob_id, data_to_send);
780  reply.PrepareBlobPropCompletion(item_id, GetName(), 2);
781 }
782 
783 
784 void CPSGS_WGSProcessor::x_SendBlobForbidden(const string& psg_blob_id)
785 {
786  auto& reply = *GetReply();
787  size_t item_id = reply.GetItemId();
788  reply.PrepareBlobMessage(item_id, GetName(),
789  psg_blob_id,
790  "Blob retrieval is not authorized",
793  eDiag_Error);
794  reply.PrepareBlobCompletion(item_id, GetName(), 2);
795 }
796 
797 
798 void CPSGS_WGSProcessor::x_SendBlobData(const string& psg_blob_id, const CID2_Reply_Data& data)
799 {
800  size_t item_id = GetReply()->GetItemId();
801  int chunk_no = 0;
802  for ( auto& chunk : data.GetData() ) {
803  GetReply()->PrepareBlobData(item_id, GetName(), psg_blob_id,
804  (const unsigned char*)chunk->data(), chunk->size(), chunk_no++);
805  }
806  GetReply()->PrepareBlobCompletion(item_id, GetName(), chunk_no + 1);
807 }
808 
809 
811  const string& id2_info,
812  TID2ChunkId chunk_id,
813  CBlobRecord& blob_props)
814 {
815  size_t item_id = GetReply()->GetItemId();
816  string data_to_send = ToJsonString(blob_props);
817  GetReply()->PrepareTSEBlobPropData(item_id, GetName(), chunk_id, id2_info, data_to_send);
818  GetReply()->PrepareBlobPropCompletion(item_id, GetName(), 2);
819 }
820 
821 
823  const string& id2_info,
824  TID2ChunkId chunk_id,
825  const CID2_Reply_Data& data)
826 {
827  size_t item_id = GetReply()->GetItemId();
828  int chunk_no = 0;
829  for ( auto& chunk : data.GetData() ) {
830  GetReply()->PrepareTSEBlobData(item_id, GetName(),
831  (const unsigned char*)chunk->data(), chunk->size(), chunk_no++,
832  chunk_id, id2_info);
833  }
834  GetReply()->PrepareTSEBlobCompletion(item_id, GetName(), chunk_no+1);
835 }
836 
837 
839 {
840  if (!m_WGSData->m_Id2BlobId) return;
841  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
842  string blob_id = m_WGSData->m_BlobId;
843  auto split_version = m_WGSData->m_SplitVersion;
844 
845  string id2_info = osg::CPSGS_OSGGetBlobBase::GetPSGId2Info(id2_blob_id, split_version);
846 
847  CBlobRecord main_blob_props;
848  s_SetBlobVersion(main_blob_props, id2_blob_id);
849  s_SetBlobState(main_blob_props, m_WGSData->GetID2BlobState());
850  main_blob_props.SetId2Info(id2_info);
851  x_SendBlobProps(blob_id, main_blob_props);
852 
853  CBlobRecord split_info_blob_props;
854  CID2_Reply_Data data;
855  x_WriteData(data, *m_WGSData->m_Data, m_WGSData->m_Compress);
857  s_SetBlobDataProps(split_info_blob_props, data);
858  x_SendChunkBlobProps(id2_info, kSplitInfoChunk, split_info_blob_props);
859  x_SendChunkBlobData(id2_info, kSplitInfoChunk, data);
860 }
861 
862 
864 {
865  _ASSERT(m_WGSData->m_Id2BlobId);
866  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
867  string main_blob_id = m_WGSData->m_BlobId;
868 
869  CID2_Reply_Data data;
870  x_WriteData(data, *m_WGSData->m_Data, m_WGSData->m_Compress);
872 
873  CBlobRecord main_blob_props;
874  s_SetBlobVersion(main_blob_props, id2_blob_id);
875  s_SetBlobState(main_blob_props, m_WGSData->GetID2BlobState());
876  s_SetBlobDataProps(main_blob_props, data);
877  x_SendBlobProps(main_blob_id, main_blob_props);
878  x_SendBlobData(main_blob_id, data);
879 }
880 
881 
883 {
884  size_t item_id = GetReply()->GetItemId();
885  GetReply()->PrepareBlobExcluded(item_id, GetName(), m_WGSData->m_BlobId, ePSGS_BlobExcluded);
886 }
887 
888 
890 {
891  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
892  const string& psg_blob_id = m_WGSData->m_BlobId;
893 
894  CBlobRecord blob_props;
895  s_SetBlobVersion(blob_props, id2_blob_id);
896  s_SetBlobState(blob_props, m_WGSData->GetID2BlobState());
897  x_SendBlobProps(psg_blob_id, blob_props);
898  x_SendBlobForbidden(psg_blob_id);
899 }
900 
901 
903 {
904  if ( m_WGSData->IsForbidden() ) {
905  x_SendForbidden();
906  return;
907  }
908  if ( m_WGSData->m_Excluded ) {
909  x_SendExcluded();
910  return;
911  }
912  if ( m_WGSData->m_Data->GetMainObject().GetThisTypeInfo() == CID2S_Split_Info::GetTypeInfo() ) {
913  // split info
914  x_SendSplitInfo();
915  }
916  else {
917  x_SendMainEntry();
918  }
919 }
920 
921 
923 {
924  _ASSERT(m_WGSData && m_WGSData->m_Data);
925  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
926  auto split_version = m_WGSData->m_SplitVersion;
927  string id2_info = osg::CPSGS_OSGGetBlobBase::GetPSGId2Info(id2_blob_id, split_version);
928 
929  CID2_Reply_Data data;
930  x_WriteData(data, *m_WGSData->m_Data, m_WGSData->m_Compress);
932 
933  CBlobRecord chunk_blob_props;
934  s_SetBlobDataProps(chunk_blob_props, data);
935  x_SendChunkBlobProps(id2_info, m_ChunkId, chunk_blob_props);
936  x_SendChunkBlobData(id2_info, m_ChunkId, data);
937 }
938 
939 
941  const CAsnBinData& obj,
942  bool compress) const
943 {
945  COSSWriter writer(data.SetData());
946  CWStream writer_stream(&writer);
948  if ( compress ) {
950  str.reset(new CCompressionOStream(writer_stream,
953  }
954  else {
956  str.reset(&writer_stream, eNoOwnership);
957  }
958  CObjectOStreamAsnBinary objstr(*str);
959  obj.Serialize(objstr);
960 }
961 
962 
964 {
965  {
966  CFastMutexGuard guard(m_Mutex);
967  if (m_Unlocked) return;
968  m_Unlocked = true;
969  }
970  if (m_Request) m_Request->Unlock(kWGSProcessorEvent);
971 }
972 
973 
975 {
976  if (m_Canceled) return;
978 }
979 
980 
982 {
983  m_Canceled = true;
984  if (!IsUVThreadAssigned()) {
987  }
988  else {
989  x_UnlockRequest();
990  }
991 }
992 
993 
995 {
996  return m_Status;
997 }
998 
999 
1001 {
1002  if ( m_Canceled ) {
1004  return true;
1005  }
1006  return false;
1007 }
1008 
1009 
1011 {
1012  if ( SignalStartProcessing() == ePSGS_Cancel ) {
1014  return false;
1015  }
1016  return true;
1017 }
1018 
1019 
1021 {
1022  _ASSERT(status != ePSGS_InProgress);
1023  m_Status = status;
1024  x_UnlockRequest();
1026 }
1027 
1028 
1029 void CPSGS_WGSProcessor::x_SendError(shared_ptr<CPSGS_Reply> reply,
1030  const string& msg)
1031 {
1032  reply->PrepareProcessorMessage(reply->GetItemId(), "WGS", msg,
1035  eDiag_Error);
1036 }
1037 
1038 
1039 void CPSGS_WGSProcessor::x_SendError(const string& msg)
1040 {
1041  x_SendError(m_Reply, msg);
1042 }
1043 
1044 
1045 void CPSGS_WGSProcessor::x_SendError(shared_ptr<CPSGS_Reply> reply,
1046  const string& msg, const exception& exc)
1047 {
1048  x_SendError(reply, msg+string(exc.what()));
1049 }
1050 
1051 
1052 void CPSGS_WGSProcessor::x_SendError(const string& msg, const exception& exc)
1053 {
1054  x_SendError(m_Reply, msg+string(exc.what()));
1055 }
1056 
1057 
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
#define true
Definition: bool.h:35
#define false
Definition: bool.h:36
const string kCassandraProcessorEvent
virtual void Serialize(CObjectOStreamAsnBinary &out) const
Definition: wgsread.cpp:325
CBlobRecord & SetNChunks(int32_t value)
CBlobRecord & SetId2Info(string const &value)
CBlobRecord & SetSuppress(bool value)
CBlobRecord & SetGzip(bool value)
CBlobRecord & SetWithdrawn(bool value)
CBlobRecord & SetDead(bool value)
CBlobRecord & SetModified(TTimestamp value)
Definition: blob_record.cpp:70
CID2_Blob_Id –.
Definition: ID2_Blob_Id.hpp:66
CNcbiRegistry –.
Definition: ncbireg.hpp:913
list< TOctetString * > TOctetStringSequence
virtual ERW_Result Flush(void)
Flush pending data (if any) down to the output device.
COSSWriter(TOctetStringSequence &out)
TOctetStringSequence & m_Output
vector< char > TOctetString
virtual ERW_Result Write(const void *buffer, size_t count, size_t *written)
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
CObjectOStreamAsnBinary –.
Definition: objostrasnb.hpp:58
@ ePSGS_BlobBySatSatKeyRequest
TRequest & GetRequest(void)
void x_SendResult(const string &data_to_send, EOutputFormat output_format)
void x_SendChunkBlobData(const string &id2_info, TID2ChunkId chunk_id, const objects::CID2_Reply_Data &data)
void x_ProcessBlobBySatSatKeyRequest(void)
void OnGotBlobBySeqId(void)
void GetBlobBySeqId(void)
void x_ProcessResolveRequest(void)
void x_InitClient(void) const
static void x_SendError(shared_ptr< CPSGS_Reply > reply, const string &msg)
void x_SendBlobProps(const string &psg_blob_id, CBlobRecord &blob_props)
EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
EOutputFormat x_GetOutputFormat(void)
string GetName(void) const override
Tells the processor name (used in logging and tracing)
void Process(void) override
Main processing function.
void x_ProcessTSEChunkRequest(void)
void x_Finish(EPSGS_Status status)
string GetGroupName(void) const override
Tells the processor group name.
void x_SendBlobData(const string &psg_blob_id, const objects::CID2_Reply_Data &data)
CRef< objects::CSeq_id > m_SeqId
void x_WriteData(objects::CID2_Reply_Data &data, const objects::CAsnBinData &obj, bool compress) const
void OnGotBlobByBlobId(void)
void x_UnlockRequest(void)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Tells if processor can process the given request.
shared_ptr< CWGSClient > m_Client
void x_RegisterTimingNotFound(EPSGOperation operation)
shared_ptr< ncbi::CThreadPool > m_ThreadPool
void OnResolvedSeqId(void)
shared_ptr< SWGSData > m_WGSData
void GetBlobByBlobId(void)
void x_ProcessBlobBySeqIdRequest(void)
void x_SendSplitInfo(void)
void x_SendForbidden(void)
psg_time_point_t m_Start
~CPSGS_WGSProcessor(void) override
IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
void x_SendBlobForbidden(const string &psg_blob_id)
void Cancel(void) override
The infrastructure request to cancel processing.
void x_SendExcluded(void)
void x_RegisterTiming(psg_time_point_t start, EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
void x_WaitForOtherProcessors(void)
void x_SendChunkBlobProps(const string &id2_info, TID2ChunkId chunk_id, CBlobRecord &blob_props)
void x_SendMainEntry(void)
void x_SendBioseqInfo(void)
shared_ptr< SWGSProcessor_Config > m_Config
EOutputFormat m_OutputFormat
void x_RegisterTimingFound(psg_time_point_t start, EPSGOperation operation, const objects::CID2_Reply_Data &data)
bool x_IsEnabled(CPSGS_Request &request) const
EPSGS_Status m_Status
static CPubseqGatewayApp * GetInstance(void)
Abstract class for representing single task executing in pool of threads To use this class in applica...
Definition: thread_pool.hpp:76
Main class implementing functionality of pool of threads.
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_GetBlobByBlobId(CPSGS_WGSProcessor &processor)
CWGSThreadPoolTask_GetBlobBySeqId(CPSGS_WGSProcessor &processor)
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_GetChunk(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_ResolveSeqId(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
Writer-based output stream.
Definition: rwstream.hpp:171
CZipStreamCompressor – zlib based compression stream processor.
Definition: zlib.hpp:765
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
EPSGS_SeqIdParsingResult ParseInputSeqId(objects::CSeq_id &seq_id, const string &request_seq_id, int request_seq_id_type, string *err_msg=nullptr)
Parse seq-id from a string and type representation.
TProcessorPriority m_Priority
A very basic data-write interface.
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
std::ofstream out("events_result.xml")
main entry point for tests
@ eNoOwnership
No ownership is assumed.
Definition: ncbi_types.h:135
string
Definition: cgiapp.hpp:687
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
Definition: ncbi_param.hpp:149
#define kMax_UInt
Definition: ncbi_limits.h:185
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
Definition: ncbireg.cpp:362
ERW_Result
Result codes for I/O operations.
@ eRW_Success
Everything is okay, I/O completed.
static bool EqualNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive equality of a substring with another string.
Definition: ncbistr.hpp:5352
EStatus
Status of the task.
Definition: thread_pool.hpp:79
@ eCompleted
executed successfully
Definition: thread_pool.hpp:83
#define DEFINE_STATIC_FAST_MUTEX(id)
Define static fast mutex and initialize it.
Definition: ncbimtx.hpp:496
operation
Bit operations.
Definition: bmconst.h:191
TVersion GetVersion(void) const
Get the Version member data.
TData_compression GetData_compression(void) const
Get the Data_compression member data.
const TData & GetData(void) const
Get the Data member data.
void SetData_compression(TData_compression value)
Assign a value to Data_compression data member.
bool IsSetVersion(void) const
Version of blob (optional in some requests). Check if a value has been assigned to Version data member.
TData & SetData(void)
Assign a value to Data data member.
void SetData_format(TData_format value)
Assign a value to Data_format data member.
@ eID2_Blob_State_dead
@ eID2_Blob_State_suppressed
@ eID2_Blob_State_suppressed_temp
@ eID2_Blob_State_withdrawn
const int64_t kSplitInfoChunk
Definition: id2info.hpp:42
T min(T x_, T y_)
static pcre_uint8 * buffer
Definition: pcretest.c:1051
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
string ToBioseqProtobuf(const CBioseqInfoRecord &bioseq_info)
#define PSG_ERROR(message)
@ ePSGS_BlobExcluded
chrono::steady_clock psg_clock_t
@ ePSGS_ParsedOK
int TProcessorPriority
@ ePSGS_UnknownError
@ ePSGS_BlobRetrievalIsNotAuthorized
psg_clock_t::time_point psg_time_point_t
Reader-writer based streams.
static CNamedPipeClient * client
static const char * str(char *buf, int n)
Definition: stats.c:84
signed __int64 int64_t
Definition: stdint.h:135
vector< string > m_ExcludeBlobs
string GetId(void) const
EPSGS_TSEOption m_TSEOption
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
EPSGS_OutputFormat m_OutputFormat
#define _ASSERT
Pool of generic task-executing threads.
EPSGOperationStatus
Definition: timing.hpp:59
@ eOpStatusFound
Definition: timing.hpp:60
@ eOpStatusNotFound
Definition: timing.hpp:61
EPSGOperation
Definition: timing.hpp:64
@ eTseChunkRetrieve
Definition: timing.hpp:100
@ eBlobRetrieve
Definition: timing.hpp:97
#define DEFAULT_INDEX_UPDATE_TIME
USING_SCOPE(objects)
#define DEFAULT_COMPRESS_DATA
#define DEFAULT_FILE_REOPEN_TIME
static bool s_SimulateError()
void s_SetBlobState(CBlobRecord &blob_props, int id2_blob_state)
#define PARAM_FILE_RECHECK_TIME
static void s_OnGotBlobBySeqId(void *data)
NCBI_PARAM_DEF(int, WGS_PROCESSOR, ERROR_RATE, 0)
END_NAMESPACE(wgs)
#define DEFAULT_VDB_CACHE_SIZE
static const string kWGSProcessorSection
#define DEFAULT_FILE_RECHECK_TIME
BEGIN_LOCAL_NAMESPACE
#define PARAM_COMPRESS_DATA
#define PARAM_FILE_REOPEN_TIME
END_LOCAL_NAMESPACE
NCBI_PARAM_DECL(int, WGS_PROCESSOR, ERROR_RATE)
static void s_OnResolvedSeqId(void *data)
static const string kParamMaxConn
END_NCBI_NAMESPACE
static const string kWGSProcessorGroupName
static const string kWGSProcessorName
static const int kDefaultMaxConn
#define PARAM_INDEX_UPDATE_TIME
BEGIN_NCBI_NAMESPACE
static void s_OnGotBlobByBlobId(void *data)
static void s_OnGotChunk(void *data)
BEGIN_NAMESPACE(psg)
#define PARAM_VDB_CACHE_SIZE
void s_SetBlobVersion(CBlobRecord &blob_props, const CID2_Blob_Id &blob_id)
void s_SetBlobDataProps(CBlobRecord &blob_props, const CID2_Reply_Data &data)
const string kWGSProcessorEvent
ZLib Compression API.
#define compress
Definition: zconf.h:37
Modified on Fri Dec 08 08:21:45 2023 by modify_doxy.py rev. 669887