1 #ifndef BMTPOOL__H__INCLUDED__
2 #define BMTPOOL__H__INCLUDED__
21 #include <type_traits>
26 #include <condition_variable>
54 template<
class Pad = bm::pad0_struct>
65 unsigned locked =
locked_.load(std::memory_order_relaxed);
67 locked_.compare_exchange_weak(locked,
true,
68 std::memory_order_acquire))
70 #if defined(BMSSE2OPT) || defined(BMSSE42OPT) || defined(BMAVX2OPT) || defined(BMAVX512OPT)
80 unsigned locked =
locked_.load(std::memory_order_relaxed);
82 locked_.compare_exchange_weak(locked,
true,
83 std::memory_order_acquire))
91 locked_.store(
false, std::memory_order_release);
107 template<
typename TCont>
110 typename TCont::iterator it_end = tcont.end();
111 for (
typename TCont::iterator it = tcont.begin(); it != it_end; ++it)
// Forward declaration of the thread_pool class template, parameterized by
// the queue value type (QValue) and the lock type (Lock).
119 template<
typename QValue,
typename Lock>
class thread_pool;
128 template<
typename Value,
typename Lock>
147 std::lock_guard<lock_type> lg(
dq_lock_);
172 std::lock_guard<lock_type> guard(
dq_lock_);
183 std::lock_guard<lock_type> guard(
dq_lock_);
237 template<
typename QValue,
typename Lock>
280 void start(
unsigned tcount);
299 {
return stop_flag_.load(std::memory_order_relaxed); }
329 template<
typename TPool>
342 bool wait_for_batch);
366 template<
typename QValue,
typename Lock>
369 int is_stop = stop_flag_;
371 set_stop_mode(stop_when_done);
377 template<
typename QValue,
typename Lock>
381 job_queue_.queue_push_cond_.notify_all();
386 template<
typename QValue,
typename Lock>
389 int is_stop = stop_flag_.load(std::memory_order_relaxed);
390 if (is_stop == stop_now)
393 for(
unsigned i = 0;
i < tcount; ++
i)
395 thread_vect_.emplace_back(
402 template<
typename QValue,
typename Lock>
406 thread_vect_.resize(0);
411 template<
typename QValue,
typename Lock>
414 const std::chrono::duration<int, std::milli> wait_duration(20);
417 if (job_queue_.empty())
419 std::cv_status wait_res;
421 std::unique_lock<std::mutex> lk(task_done_mut_);
422 wait_res = task_done_cond_.wait_for(lk, wait_duration);
424 if (wait_res == std::cv_status::timeout)
426 std::this_thread::yield();
427 int is_stop = is_stopped();
428 if (is_stop == stop_now)
436 template<
typename QValue,
typename Lock>
439 const std::chrono::duration<int, std::milli> wait_duration(10);
442 int is_stop = is_stopped();
443 if (is_stop == stop_now)
459 task_done_cond_.notify_one();
464 is_stop = is_stopped();
471 std::cv_status wait_res;
473 std::unique_lock<std::mutex> lk(job_queue_.signal_mut_);
475 job_queue_.queue_push_cond_.wait_for(lk, wait_duration);
477 if (wait_res == std::cv_status::timeout)
479 is_stop = is_stopped();
480 if (is_stop == stop_now)
482 std::this_thread::yield();
493 template<
typename TPool>
499 typename thread_pool_type::queue_type& qu = tpool.get_job_queue();
505 tdescr->
argp = tdescr;
512 tpool.wait_empty_queue();
513 wait_for_batch_done(tpool,
task_batch, 0, batch_size - 1);
518 tdescr->
done.store(1, std::memory_order_release);
522 if (new_batch_size != batch_size)
523 batch_size = new_batch_size;
529 auto is_stop = tpool.is_stopped();
530 if (is_stop == thread_pool_type::stop_now)
537 if (wait_for_batch && batch_size)
539 tpool.wait_empty_queue();
540 wait_for_batch_done(tpool,
task_batch, 0, batch_size - 1);
547 template<
typename TPool>
560 auto done = tdescr->
done.load(std::memory_order_consume);
563 auto is_stop = tpool.is_stopped();
564 if (is_stop == thread_pool_type::stop_now)
566 std::this_thread::yield();
568 done = tdescr->
done.load(std::memory_order_acquire);
Task definitions for parallel programming with BitMagic.
Thread-sync queue with MT access protection.
std::queue< value_type > queue_type
queue_sync(const queue_sync &)=delete
bool try_pop(value_type &v)
Extract value.
void push(const value_type &v)
Push value to the back of the queue.
queue_type data_queue_
queue object
lock_type dq_lock_
lock for queue
void lock() noexcept(bm::is_lock_noexcept< lock_type >::value)
lock the queue access
void push_no_lock(const value_type &v)
Push value to the back of the queue without lock protection. It is assumed that processing did not sta...
queue_sync & operator=(const queue_sync &)=delete
std::condition_variable queue_push_cond_
mutex paired conditional
std::mutex signal_mut_
signal mutex for q submissions
queue_sync() noexcept
constructor
bool try_lock() noexcept(bm::is_lock_noexcept< lock_type >::value)
Try to lock the queue exclusively.
void unlock() noexcept(bm::is_lock_noexcept< lock_type >::value)
unlock the queue access
Spin-lock with two-phase acquire (read + CAS). Padding parameter optionally adds a buffer to avoid CPU...
spin_lock & operator=(const spin_lock &)=delete
bool try_lock() noexcept
Try to acquire the lock, return true if successful.
void unlock() noexcept
Unlock the lock.
spin_lock(const spin_lock &)=delete
std::atomic< unsigned > locked_
void lock() noexcept
Lock the lock.
Interface definition (base class) for a group of tasks (batch)
virtual size_type size() const =0
Return size of batch.
virtual bm::task_descr * get_task(size_type task_idx)=0
Get task by index in the batch.
Basic implementation for collection of tasks for parallel execution.
virtual size_type size() const noexcept
task_batch_base interface implementation
virtual bm::task_descr * get_task(size_type task_idx)
Get task by index in the batch.
Utility class to submit task batch to the running thread pool and optionally wait for it getting done...
static void wait_for_batch_done(thread_pool_type &tpool, bm::task_batch_base &tasks, task_batch_base::size_type from_idx, task_batch_base::size_type to_idx)
Check if all batch jobs in the specified interval are done. Spin-wait if not.
thread_pool_executor(const thread_pool_executor &)=delete
static void run(thread_pool_type &tpool, bm::task_batch_base &tasks, bool wait_for_batch)
task_batch_base::size_type size_type
thread_pool_executor & operator=(const thread_pool_executor &)=delete
Thread pool with custom (thread safe) queue.
void stop() noexcept
Request an immediate stop of all threads in the pool.
queue_type job_queue_
queue (thread sync)
std::condition_variable task_done_cond_
mutex paired conditional
thread_pool(const thread_pool &)=delete
std::vector< std::thread > thread_vect_
threads servicing queue
queue_type & get_job_queue() noexcept
Get access to the job submission queue.
bm::queue_sync< QValue, lock_type > queue_type
void join()
Wait for threads to finish (or stop if stop was requested)
thread_pool(stop_mode sm=no_stop) noexcept
void worker_func()
Internal worker wrapper with busy-wait spin loop making pthread-like call for tasks.
int is_stopped() const noexcept
Return if thread pool is stopped by a request.
void start(unsigned tcount)
Start thread pool worker threads.
thread_pool & operator=(const thread_pool &)=delete
void wait_empty_queue()
Conditional spin-wait for the queue to empty (Important note: tasks may still be running,...
stop_mode
Stop modes for threads: 0 - keep running/waiting for jobs 1 - wait for empty task queue then stop thr...
@ no_stop
keep spinning on busy-wait
@ stop_when_done
stop if task queue is empty
std::atomic_int stop_flag_
stop flag to all threads
void set_stop_mode(stop_mode sm) noexcept
Set up the criteria for thread shutdown. Also notifies all threads of a new directive.
std::mutex task_done_mut_
signal mutex for task done
static char * join(int argc, char *argv[], const char sep[])
void join_multiple_threads(TCont &tcont)
Wait for multiple threads to exit.
const GenericPointer< typename T::ValueType > T2 value
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
"noexcept" traits detection for T::lock()
Pad 60 bytes so that the final object occupies 64 bytes (1 cache line)
BitMagic task with a captured function.
std::atomic_bool done
0 - pending
@ no_flag
no flag specified
bm::id64_t flags
task flags to designate barriers
task_function_t func
captured function callback