NCBI C++ ToolKit
thread_pool_old.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef UTIL__THREAD_POOL_OLD__HPP
2 #define UTIL__THREAD_POOL_OLD__HPP
3 
4 /* $Id: thread_pool_old.hpp 99264 2023-03-03 16:11:53Z ucko $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Author: Aaron Ucko
30 *
31 * File Description:
32 * Pools of generic request-handling threads.
33 *
34 * TEMPLATES:
35 * CBlockingQueue<> -- queue of requests, with efficiently blocking Get()
36 * CThreadInPool<> -- abstract request-handling thread
37 * CPoolOfThreads<> -- abstract pool of threads sharing a request queue
38 *
39 * SPECIALIZATIONS:
40 * CStdRequest -- abstract request type
41 * CStdThreadInPool -- thread handling CStdRequest
42 * CStdPoolOfThreads -- pool of threads handling CStdRequest
43 */
44 
45 #include <corelib/ncbistd.hpp>
46 #include <corelib/ncbithr.hpp>
47 #include <corelib/ncbitime.hpp>
48 #include <corelib/ncbi_limits.hpp>
49 #include <corelib/ncbi_param.hpp>
50 #include <util/util_exception.hpp>
51 #include <util/error_codes.hpp>
52 
53 #include <set>
54 
55 
56 /** @addtogroup ThreadedPools
57  *
58  * @{
59  */
60 
62 
63 
64 /////////////////////////////////////////////////////////////////////////////
65 ///
66 /// CQueueItemBase -- skeleton blocking-queue item, sans actual request
67 
68 class CQueueItemBase : public CObject {
69 public:
70  enum EStatus {
71  ePending, ///< still in the queue
72  eActive, ///< extracted but not yet released
73  eComplete, ///< extracted and released
74  eWithdrawn, ///< dropped by submitter's request
75  eForciblyCaught ///< let an exception escape
76  };
77 
78  /// Every request has an associated 32-bit priority field, but
79  /// only the top eight bits are under direct user control. (The
80  /// rest are a counter.)
81  typedef Uint4 TPriority;
83 
85  : m_Priority(priority), m_Status(ePending)
86  { }
87 
88  bool operator> (const CQueueItemBase& item) const
89  { return m_Priority > item.m_Priority; }
90 
91  const TPriority& GetPriority(void) const { return m_Priority; }
92  const EStatus& GetStatus(void) const { return m_Status; }
93  TUserPriority GetUserPriority(void) const { return TUserPriority(m_Priority >> 24); }
94 
97 
98 protected:
101 
102  virtual void x_SetStatus(EStatus new_status)
103  { m_Status = new_status; }
104 };
105 
106 
107 /////////////////////////////////////////////////////////////////////////////
108 ///
109 /// CBlockingQueue<> -- queue of requests, with efficiently blocking Get()
110 
111 template <typename TRequest>
113 {
114 public:
117 
118  class CQueueItem;
120 
121  /// It may be desirable to store handles obtained from GetHandle() in
122  /// instances of CCompletingHandle to ensure that they are marked as
123  /// complete when all is said and done, even in the face of exceptions.
125  {
126  public:
128  : TItemHandle(h)
129  { }
130 
132  if (this->NotEmpty()) {
133  this->GetObject().MarkAsComplete();
134  }
135  }
136  };
137 
138  /// Constructor
139  ///
140  /// @param max_size
141  /// The maximum size of the queue (may not be zero!)
142  CBlockingQueue(size_t max_size = kMax_UInt)
143  : m_GetSem(0,1), m_PutSem(1,1), m_HungerSem(0,1), m_HungerCnt(0),
144  m_MaxSize(min(max_size, size_t(0xFFFFFF))),
145  m_RequestCounter(0xFFFFFF)
146  { _ASSERT(max_size > 0); }
147 
148  /// Put a request into the queue. If the queue remains full for
149  /// the duration of the (optional) timeout, throw an exception.
150  ///
151  /// @param request
152  /// Request
153  /// @param priority
154  /// The priority of the request. The higher the priority
155  /// the sooner the request will be processed.
156  /// @param timeout_sec
157  /// Number of whole seconds in timeout
158  /// @param timeout_nsec
159  /// Number of additional nanoseconds in timeout
160  TItemHandle Put(const TRequest& request, TUserPriority priority = 0,
161  unsigned int timeout_sec = 0,
162  unsigned int timeout_nsec = 0);
163 
164  /// Wait for room in the queue for up to
165  /// timeout_sec + timeout_nsec/1E9 seconds.
166  ///
167  /// @param timeout_sec
168  /// Number of seconds
169  /// @param timeout_nsec
170  /// Number of nanoseconds
171  void WaitForRoom(unsigned int timeout_sec = kMax_UInt,
172  unsigned int timeout_nsec = 0) const;
173 
174  /// Wait for the queue to have waiting readers, for up to
175  /// timeout_sec + timeout_nsec/1E9 seconds.
176  ///
177  /// @param timeout_sec
178  /// Number of seconds
179  /// @param timeout_nsec
180  /// Number of nanoseconds
181  void WaitForHunger(unsigned int timeout_sec = kMax_UInt,
182  unsigned int timeout_nsec = 0) const;
183 
184  /// Get the first available request from the queue, and return a
185  /// handle to it.
186  /// Blocks politely if empty.
187  /// Waits up to timeout_sec + timeout_nsec/1E9 seconds.
188  ///
189  /// @param timeout_sec
190  /// Number of seconds
191  /// @param timeout_nsec
192  /// Number of nanoseconds
193  TItemHandle GetHandle(unsigned int timeout_sec = kMax_UInt,
194  unsigned int timeout_nsec = 0);
195 
196  /// Get the first available request from the queue, and return
197  /// just the request.
198  /// Blocks politely if empty.
199  /// Waits up to timeout_sec + timeout_nsec/1E9 seconds.
200  ///
201  /// @param timeout_sec
202  /// Number of seconds
203  /// @param timeout_nsec
204  /// Number of nanoseconds
206  TRequest Get(unsigned int timeout_sec = kMax_UInt,
207  unsigned int timeout_nsec = 0);
208 
209  /// Get the number of requests in the queue
210  size_t GetSize (void) const;
211 
212  /// Get the maximum number of requests that can be put into the queue
213  size_t GetMaxSize (void) const { return m_MaxSize; }
214 
215  /// Check if the queue is empty
216  bool IsEmpty (void) const { return GetSize() == 0; }
217 
218  /// Check if the queue is full
219  bool IsFull (void) const { return GetSize() == GetMaxSize(); }
220 
221  /// Adjust a pending request's priority.
222  void SetUserPriority(TItemHandle handle, TUserPriority priority);
223 
224  /// Withdraw a pending request from consideration.
225  void Withdraw(TItemHandle handle);
226 
227  /// Get the number of threads waiting for requests, for debugging
228  /// purposes only.
229  size_t GetHunger(void) const { return m_HungerCnt; }
230 
231  class CQueueItem : public CQueueItemBase
232  {
233  public:
234  // typedef CBlockingQueue<TRequest> TQueue;
235  CQueueItem(Uint4 priority, TRequest request)
236  : CQueueItemBase(priority), m_Request(request)
237  { }
238 
239  const TRequest& GetRequest(void) const { return m_Request; }
240  TRequest& SetRequest(void) { return m_Request; }
241  // void SetUserPriority(TUserPriority p);
242  // void Withdraw(void);
243 
244  protected:
245  // Specialized for CRef<CStdRequest> in thread_pool.cpp
246  void x_SetStatus(EStatus new_status)
247  { CQueueItemBase::x_SetStatus(new_status); }
248 
249  private:
250  friend class CBlockingQueue<TRequest>;
251 
252  // TQueue& m_Queue;
253  TRequest m_Request;
254  };
255 
256 protected:
258  bool operator()(const TItemHandle& i1, const TItemHandle& i2) const
259  { return static_cast<CQueueItemBase>(*i1)
260  > static_cast<CQueueItemBase>(*i2); }
261  };
262 
263  /// The type of the queue
265 
266  // Derived classes should take care to use these members properly.
267  volatile TRealQueue m_Queue; ///< The queue
268  CSemaphore m_GetSem; ///< Raised if the queue contains data
269  mutable CSemaphore m_PutSem; ///< Raised if the queue has room
270  mutable CSemaphore m_HungerSem; ///< Raised if Get[Handle] has to wait
271  mutable CMutex m_Mutex; ///< Guards access to queue
272  size_t m_HungerCnt; ///< Number of threads waiting for data
273 
274 private:
275  size_t m_MaxSize; ///< The maximum size of the queue
277 
278  typedef bool (CBlockingQueue::*TQueuePredicate)(const TRealQueue& q) const;
279 
280  bool x_GetSemPred(const TRealQueue& q) const
281  { return !q.empty(); }
282  bool x_PutSemPred(const TRealQueue& q) const
283  { return q.size() < m_MaxSize; }
284  bool x_HungerSemPred(const TRealQueue& q) const
285  { return m_HungerCnt > q.size(); }
286 
288  CMutexGuard& guard, unsigned int timeout_sec,
289  unsigned int timeout_nsec) const;
290 
291 private:
292  /// forbidden
295 };
296 
297 
298 /////////////////////////////////////////////////////////////////////////////
299 ///
300 /// CThreadInPool<> -- abstract request-handling thread
301 
302 template <typename TRequest> class CPoolOfThreads;
303 
304 template <typename TRequest>
305 class CThreadInPool : public CThread
306 {
307 public:
312 
313  /// Thread run mode
314  enum ERunMode {
315  eNormal, ///< Process request and stay in the pool
316  eRunOnce ///< Process request and die
317  };
318 
319  /// Constructor
320  ///
321  /// @param pool
322  /// A pool where this thread is placed
323  /// @param mode
324  /// A running mode of this thread
326  : m_Pool(pool), m_RunMode(mode), m_Counter(NULL) {}
327 
328  void CountSelf(CAtomicCounter* counter);
329 
330 protected:
331  /// Destructor
332  virtual ~CThreadInPool(void);
333 
334  /// Init this thread. It is called at the beginning of Main()
335  virtual void Init(void) {}
336 
337  /// Process a request.
338  /// It is called from Main() for each request this thread handles
339  ///
340  /// @param handle
341  /// A request for processing
342  virtual void ProcessRequest(TItemHandle handle);
343 
344  /// Older interface (still delegated to by default)
345  virtual void ProcessRequest(const TRequest& req) = 0;
346 
347  /// Clean up. It is called by OnExit()
348  virtual void x_OnExit(void) {}
349 
350  /// Get run mode
351  ERunMode GetRunMode(void) const { return m_RunMode; }
352 
353 private:
354  // to prevent overriding; inherited from CThread
355  virtual void* Main(void);
356  virtual void OnExit(void);
357 
358  void x_HandleOneRequest(bool catch_all);
359  void x_UnregisterThread(void);
360 
362  {
363  public:
367 
368  private:
370  };
371 
372  friend class CAutoUnregGuard;
373 
374 
375  TPool* m_Pool; ///< The pool that holds this thread
376  ERunMode m_RunMode; ///< How long to keep running
378 };
379 
380 
381 /////////////////////////////////////////////////////////////////////////////
382 ///
383 /// CPoolOfThreads<> -- abstract pool of threads sharing a request queue
384 
385 template <typename TRequest>
387 {
388 public:
390  typedef typename TThread::ERunMode ERunMode;
391 
395 
396  /// Constructor
397  ///
398  /// @param max_threads
399  /// The maximum number of threads that this pool can run
400  /// @param queue_size
401  /// The maximum number of requests in the queue
402  /// @param spawn_threshold
403  /// The number of requests in the queue after which
404  /// a new thread is started
405  /// @param max_urgent_threads
406  /// The maximum number of urgent threads running simultaneously
407  CPoolOfThreads(unsigned int max_threads, unsigned int queue_size,
408  unsigned int spawn_threshold = 1,
409  unsigned int max_urgent_threads = kMax_UInt,
410  const string& thread_name = kEmptyStr);
411 
412  /// Destructor
413  virtual ~CPoolOfThreads(void);
414 
415  /// Start processing threads
416  ///
417  /// @param num_threads
418  /// The number of threads to start
419  void Spawn(unsigned int num_threads);
420 
421  /// Put a request in the queue with a given priority
422  ///
423  /// @param request
424  /// A request
425  /// @param priority
426  /// The priority of the request. The higher the priority
427  /// the sooner the request will be processed.
428  TItemHandle AcceptRequest(const TRequest& request,
429  TUserPriority priority = 0,
430  unsigned int timeout_sec = 0,
431  unsigned int timeout_nsec = 0);
432 
433  /// Puts a request in the queue with the highest priority
434  /// It will run a new thread even if the maximum of allowed threads
435  /// has been already reached
436  ///
437  /// @param request
438  /// A request
439  TItemHandle AcceptUrgentRequest(const TRequest& request,
440  unsigned int timeout_sec = 0,
441  unsigned int timeout_nsec = 0);
442 
443  /// Wait for room in the queue for up to
444  /// timeout_sec + timeout_nsec/1E9 seconds.
445  ///
446  /// @param timeout_sec
447  /// Number of seconds
448  /// @param timeout_nsec
449  /// Number of nanoseconds
450  void WaitForRoom(unsigned int timeout_sec = kMax_UInt,
451  unsigned int timeout_nsec = 0);
452 
453  /// Check if the queue is full
454  bool IsFull(void) const { return m_Queue.IsFull(); }
455 
456  /// Check if the queue is empty
457  bool IsEmpty(void) const { return m_Queue.IsEmpty(); }
458 
459  /// Check whether a new request could be immediately processed
460  ///
461  /// @param urgent
462  /// Whether the request would be urgent.
463  bool HasImmediateRoom(bool urgent = false) const;
464 
465  /// Adjust a pending request's priority.
466  void SetUserPriority(TItemHandle handle, TUserPriority priority);
467 
468  /// Withdraw a pending request from consideration.
469  void Withdraw(TItemHandle handle)
470  { m_Queue.Withdraw(handle); }
471 
472  /// Get the number of requests in the queue
473  size_t GetQueueSize(void) const
474  { return m_Queue.GetSize(); }
475 
476 
477 protected:
478 
479  /// Create a new thread
480  ///
481  /// @param mode
482  /// How long the thread should stay around
484 
485  /// Register a thread. It is called by TThread::Main.
486  /// It should detach a thread if not tracking
487  ///
488  /// @param thread
489  /// A thread to register
490  virtual void Register(TThread& thread) { thread.Detach(); }
491 
492  /// Unregister a thread
493  ///
494  /// @param thread
495  /// A thread to unregister
496  virtual void UnRegister(TThread&) {}
497 
498 
500 
501  /// The maximum number of threads the pool can hold
503  /// The maximum number of urgent threads running simultaneously
505  int m_Threshold; ///< for delta
506  /// The current number of threads in the pool
508  /// The current number of urgent threads running now
510  /// The difference between the number of unfinished requests and
511  /// the total number of threads in the pool.
512  atomic<int> m_Delta;
513  /// The guard for m_MaxThreads, m_MaxUrgentThreads, and m_Delta.
514  mutable CMutex m_Mutex;
515  /// The request queue
518 
519  const string m_ThreadName;
520 
521 private:
522  friend class CThreadInPool<TRequest>;
523  TItemHandle x_AcceptRequest(const TRequest& req,
524  TUserPriority priority,
525  bool urgent,
526  unsigned int timeout_sec = 0,
527  unsigned int timeout_nsec = 0);
528 
530 };
531 
532 /////////////////////////////////////////////////////////////////////////////
533 //
534 // SPECIALIZATIONS:
535 //
536 
537 /////////////////////////////////////////////////////////////////////////////
538 //
539 // CStdRequest -- abstract request type
540 
541 class CStdRequest : public CObject
542 {
543 public:
544  ///Destructor
545  virtual ~CStdRequest(void) {}
546 
547  /// Do the actual job
548  /// Called by whichever thread handles this request.
549  virtual void Process(void) = 0;
550 
552 
553  /// Callback for status changes
554  virtual void OnStatusChange(EStatus /* old */, EStatus /* new */) {}
555 };
556 
557 
559 inline
560 void CBlockingQueue<CRef<CStdRequest> >::CQueueItem::x_SetStatus
561 (EStatus new_status)
562 {
563  EStatus old_status = GetStatus();
564  CQueueItemBase::x_SetStatus(new_status);
565  m_Request->OnStatusChange(old_status, new_status);
566 }
567 
568 
569 
570 /////////////////////////////////////////////////////////////////////////////
571 //
572 // CStdThreadInPool -- thread handling CStdRequest
573 
575  : public CThreadInPool< CRef< CStdRequest > >
576 {
577 public:
579 
580  /// Constructor
581  ///
582  /// @param pool
583  /// A pool where this thread is placed
584  /// @param mode
585  /// A running mode of this thread
586  CStdThreadInPool(TPool* pool, ERunMode mode = eNormal)
587  : TParent(pool, mode) {}
588 
589 protected:
590  /// Process a request.
591  ///
592  /// @param req
593  /// A request for processing
594  virtual void ProcessRequest(const CRef<CStdRequest>& req)
595  { const_cast<CStdRequest&>(*req).Process(); }
596 
597  // Avoid shadowing the handle-based version.
598  virtual void ProcessRequest(TItemHandle handle)
599  { TParent::ProcessRequest(handle); }
600 };
601 
602 /////////////////////////////////////////////////////////////////////////////
603 //
604 // CStdPoolOfThreads -- pool of threads handling CStdRequest
605 
607  : public CPoolOfThreads< CRef< CStdRequest > >
608 {
609 public:
611 
612  /// Constructor
613  ///
614  /// @param max_threads
615  /// The maximum number of threads that this pool can run
616  /// @param queue_size
617  /// The maximum number of requests in the queue
618  /// @param spawn_threshold
619  /// The number of requests in the queue after which
620  /// a new thread is started
621  /// @param max_urgent_threads
622  /// The maximum number of urgent threads running simultaneously
623  CStdPoolOfThreads(unsigned int max_threads, unsigned int queue_size,
624  unsigned int spawn_threshold = 1,
625  unsigned int max_urgent_threads = kMax_UInt,
626  const string& thread_name = kEmptyStr)
627  : TParent(max_threads, queue_size, spawn_threshold, max_urgent_threads,
628  thread_name)
629  {}
630 
631  virtual ~CStdPoolOfThreads();
632 
633  enum EKillFlags {
634  fKill_Wait = 0x1, ///< Wait for all threads in the pool to finish.
635  fKill_Reopen = 0x2 ///< Allow a fresh batch of worker threads.
636  };
637  typedef int TKillFlags; ///< binary OR of EKillFlags
638 
639  /// Causes all threads in the pool to exit cleanly after finishing
640  /// all pending requests, optionally waiting for them to die.
641  ///
642  /// @param flags
643  /// Governs optional behavior
644  virtual void KillAllThreads(TKillFlags flags);
645 
646  /// Causes all threads in the pool to exit cleanly after finishing
647  /// all pending requests, optionally waiting for them to die.
648  ///
649  /// @param wait
650  /// If true will wait until all thread in the pool finish their job
651  virtual void KillAllThreads(bool wait)
652  { KillAllThreads(wait ? (fKill_Wait | fKill_Reopen) : fKill_Reopen); }
653 
654  /// Register a thread.
655  ///
656  /// @param thread
657  /// A thread to register
658  virtual void Register(TThread& thread);
659 
660  /// Unregister a thread
661  ///
662  /// @param thread
663  /// A thread to unregister
664  virtual void UnRegister(TThread& thread);
665 
666 protected:
667  /// Create a new thread
668  ///
669  /// @param mode
670  /// A thread's running mode
671  virtual TThread* NewThread(TThread::ERunMode mode)
672  { return new CStdThreadInPool(this, mode); }
673 
674 private:
675  typedef list<CRef<TThread> > TThreads;
677 };
678 
679 
680 NCBI_PARAM_DECL(bool, ThreadPool, Catch_Unhandled_Exceptions);
681 typedef NCBI_PARAM_TYPE(ThreadPool, Catch_Unhandled_Exceptions) TParamThreadPoolCatchExceptions;
682 
683 
684 
685 /////////////////////////////////////////////////////////////////////////////
686 
687 /////////////////////////////////////////////////////////////////////////////
688 // IMPLEMENTATION of INLINE functions
689 /////////////////////////////////////////////////////////////////////////////
690 
691 
692 /////////////////////////////////////////////////////////////////////////////
693 // CBlockingQueue<>::
694 //
695 
696 template <typename TRequest>
698 CBlockingQueue<TRequest>::Put(const TRequest& data, TUserPriority priority,
699  unsigned int timeout_sec,
700  unsigned int timeout_nsec)
701 {
702  CMutexGuard guard(m_Mutex);
703  // Having the mutex, we can safely drop "volatile"
704  TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
705  if ( !x_WaitForPredicate(&CBlockingQueue::x_PutSemPred, m_PutSem, guard,
706  timeout_sec, timeout_nsec) ) {
708  "CBlockingQueue<>::Put: "
709  "attempt to insert into a full queue");
710  }
711  if (m_RequestCounter == 0) {
712  m_RequestCounter = 0xFFFFFF;
713  NON_CONST_ITERATE (typename TRealQueue, it, q) {
714  CQueueItem& val = const_cast<CQueueItem&>(**it);
715  val.m_Priority = (val.m_Priority & 0xFF000000) | m_RequestCounter--;
716  }
717  }
718  /// Structure of the internal priority:
719  /// The highest byte is a user specified priority;
720  /// the other three bytes are a counter which ensures that
721  /// requests with the same user-specified priority are processed
722  /// in FIFO order
723  TPriority real_priority = (priority << 24) | m_RequestCounter--;
724  TItemHandle handle(new CQueueItem(real_priority, data));
725  q.insert(handle);
726  m_GetSem.TryWait();
727  m_GetSem.Post();
728  if (q.size() == m_MaxSize) {
729  m_PutSem.TryWait();
730  }
731  return handle;
732 }
733 
734 
735 template <typename TRequest>
736 void CBlockingQueue<TRequest>::WaitForRoom(unsigned int timeout_sec,
737  unsigned int timeout_nsec) const
738 {
739  // Make sure there's room, but don't actually change any state
740  CMutexGuard guard(m_Mutex);
741  if (x_WaitForPredicate(&CBlockingQueue::x_PutSemPred, m_PutSem, guard,
742  timeout_sec, timeout_nsec)) {
743  m_PutSem.Post(); // signal that the room still exists
744  } else {
746  "CBlockingQueue<>::WaitForRoom: timed out");
747  }
748 }
749 
750 template <typename TRequest>
751 void CBlockingQueue<TRequest>::WaitForHunger(unsigned int timeout_sec,
752  unsigned int timeout_nsec) const
753 {
754  CMutexGuard guard(m_Mutex);
755  if (x_WaitForPredicate(&CBlockingQueue::x_HungerSemPred, m_HungerSem, guard,
756  timeout_sec, timeout_nsec)) {
757  m_HungerSem.Post();
758  } else {
760  "CBlockingQueue<>::WaitForHunger: timed out");
761  }
762 }
763 
764 
765 template <typename TRequest>
767 CBlockingQueue<TRequest>::GetHandle(unsigned int timeout_sec,
768  unsigned int timeout_nsec)
769 {
770  CMutexGuard guard(m_Mutex);
771  // Having the mutex, we can safely drop "volatile"
772  TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
773 
774  if (q.empty()) {
775  _VERIFY(++m_HungerCnt);
776  m_HungerSem.TryWait();
777  m_HungerSem.Post();
778 
779  bool ok = x_WaitForPredicate(&CBlockingQueue::x_GetSemPred, m_GetSem,
780  guard, timeout_sec, timeout_nsec);
781 
782  if (--m_HungerCnt <= q.size()) {
783  m_HungerSem.TryWait();
784  }
785 
786  if ( !ok ) {
788  "CBlockingQueue<>::Get[Handle]: timed out");
789  }
790  }
791 
792  TItemHandle handle(*q.begin());
793  q.erase(q.begin());
794  if (m_HungerCnt > q.size()) {
795  m_HungerSem.TryWait();
796  m_HungerSem.Post();
797  }
798  if ( ! q.empty() ) {
799  m_GetSem.TryWait();
800  m_GetSem.Post();
801  }
802 
803  // Get the attention of WaitForRoom() or the like; do this
804  // regardless of queue size because derived classes may want
805  // to insert multiple objects atomically.
806  m_PutSem.TryWait();
807  m_PutSem.Post();
808 
809  guard.Release(); // avoid possible deadlocks from x_SetStatus
810  handle->x_SetStatus(CQueueItem::eActive);
811  return handle;
812 }
813 
814 template <typename TRequest>
815 TRequest CBlockingQueue<TRequest>::Get(unsigned int timeout_sec,
816  unsigned int timeout_nsec)
817 {
818  TItemHandle handle = GetHandle(timeout_sec, timeout_nsec);
819  handle->MarkAsComplete(); // almost certainly premature, but our last chance
820  return handle->GetRequest();
821 }
822 
823 
824 template <typename TRequest>
826 {
827  CMutexGuard guard(m_Mutex);
828  return const_cast<const TRealQueue&>(m_Queue).size();
829 }
830 
831 
832 template <typename TRequest>
834  TUserPriority priority)
835 {
836  if (handle->GetUserPriority() == priority
837  || handle->GetStatus() != CQueueItem::ePending) {
838  return;
839  }
840  CMutexGuard guard(m_Mutex);
841  // Having the mutex, we can safely drop "volatile"
842  TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
843  typename TRealQueue::iterator it = q.find(handle);
844  // These sanity checks protect against race conditions and
845  // accidental use of handles from other queues.
846  if (it != q.end() && *it == handle) {
847  q.erase(it);
848  TPriority counter = handle->m_Priority & 0xFFFFFF;
849  handle->m_Priority = (priority << 24) | counter;
850  q.insert(handle);
851  }
852 }
853 
854 
855 template <typename TRequest>
857 {
858  if (handle->GetStatus() != CQueueItem::ePending) {
859  return;
860  }
861  {{
862  CMutexGuard guard(m_Mutex);
863  // Having the mutex, we can safely drop "volatile"
864  TRealQueue& q = const_cast<TRealQueue&>(m_Queue);
865  typename TRealQueue::iterator it = q.find(handle);
866  // These sanity checks protect against race conditions and
867  // accidental use of handles from other queues.
868  if (it != q.end() && *it == handle) {
869  q.erase(it);
870 
871  if(q.empty()) {
872  // m_GetSem may be signaled - clear it
873  m_GetSem.TryWait();
874  }
875  } else {
876  return;
877  }
878  }}
879  // run outside the guard to avoid possible deadlocks from x_SetStatus
880  handle->x_SetStatus(CQueueItem::eWithdrawn);
881 }
882 
883 template <typename TRequest>
885  CSemaphore& sem,
886  CMutexGuard& guard,
887  unsigned int timeout_sec,
888  unsigned int timeout_nsec)
889  const
890 {
891  const TRealQueue& q = const_cast<const TRealQueue&>(m_Queue);
892  if ( !(this->*pred)(q) ) {
893 #if SIZEOF_INT == SIZEOF_LONG
894  // If long is larger, converting from unsigned int to (signed)
895  // long for CTimeSpan will automatically be safe.
896  unsigned int extra_sec = timeout_nsec / kNanoSecondsPerSecond;
897  timeout_nsec %= kNanoSecondsPerSecond;
898  // Do the comparison this way to avoid overflow.
899  if (timeout_sec >= kMax_Int - extra_sec) {
900  timeout_sec = kMax_Int; // clamp
901  } else {
902  timeout_sec += extra_sec;
903  }
904 #endif
905  // _ASSERT(timeout_nsec <= (unsigned long)kMax_Long);
906  CTimeSpan span(timeout_sec, timeout_nsec);
907  while (span.GetSign() == ePositive && !(this->*pred)(q) ) {
909  // Temporarily release the mutex while waiting, to avoid deadlock.
910  guard.Release();
911  sem.TryWait((unsigned int)span.GetCompleteSeconds(),
912  (unsigned int)span.GetNanoSecondsAfterSecond());
913  guard.Guard(m_Mutex);
914  span -= CurrentTime(CTime::eGmt) - start;
915  }
916  }
917  sem.TryWait();
918  return (this->*pred)(q);
919 }
920 
921 /////////////////////////////////////////////////////////////////////////////
922 // CThreadInPool<>::
923 //
924 
925 template <typename TRequest>
927 {
928  _ASSERT(m_Counter == NULL);
929  counter->Add(1);
930  m_Counter = counter;
931 }
932 
933 template <typename TRequest>
935 {
936  if (m_Counter != NULL) {
937  m_Counter->Add(-1);
938  }
939 }
940 
941 template <typename TRequest>
943  : m_Thread(thr)
944 {}
945 
946 template <typename TRequest>
948 {
949  m_Thread->x_UnregisterThread();
950 }
951 
952 
953 template <typename TRequest>
955 {
956  if (m_Counter != NULL) {
957  m_Counter->Add(-1);
958  m_Counter = NULL;
959  }
960  m_Pool->UnRegister(*this);
961 }
962 
963 template <typename TRequest>
965 {
966  TItemHandle handle;
967  {{
968  CMutexGuard guard(m_Pool->m_Mutex);
969  --m_Pool->m_Delta;
970  }}
971  try {
972  handle.Reset(m_Pool->m_Queue.GetHandle());
973  } catch (CBlockingQueueException& e) {
974  // work around "impossible" timeouts
975  NCBI_REPORT_EXCEPTION_XX(Util_Thread, 1, "Unexpected timeout", e);
976  CMutexGuard guard(m_Pool->m_Mutex);
977  ++m_Pool->m_Delta;
978  return;
979  }
980  if (catch_all) {
981  try {
982  ProcessRequest(handle);
983  } catch (std::exception& e) {
984  handle->MarkAsForciblyCaught();
985  NCBI_REPORT_EXCEPTION_XX(Util_Thread, 2,
986  "Exception from thread in pool: ", e);
987  // throw;
988  } catch (...) {
989  handle->MarkAsForciblyCaught();
990  // silently propagate non-standard exceptions because they're
991  // likely to be CExitThreadException.
992  // ERR_POST_XX(Util_Thread, 3,
993  // "Thread in pool threw non-standard exception.");
994  throw;
995  }
996  }
997  else {
998  ProcessRequest(handle);
999  }
1000 }
1001 
1002 template <typename TRequest>
1004 {
1005  _ASSERT(m_Pool);
1006 
1007  const string& name = m_Pool->m_ThreadName;
1008 
1009  if (!name.empty()) {
1010  SetCurrentThreadName(name);
1011  }
1012 
1013  try {
1014  m_Pool->Register(*this);
1015  } catch (CThreadException&) {
1016  ERR_POST(Warning << "New worker thread blocked at the last minute.");
1017  return 0;
1018  }
1019  CAutoUnregGuard guard(this);
1020 
1021  Init();
1022  bool catch_all = TParamThreadPoolCatchExceptions::GetDefault();
1023 
1024  for (;;) {
1025  x_HandleOneRequest(catch_all);
1026  if (m_RunMode == eRunOnce)
1027  break;
1028  }
1029 
1030  return 0;
1031 }
1032 
1033 
1034 template <typename TRequest>
1036 {
1037  try {
1038  x_OnExit();
1039  } STD_CATCH_ALL_XX(Util_Thread, 6, "x_OnExit")
1040 }
1041 
1042 template <typename TRequest>
1044 {
1045  TCompletingHandle completer = handle;
1046  ProcessRequest(completer->GetRequest());
1047 }
1048 
1049 
1050 /////////////////////////////////////////////////////////////////////////////
1051 // CPoolOfThreads<>::
1052 //
1053 
1054 template <typename TRequest>
1056  unsigned int queue_size,
1057  unsigned int spawn_threshold,
1058  unsigned int max_urgent_threads,
1059  const string& thread_name)
1060  : m_MaxThreads(max_threads), m_MaxUrgentThreads(max_urgent_threads),
1061  m_Threshold(spawn_threshold), m_Delta(0),
1062  m_Queue(queue_size > 0 ? queue_size : max_threads),
1063  m_QueuingForbidden(queue_size == 0),
1064  m_ThreadName(thread_name)
1065 {
1066 }
1067 
1068 
1069 template <typename TRequest>
1071 {
1072  CAtomicCounter::TValue n = m_ThreadCount.Get() + m_UrgentThreadCount.Get();
1073  if (n) {
1074  ERR_POST_XX(Util_Thread, 4,
1075  Warning << "CPoolOfThreads<>::~CPoolOfThreads: "
1076  << n << " thread(s) still active");
1077  }
1078 }
1079 
1080 template <typename TRequest>
1081 void CPoolOfThreads<TRequest>::Spawn(unsigned int num_threads)
1082 {
1083  for (unsigned int i = 0; i < num_threads; i++)
1084  {
1085  x_RunNewThread(TThread::eNormal, &m_ThreadCount);
1086  }
1087 }
1088 
1089 
1090 template <typename TRequest>
1091 inline
1094  TUserPriority priority,
1095  unsigned int timeout_sec,
1096  unsigned int timeout_nsec)
1097 {
1098  return x_AcceptRequest(req, priority, false, timeout_sec, timeout_nsec);
1099 }
1100 
1101 template <typename TRequest>
1102 inline
1105  unsigned int timeout_sec,
1106  unsigned int timeout_nsec)
1107 {
1108  return x_AcceptRequest(req, 0xFF, true, timeout_sec, timeout_nsec);
1109 }
1110 
1111 template <typename TRequest>
1112 inline
1114 {
1115  CMutexGuard guard(m_Mutex);
1116 
1117  if (m_Queue.IsFull()) {
1118  return false; // temporary blockage
1119  } else if (m_Delta.load() < 0) {
1120  return true;
1121  } else if (m_ThreadCount.Get() < m_MaxThreads.Get()) {
1122  return true;
1123  } else if (urgent
1124  && m_UrgentThreadCount.Get() < m_MaxUrgentThreads.Get()) {
1125  return true;
1126  } else {
1127  try {
1128  m_Queue.WaitForHunger(0);
1129  // This should be redundant with the m_Delta < 0 case, now that
1130  // m_Mutex guards it.
1131  ERR_POST_XX(Util_Thread, 5,
1132  "Possible thread pool bug. delta: " << m_Delta.load()
1133  << "; hunger: " << m_Queue.GetHunger());
1134  return true;
1135  } catch (...) {
1136  }
1137  return false;
1138  }
1139 }
1140 
1141 template <typename TRequest>
1142 inline
1143 void CPoolOfThreads<TRequest>::WaitForRoom(unsigned int timeout_sec,
1144  unsigned int timeout_nsec)
1145 {
1146  if (HasImmediateRoom()) {
1147  return;
1148  } else if (m_QueuingForbidden) {
1149  m_Queue.WaitForHunger(timeout_sec, timeout_nsec);
1150  } else {
1151  m_Queue.WaitForRoom(timeout_sec, timeout_nsec);
1152  }
1153 }
1154 
1155 template <typename TRequest>
1156 inline
1159  TUserPriority priority,
1160  bool urgent,
1161  unsigned int timeout_sec,
1162  unsigned int timeout_nsec)
1163 {
1164  bool new_thread = false;
1165  TItemHandle handle;
1166  {{
1167  CMutexGuard guard(m_Mutex);
1168  // we reserved 0xFF priority for urgent requests
1169  if( priority == 0xFF && !urgent )
1170  --priority;
1171  if (m_QueuingForbidden && !HasImmediateRoom(urgent) ) {
1173  "CPoolOfThreads<>::x_AcceptRequest: "
1174  "attempt to insert into a full queue");
1175  }
1176  handle = m_Queue.Put(req, priority, timeout_sec, timeout_nsec);
1177  if (++m_Delta >= m_Threshold
1178  && m_ThreadCount.Get() < m_MaxThreads.Get()) {
1179  // Add another thread to the pool because they're all busy.
1180  new_thread = true;
1181  } else if (urgent
1182  && m_UrgentThreadCount.Get() >= m_MaxUrgentThreads.Get()) {
1183  // Prevent from running a new urgent thread if we have reached
1184  // the maximum number of urgent threads
1185  urgent = false;
1186  }
1187  }}
1188 
1189  if (new_thread) {
1190  x_RunNewThread(TThread::eNormal, &m_ThreadCount);
1191  } else if (urgent) {
1192  x_RunNewThread(TThread::eRunOnce, &m_UrgentThreadCount);
1193  }
1194 
1195  return handle;
1196 }
1197 
1198 template <typename TRequest>
1199 inline
1201  CAtomicCounter* counter)
1202 {
1203  try {
1204  CRef<TThread> thr(NewThread(mode));
1205  thr->CountSelf(counter);
1206  thr->Run();
1207  }
1208  catch (CThreadException& ex) {
1209  ERR_POST_XX(Util_Thread, 13,
1210  Critical << "Ignoring error while starting new thread: "
1211  << ex);
1212  }
1213 }
1214 
1215 template <typename TRequest>
1216 inline
1218  TUserPriority priority)
1219 {
1220  // Maintain segregation between urgent and non-urgent requests
1221  if (handle->GetUserPriority() == 0xFF) {
1222  return;
1223  } else if (priority == 0xFF) {
1224  priority = 0xFE;
1225  }
1226  m_Queue.SetUserPriority(handle, priority);
1227 }
1228 
1230 
1231 
1232 /* @} */
1233 
1234 #endif /* UTIL__THREAD_POOL_OLD__HPP */
EStatus
#define bool
Definition: bool.h:34
CAtomicCounter_WithAutoInit –.
Definition: ncbicntr.hpp:120
CAtomicCounter –.
Definition: ncbicntr.hpp:71
It may be desirable to store handles obtained from GetHandle() in instances of CCompletingHandle to e...
CBlockingQueue<> – queue of requests, with efficiently blocking Get()
void Guard(resource_type &resource)
Manually force the guard to protect some other resource.
Definition: guard.hpp:175
void Release()
Manually force the resource to be released.
Definition: guard.hpp:166
CMutex –.
Definition: ncbimtx.hpp:749
CObject –.
Definition: ncbiobj.hpp:180
CThreadInPool<> – abstract request-handling thread.
CQueueItemBase – skeleton blocking-queue item, sans actual request.
CRef –.
Definition: ncbiobj.hpp:618
CSemaphore –.
Definition: ncbimtx.hpp:1375
CTimeSpan.
Definition: ncbitime.hpp:1313
CTime –.
Definition: ncbitime.hpp:296
Definition: set.hpp:45
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
const_iterator begin() const
Definition: set.hpp:135
parent_type::iterator iterator
Definition: set.hpp:80
size_type size() const
Definition: set.hpp:132
bool empty() const
Definition: set.hpp:133
const_iterator find(const key_type &key) const
Definition: set.hpp:137
void erase(iterator pos)
Definition: set.hpp:151
const_iterator end() const
Definition: set.hpp:136
Include a standard set of the NCBI C++ Toolkit most basic headers.
static uch flags
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
@ ePositive
Value is positive.
Definition: ncbimisc.hpp:123
#define NULL
Definition: ncbistd.hpp:225
TNCBIAtomicValue TValue
Alias TValue for TNCBIAtomicValue.
Definition: ncbicntr.hpp:73
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
Definition: ncbicntr.hpp:278
#define _VERIFY(expr)
Definition: ncbidbg.hpp:161
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define ERR_POST_XX(error_name, err_subcode, message)
Error posting with error code having given name and with given error subcode.
Definition: ncbidiag.hpp:564
#define STD_CATCH_ALL_XX(err_name, err_subcode, message)
Standard handling of "exception"-derived exceptions; catches non-standard exceptions and generates "u...
Definition: ncbiexpt.hpp:640
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
#define NCBI_REPORT_EXCEPTION_XX(err_name, err_subcode, title, ex)
Generate a report on the exception with default error code and given subcode.
Definition: ncbiexpt.hpp:766
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
bool NotEmpty(void) const THROWS_NONE
Check if CRef is not empty – pointing to an object and has a non-null value.
Definition: ncbiobj.hpp:726
TObjectType & GetObject(void)
Get object.
Definition: ncbiobj.hpp:1011
uint8_t Uint1
1-byte (8-bit) unsigned integer
Definition: ncbitype.h:99
#define NCBI_DEPRECATED
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
#define kMax_Int
Definition: ncbi_limits.h:184
#define kMax_UInt
Definition: ncbi_limits.h:185
#define EMPTY_TEMPLATE
Definition: ncbistl.hpp:159
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define kEmptyStr
Definition: ncbistr.hpp:123
CMutex m_Mutex
Guards access to queue.
CQueueItemBase::TPriority TPriority
TPool * m_Pool
The pool that holds this thread.
size_t m_HungerCnt
Number of threads waiting for data.
const TPriority & GetPriority(void) const
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
TPriority m_Priority
bool IsEmpty(void) const
Check if the queue is empty.
bool(CBlockingQueue::* TQueuePredicate)(const TRealQueue &q) const
const EStatus & GetStatus(void) const
void MarkAsComplete(void)
void WaitForRoom(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0) const
Wait for room in the queue for up to timeout_sec + timeout_nsec/1E9 seconds.
bool operator()(const TItemHandle &i1, const TItemHandle &i2) const
TQueue::TUserPriority TUserPriority
virtual TThread * NewThread(ERunMode mode)=0
Create a new thread.
virtual void ProcessRequest(TItemHandle handle)
CAtomicCounter * m_Counter
virtual ~CStdRequest(void)
Destructor.
CSemaphore m_HungerSem
Raised if Get[Handle] has to wait.
CThreadInPool< CRef< CStdRequest > > TParent
virtual void x_OnExit(void)
Clean up. It is called by OnExit()
virtual void UnRegister(TThread &)
Unregister a thread.
TItemHandle Put(const TRequest &request, TUserPriority priority=0, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Put a request into the queue.
TUserPriority GetUserPriority(void) const
CMutex m_Mutex
The guard for m_MaxThreads, m_MaxUrgentThreads, and m_Delta.
atomic< int > m_Delta
The difference between the number of unfinished requests and the total number of threads in the pool.
bool x_HungerSemPred(const TRealQueue &q) const
size_t GetSize(void) const
Get the number of requests in the queue.
void Withdraw(TItemHandle handle)
Withdraw a pending request from consideration.
void WaitForHunger(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0) const
Wait for the queue to have waiting readers, for up to timeout_sec + timeout_nsec/1E9 seconds.
virtual ~CPoolOfThreads(void)
Destructor.
void x_SetStatus(EStatus new_status)
friend class CAutoUnregGuard
void WaitForRoom(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0)
Wait for room in the queue for up to timeout_sec + timeout_nsec/1E9 seconds.
size_t GetHunger(void) const
Get the number of threads waiting for requests, for debugging purposes only.
TThread::ERunMode ERunMode
TItemHandle AcceptRequest(const TRequest &request, TUserPriority priority=0, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Put a request in the queue with a given priority.
bool HasImmediateRoom(bool urgent=false) const
Check whether a new request could be immediately processed.
CQueueItemBase::TUserPriority TUserPriority
CBlockingQueue(size_t max_size=kMax_UInt)
Constructor.
CAtomicCounter::TValue TACValue
bool IsEmpty(void) const
Check if the queue is empty.
size_t GetMaxSize(void) const
Get the maximum number of requests that can be put into the queue.
ERunMode m_RunMode
How long to keep running.
bool IsFull(void) const
Check if the queue is full.
void x_HandleOneRequest(bool catch_all)
virtual ~CThreadInPool(void)
Destructor.
size_t m_MaxSize
The maximum size of the queue.
size_t GetQueueSize(void) const
Get the number of requests in the queue.
Uint4 TPriority
Every request has an associated 32-bit priority field, but only the top eight bits are under direct u...
virtual void ProcessRequest(const TRequest &req)=0
Older interface (still delegated to by default)
TRequest Get(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0)
Get the first available request from the queue, and return just the request.
bool x_GetSemPred(const TRealQueue &q) const
TItemHandle x_AcceptRequest(const TRequest &req, TUserPriority priority, bool urgent, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
TQueue m_Queue
The request queue.
virtual TThread * NewThread(TThread::ERunMode mode)
Create a new thread.
virtual void Init(void)
Init this thread. It is called at the beginning of Main()
CStdThreadInPool(TPool *pool, ERunMode mode=eNormal)
Constructor.
bool x_PutSemPred(const TRealQueue &q) const
CStdPoolOfThreads(unsigned int max_threads, unsigned int queue_size, unsigned int spawn_threshold=1, unsigned int max_urgent_threads=kMax_UInt, const string &thread_name=kEmptyStr)
Constructor.
virtual void OnStatusChange(EStatus, EStatus)
Callback for status changes.
CBlockingQueue< TRequest > TQueue
CBlockingQueue & operator=(const CBlockingQueue &)
volatile TRealQueue m_Queue
The queue.
list< CRef< TThread > > TThreads
CBlockingQueue(const CBlockingQueue &)
forbidden
CQueueItemBase::EStatus EStatus
bool x_WaitForPredicate(TQueuePredicate pred, CSemaphore &sem, CMutexGuard &guard, unsigned int timeout_sec, unsigned int timeout_nsec) const
void Spawn(unsigned int num_threads)
Start processing threads.
CCompletingHandle(const TItemHandle &h)
virtual void Register(TThread &thread)
Register a thread.
CQueueItem(Uint4 priority, TRequest request)
CSemaphore m_PutSem
Raised if the queue has room.
TItemHandle AcceptUrgentRequest(const TRequest &request, unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Puts a request in the queue with the highest priority. It will run a new thread even if the maximum of...
virtual void ProcessRequest(TItemHandle handle)
Process a request.
bool IsFull(void) const
Check if the queue is full.
void Withdraw(TItemHandle handle)
Withdraw a pending request from consideration.
int m_Threshold
for delta
void x_UnregisterThread(void)
void MarkAsForciblyCaught(void)
void x_RunNewThread(ERunMode mode, CAtomicCounter *counter)
CThreadInPool(TPool *pool, ERunMode mode=eNormal)
Constructor.
void SetUserPriority(TItemHandle handle, TUserPriority priority)
Adjust a pending request's priority.
int TKillFlags
binary OR of EKillFlags
const TRequest & GetRequest(void) const
CAtomicCounter_WithAutoInit m_ThreadCount
The current number of threads in the pool.
CThreadInPool< TRequest > TThread
ERunMode GetRunMode(void) const
Get run mode.
virtual void OnExit(void)
Override this to execute finalization code.
virtual void ProcessRequest(const CRef< CStdRequest > &req)
Process a request.
CBlockingQueue< TRequest >::TItemHandle TItemHandle
CAtomicCounter_WithAutoInit m_MaxThreads
The maximum number of threads the pool can hold.
virtual void KillAllThreads(bool wait)
Causes all threads in the pool to exit cleanly after finishing all pending requests,...
CQueueItemBase(TPriority priority)
NCBI_PARAM_DECL(bool, ThreadPool, Catch_Unhandled_Exceptions)
CPoolOfThreads< CRef< CStdRequest > > TParent
virtual void x_SetStatus(EStatus new_status)
CRef< CQueueItem > TItemHandle
typedef NCBI_PARAM_TYPE(ThreadPool, Catch_Unhandled_Exceptions) TParamThreadPoolCatchExceptions
CThreadInPool< TRequest > TThread
const string m_ThreadName
CAtomicCounter_WithAutoInit m_MaxUrgentThreads
The maximum number of urgent threads running simultaneously.
CPoolOfThreads< TRequest > TPool
TItemHandle GetHandle(unsigned int timeout_sec=kMax_UInt, unsigned int timeout_nsec=0)
Get the first available request from the queue, and return a handle to it.
CBlockingQueue< TRequest >::CCompletingHandle TCompletingHandle
bool operator>(const CQueueItemBase &item) const
virtual void Process(void)=0
Do the actual job. Called by whichever thread handles this request.
CAtomicCounter_WithAutoInit m_UrgentThreadCount
The current number of urgent threads running now.
CSemaphore m_GetSem
Raised if the queue contains data.
TQueue::TItemHandle TItemHandle
set< TItemHandle, SItemHandleGreater > TRealQueue
The type of the queue.
void CountSelf(CAtomicCounter *counter)
CPoolOfThreads(unsigned int max_threads, unsigned int queue_size, unsigned int spawn_threshold=1, unsigned int max_urgent_threads=kMax_UInt, const string &thread_name=kEmptyStr)
Constructor.
void SetUserPriority(TItemHandle handle, TUserPriority priority)
Adjust a pending request's priority.
@ eComplete
extracted and released
@ eForciblyCaught
let an exception escape
@ ePending
still in the queue
@ eWithdrawn
dropped by submitter's request
@ eActive
extracted but not yet released
@ eNormal
Process request and stay in the pool.
@ eRunOnce
Process request and die.
unsigned int m_MaxThreads
Maximum simultaneous threads.
ERunMode
Which mode should the thread run in.
Definition: ncbithr.hpp:539
void Detach(void)
Inform the thread that user does not need to wait for its termination.
Definition: ncbithr.cpp:833
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
Definition: ncbithr.cpp:958
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
Definition: ncbimtx.cpp:1844
long GetNanoSecondsAfterSecond(void) const
Get number of nanoseconds.
Definition: ncbitime.hpp:2563
ESign GetSign(void) const
Get sign of time span.
Definition: ncbitime.hpp:2530
CTime CurrentTime(CTime::ETimeZone tz=CTime::eLocal, CTime::ETimeZonePrecision tzp=CTime::eTZPrecisionDefault)
Definition: ncbitime.hpp:2185
long GetCompleteSeconds(void) const
Get number of complete seconds.
Definition: ncbitime.hpp:2560
const long kNanoSecondsPerSecond
Number of nanoseconds in one second.
Definition: ncbitime.hpp:86
@ eCurrent
Use current time. See also CCurrentTime.
Definition: ncbitime.hpp:300
@ eGmt
GMT (Greenwich Mean Time)
Definition: ncbitime.hpp:308
Definition of all error codes used in util (xutil.lib).
int i
yy_size_t n
mdb_mode_t mode
Definition: lmdb++.h:38
const struct ncbi::grid::netcache::search::fields::SIZE size
Multi-threading – classes, functions, and features.
Defines: CTimeFormat - storage class for time format.
T min(T x_, T y_)
ERunMode
Definition: splign_app.cpp:549
NCBI_XUTIL_EXPORT
Parameter to control printing diagnostic message about conversion of static array data from a differe...
Definition: static_set.hpp:72
#define _ASSERT
CRef< CTestThread > thr[k_NumThreadsMax]
Definition: test_mt.cpp:267
Modified on Sat Mar 02 10:56:48 2024 by modify_doxy.py rev. 669887