NCBI C++ ToolKit
psgs_dispatcher.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef PSGS_DISPATCHER__HPP
2 #define PSGS_DISPATCHER__HPP
3 
4 /* $Id: psgs_dispatcher.hpp 101946 2024-03-07 13:50:06Z satskyse $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Sergey Satskiy
30  *
31  * File Description: PSG server request dispatcher
32  *
33  */
34 
35 #include <list>
36 #include <mutex>
37 #include "ipsgs_processor.hpp"
39 
40 // Must be more than the number of processor groups registered via the
41 // AddProcessor() call
42 #define MAX_PROCESSOR_GROUPS 16
43 #define PROC_BUCKETS 100
44 
45 // libuv request timer callback
46 void request_timer_cb(uv_timer_t * handle);
47 void request_timer_close_cb(uv_handle_t *handle);
48 
49 
50 // From the dispatcher point of view each request corresponds to a group of
51 // processors and each processor can be in one of the following states:
52 // - created and doing what it needs (Up)
53 // - the Cancel() was called for a processor (Canceled)
54 // - a processor reported that it finished (Finished)
59 };
60 
61 // Auxiliary structure to store a processor's properties
63 {
64  // The processor is shared between CPendingOperation and the dispatcher
65  shared_ptr<IPSGS_Processor> m_Processor;
71 
72  SProcessorData(shared_ptr<IPSGS_Processor> processor,
73  EPSGS_ProcessorStatus dispatch_status,
74  IPSGS_Processor::EPSGS_Status finish_status) :
75  m_Processor(processor),
76  m_DispatchStatus(dispatch_status),
77  m_FinishStatus(finish_status),
81  {}
82 };
83 
84 // Auxiliary structure to store the group of processors data
86 {
87  size_t m_RequestId;
88  vector<SProcessorData> m_Processors;
89  uv_timer_t m_RequestTimer;
92 
93  // During the normal processing (no abrupt connection dropping by the
94  // client) there are three conditions to delete the group. All the
95  // flags below must be true when a group is deleted from memory
99 
100  // In case of a low level close (abrupt connection dropping) there will
101  // be no lib h2o finish notification and probably there will be no
102  // final flush.
104 
105  // To control that the request stop is issued for this group
107 
108  // A processor which has started to supply data
110 
111  SProcessorGroup(size_t request_id) :
112  m_RequestId(request_id),
118  {
120  }
121 
123  {
124  if (!m_TimerClosed) {
125  PSG_ERROR("The request timer (request id: " +
126  to_string(m_RequestId) +
127  ") must be stopped and its handle closed before the "
128  "processor group is destroyed");
129  }
130  }
131 
132  bool IsSafeToDelete(void) const
133  {
134  return m_TimerClosed &&
135  (
136  // Normal flow safe case
137  (m_FinallyFlushed &&
139  m_Libh2oFinished) ||
140  // Abrupt connection drop case
142  );
143  }
144 
145  void StartRequestTimer(uv_loop_t * uv_loop,
146  uint64_t timer_millisec)
147  {
148  if (m_TimerActive)
149  NCBI_THROW(CPubseqGatewayException, eInvalidTimerStart,
150  "Request timer cannot be started twice (request id: " +
151  to_string(m_RequestId) + ")");
152 
153  int ret = uv_timer_init(uv_loop, &m_RequestTimer);
154  if (ret < 0) {
155  NCBI_THROW(CPubseqGatewayException, eInvalidTimerInit,
156  uv_strerror(ret));
157  }
158  m_RequestTimer.data = (void *)(m_RequestId);
159 
160  ret = uv_timer_start(&m_RequestTimer, request_timer_cb,
161  timer_millisec, 0);
162  if (ret < 0) {
163  NCBI_THROW(CPubseqGatewayException, eInvalidTimerStart,
164  uv_strerror(ret));
165  }
166  m_TimerActive = true;
167  m_TimerClosed = false;
168  }
169 
170  void StopRequestTimer(void)
171  {
172  if (m_TimerActive) {
173  m_TimerActive = false;
174 
175  int ret = uv_timer_stop(&m_RequestTimer);
176  if (ret < 0) {
177  PSG_ERROR("Stop request timer error: " +
178  string(uv_strerror(ret)));
179  }
180 
181  uv_close(reinterpret_cast<uv_handle_t*>(&m_RequestTimer),
183  }
184  }
185 
186  void RestartTimer(uint64_t timer_millisec)
187  {
188  if (m_TimerActive) {
189  // Consequent call just updates the timer
190  int ret = uv_timer_start(&m_RequestTimer, request_timer_cb,
191  timer_millisec, 0);
192  if (ret < 0) {
193  NCBI_THROW(CPubseqGatewayException, eInvalidTimerStart,
194  uv_strerror(ret));
195  }
196  }
197  }
198 };
199 
200 
201 /// Based on various attributes of the request: {{seq_id}}; NA name;
202 /// {{blob_id}}; etc (or a combination thereof)...
203 /// provide a list of Processors to retrieve the requested data
205 {
206 public:
207  // The information that a processor has finished may come from a processor
208  // itself or from the framework.
212  };
213 
215  {
216  switch (source) {
217  case ePSGS_Processor: return "processor";
218  case ePSGS_Fromework: return "framework";
219  default: break;
220  }
221  return "unknown";
222  }
223 
224 public:
225  CPSGS_Dispatcher(double request_timeout)
226  {
227  m_RequestTimeoutMillisec = static_cast<uint64_t>(request_timeout * 1000);
228  }
229 
230  // The low level may remove the pending request, e.g. due to a canceled
231  // connection. This method is used to notify the dispatcher that the
232  // request is deleted and that the processors group is not needed anymore.
233  void NotifyRequestFinished(size_t request_id);
234 
235  /// Register processor (one to serve as a processor factory)
236  void AddProcessor(unique_ptr<IPSGS_Processor> processor);
237 
238  /// Provides a map between a processor group name and a unique zero-based
239  /// index of the group
241 
242  /// Return list of processor names which reported that they can process the
243  /// request.
244  list<string>
245  PreliminaryDispatchRequest(shared_ptr<CPSGS_Request> request,
246  shared_ptr<CPSGS_Reply> reply);
247 
248  /// Return list of processors which can be used to process the request.
249  /// The caller accepts the ownership.
250  list<shared_ptr<IPSGS_Processor>>
251  DispatchRequest(shared_ptr<CPSGS_Request> request,
252  shared_ptr<CPSGS_Reply> reply,
253  const list<string> & processor_names);
254 
255  /// The processor signals that it is going to provide data to the client
258 
259  /// The processor signals that it finished one way or another; including
260  /// when a processor is canceled.
261  void SignalFinishProcessing(IPSGS_Processor * processor,
263 
264  /// An http connection can be canceled so this method will be invoked for
265  /// such a case
266  void SignalConnectionCanceled(size_t request_id);
267 
268  void CancelAll(void);
269 
270  void OnRequestTimer(size_t request_id);
271  void StartRequestTimer(size_t request_id);
272 
273  void EraseProcessorGroup(size_t request_id);
274  void OnLibh2oFinished(size_t request_id);
275  void OnRequestTimerClose(size_t request_id);
276 
278  bool IsGroupAlive(size_t request_id);
279  void PopulateStatus(CJsonNode & status);
280 
281 private:
282  void x_PrintRequestStop(shared_ptr<CPSGS_Request> request,
283  CRequestStatus::ECode status,
284  size_t bytes_sent);
288  x_ConcludeIDGetNARequestStatus(shared_ptr<CPSGS_Request> request,
289  shared_ptr<CPSGS_Reply> reply,
290  bool low_level_close);
292  x_ConcludeRequestStatus(shared_ptr<CPSGS_Request> request,
293  shared_ptr<CPSGS_Reply> reply,
294  vector<IPSGS_Processor::EPSGS_Status> proc_statuses,
295  bool low_level_close);
296  void x_SendTrace(const string & msg,
297  shared_ptr<CPSGS_Request> request,
298  shared_ptr<CPSGS_Reply> reply);
300  IPSGS_Processor * processor,
301  shared_ptr<CPSGS_Request> request,
302  shared_ptr<CPSGS_Reply> reply);
303 
304 private:
305  // Registered processors
306  list<unique_ptr<IPSGS_Processor>> m_RegisteredProcessors;
307 
308 private:
309  // Note: the data are spread between buckets so that there is less
310  // contention on the data protecting mutexes
311 
312  size_t x_GetBucketIndex(size_t request_id) const
313  {
314  return request_id % PROC_BUCKETS;
315  }
316 
317  // The dispatcher shares the created processors with pending operation.
318  // The map below makes a correspondence between the request id (size_t;
319  // generated in the request constructor) and a list of processors with
320  // their properties.
321  unordered_map<size_t,
322  unique_ptr<SProcessorGroup>> m_ProcessorGroups[PROC_BUCKETS];
324 
326 
327 public:
329  {
330  switch (st) {
331  case ePSGS_Up: return "Up";
332  case ePSGS_Canceled: return "Canceled";
333  case ePSGS_Finished: return "Finished";
334  default: ;
335  }
336  return "Unknown";
337  }
338 
339 private:
340  // Processor concurrency support:
341  // Each group of processors may have its own limits and needs to store a
342  // current number of running instances
343 
345 
347  {
348  size_t m_Limit;
351  mutable atomic<bool> m_CountLock;
352 
356  {}
357 
358  size_t GetCurrentCount(void) const
359  {
360  CSpinlockGuard guard(&m_CountLock);
361  return m_CurrentCount;
362  }
363 
365  {
366  CSpinlockGuard guard(&m_CountLock);
368  }
369 
370  void GetCurrentAndLimitReachedCounts(size_t * current,
371  size_t * limit_reached)
372  {
373  CSpinlockGuard guard(&m_CountLock);
374  *current = m_CurrentCount;
375  *limit_reached = m_LimitReachedCount;
376  }
377 
379  {
380  CSpinlockGuard guard(&m_CountLock);
381  ++m_CurrentCount;
382  }
383 
385  {
386  CSpinlockGuard guard(&m_CountLock);
387  --m_CurrentCount;
388  }
389  };
390 
393 };
394 
395 
396 #endif // PSGS_DISPATCHER__HPP
397 
JSON node abstraction.
Based on various attributes of the request: {{seq_id}}; NA name; {{blob_id}}; etc (or a combination t...
void SignalFinishProcessing(IPSGS_Processor *processor, EPSGS_SignalSource source)
The processor signals that it finished one way or another; including when a processor is canceled.
bool IsGroupAlive(size_t request_id)
map< string, size_t > GetConcurrentCounters(void)
void OnRequestTimer(size_t request_id)
void AddProcessor(unique_ptr< IPSGS_Processor > processor)
Register processor (one to serve as a processor factory)
vector< string > m_RegisteredProcessorGroups
void PopulateStatus(CJsonNode &status)
CRequestStatus::ECode x_ConcludeRequestStatus(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, vector< IPSGS_Processor::EPSGS_Status > proc_statuses, bool low_level_close)
uint64_t m_RequestTimeoutMillisec
map< string, size_t > GetProcessorGroupToIndexMap(void) const
Provides a map between a processor group name and a unique zero-based index of the group.
SProcessorConcurrency m_ProcessorConcurrency[16]
void NotifyRequestFinished(size_t request_id)
void OnRequestTimerClose(size_t request_id)
CRequestStatus::ECode x_MapProcessorFinishToStatus(IPSGS_Processor::EPSGS_Status status) const
void OnLibh2oFinished(size_t request_id)
CRequestStatus::ECode x_ConcludeIDGetNARequestStatus(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, bool low_level_close)
void x_PrintRequestStop(shared_ptr< CPSGS_Request > request, CRequestStatus::ECode status, size_t bytes_sent)
void StartRequestTimer(size_t request_id)
void x_SendProgressMessage(IPSGS_Processor::EPSGS_Status finish_status, IPSGS_Processor *processor, shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
static string SignalSourceToString(EPSGS_SignalSource source)
void x_SendTrace(const string &msg, shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
list< string > PreliminaryDispatchRequest(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply)
Return list of processor names which reported that they can process the request.
void x_DecrementConcurrencyCounter(IPSGS_Processor *processor)
CPSGS_Dispatcher(double request_timeout)
static string ProcessorStatusToString(EPSGS_ProcessorStatus st)
void EraseProcessorGroup(size_t request_id)
size_t x_GetBucketIndex(size_t request_id) const
unordered_map< size_t, unique_ptr< SProcessorGroup > > m_ProcessorGroups[100]
void SignalConnectionCanceled(size_t request_id)
An http connection can be canceled so this method will be invoked for such a case.
list< shared_ptr< IPSGS_Processor > > DispatchRequest(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, const list< string > &processor_names)
Return list of processors which can be used to process the request.
IPSGS_Processor::EPSGS_StartProcessing SignalStartProcessing(IPSGS_Processor *processor)
The processor signals that it is going to provide data to the client.
list< unique_ptr< IPSGS_Processor > > m_RegisteredProcessors
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
EPSGS_StartProcessing
Tells whether to continue or not after a processor calls the SignalStartProcessing() method.
EPSGS_Status
The GetStatus() method returns a processor current status.
#define true
Definition: bool.h:35
#define false
Definition: bool.h:36
Uint8 uint64_t
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
const CharType(& source)[N]
Definition: pointer.h:1149
#define nullptr
Definition: ncbimisc.hpp:45
#define MAX_PROCESSOR_GROUPS
void request_timer_cb(uv_timer_t *handle)
#define PROC_BUCKETS
EPSGS_ProcessorStatus
@ ePSGS_Canceled
@ ePSGS_Finished
@ ePSGS_Up
void request_timer_close_cb(uv_handle_t *handle)
#define PSG_ERROR(message)
void GetCurrentAndLimitReachedCounts(size_t *current, size_t *limit_reached)
EPSGS_ProcessorStatus m_DispatchStatus
IPSGS_Processor::EPSGS_Status m_LastReportedTraceStatus
bool m_ProcPerformanceRegistered
SProcessorData(shared_ptr< IPSGS_Processor > processor, EPSGS_ProcessorStatus dispatch_status, IPSGS_Processor::EPSGS_Status finish_status)
IPSGS_Processor::EPSGS_Status m_FinishStatus
shared_ptr< IPSGS_Processor > m_Processor
vector< SProcessorData > m_Processors
SProcessorGroup(size_t request_id)
IPSGS_Processor * m_StartedProcessing
void RestartTimer(uint64_t timer_millisec)
void StopRequestTimer(void)
uv_timer_t m_RequestTimer
void StartRequestTimer(uv_loop_t *uv_loop, uint64_t timer_millisec)
bool IsSafeToDelete(void) const
Modified on Sun Apr 21 03:42:44 2024 by modify_doxy.py rev. 669887