NCBI C++ ToolKit
async_task.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef __REMOTE_APP_ASYNC_TASK__HPP
2 #define __REMOTE_APP_ASYNC_TASK__HPP
3 
4 /* $Id: async_task.hpp 100706 2023-08-31 20:10:36Z lavr $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Author: Rafael Sadyrov
30  *
31  */
32 
33 #include <list>
34 
35 #include <corelib/ncbimtx.hpp>
36 #include <corelib/ncbithr.hpp>
37 
39 
40 template <class TTask>
42 {
43  // Context holds all data that is shared between Scheduler and Executor.
44  // Also, it actually implements both these actors
    // NOTE(review): this is a rendered listing; its embedded line numbers skip
    // 59 and 70-75, so whatever was declared on those lines is not visible.
    // The tooltip index of this page lists `CConditionVariable cond` at line 73
    // and `bool FillBacklog(TTasks_I &)` at line 192; `lock`, `tasks` and
    // `backlog` are used by the visible code but their declarations are not
    // shown -- confirm against the original header.
45  class CContext
46  {
47  public:
    // s: pause between executions, raised to 1 when a smaller value is given.
    // m: stored as max_attempts; a value <= 0 makes Enabled() return false.
    // The stop flag starts out false.
48  CContext(int s, int m)
49  : sleep(s > 1 ? s : 1), // Sleep at least one second between executions
50  max_attempts(m),
51  stop(false)
52  {}
53 
    // True when max_attempts is positive.
54  bool Enabled() const { return max_attempts > 0; }
55 
    // Declared here only; no definition appears inside this class body.
56  bool SchedulerImpl(TTask);
57  void ExecutorImpl();
58 
    // NOTE(review): listing line 59 (the header of this member function) is
    // missing. The visible body takes a guard on `lock`, sets the stop flag
    // and signals the condition variable.
60  {
61  CMutexGuard guard(lock);
62  stop = true;
63  cond.SignalSome();
64  }
65 
66  private:
    // Each list entry pairs an int with a task. Entries are created with the
    // int set to 0 (see the emplace_back(0, task) call later in this listing).
67  typedef list<pair<int, TTask>> TTasks;
68  typedef typename TTasks::iterator TTasks_I;
69 
71 
    // Both constants are fixed at construction; `stop` is the only one of
    // these three members that is modified afterwards (set to true above).
76  const unsigned sleep;
77  const int max_attempts;
78  bool stop;
79  };
80 
81  // Executor runs async tasks (in a separate thread)
    // NOTE(review): listing lines 96, 105-106, 110 and 120 are missing from
    // this extract, so parts of Stop(), Main(), the SThread members and the
    // CExecutor public interface are not visible here.
82  class CExecutor
83  {
    // Thread object bound to a context and carrying a thread name.
84  struct SThread : public CThread
85  {
    // Keeps a reference to the context and takes ownership of the name
    // string by moving it into m_ThreadName.
86  SThread(CContext& context, string thread_name)
87  : m_Context(context),
88  m_ThreadName(std::move(thread_name))
89  {}
90 
    // Returns immediately when Discard() reports true (per the tooltip on
    // this page, Discard() applies to a thread that has not been Run() yet).
    // Otherwise falls through to Join(), which waits for thread termination.
    // NOTE(review): listing line 96, between the two calls, is missing.
    // Anything thrown inside the try block is handled by STD_CATCH_ALL, so
    // no exception leaves Stop().
91  void Stop()
92  {
93  try {
94  if (Discard()) return;
95 
97  Join();
98  } STD_CATCH_ALL("Exception in CExecutor::SThread::Stop()")
99  }
100 
101  private:
102  // This is the only method called in a different thread
    // NOTE(review): listing lines 105-106 (the body before the return) are
    // missing; only the NULL return value is visible.
103  void* Main(void)
104  {
107  return NULL;
108  }
109 
    // NOTE(review): listing line 110 is missing; m_Context is initialised in
    // the constructor above, so it is presumably declared there -- confirm.
111  const string m_ThreadName;
112  };
113 
114  public:
    // Allocates an SThread only when context.Enabled() is true; otherwise
    // m_Thread stays nullptr and Start() does nothing.
115  CExecutor(CContext& context, string thread_name)
116  : m_Thread(context.Enabled() ? new SThread(context, std::move(thread_name)) : nullptr)
117  {}
118 
    // Runs the thread if one was created.
119  void Start() { if (m_Thread) m_Thread->Run(); }
121 
122  private:
    // Raw pointer; no delete is visible in this listing.
    // NOTE(review): confirm how the thread object is released (listing line
    // 120 is missing and may hold the relevant code).
123  SThread* m_Thread;
124  };
125 
126 public:
127  // Scheduler gives tasks to Executor
    // NOTE(review): listing line 128 (the class header) and line 137 are
    // missing from this extract. The tooltip index of this page lists
    // `CScheduler(CContext &context)` as defined at line 137.
129  {
130  public:
    // Forwards the task to the context's SchedulerImpl() and returns its
    // result unchanged.
131  bool operator()(TTask task)
132  {
133  return m_Context.SchedulerImpl(task);
134  }
135 
136  private:
138 
    // Reference to the context this scheduler forwards to.
139  CContext& m_Context;
140 
    // Gives CAsyncTaskProcessor access to the private section above.
141  friend class CAsyncTaskProcessor;
142  };
143 
// Public interface and data members of CAsyncTaskProcessor.
// NOTE(review): listing line 147 is missing from this extract, so one public
// member declared after GetScheduler() is not visible here.
    // Declared here; the inline definition appears later in this listing.
144  CAsyncTaskProcessor(int sleep, int max_attempts, string thread_name);
145 
    // Returns a reference to the scheduler member.
146  CScheduler& GetScheduler() { return m_Scheduler; }
148 
149 private:
    // m_Context is declared before the other two members; the constructor
    // passes it to m_Executor, so it must stay first in declaration order.
150  CContext m_Context;
151  CScheduler m_Scheduler;
152  CExecutor m_Executor;
153 };
154 
// NOTE(review): listing line 156 (the function signature) is missing from this
// extract. The body uses `task` and returns bool, which matches the
// `bool SchedulerImpl(TTask)` declaration in CContext -- confirm.
//
// When Enabled() is true: under a guard on `lock`, appends the task to `tasks`
// paired with the int 0, signals the condition variable and returns true.
// Otherwise returns false without taking the lock or storing the task.
155 template <class TTask>
157 {
158  if (Enabled()) {
159  CMutexGuard guard(lock);
160  tasks.emplace_back(0, task);
161  cond.SignalSome();
162  return true;
163  }
164 
165  return false;
166 }
167 
// NOTE(review): listing line 169 (the function signature) is missing from this
// extract. The body returns nothing, which matches the `void ExecutorImpl()`
// declaration in CContext -- confirm.
//
// Loops until FillBacklog() returns false. On each pass it runs the backlog
// entries in the range [backlog.begin(), backlog_end).
168 template <class TTask>
170 {
171  for (;;) {
    // Default range end: the whole backlog. FillBacklog() takes this
    // iterator by reference and may move it.
172  auto backlog_end = backlog.end();
173 
174  // If stop was requested
175  if (!FillBacklog(backlog_end)) {
176  return;
177  }
178 
179  // Execute tasks from the backlog
180  auto it = backlog.begin();
181  while (it != backlog_end) {
    // The entry's int is incremented before the call and passed as the
    // first argument, with max_attempts as the second. An entry whose
    // task returns true is removed; any other entry stays in the backlog.
182  if (it->second(++it->first, max_attempts)) {
    // Post-increment hands erase() the current position while `it`
    // has already advanced to the next entry.
183  backlog.erase(it++);
184  } else {
185  ++it;
186  }
187  }
188  }
189 }
190 
// NOTE(review): listing line 192 (the function signature) is missing from this
// extract. The tooltip index of this page lists `bool FillBacklog(TTasks_I &)`
// as defined at line 192; the body calls that parameter `backlog_end`.
//
// Holds a guard on `lock` for the whole call and loops until `stop` is set.
// Returns true when there is something to process, false once `stop` is set.
191 template <class TTask>
193 {
194  CMutexGuard guard(lock);
195 
196  while (!stop) {
197  // If there are some new tasks to execute
198  if (!tasks.empty()) {
199  // Move them to the backlog, these only will be processed this time
    // backlog_end is set to the current first backlog entry and the new
    // tasks are spliced in before it, so [backlog.begin(), backlog_end)
    // covers exactly the tasks that were just moved.
200  backlog_end = backlog.begin();
201  backlog.splice(backlog_end, tasks);
202  return true;
203 
204  // If there is nothing to do, wait for a signal
205  } else if (backlog.empty()) {
    // Repeats the wait until WaitForSignal() returns true, then goes back
    // to the outer loop condition.
206  while (!cond.WaitForSignal(lock));
207 
208  // No backlog processing if there is a signal
    // Waits with `sleep` as the second argument. A false result
    // (presumably a timeout -- confirm against CConditionVariable) returns
    // true with backlog_end left as the caller set it.
209  } else if (!cond.WaitForSignal(lock, sleep)) {
210  return true;
211  }
212  }
213 
214  return false;
215 }
// Builds the context from `sleep` and `max_attempts`, then builds the executor
// from that context and the thread name (moved in).
// NOTE(review): listing line 219, between the two visible initializers, is
// missing from this extract; the m_Scheduler member has no visible
// initializer, so it was presumably on that line -- confirm.
// The constructor body is empty; it does not call Start() on the executor.
216 template <class TTask>
217 inline CAsyncTaskProcessor<TTask>::CAsyncTaskProcessor(int sleep, int max_attempts, string thread_name)
218  : m_Context(sleep, max_attempts),
220  m_Executor(m_Context, std::move(thread_name))
221 {
222 }
223 
225 
226 #endif
list< pair< int, TTask > > TTasks
Definition: async_task.hpp:67
CConditionVariable cond
Definition: async_task.hpp:73
bool FillBacklog(TTasks_I &)
Definition: async_task.hpp:192
CExecutor(CContext &context, string thread_name)
Definition: async_task.hpp:115
CScheduler(CContext &context)
Definition: async_task.hpp:137
CScheduler & GetScheduler()
Definition: async_task.hpp:146
CAsyncTaskProcessor(int sleep, int max_attempts, string thread_name)
Definition: async_task.hpp:217
CScheduler m_Scheduler
Definition: async_task.hpp:151
CMutex –.
Definition: ncbimtx.hpp:749
#define false
Definition: bool.h:36
#define NULL
Definition: ncbistd.hpp:225
#define STD_CATCH_ALL(message)
Standard handling of "exception"-derived exceptions; catches non-standard exceptions and generates "u...
Definition: ncbiexpt.hpp:570
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
bool Run(TRunMode flags=fRunDefault)
Run the thread.
Definition: ncbithr.cpp:724
void SignalSome(void)
Wake at least one of the threads that are currently waiting on this condition variable (if any thread...
Definition: ncbimtx.cpp:2594
bool Discard(void)
If the thread has not been Run() yet, then schedule the thread object for destruction,...
Definition: ncbithr.cpp:923
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
Definition: ncbithr.cpp:958
void Join(void **exit_data=0)
Wait for the thread termination.
Definition: ncbithr.cpp:863
#define nullptr
Definition: ncbimisc.hpp:45
Multi-threading – mutexes; rw-locks; semaphore.
Multi-threading – classes, functions, and features.
void * Main(void)
Derived (user-created) class must provide a real thread function.
Definition: async_task.hpp:103
SThread(CContext &context, string thread_name)
Definition: async_task.hpp:86
static CS_CONTEXT * context
Definition: will_convert.c:21
Modified on Wed May 22 11:35:09 2024 by modify_doxy.py rev. 669887