NCBI C++ ToolKit
async_writers.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: async_writers.cpp 98486 2022-11-25 17:54:31Z gotvyans $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Sergiy Gotvyanskyy
27 *
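28 * File Description:
29 *   CGenBankAsyncWriter: writes a top-level serial object to a CObjectOStream, streaming Seq-entries from a queue on an asynchronous task and reporting duplicate Bioseq ids.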
30 */
31 
32 #include <ncbi_pch.hpp>
33 
35 
39 
42 
43 #include <serial/objostr.hpp>
44 #include <serial/objectio.hpp>
46 
50 
52  EDuplicateIdPolicy policy)
53  : m_ostream{o_stream}, // stream that the Write() bodies in this file serialize to
54  m_DuplicateIdPolicy{policy} {} // compared against eIgnore / eThrowImmediately while writing; the constructor body itself is empty
55 
56 
58 
59 
// Serialize topobject to m_ostream in a single direct call, using the type
// information reported by the object itself (GetThisTypeInfo()).
61  m_ostream->Write(topobject, topobject->GetThisTypeInfo());
62 }
63 
65 {
    // Callback handed to Write(): every call takes the element at the front of
    // m_write_queue. NOTE(review): pop_front() presumably waits until an element
    // is available -- CMessageQueue is not defined in this file, confirm there.
66  auto next_function = [this]()
67  {
68  return m_write_queue.pop_front();
69  };
70 
    // Launch Write(topobj, next_function) with the std::launch::async policy and
    // keep the resulting future in m_writer_task, which the wait() calls elsewhere
    // in this file block on. topobject is moved into the task's argument.
    // The task lambda captures `this`, so the object has to outlive the task.
71  m_writer_task = std::async(std::launch::async, [this, next_function](CConstRef<CSerialObject> topobj)
72  {
73  Write(topobj, next_function);
74  }, std::move(topobject));
75 }
76 
78 {
    // Queue the entry for the writer: it is appended to the back of m_write_queue,
    // and the reference is moved rather than copied.
79  m_write_queue.push_back(std::move(entry));
80 }
81 
83 {
    // Queue an empty (default-constructed) element as an end marker: the entry
    // loop in Write() stops as soon as the value it receives tests false.
84  m_write_queue.push_back({});
    // Block until the writer task has completed.
    // NOTE(review): std::future::wait() does not rethrow an exception stored by
    // the task (get() would); confirm that errors raised while writing are meant
    // to go unreported at this point.
85  m_writer_task.wait();
86 }
87 
89 {
    // Empty the queue first -- presumably this drops entries that have not been
    // consumed yet (CMessageQueue::clear() is not defined in this file).
90  m_write_queue.clear();
    // Then queue an empty element, which makes the entry loop in Write() stop,
    // and block until the writer task has completed.
91  m_write_queue.push_back({});
    // NOTE(review): wait() does not rethrow an exception stored by the task;
    // confirm that ignoring writer errors here is intended.
92  m_writer_task.wait();
93 }
94 
96 static void s_ReportDuplicateIds(const TIdSet& duplicateIds)
97 {
98  if (duplicateIds.empty()) {
99  return;
100  }
101  string msg = "duplicate Bioseq id";
102  if (duplicateIds.size() > 1) {
103  msg += "s";
104  }
105  for (auto pId : duplicateIds) {
106  msg += "\n";
107  msg += GetLabel(*pId);
108  }
109  NCBI_THROW(CHugeFileException, eDuplicateSeqIds, msg);
110 }
111 
112 
114 {
    // Writes topobject to m_ostream, replacing the content of the outermost
    // "seq-set" member with entries pulled one by one from get_next_entry, and
    // (unless the policy is eIgnore) checking Bioseq ids for duplicates.

    // Nesting depth of "seq-set" members currently being written; only the
    // outermost one (depth 1) is fed from get_next_entry.
115  size_t bioseq_level = 0;
116  auto seq_set_member = CObjectTypeInfo(CBioseq_set::GetTypeInfo()).FindMember("seq-set");
    // Hook the writing of that member's type on m_ostream. The lambda captures the
    // local bioseq_level by reference, so it must only run while this function is active.
117  SetLocalWriteHook(seq_set_member.GetMemberType(), *m_ostream,
118  [&bioseq_level, get_next_entry]
119  (CObjectOStream& out, const CConstObjectInfo& object)
120  {
121  bioseq_level++;
122  if (bioseq_level == 1)
123  {
    // Outermost seq-set: instead of writing the object's own data, emit every
    // entry returned by get_next_entry() as a container element, stopping at the
    // first empty reference.
124  COStreamContainer out_container(out, object.GetTypeInfo());
125  CConstRef<CSeq_entry> entry;
126  while((entry = get_next_entry()))
127  {
    // The loop condition has already tested entry, so this check always passes here.
128  if (entry)
129  out_container << *entry;
130  entry.Reset();
131  }
132  } else {
    // Nested seq-set: write it unchanged through the default writer.
133  object.GetTypeInfo()->DefaultWriteData(out, object.GetObjectPtr());
134  }
135  bioseq_level--;
136  });
137 
138 
    // processedIds: every id seen so far; duplicateIds: ids seen more than once
    // that are held back for the final report.
139  TIdSet processedIds, duplicateIds;
140  if (m_DuplicateIdPolicy != eIgnore)
141  {
    // NOTE(review): the line that opens this hook registration is not present in
    // this listing; from the arguments that follow it installs a class-member
    // write hook on *m_ostream whose member value is read as CBioseq::TId.
143  *m_ostream,
144  [&processedIds, &duplicateIds, this](CObjectOStream& out, const CConstObjectInfoMI& member)
145  {
    // Write the member as usual, then inspect the ids it contains.
146  out.WriteClassMember(member);
147  const auto& container = *CType<CBioseq::TId>::GetUnchecked(*member);
148  for (auto pId : container) {
    // insert().second is false when the id was already recorded, i.e. a duplicate.
149  if (!processedIds.insert(pId).second) {
150  if (m_DuplicateIdPolicy == eThrowImmediately) {
    // Report this single id right away (s_ReportDuplicateIds throws).
151  s_ReportDuplicateIds({pId});
152  }
153  else {
    // Any other policy: collect the id and report after the whole write.
154  duplicateIds.insert(pId);
155  }
156  }
157  }
158  });
159  }
160 
    // The hooks installed above are expected to fire during this call.
161  m_ostream->Write(topobject, topobject->GetThisTypeInfo());
162 
    // Throws if any duplicates were collected; does nothing for an empty set.
163  s_ReportDuplicateIds(duplicateIds);
164 }
165 
static void s_ReportDuplicateIds(const TIdSet &duplicateIds)
USING_SCOPE(edit)
CConstObjectInfoMI –.
Definition: objectiter.hpp:397
CConstObjectInfo –.
Definition: objectinfo.hpp:421
virtual ~CGenBankAsyncWriter()
std::future< void > m_writer_task
void PushNextEntry(CConstRef< CSeq_entry > entry)
void StartWriter(CConstRef< CSerialObject > topobject)
void Write(CConstRef< CSerialObject > topobject)
CMessageQueue< CConstRef< CSeq_entry > > m_write_queue
std::function< CConstRef< CSeq_entry >()> TGetNextFunction
CObjectOStream * m_ostream
CObjectOStream –.
Definition: objostr.hpp:83
CObjectTypeInfo –.
Definition: objectinfo.hpp:94
Definition: set.hpp:45
size_type size() const
Definition: set.hpp:132
bool empty() const
Definition: set.hpp:133
std::ofstream out("events_result.xml")
main entry point for tests
@ eIgnore
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
virtual const CTypeInfo * GetThisTypeInfo(void) const =0
string GetLabel(const CSeq_id &id)
static C * GetUnchecked(const CObjectInfo &object)
Definition: objecttype.hpp:129
CMemberIterator FindMember(const string &memberName) const
Find class member by its name.
void Write(const CConstObjectInfo &object)
Definition: objostr.cpp:593
#define END_SCOPE(ns)
End the previously defined scope.
Definition: ncbistl.hpp:75
#define BEGIN_SCOPE(ns)
Define a new scope.
Definition: ncbistl.hpp:72
Definition: fix_pub.hpp:45
Magic spell ;-) needed for some weird compilers... very empiric.
void SetLocalWriteHook(const CObjectTypeInfo &obj_type_info, CObjectOStream &ostr, _Func _func)
Compare objects pointed to by (smart) pointer.
Definition: ncbiutil.hpp:67
Modified on Sun May 05 05:22:01 2024 by modify_doxy.py rev. 669887