NCBI C++ ToolKit
psg_loader_impl.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: psg_loader_impl.cpp 102227 2024-04-09 17:28:21Z vasilche $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Eugene Vasilchenko, Aleksey Grichenko
27  *
28  * File Description: PSG data loader
29  *
30  * ===========================================================================
31  */
32 
33 #include <ncbi_pch.hpp>
34 #include <corelib/ncbistd.hpp>
35 #include <corelib/ncbithr.hpp>
36 #include <corelib/ncbi_param.hpp>
38 #include <corelib/ncbi_url.hpp>
56 #include <util/compress/stream.hpp>
57 #include <util/compress/zlib.hpp>
58 #include <serial/objistr.hpp>
59 #include <serial/serial.hpp>
60 #include <serial/iterator.hpp>
61 #include <util/thread_pool.hpp>
62 #include <sstream>
63 
64 #if defined(HAVE_PSG_LOADER)
65 
66 #define LOCK4GET 1
67 #define GLOBAL_CHUNKS 1
68 
70 
71 //#define NCBI_USE_ERRCODE_X PSGLoader
72 //NCBI_DEFINE_ERR_SUBCODE_X(1);
73 
75 
76 const int kSplitInfoChunkId = 999999999;
77 
78 NCBI_PARAM_DECL(unsigned int, PSG_LOADER, DEBUG);
79 NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, DEBUG, 1,
80  eParam_NoThread, PSG_LOADER_DEBUG);
81 typedef NCBI_PARAM_TYPE(PSG_LOADER, DEBUG) TPSG_Debug;
82 
83 
84 static unsigned int s_GetDebugLevel()
85 {
86  static auto value = TPSG_Debug::GetDefault();
87  return value;
88 }
89 
90 
91 const unsigned int kMaxWaitSeconds = 3;
92 const unsigned int kMaxWaitMillisec = 0;
93 
94 #define DEFAULT_DEADLINE CDeadline(kMaxWaitSeconds, kMaxWaitMillisec)
95 
96 /////////////////////////////////////////////////////////////////////////////
97 // CPSGBioseqCache
98 /////////////////////////////////////////////////////////////////////////////
99 
100 
102 {
103  string sid = id.GetId();
104  if (sid.empty()) return CSeq_id_Handle();
105  try {
106  return CSeq_id_Handle::GetHandle(sid);
107  }
108  catch (exception& exc) {
109  ERR_POST("CPSGDataLoader: cannot parse Seq-id "<<sid<<": "<<exc.what());
110  }
111  return CSeq_id_Handle();
112 }
113 
114 
115 const int kDefaultCacheLifespanSeconds = 2*3600;
116 const size_t kDefaultMaxCacheSize = 10000;
117 const unsigned int kDefaultRetryCount = 4;
118 const unsigned int kDefaultBulkRetryCount = 8;
119 
120 #define DEFAULT_WAIT_TIME 1
121 #define DEFAULT_WAIT_TIME_MULTIPLIER 1.5
122 #define DEFAULT_WAIT_TIME_INCREMENT 1
123 #define DEFAULT_WAIT_TIME_MAX 30
124 
126  {
127  "wait_time",
128  0,
130  },
131  {
132  "wait_time_max",
133  0,
135  },
136  {
137  "wait_time_multiplier",
138  0,
140  },
141  {
142  "wait_time_increment",
143  0,
145  }
146 };
147 
148 
150 {
151 public:
152  CPSGBioseqCache(int lifespan, size_t max_size)
153  : m_Lifespan(lifespan), m_MaxSize(max_size) {}
155 
156  shared_ptr<SPsgBioseqInfo> Get(const CSeq_id_Handle& idh);
157  shared_ptr<SPsgBioseqInfo> Add(const CPSG_BioseqInfo& info, CSeq_id_Handle req_idh);
158 
159 private:
161  typedef list<shared_ptr<SPsgBioseqInfo> > TInfoQueue;
162 
165  size_t m_MaxSize;
168 };
169 
170 
171 shared_ptr<SPsgBioseqInfo> CPSGBioseqCache::Get(const CSeq_id_Handle& idh)
172 {
173  CFastMutexGuard guard(m_Mutex);
174  auto found = m_Ids.find(idh);
175  if (found == m_Ids.end()) return nullptr;
176  shared_ptr<SPsgBioseqInfo> ret = found->second;
177  m_Infos.remove(ret);
178  if (ret->deadline.IsExpired()) {
179  ITERATE(SPsgBioseqInfo::TIds, id, ret->ids) {
180  m_Ids.erase(*id);
181  }
182  return nullptr;
183  }
184  ret->deadline = CDeadline(m_Lifespan);
185  m_Infos.push_back(ret);
186  return ret;
187 }
188 
189 
190 shared_ptr<SPsgBioseqInfo> CPSGBioseqCache::Add(const CPSG_BioseqInfo& info, CSeq_id_Handle req_idh)
191 {
192  CSeq_id_Handle idh = PsgIdToHandle(info.GetCanonicalId());
193  if (!idh) return nullptr;
194  // Try to find an existing entry (though this should not be a common case).
195  CFastMutexGuard guard(m_Mutex);
196  auto found = m_Ids.find(idh);
197  if (found != m_Ids.end()) {
198  if (!found->second->deadline.IsExpired()) {
199  found->second->Update(info);
200  return found->second;
201  }
202  ITERATE(SPsgBioseqInfo::TIds, id, found->second->ids) {
203  m_Ids.erase(*id);
204  }
205  m_Infos.remove(found->second);
206  }
207  while (!m_Infos.empty() && (m_Infos.size() > m_MaxSize || m_Infos.front()->deadline.IsExpired())) {
208  auto rm = m_Infos.front();
209  m_Infos.pop_front();
210  ITERATE(SPsgBioseqInfo::TIds, id, rm->ids) {
211  m_Ids.erase(*id);
212  }
213  }
214  // Create new entry.
215  shared_ptr<SPsgBioseqInfo> ret = make_shared<SPsgBioseqInfo>(info, m_Lifespan);
216  m_Infos.push_back(ret);
217  if (req_idh) {
218  m_Ids[req_idh] = ret;
219  }
220  ITERATE(SPsgBioseqInfo::TIds, it, ret->ids) {
221  m_Ids[*it] = ret;
222  }
223  return ret;
224 }
225 
226 
227 /////////////////////////////////////////////////////////////////////////////
228 // CPSGAnnotInfoCache
229 /////////////////////////////////////////////////////////////////////////////
230 
232 {
234  typedef list<shared_ptr<CPSG_NamedAnnotInfo>> TInfos;
235 
236  SPsgAnnotInfo(const string& _name,
237  const TIds& _ids,
238  const TInfos& _infos,
239  int lifespan);
240 
241  string name;
245 
246 private:
249 };
250 
251 
253 {
254 public:
255  CPSGAnnotCache(int lifespan, size_t max_size)
256  : m_Lifespan(lifespan), m_MaxSize(max_size) {}
258 
260 
261  shared_ptr<SPsgAnnotInfo> Get(const string& name, const CSeq_id_Handle& idh);
262  shared_ptr<SPsgAnnotInfo> Add(const SPsgAnnotInfo::TInfos& infos,
263  const string& name,
264  const TIds& ids);
265 
266 private:
269  typedef list<shared_ptr<SPsgAnnotInfo>> TInfoQueue;
270 
273  size_t m_MaxSize;
276 };
277 
278 
280  const string& _name,
281  const TIds& _ids,
282  const TInfos& _infos,
283  int lifespan)
284  : name(_name), ids(_ids), infos(_infos), deadline(lifespan)
285 {
286 }
287 
288 
289 shared_ptr<SPsgAnnotInfo> CPSGAnnotCache::Get(const string& name, const CSeq_id_Handle& idh)
290 {
291  CFastMutexGuard guard(m_Mutex);
292  TNameMap::iterator found_name = m_NameMap.find(name);
293  if (found_name == m_NameMap.end()) return nullptr;
294  TIdMap& ids = found_name->second;
295  TIdMap::iterator found_id = ids.find(idh);
296  if (found_id == ids.end()) return nullptr;
297  shared_ptr<SPsgAnnotInfo> ret = found_id->second;
298  m_Infos.remove(ret);
299  if (ret->deadline.IsExpired()) {
300  for (auto& id : ret->ids) {
301  ids.erase(id);
302  }
303  if (ids.empty()) {
304  m_NameMap.erase(found_name);
305  }
306  return nullptr;
307  }
308  ret->deadline = CDeadline(m_Lifespan);
309  m_Infos.push_back(ret);
310  return ret;
311 }
312 
313 
314 shared_ptr<SPsgAnnotInfo> CPSGAnnotCache::Add(
315  const SPsgAnnotInfo::TInfos& infos,
316  const string& name,
317  const TIds& ids)
318 {
319  if (name.empty() || ids.empty()) return nullptr;
320  CFastMutexGuard guard(m_Mutex);
321  // Try to find an existing entry (though this should not be a common case).
322  TNameMap::iterator found_name = m_NameMap.find(name);
323  if (found_name != m_NameMap.end()) {
324  TIdMap& idmap = found_name->second;
325  TIdMap::iterator found = idmap.find(ids.front());
326  if (found != idmap.end()) {
327  if (!found->second->deadline.IsExpired()) {
328  return found->second;
329  }
330  for (auto& id : found->second->ids) {
331  idmap.erase(id);
332  }
333  if (idmap.empty()) {
334  m_NameMap.erase(found_name);
335  }
336  }
337  }
338  while (!m_Infos.empty() && (m_Infos.size() > m_MaxSize || m_Infos.front()->deadline.IsExpired())) {
339  auto rm = m_Infos.front();
340  m_Infos.pop_front();
341  TNameMap::iterator found_name = m_NameMap.find(rm->name);
342  _ASSERT(found_name != m_NameMap.end());
343  for (auto& id : rm->ids) {
344  found_name->second.erase(id);
345  }
346  if (found_name->second.empty() && found_name->first != name) {
347  m_NameMap.erase(found_name);
348  }
349  }
350  // Create new entry.
351  shared_ptr<SPsgAnnotInfo> ret = make_shared<SPsgAnnotInfo>(name, ids, infos, m_Lifespan);
352  m_Infos.push_back(ret);
353  TIdMap& idmap = m_NameMap[name];
354  for (auto& id : ids) {
355  idmap[id] = ret;
356  }
357  return ret;
358 }
359 
360 
361 /////////////////////////////////////////////////////////////////////////////
362 // CPSGCache_Base
363 /////////////////////////////////////////////////////////////////////////////
364 
365 template<class TK, class TV>
367 {
368 public:
369  typedef TK TKey;
370  typedef TV TValue;
372 
373  CPSGCache_Base(int lifespan, size_t max_size, TValue def_val = TValue(nullptr))
374  : m_Default(def_val), m_Lifespan(lifespan), m_MaxSize(max_size) {}
375 
376  TValue Find(const TKey& key) {
377  CFastMutexGuard guard(m_Mutex);
378  x_Expire();
379  auto found = m_Values.find(key);
380  return found != m_Values.end() ? found->second.value : m_Default;
381  }
382 
383  void Add(const TKey& key, const TValue& value) {
384  CFastMutexGuard guard(m_Mutex);
385  auto iter = m_Values.lower_bound(key);
386  if ( iter != m_Values.end() && key == iter->first ) {
387  // erase old value
388  x_Erase(iter++);
389  }
390  // insert
391  iter = m_Values.insert(iter,
392  typename TValues::value_type(key, SNode(value, m_Lifespan)));
393  iter->second.remove_list_iterator = m_RemoveList.insert(m_RemoveList.end(), iter);
394  x_LimitSize();
395  }
396 
397 protected:
398  // Map blob-id to blob info
399  typedef TKey key_type;
401  struct SNode;
403  typedef typename TValues::iterator TValueIter;
404  typedef list<TValueIter> TRemoveList;
405  typedef typename TRemoveList::iterator TRemoveIter;
406  struct SNode {
407  SNode(const mapped_type& value, unsigned lifespan)
408  : value(value),
409  deadline(lifespan)
410  {}
414  };
415 
416  void x_Erase(TValueIter iter) {
417  m_RemoveList.erase(iter->second.remove_list_iterator);
418  m_Values.erase(iter);
419  }
420  void x_Expire() {
421  while ( !m_RemoveList.empty() &&
422  m_RemoveList.front()->second.deadline.IsExpired() ) {
423  x_PopFront();
424  }
425  }
426  void x_LimitSize() {
427  while ( m_Values.size() > m_MaxSize ) {
428  x_PopFront();
429  }
430  }
431  void x_PopFront() {
432  _ASSERT(!m_RemoveList.empty());
433  _ASSERT(m_RemoveList.front() != m_Values.end());
434  _ASSERT(m_RemoveList.front()->second.remove_list_iterator == m_RemoveList.begin());
435  m_Values.erase(m_RemoveList.front());
436  m_RemoveList.pop_front();
437  }
438 
442  size_t m_MaxSize;
445 };
446 
447 
448 /////////////////////////////////////////////////////////////////////////////
449 // CPSGCDDInfoCache
450 /////////////////////////////////////////////////////////////////////////////
451 
452 class CPSGCDDInfoCache : public CPSGCache_Base<string, bool>
453 {
454 public:
455  CPSGCDDInfoCache(int lifespan, size_t max_size)
456  : TParent(lifespan, max_size, false) {}
457 };
458 
459 
460 /////////////////////////////////////////////////////////////////////////////
461 // CPSGBlobMap
462 /////////////////////////////////////////////////////////////////////////////
463 
464 
465 class CPSGBlobMap : public CPSGCache_Base<string, shared_ptr<SPsgBlobInfo>>
466 {
467 public:
468  CPSGBlobMap(int lifespan, size_t max_size)
469  : TParent(lifespan, max_size) {}
470 
471  void DropBlob(const CPsgBlobId& blob_id) {
472  //ERR_POST("DropBlob("<<blob_id.ToPsgId()<<")");
473  CFastMutexGuard guard(m_Mutex);
474  auto iter = m_Values.find(blob_id.ToPsgId());
475  if ( iter != m_Values.end() ) {
476  x_Erase(iter);
477  }
478  }
479 };
480 
481 
482 /////////////////////////////////////////////////////////////////////////////
483 // SPsgBioseqInfo
484 /////////////////////////////////////////////////////////////////////////////
485 
486 
487 SPsgBioseqInfo::SPsgBioseqInfo(const CPSG_BioseqInfo& bioseq_info, int lifespan)
488  : included_info(0),
489  molecule_type(CSeq_inst::eMol_not_set),
490  length(0),
491  state(0),
492  chain_state(0),
493  tax_id(INVALID_TAX_ID),
494  hash(0),
495  deadline(lifespan)
496 {
497  Update(bioseq_info);
498 }
499 
500 
502 {
503 #ifdef NCBI_ENABLE_SAFE_FLAGS
504  TIncludedInfo got_info = bioseq_info.IncludedInfo().get();
505 #else
506  TIncludedInfo got_info = bioseq_info.IncludedInfo();
507 #endif
508  TIncludedInfo new_info = got_info & ~included_info;
509  if ( !new_info ) {
510  return new_info;
511  }
512 
513  DEFINE_STATIC_FAST_MUTEX(s_Mutex);
514  CFastMutexGuard guard(s_Mutex);
515  new_info = got_info & ~included_info;
517  molecule_type = bioseq_info.GetMoleculeType();
518 
520  length = bioseq_info.GetLength();
521 
523  state = bioseq_info.GetState();
524 
526  chain_state = bioseq_info.GetChainState();
527 
529  tax_id = bioseq_info.GetTaxId();
530 
532  hash = bioseq_info.GetHash();
533 
535  canonical = PsgIdToHandle(bioseq_info.GetCanonicalId());
537  ids.push_back(canonical);
538  }
540  gi = bioseq_info.GetGi();
541  if ( gi == INVALID_GI ) {
542  gi = ZERO_GI;
543  }
544  }
545 
547  vector<CPSG_BioId> other_ids = bioseq_info.GetOtherIds();
548  ITERATE(vector<CPSG_BioId>, other_id, other_ids) {
549  // NOTE: Bioseq-info may contain unparseable ids which should be ignored (e.g "gnl|FPAA000046" for GI 132).
550  auto other_idh = PsgIdToHandle(*other_id);
551  if (other_idh) ids.push_back(other_idh);
552  }
553  }
555  blob_id = bioseq_info.GetBlobId().GetId();
556 
558  return new_info;
559 }
560 
561 
563 {
565  if ( state != CPSG_BioseqInfo::eLive ) {
567  }
568  }
570 }
571 
572 
574 {
578  }
579  }
581 }
582 
583 
584 /////////////////////////////////////////////////////////////////////////////
585 // SPsgBlobInfo
586 /////////////////////////////////////////////////////////////////////////////
587 
588 
590  : blob_state_flags(CBioseq_Handle::fState_none)
591 {
592  auto blob_id = blob_info.GetId<CPSG_BlobId>();
593  _ASSERT(blob_id);
594  blob_id_main = blob_id->GetId();
595  id2_info = blob_info.GetId2Info();
596 
600 
601  auto lm = blob_id->GetLastModified(); // last_modified is in milliseconds
602  last_modified = lm.IsNull()? 0: lm.GetValue();
603 }
604 
605 
607  : blob_state_flags(tse.GetBlobState()),
608  last_modified(tse.GetBlobVersion()*60000) // minutes to ms
609 {
610  const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*tse.GetBlobId());
611  blob_id_main = blob_id.ToPsgId();
612  id2_info = blob_id.GetId2Info();
613 }
614 
615 
616 class CPSGIpgTaxIdMap : public CPSGCache_Base<CSeq_id_Handle, TTaxId>
617 {
618 public:
619  CPSGIpgTaxIdMap(int lifespan, size_t max_size)
620  : TParent(lifespan, max_size, INVALID_TAX_ID) {}
621 };
622 
623 
624 /////////////////////////////////////////////////////////////////////////////
625 // CPSG_Task
626 /////////////////////////////////////////////////////////////////////////////
627 
628 template<class TReply> void ReportStatus(TReply reply, EPSG_Status status)
629 {
630  if (status == EPSG_Status::eSuccess) return;
631  string sstatus;
632  switch (status) {
634  sstatus = "Canceled";
635  break;
636  case EPSG_Status::eError:
637  sstatus = "Error";
638  break;
640  sstatus = "In progress";
641  break;
643  sstatus = "Not found";
644  break;
646  sstatus = "Forbidden";
647  break;
648  default:
649  sstatus = to_string((int)status);
650  break;
651  }
652  while (true) {
653  string msg = reply->GetNextMessage();
654  if (msg.empty()) break;
655  _TRACE("Request failed: " << sstatus << " - " << msg << " @ "<<CStackTrace());
656  }
657 }
658 
659 
660 class CPSG_TaskGroup;
661 
662 
664 {
665 public:
666  typedef shared_ptr<CPSG_Reply> TReply;
667 
668  CPSG_Task(TReply reply, CPSG_TaskGroup& group);
669  ~CPSG_Task(void) override {}
670 
671  EStatus Execute(void) override;
672  virtual void Finish(void) = 0;
673 
674  bool GotNotFound() const {
675  return m_GotNotFound;
676  }
677  bool GotForbidden() const {
678  return m_GotForbidden;
679  }
680 
681 protected:
682  void OnStatusChange(EStatus old) override;
683 
684  TReply& GetReply(void) { return m_Reply; }
685 
686  virtual void DoExecute(void) {
687  if (!CheckReplyStatus()) return;
688  ReadReply();
690  }
691 
692  bool IsCancelled(void) {
693  if (IsCancelRequested()) {
694  m_Status = eFailed;
695  return true;
696  }
697  return false;
698  }
699 
700  bool CheckReplyStatus(void);
701  void ReadReply(void);
702  virtual void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) = 0;
703 
708 private:
710 };
711 
712 
713 // It may happen that a CThreadPool's thread holds CRef to a task longer than
714 // the loader exists. In this case the task needs to release some data
715 // (e.g. load locks) before the loader is destroyed. The guard calls
716 // Finish() to do the cleanup.
718 {
719 public:
720  CPSG_Task_Guard(CPSG_Task& task) : m_Task(&task) {}
722  void Resease(void) { m_Task.Reset(); }
723 private:
726 
728 };
729 
730 
732 {
733 public:
735  : m_Pool(pool), m_Semaphore(0, kMax_UInt) {}
736 
737  void AddTask(CPSG_Task* task) {
738  {
739  CMutexGuard guard(m_Mutex);
740  m_Tasks.insert(Ref(task));
741  m_Pool.AddTask(task);
742  }
743  }
744 
746  {
747  {
748  CRef<CPSG_Task> ref(&task);
749  CMutexGuard guard(m_Mutex);
750  TTasks::iterator it = m_Tasks.find(ref);
751  if (it == m_Tasks.end()) return;
752  m_Done.insert(ref);
753  m_Tasks.erase(it);
754  }
755  m_Semaphore.Post();
756  }
757 
758  bool HasTasks(void) const
759  {
760  CMutexGuard guard(m_Mutex);
761  return !m_Tasks.empty() || ! m_Done.empty();
762  }
763 
764  void WaitAll(void) {
765  while (HasTasks()) GetTask<CPSG_Task>();
766  }
767 
768  template<class T>
769  CRef<T> GetTask(void) {
770  m_Semaphore.Wait();
771  CRef<T> ret;
772  CMutexGuard guard(m_Mutex);
773  _ASSERT(!m_Done.empty());
775  ret.Reset(dynamic_cast<T*>(it->GetNCPointerOrNull()));
776  m_Done.erase(it);
777  return ret;
778  }
779 
780  void CancelAll(void)
781  {
782  {
783  TTasks tasks;
784  {
785  CMutexGuard guard(m_Mutex);
786  tasks = m_Tasks;
787  }
788  for (CRef<CPSG_Task> task : tasks) {
789  task->RequestToCancel();
790  }
791  }
792  WaitAll();
793  }
794 
795 private:
797 
802  mutable CMutex m_Mutex;
803 };
804 
805 
807  : m_Reply(reply),
808  m_Status(eIdle),
809  m_GotNotFound(false),
810  m_GotForbidden(false),
811  m_Group(group)
812 {
813 }
814 
815 
817 {
819  try {
820  DoExecute();
821  }
822  catch (CException& exc) {
823  LOG_POST("CPSGDataLoader: exception in retrieval thread: "<<exc);
824  return eFailed;
825  }
826  catch (exception& exc) {
827  LOG_POST("CPSGDataLoader: exception in retrieval thread: "<<exc.what());
828  return eFailed;
829  }
830  return m_Status;
831 }
832 
833 
835 {
836  EStatus status = GetStatus();
837  if (status == eCompleted || status == eFailed || status == eCanceled) {
838  m_Group.PostFinished(*this);
839  }
840 }
841 
843 {
844  EPSG_Status status = m_Reply->GetStatus(CDeadline::eInfinite);
845  if (status != EPSG_Status::eSuccess) {
846  ReportStatus(m_Reply, status);
847  if ( status == EPSG_Status::eNotFound ) {
848  m_GotNotFound = true;
850  return false;
851  }
852  if ( status == EPSG_Status::eForbidden ) {
853  m_GotForbidden = true;
855  return false;
856  }
857  m_Status = eFailed;
858  return false;
859  }
860  return true;
861 }
862 
863 
865 {
866  EPSG_Status status;
867  for (;;) {
868  if (IsCancelled()) return;
869  auto reply_item = m_Reply->GetNextItem(DEFAULT_DEADLINE);
870  if (!reply_item) continue;
871  if (reply_item->GetType() == CPSG_ReplyItem::eEndOfReply) break;
872  if (IsCancelled()) return;
873 
874  EPSG_Status status = reply_item->GetStatus(CDeadline::eInfinite);
875  if (IsCancelled()) return;
876  if (status != EPSG_Status::eSuccess) {
877  ReportStatus(reply_item, status);
878  if ( status == EPSG_Status::eNotFound ) {
879  m_GotNotFound = true;
880  continue;
881  }
882  if ( status == EPSG_Status::eForbidden ) {
883  m_GotForbidden = true;
884  continue;
885  }
886  m_Status = eFailed;
887  return;
888  }
889  ProcessReplyItem(reply_item);
890  }
891  if (IsCancelled()) return;
892  status = m_Reply->GetStatus(CDeadline::eInfinite);
893  if (status == EPSG_Status::eNotFound) {
894  m_GotNotFound = true;
895  ReportStatus(m_Reply, status);
896  return;
897  }
898  if (status != EPSG_Status::eSuccess) {
899  ReportStatus(m_Reply, status);
900  m_Status = eFailed;
901  }
902 }
903 
904 
906 {
907 public:
909  : m_Semaphore(0, kMax_UInt), m_Loader(loader)
910  {}
911  ~CPSG_PrefetchCDD_Task(void) override
912  {}
913 
914  EStatus Execute(void) override;
915 
916  void AddRequest(const CDataLoader::TIds& ids)
917  {
918  CFastMutexGuard guard(m_Mutex);
919  m_Ids.push_back(ids);
920  m_Semaphore.Post();
921  }
922 
923  void Cancel(void)
924  {
925  RequestToCancel();
926  m_Semaphore.Post();
927  }
928 
929 private:
933  list<CDataLoader::TIds> m_Ids;
934 };
935 
936 
938 {
939  while (true) {
940  m_Semaphore.Wait();
941  if (IsCancelRequested()) return eCanceled;
942  CDataLoader::TIds ids;
943  {
944  CFastMutexGuard guard(m_Mutex);
945  if (m_Ids.empty()) continue;
946  ids = m_Ids.front();
947  m_Ids.pop_front();
948  }
949  try {
950  m_Loader.PrefetchCDD(ids);
951  }
952  catch (CException& exc) {
953  LOG_POST("CPSGDataLoader: exception in CDD prefetch thread: "<<exc);
954  }
955  catch (exception& exc) {
956  LOG_POST("CPSGDataLoader: exception in CDD prefetch thread: "<<exc.what());
957  }
958  }
959  // Never executed
960  return eCompleted;
961 }
962 
963 
964 /////////////////////////////////////////////////////////////////////////////
965 // CPSGDataLoader_Impl
966 /////////////////////////////////////////////////////////////////////////////
967 
968 #define NCBI_PSGLOADER_NAME "psg_loader"
969 #define NCBI_PSGLOADER_SERVICE_NAME "service_name"
970 #define NCBI_PSGLOADER_NOSPLIT "no_split"
971 #define NCBI_PSGLOADER_WHOLE_TSE "whole_tse"
972 #define NCBI_PSGLOADER_WHOLE_TSE_BULK "whole_tse_bulk"
973 #define NCBI_PSGLOADER_ADD_WGS_MASTER "add_wgs_master"
974 //#define NCBI_PSGLOADER_RETRY_COUNT "retry_count"
975 
976 NCBI_PARAM_DECL(string, PSG_LOADER, SERVICE_NAME);
977 NCBI_PARAM_DEF_EX(string, PSG_LOADER, SERVICE_NAME, "PSG2",
978  eParam_NoThread, PSG_LOADER_SERVICE_NAME);
979 typedef NCBI_PARAM_TYPE(PSG_LOADER, SERVICE_NAME) TPSG_ServiceName;
980 
981 NCBI_PARAM_DECL(bool, PSG_LOADER, WHOLE_TSE);
982 NCBI_PARAM_DEF_EX(bool, PSG_LOADER, WHOLE_TSE, false,
983  eParam_NoThread, PSG_LOADER_WHOLE_TSE);
984 typedef NCBI_PARAM_TYPE(PSG_LOADER, WHOLE_TSE) TPSG_WholeTSE;
985 
986 NCBI_PARAM_DECL(bool, PSG_LOADER, WHOLE_TSE_BULK);
987 NCBI_PARAM_DEF_EX(bool, PSG_LOADER, WHOLE_TSE_BULK, true,
988  eParam_NoThread, PSG_LOADER_WHOLE_TSE_BULK);
989 typedef NCBI_PARAM_TYPE(PSG_LOADER, WHOLE_TSE_BULK) TPSG_WholeTSEBulk;
990 
991 NCBI_PARAM_DECL(unsigned int, PSG_LOADER, MAX_POOL_THREADS);
992 NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, MAX_POOL_THREADS, 10,
993  eParam_NoThread, PSG_LOADER_MAX_POOL_THREADS);
994 typedef NCBI_PARAM_TYPE(PSG_LOADER, MAX_POOL_THREADS) TPSG_MaxPoolThreads;
995 
996 NCBI_PARAM_DECL(bool, PSG_LOADER, PREFETCH_CDD);
997 NCBI_PARAM_DEF_EX(bool, PSG_LOADER, PREFETCH_CDD, false,
998  eParam_NoThread, PSG_LOADER_PREFETCH_CDD);
999 typedef NCBI_PARAM_TYPE(PSG_LOADER, PREFETCH_CDD) TPSG_PrefetchCDD;
1000 
1001 NCBI_PARAM_DECL(unsigned int, PSG_LOADER, RETRY_COUNT);
1002 NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, RETRY_COUNT, kDefaultRetryCount,
1003  eParam_NoThread, PSG_LOADER_RETRY_COUNT);
1004 typedef NCBI_PARAM_TYPE(PSG_LOADER, RETRY_COUNT) TPSG_RetryCount;
1005 
1006 NCBI_PARAM_DECL(unsigned int, PSG_LOADER, BULK_RETRY_COUNT);
1007 NCBI_PARAM_DEF_EX(unsigned int, PSG_LOADER, BULK_RETRY_COUNT, kDefaultBulkRetryCount,
1008  eParam_NoThread, PSG_LOADER_BULK_RETRY_COUNT);
1009 typedef NCBI_PARAM_TYPE(PSG_LOADER, BULK_RETRY_COUNT) TPSG_BulkRetryCount;
1010 
1011 NCBI_PARAM_DECL(bool, PSG_LOADER, IPG_TAX_ID);
1012 NCBI_PARAM_DEF_EX(bool, PSG_LOADER, IPG_TAX_ID, false, eParam_NoThread, PSG_LOADER_IPG_TAX_ID);
1013 typedef NCBI_PARAM_TYPE(PSG_LOADER, IPG_TAX_ID) TPSG_IpgTaxIdEnabled;
1014 
1015 
1016 template<class TParamType>
1017 static void s_ConvertParamValue(TParamType& value, const string& str)
1018 {
1019  value = NStr::StringToNumeric<TParamType>(str);
1020 }
1021 
1022 
1023 template<>
1024 void s_ConvertParamValue<bool>(bool& value, const string& str)
1025 {
1027 }
1028 
1029 
1031  const string& name)
1032 {
1033  return params? params->FindSubNode(name): 0;
1034 }
1035 
1036 
1037 template<class TParamDescription>
1038 static typename TParamDescription::TValueType s_GetParamValue(const TPluginManagerParamTree* config)
1039 {
1040  typedef CParam<TParamDescription> TParam;
1041  typename TParam::TValueType value = TParam::GetDefault();
1043  TParam::GetState(0, &source);
1047  if ( const TPluginManagerParamTree* node = s_FindSubNode(config, TParamDescription::sm_ParamDescription.name) ) {
1048  s_ConvertParamValue<typename TParam::TValueType>(value, node->GetValue().value);
1049  }
1050  }
1051  return value;
1052 }
1053 
1054 
1056  : m_ThreadPool(new CThreadPool(kMax_UInt, TPSG_MaxPoolThreads::GetDefault())),
1057  m_WaitTime(s_WaitTimeParams)
1058 {
1059  unique_ptr<CPSGDataLoader::TParamTree> app_params;
1060  const CPSGDataLoader::TParamTree* psg_params = 0;
1061  if (params.GetParamTree()) {
1063  }
1064  else {
1066  if (app) {
1067  app_params.reset(CConfig::ConvertRegToTree(app->GetConfig()));
1068  psg_params = CPSGDataLoader::GetParamsSubnode(app_params.get(), NCBI_PSGLOADER_NAME);
1069  }
1070  }
1071 
1072  string service_name = params.GetPSGServiceName();
1073  if (service_name.empty() && psg_params) {
1074  service_name = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_SERVICE_NAME);
1075  }
1076  if (service_name.empty()) {
1077  service_name = TPSG_ServiceName::GetDefault();
1078  }
1079 
1080  bool no_split = params.GetPSGNoSplit();
1081  if (psg_params) {
1082  try {
1084  if (!value.empty()) {
1085  no_split = NStr::StringToBool(value);
1086  }
1087  }
1088  catch (CException&) {
1089  }
1090  }
1091  if ( no_split ) {
1094  }
1095  else {
1096  m_TSERequestMode = (s_GetParamValue<X_NCBI_PARAM_DECLNAME(PSG_LOADER, WHOLE_TSE)>(psg_params)?
1099  m_TSERequestModeBulk = (s_GetParamValue<X_NCBI_PARAM_DECLNAME(PSG_LOADER, WHOLE_TSE_BULK)>(psg_params)?
1102  }
1103 
1104  m_AddWGSMasterDescr = true;
1105  if ( psg_params ) {
1106  string param = CPSGDataLoader::GetParam(psg_params, NCBI_PSGLOADER_ADD_WGS_MASTER);
1107  if ( !param.empty() ) {
1108  try {
1110  }
1111  catch ( CException& exc ) {
1112  NCBI_RETHROW_FMT(exc, CLoaderException, eBadConfig,
1113  "Bad value of parameter "
1115  ": \""<<param<<"\"");
1116  }
1117  }
1118  }
1119 
1121  size_t cache_max_size = kDefaultMaxCacheSize;
1122  if (psg_params) {
1123  try {
1125  if (!value.empty()) {
1126  m_CacheLifespan = NStr::StringToNumeric<int>(value);
1127  }
1128  }
1129  catch (CException&) {
1130  }
1131  try {
1133  if (!value.empty()) {
1134  cache_max_size = NStr::StringToNumeric<size_t>(value);
1135  }
1136  }
1137  catch (CException&) {
1138  }
1139  }
1140 
1141  m_RetryCount =
1142  s_GetParamValue<X_NCBI_PARAM_DECLNAME(PSG_LOADER, RETRY_COUNT)>(psg_params);
1144  s_GetParamValue<X_NCBI_PARAM_DECLNAME(PSG_LOADER, BULK_RETRY_COUNT)>(psg_params);
1145  if ( psg_params ) {
1146  CConfig conf(psg_params);
1148  }
1149 
1150  m_BioseqCache.reset(new CPSGBioseqCache(m_CacheLifespan, cache_max_size));
1151  m_AnnotCache.reset(new CPSGAnnotCache(m_CacheLifespan, cache_max_size));
1152  m_BlobMap.reset(new CPSGBlobMap(m_CacheLifespan, cache_max_size));
1153 
1154  {{
1155  m_Queue = make_shared<CPSG_Queue>(service_name);
1157  if ( !params.GetWebCookie().empty() ) {
1159  m_RequestContext->SetProperty("auth_token", params.GetWebCookie());
1160  }
1161  }}
1162 
1163  m_CDDInfoCache.reset(new CPSGCDDInfoCache(m_CacheLifespan, cache_max_size));
1164  if (TPSG_PrefetchCDD::GetDefault()) {
1166  m_ThreadPool->AddTask(m_CDDPrefetchTask);
1167  }
1168 
1169  if (TPSG_IpgTaxIdEnabled::GetDefault()) {
1170  m_IpgTaxIdMap.reset(new CPSGIpgTaxIdMap(m_CacheLifespan, cache_max_size));
1171  }
1172 
1173  CUrlArgs args;
1174  if (params.IsSetEnableSNP()) {
1175  args.AddValue(params.GetEnableSNP() ? "enable_processor" : "disable_processor", "snp");
1176  }
1177  if (params.IsSetEnableWGS()) {
1178  args.AddValue(params.GetEnableWGS() ? "enable_processor" : "disable_processor", "wgs");
1179  }
1180  if (params.IsSetEnableCDD()) {
1181  args.AddValue(params.GetEnableCDD() ? "enable_processor" : "disable_processor", "cdd");
1182  }
1183  if (!args.GetArgs().empty()) {
1184  m_Queue->SetUserArgs(SPSG_UserArgs(args));
1185  }
1186 }
1187 
1188 
1190 {
1191  if (m_CDDPrefetchTask) {
1193  }
1194  // Make sure thread pool is destroyed before any tasks (e.g. CDD prefetch)
1195  // and stops them all before the loader is destroyed.
1196  m_ThreadPool.reset();
1197 }
1198 
1199 
1200 static bool CannotProcess(const CSeq_id_Handle& sih)
1201 {
1202  if ( !sih ) {
1203  return true;
1204  }
1205  if ( sih.Which() == CSeq_id::e_Local ) {
1206  return true;
1207  }
1208  if ( sih.Which() == CSeq_id::e_General ) {
1209  if ( NStr::EqualNocase(sih.GetSeqId()->GetGeneral().GetDb(), "SRA") ) {
1210  // SRA is good
1211  return false;
1212  }
1213  if ( NStr::StartsWith(sih.GetSeqId()->GetGeneral().GetDb(), "WGS:", NStr::eNocase) ) {
1214  // WGS is good
1215  return false;
1216  }
1217  // other general ids are good too(?)
1218  return false;
1219  }
1220  return false;
1221 }
1222 
1223 
1224 template<class Call>
1227  const char* name,
1228  int retry_count)
1229 {
1230  if ( retry_count == 0 ) {
1231  retry_count = m_RetryCount;
1232  }
1233  for ( int t = 1; t < retry_count; ++ t ) {
1234  try {
1235  return call();
1236  }
1237  catch ( CBlobStateException& ) {
1238  // no retry
1239  throw;
1240  }
1241  catch ( CLoaderException& exc ) {
1242  if ( exc.GetErrCode() == exc.eConnectionFailed ||
1243  exc.GetErrCode() == exc.eLoaderFailed ) {
1244  // can retry
1245  LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception: "<<exc);
1246  }
1247  else {
1248  // no retry
1249  throw;
1250  }
1251  }
1252  catch ( CException& exc ) {
1253  LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception: "<<exc);
1254  }
1255  catch ( exception& exc ) {
1256  LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception: "<<exc.what());
1257  }
1258  catch ( ... ) {
1259  LOG_POST(Warning<<"CPSGDataLoader::"<<name<<"() try "<<t<<" exception");
1260  }
1261  if ( t >= 2 ) {
1262  double wait_sec = m_WaitTime.GetTime(t-2);
1263  LOG_POST(Warning<<"CPSGDataLoader: waiting "<<wait_sec<<"s before retry");
1264  SleepMilliSec(Uint4(wait_sec*1000));
1265  }
1266  }
1267  return call();
1268 }
1269 
1270 
1272 {
1274  cref(idh), ref(ids)),
1275  "GetIds");
1276 }
1277 
1278 
1280 {
1281  if ( CannotProcess(idh) ) {
1282  return;
1283  }
1284  auto seq_info = x_GetBioseqInfo(idh);
1285  if (!seq_info) return;
1286 
1287  ITERATE(SPsgBioseqInfo::TIds, it, seq_info->ids) {
1288  ids.push_back(*it);
1289  }
1290 }
1291 
1292 
1295 {
1296  return CallWithRetry(bind(&CPSGDataLoader_Impl::GetGiOnce, this,
1297  cref(idh)),
1298  "GetGi");
1299 }
1300 
1301 
1304 {
1305  if ( CannotProcess(idh) ) {
1306  return CDataLoader::SGiFound();
1307  }
1309  auto seq_info = x_GetBioseqInfo(idh);
1310  if (seq_info) {
1311  ret.sequence_found = true;
1312  if ( seq_info->gi != ZERO_GI ) {
1313  ret.gi = seq_info->gi;
1314  }
1315  }
1316  return ret;
1317 }
1318 
1319 
1322 {
1324  cref(idh)),
1325  "GetAccVer");
1326 }
1327 
1328 
1331 {
1332  if ( CannotProcess(idh) ) {
1333  return CDataLoader::SAccVerFound();
1334  }
1336  auto seq_info = x_GetBioseqInfo(idh);
1337  if (seq_info) {
1338  ret.sequence_found = true;
1339  if ( seq_info->canonical.IsAccVer() ) {
1340  ret.acc_ver = seq_info->canonical;
1341  }
1342  }
1343  return ret;
1344 }
1345 
1346 
1348 {
1350  cref(idh)),
1351  "GetTaxId");
1352 }
1353 
1354 
1356 {
1357  if ( CannotProcess(idh) ) {
1358  return INVALID_TAX_ID;
1359  }
1360  auto tax_id = x_GetIpgTaxId(idh);
1361  if (tax_id != INVALID_TAX_ID) return tax_id;
1362  auto seq_info = x_GetBioseqInfo(idh);
1363  return seq_info ? seq_info->tax_id : INVALID_TAX_ID;
1364 }
1365 
1366 
1367 void CPSGDataLoader_Impl::GetTaxIds(const TIds& ids, TLoaded& loaded, TTaxIds& ret)
1368 {
1370  cref(ids), ref(loaded), ref(ret)), "GetTaxId");
1371 }
1372 
1373 
1375 {
1376  x_GetIpgTaxIds(ids, loaded, ret);
1377  for (size_t i = 0; i < ids.size(); ++i) {
1378  if (loaded[i]) continue;
1379  TTaxId taxid = GetTaxId(ids[i]);
1380  if ( taxid != INVALID_TAX_ID ) {
1381  ret[i] = taxid;
1382  loaded[i] = true;
1383  }
1384  }
1385 }
1386 
1387 
1389 {
1391  cref(idh)),
1392  "GetSequenceLength");
1393 }
1394 
1395 
1397 {
1398  if ( CannotProcess(idh) ) {
1399  return kInvalidSeqPos;
1400  }
1401  auto seq_info = x_GetBioseqInfo(idh);
1402  return (seq_info && seq_info->length > 0) ? TSeqPos(seq_info->length) : kInvalidSeqPos;
1403 }
1404 
1405 
1408 {
1410  cref(idh)),
1411  "GetSequenceHash");
1412 }
1413 
1414 
1417 {
1418  if ( CannotProcess(idh) ) {
1419  return CDataLoader::SHashFound();
1420  }
1422  auto seq_info = x_GetBioseqInfo(idh);
1423  if (seq_info) {
1424  ret.sequence_found = true;
1425  if ( seq_info->hash ) {
1426  ret.hash_known = true;
1427  ret.hash = seq_info->hash;
1428  }
1429  }
1430  return ret;
1431 }
1432 
1433 
1436 {
1438  cref(idh)),
1439  "GetSequenceType");
1440 }
1441 
1442 
1445 {
1446  if ( CannotProcess(idh) ) {
1447  return CDataLoader::STypeFound();
1448  }
1450  auto seq_info = x_GetBioseqInfo(idh);
1451  if (seq_info && seq_info->molecule_type != CSeq_inst::eMol_not_set) {
1452  ret.sequence_found = true;
1453  ret.type = seq_info->molecule_type;
1454  }
1455  return ret;
1456 }
1457 
1458 
1460 {
1462  data_source, cref(idh)),
1463  "GetSequenceState");
1464 }
1465 
1466 
1468 {
1469  const int kNotFound = (CBioseq_Handle::fState_not_found |
1471  if ( CannotProcess(idh) ) {
1472  return kNotFound;
1473  }
1474  auto info = x_GetBioseqAndBlobInfo(data_source, idh);
1475  if ( !info.first ) {
1476  return kNotFound;
1477  }
1478  CBioseq_Handle::TBioseqStateFlags state = info.first->GetBioseqStateFlags();
1479  if ( info.second ) {
1480  state |= info.second->blob_state_flags;
1481  if (!(info.first->GetBioseqStateFlags() & CBioseq_Handle::fState_dead) &&
1482  !(info.first->GetChainStateFlags() & CBioseq_Handle::fState_dead)) {
1483  state &= ~CBioseq_Handle::fState_dead;
1484  }
1485  }
1486  return state;
1487 }
1488 
1489 
1490 struct SCDDIds
1491 {
1494 };
1495 
// Forward declarations of file-local CDD helpers that are used before
// their definitions further down in this file.

// Pick the ids used for CDD lookup (gi / acc_ver members of SCDDIds) out of a list of Seq-ids.
1496 static SCDDIds x_GetCDDIds(const CDataLoader::TIds& ids);
// True if the PSG id string of blob_id starts with the local CDD entry prefix.
1497 static bool x_IsLocalCDDEntryId(const CPsgBlobId& blob_id);
// Decode a local CDD entry blob id into 'ids'; returns false when the id is not a parsable local CDD entry id.
1498 static bool x_ParseLocalCDDEntryId(const CPsgBlobId& blob_id, SCDDIds& ids);
// Obtain the TSE lock of the local CDD entry for 'ids' from the data source (empty lock on failure).
1499 static CTSE_Lock x_CreateLocalCDDEntry(CDataSource* data_source, const SCDDIds& ids);
// Build the blob id string of the local CDD entry that corresponds to cdd_ids.
1500 static string x_MakeLocalCDDEntryId(const SCDDIds& cdd_ids);
1501 
1502 
1505  const CSeq_id_Handle& idh,
1506  CDataLoader::EChoice choice)
1507 {
1509  data_source, cref(idh), choice),
1510  "GetRecords");
1511 }
1512 
1513 
1516  const CSeq_id_Handle& idh,
1517  CDataLoader::EChoice choice)
1518 {
1520  if (choice == CDataLoader::eOrphanAnnot) {
1521  // PSG loader doesn't provide orphan annotations
1522  return locks;
1523  }
1524  if ( CannotProcess(idh) ) {
1525  return locks;
1526  }
1527 
1529  if (data_source) {
1530  inc_data = m_TSERequestMode;
1531  }
1532 
1533  CPSG_BioId bio_id(idh);
1534  auto request = make_shared<CPSG_Request_Biodata>(std::move(bio_id));
1535  if (data_source) {
1536  CDataSource::TLoadedBlob_ids loaded_blob_ids;
1537  data_source->GetLoadedBlob_ids(idh, CDataSource::fKnown_bioseqs, loaded_blob_ids);
1538  ITERATE(CDataSource::TLoadedBlob_ids, loaded_blob_id, loaded_blob_ids) {
1539  const CPsgBlobId* pbid = dynamic_cast<const CPsgBlobId*>(&**loaded_blob_id);
1540  if (!pbid) continue;
1541  request->ExcludeTSE(CPSG_BlobId(pbid->ToPsgId()));
1542  }
1543  }
1544  request->IncludeData(inc_data);
1545  auto reply = x_SendRequest(request);
1546  CTSE_Lock tse_lock = x_ProcessBlobReply(reply, data_source, idh, true, true).lock;
1547 
1548  if (!tse_lock) {
1549  // TODO: return correct state with CBlobStateException
1550  if ( 0 ) {
1551  NCBI_THROW(CLoaderException, eLoaderFailed,
1552  "error loading blob for " + idh.AsString());
1553  }
1554  }
1555  else {
1556  locks.insert(tse_lock);
1557  if (m_CDDPrefetchTask) {
1558  auto bioseq_info = m_BioseqCache->Get(idh);
1559  if (bioseq_info) {
1560  auto cdd_ids = x_GetCDDIds(bioseq_info->ids);
1561  if (cdd_ids.gi && cdd_ids.acc_ver && !m_CDDInfoCache->Find(x_MakeLocalCDDEntryId(cdd_ids))) {
1562  m_CDDPrefetchTask->AddRequest(bioseq_info->ids);
1563  }
1564  }
1565  }
1566  }
1567  return locks;
1568 }
1569 
1570 
1572 {
1574  cref(idh)),
1575  "GetBlobId");
1576 }
1577 
1578 
1580 {
1581  if ( CannotProcess(idh) ) {
1582  return null;
1583  }
1584  string blob_id = x_GetCachedBlobId(idh);
1585  if ( blob_id.empty() ) {
1586  CPSG_BioId bio_id(idh);
1587  auto request = make_shared<CPSG_Request_Biodata>(std::move(bio_id));
1588  request->IncludeData(CPSG_Request_Biodata::eNoTSE);
1589  auto reply = x_SendRequest(request);
1590  blob_id = x_ProcessBlobReply(reply, nullptr, idh, true).blob_id;
1591  }
1592  CRef<CPsgBlobId> ret;
1593  if ( !blob_id.empty() ) {
1594  ret.Reset(new CPsgBlobId(blob_id));
1595  }
1596  return ret;
1597 }
1598 
1599 
// File-local flag, false by default. Its value is returned by an accessor
// defined just below.
// NOTE(review): presumably a testing hook read through GetGetBlobByIdShouldFail(),
// which GetBlobById uses to pick a retry count of 1 and to trace "should fail" — confirm.
1600 static bool s_GetBlobByIdShouldFail = false;
1601 
1603 {
1605 }
1606 
1607 
1609 {
1610  return s_GetBlobByIdShouldFail;
1611 }
1612 
1613 
1615 {
1617  data_source, cref(blob_id)),
1618  "GetBlobById",
1619  GetGetBlobByIdShouldFail()? 1: 0);
1620 }
1621 
1622 
1624 {
1625  if (!data_source) return CTSE_Lock();
1626 
1627  if ( GetGetBlobByIdShouldFail() ) {
1628  _TRACE("GetBlobById("<<blob_id.ToPsgId()<<") should fail");
1629  }
1630 #ifdef LOCK4GET
1631  CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(&blob_id);
1632  CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
1633  if ( load_lock.IsLoaded() ) {
1634  _TRACE("GetBlobById() already loaded " << blob_id.ToPsgId());
1635  return load_lock;
1636  }
1637 #else
1638  CTSE_LoadLock load_lock;
1639  {{
1640  CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(&blob_id);
1641  load_lock = data_source->GetTSE_LoadLockIfLoaded(dl_blob_id);
1642  if ( load_lock && load_lock.IsLoaded() ) {
1643  _TRACE("GetBlobById() already loaded " << blob_id.ToPsgId());
1644  return load_lock;
1645  }
1646  }}
1647 #endif
1648 
1649  CTSE_Lock ret;
1650  if ( x_IsLocalCDDEntryId(blob_id) ) {
1651  if ( s_GetDebugLevel() >= 5 ) {
1652  LOG_POST(Info<<"PSG loader: Re-loading CDD blob: " << blob_id.ToString());
1653  }
1654  SCDDIds cdd_ids;
1655  if ( x_ParseLocalCDDEntryId(blob_id, cdd_ids) ) {
1656  ret = x_CreateLocalCDDEntry(data_source, cdd_ids);
1657  }
1658  }
1659  else {
1660  CPSG_BlobId bid(blob_id.ToPsgId());
1661  auto request = make_shared<CPSG_Request_Blob>(bid);
1662  request->IncludeData(m_TSERequestMode);
1663  auto reply = x_SendRequest(request);
1664  ret = x_ProcessBlobReply(reply, data_source, CSeq_id_Handle(), true, false, &load_lock).lock;
1665  }
1666  if (!ret) {
1667  _TRACE("Failed to load blob for " << blob_id.ToPsgId()<<" @ "<<CStackTrace());
1668  NCBI_THROW(CLoaderException, eLoaderFailed,
1669  "CPSGDataLoader::GetBlobById("+blob_id.ToPsgId()+") failed");
1670  }
1671  return ret;
1672 }
1673 
1674 
1676 {
1677 public:
1679  TReply reply,
1680  CPSG_TaskGroup& group,
1681  const CSeq_id_Handle& idh,
1682  CDataSource* data_source,
1683  CPSGDataLoader_Impl& loader,
1684  bool lock_asap = false,
1685  CTSE_LoadLock* load_lock_ptr = nullptr)
1686  : CPSG_Task(reply, group),
1687  m_Id(idh),
1688  m_DataSource(data_source),
1689  m_Loader(loader),
1690  m_LockASAP(lock_asap),
1691  m_LoadLockPtr(load_lock_ptr)
1692  {
1693  }
1694 
1695  ~CPSG_Blob_Task(void) override {}
1696 
1698  SAutoReleaseLock(bool lock_asap, CTSE_LoadLock*& lock_ptr)
1699  : m_LockPtr(lock_ptr)
1700  {
1701  if ( lock_asap && !m_LockPtr ) {
1703  }
1704  }
1706  {
1707  m_LockPtr = 0;
1708  }
1709 
1712  };
1713 
1714  typedef pair<shared_ptr<CPSG_BlobInfo>, shared_ptr<CPSG_BlobData>> TBlobSlot;
1715  typedef map<string, TBlobSlot> TTSEBlobMap; // by PSG blob_id
1716  typedef map<string, map<TChunkId, TBlobSlot>> TChunkBlobMap; // by id2_info, id2_chunk
1717 
1719  shared_ptr<CPSG_SkippedBlob> m_Skipped;
1720  unique_ptr<CDeadline> m_SkippedWaitDeadline;
1721 
1723 
1724  const TBlobSlot* GetTSESlot(const string& psg_id) const;
1725  const TBlobSlot* GetChunkSlot(const string& id2_info, TChunkId chunk_id) const;
1726  const TBlobSlot* GetBlobSlot(const CPSG_DataId& id) const;
1727  TBlobSlot* SetBlobSlot(const CPSG_DataId& id);
1729  {
1731  return *m_LoadLockPtr;
1732  }
1733  void ObtainLoadLock();
1734  bool GotBlobData(const string& psg_blob_id) const;
1736  unique_ptr<CDeadline> GetWaitDeadline(const CPSG_SkippedBlob& skipped) const;
1737  static const char* GetSkippedType(const CPSG_SkippedBlob& skipped);
1738 
1739  void Finish(void) override
1740  {
1741  m_Skipped.reset();
1743  m_TSEBlobMap.clear();
1745  m_BlobIdMap.clear();
1746  }
1747 
1748  void SetDLBlobId(const string& psg_blob_id, CDataLoader::TBlobId dl_blob_id)
1749  {
1750  m_BlobIdMap[psg_blob_id] = dl_blob_id;
1751  }
1752 
1753  CDataLoader::TBlobId GetDLBlobId(const string& psg_blob_id) const
1754  {
1755  auto iter = m_BlobIdMap.find(psg_blob_id);
1756  if ( iter != m_BlobIdMap.end() ) {
1757  return iter->second;
1758  }
1759  return CDataLoader::TBlobId(new CPsgBlobId(psg_blob_id));
1760  }
1761 
1762 protected:
1763  void DoExecute(void) override;
1764  void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override;
1765  void CreateLoadedChunks(CTSE_LoadLock& load_lock);
1766  static bool IsChunk(const CPSG_DataId* id);
1767  static bool IsChunk(const CPSG_DataId& id);
1768  static bool IsChunk(const CPSG_SkippedBlob& skipped);
1769 
1770 private:
1778 };
1779 
1780 
1781 const CPSG_Blob_Task::TBlobSlot* CPSG_Blob_Task::GetTSESlot(const string& blob_id) const
1782 {
1783  auto iter = m_TSEBlobMap.find(blob_id);
1784  if ( iter != m_TSEBlobMap.end() ) {
1785  return &iter->second;
1786  }
1787  return 0;
1788 }
1789 
1790 
1792  TChunkId chunk_id) const
1793 {
1794  auto iter = m_ChunkBlobMap.find(id2_info);
1795  if ( iter != m_ChunkBlobMap.end() ) {
1796  auto iter2 = iter->second.find(chunk_id);
1797  if ( iter2 != iter->second.end() ) {
1798  return &iter2->second;
1799  }
1800  }
1801  return 0;
1802 }
1803 
1804 
1806 {
1807  if ( auto tse_id = dynamic_cast<const CPSG_BlobId*>(&id) ) {
1808  return GetTSESlot(tse_id->GetId());
1809  }
1810  else if ( auto chunk_id = dynamic_cast<const CPSG_ChunkId*>(&id) ) {
1811  return GetChunkSlot(chunk_id->GetId2Info(), chunk_id->GetId2Chunk());
1812  }
1813  return 0;
1814 }
1815 
1816 
1818 {
1819  if ( auto tse_id = dynamic_cast<const CPSG_BlobId*>(&id) ) {
1820  _TRACE("Blob slot for tse_id="<<tse_id->GetId());
1821  return &m_TSEBlobMap[tse_id->GetId()];
1822  }
1823  else if ( auto chunk_id = dynamic_cast<const CPSG_ChunkId*>(&id) ) {
1824  _TRACE("Blob slot for id2_info="<<chunk_id->GetId2Info()<<" chunk="<<chunk_id->GetId2Chunk());
1825  return &m_ChunkBlobMap[chunk_id->GetId2Info()][chunk_id->GetId2Chunk()];
1826  }
1827  return 0;
1828 }
1829 
1830 
1831 bool CPSG_Blob_Task::GotBlobData(const string& psg_blob_id) const
1832 {
1833  const TBlobSlot* main_blob_slot = GetTSESlot(psg_blob_id);
1834  if ( !main_blob_slot || !main_blob_slot->first ) {
1835  // no TSE blob props yet
1836  if ( s_GetDebugLevel() >= 7 ) {
1837  LOG_POST("GotBlobData("<<psg_blob_id<<"): no TSE blob props");
1838  }
1839  return false;
1840  }
1841  if ( main_blob_slot->second ) {
1842  // got TSE blob data
1843  if ( s_GetDebugLevel() >= 6 ) {
1844  LOG_POST("GotBlobData("<<psg_blob_id<<"): got TSE blob data");
1845  }
1846  return true;
1847  }
1848  auto id2_info = main_blob_slot->first->GetId2Info();
1849  if ( id2_info.empty() ) {
1850  // TSE doesn't have split info
1851  if ( s_GetDebugLevel() >= 7 ) {
1852  LOG_POST("GotBlobData("<<psg_blob_id<<"): not split");
1853  }
1854  return false;
1855  }
1856  const TBlobSlot* split_blob_slot = GetChunkSlot(id2_info, kSplitInfoChunkId);
1857  if ( !split_blob_slot || !split_blob_slot->second ) {
1858  // no split info blob data yet
1859  if ( s_GetDebugLevel() >= 7 ) {
1860  LOG_POST("GotBlobData("<<psg_blob_id<<"): no split blob data");
1861  }
1862  return false;
1863  }
1864  else {
1865  // got split info blob data
1866  if ( s_GetDebugLevel() >= 6 ) {
1867  LOG_POST("GotBlobData("<<psg_blob_id<<"): got split blob data");
1868  }
1869  return true;
1870  }
1871 }
1872 
1873 
1875 {
1876  if ( !m_LockASAP ) {
1877  // load lock is not requested
1878  return;
1879  }
1880  if ( GetLoadLock() ) {
1881  // load lock already obtained
1882  return;
1883  }
1884  if ( m_ReplyResult.blob_id.empty() ) {
1885  // blob id is not known yet
1886  return;
1887  }
1888  if ( !GotBlobData(m_ReplyResult.blob_id) ) {
1889  return;
1890  }
1891  if ( s_GetDebugLevel() >= 6 ) {
1892  LOG_POST("ObtainLoadLock("<<m_ReplyResult.blob_id<<"): getting load lock");
1893  }
1895  if ( s_GetDebugLevel() >= 6 ) {
1896  LOG_POST("ObtainLoadLock("<<m_ReplyResult.blob_id<<"): obtained load lock");
1897  }
1898 }
1899 
1900 
1902 {
1903  _TRACE("CPSG_Blob_Task::DoExecute()");
1904  if (!CheckReplyStatus()) return;
1906  ReadReply();
1907  if (m_Status == eFailed) return;
1908  if (m_Skipped) {
1909  m_Status = eCompleted;
1910  return;
1911  }
1912 
1913  if (m_ReplyResult.blob_id.empty()) {
1914  // If the source request was for blob rather than bioseq, there may be no bioseq info
1915  // and blob_id stays empty.
1916  if (m_Reply->GetRequest()->GetType() == CPSG_Request::eBlob) {
1917  shared_ptr<const CPSG_Request_Blob> blob_request = static_pointer_cast<const CPSG_Request_Blob>(m_Reply->GetRequest());
1918  if (blob_request) {
1919  m_ReplyResult.blob_id = blob_request->GetId();
1920  }
1921  }
1922  }
1923  if (m_ReplyResult.blob_id.empty()) {
1924  _TRACE("no blob_id");
1925  m_Status = eCompleted;
1926  return;
1927  }
1928 
1929  _TRACE("tse_id="<<m_ReplyResult.blob_id);
1930  const TBlobSlot* main_blob_slot = GetTSESlot(m_ReplyResult.blob_id);
1931  if ( main_blob_slot && main_blob_slot->first ) {
1932  // create and save new main blob-info entry
1933  m_ReplyResult.blob_info = make_shared<SPsgBlobInfo>(*main_blob_slot->first);
1936  }
1937 
1938  if ( !m_LoadLockPtr ) {
1939  // no TSE requested
1940  m_Status = eCompleted;
1941  return;
1942  }
1943 
1944  if ( !main_blob_slot || !main_blob_slot->first ) {
1945  _TRACE("No blob info for tse_id="<<m_ReplyResult.blob_id);
1946  m_Status = eFailed;
1947  return;
1948  }
1949 
1950  const TBlobSlot* split_blob_slot = 0;
1951  auto id2_info = main_blob_slot->first->GetId2Info();
1952  if ( !id2_info.empty() ) {
1953  split_blob_slot = GetChunkSlot(id2_info, kSplitInfoChunkId);
1954  if ( !split_blob_slot || !split_blob_slot->first ) {
1955  _TRACE("No split info tse_id="<<m_ReplyResult.blob_id<<" id2_info="<<id2_info);
1956  }
1957  }
1958 
1959  if (!m_DataSource) {
1960  _TRACE("No data source for tse_id="<<m_ReplyResult.blob_id);
1961  // No data to load, just bioseq-info.
1962  m_Status = eCompleted;
1963  return;
1964  }
1965 
1966  // Read blob data (if any) and pass to the data source.
1968  m_Status = eFailed;
1969  return;
1970  }
1972  CTSE_LoadLock load_lock;
1973  if ( GetLoadLock() && GetLoadLock()->GetBlobId() == dl_blob_id ) {
1974  load_lock = GetLoadLock();
1975  }
1976  else {
1977  load_lock = m_DataSource->GetTSE_LoadLock(dl_blob_id);
1978  }
1979  if (!load_lock) {
1980  _TRACE("Cannot get TSE load lock for tse_id="<<m_ReplyResult.blob_id);
1981  m_Status = eFailed;
1982  return;
1983  }
1985  if ( load_lock.IsLoaded() ) {
1986  if ( load_lock->x_NeedsDelayedMainChunk() &&
1987  !load_lock->GetSplitInfo().GetChunk(kDelayedMain_ChunkId).IsLoaded() ) {
1988  main_chunk_type = CPSGDataLoader_Impl::eDelayedMainChunk;
1989  }
1990  else {
1991  _TRACE("Already loaded tse_id="<<m_ReplyResult.blob_id);
1992  m_ReplyResult.lock = load_lock;
1993  m_Status = eCompleted;
1994  return;
1995  }
1996  }
1997 
1998  if ( split_blob_slot && split_blob_slot->first && split_blob_slot->second ) {
1999  auto& blob_id = *load_lock->GetBlobId();
2000  dynamic_cast<CPsgBlobId&>(const_cast<CBlobId&>(blob_id)).SetId2Info(id2_info);
2002  *split_blob_slot->first,
2003  *split_blob_slot->second,
2004  load_lock,
2006  load_lock->GetSplitInfo();
2007  }
2008  else if ( main_blob_slot && main_blob_slot->first && main_blob_slot->second ) {
2010  *main_blob_slot->first,
2011  *main_blob_slot->second,
2012  load_lock,
2014  }
2015  else if ( GotForbidden() ) {
2016  _TRACE("Got forbidden for tse_id="<<m_ReplyResult.blob_id);
2017  load_lock.Reset();
2018  m_Status = eCompleted;
2019  return;
2020  }
2021  else {
2022  _TRACE("No data for tse_id="<<m_ReplyResult.blob_id);
2023  load_lock.Reset();
2024  }
2025  if ( load_lock ) {
2026 #ifdef GLOBAL_CHUNKS
2027  m_Loader.x_SetLoaded(load_lock, main_chunk_type);
2028  CreateLoadedChunks(load_lock);
2029 #else
2030  CreateLoadedChunks(load_lock);
2031  m_Loader.x_SetLoaded(load_lock, main_chunk_type);
2032 #endif
2033  m_ReplyResult.lock = load_lock;
2034  m_Status = eCompleted;
2035  }
2036  else {
2037  m_Status = eFailed;
2038  }
2039 }
2040 
2041 
2043 {
2044  if ( !load_lock || !load_lock->HasSplitInfo() ) {
2045  return;
2046  }
2047  auto blob_id = dynamic_cast<const CPsgBlobId*>(&*load_lock->GetBlobId());
2048  if ( !blob_id ) {
2049  return;
2050  }
2051  CTSE_Split_Info& tse_split_info = load_lock->GetSplitInfo();
2052  for ( auto& chunk_slot : m_ChunkBlobMap[blob_id->GetId2Info()] ) {
2053  TChunkId chunk_id = chunk_slot.first;
2054  if ( chunk_id == kSplitInfoChunkId ) {
2055  continue;
2056  }
2057  if ( !chunk_slot.second.first || !chunk_slot.second.second ) {
2058  continue;
2059  }
2060  CTSE_Chunk_Info* chunk = 0;
2061  try {
2062  chunk = &tse_split_info.GetChunk(chunk_id);
2063  }
2064  catch ( CException& /*ignored*/ ) {
2065  }
2066  if ( !chunk || chunk->IsLoaded() ) {
2067  continue;
2068  }
2069  AutoPtr<CInitGuard> guard;
2070  if ( load_lock.IsLoaded() ) {
2071  guard = chunk->GetLoadInitGuard();
2072  if ( !guard.get() || !*guard.get() ) {
2073  continue;
2074  }
2075  }
2076  unique_ptr<CObjectIStream> in
2077  (CPSGDataLoader_Impl::GetBlobDataStream(*chunk_slot.second.first,
2078  *chunk_slot.second.second));
2079  CRef<CID2S_Chunk> id2_chunk(new CID2S_Chunk);
2080  *in >> *id2_chunk;
2081  if ( s_GetDebugLevel() >= 8 ) {
2082  LOG_POST(Info<<"PSG loader: TSE "<<chunk->GetBlobId().ToString()<<" "<<
2083  " chunk "<<chunk->GetChunkId()<<" "<<MSerial_AsnText<<*id2_chunk);
2084  }
2085 
2086  CSplitParser::Load(*chunk, *id2_chunk);
2087  chunk->SetLoaded();
2088  }
2089 }
2090 
2091 
2093 {
2094  if ( auto chunk_id = dynamic_cast<const CPSG_ChunkId*>(id) ) {
2095  return chunk_id->GetId2Chunk() != kSplitInfoChunkId;
2096  }
2097  return false;
2098 }
2099 
2100 
2102 {
2103  return IsChunk(&id);
2104 }
2105 
2106 
2108 {
2109  return IsChunk(skipped.GetId());
2110 }
2111 
2112 
2113 unique_ptr<CDeadline> CPSG_Blob_Task::GetWaitDeadline(const CPSG_SkippedBlob& skipped) const
2114 {
2115  double timeout = 0;
2116  switch ( skipped.GetReason() ) {
2118  timeout = 1;
2119  break;
2121  if ( skipped.GetTimeUntilResend().IsNull() ) {
2122  timeout = 0.2;
2123  }
2124  else {
2125  timeout = skipped.GetTimeUntilResend().GetValue();
2126  }
2127  break;
2128  default:
2129  return nullptr;
2130  }
2131  return make_unique<CDeadline>(CTimeout(timeout));
2132 }
2133 
2134 
2136 {
2137  switch ( skipped.GetReason() ) {
2139  return "in progress";
2141  return "sent";
2143  return "excluded";
2144  default:
2145  return "unknown";
2146  }
2147 }
2148 
2149 
2150 void CPSG_Blob_Task::ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)
2151 {
2152  switch (item->GetType()) {
2154  {
2155  // Only one bioseq-info is allowed per reply.
2156  shared_ptr<CPSG_BioseqInfo> bioseq_info = static_pointer_cast<CPSG_BioseqInfo>(item);
2157  m_ReplyResult.blob_id = bioseq_info->GetBlobId().GetId();
2158  ObtainLoadLock();
2159  m_Loader.m_BioseqCache->Add(*bioseq_info, m_Id);
2160  break;
2161  }
2163  {
2164  auto blob_info = static_pointer_cast<CPSG_BlobInfo>(item);
2165  _TRACE("Blob info: "<<blob_info->GetId()->Repr());
2166  if ( auto slot = SetBlobSlot(*blob_info->GetId()) ) {
2167  slot->first = blob_info;
2168  ObtainLoadLock();
2169  }
2170  break;
2171  }
2173  {
2174  shared_ptr<CPSG_BlobData> data = static_pointer_cast<CPSG_BlobData>(item);
2175  _TRACE("Blob data: "<<data->GetId()->Repr());
2176  if ( auto slot = SetBlobSlot(*data->GetId()) ) {
2177  slot->second = data;
2178  ObtainLoadLock();
2179  }
2180  break;
2181  }
2183  {
2184  // Only main blob can be skipped.
2185  if ( !m_Skipped && !IsChunk(*static_pointer_cast<CPSG_SkippedBlob>(item)) ) {
2186  shared_ptr<CPSG_SkippedBlob> skipped = static_pointer_cast<CPSG_SkippedBlob>(item);
2187  m_Skipped = skipped;
2189  }
2190  break;
2191  }
2192  default:
2193  {
2194  break;
2195  }
2196  }
2197 }
2198 
2199 
2201 {
2204  if (!m_DataSource) return ret;
2205 
2206  CDataLoader::TBlobId dl_blob_id = GetDLBlobId(ret.blob_id);
2207  CTSE_LoadLock load_lock;
2208  _ASSERT(m_Skipped);
2209  if ( m_SkippedWaitDeadline ) {
2210  load_lock = m_DataSource->GetLoadedTSE_Lock(dl_blob_id, *m_SkippedWaitDeadline);
2211  }
2212  else {
2213  load_lock = m_DataSource->GetTSE_LoadLockIfLoaded(dl_blob_id);
2214  }
2215  if ( load_lock && load_lock.IsLoaded() ) {
2216 #ifdef GLOBAL_CHUNKS
2217  CreateLoadedChunks(load_lock);
2218 #endif
2219  ret.lock = load_lock;
2220  }
2221  else {
2222  if ( s_GetDebugLevel() >= 6 ) {
2223  LOG_POST("CPSGDataLoader: '"<<GetSkippedType(*m_Skipped)<<"' blob is not loaded: "<<dl_blob_id.ToString());
2224  }
2225  }
2226  return ret;
2227 }
2228 
2229 
2231 {
2232  TLoadedSeqIds loaded;
2234  data_source, ref(loaded), ref(tse_sets)),
2235  "GetBlobs",
2237 }
2238 
2239 
2241 {
2242  if (!data_source) return;
2243  CPSG_TaskGroup group(*m_ThreadPool);
2244  ITERATE(TTSE_LockSets, tse_set, tse_sets) {
2245  const CSeq_id_Handle& idh = tse_set->first;
2246  if ( loaded.count(idh) ) {
2247  continue;
2248  }
2249  CPSG_BioId bio_id(idh);
2250  auto request = make_shared<CPSG_Request_Biodata>(std::move(bio_id));
2252  if (data_source) {
2253  inc_data = m_TSERequestModeBulk;
2254  CDataSource::TLoadedBlob_ids loaded_blob_ids;
2255  data_source->GetLoadedBlob_ids(idh, CDataSource::fKnown_bioseqs, loaded_blob_ids);
2256  ITERATE(CDataSource::TLoadedBlob_ids, loaded_blob_id, loaded_blob_ids) {
2257  const CPsgBlobId* pbid = dynamic_cast<const CPsgBlobId*>(&**loaded_blob_id);
2258  if (!pbid) continue;
2259  request->ExcludeTSE(CPSG_BlobId(pbid->ToPsgId()));
2260  }
2261  }
2262  request->IncludeData(inc_data);
2263  auto reply = x_SendRequest(request);
2264  CRef<CPSG_Blob_Task> task(
2265  new CPSG_Blob_Task(reply, group, idh, data_source, *this, true));
2266  group.AddTask(task);
2267  }
2268  size_t failed_count = 0;
2269  // Waiting for skipped blobs can block all pool threads. To prevent this postpone
2270  // waiting until all other tasks are completed.
2271  typedef list<CRef<CPSG_Blob_Task>> TTasks;
2272  TTasks skipped_tasks;
2273  list<shared_ptr<CPSG_Task_Guard>> guards;
2274  while (group.HasTasks()) {
2275  CRef<CPSG_Blob_Task> task(group.GetTask<CPSG_Blob_Task>().GetNCPointerOrNull());
2276  _ASSERT(task);
2277  guards.push_back(make_shared<CPSG_Task_Guard>(*task));
2278  if (task->GetStatus() == CThreadPool_Task::eFailed) {
2279  ++failed_count;
2280  continue;
2281  }
2282  if (task->m_Skipped) {
2283  skipped_tasks.push_back(task);
2284  continue;
2285  }
2286  SReplyResult res = task->m_ReplyResult;
2287  if (task->m_ReplyResult.lock) {
2288  tse_sets[task->m_Id].insert(task->m_ReplyResult.lock);
2289  }
2290  loaded.insert(task->m_Id);
2291  }
2292  NON_CONST_ITERATE(TTasks, it, skipped_tasks) {
2293  CPSG_Blob_Task& task = **it;
2295  if (!result.lock) {
2296  // Force reloading blob
2297  try {
2298  result = x_RetryBlobRequest(task.m_ReplyResult.blob_id, data_source, task.m_Id);
2299  }
2300  catch ( CException& /*doesn't matter*/ ) {
2301  ++failed_count;
2302  continue;
2303  }
2304  }
2305  if (result.lock) {
2306  tse_sets[task.m_Id].insert(result.lock);
2307  }
2308  loaded.insert(task.m_Id);
2309  }
2310  if ( failed_count ) {
2311  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
2312  "failed to load "<<failed_count<<" blobs");
2313  }
2314 }
2315 
2316 
2318  const TSeqIdSets& id_sets, TLoaded& loaded, TCDD_Locks& ret)
2319 {
2321  data_source, id_sets, ref(loaded), ref(ret)),
2322  "GetCDDAnnots",
2324 }
2325 
2326 
2328  CTSE_Chunk_Info& chunk_info)
2329 {
2330  CDataLoader::TChunkSet chunks;
2331  chunks.push_back(Ref(&chunk_info));
2332  LoadChunks(data_source, chunks);
2333 }
2334 
2335 
2337 {
2338 public:
2340  : CPSG_Task(reply, group), m_Chunk(chunk) {}
2341 
2342  ~CPSG_LoadChunk_Task(void) override {}
2343 
2344  void Finish(void) override {
2345  m_Chunk.Reset();
2346  m_BlobInfo.reset();
2347  m_BlobData.reset();
2348  }
2349 
2350 protected:
2351  void DoExecute(void) override;
2352  void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override;
2353 
2354 private:
2356  shared_ptr<CPSG_BlobInfo> m_BlobInfo;
2357  shared_ptr<CPSG_BlobData> m_BlobData;
2358 };
2359 
2360 
2362 {
2363  if (!CheckReplyStatus()) return;
2364  ReadReply();
2365  if (m_Status == eFailed) return;
2366 
2367  if (!m_BlobInfo || !m_BlobData) {
2368  _TRACE("Failed to get chunk info or data for blob-id " << m_Chunk->GetBlobId());
2369  m_Status = eFailed;
2370  return;
2371  }
2372 
2373  if (IsCancelled()) return;
2374  unique_ptr<CObjectIStream> in(CPSGDataLoader_Impl::GetBlobDataStream(*m_BlobInfo, *m_BlobData));
2375  if (!in.get()) {
2376  _TRACE("Failed to open chunk data stream for blob-id " << m_BlobInfo->GetId()->Repr());
2377  m_Status = eFailed;
2378  return;
2379  }
2380 
2381  CRef<CID2S_Chunk> id2_chunk(new CID2S_Chunk);
2382  *in >> *id2_chunk;
2383  if ( s_GetDebugLevel() >= 8 ) {
2384  LOG_POST(Info<<"PSG loader: TSE "<<m_Chunk->GetBlobId().ToString()<<" "<<
2385  " chunk "<<m_Chunk->GetChunkId()<<" "<<MSerial_AsnText<<*id2_chunk);
2386  }
2387  CSplitParser::Load(*m_Chunk, *id2_chunk);
2388  m_Chunk->SetLoaded();
2389 
2390  m_Status = eCompleted;
2391 }
2392 
2393 
2394 void CPSG_LoadChunk_Task::ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item)
2395 {
2396  switch (item->GetType()) {
2398  m_BlobInfo = static_pointer_cast<CPSG_BlobInfo>(item);
2399  break;
2401  m_BlobData = static_pointer_cast<CPSG_BlobData>(item);
2402  break;
2403  default:
2404  break;
2405  }
2406 }
2407 
2408 
// Annotation name used for CDD data; also set as the name of local CDD entries.
2409 const char kCDDAnnotName[] = "CDD";
// NOTE(review): not referenced in the visible part of this file; presumably
// switches the creation of local CDD entries on or off — confirm against its users.
2410 const bool kCreateLocalCDDEntries = true;
// Prefix that marks a blob id string as a local CDD entry id.
2411 const char kLocalCDDEntryIdPrefix[] = "CDD:";
// Character written between the GI and the acc_ver part of a local CDD entry id.
2412 const char kLocalCDDEntryIdSeparator = '|';
2413 
2414 
2416 {
2417  SCDDIds ret;
2418  bool is_protein = true;
2419  for ( auto id : ids ) {
2420  if ( id.IsGi() ) {
2421  ret.gi = id;
2422  continue;
2423  }
2424  if ( id.Which() == CSeq_id::e_Pdb ) {
2425  if ( !ret.acc_ver ) {
2426  ret.acc_ver = id;
2427  }
2428  continue;
2429  }
2430  auto seq_id = id.GetSeqId();
2431  if ( auto text_id = seq_id->GetTextseq_Id() ) {
2432  auto acc_type = seq_id->IdentifyAccession();
2433  if ( acc_type & CSeq_id::fAcc_nuc ) {
2434  is_protein = false;
2435  break;
2436  }
2437  else if ( text_id->IsSetAccession() && text_id->IsSetVersion() &&
2438  (acc_type & CSeq_id::fAcc_prot) ) {
2439  is_protein = true;
2440  ret.acc_ver = CSeq_id_Handle::GetHandle(text_id->GetAccession()+'.'+
2441  NStr::NumericToString(text_id->GetVersion()));
2442  }
2443  }
2444  }
2445  if (!is_protein) {
2446  ret.gi.Reset();
2447  ret.acc_ver.Reset();
2448  }
2449  return ret;
2450 }
2451 
2452 
2453 static string x_MakeLocalCDDEntryId(const SCDDIds& cdd_ids)
2454 {
2455  ostringstream str;
2456  _ASSERT(cdd_ids.gi && cdd_ids.gi.IsGi());
2457  str << kLocalCDDEntryIdPrefix << cdd_ids.gi.GetGi();
2458  if ( cdd_ids.acc_ver ) {
2459  str << kLocalCDDEntryIdSeparator << cdd_ids.acc_ver;
2460  }
2461  return str.str();
2462 }
2463 
2464 
2465 static bool x_IsLocalCDDEntryId(const CPsgBlobId& blob_id)
2466 {
2467  return NStr::StartsWith(blob_id.ToPsgId(), kLocalCDDEntryIdPrefix);
2468 }
2469 
2470 
2471 static bool x_ParseLocalCDDEntryId(const CPsgBlobId& blob_id, SCDDIds& cdd_ids)
2472 {
2473  if ( !x_IsLocalCDDEntryId(blob_id) ) {
2474  return false;
2475  }
2476  istringstream str(blob_id.ToPsgId().substr(strlen(kLocalCDDEntryIdPrefix)));
2477  TIntId gi_id = 0;
2478  str >> gi_id;
2479  if ( !gi_id ) {
2480  return false;
2481  }
2482  cdd_ids.gi = CSeq_id_Handle::GetGiHandle(GI_FROM(TIntId, gi_id));
2483  if ( str.get() == kLocalCDDEntryIdSeparator ) {
2484  string extra;
2485  str >> extra;
2486  cdd_ids.acc_ver = CSeq_id_Handle::GetHandle(extra);
2487  }
2488  return true;
2489 }
2490 
2491 
2493 {
2494  const string& str = blob_id.ToPsgId();
2495  size_t start = strlen(kLocalCDDEntryIdPrefix);
2496  size_t end = str.find(kLocalCDDEntryIdSeparator, start);
2497  return { str.substr(start, end-start), CSeq_id::e_Gi };
2498 }
2499 
2500 
2502 {
2503  if ( !cdd_ids.gi && !cdd_ids.acc_ver ) {
2504  return null;
2505  }
2508  // add main annot types
2509  CAnnotName name = kCDDAnnotName;
2510  CSeqFeatData::ESubtype subtypes[] = {
2513  };
2514  set<CSeq_id_Handle> ids;
2515  for ( int i = 0; i < 2; ++i ) {
2516  const CSeq_id_Handle& id = i ? cdd_ids.acc_ver : cdd_ids.gi;
2517  if ( !id ) {
2518  continue;
2519  }
2520  ids.insert(id);
2521  }
2522  if ( s_GetDebugLevel() >= 6 ) {
2523  for ( auto& id : ids ) {
2524  LOG_POST(Info<<"CPSGDataLoader: CDD synthetic id "<<MSerial_AsnText<<*id.GetSeqId());
2525  }
2526  }
2527  for ( auto subtype : subtypes ) {
2528  SAnnotTypeSelector type(subtype);
2529  for ( auto& id : ids ) {
2530  chunk->x_AddAnnotType(name, type, id, range);
2531  }
2532  }
2533  return chunk;
2534 }
2535 
2536 
2537 static CTSE_Lock x_CreateLocalCDDEntry(CDataSource* data_source, const SCDDIds& cdd_ids)
2538 {
2539  CRef<CPsgBlobId> blob_id(new CPsgBlobId(x_MakeLocalCDDEntryId(cdd_ids)));
2540  if ( auto chunk = x_CreateLocalCDDEntryChunk(cdd_ids) ) {
2541  CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(blob_id);
2542  CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
2543  if ( load_lock ) {
2544  if ( !load_lock.IsLoaded() ) {
2546  return CTSE_Lock();
2547  }
2548  load_lock->SetName(kCDDAnnotName);
2549  load_lock->GetSplitInfo().AddChunk(*chunk);
2550  _ASSERT(load_lock->x_NeedsDelayedMainChunk());
2551  load_lock.SetLoaded();
2552  }
2553  return load_lock;
2554  }
2555  }
2556  return CTSE_Lock();
2557 }
2558 
2559 
2560 static void x_CreateEmptyLocalCDDEntry(CDataSource* data_source,
2561  CDataLoader::TChunk chunk)
2562 {
2563  CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(chunk->GetBlobId());
2564  _ASSERT(load_lock);
2565  _ASSERT(load_lock.IsLoaded());
2566  _ASSERT(load_lock->HasNoSeq_entry());
2567  CRef<CSeq_entry> entry(new CSeq_entry);
2568  entry->SetSet().SetSeq_set();
2569  if ( s_GetDebugLevel() >= 8 ) {
2570  LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
2571  " created empty CDD entry");
2572  }
2573  load_lock->SetSeq_entry(*entry);
2574  chunk->SetLoaded();
2575 }
2576 
2577 
2578 static bool s_SameId(const CPSG_BlobId* id1, const CPSG_BlobId& id2)
2579 {
2580  return id1 && id1->GetId() == id2.GetId();
2581 }
2582 
2583 
2584 static bool s_HasFailedStatus(const CPSG_NamedAnnotStatus& na_status)
2585 {
2586  for ( auto& s : na_status.GetId2AnnotStatusList() ) {
2587  if ( s.second == EPSG_Status::eError ) {
2588  return true;
2589  }
2590  }
2591  return false;
2592 }
2593 
2594 
2596  CDataLoader::TChunk chunk,
2597  const CPSG_BlobInfo& blob_info,
2598  const CPSG_BlobData& blob_data)
2599 {
2601  _DEBUG_ARG(const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk->GetBlobId()));
2602  _ASSERT(x_IsLocalCDDEntryId(blob_id));
2603  _ASSERT(!chunk->IsLoaded());
2604 
2605  CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(chunk->GetBlobId());
2606  if ( !load_lock ||
2607  !load_lock.IsLoaded() ||
2608  !load_lock->x_NeedsDelayedMainChunk() ) {
2609  _TRACE("Cannot make CDD entry because of wrong TSE state id="<<blob_id.ToString());
2610  return false;
2611  }
2612 
2613  unique_ptr<CObjectIStream> in(GetBlobDataStream(blob_info, blob_data));
2614  if (!in.get()) {
2615  _TRACE("Failed to open blob data stream for blob-id " << blob_id.ToString());
2616  return false;
2617  }
2618 
2619  CRef<CSeq_entry> entry(new CSeq_entry);
2620  *in >> *entry;
2621  if ( s_GetDebugLevel() >= 8 ) {
2622  LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
2623  MSerial_AsnText<<*entry);
2624  }
2625  if ( s_GetDebugLevel() >= 6 ) {
2626  set<CSeq_id_Handle> annot_ids;
2627  for ( CTypeConstIterator<CSeq_id> it = ConstBegin(*entry); it; ++it ) {
2628  annot_ids.insert(CSeq_id_Handle::GetHandle(*it));
2629  }
2630  for ( auto& id : annot_ids ) {
2631  LOG_POST(Info<<"CPSGDataLoader: CDD actual id "<<MSerial_AsnText<<*id.GetSeqId());
2632  }
2633  }
2634  load_lock->SetSeq_entry(*entry);
2635  chunk->SetLoaded();
2636  return true;
2637 }
2638 
2639 
2640 shared_ptr<CPSG_Request_Blob>
2642  CDataLoader::TChunk chunk)
2643 {
2645  const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk->GetBlobId());
2646  _ASSERT(x_IsLocalCDDEntryId(blob_id));
2647  _ASSERT(!chunk->IsLoaded());
2648  bool failed = false;
2649  shared_ptr<CPSG_NamedAnnotInfo> cdd_info;
2650  shared_ptr<CPSG_NamedAnnotStatus> cdd_status;
2651 
2652  // load CDD blob id
2653  {{
2654  CPSG_BioId bio_id = x_LocalCDDEntryIdToBioId(blob_id);
2656  _ASSERT(bio_id.GetId().find('|') == NPOS);
2657  auto request = make_shared<CPSG_Request_NamedAnnotInfo>(bio_id, names);
2658  request->IncludeData(m_TSERequestMode);
2659  auto reply = x_SendRequest(request);
2660  shared_ptr<CPSG_BioseqInfo> bioseq_info;
2661  shared_ptr<CPSG_BlobInfo> blob_info;
2662  shared_ptr<CPSG_BlobData> blob_data;
2663  for (;;) {
2664  auto reply_item = reply->GetNextItem(DEFAULT_DEADLINE);
2665  if (!reply_item) continue;
2666  if (reply_item->GetType() == CPSG_ReplyItem::eEndOfReply) break;
2667  EPSG_Status status = reply_item->GetStatus(CDeadline::eInfinite);
2668  if (status != EPSG_Status::eSuccess) {
2669  ReportStatus(reply_item, status);
2670  if ( status == EPSG_Status::eNotFound ) {
2671  continue;
2672  }
2673  failed = true;
2674  break;
2675  }
2676  if (reply_item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2677  bioseq_info = static_pointer_cast<CPSG_BioseqInfo>(reply_item);
2678  }
2679  if (reply_item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
2680  auto na_info = static_pointer_cast<CPSG_NamedAnnotInfo>(reply_item);
2681  if ( NStr::EqualNocase(na_info->GetName(), kCDDAnnotName) ) {
2682  cdd_info = na_info;
2683  }
2684  }
2685  if (reply_item->GetType() == CPSG_ReplyItem::eNamedAnnotStatus) {
2686  cdd_status = static_pointer_cast<CPSG_NamedAnnotStatus>(reply_item);
2687  if ( s_HasFailedStatus(*cdd_status) ) {
2688  failed = true;
2689  }
2690  }
2691  if (reply_item->GetType() == CPSG_ReplyItem::eBlobInfo) {
2692  blob_info = static_pointer_cast<CPSG_BlobInfo>(reply_item);
2693  }
2694  if (reply_item->GetType() == CPSG_ReplyItem::eBlobData) {
2695  blob_data = static_pointer_cast<CPSG_BlobData>(reply_item);
2696  }
2697  }
2698  if ( failed ) {
2699  NCBI_THROW(CLoaderException, eLoaderFailed, "failed to get CDD blob-id for "+bio_id.Repr());
2700  }
2701  if ( !cdd_info ) {
2702  x_CreateEmptyLocalCDDEntry(data_source, chunk);
2703  return nullptr;
2704  }
2705  // see if we got blob already
2706  if ( blob_info && s_SameId(blob_info->GetId<CPSG_BlobId>(), cdd_info->GetBlobId()) &&
2707  blob_data && s_SameId(blob_data->GetId<CPSG_BlobId>(), cdd_info->GetBlobId()) ) {
2708  _TRACE("Got CDD entry: "<<cdd_info->GetBlobId().Repr());
2709  if ( x_ReadCDDChunk(data_source, chunk, *blob_info, *blob_data) ) {
2710  return nullptr;
2711  }
2712  }
2713  }}
2714 
2715  // load CDD blob request
2716  return make_shared<CPSG_Request_Blob>(cdd_info->GetBlobId());
2717 }
2718 
2719 
2721  const CDataLoader::TChunkSet& chunks)
2722 {
2724  data_source, cref(chunks)),
2725  "LoadChunks");
2726 }
2727 
2728 
2730  const CDataLoader::TChunkSet& chunks)
2731 {
2732  if (chunks.empty()) return;
2733 
2734  CPSG_TaskGroup group(*m_ThreadPool);
2735  list<shared_ptr<CPSG_Task_Guard>> guards;
2736  ITERATE(CDataLoader::TChunkSet, it, chunks) {
2737  const CTSE_Chunk_Info& chunk = **it;
2738  if ( chunk.IsLoaded() ) {
2739  continue;
2740  }
2741  if ( chunk.GetChunkId() == kMasterWGS_ChunkId ) {
2742  CWGSMasterSupport::LoadWGSMaster(data_source->GetDataLoader(), *it);
2743  continue;
2744  }
2745  if ( chunk.GetChunkId() == kDelayedMain_ChunkId ) {
2746  const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk.GetBlobId());
2747  shared_ptr<CPSG_Request_Blob> request;
2748  if ( x_IsLocalCDDEntryId(blob_id) ) {
2749  if (m_CDDInfoCache->Find(blob_id.ToPsgId())) {
2750  x_CreateEmptyLocalCDDEntry(data_source, *it);
2751  continue;
2752  }
2753  request = x_MakeLoadLocalCDDEntryRequest(data_source, *it);
2754  if ( !request ) {
2755  continue;
2756  }
2757  }
2758  else {
2759  request = make_shared<CPSG_Request_Blob>(blob_id.ToPsgId());
2760  }
2761  request->IncludeData(m_TSERequestMode);
2762  auto reply = x_SendRequest(request);
2763  CRef<CPSG_Blob_Task> task(new CPSG_Blob_Task(reply, group, CSeq_id_Handle(), data_source, *this, true));
2764  task->SetDLBlobId(dynamic_cast<const CPSG_Request_Blob&>(*reply->GetRequest()).GetBlobId().GetId(),
2765  chunk.GetBlobId());
2766  guards.push_back(make_shared<CPSG_Task_Guard>(*task));
2767  group.AddTask(task);
2768  }
2769  else {
2770  const CPsgBlobId& blob_id = dynamic_cast<const CPsgBlobId&>(*chunk.GetBlobId());
2771  auto request = make_shared<CPSG_Request_Chunk>(CPSG_ChunkId(chunk.GetChunkId(),
2772  blob_id.GetId2Info()));
2773  auto reply = x_SendRequest(request);
2774  CRef<CPSG_LoadChunk_Task> task(new CPSG_LoadChunk_Task(reply, group, *it));
2775  guards.push_back(make_shared<CPSG_Task_Guard>(*task));
2776  group.AddTask(task);
2777  }
2778  }
2779  group.WaitAll();
2780  // check if all chunks are loaded
2781  size_t failed_count = 0;
2782  ITERATE(CDataLoader::TChunkSet, it, chunks) {
2783  const CTSE_Chunk_Info & chunk = **it;
2784  if (!chunk.IsLoaded()) {
2785  _TRACE("Failed to load chunk " << chunk.GetChunkId() << " of " << dynamic_cast<const CPsgBlobId&>(*chunk.GetBlobId()).ToPsgId());
2786  ++failed_count;
2787  }
2788  }
2789  if ( failed_count ) {
2790  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
2791  "failed to load "<<failed_count<<" chunks");
2792  }
2793 }
2794 
2795 
2797 {
2798 public:
2800  : CPSG_Task(reply, group) {}
2801 
2802  ~CPSG_AnnotRecordsNA_Task(void) override {}
2803 
2804  list<shared_ptr<CPSG_NamedAnnotInfo>> m_AnnotInfo;
2805  shared_ptr<CPSG_NamedAnnotStatus> m_AnnotStatus;
2806 
2807  void Finish(void) override {
2808  m_AnnotInfo.clear();
2809  m_AnnotStatus.reset();
2810  }
2811 
2812 protected:
2813  void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
2814  if (item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
2815  m_AnnotInfo.push_back(static_pointer_cast<CPSG_NamedAnnotInfo>(item));
2816  }
2817  if (item->GetType() == CPSG_ReplyItem::eNamedAnnotStatus) {
2818  m_AnnotStatus = static_pointer_cast<CPSG_NamedAnnotStatus>(item);
2819  if ( s_HasFailedStatus(*m_AnnotStatus) ) {
2820  m_Status = eFailed;
2821  RequestToCancel();
2822  }
2823  }
2824  }
2825 };
2826 
2828 {
2829 public:
2831  : CPSG_Task(reply, group) {}
2832 
2833  ~CPSG_AnnotRecordsCDD_Task(void) override {}
2834 
2835  shared_ptr<CPSG_BioseqInfo> m_BioseqInfo;
2836  list<shared_ptr<CPSG_NamedAnnotInfo>> m_AnnotInfo;
2837  shared_ptr<CPSG_NamedAnnotStatus> m_AnnotStatus;
2838 
2839  void Finish(void) override {
2840  m_BioseqInfo.reset();
2841  m_AnnotInfo.clear();
2842  m_AnnotStatus.reset();
2843  }
2844 
2845 protected:
2846  void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
2847  if (item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
2848  m_BioseqInfo = static_pointer_cast<CPSG_BioseqInfo>(item);
2849  }
2850  if (item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
2851  m_AnnotInfo.push_back(static_pointer_cast<CPSG_NamedAnnotInfo>(item));
2852  }
2853  if (item->GetType() == CPSG_ReplyItem::eNamedAnnotStatus) {
2854  m_AnnotStatus = static_pointer_cast<CPSG_NamedAnnotStatus>(item);
2855  if ( s_HasFailedStatus(*m_AnnotStatus) ) {
2856  m_Status = eFailed;
2857  RequestToCancel();
2858  }
2859  }
2860  }
2861 };
2862 
2863 static
2864 pair<CRef<CTSE_Chunk_Info>, string>
2865 s_CreateNAChunk(const CPSG_NamedAnnotInfo& psg_annot_info)
2866 {
2867  pair<CRef<CTSE_Chunk_Info>, string> ret;
2869  unsigned main_count = 0;
2870  unsigned zoom_count = 0;
2871  // detailed annot info
2873  for ( auto& annot_info_ref : psg_annot_info.GetId2AnnotInfoList() ) {
2874  if ( s_GetDebugLevel() >= 8 ) {
2875  LOG_POST(Info<<"PSG loader: "<<psg_annot_info.GetBlobId().GetId()<<" NA info "
2876  <<MSerial_AsnText<<*annot_info_ref);
2877  }
2878  const CID2S_Seq_annot_Info& annot_info = *annot_info_ref;
2879  // create special external annotations blob
2880  CAnnotName name(annot_info.GetName());
2881  if ( name.IsNamed() && !ExtractZoomLevel(name.GetName(), 0, 0) ) {
2882  //setter.GetTSE_LoadLock()->SetName(name);
2883  names.insert(name.GetName());
2884  ++main_count;
2885  }
2886  else {
2887  ++zoom_count;
2888  }
2889 
2890  vector<SAnnotTypeSelector> types;
2891  if ( annot_info.IsSetAlign() ) {
2893  }
2894  if ( annot_info.IsSetGraph() ) {
2896  }
2897  if ( annot_info.IsSetFeat() ) {
2898  for ( auto feat_type_info_iter : annot_info.GetFeat() ) {
2899  const CID2S_Feat_type_Info& finfo = *feat_type_info_iter;
2900  int feat_type = finfo.GetType();
2901  if ( feat_type == 0 ) {
2902  types.push_back(SAnnotTypeSelector
2904  }
2905  else if ( !finfo.IsSetSubtypes() ) {
2906  types.push_back(SAnnotTypeSelector
2907  (CSeqFeatData::E_Choice(feat_type)));
2908  }
2909  else {
2910  for ( auto feat_subtype : finfo.GetSubtypes() ) {
2911  types.push_back(SAnnotTypeSelector
2912  (CSeqFeatData::ESubtype(feat_subtype)));
2913  }
2914  }
2915  }
2916  }
2917 
2919  CSplitParser::x_ParseLocation(loc, annot_info.GetSeq_loc());
2920 
2921  ITERATE ( vector<SAnnotTypeSelector>, it, types ) {
2922  chunk->x_AddAnnotType(name, *it, loc);
2923  }
2924  }
2925  if ( names.size() == 1 ) {
2926  ret.second = *names.begin();
2927  }
2928  if ( s_GetDebugLevel() >= 5 ) {
2929  LOG_POST(Info<<"PSG loader: TSE "<<psg_annot_info.GetBlobId().GetId()<<
2930  " annots: "<<ret.second<<" "<<main_count<<"+"<<zoom_count);
2931  }
2932  if ( !names.empty() ) {
2933  ret.first = chunk;
2934  }
2935  return ret;
2936 }
2937 
2938 
2940  CDataSource* data_source,
2941  const TIds& ids,
2942  const SAnnotSelector* sel,
2943  CDataLoader::TProcessedNAs* processed_nas)
2944 {
2946  data_source, cref(ids), sel, processed_nas),
2947  "GetAnnotRecordsNA");
2948 }
2949 
2950 
2952  const string& name,
2953  const TIds& ids,
2954  CDataSource* data_source,
2955  CDataLoader::TProcessedNAs* processed_nas,
2957 {
2958  auto cached = m_AnnotCache->Get(name, *ids.begin());
2959  if (cached) {
2960  for (auto& info : cached->infos) {
2961  CDataLoader::SetProcessedNA(name, processed_nas);
2962  auto chunk_info = s_CreateNAChunk(*info);
2963  CRef<CPsgBlobId> blob_id(new CPsgBlobId(info->GetBlobId().GetId()));
2964  CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(blob_id);
2965  CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
2966  if ( load_lock ) {
2967  if ( !load_lock.IsLoaded() ) {
2968  load_lock->SetName(cached->name);
2969  load_lock->GetSplitInfo().AddChunk(*chunk_info.first);
2970  _ASSERT(load_lock->x_NeedsDelayedMainChunk());
2971  load_lock.SetLoaded();
2972  }
2973  locks.insert(load_lock);
2974  }
2975  }
2976  return true;
2977  }
2978  return false;
2979 }
2980 
2981 
2983  CDataSource* data_source,
2984  const TIds& ids,
2985  const SAnnotSelector* sel,
2986  CDataLoader::TProcessedNAs* processed_nas)
2987 {
2989  if ( !data_source || ids.empty() ) {
2990  return locks;
2991  }
2993  if ( !kCreateLocalCDDEntries && !x_CheckAnnotCache(kCDDAnnotName, ids, data_source, processed_nas, locks) ) {
2994  annot_names.push_back(kCDDAnnotName);
2995  }
2996  auto snp_scale_limit = CSeq_id::eSNPScaleLimit_Default;
2997  string snp_name = "SNP"; // name used for caching SNP annots with scale-limit.
2998  if ( sel && sel->IsIncludedAnyNamedAnnotAccession() ) {
2999  CPSG_BioIds bio_ids;
3000  for (auto& id : ids) {
3001  bio_ids.push_back(CPSG_BioId(id));
3002  }
3006  // CDDs are added as external annotations
3007  continue;
3008  }
3009  string name = it->first;
3010  if (name == "SNP") {
3011  snp_scale_limit = sel->GetSNPScaleLimit();
3012  if (snp_scale_limit == CSeq_id::eSNPScaleLimit_Default) {
3013  snp_scale_limit = CPSGDataLoader::GetSNP_Scale_Limit();
3014  }
3015  if (snp_scale_limit != CSeq_id::eSNPScaleLimit_Default) {
3016  snp_name = "SNP::" + NStr::NumericToString((int)snp_scale_limit);
3017  name = snp_name;
3018  }
3019  }
3020  if ( !x_CheckAnnotCache(name, ids, data_source, processed_nas, locks) ) {
3021  annot_names.push_back(it->first);
3022  }
3023  }
3024 
3025  if ( !annot_names.empty() ) {
3026  auto request = make_shared<CPSG_Request_NamedAnnotInfo>(std::move(bio_ids), annot_names);
3027  request->SetSNPScaleLimit(snp_scale_limit);
3028  auto reply = x_SendRequest(request);
3029  CPSG_TaskGroup group(*m_ThreadPool);
3031  CPSG_Task_Guard guard(*task);
3032  group.AddTask(task);
3033  group.WaitAll();
3034 
3035  if (task->GetStatus() == CThreadPool_Task::eCompleted) {
3036  map<string, SPsgAnnotInfo::TInfos> infos_by_name;
3037  for ( auto& info : task->m_AnnotInfo ) {
3038  CDataLoader::SetProcessedNA(info->GetName(), processed_nas);
3039  CRef<CPsgBlobId> blob_id(new CPsgBlobId(info->GetBlobId().GetId()));
3040  auto chunk_info = s_CreateNAChunk(*info);
3041  if ( chunk_info.first ) {
3042  infos_by_name[info->GetName()].push_back(info);
3043  CDataLoader::TBlobId dl_blob_id = CDataLoader::TBlobId(blob_id);
3044  CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
3045  if ( load_lock ) {
3046  if ( !load_lock.IsLoaded() ) {
3047  if ( !chunk_info.second.empty() ) {
3048  load_lock->SetName(chunk_info.second);
3049  }
3050  load_lock->GetSplitInfo().AddChunk(*chunk_info.first);
3051  _ASSERT(load_lock->x_NeedsDelayedMainChunk());
3052  load_lock.SetLoaded();
3053  }
3054  locks.insert(load_lock);
3055  }
3056  }
3057  else {
3058  // no annot info
3059  if ( auto tse_lock = GetBlobById(data_source, *blob_id) ) {
3060  locks.insert(tse_lock);
3061  }
3062  }
3063  }
3064  if (!ids.empty() && !infos_by_name.empty()) {
3065  for(auto infos : infos_by_name) {
3066  m_AnnotCache->Add(infos.second, (infos.first == "SNP" ? snp_name : infos.first), ids);
3067  }
3068  }
3069  }
3070  else {
3071  _TRACE("Failed to load annotations for " << ids.begin()->AsString());
3072  NCBI_THROW(CLoaderException, eLoaderFailed,
3073  "CPSGDataLoader::GetAnnotRecordsNA() failed");
3074  }
3075  }
3076  }
3077  if ( kCreateLocalCDDEntries ) {
3078  SCDDIds cdd_ids = x_GetCDDIds(ids);
3079  if ( cdd_ids.gi ) {
3080  if ( auto tse_lock = x_CreateLocalCDDEntry(data_source, cdd_ids) ) {
3081  locks.insert(tse_lock);
3082  }
3083  }
3084  }
3085  return locks;
3086 }
3087 
3088 
3090 {
3091  if (ids.empty()) return;
3093 
3094  SCDDIds cdd_ids = x_GetCDDIds(ids);
3095  if (!cdd_ids.gi) return;
3096  string blob_id = x_MakeLocalCDDEntryId(cdd_ids);
3097 
3098  if (m_CDDInfoCache->Find(blob_id)) return;
3099  auto cached = m_AnnotCache->Get(kCDDAnnotName, ids.front());
3100  if (cached) return;
3101 
3102  CPSG_BioIds bio_ids;
3103  for (auto& id : ids) {
3104  bio_ids.push_back(CPSG_BioId(id));
3105  }
3107  auto request = make_shared<CPSG_Request_NamedAnnotInfo>(std::move(bio_ids), annot_names);
3108  auto reply = x_SendRequest(request);
3109  for (;;) {
3110  if (m_CDDPrefetchTask->IsCancelRequested()) return;
3111  auto reply_item = reply->GetNextItem(DEFAULT_DEADLINE);
3112  if (!reply_item) continue;
3113  if (reply_item->GetType() == CPSG_ReplyItem::eEndOfReply) break;
3114  if (m_CDDPrefetchTask->IsCancelRequested()) return;
3115  EPSG_Status status = reply_item->GetStatus(CDeadline::eInfinite);
3116  if (m_CDDPrefetchTask->IsCancelRequested()) return;
3117  if (status != EPSG_Status::eSuccess) {
3118  ReportStatus(reply_item, status);
3119  if ( status == EPSG_Status::eNotFound ) {
3120  continue;
3121  }
3122  if ( status == EPSG_Status::eForbidden ) {
3123  continue;
3124  }
3125  return;
3126  }
3127  if (reply_item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) return;
3128  }
3129  // No named annot info returned, mark the bioseq as having no CDDs.
3130  m_CDDInfoCache->Add(blob_id, true);
3131 }
3132 
3133 
3135 {
3136 public:
3137  CPSG_CDDAnnotBulk_Task(TReply reply, CPSG_TaskGroup& group, size_t idx)
3138  : CPSG_Task(reply, group), m_Idx(idx) {}
3139 
3140  ~CPSG_CDDAnnotBulk_Task(void) override {}
3141 
3142  size_t m_Idx;
3143  shared_ptr<CPSG_NamedAnnotInfo> m_AnnotInfo;
3144  shared_ptr<CPSG_NamedAnnotStatus> m_AnnotStatus;
3145  shared_ptr<CPSG_BlobInfo> m_BlobInfo;
3146  shared_ptr<CPSG_BlobData> m_BlobData;
3147 
3148  void Finish(void) override
3149  {
3150  m_AnnotInfo.reset();
3151  m_BlobInfo.reset();
3152  m_BlobData.reset();
3153  }
3154 
3155 protected:
3156  void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
3157  switch (item->GetType()) {
3159  m_AnnotInfo = static_pointer_cast<CPSG_NamedAnnotInfo>(item);
3160  break;
3162  m_AnnotStatus = static_pointer_cast<CPSG_NamedAnnotStatus>(item);
3163  if ( s_HasFailedStatus(*m_AnnotStatus) ) {
3164  m_Status = eFailed;
3165  RequestToCancel();
3166  }
3167  break;
3169  m_BlobInfo = static_pointer_cast<CPSG_BlobInfo>(item);
3170  break;
3172  m_BlobData = static_pointer_cast<CPSG_BlobData>(item);
3173  break;
3174  default:
3175  break;
3176  }
3177  }
3178 };
3179 
3180 
3181 static bool x_IsEmptyCDD(const CTSE_Info& tse)
3182 {
3183  // check if delayed TSE chunk is loaded
3184  if ( tse.x_NeedsDelayedMainChunk() ) {
3185  // not loaded yet, cannot tell if its empty
3186  return false;
3187  }
3188  // check core Seq-entry content
3189  auto core = tse.GetTSECore();
3190  if ( !core->IsSet() ) {
3191  // wrong entry type
3192  return false;
3193  }
3194  auto& seqset = core->GetSet();
3195  return seqset.GetSeq_set().empty() && seqset.GetAnnot().empty();
3196 }
3197 
3198 
3200  const TSeqIdSets& id_sets, TLoaded& loaded, TCDD_Locks& ret)
3201 {
3202  if (id_sets.empty()) return;
3203  _ASSERT(id_sets.size() == loaded.size());
3204  _ASSERT(id_sets.size() == ret.size());
3205 
3207  CPSG_TaskGroup group(*m_ThreadPool);
3208  vector<SCDDIds> cdd_ids(id_sets.size());
3209  vector<CRef<CPSG_CDDAnnotBulk_Task>> tasks;
3210  for (size_t i = 0; i < id_sets.size(); ++i) {
3211  if ( loaded[i] ) {
3212  continue;
3213  }
3214  const TIds& ids = id_sets[i];
3215  if ( ids.empty() ) {
3216  continue;
3217  }
3218  cdd_ids[i] = x_GetCDDIds(ids);
3219  // Skip if it's known that the bioseq has no CDDs.
3220  if (cdd_ids[i].gi && m_CDDInfoCache->Find(x_MakeLocalCDDEntryId(cdd_ids[i]))) {
3221  // no CDDs for this Seq-id
3222  loaded[i] = true;
3223  continue;
3224  }
3225  // Check if there's a loaded CDD blob.
3226  for (auto& id : ids) {
3228  data_source->GetLoadedBlob_ids(id, CDataSource::fLoaded_orphan_annots, blob_ids);
3229  bool have_cdd = false;
3230  for (auto& bid : blob_ids) {
3231  if (x_IsLocalCDDEntryId(dynamic_cast<const CPsgBlobId&>(*bid))) {
3232  have_cdd = true;
3233  break;
3234  }
3235  }
3236  if (have_cdd) {
3237  continue;
3238  }
3239  }
3241  if ( x_CheckAnnotCache(kCDDAnnotName, ids, data_source, nullptr, locks) ) {
3242  _ASSERT(locks.size() == 1);
3243  if ( !x_IsEmptyCDD(**locks.begin()) ) {
3244  ret[i] = *locks.begin();
3245  }
3246  loaded[i] = true;
3247  continue;
3248  }
3249 
3250  CPSG_BioIds bio_ids;
3251  for (auto& id : ids) {
3252  bio_ids.push_back(CPSG_BioId(id));
3253  }
3254  auto request = make_shared<CPSG_Request_NamedAnnotInfo>(
3255  std::move(bio_ids), annot_names, make_shared<size_t>(i));
3256  request->IncludeData(CPSG_Request_Biodata::eWholeTSE);
3257  auto reply = x_SendRequest(request);
3258  tasks.push_back(Ref(new CPSG_CDDAnnotBulk_Task(reply, group, i)));
3259  group.AddTask(tasks.back());
3260  }
3261 
3262  size_t failed_count = 0;
3263  typedef list<CRef<CPSG_CDDAnnotBulk_Task>> TTasks;
3264  TTasks skipped_tasks;
3265  list<shared_ptr<CPSG_Task_Guard>> guards;
3266  while (group.HasTasks()) {
3267  CRef<CPSG_CDDAnnotBulk_Task> task(group.GetTask<CPSG_CDDAnnotBulk_Task>().GetNCPointerOrNull());
3268  _ASSERT(task);
3269  guards.push_back(make_shared<CPSG_Task_Guard>(*task));
3270  if (task->GetStatus() == CThreadPool_Task::eFailed) {
3271  ++failed_count;
3272  continue;
3273  }
3274  auto idx = task->m_Idx;
3275  if (!task->m_AnnotInfo || !task->m_BlobInfo || !task->m_BlobData) {
3276  // no CDDs
3277  m_CDDInfoCache->Add(x_MakeLocalCDDEntryId(cdd_ids[idx]), true);
3278  loaded[idx] = true;
3279  continue;
3280  }
3281  auto annot_info = task->m_AnnotInfo;
3282  auto blob_info = task->m_BlobInfo;
3283  auto blob_data = task->m_BlobData;
3284  if ( s_SameId(blob_info->GetId<CPSG_BlobId>(), annot_info->GetBlobId()) &&
3285  s_SameId(blob_data->GetId<CPSG_BlobId>(), annot_info->GetBlobId()) ) {
3286 
3287  CDataLoader::TBlobId dl_blob_id;
3288  if ( kCreateLocalCDDEntries ) {
3289  if (!cdd_ids[idx].gi) continue;
3290  dl_blob_id = new CPsgBlobId(x_MakeLocalCDDEntryId(cdd_ids[idx]));
3291  }
3292  else {
3293  SPsgAnnotInfo::TInfos infos{annot_info};
3294  m_AnnotCache->Add(infos, kCDDAnnotName, id_sets[idx]);
3295  dl_blob_id = new CPsgBlobId(annot_info->GetBlobId().GetId());
3296  }
3297 
3298  CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
3299  if (!load_lock) continue;
3300  if (load_lock.IsLoaded()) {
3301  if ( !x_IsEmptyCDD(*load_lock) ) {
3302  ret[idx] = load_lock;
3303  }
3304  loaded[idx] = true;
3305  continue;
3306  }
3307  if ( kCreateLocalCDDEntries ) {
3308  x_CreateLocalCDDEntry(data_source, cdd_ids[idx]);
3309  }
3310  else {
3311  SPsgAnnotInfo::TInfos infos{annot_info};
3312  m_AnnotCache->Add(infos, kCDDAnnotName, id_sets[idx]);
3313  dl_blob_id = new CPsgBlobId(annot_info->GetBlobId().GetId());
3314  m_BlobMap->Add(annot_info->GetBlobId().GetId(), make_shared<SPsgBlobInfo>(*blob_info));
3315  }
3316 
3317  unique_ptr<CObjectIStream> in(GetBlobDataStream(*blob_info, *blob_data));
3318  if (!in.get()) {
3319  ++failed_count;
3320  continue;
3321  }
3322  CRef<CSeq_entry> entry(new CSeq_entry);
3323  *in >> *entry;
3324  if (kCreateLocalCDDEntries) {
3325  load_lock->SetSeq_entry(*entry);
3327  }
3328  else {
3329  load_lock->SetSeq_entry(*entry);
3330  load_lock.SetLoaded();
3331  }
3332  if ( !x_IsEmptyCDD(*load_lock) ) {
3333  ret[idx] = load_lock;
3334  }
3335  loaded[idx] = true;
3336  }
3337  }
3338  group.WaitAll();
3339  /*
3340  NON_CONST_ITERATE(TTasks, it, skipped_tasks) {
3341  CPSG_CDDAnnotBulk_Task& task = **it;
3342  SReplyResult result = task.WaitForSkipped();
3343  if (!result.lock) {
3344  // Force reloading blob
3345  result = x_RetryBlobRequest(task.m_ReplyResult.blob_id, data_source, task.m_Id);
3346  }
3347  if (result.lock) ret[task.m_Idx] = result.lock;
3348  }
3349  */
3350  if ( failed_count ) {
3351  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3352  "failed to load "<<failed_count<<" CDD annots in bulk request");
3353  }
3354 }
3355 
3356 
3358 {
3359 }
3360 
3361 
3362 void CPSGDataLoader_Impl::GetBulkIds(const TIds& ids, TLoaded& loaded, TBulkIds& ret)
3363 {
3365  cref(ids), ref(loaded), ref(ret)),
3366  "GetBulkIds",
3368 }
3369 
3370 
3372 {
3373  vector<shared_ptr<SPsgBioseqInfo>> infos;
3374  infos.resize(ret.size());
3375  auto counts = x_GetBulkBioseqInfo(ids, loaded, infos);
3376  if ( counts.first ) {
3377  // have loaded infos
3378  for (size_t i = 0; i < infos.size(); ++i) {
3379  if (loaded[i] || !infos[i].get()) continue;
3380  ITERATE(SPsgBioseqInfo::TIds, it, infos[i]->ids) {
3381  ret[i].push_back(*it);
3382  }
3383  loaded[i] = true;
3384  }
3385  }
3386  if ( counts.second ) {
3387  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3388  "failed to load "<<counts.second<<" seq-ids in bulk request");
3389  }
3390 }
3391 
3392 
3393 void CPSGDataLoader_Impl::GetAccVers(const TIds& ids, TLoaded& loaded, TIds& ret)
3394 {
3396  cref(ids), ref(loaded), ref(ret)),
3397  "GetAccVers",
3399 }
3400 
3401 
3402 void CPSGDataLoader_Impl::GetAccVersOnce(const TIds& ids, TLoaded& loaded, TIds& ret)
3403 {
3404  vector<shared_ptr<SPsgBioseqInfo>> infos;
3405  infos.resize(ret.size());
3406  auto counts = x_GetBulkBioseqInfo(ids, loaded, infos);
3407  if ( counts.first ) {
3408  // have loaded infos
3409  for (size_t i = 0; i < infos.size(); ++i) {
3410  if (loaded[i] || !infos[i].get()) continue;
3411  CSeq_id_Handle idh = infos[i]->canonical;
3412  if (idh.IsAccVer()) {
3413  ret[i] = idh;
3414  }
3415  loaded[i] = true;
3416  }
3417  }
3418  if ( counts.second ) {
3419  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3420  "failed to load "<<counts.second<<" acc.ver in bulk request");
3421  }
3422 }
3423 
3424 
3425 void CPSGDataLoader_Impl::GetGis(const TIds& ids, TLoaded& loaded, TGis& ret)
3426 {
3428  cref(ids), ref(loaded), ref(ret)),
3429  "GetGis",
3431 }
3432 
3433 
3434 void CPSGDataLoader_Impl::GetGisOnce(const TIds& ids, TLoaded& loaded, TGis& ret)
3435 {
3436  vector<shared_ptr<SPsgBioseqInfo>> infos;
3437  infos.resize(ret.size());
3438  auto counts = x_GetBulkBioseqInfo(ids, loaded, infos);
3439  if ( counts.first ) {
3440  // have loaded infos
3441  for (size_t i = 0; i < infos.size(); ++i) {
3442  if (loaded[i] || !infos[i].get()) continue;
3443  ret[i] = infos[i]->gi;
3444  loaded[i] = true;
3445  }
3446  }
3447  if ( counts.second ) {
3448  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3449  "failed to load "<<counts.second<<" acc.ver in bulk request");
3450  }
3451 }
3452 
3453 
3454 void CPSGDataLoader_Impl::GetLabels(const TIds& ids, TLoaded& loaded, TLabels& ret)
3455 {
3457  cref(ids), ref(loaded), ref(ret)),
3458  "GetLabels",
3460 }
3461 
3462 
3464 {
3465  vector<shared_ptr<SPsgBioseqInfo>> infos;
3466  infos.resize(ret.size());
3467  auto counts = x_GetBulkBioseqInfo(ids, loaded, infos);
3468  if ( counts.first ) {
3469  // have loaded infos
3470  for (size_t i = 0; i < infos.size(); ++i) {
3471  if (loaded[i] || !infos[i].get()) continue;
3472  ret[i] = objects::GetLabel(infos[i]->ids);
3473  if (!ret[i].empty()) loaded[i] = true;
3474  }
3475  }
3476  if ( counts.second ) {
3477  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3478  "failed to load "<<counts.second<<" labels in bulk request");
3479  }
3480 }
3481 
3482 
3484 {
3486  cref(ids), ref(loaded), ref(ret)),
3487  "GetSequenceLengths",
3489 }
3490 
3491 
3493 {
3494  vector<shared_ptr<SPsgBioseqInfo>> infos;
3495  infos.resize(ret.size());
3496  auto counts = x_GetBulkBioseqInfo(ids, loaded, infos);
3497  if ( counts.first ) {
3498  // have loaded infos
3499  for (size_t i = 0; i < infos.size(); ++i) {
3500  if (loaded[i] || !infos[i].get()) continue;
3501  auto length = infos[i]->length;
3502  ret[i] = length > 0? TSeqPos(length): kInvalidSeqPos;
3503  loaded[i] = true;
3504  }
3505  }
3506  if ( counts.second ) {
3507  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3508  "failed to load "<<counts.second<<" sequence lengths in bulk request");
3509  }
3510 }
3511 
3512 
3514 {
3516  cref(ids), ref(loaded), ref(ret)),
3517  "GetSequenceTypes",
3519 }
3520 
3521 
3523 {
3524  vector<shared_ptr<SPsgBioseqInfo>> infos;
3525  infos.resize(ret.size());
3526  auto counts = x_GetBulkBioseqInfo(ids, loaded, infos);
3527  if ( counts.first ) {
3528  // have loaded infos
3529  for (size_t i = 0; i < infos.size(); ++i) {
3530  if (loaded[i] || !infos[i].get()) continue;
3531  ret[i] = infos[i]->molecule_type;
3532  loaded[i] = true;
3533  }
3534  }
3535  if ( counts.second ) {
3536  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3537  "failed to load "<<counts.second<<" sequence types in bulk request");
3538  }
3539 }
3540 
3541 
3543 {
3545  data_source, cref(ids), ref(loaded), ref(ret)),
3546  "GetSequenceStates",
3548 }
3549 
3550 
3552 {
3553  TBioseqAndBlobInfos infos;
3554  infos.resize(ret.size());
3555  auto counts = x_GetBulkBioseqAndBlobInfo(data_source, ids, loaded, infos);
3556  if ( counts.first ) {
3557  // have loaded infos
3558  for (size_t i = 0; i < infos.size(); ++i) {
3559  if (loaded[i] || (!infos[i].first.get() || !infos[i].second.get())) continue;
3560  auto bioseq_info = infos[i].first;
3561  auto blob_info = infos[i].second;
3562  ret[i] = bioseq_info->GetBioseqStateFlags();
3563  ret[i] |= blob_info->blob_state_flags;
3564  if (!(bioseq_info->GetBioseqStateFlags() & CBioseq_Handle::fState_dead) &&
3565  !(bioseq_info->GetChainStateFlags() & CBioseq_Handle::fState_dead)) {
3566  ret[i] &= ~CBioseq_Handle::fState_dead;
3567  }
3568  loaded[i] = true;
3569  }
3570  }
3571  if ( counts.second ) {
3572  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3573  "failed to load "<<counts.second<<" sequence states in bulk request");
3574  }
3575 }
3576 
3577 
3579 {
3581  cref(ids), ref(loaded), ref(ret), ref(known)),
3582  "GetSequenceHashes",
3584 }
3585 
3586 
3588 {
3589  vector<shared_ptr<SPsgBioseqInfo>> infos;
3590  infos.resize(ret.size());
3591  auto counts = x_GetBulkBioseqInfo(ids, loaded, infos);
3592  if ( counts.first ) {
3593  // have loaded infos
3594  for (size_t i = 0; i < infos.size(); ++i) {
3595  if (loaded[i] || !infos[i].get()) continue;
3596  ret[i] = infos[i]->hash;
3597  known[i] = infos[i]->hash != 0;
3598  loaded[i] = true;
3599  }
3600  }
3601  if ( counts.second ) {
3602  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3603  "failed to load "<<counts.second<<" sequence hashes in bulk request");
3604  }
3605 }
3606 
3607 
3608 shared_ptr<CPSG_Reply> CPSGDataLoader_Impl::x_SendRequest(shared_ptr<CPSG_Request> request)
3609 {
3610  if ( m_RequestContext ) {
3611  request->SetRequestContext(m_RequestContext);
3612  }
3613  return m_Queue->SendRequestAndGetReply(request, DEFAULT_DEADLINE);
3614 }
3615 
3616 
3618 CPSGDataLoader_Impl::x_RetryBlobRequest(const string& blob_id, CDataSource* data_source, CSeq_id_Handle req_idh)
3619 {
      // Re-request a blob by its blob id (the return-type line is missing from
      // this listing). If the TSE is already loaded in data_source, returns a
      // result holding that lock and the blob id without contacting PSG.
      // Otherwise sends a by-id blob request using m_TSERequestMode and
      // processes the reply with retry disabled.
3620 #ifdef LOCK4GET
3621  CDataLoader::TBlobId dl_blob_id(new CPsgBlobId(blob_id));
3622  CTSE_LoadLock load_lock = data_source->GetTSE_LoadLock(dl_blob_id);
3623  if ( load_lock.IsLoaded() ) {
3624  _TRACE("x_RetryBlobRequest() already loaded " << blob_id);
3625  SReplyResult ret;
3626  ret.lock = load_lock;
3627  ret.blob_id = blob_id;
3628  return ret;
3629  }
3630 #else
      // This outer load_lock stays default-constructed in this branch: the
      // load_lock declared inside the {{ }} scope below shadows it. Only the
      // outer one is passed by pointer to x_ProcessBlobReply().
3631  CTSE_LoadLock load_lock;
3632  {{
3633  CDataLoader::TBlobId dl_blob_id(new CPsgBlobId(blob_id));
3634  CTSE_LoadLock load_lock = data_source->GetTSE_LoadLockIfLoaded(dl_blob_id);
3635  if ( load_lock && load_lock.IsLoaded() ) {
3636  _TRACE("x_RetryBlobRequest() already loaded " << blob_id);
3637  SReplyResult ret;
3638  ret.lock = load_lock;
3639  ret.blob_id = blob_id;
3640  return ret;
3641  }
3642  }}
3643 #endif
3644 
3645  CPSG_BlobId req_blob_id(blob_id);
3646  auto blob_request = make_shared<CPSG_Request_Blob>(req_blob_id);
3647  blob_request->IncludeData(m_TSERequestMode);
3648  auto blob_reply = x_SendRequest(blob_request);
      // retry=false, so this retry request cannot trigger another retry
3649  return x_ProcessBlobReply(blob_reply, data_source, req_idh, false, false, &load_lock);
3650 }
3651 
3652 
3654  shared_ptr<CPSG_Reply> reply,
3655  CDataSource* data_source,
3656  CSeq_id_Handle req_idh,
3657  bool retry,
3658  bool lock_asap,
3659  CTSE_LoadLock* load_lock)
3660 {
      // Process a blob reply with a CPSG_Blob_Task and turn the outcome into an
      // SReplyResult (the function header line is missing from this listing).
      //  - null reply: returns a default-constructed result;
      //  - task completed: returns the task's result, or for a skipped blob
      //    waits for it and, if still unlocked and `retry` is set, re-requests
      //    the blob by id;
      //  - task not completed: may retry by blob id, otherwise throws
      //    CLoaderException or CBlobStateException depending on the reply.
3661  SReplyResult ret;
3662 
3663  if (!reply) {
3664  _TRACE("Request failed: null reply");
3665  return ret;
3666  }
3667 
      // run a single blob task and wait for it to finish
3668  CPSG_TaskGroup group(*m_ThreadPool);
3669  CRef<CPSG_Blob_Task> task(
3670  new CPSG_Blob_Task(reply, group, req_idh, data_source, *this, lock_asap, load_lock));
3671  CPSG_Task_Guard guard(*task);
3672  group.AddTask(task);
3673  group.WaitAll();
3674 
3675  if (task->GetStatus() == CThreadPool_Task::eCompleted) {
3676  if (task->m_Skipped) {
3677  ret = task->WaitForSkipped();
3678  if (!ret.lock && retry) {
3679  // Force reloading blob
3680  ret = x_RetryBlobRequest(task->m_ReplyResult.blob_id, data_source, req_idh);
3681  }
3682  }
3683  else {
3684  ret = task->m_ReplyResult;
3685  if (ret.blob_info) {
3686  x_AdjustBlobState(*ret.blob_info, req_idh);
3687  }
3688  }
3689  }
      // Task did not complete: retry only when a lock was requested, the blob
      // id is known, retry is allowed, and the reply was neither "not found"
      // nor "forbidden".
3690  else if ( !GetGetBlobByIdShouldFail() &&
3691  (lock_asap || load_lock) &&
3692  !task->m_ReplyResult.blob_id.empty() &&
3693  retry &&
3694  !task->GotNotFound() &&
3695  !task->GotForbidden() ) {
3696  // blob is required, but not received, yet blob_id is known, so we retry
3697  ret = x_RetryBlobRequest(task->m_ReplyResult.blob_id, data_source, req_idh);
3698  if ( !ret.lock ) {
3699  _TRACE("Failed to load blob for " << req_idh.AsString()<<" @ "<<CStackTrace());
3700  NCBI_THROW(CLoaderException, eLoaderFailed,
3701  "CPSGDataLoader::GetRecords("+req_idh.AsString()+") failed");
3702  }
3703  }
3704  else if ( task->GetStatus() == CThreadPool_Task::eFailed ) {
3705  NCBI_THROW(CLoaderException, eLoaderFailed,
3706  "CPSGDataLoader::GetRecords("+req_idh.AsString()+") failed");
3707  }
3708  else if ( task->GotNotFound() ) {
      // NOTE(review): the statement that uses the message below (listing line
      // 3709) is missing here -- presumably a throw macro; confirm.
3710  "CPSGDataLoader: No blob for seq_id="<<req_idh<<" blob_id="<<task->m_ReplyResult.blob_id);
3711  /*
3712  // not sure about reaction on timed out replies after eNotFound reply was received
3713  // most probably it still should be treated as error
3714  // here's eNoData reply code in case it will be decided to be acceptable
3715  CBioseq_Handle::TBioseqStateFlags state =
3716  CBioseq_Handle::fState_no_data;
3717  if ( task->m_ReplyResult.blob_info ) {
3718  state |= task->m_ReplyResult.blob_info->blob_state_flags;
3719  }
3720  NCBI_THROW2(CBlobStateException, eBlobStateError,
3721  "blob state error for "+req_idh.AsString(), state);
3722  */
3723  }
3724  else if ( task->GotForbidden() ) {
      // NOTE(review): the declaration of `state` (listing lines 3725-3726) is
      // missing here; the visible code ORs in the blob's state flags, if any.
3727  if ( task->m_ReplyResult.blob_info ) {
3728  state |= task->m_ReplyResult.blob_info->blob_state_flags;
3729  }
3730  NCBI_THROW2(CBlobStateException, eBlobStateError,
3731  "blob state error for "+req_idh.AsString(), state);
3732  }
3733  else {
3734  _TRACE("Failed to load blob for " << req_idh.AsString()<<" @ "<<CStackTrace());
3735  NCBI_THROW(CLoaderException, eLoaderFailed,
3736  "CPSGDataLoader::GetRecords("+req_idh.AsString()+") failed");
3737  }
      // A reply can be "forbidden" and still leave the result without a lock
      // (for example on the completed path); that is reported as a blob state
      // error too.
3738  if ( !ret.lock && task->GotForbidden() ) {
      // NOTE(review): the declaration of `state` (listing lines 3739-3740) is
      // missing here as well.
3741  if ( task->m_ReplyResult.blob_info ) {
3742  state |= task->m_ReplyResult.blob_info->blob_state_flags;
3743  }
3744  NCBI_THROW2(CBlobStateException, eBlobStateError,
3745  "blob state error for "+req_idh.AsString(), state);
3746  }
3747  return ret;
3748 }
3749 
3750 
3752 {
      // Task that captures the bioseq-info item of a PSG reply.
      // (The class header and the constructor's declarator line are missing
      // from this listing; the constructor forwards reply and group to
      // CPSG_Task.)
3753 public:
3755  : CPSG_Task(reply, group) {}
3756 
3757  ~CPSG_BioseqInfo_Task(void) override {}
3758 
      // Set by ProcessReplyItem(); stays null if no eBioseqInfo item was seen.
3759  shared_ptr<CPSG_BioseqInfo> m_BioseqInfo;
3760 
      // Drops the stored bioseq info.
3761  void Finish(void) override {
3762  m_BioseqInfo.reset();
3763  }
3764 
3765 protected:
      // Keeps the item if it is of type eBioseqInfo; other item types are ignored.
3766  void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
3767  if (item->GetType() == CPSG_ReplyItem::eBioseqInfo) {
3768  m_BioseqInfo = static_pointer_cast<CPSG_BioseqInfo>(item);
3769  }
3770  }
3771 };
3772 
3773 
3775 {
      // Returns the blob id recorded in the bioseq cache for idh, or an empty
      // string when idh is not cached. Never sends a request.
      // (The signature line is missing from this listing.)
3776  // Check cache
3777  if ( auto seq_info = m_BioseqCache->Get(idh) ) {
3778  return seq_info->blob_id;
3779  }
3780  return string();
3781 }
3782 
3783 
3784 shared_ptr<SPsgBioseqInfo> CPSGDataLoader_Impl::x_GetBioseqInfo(const CSeq_id_Handle& idh)
3785 {
3786  if ( shared_ptr<SPsgBioseqInfo> ret = m_BioseqCache->Get(idh) ) {
3787  return ret;
3788  }
3789 
3790  CPSG_BioId bio_id(idh);
3791  shared_ptr<CPSG_Request_Resolve> request = make_shared<CPSG_Request_Resolve>(std::move(bio_id));
3792  request->IncludeInfo(CPSG_Request_Resolve::fAllInfo);
3793  auto reply = x_SendRequest(request);
3794  if (!reply) {
3795  _TRACE("Request failed: null reply");
3796  NCBI_THROW(CLoaderException, eLoaderFailed, "null reply for "+idh.AsString());
3797  return nullptr;
3798  }
3799 
3800  CPSG_TaskGroup group(*m_ThreadPool);
3801  CRef<CPSG_BioseqInfo_Task> task(new CPSG_BioseqInfo_Task(reply, group));
3802  CPSG_Task_Guard guard(*task);
3803  group.AddTask(task);
3804  group.WaitAll();
3805 
3806  if (task->GetStatus() != CThreadPool_Task::eCompleted) {
3807  _TRACE("Failed to get bioseq info for " << idh.AsString() << " @ "<<CStackTrace());
3808  NCBI_THROW(CLoaderException, eLoaderFailed, "failed to get bioseq info for "+idh.AsString());
3809  }
3810  if (!task->m_BioseqInfo) {
3811  _TRACE("No bioseq info for " << idh.AsString());
3812  return nullptr;
3813  }
3814 
3815  return m_BioseqCache->Add(*task->m_BioseqInfo, idh);
3816 }
3817 
3818 
3820 {
      // Task that extracts a tax-id from the IPG info items of a PSG reply.
      // (The class header, the m_TaxId declaration and the body of Finish()
      // are missing from this listing.)
3821 public:
3822  CPSG_IpgTaxId_Task(size_t idx, bool is_wp_acc, TReply reply, CPSG_TaskGroup& group)
3823  : CPSG_Task(reply, group), m_Idx(idx), m_IsWPAcc(is_wp_acc) {}
3824 
3825  ~CPSG_IpgTaxId_Task(void) override {}
3826 
      // Index supplied by the creator; used in x_GetIpgTaxIds() to map the
      // task back to its position in the id list.
3827  size_t m_Idx = 0;
      // Selects the filtering rule applied in ProcessReplyItem().
3828  bool m_IsWPAcc = false;
3830 
3831  void Finish(void) override {
3833  }
3834 
3835 protected:
      // Takes the tax-id from the first acceptable eIpgInfo item: any item
      // when m_IsWPAcc is false, otherwise only an item whose nucleotide field
      // is empty. Once m_TaxId is set, later items are ignored.
3836  void ProcessReplyItem(shared_ptr<CPSG_ReplyItem> item) override {
3837  if (m_TaxId != INVALID_TAX_ID) return;
3838  if (item->GetType() == CPSG_ReplyItem::eIpgInfo) {
3839  auto ipg_info = static_pointer_cast<CPSG_IpgInfo>(item);
3840  if (!m_IsWPAcc) {
3841  m_TaxId = ipg_info->GetTaxId();
3842  }
3843  else if (ipg_info->GetNucleotide().empty()) {
3844  m_TaxId = ipg_info->GetTaxId();
3845  }
3846  }
3847  }
3848 };
3849 
3850 
// Decide whether idh can be resolved through an IPG request and, if so, build
// its "accession.version" string in acc_ver.
// Returns false for a null handle, a non-text Seq-id, an accession whose
// identified type lacks either the fAcc_prot or the fAcc_vdb_only flag, or a
// text id without both accession and version set.
// NOTE(review): listing line 3862, which presumably assigns is_wp_acc, is
// missing here -- confirm against the complete file.
3851 static bool s_IsIpgAccession(const CSeq_id_Handle& idh, string& acc_ver, bool& is_wp_acc)
3852 {
3853  if (!idh) return false;
3854  auto seq_id = idh.GetSeqId();
3855  auto text_id = seq_id->GetTextseq_Id();
3856  if (!text_id) return false;
3857  CSeq_id::EAccessionInfo acc_info = idh.IdentifyAccession();
      // both flags must be present
3858  const int kAccFlags = CSeq_id::fAcc_prot | CSeq_id::fAcc_vdb_only;
3859  if ((acc_info & kAccFlags) != kAccFlags) return false;
3860  if ( !text_id->IsSetAccession() || !text_id->IsSetVersion() ) return false;
3861  acc_ver = text_id->GetAccession()+'.'+NStr::NumericToString(text_id->GetVersion());
3863  return true;
3864 }
3865 
3866 
3868 {
      // Resolve the tax-id of a single IPG accession.
      // (The signature line is missing from this listing.)
      // Returns INVALID_TAX_ID when the IPG tax-id map is not allocated or idh
      // is not accepted by s_IsIpgAccession(). A cached value is returned
      // without a request.
      // Throws CLoaderException(eLoaderFailed) on a null reply or when the
      // task did not complete.
3869  if (!m_IpgTaxIdMap) return INVALID_TAX_ID;
3870 
3871  TTaxId cached = m_IpgTaxIdMap->Find(idh);
3872  if (cached != INVALID_TAX_ID) return cached;
3873 
3874  string acc_ver;
3875  bool is_wp_acc = false;
3876  if (!s_IsIpgAccession(idh, acc_ver, is_wp_acc)) return INVALID_TAX_ID;
3877 
3878  shared_ptr<CPSG_Request_IpgResolve> request = make_shared<CPSG_Request_IpgResolve>(acc_ver);
3879  auto reply = x_SendRequest(request);
3880  if (!reply) {
3881  _TRACE("Request failed: null reply");
3882  NCBI_THROW(CLoaderException, eLoaderFailed, "null reply for "+idh.AsString());
      // NOTE(review): the return below follows a throw statement
3883  return INVALID_TAX_ID;
3884  }
3885 
3886  CPSG_TaskGroup group(*m_ThreadPool);
      // index 0: the task's index is not used for a single-id request
3887  CRef<CPSG_IpgTaxId_Task> task(new CPSG_IpgTaxId_Task(0, is_wp_acc, reply, group));
3888  CPSG_Task_Guard guard(*task);
3889  group.AddTask(task);
3890  group.WaitAll();
3891 
3892  if (task->GetStatus() != CThreadPool_Task::eCompleted) {
3893  _TRACE("Failed to get ipg info for " << idh.AsString() << " @ "<<CStackTrace());
3894  NCBI_THROW(CLoaderException, eLoaderFailed, "failed to get ipg info for "+idh.AsString());
3895  }
      // the result is added to the map unconditionally, whatever m_TaxId holds
3896  m_IpgTaxIdMap->Add(idh, task->m_TaxId);
3897  return task->m_TaxId;
3898 }
3899 
3900 
3902 {
      // Bulk variant of the IPG tax-id lookup.
      // (The signature line is missing from this listing.)
      // For each id: uses the cached tax-id if present; otherwise, if the id
      // is accepted by s_IsIpgAccession(), sends an IPG resolve request and
      // queues a task. Results with a valid tax-id are cached, stored in
      // ret[idx] and flagged in loaded[idx].
      // Throws CLoaderException(eLoaderFailed) on a null reply, or after all
      // tasks were collected if any of them failed.
3903  if (!m_IpgTaxIdMap) return;
3904 
3905  CPSG_TaskGroup group(*m_ThreadPool);
3906  for (size_t i = 0; i < ids.size(); ++i) {
3907  TTaxId cached = m_IpgTaxIdMap->Find(ids[i]);
3908  if (cached != INVALID_TAX_ID) {
3909  ret[i] = cached;
3910  loaded[i] = true;
3911  continue;
3912  }
3913 
3914  string acc_ver;
3915  bool is_wp_acc = false;
3916  if (!s_IsIpgAccession(ids[i], acc_ver, is_wp_acc)) continue;
3917 
3918  shared_ptr<CPSG_Request_IpgResolve> request = make_shared<CPSG_Request_IpgResolve>(acc_ver);
3919  auto reply = x_SendRequest(request);
3920  if (!reply) {
3921  _TRACE("Request failed: null reply");
3922  NCBI_THROW(CLoaderException, eLoaderFailed, "null reply for "+ids[i].AsString());
      // NOTE(review): the continue below follows a throw statement
3923  continue;
3924  }
3925 
      // the task remembers i so the result can be stored at the right index
3926  CRef<CPSG_IpgTaxId_Task> task(new CPSG_IpgTaxId_Task(i, is_wp_acc, reply, group));
3927  group.AddTask(task);
3928  }
3929 
      // guards are collected in a list that lives until the function returns
3930  list<shared_ptr<CPSG_Task_Guard>> guards;
3931  int failed_count = 0;
3932  while (group.HasTasks()) {
3933  CRef<CPSG_IpgTaxId_Task> task(group.GetTask<CPSG_IpgTaxId_Task>().GetNCPointerOrNull());
3934  _ASSERT(task);
3935  guards.push_back(make_shared<CPSG_Task_Guard>(*task));
3936 
3937  if (task->GetStatus() == CThreadPool_Task::eFailed) {
3938  ++failed_count;
3939  continue;
3940  }
      // tasks that found no tax-id leave ret/loaded untouched
3941  if (task->m_TaxId != INVALID_TAX_ID) {
3942  m_IpgTaxIdMap->Add(ids[task->m_Idx], task->m_TaxId);
3943  ret[task->m_Idx] = task->m_TaxId;
3944  loaded[task->m_Idx] = true;
3945  }
3946  }
3947  if ( failed_count ) {
3948  NCBI_THROW_FMT(CLoaderException, eLoaderFailed,
3949  "failed to load "<<failed_count<<" tax-ids");
3950  }
3951 }
3952 
3953 
3956  const CSeq_id_Handle& idh)
3957 {
      // Obtain both the bioseq info and the blob info for idh.
      // (The first lines of the signature are missing from this listing.)
      // If the bioseq info is cached with a non-empty blob id, only the blob
      // info is looked up by blob id. Otherwise a resolve request and a
      // biodata request (eNoTSE) are sent and processed together.
      // Returns an empty pair if the resolve reply had no bioseq info.
      // Throws CLoaderException(eLoaderFailed) on a null reply or when either
      // task did not complete.
3958  shared_ptr<SPsgBioseqInfo> bioseq_info = m_BioseqCache->Get(idh);
3959  shared_ptr<SPsgBlobInfo> blob_info;
3960  if ( bioseq_info && !bioseq_info->blob_id.empty() ) {
3961  // get by blob id
3962  blob_info = x_GetBlobInfo(data_source, bioseq_info->blob_id);
3963  if (blob_info) {
3964  x_AdjustBlobState(*blob_info, idh);
3965  }
3966  }
3967  else {
3968  CPSG_BioId bio_id(idh);
      // bio_id is copied into request1 here and moved into request2 below
3969  auto request1 = make_shared<CPSG_Request_Resolve>(bio_id);
3970  request1->IncludeInfo(CPSG_Request_Resolve::fAllInfo);
3971  auto request2 = make_shared<CPSG_Request_Biodata>(std::move(bio_id));
3972  request2->IncludeData(CPSG_Request_Biodata::eNoTSE);
3973 
3974  auto reply1 = x_SendRequest(request1);
3975  auto reply2 = x_SendRequest(request2);
3976  if ( !reply1 || !reply2 ) {
3977  NCBI_THROW(CLoaderException, eLoaderFailed, "null reply for "+idh.AsString());
3978  }
3979 
      // both replies are processed in one task group
3980  CPSG_TaskGroup group(*m_ThreadPool);
3981  CRef<CPSG_BioseqInfo_Task> task1(new CPSG_BioseqInfo_Task(reply1, group));
3982  CPSG_Task_Guard guard1(*task1);
3983  group.AddTask(task1);
3984  CRef<CPSG_Blob_Task> task2(new CPSG_Blob_Task(reply2, group, idh, data_source, *this));
3985  CPSG_Task_Guard guard2(*task2);
3986  group.AddTask(task2);
3987  group.WaitAll();
3988 
3989  if ( task1->GetStatus() != CThreadPool_Task::eCompleted ) {
3990  _TRACE("Failed to get bioseq info for " << idh.AsString() << " @ "<<CStackTrace());
3991  NCBI_THROW(CLoaderException, eLoaderFailed,
3992  "failed to get bioseq info for "+idh.AsString());
3993  }
3994  if ( !task1->m_BioseqInfo ) {
3995  return TBioseqAndBlobInfo();
3996  }
3997  bioseq_info = m_BioseqCache->Add(*task1->m_BioseqInfo, idh);
3998 
3999  if ( task2->GetStatus() != CThreadPool_Task::eCompleted ) {
4000  _TRACE("Failed to get blob info for " << idh.AsString() << " @ "<<CStackTrace());
4001  NCBI_THROW(CLoaderException, eLoaderFailed,
4002  "failed to get blob info for "+idh.AsString());
4003  }
      // a skipped blob: wait for its result, and fall back to a lookup by
      // blob id if that yields no blob info
4004  if ( task2->m_Skipped ) {
4005  blob_info = task2->WaitForSkipped().blob_info;
4006  if ( !blob_info ) {
4007  blob_info = x_GetBlobInfo(data_source, bioseq_info->blob_id);
4008  }
4009  }
4010  else {
4011  blob_info = task2->m_ReplyResult.blob_info;
4012  }
4013  }
4014  return make_pair(bioseq_info, blob_info);
4015 }
4016 
4017 
4018 shared_ptr<SPsgBlobInfo> CPSGDataLoader_Impl::x_GetBlobInfo(CDataSource* data_source,
4019  const string& blob_id)
4020 {
4021  // lookup cached blob info
4022  if ( shared_ptr<SPsgBlobInfo> ret = m_BlobMap->Find(blob_id) ) {
4023  return ret;
4024  }
4025  // use blob info from already loaded TSE
4026  if ( data_source ) {
4027  CDataLoader::TBlobId dl_blob_id(new CPsgBlobId(blob_id));
4028  auto load_lock = data_source->GetTSE_LoadLockIfLoaded(dl_blob_id);
4029  if ( load_lock && load_lock.IsLoaded() ) {
4030  return make_shared<SPsgBlobInfo>(*load_lock);
4031  }
4032  }
4033  // load blob info from PSG
4034  CPSG_BlobId bid(blob_id);
4035  auto request = make_shared<CPSG_Request_Blob>(bid);
4036  request->IncludeData(CPSG_Request_Biodata::eNoTSE);
4037  auto reply = x_SendRequest(request);
4038  return x_ProcessBlobReply(reply, nullptr, CSeq_id_Handle(), false).blob_info;
4039 }
4040 
4041 
4043 {
      // Clears the "dead" flag in blob_info when the cached bioseq info for
      // idh reports no bioseq state and no chain state.
      // (The signature line is missing from this listing.)
      // Does nothing for a null idh, a blob that is not flagged dead, or an
      // id that is not in the bioseq cache.
4044  if (!idh) return;
4045  if (!(blob_info.blob_state_flags & CBioseq_Handle::fState_dead)) return;
4046  auto seq_info = m_BioseqCache->Get(idh);
4047  if (!seq_info) return;
4048  auto seq_state = seq_info->GetBioseqStateFlags();
4049  auto chain_state = seq_info->GetChainStateFlags();
4050  if (seq_state == CBioseq_Handle::fState_none &&
4051  chain_state == CBioseq_Handle::fState_none) {
4052  blob_info.blob_state_flags &= ~CBioseq_Handle::fState_dead;
4053  }
4054 }
4055 
4056 
4058  const SPsgBlobInfo& psg_blob_info,
4059  const CPSG_BlobInfo& blob_info,
4060  const CPSG_BlobData& blob_data,
4061  CTSE_LoadLock& load_lock,
4062  ESplitInfoType split_info_type)
4063 {
      // Deserialize blob data into the TSE held by load_lock.
      // (The function header line is missing from this listing.)
      // With eIsSplitInfo the data is read as ID2S-Split-Info and attached
      // through CSplitParser; otherwise it is read as a Seq-entry and set on
      // the TSE. Returns without reading if no data stream can be opened.
      // blob version/state are set only while the TSE is not yet loaded
4064  if ( !load_lock.IsLoaded() ) {
4065  load_lock->SetBlobVersion(psg_blob_info.GetBlobVersion());
4066  load_lock->SetBlobState(psg_blob_info.blob_state_flags);
4067  }
4068 
4069  unique_ptr<CObjectIStream> in(GetBlobDataStream(blob_info, blob_data));
4070  if (!in.get()) {
4071  _TRACE("Failed to open blob data stream for blob-id " << blob_info.GetId()->Repr());
4072  return;
4073  }
4074 
4075  if ( split_info_type == eIsSplitInfo ) {
4076  CRef<CID2S_Split_Info> split_info(new CID2S_Split_Info);
4077  *in >> *split_info;
      // the object is dumped as ASN.1 text at debug level 8 or higher
4078  if ( s_GetDebugLevel() >= 8 ) {
4079  LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
4080  MSerial_AsnText<<*split_info);
4081  }
4082  CSplitParser::Attach(*load_lock, *split_info);
4083  }
4084  else {
4085  CRef<CSeq_entry> entry(new CSeq_entry);
4086  *in >> *entry;
4087  if ( s_GetDebugLevel() >= 8 ) {
4088  LOG_POST(Info<<"PSG loader: TSE "<<load_lock->GetBlobId().ToString()<<" "<<
4089  MSerial_AsnText<<*entry);
4090  }
4091  load_lock->SetSeq_entry(*entry);
4092  }
4093  if ( m_AddWGSMasterDescr ) {
      // NOTE(review): the statement inside this branch (listing line 4094) is
      // missing here.
4095  }
4096 }
4097 
4098 
4100  EMainChunkType main_chunk_type)
4101 {
      // Marks the TSE held by load_lock as loaded unless its main chunk is
      // delayed. (The first signature line is missing from this listing.)
4102  if ( main_chunk_type == eDelayedMainChunk ) {
      // NOTE(review): the statement for the delayed-main-chunk case (listing
      // line 4103) is missing here.
4104  //_ASSERT(!load_lock->x_NeedsDelayedMainChunk());
4105  }
4106  else {
      // the call is logged at debug level 6 or higher
4107  if ( s_GetDebugLevel() >= 6 ) {
4108  LOG_POST("calling SetLoaded("<<load_lock->GetBlobId().ToString()<<")");
4109  }
4110  load_lock.SetLoaded();
4111  }
4112 }
4113 
4114 
4116  const CPSG_BlobInfo& blob_info,
4117  const CPSG_BlobData& blob_data)
4118 {
      // Open an object input stream over the blob data, honouring the
      // compression and serialization format named in blob_info.
      // (The function header line and several interior lines are missing from
      // this listing.)
      // Returns nullptr for an unsupported compression or format. Callers in
      // this file take ownership of the returned pointer via unique_ptr.
4119  istream& data_stream = blob_data.GetStream();
4120  CNcbiIstream* in = &data_stream;
4121  unique_ptr<CNcbiIstream> z_stream;
4122  CObjectIStream* ret = nullptr;
4123 
      // "gzip": read through a decompressing stream; empty: read raw data
4124  if (blob_info.GetCompression() == "gzip") {
      // NOTE(review): the remaining constructor arguments (listing lines
      // 4126-4127) are missing here.
4125  z_stream.reset(new CCompressionIStream(data_stream,
4128  in = z_stream.get();
4129  }
4130  else if (!blob_info.GetCompression().empty()) {
4131  _TRACE("Unsupported data compression: '" << blob_info.GetCompression() << "'");
4132  return nullptr;
4133  }
4134 
      // NOTE(review): the declaration of `own` (listing line 4135) and the
      // "asn.1" branch body (line 4137) are missing here.
4136  if (blob_info.GetFormat() == "asn.1") {
4138  }
4139  else if (blob_info.GetFormat() == "asn1-text") {
4140  ret = CObjectIStream::Open(eSerial_AsnText, *in, own);
4141  }
4142  else if (blob_info.GetFormat() == "xml") {
4143  ret = CObjectIStream::Open(eSerial_Xml, *in, own);
4144  }
4145  else if (blob_info.GetFormat() == "json") {
4146  ret = CObjectIStream::Open(eSerial_Json, *in, own);
4147  }
4148  else {
4149  _TRACE("Unsupported data format: '" << blob_info.GetFormat() << "'");
4150  return nullptr;
4151  }
4152  _ASSERT(ret);
      // z_stream gives up ownership here; presumably the object stream owns
      // the decompression stream through `own` -- confirm against full file.
4153  z_stream.release();
4154  return ret;
4155 }
4156 
4157 
4159  const TIds& ids,
4160  const TLoaded& loaded,
4161  TBioseqInfos& ret)
4162 {
      // Resolve bioseq info for every id that is not yet loaded.
      // (The function header line is missing from this listing.)
      // Cached entries are used directly; for the rest a resolve request is
      // sent per id and the replies are processed as tasks.
      // Returns (number of infos obtained, number of failed tasks). Ids with
      // no bioseq info in the reply count as neither.
4163  pair<size_t, size_t> counts(0, 0);
4164  CPSG_TaskGroup group(*m_ThreadPool);
      // maps each task to the index of its id
4165  typedef map<CRef<CPSG_BioseqInfo_Task>, size_t> TTasks;
4166  TTasks tasks;
4167  list<shared_ptr<CPSG_Task_Guard>> guards;
4168  for (size_t i = 0; i < ids.size(); ++i) {
4169  if (loaded[i]) continue;
4170  if ( CannotProcess(ids[i]) ) {
4171  continue;
4172  }
4173  ret[i] = m_BioseqCache->Get(ids[i]);
4174  if (ret[i]) {
4175  counts.first += 1;
4176  continue;
4177  }
4178  CPSG_BioId bio_id(ids[i]);
4179  shared_ptr<CPSG_Request_Resolve> request = make_shared<CPSG_Request_Resolve>(std::move(bio_id));
4180  request->IncludeInfo(CPSG_Request_Resolve::fAllInfo);
      // NOTE(review): unlike x_GetBioseqInfo(), the reply is not checked for
      // null before it is handed to the task.
4181  auto reply = x_SendRequest(request);
4182  CRef<CPSG_BioseqInfo_Task> task(new CPSG_BioseqInfo_Task(reply, group));
4183  guards.push_back(make_shared<CPSG_Task_Guard>(*task));
4184  tasks[task] = i;
4185  group.AddTask(task);
4186  }
4187  while (group.HasTasks()) {
      // NOTE(review): the line that obtains `task` from the group (listing
      // line 4188) is missing here.
4189  _ASSERT(task);
4190  TTasks::const_iterator it = tasks.find(task);
4191  _ASSERT(it != tasks.end());
4192  if (task->GetStatus() == CThreadPool_Task::eFailed) {
4193  _TRACE("Failed to load bioseq info for " << ids[it->second].AsString());
4194  counts.second += 1;
4195  continue;
4196  }
4197  if (!task->m_BioseqInfo) {
4198  _TRACE("No bioseq info for " << ids[it->second].AsString());
4199  // not loaded and no failure
4200  continue;
4201  }
4202  _ASSERT(task->m_BioseqInfo);
4203  ret[it->second] = m_BioseqCache->Add(*task->m_BioseqInfo, ids[it->second]);
4204  counts.first += 1;
4205  }
4206  return counts;
4207 }
4208 
4209 
4211  CDataSource* data_source,
4212  const TIds& ids,
4213  const TLoaded& loaded,
4214  TBioseqAndBlobInfos& ret)
4215 {
      // Obtain bioseq info and blob info for every id that is not yet loaded.
      // (The function header line is missing from this listing.)
      // Phase 1 uses cached data where possible and sends resolve and/or
      // biodata (eNoTSE) requests for the rest. Phase 2 collects the tasks.
      // Phase 3 fills in blob infos still missing by blob id and counts.
      // Returns (ids with both infos available, ids with a failed task).
4216  pair<size_t, size_t> counts(0, 0);
4217  CPSG_TaskGroup group(*m_ThreadPool);
      // maps each task (bioseq or blob) to the index of its id
4218  typedef map<CRef<CPSG_Task>, size_t> TTasks;
4219  TTasks tasks;
      // errors[i] is set when any task for ids[i] failed
4220  vector<bool> errors(ids.size());
4221  list<shared_ptr<CPSG_Task_Guard>> guards;
4222 
4223  for (size_t i = 0; i < ids.size(); ++i) {
4224  if (loaded[i]) continue;
4225  if (CannotProcess(ids[i])) continue;
4226  CPSG_BioId bio_id(ids[i]);
4227  shared_ptr<SPsgBioseqInfo> bioseq_info = m_BioseqCache->Get(ids[i]);
4228  shared_ptr<SPsgBlobInfo> blob_info;
4229  if (bioseq_info && !bioseq_info->blob_id.empty()) {
4230  ret[i].first = bioseq_info;
4231  blob_info = m_BlobMap->Find(bioseq_info->blob_id);
4232  if (blob_info) {
      // both parts were cached: counted here, no request needed
4233  ret[i].second = blob_info;
4234  counts.first += 1;
4235  continue;
4236  }
4237  }
4238  else {
4239  auto bioseq_request = make_shared<CPSG_Request_Resolve>(bio_id);
4240  bioseq_request->IncludeInfo(CPSG_Request_Resolve::fAllInfo);
4241  auto bioseq_reply = x_SendRequest(bioseq_request);
4242  CRef<CPSG_BioseqInfo_Task> bioseq_task(new CPSG_BioseqInfo_Task(bioseq_reply, group));
4243  guards.push_back(make_shared<CPSG_Task_Guard>(*bioseq_task));
4244  tasks[bioseq_task] = i;
4245  group.AddTask(bioseq_task);
4246  }
      // a blob request is sent whenever the blob info was not in the blob map
4247  auto blob_request = make_shared<CPSG_Request_Biodata>(std::move(bio_id));
4248  blob_request->IncludeData(CPSG_Request_Biodata::eNoTSE);
4249  auto blob_reply = x_SendRequest(blob_request);
4250  CRef<CPSG_Blob_Task> blob_task(new CPSG_Blob_Task(blob_reply, group, ids[i], data_source, *this));
4251  guards.push_back(make_shared<CPSG_Task_Guard>(*blob_task));
4252  tasks[blob_task] = i;
4253  group.AddTask(blob_task);
4254  }
4255  while (group.HasTasks()) {
4256  CRef<CPSG_Task> task = group.GetTask<CPSG_Task>();
4257  _ASSERT(task);
4258  TTasks::iterator it = tasks.find(task);
4259  _ASSERT(it != tasks.end());
4260  size_t i = it->second;
4261 
      // the task kind is determined by dynamic_cast
4262  CRef<CPSG_BioseqInfo_Task> bioseq_task = Ref(dynamic_cast<CPSG_BioseqInfo_Task*>(task.GetNCPointerOrNull()));
4263  if (bioseq_task) {
4264  _ASSERT(!ret[i].first);
4265  if (bioseq_task->GetStatus() == CThreadPool_Task::eFailed) {
4266  _TRACE("Failed to load bioseq info for " << ids[i].AsString());
4267  errors[i] = true;
4268  continue;
4269  }
4270  if (!bioseq_task->m_BioseqInfo) {
4271  _TRACE("No bioseq info for " << ids[i].AsString());
4272  // not loaded and no failure
4273  continue;
4274  }
4275  ret[i].first = m_BioseqCache->Add(*bioseq_task->m_BioseqInfo, ids[i]);
4276  }
4277  else {
4278  CRef<CPSG_Blob_Task> blob_task = Ref(dynamic_cast<CPSG_Blob_Task*>(task.GetNCPointerOrNull()));
4279  _ASSERT(blob_task);
4280  _ASSERT(!ret[i].second);
4281  if (blob_task->GetStatus() == CThreadPool_Task::eFailed) {
4282  _TRACE("Failed to load blob info for " << ids[i].AsString());
4283  errors[i] = true;
4284  continue;
4285  }
      // only a skipped blob's result is stored here; for other blob tasks
      // ret[i].second is filled by the lookup in the final loop
4286  if (blob_task->m_Skipped) {
4287  ret[i].second = blob_task->WaitForSkipped().blob_info;
4288  }
4289  }
4290  }
4291 
      // NOTE(review): ids already counted in the first loop (both infos
      // cached) reach `counts.first += 1` again below -- confirm whether
      // callers rely on the exact value or only on it being non-zero.
4292  for (size_t i = 0; i < ids.size(); ++i) {
4293  if (errors[i]) {
4294  counts.second += 1;
4295  continue;
4296  }
4297  if (!ret[i].first) continue;
4298  if (!ret[i].second) {
4299  ret[i].second = x_GetBlobInfo(data_source, ret[i].first->blob_id);
4300  if (!ret[i].second) continue;
4301  }
4302  counts.first += 1;
4303  }
4304  return counts;
4305 }
4306 
4307 
4310 
4311 #endif // HAVE_PSG_LOADER
#define static
User-defined methods of the data storage class.
User-defined methods of the data storage class.
User-defined methods of the data storage class.
EStatus
const string & GetName(void) const
Definition: annot_name.hpp:62
bool IsNamed(void) const
Definition: annot_name.hpp:58
CBioseq_Handle –.
Blob state exceptions, used by GenBank loader.
CTSE_LoadLock GetTSE_LoadLock(const TBlobId &blob_id)
CTSE_LoadLock GetTSE_LoadLockIfLoaded(const TBlobId &blob_id)
CDataLoader * GetDataLoader(void) const
CTSE_LoadLock GetLoadedTSE_Lock(const TBlobId &blob_id, const CDeadline &deadline)
vector< TBlobId > TLoadedBlob_ids
void GetLoadedBlob_ids(const CSeq_id_Handle &idh, TLoadedTypes types, TLoadedBlob_ids &blob_ids) const
CDeadline.
Definition: ncbitime.hpp:1830
CFastMutex –.
Definition: ncbimtx.hpp:667
static TParamTree * GetParamsSubnode(TParamTree *params, const string &subnode_name)
Definition: gbloader.cpp:602
static string GetParam(const TParamTree *params, const string &param_name)
Definition: gbloader.cpp:665
bool GetPSGNoSplit(void) const
Definition: gbloader.hpp:190
const string & GetPSGServiceName(void) const
Definition: gbloader.hpp:173
bool IsSetEnableCDD(void) const
Definition: gbloader.cpp:184
const TParamTree * GetParamTree(void) const
Definition: gbloader.hpp:144
bool IsSetEnableSNP(void) const
Definition: gbloader.cpp:154
const string & GetWebCookie(void) const
Definition: gbloader.hpp:168
bool GetEnableWGS(void) const
Definition: gbloader.cpp:174
bool HasHUPIncluded(void) const
Definition: gbloader.hpp:163
bool GetEnableCDD(void) const
Definition: gbloader.cpp:189
bool GetEnableSNP(void) const
Definition: gbloader.cpp:159
bool IsSetEnableWGS(void) const
Definition: gbloader.cpp:169
CID2S_Chunk –.
Definition: ID2S_Chunk.hpp:66
CID2S_Feat_type_Info –.
CID2S_Seq_annot_Info –.
CID2S_Split_Info –.
double GetTime(int step) const
Definition: incr_time.cpp:75
void Init(CConfig &conf, const string &driver_name, const SAllParams &params)
Definition: incr_time.cpp:43
Data loader exceptions, used by GenBank loader.
CMutex –.
Definition: ncbimtx.hpp:749
CObjectIStream –.
Definition: objistr.hpp:93
map< CSeq_id_Handle, shared_ptr< SPsgAnnotInfo > > TIdMap
CDataLoader::TIds TIds
shared_ptr< SPsgAnnotInfo > Get(const string &name, const CSeq_id_Handle &idh)
shared_ptr< SPsgAnnotInfo > Add(const SPsgAnnotInfo::TInfos &infos, const string &name, const TIds &ids)
map< string, TIdMap > TNameMap
list< shared_ptr< SPsgAnnotInfo > > TInfoQueue
CPSGAnnotCache(int lifespan, size_t max_size)
shared_ptr< SPsgBioseqInfo > Get(const CSeq_id_Handle &idh)
CPSGBioseqCache(int lifespan, size_t max_size)
shared_ptr< SPsgBioseqInfo > Add(const CPSG_BioseqInfo &info, CSeq_id_Handle req_idh)
map< CSeq_id_Handle, shared_ptr< SPsgBioseqInfo > > TIdMap
list< shared_ptr< SPsgBioseqInfo > > TInfoQueue
CPSGBlobMap(int lifespan, size_t max_size)
void DropBlob(const CPsgBlobId &blob_id)
CPSGCDDInfoCache(int lifespan, size_t max_size)
TValue Find(const TKey &key)
void x_Erase(TValueIter iter)
TRemoveList m_RemoveList
list< TValueIter > TRemoveList
CPSGCache_Base(int lifespan, size_t max_size, TValue def_val=TValue(nullptr))
void Add(const TKey &key, const TValue &value)
map< key_type, SNode > TValues
TValues::iterator TValueIter
TRemoveList::iterator TRemoveIter
CPSGCache_Base< TKey, TValue > TParent
unique_ptr< CThreadPool > m_ThreadPool
CDataLoader::TSequenceStates TSequenceStates
void GetBulkIds(const TIds &ids, TLoaded &loaded, TBulkIds &ret)
void x_AdjustBlobState(SPsgBlobInfo &blob_info, const CSeq_id_Handle idh)
void GetCDDAnnotsOnce(CDataSource *data_source, const TSeqIdSets &id_sets, TLoaded &loaded, TCDD_Locks &ret)
CRef< CPsgBlobId > GetBlobId(const CSeq_id_Handle &idh)
void GetBlobsOnce(CDataSource *data_source, TLoadedSeqIds &loaded, TTSE_LockSets &tse_sets)
pair< size_t, size_t > x_GetBulkBioseqAndBlobInfo(CDataSource *data_source, const TIds &ids, const TLoaded &loaded, TBioseqAndBlobInfos &ret)
shared_ptr< SPsgBlobInfo > x_GetBlobInfo(CDataSource *data_source, const string &blob_id)
void GetSequenceStatesOnce(CDataSource *data_source, const TIds &ids, TLoaded &loaded, TSequenceStates &ret)
void GetSequenceTypesOnce(const TIds &ids, TLoaded &loaded, TSequenceTypes &ret)
pair< size_t, size_t > x_GetBulkBioseqInfo(const TIds &ids, const TLoaded &loaded, TBioseqInfos &ret)
void GetAccVers(const TIds &ids, TLoaded &loaded, TIds &ret)
static void SetGetBlobByIdShouldFail(bool value)
CRef< CRequestContext > m_RequestContext
shared_ptr< CPSG_Queue > m_Queue
void GetTaxIdsOnce(const TIds &ids, TLoaded &loaded, TTaxIds &ret)
CDataLoader::TBulkIds TBulkIds
shared_ptr< CPSG_Request_Blob > x_MakeLoadLocalCDDEntryRequest(CDataSource *data_source, CDataLoader::TChunk chunk)
static CObjectIStream * GetBlobDataStream(const CPSG_BlobInfo &blob_info, const CPSG_BlobData &blob_data)
void GetIds(const CSeq_id_Handle &idh, TIds &ids)
CDataLoader::SGiFound GetGi(const CSeq_id_Handle &idh)
void DropTSE(const CPsgBlobId &blob_id)
void LoadChunk(CDataSource *data_source, CTSE_Chunk_Info &chunk_info)
string x_GetCachedBlobId(const CSeq_id_Handle &idh)
bool x_ReadCDDChunk(CDataSource *data_source, CDataLoader::TChunk chunk, const CPSG_BlobInfo &blob_info, const CPSG_BlobData &blob_data)
CTSE_Lock GetBlobById(CDataSource *data_source, const CPsgBlobId &blob_id)
unique_ptr< CPSGAnnotCache > m_AnnotCache
void LoadChunks(CDataSource *data_source, const CDataLoader::TChunkSet &chunks)
int GetSequenceState(CDataSource *data_source, const CSeq_id_Handle &idh)
shared_ptr< SPsgBioseqInfo > x_GetBioseqInfo(const CSeq_id_Handle &idh)
void GetSequenceStates(CDataSource *data_source, const TIds &ids, TLoaded &loaded, TSequenceStates &ret)
void GetBulkIdsOnce(const TIds &ids, TLoaded &loaded, TBulkIds &ret)
CDataLoader::TSequenceHashes TSequenceHashes
CDataLoader::TTSE_LockSet GetRecordsOnce(CDataSource *data_source, const CSeq_id_Handle &idh, CDataLoader::EChoice choice)
CDataLoader::TSeqIdSets TSeqIdSets
TTaxId GetTaxId(const CSeq_id_Handle &idh)
unique_ptr< CPSGCDDInfoCache > m_CDDInfoCache
void GetGisOnce(const TIds &ids, TLoaded &loaded, TGis &ret)
CDataLoader::TTSE_LockSet GetRecords(CDataSource *data_source, const CSeq_id_Handle &idh, CDataLoader::EChoice choice)
unique_ptr< CPSGIpgTaxIdMap > m_IpgTaxIdMap
void GetSequenceHashes(const TIds &ids, TLoaded &loaded, TSequenceHashes &ret, THashKnown &known)
unique_ptr< CPSGBlobMap > m_BlobMap
vector< shared_ptr< SPsgBioseqInfo > > TBioseqInfos
CDataLoader::TSequenceLengths TSequenceLengths
CDataLoader::STypeFound GetSequenceType(const CSeq_id_Handle &idh)
unique_ptr< CPSGBioseqCache > m_BioseqCache
void x_ReadBlobData(const SPsgBlobInfo &psg_blob_info, const CPSG_BlobInfo &blob_info, const CPSG_BlobData &blob_data, CTSE_LoadLock &load_lock, ESplitInfoType split_info_type)
void GetGis(const TIds &ids, TLoaded &loaded, TGis &ret)
CRef< CPsgBlobId > GetBlobIdOnce(const CSeq_id_Handle &idh)
void GetSequenceHashesOnce(const TIds &ids, TLoaded &loaded, TSequenceHashes &ret, THashKnown &known)
CPSG_Request_Biodata::EIncludeData m_TSERequestMode
CPSG_Request_Biodata::EIncludeData m_TSERequestModeBulk
CRef< CPSG_PrefetchCDD_Task > m_CDDPrefetchTask
void GetLabels(const TIds &ids, TLoaded &loaded, TLabels &ret)
SReplyResult x_ProcessBlobReply(shared_ptr< CPSG_Reply > reply, CDataSource *data_source, CSeq_id_Handle req_idh, bool retry, bool lock_asap=false, CTSE_LoadLock *load_lock=nullptr)
CDataLoader::SHashFound GetSequenceHashOnce(const CSeq_id_Handle &idh)
SReplyResult x_RetryBlobRequest(const string &blob_id, CDataSource *data_source, CSeq_id_Handle req_idh)
CTSE_Lock GetBlobByIdOnce(CDataSource *data_source, const CPsgBlobId &blob_id)
CDataLoader::STypeFound GetSequenceTypeOnce(const CSeq_id_Handle &idh)
vector< TBioseqAndBlobInfo > TBioseqAndBlobInfos
shared_ptr< CPSG_Reply > x_SendRequest(shared_ptr< CPSG_Request > request)
void GetSequenceTypes(const TIds &ids, TLoaded &loaded, TSequenceTypes &ret)
CDataLoader::TLabels TLabels
CDataLoader::TSequenceTypes TSequenceTypes
int GetSequenceStateOnce(CDataSource *data_source, const CSeq_id_Handle &idh)
TSeqPos GetSequenceLength(const CSeq_id_Handle &idh)
void GetSequenceLengths(const TIds &ids, TLoaded &loaded, TSequenceLengths &ret)
unsigned int m_BulkRetryCount
CDataLoader::TLoaded TLoaded
void GetLabelsOnce(const TIds &ids, TLoaded &loaded, TLabels &ret)
void x_GetIpgTaxIds(const TIds &ids, TLoaded &loaded, TTaxIds &ret)
CDataLoader::SAccVerFound GetAccVerOnce(const CSeq_id_Handle &idh)
void LoadChunksOnce(CDataSource *data_source, const CDataLoader::TChunkSet &chunks)
bool x_CheckAnnotCache(const string &name, const TIds &ids, CDataSource *data_source, CDataLoader::TProcessedNAs *processed_nas, CDataLoader::TTSE_LockSet &locks)
CDataLoader::TTaxIds TTaxIds
friend class CPSG_Blob_Task
~CPSGDataLoader_Impl(void) override
CDataLoader::TGis TGis
pair< shared_ptr< SPsgBioseqInfo >, shared_ptr< SPsgBlobInfo > > TBioseqAndBlobInfo
TTaxId GetTaxIdOnce(const CSeq_id_Handle &idh)
CDataLoader::SHashFound GetSequenceHash(const CSeq_id_Handle &idh)
void GetSequenceLengthsOnce(const TIds &ids, TLoaded &loaded, TSequenceLengths &ret)
CDataLoader::TTSE_LockSet GetAnnotRecordsNAOnce(CDataSource *data_source, const TIds &ids, const SAnnotSelector *sel, CDataLoader::TProcessedNAs *processed_nas)
std::invoke_result< Call >::type CallWithRetry(Call &&call, const char *name, int retry_count=0)
CDataLoader::THashKnown THashKnown
CIncreasingTime m_WaitTime
void GetBlobs(CDataSource *data_source, TTSE_LockSets &tse_sets)
TSeqPos GetSequenceLengthOnce(const CSeq_id_Handle &idh)
void GetCDDAnnots(CDataSource *data_source, const TSeqIdSets &id_sets, TLoaded &loaded, TCDD_Locks &ret)
static bool GetGetBlobByIdShouldFail()
CDataLoader::TTSE_LockSet GetAnnotRecordsNA(CDataSource *data_source, const TIds &ids, const SAnnotSelector *sel, CDataLoader::TProcessedNAs *processed_nas)
void GetIdsOnce(const CSeq_id_Handle &idh, TIds &ids)
void PrefetchCDD(const TIds &ids)
CDataLoader::TCDD_Locks TCDD_Locks
TTaxId x_GetIpgTaxId(const CSeq_id_Handle &idh)
void GetTaxIds(const TIds &ids, TLoaded &loaded, TTaxIds &ret)
CDataLoader::TIds TIds
void GetAccVersOnce(const TIds &ids, TLoaded &loaded, TIds &ret)
CDataLoader::SAccVerFound GetAccVer(const CSeq_id_Handle &idh)
TBioseqAndBlobInfo x_GetBioseqAndBlobInfo(CDataSource *data_source, const CSeq_id_Handle &idh)
void x_SetLoaded(CTSE_LoadLock &load_lock, EMainChunkType main_chunk_type)
CPSGDataLoader_Impl(const CGBLoaderParams &params)
CDataLoader::SGiFound GetGiOnce(const CSeq_id_Handle &idh)
static CSeq_id::ESNPScaleLimit GetSNP_Scale_Limit(void)
Definition: psg_loader.cpp:158
CPSGIpgTaxIdMap(int lifespan, size_t max_size)
~CPSG_AnnotRecordsCDD_Task(void) override
list< shared_ptr< CPSG_NamedAnnotInfo > > m_AnnotInfo
shared_ptr< CPSG_BioseqInfo > m_BioseqInfo
void ProcessReplyItem(shared_ptr< CPSG_ReplyItem > item) override
void Finish(void) override
shared_ptr< CPSG_NamedAnnotStatus > m_AnnotStatus
CPSG_AnnotRecordsCDD_Task(TReply reply, CPSG_TaskGroup &group)
~CPSG_AnnotRecordsNA_Task(void) override
void ProcessReplyItem(shared_ptr< CPSG_ReplyItem > item) override
CPSG_AnnotRecordsNA_Task(TReply reply, CPSG_TaskGroup &group)
list< shared_ptr< CPSG_NamedAnnotInfo > > m_AnnotInfo
void Finish(void) override
shared_ptr< CPSG_NamedAnnotStatus > m_AnnotStatus
Bio-id (such as accession)
Definition: psg_client.hpp:175
const string & GetId() const
Get ID.
Definition: psg_client.hpp:196
string Repr() const
Get tilde-separated string representation of this bio ID (e.g. for logging)
Definition: psg_client.cpp:190
void Finish(void) override
~CPSG_BioseqInfo_Task(void) override
void ProcessReplyItem(shared_ptr< CPSG_ReplyItem > item) override
CPSG_BioseqInfo_Task(TReply reply, CPSG_TaskGroup &group)
shared_ptr< CPSG_BioseqInfo > m_BioseqInfo
Bio-sequence metainfo – result of the bio-id resolution.
Definition: psg_client.hpp:859
CPSG_BioId GetCanonicalId() const
Get canonical bio-id for the bioseq (usually "accession.version")
TState GetState() const
State of this exact bio-sequence's seq-id.
TState GetChainState() const
State of the bio-sequence's seq-id chain.
objects::CSeq_inst::TMol GetMoleculeType() const
The bioseq's molecule type (DNA, RNA, protein, etc)
TGi GetGi() const
Get GI.
CPSG_BioIds GetOtherIds() const
Get non-canonical bio-ids (aliases) for the bioseq.
TTaxId GetTaxId() const
Get the bioseq's taxonomy ID.
CPSG_Request_Resolve::TIncludeInfo IncludedInfo() const
What data is immediately available now.
Uint8 GetLength() const
Length of bio-sequence.
CPSG_BlobId GetBlobId() const
Get coordinates of the TSE blob that contains the bioseq itself.
int GetHash() const
Get the bioseq's (pre-calculated) hash.
Blob data.
Definition: psg_client.hpp:718
istream & GetStream() const
Get the stream from which to read the item's content.
Definition: psg_client.hpp:726
Blob unique ID.
Definition: psg_client.hpp:226
const string & GetId() const
Get ID.
Definition: psg_client.hpp:250
Blob data meta information.
Definition: psg_client.hpp:742
const TDataId * GetId() const
Get data ID.
Definition: psg_client.hpp:746
bool IsSuppressed() const
Return TRUE if the blob data is "suppressed".
bool IsWithdrawn() const
Return TRUE if the blob data is "withdrawn".
string GetFormat() const
Get data serialization format: asn.1, asn1-text, json, xml, ...
string GetCompression() const
Get data compression algorithm: gzip, bzip2, zip, compress, nlmzip, ...
bool IsDead() const
Return TRUE if the blob data is "dead".
string GetId2Info() const
Get ID2 info.
shared_ptr< CPSG_SkippedBlob > m_Skipped
CSeq_id_Handle m_Id
unique_ptr< CDeadline > m_SkippedWaitDeadline
static bool IsChunk(const CPSG_DataId *id)
CPSGDataLoader_Impl & m_Loader
void CreateLoadedChunks(CTSE_LoadLock &load_lock)
CDataSource * m_DataSource
const TBlobSlot * GetChunkSlot(const string &id2_info, TChunkId chunk_id) const
TChunkBlobMap m_ChunkBlobMap
void ProcessReplyItem(shared_ptr< CPSG_ReplyItem > item) override
bool GotBlobData(const string &psg_blob_id) const
const TBlobSlot * GetTSESlot(const string &psg_id) const
unique_ptr< CDeadline > GetWaitDeadline(const CPSG_SkippedBlob &skipped) const
~CPSG_Blob_Task(void) override
TTSEBlobMap m_TSEBlobMap
CDataLoader::TBlobId GetDLBlobId(const string &psg_blob_id) const
TBlobSlot * SetBlobSlot(const CPSG_DataId &id)
map< string, CDataLoader::TBlobId > m_BlobIdMap
const TBlobSlot * GetBlobSlot(const CPSG_DataId &id) const
map< string, map< TChunkId, TBlobSlot > > TChunkBlobMap
void SetDLBlobId(const string &psg_blob_id, CDataLoader::TBlobId dl_blob_id)
CPSG_Blob_Task(TReply reply, CPSG_TaskGroup &group, const CSeq_id_Handle &idh, CDataSource *data_source, CPSGDataLoader_Impl &loader, bool lock_asap=false, CTSE_LoadLock *load_lock_ptr=nullptr)
CTSE_LoadLock & GetLoadLock() const
pair< shared_ptr< CPSG_BlobInfo >, shared_ptr< CPSG_BlobData > > TBlobSlot
CPSGDataLoader_Impl::SReplyResult WaitForSkipped(void)
void Finish(void) override
map< string, TBlobSlot > TTSEBlobMap
CPSGDataLoader_Impl::SReplyResult m_ReplyResult
void DoExecute(void) override
static const char * GetSkippedType(const CPSG_SkippedBlob &skipped)
CTSE_LoadLock * m_LoadLockPtr
void ProcessReplyItem(shared_ptr< CPSG_ReplyItem > item) override
~CPSG_CDDAnnotBulk_Task(void) override
shared_ptr< CPSG_NamedAnnotInfo > m_AnnotInfo
shared_ptr< CPSG_BlobData > m_BlobData
shared_ptr< CPSG_NamedAnnotStatus > m_AnnotStatus
void Finish(void) override
shared_ptr< CPSG_BlobInfo > m_BlobInfo
CPSG_CDDAnnotBulk_Task(TReply reply, CPSG_TaskGroup &group, size_t idx)
Chunk unique ID.
Definition: psg_client.hpp:265
Blob data unique ID.
Definition: psg_client.hpp:213
~CPSG_IpgTaxId_Task(void) override
CPSG_IpgTaxId_Task(size_t idx, bool is_wp_acc, TReply reply, CPSG_TaskGroup &group)
void Finish(void) override
void ProcessReplyItem(shared_ptr< CPSG_ReplyItem > item) override
shared_ptr< CPSG_BlobInfo > m_BlobInfo
CPSG_LoadChunk_Task(TReply reply, CPSG_TaskGroup &group, CDataLoader::TChunk chunk)
void Finish(void) override
void DoExecute(void) override
CDataLoader::TChunk m_Chunk
~CPSG_LoadChunk_Task(void) override
shared_ptr< CPSG_BlobData > m_BlobData
void ProcessReplyItem(shared_ptr< CPSG_ReplyItem > item) override
Named Annotations (NAs) metainfo – reply to CPSG_Request_NamedAnnotInfo.
Definition: psg_client.hpp:929
CPSG_BlobId GetBlobId() const
Coordinates of the blob that contains the NA data.
TId2AnnotInfoList GetId2AnnotInfoList() const
Named Annotations (NAs) status – reply to CPSG_Request_NamedAnnotInfo.
Definition: psg_client.hpp:963
TId2AnnotStatusList GetId2AnnotStatusList() const
EStatus Execute(void) override
Do the actual job.
CPSGDataLoader_Impl & m_Loader
~CPSG_PrefetchCDD_Task(void) override
void AddRequest(const CDataLoader::TIds &ids)
Definitio