NCBI C++ ToolKit
ipsgs_processor.hpp

1 #ifndef IPSGS_PROCESSOR__HPP
2 #define IPSGS_PROCESSOR__HPP
3 
4 /* $Id: ipsgs_processor.hpp 100918 2023-09-29 15:56:54Z satskyse $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Sergey Satskiy
30  *
31  * File Description: PSG processor interface
32  *
33  */
34 
35 #include "psgs_request.hpp"
36 #include "psgs_reply.hpp"
37 #include "psgs_uv_loop_binder.hpp"
38 #include "psgs_io_callbacks.hpp"
40 
41 USING_NCBI_SCOPE;
42 
43 /// Interface class (and self-factory) for request processor objects that can
44 /// retrieve data from a given data source.
45 /// The overall life cycle of the processors is as follows.
46 /// There is a one-time processor registration stage. At this stage a default
47 /// processor constructor will be used.
48 /// Then, when a request comes in, all the registered processors will
49 /// receive the CreateProcessor(...) call. All non-NULL processors will be
50 /// considered as those which are able to process the request.
51 /// Later the infrastructure will call the created processors' Process() method
52 /// in parallel and will periodically call the GetStatus() method. When all
53 /// processors have finished, the request processing is considered complete.
54 ///
55 /// There are a few agreements for the processors.
56 /// - The server replies use the PSG protocol. When something needs to be sent
57 ///   to the client, the m_Reply methods should be used.
58 /// - When a processor is finished it should call the
59 ///   SignalFinishProcessing() method.
60 /// - If a processor needs to do any logging then the following needs to be done:
61 ///   - Set the request context for the current thread.
62 ///   - Use one of the macros PSG_TRACE, PSG_INFO, PSG_WARNING, PSG_ERROR,
63 ///     PSG_CRITICAL, PSG_MESSAGE (pubseq_gateway_logging.hpp)
64 ///   - Reset the request context
65 ///   E.g.:
66 ///   { CRequestContextResetter context_resetter;
67 ///     m_Request->SetRequestContext();
68 ///     ...
69 ///     PSG_WARNING("Something"); }
70 /// - The ProcessEvent() method can be called periodically (in addition to
71 ///   some events like Cassandra data ready)
72 class IPSGS_Processor
73 {
74 public:
75  /// The GetStatus() method returns the processor's current status.
76  /// The order is important: based on it, the worst (max) and best (min)
77  /// statuses are calculated for a group of processors.
78  enum EPSGS_Status {
79  ePSGS_InProgress, //< Processor is still working.
80  ePSGS_Done, //< Processor finished and found what needed.
81  ePSGS_NotFound, //< Processor finished and did not find anything.
82  ePSGS_Canceled, //< Processor finished because earlier it received
83  //< the Cancel() call.
84  ePSGS_Timeout, //< Processor finished because of a backend timeout.
85  ePSGS_Error, //< Processor finished and there was an error.
86  ePSGS_Unauthorized //< Processor finished and there was an authorization error.
87  };
88 
89  /// Converts the processor status to a string for tracing and logging
90  /// purposes.
91  static string StatusToString(EPSGS_Status status);
92 
93  /// Converts the processor status to a string for protocol message.
94  static string StatusToProgressMessage(EPSGS_Status status);
95 
96 public:
102  {}
103 
104  virtual ~IPSGS_Processor()
105  {}
106 
107 public:
108  /// Tells if processor can process the given request
109  /// @param request
110  /// PSG request to retrieve the data for. It is guaranteed to be not null.
111  /// @param reply
112  /// The way to send reply chunks to the client. It is guaranteed to
113  /// be not null.
114  /// @return
115  /// true if the processor can process the request
116  virtual bool CanProcess(shared_ptr<CPSGS_Request> request,
117  shared_ptr<CPSGS_Reply> reply) const
118  {
119  return true;
120  }
121 
122  /// Needs to be implemented only for the ID/get_na requests.
123  /// It returns a list of named annotations which a processor recognizes as
124  /// suitable for processing.
125  /// @param request
126  /// PSG request to retrieve the data for. It is guaranteed to be not null.
127  /// @param reply
128  /// The way to send reply chunks to the client. It is guaranteed to
129  /// be not null.
130  /// @return
131  /// a list of annotations which can be processed
132  virtual vector<string> WhatCanProcess(shared_ptr<CPSGS_Request> request,
133  shared_ptr<CPSGS_Reply> reply) const
134  {
135  return vector<string>();
136  }
137 
138 
139  /// Create processor to fulfil PSG request using the data source
140  /// @param request
141  /// PSG request to retrieve the data for. It is guaranteed to be not null.
142  /// @param reply
143  /// The way to send reply chunks to the client. It is guaranteed to
144  /// be not null.
145  /// @return
146  /// New processor object if this processor can theoretically fulfill
147  /// (all or a part of) the request; else NULL.
148  virtual IPSGS_Processor* CreateProcessor(shared_ptr<CPSGS_Request> request,
149  shared_ptr<CPSGS_Reply> reply,
150  TProcessorPriority priority) const = 0;
151 
152  /// Main processing function.
153  /// It should avoid throwing exceptions. In case of errors it must make
154  /// sure that:
155  /// - the subsequent GetStatus() calls return an appropriate status
156  /// - SignalFinishProcessing() is called if there is no more processor activity
157  /// Even if an exception is thrown, the processor must still
158  /// fulfill the obligations above. The dispatching code will log the
159  /// message (and possibly a trace) and continue in this case.
160  virtual void Process(void) = 0;
161 
162  /// The infrastructure request to cancel processing
163  virtual void Cancel(void) = 0;
164 
165  /// Tells the processor status (whether it has finished or is still in progress)
166  /// @return
167  /// the current processor status
168  virtual EPSGS_Status GetStatus(void) = 0;
169 
170  /// Tells the processor name (used in logging and tracing)
171  /// @return
172  /// the processor name
173  virtual string GetName(void) const = 0;
174 
175  /// Tells the processor group name. For example, all the processors which
176  /// retrieve data from Cassandra should return the same name in response to
177  /// this call. This name is used to control the total number of
178  /// simultaneously working processors retrieving from the same backend.
179  /// @return
180  /// the processor group name
181  virtual string GetGroupName(void) const = 0;
182 
183  /// Called when an event happens which may require
184  /// some processing. By default nothing is done.
185  /// This method can also be called on a timer event.
186  virtual void ProcessEvent(void)
187  {}
188 
189  /// Provides the user request
190  /// @return
191  /// User request
192  shared_ptr<CPSGS_Request> GetRequest(void) const
193  {
194  return m_Request;
195  }
196 
197  /// Provides the reply wrapper
198  /// @return
199  /// Reply wrapper which allows sending reply chunks to the user
200  shared_ptr<CPSGS_Reply> GetReply(void) const
201  {
202  return m_Reply;
203  }
204 
205  /// Provides the processor priority
206  /// @return
207  /// The processor priority
208  TProcessorPriority GetPriority(void) const
209  {
210  return m_Priority;
211  }
212 
213  /// The provided callback will be called from the libuv loop assigned to
214  /// the processor
215  /// @param cb
216  /// The callback to be called from the libuv loop
217  /// @param user_data
218  /// The data to be passed to the callback
219  void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb,
220  void * user_data);
221 
222  /// The provided callbacks will be called from the libuv loop assigned to
223  /// the processor when the corresponding event occurs on the provided
224  /// socket.
225  /// @param fd
226  /// The socket to poll
227  /// @param event
228  /// The event to wait for
229  /// @param timeout_millisec
230  /// The timeout of waiting for the event
231  /// @param user_data
232  /// The data to be passed to the callback
233  /// @param event_cb
234  /// The event callback
235  /// @param timeout_cb
236  /// The timeout callback
237  /// @param error_cb
238  /// The error callback
239  /// @note
240  /// The processor must make sure the socket is valid
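241  void SetSocketCallback(int fd,
242  CPSGS_SocketIOCallback::EPSGS_Event event,
243  uint64_t timeout_millisec,
244  void * user_data,
245  CPSGS_SocketIOCallback::TEventCB event_cb,
246  CPSGS_SocketIOCallback::TTimeoutCB timeout_cb,
247  CPSGS_SocketIOCallback::TErrorCB error_cb);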
248 
249  /// Saves the libuv worker thread id which runs the processor.
250  /// To be used by the server framework only.
251  /// @param uv_thread_id
252  /// The libuv worker thread id which runs the processor
253  void SetUVThreadId(uv_thread_t uv_thread_id)
254  {
255  m_UVThreadId = uv_thread_id;
256  }
257 
258  /// Provides the libuv thread id which runs the processor.
259  /// @return
260  /// The libuv worker thread id which runs the processor
261  uv_thread_t GetUVThreadId(void) const
262  {
263  return m_UVThreadId;
264  }
265 
266  /// Tells if a libuv thread id has been assigned to the processor.
267  /// Basically the assignment of the libuv thread means that the processor
268  /// has been started i.e. Process() was called before.
269  /// @return
270  /// true if the libuv thread has been assigned
271  bool IsUVThreadAssigned(void) const
272  {
273  return m_UVThreadId != 0;
274  }
275 
276  /// Provides the timestamp of when the Process() method was called
277  /// @return
278  /// Process() method invoke timestamp
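279  psg_time_point_t GetProcessInvokeTimestamp(bool& is_valid) const
280  {
281  is_valid = m_ProcessInvokeTimestampInitialized;
282  return m_ProcessInvokeTimestamp;
283  }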
284 
285  /// Provides the timestamp of when the processor called
286  /// SignalStartProcessing() method
287  /// @return
288  /// SignalStartProcessing() method invoke timestamp
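289  psg_time_point_t GetSignalStartTimestamp(bool& is_valid) const
290  {
291  is_valid = m_SignalStartTimestampInitialized;
292  return m_SignalStartTimestamp;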
293  }
294 
295  /// Provides the timestamp of when the processor called
296  /// SignalFinishProcessing() method
297  /// @return
298  /// SignalFinishProcessing() method invoke timestamp
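299  psg_time_point_t GetSignalFinishTimestamp(bool& is_valid) const
300  {
301  is_valid = m_SignalFinishTimestampInitialized;
302  return m_SignalFinishTimestamp;
303  }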
304 
305  /// Called just before the virtual Process() method is called
306  void OnBeforeProcess(void);
307 
308 public:
309  /// Tells whether to continue or not after a processor called
310  /// SignalStartProcessing() method.
311  enum EPSGS_StartProcessing {
314  };
315 
316  /// A processor should call the method when it decides that it
317  /// successfully started processing the request. The other processors
318  /// which are handling this request in parallel will be cancelled.
319  /// @return
320  /// The flag to continue or to stop further activity
321  EPSGS_StartProcessing SignalStartProcessing(void);
322 
323  /// A processor should call this method when it decides that there is
324  /// nothing else to be done.
325  void SignalFinishProcessing(void);
326 
327 public:
328  /// Parse seq-id from a string and type representation.
329  /// @param seq_id
330  /// Destination seq-id to place parsed value into.
331  /// @param request_seq_id
332  /// Input string containing seq-id to parse.
333  /// @param request_seq_id_type
334  /// Input seq-id type
335  /// @param err_msg
336  /// Optional string to receive error message if any.
337  /// @return
338  /// ePSGS_ParsedOK on success, ePSGS_ParseFailed otherwise.
339  EPSGS_SeqIdParsingResult ParseInputSeqId(
340  objects::CSeq_id& seq_id,
341  const string& request_seq_id,
342  int request_seq_id_type,
343  string* err_msg = nullptr);
344 
345 protected:
346  bool GetEffectiveSeqIdType(
347  const objects::CSeq_id& parsed_seq_id,
348  int request_seq_id_type,
349  int16_t& eff_seq_id_type,
350  bool need_trace);
351 
352 protected:
353  shared_ptr<CPSGS_Request> m_Request;
354  shared_ptr<CPSGS_Reply> m_Reply;
355  TProcessorPriority m_Priority;
356 
357 protected:
359  uv_thread_t m_UVThreadId;
360 
361 private:
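362  bool m_ProcessInvokeTimestampInitialized;
363  psg_time_point_t m_ProcessInvokeTimestamp;
364 
365  bool m_SignalStartTimestampInitialized;
366  psg_time_point_t m_SignalStartTimestamp;
367 
368  bool m_SignalFinishTimestampInitialized;
369  psg_time_point_t m_SignalFinishTimestamp;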
370 };
371 
372 
373 // Basically the logic is the same as in the GetEffectiveSeqIdType() member.
374 // This one does not send traces and does not provide the effective seq_id,
375 // it just tells if there is no conflict between the types
376 bool AreSeqIdTypesMatched(
377  const objects::CSeq_id& parsed_seq_id,
378  int request_seq_id_type);
379 
380 
381 #endif // IPSGS_PROCESSOR__HPP
382 
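
The ordering remark on EPSGS_Status ("a worst (max) and best (min) status is calculated for a group of processors") can be illustrated with a minimal sketch. It assumes that the group status is simply the maximum or minimum enumerator value, as the comment suggests. The enum below is a local copy of the one in the listing, and WorstStatus() and BestStatus() are hypothetical helper names that do not appear in the PSG server code.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Local copy of the enumerators from the listing, in the same order.
// The order matters: a larger value means a "worse" outcome.
enum EPSGS_Status {
    ePSGS_InProgress,
    ePSGS_Done,
    ePSGS_NotFound,
    ePSGS_Canceled,
    ePSGS_Timeout,
    ePSGS_Error,
    ePSGS_Unauthorized
};

// Hypothetical helper: the worst status of a group is the maximum value.
inline EPSGS_Status WorstStatus(const std::vector<EPSGS_Status>& statuses)
{
    assert(!statuses.empty());
    return *std::max_element(statuses.begin(), statuses.end());
}

// Hypothetical helper: the best status of a group is the minimum value.
inline EPSGS_Status BestStatus(const std::vector<EPSGS_Status>& statuses)
{
    assert(!statuses.empty());
    return *std::min_element(statuses.begin(), statuses.end());
}
```

One consequence of this ordering is that ePSGS_InProgress is the smallest value, so the best status of a group stays ePSGS_InProgress for as long as any processor in it is still working.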
function< EPSGS_PollContinue(void *user_data)> TEventCB
function< EPSGS_PollContinue(void *user_data)> TTimeoutCB
function< EPSGS_PollContinue(const string &message, void *user_data)> TErrorCB
function< void(void *user_data)> TProcessorCB
Interface class (and self-factory) for request processor objects that can retrieve data from a given ...
virtual void Cancel(void)=0
The infrastructure request to cancel processing.
virtual string GetName(void) const =0
Tells the processor name (used in logging and tracing)
void SetUVThreadId(uv_thread_t uv_thread_id)
Saves the libuv worker thread id which runs the processor.
psg_time_point_t GetProcessInvokeTimestamp(bool &is_valid) const
Provides the timestamp of when the Process() method was called.
virtual IPSGS_Processor * CreateProcessor(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply, TProcessorPriority priority) const =0
Create processor to fulfil PSG request using the data source.
static string StatusToString(EPSGS_Status status)
Converts the processor status to a string for tracing and logging purposes.
virtual EPSGS_Status GetStatus(void)=0
Tells the processor status (whether it has finished or is still in progress)
uv_thread_t m_UVThreadId
virtual bool CanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const
Tells if processor can process the given request.
virtual void Process(void)=0
Main processing function.
psg_time_point_t GetSignalFinishTimestamp(bool &is_valid) const
Provides the timestamp of when the processor called SignalFinishProcessing() method.
virtual ~IPSGS_Processor()
psg_time_point_t m_ProcessInvokeTimestamp
void OnBeforeProcess(void)
Called just before the virtual Process() method is called.
psg_time_point_t m_SignalStartTimestamp
void SetSocketCallback(int fd, CPSGS_SocketIOCallback::EPSGS_Event event, uint64_t timeout_millisec, void *user_data, CPSGS_SocketIOCallback::TEventCB event_cb, CPSGS_SocketIOCallback::TTimeoutCB timeout_cb, CPSGS_SocketIOCallback::TErrorCB error_cb)
The provided callbacks will be called from the libuv loop assigned to the processor when the correspo...
shared_ptr< CPSGS_Reply > GetReply(void) const
Provides the reply wrapper.
virtual vector< string > WhatCanProcess(shared_ptr< CPSGS_Request > request, shared_ptr< CPSGS_Reply > reply) const
Needs to be implemented only for the ID/get_na requests.
uv_thread_t GetUVThreadId(void) const
Provides the libuv thread id which runs the processor.
shared_ptr< CPSGS_Request > GetRequest(void) const
Provides the user request.
bool IsUVThreadAssigned(void) const
Tells if a libuv thread id has been assigned to the processor.
virtual string GetGroupName(void) const =0
Tells the processor group name.
shared_ptr< CPSGS_Reply > m_Reply
EPSGS_StartProcessing
Tells whether to continue or not after a processor called SignalStartProcessing() method.
psg_time_point_t GetSignalStartTimestamp(bool &is_valid) const
Provides the timestamp of when the processor called SignalStartProcessing() method.
bool m_SignalStartTimestampInitialized
bool GetEffectiveSeqIdType(const objects::CSeq_id &parsed_seq_id, int request_seq_id_type, int16_t &eff_seq_id_type, bool need_trace)
psg_time_point_t m_SignalFinishTimestamp
TProcessorPriority GetPriority(void) const
Provides the processor priority.
EPSGS_Status
The GetStatus() method returns the processor's current status.
void PostponeInvoke(CPSGS_UvLoopBinder::TProcessorCB cb, void *user_data)
The provided callback will be called from the libuv loop assigned to the processor.
bool m_ProcessInvokeTimestampInitialized
void SignalFinishProcessing(void)
A processor should call this method when it decides that there is nothing else to be done.
EPSGS_StartProcessing SignalStartProcessing(void)
A processor should call the method when it decides that it successfully started processing the reques...
shared_ptr< CPSGS_Request > m_Request
bool m_SignalFinishTimestampInitialized
virtual void ProcessEvent(void)
Called when an event happens which may require some processing.
EPSGS_SeqIdParsingResult ParseInputSeqId(objects::CSeq_id &seq_id, const string &request_seq_id, int request_seq_id_type, string *err_msg=nullptr)
Parse seq-id from a string and type representation.
static string StatusToProgressMessage(EPSGS_Status status)
Converts the processor status to a string for protocol message.
TProcessorPriority m_Priority
bool AreSeqIdTypesMatched(const objects::CSeq_id &parsed_seq_id, int request_seq_id_type)
USING_NCBI_SCOPE
EPSGS_SeqIdParsingResult
int TProcessorPriority
psg_clock_t::time_point psg_time_point_t
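
The life cycle described in the class comment (register a default-constructed processor, call CreateProcessor() on every registered processor for each request, then call Process() on the non-NULL results and poll GetStatus()) can be sketched in a simplified form. Everything below is a hypothetical stand-in rather than the PSG server code: Request replaces CPSGS_Request, IProcessor is a reduced version of the interface, CEchoProcessor and Dispatch() are invented for illustration, the factory returns std::unique_ptr instead of a raw pointer, and the processors run sequentially instead of in parallel.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for CPSGS_Request.
struct Request { std::string seq_id; };

// Reduced, hypothetical version of the processor interface.
class IProcessor {
public:
    enum EStatus { eInProgress, eDone, eNotFound };
    virtual ~IProcessor() = default;
    // Self-factory: returns a new processor, or nullptr if this
    // processor cannot handle the request.
    virtual std::unique_ptr<IProcessor>
    CreateProcessor(std::shared_ptr<Request> request) const = 0;
    virtual void Process() = 0;
    virtual EStatus GetStatus() const = 0;
};

// Hypothetical processor that accepts any request with a non-empty seq_id.
class CEchoProcessor : public IProcessor {
public:
    CEchoProcessor() = default;  // used at the registration stage
    explicit CEchoProcessor(std::shared_ptr<Request> request)
        : m_Request(std::move(request)) {}

    std::unique_ptr<IProcessor>
    CreateProcessor(std::shared_ptr<Request> request) const override
    {
        if (request->seq_id.empty())
            return nullptr;
        return std::make_unique<CEchoProcessor>(std::move(request));
    }
    void Process() override { m_Status = eDone; }
    EStatus GetStatus() const override { return m_Status; }

private:
    std::shared_ptr<Request> m_Request;
    EStatus m_Status = eInProgress;
};

// Hypothetical dispatcher: asks every registered processor to create an
// instance for the request and runs the ones that agreed to handle it.
inline std::vector<std::unique_ptr<IProcessor>>
Dispatch(const std::vector<std::unique_ptr<IProcessor>>& registered,
         std::shared_ptr<Request> request)
{
    std::vector<std::unique_ptr<IProcessor>> created;
    for (const auto& factory : registered) {
        if (auto proc = factory->CreateProcessor(request))
            created.push_back(std::move(proc));
    }
    for (auto& proc : created)
        proc->Process();  // sequential here; the listing says "in parallel"
    return created;
}
```

In this sketch the request is finished once every returned processor reports a status other than eInProgress, which mirrors the "when all processors finished" rule in the class comment.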
Modified on Sun Feb 25 03:03:20 2024 by modify_doxy.py rev. 669887