NCBI C++ ToolKit
bmthreadpool.h
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef BMTPOOL__H__INCLUDED__
2 #define BMTPOOL__H__INCLUDED__
3 /*
4 Copyright(c) 2002-2021 Anatoliy Kuznetsov(anatoliy_kuznetsov at yahoo.com)
5 
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9 
10  http://www.apache.org/licenses/LICENSE-2.0
11 
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 
18 For more information please visit: http://bitmagic.io
19 */
20 
21 #include <type_traits>
22 #include <queue>
23 #include <thread>
24 #include <mutex>
25 #include <atomic>
26 #include <condition_variable>
27 
28 #include "bmbuffer.h"
29 #include "bmtask.h"
30 
31 namespace bm
32 {
33 
34 
/// Padding of 60 bytes: together with a 4-byte lock word (for example
/// spin_lock's std::atomic<unsigned> on common platforms) the final object
/// occupies 64 bytes, i.e. one typical CPU cache line.
/// @internal
struct pad60_struct
{
    char c[60];
};

/// Empty padding (adds no payload bytes)
/// @internal
struct pad0_struct
{
};
41 
42 /**
43  Spin-lock with two-phase acquire (read + cas)
44  padding parameter optionally adds a buffer to avoid CPU cache
45  line contention.
46  TODO: test if padding realy helps in our case
47 
48  Generally spin_lock does not have advantage over std::mutex
49  but in some specific cases like WebAssembly it may be better
50  due no "noexcept" property
51 
52  @ingroup bmtasks
53  */
54 template<class Pad = bm::pad0_struct>
55 class spin_lock
56 {
57 public:
58  spin_lock() noexcept : locked_(0) {}
59 
60  /// Lock the lock
61  void lock() noexcept
62  {
63  while(1) // spin loop
64  {
65  unsigned locked = locked_.load(std::memory_order_relaxed);
66  if (!locked &&
67  locked_.compare_exchange_weak(locked, true,
68  std::memory_order_acquire))
69  break;
70 #if defined(BMSSE2OPT) || defined(BMSSE42OPT) || defined(BMAVX2OPT) || defined(BMAVX512OPT)
71  _mm_pause();
72 #endif
73  } // while
74  }
75 
76  /// Try to acquire the lock, return true if successfull
77  ///
78  bool try_lock() noexcept
79  {
80  unsigned locked = locked_.load(std::memory_order_relaxed);
81  if (!locked &&
82  locked_.compare_exchange_weak(locked, true,
83  std::memory_order_acquire))
84  return true;
85  return false;
86  }
87 
88  /// Unlock the lock
89  void unlock() noexcept
90  {
91  locked_.store(false, std::memory_order_release);
92  }
93 private:
94  spin_lock(const spin_lock&)=delete;
95  spin_lock& operator=(const spin_lock&)=delete;
96 
97 private:
98  std::atomic<unsigned> locked_;
99  Pad p_;
100 };
101 
/// Join every joinable thread held in a container.
///
/// Elements that are not joinable (default-constructed, already joined
/// or detached) are skipped.
///
/// @param tcont - container of std::thread-like objects exposing
///                joinable() and join()
///
/// @internal
/// @ingroup bmtasks
///
template<typename TCont>
void join_multiple_threads(TCont& tcont)
{
    for (auto& thr : tcont)
    {
        if (!thr.joinable())
            continue;
        thr.join();
    } // for thr
}
117 
118 
119 template<typename QValue, typename Lock> class thread_pool;
120 
121 
122 /**
123  Thread-sync queue with MT access protecion
124 
125  @ingroup bmtasks
126  @internal
127  */
128 template<typename Value, typename Lock>
130 {
131 public:
132  typedef Value value_type;
133  typedef Lock lock_type;
134 
135  /// constructor
136  ///
137  queue_sync() noexcept {}
138 
139  /// Push value to the back of the queue
140  /// @param v - value to put in the queue
141  ///
142  /// @sa push_no_lock
143  ///
144  void push(const value_type& v) //noexcept(bm::is_lock_noexcept<lock_type>::value)
145  {
146  {
147  std::lock_guard<lock_type> lg(dq_lock_);
148  data_queue_.push(v);
149  }
150  queue_push_cond_.notify_one(); // noexcept
151  }
152 
153  /// Push value to the back of the queue without lock protection
154  /// It is assumed that processing did not start and we are just staging
155  /// the batch
156  ///
157  /// @param v - value to put in the queue
158  /// @sa push
159  ///
160  void push_no_lock(const value_type& v)
161  {
162  data_queue_.push(v);
163  }
164 
165 
166  /// Extract value
167  /// @param v - [out] value returned
168  /// @return true if extracted
169  ///
171  {
172  std::lock_guard<lock_type> guard(dq_lock_);
173  if (data_queue_.empty())
174  return false;
175  v = data_queue_.front();
176  data_queue_.pop();
177  return true;
178  }
179 
180  /// @return true if empty
181  bool empty() const //noexcept(bm::is_lock_noexcept<lock_type>::value)
182  {
183  std::lock_guard<lock_type> guard(dq_lock_);
184  return data_queue_.empty();
185  }
186 
187  /// lock the queue access
188  /// @sa push_no_lock, unlock
189  void lock() noexcept(bm::is_lock_noexcept<lock_type>::value)
190  { dq_lock_.lock(); }
191 
192  /// Try to lock the queue exclusively
193  ///
195  { return dq_lock_.try_lock(); }
196 
197  /// unlock the queue access
198  /// @sa push_no_lock, lock
200  {
201  dq_lock_.unlock();
202  // lock-unlock is done to protect bulk push, need to wake up
203  // all waiting workers
204  queue_push_cond_.notify_all(); // noexcept
205  }
206 
207  template<typename QV, typename L> friend class bm::thread_pool;
208 
209 protected:
210  typedef std::queue<value_type> queue_type;
211 
212 private:
213  queue_sync(const queue_sync&) = delete;
214  queue_sync& operator=(const queue_sync&) = delete;
215 private:
216  queue_type data_queue_; ///< queue object
217  mutable lock_type dq_lock_; ///< lock for queue
218 
219  // signal structure for wait on empty queue
220 protected:
221  mutable std::mutex signal_mut_; ///< signal mutex for q submissions
222  std::condition_variable queue_push_cond_; ///< mutex paired conditional
223 };
224 
225 
226 /**
227  Thread pool with custom (thread safe) queue
228 
229  Thread pool implements a busy-wait task stealing
230  design pattern
231 
232  QValue - task queue value parameter
233  Lock - locking protection type (like std::mutex or spinlock)
234 
235  @ingroup bmtasks
236 */
237 template<typename QValue, typename Lock>
239 {
240 public:
241  typedef QValue value_type;
242  typedef Lock lock_type;
244 
245  /**
246  Stop modes for threads:
247  0 - keep running/waiting for jobs
248  1 - wait for empty task queue then stop threads
249  2 - stop threads now even if there are pending tasks
250  */
252  {
253  no_stop = 0, ///< keep spinning on busy-wait
254  stop_when_done = 1, ///< stop if tsak queue is empty
255  stop_now = 2 ///< stop right now
256  };
257 
258 public:
260  : stop_flag_(sm)
261  {}
262 
263  ~thread_pool();
264 
265  /** Setup the criteria for threads shutdown
266  Also notifies all threads on a new directive
267  @param sm - stop mode
268  */
269  void set_stop_mode(stop_mode sm) noexcept;
270 
271  /**
272  Request an immediate stop of all threads in the pool
273  */
274  void stop() noexcept { set_stop_mode(stop_now); }
275 
276  /**
277  Start thread pool worker threads.
278  @param tcount - number of threads to start
279  */
280  void start(unsigned tcount);
281 
282  /**
283  Wait for threads to finish (or stop if stop was requested)
284  */
285  void join();
286 
287  /**
288  Conditional spin-wait for the queue to empty
289  (Important note: tasks may still be running, but the queue is empty)
290  */
291  void wait_empty_queue();
292 
293  /// Get access to the job submission queue
294  ///
295  queue_type& get_job_queue() noexcept { return job_queue_; }
296 
297  /// Return if thread pool is stopped by a request
298  int is_stopped() const noexcept
299  { return stop_flag_.load(std::memory_order_relaxed); }
300 
301 protected:
302 
303  /// Internal worker wrapper with busy-wait spin loop
304  /// making pthread-like call for tasks
305  ///
306  void worker_func();
307 
308 private:
309  thread_pool(const thread_pool&)=delete;
311 
312 private:
313  queue_type job_queue_; ///< queue (thread sync)
314  std::vector<std::thread> thread_vect_; ///< threads servicing queue
315  std::atomic_int stop_flag_{0}; ///< stop flag to all threads
316 
317  // notification channel for results wait
318  mutable std::mutex task_done_mut_; ///< signal mutex for task done
319  std::condition_variable task_done_cond_;///< mutex paired conditional
320 
321 };
322 
323 /**
324  Utility class to submit task batch to the running thread pool
325  and optionally wait for it getting done
326 
327  @ingroup bmtasks
328  */
329 template<typename TPool>
331 {
332 public:
333  typedef TPool thread_pool_type;
335 
336 public:
338 
339  static
340  void run(thread_pool_type& tpool,
341  bm::task_batch_base& tasks,
342  bool wait_for_batch);
343 
344  /**
345  Check if all batch jobs in the specified interval are done
346  Spin wait if not.
347  */
348  static
350  bm::task_batch_base& tasks,
353 private:
356 };
357 
358 
359 // ========================================================================
360 // thread_pool<> implementations
361 // ========================================================================
362 
363 
364 // -----------------------------------------------------------------------
365 
366 template<typename QValue, typename Lock>
368 {
369  int is_stop = stop_flag_;
370  if (!is_stop) // finish the outstanding jobs and close threads
371  set_stop_mode(stop_when_done);
372  join();
373 }
374 
375 // -----------------------------------------------------------------------
376 
377 template<typename QValue, typename Lock>
379 {
380  stop_flag_ = sm;
381  job_queue_.queue_push_cond_.notify_all(); // this is noexcept
382 }
383 
384 // -----------------------------------------------------------------------
385 
386 template<typename QValue, typename Lock>
387 void thread_pool<QValue, Lock>::start(unsigned tcount)
388 {
389  int is_stop = stop_flag_.load(std::memory_order_relaxed);
390  if (is_stop == stop_now) // immediate stop requested
391  return;
392  // TODO: consider lock protect of thread_vect_ member
393  for(unsigned i = 0;i < tcount; ++i)
394  {
395  thread_vect_.emplace_back(
396  std::thread(&thread_pool::worker_func,this));
397  } // for
398 }
399 
400 // -----------------------------------------------------------------------
401 
402 template<typename QValue, typename Lock>
404 {
405  bm::join_multiple_threads(thread_vect_);
406  thread_vect_.resize(0);
407 }
408 
409 // -----------------------------------------------------------------------
410 
411 template<typename QValue, typename Lock>
413 {
414  const std::chrono::duration<int, std::milli> wait_duration(20);
415  while(1)
416  {
417  if (job_queue_.empty())
418  break;
419  std::cv_status wait_res;
420  {
421  std::unique_lock<std::mutex> lk(task_done_mut_);
422  wait_res = task_done_cond_.wait_for(lk, wait_duration);
423  }
424  if (wait_res == std::cv_status::timeout)
425  {
426  std::this_thread::yield();
427  int is_stop = is_stopped();
428  if (is_stop == stop_now) // immediate stop requested
429  return;
430  }
431  } // while
432 }
433 
434 // -----------------------------------------------------------------------
435 
436 template<typename QValue, typename Lock>
438 {
439  const std::chrono::duration<int, std::milli> wait_duration(10);
440  while(1)
441  {
442  int is_stop = is_stopped();
443  if (is_stop == stop_now) // immediate stop requested
444  break;
445 
447  if (job_queue_.try_pop(task_descr))
448  {
449  BM_ASSERT(task_descr->done == 0);
450  try
451  {
453  }
454  catch (...)
455  {
456  task_descr->err_code = -1;
457  }
458  task_descr->done.store(1, std::memory_order_release);
459  task_done_cond_.notify_one();
460  continue;
461  }
462  // queue appears to be empty, check if requested to stop
463  //
464  is_stop = is_stopped();
465  if (is_stop)
466  return;
467 
468  // enter a temporal condition wait
469  // notifications are treated as unreliable re-verified
470  // via spin over the poll of the queue
471  std::cv_status wait_res;
472  {
473  std::unique_lock<std::mutex> lk(job_queue_.signal_mut_);
474  wait_res =
475  job_queue_.queue_push_cond_.wait_for(lk, wait_duration);
476  }
477  if (wait_res == std::cv_status::timeout)
478  {
479  is_stop = is_stopped();
480  if (is_stop == stop_now) // immediate stop requested
481  return;
482  std::this_thread::yield();
483  }
484  } // while
485  return;
486 }
487 
488 // ========================================================================
489 // thread_pool_executor<> implementations
490 // ========================================================================
491 
492 
493 template<typename TPool>
495  thread_pool_type& tpool,
497  bool wait_for_batch)
498 {
499  typename thread_pool_type::queue_type& qu = tpool.get_job_queue();
500 
502  for (task_batch_base::size_type i = 0; i < batch_size; ++i)
503  {
504  bm::task_descr* tdescr = task_batch.get_task(i);
505  tdescr->argp = tdescr; // restore the self referenece
506  BM_ASSERT(!tdescr->done);
507 
508  if (tdescr->flags != bm::task_descr::no_flag) // barrier task ?
509  {
510  if (i) // wait until all previously scheduled tasks are done
511  {
512  tpool.wait_empty_queue();
513  wait_for_batch_done(tpool, task_batch, 0, batch_size - 1);
514  }
515 
516  // run the barrier proc on the curent thread
517  tdescr->err_code = tdescr->func(tdescr->argp);
518  tdescr->done.store(1, std::memory_order_release);
519 
520  // re-read the batch size, if barrier added more tasks
521  task_batch_base::size_type new_batch_size = task_batch.size();
522  if (new_batch_size != batch_size)
523  batch_size = new_batch_size;
524  continue;
525  }
526 
527  qu.push(tdescr); // locked push to the thread queue
528 
529  auto is_stop = tpool.is_stopped();
530  if (is_stop == thread_pool_type::stop_now)
531  break; // thread pool stop requested
532 
533  } // for
534 
535 
536  // implicit wait barrier for all tasks
537  if (wait_for_batch && batch_size)
538  {
539  tpool.wait_empty_queue();
540  wait_for_batch_done(tpool, task_batch, 0, batch_size - 1);
541  }
542 }
543 
544 
545 // -----------------------------------------------------------------------
546 
547 template<typename TPool>
549  thread_pool_type& tpool,
550  bm::task_batch_base& tasks,
553 {
554  BM_ASSERT(from_idx <= to_idx);
555  BM_ASSERT(to_idx < tasks.size());
556 
557  for (task_batch_base::size_type i = from_idx; i <= to_idx; ++i)
558  {
559  const bm::task_descr* tdescr = tasks.get_task(i);
560  auto done = tdescr->done.load(std::memory_order_consume);
561  while (!done)
562  {
563  auto is_stop = tpool.is_stopped();
564  if (is_stop == thread_pool_type::stop_now)
565  return; // thread pool stopped, jobs will not be done
566  std::this_thread::yield();
567  // TODO: subscribe to a conditional wait for job done in tpool
568  done = tdescr->done.load(std::memory_order_acquire);
569  } // while
570  } // for
571 
572 }
573 
574 // -----------------------------------------------------------------------
575 
576 
577 } // bm
578 
579 #endif
#define BM_ASSERT
Definition: bmdef.h:139
Task definitions for parallel programming with BitMagic.
Thread-sync queue with MT access protection.
Definition: bmthreadpool.h:130
std::queue< value_type > queue_type
Definition: bmthreadpool.h:210
queue_sync(const queue_sync &)=delete
bool empty() const
Definition: bmthreadpool.h:181
bool try_pop(value_type &v)
Extract value.
Definition: bmthreadpool.h:170
void push(const value_type &v)
Push value to the back of the queue.
Definition: bmthreadpool.h:144
queue_type data_queue_
queue object
Definition: bmthreadpool.h:216
lock_type dq_lock_
lock for queue
Definition: bmthreadpool.h:217
void lock() noexcept(bm::is_lock_noexcept< lock_type >::value)
lock the queue access
Definition: bmthreadpool.h:189
void push_no_lock(const value_type &v)
Push value to the back of the queue without lock protection It is assumed that processing did not sta...
Definition: bmthreadpool.h:160
queue_sync & operator=(const queue_sync &)=delete
std::condition_variable queue_push_cond_
mutex paired conditional
Definition: bmthreadpool.h:222
std::mutex signal_mut_
signal mutex for q submissions
Definition: bmthreadpool.h:221
queue_sync() noexcept
constructor
Definition: bmthreadpool.h:137
bool try_lock() noexcept(bm::is_lock_noexcept< lock_type >::value)
Try to lock the queue exclusively.
Definition: bmthreadpool.h:194
void unlock() noexcept(bm::is_lock_noexcept< lock_type >::value)
unlock the queue access
Definition: bmthreadpool.h:199
Spin-lock with two-phase acquire (read + cas) padding parameter optionally adds a buffer to avoid CPU...
Definition: bmthreadpool.h:56
spin_lock() noexcept
Definition: bmthreadpool.h:58
spin_lock & operator=(const spin_lock &)=delete
bool try_lock() noexcept
Try to acquire the lock, return true if successful.
Definition: bmthreadpool.h:78
void unlock() noexcept
Unlock the lock.
Definition: bmthreadpool.h:89
spin_lock(const spin_lock &)=delete
std::atomic< unsigned > locked_
Definition: bmthreadpool.h:98
void lock() noexcept
Lock the lock.
Definition: bmthreadpool.h:61
Interface definition (base class) for a group of tasks (batch)
Definition: bmtask.h:118
virtual size_type size() const =0
Return size of batch.
unsigned size_type
Definition: bmtask.h:120
virtual bm::task_descr * get_task(size_type task_idx)=0
Get task by index in the batch.
Basic implementation for collection of tasks for parallel execution.
Definition: bmtask.h:140
virtual size_type size() const noexcept
task_batch_base interface implementation
Definition: bmtask.h:158
virtual bm::task_descr * get_task(size_type task_idx)
Get task by index in the batch.
Definition: bmtask.h:160
Utility class to submit task batch to the running thread pool and optionally wait for it getting done...
Definition: bmthreadpool.h:331
static void wait_for_batch_done(thread_pool_type &tpool, bm::task_batch_base &tasks, task_batch_base::size_type from_idx, task_batch_base::size_type to_idx)
Check if all batch jobs in the specified interval are done Spin wait if not.
Definition: bmthreadpool.h:548
thread_pool_executor(const thread_pool_executor &)=delete
static void run(thread_pool_type &tpool, bm::task_batch_base &tasks, bool wait_for_batch)
Definition: bmthreadpool.h:494
task_batch_base::size_type size_type
Definition: bmthreadpool.h:334
thread_pool_executor & operator=(const thread_pool_executor &)=delete
Thread pool with custom (thread safe) queue.
Definition: bmthreadpool.h:239
void stop() noexcept
Request an immediate stop of all threads in the pool.
Definition: bmthreadpool.h:274
queue_type job_queue_
queue (thread sync)
Definition: bmthreadpool.h:313
std::condition_variable task_done_cond_
mutex paired conditional
Definition: bmthreadpool.h:319
thread_pool(const thread_pool &)=delete
std::vector< std::thread > thread_vect_
threads servicing queue
Definition: bmthreadpool.h:314
queue_type & get_job_queue() noexcept
Get access to the job submission queue.
Definition: bmthreadpool.h:295
bm::queue_sync< QValue, lock_type > queue_type
Definition: bmthreadpool.h:243
void join()
Wait for threads to finish (or stop if stop was requested)
Definition: bmthreadpool.h:403
thread_pool(stop_mode sm=no_stop) noexcept
Definition: bmthreadpool.h:259
void worker_func()
Internal worker wrapper with busy-wait spin loop making pthread-like call for tasks.
Definition: bmthreadpool.h:437
int is_stopped() const noexcept
Return if thread pool is stopped by a request.
Definition: bmthreadpool.h:298
void start(unsigned tcount)
Start thread pool worker threads.
Definition: bmthreadpool.h:387
thread_pool & operator=(const thread_pool &)=delete
void wait_empty_queue()
Conditional spin-wait for the queue to empty (Important note: tasks may still be running,...
Definition: bmthreadpool.h:412
stop_mode
Stop modes for threads: 0 - keep running/waiting for jobs 1 - wait for empty task queue then stop thr...
Definition: bmthreadpool.h:252
@ no_stop
keep spinning on busy-wait
Definition: bmthreadpool.h:253
@ stop_when_done
stop if task queue is empty
Definition: bmthreadpool.h:254
@ stop_now
stop right now
Definition: bmthreadpool.h:255
std::atomic_int stop_flag_
stop flag to all threads
Definition: bmthreadpool.h:315
void set_stop_mode(stop_mode sm) noexcept
Setup the criteria for threads shutdown Also notifies all threads on a new directive.
Definition: bmthreadpool.h:378
std::mutex task_done_mut_
signal mutex for task done
Definition: bmthreadpool.h:318
static char * join(int argc, char *argv[], const char sep[])
Definition: dbpivot.c:359
int i
#include<zmmintrin.h>
Definition: bm.h:78
void join_multiple_threads(TCont &tcont)
Wait for multiple threads to exit.
Definition: bmthreadpool.h:108
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
Definition: document.h:2107
static void _mm_pause()
Definition: sse2neon.h:5037
"noexcept" traits detection for T::lock()
Definition: bmtask.h:213
Empty padding.
Definition: bmthreadpool.h:40
Pad 60 bytes so that the final struct occupies 64 bytes (1 cache line)
Definition: bmthreadpool.h:37
BitMagic task with a captured function.
Definition: bmtask.h:62
void * argp
arg pointer
Definition: bmtask.h:72
std::atomic_bool done
0 - pending
Definition: bmtask.h:77
@ no_flag
no flag specified
Definition: bmtask.h:65
bm::id64_t flags
task flags to designate barriers
Definition: bmtask.h:75
task_function_t func
captured function callback
Definition: bmtask.h:71
int err_code
error code
Definition: bmtask.h:76
done
Definition: token1.c:1
#define const
Definition: zconf.h:232
Modified on Tue Apr 30 06:41:35 2024 by modify_doxy.py rev. 669887