NCBI C++ ToolKit
async_writers.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef _ASYNC_WRITERS_HPP_
2 #define _ASYNC_WRITERS_HPP_
3 
4 /* $Id: async_writers.hpp 99290 2023-03-07 16:08:06Z foleyjp $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Authors: Sergiy Gotvyanskyy
30 *
31 * File Description:
32 * Asynchronous writers
33 *
34 */
35 
36 #include <functional>
37 #include <future>
38 #include <util/message_queue.hpp>
39 #include <corelib/ncbiobj.hpp>
40 
42 
43 class CObjectOStream;
44 class CSerialObject;
45 
47 
48 class CSeq_entry;
49 
50 template<typename _token>
52 {
53 public:
54  using TToken = _token;
55  using TFuture = std::future<TToken>;
56  using TProcessFunction = std::function<void(TToken&)>;
57  using TPullNextFunction = std::function<bool(TToken&)>;
60  virtual ~TAsyncPipeline() {}
61 
62  void SetDepth(size_t n) {
63  m_queue.Trottle().m_limit = n;
64  }
65 
66  void PostData(TToken data, TProcessFunction process_func)
67  {
68  TFuture fut = std::async(std::launch::async, [process_func](TToken _data) -> TToken
69  {
70  process_func(_data);
71  return _data;
72  },
73  data);
74  m_queue.push_back(std::move(fut));
75  }
76 
77  void PostException(std::exception_ptr _excp_ptr)
78  {
79  m_queue.clear();
80  std::promise<TToken> exc_prom;
81  std::future<TToken> fut = exc_prom.get_future();
82  exc_prom.set_exception(_excp_ptr);
83  m_queue.push_back(std::move(fut));
84  }
85 
86  void Complete()
87  {
88  m_queue.push_back({});
89  }
90 
91 
92  void cancel() {
93  m_queue.cancel();
94  }
95 
96  void clear() {
97  m_queue.clear();
98  }
99 
100  auto pop_front()
101  {
102  return m_queue.pop_front();
103  }
104 
105  std::future<void> make_producer_task(TPullNextFunction pull_next_token, TProcessFunction process_func)
106  {
107  return std::async(std::launch::async, [this, pull_next_token, process_func]()
108  {
109  try
110  {
111  TToken token;
112  while ((pull_next_token(token)))
113  {
114  PostData(token, process_func);
115  }
116  Complete();
117  }
118  catch(...)
119  {
120  PostException(std::current_exception());
121  }
122  }
123  );
124  }
125 
126 protected:
128 };
129 
130 
132 {
133 public:
137  eReportAll
138  };
139 
140  CGenBankAsyncWriter(CObjectOStream* o_stream, EDuplicateIdPolicy policy=eReportAll);
141  virtual ~CGenBankAsyncWriter();
142 
143  // write any object traditionally
144  void Write(CConstRef<CSerialObject> topobject);
145 
146  // write genbankset with multiple entries, could be seq_submit, seq_entry, bioseq_set
147  // or any other object, get_next_entry will be called only for genbankset's
148  using TGetNextFunction = std::function<CConstRef<CSeq_entry>()>;
149  void Write(CConstRef<CSerialObject> topobject, TGetNextFunction get_next_entry);
150 
151  // the same as above, but write asyncronously
152  void StartWriter(CConstRef<CSerialObject> topobject);
153  void PushNextEntry(CConstRef<CSeq_entry> entry);
154  void FinishWriter();
155  void CancelWriter();
156 
157 protected:
158  CObjectOStream* m_ostream = nullptr;
161  std::future<void> m_writer_task;
162 
163 };
164 
165 template<class _Token>
167 {
168 public:
169  using TToken = _Token;
173 
175 
176  void SetDepth(size_t depth) {
178  }
179 
181  TPullNextFunction pull_next_token,
182  TProcessFunction process_func = {},
183  TProcessFunction chain_func = {})
184  {
185  auto pull_next_task = m_pipeline.make_producer_task(pull_next_token, process_func);
186 
187  TGetNextFunction get_next_entry = [this, chain_func]() -> CConstRef<CSeq_entry>
188  {
189  auto token_future = m_pipeline.pop_front();
190  if (!token_future.valid()) {
191  return {};
192  }
193 
194  TToken token;
195  try {
196  token = token_future.get(); // this can throw an exception that was caught within the thread
197  if (chain_func) {
198  chain_func(token);
199  }
200  }
201  catch(...) {
202  m_pipeline.cancel();
203  throw;
204  }
205 
206  return token;
207  };
208 
209  Write(topobject, get_next_entry);
210  }
211 
213  TPullNextFunction pull_next_token,
214  TProcessFunction process_func = {},
215  TProcessFunction chain_func = {})
216  {
217  StartWriter(topobject);
218  try
219  {
220  TToken token;
221  while ((pull_next_token(token)))
222  {
223  if (process_func)
224  process_func(token);
225 
226  PushNextEntry(token);
227 
228  if (chain_func)
229  chain_func(token);
230  }
231  FinishWriter();
232  }
233  catch(...)
234  {
235  CancelWriter();
236  throw;
237  }
238  }
239 
241  TPullNextFunction pull_next_token,
242  TProcessFunction process_func = {},
243  TProcessFunction chain_func = {})
244  {
245  auto get_next_entry = [pull_next_token, process_func, chain_func]() -> CConstRef<CSeq_entry>
246  {
247  TToken token;
248 
249  if (!pull_next_token(token))
250  return {};
251 
252  if (process_func)
253  process_func(token);
254 
255  if (chain_func)
256  chain_func(token);
257 
258  return token;
259  };
260 
261  Write(topobject, get_next_entry);
262  }
263 
264 
265 protected:
267 };
268 
269 
272 
273 #endif
typename _Pipeline::TPullNextFunction TPullNextFunction
void WriteAsyncMT(CConstRef< CSerialObject > topobject, TPullNextFunction pull_next_token, TProcessFunction process_func={}, TProcessFunction chain_func={})
void WriteAsyncST(CConstRef< CSerialObject > topobject, TPullNextFunction pull_next_token, TProcessFunction process_func={}, TProcessFunction chain_func={})
void WriteAsync2T(CConstRef< CSerialObject > topobject, TPullNextFunction pull_next_token, TProcessFunction process_func={}, TProcessFunction chain_func={})
void SetDepth(size_t depth)
typename _Pipeline::TProcessFunction TProcessFunction
CGenBankAsyncWriter(CObjectOStream *o_stream, EDuplicateIdPolicy policy=eReportAll)
std::future< void > m_writer_task
void PushNextEntry(CConstRef< CSeq_entry > entry)
EDuplicateIdPolicy m_DuplicateIdPolicy
void StartWriter(CConstRef< CSerialObject > topobject)
void Write(CConstRef< CSerialObject > topobject)
CMessageQueue< CConstRef< CSeq_entry > > m_write_queue
std::function< CConstRef< CSeq_entry >()> TGetNextFunction
CObjectOStream –.
Definition: objostr.hpp:83
Definition: Seq_entry.hpp:56
Base class for all serializable objects.
Definition: serialbase.hpp:150
void SetDepth(size_t n)
std::function< void(TToken &)> TProcessFunction
void PostData(TToken data, TProcessFunction process_func)
void PostException(std::exception_ptr _excp_ptr)
std::future< TToken > TFuture
std::future< void > make_producer_task(TPullNextFunction pull_next_token, TProcessFunction process_func)
TProcessingQueue m_queue
std::function< bool(TToken &)> TPullNextFunction
virtual ~TAsyncPipeline()
_Trottle & Trottle()
value_type pop_front()
void push_back(value_type msg)
static unsigned char depth[2 *(256+1+29)+1]
#define bool
Definition: bool.h:34
char data[12]
Definition: iconv.c:80
void Write(CObjectOStream &out, TConstObjectPtr object, const CTypeRef &type)
Definition: serial.cpp:55
#define END_SCOPE(ns)
End the previously defined scope.
Definition: ncbistl.hpp:75
#define BEGIN_SCOPE(ns)
Define a new scope.
Definition: ncbistl.hpp:72
#define NCBI_XOBJWRITE_EXPORT
Definition: ncbi_export.h:1347
yy_size_t n
Magic spell ;-) needed for some weird compilers... very empiric.
Portable reference counted smart and weak pointers using CWeakRef, CRef, CObject and CObjectEx.
TToken
Definition: tokens.hpp:38
Modified on Wed Sep 04 15:01:55 2024 by modify_doxy.py rev. 669887