NCBI C++ ToolKit
thread_pool.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: thread_pool.cpp 99785 2023-05-10 17:50:50Z vasilche $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Pavel Ivanov, Denis Vakatov
27 *
28 * File Description:
29 * Pool of threads.
30 */
31 
32 #include <ncbi_pch.hpp>
33 #include <util/thread_pool.hpp>
35 #include <util/sync_queue.hpp>
36 #include <util/error_codes.hpp>
37 
38 #define NCBI_USE_ERRCODE_X Util_Thread
39 
41 
42 
43 class CThreadPool_Guard;
45 
46 
47 /// Functor to compare tasks by priority
50  const CRef<CThreadPool_Task>& right) const
51  {
52  return left->GetPriority() < right->GetPriority();
53  }
54 };
55 
56 
57 /// Real implementation of all ThreadPool functions
58 class CThreadPool_Impl : public CObject
59 {
60 public:
62 
63  /// Convert pointer to CThreadPool object into pointer to CThreadPool_Impl
64  /// object. Can be done only here to avoid excessive friendship to
65  /// CThreadPool class.
67 
68  /// Call x_SetTaskStatus() for the given task.
69  /// Method introduced to avoid excessive friendship to CThreadPool_Task
70  /// class.
71  ///
72  /// @sa CThreadPool_Task::x_SetTaskStatus()
73  static void sx_SetTaskStatus(CThreadPool_Task* task,
75 
76  /// Call x_RequestToCancel() for the given task.
77  /// Method introduced to avoid excessive friendship to CThreadPool_Task
78  /// class.
79  ///
80  /// @sa CThreadPool_Task::x_RequestToCancel()
81  static void sx_RequestToCancel(CThreadPool_Task* task);
82 
83 
84  /// Constructor with default controller
85  /// @param pool_intf
86  /// ThreadPool interface object attached to this implementation
87  ///
88  /// @sa CThreadPool::CThreadPool()
89  CThreadPool_Impl(CThreadPool* pool_intf,
90  unsigned int queue_size,
91  unsigned int max_threads,
92  unsigned int min_threads,
94 
95  /// Constructor with explicitly given controller
96  /// @param pool_intf
97  /// ThreadPool interface object attached to this implementation
98  ///
99  /// @sa CThreadPool::CThreadPool()
100  CThreadPool_Impl(CThreadPool* pool_intf,
101  unsigned int queue_size,
102  CThreadPool_Controller* controller,
103  CThread::TRunMode threads_mode = CThread::fRunDefault);
104 
105  /// Get pointer to ThreadPool interface object
106  CThreadPool* GetPoolInterface(void) const;
107 
108  /// Set destroy timeout for the pool
109  ///
110  /// @sa CThreadPool::SetDestroyTimeout()
111  void SetDestroyTimeout(const CTimeSpan& timeout);
112 
113  /// Get destroy timeout for the pool
114  ///
115  /// @sa CThreadPool::GetDestroyTimeout()
116  const CTimeSpan& GetDestroyTimeout(void) const;
117 
118  /// Destroy reference to this object
119  /// Method is called when CThreadPool object is destroyed which means
120 /// that implementation can be destroyed too if there are no references
121  /// to it left.
122  void DestroyReference(void);
123 
124  /// Get main pool mutex
125  ///
126  /// @sa CThreadPool::GetMainPoolMutex()
127  CMutex& GetMainPoolMutex(void);
128 
129  /// Add task to the pool
130  ///
131  /// @sa CThreadPool::AddTask()
132  void AddTask(CThreadPool_Task* task, const CTimeSpan* timeout);
133 
134  /// Request to cancel the task
135  ///
136  /// @sa CThreadPool::CancelTask()
137  void CancelTask(CThreadPool_Task* task);
138 
139  /// Cancel the selected groups of tasks in the pool
140  ///
141  /// @sa CThreadPool::CancelTasks()
142  void CancelTasks(TExclusiveFlags tasks_group);
143 
144  /// Add the task for exclusive execution in the pool
145  ///
146  /// @sa CThreadPool::RequestExclusiveExecution()
149 
150  /// Launch new threads in pool
151  /// @param count
152  /// Number of threads to launch
153  void LaunchThreads(unsigned int count);
154 
155  /// Finish threads in pool
156  /// Stop first all idle threads then stop busy threads without
157  /// cancelation of currently executing tasks.
158  /// @param count
159  /// Number of threads to finish
160  void FinishThreads(unsigned int count);
161 
162  /// Get number of threads running in the pool
163  unsigned int GetThreadsCount(void) const;
164 
165  /// Mark thread as idle or non-idle
166  /// @param thread
167  /// Thread to mark
168  /// @param is_idle
169  /// If thread should be marked as idle or not
170  bool SetThreadIdle(CThreadPool_ThreadImpl* thread, bool is_idle);
171 
172  /// Callback from working thread when it finished its Main() method
174 
175  /// Callback when some thread changed its idleness or finished
176  /// (including service thread)
177  void ThreadStateChanged(void);
178 
179  /// Get next task from queue if there is one
180  /// If the queue is empty then return NULL.
182 
183  /// Callback from thread when it is starting to execute task
184  void TaskStarting(void);
185 
186  /// Callback from thread when it has finished to execute task
187  void TaskFinished(void);
188 
189  /// Get the number of tasks currently waiting in queue
190  unsigned int GetQueuedTasksCount(void) const;
191 
192  /// Get the number of currently executing tasks
193  unsigned int GetExecutingTasksCount(void) const;
194 
195  /// Type for storing information about exclusive task launching
200  : flags(f), task(t) {}
201  };
202 
203  /// Get the next exclusive task to execute
204  SExclusiveTaskInfo TryGetExclusiveTask(void);
205 
206  /// Request suspension of the pool
207  /// @param flags
208  /// Parameters for necessary exclusive execution environment
210 
211  /// Resume the pool operation after exclusive task execution
212  void ResumeWork(void);
213 
214  /// Check if the pool is suspended for exclusive execution
215  bool IsSuspended(void) const;
216 
217  /// Check if it is already allowed to execute exclusive task
218  bool CanDoExclusiveTask(void) const;
219 
220  /// Abort the pool operation
221  ///
222  /// @sa CThreadPool::Abort()
223  void Abort(const CTimeSpan* timeout);
224 
225  /// Check if the pool is already aborted
226  bool IsAborted(void) const;
227 
228  /// Finish all current threads and replace them with new ones
229  ///
230  /// @sa CThreadPool::FlushThreads()
231  void FlushThreads(CThreadPool::EFlushType flush_type);
232 
233  /// Call the CThreadPool_Controller::HandleEvent() method of the pool
234  /// controller with the given event type. If ThreadPool is already aborted
235  /// and controller is reset then do nothing.
237 
238  /// Schedule running of CThreadPool_Controller::HandleEvent() with eOther
239  /// event type
240  void CallControllerOther(void);
241 
242  /// Call the CThreadPool_Controller::GetSafeSleepTime() method of the pool
243  /// controller. If ThreadPool is already aborted and controller is reset
244 /// then return a zero-length time span.
245  CTimeSpan GetSafeSleepTime(void) const;
246 
247  /// Mark that initialization of the interface was finished
248  void SetInterfaceStarted(void);
249 
250 
251 private:
252  /// Type of queue used for storing tasks
257  /// Type of queue used for storing information about exclusive tasks
259 /// Type of list of all pooled threads
261 
262 
263  /// Prohibit copying and assigning
266 
267  /// Transform size of queue given in constructor to the size passed to
268  /// CSyncQueue constructor.
269  /// Method can be called only from constructor because it initializes
270  /// value of m_IsQueueAllowed member variable.
271  unsigned int x_GetQueueSize(unsigned int queue_size);
272 
273  /// Initialization of all class member variables that can be initialized
274  /// outside of constructor
275  /// @param pool_intf
276  /// ThreadPool interface object attached to this implementation
277  /// @param controller
278  /// Controller for the pool
279  void x_Init(CThreadPool* pool_intf,
280  CThreadPool_Controller* controller,
281  CThread::TRunMode threads_mode);
282 
283  /// Destructor. Will be called from CRef
284  ~CThreadPool_Impl(void);
285 
286  /// Delete task from the queue
287  /// If task does not exist in queue then does nothing.
288  void x_RemoveTaskFromQueue(const CThreadPool_Task* task);
289 
290  /// Cancel all tasks waiting in the queue
291  void x_CancelQueuedTasks(void);
292 
293  /// Cancel all currently executing tasks
294  void x_CancelExecutingTasks(void);
295 
296  /// Type of some simple predicate
297  ///
298  /// @sa x_WaitForPredicate
299  typedef bool (CThreadPool_Impl::*TWaitPredicate)(void) const;
300 
301 /// Check if adding new tasks to the pool is prohibited
302  bool x_NoNewTaskAllowed(void) const;
303 
304 /// Check if new task can be added to the pool when queueing is disabled
305  bool x_CanAddImmediateTask(void) const;
306 
307  /// Check if all threads in pool finished their work
308  bool x_HasNoThreads(void) const;
309 
310  /// Wait for some predicate to be true
311  /// @param wait_func
312  /// Predicate to wait for
313  /// @param pool_guard
314  /// Guardian that locks main pool mutex at the time of method call and
315 /// that has to be unlocked for the time of waiting
316  /// @param wait_sema
317 /// Semaphore which will be posted when predicate becomes true
318  /// @param timeout
319  /// Maximum amount of time to wait
320  /// @param timer
321 /// Timer for measuring elapsed time. Method assumes that timer is
322  /// started at the moment from which timeout should be calculated.
323  bool x_WaitForPredicate(TWaitPredicate wait_func,
324  CThreadPool_Guard* pool_guard,
325  CSemaphore* wait_sema,
326  const CTimeSpan* timeout,
327  const CStopWatch* timer);
328 
329 
330 private:
331  /// ThreadPool interface object attached to this implementation
333  /// Reference to this pool to prevent its destroying earlier than we
334  /// allow it to
336  /// Timeout to wait for all threads to finish before the ThreadPool
337 /// interface object can be destroyed
339  /// Queue for storing tasks
341  /// Mutex for guarding all changes in the pool, its threads and controller
343  /// Semaphore for waiting for available threads to process task when
344  /// queuing is disabled.
346  /// Controller managing count of threads in pool
348  /// List of all idle threads
350  /// List of all threads currently executing some tasks
352  /// Running mode of all threads
354  /// Total number of threads
355  /// Introduced for more adequate and fast reflecting to threads starting
356  /// and stopping events
358  /// Number of tasks executing now
359  /// Introduced for more adequate and fast reflecting to task executing
360  /// start and finish events
362  /// Total number of tasks acquired by pool
363  /// Includes queued tasks and executing (but not exclusive!) tasks.
364  /// Introduced for maintaining atomicity of this number changing
366  /// Flag about working with special case:
367  /// FALSE - queue_size == 0, TRUE - queue_size > 0
369  /// If pool is already aborted or not
370  atomic<bool> m_Aborted;
371  /// Semaphore for waiting for threads finishing in Abort() method
372  ///
373  /// @sa Abort()
375  /// If pool is suspended for exclusive task execution or not.
376  /// Thread Checker can complain that access to this variable everywhere is
377  /// not guarded by some mutex. But it's okay because special care is taken
378  /// to make any race a matter of timing - suspend will happen properly in
379  /// any case. Also everything is written with the assumption that there's
380  /// no other threads (besides this very thread pool) that could call any
381  /// methods here.
382  atomic<bool> m_Suspended;
383  /// Requested requirements for the exclusive execution environment
385  /// Flag indicating if flush of threads requested after adding exclusive
386 /// task but before it has started its execution.
387  volatile bool m_FlushRequested;
388  /// Thread for execution of exclusive tasks and passing of events
389  /// to the controller
391  /// Queue for information about exclusive tasks
393 };
394 
395 
396 
397 /// Real implementation of all CThreadPool_Thread functions
399 {
400 public:
401  /// Convert pointer to CThreadPool_Thread object into pointer
402  /// to CThreadPool_ThreadImpl object. Can be done only here to avoid
403  /// excessive friendship to CThreadPool_Thread class.
404  static CThreadPool_ThreadImpl*
406 
407  /// Create new CThreadPool_Thread object
408  /// Method introduced to avoid excessive friendship to CThreadPool_Thread
409  /// class.
410  ///
411  /// @sa CThreadPool_Thread::CThreadPool_Thread()
413 
414  /// Constructor
415  /// @param thread_intf
416  /// ThreadPool_Thread interface object attached to this implementation
417  /// @param pool
418  /// Pool implementation owning this thread
420  CThreadPool_Impl* pool);
421 
422  /// Destructor
423  /// Called directly from CThreadPool destructor
425 
426  /// Get ThreadPool interface object owning this thread
427  ///
428  /// @sa CThreadPool_Thread::GetPool()
429  CThreadPool* GetPool(void) const;
430 
431  /// Request this thread to finish its operation.
432  /// It renders the thread unusable and eventually ready for destruction
433  /// (as soon as its current task is finished and there are no CRefs to
434  /// this thread left).
435  void RequestToFinish(void);
436 
437  /// If finishing of this thread is already in progress or not
438  bool IsFinishing(void) const;
439 
440  /// Wake up the thread from idle state
441  ///
442  /// @sa x_Idle
443  void WakeUp(void);
444 
445  /// Get task currently executing in the thread
446  /// May be NULL if thread is idle or is in the middle of changing of
447  /// current task
448  ///
449  /// @sa CThreadPool_Thread::GetCurrentTask()
451 
452  /// Request to cancel current task execution
453  void CancelCurrentTask(void);
454 
455  /// Implementation of thread Main() method
456  ///
457  /// @sa CThreadPool_Thread::Main()
458  void Main(void);
459 
460 /// Implementation of thread OnExit() method
461  ///
462  /// @sa CThreadPool_Thread::OnExit()
463  void OnExit(void);
464 
465 private:
466  /// Prohibit copying and assigning
469 
470  /// Suspend until the wake up signal.
471  ///
472  /// @sa WakeUp()
473  void x_Idle(void);
474 
475  /// Mark the thread idle or non-idle
476  bool x_SetIdleState(bool is_idle);
477 
478  /// Do finalizing when task finished its execution
479  /// @param status
480  /// Status that the task must get
482 
483 
484  /// ThreadPool_Thread interface object attached to this implementation
486  /// Pool running the thread
488  /// If the thread is already asked to finish or not
489  atomic<bool> m_Finishing;
490  /// If cancel of the currently executing task is requested or not
491  atomic<bool> m_CancelRequested;
492  /// Idleness of the thread
493  bool m_IsIdle;
494  /// Task currently executing in the thread
496  /// Semaphore for waking up from idle waiting
498  /// General-use mutex for very (very!) trivial ops
500 };
501 
502 
503 
504 /// Thread used in pool for different internal needs: execution of exclusive
505 /// tasks and passing events to controller
507 {
508 public:
509  /// Constructor
510  /// @param pool
511  /// ThreadPool owning this thread
513 
514  /// Wake up from idle waiting or waiting of pool preparing exclusive
515  /// environment
516  void WakeUp(void);
517 
518  /// Request finishing of the thread
519  void RequestToFinish(void);
520 
521 /// Check if this thread has already finished or not
522  bool IsFinished(void);
523 
524  /// Tell the thread that controller should handle eOther event
525  ///
526  /// @sa CThreadPool_Controller::HandleEvent()
527  void NeedCallController(void);
528 
529 protected:
530  /// Destructor. Will be called from CRef
531  virtual ~CThreadPool_ServiceThread(void);
532 
533 private:
534  /// Main thread execution
535  virtual void* Main(void);
536 
537  /// Do "idle" work when thread is not busy executing exclusive tasks
538  void x_Idle(void);
539 
540  /// Pool owning this thread
542  /// Semaphore for idle sleeping
544  /// If finishing of the thread is already requested
545  atomic<bool> m_Finishing;
546  /// If the thread has already finished its Main() method
547  atomic<bool> m_Finished;
548  /// Currently executing exclusive task
550  /// Flag indicating that thread should pass eOther event to the controller
552  /// General-use mutex for very (very!) trivial ops
554 };
555 
556 
557 
558 /// Guardian for protecting pool by locking its main mutex
560 {
561 public:
562  /// Constructor
563  /// @param pool
564  /// Pool to protect
565  /// @param is_active
566  /// If the mutex should be locked in constructor or not
567  CThreadPool_Guard(CThreadPool_Impl* pool, bool is_active = true);
568 
569  /// Turn this guardian on
570  void Guard(void);
571 
572  /// Turn this guardian off
573  void Release(void);
574 
575 private:
576  /// Pool protected by the guardian
578 };
579 
580 
581 
582 /// Special task which does nothing
583 /// It's used in FlushThreads to force pool to wait while all old threads
584 /// finish their operation to start new ones.
585 ///
586 /// @sa CThreadPool_Impl::FlushThreads()
588 {
589 public:
590  /// Empty main method
591  virtual EStatus Execute(void) { return eCompleted; }
592 
593  // In the absence of the following constructor, new compilers (as required
594  // by the new C++ standard) may fill the object memory with zeros,
595  // erasing flags set by CObject::operator new (see CXX-1808)
597 };
598 
599 
600 
601 /// Check if status returned from CThreadPool_Task::Execute() is allowed
602 /// and change it to eCompleted value if it is invalid
603 static inline CThreadPool_Task::EStatus
605 {
607  || status == CThreadPool_Task::eFailed
608  || status == CThreadPool_Task::eCanceled);
609 
610  if (status != CThreadPool_Task::eCompleted
611  && status != CThreadPool_Task::eFailed
612  && status != CThreadPool_Task::eCanceled)
613  {
615  << "Wrong status returned from "
616  "CThreadPool_Task::Execute(): "
617  << status);
619  }
620 
621  return status;
622 }
623 
624 
625 
627 
628 
629 inline void
631 {
633 }
634 
635 inline void
637 {
640  }
641  else {
642  WakeUp();
643  }
644 }
645 
646 
647 
648 inline void
650 {
652 }
653 
654 
655 
656 inline CMutex&
658 {
659  return m_MainPoolMutex;
660 }
661 
662 
663 
666  m_Pool(pool)
667 {
668  _ASSERT(pool);
669 
670  if (is_active)
671  Guard();
672 }
673 
674 void
676 {
678 }
679 
680 void
682 {
684 }
685 
686 
687 
688 inline void
691 {
692  task->x_SetStatus(status);
693 }
694 
695 inline void
697 {
698  task->x_RequestToCancel();
699 }
700 
701 inline CThreadPool*
703 {
704  return m_Interface;
705 }
706 
707 inline void
709 {
711 }
712 
713 inline bool
715 {
716  return m_Aborted;
717 }
718 
719 inline bool
721 {
722  return m_Suspended.load(memory_order_acquire);
723 }
724 
725 inline unsigned int
727 {
728  return (unsigned int)m_ThreadsCount.Get();
729 }
730 
731 inline unsigned int
733 {
734  return (unsigned int)m_Queue.GetSize();
735 }
736 
737 inline unsigned int
739 {
740  return (unsigned int)m_ExecutingTasks.Get();
741 }
742 
743 inline CTimeSpan
745 {
746 // m_Controller variable can be uninitialized only when ThreadPool
747  // is already aborted
749  if (controller && ! m_Aborted) {
750  return controller->GetSafeSleepTime();
751  }
752  else {
753  return CTimeSpan(0, 0);
754  }
755 }
756 
757 inline void
759 {
761  if (controller && ! m_Aborted &&
763  {
764  controller->HandleEvent(event);
765  }
766 }
767 
768 inline void
770 {
772  if (thread) {
773  thread->NeedCallController();
774  }
775 }
776 
777 inline void
779 {
781  // In current implementation controller operation doesn't depend on this
782  // action. So we will save mutex locks for the sake of performance
783  //CallControllerOther();
784 }
785 
786 inline void
788 {
789  m_ExecutingTasks.Add(-1);
790  m_TotalTasks.Add(-1);
791  if ( !m_IsQueueAllowed ) {
792  m_RoomWait.Post();
793  }
795 }
796 
797 inline void
799 {
800  if (m_Aborted) {
801 
802  // This lock is actually to protect access to the threads containers.
803  // It was decided that this lock must not be inside x_HasNoThreads()
804  // but to be outside.
805  bool has_no_threads = false;
806  {{
807  CThreadPool_Guard guard(this);
808  has_no_threads = x_HasNoThreads();
809  }}
810 
811  if (has_no_threads) {
812  m_AbortWait.Post();
813  }
814  }
815  else if (IsSuspended()) {
817  && GetThreadsCount() == 0)
819  && m_WorkingThreads.size() == 0))
820  {
822  }
823  }
824 }
825 
826 inline void
828 {
829  CThreadPool_Guard guard(this);
830 
831  m_ThreadsCount.Add(-1);
832 
833  m_IdleThreads.erase(thread);
834  m_WorkingThreads.erase(thread);
835 
837 
839 }
840 
843 {
844  if ( !IsSuspended() ) {
846 
847  if (m_Queue.GetSize() != 0) {
848  return m_Queue.Pop();
849  }
850  }
851 
852  return CRef<CThreadPool_Task>();
853 }
854 
855 
858 {
860 
861  if (m_ExclusiveQueue.GetSize() == 0
862  || ((guard.Begin()->flags & CThreadPool::fExecuteQueuedTasks) != 0
863  && (m_TotalTasks.Get() != 0))) {
865  }
866 
868 
869  if (m_FlushRequested) {
871  m_FlushRequested = false;
872  }
873  return info;
874 }
875 
876 
877 inline bool
879 {
881  && GetThreadsCount() != 0)
882  {
883  return false;
884  }
885 
886  return m_WorkingThreads.size() == 0;
887 }
888 
889 inline void
891 {
893  m_Suspended.store(true, memory_order_release);
896  }
899  }
900 
902  FinishThreads((unsigned int)m_IdleThreads.size());
903  }
904 
906 }
907 
908 inline void
910 {
911  m_Suspended.store(false, memory_order_release);
912 
914 
916  (*it)->WakeUp();
917  }
918 }
919 
920 
921 
922 inline void
924 {
925  if (m_Pool != NULL) {
926  NCBI_THROW(CThreadPoolException, eControllerBusy,
927  "Cannot attach Controller to several ThreadPools.");
928  }
929 
930  m_Pool = pool;
931 }
932 
933 inline void
935 {
936  m_Pool = NULL;
937 }
938 
939 
940 
942 {
943  x_Init(priority);
944 }
945 
947 {
948  x_Init(other.m_Priority);
949 }
950 
951 void
952 CThreadPool_Task::x_Init(unsigned int priority)
953 {
954  m_Pool = NULL;
955  m_Priority = priority;
956  // Thread Checker complains here but this code is called only from
957  // constructor, so no one else can reference this task yet.
958  m_Status = eIdle;
959  m_CancelRequested = false;
960 }
961 
963 {}
964 
967 {
968  if (m_IsBusy.Get() != 0) {
969  NCBI_THROW(CThreadPoolException, eTaskBusy,
970  "Cannot change task when it is already added "
971  "to ThreadPool");
972  }
973 
974  CObject::operator= (other);
975  // There can be race with CThreadPool_Impl::AddTask()
976 // If the task is already added to the queue and its priority is then
977 // changed, the queue can crash later
978  m_Priority = other.m_Priority;
979  return *this;
980 }
981 
982 void
984 {}
985 
986 void
988 {}
989 
990 inline void
992 {
993  if (m_IsBusy.Add(1) != 1) {
994  m_IsBusy.Add(-1);
995  NCBI_THROW(CThreadPoolException, eTaskBusy,
996  "Cannot add task in ThreadPool several times");
997  }
998 
999  // Thread Checker complains that this races with task canceling and
1000 // resetting m_Pool below. But it's a thread pool usage error if
1001  // someone tries to call concurrently AddTask and CancelTask. With a proper
1002  // workflow CancelTask shouldn't be called until AddTask has returned.
1003  m_Pool = pool;
1004 }
1005 
1006 inline void
1008 {
1009  m_Pool = NULL;
1010  m_IsBusy.Add(-1);
1011 }
1012 
1013 void
1015 {
1016  EStatus old_status = m_Status;
1017  if (old_status != new_status && old_status != eCanceled) {
1018  // Thread Checker complains here, but all status transitions are
1019  // properly guarded with different mutexes and they cannot mix with
1020  // each other.
1021  m_Status = new_status;
1022  OnStatusChange(old_status);
1023  }
1024 
1025  if (IsFinished()) {
1026  // Thread Checker complains here. See comment in x_SetOwner above for
1027  // details.
1028  m_Pool = NULL;
1029  }
1030 }
1031 
1032 inline void
1034 {
1035  m_CancelRequested = true;
1036 
1038 
1039  if (GetStatus() <= eQueued) {
1040  // This can race with calling task's Execute() method but it's okay.
1041  // For details see comment in CThreadPool_ThreadImpl::Main().
1043  }
1044 }
1045 
1046 void
1048 {
1049 // Protect from possible resetting of the pool variable during execution
1050  CThreadPool_Impl* pool = m_Pool;
1051  if (IsFinished()) {
1052  return;
1053  }
1054  else if (!pool) {
1056  }
1057  else {
1058  pool->CancelTask(this);
1059  }
1060 }
1061 
1062 CThreadPool*
1064 {
1065  // GCC thread sanitizer complains on GetPool() when a task is cancelled in
1066 // a thread pool. This is however a false positive. CancelTask() may
1067 // happen only while the thread pool exists, so m_Pool is set properly then.
1068 
1069 // Protect from possible resetting of the pool variable during execution
1070  CThreadPool_Impl* pool_impl = m_Pool;
1071  return pool_impl? pool_impl->GetPoolInterface(): NULL;
1072 }
1073 
1074 
1075 
1077  : m_Pool(pool),
1078  m_IdleTrigger(0, kMax_Int),
1079  m_Finishing(false),
1080  m_Finished(false)
1081 {
1082  _ASSERT(pool);
1083 
1085 }
1086 
1088 {}
1089 
1090 inline bool
1092 {
1093  return m_Finished;
1094 }
1095 
1096 inline void
1098 {
1101  }
1103 
1104  CTimeSpan timeout = m_Pool->GetSafeSleepTime();
1105  // TODO: it would be better to use CTimeout for all timeouts
1106  _ASSERT(timeout.GetSign() != eNegative);
1107  m_IdleTrigger.TryWait((unsigned int)timeout.GetCompleteSeconds(),
1108  (unsigned int)timeout.GetNanoSecondsAfterSecond());
1109 }
1110 
1111 inline void
1113 {
1114  m_Finishing = true;
1115  WakeUp();
1116 
1118  {{
1119  CFastMutexGuard fast_guard(m_FastMutex);
1120  task = m_CurrentTask;
1121  }}
1122 
1123  if ( task.NotNull() ) {
1125  }
1126 }
1127 
1128 void*
1130 {
1131  while (! m_Finishing) {
1134 
1135  {{
1136  CFastMutexGuard fast_guard(m_FastMutex);
1137  m_CurrentTask = task_info.task;
1138  }}
1139 
1140 
1141  if ( m_CurrentTask.IsNull() ) {
1142  x_Idle();
1143  continue;
1144  }
1145 
1146  CThreadPool_Guard guard(m_Pool);
1147 
1148  if (m_Finishing) {
1149  if (! m_CurrentTask->IsCancelRequested()) {
1151  }
1154  break;
1155  }
1156 
1157 
1158  // Signal to suspend the threads for the execution of exclusive task
1159  m_Pool->RequestSuspend(task_info.flags
1161 
1162  // Wait until pool is ready for execution of the exclusive task
1163  while (! m_Pool->IsAborted() && ! m_Pool->CanDoExclusiveTask()) {
1164  guard.Release();
1165  m_IdleTrigger.Wait();
1166  guard.Guard();
1167  }
1168 
1169  if (m_Finishing) {
1170  if (!m_CurrentTask->IsCancelRequested()) {
1172  }
1175  break;
1176  }
1177 
1178  guard.Release();
1179 
1182  try {
1183  CThreadPool_Task::EStatus status =
1186  }
1187  NCBI_CATCH_ALL_X(11, "Exception from exclusive task in ThreadPool");
1188 
1189  guard.Guard();
1190  m_Pool->ResumeWork();
1191  }
1192 
1193  m_Finished = true;
1195 
1196  return NULL;
1197 }
1198 
1199 
1200 
1201 inline CThreadPool_ThreadImpl*
1203 {
1204  return thread->m_Impl;
1205 }
1206 
1207 inline CThreadPool_Thread*
1209 {
1210  return new CThreadPool_Thread(pool);
1211 }
1212 
1213 inline
1215 (
1216  CThreadPool_Thread* thread_intf,
1217  CThreadPool_Impl* pool
1218 )
1219  : m_Interface(thread_intf),
1220  m_Pool(pool),
1221  m_Finishing(false),
1222  m_CancelRequested(false),
1223  m_IsIdle(true),
1224  m_IdleTrigger(0, kMax_Int)
1225 {}
1226 
1227 inline
1229 {}
1230 
1231 inline CThreadPool*
1233 {
1234  return m_Pool->GetPoolInterface();
1235 }
1236 
1237 inline bool
1239 {
1240  return m_Finishing;
1241 }
1242 
1245 {
1246  CFastMutexGuard fast_guard(m_FastMutex);
1247  return m_CurrentTask;
1248 }
1249 
1250 inline bool
1252 {
1253  if (m_IsIdle == is_idle)
1254  return true;
1255 
1256  if ( !m_Pool->SetThreadIdle(this, is_idle) )
1257  return false;
1258 
1259  m_IsIdle = is_idle;
1260  return true;
1261 }
1262 
1263 inline void
1265 {
1268  }
1269 
1270  {{
1271  CFastMutexGuard fast_guard(m_FastMutex);
1272  m_CurrentTask.Reset();
1273  }}
1274  m_Pool->TaskFinished();
1275 }
1276 
1277 inline void
1279 {
1280  if ( x_SetIdleState(true) )
1281  m_IdleTrigger.Wait();
1282 }
1283 
1284 inline void
1286 {
1287  m_Finishing = true;
1288  WakeUp();
1289 }
1290 
1291 inline void
1293 {
1294  // Avoid resetting of the pointer during execution
1296  {{
1297  CFastMutexGuard fast_guard(m_FastMutex);
1298  task = m_CurrentTask;
1299  }}
1300 
1301  if (task.NotNull()) {
1303  }
1304  else {
1305  m_CancelRequested = true;
1306  }
1307 }
1308 
1309 inline void
1311 {
1313 
1314  while (!m_Finishing) {
1315  // We have to heed call to CancelCurrentTask() only after this point.
1316  // So we reset value of m_CancelRequested here without any mutexes.
1317  // If CancelCurrentTask() is called earlier or this assignment races
1318  // with assignment in CancelCurrentTask() then caller of
1319  // CancelCurrentTask() will make sure that TryGetNextTask() returns
1320  // NULL.
1321  m_CancelRequested = false;
1322 
1323  {{
1325  CFastMutexGuard fast_guard(m_FastMutex);
1326  m_CurrentTask = task;
1327  }}
1328 
1329 
1330  if (m_CurrentTask.IsNull()) {
1331  x_Idle();
1332  }
1333  else {
1335  // Some race can appear if task is canceled at the time
1336  // when it's being queued or at the time when it's being
1337  // unqueued
1338  if (! m_CurrentTask->IsCancelRequested()) {
1340  }
1343  CFastMutexGuard fast_guard(m_FastMutex);
1344  m_CurrentTask = NULL;
1345  continue;
1346  }
1347 
1348  x_SetIdleState(false);
1349  m_Pool->TaskStarting();
1350 
1351  // This can race with canceling of the task. This can result in
1352  // task's Execute() method called with the state of eCanceled
1353  // already set or cancellation being totally ignored in the task's
1354  // status (m_CancelRequested will be still set). Both outcomes are
1355  // simple timing and cancellation should be checked in the task's
1356  // Execute() method anyways. The worst outcome here is that task
1357  // can be marked as eCanceled when it's completely and successfully
1358  // executed. I don't think it's too bad though.
1361 
1362  try {
1363  CThreadPool_Task::EStatus status =
1365  x_TaskFinished(status);
1366  }
1367  catch (exception& e) {
1368  ERR_POST_X(7, "Exception from task in ThreadPool: " << e);
1369  if (m_CurrentTask.NotEmpty()) {
1371  }
1372  }
1373  catch (...) {
1374  ERR_POST_X(7, "Non-standard exception from task in ThreadPool");
1375  if (m_CurrentTask.NotEmpty()) {
1377  }
1378  throw;
1379  }
1380  }
1381  }
1382 }
1383 
1384 inline void
1386 {
1387  try {
1388  m_Interface->Finalize();
1389  } STD_CATCH_ALL_X(8, "Finalize")
1390 
1391  m_Pool->ThreadStopped(this);
1392 }
1393 
1394 
1395 
1396 inline CThreadPool_Impl*
1398 {
1399  return pool->m_Impl;
1400 }
1401 
1402 inline unsigned int
1403 CThreadPool_Impl::x_GetQueueSize(unsigned int queue_size)
1404 {
1405  if (queue_size == 0) {
1406  // 10 is just in case, in fact when queue_size == 0 pool will always
1407  // check for idle threads, so tasks will never crowd in the queue
1408  queue_size = 10;
1409  m_IsQueueAllowed = false;
1410  }
1411  else {
1412  m_IsQueueAllowed = true;
1413  }
1414 
1415  return queue_size;
1416 }
1417 
/// Constructor with the default controller.
/// NOTE(review): the first signature line is absent from this listing.
///
/// The queue is sized through x_GetQueueSize() (which also sets
/// m_IsQueueAllowed), both semaphores are constructed with (0, kMax_Int),
/// and the rest of the initialization is delegated to x_Init() with a newly
/// allocated CThreadPool_Controller_PID(max_threads, min_threads).
1418 inline
1420  unsigned int queue_size,
1421  unsigned int max_threads,
1422  unsigned int min_threads,
1423  CThread::TRunMode threads_mode)
1424  : m_Queue(x_GetQueueSize(queue_size)),
1425  m_RoomWait(0, kMax_Int),
1426  m_AbortWait(0, kMax_Int)
1427 {
1428  x_Init(pool_intf,
1429  new CThreadPool_Controller_PID(max_threads, min_threads),
1430  threads_mode);
1431 }
1432 
/// Constructor with a caller-supplied controller.
/// NOTE(review): the first signature line is absent from this listing.
///
/// Same member initialization as the default-controller constructor; the
/// given controller is passed to x_Init() unchanged.
1433 inline
1435  unsigned int queue_size,
1436  CThreadPool_Controller* controller,
1437  CThread::TRunMode threads_mode)
1438  : m_Queue(x_GetQueueSize(queue_size)),
1439  m_RoomWait(0, kMax_Int),
1440  m_AbortWait(0, kMax_Int)
1441 {
1442  x_Init(pool_intf, controller, threads_mode);
1443 }
1444 
/// Initialize the members that are not set in the constructor initializer
/// list and attach the controller to this pool.
/// NOTE(review): the signature line and original lines 1460 and 1465 are
/// absent from this listing, so the m_ThreadsMode expression below is
/// incomplete as shown.
1445 void
1447  CThreadPool_Controller* controller,
1448  CThread::TRunMode threads_mode)
1449 {
1450  m_Interface = pool_intf;
     // Self-reference: keeps this object referenced until it is reset
1451  m_SelfRef = this;
     // Default destroy timeout -- presumably 10 seconds (confirm CTimeSpan args)
1452  m_DestroyTimeout = CTimeSpan(10, 0);
1453  m_ThreadsCount.Set(0);
1454  m_ExecutingTasks.Set(0);
1455  m_TotalTasks.Set(0);
1456  m_Aborted = false;
1457  m_Suspended.store(false, memory_order_relaxed);
1458  m_FlushRequested = false;
1459  m_ThreadsMode = (threads_mode | CThread::fRunDetached)
1461 
     // The controller is attached before it is stored in m_Controller
1462  controller->x_AttachToPool(this);
1463  m_Controller = controller;
1464 
1466 }
1467 
/// Empty body. NOTE(review): the signature line is absent from this listing;
/// presumably this is the CThreadPool_Impl destructor.
1469 {}
1470 
/// Detach this implementation from its interface object and drop the
/// self-reference.
/// NOTE(review): the signature line and original lines 1476 and 1481 are
/// absent from this listing; per the surviving comment, 1476 aborts the pool.
1471 inline void
1473 {
1474  // Abort even if m_Aborted == true because threads can still be running
1475  // and we have to wait for their termination
1477 
1478  m_Interface = NULL;
1479  {{
     // The pool mutex is held only for the duration of this inner scope
1480  CThreadPool_Guard guard(this);
1482  }}
     // Dropping the self-reference presumably allows the object to be
     // destroyed; no member is touched after this line.
1483  m_SelfRef = NULL;
1484 }
1485 
/// Store the given timeout in m_DestroyTimeout.
/// NOTE(review): the signature line is absent from this listing.
1486 inline void
1488 {
1489  m_DestroyTimeout = timeout;
1490 }
1491 
/// Return a reference to the stored destroy timeout (m_DestroyTimeout).
/// NOTE(review): the signature line is absent from this listing.
1492 inline const CTimeSpan&
1494 {
1495  return m_DestroyTimeout;
1496 }
1497 
/// Start `count` new threads in the run mode stored in m_ThreadsMode.
/// Does nothing when count is 0; otherwise holds the pool mutex throughout.
/// NOTE(review): the signature line and original lines 1507-1509 and
/// 1513-1514 are absent from this listing (thread creation and the code
/// after the loop are not visible).
1498 void
1500 {
1501  if (count == 0)
1502  return;
1503 
1504  CThreadPool_Guard guard(this);
1505 
1506  for (unsigned int i = 0; i < count; ++i) {
1510  thread->Run(m_ThreadsMode);
1511  }
1512 
1515 }
1516 
/// Ask up to `count` threads to finish: idle threads are asked first, then
/// working threads, until `count` requests have been issued or both sets
/// are exhausted. Only RequestToFinish() is called on each thread.
/// NOTE(review): the signature line and both loop-header lines (original
/// 1528 and 1542) are absent from this listing.
1517 void
1519 {
1520  if (count == 0)
1521  return;
1522 
1523  CThreadPool_Guard guard(this);
1524 
1525  // The cast is theoretically extraneous, but Sun's WorkShop
1526  // compiler otherwise calls the wrong versions of begin() and
1527  // end() and refuses to convert the resulting iterators.
1529  static_cast<const TThreadsList&>(m_IdleThreads))
1530  {
1531  // Maybe in case of several quick consecutive calls we should favor
1532  // the willing to finish several threads.
1533  //if ((*it)->IsFinishing())
1534  // continue;
1535 
1536  (*it)->RequestToFinish();
1537  --count;
1538  if (count == 0)
1539  break;
1540  }
1541 
     // Second pass over working threads; exits immediately when the idle
     // pass already consumed the whole request.
1543  static_cast<const TThreadsList&>(m_WorkingThreads))
1544  {
1545  if (count == 0)
1546  break;
1547 
1548  (*it)->RequestToFinish();
1549  --count;
1550  }
1551 }
1552 
1553 
/// Move `thread` between the idle and working thread sets.
///
/// Returns false (after waking the thread) when the thread wants to become
/// idle while the pool is not suspended and the queue is not empty; in that
/// case the sets are left untouched. Otherwise returns true.
/// NOTE(review): the signature line and original lines 1582 and 1587 are
/// absent from this listing (second half of the suspend condition and the
/// statement before the final return).
1554 bool
1556 {
1557  CThreadPool_Guard guard(this);
1558 
1559  if (is_idle && !IsSuspended() && m_Queue.GetSize() != 0) {
1560  thread->WakeUp();
1561  return false;
1562  }
1563 
1564  TThreadsList* to_del;
1565  TThreadsList* to_ins;
1566  if (is_idle) {
1567  to_del = &m_WorkingThreads;
1568  to_ins = &m_IdleThreads;
1569  }
1570  else {
1571  to_del = &m_IdleThreads;
1572  to_ins = &m_WorkingThreads;
1573  }
1574 
     // The thread may be in neither set yet, hence the find() before erase()
1575  TThreadsList::iterator it = to_del->find(thread);
1576  if (it != to_del->end()) {
1577  to_del->erase(it);
1578  }
1579  to_ins->insert(thread);
1580 
1581  if (is_idle && IsSuspended()
1583  {
1584  thread->RequestToFinish();
1585  }
1586 
1588  return true;
1589 }
1590 
/// Predicate that is true when the pool is aborted, or when a second
/// condition holds that is not visible here.
/// NOTE(review): the signature line and original line 1596 (the right-hand
/// operand of `||`) are absent from this listing.
1591 inline bool
1593 {
1594  return
1595  m_Aborted ||
1597 }
1598 
/// Wait predicate used when queueing is disabled.
///
/// Returns true when new tasks are prohibited (so the waiter stops waiting),
/// or when the pool is not suspended and the total task count is below the
/// controller's maximum number of threads.
/// NOTE(review): the signature line is absent from this listing.
1599 bool
1601 {
1602  if ( x_NoNewTaskAllowed() ) {
1603  // A very special kludge -- to allow immediately breaking the wait
1604  // loop when adding new tasks to the pool has been explicitly
1605  // prohibited (including if Abort() was called)
1606  return true;
1607  }
1608 
1609  return
1610  !IsSuspended() &&
1611  (unsigned int) m_TotalTasks.Get() < m_Controller->GetMaxThreads();
1612 }
1613 
/// Wait predicate: true when both thread sets are empty and `thread` is
/// either NULL or reports IsFinished().
/// NOTE(review): the signature line and original line 1617 (which declares
/// `thread`) are absent from this listing.
1613 bool
1615 {
1618  return m_IdleThreads.size() + m_WorkingThreads.size() == 0
1619  && (!thread || thread->IsFinished());
1620 }
1621 
1622 bool
1623 CThreadPool_Impl::x_WaitForPredicate(TWaitPredicate wait_func,
1624  CThreadPool_Guard* pool_guard,
1625  CSemaphore* wait_sema,
1626  const CTimeSpan* timeout,
1627  const CStopWatch* timer)
1628 {
1629  bool done = (this->*wait_func)();
1630  if (done) {
1631  wait_sema->TryWait();
1632  return true;
1633  }
1634 
1635  while ( !done ) {
1636  pool_guard->Release();
1637 
1638  if (timeout) {
1639  CTimeSpan next_tm = CTimeSpan(timeout->GetAsDouble() - timer->Elapsed());
1640  if (next_tm.GetSign() == eNegative) {
1641  return false;
1642  }
1643  if (! wait_sema->TryWait(CTimeout(next_tm))) {
1644  return false;
1645  }
1646  }
1647  else {
1648  wait_sema->Wait();
1649  }
1650 
1651  pool_guard->Guard();
1652  done = (this->*wait_func)();
1653  }
1654 
1655  return true;
1656 }
1657 
1658 /// Throw an exception with standard message when AddTask() is called
1659 /// but ThreadPool is aborted or do not allow new tasks
/// Always throws CThreadPoolException with code eProhibited; never returns.
/// NOTE(review): the line carrying the function name is absent from this
/// listing.
1660 NCBI_NORETURN
1661 static inline void
1663 {
1664  NCBI_THROW(CThreadPoolException, eProhibited,
1665  "Adding of new tasks is prohibited");
1666 }
1667 
/// Add a task to the pool queue and wake idle threads to execute it.
///
/// When queueing is disabled the method first waits (bounded by `timeout`)
/// until x_CanAddImmediateTask-style room is available, and shortens the
/// timeout by the time already spent before pushing to the queue.
/// NOTE(review): the signature line and many interior lines (original 1678,
/// 1687-1688, 1691, 1696, 1707, 1714, 1726-1727, 1729, 1732, 1745, 1755) are
/// absent from this listing; the notes below cover only what is visible.
1668 inline void
1670 {
1671  _ASSERT(task);
1672 
1673  // To be sure that if simple new operator was passed as argument the task
1674  // will still be referenced even if some exception happen in this method
1675  CRef<CThreadPool_Task> task_ref(task);
1676 
1677  if ( x_NoNewTaskAllowed() ) {
1679  }
1680 
     // Guard is created inactive; it is locked later depending on the mode
1681  CThreadPool_Guard guard(this, false);
1682  unique_ptr<CTimeSpan> adjusted_timeout;
1683 
1684  if (!m_IsQueueAllowed) {
1685  guard.Guard();
1686 
1689  &guard, &m_RoomWait, timeout, &timer))
1690  {
1692  "Cannot add task - all threads are busy");
1693  }
1694 
     // Re-check: the wait above also ends when adding became prohibited
1695  if ( x_NoNewTaskAllowed() ) {
1697  }
1698 
1699  if ( timeout ) {
     // Remaining time after the wait; adjusted_timeout owns the new value
1700  adjusted_timeout.reset(new CTimeSpan
1701  (timeout->GetAsDouble() - timer.Elapsed()));
1702  timeout = adjusted_timeout.get();
1703  }
1704  }
1705 
1706  task->x_SetOwner(this);
1708  try {
1709  // Pushing to queue must be out of mutex to be able to wait
1710  // for available space.
     // NOTE(review): in the !m_IsQueueAllowed mode the guard acquired above
     // is still held here, so "out of mutex" applies to the queued mode only.
1711  m_Queue.Push(Ref(task), timeout);
1712  }
1713  catch (...) {
     // Undo the ownership assignment before propagating the failure
1715  task->x_ResetOwner();
1716  throw;
1717  }
1718 
1719  if (m_IsQueueAllowed) {
1720  guard.Guard();
1721  }
1722 
1723  // Check if someone aborted the pool or suspended it with cancelation of
1724  // queued tasks after we added this task to the queue but before we were
1725  // able to acquire the mutex
1728  if (m_Aborted || (IsSuspended()
1730  {
1731  if (m_Queue.GetSize() != 0) {
1733  }
1734  return;
1735  }
1736 
1737  unsigned int cnt_req = (unsigned int) m_TotalTasks.Add(1);
1738 
     // Without a queue every accepted task needs its own thread
1739  if (!m_IsQueueAllowed && cnt_req > GetThreadsCount()) {
1740  LaunchThreads(cnt_req - GetThreadsCount());
1741  }
1742 
1743  if (! IsSuspended()) {
     // NOTE(review): `count` is a signed int filled from an unsigned getter;
     // if it is 0 here, `--count` skips past 0 and the `count == 0` break
     // below never fires -- confirm this is acceptable (loop header missing).
1744  int count = GetQueuedTasksCount();
1746  if (! (*it)->IsFinishing()) {
1747  (*it)->WakeUp();
1748  --count;
1749  if (count == 0)
1750  break;
1751  }
1752  }
1753  }
1754 
1756 }
1757 
/// Remove `task` from the task queue if it is there; otherwise do nothing.
/// The queue is scanned linearly under an access guard and only the first
/// matching element is erased.
/// NOTE(review): the signature line is absent from this listing.
1758 inline void
1760 {
1761  TQueue::TAccessGuard q_guard(m_Queue);
1762 
1763  TQueue::TAccessGuard::TIterator it = q_guard.Begin();
1764  while (it != q_guard.End() && *it != task) {
1765  ++it;
1766  }
1767 
1768  if (it != q_guard.End()) {
1769  q_guard.Erase(it);
1770  }
1771 }
1772 
/// Register a task for exclusive execution.
/// Throws CThreadPoolException(eProhibited) when the pool is aborted;
/// otherwise marks this pool as the task's owner and wakes `thread` if it is
/// non-NULL.
/// NOTE(review): the signature lines and original lines 1789-1790 and 1792
/// are absent from this listing (the enqueueing step and the declaration of
/// `thread` are not visible).
1773 void
1776 {
1777  _ASSERT(task);
1778 
1779  // To be sure that if simple new operator was passed as argument the task
1780  // will still be referenced even if some exception happen in this method
1781  CRef<CThreadPool_Task> task_ref(task);
1782 
1783  if (m_Aborted) {
1784  NCBI_THROW(CThreadPoolException, eProhibited,
1785  "Cannot add exclusive task when ThreadPool is aborted");
1786  }
1787 
1788  task->x_SetOwner(this);
1791 
1793  if (thread) {
1794  thread->WakeUp();
1795  }
1796 }
1797 
/// Request cancellation of a single task.
///
/// - A finished task is left alone.
/// - A task in eIdle status only gets its cancel flag set.
/// - A task owned by another pool causes CThreadPoolException(eInvalid);
///   a task whose pool is already NULL is treated as just finished.
/// - Otherwise the cancel flag is set and the task is removed from the queue.
/// NOTE(review): the signature line and original line 1827 are absent from
/// this listing.
1798 void
1800 {
1801  _ASSERT(task);
1802 
1803  if (task->IsFinished()) {
1804  return;
1805  }
1806  // Some race can happen here if the task is being queued now
1807  if (task->GetStatus() == CThreadPool_Task::eIdle) {
1808  task->x_RequestToCancel();
1809  return;
1810  }
1811 
1812  CThreadPool* task_pool = task->GetPool();
1813  if (task_pool != m_Interface) {
1814  if (!task_pool) {
1815  // Task have just finished - we can do nothing
1816  return;
1817  }
1818 
1819  NCBI_THROW(CThreadPoolException, eInvalid,
1820  "Cannot cancel task execution "
1821  "if it is inserted in another ThreadPool");
1822  }
1823 
1824  task->x_RequestToCancel();
1825  x_RemoveTaskFromQueue(task);
1826 
1828 }
1829 
/// Cancel the task groups selected by the bit flags in `tasks_group`
/// (fCancelQueuedTasks and/or fCancelExecutingTasks).
/// NOTE(review): the signature line and original lines 1833-1834, 1839, 1842
/// and 1845 are absent from this listing, so the statements executed in each
/// branch are not visible; the fragment at 1835-1836 appears to be the tail
/// of a check that only known flags are set and that the mask is non-zero.
1830 inline void
1832 {
1835  == tasks_group
1836  && tasks_group != 0);
1837 
1838  if (tasks_group & CThreadPool::fCancelQueuedTasks) {
1840  }
1841  if (tasks_group & CThreadPool::fCancelExecutingTasks) {
1843  }
1844 
1846 }
1847 
/// Call CancelCurrentTask() on every element visited by two loops, while
/// holding the pool mutex.
/// NOTE(review): the signature line and both loop headers (original 1853 and
/// 1860) are absent from this listing; judging by the surviving comment the
/// second loop covers threads not yet marked as working -- confirm.
1848 void
1850 {
1851  CThreadPool_Guard guard(this);
1852 
1854  (*it)->CancelCurrentTask();
1855  }
1856 
1857  // CThreadPool_ThreadImpl::Main() acts not under guard, so we cannot be
1858  // sure that it doesn't have already task to execute before it marked
1859  // itself as working
1861  (*it)->CancelCurrentTask();
1862  }
1863 }
1864 
/// Request cancellation of every task currently in the queue, then empty
/// the queue. The access guard is held across both steps.
/// NOTE(review): the signature line is absent from this listing.
1865 void
1867 {
1868  TQueue::TAccessGuard q_guard(m_Queue);
1869 
1870  for (TQueue::TAccessGuard::TIterator it = q_guard.Begin();
1871  it != q_guard.End(); ++it)
1872  {
1873  it->GetNCPointer()->x_RequestToCancel();
1874  }
1875 
1876  m_Queue.Clear();
1877 }
1878 
/// Replace the current threads according to `flush_type`.
///
/// Throws CThreadPoolException(eProhibited) when the pool is aborted.
/// eStartImmediately -- and eWaitToFinish while the pool is suspended --
/// take the first branch. A plain eWaitToFinish only sets m_FlushRequested
/// when exclusive tasks are already queued; otherwise it takes the
/// `need_add` branch.
/// NOTE(review): the signature line and original lines 1892, 1900 and
/// 1909-1910 are absent from this listing, so the action taken in the first
/// branch and in the `need_add` branch is not visible.
1879 inline void
1881 {
1882  CThreadPool_Guard guard(this);
1883 
1884  if (m_Aborted) {
1885  NCBI_THROW(CThreadPoolException, eProhibited,
1886  "Cannot flush threads when ThreadPool aborted");
1887  }
1888 
1889  if (flush_type == CThreadPool::eStartImmediately
1890  || (flush_type == CThreadPool::eWaitToFinish && IsSuspended()))
1891  {
1893  }
1894  else if (flush_type == CThreadPool::eWaitToFinish) {
1895  bool need_add = true;
1896 
1897  {{
1898  // To avoid races with TryGetExclusiveTask() we need to put
1899  // guard here
1901 
1902  if (m_ExclusiveQueue.GetSize() != 0) {
1903  m_FlushRequested = true;
1904  need_add = false;
1905  }
1906  }}
1907 
1908  if (need_add) {
1911  }
1912  }
1913 }
1914 
/// Abort the pool: mark it aborted, cancel queued, executing and exclusive
/// tasks (logging a warning when queued/exclusive tasks are dropped), stop
/// the threads and wait -- bounded by `timeout`, or indefinitely when it is
/// NULL -- for them to terminate.
/// NOTE(review): the signature line and original lines 1932, 1935, 1939,
/// 1955, 1960, 1963, 1966, 1969-1970 and 1973 are absent from this listing;
/// the individual cancel/stop calls are therefore not visible.
1915 inline void
1917 {
1918  CThreadPool_Guard guard(this);
1919 
1920  // Method can be called several times in a row and every time we need
1921  // to wait for threads to finish operation
1922  m_Aborted = true;
1923 
1924  // Cancel queued tasks
1925  unsigned int n_queued_tasks = GetQueuedTasksCount();
1926  if ( n_queued_tasks ) {
1927  ERR_POST_X(14, Warning <<
1928  "CThreadPool is being aborted or destroyed while still "
1929  "having " << n_queued_tasks << " regular tasks "
1930  "waiting to be executed; they are now canceled");
1931  }
1933 
1934  // Cancel currently executing tasks
1936 
1937  // Cancel exclusive tasks
1938  {{
1940 
1941  TExclusiveQueue::TSize n_exclusive_tasks = m_ExclusiveQueue.GetSize();
1942  if ( n_exclusive_tasks ) {
1943  ERR_POST_X(15, Warning <<
1944  "CThreadPool is being aborted or destroyed while still "
1945  "having " << n_exclusive_tasks << " exclusive tasks "
1946  "waiting to be executed; they are now canceled");
1947  }
1948 
1949  for (TExclusiveQueue::TAccessGuard::TIterator it = q_guard.Begin();
1950  it != q_guard.End(); ++it)
1951  {
1952  it->task->x_RequestToCancel();
1953  }
1954 
1956  }}
1957 
1958  // Stop threads
1959  if (m_ServiceThread.NotNull()) {
1961  }
1962 
1964 
1965  if (m_Controller.NotNull()) {
1967  }
1968 
     // Wait on m_AbortWait; the guard is released while waiting
1971  &guard, &m_AbortWait, timeout, &timer);
1972 
1974  if ( timeout )
1975  ERR_POST_X(16, Warning <<
1976  "CThreadPool::Abort() was unable to terminate "
1977  "all of its threads within the specified timeout: "
1978  << timeout->AsSmartString());
1979  else
     // NOTE(review): the two adjacent literals below concatenate without a
     // space, so the logged text reads "...terminateall of its threads...".
1980  ERR_POST_X(17, Critical <<
1981  "CThreadPool::Abort() was not able to terminate"
1982  "all of its threads despite being given an infinite "
1983  "time for doing so");
1984  }
1985 
     // Posted unconditionally -- presumably so that another waiter on
     // m_AbortWait is also released; confirm.
1986  m_AbortWait.Post();
1987 
1988  // This assigning can destroy the controller. If some threads are not
1989  // finished yet and at this very moment will call controller it can crash.
1990  //m_Controller = NULL;
1991 }
1992 
1993 
1994 
// Constructor of the controller: starts detached (m_Pool == NULL), stores
// the thread limits and rejects max_threads == 0 or max_threads < min_threads.
// NOTE(review): the first signature line (original 1995) and the throw
// statement's first line (original 2003) are absent from this listing.
1996  unsigned int min_threads)
1997  : m_Pool(NULL),
1998  m_MinThreads(min_threads),
1999  m_MaxThreads(max_threads),
2000  m_InHandleEvent(false)
2001 {
2002  if (max_threads < min_threads || max_threads == 0) {
2004  "Invalid numbers of min and max number of threads:"
2005  " min=" << min_threads << ", max=" << max_threads);
2006  }
2007 }
2008 
/// Empty body. NOTE(review): the signature line is absent from this listing;
/// presumably this is the CThreadPool_Controller destructor.
2010 {}
2011 
/// Return the interface object of the pool this controller is attached to,
/// or NULL when it is not attached. m_Pool is read once into a local.
/// NOTE(review): the signature line is absent from this listing.
2012 CThreadPool*
2014 {
2015  // Avoid changing of pointer during method execution
2016  CThreadPool_Impl* pool = m_Pool;
2017  return pool? pool->GetPoolInterface(): NULL;
2018 }
2019 
/// Return the main mutex of the pool implementation `impl`.
/// Throws CThreadPoolException(eInactive) when `impl` is NULL.
/// NOTE(review): the signature line and original line 2023 (which obtains
/// `impl`) are absent from this listing.
2020 CMutex&
2022 {
2024  if (!impl) {
2025  NCBI_THROW(CThreadPoolException, eInactive,
2026  "Cannot do active work when not attached "
2027  "to some ThreadPool");
2028  }
2029  return impl->GetMainPoolMutex();
2030 }
2031 
/// Bring the pool's thread count back inside [m_MinThreads, m_MaxThreads].
/// Does nothing when the controller is not attached to a pool. The thread
/// count is sampled once and compared against both limits.
/// NOTE(review): the signature line is absent from this listing.
2032 void
2034 {
2035  CThreadPool_Impl* pool = m_Pool;
2036 
2037  if (! pool)
2038  return;
2039 
2040  Uint4 count = pool->GetThreadsCount();
2041  if (count > m_MaxThreads) {
2042  pool->FinishThreads(count - m_MaxThreads);
2043  }
2044  if (count < m_MinThreads) {
2045  pool->LaunchThreads(m_MinThreads - count);
2046  }
2047 }
2048 
2049 void
2050 CThreadPool_Controller::SetMinThreads(unsigned int min_threads)
2051 {
2052  CThreadPool_Guard guard(m_Pool, false);
2053  if (m_Pool)
2054  guard.Guard();
2055 
2056  m_MinThreads = min_threads;
2057 
2058  EnsureLimits();
2059 }
2060 
2061 void
2062 CThreadPool_Controller::SetMaxThreads(unsigned int max_threads)
2063 {
2064  CThreadPool_Guard guard(m_Pool, false);
2065  if (m_Pool)
2066  guard.Guard();
2067 
2068  m_MaxThreads = max_threads;
2069 
2070  EnsureLimits();
2071 }
2072 
/// Make the pool run `count` threads, after clamping `count` into the range
/// [GetMinThreads(), GetMaxThreads()] (upper bound applied first).
/// NOTE(review): the signature line is absent from this listing.
/// NOTE(review): `pool` is dereferenced without a NULL check, unlike the
/// `if (! pool) return;` pattern used by neighbouring methods -- confirm
/// that callers guarantee the controller is attached.
2073 void
2075 {
2076  if (count > GetMaxThreads())
2077  count = GetMaxThreads();
2078  if (count < GetMinThreads())
2079  count = GetMinThreads();
2080 
2081  CThreadPool_Impl* pool = m_Pool;
2082 
2083  unsigned int now_cnt = pool->GetThreadsCount();
2084  if (count > now_cnt) {
2085  pool->LaunchThreads(count - now_cnt);
2086  }
2087  else if (count < now_cnt) {
2088  pool->FinishThreads(now_cnt - count);
2089  }
2090 }
2091 
/// Dispatch `event` to OnEvent() under the pool mutex.
///
/// The call is skipped when the controller is detached, when OnEvent() is
/// already running (m_InHandleEvent), or when the pool is aborted or
/// suspended. m_InHandleEvent is cleared on both normal and exceptional
/// exit; exceptions from OnEvent() are rethrown.
/// NOTE(review): the signature line is absent from this listing.
2091 void
2093 {
2094  CThreadPool_Impl* pool = m_Pool;
2095  if (! pool)
2096  return;
2097 
2098  CThreadPool_Guard guard(pool);
2099 
2100  if (m_InHandleEvent || pool->IsAborted() || pool->IsSuspended())
2101  return;
2102 
2103  m_InHandleEvent = true;
2104 
2105  try {
2106  OnEvent(event);
2107  m_InHandleEvent = false;
2108  }
2109  catch (...) {
2110  m_InHandleEvent = false;
2111  throw;
2112  }
2113 }
2115 
/// Return CTimeSpan(1, 0) while the controller is attached to a pool and
/// CTimeSpan(0, 0) otherwise.
/// NOTE(review): the signature line is absent from this listing.
2115 CTimeSpan
2117 {
2118  if (m_Pool) {
2119  return CTimeSpan(1, 0);
2120  }
2121  else {
2122  return CTimeSpan(0, 0);
2123  }
2124 }
2126 
2127 
2128 
2130 {
2131  _ASSERT(pool);
2132 
2133  m_Impl = new CThreadPool_ThreadImpl(this,
2135 }
2136 
2138 {
2139  delete m_Impl;
2140 }
2141 
/// Intentionally empty function body.
/// NOTE(review): the signature line is absent from this listing; presumably
/// a default no-op hook of CThreadPool_Thread -- confirm which one.
2141 void
2143 {}
2145 
/// Intentionally empty function body.
/// NOTE(review): the signature line is absent from this listing; presumably
/// a default no-op hook of CThreadPool_Thread -- confirm which one.
2145 void
2147 {}
2149 
/// Forward to m_Impl->GetPool().
/// NOTE(review): the signature line is absent from this listing.
2149 CThreadPool*
2151 {
2152  return m_Impl->GetPool();
2153 }
2155 
2158 {
2159  return m_Impl->GetCurrentTask();
2160 }
2161 
/// Run the implementation's Main() and return NULL as the thread result.
/// NOTE(review): the signature line is absent from this listing.
2161 void*
2163 {
2164  m_Impl->Main();
2165  return NULL;
2166 }
2168 
/// Forward to m_Impl->OnExit().
/// NOTE(review): the signature line is absent from this listing.
2168 void
2170 {
2171  m_Impl->OnExit();
2172 }
2174 
2175 
2176 
/// Construct a pool with the default controller: allocates the
/// implementation object and stores it in m_Impl.
/// NOTE(review): original line 2184 (the last statement of the body) is
/// absent from this listing.
2176 CThreadPool::CThreadPool(unsigned int queue_size,
2177  unsigned int max_threads,
2178  unsigned int min_threads,
2179  CThread::TRunMode threads_mode)
2180 {
2181  m_Impl = new CThreadPool_Impl(this, queue_size, max_threads, min_threads,
2182  threads_mode);
2184 }
2186 
/// Construct a pool with a caller-supplied controller: allocates the
/// implementation object and stores it in m_Impl.
/// NOTE(review): original line 2192 (the last statement of the body) is
/// absent from this listing.
2186 CThreadPool::CThreadPool(unsigned int queue_size,
2187  CThreadPool_Controller* controller,
2188  CThread::TRunMode threads_mode)
2189 {
2190  m_Impl = new CThreadPool_Impl(this, queue_size, controller, threads_mode);
2192 }
2194 
2196 {
2198 }
2199 
/// Forward to m_Impl->GetMainPoolMutex().
/// NOTE(review): the signature line is absent from this listing.
2199 CMutex&
2201 {
2202  return m_Impl->GetMainPoolMutex();
2203 }
2205 
2208 {
2210 }
2211 
/// Forward to m_Impl->AddTask(task, timeout).
/// NOTE(review): the signature line is absent from this listing.
2211 void
2213 {
2214  m_Impl->AddTask(task, timeout);
2215 }
2217 
/// Forward to m_Impl->CancelTask(task).
/// NOTE(review): the signature line is absent from this listing.
2217 void
2219 {
2220  m_Impl->CancelTask(task);
2221 }
2223 
/// Forward to m_Impl->Abort(timeout).
/// NOTE(review): the signature line is absent from this listing.
2223 void
2225 {
2226  m_Impl->Abort(timeout);
2227 }
2229 
/// Forward to m_Impl->IsAborted().
/// NOTE(review): the signature line is absent from this listing.
2229 bool
2231 {
2232  return m_Impl->IsAborted();
2233 }
2235 
/// Forward to m_Impl->SetDestroyTimeout(timeout).
/// NOTE(review): the signature line is absent from this listing.
2235 void
2237 {
2238  m_Impl->SetDestroyTimeout(timeout);
2239 }
2241 
/// Forward to m_Impl->GetDestroyTimeout().
/// NOTE(review): the signature line is absent from this listing.
2241 const CTimeSpan&
2243 {
2244  return m_Impl->GetDestroyTimeout();
2245 }
2247 
/// NOTE(review): the signature lines and the body statement (original 2251)
/// are absent from this listing; by position this is presumably the public
/// forwarder for exclusive task execution -- confirm against the original.
2247 void
2250 {
2252 }
2254 
/// Forward to m_Impl->CancelTasks(tasks_group).
/// NOTE(review): the signature line is absent from this listing.
2254 void
2256 {
2257  m_Impl->CancelTasks(tasks_group);
2258 }
2260 
/// Forward to m_Impl->FlushThreads(flush_type).
/// NOTE(review): the signature line is absent from this listing.
2260 void
2262 {
2263  m_Impl->FlushThreads(flush_type);
2264 }
2266 
/// Forward to m_Impl->GetThreadsCount().
/// NOTE(review): the signature line is absent from this listing.
2266 unsigned int
2268 {
2269  return m_Impl->GetThreadsCount();
2270 }
2272 
/// Forward to m_Impl->GetQueuedTasksCount().
/// NOTE(review): the signature line is absent from this listing.
2272 unsigned int
2274 {
2275  return m_Impl->GetQueuedTasksCount();
2276 }
2278 
/// Forward to m_Impl->GetExecutingTasksCount().
/// NOTE(review): the signature line is absent from this listing.
2278 unsigned int
2280 {
2281  return m_Impl->GetExecutingTasksCount();
2282 }
2284 
2285 
2286 
CAtomicCounter –.
Definition: ncbicntr.hpp:71
CFastMutex –.
Definition: ncbimtx.hpp:667
void Guard(resource_type &resource)
Manually force the guard to protect some other resource.
Definition: guard.hpp:175
void Release()
Manually force the resource to be released.
Definition: guard.hpp:166
CMutex –.
Definition: ncbimtx.hpp:749
CObject –.
Definition: ncbiobj.hpp:180
CSemaphore –.
Definition: ncbimtx.hpp:1375
CStopWatch –.
Definition: ncbitime.hpp:1937
Exception object used throughout all CSyncQueue classes.
Definition: sync_queue.hpp:972
Access guard to non-constant CSyncQueue.
Definition: sync_queue.hpp:728
TIterator End(void)
Get iterator pointing to the tail of the queue.
TIterator Begin(void)
Get iterator pointing to the head of the queue.
TIterator Erase(TIterator iter)
Erase one element in the queue.
Iterator for CSyncQueue (constant or non-constant depending on template parameters).
Definition: sync_queue.hpp:849
Adaptor class to use STL multiset<> in CSyncQueue.
Thread-safe queue object with a blocking mechanism.
Definition: sync_queue.hpp:217
TSize GetSize(void) const
Get count of elements already stored in the queue.
Container::size_type TSize
Type of size of the queue.
Definition: sync_queue.hpp:224
void Push(const TValue &elem, const CTimeSpan *timeout=NULL)
Add new element to the end of queue.
TValue Pop(const CTimeSpan *timeout=NULL)
Retrieve an element from the queue.
void Clear(const CTimeSpan *timeout=NULL)
Remove all elements from the queue.
Exception class for all ThreadPool-related classes.
Default ThreadPool controller based on Proportional-Integral-Derivative algorithm.
Abstract class for controlling the number of threads in pool.
Special task which does nothing It's used in FlushThreads to force pool to wait while all old threads...
virtual EStatus Execute(void)
Empty main method.
Guardian for protecting pool by locking its main mutex.
CThreadPool_Guard(CThreadPool_Impl *pool, bool is_active=true)
Constructor.
CThreadPool_Impl * m_Pool
Pool protected by the guardian.
void Release(void)
Turn this guardian off.
void Guard(void)
Turn this guardian on.
Real implementation of all ThreadPool functions.
Definition: thread_pool.cpp:59
void TaskStarting(void)
Callback from thread when it is starting to execute task.
TExclusiveQueue m_ExclusiveQueue
Queue for information about exclusive tasks.
void FlushThreads(CThreadPool::EFlushType flush_type)
Finish all current threads and replace them with new ones.
CRef< CThreadPool_Impl > m_SelfRef
Reference to this pool to prevent its destroying earlier than we allow it to.
SExclusiveTaskInfo TryGetExclusiveTask(void)
Get the next exclusive task to execute.
void x_RemoveTaskFromQueue(const CThreadPool_Task *task)
Delete task from the queue If task does not exist in queue then does nothing.
void AddTask(CThreadPool_Task *task, const CTimeSpan *timeout)
Add task to the pool.
static CThreadPool_Impl * s_GetImplPointer(CThreadPool *pool)
Convert pointer to CThreadPool object into pointer to CThreadPool_Impl object.
CThreadPool_Impl(const CThreadPool_Impl &)
Prohibit copying and assigning.
static void sx_SetTaskStatus(CThreadPool_Task *task, CThreadPool_Task::EStatus status)
Call x_SetTaskStatus() for the given task.
volatile TExclusiveFlags m_SuspendFlags
Requested requirements for the exclusive execution environment.
CRef< CThreadPool_ServiceThread > m_ServiceThread
Thread for execution of exclusive tasks and passing of events to the controller.
CThread::TRunMode m_ThreadsMode
Running mode of all threads.
void RequestExclusiveExecution(CThreadPool_Task *task, TExclusiveFlags flags)
Add the task for exclusive execution in the pool.
bool(CThreadPool_Impl::* TWaitPredicate)(void) const
Type of some simple predicate.
CAtomicCounter m_ThreadsCount
Total number of threads Introduced for more adequate and fast reflecting to threads starting and stop...
void x_CancelExecutingTasks(void)
Cancel all currently executing tasks.
void DestroyReference(void)
Destroy reference to this object Method is called when CThreadPool object is destroyed which means th...
const CTimeSpan & GetDestroyTimeout(void) const
Get destroy timeout for the pool.
CThreadPool_Impl & operator=(const CThreadPool_Impl &)
unsigned int GetQueuedTasksCount(void) const
Get the number of tasks currently waiting in queue.
void CallController(CThreadPool_Controller::EEvent event)
Call the CThreadPool_Controller::HandleEvent() method of the pool controller with the given event typ...
TThreadsList m_IdleThreads
List of all idle threads.
CSemaphore m_RoomWait
Semaphore for waiting for available threads to process task when queuing is disabled.
CMutex & GetMainPoolMutex(void)
Get main pool mutex.
CTimeSpan GetSafeSleepTime(void) const
Call the CThreadPool_Controller::GetSafeSleepTime() method of the pool controller.
bool CanDoExclusiveTask(void) const
Check if it is already allowed to execute exclusive task.
void ThreadStopped(CThreadPool_ThreadImpl *thread)
Callback from working thread when it finished its Main() method.
atomic< bool > m_Suspended
If pool is suspended for exclusive task execution or not.
void LaunchThreads(unsigned int count)
Launch new threads in pool.
void ResumeWork(void)
Resume the pool operation after exclusive task execution.
TQueue m_Queue
Queue for storing tasks.
bool m_IsQueueAllowed
Flag about working with special case: FALSE - queue_size == 0, TRUE - queue_size > 0.
CThreadPool * m_Interface
ThreadPool interface object attached to this implementation.
void SetInterfaceStarted(void)
Mark that initialization of the interface was finished.
volatile bool m_FlushRequested
Flag indicating if flush of threads requested after adding exclusive task but before it is started it...
bool x_HasNoThreads(void) const
Check if all threads in pool finished their work.
void Abort(const CTimeSpan *timeout)
Abort the pool operation.
void CancelTasks(TExclusiveFlags tasks_group)
Cancel the selected groups of tasks in the pool.
CRef< CThreadPool_Controller > m_Controller
Controller managing count of threads in pool.
CThreadPool * GetPoolInterface(void) const
Get pointer to ThreadPool interface object.
unsigned int GetThreadsCount(void) const
Get number of threads running in the pool.
void x_Init(CThreadPool *pool_intf, CThreadPool_Controller *controller, CThread::TRunMode threads_mode)
Initialization of all class member variables that can be initialized outside of constructor.
bool x_CanAddImmediateTask(void) const
Check if new task can be added to the pool when queueing is disabled.
CAtomicCounter m_TotalTasks
Total number of tasks acquired by pool Includes queued tasks and executing (but not exclusive!...
void FinishThreads(unsigned int count)
Finish threads in pool Stop first all idle threads then stop busy threads without cancelation of curr...
void TaskFinished(void)
Callback from thread when it has finished to execute task.
CThreadPool_Impl(CThreadPool *pool_intf, unsigned int queue_size, unsigned int max_threads, unsigned int min_threads, CThread::TRunMode threads_mode=CThread::fRunDefault)
Constructor with default controller.
CSemaphore m_AbortWait
Semaphore for waiting for threads finishing in Abort() method.
set< CThreadPool_ThreadImpl * > TThreadsList
Type of list of all pooled threads.
void CancelTask(CThreadPool_Task *task)
Request to cancel the task.
bool IsAborted(void) const
Check if the pool is already aborted.
void x_CancelQueuedTasks(void)
Cancel all tasks waiting in the queue.
void SetDestroyTimeout(const CTimeSpan &timeout)
Set destroy timeout for the pool.
~CThreadPool_Impl(void)
Destructor. Will be called from CRef.
static void sx_RequestToCancel(CThreadPool_Task *task)
Call x_RequestToCancel() for the given task.
bool IsSuspended(void) const
Check if the pool is suspended for exclusive execution.
TThreadsList m_WorkingThreads
List of all threads currently executing some tasks.
CAtomicCounter m_ExecutingTasks
Number of tasks executing now Introduced for more adequate and fast reflecting to task executing star...
void RequestSuspend(TExclusiveFlags flags)
Request suspension of the pool.
unsigned int GetExecutingTasksCount(void) const
Get the number of currently executing tasks.
bool SetThreadIdle(CThreadPool_ThreadImpl *thread, bool is_idle)
Mark thread as idle or non-idle.
CThreadPool::TExclusiveFlags TExclusiveFlags
Definition: thread_pool.cpp:61
void CallControllerOther(void)
Schedule running of CThreadPool_Controller::HandleEvent() with eOther event type.
void ThreadStateChanged(void)
Callback when some thread changed its idleness or finished (including service thread)
CTimeSpan m_DestroyTimeout
Timeout to wait for all threads to finish before the ThreadPool interface object will be able to dest...
CRef< CThreadPool_Task > TryGetNextTask(void)
Get next task from queue if there is one If the queue is empty then return NULL.
CSyncQueue< SExclusiveTaskInfo > TExclusiveQueue
Type of queue used for storing information about exclusive tasks.
CSyncQueue< CRef< CThreadPool_Task >, CSyncQueue_multiset< CRef< CThreadPool_Task >, SThreadPool_TaskCompare > > TQueue
Type of queue used for storing tasks.
atomic< bool > m_Aborted
If pool is already aborted or not.
unsigned int x_GetQueueSize(unsigned int queue_size)
Transform size of queue given in constructor to the size passed to CSyncQueue constructor.
bool x_WaitForPredicate(TWaitPredicate wait_func, CThreadPool_Guard *pool_guard, CSemaphore *wait_sema, const CTimeSpan *timeout, const CStopWatch *timer)
Wait for some predicate to be true.
bool x_NoNewTaskAllowed(void) const
Check if adding new tasks to the pool is prohibited.
CMutex m_MainPoolMutex
Mutex for guarding all changes in the pool, its threads and controller.
Thread used in pool for different internal needs: execution of exclusive tasks and passing events to ...
bool IsFinished(void)
Check if this thread have already finished or not.
CRef< CThreadPool_Impl > m_Pool
Pool owning this thread.
void NeedCallController(void)
Tell the thread that controller should handle eOther event.
CSemaphore m_IdleTrigger
Semaphore for idle sleeping.
void WakeUp(void)
Wake up from idle waiting or waiting of pool preparing exclusive environment.
virtual void * Main(void)
Main thread execution.
virtual ~CThreadPool_ServiceThread(void)
Destructor. Will be called from CRef.
CThreadPool_ServiceThread(CThreadPool_Impl *pool)
Constructor.
atomic< bool > m_Finished
If the thread has already finished its Main() method.
CRef< CThreadPool_Task > m_CurrentTask
Currently executing exclusive task.
atomic< bool > m_Finishing
If finishing of the thread is already requested.
CAtomicCounter m_NeedCallController
Flag indicating that thread should pass eOther event to the controller.
void RequestToFinish(void)
Request finishing of the thread.
CFastMutex m_FastMutex
General-use mutex for very (very!) trivial ops.
void x_Idle(void)
Do "idle" work when thread is not busy executing exclusive tasks.
Abstract class for representing single task executing in pool of threads To use this class in applica...
Definition: thread_pool.hpp:76
Real implementation of all CThreadPool_Thread functions.
CThreadPool * GetPool(void) const
Get ThreadPool interface object owning this thread.
CRef< CThreadPool_Task > m_CurrentTask
Task currently executing in the thread.
static CThreadPool_ThreadImpl * s_GetImplPointer(CThreadPool_Thread *thread)
Convert pointer to CThreadPool_Thread object into pointer to CThreadPool_ThreadImpl object.
void RequestToFinish(void)
Request this thread to finish its operation.
~CThreadPool_ThreadImpl(void)
Destructor Called directly from CThreadPool destructor.
CThreadPool_ThreadImpl & operator=(const CThreadPool_ThreadImpl &)
CThreadPool_ThreadImpl(CThreadPool_Thread *thread_intf, CThreadPool_Impl *pool)
Constructor.
bool IsFinishing(void) const
If finishing of this thread is already in progress or not.
void x_TaskFinished(CThreadPool_Task::EStatus status)
Do finalizing when task finished its execution.
CSemaphore m_IdleTrigger
Semaphore for waking up from idle waiting.
void WakeUp(void)
Wake up the thread from idle state.
CThreadPool_ThreadImpl(const CThreadPool_ThreadImpl &)
Prohibit copying and assigning.
bool x_SetIdleState(bool is_idle)
Mark the thread idle or non-idle.
atomic< bool > m_Finishing
If the thread is already asked to finish or not.
CFastMutex m_FastMutex
General-use mutex for very (very!) trivial ops.
void x_Idle(void)
Suspend until the wake up signal.
bool m_IsIdle
Idleness of the thread.
void Main(void)
Implementation of thread Main() method.
CRef< CThreadPool_Impl > m_Pool
Pool running the thread.
atomic< bool > m_CancelRequested
If cancel of the currently executing task is requested or not.
void CancelCurrentTask(void)
Request to cancel current task execution.
static CThreadPool_Thread * s_CreateThread(CThreadPool *pool)
Create new CThreadPool_Thread object Method introduced to avoid excessive friendship to CThreadPool_T...
CRef< CThreadPool_Task > GetCurrentTask(void) const
Get task currently executing in the thread May be NULL if thread is idle or is in the middle of chang...
CThreadPool_Thread * m_Interface
ThreadPool_Thread interface object attached to this implementation.
void OnExit(void)
Implementation of threadOnExit() method.
Base class for a thread running inside CThreadPool and executing tasks.
Main class implementing functionality of pool of threads.
CTimeSpan.
Definition: ncbitime.hpp:1313
CTimeout – Timeout interval.
Definition: ncbitime.hpp:1693
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
parent_type::iterator iterator
Definition: set.hpp:80
size_type size() const
Definition: set.hpp:132
const_iterator find(const key_type &key) const
Definition: set.hpp:137
void erase(iterator pos)
Definition: set.hpp:151
const_iterator end() const
Definition: set.hpp:136
static uch flags
static void check_flags(TDSCOLUMN *curcol, int n, const char *possible_results)
Definition: flags.c:37
#define true
Definition: bool.h:35
#define false
Definition: bool.h:36
#define bool
Definition: bool.h:34
static FILE * f
Definition: readconf.c:23
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define REVERSE_ITERATE(Type, Var, Cont)
ITERATE macro to reverse sequence through container elements.
Definition: ncbimisc.hpp:827
@ eNegative
Value is negative.
Definition: ncbimisc.hpp:121
#define NULL
Definition: ncbistd.hpp:225
TNCBIAtomicValue TValue
Alias TValue for TNCBIAtomicValue.
Definition: ncbicntr.hpp:73
void Set(TValue new_value) THROWS_NONE
Set atomic counter value.
Definition: ncbicntr.hpp:185
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
Definition: ncbicntr.hpp:278
TValue Get(void) const THROWS_NONE
Get atomic counter value.
Definition: ncbicntr.hpp:168
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
Definition: ncbidiag.hpp:550
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
#define NCBI_CATCH_ALL_X(err_subcode, message)
Definition: ncbiexpt.hpp:619
#define STD_CATCH_ALL_X(err_subcode, message)
Standard handling of "exception"-derived exceptions; catches non-standard exceptions and generates "u...
Definition: ncbiexpt.hpp:608
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
#define NCBI_THROW_FMT(exception_class, err_code, message)
The same as NCBI_THROW but with message processed as output to ostream.
Definition: ncbiexpt.hpp:719
TObjectType * GetNCPointerOrNull(void) const THROWS_NONE
Get pointer value.
Definition: ncbiobj.hpp:1162
bool NotNull(void) const THROWS_NONE
Check if pointer is not null – same effect as NotEmpty().
Definition: ncbiobj.hpp:744
CRef< C > Ref(C *object)
Helper functions to get CRef<> and CConstRef<> objects.
Definition: ncbiobj.hpp:2015
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
bool NotEmpty(void) const THROWS_NONE
Check if CRef is not empty – pointing to an object and has a non-null value.
Definition: ncbiobj.hpp:726
CObject & operator=(const CObject &src) THROWS_NONE
Assignment operator.
Definition: ncbiobj.hpp:482
bool IsNull(void) const THROWS_NONE
Check if pointer is null – same effect as Empty().
Definition: ncbiobj.hpp:735
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
#define kMax_Int
Definition: ncbi_limits.h:184
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
virtual void OnEvent(EEvent event)=0
Main method for the implementation of controlling algorithm.
atomic< CThreadPool_Impl * > m_Pool
ThreadPool to which this controller is attached.
friend class CThreadPool_ThreadImpl
EEvent
Events that can happen with ThreadPool.
CThreadPool(unsigned int queue_size, unsigned int max_threads, unsigned int min_threads=2, CThread::TRunMode threads_mode=CThread::fRunDefault)
Constructor.
atomic< EStatus > m_Status
Status of the task.
void x_AttachToPool(CThreadPool_Impl *pool)
Attach the controller to ThreadPool.
void EnsureLimits(void)
Ensure that constraints of minimum and maximum count of threads in pool are met.
unsigned int m_Priority
Priority of the task.
virtual void OnStatusChange(EStatus old)
Callback to notify on changes in the task status.
virtual EStatus Execute(void)=0
Do the actual job.
void CancelTask(CThreadPool_Task *task)
Request to cancel the task and remove it from queue if it is there.
virtual void OnExit(void)
To prevent overriding - do cleanup after exiting from thread.
unsigned int GetMinThreads(void) const
Get the minimum number of threads in pool.
CThreadPool * GetPool(void) const
Get the thread pool in which this thread is running.
atomic< CThreadPool_Impl * > m_Pool
Pool owning this task.
EStatus GetStatus(void) const
Get status of the task.
atomic< bool > m_CancelRequested
Flag indicating if cancellation of the task was already requested.
CThreadPool * GetPool(void) const
Get pool to which this class is attached.
void x_Init(unsigned int priority)
Init all members in constructor.
unsigned int GetPriority(void) const
Get priority of the task.
virtual CThreadPool_Thread * CreateThread(void)
Create new thread for the pool.
virtual CTimeSpan GetSafeSleepTime(void) const
Get maximum timeout for which calls to method HandleEvent() can be missing.
virtual void OnCancelRequested(void)
Callback to notify when cancellation of the task is requested.
CThreadPool_Controller(unsigned int max_threads, unsigned int min_threads)
Constructor.
CRef< CThreadPool_Task > GetCurrentTask(void) const
Get the task currently executing in the thread.
void x_SetOwner(CThreadPool_Impl *pool)
Set pool as owner of this task.
unsigned int GetThreadsCount(void) const
Get total number of threads currently running in pool.
unsigned int m_MaxThreads
Maximum number of threads in pool.
CThreadPool * GetPool(void) const
The thread pool which accepted this task for execution.
CMutex & GetMainPoolMutex(CThreadPool *pool) const
Get mutex which guards access to pool. All work in controller should be based on the same mutex as in ...
virtual void Initialize(void)
Init this thread. It is called at beginning of Main()
void x_RequestToCancel(void)
Internal canceling of the task.
void x_ResetOwner(void)
Detach task from the pool (if insertion into the pool has failed).
void Abort(const CTimeSpan *timeout=NULL)
Abort all functions of the pool – cancel all queued tasks, send cancellation notifications to all cur...
void FlushThreads(EFlushType flush_type)
Finish all current threads and replace them with new ones.
unsigned int GetMaxThreads(void) const
Get the maximum number of threads in pool.
CThreadPool_Task(unsigned int priority=0)
Constructor.
EStatus
Status of the task.
Definition: thread_pool.hpp:79
virtual ~CThreadPool(void)
Destructor – will wait for all its threads to finish with the timeout set by CThreadPool::SetDestroyT...
CThreadPool_Impl * m_Impl
Actual implementation of the pool.
CThreadPool_ThreadImpl * m_Impl
Actual implementation of the thread.
virtual ~CThreadPool_Controller(void)
Destructor. Must be called only from CRef.
void x_DetachFromPool(void)
Detach the controller from pool when pool is aborted.
EFlushType
When to start new threads after flushing old ones.
bool IsAborted(void) const
Check whether method Abort() has already been called for this ThreadPool.
void HandleEvent(EEvent event)
This method is called every time something happens in a pool, such as: new task added,...
void RequestExclusiveExecution(CThreadPool_Task *task, TExclusiveFlags flags=0)
Add the task for exclusive execution in the pool. By default the pool suspends all new and queued task...
CMutex & GetMainPoolMutex(void)
Get the mutex that protects all changes in the pool.
const CTimeSpan & GetDestroyTimeout(void) const
Get timeout to wait for all threads to finish before the pool can be destroyed.
unsigned int TExclusiveFlags
Type of bit-masked combination of several values from EExclusiveFlags.
virtual ~CThreadPool_Thread(void)
Destructor.
void SetMinThreads(unsigned int min_threads)
Set the minimum number of threads in pool.
bool IsFinished(void) const
Check if task execution has been already finished (successfully or not)
void RequestToCancel(void)
Cancel the task.
void CancelTasks(TExclusiveFlags tasks_group)
Cancel the selected groups of tasks in the pool.
CAtomicCounter_WithAutoInit m_IsBusy
Flag indicating that the task is already added to some pool.
void SetThreadsCount(unsigned int count)
Set number of threads in pool. Adjust given number to conform to minimum and maximum threads count con...
void SetDestroyTimeout(const CTimeSpan &timeout)
Set timeout to wait for all threads to finish before the pool can be destroyed.
virtual void Finalize(void)
Clean up. It is called by OnExit()
CThreadPool_Task & operator=(const CThreadPool_Task &other)
Assignment.
virtual ~CThreadPool_Task(void)
Destructor. Will be called from CRef.
unsigned int GetExecutingTasksCount(void) const
Get the number of currently executing tasks.
bool IsCancelRequested(void) const
Check if cancellation of the task was requested.
virtual void * Main(void)
To prevent overriding - main thread function.
void x_SetStatus(EStatus new_status)
Set task status.
bool m_InHandleEvent
If controller is already inside HandleEvent() processing.
void SetMaxThreads(unsigned int max_threads)
Set the maximum number of threads in pool.
void AddTask(CThreadPool_Task *task, const CTimeSpan *timeout=NULL)
Add task to the pool for execution.
unsigned int GetQueuedTasksCount(void) const
Get the number of tasks currently waiting in queue.
CThreadPool_Thread(CThreadPool *pool)
Construct and attach to the pool.
unsigned int m_MinThreads
Minimum number of threads in pool.
friend class CThreadPool_Impl
@ eOther
All other events (happen asynchronously, so cannot be further distinguished)
@ eResume
ThreadPool is resumed after exclusive task execution.
@ eSuspend
ThreadPool is suspended for exclusive task execution.
@ eIdle
has not been placed in queue yet
Definition: thread_pool.hpp:80
@ eQueued
in the queue, awaiting execution
Definition: thread_pool.hpp:81
@ eExecuting
being executed
Definition: thread_pool.hpp:82
@ eFailed
failure during execution
Definition: thread_pool.hpp:84
@ eCompleted
executed successfully
Definition: thread_pool.hpp:83
@ eCanceled
canceled - possible only if canceled before processing was started or if method Execute() returns res...
Definition: thread_pool.hpp:85
@ eWaitToFinish
New threads can be started only when all old threads finished their execution.
@ eStartImmediately
New threads can be started immediately.
@ fCancelQueuedTasks
Cancel all tasks waiting in the queue and not yet executing.
@ fCancelExecutingTasks
Cancel all currently executing tasks.
@ fExecuteQueuedTasks
Execute all tasks waiting in the queue before execution of exclusive task.
@ fDoNotAllowNewTasks
Do not allow to add new tasks to the pool during exclusive task execution.
@ fFlushThreads
Finish all threads currently running in the pool.
unsigned int m_MaxThreads
Maximum simultaneous threads.
bool Run(TRunMode flags=fRunDefault)
Run the thread.
Definition: ncbithr.cpp:724
void Wait(void)
Wait on semaphore.
Definition: ncbimtx.cpp:1787
int TRunMode
Bitwise OR'd flags for thread creation passed to Run().
Definition: ncbithr.hpp:558
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
Definition: ncbimtx.cpp:1844
void Post(unsigned int count=1)
Increment the semaphore by "count".
Definition: ncbimtx.cpp:1971
@ fRunAllowST
Allow threads to run in single thread builds.
Definition: ncbithr.hpp:549
@ fRunDefault
Default mode.
Definition: ncbithr.hpp:540
@ fRunDetached
Run the thread detached (non-joinable)
Definition: ncbithr.hpp:541
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2775
long GetNanoSecondsAfterSecond(void) const
Get number of nanoseconds.
Definition: ncbitime.hpp:2562
double GetAsDouble(void) const
Return time span as number of seconds.
Definition: ncbitime.hpp:2565
ESign GetSign(void) const
Get sign of time span.
Definition: ncbitime.hpp:2529
string AsSmartString(ESmartStringPrecision precision, ERound rounding, ESmartStringZeroMode zero_mode=eSSZ_SkipZero) const
Transform time span to "smart" string.
Definition: ncbitime.hpp:2688
long GetCompleteSeconds(void) const
Get number of complete seconds.
Definition: ncbitime.hpp:2559
@ eStart
Start timer immediately after creating.
Definition: ncbitime.hpp:1941
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
@ eEmptyGuard
Definition: guard.hpp:94
Definition of all error codes used in util (xutil.lib).
int i
static MDB_envinfo info
Definition: mdb_load.c:37
EIPRangeType t
Definition: ncbi_localip.c:101
#define count
Type for storing information about exclusive task launching.
CRef< CThreadPool_Task > task
SExclusiveTaskInfo(TExclusiveFlags f, CRef< CThreadPool_Task > t)
Functor to compare tasks by priority.
Definition: thread_pool.cpp:48
bool operator()(const CRef< CThreadPool_Task > &left, const CRef< CThreadPool_Task > &right) const
Definition: thread_pool.cpp:49
Definition of synchronized queue (CSyncQueue template) and templates related to it.
#define _ASSERT
static CThreadPool_Task::EStatus s_ConvertTaskResult(CThreadPool_Task::EStatus status)
Check if status returned from CThreadPool_Task::Execute() is allowed and change it to eCompleted valu...
const CAtomicCounter::TValue kNeedCallController_Shift
static void ThrowAddProhibited(void)
Throw an exception with standard message when AddTask() is called but ThreadPool is aborted or does not...
Pool of generic task-executing threads.
Implementations of controllers for ThreadPool.
done
Definition: token1.c:1
Modified on Fri Sep 20 14:58:06 2024 by modify_doxy.py rev. 669887