NCBI C++ ToolKit
psgs_reply.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef PSGS_REPLY__HPP
2 #define PSGS_REPLY__HPP
3 
4 /* $Id: psgs_reply.hpp 101848 2024-02-22 13:57:25Z satskyse $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Sergey Satskiy
30  *
31  * File Description:
32  *
33  */
34 
36 #include <h2o.h>
37 
38 #include "pubseq_gateway_types.hpp"
39 #include "psgs_request.hpp"
40 #include "pubseq_gateway_utils.hpp"
41 
42 
43 class CPendingOperation;
44 class CCassBlobFetch;
45 class CHttpReply;
46 namespace idblob { class CCassDataCallbackReceiver; }
47 
48 // Keeps track of the protocol replies
50 {
51 public:
53  {
54  ePSGS_SendAccumulated, // Only flushes the accumulated chunks.
55  ePSGS_SendAndFinish // Flushes the accumulated chunks and closes
56  // the stream.
57  };
58 
59 public:
60  CPSGS_Reply(unique_ptr<CHttpReply> low_level_reply) :
61  m_Reply(low_level_reply.release()),
64  m_NextItemId(0),
67  m_RequestId(0),
69  {
71  m_Chunks.reserve(64);
72  }
73 
74  // This constructor is to reuse the infrastructure (PSG chunks, counting
75  // them etc) in the low level error reports
76  CPSGS_Reply(CHttpReply * low_level_reply) :
77  m_Reply(low_level_reply),
80  m_NextItemId(0),
83  m_RequestId(0),
85  {
87  m_Chunks.reserve(64);
88  }
89 
90  ~CPSGS_Reply();
91 
92 public:
93  // Flush can close the stream
94  void Flush(EPSGS_ReplyFlush how);
95 
96  // Tells the lower level that the pending op can be deleted
97  void SetCompleted(void);
98 
99  // Tells if the stream is closed and the pending op can be deleted
100  bool IsCompleted(void) const;
101 
102  // Tells if the stream is closed
103  bool IsFinished(void) const;
104 
105  // Tells if the output is ready
106  bool IsOutputReady(void) const;
107 
108  bool IsClosed(void) const;
109 
110  void Clear(void);
111  void SetContentType(EPSGS_ReplyMimeType mime_type);
112  void SetContentLength(uint64_t content_length);
113  size_t GetBytesSent(void) const;
114 
115  void SendOk(const char * payload, size_t payload_len, bool is_persist);
116  void Send202(const char * payload, size_t payload_len);
117  void Send400(const char * payload);
118  void Send401(const char * payload);
119  void Send404(const char * payload);
120  void Send409(const char * payload);
121  void Send500(const char * payload);
122  void Send502(const char * payload);
123  void Send503(const char * payload);
124 
125  void ConnectionCancel(void);
126  shared_ptr<idblob::CCassDataCallbackReceiver> GetDataReadyCB(void);
127 
129  {
130  return m_Reply;
131  }
132 
133  size_t GetItemId(void)
134  {
136  auto ret = ++m_NextItemId;
137  return ret;
138  }
139 
140  void SetRequestId(size_t request_id);
141 
142  size_t GetRequestId(void) const
143  {
144  return m_RequestId;
145  }
146 
148  {
150  }
151 
152  unsigned long GetTimespanFromLastActivityToNowMks(void) const
153  {
155  }
156 
157 public:
158  // PSG protocol facilities
159  void PrepareBioseqMessage(size_t item_id,
160  const string & processor_id,
161  const string & msg,
162  CRequestStatus::ECode status,
163  int err_code, EDiagSev severity);
164  void PrepareBioseqData(size_t item_id,
165  const string & processor_id,
166  const string & content,
168  void PrepareBioseqCompletion(size_t item_id,
169  const string & processor_id,
170  size_t chunk_count);
171  void PrepareBlobPropMessage(size_t item_id,
172  const string & processor_id,
173  const string & msg,
174  CRequestStatus::ECode status,
175  int err_code,
176  EDiagSev severity);
177  void PrepareBlobPropMessage(CCassBlobFetch * fetch_details,
178  const string & processor_id,
179  const string & msg,
180  CRequestStatus::ECode status,
181  int err_code,
182  EDiagSev severity);
183  void PrepareTSEBlobPropMessage(CCassBlobFetch * fetch_details,
184  const string & processor_id,
185  int64_t id2_chunk,
186  const string & id2_info,
187  const string & msg,
188  CRequestStatus::ECode status,
189  int err_code,
190  EDiagSev severity);
191  void PrepareBlobPropData(size_t item_id,
192  const string & processor_id,
193  const string & blob_id,
194  const string & content,
195  CBlobRecord::TTimestamp last_modified=-1);
196  void PrepareBlobPropData(CCassBlobFetch * fetch_details,
197  const string & processor_id,
198  const string & content,
199  CBlobRecord::TTimestamp last_modified=-1);
200  void PrepareTSEBlobPropData(size_t item_id,
201  const string & processor_id,
202  int64_t id2_chunk,
203  const string & id2_info,
204  const string & content);
205  void PrepareTSEBlobPropData(CCassBlobFetch * fetch_details,
206  const string & processor_id,
207  int64_t id2_chunk,
208  const string & id2_info,
209  const string & content);
210  void PrepareBlobData(size_t item_id,
211  const string & processor_id,
212  const string & blob_id,
213  const unsigned char * chunk_data,
214  unsigned int data_size,
215  int chunk_no,
216  CBlobRecord::TTimestamp last_modified=-1);
217  void PrepareBlobData(CCassBlobFetch * fetch_details,
218  const string & processor_id,
219  const unsigned char * chunk_data,
220  unsigned int data_size,
221  int chunk_no,
222  CBlobRecord::TTimestamp last_modified=-1);
223  void PrepareTSEBlobData(size_t item_id,
224  const string & processor_id,
225  const unsigned char * chunk_data,
226  unsigned int data_size,
227  int chunk_no,
228  int64_t id2_chunk,
229  const string & id2_info);
230  void PrepareTSEBlobData(CCassBlobFetch * fetch_details,
231  const string & processor_id,
232  const unsigned char * chunk_data,
233  unsigned int data_size,
234  int chunk_no,
235  int64_t id2_chunk,
236  const string & id2_info);
237  void PrepareBlobPropCompletion(size_t item_id,
238  const string & processor_id,
239  size_t chunk_count);
240  void PrepareBlobPropCompletion(CCassBlobFetch * fetch_details,
241  const string & processor_id);
242  void PrepareTSEBlobPropCompletion(CCassBlobFetch * fetch_details,
243  const string & processor_id);
244  void PrepareBlobMessage(size_t item_id,
245  const string & processor_id,
246  const string & blob_id,
247  const string & msg,
248  CRequestStatus::ECode status,
249  int err_code,
250  EDiagSev severity,
251  CBlobRecord::TTimestamp last_modified=-1);
252  void PrepareBlobMessage(CCassBlobFetch * fetch_details,
253  const string & processor_id,
254  const string & msg,
255  CRequestStatus::ECode status, int err_code,
256  EDiagSev severity,
257  CBlobRecord::TTimestamp last_modified=-1);
258  void PrepareTSEBlobMessage(CCassBlobFetch * fetch_details,
259  const string & processor_id,
260  int64_t id2_chunk,
261  const string & id2_info,
262  const string & msg,
263  CRequestStatus::ECode status, int err_code,
264  EDiagSev severity);
265  void PrepareBlobCompletion(size_t item_id,
266  const string & processor_id,
267  size_t chunk_count);
268  void PrepareTSEBlobCompletion(size_t item_id,
269  const string & processor_id,
270  size_t chunk_count);
271  void PrepareTSEBlobCompletion(CCassBlobFetch * fetch_details,
272  const string & processor_id);
273  void PrepareBlobExcluded(const string & blob_id,
274  const string & processor_id,
275  EPSGS_BlobSkipReason skip_reason,
276  CBlobRecord::TTimestamp last_modified=-1);
277  void PrepareBlobExcluded(size_t item_id,
278  const string & processor_id,
279  const string & blob_id,
280  EPSGS_BlobSkipReason skip_reason);
281  void PrepareBlobExcluded(const string & blob_id,
282  const string & processor_id,
283  unsigned long sent_mks_ago,
284  unsigned long until_resend_mks,
285  CBlobRecord::TTimestamp last_modified=-1);
286  // NOTE: the blob id argument is temporary to satisfy the older clients
287  void PrepareTSEBlobExcluded(const string & processor_id,
288  EPSGS_BlobSkipReason skip_reason,
289  const string & blob_id,
290  int64_t id2_chunk,
291  const string & id2_info);
292  // NOTE: the blob id argument is temporary to satisfy the older clients
293  void PrepareTSEBlobExcluded(const string & blob_id,
294  int64_t id2_chunk,
295  const string & id2_info,
296  const string & processor_id,
297  unsigned long sent_mks_ago,
298  unsigned long until_resend_mks);
299  void PrepareBlobCompletion(CCassBlobFetch * fetch_details,
300  const string & processor_id);
301  void PrepareProcessorMessage(size_t item_id, const string & processor_id,
302  const string & msg,
303  CRequestStatus::ECode status, int err_code,
304  EDiagSev severity);
305  void PreparePublicComment(const string & processor_id,
306  const string & public_comment,
307  const string & blob_id,
308  CBlobRecord::TTimestamp last_modified);
309  void PreparePublicComment(const string & processor_id,
310  const string & public_comment,
311  int64_t id2_chunk,
312  const string & id2_info);
313  void PrepareReplyMessage(const string & msg,
314  CRequestStatus::ECode status, int err_code,
315  EDiagSev severity,
316  bool need_update_last_activity=true);
317  void PrepareNamedAnnotationData(const string & annot_name,
318  const string & processor_id,
319  const string & content);
320  void SendPerNamedAnnotationResults(const string & content);
321  void PrepareAccVerHistoryData(const string & processor_id,
322  const string & content);
323  void PrepareIPGResolveData(const string & processor_id,
324  const string & content);
325  void PrepareIPGInfoMessageAndMeta(const string & processor_id,
326  const string & msg,
327  CRequestStatus::ECode status,
328  int err_code,
329  EDiagSev severity);
330  void PrepareRequestTimeoutMessage(const string & msg);
331  void PrepareProcessorProgressMessage(const string & processor_id,
332  const string & progress_status);
333 
335  const psg_time_point_t & create_timestamp);
336 
337  // The last activity timestamp needs to be updated if it was a processor
338  // initiated activity with the reply. If it was a trace from the processor
339  // dispatcher then the activity timestamp does not need to be updated
340  void SendTrace(const string & msg,
341  const psg_time_point_t & create_timestamp,
342  bool need_update_last_activity=true);
343 
344 private:
345  void x_PrepareTSEBlobPropCompletion(size_t item_id,
346  const string & processor_id,
347  size_t chunk_count);
348  void x_PrepareTSEBlobPropMessage(size_t item_id,
349  const string & processor_id,
350  int64_t id2_chunk,
351  const string & id2_info,
352  const string & msg,
353  CRequestStatus::ECode status,
354  int err_code,
355  EDiagSev severity);
356  void x_PrepareTSEBlobMessage(size_t item_id,
357  const string & processor_id,
358  int64_t id2_chunk,
359  const string & id2_info,
360  const string & msg,
361  CRequestStatus::ECode status,
362  int err_code,
363  EDiagSev severity);
364 
366  {
367  m_LastActivityTimestamp = psg_clock_t::now();
368  }
369 
370 private:
373  atomic<bool> m_NextItemIdLock;
374  size_t m_NextItemId;
377  vector<h2o_iovec_t> m_Chunks;
378  volatile bool m_ConnectionCanceled;
379  size_t m_RequestId;
381 };
382 
383 
384 #endif
int64_t TTimestamp
Definition: blob_record.hpp:56
void x_PrepareTSEBlobMessage(size_t item_id, const string &processor_id, int64_t id2_chunk, const string &id2_info, const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity)
Definition: psgs_reply.cpp:784
bool m_ReplyOwned
Definition: psgs_reply.hpp:372
void PrepareBlobData(size_t item_id, const string &processor_id, const string &blob_id, const unsigned char *chunk_data, unsigned int data_size, int chunk_no, CBlobRecord::TTimestamp last_modified=-1)
Definition: psgs_reply.cpp:567
void Clear(void)
Definition: psgs_reply.cpp:124
void PrepareIPGResolveData(const string &processor_id, const string &content)
void x_PrepareTSEBlobPropMessage(size_t item_id, const string &processor_id, int64_t id2_chunk, const string &id2_info, const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity)
Definition: psgs_reply.cpp:423
void PrepareBioseqCompletion(size_t item_id, const string &processor_id, size_t chunk_count)
Definition: psgs_reply.cpp:377
void PrepareBlobExcluded(const string &blob_id, const string &processor_id, EPSGS_BlobSkipReason skip_reason, CBlobRecord::TTimestamp last_modified=-1)
Definition: psgs_reply.cpp:883
psg_time_point_t m_LastActivityTimestamp
Definition: psgs_reply.hpp:380
void PrepareProcessorProgressMessage(const string &processor_id, const string &progress_status)
void PrepareTSEBlobPropMessage(CCassBlobFetch *fetch_details, const string &processor_id, int64_t id2_chunk, const string &id2_info, const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity)
Definition: psgs_reply.cpp:466
void SendPerNamedAnnotationResults(const string &content)
shared_ptr< idblob::CCassDataCallbackReceiver > GetDataReadyCB(void)
Definition: psgs_reply.cpp:135
size_t GetBytesSent(void) const
Definition: psgs_reply.cpp:158
int32_t m_TotalSentReplyChunks
Definition: psgs_reply.hpp:375
void PrepareIPGInfoMessageAndMeta(const string &processor_id, const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity)
void PrepareBlobCompletion(size_t item_id, const string &processor_id, size_t chunk_count)
Definition: psgs_reply.cpp:829
void ConnectionCancel(void)
Definition: psgs_reply.cpp:50
void SetContentLength(uint64_t content_length)
Definition: psgs_reply.cpp:150
void PrepareBioseqData(size_t item_id, const string &processor_id, const string &content, SPSGS_ResolveRequest::EPSGS_OutputFormat output_format)
Definition: psgs_reply.cpp:354
void PrepareBlobPropCompletion(size_t item_id, const string &processor_id, size_t chunk_count)
Definition: psgs_reply.cpp:661
void SendTrace(const string &msg, const psg_time_point_t &create_timestamp, bool need_update_last_activity=true)
void SetContentType(EPSGS_ReplyMimeType mime_type)
Definition: psgs_reply.cpp:141
void PrepareBlobMessage(size_t item_id, const string &processor_id, const string &blob_id, const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity, CBlobRecord::TTimestamp last_modified=-1)
Definition: psgs_reply.cpp:737
void PrepareTSEBlobPropCompletion(CCassBlobFetch *fetch_details, const string &processor_id)
Definition: psgs_reply.cpp:720
void PrepareBlobPropData(size_t item_id, const string &processor_id, const string &blob_id, const string &content, CBlobRecord::TTimestamp last_modified=-1)
Definition: psgs_reply.cpp:485
void x_PrepareTSEBlobPropCompletion(size_t item_id, const string &processor_id, size_t chunk_count)
Definition: psgs_reply.cpp:682
unsigned long GetTimespanFromLastActivityToNowMks(void) const
Definition: psgs_reply.hpp:152
void Send400(const char *payload)
Definition: psgs_reply.cpp:202
mutex m_ChunksLock
Definition: psgs_reply.hpp:376
size_t m_NextItemId
Definition: psgs_reply.hpp:374
void Send404(const char *payload)
Definition: psgs_reply.cpp:238
size_t m_RequestId
Definition: psgs_reply.hpp:379
CHttpReply * m_Reply
Definition: psgs_reply.hpp:371
@ ePSGS_SendAccumulated
Definition: psgs_reply.hpp:54
@ ePSGS_SendAndFinish
Definition: psgs_reply.hpp:55
void PrepareTSEBlobMessage(CCassBlobFetch *fetch_details, const string &processor_id, int64_t id2_chunk, const string &id2_info, const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity)
Definition: psgs_reply.cpp:811
size_t GetRequestId(void) const
Definition: psgs_reply.hpp:142
void PrepareReplyMessage(const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity, bool need_update_last_activity=true)
bool IsOutputReady(void) const
Definition: psgs_reply.cpp:106
psg_time_point_t GetLastActivityTimestamp(void) const
Definition: psgs_reply.hpp:147
void SetRequestId(size_t request_id)
Definition: psgs_reply.cpp:117
void PreparePublicComment(const string &processor_id, const string &public_comment, const string &blob_id, CBlobRecord::TTimestamp last_modified)
void PrepareNamedAnnotationData(const string &annot_name, const string &processor_id, const string &content)
bool IsClosed(void) const
Definition: psgs_reply.cpp:112
bool IsCompleted(void) const
Definition: psgs_reply.cpp:94
void Send503(const char *payload)
Definition: psgs_reply.cpp:310
void Send409(const char *payload)
Definition: psgs_reply.cpp:256
void Send401(const char *payload)
Definition: psgs_reply.cpp:220
size_t GetItemId(void)
Definition: psgs_reply.hpp:133
void PrepareProcessorMessage(size_t item_id, const string &processor_id, const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity)
void x_UpdateLastActivity(void)
Definition: psgs_reply.hpp:365
void PrepareTSEBlobExcluded(const string &processor_id, EPSGS_BlobSkipReason skip_reason, const string &blob_id, int64_t id2_chunk, const string &id2_info)
Definition: psgs_reply.cpp:949
void PrepareAccVerHistoryData(const string &processor_id, const string &content)
void PrepareTSEBlobPropData(size_t item_id, const string &processor_id, int64_t id2_chunk, const string &id2_info, const string &content)
Definition: psgs_reply.cpp:541
void PrepareTSEBlobData(size_t item_id, const string &processor_id, const unsigned char *chunk_data, unsigned int data_size, int chunk_no, int64_t id2_chunk, const string &id2_info)
Definition: psgs_reply.cpp:614
void Send502(const char *payload)
Definition: psgs_reply.cpp:292
void PrepareTSEBlobCompletion(size_t item_id, const string &processor_id, size_t chunk_count)
Definition: psgs_reply.cpp:863
atomic< bool > m_NextItemIdLock
Definition: psgs_reply.hpp:373
void PrepareBlobPropMessage(size_t item_id, const string &processor_id, const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity)
Definition: psgs_reply.cpp:398
void Send202(const char *payload, size_t payload_len)
Definition: psgs_reply.cpp:184
void PrepareBioseqMessage(size_t item_id, const string &processor_id, const string &msg, CRequestStatus::ECode status, int err_code, EDiagSev severity)
Definition: psgs_reply.cpp:328
void PrepareReplyCompletion(CRequestStatus::ECode status, const psg_time_point_t &create_timestamp)
vector< h2o_iovec_t > m_Chunks
Definition: psgs_reply.hpp:377
CHttpReply * GetHttpReply(void)
Definition: psgs_reply.hpp:128
volatile bool m_ConnectionCanceled
Definition: psgs_reply.hpp:378
void SetCompleted(void)
Definition: psgs_reply.cpp:87
CPSGS_Reply(unique_ptr< CHttpReply > low_level_reply)
Definition: psgs_reply.hpp:60
void PrepareRequestTimeoutMessage(const string &msg)
void Send500(const char *payload)
Definition: psgs_reply.cpp:274
void Flush(EPSGS_ReplyFlush how)
Definition: psgs_reply.cpp:57
bool IsFinished(void) const
Definition: psgs_reply.cpp:100
CPSGS_Reply(CHttpReply *low_level_reply)
Definition: psgs_reply.hpp:76
void SendOk(const char *payload, size_t payload_len, bool is_persist)
Definition: psgs_reply.cpp:166
#define true
Definition: bool.h:35
#define false
Definition: bool.h:36
Uint8 uint64_t
Int4 int32_t
Int8 int64_t
EDiagSev
Severity level for the posted diagnostics.
Definition: ncbidiag.hpp:650
EPSGS_BlobSkipReason
chrono::steady_clock psg_clock_t
psg_clock_t::time_point psg_time_point_t
unsigned long GetTimespanToNowMks(const psg_time_point_t &t_point)
Defines CRequestStatus class for NCBI C++ diagnostic API.
Modified on Sat Apr 20 12:15:36 2024 by modify_doxy.py rev. 669887