NCBI C++ ToolKit
streamiter.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef STREAMITER__HPP
2 #define STREAMITER__HPP
3 
4 /* $Id: streamiter.hpp 100702 2023-08-31 19:27:05Z lavr $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Authors: Andrei Gourianov, Alexander Astashyn
30 *
31 * File Description:
32 * Input stream iterators
33 * Please note:
34 * This API requires multi-threading
35 */
36 
37 #include <corelib/ncbistd.hpp>
38 #include <corelib/ncbithr.hpp>
39 #include <serial/objistr.hpp>
40 #include <serial/objectio.hpp>
41 
42 #include <queue>
43 #include <future>
44 #include <thread>
45 #include <mutex>
46 #include <condition_variable>
47 
48 
49 /** @addtogroup ObjStreamSupport
50  *
51  * @{
52  */
53 
54 
56 
57 #if defined(NCBI_THREADS)
58 
60 template<typename TRoot>
61 TMemberIndex xxx_MemberIndex(const string& mem_name);
62 }
63 template<typename...>
65 
66 /////////////////////////////////////////////////////////////////////////////
67 /// CObjectIStreamIterator
68 ///
69 /// Synchronously read multiple same-type data objects from an input stream
70 /// with optional filtering.
71 /// @sa CObjectIStreamAsyncIterator
72 ///
73 /// The algorithm assumes that the input stream on its top level consists
74 /// exclusively of one or more serial objects of type TRoot.
75 ///
76 /// There are two flavors of this template:
77 /// - CObjectIStreamIterator<TRoot> - iterate through the top-level serial
78 /// objects of type TRoot.
79 /// - CObjectIStreamIterator<TRoot, TChild> - iterate through serial objects
80 /// of type TChild which are contained within the top-level serial objects
81 /// of type TRoot.
82 ///
83 /// Usage:
84 /// @code
85 ///
86 /// CObjectIStream istr ....;
87 ///
88 /// for (CSeq_entry& obj : CObjectIStreamIterator<CSeq_entry>(istr)) {
89 /// // ...do something with "obj" here...
90 /// }
91 ///
92 /// for (CBioseq& obj : CObjectIStreamIterator<CSeq_entry,CBioseq>(istr)) {
93 /// // ...do something with "obj" here...
94 /// }
95 ///
96 ///
97 /// CObjectIStreamIterator<CSeq_entry> it(istr);
98 /// CObjectIStreamIterator<CSeq_entry> eos;
99 /// for_each (it, eos, [](CSeq_entry& obj) { ... });
100 ///
101 /// CObjectIStreamIterator<CSeq_entry,CBioseq> it(istr);
102 /// CObjectIStreamIterator<CSeq_entry,CBioseq> eos;
103 /// for_each (it, eos, [](CBioseq& obj) { ... });
104 ///
105 ///
106 /// for (CObjectIStreamIterator<CSeq_entry> it(istr); it.IsValid(); ++it) {
107 /// CSeq_entry& obj = *it;
108 /// // ...do something with "obj" here...
109 /// }
110 ///
111 /// for (CObjectIStreamIterator<CSeq_entry,CBioseq> it(istr);
112 /// it.IsValid(); ++it) {
113 /// CRef<CBioseq> obj(&*it);
114 /// // ...do something with "obj" here...
115 /// }
116 ///
117 /// for (CObjectIStreamIterator<CSeq_entry, string> it(istr);
118 /// it.IsValid(); ++it) {
119 /// string& obj = *it;
120 /// }
121 ///
122 /// with filtering (only CTaxon1_data objects with optional 'org' member set are valid):
123 /// CObjectIStreamIterator<CTaxon1_data> i(istr, eNoOwnership,
124 /// CObjectIStreamIterator<CTaxon1_data>::CParams().FilterByMember("org",
125 /// [](const CObjectIStream& istr, CTaxon1_data& obj,
126 /// TMemberIndex mem_index, CObjectInfo* mem, void* extra)->bool {
127 /// return mem != nullptr;
128 /// }));
129 ///
130 /// @endcode
131 ///
132 /// @attention
133 /// Input iterators only guarantee validity for single pass algorithms:
134 /// once an input iterator has been incremented, all copies of its previous
135 /// value may be invalidated. It is still possible to keep data objects
136 /// for future use by placing them into CRef containers, when applicable.
137 
138 template<typename...>
140 {
141 public:
142 
143  /// Object member filtering function
144  ///
145  /// @param istr
146  /// Serial object stream
147  /// @param obj
148  /// Object being checked. It is being populated and is incomplete.
149  /// @param mem_index
150  /// Member index
151  /// @param mem
152  /// Member information. If mem is nullptr, the member is missing in the stream.
153  /// @param extra
154  /// Extra information provided by the caller when constructing iterator.
155  ///
156  /// @attention
157  /// When using filtering with CObjectIStreamAsyncIterator, please note
158  /// that the function may be called from different threads.
159  /// Synchronization of access to shared data, if required, is the responsibility of the client.
160  template<typename TObj>
161  using FMemberFilter = function<bool(const CObjectIStream& istr, TObj& obj,
162  TMemberIndex mem_index, CObjectInfo* mem,
163  void* extra)>;
164  /// Filtering parameters
165  template<typename TObj>
166  class CParams
167  {
168  public:
169  CParams(void)
170  : m_Index(kInvalidMember)
171  , m_FnFilter(nullptr)
172  , m_Extra(nullptr) {
173  }
174  /// Filter by member index
175  CParams& FilterByMember(TMemberIndex index, FMemberFilter<TObj> fn, void* extra = nullptr) {
176  m_Index = index; m_FnFilter = fn; m_Extra = extra; return *this;
177  }
178  /// Filter by member name
179  CParams& FilterByMember(const string& mem_name, FMemberFilter<TObj> fn, void* extra = nullptr) {
180  m_Index = ns_ObjectIStreamFilterIterator::xxx_MemberIndex<TObj>(mem_name);
181  m_FnFilter = fn; m_Extra = extra; return *this;
182  }
183 
184  private:
185 // void xxx_MemberIndex(const string& mem_name);
188  void* m_Extra;
189  template<typename...> friend class CObjectIStreamIterator;
190  template<typename...> friend class CObjectIStreamAsyncIterator;
191  };
192 
193  /// Construct iterator upon an object serialization stream
194  ///
195  /// @param istr
196  /// Serial object stream
197  /// @param own_istr
198  /// eTakeOwnership means that the input stream will be deleted
199  /// automatically when the iterator gets destroyed
200  /// @param params
201  /// Filtering parameters (default is no filtering)
202  template<typename TObj>
204  EOwnership deleteInStream = eNoOwnership,
205  const CParams<TObj>& params = CParams<TObj>()) = delete;
206 
207  /// Construct end-of-stream (invalid) iterator
208  /// @sa IsValid()
209  CObjectIStreamIterator(void) = delete;
210 
211  // Copy-ctor and assignment
214 
215  /// Advance to the next data object
217 
218  // Comparison
219  bool operator==(const CObjectIStreamIterator&) const;
220  bool operator!=(const CObjectIStreamIterator&) const;
221 
222  /// Check whether the iterator points to a data
223  /// TRUE if the iterator is constructed upon a serialization stream AND
224  /// if it's not end-of-stream or error-in-stream
225  bool IsValid(void) const;
226 
227  /// Return the underlying serial object stream
228  const CObjectIStream& GetObjectIStream(void) const;
229 
230  /// Return data object which is currently pointed to by the iterator.
231  /// Throw an exception is the iterator does not point to a data, i.e.
232  /// if IsValid() is FALSE.
233  template<typename TObj> TObj& operator*();
234 
235  /// Return pointer to data object which is currently pointed to by the
236  /// iterator.
237  /// Return NULL is the iterator does not point to a data, i.e.
238  /// if IsValid() is FALSE.
239  template<typename TObj> TObj* operator->();
240 
241  /// Return self
243 
244  /// Construct and return end-of-stream iterator
246 
247  // dtor
249 };
250 
251 
252 /////////////////////////////////////////////////////////////////////////////
253 /// CObjectIStreamAsyncIterator
254 ///
255 /// Asynchronously read multiple same-type data objects from an input stream
256 /// with optional filtering
257 /// @sa CObjectIStreamIterator
258 ///
259 /// The algorithm assumes that the input stream on its top level consists
260 /// exclusively of one or more serial objects of type TRoot.
261 ///
262 /// There are two flavors of this template:
263 /// - CObjectIStreamAsyncIterator<TRoot> - iterate through the top-level
264 /// serial objects of type TRoot.
265 /// - CObjectIStreamAsyncIterator<TRoot, TChild> - iterate through serial
266 /// objects of type TChild which are contained within the top-level serial
267 /// objects of type TRoot.
268 ///
269 /// @attention
270 /// This iterator supports only the TChild types that are derived from
271 /// CSerialObject class
272 ///
273 /// Usage:
274 /// @code
275 ///
276 /// CObjectIStream istr ....;
277 ///
278 /// for (CSeq_entry& obj : CObjectIStreamAsyncIterator<CSeq_entry>(istr))
279 /// {
280 /// // ...do something with "obj" here...
281 /// }
282 ///
283 /// for (CBioseq& obj : CObjectIStreamAsyncIterator<CSeq_entry,CBioseq>(istr))
284 /// {
285 /// // ...do something with "obj" here...
286 /// }
287 ///
288 ///
289 /// CObjectIStreamAsyncIterator<CSeq_entry> it(istr);
290 /// CObjectIStreamAsyncIterator<CSeq_entry> eos;
291 /// for_each (it, eos, [](CSeq_entry& obj) { ... });
292 ///
293 /// CObjectIStreamAsyncIterator<CSeq_entry,CBioseq> it(istr);
294 /// CObjectIStreamAsyncIterator<CSeq_entry,CBioseq> eos;
295 /// for_each (it, eos, [](CBioseq& obj) { ... });
296 ///
297 ///
298 /// for (CObjectIStreamAsyncIterator<CSeq_entry> it(istr);
299 /// it.IsValid(); ++it) {
300 /// CSeq_entry& obj = *it;
301 /// // ...do something with "obj" here...
302 /// }
303 ///
304 /// for (CObjectIStreamAsyncIterator<CSeq_entry,CBioseq> it(istr);
305 /// it.IsValid(); ++it) {
306 /// CRef<CBioseq> obj(&*it);
307 /// // ...do something with "obj" here...
308 /// }
309 ///
310 /// @endcode
311 ///
312 /// To speed up reading, the iterator offloads data reading, pre-parsing and
313 /// parsing into separate threads. If the data stream contains numerous TRoot
314 /// data records CObjectIStreamAsyncIterator can give up to 2-4 times speed-up
315 /// (wall-clock wise) comparing to the synchronous processing (such as with
316 /// CObjectIStreamIterator) of the same data.
317 ///
318 /// The reader has to read the whole object into memory. If such objects are
319 /// relatively small, then there will be several objects read into a single
320 /// buffer, which is good. If data object is big it still goes into a single
321 /// buffer no matter how big the object is.
322 /// To limit memory consumption, use MaxTotalRawSize parameter.
323 ///
324 /// The iterator does its job asynchronously. It starts working immediately
325 /// after its creation and stops only when it is destroyed.
326 /// Even if you do not use it, it still works in the background, reading and
327 /// parsing the data.
328 ///
329 /// @attention
330 /// Input iterators only guarantee validity for single pass algorithms:
331 /// once an input iterator has been incremented, all copies of its previous
332 /// value may be invalidated. It is still possible to keep data objects
333 /// for future use by placing them into CRef containers, when applicable.
334 
335 template<typename...>
337 {
338 public:
339 
340  /// Asynchronous parsing parameters
341  template<typename TObj>
343  {
344  public:
346  template<typename TR>
348 
349  CParams(void)
350  : m_ThreadPolicy(launch::async)
351  , m_MaxParserThreads (16)
352  , m_MaxTotalRawSize (16 * 1024 * 1024)
353  , m_MinRawBufferSize (128 * 1024)
354  , m_SameThread(false) {
355  }
356 
357  /// Filter by member index
358  CParams& FilterByMember(TMemberIndex index, FMemberFilter<TObj> fn, void* extra = nullptr) {
359  CParent::FilterByMember(index, fn, extra); return *this;
360  }
361 
362  /// Filter by member name
363  CParams& FilterByMember(const string& mem_name, FMemberFilter<TObj> fn, void* extra = nullptr) {
364  CParent::FilterByMember(mem_name, fn, extra); return *this;
365  }
366 
367  /// Parsing thread launch policy
368  CParams& LaunchPolicy(launch policy) {
369  m_ThreadPolicy = policy; return *this;
370  }
371 
372  /// Maximum number of parsing threads
373  CParams& MaxParserThreads(unsigned max_parser_threads) {
374  m_MaxParserThreads = max_parser_threads; return *this;
375  }
376 
377  /// Total size of raw data buffers is allowed to grow to this value
378  CParams& MaxTotalRawSize(size_t max_total_raw_size) {
379  m_MaxTotalRawSize = max_total_raw_size; return *this;
380  }
381 
382  /// Single raw data memory buffer size should be at least this big
383  CParams& MinRawBufferSize(size_t min_raw_buffer_size) {
384  m_MinRawBufferSize = min_raw_buffer_size; return *this;
385  }
386 
387  /// Raw data read and its pre-parsing (storing the raw data pertaining
388  /// to a single object and putting it into the parsing queue) to be
389  /// done in the same thread.
390  /// @note
391  /// The default is to do these two tasks in two separate threads,
392  /// which in some cases can give an additional 10-20% performance
393  /// boost, wall-clock time wise.
394  CParams& ReadAndSkipInTheSameThread(bool same_thread) {
395  m_SameThread = same_thread; return *this;
396  }
397 
398  private:
404 
405  template<typename...> friend class CObjectIStreamAsyncIterator;
406  };
407 
408 
409  /// Construct iterator upon an object serialization stream
410  ///
411  /// @param istr
412  /// Serial object stream
413  /// @param own_istr
414  /// eTakeOwnership means that the input stream will be deleted
415  /// automatically when the iterator gets destroyed
416  /// @param params
417  /// Parsing algorithm's parameters
418  /// @param params
419  /// Filtering and parsing parameters (default is no filtering)
420  template<typename TObj>
422  EOwnership own_istr = eNoOwnership,
423  const CParams<TObj>& params = CParams<TObj>()) = delete;
424 
425  /// Construct end-of-stream (invalid) iterator
426  /// @sa IsValid()
428 
429  // Copy-ctor and assignment
432 
433  /// Advance to the next data object
435 
436  // Comparison
439 
440  /// Check whether the iterator points to a data
441  /// TRUE if the iterator is constructed upon a serialization stream AND
442  /// if it's not end-of-stream or error-in-stream
443  bool IsValid(void) const;
444 
445  /// Return data object which is currently pointed to by the iterator.
446  /// Throw an exception is the iterator does not point to a data, i.e.
447  /// if IsValid() is FALSE.
448  template<typename TObj> TObj& operator*();
449 
450  /// Return pointer to data object which is currently pointed to by the
451  /// iterator.
452  /// Return NULL is the iterator does not point to a data, i.e.
453  /// if IsValid() is FALSE.
454  template<typename TObj> TObj* operator->();
455 
456  /// Return self
458 
459  /// Construct and return end-of-stream iterator
461 
462  // dtor
464 };
465 
466 
467 
468 /////////////////////////////////////////////////////////////////////////////
469 /////////////////////////////////////////////////////////////////////////////
470 /// template specializations and implementation
471 
472 /////////////////////////////////////////////////////////////////////////////
473 /// CObjectIStreamIterator<TRoot>
474 
475 template<typename TRoot>
477 {
478 public:
479  using iterator_category = input_iterator_tag;
480  using value_type = TRoot;
481  using difference_type = ptrdiff_t;
482  using pointer = TRoot*;
483  using reference = TRoot&;
485 
487  EOwnership deleteInStream = eNoOwnership,
488  const CParams& params = CParams());
489 
494 
495  CObjectIStreamIterator& operator++(void);
496  bool operator==(const CObjectIStreamIterator&) const;
497  bool operator!=(const CObjectIStreamIterator&) const;
498  bool IsValid(void) const;
499  const CObjectIStream& GetObjectIStream(void) const;
500 
501  TRoot& operator*();
502  TRoot* operator->();
503 
504  CObjectIStreamIterator& begin(void);
505  CObjectIStreamIterator end(void);
506 
507 protected:
508  struct CData
509  {
510  CData(CObjectIStream& istr, EOwnership deleteInStream,
511  const CParams& params, TTypeInfo tinfo);
512  ~CData(void);
513 
514  void x_BeginRead(void);
515  void x_EndRead(void);
516  void x_AcceptData(CObjectIStream& in, const CObjectInfo& type);
517  void x_Next(void);
518  bool x_NextNoFilter(const CObjectInfo& objinfo);
519  bool x_NextSeqWithFilter(const CObjectInfo& objinfo);
520  bool x_NextChoiceWithFilter(const CObjectInfo& objinfo);
521  bool x_NextContainerWithFilter(const CObjectInfo& objinfo);
522 
531  condition_variable m_ReaderCv;
532  thread m_Reader;
533  exception_ptr m_ReaderExpt;
534  enum EFilter {
543  eAllContainer
544  } m_FilterType;
545 
546  template<typename TR>
547  class x_CObjectIStreamIteratorHook : public CSkipObjectHook
548  {
549  public:
551  typename CObjectIStreamIterator<TR>::CData* pthis)
552  : m_This(pthis) {
553  }
554  virtual void SkipObject(CObjectIStream& in, const CObjectTypeInfo& type) override {
555  m_This->x_AcceptData(in,CObjectInfo(type.GetTypeInfo()));
556  }
557  private:
559  };
560  };
561 
563  const CParams& params, EOwnership deleteInStream);
564  void x_ReaderThread(void);
565  shared_ptr<CData> m_Data;
566 };
567 
568 /////////////////////////////////////////////////////////////////////////////
569 /// CObjectIStreamIterator<TRoot, TChild>
570 
571 template<typename TRoot, typename TChild>
572 class CObjectIStreamIterator<TRoot, TChild>
573  : public CObjectIStreamIterator<TChild>
574 {
575 public:
577 
579  EOwnership deleteInStream = eNoOwnership,
580  const CParams& params = CParams());
581 
586 
590 
591 protected:
593  void x_ReaderThread(void);
594 
595  template<typename TR>
596  class x_CObjectIStreamIteratorReadHook : public CReadObjectHook
597  {
598  public:
600  typename CObjectIStreamIterator<TR>::CData* pthis)
601  : m_This(pthis) {
602  }
603  virtual void ReadObject(CObjectIStream& in, const CObjectInfo& type) override {
604  m_This->x_AcceptData(in,type);
605  }
606  private:
608  };
609 };
610 
611 
612 /////////////////////////////////////////////////////////////////////////////
613 // helpers
614 
616 
617 template<typename TRoot>
620 {
621  return TRoot::GetTypeInfo();
622 }
623 
624 template<typename TRoot>
625 //typename enable_if< !is_base_of< CSerialObject, TRoot>::value, TTypeInfo>::type
628 {
630 }
631 
632 template<typename TRoot>
633 TMemberIndex xxx_MemberIndex(const string& mem_name)
634 {
635  TTypeInfo tinfo = xxx_GetTypeInfo<TRoot>();
636  ETypeFamily type = tinfo->GetTypeFamily();
639  return cinfo->GetItems().Find(mem_name);
640  }
641  return kInvalidMember;
642 }
643 
644 } // ns_ObjectIStreamFilterIterator
645 
646 #if 0
647 template<typename...>
648 template<typename TObj>
649 void
651  TTypeInfo tinfo = ns_ObjectIStreamFilterIterator::xxx_GetTypeInfo<TObj>();
652  ETypeFamily type = tinfo->GetTypeFamily();
655  return cinfo->GetItems().Find(mem_name);
656  }
657  return kInvalidMember;
658 }
659 #endif
660 
661 
662 /////////////////////////////////////////////////////////////////////////////
663 /// CObjectIStreamIterator<TRoot> implementation
664 
665 template<typename TRoot>
667  : m_Data(nullptr) {
668 }
669 
670 template<typename TRoot>
672  CObjectIStream& istr, const CParams& params, EOwnership deleteInStream)
673  : m_Data( new CData(istr, deleteInStream, params,
675 }
676 
677 template<typename TRoot>
679  CObjectIStream& istr, EOwnership deleteInStream, const CParams& params)
680  : CObjectIStreamIterator(istr, params, deleteInStream)
681 {
682  if (m_Data->m_FilterType != CData::eNone && !m_Data->m_EndOfData) {
683  m_Data->m_HasReader = true;
684  m_Data->m_Reader = thread([this](){x_ReaderThread();});
685  }
686  ++(*this);
687 }
688 
689 template<typename TRoot>
691  const CObjectIStreamIterator& v) : m_Data(v.m_Data) {
692 }
693 
694 template<typename TRoot>
697  m_Data = v.m_Data;
698  return *this;
699 }
700 
701 template<typename TRoot>
703 }
704 
705 template<typename TRoot>
707  CObjectIStream& istr, EOwnership deleteInStream,
708  const CParams& params, TTypeInfo tinfo)
709  : m_Istr(&istr), m_Own(deleteInStream)
710  , m_ValueType(tinfo), m_Value(tinfo), m_HasReader(false)
711  , m_EndOfData(m_Istr->EndOfData()), m_Params(params)
712 {
713  ETypeFamily type = tinfo->GetTypeFamily();
715  m_Params.m_FnFilter = nullptr;
716  }
717  m_FilterType = eNone;
718  if (m_Params.m_FnFilter) {
719  if (type == eTypeFamilyClass) {
721  if (cinfo->Implicit()) {
722  const CItemInfo* itemInfo =
723  cinfo->GetItems().GetItemInfo(cinfo->GetItems().FirstIndex());
724  if (itemInfo->GetTypeInfo()->GetTypeFamily() == eTypeFamilyContainer) {
725  m_FilterType = m_Params.m_Index != kInvalidMember ? eOneContainer : eAllContainer;
726  }
727  }
728  if (m_FilterType == eNone) {
729  bool is_random = cinfo->RandomOrder();
730  if (m_Params.m_Index != kInvalidMember) {
731  m_FilterType = is_random ? eOneRandom : eOneSeq;
732  } else {
733  m_FilterType = is_random ? eAllRandom : eAllSeq;
734  }
735  }
736  } else if (type == eTypeFamilyChoice) {
737  m_FilterType = m_Params.m_Index != kInvalidMember ? eOneChoice : eAllChoice;
738  } else if (type == eTypeFamilyContainer) {
739  m_FilterType = m_Params.m_Index != kInvalidMember ? eOneContainer : eAllContainer;
740  }
741  }
742 }
743 
744 template<typename TRoot>
746  if (m_Reader.joinable()) {
747  m_EndOfData = true;
748  m_ReaderCv.notify_all();
749  m_Reader.join();
750  }
751  if (m_Istr && m_Own == eTakeOwnership) {
752  delete m_Istr;
753  }
754 }
755 
756 template<typename TRoot>
757 void
759  unique_lock<mutex> lck( m_ReaderMutex);
760  while (m_Value.GetObjectPtr() != nullptr) {
761  m_ReaderCv.wait(lck);
762  }
763 }
764 
765 template<typename TRoot>
766 void
768  m_Value = CObjectInfo();
769  m_EndOfData = true;
770  m_ReaderCv.notify_one();
771 }
772 
773 template<typename TRoot>
774 void
776  m_Data->x_BeginRead();
777  try {
778  m_Data->m_ValueType.SetLocalSkipHook(*(m_Data->m_Istr), new typename CData::template x_CObjectIStreamIteratorHook<TRoot>(m_Data.get()));
779  while (Serial_FilterSkip(*(m_Data->m_Istr),m_Data->m_ValueType))
780  ;
781  } catch (...) {
782  if (!m_Data->m_EndOfData) {
783  m_Data->m_ReaderExpt = current_exception();
784  }
785  }
786  m_Data->x_EndRead();
787 }
788 
789 template<typename TRoot, typename TChild>
790 void
792  this->m_Data->x_BeginRead();
793  try {
794  this->m_Data->m_ValueType.SetLocalSkipHook(*(this->m_Data->m_Istr), new typename CParent::CData::template x_CObjectIStreamIteratorHook<TChild>(this->m_Data.get()));
795  this->m_Data->m_ValueType.SetLocalReadHook(*(this->m_Data->m_Istr), new x_CObjectIStreamIteratorReadHook<TChild>(this->m_Data.get()));
796  while (Serial_FilterSkip(*(this->m_Data->m_Istr),CObjectTypeInfo(CType<TRoot>())))
797  ;
798  } catch (...) {
799  if (!this->m_Data->m_EndOfData) {
800  this->m_Data->m_ReaderExpt = current_exception();
801  }
802  }
803  this->m_Data->x_EndRead();
804 }
805 
806 template<typename TRoot>
807 void
809  CObjectIStream& in, const CObjectInfo& objinfo)
810 {
811  if (m_Istr->EndOfData()) {
812  m_EndOfData = true;
813  } else {
814  bool res = false;
815  switch ( m_FilterType) {
816  default:
817  case eNone:
818  res = x_NextNoFilter(objinfo);
819  break;
820  case eOneSeq:
821  case eOneRandom:
822  case eAllSeq:
823  case eAllRandom:
824  res = x_NextSeqWithFilter(objinfo);
825  break;
826  case eOneChoice:
827  case eAllChoice:
828  res = x_NextChoiceWithFilter(objinfo);
829  break;
830  case eOneContainer:
831  case eAllContainer:
832  res = x_NextContainerWithFilter(objinfo);
833  break;
834  }
835  if (res) {
836  unique_lock<mutex> lck(m_ReaderMutex);
837  m_Value = objinfo;
838  m_ReaderCv.notify_one();
839  while (m_Value.GetObjectPtr() != nullptr) {
840  if (m_EndOfData) {
841  NCBI_THROW( CEofException, eEof,
842  "CObjectIStreamIterator: abort data parsing");
843  }
844  m_ReaderCv.wait(lck);
845  }
846  } else {
847  in.SetDiscardCurrObject();
848  }
849  }
850 }
851 
852 template<typename TRoot>
853 void
855  unique_lock<mutex> lck(m_ReaderMutex);
856  m_Value = CObjectInfo();
857  m_ReaderCv.notify_one();
858  while (m_Value.GetObjectPtr() == nullptr && !m_EndOfData) {
859  m_ReaderCv.wait(lck);
860  }
861  if (m_ReaderExpt) {
862  rethrow_exception(m_ReaderExpt);
863  }
864 }
865 
866 template<typename TRoot>
867 bool
869 {
870  objinfo.GetTypeInfo()->DefaultReadData(*m_Istr, objinfo.GetObjectPtr());
871  return true;
872 }
873 
874 template<typename TRoot>
875 bool
877 {
880  bool checked = false;
881  bool valid = true;
882  TRoot& obj = *CTypeConverter<TRoot>::SafeCast(objinfo.GetObjectPtr());
883 
884  for ( CIStreamClassMemberIterator i(*m_Istr, objinfo); i; ++i ) {
885 
886  TMemberIndex mi_now = (*i).GetMemberIndex();
887  CObjectInfoMI minfo(objinfo, mi_now);
888 
889  if (valid) {
890 // before read - validate missing members
891  switch (m_FilterType) {
892  case eOneRandom:
893  case eAllRandom:
894  default:
895  break;
896  case eOneSeq:
897  if (mi_now > m_Params.m_Index && !checked) {
898  valid = m_Params.m_FnFilter( *m_Istr, obj, m_Params.m_Index, nullptr, m_Params.m_Extra);
899  checked = true;
900  }
901  break;
902  case eAllSeq:
903  for (++mi; valid && mi < mi_now; ++mi) {
904  valid = m_Params.m_FnFilter( *m_Istr, obj, mi, nullptr, m_Params.m_Extra);
905  }
906  break;
907  }
908  }
909 
910 // if still valid
911  if (valid) {
912 // read next member
913  i.ReadClassMember(objinfo);
914 
915 // after read - validate member
916  switch (m_FilterType) {
917  default: break;
918  case eOneSeq:
919  case eOneRandom:
920  if (mi_now == m_Params.m_Index) {
922  minfo.GetMember().GetPointedObject() : minfo.GetMember();
923  valid = m_Params.m_FnFilter( *m_Istr, obj, mi_now, &oi, m_Params.m_Extra);
924  checked = true;
925  }
926  break;
927  case eAllRandom:
928  done.insert(mi_now);
929  // no break
931  case eAllSeq:
932  {
934  minfo.GetMember().GetPointedObject() : minfo.GetMember();
935  valid = m_Params.m_FnFilter( *m_Istr, obj, mi_now, &oi, m_Params.m_Extra);
936  }
937  break;
938  }
939  } else {
940 // object invalid - skip remaining members
941  i.SkipClassMember();
942  }
943  mi = mi_now;
944  }
945 
946 // finally - validate missing members
947  if (valid) {
948  switch (m_FilterType) {
949  default: break;
950  case eOneSeq:
951  case eOneRandom:
952  if (!checked) {
953  valid = m_Params.m_FnFilter( *m_Istr, obj, m_Params.m_Index, nullptr, m_Params.m_Extra);
954  }
955  break;
956  case eAllSeq:
957  {
958  TMemberIndex mi_last = objinfo.GetClassTypeInfo()->GetItems().LastIndex() + 1;
959  for (++mi; valid && mi < mi_last; ++mi) {
960  valid = m_Params.m_FnFilter( *m_Istr, obj, mi, nullptr, m_Params.m_Extra);
961  }
962  }
963  break;
964  case eAllRandom:
965  {
966  mi = objinfo.GetClassTypeInfo()->GetItems().FirstIndex();
967  TMemberIndex mi_last = objinfo.GetClassTypeInfo()->GetItems().LastIndex() + 1;
968  for (; valid && mi < mi_last; ++mi) {
969  if (done.find(mi) == done.end()) {
970  valid = m_Params.m_FnFilter( *m_Istr, obj, mi, nullptr, m_Params.m_Extra);
971  }
972  }
973  }
974  break;
975  }
976  }
977  return valid;
978 }
979 
980 template<typename TRoot>
981 bool
983 {
984  bool valid = true;
985  objinfo.GetTypeInfo()->DefaultReadData(*m_Istr, objinfo.GetObjectPtr());
986  TRoot& obj = *CTypeConverter<TRoot>::SafeCast(objinfo.GetObjectPtr());
987  CObjectInfoCV cv = objinfo.GetCurrentChoiceVariant();
989  if (i == m_Params.m_Index) {
991  cv.GetVariant().GetPointedObject() : cv.GetVariant();
992  valid = m_Params.m_FnFilter( *m_Istr, obj, i, &oi, m_Params.m_Extra);
993  } else {
994  valid = m_Params.m_FnFilter( *m_Istr, obj, m_Params.m_Index, nullptr, m_Params.m_Extra);
995  }
996  return valid;
997 }
998 
999 template<typename TRoot>
1000 bool
1002 {
1004  bool valid = true;
1005  TRoot& obj = *CTypeConverter<TRoot>::SafeCast(objinfo.GetObjectPtr());
1006 
1007  for ( CIStreamContainerIterator i(*m_Istr, objinfo); i; ++i ) {
1008  if (valid) {
1009  CObjectInfo oi(i.ReadElement(objinfo.GetObjectPtr()));
1010  ++mi;
1011  if (oi.GetObjectPtr()) {
1012  if (m_FilterType == eAllContainer || (m_FilterType == eOneContainer && mi == m_Params.m_Index)) {
1014  valid = m_Params.m_FnFilter( *m_Istr, obj, mi, &oe, m_Params.m_Extra);
1015  }
1016  }
1017  } else {
1018  i.SkipElement();
1019  }
1020  }
1021  return valid;
1022 }
1023 
1024 template<typename TRoot>
1027  if (m_Data.get() != nullptr) {
1028  if (!m_Data->m_HasReader) {
1029  if (m_Data->m_Istr->EndOfData()) {
1030  m_Data.reset();
1031  } else {
1032  m_Data->m_Value = CObjectInfo(m_Data->m_ValueType);
1033  m_Data->m_Istr->Read(m_Data->m_Value);
1034  }
1035  } else {
1036  m_Data->x_Next();
1037  if (m_Data->m_EndOfData) {
1038  m_Data.reset();
1039  }
1040  }
1041  }
1042  return *this;
1043 }
1044 
1045 template<typename TRoot>
1046 bool
1048  const CObjectIStreamIterator& v) const {
1049  return m_Data.get() == v.m_Data.get();
1050 }
1051 
1052 template<typename TRoot>
1053 bool
1055  const CObjectIStreamIterator& v) const {
1056  return m_Data.get() != v.m_Data.get();
1057 }
1058 
1059 template<typename TRoot>
1061  return m_Data.get() != nullptr && m_Data->m_Value.GetObjectPtr() != nullptr;
1062 }
1063 
1064 template<typename TRoot>
1065 const CObjectIStream&
1067  return *(m_Data->m_Istr);
1068 }
1069 
1070 template<typename TRoot>
1071 TRoot&
1073  return *(TRoot*)(m_Data->m_Value.GetObjectPtr());
1074 }
1075 
1076 template<typename TRoot>
1077 TRoot*
1079  return IsValid() ? (TRoot*)m_Data->m_Value.GetObjectPtr() : nullptr;
1080 }
1081 
1082 template<typename TRoot>
1085  return *this;
1086 }
1087 
1088 template<typename TRoot>
1092 }
1093 
1094 
1095 /////////////////////////////////////////////////////////////////////////////
1096 /// CObjectIStreamIterator<TRoot, TChild> implementation
1097 
1098 template<typename TRoot, typename TChild>
1100  : CParent() {
1101 }
1102 
1103 template<typename TRoot, typename TChild>
1105  CObjectIStream& istr, EOwnership deleteInStream, const CParams& params)
1106  : CParent(istr, params, deleteInStream)
1107 {
1108  if (!this->m_Data->m_EndOfData) {
1109  this->m_Data->m_HasReader = true;
1110  this->m_Data->m_Reader = thread([this](){x_ReaderThread();});
1111  }
1112  ++(*this);
1113 }
1114 
1115 template<typename TRoot, typename TChild>
1117  const CObjectIStreamIterator& v) : CParent(v) {
1118 }
1119 
1120 template<typename TRoot, typename TChild>
1123  const CObjectIStreamIterator& v) {
1124  CParent::operator=(v);
1125  return *this;
1126 }
1127 
1128 template<typename TRoot, typename TChild>
1130 }
1131 
1132 template<typename TRoot, typename TChild>
1135  CParent::operator++();
1136  return *this;
1137 }
1138 
1139 template<typename TRoot, typename TChild>
1142  return *this;
1143 }
1144 
1145 template<typename TRoot, typename TChild>
1149 }
1150 
1151 
1152 /////////////////////////////////////////////////////////////////////////////
1153 /// CObjectIStreamAsyncIterator<TRoot>
1154 
1155 template<typename TRoot>
1157 {
1158 public:
1159  using iterator_category = input_iterator_tag;
1160  using value_type = TRoot;
1161  using difference_type = ptrdiff_t;
1162  using pointer = TRoot*;
1163  using reference = TRoot&;
1165 
1167  EOwnership deleteInStream = eNoOwnership,
1168  const CParams& params = CParams());
1173 
1174  CObjectIStreamAsyncIterator& operator++(void);
1175  bool operator==(const CObjectIStreamAsyncIterator&) const;
1176  bool operator!=(const CObjectIStreamAsyncIterator&) const;
1177  bool IsValid(void) const;
1178 
1179  TRoot& operator*();
1180  TRoot* operator->();
1181 
1182  CObjectIStreamAsyncIterator& begin(void);
1183  CObjectIStreamAsyncIterator end(void);
1184 
1185 
1186 protected:
1187  typedef queue< CRef<TRoot> > TObjectsQueue;
1188 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1190 #else
1192 #endif
1194  EOwnership deleteInStream,
1195  FParserFunction parser,
1196  const CParams& params);
1197 private:
1198  static TObjectsQueue sx_ClearGarbageAndParse(
1200  const CParams& params,
1201 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1202  TObjectsQueue garbage
1203 #else
1204  TObjectsQueue&& garbage
1205 #endif
1206  );
1207 
1208  struct CData {
1209  CData(CObjectIStream& istr, EOwnership deleteInStream, FParserFunction parser,
1210  const CParams& params);
1211  ~CData(void);
1212 
1213  using future_queue_t = future<TObjectsQueue>;
1214  using futures_queue_t = queue<future_queue_t>;
1215 
1216  void x_UpdateObjectsQueue();
1217  void x_UpdateFuturesQueue();
1218  CRef< CByteSource > x_GetNextData(void);
1219  void x_ReaderThread(void);
1220 
1221  TObjectsQueue m_ObjectsQueue; // current queue of objects
1222  TObjectsQueue m_GarbageQueue; // popped so-far from objects-queue
1223  futures_queue_t m_FuturesQueue; // queue-of-futures-of-object-queues
1224 
1232  launch m_Policy;
1235 
1237  condition_variable m_ReaderCv;
1238  thread m_Reader;
1239  queue< CRef< CByteSource > > m_ReaderData;
1240  queue< size_t > m_ReaderDataSize;
1241  };
1242  shared_ptr<CData> m_Data;
1243 };
1244 
1245 
1246 /////////////////////////////////////////////////////////////////////////////
1247 /// CObjectIStreamAsyncIterator<TRoot, TChild>
1248 
1249 template<typename TRoot, typename TChild>
1250 class CObjectIStreamAsyncIterator<TRoot, TChild>
1251  : public CObjectIStreamAsyncIterator<TChild>
1252 {
1253 public:
1255 
1257  EOwnership deleteInStream = eNoOwnership,
1258  const CParams& params = CParams());
1263 
1267 
1268 
1269 private:
1271  using TObjectsQueue = typename CParent::TObjectsQueue;
1272 
1273  static TObjectsQueue sx_ClearGarbageAndParse(
1275  const CParams& params,
1276 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1277  TObjectsQueue garbage
1278 #else
1279  TObjectsQueue&& garbage
1280 #endif
1281  );
1282 };
1283 
1284 /////////////////////////////////////////////////////////////////////////////
1285 /// CObjectIStreamAsyncIterator<TRoot> implementation
1286 
1287 template<typename TRoot>
1289  : m_Data(nullptr)
1290 {
1291 }
1292 
1293 template<typename TRoot>
1295  CObjectIStream& istr, EOwnership deleteInStream,
1296  const CParams& params)
1297  : CObjectIStreamAsyncIterator( istr, deleteInStream,
1298  &CObjectIStreamAsyncIterator<TRoot>::sx_ClearGarbageAndParse, params)
1299 {
1300 }
1301 
1302 template<typename TRoot>
1304  CObjectIStream& istr, EOwnership deleteInStream,
1305  FParserFunction parser, const CParams& params)
1306  : m_Data(new CData(istr, deleteInStream, parser, params))
1307 {
1308  ++(*this);
1309 }
1310 
1311 template<typename TRoot>
1313  const CObjectIStreamAsyncIterator& v)
1314  : m_Data(v.m_Data)
1315 {
1316 }
1317 
1318 template<typename TRoot>
1321  const CObjectIStreamAsyncIterator& v) {
1322  m_Data = v.m_Data;
1323  return *this;
1324 }
1325 
1326 template<typename TRoot>
1328 }
1329 
1330 template<typename TRoot>
1333  if (m_Data.get() != nullptr) {
1334  do {
1335  m_Data->x_UpdateFuturesQueue();
1336  m_Data->x_UpdateObjectsQueue();
1337  } while (!IsValid() && !m_Data->m_EndOfData);
1338  if (!IsValid()) {
1339  m_Data.reset();
1340  }
1341  }
1342  return *this;
1343 }
1344 
1345 template<typename TRoot>
1346 bool
1348  const CObjectIStreamAsyncIterator& v) const {
1349  return m_Data.get() == v.m_Data.get();
1350 }
1351 
1352 template<typename TRoot>
1353 bool
1355  const CObjectIStreamAsyncIterator& v) const {
1356  return m_Data.get() != v.m_Data.get();
1357 }
1358 
1359 template<typename TRoot>
1361  return m_Data.get() != nullptr && !m_Data->m_ObjectsQueue.empty();
1362 }
1363 
1364 template<typename TRoot>
1365 TRoot&
1367  return m_Data->m_ObjectsQueue.front().GetObject();
1368 }
1369 
1370 template<typename TRoot>
1371 TRoot*
1373  return IsValid() ? m_Data->m_ObjectsQueue.front().GetPointer() : nullptr;
1374 }
1375 
1376 template<typename TRoot>
1379  return *this;
1380 }
1381 
1382 template<typename TRoot>
1386 }
1387 
1388 
1389 template<typename TRoot>
1392  CRef<CByteSource> bytesource,
1394  const CParams& params,
1395 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1396  TObjectsQueue garbage
1397 #else
1398  TObjectsQueue&& garbage
1399 #endif
1400  )
1401 {
1402  {{
1404  swap(garbage, dummy);
1405  // garbage now gets destroyed, if last-reference
1406  }}
1407 
1408  // deserialize objects from bytesource
1409  unique_ptr<CObjectIStream> istr { CObjectIStream::Create(format, *bytesource) };
1410 
1411  TObjectsQueue queue;
1412  if (params.m_FnFilter) {
1413  for (TRoot& object : CObjectIStreamIterator<TRoot>( *istr, eNoOwnership, params)) {
1414  queue.push( CRef<TRoot>(&object));
1415  }
1416  } else {
1417  while(!istr->EndOfData()) {
1418  CRef<TRoot> object(new TRoot);
1419  istr->Read(&*object, object->GetThisTypeInfo());
1420  queue.push(object);
1421  }
1422  }
1423  return queue;
1424 }
1425 
1426 template<typename TRoot>
1428  CObjectIStream& istr, EOwnership deleteInStream, FParserFunction parser,
1429  const CParams& params)
1430  : m_Istr(&istr)
1431  , m_Own(deleteInStream)
1432  , m_Parser(parser)
1433  , m_ParserCount( params.m_MaxParserThreads != 0 ? params.m_MaxParserThreads : 16)
1434  , m_RawBufferSize( params.m_MinRawBufferSize)
1435  , m_MaxRawSize( params.m_SameThread ? 0 : params.m_MaxTotalRawSize)
1436  , m_CurrentRawSize(0)
1437  , m_Policy(params.m_ThreadPolicy)
1438  , m_EndOfData(m_Istr->EndOfData())
1439  , m_Params(params)
1440 {
1441  if (m_MaxRawSize != 0 && !m_EndOfData) {
1442  m_Reader = thread([this](){x_ReaderThread();});
1443  }
1444 }
1445 
1446 template<typename TRoot>
1448  if (m_Reader.joinable()) {
1449  m_EndOfData = true;
1450  m_ReaderCv.notify_all();
1451  m_Reader.join();
1452  }
1453  if (m_Istr && m_Own == eTakeOwnership) {
1454  delete m_Istr;
1455  }
1456 }
1457 
1458 // m_GarbageQueue processing:
1459 //
1460 // When the current object (the one returned by operator*())
1461 // goes out of scope, if it is the last reference, the
1462 // destructor of the object will be called from main
1463 // thread, which is an expensive operation, which
1464 // we want to offload to a different thread - "here are some
1465 // objects - just let them go out of scope"
1466 //
1467 // So before calling m_ObjectsQueue pop we'll save the
1468 // current object in the garbage-queue to prevent it from being
1469 // destructed at this time, and will pass the garbage
1470 // queue to sx_ClearGarbageAndParse (executed asynchrously),
1471 // where the destructors of the garbage-objects will be called
1472 // (as apporpriate, as determined by CRefs going out of scope)
1473 
1474 template<typename TRoot>
1475 void
1477 {
1478  // bring the next objects up front; save the garbage
1479  if(!m_ObjectsQueue.empty()) {
1480  m_GarbageQueue.push( m_ObjectsQueue.front());
1481  m_ObjectsQueue.pop();
1482  }
1483 
1484  // unpack the next objects-queue from futures-queue if empty
1485  if( m_ObjectsQueue.empty()
1486  && !m_FuturesQueue.empty())
1487  {
1488  m_ObjectsQueue = m_FuturesQueue.front().get();
1489  m_FuturesQueue.pop();
1490  }
1491 }
1492 
1493 template<typename TRoot>
1494 void
1496 {
1497  // nothing to deserialize, or already full
1498  if( m_FuturesQueue.size() >= m_ParserCount) {
1499  return;
1500  }
1501  if (m_EndOfData ||
1502  // no raw data ready yet, but we still have work to do
1503  (m_MaxRawSize != 0 && m_ReaderData.empty() && !m_FuturesQueue.empty())) {
1504  return;
1505  }
1506  CRef< CByteSource > data = x_GetNextData();
1507  if (data.IsNull()) {
1508  m_EndOfData = true;
1509  return;
1510  }
1511 
1512 #if 0
1513  // for reference / profiling: clearing the garbage-queue
1514  // from this thread will make the processing considerably slower.
1515  // Instead, we'll pass the garbage to the async call below.
1516  if(false) {
1518  swap(m_GarbageQueue, dummy);
1519  }
1520 #endif
1521 
1522  // launch async task to deserialize objects
1523  // from the skipped bytes in delay-buffer, and
1524  // also pass the garbage queue for destruction
1525  // of contents.
1526  // note: we can't move m_GarbageQueue directly
1527  // as it lacks ::clear() method that could restore
1528  // it to a valid empty state after move.
1529 
1530  TObjectsQueue tmp_garbage_queue;
1531  swap(m_GarbageQueue, tmp_garbage_queue);
1532 
1533  m_FuturesQueue.push( async( m_Policy, m_Parser,
1534  data, m_Istr->GetDataFormat(), m_Params, std::move(tmp_garbage_queue)));
1535 }
1536 
1537 template<typename TRoot>
1540 {
1541  if (m_MaxRawSize == 0) {
1542  // read raw data in this (main) thread
1543  if (m_Istr->EndOfData()) {
1544  return CRef< CByteSource >();
1545  }
1546  const CNcbiStreampos endpos =
1547  m_Istr->GetStreamPos() + (CNcbiStreampos)(m_RawBufferSize);
1548  CStreamDelayBufferGuard guard(*m_Istr);
1549  do {
1550  m_Istr->SkipAnyContentObject();
1551  } while( !m_Istr->EndOfData() && m_Istr->GetStreamPos() < endpos);
1552  return guard.EndDelayBuffer();
1553  }
1554 
1555  // get raw data prepared by reader
1556  unique_lock<mutex> lck(m_ReaderMutex);
1557  while (m_ReaderData.empty()) {
1558  m_ReaderCv.wait(lck);
1559  }
1560  CRef< CByteSource > data = m_ReaderData.front();
1561  m_ReaderData.pop();
1562  m_CurrentRawSize -= m_ReaderDataSize.front();
1563  m_ReaderDataSize.pop();
1564  m_ReaderCv.notify_one();
1565  return data;
1566 }
1567 
1568 template<typename TRoot>
1569 void
1571 {
1572  // Skip over some objects in stream without parsing, up to buffer_size.
1573  while (!m_Istr->EndOfData()) {
1574  const CNcbiStreampos startpos = m_Istr->GetStreamPos();
1575  const CNcbiStreampos endpos =
1576  startpos + (CNcbiStreampos)(m_RawBufferSize);
1577 
1578  CStreamDelayBufferGuard guard(*(m_Istr));
1579  try {
1580  do {
1581  m_Istr->SkipAnyContentObject();
1582  } while( !m_Istr->EndOfData() && m_Istr->GetStreamPos() < endpos);
1583  } catch (...) {
1584  }
1585 
1586  size_t this_buffer_size = m_Istr->GetStreamPos() - startpos;
1588  {
1589  unique_lock<mutex> lck(m_ReaderMutex);
1590  // make sure we do not consume too much memory
1591  while (!m_EndOfData && m_CurrentRawSize >= m_MaxRawSize) {
1592  m_ReaderCv.wait(lck);
1593  }
1594  if (m_EndOfData) {
1595  break;
1596  }
1597  m_ReaderData.push( data);
1598  m_ReaderDataSize.push( this_buffer_size);
1599  m_CurrentRawSize += this_buffer_size;
1600  m_ReaderCv.notify_one();
1601  }
1602  }
1604  m_ReaderMutex.lock();
1605  m_ReaderData.push( data);
1606  m_ReaderDataSize.push(0);
1607  m_ReaderMutex.unlock();
1608  m_ReaderCv.notify_one();
1609 }
1610 
1611 
1612 /////////////////////////////////////////////////////////////////////////////
1613 /// CObjectIStreamAsyncIterator<TRoot,TChild> implementation
1614 
1615 template<typename TRoot, typename TChild>
1617  : CParent()
1618 {
1619 }
1620 
1621 template<typename TRoot, typename TChild>
1623  CObjectIStream& istr, EOwnership deleteInStream,
1624  const CParams& params)
1625  : CParent(istr, deleteInStream,
1626  &CObjectIStreamAsyncIterator<TRoot, TChild>::sx_ClearGarbageAndParse, params)
1627 {
1628 }
1629 
1630 template<typename TRoot, typename TChild>
1632  const CObjectIStreamAsyncIterator& v)
1633  : CParent(v)
1634 {
1635 }
1636 
1637 template<typename TRoot, typename TChild>
1640  const CObjectIStreamAsyncIterator& v) {
1641  CParent::operator=(v);
1642  return *this;
1643 }
1644 
1645 template<typename TRoot, typename TChild>
1647 }
1648 
1649 template<typename TRoot, typename TChild>
1652  CParent::operator++();
1653  return *this;
1654 }
1655 
1656 template<typename TRoot, typename TChild>
1659  return *this;
1660 }
1661 
1662 template<typename TRoot, typename TChild>
1666 }
1667 
1668 template<typename TRoot, typename TChild>
1671  CRef<CByteSource> bytesource,
1673  const CParams& params,
1674 #if NCBI_COMPILER_MSVC && _MSC_VER < 1900
1675  TObjectsQueue garbage
1676 #else
1677  TObjectsQueue&& garbage
1678 #endif
1679  )
1680 {
1681  {{
1683  swap(garbage, dummy);
1684  // garbage now gets destroyed, if last-reference
1685  }}
1686 
1687  // deserialize objects from bytesource
1688  unique_ptr<CObjectIStream> istr { CObjectIStream::Create(format, *bytesource) };
1689  TObjectsQueue queue;
1690  for (TChild& object : CObjectIStreamIterator<TRoot, TChild>( *istr, eNoOwnership, params)) {
1691  queue.push( CRef<TChild>(&object));
1692  }
1693  return queue;
1694 }
1695 
1696 
1697 
1698 
1699 /////////////////////////////////////////////////////////////////////////////
1700 /////////////////////////////////////////////////////////////////////////////
1701 // Iterate over objects in input stream
1702 //
1703 // IMPORTANT: the following API requires multi-threading
1704 //
1705 // IMPORTANT: this API is deprecated, use CObjectIStreamIterator instead (defined above)
1706 
1707 
1708 template<typename TRoot, typename TObject>
1710 
1711 // Helper hook class
1712 template<typename TRoot, typename TObject>
1714 {
1715 public:
1717  : m_Reader(thr)
1718  {
1719  }
1720  virtual void Process(const TObject& obj) override;
1721 private:
1723 };
1724 
1725 // Helper thread class
1726 template<typename TRoot, typename TObject>
1728 {
1729 public:
1731  : m_In(in), m_Resume(0,1), m_Ready(0,1), m_Obj(0),
1732  m_Ownership(deleteInStream), m_Stop(false), m_Failed(false)
1733  {
1734  }
1735  // Resume thread, wait for the next object
1736  void Next(void)
1737  {
1738  m_Obj = 0;
1739  if (!m_Stop && !m_In.EndOfData()) {
1740  m_Resume.Post();
1741  m_Ready.Wait();
1742  if (m_Failed) {
1744  "invalid data object received");
1745  }
1746  }
1747  }
1748  // Request stop: thread is no longer needed
1749  void Stop(void)
1750  {
1751  m_Stop = true;
1752  m_Resume.Post();
1753  Join(0);
1754  }
1755  void Fail(void)
1756  {
1757  m_Failed = true;
1758  SetObject(0);
1759  }
1760  // Object is ready: suspend thread
1761  void SetObject(const TObject* obj)
1762  {
1763  m_Obj = obj;
1764  m_Ready.Post();
1765  m_Resume.Wait();
1766  if (m_Stop) {
1767  Exit(0);
1768  }
1769  }
1770  const TObject* GetObject(void) const
1771  {
1772  return m_Obj;
1773  }
1774 protected:
1776  {
1777  if (m_Ownership == eTakeOwnership) {
1778  delete &m_In;
1779  }
1780  }
1781  virtual void* Main(void)
1782  {
1783  return 0;
1784  }
1785 protected:
1789  const TObject* m_Obj;
1791  bool m_Stop;
1792  bool m_Failed;
1793 };
1794 
1795 // Reading thread for serial objects
1796 template<typename TRoot, typename TObject>
1798  : public CIStreamIteratorThread_Base< TRoot,TObject >
1799 {
1800 public:
1802  : CIStreamIteratorThread_Base< TRoot,TObject >(in, deleteInStream)
1803  {
1804  }
1805 protected:
1807  {
1808  }
1809  virtual void* Main(void) override
1810  {
1811  this->m_Resume.Wait();
1812  // Enumerate objects of requested type
1813  try {
1814  Serial_FilterObjects< TRoot >( this->m_In,
1816  this->SetObject(0);
1817  } catch (CException& e) {
1818  NCBI_REPORT_EXCEPTION("In CIStreamObjectIteratorThread",e);
1819  this->Fail();
1820  }
1821  return 0;
1822  }
1823 };
1824 
1825 // Reading thread for std objects
1826 template<typename TRoot, typename TObject>
1828  : public CIStreamIteratorThread_Base< TRoot,TObject >
1829 {
1830 public:
1832  : CIStreamIteratorThread_Base< TRoot,TObject >(in, deleteInStream)
1833  {
1834  }
1835 protected:
1837  {
1838  }
1839  virtual void* Main(void) override
1840  {
1841  this->m_Resume.Wait();
1842  // Enumerate objects of requested type
1843  try {
1844  Serial_FilterStdObjects< TRoot >( this->m_In,
1846  this->SetObject(0);
1847  } catch (CException& e) {
1848  NCBI_REPORT_EXCEPTION("In CIStreamStdIteratorThread",e);
1849  this->Fail();
1850  }
1851  return 0;
1852  }
1853 };
1854 
1855 template<typename TRoot, typename TObject>
1856 inline
1858 {
1859  m_Reader.SetObject(&obj);
1860 }
1861 
1862 // Stream iterator base class
1863 template<typename TRoot, typename TObject>
1865 {
1866 public:
1867  void operator++()
1868  {
1869  m_Reader->Next();
1870  }
1871  void operator++(int)
1872  {
1873  m_Reader->Next();
1874  }
1875  const TObject& operator* (void) const
1876  {
1877  return *(m_Reader->GetObject());
1878  }
1879  const TObject* operator-> (void) const
1880  {
1881  return m_Reader->GetObject();
1882  }
1883  bool IsValid(void) const
1884  {
1885  return m_Reader->GetObject() != 0;
1886  }
1887 protected:
1889  : m_Reader(nullptr)
1890  {
1891  }
1893  {
1894  if (m_Reader) {
1895  m_Reader->Stop();
1896  }
1897  }
1898 private:
1899  // prohibit copy
1901  // prohibit assignment
1904 
1905 protected:
1907 };
1908 
1909 /// Stream iterator for serial objects
1910 ///
1911 /// Usage:
1912 /// CObjectIStream* is = CObjectIStream::Open(...);
1913 /// CIStreamObjectIterator<CRootClass,CObjectClass> i(*is);
1914 /// for ( ; i.IsValid(); ++i) {
1915 /// const CObjectClass& obj = *i;
1916 /// ...
1917 /// }
1918 /// IMPORTANT:
1919 /// This API requires multi-threading!
1920 
1921 template<typename TRoot, typename TObject>
1923  : public CIStreamIterator_Base< TRoot, TObject>
1924 {
1925 public:
1927  {
1928  // Create reading thread, wait until it finds the next object
1929  this->m_Reader =
1931  this->m_Reader->Run();
1932  this->m_Reader->Next();
1933  }
1935  {
1936  }
1937 };
1938 
1939 /// Stream iterator for standard type objects
1940 ///
1941 /// Usage:
1942 /// CObjectIStream* is = CObjectIStream::Open(...);
1943 /// CIStreamStdIterator<CRootClass,string> i(*is);
1944 /// for ( ; i.IsValid(); ++i) {
1945 /// const string& obj = *i;
1946 /// ...
1947 /// }
1948 /// IMPORTANT:
1949 /// This API requires multi-threading!
1950 
1951 template<typename TRoot, typename TObject>
1953  : public CIStreamIterator_Base< TRoot, TObject>
1954 {
1955 public:
1957  {
1958  // Create reading thread, wait until it finds the next object
1959  this->m_Reader =
1960  new CIStreamStdIteratorThread< TRoot, TObject >(in,deleteInStream);
1961  this->m_Reader->Run();
1962  this->m_Reader->Next();
1963  }
1965  {
1966  }
1967 };
1968 
1969 #endif // NCBI_THREADS
1970 
1971 
1972 /* @} */
1973 
1975 
1976 #endif /* STREAMITER__HPP */
bool operator!=(const _Ht_iterator< _Val, _Nonconst_traits< _Val >, _Key, _HF, _ExK, _EqK, _All > &__x, const _Ht_iterator< _Val, _Const_traits< _Val >, _Key, _HF, _ExK, _EqK, _All > &__y)
Definition: _hashtable.h:173
@ eNone
None specified.
Definition: blast_def.h:326
unsigned dummy
Definition: block_cipher.h:0
Reading (iterating through) members of the class (SET, SEQUENCE)
Definition: objectio.hpp:120
Reading (iterating through) elements of containers (SET OF, SEQUENCE OF).
Definition: objectio.hpp:164
Stream iterator for serial objects.
Stream iterator for standard type objects.
Asynchronous parsing parameters.
Definition: streamiter.hpp:343
CObjectIStreamAsyncIterator<TRoot, TChild>
CObjectIStreamAsyncIterator<TRoot>
CObjectIStreamAsyncIterator.
Definition: streamiter.hpp:337
Filtering parameters.
Definition: streamiter.hpp:167
CObjectIStreamIterator<TRoot, TChild>
Definition: streamiter.hpp:574
template specializations and implementation
Definition: streamiter.hpp:477
CObjectIStreamIterator.
Definition: streamiter.hpp:140
CObjectIStream –.
Definition: objistr.hpp:93
CObjectInfoCV –.
Definition: objectiter.hpp:588
CObjectInfoMI –.
Definition: objectiter.hpp:432
CObjectInfo –.
Definition: objectinfo.hpp:597
CObjectTypeInfo –.
Definition: objectinfo.hpp:94
Read hook for a standalone object.
Definition: objhook.hpp:59
CSemaphore –.
Definition: ncbimtx.hpp:1375
Root class for all serialization exceptions.
Definition: exception.hpp:50
Helper hook for Serial_FilterObjects function template; User hook class should be derived from this b...
Definition: objhook.hpp:588
Skip hook for a standalone object.
Definition: objhook.hpp:205
Guard class for CObjectIStream::StartDelayBuffer/EndDelayBuffer.
Definition: objistr.hpp:1177
CTypeInfo class contains all information about C++ types (both basic and classes): members and layout...
Definition: typeinfo.hpp:76
Definition: set.hpp:45
Include a standard set of the NCBI C++ Toolkit most basic headers.
bool operator==(const CEquivRange &A, const CEquivRange &B)
#define false
Definition: bool.h:36
#define bool
Definition: bool.h:34
char data[12]
Definition: iconv.c:80
void swap(NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair1, NCBI_NS_NCBI::pair_base_member< T1, T2 > &pair2)
Definition: ncbimisc.hpp:1508
@ eTakeOwnership
An object can take ownership of another.
Definition: ncbi_types.h:136
@ eNoOwnership
No ownership is assumed.
Definition: ncbi_types.h:135
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
#define NCBI_REPORT_EXCEPTION(title, ex)
Generate a report on the exception.
Definition: ncbiexpt.hpp:755
TMemberIndex Find(const CTempString &name) const
Definition: memberlist.cpp:256
TMemberIndex m_Index
Definition: item.hpp:120
const CItemInfo * GetItemInfo(TMemberIndex index) const
static TMemberIndex FirstIndex(void)
Definition: memberlist.hpp:78
TTypeInfo GetTypeInfo(void) const
TMemberIndex LastIndex(void) const
Definition: memberlist.hpp:82
CVect2< NCBI_PROMOTE(int,U) > operator*(int v1, const CVect2< U > &v2)
Definition: globals.hpp:371
size_t TMemberIndex
Type used for indexing class members and choice variants.
Definition: serialdef.hpp:230
const TMemberIndex kInvalidMember
Special value returned from FindMember.
Definition: serialdef.hpp:237
ETypeFamily
Type family.
Definition: serialdef.hpp:138
static const TObjectType * SafeCast(TTypeInfo type)
Definition: serialutil.hpp:76
ESerialDataFormat
Data file format.
Definition: serialdef.hpp:71
@ eTypeFamilyClass
Definition: serialdef.hpp:140
@ eTypeFamilyContainer
Definition: serialdef.hpp:142
@ eTypeFamilyChoice
Definition: serialdef.hpp:141
@ eTypeFamilyPointer
Definition: serialdef.hpp:143
bool IsValid(void) const
Check whether the iterator points to a data TRUE if the iterator is constructed upon a serialization ...
CRef< CByteSource > EndDelayBuffer(void)
Redirect call to protected CObjectIStream After this call guarding is finished.
CObjectIStreamIterator(CObjectIStream &istr, EOwnership deleteInStream=eNoOwnership, const CParams< TObj > &params=CParams< TObj >())=delete
Construct iterator upon an object serialization stream.
CObjectIStreamIterator & begin(void)
Return self.
CObjectIStreamAsyncIterator<>::CParams< TChild > CParams
CParams & FilterByMember(const string &mem_name, FMemberFilter< TObj > fn, void *extra=nullptr)
Filter by member name.
Definition: streamiter.hpp:363
shared_ptr< CData > m_Data
Definition: streamiter.hpp:565
CIStreamIteratorThread_Base< TRoot, TObject > & m_Reader
x_CObjectIStreamIteratorReadHook(typename CObjectIStreamIterator< TR >::CData *pthis)
Definition: streamiter.hpp:599
CObjectInfo GetPointedObject(void) const
Get data and type information of object to which this type refers.
Definition: objectinfo.cpp:102
CObjectIStreamIterator & operator=(const CObjectIStreamIterator &)
bool IsValid(void) const
Check whether the iterator points to a data TRUE if the iterator is constructed upon a serialization ...
void SetObject(const TObject *obj)
CParams & ReadAndSkipInTheSameThread(bool same_thread)
Raw data read and its pre-parsing (storing the raw data pertaining to a single object and putting it ...
Definition: streamiter.hpp:394
function< bool(const CObjectIStream &istr, TObj &obj, TMemberIndex mem_index, CObjectInfo *mem, void *extra)> FMemberFilter
Object member filtering function.
Definition: streamiter.hpp:163
const TObject * operator->(void) const
CIStreamStdIteratorThread(CObjectIStream &in, EOwnership deleteInStream)
typename CParent::TObjectsQueue TObjectsQueue
bool operator!=(const CObjectIStreamIterator &) const
static CObjectIStream * Create(ESerialDataFormat format)
Create serial object reader.
Definition: objistr.cpp:144
CParams & MinRawBufferSize(size_t min_raw_buffer_size)
Single raw data memory buffer size should be at least this big.
Definition: streamiter.hpp:383
virtual bool EndOfData(void)
Check if there is still some meaningful data that can be read; in text streams this function will ski...
Definition: objistr.cpp:588
CObjectIStreamIterator & operator++(void)
Advance to the next data object.
CObjectIStreamIterator<>::CParams< TRoot > CParams
Definition: streamiter.hpp:484
CIStreamIterator_Base(const CIStreamIterator_Base< TRoot, TObject > &v)
const CObjectIStream & GetObjectIStream(void) const
Return the underlying serial object stream.
CObjectIStreamIterator(const CObjectIStreamIterator &)
queue< CRef< TRoot > > TObjectsQueue
CObjectIStreamAsyncIterator(const CObjectIStreamAsyncIterator &)
function< TObjectsQueue(CRef< CByteSource >, ESerialDataFormat, const CParams &, TObjectsQueue &&)> FParserFunction
TObjectPtr GetObjectPtr(void) const
Get pointer to object.
bool operator==(const CObjectIStreamIterator &) const
CObjectIStreamAsyncIterator & operator++(void)
Advance to the next data object.
CObjectInfo GetMember(void) const
Get class member data.
CObjectIStreamIterator<>::FMemberFilter< TR > FMemberFilter
Definition: streamiter.hpp:347
bool IsValid(void) const
TObj * operator->()
Return pointer to data object which is currently pointed to by the iterator.
x_CObjectIStreamIteratorHook(typename CObjectIStreamIterator< TR >::CData *pthis)
Definition: streamiter.hpp:550
CObjectIStreamIterator<>::CParams< TObj > CParent
Definition: streamiter.hpp:345
CIStreamObjectIterator(CObjectIStream &in, EOwnership deleteInStream=eNoOwnership)
CChoiceVariant GetCurrentChoiceVariant(void) const
Get data and type information of selected choice variant.
CIStreamIteratorThread_Base< TRoot, TObject > * m_Reader
virtual void ReadObject(CObjectIStream &in, const CObjectInfo &type) override
This method will be called at approriate time when the object of requested type is to be read.
Definition: streamiter.hpp:603
input_iterator_tag iterator_category
Definition: streamiter.hpp:479
TTypeInfo GetTypeInfo(void) const
virtual void * Main(void) override
Derived (user-created) class must provide a real thread function.
TObj & operator*()
Return data object which is currently pointed to by the iterator.
CObjectIStreamIterator<>::CParams< TChild > CParams
Definition: streamiter.hpp:576
CObjectIStreamAsyncIterator<>::CParams< TRoot > CParams
CParams & FilterByMember(const string &mem_name, FMemberFilter< TObj > fn, void *extra=nullptr)
Filter by member name.
Definition: streamiter.hpp:179
CIStreamObjectHook(CIStreamIteratorThread_Base< TRoot, TObject > &thr)
CObjectIStreamIterator end(void)
Construct and return end-of-stream iterator.
const CClassTypeInfo * GetClassTypeInfo(void) const
Definition: objectinfo.cpp:61
CIStreamStdIterator(CObjectIStream &in, EOwnership deleteInStream=eNoOwnership)
CObjectInfo GetVariant(void) const
Get variant data.
virtual void Process(const TObject &obj) override
This method will be called when the object of the requested class is read.
CIStreamObjectIteratorThread(CObjectIStream &in, EOwnership deleteInStream)
TMemberIndex xxx_MemberIndex(const string &mem_name)
Definition: streamiter.hpp:633
bool operator!=(const CObjectIStreamAsyncIterator &) const
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
virtual void SkipObject(CObjectIStream &in, const CObjectTypeInfo &type) override
Definition: streamiter.hpp:554
CParams & FilterByMember(TMemberIndex index, FMemberFilter< TObj > fn, void *extra=nullptr)
Filter by member index.
Definition: streamiter.hpp:175
FMemberFilter< TObj > m_FnFilter
Definition: streamiter.hpp:187
friend class CObjectIStreamAsyncIterator
Definition: streamiter.hpp:190
TMemberIndex GetVariantIndex(void) const
Get index of the variant in the choice.
CParams & MaxTotalRawSize(size_t max_total_raw_size)
Total size of raw data buffers is allowed to grow to this value.
Definition: streamiter.hpp:378
bool operator==(const CObjectIStreamAsyncIterator &) const
const TObject * GetObject(void) const
ETypeFamily GetTypeFamily(void) const
Get data type family.
CObjectIStreamAsyncIterator & operator=(const CObjectIStreamAsyncIterator &)
CObjectIStreamIterator(void)=delete
Construct end-of-stream (invalid) iterator.
CIStreamIterator_Base< TRoot, TObject > & operator=(const CIStreamIterator_Base< TRoot, TObject > &v)
enable_if< is_base_of< CSerialObject, TRoot >::value, TTypeInfo >::type xxx_GetTypeInfo(void)
Definition: streamiter.hpp:619
const TObject & operator*(void) const
TObj & operator*()
Return data object which is currently pointed to by the iterator.
CParams & LaunchPolicy(launch policy)
Parsing thread launch policy.
Definition: streamiter.hpp:368
CParams & FilterByMember(TMemberIndex index, FMemberFilter< TObj > fn, void *extra=nullptr)
Filter by member index.
Definition: streamiter.hpp:358
CObjectIStreamAsyncIterator & begin(void)
Return self.
CIStreamIteratorThread_Base(CObjectIStream &in, EOwnership deleteInStream)
CObjectIStreamAsyncIterator end(void)
Construct and return end-of-stream iterator.
queue< CRef< CByteSource > > m_ReaderData
TObj * operator->()
Return pointer to data object which is currently pointed to by the iterator.
virtual void * Main(void) override
Derived (user-created) class must provide a real thread function.
bool Serial_FilterSkip(CObjectIStream &in, const CObjectTypeInfo &ctype)
Definition: serial.cpp:76
CObjectIStreamAsyncIterator(void)=delete
Construct end-of-stream (invalid) iterator.
CObjectIStreamAsyncIterator(CObjectIStream &istr, EOwnership own_istr=eNoOwnership, const CParams< TObj > &params=CParams< TObj >())=delete
Construct iterator upon an object serialization stream.
CParams & MaxParserThreads(unsigned max_parser_threads)
Maximum number of parsing threads.
Definition: streamiter.hpp:373
bool IsValid(const CSeq_point &pt, CScope *scope)
Checks that point >= 0 and point < length of Bioseq.
#define NCBI_FALLTHROUGH
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
IO_PREFIX::streampos CNcbiStreampos
Portable alias for streampos.
Definition: ncbistre.hpp:134
static void Exit(void *exit_data)
Cancel current thread.
Definition: ncbithr.cpp:906
void Wait(void)
Wait on semaphore.
Definition: ncbimtx.cpp:1787
void Post(unsigned int count=1)
Increment the semaphore by "count".
Definition: ncbimtx.cpp:1971
void Join(void **exit_data=0)
Wait for the thread termination.
Definition: ncbithr.cpp:863
const CItemsInfo & GetItems(void) const
ETypeFamily GetTypeFamily(void) const
bool RandomOrder(void) const
void DefaultReadData(CObjectIStream &in, TObjectPtr object) const
bool Implicit(void) const
enum ENcbiOwnership EOwnership
Ownership relations between objects.
int i
enable_if< is_pod< TRoot >::value||is_convertible< TRoot, std::string >::value, TTypeInfo >::type xxx_GetTypeInfo(void)
Definition: streamiter.hpp:627
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
#define NCBI_COMPILER_MSVC
Definition: ncbiconf_msvc.h:22
#define nullptr
Definition: ncbimisc.hpp:45
Multi-threading – classes, functions, and features.
static Format format
Definition: njn_ioutil.cpp:53
std::istream & in(std::istream &in_, double &x_)
Definition: type.c:6
CRef< CTestThread > thr[k_NumThreadsMax]
Definition: test_mt.cpp:267
done
Definition: token1.c:1
Modified on Wed Sep 04 15:02:40 2024 by modify_doxy.py rev. 669887