NCBI C++ ToolKit
thread_pool_for_server.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef CONNECT__IMPL__THREAD_POOL_FOR_SERVER__HPP
2 #define CONNECT__IMPL__THREAD_POOL_FOR_SERVER__HPP
3 
4 /* $Id: thread_pool_for_server.hpp 73890 2016-07-29 15:32:34Z satskyse $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Author: Pavel Ivanov
30 *
31 * File Description:
32 */
33 
34 
35 #include <util/thread_pool_old.hpp>
36 
37 
38 /** @addtogroup ThreadedPools
39  *
40  * @{
41  */
42 
44 
45 
47 public:
49 
51  : m_Status(CQueueItemBase::ePending)
52  {}
53 
54  const EStatus& GetStatus(void) const { return m_Status; }
55 
58 
59 protected:
61 
62  virtual void x_SetStatus(EStatus new_status)
63  { m_Status = new_status; }
64 };
65 
66 
68 {
69 public:
70  class CQueueItem;
73 
74  /// It may be desirable to store handles obtained from GetHandle() in
75  /// instances of CCompletingHandle to ensure that they are marked as
76  /// complete when all is said and done, even in the face of exceptions.
78  {
79  public:
81  : TItemHandle(h)
82  {}
83 
85  {
86  if (this->NotEmpty()) {
87  this->GetObject().MarkAsComplete();
88  }
89  }
90  };
91 
92  /// Constructor
94  {}
95 
96  /// Put a request into the queue. If the queue remains full for
97  /// the duration of the (optional) timeout, throw an exception.
98  ///
99  /// @param request
100  /// Request
101  TItemHandle Put(const TRequest& request);
102 
103  /// Get the first available request from the queue, and return a
104  /// handle to it.
105  /// Blocks politely if empty.
106  TItemHandle GetHandle(void);
107 
109  {
110  public:
111  // typedef CBlockingQueue<TRequest> TQueue;
113  : m_Request(request)
114  {}
115 
116  const TRequest& GetRequest(void) const { return m_Request; }
117 
118  protected:
119  // Specialized for CRef<CStdRequest> in thread_pool.cpp
120  void x_SetStatus(EStatus new_status)
121  {
122  EStatus old_status = GetStatus();
124  m_Request->OnStatusChange(old_status, new_status);
125  }
126 
127  private:
129 
131  };
132 
133 protected:
134  /// The type of the queue
135  typedef deque<TItemHandle> TRealQueue;
136 
137  // Derived classes should take care to use these members properly.
138  TRealQueue m_Queue; ///< The queue
139 
141  mutable CMutex m_Mutex; ///< Guards access to queue
142 
143 private:
144  /// forbidden
147 };
148 
149 
151 
153 {
154 public:
159 
160  /// Constructor
161  ///
162  /// @param pool
163  /// A pool where this thead is placed
164  /// @param mode
165  /// A running mode of this thread
167  : m_Pool(pool), m_Counted(false)
168  {}
169  void CountSelf(void);
170 
171 protected:
172  /// Destructor
173  virtual ~CThreadInPool_ForServer(void);
174 
175  /// Process a request.
176  /// It is called from Main() for each request this thread handles
177  ///
178  /// @param
179  /// A request for processing
180  void ProcessRequest(TItemHandle handle);
181 
182  /// Older interface (still delegated to by default)
183  void ProcessRequest(const TRequest& req)
184  { req.GetNCPointerOrNull()->Process(); }
185 
186 private:
187  // to prevent overriding; inherited from CThread
188  virtual void* Main(void);
189 
190  void x_HandleOneRequest(bool catch_all);
191  void x_UnregisterThread(void);
192 
194  {
195  public:
198  ~CAutoUnregGuard(void);
199 
200  private:
202  };
203 
204  friend class CAutoUnregGuard;
205 
206 
207  TPool* m_Pool; ///< The pool that holds this thread
208  bool m_Counted;
209 };
210 
211 
213 {
214 public:
216 
220 
221  /// Constructor
222  ///
223  /// @param max_threads
224  /// The maximum number of threads that this pool can run
225  CPoolOfThreads_ForServer(unsigned int max_threads, const string& thr_suffix);
226 
227  /// Destructor
228  virtual ~CPoolOfThreads_ForServer(void);
229 
230  /// Start processing threads
231  ///
232  /// @param num_threads
233  /// The number of threads to start
234  void Spawn(unsigned int num_threads);
235 
236  /// Put a request in the queue with a given priority
237  ///
238  /// @param request
239  /// A request
240  void AcceptRequest(const TRequest& request);
241  TItemHandle GetHandle(void);
242 
243  /// Causes all threads in the pool to exit cleanly after finishing
244  /// all pending requests, optionally waiting for them to die.
245  ///
246  /// @param wait
247  /// If true will wait until all thread in the pool finish their job
248  void KillAllThreads(bool wait);
249 
250 private:
252 
253  /// Create a new thread
255  { return new CThreadInPool_ForServer(this); }
256 
257  /// Register a thread. It is called by TThread::Main.
258  ///
259  /// @param thread
260  /// A thread to register
261  /// @param return
262  /// Whether registration succeeded. (KillAllThreads disables it.)
263  bool Register(TThread& thread);
264 
265  /// Unregister a thread
266  ///
267  /// @param thread
268  /// A thread to unregister
269  void UnRegister(TThread&);
270 
271 
273 
274  /// The maximum number of threads the pool can hold
276  /// The current number of threads in the pool
278  /// The guard for m_MaxThreads and m_MaxUrgentThreads
280 
282  string m_ThrSuffix;
283 
284  typedef list<CRef<TThread> > TThreads;
287 };
288 
289 
291 
292 
293 /* @} */
294 
295 #endif /* CONNECT__IMPL__THREAD_POOL_FOR_SERVER__HPP */
EStatus
#define false
Definition: bool.h:36
CAtomicCounter –.
Definition: ncbicntr.hpp:71
It may be desirable to store handles obtained from GetHandle() in instances of CCompletingHandle to e...
CMutex –.
Definition: ncbimtx.hpp:749
CObject –.
Definition: ncbiobj.hpp:180
CQueueItemBase – skeleton blocking-queue item, sans actual request.
CRef –.
Definition: ncbiobj.hpp:618
TNCBIAtomicValue TValue
Alias TValue for TNCBIAtomicValue.
Definition: ncbicntr.hpp:73
TObjectType * GetNCPointerOrNull(void) const THROWS_NONE
Get pointer value.
Definition: ncbiobj.hpp:1162
bool NotEmpty(void) const THROWS_NONE
Check if CRef is not empty – pointing to an object and has a non-null value.
Definition: ncbiobj.hpp:726
TObjectType & GetObject(void)
Get object.
Definition: ncbiobj.hpp:1011
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
void ProcessRequest(const TRequest &req)
Older interface (still delegated to by default)
void CountSelf(void)
Definition: server.cpp:164
bool Register(TThread &thread)
Register a thread.
Definition: server.cpp:315
CMutex m_Mutex
The guard for m_MaxThreads and m_MaxUrgentThreads.
TItemHandle GetHandle(void)
Definition: server.cpp:280
CBlockingQueue_ForServer(void)
Constructor.
TPool * m_Pool
The pool that holds this thread.
CThreadInPool_ForServer(TPool *pool)
Constructor.
void UnRegister(TThread &)
Unregister a thread.
Definition: server.cpp:327
CAtomicCounter::TValue TACValue
CBlockingQueue_ForServer::CCompletingHandle TCompletingHandle
deque< TItemHandle > TRealQueue
The type of the queue.
void AcceptRequest(const TRequest &request)
Put a request in the queue with a given priority.
Definition: server.cpp:274
const EStatus & GetStatus(void) const
CThreadInPool_ForServer TThread
virtual void x_SetStatus(EStatus new_status)
void x_UnregisterThread(void)
Definition: server.cpp:179
CPoolOfThreads_ForServer(unsigned int max_threads, const string &thr_suffix)
Constructor.
Definition: server.cpp:239
CMutex m_Mutex
Guards access to queue.
list< CRef< TThread > > TThreads
CBlockingQueue_ForServer & operator=(const CBlockingQueue_ForServer &)
CBlockingQueue_ForServer::TItemHandle TItemHandle
volatile TACValue m_MaxThreads
The maximum number of threads the pool can hold.
virtual ~CPoolOfThreads_ForServer(void)
Destructor.
Definition: server.cpp:248
void KillAllThreads(bool wait)
Causes all threads in the pool to exit cleanly after finishing all pending requests,...
Definition: server.cpp:294
CBlockingQueue_ForServer::TRequest TRequest
TItemHandle Put(const TRequest &request)
Put a request into the queue.
Definition: server.cpp:125
TItemHandle GetHandle(void)
Get the first available request from the queue, and return a handle to it.
Definition: server.cpp:137
virtual void OnStatusChange(EStatus, EStatus)
Callback for status changes.
CBlockingQueue_ForServer(const CBlockingQueue_ForServer &)
forbidden
virtual ~CThreadInPool_ForServer(void)
Destructor.
Definition: server.cpp:171
TThread * NewThread(void)
Create a new thread.
CBlockingQueue_ForServer TQueue
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
Definition: server.cpp:208
void ProcessRequest(TItemHandle handle)
Process a request.
Definition: server.cpp:232
void x_HandleOneRequest(bool catch_all)
Definition: server.cpp:185
CAtomicCounter m_ThreadCount
The current number of threads in the pool.
virtual void Process(void)=0
Do the actual job Called by whichever thread handles this request.
CPoolOfThreads_ForServer TPool
void Spawn(unsigned int num_threads)
Start processing threads.
Definition: server.cpp:263
CQueueItemBase::EStatus EStatus
@ eComplete
extracted and released
@ eForciblyCaught
let an exception escape
CRef< CTestThread > thr[k_NumThreadsMax]
Definition: test_mt.cpp:267
Modified on Tue Feb 27 05:50:13 2024 by modify_doxy.py rev. 669887