NCBI C++ ToolKit
psgs_request.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: psgs_request.cpp 101645 2024-01-18 20:19:14Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description:
29  *
30  */
31 #include <ncbi_pch.hpp>
32 
33 #include <corelib/ncbistr.hpp>
34 
35 #include "psgs_request.hpp"
37 #include "pubseq_gateway_utils.hpp"
38 
40 
static std::atomic<bool> s_RequestIdLock(false);
static size_t s_NextRequestId = 0;


// Provides the next request id: 1 for the first call, then incremented by
// one on every call. The counter update is serialized with the
// s_RequestIdLock spin flag so that concurrent callers never get the same id.
size_t GetNextRequestId(void)
{
    // Spin until this thread is the one which switched the flag from false
    // to true (exchange() returns the previous value).
    while (s_RequestIdLock.exchange(true, std::memory_order_acquire)) {
        // busy wait; the critical section below is a single increment
    }

    size_t request_id = ++s_NextRequestId;

    s_RequestIdLock.store(false, std::memory_order_release);
    return request_id;
}
51 
52 
54 {
55  char buf[64];
56  long len;
57  len = PSGToString(sat, buf);
58  m_Id.append(buf, len)
59  .append(1, '.');
60  len = PSGToString(sat_key, buf);
61  m_Id.append(buf, len);
62 }
63 
64 
66  m_RequestId(0),
67  m_BacklogTimeMks(0),
68  m_ConcurrentProcessorCount(0)
69 {}
70 
71 
73 {
74  for (auto it: m_Wait) {
75  switch (it.second->m_State) {
77  delete it.second;
78  break;
80  // This is rather strange: a processor has not unlocked an
81  // event and at the same time a request is going to be
82  // destroyed.
83  PSG_ERROR("Reply is going to be deleted when a processor has "
84  "not unlocked an event");
85  delete it.second;
86  break;
88  // This is rather strange: a processor is still waiting on the
89  // condition variable and at the same time the request is going
90  // to be destroyed.
91  // Just in case, lets unlock the condition variable and then
92  // delete the associated data
93  PSG_ERROR("Reply is going to be deleted when a processor is "
94  "locked on a condition variable");
95  it.second->m_WaitObject.notify_all();
96  delete it.second;
97  break;
98  }
99  }
100 }
101 
102 
104  unique_ptr<SPSGS_RequestBase> req,
105  CRef<CRequestContext> request_context) :
106  m_HttpRequest(http_request),
107  m_Request(move(req)),
108  m_RequestContext(request_context),
109  m_RequestId(GetNextRequestId()),
110  m_BacklogTimeMks(0),
111  m_ConcurrentProcessorCount(0)
112 {}
113 
114 
116 {
117  if (m_Request)
118  return m_Request->GetRequestType();
119  return ePSGS_UnknownRequest;
120 }
121 
122 
123 void CPSGS_Request::Lock(const string & event_name)
124 {
126  return; // No parallel processors so there is no point to wait
127 
128  unique_lock<mutex> scope_lock(m_WaitLock);
129 
130  if (m_Wait.find(event_name) != m_Wait.end()) {
131  // Double locking; it is rather an error
133  "Multiple lock of the same event is not supported");
134  }
135 
136  m_Wait[event_name] = new SWaitData();
137  m_Wait[event_name]->m_State = SWaitData::ePSGS_LockedNobodyWaits;
138 }
139 
140 
141 void CPSGS_Request::Unlock(const string & event_name)
142 {
144  return; // No parallel processors so there is no point to wait
145 
146  unique_lock<mutex> scope_lock(m_WaitLock);
147 
148  auto it = m_Wait.find(event_name);
149  if (it == m_Wait.end()) {
150  // Unlocking something which was not locked
151  return;
152  }
153 
154  switch (it->second->m_State) {
156  it->second->m_State = SWaitData::ePSGS_Unlocked;
157  break;
159  // Double unlocking; it's OK
160  break;
162  it->second->m_WaitCount = 0;
163  it->second->m_State = SWaitData::ePSGS_Unlocked;
164  it->second->m_WaitObject.notify_all();
165  break;
166  }
167 }
168 
169 
170 void CPSGS_Request::WaitFor(const string & event_name, size_t timeout_sec)
171 {
172  if (m_ConcurrentProcessorCount < 2) {
173  // No parallel processors so there is no point to wait
174  return;
175  }
176 
177  unique_lock<mutex> scope_lock(m_WaitLock);
178 
179  auto it = m_Wait.find(event_name);
180  if (it == m_Wait.end()) {
181  // There was no Lock() call for that event
182  return;
183  }
184 
185  switch (it->second->m_State) {
188  // The logic to initiate waiting is below
189  break;
191  // It has already been unlocked
192  return;
193  }
194 
195  ++it->second->m_WaitCount;
196  it->second->m_State = SWaitData::ePSGS_LockedSomebodyWaits;
197  auto status = it->second->m_WaitObject.wait_for(scope_lock,
198  chrono::seconds(timeout_sec));
199  if (status == cv_status::timeout) {
200  // Note: decrementing the count only in case of a timeout. In case of a
201  // normal Unlock() the count will be reset there.
202  if (--it->second->m_WaitCount == 0)
203  it->second->m_State = SWaitData::ePSGS_LockedNobodyWaits;
204 
205  string message = "Timeout (" + to_string(timeout_sec) +
206  " seconds) waiting on event '" + event_name + "'";
207 
208  PSG_WARNING(message);
209  NCBI_THROW(CPubseqGatewayException, eTimeout, message);
210  }
211 
212  // Here:
213  // - waiting has completed within the timeout
214  // - there is no need to change the state because it is done in Unlock()
215  // by unlocking processor and the state here is ePSGS_Unlocked
216 }
217 
218 
219 // Provides the original request context
221 {
222  return m_RequestContext;
223 }
224 
225 
226 // Sets the cloned request context so that many threads can produce messages
228 {
229  if (m_RequestContext.NotNull()) {
232  }
233 }
234 
235 
237 {
238  if (m_Request)
239  return m_Request->GetStartTimestamp();
240 
242  "User request is not initialized");
243 }
244 
245 
247 {
248  if (m_Request)
249  return m_Request->GetTrace() == SPSGS_RequestBase::ePSGS_WithTracing;
250 
252  "User request is not initialized");
253 }
254 
255 
257 {
258  if (m_Request)
259  return m_Request->GetProcessorEvents();
260 
262  "User request is not initialized");
263 }
264 
265 
266 optional<bool> CPSGS_Request::GetIncludeHUP(void)
267 {
268  if (m_Request) {
269  switch (m_Request->GetRequestType()) {
273  return GetRequest<SPSGS_BlobRequestBase>().m_IncludeHUP;
275  return GetRequest<SPSGS_TSEChunkRequest>().m_IncludeHUP;
279  break;
280  default:
281  break;
282  }
283 
284  return optional<bool>();
285  }
287  "User request is not initialized");
288 }
289 
290 
292 {
293  if (m_Request) {
294  switch (m_Request->GetRequestType()) {
296  return GetRequest<SPSGS_ResolveRequest>().m_Hops;
300  return GetRequest<SPSGS_BlobRequestBase>().m_Hops;
302  return GetRequest<SPSGS_TSEChunkRequest>().m_Hops;
304  return GetRequest<SPSGS_AccessionVersionHistoryRequest>().m_Hops;
306  break;
307  default:
308  break;
309  }
310 
311  return 0;
312  }
314  "User request is not initialized");
315 }
316 
317 
318 string CPSGS_Request::GetName(void) const
319 {
320  if (m_Request)
321  return m_Request->GetName();
322  return "unknown (request is not initialized)";
323 }
324 
325 
327 {
328  CJsonNode json;
329 
330  if (m_Request) {
331  json = m_Request->Serialize();
332  } else {
333  json = CJsonNode::NewObjectNode();
334  json.SetString("request name", GetName());
335  }
336 
337  json.SetInteger("request id", m_RequestId);
338  json.SetInteger("backlog time mks", m_BacklogTimeMks);
339  json.SetInteger("concurrent processor count", m_ConcurrentProcessorCount);
340  return json;
341 }
342 
343 
345 {
346  string msg;
347 
348  for (auto & item : m_LimitedProcessors) {
349  if (!msg.empty())
350  msg += "; ";
351  msg += "processor: " + item.first +
352  ", concurrency limit: " + to_string(item.second);
353  }
354  return msg;
355 }
356 
357 
359 {
360  json.SetString("trace", TraceToString(m_Trace));
361  json.SetBoolean("processor events", m_ProcessorEvents);
362 
363  auto now = psg_clock_t::now();
364  uint64_t mks = chrono::duration_cast<chrono::microseconds>
365  (now - m_StartTimestamp).count();
366  json.SetInteger("created ago mks", mks);
367 
368  CJsonNode enabled_procs(CJsonNode::NewArrayNode());
369  for (const auto & name : m_EnabledProcessors) {
370  enabled_procs.AppendString(name);
371  }
372  json.SetByKey("enabled processors", enabled_procs);
373 
374  CJsonNode disabled_procs(CJsonNode::NewArrayNode());
375  for (const auto & name : m_DisabledProcessors) {
376  disabled_procs.AppendString(name);
377  }
378  json.SetByKey("disabled processors", disabled_procs);
379 }
380 
381 
383 {
385 
386  json.SetString("request name", GetName());
387  json.SetString("seq id", m_SeqId);
388  json.SetInteger("seq id type", m_SeqIdType);
389  json.SetInteger("include data flags", m_IncludeDataFlags);
390  json.SetString("output format", OutputFormatToString(m_OutputFormat));
391  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
393  json.SetInteger("hops", m_Hops);
395  return json;
396 }
397 
398 
400 {
401  json.SetString("tse option", TSEOptionToString(m_TSEOption));
402  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
403  json.SetString("client id", m_ClientId);
404  json.SetInteger("send blob if small", m_SendBlobIfSmall);
405  json.SetInteger("hops", m_Hops);
406 }
407 
408 
410 {
412 
413  json.SetString("request name", GetName());
414  json.SetString("seq id", m_SeqId);
415  json.SetInteger("seq id type", m_SeqIdType);
416 
417  CJsonNode exclude_blobs(CJsonNode::NewArrayNode());
418  for (const auto & blob_id : m_ExcludeBlobs) {
419  exclude_blobs.AppendString(blob_id);
420  }
421  json.SetByKey("exclude blobs", exclude_blobs);
422 
424  json.SetInteger("resend timeout mks", m_ResendTimeoutMks);
425 
428  return json;
429 }
430 
431 
433 {
435 
436  json.SetString("request name", GetName());
437  json.SetString("blob id", m_BlobId.GetId());
438  json.SetInteger("last modified", m_LastModified);
439 
442  return json;
443 }
444 
445 
447 {
449 
450  json.SetString("request name", GetName());
451  json.SetString("seq id", m_SeqId);
452  json.SetInteger("seq id type", m_SeqIdType);
453 
455  for (const auto & name : m_Names) {
456  names.AppendString(name);
457  }
458  json.SetByKey("names", names);
459 
460  json.SetInteger("resend timeout mks", m_ResendTimeoutMks);
461 
463  for (const auto & seq_id : m_SeqIds) {
464  seq_ids.AppendString(seq_id);
465  }
466  json.SetByKey("seq ids", seq_ids);
467 
470  return json;
471 }
472 
473 
475 {
477 
478  json.SetString("request name", GetName());
479  json.SetString("seq id", m_SeqId);
480  json.SetInteger("seq id type", m_SeqIdType);
481  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
482  json.SetInteger("hops", m_Hops);
483 
485  return json;
486 }
487 
488 
490 {
492 
493  json.SetString("request name", GetName());
494  if (m_Protein.has_value())
495  json.SetString("protein", m_Protein.value());
496  else
497  json.SetString("protein", "<null>");
498  json.SetInteger("ipg", m_IPG);
499  if (m_Nucleotide.has_value())
500  json.SetString("nucleotide", m_Nucleotide.value());
501  else
502  json.SetString("nucleotide", "<null>");
503  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
504  json.SetBoolean("seq id resolve", m_SeqIdResolve);
505 
507  return json;
508 }
509 
510 
511 // If the name has already been processed then it returns the highest
512 // priority among the processors which did it. If the name is new to the
513 // list then it returns kUnknownPriority. Either way the name is registered.
516  const string & name)
517 {
519  CSpinlockGuard guard(&m_Lock);
520 
521  for (auto & item : m_Processed) {
522  if (item.second == name) {
523  ret = max(ret, item.first);
524  }
525  }
526 
527  // Add to the list regardless if it was in the list or not
528  m_Processed.push_back(make_pair(priority, name));
529 
530  return ret;
531 }
532 
533 
536 {
537  CSpinlockGuard guard(&m_Lock);
540  return ret;
541 }
542 
543 
544 // The names could be processed by other processors whose priority is
545 // higher than or equal to the given one. Those names are excluded.
546 vector<string>
548 {
549  vector<string> ret = m_Names;
550 
551  CSpinlockGuard guard(&m_Lock);
552  for (const auto & item : m_Processed) {
553  if (item.first >= priority) {
554  auto it = find(ret.begin(), ret.end(), item.second);
555  if (it != ret.end()) {
556  ret.erase(it);
557  }
558  }
559  }
560  return ret;
561 }
562 
563 
564 bool SPSGS_AnnotRequest::WasSent(const string & annot_name) const
565 {
566  for (const auto & item : m_Processed) {
567  if (item.second == annot_name)
568  return true;
569  }
570  return false;
571 }
572 
573 
574 vector<pair<TProcessorPriority, string>>
576 {
577  CSpinlockGuard guard(&m_Lock);
578  return m_Processed;
579 }
580 
581 
582 void
583 SPSGS_AnnotRequest::ReportResultStatus(const string & annot_name,
585 {
586  CSpinlockGuard guard(&m_Lock);
587 
588  switch (rs) {
589  case ePSGS_RS_NotFound:
590  m_NotFound.insert(annot_name);
591  break;
592  case ePSGS_RS_Error:
593  case ePSGS_RS_Timeout:
595  {
596  auto it = m_ErrorAnnotations.find(annot_name);
597  if (it == m_ErrorAnnotations.end()) {
598  m_ErrorAnnotations[annot_name] = int(rs);
599  } else {
600  if (it->second < int(rs)) {
601  it->second = int(rs);
602  }
603  }
604  }
605  break;
606  }
607 }
608 
609 
612 {
613  if (m_Names.size() != 0) {
614  // This kind of error report may happened only in the case when exactly
615  // one annotation was requested
616  return;
617  }
618 
619  CSpinlockGuard guard(&m_Lock);
620 
621  // Mark specifically this processor annotation as the one with an error
622  // i.e. move it from 'ok' to the corresponding list
623  for (auto it = m_Processed.begin(); it != m_Processed.end(); ++it) {
624  if (it->first == priority && it->second == m_Names[0]) {
625  m_Processed.erase(it);
626 
627  switch (rs) {
628  case ePSGS_RS_NotFound:
629  // Strictly speaking this must not happened.
630  // The blob error reporting is for errors.
632  break;
633  case ePSGS_RS_Error:
634  case ePSGS_RS_Timeout:
636  // The ePSGS_RS_Unavailable should not really happened
637  // because it is for the infrastructure to report the
638  // status of a named annotation while this method is for
639  // the processors.
640  {
641  auto it = m_ErrorAnnotations.find(m_Names[0]);
642  if (it == m_ErrorAnnotations.end()) {
643  m_ErrorAnnotations[m_Names[0]] = int(rs);
644  } else {
645  if (it->second < int(rs)) {
646  it->second = int(rs);
647  }
648  }
649  }
650  break;
651  }
652  break;
653  }
654  }
655 }
656 
657 
659 {
661 
662  json.SetString("request name", GetName());
663  json.SetInteger("id2 chunk", m_Id2Chunk);
664  json.SetString("id2 info", m_Id2Info);
665  json.SetString("use cache", CacheAndDbUseToString(m_UseCache));
666 
668  return json;
669 }
670 
HTTP request.
JSON node abstraction.
static CJsonNode NewArrayNode()
Create a new JSON array node.
void SetString(const string &key, const string &value)
Set a JSON object element to the specified string value.
void AppendString(const string &value)
For an array node, add a string node at the end of the array.
void SetBoolean(const string &key, bool value)
Set a JSON object element to the specified boolean value.
void SetInteger(const string &key, Int8 value)
Set a JSON object element to the specified integer value.
void SetByKey(const string &key, CJsonNode::TInstance value)
For a JSON object node, insert a new element or update an existing element.
static CJsonNode NewObjectNode()
Create a new JSON object node.
void WaitFor(const string &event_name, size_t timeout_sec=10)
void Lock(const string &event_name)
int GetHops(void)
map< string, SWaitData * > m_Wait
void SetRequestContext(void)
optional< bool > GetIncludeHUP(void)
virtual CJsonNode Serialize(void) const
CRef< CRequestContext > GetRequestContext(void)
bool NeedTrace(void)
EPSGS_Type GetRequestType(void) const
string GetLimitedProcessorsMessage(void)
CRef< CRequestContext > m_RequestContext
unique_ptr< SPSGS_RequestBase > m_Request
size_t m_ConcurrentProcessorCount
void Unlock(const string &event_name)
bool NeedProcessorEvents(void)
vector< pair< string, size_t > > m_LimitedProcessors
uint64_t m_BacklogTimeMks
@ ePSGS_BlobBySatSatKeyRequest
@ ePSGS_AccessionVersionHistoryRequest
psg_time_point_t GetStartTimestamp(void) const
virtual string GetName(void) const
const_iterator end() const
Definition: map.hpp:152
const_iterator find(const key_type &key) const
Definition: map.hpp:153
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
The NCBI C++ standard methods for dealing with std::string.
static const struct name_t names[]
Uint8 uint64_t
Int4 int32_t
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
Definition: ncbidiag.cpp:1907
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
Definition: ncbidiag.cpp:1901
CRef< CRequestContext > Clone(void) const
Copy current request context to a new one.
void SetReadOnly(bool read_only)
Switch request context to read-only mode.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
bool NotNull(void) const THROWS_NONE
Check if pointer is not null – same effect as NotEmpty().
Definition: ncbiobj.hpp:744
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
char * buf
int len
T max(T x_, T y_)
size_t GetNextRequestId(void)
static atomic< bool > s_RequestIdLock(false)
static size_t s_NextRequestId
USING_NCBI_SCOPE
#define PSG_ERROR(message)
#define PSG_WARNING(message)
int TProcessorPriority
const int kUnknownPriority
psg_clock_t::time_point psg_time_point_t
long PSGToString(long signed_value, char *buf)
virtual string GetName(void) const
virtual CJsonNode Serialize(void) const
TProcessorPriority RegisterProcessedName(TProcessorPriority priority, const string &name)
bool WasSent(const string &annot_name) const
vector< string > m_Names
map< string, int > m_ErrorAnnotations
vector< string > GetNotProcessedName(TProcessorPriority priority)
vector< pair< TProcessorPriority, string > > m_Processed
vector< pair< TProcessorPriority, string > > GetProcessedNames(void) const
vector< string > m_SeqIds
TProcessorPriority RegisterBioseqInfo(TProcessorPriority priority)
atomic< bool > m_Lock
virtual string GetName(void) const
void ReportBlobError(TProcessorPriority priority, EPSGS_ResultStatus rs)
set< string > m_NotFound
unsigned long m_ResendTimeoutMks
void ReportResultStatus(const string &annot_name, EPSGS_ResultStatus rs)
TProcessorPriority m_ProcessedBioseqInfo
virtual CJsonNode Serialize(void) const
virtual CJsonNode Serialize(void) const
virtual string GetName(void) const
CBlobRecord::TTimestamp m_LastModified
virtual CJsonNode Serialize(void) const
virtual string GetName(void) const
vector< string > m_ExcludeBlobs
unsigned long m_ResendTimeoutMks
EPSGS_AccSubstitutioOption m_AccSubstOption
string GetId(void) const
EPSGS_CacheAndDbUse m_UseCache
static string TSEOptionToString(EPSGS_TSEOption option)
void AppendCommonParameters(CJsonNode &json) const
unsigned long m_SendBlobIfSmall
EPSGS_TSEOption m_TSEOption
optional< string > m_Protein
EPSGS_CacheAndDbUse m_UseCache
optional< string > m_Nucleotide
virtual CJsonNode Serialize(void) const
virtual string GetName(void) const
static string TraceToString(EPSGS_Trace trace)
static string CacheAndDbUseToString(EPSGS_CacheAndDbUse option)
EPSGS_Trace m_Trace
vector< string > m_DisabledProcessors
vector< string > m_EnabledProcessors
static string AccSubstitutioOptionToString(EPSGS_AccSubstitutioOption option)
psg_time_point_t m_StartTimestamp
void AppendCommonParameters(CJsonNode &json) const
static string OutputFormatToString(EPSGS_OutputFormat format)
EPSGS_CacheAndDbUse m_UseCache
virtual CJsonNode Serialize(void) const
virtual string GetName(void) const
TPSGS_BioseqIncludeData m_IncludeDataFlags
EPSGS_OutputFormat m_OutputFormat
EPSGS_AccSubstitutioOption m_AccSubstOption
virtual CJsonNode Serialize(void) const
EPSGS_CacheAndDbUse m_UseCache
virtual string GetName(void) const
Modified on Fri Jun 14 16:54:59 2024 by modify_doxy.py rev. 669887