NCBI C++ ToolKit
wgs_processor.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: wgs_processor.cpp 102541 2024-05-28 17:12:13Z grichenk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Aleksey Grichenko, Eugene Vasilchenko
27  *
28  * File Description: processor for data from WGS
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "wgs_processor.hpp"
35 #include "pubseq_gateway.hpp"
39 #include "id2info.hpp"
40 #include "cass_processor_base.hpp"
41 #include "psgs_thread_pool.hpp"
44 #include <corelib/rwstream.hpp>
45 #include <serial/serial.hpp>
46 #include <serial/objostrasnb.hpp>
47 #include <util/compress/zlib.hpp>
48 #include <util/thread_pool.hpp>
56 
57 
61 
63 
64 static const string kWGSProcessorName = "WGS";
65 static const string kWGSProcessorGroupName = "WGS";
66 static const string kWGSProcessorSection = "WGS_PROCESSOR";
67 
68 static const string kParamMaxConn = "maxconn";
69 static const int kDefaultMaxConn = 64;
70 
71 
72 /////////////////////////////////////////////////////////////////////////////
73 // Helper classes
74 /////////////////////////////////////////////////////////////////////////////
75 
77 
78 class COSSWriter : public IWriter
79 {
80 public:
81  typedef vector<char> TOctetString;
82  typedef list<TOctetString*> TOctetStringSequence;
83 
85  : m_Output(out)
86  {
87  }
88 
89  virtual ERW_Result Write(const void* buffer,
90  size_t count,
91  size_t* written)
92  {
93  const char* data = static_cast<const char*>(buffer);
94  m_Output.push_back(new TOctetString(data, data+count));
95  if ( written ) {
96  *written = count;
97  }
98  return eRW_Success;
99  }
100  virtual ERW_Result Flush(void)
101  {
102  return eRW_Success;
103  }
104 
105 private:
107 };
108 
109 
111 {
112 public:
114 
115  virtual EStatus Execute(void) override
116  {
118  return eCompleted;
119  }
120 
121 private:
123 };
124 
125 
127 {
128 public:
130 
131  virtual EStatus Execute(void) override
132  {
134  return eCompleted;
135  }
136 
137 private:
139 };
140 
141 
143 {
144 public:
146 
147  virtual EStatus Execute(void) override
148  {
150  return eCompleted;
151  }
152 
153 private:
155 };
156 
157 
159 {
160 public:
162 
163  virtual EStatus Execute(void) override
164  {
166  return eCompleted;
167  }
168 
169 private:
171 };
172 
173 
175 
176 
177 NCBI_PARAM_DECL(int, WGS_PROCESSOR, ERROR_RATE);
178 NCBI_PARAM_DEF(int, WGS_PROCESSOR, ERROR_RATE, 0);
179 
180 static bool s_SimulateError()
181 {
182  static int error_rate = NCBI_PARAM_TYPE(WGS_PROCESSOR, ERROR_RATE)::GetDefault();
183  if ( error_rate > 0 ) {
184  static int error_counter = 0;
185  if ( ++error_counter >= error_rate ) {
186  error_counter = 0;
187  return true;
188  }
189  }
190  return false;
191 }
192 
193 
194 static const char kSubSatSeparator = '/';
195 
196 static void s_FormatBlobId(ostream& s, const CID2_Blob_Id& blob_id)
197 {
198  s << blob_id.GetSat()
199  << kSubSatSeparator << blob_id.GetSub_sat()
200  << '.' << blob_id.GetSat_key();
201 }
202 
203 
204 /////////////////////////////////////////////////////////////////////////////
205 // CPSGS_WGSProcessor
206 /////////////////////////////////////////////////////////////////////////////
207 
208 
209 #define PARAM_VDB_CACHE_SIZE "vdb_cache_size"
210 #define PARAM_INDEX_UPDATE_TIME "index_update_time"
211 #define PARAM_FILE_REOPEN_TIME "file_reopen_time"
212 #define PARAM_FILE_RECHECK_TIME "file_recheck_time"
213 #define PARAM_COMPRESS_DATA "compress_data"
214 
215 #define DEFAULT_VDB_CACHE_SIZE 100
216 #define DEFAULT_INDEX_UPDATE_TIME 600
217 #define DEFAULT_FILE_REOPEN_TIME 3600
218 #define DEFAULT_FILE_RECHECK_TIME 600
219 #define DEFAULT_COMPRESS_DATA SWGSProcessor_Config::eCompressData_some
220 
221 
223  : m_Config(new SWGSProcessor_Config),
224  m_Status(ePSGS_NotFound),
225  m_Canceled(false),
226  m_ChunkId(0),
227  m_OutputFormat(SPSGS_ResolveRequest::ePSGS_NativeFormat),
228  m_Unlocked(true)
229 {
230  x_LoadConfig();
231 }
232 
233 
235  const shared_ptr<CWGSClient>& client,
236  shared_ptr<ncbi::CThreadPool> thread_pool,
237  shared_ptr<CPSGS_Request> request,
238  shared_ptr<CPSGS_Reply> reply,
239  TProcessorPriority priority)
240  : m_Client(client),
241  m_Start(psg_clock_t::now()),
242  m_Status(ePSGS_InProgress),
243  m_Canceled(false),
244  m_ChunkId(0),
245  m_OutputFormat(SPSGS_ResolveRequest::ePSGS_NativeFormat),
246  m_Unlocked(true),
247  m_ThreadPool(thread_pool)
248 {
249  m_Request = request;
250  m_Reply = reply;
251  m_Priority = priority;
252 }
253 
254 
256 {
258  x_UnlockRequest();
259 }
260 
261 
263 {
264  const CNcbiRegistry& registry = CPubseqGatewayApp::GetInstance()->GetConfig();
266 
268  if ( compress_data >= SWGSProcessor_Config::eCompressData_never &&
269  compress_data <= SWGSProcessor_Config::eCompressData_always ) {
270  m_Config->m_CompressData = SWGSProcessor_Config::ECompressData(compress_data);
271  }
275 
277  if (max_conn == 0) {
278  max_conn = kDefaultMaxConn;
279  }
282  min(3u, max_conn), max_conn)));
283 }
284 
285 
286 bool CPSGS_WGSProcessor::CanProcess(shared_ptr<CPSGS_Request> request,
287  shared_ptr<CPSGS_Reply> /*reply*/) const
288 {
289  if ( !x_IsEnabled(*request) ) return false;
290  x_InitClient();
291  _ASSERT(m_Client);
292  return m_Client->CanProcessRequest(*request);
293 }
294 
295 
297 CPSGS_WGSProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
298  shared_ptr<CPSGS_Reply> reply,
299  TProcessorPriority priority) const
300 {
301  if ( !x_IsEnabled(*request) ) return nullptr;
302  x_InitClient();
303  _ASSERT(m_Client);
304  if ( !m_Client->CanProcessRequest(*request) ) return nullptr;
305 
306  return new CPSGS_WGSProcessor(m_Client, m_ThreadPool, request, reply, priority);
307 }
308 
309 
311 {
312  return kWGSProcessorName;
313 }
314 
315 
317 {
318  return kWGSProcessorGroupName;
319 }
320 
321 
323 {
324  CRequestContextResetter context_resetter;
325  GetRequest()->SetRequestContext();
326  if ( x_IsCanceled() ) {
327  return;
328  }
329 
330  try {
331  {
332  CFastMutexGuard guard(m_Mutex);
333  m_Unlocked = false;
334  }
336  auto req_type = GetRequest()->GetRequestType();
337  switch (req_type) {
340  break;
343  break;
346  break;
349  break;
350  default:
352  break;
353  }
354  }
355  catch (exception& exc) {
356  x_SendError("Exception when handling a request: ", exc);
358  }
359 }
360 
361 
363 {
364  auto app = CPubseqGatewayApp::GetInstance();
365  bool enabled = app->GetWGSProcessorsEnabled();
366  if ( enabled ) {
367  for (const auto& name : request.GetRequest<SPSGS_RequestBase>().m_DisabledProcessors ) {
368  if ( NStr::EqualNocase(name, kWGSProcessorName) ) return false;
369  }
370  }
371  else {
372  for (const auto& name : request.GetRequest<SPSGS_RequestBase>().m_EnabledProcessors ) {
373  if ( NStr::EqualNocase(name, kWGSProcessorName) ) return true;
374  }
375  }
376  return enabled;
377 }
378 
379 
380 inline
382 {
383  DEFINE_STATIC_FAST_MUTEX(s_ClientMutex);
384  if ( m_Client ) return;
385  CFastMutexGuard guard(s_ClientMutex);
386  if ( !m_Client ) {
387  m_Client = make_shared<CWGSClient>(*m_Config);
388  }
389 }
390 
391 
392 static void s_OnResolvedSeqId(void* data)
393 {
394  static_cast<CPSGS_WGSProcessor*>(data)->OnResolvedSeqId();
395 }
396 
397 
399 {
400  SPSGS_ResolveRequest& resolve_request = GetRequest()->GetRequest<SPSGS_ResolveRequest>();
401  m_OutputFormat = resolve_request.m_OutputFormat;
402  m_SeqId.Reset(new CSeq_id());
403  string err;
404  if (ParseInputSeqId(*m_SeqId, resolve_request.m_SeqId, resolve_request.m_SeqIdType, &err) != ePSGS_ParsedOK) {
405  PSG_ERROR("Error parsing seq-id: " << (err.empty() ? resolve_request.m_SeqId : err));
407  return;
408  }
409  if ( GetRequest()->NeedTrace() ) {
410  GetReply()->SendTrace(
411  kWGSProcessorName + " processor is resolving seq-id " + m_SeqId->AsFastaString(),
412  GetRequest()->GetStartTimestamp());
413  }
414  m_ThreadPool->AddTask(new CWGSThreadPoolTask_ResolveSeqId(*this));
415 }
416 
417 
419 {
420  CRequestContextResetter context_resetter;
421  GetRequest()->SetRequestContext();
422  try {
423  m_WGSData = m_Client->ResolveSeqId(*m_SeqId);
424  if ( GetRequest()->NeedTrace() ) {
425  GetReply()->SendTrace(
426  kWGSProcessorName + " processor finished resolving seq-id " + m_SeqId->AsFastaString() + ", waiting for other processors",
427  GetRequest()->GetStartTimestamp());
428  }
430  }
431  catch (exception& exc) {
432  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
433  m_WGSData.reset();
434  }
436 }
437 
438 
440 {
441  CRequestContextResetter context_resetter;
442  GetRequest()->SetRequestContext();
443  if ( x_IsCanceled() ) {
444  return;
445  }
446  if ( s_SimulateError() ) {
447  m_WGSDataError = "simulated WGS processor error";
448  m_WGSData.reset();
449  }
450  if ( !m_WGSData || !m_WGSData->m_BioseqInfo || m_WGSData->m_GetResult == SWGSData::eResult_NotFound ) {
451  if ( m_WGSDataError.empty() ) {
452  if ( GetRequest()->NeedTrace() ) {
453  GetReply()->SendTrace(
454  kWGSProcessorName + " processor could not find info for seq-id " + m_SeqId->AsFastaString(),
455  GetRequest()->GetStartTimestamp());
456  }
458  }
459  else {
462  }
463  return;
464  }
465  if ( !x_SignalStartProcessing() ) {
466  return;
467  }
468  try {
469  if ( GetRequest()->NeedTrace() ) {
470  GetReply()->SendTrace(
471  kWGSProcessorName + " processor resolved seq-id " + m_SeqId->AsFastaString() + " to blob-id " + m_WGSData->m_BlobId,
472  GetRequest()->GetStartTimestamp());
473  }
475  }
476  catch (exception& exc) {
477  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
480  return;
481  }
483 }
484 
485 
486 static void s_OnGotBlobBySeqId(void* data)
487 {
488  static_cast<CPSGS_WGSProcessor*>(data)->OnGotBlobBySeqId();
489 }
490 
491 
493 {
494  SPSGS_BlobBySeqIdRequest& get_request = GetRequest()->GetRequest<SPSGS_BlobBySeqIdRequest>();
495  m_SeqId.Reset(new CSeq_id());
496  m_ClientId = get_request.m_ClientId;
497  string err;
498  if (ParseInputSeqId(*m_SeqId, get_request.m_SeqId, get_request.m_SeqIdType, &err) != ePSGS_ParsedOK) {
499  PSG_ERROR("Error parsing seq-id: " << (err.empty() ? get_request.m_SeqId : err));
501  return;
502  }
503 
505  if ( GetRequest()->NeedTrace() ) {
506  GetReply()->SendTrace(
507  kWGSProcessorName + " processor is getting info for seq-id " + m_SeqId->AsFastaString(),
508  GetRequest()->GetStartTimestamp());
509  }
510  m_ThreadPool->AddTask(new CWGSThreadPoolTask_ResolveSeqId(*this));
511  }
512  else {
513  if ( GetRequest()->NeedTrace() ) {
514  GetReply()->SendTrace(
515  kWGSProcessorName + " processor is getting blob for seq-id " + m_SeqId->AsFastaString(),
516  GetRequest()->GetStartTimestamp());
517  }
518  m_ExcludedBlobs = get_request.m_ExcludeBlobs;
520  m_ThreadPool->AddTask(new CWGSThreadPoolTask_GetBlobBySeqId(*this));
521  }
522 }
523 
524 
526 {
527  CRequestContextResetter context_resetter;
528  GetRequest()->SetRequestContext();
529  try {
531  m_WGSData = m_Client->GetSeqInfoBySeqId(*m_SeqId, seq, m_ExcludedBlobs);
532  if ( m_WGSData ) {
533  if (x_CheckExcludedCache() && m_WGSData->m_GetResult == SWGSData::eResult_Found) {
534  m_Client->GetWGSData(m_WGSData, seq);
535  }
536  }
537 
538  if ( GetRequest()->NeedTrace() ) {
539  GetReply()->SendTrace(
540  kWGSProcessorName + " processor finished getting blob for seq-id " + m_SeqId->AsFastaString() + ", waiting for other processors",
541  GetRequest()->GetStartTimestamp());
542  }
544  }
545  catch (exception& exc) {
546  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
547  m_WGSData.reset();
548  }
550 }
551 
552 
554 {
555  CRequestContextResetter context_resetter;
556  GetRequest()->SetRequestContext();
557  if ( x_IsCanceled() ) {
558  return;
559  }
560  if ( s_SimulateError() ) {
561  m_WGSDataError = "simulated WGS processor error";
562  m_WGSData.reset();
563  }
564  // NOTE: m_Data may be null if the blob was excluded/skipped.
565  if ( !m_WGSData || !m_WGSData->m_BioseqInfo || m_WGSData->m_GetResult == SWGSData::eResult_NotFound ) {
566  if ( m_WGSDataError.empty() ) {
567  if ( GetRequest()->NeedTrace() ) {
568  GetReply()->SendTrace(
569  kWGSProcessorName + " processor could not find info for seq-id " + m_SeqId->AsFastaString(),
570  GetRequest()->GetStartTimestamp());
571  }
574  }
575  else {
578  }
579  return;
580  }
581  if ( !x_SignalStartProcessing() ) {
582  return;
583  }
584  try {
585  if ( GetRequest()->NeedTrace() ) {
586  GetReply()->SendTrace(
587  kWGSProcessorName + " processor resolved seq-id " + m_SeqId->AsFastaString() + " to blob-id " + m_WGSData->m_BlobId,
588  GetRequest()->GetStartTimestamp());
589  }
591  x_SendBlob();
592  }
593  catch (exception& exc) {
594  x_SendError("Exception when handling a request: ", exc);
596  return;
597  }
599 }
600 
601 
602 static void s_OnGotBlobByBlobId(void* data)
603 {
604  static_cast<CPSGS_WGSProcessor*>(data)->OnGotBlobByBlobId();
605 }
606 
607 
609 {
611  m_PSGBlobId = blob_request.m_BlobId.GetId();
612  m_ClientId = blob_request.m_ClientId;
613  if ( GetRequest()->NeedTrace() ) {
614  GetReply()->SendTrace(
615  kWGSProcessorName + " processor is fetching blob " + m_PSGBlobId,
616  GetRequest()->GetStartTimestamp());
617  }
619 }
620 
621 
623 {
624  CRequestContextResetter context_resetter;
625  GetRequest()->SetRequestContext();
626  try {
627  m_WGSData = m_Client->GetBlobByBlobId(m_PSGBlobId);
628  if ( GetRequest()->NeedTrace() ) {
629  GetReply()->SendTrace(
630  kWGSProcessorName + " processor finished fetching blob " + m_PSGBlobId,
631  GetRequest()->GetStartTimestamp());
632  }
633  }
634  catch (exception& exc) {
635  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
636  m_WGSData.reset();
637  }
639 }
640 
641 
643 {
644  CRequestContextResetter context_resetter;
645  GetRequest()->SetRequestContext();
646  if ( x_IsCanceled() ) {
647  return;
648  }
649  if ( s_SimulateError() ) {
650  m_WGSDataError = "simulated WGS processor error";
651  m_WGSData.reset();
652  }
653  if ( !m_WGSData || m_WGSData->m_GetResult == SWGSData::eResult_NotFound ) {
654  if ( m_WGSDataError.empty() ) {
655  if ( GetRequest()->NeedTrace() ) {
656  GetReply()->SendTrace(
657  kWGSProcessorName + " processor could not find blob " + m_PSGBlobId,
658  GetRequest()->GetStartTimestamp());
659  }
662  }
663  else {
666  }
667  return;
668  }
669  if ( !x_SignalStartProcessing() ) {
670  return;
671  }
672  try {
673  if ( GetRequest()->NeedTrace() ) {
674  GetReply()->SendTrace(
675  kWGSProcessorName + " processor retrieved blob " + m_PSGBlobId,
676  GetRequest()->GetStartTimestamp());
677  }
678  x_SendBlob();
679  }
680  catch (exception& exc) {
681  x_SendError("Exception when handling a request: ", exc);
683  return;
684  }
686 }
687 
688 
689 static void s_OnGotChunk(void* data)
690 {
691  static_cast<CPSGS_WGSProcessor*>(data)->OnGotChunk();
692 }
693 
694 
696 {
697  SPSGS_TSEChunkRequest& chunk_request = GetRequest()->GetRequest<SPSGS_TSEChunkRequest>();
698  m_Id2Info = chunk_request.m_Id2Info;
699  m_ChunkId = chunk_request.m_Id2Chunk;
700  if ( GetRequest()->NeedTrace() ) {
701  GetReply()->SendTrace(
702  kWGSProcessorName + " processor is fetching chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
703  GetRequest()->GetStartTimestamp());
704  }
705  m_ThreadPool->AddTask(new CWGSThreadPoolTask_GetChunk(*this));
706 }
707 
708 
710 {
711  CRequestContextResetter context_resetter;
712  GetRequest()->SetRequestContext();
713  try {
714  m_WGSData = m_Client->GetChunk(m_Id2Info, m_ChunkId);
715  if ( GetRequest()->NeedTrace() ) {
716  GetReply()->SendTrace(
717  kWGSProcessorName + " processor finished fetching chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
718  GetRequest()->GetStartTimestamp());
719  }
720  }
721  catch (exception& exc) {
722  m_WGSDataError = "Exception when handling a request: "+string(exc.what());
723  m_WGSData.reset();
724  }
726 }
727 
728 
730 {
731  CRequestContextResetter context_resetter;
732  GetRequest()->SetRequestContext();
733  if ( x_IsCanceled() ) {
734  return;
735  }
736  if ( s_SimulateError() ) {
737  m_WGSDataError = "simulated WGS processor error";
738  m_WGSData.reset();
739  }
740  if ( !m_WGSData || m_WGSData->m_GetResult == SWGSData::eResult_NotFound ) {
741  if ( m_WGSDataError.empty() ) {
742  if ( GetRequest()->NeedTrace() ) {
743  GetReply()->SendTrace(
744  kWGSProcessorName + " processor could not find chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
745  GetRequest()->GetStartTimestamp());
746  }
749  }
750  else {
753  }
754  return;
755  }
756  if ( !x_SignalStartProcessing() ) {
757  return;
758  }
759  try {
760  if ( m_WGSData->IsForbidden() ) {
761  if ( GetRequest()->NeedTrace() ) {
762  GetReply()->SendTrace(
763  kWGSProcessorName + " processor can not send forbidden chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
764  GetRequest()->GetStartTimestamp());
765  }
766  x_SendForbidden();
767  }
768  else {
769  if ( GetRequest()->NeedTrace() ) {
770  GetReply()->SendTrace(
771  kWGSProcessorName + " processor retrieved chunk " + m_Id2Info + "." + NStr::NumericToString(m_ChunkId),
772  GetRequest()->GetStartTimestamp());
773  }
774  x_SendChunk();
775  }
776  }
777  catch (exception& exc) {
778  x_SendError("Exception when handling a request: ", exc);
780  return;
781  }
783 }
784 
785 
787 {
791  }
792  return m_OutputFormat;
793 }
794 
795 
796 void s_SetBlobVersion(CBlobRecord& blob_props, const CID2_Blob_Id& blob_id)
797 {
798  if ( blob_id.IsSetVersion() ) {
799  blob_props.SetModified(int64_t(blob_id.GetVersion())*60000);
800  }
801 }
802 
803 
804 void s_SetBlobState(CBlobRecord& blob_props, int id2_blob_state)
805 {
806  if ( id2_blob_state & (1 << eID2_Blob_State_withdrawn) ) {
807  blob_props.SetWithdrawn(true);
808  }
809  if ( id2_blob_state & ((1 << eID2_Blob_State_suppressed) |
811  blob_props.SetSuppress(true);
812  }
813  if ( id2_blob_state & (1 << eID2_Blob_State_dead) ) {
814  blob_props.SetDead(true);
815  }
816 }
817 
818 
820 {
821  if ( data.GetData_compression() == data.eData_compression_gzip ) {
822  blob_props.SetGzip(true);
823  }
824  blob_props.SetNChunks(data.GetData().size());
825 }
826 
827 
830  EPSGOperationStatus status,
831  size_t blob_size)
832 {
834  GetTiming().Register(this, operation, status, start, blob_size);
835 }
836 
837 
840  const CID2_Reply_Data& data)
841 {
842  size_t blob_size = 0;
843  for ( auto& chunk : data.GetData() ) {
844  blob_size += chunk->size();
845  }
846  x_RegisterTiming(start, operation, eOpStatusFound, blob_size);
847 }
848 
849 
851 {
853 }
854 
855 
856 void CPSGS_WGSProcessor::x_SendResult(const string& data_to_send, EOutputFormat output_format)
857 {
858  size_t item_id = GetReply()->GetItemId();
859  GetReply()->PrepareBioseqData(item_id, GetName(), data_to_send, output_format);
860  GetReply()->PrepareBioseqCompletion(item_id, GetName(), 2);
861 }
862 
863 
865 {
866  EOutputFormat output_format = x_GetOutputFormat();
867  string data_to_send;
868  if ( output_format == SPSGS_ResolveRequest::ePSGS_JsonFormat ) {
869  data_to_send = ToJsonString(
870  *m_WGSData->m_BioseqInfo,
871  m_WGSData->m_BioseqInfoFlags,
872  m_WGSData->m_BlobId);
873  } else {
874  data_to_send = ToBioseqProtobuf(*m_WGSData->m_BioseqInfo);
875  }
876 
877  x_SendResult(data_to_send, output_format);
878 }
879 
880 
881 void CPSGS_WGSProcessor::x_SendBlobProps(const string& psg_blob_id, CBlobRecord& blob_props)
882 {
883  auto& reply = *GetReply();
884  size_t item_id = reply.GetItemId();
885  string data_to_send = ToJsonString(blob_props);
886  reply.PrepareBlobPropData(item_id, GetName(), psg_blob_id, data_to_send);
887  reply.PrepareBlobPropCompletion(item_id, GetName(), 2);
888 }
889 
890 
891 void CPSGS_WGSProcessor::x_SendBlobForbidden(const string& psg_blob_id)
892 {
893  auto& reply = *GetReply();
894  size_t item_id = reply.GetItemId();
895  reply.PrepareBlobMessage(item_id, GetName(),
896  psg_blob_id,
897  "Blob retrieval is not authorized",
900  eDiag_Error);
901  reply.PrepareBlobCompletion(item_id, GetName(), 2);
902 }
903 
904 
905 void CPSGS_WGSProcessor::x_SendBlobData(const string& psg_blob_id, const CID2_Reply_Data& data)
906 {
907  size_t item_id = GetReply()->GetItemId();
908  int chunk_no = 0;
909  for ( auto& chunk : data.GetData() ) {
910  GetReply()->PrepareBlobData(item_id, GetName(), psg_blob_id,
911  (const unsigned char*)chunk->data(), chunk->size(), chunk_no++);
912  }
913  GetReply()->PrepareBlobCompletion(item_id, GetName(), chunk_no + 1);
914 }
915 
916 
918  const string& id2_info,
919  TID2ChunkId chunk_id,
920  CBlobRecord& blob_props)
921 {
922  size_t item_id = GetReply()->GetItemId();
923  string data_to_send = ToJsonString(blob_props);
924  GetReply()->PrepareTSEBlobPropData(item_id, GetName(), chunk_id, id2_info, data_to_send);
925  GetReply()->PrepareBlobPropCompletion(item_id, GetName(), 2);
926 }
927 
928 
930  const string& id2_info,
931  TID2ChunkId chunk_id,
932  const CID2_Reply_Data& data)
933 {
934  size_t item_id = GetReply()->GetItemId();
935  int chunk_no = 0;
936  for ( auto& chunk : data.GetData() ) {
937  GetReply()->PrepareTSEBlobData(item_id, GetName(),
938  (const unsigned char*)chunk->data(), chunk->size(), chunk_no++,
939  chunk_id, id2_info);
940  }
941  GetReply()->PrepareTSEBlobCompletion(item_id, GetName(), chunk_no+1);
942 }
943 
944 
946 {
947  if (!m_WGSData->m_Id2BlobId) return;
948  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
949  string blob_id = m_WGSData->m_BlobId;
950  auto split_version = m_WGSData->m_SplitVersion;
951 
952  string id2_info = GetPSGId2Info(id2_blob_id, split_version);
953 
954  CBlobRecord main_blob_props;
955  s_SetBlobVersion(main_blob_props, id2_blob_id);
956  s_SetBlobState(main_blob_props, m_WGSData->GetID2BlobState());
957  main_blob_props.SetId2Info(id2_info);
958  x_SendBlobProps(blob_id, main_blob_props);
959 
960  CBlobRecord split_info_blob_props;
962  x_WriteData(data, *m_WGSData->m_Data, m_WGSData->m_Compress);
964  s_SetBlobDataProps(split_info_blob_props, data);
965  x_SendChunkBlobProps(id2_info, kSplitInfoChunk, split_info_blob_props);
967 }
968 
969 
971 {
972  _ASSERT(m_WGSData->m_Id2BlobId);
973  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
974  string main_blob_id = m_WGSData->m_BlobId;
975 
977  x_WriteData(data, *m_WGSData->m_Data, m_WGSData->m_Compress);
979 
980  CBlobRecord main_blob_props;
981  s_SetBlobVersion(main_blob_props, id2_blob_id);
982  s_SetBlobState(main_blob_props, m_WGSData->GetID2BlobState());
983  s_SetBlobDataProps(main_blob_props, data);
984  x_SendBlobProps(main_blob_id, main_blob_props);
985  x_SendBlobData(main_blob_id, data);
986 }
987 
988 
990 {
991  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
992  const string& psg_blob_id = m_WGSData->m_BlobId;
993 
994  CBlobRecord blob_props;
995  s_SetBlobVersion(blob_props, id2_blob_id);
996  s_SetBlobState(blob_props, m_WGSData->GetID2BlobState());
997  x_SendBlobProps(psg_blob_id, blob_props);
998  x_SendBlobForbidden(psg_blob_id);
999 }
1000 
1001 
1003 {
1004  if ( m_WGSData->IsForbidden() ) {
1005  x_SendForbidden();
1006  return;
1007  }
1008  switch (m_WGSData->m_GetResult) {
1010  GetReply()->PrepareBlobExcluded(m_WGSData->m_BlobId, GetName(), ePSGS_BlobExcluded);
1011  return;
1013  GetReply()->PrepareBlobExcluded(m_WGSData->m_BlobId, GetName(), ePSGS_BlobInProgress);
1014  return;
1016  GetReply()->PrepareBlobExcluded(m_WGSData->m_BlobId, GetName(),
1018  return;
1019  default:
1020  break;
1021  }
1022  if ( m_WGSData->m_Data->GetMainObject().GetThisTypeInfo() == CID2S_Split_Info::GetTypeInfo() ) {
1023  // split info
1024  x_SendSplitInfo();
1025  }
1026  else {
1027  x_SendMainEntry();
1028  }
1030 }
1031 
1032 
1034 {
1035  _ASSERT(m_WGSData && m_WGSData->m_Data);
1036  CID2_Blob_Id& id2_blob_id = *m_WGSData->m_Id2BlobId;
1037  auto split_version = m_WGSData->m_SplitVersion;
1038  string id2_info = GetPSGId2Info(id2_blob_id, split_version);
1039 
1041  x_WriteData(data, *m_WGSData->m_Data, m_WGSData->m_Compress);
1043 
1044  CBlobRecord chunk_blob_props;
1045  s_SetBlobDataProps(chunk_blob_props, data);
1046  x_SendChunkBlobProps(id2_info, m_ChunkId, chunk_blob_props);
1047  x_SendChunkBlobData(id2_info, m_ChunkId, data);
1048 }
1049 
1050 
1052  const CAsnBinData& obj,
1053  bool compress) const
1054 {
1056  COSSWriter writer(data.SetData());
1057  CWStream writer_stream(&writer);
1059  if ( compress ) {
1060  data.SetData_compression(CID2_Reply_Data::eData_compression_gzip);
1061  str.reset(new CCompressionOStream(writer_stream,
1064  }
1065  else {
1066  data.SetData_compression(CID2_Reply_Data::eData_compression_none);
1067  str.reset(&writer_stream, eNoOwnership);
1068  }
1069  CObjectOStreamAsnBinary objstr(*str);
1070  obj.Serialize(objstr);
1071 }
1072 
1073 
1075 {
1076  {
1077  CFastMutexGuard guard(m_Mutex);
1078  if (m_Unlocked) return;
1079  m_Unlocked = true;
1080  }
1081  if (m_Request) m_Request->Unlock(kWGSProcessorEvent);
1082 }
1083 
1084 
1086 {
1087  if (m_Canceled) return;
1089 }
1090 
1091 
1093 {
1094  m_Canceled = true;
1095  if (!IsUVThreadAssigned()) {
1098  }
1099  else {
1100  x_UnlockRequest();
1101  }
1102 }
1103 
1104 
1106 {
1107  return m_Status;
1108 }
1109 
1110 
1112 {
1113  if ( m_Canceled ) {
1115  return true;
1116  }
1117  return false;
1118 }
1119 
1120 
1122 {
1123  if ( SignalStartProcessing() == ePSGS_Cancel ) {
1125  return false;
1126  }
1127  return true;
1128 }
1129 
1130 
1132 {
1133  _ASSERT(status != ePSGS_InProgress);
1134  if (m_AddedToExcludedCache) {
1135  // The blob was added to the cache but never completed and needs to be removed.
1137  }
1138  m_Status = status;
1139  x_UnlockRequest();
1141 }
1142 
1143 
1144 void CPSGS_WGSProcessor::x_SendError(shared_ptr<CPSGS_Reply> reply,
1145  const string& msg)
1146 {
1147  reply->PrepareProcessorMessage(reply->GetItemId(), "WGS", msg,
1150  eDiag_Error);
1151 }
1152 
1153 
1155 {
1157 }
1158 
1159 
1160 void CPSGS_WGSProcessor::x_SendError(shared_ptr<CPSGS_Reply> reply,
1161  const string& msg, const exception& exc)
1162 {
1163  x_SendError(reply, msg+string(exc.what()));
1164 }
1165 
1166 
1167 void CPSGS_WGSProcessor::x_SendError(const string& msg, const exception& exc)
1168 {
1169  x_SendError(m_Reply, msg+string(exc.what()));
1170 }
1171 
1172 
1174  CWGSClient::TID2SplitVersion split_version)
1175 {
1176  ostringstream s;
1177  if ( CWGSClient::IsOSGBlob(tse_id) ) {
1178  s_FormatBlobId(s, tse_id);
1179  CWGSClient::TID2BlobVersion blob_version = tse_id.IsSetVersion()? tse_id.GetVersion(): 0;
1180  s << '.' << blob_version << '.' << split_version;
1181  }
1182  return s.str();
1183 }
1184 
1185 
1187 {
1188  if (m_ClientId.empty() || m_ResendTimeoutMks == 0 ||
1189  !m_WGSData || m_WGSData->m_BlobId.empty())
1190  return true; // proceed sending data if any
1191 
1192  bool completed;
1193  psg_time_point_t completed_time;
1194  auto* app = CPubseqGatewayApp::GetInstance();
1195  auto cache_result = app->GetExcludeBlobCache()->AddBlobId(
1196  m_ClientId, SExcludeBlobId(m_WGSData->m_BlobId),
1197  completed, completed_time);
1198  if (cache_result == ePSGS_Added) {
1199  m_AddedToExcludedCache = true;
1200  return true; // blob-id was not in the cache, ok to send data
1201  }
1202  // Blob is already in the cache - check the progress and resend timeout.
1203  if (!completed) {
1204  m_WGSData->m_GetResult = SWGSData::eResult_InProgress;
1205  return false;
1206  }
1207  m_SentMksAgo = GetTimespanToNowMks(completed_time);
1209  m_WGSData->m_GetResult = SWGSData::eResult_Sent;
1210  return false;
1211  }
1212  return true;
1213 }
1214 
1215 
1217 {
1218  if (!m_AddedToExcludedCache || m_ClientId.empty() ||
1219  !m_WGSData || m_WGSData->m_BlobId.empty())
1220  return;
1221  auto* app = CPubseqGatewayApp::GetInstance();
1222  app->GetExcludeBlobCache()->Remove(m_ClientId, SExcludeBlobId(m_WGSData->m_BlobId));
1223  m_AddedToExcludedCache = false;
1224 }
1225 
1226 
1228 {
1229  if (!m_AddedToExcludedCache || m_ClientId.empty() ||
1230  !m_WGSData || m_WGSData->m_BlobId.empty())
1231  return;
1232  auto* app = CPubseqGatewayApp::GetInstance();
1233  app->GetExcludeBlobCache()->SetCompleted(m_ClientId, SExcludeBlobId(m_WGSData->m_BlobId), true);
1234  m_AddedToExcludedCache = false;
1235 }
1236 
1237 
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
const string kCassandraProcessorEvent
virtual void Serialize(CObjectOStreamAsnBinary &out) const
Definition: wgsread.cpp:325
CBlobRecord & SetNChunks(int32_t value)
CBlobRecord & SetId2Info(string const &value)
CBlobRecord & SetSuppress(bool value)
CBlobRecord & SetGzip(bool value)
CBlobRecord & SetWithdrawn(bool value)
CBlobRecord & SetDead(bool value)
CBlobRecord & SetModified(TTimestamp value)
Definition: blob_record.cpp:70
CID2_Blob_Id –.
Definition: ID2_Blob_Id.hpp:66
CNcbiRegistry –.
Definition: ncbireg.hpp:913
list< TOctetString * > TOctetStringSequence
virtual ERW_Result Flush(void)
Flush pending data (if any) down to the output device.
COSSWriter(TOctetStringSequence &out)
TOctetStringSequence & m_Output
vector< char > TOctetString
virtual ERW_Result Write(const void *buffer, size_t count, size_t *written)
Write up to "count" bytes from the buffer pointed to by the "buf" argument onto the output device.
CObjectOStreamAsnBinary –.
Definition: objostrasnb.hpp:58
@ ePSGS_BlobBySatSatKeyRequest
TRequest & GetRequest(void)
void x_SendResult(const string &data_to_send, EOutputFormat output_format)
void x_SendChunkBlobData(const string &id2_info, TID2ChunkId chunk_id, const objects::CID2_Reply_Data &data)
void x_ProcessBlobBySatSatKeyRequest(void)
void OnGotBlobBySeqId(void)
void GetBlobBySeqId(void)
void x_ProcessResolveRequest(void)
void x_InitClient(void) const
static void x_SendError(shared_ptr< CPSGS_Reply > reply, const string &msg)
void x_SendBlobProps(const string &psg_blob_id, CBlobRecord &blob_props)
EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
EOutputFormat x_GetOutputFormat(void)
string GetName(void) const override
Tells the processor name (used in logging and tracing)
void Process(void) override
Main processing function.
bool x_CheckExcludedCache(void)
void x_ProcessTSEChunkRequest(void)
void x_Finish(EPSGS_Status status)
string GetGroupName(void) const override
Tells the processor group name.
void x_SendBlobData(const string &psg_blob_id, const objects::CID2_Reply_Data &data)
CRef< objects::CSeq_id > m_SeqId
unsigned long m_ResendTimeoutMks
void x_WriteData(objects::CID2_Reply_Data &data, const objects::CAsnBinData &obj, bool compress) const
void OnGotBlobByBlobId(void)
void x_UnlockRequest(void)
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Tells if processor can process the given request.
shared_ptr< CWGSClient > m_Client
void x_RegisterTimingNotFound(EPSGOperation operation)
shared_ptr< ncbi::CThreadPool > m_ThreadPool
void OnResolvedSeqId(void)
static string GetPSGId2Info(const CID2_Blob_Id &tse_id, CWGSClient::TID2SplitVersion split_version)
shared_ptr< SWGSData > m_WGSData
void GetBlobByBlobId(void)
void x_ProcessBlobBySeqIdRequest(void)
void x_SendSplitInfo(void)
unsigned long m_SentMksAgo
void x_SendForbidden(void)
psg_time_point_t m_Start
~CPSGS_WGSProcessor(void) override
IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
void x_SendBlobForbidden(const string &psg_blob_id)
void Cancel(void) override
The infrastructure request to cancel processing.
void x_SetExcludedCacheCompleted(void)
void x_RegisterTiming(psg_time_point_t start, EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
void x_WaitForOtherProcessors(void)
void x_RemoveFromExcludedCache(void)
void x_SendChunkBlobProps(const string &id2_info, TID2ChunkId chunk_id, CBlobRecord &blob_props)
void x_SendMainEntry(void)
void x_SendBioseqInfo(void)
shared_ptr< SWGSProcessor_Config > m_Config
EOutputFormat m_OutputFormat
void x_RegisterTimingFound(psg_time_point_t start, EPSGOperation operation, const objects::CID2_Reply_Data &data)
bool x_IsEnabled(CPSGS_Request &request) const
EPSGS_Status m_Status
static CPubseqGatewayApp * GetInstance(void)
Abstract class for representing single task executing in pool of threads To use this class in applica...
Definition: thread_pool.hpp:76
Main class implementing functionality of pool of threads.
int TID2BlobVersion
Definition: wgs_client.hpp:193
static bool IsOSGBlob(const CID2_Blob_Id &blob_id)
int TID2SplitVersion
Definition: wgs_client.hpp:192
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_GetBlobByBlobId(CPSGS_WGSProcessor &processor)
CWGSThreadPoolTask_GetBlobBySeqId(CPSGS_WGSProcessor &processor)
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_GetChunk(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
CWGSThreadPoolTask_ResolveSeqId(CPSGS_WGSProcessor &processor)
CPSGS_WGSProcessor & m_Processor
virtual EStatus Execute(void) override
Do the actual job.
Writer-based output stream.
Definition: rwstream.hpp:171
CZipStreamCompressor – zlib based compression stream processor.
Definition: zlib.hpp:763
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_Status
The GetStatus() method returns a processor current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
EPSGS_SeqIdParsingResult ParseInputSeqId(objects::CSeq_id &seq_id, const string &request_seq_id, int request_seq_id_type, string *err_msg=nullptr)
Parse seq-id from a string and type representation.
TProcessorPriority m_Priority
A very basic data-write interface.
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
std::ofstream out("events_result.xml")
main entry point for tests
@ ePSGS_Added
#define true
Definition: bool.h:35
#define false
Definition: bool.h:36
static const char * str(char *buf, int n)
Definition: stats.c:84
char data[12]
Definition: iconv.c:80
Int8 int64_t
@ eNoOwnership
No ownership is assumed.
Definition: ncbi_types.h:135
string
Definition: cgiapp.hpp:690
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
Definition: ncbi_param.hpp:149
#define kMax_UInt
Definition: ncbi_limits.h:185
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
Definition: ncbireg.cpp:362
ERW_Result
Result codes for I/O operations.
@ eRW_Success
Everything is okay, I/O completed.
static bool EqualNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive equality of a substring with another string.
Definition: ncbistr.hpp:5347
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
EStatus
Status of the task.
Definition: thread_pool.hpp:79
@ eCompleted
executed successfully
Definition: thread_pool.hpp:83
#define DEFINE_STATIC_FAST_MUTEX(id)
Define static fast mutex and initialize it.
Definition: ncbimtx.hpp:496
operation
Bit operations.
Definition: bmconst.h:191
TVersion GetVersion(void) const
Get the Version member data.
TSat_key GetSat_key(void) const
Get the Sat_key member data.
bool IsSetVersion(void) const
version of blob, optional in some requests Check if a value has been assigned to Version data member.
TSub_sat GetSub_sat(void) const
Get the Sub_sat member data.
TSat GetSat(void) const
Get the Sat member data.
@ eID2_Blob_State_dead
@ eID2_Blob_State_suppressed
@ eID2_Blob_State_suppressed_temp
@ eID2_Blob_State_withdrawn
const int64_t kSplitInfoChunk
Definition: id2info.hpp:42
T min(T x_, T y_)
#define count
static uint8_t * buffer
Definition: pcre2test.c:1016
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
string ToBioseqProtobuf(const CBioseqInfoRecord &bioseq_info)
#define PSG_ERROR(message)
@ ePSGS_BlobExcluded
@ ePSGS_BlobInProgress
chrono::steady_clock psg_clock_t
@ ePSGS_ParsedOK
int TProcessorPriority
@ ePSGS_UnknownError
@ ePSGS_BlobRetrievalIsNotAuthorized
psg_clock_t::time_point psg_time_point_t
unsigned long GetTimespanToNowMks(const psg_time_point_t &t_point)
Reader-writer based streams.
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static CNamedPipeClient * client
vector< string > m_ExcludeBlobs
unsigned long m_ResendTimeoutMks
string GetId(void) const
EPSGS_TSEOption m_TSEOption
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
EPSGS_OutputFormat m_OutputFormat
@ eResult_Excluded
Definition: wgs_client.hpp:91
@ eResult_Found
Definition: wgs_client.hpp:89
@ eResult_Sent
Definition: wgs_client.hpp:93
@ eResult_InProgress
Definition: wgs_client.hpp:92
@ eResult_NotFound
Definition: wgs_client.hpp:90
#define _ASSERT
Pool of generic task-executing threads.
EPSGOperationStatus
Definition: timing.hpp:60
@ eOpStatusFound
Definition: timing.hpp:61
@ eOpStatusNotFound
Definition: timing.hpp:62
EPSGOperation
Definition: timing.hpp:65
@ eTseChunkRetrieve
Definition: timing.hpp:101
@ eBlobRetrieve
Definition: timing.hpp:98
#define DEFAULT_INDEX_UPDATE_TIME
USING_SCOPE(objects)
#define DEFAULT_COMPRESS_DATA
#define DEFAULT_FILE_REOPEN_TIME
static bool s_SimulateError()
void s_SetBlobState(CBlobRecord &blob_props, int id2_blob_state)
#define PARAM_FILE_RECHECK_TIME
static void s_OnGotBlobBySeqId(void *data)
NCBI_PARAM_DEF(int, WGS_PROCESSOR, ERROR_RATE, 0)
END_NAMESPACE(wgs)
#define DEFAULT_VDB_CACHE_SIZE
static const string kWGSProcessorSection
#define DEFAULT_FILE_RECHECK_TIME
BEGIN_LOCAL_NAMESPACE
#define PARAM_COMPRESS_DATA
#define PARAM_FILE_REOPEN_TIME
END_LOCAL_NAMESPACE
NCBI_PARAM_DECL(int, WGS_PROCESSOR, ERROR_RATE)
static void s_OnResolvedSeqId(void *data)
static const string kParamMaxConn
END_NCBI_NAMESPACE
static const string kWGSProcessorGroupName
static const string kWGSProcessorName
static const int kDefaultMaxConn
#define PARAM_INDEX_UPDATE_TIME
static const char kSubSatSeparator
BEGIN_NCBI_NAMESPACE
static void s_OnGotBlobByBlobId(void *data)
static void s_OnGotChunk(void *data)
BEGIN_NAMESPACE(psg)
#define PARAM_VDB_CACHE_SIZE
void s_SetBlobVersion(CBlobRecord &blob_props, const CID2_Blob_Id &blob_id)
void s_SetBlobDataProps(CBlobRecord &blob_props, const CID2_Reply_Data &data)
static void s_FormatBlobId(ostream &s, const CID2_Blob_Id &blob_id)
const string kWGSProcessorEvent
#define compress
Definition: zconf_cf.h:39
ZLib Compression API.
Modified on Fri Sep 20 14:57:58 2024 by modify_doxy.py rev. 669887