NCBI C++ ToolKit
osg_annot.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: osg_annot.cpp 99336 2023-03-13 14:22:34Z vasilche $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Eugene Vasilchenko
27  *
28  * File Description: processor for data from OSG
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "osg_annot.hpp"
35 #include "osg_getblob_base.hpp"
36 #include "osg_fetch.hpp"
37 #include "osg_connection.hpp"
38 #include "pubseq_gateway.hpp"
39 
42 #include <objects/id2/id2__.hpp>
44 #include <util/range.hpp>
45 #include <corelib/ncbi_param.hpp>
46 
50 
51 
53  const CRef<COSGConnectionPool>& pool,
54  const shared_ptr<CPSGS_Request>& request,
55  const shared_ptr<CPSGS_Reply>& reply,
56  TProcessorPriority priority)
57  : CPSGS_OSGProcessorBase(enabled_flags, pool, request, reply, priority)
58 {
59 }
60 
61 
63 {
65 }
66 
67 
69 {
70  return "OSG-annot";
71 }
72 
73 
75 {
77 }
78 
79 
80 vector<string> CPSGS_OSGAnnot::WhatCanProcess(TEnabledFlags enabled_flags,
81  shared_ptr<CPSGS_Request>& request)
82 {
83  if ( !(enabled_flags&fEnabledAllAnnot) ) {
84  return vector<string>();
85  }
86  auto& psg_req = request->GetRequest<SPSGS_AnnotRequest>();
87  // check if id is good enough
88  CSeq_id id;
89  try {
90  SetSeqId(id, psg_req.m_SeqIdType, psg_req.m_SeqId);
91  }
92  catch ( exception& /*ignore*/ ) {
93  return vector<string>();
94  }
95  if ( !id.IsGi() && !id.GetTextseq_Id() ) {
96  return vector<string>();
97  }
98  //if ( !CanResolve(request.m_SeqIdType, request.m_SeqId) ) {
99  // return false;
100  //}
101  return GetNamesToProcess(enabled_flags, psg_req, 0);
102 }
103 
104 
106  shared_ptr<CPSGS_Request>& request,
107  TProcessorPriority priority)
108 {
109  if ( !(enabled_flags&fEnabledAllAnnot) ) {
110  return false;
111  }
112  auto& psg_req = request->GetRequest<SPSGS_AnnotRequest>();
113  // check if id is good enough
114  CSeq_id id;
115  try {
116  SetSeqId(id, psg_req.m_SeqIdType, psg_req.m_SeqId);
117  }
118  catch ( exception& /*ignore*/ ) {
119  return false;
120  }
121  if ( !id.IsGi() && !id.GetTextseq_Id() ) {
122  return false;
123  }
124  //if ( !CanResolve(request.m_SeqIdType, request.m_SeqId) ) {
125  // return false;
126  //}
127  return !GetNamesToProcess(enabled_flags, psg_req, priority).empty();
128 }
129 
130 
132  SPSGS_AnnotRequest& request,
133  TProcessorPriority priority)
134 {
135  vector<string> ret;
136  for ( auto& name : request.GetNotProcessedName(priority) ) {
137  if ( CanProcessAnnotName(enabled_flags, name) ) {
138  ret.push_back(name);
139  }
140  }
141  return ret;
142 }
143 
144 
145 static bool IsCDDName(const string& name)
146 {
147  return NStr::EqualNocase(name, "CDD");
148 }
149 
150 
151 // primary SNP track
152 static bool IsPrimarySNPName(const string& name)
153 {
154  return NStr::EqualNocase(name, "SNP");
155 }
156 
157 
158 // explicit name for a SNP track
159 static bool IsExplicitSNPName(const string& name)
160 {
161  return NStr::StartsWith(name, "NA", NStr::eNocase) && name.find("#") != NPOS;
162 }
163 
164 
165 static bool IsSNPName(const string& name)
166 {
167  return IsPrimarySNPName(name) || IsExplicitSNPName(name);
168 }
169 
170 
172  const string& name)
173 {
174  if ( (enabled_flags & fEnabledSNP) && IsSNPName(name) ) {
175  return true;
176  }
177  if ( (enabled_flags & fEnabledCDD) && IsCDDName(name) ) {
178  return true;
179  }
180  return false;
181 }
182 
183 
185 {
186  auto& psg_req = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
187  CRef<CID2_Request> osg_req(new CID2_Request);
188  auto& req = osg_req->SetRequest().SetGet_blob_id();
189  SetSeqId(req.SetSeq_id().SetSeq_id().SetSeq_id(), psg_req.m_SeqIdType, psg_req.m_SeqId);
191  m_ApplyCDDFix = false;
192  for ( auto& name : GetNamesToProcess(GetEnabledFlags(), psg_req, GetPriority()) ) {
193  m_NamesToProcess.insert(name);
194  if ( IsCDDName(name) ) {
195  // CDD are external annotations in OSG
196  req.SetExternal();
198  }
199  else {
200  // others have named annot accession (source)
201  req.SetSources().push_back(name);
202  }
203  }
204  AddRequest(osg_req);
205 }
206 
207 
209 {
210  if ( m_ApplyCDDFix ) {
211  m_CDDReceived = false;
213  }
214 }
215 
216 
218 {
219  if ( m_ApplyCDDFix ) {
220  if ( IsCDDReply(reply) ) {
221  m_CDDReceived = true;
222  }
223  }
224 }
225 
226 
228 {
229  if ( m_ApplyCDDFix ) {
230  if ( !m_CDDReceived &&
231  m_RequestTime.Elapsed() > GetConnectionPool().GetCDDRetryTimeout() ) {
232  NCBI_THROW(CPubseqGatewayException, eRequestCanceled, "no CDD due to OSG timeout");
233  }
234  }
235 }
236 
237 
238 bool CPSGS_OSGAnnot::RegisterProcessedName(const string& annot_name)
239 {
240  auto& psg_req = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
241  auto prev_priority = psg_req.RegisterProcessedName(GetPriority(), annot_name);
242  if ( prev_priority > GetPriority() ) {
243  if ( NeedTrace() ) {
244  ostringstream str;
245  str << "OSG-annot: skip sending NA "<<annot_name
246  << " that has already been processed by processor with priority "
247  << prev_priority << " > " << GetPriority();
248  SendTrace(str.str());
249  }
250  return false;
251  }
252  if ( prev_priority != kUnknownPriority && prev_priority < GetPriority() ) {
253  if ( NeedTrace() ) {
254  ostringstream str;
255  str << "OSG-annot: resending NA "<<annot_name
256  << " that has already been processed by processor with priority "
257  << prev_priority << " < " << GetPriority();
258  SendTrace(str.str());
259  }
260  }
261  return true;
262 }
263 
264 
266 {
267  for ( auto& f : GetFetches() ) {
268  if ( GetDebugLevel() >= eDebug_exchange ) {
269  LOG_POST(GetDiagSeverity() << "OSG: "
270  "Processing fetch: "<<MSerial_AsnText<<f->GetRequest());
271  }
272  for ( auto& r : f->GetReplies() ) {
273  if ( GetDebugLevel() >= eDebug_exchange ) {
274  LOG_POST(GetDiagSeverity() << "OSG: "
275  "Processing reply: "<<MSerial_AsnText<<*r);
276  }
277  switch ( r->GetReply().Which() ) {
280  // do nothing
281  break;
283  // do nothing
284  break;
286  AddBlobId(r->GetReply().GetGet_blob_id());
287  break;
288  default:
289  PSG_ERROR(GetName()<<": "
290  "Unknown reply to "<<MSerial_AsnText<<*f->GetRequest()<<"\n"<<*r);
291  break;
292  }
293  }
294  }
295  if ( IsCanceled() ) {
296  return;
297  }
298  if ( s_SimulateError() ) {
299  SPSGS_AnnotRequest& annot_request = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
300  for ( auto& name : m_NamesToProcess ) {
302  }
304  return;
305  }
306  SendReplies();
307 }
308 
309 
311 {
312  string name;
313  for ( auto& ai : blob_id.GetAnnot_info() ) {
314  // find annot name
315  string ai_name = ai->GetName();
316  SIZE_TYPE zoom_pos = ai_name.find("@@");
317  if ( zoom_pos != NPOS ) {
318  ai_name.resize(zoom_pos);
319  }
320  if ( name.empty() ) {
321  name = ai_name;
322  }
323  else if ( name != ai_name ) {
324  return 0;
325  }
326  }
327  return name;
328 }
329 
330 
332 {
333  if ( !blob_id.IsSetBlob_id() ||
335  !blob_id.IsSetAnnot_info() ) {
336  return 0;
337  }
338  string name = GetAnnotName(blob_id);
339  if ( !m_NamesToProcess.count(name) ) {
340  return 0;
341  }
342  auto iter = m_BlobIds.insert(make_pair(name, TBlobIdList())).first;
343  iter->second.push_back(Ref(&blob_id));
344  return &iter->first;
345 }
346 
347 
349 {
// Sends the collected blob-id replies (m_BlobIds) as named-annotation data.
// NOTE(review): listing lines 373-374, 388, 397 and 400 are absent from this
// copy of the file; the notes below describe only the statements that are visible.

// Optional exchange-level debug dump of what was asked and what was received.
350  if ( GetDebugLevel() >= eDebug_exchange ) {
351  for ( auto& name : m_NamesToProcess ) {
352  LOG_POST(GetDiagSeverity() << "OSG: "
353  "Asked for annot "<<name);
354  }
355  for ( auto& r_name : m_BlobIds ) {
356  for ( auto& r : r_name.second ) {
357  LOG_POST(GetDiagSeverity() << "OSG: "
358  "Received annot reply "<<MSerial_AsnText<<*r);
359  }
360  }
361  }
// has_data: names registered by this processor, i.e. whose blob ids are handled below.
// processed_by_other: names for which RegisterProcessedName() returned false.
362  set<string> has_data;
363  set<string> processed_by_other;
364  for ( auto& r_name : m_BlobIds ) {
365  auto& annot_name = r_name.first;
366  if ( !RegisterProcessedName(annot_name) ) {
367  processed_by_other.insert(annot_name);
368  continue;
369  }
370  has_data.insert(annot_name);
371  for ( auto& r : r_name.second ) {
372  string psg_blob_id = CPSGS_OSGGetBlobBase::GetPSGBlobId(r->GetBlob_id());
// `json` is declared on a line missing from this listing.
375  json.SetString("blob_id", psg_blob_id);
376  if ( r->GetBlob_id().IsSetVersion() ) {
// NOTE(review): if GetVersion() returns a 32-bit int, the multiplication by
// 60000 is evaluated in int and may overflow before the value is widened for
// SetInteger() - confirm the type and widen the operand first if so.
377  json.SetInteger("last_modified", r->GetBlob_id().GetVersion()*60000);
378  }
379  if ( r->IsSetAnnot_info() ) {
380  // set ASN.1 annot info
// all annot-info objects are written back to back in binary ASN.1 and the
// concatenation is Base64-encoded into a single string
381  ostringstream str;
382  for ( auto& info : r->GetAnnot_info() ) {
383  str << MSerial_AsnBinary << *info;
384  }
385  json.SetString("seq_annot_info", NStr::Base64Encode(str.str(), 0));
386  }
387  GetReply()->PrepareNamedAnnotationData(annot_name, GetName(),
389  }
390  }
// Second pass over the requested names: names handled by another processor are
// skipped; the action taken for names without data is on a missing line.
391  SPSGS_AnnotRequest& annot_request = GetRequest()->GetRequest<SPSGS_AnnotRequest>();
392  for ( auto& name : m_NamesToProcess ) {
393  if ( processed_by_other.count(name) ) {
394  continue;
395  }
396  if ( !has_data.count(name) ) {
398  }
399  }
401 }
402 
403 
// Tells whether an OSG reply is a get-blob-id reply whose annotation name is CDD.
// Returns false for any other reply variant and for a get-blob-id reply that
// has no blob id or no annot info set; otherwise the answer is
// IsCDDName(GetAnnotName(blob_id)).
404 bool CPSGS_OSGAnnot::IsCDDReply(const CID2_Reply& reply) const
405 {
406  if ( !reply.GetReply().IsGet_blob_id() ) {
407  return false;
408  }
409 
410  const CID2_Reply_Get_Blob_Id& blob_id = reply.GetReply().GetGet_blob_id();
// NOTE(review): listing line 412 is absent from this copy, so the condition
// below may have had one more clause - confirm against the repository.
411  if ( !blob_id.IsSetBlob_id() ||
413  !blob_id.IsSetAnnot_info() ) {
414  return false;
415  }
416  return IsCDDName(GetAnnotName(blob_id));
417 }
418 
419 
CID2_Reply_Get_Blob_Id –.
CID2_Reply –.
Definition: ID2_Reply.hpp:66
CID2_Request –.
Definition: ID2_Request.hpp:66
JSON node abstraction.
string Repr(TReprFlags flags=0) const
Return a string representation of this node.
void SetString(const string &key, const string &value)
Set a JSON object element to the specified string value.
void SetInteger(const string &key, Int8 value)
Set a JSON object element to the specified integer value.
static CJsonNode NewObjectNode()
Create a new JSON object node.
double GetCDDRetryTimeout() const
virtual void ProcessReplies() override
Definition: osg_annot.cpp:265
virtual string GetName() const override
Tells the processor name (used in logging and tracing)
Definition: osg_annot.cpp:68
const string * AddBlobId(const CID2_Reply_Get_Blob_Id &blob_id)
Definition: osg_annot.cpp:331
static string GetAnnotName(const CID2_Reply_Get_Blob_Id &blob_id)
Definition: osg_annot.cpp:310
map< string, TBlobIdList > m_BlobIds
Definition: osg_annot.hpp:103
virtual void NotifyOSGCallReply(const CID2_Reply &reply) override
Definition: osg_annot.cpp:217
static bool CanProcess(TEnabledFlags enabled_flags, shared_ptr< CPSGS_Request > &request, TProcessorPriority priority)
Definition: osg_annot.cpp:105
bool IsCDDReply(const CID2_Reply &reply) const
Definition: osg_annot.cpp:404
CStopWatch m_RequestTime
Definition: osg_annot.hpp:106
static vector< string > GetNamesToProcess(TEnabledFlags enabled_flags, SPSGS_AnnotRequest &request, TProcessorPriority priority)
Definition: osg_annot.cpp:131
virtual void NotifyOSGCallStart() override
Definition: osg_annot.cpp:208
set< string > m_NamesToProcess
Definition: osg_annot.hpp:101
virtual string GetGroupName() const override
Tells the processor group name.
Definition: osg_annot.cpp:74
static vector< string > WhatCanProcess(TEnabledFlags enabled_flags, shared_ptr< CPSGS_Request > &request)
Definition: osg_annot.cpp:80
virtual ~CPSGS_OSGAnnot()
Definition: osg_annot.cpp:62
virtual void NotifyOSGCallEnd() override
Definition: osg_annot.cpp:227
vector< CConstRef< CID2_Reply_Get_Blob_Id > > TBlobIdList
Definition: osg_annot.hpp:102
static bool CanProcessAnnotName(TEnabledFlags enabled_flags, const string &name)
Definition: osg_annot.cpp:171
virtual void CreateRequests() override
Definition: osg_annot.cpp:184
void SendReplies()
Definition: osg_annot.cpp:348
CPSGS_OSGAnnot(TEnabledFlags enabled_flags, const CRef< COSGConnectionPool > &pool, const shared_ptr< CPSGS_Request > &request, const shared_ptr< CPSGS_Reply > &reply, TProcessorPriority priority)
Definition: osg_annot.cpp:52
bool RegisterProcessedName(const string &annot_name)
Definition: osg_annot.cpp:238
static bool IsEnabledCDDBlob(TEnabledFlags enabled_flags, const CID2_Blob_Id &blob_id)
static string GetPSGBlobId(const CID2_Blob_Id &blob_id)
static bool IsEnabledAnnotBlob(TEnabledFlags enabled_flags, const CID2_Blob_Id &blob_id)
void AddRequest(const CRef< CID2_Request > &req)
TEnabledFlags GetEnabledFlags() const
void SendTrace(const string &str)
const TFetches & GetFetches() const
void x_RegisterTiming(EPSGOperation operation, EPSGOperationStatus status, size_t blob_size)
COSGConnectionPool & GetConnectionPool() const
static void SetSeqId(CSeq_id &id, int seq_id_type, const string &seq_id)
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
TProcessorPriority GetPriority(void) const
Provides the processor priority.
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
void clear()
Definition: set.hpp:153
bool empty() const
Definition: set.hpp:133
static const char * str(char *buf, int n)
Definition: stats.c:84
#define LOG_POST(message)
This macro is deprecated and it's strongly recommended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
#define MSerial_AsnBinary
Definition: serialbase.hpp:697
#define MSerial_AsnText
I/O stream manipulators –.
Definition: serialbase.hpp:696
CRef< C > Ref(C *object)
Helper functions to get CRef<> and CConstRef<> objects.
Definition: ncbiobj.hpp:2015
NCBI_NS_STD::string::size_type SIZE_TYPE
Definition: ncbistr.hpp:132
static string Base64Encode(const CTempString str, size_t line_len=0)
Base64-encode string.
Definition: ncbistr.cpp:6270
#define NPOS
Definition: ncbistr.hpp:133
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5412
static bool EqualNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive equality of a substring with another string.
Definition: ncbistr.hpp:5353
@ eNocase
Case insensitive compare.
Definition: ncbistr.hpp:1206
double Restart(void)
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2817
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2776
bool IsSetAnnot_info(void) const
annotation types in this blob annotation are unknown if this field is omitted Check if a value has be...
const TGet_blob_id & GetGet_blob_id(void) const
Get the variant data.
Definition: ID2_Reply_.cpp:186
const TAnnot_info & GetAnnot_info(void) const
Get the Annot_info member data.
const TReply & GetReply(void) const
Get the Reply member data.
Definition: ID2_Reply_.hpp:940
const TBlob_id & GetBlob_id(void) const
Get the Blob_id member data.
bool IsGet_blob_id(void) const
Check if variant Get_blob_id is selected.
Definition: ID2_Reply_.hpp:775
bool IsSetBlob_id(void) const
result Check if a value has been assigned to Blob_id data member.
static MDB_envinfo info
Definition: mdb_load.c:37
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
static bool IsSNPName(const string &name)
Definition: osg_annot.cpp:165
static bool IsExplicitSNPName(const string &name)
Definition: osg_annot.cpp:159
END_NCBI_NAMESPACE
Definition: osg_annot.cpp:422
BEGIN_NCBI_NAMESPACE
Definition: osg_annot.cpp:47
static bool IsCDDName(const string &name)
Definition: osg_annot.cpp:145
static bool IsPrimarySNPName(const string &name)
Definition: osg_annot.cpp:152
BEGIN_NAMESPACE(psg)
END_NAMESPACE(osg)
Severity GetDiagSeverity()
int GetDebugLevel()
@ eDebug_exchange
const string kOSGProcessorGroupName
#define PSG_ERROR(message)
int TProcessorPriority
const int kUnknownPriority
TProcessorPriority RegisterProcessedName(TProcessorPriority priority, const string &name)
vector< string > GetNotProcessedName(TProcessorPriority priority)
void ReportResultStatus(const string &annot_name, EPSGS_ResultStatus rs)
@ eOpStatusFound
Definition: timing.hpp:61
@ eNAResolve
Definition: timing.hpp:102
Modified on Thu Apr 11 15:03:27 2024 by modify_doxy.py rev. 669887