NCBI C++ ToolKit
cdd_processor.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: cdd_processor.cpp 101577 2024-01-08 11:04:03Z grichenk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Aleksey Grichenko
27  *
28  * File Description: processor for data from CDD
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "cdd_processor.hpp"
35 #include "pubseq_gateway.hpp"
38 #include "psgs_thread_pool.hpp"
49 #include <util/thread_pool.hpp>
50 
51 
55 
57 
58 static const string kCDDAnnotName = "CDD";
59 static const string kCDDProcessorGroupName = "CDD";
60 static const string kCDDProcessorName = "CDD";
61 static const string kCDDProcessorSection = "CDD_PROCESSOR";
63 
64 static const string kParamMaxConn = "maxconn";
65 static const int kDefaultMaxConn = 64;
66 
67 static const string kParamCDDBackendTimeout = "backend_timeout";
68 static const double kDefaultCDDBackendTimeout = 5.0;
69 
70 
71 /////////////////////////////////////////////////////////////////////////////
72 // Helper classes
73 /////////////////////////////////////////////////////////////////////////////
74 
76 
78 {
79 public:
81 
82  virtual EStatus Execute(void) override
83  {
85  return eCompleted;
86  }
87 
88 private:
90 };
91 
92 
94 {
95 public:
97 
98  virtual EStatus Execute(void) override
99  {
101  return eCompleted;
102  }
103 
104 private:
106 };
107 
108 
110 {
111 public:
113 
114  virtual EStatus Execute(void) override
115  {
117  return eCompleted;
118  }
119 
120 private:
122 };
123 
124 
126 
127 
128 NCBI_PARAM_DECL(int, CDD_PROCESSOR, ERROR_RATE);
129 NCBI_PARAM_DEF(int, CDD_PROCESSOR, ERROR_RATE, 0);
130 
131 static bool s_SimulateError()
132 {
133  static int error_rate = NCBI_PARAM_TYPE(CDD_PROCESSOR, ERROR_RATE)::GetDefault();
134  if ( error_rate > 0 ) {
135  static int error_counter = 0;
136  if ( ++error_counter >= error_rate ) {
137  error_counter = 0;
138  return true;
139  }
140  }
141  return false;
142 }
143 
144 
146  : m_ClientPool(new CCDDClientPool()),
147  m_Status(ePSGS_NotFound),
148  m_Canceled(false),
149  m_Unlocked(true)
150 {
151  const CNcbiRegistry& registry = CPubseqGatewayApp::GetInstance()->GetConfig();
153  try {
154  m_ClientPool->SetClientTimeout(CTimeout(timeout));
155  }
156  catch (CTimeException&) {
157  m_ClientPool->SetClientTimeout(CTimeout(CTimeout::eDefault));
158  }
160  if (max_conn == 0) {
161  max_conn = kDefaultMaxConn;
162  }
165  min(3u, max_conn), max_conn)));
166 }
167 
169  shared_ptr<CCDDClientPool> client_pool,
170  shared_ptr<CThreadPool> thread_pool,
171  shared_ptr<CPSGS_Request> request,
172  shared_ptr<CPSGS_Reply> reply,
173  TProcessorPriority priority)
174  : m_ClientPool(client_pool),
175  m_Start(psg_clock_t::now()),
176  m_Status(ePSGS_InProgress),
177  m_Canceled(false),
178  m_Unlocked(true),
179  m_ThreadPool(thread_pool)
180 {
181  m_Request = request;
182  m_Reply = reply;
183  m_Priority = priority;
184 }
185 
187 {
189  x_UnlockRequest();
190 }
191 
192 
194 {
195  // first check global setting
197  if ( enabled ) {
198  // check if disabled in request
199  for (const auto& name : request.GetRequest<SPSGS_RequestBase>().m_DisabledProcessors ) {
200  if ( NStr::EqualNocase(name, kCDDProcessorName) ) {
201  enabled = false;
202  break;
203  }
204  }
205  }
206  else {
207  // check if enabled in request
208  for (const auto& name : request.GetRequest<SPSGS_RequestBase>().m_EnabledProcessors ) {
209  if ( NStr::EqualNocase(name, kCDDProcessorName) ) {
210  enabled = true;
211  break;
212  }
213  }
214  }
215  return enabled;
216 }
217 
218 
219 vector<string> CPSGS_CDDProcessor::WhatCanProcess(shared_ptr<CPSGS_Request> request,
220  shared_ptr<CPSGS_Reply> reply) const
221 {
222  try {
223  vector<string> can_process;
224  if ( x_IsEnabled(*request) ) {
225  SPSGS_AnnotRequest& annot_request = request->GetRequest<SPSGS_AnnotRequest>();
226  if ( x_CanProcessAnnotRequestIds(annot_request) ) {
227  for ( auto& name : annot_request.m_Names ) {
228  if ( name == kCDDAnnotName ) {
229  can_process.push_back(name);
230  break;
231  }
232  }
233  }
234  }
235  return can_process;
236  }
237  catch ( exception& exc ) {
238  x_SendError(reply, "Exception in WhatCanProcess: ", exc);
239  throw;
240  }
241 }
242 
243 
244 bool CPSGS_CDDProcessor::CanProcess(shared_ptr<CPSGS_Request> request,
245  shared_ptr<CPSGS_Reply> reply) const
246 {
247  try {
248  auto req_type = request->GetRequestType();
249  if (req_type != CPSGS_Request::ePSGS_AnnotationRequest &&
251  return false;
252  }
253 
254  if ( !x_IsEnabled(*request) ) {
255  return false;
256  }
257 
258  if (req_type == CPSGS_Request::ePSGS_AnnotationRequest &&
259  !x_CanProcessAnnotRequest(request->GetRequest<SPSGS_AnnotRequest>(), 0)) {
260  return false;
261  }
263  !x_CanProcessBlobRequest(request->GetRequest<SPSGS_BlobBySatSatKeyRequest>())) {
264  return false;
265  }
266 
267  return true;
268  }
269  catch ( exception& exc ) {
270  x_SendError(reply, "Exception in CanProcess: ", exc);
271  throw;
272  }
273 }
274 
275 
277 CPSGS_CDDProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
278  shared_ptr<CPSGS_Reply> reply,
279  TProcessorPriority priority) const
280 {
281  try {
282  auto req_type = request->GetRequestType();
283  if (req_type != CPSGS_Request::ePSGS_AnnotationRequest &&
285  return nullptr;
286  }
287 
288  if ( !x_IsEnabled(*request) ) {
289  return nullptr;
290  }
291 
292  if (req_type == CPSGS_Request::ePSGS_AnnotationRequest &&
293  !x_CanProcessAnnotRequest(request->GetRequest<SPSGS_AnnotRequest>(), priority)) {
294  return nullptr;
295  }
297  !x_CanProcessBlobRequest(request->GetRequest<SPSGS_BlobBySatSatKeyRequest>())) {
298  return nullptr;
299  }
300 
301  return new CPSGS_CDDProcessor(m_ClientPool, m_ThreadPool, request, reply, priority);
302  }
303  catch ( exception& exc ) {
304  x_SendError(reply, "Exception in CreateProcessor: ", exc);
305  throw;
306  }
307 }
308 
309 
311 {
312  return kCDDProcessorName;
313 }
314 
315 
317 {
318  return kCDDProcessorGroupName;
319 }
320 
321 
322 void CPSGS_CDDProcessor::x_SendError(shared_ptr<CPSGS_Reply> reply,
323  const string& msg)
324 {
325  reply->PrepareProcessorMessage(reply->GetItemId(), kCDDProcessorName, msg,
328  eDiag_Error);
329 }
330 
331 
332 void CPSGS_CDDProcessor::x_SendError(const string& msg)
333 {
334  x_SendError(GetReply(), msg);
335 }
336 
337 
338 void CPSGS_CDDProcessor::x_SendError(shared_ptr<CPSGS_Reply> reply,
339  const string& msg, const exception& exc)
340 {
341  x_SendError(reply, msg+string(exc.what()));
342 }
343 
344 
345 void CPSGS_CDDProcessor::x_SendError(const string& msg, const exception& exc)
346 {
347  x_SendError(GetReply(), msg+string(exc.what()));
348 }
349 
350 
352 {
353  SPSGS_AnnotRequest& annot_request = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
354  annot_request.ReportResultStatus(kCDDAnnotName, status);
355 }
356 
357 
359 {
360  _ASSERT(GetRequest());
361  CRequestContextResetter context_resetter;
362  GetRequest()->SetRequestContext();
363 
364  try {
365  {
366  CFastMutexGuard guard(m_Mutex);
367  m_Unlocked = false;
368  }
370  auto req_type = GetRequest()->GetRequestType();
371  switch (req_type) {
374  break;
377  break;
378  default:
380  break;
381  }
382  }
383  catch (exception& exc) {
384  x_SendError("Exception when handling a request: ", exc);
386  }
387 }
388 
389 
390 static void s_OnGotBlobId(void* data)
391 {
392  static_cast<CPSGS_CDDProcessor*>(data)->OnGotBlobId();
393 }
394 
395 
396 static void s_OnGotBlobBySeqId(void* data)
397 {
398  static_cast<CPSGS_CDDProcessor*>(data)->OnGotBlobBySeqId();
399 }
400 
401 
403 {
404  SPSGS_AnnotRequest& annot_request = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
405  if ( !x_NameIncluded(annot_request.GetNotProcessedName(m_Priority)) ) {
408  return;
409  }
410  if (!annot_request.m_SeqId.empty() && x_CanProcessSeq_id(annot_request.m_SeqId)) {
411  m_SeqIds.push_back(CSeq_id_Handle::GetHandle(annot_request.m_SeqId));
412  }
413  for (auto& id : annot_request.m_SeqIds) {
414  if (x_CanProcessSeq_id(id)) {
415  m_SeqIds.push_back(CSeq_id_Handle::GetHandle(id));
416  }
417  }
418 
419  if (annot_request.m_TSEOption == SPSGS_BlobRequestBase::EPSGS_TSEOption::ePSGS_SmartTSE ||
420  annot_request.m_TSEOption == SPSGS_BlobRequestBase::EPSGS_TSEOption::ePSGS_WholeTSE ||
421  annot_request.m_TSEOption == SPSGS_BlobRequestBase::EPSGS_TSEOption::ePSGS_OrigTSE) {
422  // Send whole TSE.
423  m_ThreadPool->AddTask(new CCDDThreadPoolTask_GetBlobBySeqId(*this));
424  }
425  else {
426  // Send annot info only.
427  m_ThreadPool->AddTask(new CCDDThreadPoolTask_GetBlobId(*this));
428  }
429 }
430 
431 
432 static void s_OnGotBlobByBlobId(void* data)
433 {
434  static_cast<CPSGS_CDDProcessor*>(data)->OnGotBlobByBlobId();
435 }
436 
437 
439 {
440  SPSGS_BlobBySatSatKeyRequest blob_request =
441  GetRequest()->GetRequest<SPSGS_BlobBySatSatKeyRequest>();
443  if ( !m_BlobId ) {
445  return;
446  }
448 }
449 
450 
452 {
453  CRequestContextResetter context_resetter;
454  GetRequest()->SetRequestContext();
455  for (auto id : m_SeqIds) {
456  try {
457  if ( GetRequest()->NeedTrace() ) {
458  GetReply()->SendTrace(
459  kCDDProcessorName + " processor trying to get blob-id by seq-id " + id.AsString(),
460  GetRequest()->GetStartTimestamp());
461  }
462  m_CDDBlob.info = m_ClientPool->GetBlobIdBySeq_id(id);
463  if (m_CDDBlob.info) break;
464  }
465  catch (exception& exc) {
466  if ( GetRequest()->NeedTrace() ) {
467  GetReply()->SendTrace(
468  kCDDProcessorName + " processor failed to get blob-id by seq-id, exception: " + exc.what(),
469  GetRequest()->GetStartTimestamp());
470  }
471  m_Error = "Exception when handling get_na request: " + string(exc.what());
472  m_CDDBlob.info.Reset();
473  m_CDDBlob.data.Reset();
474  }
475  }
477 }
478 
479 
481 {
482  CRequestContextResetter context_resetter;
483  GetRequest()->SetRequestContext();
484  for (auto id : m_SeqIds) {
485  try {
486  if ( GetRequest()->NeedTrace() ) {
487  GetReply()->SendTrace(
488  kCDDProcessorName + " processor trying to get blob by seq-id " + id.AsString(),
489  GetRequest()->GetStartTimestamp());
490  }
491  m_CDDBlob = m_ClientPool->GetBlobBySeq_id(id);
492  if (m_CDDBlob.info && m_CDDBlob.data) break;
493  }
494  catch (exception& exc) {
495  if ( GetRequest()->NeedTrace() ) {
496  GetReply()->SendTrace(
497  kCDDProcessorName + " processor failed to get blob by seq-id, exception: " + exc.what(),
498  GetRequest()->GetStartTimestamp());
499  }
500  m_Error = "Exception when handling get_na request: " + string(exc.what());
501  m_CDDBlob.info.Reset();
502  m_CDDBlob.data.Reset();
503  }
504  }
506 }
507 
508 
510 {
511  CRequestContextResetter context_resetter;
512  GetRequest()->SetRequestContext();
513  try {
514  if ( GetRequest()->NeedTrace() ) {
515  GetReply()->SendTrace(
516  kCDDProcessorName + " processor trying to get blob by blob-id " + CCDDClientPool::BlobIdToString(*m_BlobId),
517  GetRequest()->GetStartTimestamp());
518  }
519  m_CDDBlob.data = m_ClientPool->GetBlobByBlobId(*m_BlobId);
520  }
521  catch (exception& exc) {
522  if ( GetRequest()->NeedTrace() ) {
523  GetReply()->SendTrace(
524  kCDDProcessorName + " processor failed to get blob by blob-id, exception: " + exc.what(),
525  GetRequest()->GetStartTimestamp());
526  }
527  m_Error = "Exception when handling getblob request: " + string(exc.what());
528  m_CDDBlob.info.Reset();
529  m_CDDBlob.data.Reset();
530  }
532 }
533 
534 
536 {
537  CRequestContextResetter context_resetter;
538  GetRequest()->SetRequestContext();
539  if ( x_IsCanceled() ) {
540  return;
541  }
542  if ( s_SimulateError() ) {
543  m_Error = "simulated CDD processor error";
544  m_CDDBlob.info.Reset();
545  m_CDDBlob.data.Reset();
546  }
547  if ( !m_CDDBlob.info ) {
548  if ( !m_Error.empty() ) {
552  }
553  else {
554  if ( GetRequest()->NeedTrace() ) {
555  GetReply()->SendTrace(
556  kCDDProcessorName + " processor did not find the requested blob-id",
557  GetRequest()->GetStartTimestamp());
558  }
562  }
563  return;
564  }
565  if ( !x_SignalStartProcessing() ) {
566  return;
567  }
568  try {
569  x_SendAnnotInfo(*m_CDDBlob.info);
570  }
571  catch (exception& exc) {
572  m_Error = "Exception when sending get_na reply: " + string(exc.what());
576  return;
577  }
579 }
580 
581 
583 {
584  CRequestContextResetter context_resetter;
585  GetRequest()->SetRequestContext();
586  if ( x_IsCanceled() ) {
587  return;
588  }
589  if ( s_SimulateError() ) {
590  m_Error = "simulated CDD processor error";
591  m_CDDBlob.info.Reset();
592  m_CDDBlob.data.Reset();
593  }
594  if ( !m_CDDBlob.info || !m_CDDBlob.data ) {
595  if ( !m_Error.empty() ) {
599  }
600  else {
601  if ( GetRequest()->NeedTrace() ) {
602  GetReply()->SendTrace(
603  kCDDProcessorName + " processor did not find the requested blob",
604  GetRequest()->GetStartTimestamp());
605  }
609  }
610  return;
611  }
612  if ( !x_SignalStartProcessing() ) {
613  return;
614  }
615  try {
616  x_SendAnnotInfo(*m_CDDBlob.info);
617  x_SendAnnot(m_CDDBlob.info->GetBlob_id(), m_CDDBlob.data);
618  }
619  catch (exception& exc) {
620  m_Error = "Exception when sending get_na reply: " + string(exc.what());
624  return;
625  }
627 }
628 
629 
631 {
632  CRequestContextResetter context_resetter;
633  GetRequest()->SetRequestContext();
634  if ( x_IsCanceled() ) {
635  return;
636  }
637  if ( s_SimulateError() ) {
638  m_Error = "simulated CDD processor error";
639  m_CDDBlob.info.Reset();
640  m_CDDBlob.data.Reset();
641  }
642  if ( !m_CDDBlob.data ) {
643  if ( !m_Error.empty() ) {
646  }
647  else {
648  if ( GetRequest()->NeedTrace() ) {
649  GetReply()->SendTrace(
650  kCDDProcessorName + " processor did not find the requested blob",
651  GetRequest()->GetStartTimestamp());
652  }
655  }
656  return;
657  }
658  if ( !x_SignalStartProcessing() ) {
659  return;
660  }
661  try {
663  }
664  catch (exception& exc) {
665  m_Error = "Exception when sending getblob reply: " + string(exc.what());
668  return;
669  }
671 }
672 
673 
675  EPSGOperationStatus status,
676  size_t blob_size)
677 {
679  GetTiming().Register(this, operation, status, m_Start, blob_size);
680 }
681 
682 
684 {
686 }
687 
688 
690 {
691  SPSGS_AnnotRequest& annot_request = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
692  if ( annot_request.RegisterProcessedName(GetPriority(), kCDDAnnotName) > GetPriority() ) {
693  // higher priority processor already processed this request
694  if ( GetRequest()->NeedTrace() ) {
695  GetReply()->SendTrace(
696  kCDDProcessorName + " processor stops sending annot-info because a higher priority processor has already sent it",
697  GetRequest()->GetStartTimestamp());
698  }
700  return;
701  }
702 
704 
705  const CID2_Blob_Id& blob_id = blob_info.GetBlob_id();
707  json.SetString("blob_id", CCDDClientPool::BlobIdToString(blob_id));
708  if ( blob_id.IsSetVersion() ) {
709  json.SetInteger("last_modified", blob_id.GetVersion()*60000);
710  }
711 
713  annot_info->SetName(kCDDAnnotName);
715  feat_info->SetType(CSeqFeatData::e_Region);
716  feat_info->SetSubtypes().push_back(CSeqFeatData::eSubtype_region);
717  annot_info->SetFeat().push_back(feat_info);
718  feat_info.Reset(new CID2S_Feat_type_Info);
719  feat_info->SetType(CSeqFeatData::e_Site);
720  feat_info->SetSubtypes().push_back(CSeqFeatData::eSubtype_site);
721  annot_info->SetFeat().push_back(feat_info);
722 
723  const CSeq_id& annot_id = blob_info.GetSeq_id();
724  if ( annot_id.IsGi() ) {
725  annot_info->SetSeq_loc().SetWhole_gi(annot_id.GetGi());
726  }
727  else {
728  annot_info->SetSeq_loc().SetWhole_seq_id().Assign(annot_id);
729  }
730 
731  ostringstream annot_str;
732  annot_str << MSerial_AsnBinary << *annot_info;
733  json.SetString("seq_annot_info", NStr::Base64Encode(annot_str.str(), 0));
734 
735  GetReply()->PrepareNamedAnnotationData(kCDDAnnotName, kCDDProcessorName,
737 }
738 
739 
741 {
742  string psg_blob_id = CCDDClientPool::BlobIdToString(id2_blob_id);
743  CRef<CSeq_entry> entry(new CSeq_entry);
744  entry->SetSet().SetSeq_set();
745  entry->SetAnnot().push_back(annot);
746  ostringstream blob_str;
747  blob_str << MSerial_AsnBinary << *entry;
748  string blob_data = blob_str.str();
749 
750  x_RegisterTiming(eNARetrieve, eOpStatusFound, blob_data.size());
751 
752  CBlobRecord blob_props;
753  if (id2_blob_id.IsSetVersion()) {
754  blob_props.SetModified(int64_t(id2_blob_id.GetVersion()*60000));
755  }
756  blob_props.SetNChunks(1);
757  size_t item_id = GetReply()->GetItemId();
758  GetReply()->PrepareBlobPropData(
759  item_id,
761  psg_blob_id,
762  ToJsonString(blob_props));
763  GetReply()->PrepareBlobPropCompletion(item_id, kCDDProcessorName, 2);
764 
765  item_id = GetReply()->GetItemId();
766  GetReply()->PrepareBlobData(
767  item_id,
769  psg_blob_id,
770  (const unsigned char*)blob_data.data(), blob_data.size(), 0);
771  GetReply()->PrepareBlobCompletion(item_id, kCDDProcessorName, 2);
772 }
773 
774 
776 {
777  m_Canceled = true;
778  if (!IsUVThreadAssigned()) {
781  }
782  else {
783  x_UnlockRequest();
784  }
785 }
786 
787 
789 {
790  return m_Status;
791 }
792 
793 
795 {
796  {
797  CFastMutexGuard guard(m_Mutex);
798  if (m_Unlocked) return;
799  m_Unlocked = true;
800  }
801  if (GetRequest()) GetRequest()->Unlock(kCDDProcessorEvent);
802 }
803 
804 
806 {
807  if ( m_Canceled ) {
809  return true;
810  }
811  return false;
812 }
813 
814 
816 {
817  if ( GetRequest()->GetRequestType() == CPSGS_Request::ePSGS_AnnotationRequest ) {
818  SPSGS_AnnotRequest& annot_request = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
819  if ( annot_request.RegisterProcessedName(GetPriority(), kCDDAnnotName) > GetPriority() ) {
820  // higher priority processor already processed this request
821  if ( GetRequest()->NeedTrace() ) {
822  GetReply()->SendTrace(
823  kCDDProcessorName + " processor stops processing request because a higher priority processor has already processed it",
824  GetRequest()->GetStartTimestamp());
825  }
827  return false;
828  }
829  }
830  else {
831  if ( SignalStartProcessing() == ePSGS_Cancel ) {
833  return false;
834  }
835  }
836  return true;
837 }
838 
839 
840 bool CPSGS_CDDProcessor::x_CanProcessSeq_id(const string& psg_id) const
841 {
842  try {
843  CSeq_id id(psg_id);
844  if (!id.IsGi() && !id.GetTextseq_Id()) return false;
845  if (!m_ClientPool->IsValidId(id)) return false;
846  }
847  catch (exception& e) {
848  return false;
849  }
850  return true;
851 }
852 
853 
855 {
856  if (!annot_request.m_SeqId.empty() && x_CanProcessSeq_id(annot_request.m_SeqId)) return true;
857  for (const auto& id: annot_request.m_SeqIds) {
858  if (x_CanProcessSeq_id(id)) return true;
859  }
860  return false;
861 }
862 
863 
865  TProcessorPriority priority) const
866 {
867  if (!x_NameIncluded(annot_request.GetNotProcessedName(priority))) {
868  return false;
869  }
870  if (!x_CanProcessAnnotRequestIds(annot_request)) {
871  return false;
872  }
873  return true;
874 }
875 
876 
878 {
881  return blob_id && blob_id->GetSat() == kCDDSat;
882 }
883 
884 
885 bool CPSGS_CDDProcessor::x_NameIncluded(const vector<string>& names) const
886 {
887  for ( auto& name : names ) {
888  if ( name == kCDDAnnotName ) return true;
889  }
890  return false;
891 }
892 
893 
895 {
896  _ASSERT(status != ePSGS_InProgress);
897  m_Status = status;
898  x_UnlockRequest();
900 }
901 
902 
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
#define true
Definition: bool.h:35
#define false
Definition: bool.h:36
USING_SCOPE(objects)
static bool s_SimulateError()
static void s_OnGotBlobBySeqId(void *data)
static void s_OnGotBlobId(void *data)
BEGIN_LOCAL_NAMESPACE
static const string kCDDProcessorSection
static const string kCDDProcessorName
NCBI_PARAM_DEF(int, CDD_PROCESSOR, ERROR_RATE, 0)
NCBI_PARAM_DECL(int, CDD_PROCESSOR, ERROR_RATE)
static const string kCDDAnnotName
END_LOCAL_NAMESPACE
const CID2_Blob_Id::TSat kCDDSat
static const string kParamMaxConn
END_NCBI_NAMESPACE
static const int kDefaultMaxConn
BEGIN_NCBI_NAMESPACE
static void s_OnGotBlobByBlobId(void *data)
BEGIN_NAMESPACE(psg)
static const string kCDDProcessorGroupName
END_NAMESPACE(cdd)
static const double kDefaultCDDBackendTimeout
static const string kParamCDDBackendTimeout
const string kCDDProcessorEvent
CBlobRecord & SetNChunks(int32_t value)
CBlobRecord & SetModified(TTimestamp value)
Definition: blob_record.cpp:70
static CRef< TBlobId > StringToBlobId(const string &s)
Definition: cdd_client.cpp:631
static string BlobIdToString(const TBlobId &blob_id)
Definition: cdd_client.cpp:622
CCDDThreadPoolTask_GetBlobByBlobId(CPSGS_CDDProcessor &processor)
virtual EStatus Execute(void) override
Do the actual job.
virtual EStatus Execute(void) override
Do the actual job.
CCDDThreadPoolTask_GetBlobBySeqId(CPSGS_CDDProcessor &processor)
virtual EStatus Execute(void) override
Do the actual job.
CCDDThreadPoolTask_GetBlobId(CPSGS_CDDProcessor &processor)
CPSGS_CDDProcessor & m_Processor
CCDD_Reply_Get_Blob_Id –.
CID2S_Feat_type_Info –.
CID2S_Seq_annot_Info –.
CID2_Blob_Id –.
Definition: ID2_Blob_Id.hpp:66
JSON node abstraction.
string Repr(TReprFlags flags=0) const
Return a string representation of this node.
void SetString(const string &key, const string &value)
Set a JSON object element to the specified string value.
void SetInteger(const string &key, Int8 value)
Set a JSON object element to the specified integer value.
static CJsonNode NewObjectNode()
Create a new JSON object node.
CNcbiRegistry –.
Definition: ncbireg.hpp:913
void x_UnlockRequest(void)
void x_SendAnnotInfo(const objects::CCDD_Reply_Get_Blob_Id &blob_info)
vector< objects::CSeq_id_Handle > m_SeqIds
void x_SendAnnot(const objects::CID2_Blob_Id &id2_blob_id, CRef< objects::CSeq_annot > &annot)
bool x_CanProcessAnnotRequestIds(SPSGS_AnnotRequest &annot_request) const
vector< string > WhatCanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Needs to be implemented only for the ID/get_na requests.
void GetBlobByBlobId(void)
void OnGotBlobByBlobId(void)
psg_time_point_t m_Start
EPSGS_Status m_Status
void x_Finish(EPSGS_Status status)
objects::CCDDClientPool::SCDDBlob m_CDDBlob
void x_ProcessGetBlobRequest(void)
string GetGroupName(void) const override
Tells the processor group name.
bool x_CanProcessSeq_id(const string &psg_id) const
void x_RegisterTimingNotFound(EPSGOperation operation)
void Process(void) override
Main processing function.
~CPSGS_CDDProcessor(void) override
void GetBlobBySeqId(void)
bool x_NameIncluded(const vector< string > &names) const
IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const override
Create processor to fulfil PSG request using the data source.
bool x_IsEnabled(CPSGS_Request &request) const
void Cancel(void) override
The infrastructure request to cancel processing.
bool x_CanProcessAnnotRequest(SPSGS_AnnotRequest &annot_request, TProcessorPriority priority) const
void x_ReportResultStatus(SPSGS_AnnotRequest::EPSGS_ResultStatus status)
void OnGotBlobBySeqId(void)
string GetName(void) const override
Tells the processor name (used in logging and tracing)
bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const override
Tells if processor can process the given request.
EPSGS_Status GetStatus(void) override
Tells the processor status (if it has finished or in progress)
shared_ptr< ncbi::CThreadPool > m_ThreadPool
CRef< objects::CCDDClientPool::TBlobId > m_BlobId
static void x_SendError(shared_ptr< CPSGS_Reply > reply, const string &msg)
void x_RegisterTiming(EPSGOperation operation, EPSGOperationStatus status, size_t size)
bool x_CanProcessBlobRequest(SPSGS_BlobBySatSatKeyRequest &blob_request) const
shared_ptr< objects::CCDDClientPool > m_ClientPool
void x_ProcessResolveRequest(void)
@ ePSGS_BlobBySatSatKeyRequest
TRequest & GetRequest(void)
bool GetCDDProcessorsEnabled() const
static CPubseqGatewayApp * GetInstance(void)
CRef –.
Definition: ncbiobj.hpp:618
Definition: Seq_entry.hpp:56
TAnnot & SetAnnot(void)
Definition: Seq_entry.cpp:195
Abstract class for representing single task executing in pool of threads To use this class in applica...
Definition: thread_pool.hpp:76
Main class implementing functionality of pool of threads.
CTimeException –.
Definition: ncbitime.hpp:2076
CTimeout – Timeout interval.
Definition: ncbitime.hpp:1693
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
shared_ptr< CPSGS_Reply > m_Reply
TProcessorPriority GetPriority(void) const
Provides the processor priority.
EPSGS_Status
The GetStatus() method returns a processor current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
TProcessorPriority m_Priority
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
static const struct name_t names[]
string
Definition: cgiapp.hpp:687
@ eDiag_Error
Error message.
Definition: ncbidiag.hpp:653
#define MSerial_AsnBinary
Definition: serialbase.hpp:697
static CSeq_id_Handle GetHandle(const CSeq_id &id)
Normal way of getting a handle, works for any seq-id.
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
Definition: ncbi_param.hpp:149
#define kMax_UInt
Definition: ncbi_limits.h:185
virtual int GetInt(const string &section, const string &name, int default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get integer value of specified parameter name.
Definition: ncbireg.cpp:362
virtual double GetDouble(const string &section, const string &name, double default_value, TFlags flags=0, EErrAction err_action=eThrow) const
Get double value of specified parameter name.
Definition: ncbireg.cpp:420
static string Base64Encode(const CTempString str, size_t line_len=0)
Base64-encode string.
Definition: ncbistr.cpp:6266
static bool EqualNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive equality of a substring with another string.
Definition: ncbistr.hpp:5353
EStatus
Status of the task.
Definition: thread_pool.hpp:79
@ eCompleted
executed successfully
Definition: thread_pool.hpp:83
@ eDefault
Default timeout (to be interpreted by the client code)
Definition: ncbitime.hpp:1698
operation
Bit operations.
Definition: bmconst.h:191
const TBlob_id & GetBlob_id(void) const
Get the Blob_id member data.
const TSeq_id & GetSeq_id(void) const
Get the Seq_id member data.
TVersion GetVersion(void) const
Get the Version member data.
bool IsSetVersion(void) const
version of blob, optional in some requests Check if a value has been assigned to Version data member.
@ e_Region
named region (globin locus)
TGi GetGi(void) const
Get the variant data.
Definition: Seq_id_.hpp:889
bool IsGi(void) const
Check if variant Gi is selected.
Definition: Seq_id_.hpp:883
TSet & SetSet(void)
Select the variant.
Definition: Seq_entry_.cpp:130
TSeq_set & SetSeq_set(void)
Assign a value to Seq_set data member.
T min(T x_, T y_)
string ToJsonString(const CBioseqInfoRecord &bioseq_info, SPSGS_ResolveRequest::TPSGS_BioseqIncludeData include_data_flags, const string &custom_blob_id)
chrono::steady_clock psg_clock_t
int TProcessorPriority
@ ePSGS_UnknownError
signed __int64 int64_t
Definition: stdint.h:135
TProcessorPriority RegisterProcessedName(TProcessorPriority priority, const string &name)
vector< string > m_Names
vector< string > GetNotProcessedName(TProcessorPriority priority)
vector< string > m_SeqIds
void ReportResultStatus(const string &annot_name, EPSGS_ResultStatus rs)
string GetId(void) const
EPSGS_TSEOption m_TSEOption
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
#define _ASSERT
Pool of generic task-executing threads.
EPSGOperationStatus
Definition: timing.hpp:60
@ eOpStatusFound
Definition: timing.hpp:61
@ eOpStatusNotFound
Definition: timing.hpp:62
EPSGOperation
Definition: timing.hpp:65
@ eNAResolve
Definition: timing.hpp:102
@ eNARetrieve
Definition: timing.hpp:99
Modified on Thu Mar 28 17:09:11 2024 by modify_doxy.py rev. 669887