1 /* $Id: remote_updater.cpp 101887 2024-02-28 18:19:12Z vasilche $
2  * ===========================================================================
3  *
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergiy Gotvyanskyy, NCBI
27  * Colleen Bolin, NCBI
28  *
29  * File Description:
30  * Front-end class for making remote requests to MLA and taxon
31  *
32  * ===========================================================================
33  */
34 #include <ncbi_pch.hpp>
39 #include <objects/seq/Pubdesc.hpp>
46 #include <objects/seq/Bioseq.hpp>
48 #include <objmgr/seq_descr_ci.hpp>
49 #include <objmgr/seqdesc_ci.hpp>
50 #include <objmgr/bioseq_ci.hpp>
52 // new
64 #include <common/test_assert.h> /* This header must go last */
71 namespace
72 {
73 TEntrezId FindPMID(const list<CRef<CPub>>& arr)
74 {
75  for (const auto& pPub : arr) {
76  if (pPub->IsPmid()) {
77  return pPub->GetPmid().Get();
78  }
79  }
80  return ZERO_ENTREZ_ID;
81 }
83 static bool s_IsConnectionFailure(EPubmedError errorVal)
84 {
85  switch (errorVal) {
88  return true;
89  default:
90  break;
91  }
92  return false;
93 }
95 static
96 CRef<CPub> s_GetPubFrompmid(CEUtilsUpdater* upd, TEntrezId id, int maxAttempts, IObjtoolsListener* pMessageListener)
97 {
100  int maxCount = max(1, maxAttempts);
101  for (int count = 0; count < maxCount; ++count) {
102  EPubmedError errorVal;
103  result = upd->GetPubmedEntry(id, &errorVal);
104  if (result) {
105  return result;
106  }
108  bool isConnectionError = s_IsConnectionFailure(errorVal);
109  if (isConnectionError && count < maxCount - 1) {
110  continue;
111  }
113  std::ostringstream oss;
114  oss << "Failed to retrieve publication for PMID "
115  << id
116  << ". ";
117  if (isConnectionError) {
118  oss << count + 1 << " attempts made. ";
119  }
120  oss << "Pubmed error: " << errorVal;
121  string msg = oss.str();
122  if (pMessageListener) {
123  pMessageListener->PutMessage(CRemoteUpdaterMessage(msg, errorVal));
124  break;
125  } else {
127  }
128  }
129  return result;
130 }
132 } // end anonymous namespace
134 static const bool kUseBulkTaxonQuery = true;
137 {
138 public:
141  void Init()
142  {
143  if (! m_taxon) {
145  m_cache.reset(new CCachedReplyMap);
146  }
147  }
149  void InitWithTimeout(unsigned seconds, unsigned retries, bool exponential)
150  {
151  if (! m_taxon) {
152  const STimeout timeout = { seconds, 0 };
153  m_taxon.reset(new CTaxon3(timeout, retries, exponential));
154  m_cache.reset(new CCachedReplyMap);
155  }
156  }
158  void ClearCache()
159  {
160  if (m_cache) {
161  m_cache->clear();
162  }
163  }
165  void ReportStats(std::ostream& str)
166  {
167  str << "CRemoteUpdater: cache_hits " << m_cache_hits << " out of " << m_num_requests << " requests\n";
168  }
171  {
173  CRef<CT3Reply> reply = GetOrgReply(org, f_logger);
174  if (reply->IsData() && reply->SetData().IsSetOrg()) {
175  result.Reset(&reply->SetData().SetOrg());
176  }
177  return result;
178  }
181  {
182  m_num_requests++;
183  std::ostringstream os;
184  os << MSerial_AsnText << in_org;
185  CRef<CT3Reply>& reply = (*m_cache)[os.str()];
186  if (reply.Empty()) {
187  CTaxon3_request request;
190  CRef<COrg_ref> org(new COrg_ref);
191  org->Assign(in_org);
192  rq->SetOrg(*org);
194  request.SetRequest().push_back(rq);
195  CRef<CTaxon3_reply> result = m_taxon->SendRequest(request);
196  reply = result->SetReply().front();
197  if (reply->IsError() && f_logger) {
198  const string& error_message =
199  "Taxon update: " +
200  (in_org.IsSetTaxname() ? in_org.GetTaxname() : NStr::NumericToString(in_org.GetTaxId())) + ": " +
201  reply->GetError().GetMessage();
203  f_logger(error_message);
204  } else if (reply->IsData() && reply->SetData().IsSetOrg()) {
205  reply->SetData().SetOrg().ResetSyn();
206  // next will reset 'attrib = specified'
207  // RW-1380 why do we need to reset attrib 'specified' ?
208  //reply->SetData().SetOrg().SetOrgname().SetFormalNameFlag(false);
209  }
210  } else {
211  m_cache_hits++;
212 #ifdef _DEBUG
213  //cerr << "Using cache for:" << os.str() << endl;
214 #endif
215  }
216  return reply;
217  }
220  {
222  if ( kUseBulkTaxonQuery ) {
223  size_t n = query.size();
224  m_num_requests += n;
225  // lookup cache first
226  vector<string> keys(n);
227  vector<CRef<CT3Reply>*> cache_refs(n);
229  for ( size_t i = 0; i < n; ++i ) {
230  std::ostringstream os;
231  os << MSerial_AsnText << *query[i];
232  keys[i] = os.str();
233  CRef<CT3Reply>& reply = (*m_cache)[keys[i]];
234  cache_refs[i] = &reply;
235  if ( reply ) {
236  m_cache_hits++;
237  }
238  else {
239  to_ask[keys[i]] = make_pair(query[i], &reply);
240  }
241  }
242  if ( !to_ask.empty() ) { // there are cache misses
243  // prepare request of new orgs
244  vector<pair<CRef<COrg_ref>, CRef<CT3Reply>*>> to_ask_vec;
245  for ( const auto& it : to_ask ) {
246  to_ask_vec.push_back(it.second);
247  }
248  size_t m = to_ask_vec.size();
249  CTaxon3_request whole_request;
250  auto& requests = whole_request.SetRequest();
251  for ( auto& it : to_ask_vec ) {
253  rq->SetOrg(*it.first);
254  requests.push_back(rq);
255  }
256  // invoke request
258  CRef<CTaxon3_reply> whole_reply = m_taxon->SendRequest(whole_request);
259  LOG_POST(Info<<"Got "<<m<<" taxonomy in "<<sw.Elapsed()<<" s");
260  auto& replies = whole_reply->SetReply();
261  if ( replies.size() != m ) {
262  // requests and replies mismatch
263  if (logger) {
264  const string& error_message =
265  "Taxon update: got " + NStr::NumericToString(replies.size()) +
266  " replies for " + NStr::NumericToString(m) +
267  " requests";
268  logger(error_message);
269  }
270  return result; // Error?
271  }
272  // store replies into cache
273  size_t i = 0;
274  for ( auto& reply : replies ) {
275  auto& ask = to_ask_vec[i++];
276  *ask.second = reply;
277  if (reply->IsError() && logger) {
278  auto& in_org = *ask.first;
279  const string& error_message =
280  "Taxon update: " +
281  (in_org.IsSetTaxname() ? in_org.GetTaxname() : NStr::NumericToString(in_org.GetTaxId())) + ": " +
282  reply->GetError().GetMessage();
284  logger(error_message);
285  } else if (reply->IsData() && reply->SetData().IsSetOrg()) {
286  reply->SetData().SetOrg().ResetSyn();
287  // next will reset 'attrib = specified'
288  // RW-1380 why do we need to reset attrib 'specified' ?
289  //reply->SetData().SetOrg().SetOrgname().SetFormalNameFlag(false);
290  }
291  }
292  }
293  // update results with new cache content
294  for ( size_t i = 0; i < n; ++i ) {
295  result->SetReply().push_back(*cache_refs[i]);
296  }
297  }
298  else {
299  for (const auto& it : query) {
300  result->SetReply().push_back(GetOrgReply(*it, logger));
301  }
302  }
303  return result;
304  }
306 protected:
307  unique_ptr<CTaxon3> m_taxon;
308  unique_ptr<CCachedReplyMap> m_cache;
309  size_t m_num_requests = 0;
310  size_t m_cache_hits = 0;
311 };
314 {
315  auto pub = s_GetPubFrompmid(m_pubmed.get(), id, m_MaxMlaAttempts, m_pMessageListener);
316  if (! (pub && pub->IsMedline())) {
317  return false;
318  }
320  CRef<CPub> new_pub(new CPub);
321  switch (m_pm_pub_type) {
322  case CPub::e_Article:
323  if (! pub->GetMedline().IsSetCit()) {
324  return false;
325  }
326  new_pub->SetArticle().Assign(pub->GetMedline().GetCit());
327  break;
328  case CPub::e_Medline:
329  new_pub->SetMedline().Assign(pub->GetMedline());
330  break;
331  default:
332  return false;
333  }
335  // authors come back in a weird format that we need
336  // to convert to ISO
337  if (new_pub->IsSetAuthors())
340  arr.clear();
341  CRef<CPub> new_pmid(new CPub);
342  new_pmid->SetPmid().Set(id);
343  arr.push_back(new_pmid);
344  arr.push_back(new_pub);
345  return true;
346 }
349 {
350  m_MaxMlaAttempts = maxAttempts;
351 }
353 void CRemoteUpdater::SetTaxonTimeout(unsigned seconds, unsigned retries, bool exponential)
354 {
355  m_TaxonTimeoutSet = true;
356  m_TaxonTimeout = seconds;
357  m_TaxonAttempts = retries;
358  m_TaxonExponential = exponential;
359 }
362 {
363  // default update lambda function
364  m_taxon_update = [this](const vector<CRef<COrg_ref>>& query) -> CRef<CTaxon3_reply>
365  { // we need to make a copy of record to prevent changes put back to cache
368  copied->Assign(*res);
369  return copied;
370  };
373  if (app) {
374  const CNcbiRegistry& cfg = app->GetConfig();
376  if (cfg.HasEntry("RemotePubmedUpdate")) {
377  const string sect = "RemotePubmedUpdate";
379  if (cfg.HasEntry(sect, "URL")) {
380  m_pm_url = cfg.GetString(sect, "URL", {});
381  }
383  if (cfg.HasEntry(sect, "UseCache")) {
384  m_pm_use_cache = cfg.GetBool(sect, "UseCache", true);
385  }
386  }
388  if (cfg.HasEntry("RemoteTaxonomyUpdate")) {
389  const string sect = "RemoteTaxonomyUpdate";
390  int delay = cfg.GetInt(sect, "RetryDelay", 20);
391  if (delay < 0)
392  delay = 20;
393  int count = cfg.GetInt(sect, "RetryCount", 5);
394  if (count < 0)
395  count = 5;
396  bool exponential = cfg.GetBool(sect, "RetryExponentially", false);
398  SetTaxonTimeout(static_cast<unsigned>(delay), static_cast<unsigned>(count), exponential);
399  return true;
400  }
401  }
403  return false;
404 }
407 {
408  if (desc.IsOrg()) {
409  xUpdateOrgTaxname(desc.SetOrg());
410  } else if (desc.IsSource() && desc.GetSource().IsSetOrg()) {
412  }
413 }
416 {
417  if (! m_taxClient) {
418  m_taxClient.reset(new CCachedTaxon3_impl);
419  if (m_TaxonTimeoutSet)
421  else
422  m_taxClient->Init();
423  }
424 }
427 {
428  std::lock_guard<std::mutex> guard(m_Mutex);
430  TTaxId taxid = org.GetTaxId();
431  if (taxid == ZERO_TAX_ID && ! org.IsSetTaxname())
432  return;
434  xInitTaxCache();
436  CRef<COrg_ref> new_org = m_taxClient->GetOrg(org, m_logger);
437  if (new_org.NotEmpty()) {
438  org.Assign(*new_org);
439  }
440 }
443 {
444  static CRemoteUpdater instance{ (IObjtoolsListener*)nullptr };
445  return instance;
446 }
449  m_logger{ logger }, m_pm_normalize(norm)
450 {
451  xSetFromConfig();
452 }
455  m_pMessageListener(pMessageListener), m_pm_normalize(norm)
456 {
457  if (m_pMessageListener) {
458  m_logger = [this](const string& error_message) {
460  };
461  }
462  xSetFromConfig();
463 }
466 {
467 }
470 {
471  std::lock_guard<std::mutex> guard(m_Mutex);
473  if (m_taxClient) {
474  m_taxClient->ClearCache();
475  }
477  if (m_pm_use_cache && m_pubmed) {
478  auto* upd = dynamic_cast<CEUtilsUpdaterWithCache*>(m_pubmed.get());
479  if (upd) {
480  upd->ClearCache();
481  }
482  }
483 }
486 {
487  for (CBioseq_CI it(obj); it; ++it) {
488  xUpdatePubReferences(it->GetEditHandle().SetDescr());
489  }
490 }
493 {
494  if (obj.GetThisTypeInfo()->IsType(CSeq_entry::GetTypeInfo())) {
495  CSeq_entry* entry = static_cast<CSeq_entry*>(&obj);
496  xUpdatePubReferences(*entry);
497  } else if (obj.GetThisTypeInfo()->IsType(CSeq_submit::GetTypeInfo())) {
498  CSeq_submit* submit = static_cast<CSeq_submit*>(&obj);
499  for (auto& it : submit->SetData().SetEntrys()) {
501  }
502  } else if (obj.GetThisTypeInfo()->IsType(CSeq_descr::GetTypeInfo())) {
503  CSeq_descr* desc = static_cast<CSeq_descr*>(&obj);
504  xUpdatePubReferences(*desc);
505  } else if (obj.GetThisTypeInfo()->IsType(CSeqdesc::GetTypeInfo())) {
506  CSeqdesc* desc = static_cast<CSeqdesc*>(&obj);
507  CSeq_descr tmp;
508  tmp.Set().push_back(CRef<CSeqdesc>(desc));
510  }
511 }
514 {
515  if (entry.IsSet()) {
516  for (auto& it : entry.SetSet().SetSeq_set()) {
518  }
519  }
521  if (! entry.IsSetDescr())
522  return;
525 }
528 {
529  std::lock_guard<std::mutex> guard(m_Mutex);
531  for (auto& pDesc : seq_descr.Set()) {
532  if (! pDesc->IsPub() || ! pDesc->GetPub().IsSetPub()) {
533  continue;
534  }
536  auto& arr = pDesc->SetPub().SetPub().Set();
537  if (! m_pubmed) {
538  if (m_pm_use_cache) {
540  } else {
542  }
543  if (! m_pm_url.empty()) {
545  }
546  if (m_pm_interceptor) {
547  m_pubmed->SetPubInterceptor(m_pm_interceptor);
548  }
549  }
551  TEntrezId id = FindPMID(arr);
552  if (id > ZERO_ENTREZ_ID) {
553  xUpdatePubPMID(arr, id);
554  continue;
555  }
557  for (const auto& pPub : arr) {
558  if (pPub->IsArticle()) {
559  id = m_pubmed->CitMatch(*pPub);
560  if (id > ZERO_ENTREZ_ID && xUpdatePubPMID(arr, id)) {
561  break;
562  }
563  }
564  }
565  }
566 }
569 namespace
570 {
571  typedef set<CRef<CSeqdesc>*> TOwnerSet;
572  typedef struct {
573  TOwnerSet owner;
574  CRef<COrg_ref> org_ref;
575  } TOwner;
576  typedef map<string, TOwner> TOrgMap;
577  void _UpdateOrgFromTaxon(CSeq_entry& entry, TOrgMap& m)
578  {
579  if (entry.IsSet()) {
580  for (auto& it : entry.SetSet().SetSeq_set()) {
581  _UpdateOrgFromTaxon(*it, m);
582  }
583  }
585  if (! entry.IsSetDescr())
586  return;
588  for (auto& it : entry.SetDescr().Set()) {
589  CRef<CSeqdesc>& owner = it;
590  CSeqdesc& desc = *owner;
591  CRef<COrg_ref> org_ref;
592  if (desc.IsOrg()) {
593  org_ref.Reset(&desc.SetOrg());
594  } else if (desc.IsSource() && desc.GetSource().IsSetOrg()) {
595  org_ref.Reset(&desc.SetSource().SetOrg());
596  }
597  if (org_ref) {
598  string id;
599  std::ostringstream os;
600  os << MSerial_AsnText << *org_ref;
601  id = os.str();
602  TOwner& v = m[id];
603  v.owner.insert(&owner);
604  v.org_ref = org_ref;
605  }
606  }
607  }
609  void xUpdate(TOwnerSet& owners, COrg_ref& org_ref)
610  {
611  for (auto& owner_it : owners) {
612  if ((*owner_it)->IsOrg()) {
613  (*owner_it)->SetOrg(org_ref);
614  } else if ((*owner_it)->IsSource()) {
615  (*owner_it)->SetSource().SetOrg(org_ref);
616  }
617  }
618  }
619 }
622 {
623  TOrgMap org_to_update;
625  _UpdateOrgFromTaxon(entry, org_to_update);
626  if (org_to_update.empty())
627  return;
629  std::lock_guard<std::mutex> guard(m_Mutex);
631  if (! m_taxClient) {
632  m_taxClient.reset(new CCachedTaxon3_impl);
633  if (m_TaxonTimeoutSet)
635  else
636  m_taxClient->Init();
637  }
639  if ( kUseBulkTaxonQuery ) {
640  vector<CRef<COrg_ref>> reflist;
641  for (auto& it : org_to_update) {
642  reflist.push_back(it.second.org_ref);
643  }
644  m_taxClient->SendOrgRefList(reflist, m_logger);
645  }
647  for (auto& it : org_to_update) {
648  vector<CRef<COrg_ref>> reflist;
649  reflist.push_back(it.second.org_ref);
650  CRef<CTaxon3_reply> reply = m_taxClient->SendOrgRefList(reflist, m_logger);
652  if (reply.NotNull()) {
653  auto& reply_it = reply->SetReply().front();
654  if (reply_it->IsData() && reply_it->SetData().IsSetOrg()) {
655  xUpdate(it.second.owner, reply_it->SetData().SetOrg());
656  }
657  }
658  }
659 }
663 {
664  if (! auth_list.IsSetNames()) {
665  return;
666  }
668  auth_list.ConvertMlToStd(false);
670  if (auth_list.GetNames().IsStd()) {
671  list<CRef<CAuthor>> authors_with_affil;
672  for (CRef<CAuthor>& it : auth_list.SetNames().SetStd()) {
673  if (it->IsSetAffil()) {
674  authors_with_affil.push_back(it);
675  }
676  }
678  if (authors_with_affil.size() == 1) {
679  // we may need to hoist an affiliation
680  if (auth_list.IsSetAffil()) {
681  ERR_POST(Error << "publication contains multiple affiliations");
682  } else {
683  auth_list.SetAffil(authors_with_affil.front()->SetAffil());
684  authors_with_affil.front()->ResetAffil();
685  }
686  }
687  }
688 }
692 {
693  if (obj.IsSet()) {
694  for (CRef<CSeq_entry>& it : obj.SetSet().SetSeq_set()) {
695  PostProcessPubs(*it);
696  }
697  } else if (obj.IsSeq() && obj.IsSetDescr()) {
698  for (CRef<CSeqdesc>& desc_it : obj.SetSeq().SetDescr().Set()) {
699  if (desc_it->IsPub()) {
700  PostProcessPubs(desc_it->SetPub());
701  }
702  }
703  }
704 }
707 {
708  if (! pubdesc.IsSetPub())
709  return;
711  for (CRef<CPub>& it : pubdesc.SetPub().Set()) {
712  if (it->IsSetAuthors()) {
713  ConvertToStandardAuthors(it->SetAuthors());
714  }
715  }
716 }
719 {
720  for (CBioseq_CI bioseq_it(obj); bioseq_it; ++bioseq_it) {
721  for (CSeqdesc_CI desc_it(bioseq_it->GetEditHandle(), CSeqdesc::e_Pub); desc_it; ++desc_it) {
722  PostProcessPubs(const_cast<CPubdesc&>(desc_it->GetPub()));
723  }
724  }
725 }
728 {
729  m_pubmed.reset(pubmedUpdater);
730 }
733 {
734  std::lock_guard<std::mutex> guard(m_Mutex);
736  xInitTaxCache();
738  CRef<CTaxon3_reply> reply = m_taxClient->SendOrgRefList(list, nullptr);
739  return reply;
740 }
742 void CRemoteUpdater::ReportStats(std::ostream& os)
743 {
744  std::lock_guard<std::mutex> guard(m_Mutex);
746  if (m_taxClient) {
747  m_taxClient->ReportStats(os);
748  }
750  if (m_pm_use_cache && m_pubmed) {
751  auto* upd = dynamic_cast<CEUtilsUpdaterWithCache*>(m_pubmed.get());
752  if (upd) {
753  upd->ReportStats(os);
754  }
755  }
756 }
