NCBI C++ ToolKit
psgs_request.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: psgs_request.cpp 100918 2023-09-29 15:56:54Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description:
29  *
30  */
31 #include <ncbi_pch.hpp>
32 
33 #include <corelib/ncbistr.hpp>
34 
35 #include "psgs_request.hpp"
37 #include "pubseq_gateway_utils.hpp"
38 
40 
// File-scope state for GetNextRequestId(): the last issued request id and an
// atomic flag.
// NOTE(review): s_RequestIdLock is not referenced by any line visible in this
// listing; it is presumably used on original source line 47 (absent here) to
// guard s_NextRequestId — confirm.
41 static atomic<bool> s_RequestIdLock(false);
42 static size_t s_NextRequestId = 0;
43 
44 
// Hands out request ids: pre-increments s_NextRequestId and returns the new
// value, so the first id issued is 1 (the default CPSGS_Request constructor
// uses 0).
45 size_t GetNextRequestId(void)
46 {
// NOTE(review): original line 47 is missing from this listing. As shown, the
// increment of the plain size_t below is unsynchronized and would race when
// requests are created from several threads; presumably line 47 takes
// s_RequestIdLock (e.g. a spinlock guard) — confirm.
48  auto request_id = ++s_NextRequestId;
49  return request_id;
50 }
51 
52 
// Body of the blob id constructor: m_Id becomes "<sat>.<sat_key>", each part
// rendered by PSGToString() into a 64-byte scratch buffer.
// NOTE(review): the signature (original line 53) is missing from this
// listing; sat and sat_key are presumably integral (PSGToString takes a long).
54 {
55  char buf[64];
56  long len;
57  len = PSGToString(sat, buf);
58  m_Id.append(buf, len)
59  .append(1, '.');
60  len = PSGToString(sat_key, buf);
61  m_Id.append(buf, len);
62 }
63 
64 
66  m_RequestId(0),
67  m_BacklogTimeMks(0),
68  m_ConcurrentProcessorCount(0)
69 {}
70 
71 
// Destructor body (the signature, original line 72, is missing from this
// listing). Frees every SWaitData object owned via m_Wait and logs an error
// when the request is destroyed while an event is still locked.
// NOTE(review): the case labels (original lines 76, 79, 87) are missing; from
// the comments they presumably are ePSGS_Unlocked, ePSGS_LockedNobodyWaits and
// ePSGS_LockedSomebodyWaits respectively — confirm.
73 {
// NOTE(review): iterating by value copies each pair<string, SWaitData*>;
// `const auto &` would avoid copying the key.
74  for (auto it: m_Wait) {
75  switch (it.second->m_State) {
77  delete it.second;
78  break;
80  // This is rather strange: a processor has not unlocked an
81  // event and at the same time a request is going to be
82  // destroyed.
83  PSG_ERROR("Reply is going to be deleted when a processor has "
84  "not unlocked an event");
85  delete it.second;
86  break;
88  // This is rather strange: a processor is still waiting on the
89  // condition variable and at the same time the request is going
90  // to be destroyed.
91  // Just in case, let's unlock the condition variable and then
92  // delete the associated data
// NOTE(review): the woken waiter must still re-acquire m_WaitLock, which is a
// member of this object being destroyed, and m_WaitLock is not taken here.
// This is only safe if no thread can still be inside WaitFor() — confirm.
93  PSG_ERROR("Reply is going to be deleted when a processor is "
94  "locked on a condition variable");
95  it.second->m_WaitObject.notify_all();
96  delete it.second;
97  break;
98  }
99  }
100 }
101 
102 
// Main constructor (the first signature line, original line 103, is missing
// from this listing): takes ownership of the parsed user request, keeps the
// HTTP request and the request context, and assigns a fresh id from
// GetNextRequestId().
104  unique_ptr<SPSGS_RequestBase> req,
105  CRef<CRequestContext> request_context) :
106  m_HttpRequest(http_request),
107  m_Request(move(req)),
108  m_RequestContext(request_context),
109  m_RequestId(GetNextRequestId()),
110  m_BacklogTimeMks(0),
111  m_ConcurrentProcessorCount(0)
112 {}
113 
114 
// Returns the type of the wrapped user request, or ePSGS_UnknownRequest when
// no request is attached. (The signature, original line 115, is missing from
// this listing.)
116 {
117  if (m_Request)
118  return m_Request->GetRequestType();
119  return ePSGS_UnknownRequest;
120 }
121 
122 
// Registers event_name as locked (state ePSGS_LockedNobodyWaits) so that other
// processors can WaitFor() it until Unlock() is called. Allocates a new
// SWaitData owned by m_Wait; the destructor deletes it.
123 void CPSGS_Request::Lock(const string & event_name)
124 {
// NOTE(review): original line 125 is missing from this listing; by analogy
// with WaitFor() it is presumably `if (m_ConcurrentProcessorCount < 2)`.
126  return; // No parallel processors so there is no point to wait
127 
128  unique_lock<mutex> scope_lock(m_WaitLock);
129 
130  if (m_Wait.find(event_name) != m_Wait.end()) {
131  // Double locking; it is rather an error
// NOTE(review): original line 132 (presumably the head of an NCBI_THROW) is
// missing from this listing.
133  "Multiple lock of the same event is not supported");
134  }
135 
136  m_Wait[event_name] = new SWaitData();
137  m_Wait[event_name]->m_State = SWaitData::ePSGS_LockedNobodyWaits;
138 }
139 
140 
// Marks event_name as unlocked and wakes every processor blocked in WaitFor()
// on it. Events that were never locked are ignored, and the double-unlock
// branch is a no-op.
141 void CPSGS_Request::Unlock(const string & event_name)
142 {
// NOTE(review): original line 143 is missing from this listing; presumably the
// same `m_ConcurrentProcessorCount < 2` guard as in WaitFor().
144  return; // No parallel processors so there is no point to wait
145 
146  unique_lock<mutex> scope_lock(m_WaitLock);
147 
148  auto it = m_Wait.find(event_name);
149  if (it == m_Wait.end()) {
150  // Unlocking something which was not locked
151  return;
152  }
153 
154  switch (it->second->m_State) {
// NOTE(review): the case labels (original lines 155, 158, 161) are missing
// from this listing; from the bodies they are presumably
// ePSGS_LockedNobodyWaits, ePSGS_Unlocked and ePSGS_LockedSomebodyWaits.
156  it->second->m_State = SWaitData::ePSGS_Unlocked;
157  break;
159  // Double unlocking; it's OK
160  break;
162  it->second->m_WaitCount = 0;
163  it->second->m_State = SWaitData::ePSGS_Unlocked;
164  it->second->m_WaitObject.notify_all();
165  break;
166  }
167 }
168 
169 
// Blocks the calling processor until another processor calls
// Unlock(event_name), or until timeout_sec seconds pass. On timeout it logs a
// warning and throws CPubseqGatewayException (eTimeout).
// Returns immediately when there are fewer than two concurrent processors,
// when the event was never locked, or when it is already unlocked.
170 void CPSGS_Request::WaitFor(const string & event_name, size_t timeout_sec)
171 {
172  if (m_ConcurrentProcessorCount < 2) {
173  // No parallel processors so there is no point to wait
174  return;
175  }
176 
177  unique_lock<mutex> scope_lock(m_WaitLock);
178 
179  auto it = m_Wait.find(event_name);
180  if (it == m_Wait.end()) {
181  // There was no Lock() call for that event
182  return;
183  }
184 
185  switch (it->second->m_State) {
// NOTE(review): the case labels (original lines 186, 187, 190) are missing
// from this listing; presumably the two locked states reach the wait below and
// ePSGS_Unlocked returns.
188  // The logic to initiate waiting is below
189  break;
191  // It has already been unlocked
192  return;
193  }
194 
195  ++it->second->m_WaitCount;
196  it->second->m_State = SWaitData::ePSGS_LockedSomebodyWaits;
// NOTE(review): wait_for() is called without a predicate. A spurious wakeup
// returns cv_status::no_timeout while the event is still locked, so the caller
// proceeds as if unlocked and m_WaitCount stays incremented. Conversely, a
// timeout racing with Unlock() throws even though the event was unlocked, and
// it decrements a count that Unlock() already reset to 0. Waiting with a
// predicate on m_State == ePSGS_Unlocked would avoid both problems.
197  auto status = it->second->m_WaitObject.wait_for(scope_lock,
198  chrono::seconds(timeout_sec));
199  if (status == cv_status::timeout) {
200  // Note: decrementing the count only in case of a timeout. In case of a
201  // normal Unlock() the count will be reset there.
202  if (--it->second->m_WaitCount == 0)
203  it->second->m_State = SWaitData::ePSGS_LockedNobodyWaits;
204 
205  string message = "Timeout (" + to_string(timeout_sec) +
206  " seconds) waiting on event '" + event_name + "'";
207 
208  PSG_WARNING(message);
209  NCBI_THROW(CPubseqGatewayException, eTimeout, message);
210  }
211 
212  // Here:
213  // - waiting has completed within the timeout
214  // - there is no need to change the state because it is done in Unlock()
215  // by unlocking processor and the state here is ePSGS_Unlocked
216 }
217 
218 
219 // Provides the original request context
// NOTE(review): the signature line (original 220) is missing from this
// listing; the body simply returns a copy of the m_RequestContext reference.
221 {
222  return m_RequestContext;
223 }
224 
225 
226 // Sets the cloned request context so that many threads can produce messages
// NOTE(review): the signature (original line 227) and the body of the `if`
// (original lines 230-231) are missing from this listing. Judging by the
// referenced Clone(), SetReadOnly() and CDiagContext::SetRequestContext(), they
// presumably clone m_RequestContext and install the clone for the current
// thread — confirm. Nothing visible happens when m_RequestContext is null.
228 {
229  if (m_RequestContext.NotNull()) {
232  }
233 }
234 
235 
// Returns the start timestamp of the wrapped user request. Without a request
// it presumably throws "User request is not initialized". The signature
// (original line 236) and the head of that statement (original line 241) are
// missing from this listing.
237 {
238  if (m_Request)
239  return m_Request->GetStartTimestamp();
240 
242  "User request is not initialized");
243 }
244 
245 
// Tells whether the wrapped user request asked for tracing
// (SPSGS_RequestBase::ePSGS_WithTracing). Without a request it presumably
// throws "User request is not initialized". The signature (original line 246)
// and the head of that statement (original line 251) are missing from this
// listing.
247 {
248  if (m_Request)
249  return m_Request->GetTrace() == SPSGS_RequestBase::ePSGS_WithTracing;
250 
252  "User request is not initialized");
253 }
254 
255 
// Returns the processor-events flag of the wrapped user request. Without a
// request it presumably throws "User request is not initialized". The
// signature (original line 256) and the head of that statement (original
// line 261) are missing from this listing.
257 {
258  if (m_Request)
259  return m_Request->GetProcessorEvents();
260 
262  "User request is not initialized");
263 }
264 
265 
// Returns the hop count of the wrapped user request for the request kinds that
// carry one (resolve, blob, TSE chunk, accession version history), and 0 for
// any other type. Without a request it presumably throws "User request is not
// initialized". The signature (original line 266) and the head of that
// statement (original line 288) are missing from this listing.
267 {
268  if (m_Request) {
269  switch (m_Request->GetRequestType()) {
// NOTE(review): all case labels (original lines 270, 272-274, 276, 278, 280)
// are missing from this listing.
271  return GetRequest<SPSGS_ResolveRequest>().m_Hops;
275  return GetRequest<SPSGS_BlobRequestBase>().m_Hops;
277  return GetRequest<SPSGS_TSEChunkRequest>().m_Hops;
279  return GetRequest<SPSGS_AccessionVersionHistoryRequest>().m_Hops;
281  break;
282  default:
283  break;
284  }
285 
286  return 0;
287  }
289  "User request is not initialized");
290 }
291 
292 
293 string CPSGS_Request::GetName(void) const
294 {
295  if (m_Request)
296  return m_Request->GetName();
297  return "unknown (request is not initialized)";
298 }
299 
300 
// Serializes the request for introspection. The result is the wrapped user
// request's own JSON (or an object holding only "request name" when there is
// none), plus the request id, backlog time and concurrent processor count.
// The signature (original line 301) is missing from this listing.
302 {
303  CJsonNode json;
304 
305  if (m_Request) {
306  json = m_Request->Serialize();
307  } else {
308  json = CJsonNode::NewObjectNode();
309  json.SetString("request name", GetName());
310  }
311 
312  json.SetInteger("request id", m_RequestId);
313  json.SetInteger("backlog time mks", m_BacklogTimeMks);
314  json.SetInteger("concurrent processor count", m_ConcurrentProcessorCount);
315  return json;
316 }
317 
318 
// Formats m_LimitedProcessors as
// "processor: <name>, concurrency limit: <n>; processor: ...".
// Returns an empty string when the list is empty. The signature (original
// line 319) is missing from this listing.
320 {
321  string msg;
322 
323  for (auto & item : m_LimitedProcessors) {
324  if (!msg.empty())
325  msg += "; ";
326  msg += "processor: " + item.first +
327  ", concurrency limit: " + to_string(item.second);
328  }
329  return msg;
330 }
331 
332 
// Adds the common request parameters to json: trace mode, the processor-events
// flag, microseconds elapsed since m_StartTimestamp, and the enabled/disabled
// processor name lists. The signature (original line 333) is missing from this
// listing; presumably this is SPSGS_RequestBase::AppendCommonParameters.
334 {
335  json.SetString("trace", TraceToString(m_Trace));
336  json.SetBoolean("processor events", m_ProcessorEvents);
337 
338  auto now = psg_clock_t::now();
339  uint64_t mks = chrono::duration_cast<chrono::microseconds>
340  (now - m_StartTimestamp).count();
341  json.SetInteger("created ago mks", mks);
342 
343  CJsonNode enabled_procs(CJsonNode::NewArrayNode());
344  for (const auto & name : m_EnabledProcessors) {
345  enabled_procs.AppendString(name);
346  }
347  json.SetByKey("enabled processors", enabled_procs);
348 
349  CJsonNode disabled_procs(CJsonNode::NewArrayNode());
350  for (const auto & name : m_DisabledProcessors) {
351  disabled_procs.AppendString(name);
352  }
353  json.SetByKey("disabled processors", disabled_procs);
354 }
355 
356 
// Serializes a resolve-style request: seq id, its type, include-data flags,
// output format, cache usage and hops.
// NOTE(review): the signature (original line 357), the creation of `json`
// (original line 359) and lines 367 and 369 are missing from this listing.
// Presumably they are the accession-substitution option and an
// AppendCommonParameters(json) call — confirm.
358 {
360 
361  json.SetString("request name", GetName());
362  json.SetString("seq id", m_SeqId);
363  json.SetInteger("seq id type", m_SeqIdType);
364  json.SetInteger("include data flags", m_IncludeDataFlags);
365  json.SetString("output format", OutputFormatToString(m_OutputFormat));
366  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
368  json.SetInteger("hops", m_Hops);
370  return json;
371 }
372 
373 
// Adds the blob-request fields to json: TSE option, cache usage, client id,
// the "send blob if small" threshold and hops. The signature (original line
// 374) is missing from this listing; presumably this is the blob request
// base's AppendCommonParameters.
375 {
376  json.SetString("tse option", TSEOptionToString(m_TSEOption));
377  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
378  json.SetString("client id", m_ClientId);
379  json.SetInteger("send blob if small", m_SendBlobIfSmall);
380  json.SetInteger("hops", m_Hops);
381 }
382 
383 
// Serializes a blob-by-seq-id request: seq id and its type, the list of
// excluded blob ids and the resend timeout.
// NOTE(review): the signature (original line 384), the creation of `json`
// (original line 386) and lines 398, 401-402 are missing from this listing.
385 {
387 
388  json.SetString("request name", GetName());
389  json.SetString("seq id", m_SeqId);
390  json.SetInteger("seq id type", m_SeqIdType);
391 
392  CJsonNode exclude_blobs(CJsonNode::NewArrayNode());
393  for (const auto & blob_id : m_ExcludeBlobs) {
394  exclude_blobs.AppendString(blob_id);
395  }
396  json.SetByKey("exclude blobs", exclude_blobs);
397 
399  json.SetInteger("resend timeout mks", m_ResendTimeoutMks);
400 
403  return json;
404 }
405 
406 
// Serializes a blob-by-sat/sat_key request: the blob id string and the
// last-modified value.
// NOTE(review): the signature (original line 407), the creation of `json`
// (original line 409) and lines 415-416 are missing from this listing.
408 {
410 
411  json.SetString("request name", GetName());
412  json.SetString("blob id", m_BlobId.GetId());
413  json.SetInteger("last modified", m_LastModified);
414 
417  return json;
418 }
419 
420 
// Serializes an annotation request: seq id and its type, the requested names,
// the resend timeout and the list of seq ids.
// NOTE(review): the signature (original line 421), the creation of `json`
// (original line 423), the declarations of the `names` and `seq_ids` array
// nodes (original lines 429 and 437) and lines 443-444 are missing from this
// listing.
422 {
424 
425  json.SetString("request name", GetName());
426  json.SetString("seq id", m_SeqId);
427  json.SetInteger("seq id type", m_SeqIdType);
428 
430  for (const auto & name : m_Names) {
431  names.AppendString(name);
432  }
433  json.SetByKey("names", names);
434 
435  json.SetInteger("resend timeout mks", m_ResendTimeoutMks);
436 
438  for (const auto & seq_id : m_SeqIds) {
439  seq_ids.AppendString(seq_id);
440  }
441  json.SetByKey("seq ids", seq_ids);
442 
445  return json;
446 }
447 
448 
// Serializes an accession-version-history request: seq id and its type, cache
// usage and hops.
// NOTE(review): the signature (original line 449), the creation of `json`
// (original line 451) and line 459 are missing from this listing.
450 {
452 
453  json.SetString("request name", GetName());
454  json.SetString("seq id", m_SeqId);
455  json.SetInteger("seq id type", m_SeqIdType);
456  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
457  json.SetInteger("hops", m_Hops);
458 
460  return json;
461 }
462 
463 
// Serializes an IPG request: protein and nucleotide ("<null>" when absent),
// ipg, cache usage and the seq-id-resolve flag.
// NOTE(review): the signature (original line 464), the creation of `json`
// (original line 466) and line 481 are missing from this listing.
465 {
467 
468  json.SetString("request name", GetName());
469  if (m_Protein.has_value())
470  json.SetString("protein", m_Protein.value());
471  else
472  json.SetString("protein", "<null>");
473  json.SetInteger("ipg", m_IPG);
474  if (m_Nucleotide.has_value())
475  json.SetString("nucleotide", m_Nucleotide.value());
476  else
477  json.SetString("nucleotide", "<null>");
478  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
479  json.SetBoolean("seq id resolve", m_SeqIdResolve);
480 
482  return json;
483 }
484 
485 
486 // If the name has already been processed then it returns the highest
487 // priority among the processors which did it.
488 // If the name is new to the list then returns kUnknownPriority
// In both cases (priority, name) is appended to m_Processed while m_Lock is
// held.
// NOTE(review): the first signature line(s) (original 489-490) and the
// initialization of `ret` (original line 493, presumably kUnknownPriority) are
// missing from this listing.
491  const string & name)
492 {
494  CSpinlockGuard guard(&m_Lock);
495 
496  for (auto & item : m_Processed) {
497  if (item.second == name) {
498  ret = max(ret, item.first);
499  }
500  }
501 
502  // Add to the list regardless if it was in the list or not
503  m_Processed.push_back(make_pair(priority, name));
504 
505  return ret;
506 }
507 
508 
// NOTE(review): the signature (original lines 509-510) and the body (original
// lines 513-514) are missing from this listing; only the m_Lock guard and the
// return of `ret` are visible.
511 {
512  CSpinlockGuard guard(&m_Lock);
515  return ret;
516 }
517 
518 
519 // The names could have been processed by other processors whose priority is
520 // higher than or equal to the given one. Those names are not returned.
// Returns a copy of m_Names with such names removed (one occurrence per
// matching m_Processed entry), computed under m_Lock. The signature line
// (original 522) is missing from this listing.
521 vector<string>
523 {
524  vector<string> ret = m_Names;
525 
526  CSpinlockGuard guard(&m_Lock);
527  for (const auto & item : m_Processed) {
528  if (item.first >= priority) {
529  auto it = find(ret.begin(), ret.end(), item.second);
530  if (it != ret.end()) {
531  ret.erase(it);
532  }
533  }
534  }
535  return ret;
536 }
537 
538 
539 bool SPSGS_AnnotRequest::WasSent(const string & annot_name) const
540 {
541  for (const auto & item : m_Processed) {
542  if (item.second == annot_name)
543  return true;
544  }
545  return false;
546 }
547 
548 
// Returns a snapshot copy of m_Processed taken under m_Lock. The signature
// line (original 550) is missing from this listing.
549 vector<pair<TProcessorPriority, string>>
551 {
552  CSpinlockGuard guard(&m_Lock);
553  return m_Processed;
554 }
555 
556 
// Records the result status reported for annot_name, under m_Lock.
// ePSGS_RS_NotFound adds the name to m_NotFound. The error-like statuses are
// kept in m_ErrorAnnotations, retaining the numerically highest status seen
// for the name. Any other status is ignored (there is no default branch).
// NOTE(review): the second parameter line (original 559, presumably
// `EPSGS_ResultStatus rs)`) and one case label (original 569, presumably
// ePSGS_RS_Unavailable) are missing from this listing.
557 void
558 SPSGS_AnnotRequest::ReportResultStatus(const string & annot_name,
560 {
561  CSpinlockGuard guard(&m_Lock);
562 
563  switch (rs) {
564  case ePSGS_RS_NotFound:
565  m_NotFound.insert(annot_name);
566  break;
567  case ePSGS_RS_Error:
568  case ePSGS_RS_Timeout:
570  {
571  auto it = m_ErrorAnnotations.find(annot_name);
572  if (it == m_ErrorAnnotations.end()) {
573  m_ErrorAnnotations[annot_name] = int(rs);
574  } else {
575  if (it->second < int(rs)) {
576  it->second = int(rs);
577  }
578  }
579  }
580  break;
581  }
582 }
583 
584 
// Removes this processor's (priority, m_Names[0]) entry from m_Processed and
// records the annotation under the not-found or error bookkeeping instead,
// all under m_Lock. For errors, the numerically highest status per name is
// kept.
// NOTE(review): the signature (original lines 585-586) and two lines inside
// the switch are missing from this listing. They are original 606 (presumably
// inserting m_Names[0] into m_NotFound) and 610 (presumably
// `case ePSGS_RS_Unavailable:`).
587 {
// NOTE(review): BUG — this guard is inverted. It returns whenever any name was
// requested and proceeds only when m_Names is empty. In that case the first
// m_Names[0] evaluated below indexes an empty vector (undefined behavior). Per
// the comment, the intended check is `m_Names.size() != 1`.
588  if (m_Names.size() != 0) {
589  // This kind of error report may happen only in the case when exactly
590  // one annotation was requested
591  return;
592  }
593 
594  CSpinlockGuard guard(&m_Lock);
595 
596  // Mark specifically this processor annotation as the one with an error
597  // i.e. move it from 'ok' to the corresponding list
598  for (auto it = m_Processed.begin(); it != m_Processed.end(); ++it) {
599  if (it->first == priority && it->second == m_Names[0]) {
// The erase invalidates `it`; the `break` after the switch leaves the loop
// before `it` is used again.
600  m_Processed.erase(it);
601 
602  switch (rs) {
603  case ePSGS_RS_NotFound:
604  // Strictly speaking this must not happen.
605  // The blob error reporting is for errors.
607  break;
608  case ePSGS_RS_Error:
609  case ePSGS_RS_Timeout:
611  // The ePSGS_RS_Unavailable should not really happen
612  // because it is for the infrastructure to report the
613  // status of a named annotation while this method is for
614  // the processors.
615  {
// This `it` shadows the m_Processed loop iterator above.
616  auto it = m_ErrorAnnotations.find(m_Names[0]);
617  if (it == m_ErrorAnnotations.end()) {
618  m_ErrorAnnotations[m_Names[0]] = int(rs);
619  } else {
620  if (it->second < int(rs)) {
621  it->second = int(rs);
622  }
623  }
624  }
625  break;
626  }
627  break;
628  }
629  }
630 }
631 
632 
// Serializes a TSE chunk request: id2 chunk number, id2 info and cache usage.
// NOTE(review): the signature (original line 633), the creation of `json`
// (original line 635) and line 642 are missing from this listing.
634 {
636 
637  json.SetString("request name", GetName());
638  json.SetInteger("id2 chunk", m_Id2Chunk);
639  json.SetString("id2 info", m_Id2Info);
640  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
641 
643  return json;
644 }
645 
HTTP request.
JSON node abstraction.
static CJsonNode NewArrayNode()
Create a new JSON array node.
void SetString(const string &key, const string &value)
Set a JSON object element to the specified string value.
void AppendString(const string &value)
For an array node, add a string node at the end of the array.
void SetBoolean(const string &key, bool value)
Set a JSON object element to the specified boolean value.
void SetInteger(const string &key, Int8 value)
Set a JSON object element to the specified integer value.
void SetByKey(const string &key, CJsonNode::TInstance value)
For a JSON object node, insert a new element or update an existing element.
static CJsonNode NewObjectNode()
Create a new JSON object node.
void WaitFor(const string &event_name, size_t timeout_sec=10)
void Lock(const string &event_name)
int GetHops(void)
map< string, SWaitData * > m_Wait
void SetRequestContext(void)
virtual CJsonNode Serialize(void) const
CRef< CRequestContext > GetRequestContext(void)
bool NeedTrace(void)
EPSGS_Type GetRequestType(void) const
string GetLimitedProcessorsMessage(void)
CRef< CRequestContext > m_RequestContext
unique_ptr< SPSGS_RequestBase > m_Request
size_t m_ConcurrentProcessorCount
void Unlock(const string &event_name)
bool NeedProcessorEvents(void)
vector< pair< string, size_t > > m_LimitedProcessors
uint64_t m_BacklogTimeMks
@ ePSGS_BlobBySatSatKeyRequest
@ ePSGS_AccessionVersionHistoryRequest
psg_time_point_t GetStartTimestamp(void) const
virtual string GetName(void) const
const_iterator end() const
Definition: map.hpp:152
const_iterator find(const key_type &key) const
Definition: map.hpp:153
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
The NCBI C++ standard methods for dealing with std::string.
static const struct name_t names[]
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
Definition: ncbidiag.cpp:1907
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
Definition: ncbidiag.cpp:1901
CRef< CRequestContext > Clone(void) const
Copy current request context to a new one.
void SetReadOnly(bool read_only)
Switch request context to read-only mode.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
bool NotNull(void) const THROWS_NONE
Check if pointer is not null – same effect as NotEmpty().
Definition: ncbiobj.hpp:744
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
char * buf
int len
T max(T x_, T y_)
size_t GetNextRequestId(void)
static atomic< bool > s_RequestIdLock(false)
static size_t s_NextRequestId
USING_NCBI_SCOPE
#define PSG_ERROR(message)
#define PSG_WARNING(message)
int TProcessorPriority
const int kUnknownPriority
psg_clock_t::time_point psg_time_point_t
long PSGToString(long signed_value, char *buf)
signed int int32_t
Definition: stdint.h:123
unsigned __int64 uint64_t
Definition: stdint.h:136
virtual string GetName(void) const
virtual CJsonNode Serialize(void) const
TProcessorPriority RegisterProcessedName(TProcessorPriority priority, const string &name)
bool WasSent(const string &annot_name) const
vector< string > m_Names
map< string, int > m_ErrorAnnotations
vector< string > GetNotProcessedName(TProcessorPriority priority)
vector< pair< TProcessorPriority, string > > m_Processed
vector< pair< TProcessorPriority, string > > GetProcessedNames(void) const
vector< string > m_SeqIds
TProcessorPriority RegisterBioseqInfo(TProcessorPriority priority)
atomic< bool > m_Lock
virtual string GetName(void) const
void ReportBlobError(TProcessorPriority priority, EPSGS_ResultStatus rs)
set< string > m_NotFound
unsigned long m_ResendTimeoutMks
void ReportResultStatus(const string &annot_name, EPSGS_ResultStatus rs)
TProcessorPriority m_ProcessedBioseqInfo
virtual CJsonNode Serialize(void) const
virtual CJsonNode Serialize(void) const
virtual string GetName(void) const
CBlobRecord::TTimestamp m_LastModified
virtual CJsonNode Serialize(void) const
virtual string GetName(void) const
vector< string > m_ExcludeBlobs
unsigned long m_ResendTimeoutMks
EPSGS_AccSubstitutioOption m_AccSubstOption
string GetId(void) const
EPSGS_CacheAndDbUse m_UseCache
static string TSEOptionToString(EPSGS_TSEOption option)
void AppendCommonParameters(CJsonNode &json) const
unsigned long m_SendBlobIfSmall
EPSGS_TSEOption m_TSEOption
optional< string > m_Protein
EPSGS_CacheAndDbUse m_UseCache
optional< string > m_Nucleotide
virtual CJsonNode Serialize(void) const
virtual string GetName(void) const
static string TraceToString(EPSGS_Trace trace)
static string CacheAndDbUseToString(EPSGS_CacheAndDbUse option)
EPSGS_Trace m_Trace
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
static string AccSubstitutioOptionToString(EPSGS_AccSubstitutioOption option)
psg_time_point_t m_StartTimestamp
void AppendCommonParameters(CJsonNode &json) const
static string OutputFormatToString(EPSGS_OutputFormat format)
EPSGS_CacheAndDbUse m_UseCache
virtual CJsonNode Serialize(void) const
virtual string GetName(void) const
TPSGS_BioseqIncludeData m_IncludeDataFlags
EPSGS_OutputFormat m_OutputFormat
EPSGS_AccSubstitutioOption m_AccSubstOption
virtual CJsonNode Serialize(void) const
EPSGS_CacheAndDbUse m_UseCache
virtual string GetName(void) const
Modified on Mon Dec 11 02:42:12 2023 by modify_doxy.py rev. 669887