NCBI C++ ToolKit
wgs_processor.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: wgs_processor.cpp 101533 2023-12-28 14:00:00Z grichenk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Aleksey Grichenko, Eugene Vasilchenko
27  *
28  * File Description: processor for data from WGS
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "wgs_processor.hpp"
35 #include "wgs_client.hpp"
36 #include "pubseq_gateway.hpp"
40 #include "osg_getblob_base.hpp"
41 #include "osg_resolve_base.hpp"
42 #include "id2info.hpp"
43 #include "cass_processor_base.hpp"
44 #include "psgs_thread_pool.hpp"
47 #include <corelib/rwstream.hpp>
48 #include <serial/serial.hpp>
49 #include <serial/objostrasnb.hpp>
50 #include <util/compress/zlib.hpp>
51 #include <util/thread_pool.hpp>
59 
60 
64 
66 
// Identity of this processor as seen by the PSG framework: the name is
// matched (case-insensitively) against per-request enabled/disabled processor
// lists and used in traces; the group name is returned by GetGroupName().
static const string kWGSProcessorName      = "WGS";
static const string kWGSProcessorGroupName = "WGS";
// Registry section holding the WGS processor settings.
static const string kWGSProcessorSection   = "WGS_PROCESSOR";

// "maxconn" setting; when the configured value is 0, kDefaultMaxConn is used.
static const string kParamMaxConn = "maxconn";
static constexpr int kDefaultMaxConn = 64;
73 
74 
75 /////////////////////////////////////////////////////////////////////////////
76 // Helper classes
77 /////////////////////////////////////////////////////////////////////////////
78 
80 
81 class COSSWriter : public IWriter
82 {
83 public:
84  typedef vector<char> TOctetString;
85  typedef list<TOctetString*> TOctetStringSequence;
86 
88  : m_Output(out)
89  {
90  }
91 
92  virtual ERW_Result Write(const void* buffer,
93  size_t count,
94  size_t* written)
95  {
96  const char* data = static_cast<const char*>(buffer);
97  m_Output.push_back(new TOctetString(data, data+count));
98  if ( written ) {
99  *written = count;
100  }
101  return eRW_Success;
102  }
103  virtual ERW_Result Flush(void)
104  {
105  return eRW_Success;
106  }
107 
108 private:
110 };
111 
112 
114 {
115 public:
117 
118  virtual EStatus Execute(void) override
119  {
121  return eCompleted;
122  }
123 
124 private:
126 };
127 
128 
130 {
131 public:
133 
134  virtual EStatus Execute(void) override
135  {
137  return eCompleted;
138  }
139 
140 private:
142 };
143 
144 
146 {
147 public:
149 
150  virtual EStatus Execute(void) override
151  {
153  return eCompleted;
154  }
155 
156 private:
158 };
159 
160 
162 {
163 public:
165 
166  virtual EStatus Execute(void) override
167  {
169  return eCompleted;
170  }
171 
172 private:
174 };
175 
176 
178 
179 
180 NCBI_PARAM_DECL(int, WGS_PROCESSOR, ERROR_RATE);
181 NCBI_PARAM_DEF(int, WGS_PROCESSOR, ERROR_RATE, 0);
182 
183 static bool s_SimulateError()
184 {
185  static int error_rate = NCBI_PARAM_TYPE(WGS_PROCESSOR, ERROR_RATE)::GetDefault();
186  if ( error_rate > 0 ) {
187  static int error_counter = 0;
188  if ( ++error_counter >= error_rate ) {
189  error_counter = 0;
190  return true;
191  }
192  }
193  return false;
194 }
195 
196 
197 /////////////////////////////////////////////////////////////////////////////
198 // CPSGS_WGSProcessor
199 /////////////////////////////////////////////////////////////////////////////
200 
201 
202 #define PARAM_VDB_CACHE_SIZE "vdb_cache_size"
203 #define PARAM_INDEX_UPDATE_TIME "index_update_time"
204 #define PARAM_FILE_REOPEN_TIME "file_reopen_time"
205 #define PARAM_FILE_RECHECK_TIME "file_recheck_time"
206 #define PARAM_COMPRESS_DATA "compress_data"
207 
208 #define DEFAULT_VDB_CACHE_SIZE 100
209 #define DEFAULT_INDEX_UPDATE_TIME 600
210 #define DEFAULT_FILE_REOPEN_TIME 3600
211 #define DEFAULT_FILE_RECHECK_TIME 600
212 #define DEFAULT_COMPRESS_DATA SWGSProcessor_Config::eCompressData_some
213 
214 
216  : m_Config(new SWGSProcessor_Config),
217  m_Status(ePSGS_NotFound),
218  m_Canceled(false),
219  m_ChunkId(0),
220  m_OutputFormat(SPSGS_ResolveRequest::ePSGS_NativeFormat),
221  m_Unlocked(true)
222 {
223  x_LoadConfig();
224 }
225 
226 
228  const shared_ptr<CWGSClient>& client,
229  shared_ptr<ncbi::CThreadPool> thread_pool,
230  shared_ptr<CPSGS_Request> request,
231  shared_ptr<CPSGS_Reply> reply,
232  TProcessorPriority priority)
233  : m_Client(client),
234  m_Start(psg_clock_t::now()),
235  m_Status(ePSGS_InProgress),
236  m_Canceled(false),
237  m_ChunkId(0),
238  m_OutputFormat(SPSGS_ResolveRequest::ePSGS_NativeFormat),
239  m_Unlocked(true),
240  m_ThreadPool(thread_pool)
241 {
242  m_Request = request;
243  m_Reply = reply;
244  m_Priority = priority;
245 }
246 
247 
249 {
251  x_UnlockRequest();
252 }
253 
254 
256 {
257  const CNcbiRegistry& registry = CPubseqGatewayApp::GetInstance()->GetConfig();
259 
261  if ( compress_data >= SWGSProcessor_Config::eCompressData_never &&
262  compress_data <= SWGSProcessor_Config::eCompressData_always ) {
263  m_Config->m_CompressData = SWGSProcessor_Config::ECompressData(compress_data);
264  }
268 
270  if (max_conn == 0) {
271  max_conn = kDefaultMaxConn;
272  }
275  min(3u, max_conn), max_conn)));
276 }
277 
278 
279 bool CPSGS_WGSProcessor::CanProcess(shared_ptr<CPSGS_Request> request,
280  shared_ptr<CPSGS_Reply> /*reply*/) const
281 {
282  if ( !x_IsEnabled(*request) ) return false;
283  x_InitClient();
284  _ASSERT(m_Client);
285  return m_Client->CanProcessRequest(*request);
286 }
287 
288 
290 CPSGS_WGSProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
291  shared_ptr<CPSGS_Reply> reply,
292  TProcessorPriority priority) const
293 {
294  if ( !x_IsEnabled(*request) ) return nullptr;
295  x_InitClient();
296  _ASSERT(m_Client);
297  if ( !m_Client->CanProcessRequest(*request) ) return nullptr;
298 
299  return new CPSGS_WGSProcessor(m_Client, m_ThreadPool, request, reply, priority);
300 }
301 
302 
304 {
305  return kWGSProcessorName;
306 }
307 
308 
310 {
311  return kWGSProcessorGroupName;
312 }
313 
314 
316 {
317  CRequestContextResetter context_resetter;
318  GetRequest()->SetRequestContext();
319  if ( x_IsCanceled() ) {
320  return;
321  }
322 
323  try {
324  {
325  CFastMutexGuard guard(m_Mutex);
326  m_Unlocked = false;
327  }
329  auto req_type = GetRequest()->GetRequestType();
330  switch (req_type) {
333  break;
336  break;
339  break;
342  break;
343  default:
345  break;
346  }
347  }
348  catch (exception& exc) {
349  x_SendError("Exception when handling a request: ", exc);
351  }
352 }
353 
354 
356 {
357  auto app = CPubseqGatewayApp::GetInstance();
358  bool enabled = app->GetWGSProcessorsEnabled();
359  if ( enabled ) {
360  for (const auto& name : request.GetRequest<SPSGS_RequestBase>().m_DisabledProcessors ) {
361  if ( NStr::EqualNocase(name, kWGSProcessorName) ) return false;
362  }
363  }
364  else {
365  for (const auto& name : request.GetRequest<SPSGS_RequestBase>().m_EnabledProcessors ) {
366  if ( NStr::EqualNocase(name, kWGSProcessorName) ) return true;
367  }
368  }
369  return enabled;
370 }
371 
372 
373 inline
375 {
376  DEFINE_STATIC_FAST_MUTEX(s_ClientMutex);
377  if ( m_Client ) return;
378  CFastMutexGuard guard(s_ClientMutex);
379  if ( !m_Client ) {
380  m_Client = make_shared<CWGSClient>(*m_Config);
381  }
382 }
383 
384 
385 static void s_OnResolvedSeqId(void* data)
386 {
387  static_cast<CPSGS_WGSProcessor*>(data)->OnResolvedSeqId();
388 }
389 
390 
392 {
393  SPSGS_ResolveRequest& resolve_request = GetRequest()->GetRequest<SPSGS_ResolveRequest>();
394  m_OutputFormat = resolve_request.m_OutputFormat;
395  m_SeqId.Reset(new CSeq_id());
396  string err;
397  if (ParseInputSeqId(*m_SeqId, resolve_request.m_SeqId, resolve_request.m_SeqIdType, &err) != ePSGS_ParsedOK) {
398  PSG_ERROR("Error parsing seq-id: " << (err.empty() ? resolve_request.m_SeqId : err));
400  return;
401  }
402  if ( GetRequest()->NeedTrace() ) {
403  GetReply()->SendTrace(
404  kWGSProcessorName + " processor is resolving seq-id " + m_SeqId->AsFastaString(),
405  GetRequest()->GetStartTimestamp());
406  }
407  m_ThreadPool->AddTask(new CWGSThreadPoolTask_ResolveSeqId(*this));
408 }
409 
410 
412 {
413  CRequestContextResetter context_resetter;
414  GetRequest()->SetRequestContext();
415  try {
416  m_WGSData = m_Client->ResolveSeqId(*m_SeqId);
417  if ( GetRequest()->NeedTrace() ) {
418  GetReply()->SendTrace(
419  kWGSProcessorName + " processor finished resolving seq-id " + m_SeqId->AsFastaString() + ", waiting for other processors",
420  GetRequest()->GetStartTimestamp());
421  }
423  }
424  catch (exception& exc) {
425  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
426  m_WGSData.reset();
427  }
429 }
430 
431 
433 {
434  CRequestContextResetter context_resetter;
435  GetRequest()->SetRequestContext();
436  if ( x_IsCanceled() ) {
437  return;
438  }
439  if ( s_SimulateError() ) {
440  m_WGSDataError = "simulated WGS processor error";
441  m_WGSData.reset();
442  }
443  if ( !m_WGSData || !m_WGSData->m_BioseqInfo ) {
444  if ( m_WGSDataError.empty() ) {
445  if ( GetRequest()->NeedTrace() ) {
446  GetReply()->SendTrace(
447  kWGSProcessorName + " processor could not find info for seq-id " + m_SeqId->AsFastaString(),
448  GetRequest()->GetStartTimestamp());
449  }
451  }
452  else {
455  }
456  return;
457  }
458  if ( !x_SignalStartProcessing() ) {
459  return;
460  }
461  try {
462  if ( GetRequest()->NeedTrace() ) {
463  GetReply()->SendTrace(
464  kWGSProcessorName + " processor resolved seq-id " + m_SeqId->AsFastaString() + " to blob-id " + m_WGSData->m_BlobId,
465  GetRequest()->GetStartTimestamp());
466  }
468  }
469  catch (exception& exc) {
470  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
473  return;
474  }
476 }
477 
478 
479 static void s_OnGotBlobBySeqId(void* data)
480 {
481  static_cast<CPSGS_WGSProcessor*>(data)->OnGotBlobBySeqId();
482 }
483 
484 
486 {
487  SPSGS_BlobBySeqIdRequest& get_request = GetRequest()->GetRequest<SPSGS_BlobBySeqIdRequest>();
488  m_SeqId.Reset(new CSeq_id());
489  string err;
490  if (ParseInputSeqId(*m_SeqId, get_request.m_SeqId, get_request.m_SeqIdType, &err) != ePSGS_ParsedOK) {
491  PSG_ERROR("Error parsing seq-id: " << (err.empty() ? get_request.m_SeqId : err));
493  return;
494  }
495 
497  if ( GetRequest()->NeedTrace() ) {
498  GetReply()->SendTrace(
499  kWGSProcessorName + " processor is getting info for seq-id " + m_SeqId->AsFastaString(),
500  GetRequest()->GetStartTimestamp());
501  }
502  m_ThreadPool->AddTask(new CWGSThreadPoolTask_ResolveSeqId(*this));
503  }
504  else {
505  if ( GetRequest()->NeedTrace() ) {
506  GetReply()->SendTrace(
507  kWGSProcessorName + " processor is getting blob for seq-id " + m_SeqId->AsFastaString(),
508  GetRequest()->GetStartTimestamp());
509  }
510  m_ExcludedBlobs = get_request.m_ExcludeBlobs;
511  m_ThreadPool->AddTask(new CWGSThreadPoolTask_GetBlobBySeqId(*this));
512  }
513 }
514 
515 
517 {
518  CRequestContextResetter context_resetter;
519  GetRequest()->SetRequestContext();
520  try {
521  m_WGSData = m_Client->GetBlobBySeqId(*m_SeqId, m_ExcludedBlobs);
522  if ( GetRequest()->NeedTrace() ) {
523  GetReply()->SendTrace(
524  kWGSProcessorName + " processor finished getting blob for seq-id " + m_SeqId->AsFastaString() + ", waiting for other processors",
525  GetRequest()->GetStartTimestamp());
526  }
528  }
529  catch (exception& exc) {
530  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
531  m_WGSData.reset();
532  }
534 }
535 
536 
538 {
539  CRequestContextResetter context_resetter;
540  GetRequest()->SetRequestContext();
541  if ( x_IsCanceled() ) {
542  return;
543  }
544  if ( s_SimulateError() ) {
545  m_WGSDataError = "simulated WGS processor error";
546  m_WGSData.reset();
547  }
548  // NOTE: m_Data may be null if the blob was excluded.
549  if ( !m_WGSData || !m_WGSData->m_BioseqInfo ) {
550  if ( m_WGSDataError.empty() ) {
551  if ( GetRequest()->NeedTrace() ) {
552  GetReply()->SendTrace(
553  kWGSProcessorName + " processor could not find info for seq-id " + m_SeqId->AsFastaString(),
554  GetRequest()->GetStartTimestamp());
555  }
558  }
559  else {
562  }
563  return;
564  }
565  if ( !x_SignalStartProcessing() ) {
566  return;
567  }
568  try {
569  if ( GetRequest()->NeedTrace() ) {
570  GetReply()->SendTrace(
571  kWGSProcessorName + " processor resolved seq-id " + m_SeqId->AsFastaString() + " to blob-id " + m_WGSData->m_BlobId,
572  GetRequest()->GetStartTimestamp());
573  }
575  x_SendBlob();
576  }
577  catch (exception& exc) {
578  x_SendError("Exception when handling a request: ", exc);
580  return;
581  }
583 }
584 
585 
586 static void s_OnGotBlobByBlobId(void* data)
587 {
588  static_cast<CPSGS_WGSProcessor*>(data)->OnGotBlobByBlobId();
589 }
590 
591 
593 {
595  m_PSGBlobId = blob_request.m_BlobId.GetId();
596  if ( GetRequest()->NeedTrace() ) {
597  GetReply()->SendTrace(
598  kWGSProcessorName + " processor is fetching blob " + m_PSGBlobId,
599  GetRequest()->GetStartTimestamp());
600  }
602 }
603 
604 
606 {
607  CRequestContextResetter context_resetter;
608  GetRequest()->SetRequestContext();
609  try {
610  m_WGSData = m_Client->GetBlobByBlobId(m_PSGBlobId);
611  if ( GetRequest()->NeedTrace() ) {
612  GetReply()->SendTrace(
613  kWGSProcessorName + " processor finished fetching blob " + m_PSGBlobId,
614  GetRequest()->GetStartTimestamp());
615  }
616  }
617  catch (exception& exc) {
618  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
619  m_WGSData.reset();
620  }
622 }
623 
624 
626 {
627  CRequestContextResetter context_resetter;
628  GetRequest()->SetRequestContext();
629  if ( x_IsCanceled() ) {
630  return;
631  }
632  if ( s_SimulateError() ) {
633  m_WGSDataError = "simulated WGS processor error";
634  m_WGSData.reset();
635  }
636  if ( !m_WGSData ) {
637  if ( m_WGSDataError.empty() ) {
638  if ( GetRequest()->NeedTrace() ) {
639  GetReply()->SendTrace(
640  kWGSProcessorName + " processor could not find blob " + m_PSGBlobId,
641  GetRequest()->GetStartTimestamp());
642  }
645  }
646  else {
649  }
650  return;
651  }
652  if ( !x_SignalStartProcessing() ) {
653  return;
654  }
655  try {
656  if ( GetRequest()->NeedTrace() ) {
657  GetReply()->SendTrace(
658  kWGSProcessorName + " processor retrieved blob " + m_PSGBlobId,
659  GetRequest()->GetStartTimestamp());
660  }
661  x_SendBlob();
662  }
663  catch (exception& exc) {
664  x_SendError("Exception when handling a request: ", exc);
666  return;
667  }
669 }
670 
671 
672 static void s_OnGotChunk(void* data)
673 {
674  static_cast<CPSGS_WGSProcessor*>(data)->OnGotChunk();
675 }
676 
677 
679 {
680  SPSGS_TSEChunkRequest& chunk_request = GetRequest()->GetRequest<SPSGS_TSEChunkRequest>();
681  m_Id2Info = chunk_request.m_Id2Info;
682  m_ChunkId = chunk_request.m_Id2Chunk;
683  if ( GetRequest()->NeedTrace() ) {
684  GetReply()->SendTrace(
685  kWGSProcessorName + " processor is fetching chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
686  GetRequest()->GetStartTimestamp());
687  }
688  m_ThreadPool->AddTask(new CWGSThreadPoolTask_GetChunk(*this));
689 }
690 
691 
693 {
694  CRequestContextResetter context_resetter;
695  GetRequest()->SetRequestContext();
696  try {
697  m_WGSData = m_Client->GetChunk(m_Id2Info, m_ChunkId);
698  if ( GetRequest()->NeedTrace() ) {
699  GetReply()->SendTrace(
700  kWGSProcessorName + " processor finished fetching chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
701  GetRequest()->GetStartTimestamp());
702  }
703  }
704  catch (exception& exc) {
705  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
706  m_WGSData.reset();
707  }
709 }
710 
711 
713 {
714  CRequestContextResetter context_resetter;
715  GetRequest()->SetRequestContext();
716  if ( x_IsCanceled() ) {
717  return;
718  }
719  if ( s_SimulateError() ) {
720  m_WGSDataError = "simulated WGS processor error";
721  m_WGSData.reset();
722  }
723  if ( !m_WGSData ) {
724  if ( m_WGSDataError.empty() ) {
725  if ( GetRequest()->NeedTrace() ) {
726  GetReply()->SendTrace(
727  kWGSProcessorName + " processor could not find chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
728  GetRequest()->GetStartTimestamp());
729  }
732  }
733  else {
736  }
737  return;
738  }
739  if ( !x_SignalStartProcessing() ) {
740  return;
741  }
742  try {
743  if ( m_WGSData->IsForbidden() ) {
744  if ( GetRequest()->NeedTrace() ) {
745  GetReply()->SendTrace(
746  kWGSProcessorName + " processor can not send forbidden chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
747  GetRequest()->GetStartTimestamp());
748  }
749  x_SendForbidden();
750  }
751  else {
752  if ( GetRequest()->NeedTrace() ) {
753  GetReply()->SendTrace(
754  kWGSProcessorName + " processor retrieved chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
755  GetRequest()->GetStartTimestamp());
756  }
757  x_SendChunk();
758  }
759  }
760  catch (exception& exc) {
761  x_SendError("Exception when handling a request: ", exc);
763  return;
764  }
766 }
767 
768 
770 {
774  }
775  return m_OutputFormat;
776 }
777 
778 
779 void s_SetBlobVersion(CBlobRecord& blob_props, const CID2_Blob_Id& blob_id)
780 {
781  if ( blob_id.IsSetVersion() ) {
782  blob_props.SetModified(int64_t(blob_id.GetVersion())*60000);
783  }
784 }
785 
786 
787 void s_SetBlobState(CBlobRecord& blob_props, int id2_blob_state)
788 {
789  if ( id2_blob_state & (1 << eID2_Blob_State_withdrawn) ) {
790  blob_props.SetWithdrawn(true);
791  }
792  if ( id2_blob_state & ((1 << eID2_Blob_State_suppressed) |
794  blob_props.SetSuppress(true);
795  }
796  if ( id2_blob_state & (1 << eID2_Blob_State_dead) ) {
797  blob_props.SetDead(true);
798  }
799 }
800 
801 
803 {
804  if ( data.GetData_compression() == data.eData_compression_gzip ) {
805  blob_props.SetGzip(true);
806  }
807  blob_props.SetNChunks(data.GetData().size());
808 }
809 
810 
813  EPSGOperationStatus status,
814  size_t blob_size)
815 {
817  GetTiming().Register(this, operation, status, start, blob_size);
818 }
819 
820 
823  const CID2_Reply_Data& data)
824 {
825  size_t blob_size = 0;
826  for ( auto& chunk : data.GetData() ) {
827  blob_size += chunk->size();
828  }
829  x_RegisterTiming(start, operation, eOpStatusFound, blob_size);
830 }
831 
832 
834 {
836 }
837 
838 
839 void CPSGS_WGSProcessor::x_SendResult(const string& data_to_send, EOutputFormat output_format)
840 {
841  size_t item_id = GetReply()->GetItemId();
842  GetReply()->PrepareBioseqData(item_id, GetName(), data_to_send, output_format);
843  GetReply()->PrepareBioseqCompletion(item_id, GetName(), 2);
844 }
845 
846 
848 {
849  EOutputFormat output_format = x_GetOutputFormat();
850  string data_to_send;
851  if ( output_format == SPSGS_ResolveRequest::ePSGS_JsonFormat ) {
852  data_to_send = ToJsonString(
853  *m_WGSData->m_BioseqInfo,
854  m_WGSData->m_BioseqInfoFlags,
855  m_WGSData->m_BlobId);
856  } else {
857  data_to_send = ToBioseqProtobuf(*m_WGSData->m_BioseqInfo);
858  }
859 
860  x_SendResult(data_to_send, output_format);
861 }
862 
863 
864 void CPSGS_WGSProcessor::x_SendBlobProps(const string& psg_blob_id, CBlobRecord& blob_props)
865 {
866  auto& reply = *GetReply();
867  size_t item_id = reply.GetItemId();
868  string data_to_send = ToJsonString(blob_props);
869  reply.PrepareBlobPropData(item_id, GetName(), psg_blob_id, data_to_send);
870  reply.PrepareBlobPropCompletion(item_id, GetName(), 2);
871 }
872 
873 
874 void CPSGS_WGSProcessor::x_SendBlobForbidden(const string& psg_blob_id)
875 {
876  auto& reply = *GetReply();
877  size_t item_id = reply.GetItemId();
878  reply.PrepareBlobMessage(item_id, GetName(),
879  psg_blob_id,
880  "Blob retrieval is not authorized",
883  eDiag_Error);
884  reply.PrepareBlobCompletion(item_id, GetName(), 2);
885 }
886 
887 
888 void CPSGS_WGSProcessor::x_SendBlobData(const string& psg_blob_id, const CID2_Reply_Data& data)
889 {
890  size_t item_id = GetReply()->GetItemId();
891  int chunk_no = 0;
892  for ( auto& chunk : data.GetData() ) {
893  GetReply()->PrepareBlobData(item_id, GetName(), psg_blob_id,
894  (const unsigned char*)chunk->data(), chunk->size(), chunk_no++);
895  }
896  GetReply()->PrepareBlobCompletion(item_id, GetName(), chunk_no + 1);
897 }
898 
899 
901  const string& id2_info,
902  TID2ChunkId chunk_id,
903  CBlobRecord& blob_props)
904 {
905  size_t item_id = GetReply()->GetItemId();
906  string data_to_send = ToJsonString(blob_props);
907  GetReply()->PrepareTSEBlobPropData(item_id, GetName(), chunk_id, id2_info, data_to_send);
908  GetReply()->PrepareBlobPropCompletion(item_id, GetName(), 2);
909 }
910 
911 
913  const string& id2_info,
914  TID2ChunkId chunk_id,
915  const CID2_Reply_Data& data)
916 {
917  size_t item_id = GetReply()->GetItemId();
918  int chunk_no = 0;
919  for ( auto& chunk : data.GetData() ) {
920  GetReply()->PrepareTSEBlobData(item_id, GetName(),
921  (const unsigned char*)chunk->data(), chunk->size(), chunk_no++,
922  chunk_id, id2_info);
923  }
924  GetReply()->PrepareTSEBlobCompletion(item_id, GetName(), chunk_no+1);
925 }
926 
927 
929 {
930  if (!m_WGSData->m_Id2BlobId) return;
931  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
932  string blob_id = m_WGSData->m_BlobId;
933  auto split_version = m_WGSData->m_SplitVersion;
934 
935  string id2_info = osg::CPSGS_OSGGetBlobBase::GetPSGId2Info(id2_blob_id, split_version);
936 
937  CBlobRecord main_blob_props;
938  s_SetBlobVersion(main_blob_props, id2_blob_id);
939  s_SetBlobState(main_blob_props, m_WGSData->GetID2BlobState());
940  main_blob_props.SetId2Info(id2_info);
941  x_SendBlobProps(blob_id, main_blob_props);
942 
943  CBlobRecord split_info_blob_props;
945  x_WriteData(data, *m_WGSData->m_Data, m_WGSData->m_Compress);
947  s_SetBlobDataProps(split_info_blob_props, data);
948  x_SendChunkBlobProps(id2_info, kSplitInfoChunk, split_info_blob_props);
950 }
951 
952 
954 {
955  _ASSERT(m_WGSData->m_Id2BlobId);
956  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
957  string main_blob_id = m_WGSData->m_BlobId;
958 
960  x_WriteData(data, *m_WGSData->m_Data, m_WGSData->m_Compress);
962 
963  CBlobRecord main_blob_props;
964  s_SetBlobVersion(main_blob_props, id2_blob_id);
965  s_SetBlobState(main_blob_props, m_WGSData->GetID2BlobState());
966  s_SetBlobDataProps(main_blob_props, data);
967  x_SendBlobProps(main_blob_id, main_blob_props);
968  x_SendBlobData(main_blob_id, data);
969 }
970 
971 
973 {
974  size_t item_id = GetReply()->GetItemId();
975  GetReply()->PrepareBlobExcluded(item_id, GetName(), m_WGSData->m_BlobId, ePSGS_BlobExcluded);
976 }
977 
978 
980 {
981  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
982  const string& psg_blob_id = m_WGSData->m_BlobId;
983 
984  CBlobRecord blob_props;
985  s_SetBlobVersion(blob_props, id2_blob_id);
986  s_SetBlobState(blob_props, m_WGSData->GetID2BlobState());
987  x_SendBlobProps(psg_blob_id, blob_props);
988  x_SendBlobForbidden(psg_blob_id);
989 }
990 
991 
993 {
994  if ( m_WGSData->IsForbidden() ) {
995  x_SendForbidden();
996  return;
997  }
998  if ( m_WGSData->m_Excluded ) {
999  x_SendExcluded();
1000  return;
1001  }
1002  if ( m_WGSData->m_Data->GetMainObject().GetThisTypeInfo() == CID2S_Split_Info::GetTypeInfo() ) {
1003  // split info
1004  x_SendSplitInfo();
1005  }
1006  else {
1007  x_SendMainEntry();
1008  }
1009 }
1010 
1011 
1013 {
1014  _ASSERT(m_WGSData && m_WGSData->m_Data);
1015  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
1016  auto split_version = m_WGSData->m_SplitVersion;
1017  string id2_info = osg::CPSGS_OSGGetBlobBase::GetPSGId2Info(id2_blob_id, split_version);
1018 
1020  x_WriteData(data, *m_WGSData->m_Data, m_WGSData->m_Compress);
1022 
1023  CBlobRecord chunk_blob_props;
1024  s_SetBlobDataProps(chunk_blob_props, data);
1025  x_SendChunkBlobProps(id2_info, m_ChunkId, chunk_blob_props);
1026  x_SendChunkBlobData(id2_info, m_ChunkId, data);
1027 }
1028 
1029 
1031  const CAsnBinData& obj,
1032  bool compress) const
1033 {
1035  COSSWriter writer(data.SetData());
1036  CWStream writer_stream(&writer);
1038  if ( compress ) {
1039  data.SetData_compression(CID2_Reply_Data::eData_compression_gzip);
1040  str.reset(new CCompressionOStream(writer_stream,
1043  }
1044  else {
1045  data.SetData_compression(CID2_Reply_Data::eData_compression_none);
1046  str.reset(&writer_stream, eNoOwnership);
1047  }
1048  CObjectOStreamAsnBinary objstr(*str);
1049  obj.Serialize(objstr);
1050 }
1051 
1052 
1054 {
1055  {
1056  CFastMutexGuard guard(m_Mutex);
1057  if (m_Unlocked) return;
1058  m_Unlocked = true;
1059  }
1060  if (m_Request) m_Request->Unlock(kWGSProcessorEvent);
1061 }
1062 
1063 
1065 {
1066  if (m_Canceled) return;
1068 }
1069 
1070 
1072 {
1073  m_Canceled = true;
1074  if (!IsUVThreadAssigned()) {
1077  }
1078  else {
1079  x_UnlockRequest();
1080  }
1081 }
1082 
1083 
1085 {
1086  return m_Status;
1087 }
1088 
1089 
1091 {
1092  if ( m_Canceled ) {
1094  return true;
1095  }
1096  return false;
1097 }
1098 
1099 
1101 {
1102  if ( SignalStartProcessing() == ePSGS_Cancel ) {
1104  return false;
1105  }
1106  return true;
1107 }
1108 
1109 
1111 {
1112  _ASSERT(status != ePSGS_InProgress);
1113  m_Status = status;
1114  x_UnlockRequest();
1116 }
1117 
1118 
1119 void CPSGS_WGSProcessor::x_SendError(shared_ptr<CPSGS_Reply> reply,
1120  const string& msg)
1121 {
1122  reply->PrepareProcessorMessage(reply->GetItemId(), "WGS", msg,
1125  eDiag_Error);
1126 }
1127 
1128 
1129 void CPSGS_WGSProcessor::x_SendError(const string& msg)
1130 {
1131  x_SendError(m_Reply, msg);
1132 }
1133 
1134 
1135 void CPSGS_WGSProcessor::x_SendError(shared_ptr<CPSGS_Reply> reply,
1136  const string& msg, const exception& exc)
1137 {
1138  x_SendError(reply, msg+string(exc.what()));
1139 }
1140 
1141 
1142 void CPSGS_WGSProcessor::x_SendError(const string& msg, const exception& exc)
1143 {
1144  x_SendError(m_Reply, msg+string(exc.what()));
1145 }
1146 
1147 
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
const string kCassandraProcessorEvent
virtual void Serialize(CObjectOStreamAsnBinary &out) const
Definition: wgsread.cpp:325
CBlobRecord & SetNChunks(int32_t value)
CBlobRecord & SetId2Info(string const &value)
CBlobRecord & SetSuppress(bool value)
CBlobRecord & SetGzip(bool value)
CBlobRecord & SetWithdrawn(bool value)
CBlobRecord & SetDead(bool value)
CBlobRecord & SetModified(TTimestamp value)
Definition: blob_record.cpp:70
CID2_Blob_Id –.
Definition: ID2_Blob_Id.hpp:66
CNcbiRegistry –.
Definition: ncbireg.hpp:913
list< TOctetString * > TOctetStringSequence
virtual ERW_Result Flush(void)
Flush pending data (if any) down to the output device.
COSSWriter(TOctetStringSequence &out)
TOctetStringSequence & m_Output
vector< char > TOctetString
virtual ERW_Result Write(const void *buffer, size_t count, size_t *written)
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
CObjectOStreamAsnBinary –.
Definition: objostrasnb.hpp:58
@ ePSGS_BlobBySatSatKeyRequest
TRequest & GetRequest(void)
void x_SendResult(const string &data_to_send, EOutputFormat output_format)
void x_SendChunkBlobData(const string &id2_info, TID2ChunkId chunk_id, const objects::CID2_Reply_Data &data)
void x_ProcessBlobBySatSatKeyRequest(void)
void OnGotBlobBySeqId(void)
void GetBlobBySeqId(void)
void x_ProcessResolveRequest(void)
void x_InitClient(void) const
static void x_SendError(shared_ptr< CPSGS_Reply > reply, const string &msg)
void x_SendBlobProps(const string &psg_blob_id, CBlobRecord &blob_props)
EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
EOutputFormat x_GetOutputFormat(void)
string GetName(void) const override
Tells the processor name (used in logging and tracing)
void Process(void) override
Main processing function.
void x_ProcessTSEChunkRequest(void)
void x_Finish(EPSGS_Status status)
string GetGroupName(void) const override
Tells the processor group name.
void x_SendBlobData(const string &psg_blob_id, const objects::CID2_Reply_Data &data)
CRef< objects::CSeq_id > m_SeqId
void x_WriteData(objects::CID2_Reply_Data &data, const objects::CAsnBinData &obj, bool compress) const
void OnGotBlobByBlobId(void)
void x_UnlockRequest(void)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Tells if processor can process the given request.
shared_ptr< CWGSClient > m_Client
void x_RegisterTimingNotFound(EPSGOperation operation)
shared_ptr< ncbi::CThreadPool > m_ThreadPool
void OnResolvedSeqId(void)
shared_ptr< SWGSData > m_WGSData
void GetBlobByBlobId(void)
void x_ProcessBlobBySeqIdRequest(void)
void x_SendSplitInfo(void)
void x_SendForbidden(void)
psg_time_point_t m_Start
~CPSGS_WGSProcessor(void) override
IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
void x_SendBlobForbidden(const string &psg_blob_id)
void Cancel(void) override
The infrastructure request to cancel processing.
void x_SendExcluded(void)
void x_RegisterTiming(psg_time_point_t start, EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
void x_WaitForOtherProcessors(void)
void x_SendChunkBlobProps(const string &id2_info, TID2ChunkId chunk_id, CBlobRecord &blob_props)
void x_SendMainEntry(void)
void x_SendBioseqInfo(void)
shared_ptr< SWGSProcessor_Config > m_Config
EOutputFormat m_OutputFormat
void x_RegisterTimingFound(psg_time_point_t start, EPSGOperation operation, const objects::CID2_Reply_Data &data)
bool x_IsEnabled(CPSGS_Request &request) const
EPSGS_Status m_Status
static CPubseqGatewayApp * GetInstance(void)
Abstract class for representing single task executing in pool of threads To use this class in applica...
Definition: thread_pool.hpp:76
Main class implementing functionality of pool of threads.
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_GetBlobByBlobId(CPSGS_WGSProcessor &processor)
CWGSThreadPoolTask_GetBlobBySeqId(CPSGS_WGSProcessor &processor)
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_GetChunk(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_ResolveSeqId(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
Writer-based output stream.
Definition: rwstream.hpp:171
CZipStreamCompressor – zlib based compression stream processor.
Definition: zlib.hpp:763
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
EPSGS_SeqIdParsingResult ParseInputSeqId(objects::CSeq_id &seq_id, const string &request_seq_id, int request_seq_id_type, string *err_msg=nullptr)
Parse seq-id from a string and type representation.
TProcessorPriority m_Priority
A very basic data-write interface.
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
std::ofstream out("events_result.xml")
main entry point for tests
#define true
Definition: bool.h:35
#define false
Definition: bool.h:36
static const char * str(char *buf, int n)
Definition: stats.c:84
char data[12]
Definition: iconv.c:80
Int8 int64_t
@ eNoOwnership
No ownership is assumed.
Definition: ncbi_types.h:135
string
Definition: cgiapp.hpp:687
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
Definition: ncbi_param.hpp:149
#define kMax_UInt
Definition: ncbi_limits.h:185
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
Definition: ncbireg.cpp:362
ERW_Result
Result codes for I/O operations.
@ eRW_Success
Everything is okay, I/O completed.
static bool EqualNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive equality of a substring with another string.
Definition: ncbistr.hpp:5353
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
EStatus
Status of the task.
Definition: thread_pool.hpp:79
@ eCompleted
executed successfully
Definition: thread_pool.hpp:83
#define DEFINE_STATIC_FAST_MUTEX(id)
Define static fast mutex and initialize it.
Definition: ncbimtx.hpp:496
operation
Bit operations.
Definition: bmconst.h:191
TVersion GetVersion(void) const
Get the Version member data.
bool IsSetVersion(void) const
version of blob, optional in some requests Check if a value has been assigned to Version data member.
@ eID2_Blob_State_dead
@ eID2_Blob_State_suppressed
@ eID2_Blob_State_suppressed_temp
@ eID2_Blob_State_withdrawn
const int64_t kSplitInfoChunk
Definition: id2info.hpp:42
T min(T x_, T y_)
static pcre_uint8 * buffer
Definition: pcretest.c:1051
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
string ToBioseqProtobuf(const CBioseqInfoRecord &bioseq_info)
#define PSG_ERROR(message)
@ ePSGS_BlobExcluded
chrono::steady_clock psg_clock_t
@ ePSGS_ParsedOK
int TProcessorPriority
@ ePSGS_UnknownError
@ ePSGS_BlobRetrievalIsNotAuthorized
psg_clock_t::time_point psg_time_point_t
Reader-writer based streams.
static CNamedPipeClient * client
vector< string > m_ExcludeBlobs
string GetId(void) const
EPSGS_TSEOption m_TSEOption
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
EPSGS_OutputFormat m_OutputFormat
#define _ASSERT
Pool of generic task-executing threads.
EPSGOperationStatus
Definition: timing.hpp:60
@ eOpStatusFound
Definition: timing.hpp:61
@ eOpStatusNotFound
Definition: timing.hpp:62
EPSGOperation
Definition: timing.hpp:65
@ eTseChunkRetrieve
Definition: timing.hpp:101
@ eBlobRetrieve
Definition: timing.hpp:98
#define DEFAULT_INDEX_UPDATE_TIME
USING_SCOPE(objects)
#define DEFAULT_COMPRESS_DATA
#define DEFAULT_FILE_REOPEN_TIME
static bool s_SimulateError()
void s_SetBlobState(CBlobRecord &blob_props, int id2_blob_state)
#define PARAM_FILE_RECHECK_TIME
static void s_OnGotBlobBySeqId(void *data)
NCBI_PARAM_DEF(int, WGS_PROCESSOR, ERROR_RATE, 0)
END_NAMESPACE(wgs)
#define DEFAULT_VDB_CACHE_SIZE
static const string kWGSProcessorSection
#define DEFAULT_FILE_RECHECK_TIME
BEGIN_LOCAL_NAMESPACE
#define PARAM_COMPRESS_DATA
#define PARAM_FILE_REOPEN_TIME
END_LOCAL_NAMESPACE
NCBI_PARAM_DECL(int, WGS_PROCESSOR, ERROR_RATE)
static void s_OnResolvedSeqId(void *data)
static const string kParamMaxConn
END_NCBI_NAMESPACE
static const string kWGSProcessorGroupName
static const string kWGSProcessorName
static const int kDefaultMaxConn
#define PARAM_INDEX_UPDATE_TIME
BEGIN_NCBI_NAMESPACE
static void s_OnGotBlobByBlobId(void *data)
static void s_OnGotChunk(void *data)
BEGIN_NAMESPACE(psg)
#define PARAM_VDB_CACHE_SIZE
void s_SetBlobVersion(CBlobRecord &blob_props, const CID2_Blob_Id &blob_id)
void s_SetBlobDataProps(CBlobRecord &blob_props, const CID2_Reply_Data &data)
const string kWGSProcessorEvent
#define compress
Definition: zconf_cf.h:39
ZLib Compression API.
Modified on Sun Apr 21 03:42:02 2024 by modify_doxy.py rev. 669887