NCBI C++ ToolKit
misc.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__CLIENT__IMPL__MISC__HPP
2 #define OBJTOOLS__PUBSEQ_GATEWAY__CLIENT__IMPL__MISC__HPP
3 
4 /* $Id: misc.hpp 101197 2023-11-15 14:44:11Z sadyrovr $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Rafael Sadyrov
30  *
31  */
32 
33 #include <atomic>
34 #include <chrono>
35 #include <condition_variable>
36 #include <memory>
37 #include <mutex>
38 #include <deque>
39 #include <thread>
40 
41 #include <corelib/ncbitime.hpp>
42 #include <corelib/ncbi_param.hpp>
43 
45 
47 
48 template <>
49 struct SThreadSafe<void>
50 {
51 protected:
52  mutex m_Mutex;
53 };
54 
55 template <typename TType = void>
56 struct SPSG_CV : SThreadSafe<TType>
57 {
58 public:
59  void NotifyOne() volatile { GetThis().x_NotifyOne(); }
60  void NotifyAll() volatile { GetThis().x_NotifyAll(); }
61 
62  template <class... TArgs>
63  bool WaitUntil(TArgs&&... args) volatile
64  {
65  return GetThis().x_WaitUntil(forward<TArgs>(args)...);
66  }
67 
68  bool Reset() volatile { return GetThis().x_Reset(); }
69 
70 private:
71  using clock = chrono::system_clock;
72 
73  void x_NotifyOne() { x_Signal(); m_CV.notify_one(); }
74  void x_NotifyAll() { x_Signal(); m_CV.notify_all(); }
75 
76  bool x_WaitUntil(const CDeadline& deadline)
77  {
78  return deadline.IsInfinite() ? x_Wait() : x_Wait(x_GetTP(deadline));
79  }
80 
81  template <typename T = bool>
82  bool x_WaitUntil(const volatile atomic<T>& a, const CDeadline& deadline, T v = false, bool rv = false)
83  {
84  constexpr auto kWait = chrono::milliseconds(100);
85  const auto until = deadline.IsInfinite() ? clock::time_point::max() : x_GetTP(deadline);
86 
87  do {
88  const auto max = clock::now() + kWait;
89 
90  if (until < max) {
91  return x_Wait(until);
92  }
93 
94  if (x_Wait(max)) {
95  return true;
96  }
97  }
98  while (a == v);
99 
100  return rv;
101  }
102 
104  {
105  time_t seconds;
106  unsigned int nanoseconds;
107 
108  d.GetExpirationTime(&seconds, &nanoseconds);
109  const auto ns = chrono::duration_cast<clock::duration>(chrono::nanoseconds(nanoseconds));
110  return clock::from_time_t(seconds) + ns;
111  }
112 
113  template <class... TArgs>
114  bool x_Wait(TArgs&&... args)
115  {
116  unique_lock<mutex> lock(SThreadSafe<TType>::m_Mutex);
117 
118  if (!x_CvWait(lock, forward<TArgs>(args)...)) return false;
119 
120  m_Signal--;
121  return true;
122  }
123 
124  bool x_CvWait(unique_lock<mutex>& l, const clock::time_point& t)
125  {
126  return m_CV.wait_until(l, t, [&](){ return m_Signal > 0; });
127  }
128 
129  bool x_CvWait(unique_lock<mutex>& l)
130  {
131  m_CV.wait(l, [&](){ return m_Signal > 0; });
132  return true;
133  }
134 
135  void x_Signal()
136  {
137  lock_guard<mutex> lock(SThreadSafe<TType>::m_Mutex);
138  m_Signal++;
139  }
140 
141  bool x_Reset()
142  {
143  lock_guard<mutex> lock(SThreadSafe<TType>::m_Mutex);
144  return exchange(m_Signal, 0);
145  }
146 
147  SPSG_CV& GetThis() volatile { return const_cast<SPSG_CV&>(*this); }
148 
149  condition_variable m_CV;
150  int m_Signal = 0;
151 };
152 
153 template <class TValue>
154 struct CPSG_WaitingQueue : SPSG_CV<deque<TValue>>
155 {
157 
158  void Push(TValue value)
159  {
160  if (m_Stopped) return;
161 
162  this->GetLock()->push_back(move(value));
163  this->NotifyOne();
164  }
165 
166  bool Pop(TValue& value, const CDeadline& deadline = CDeadline::eInfinite)
167  {
168  do {
169  if (auto locked = this->GetLock()) {
170  if (!locked->empty()) {
171  value = move(locked->front());
172  locked->pop_front();
173  return true;
174  }
175  }
176  }
177  while (this->WaitUntil(m_Stopped, deadline));
178 
179  return false;
180  }
181 
182  enum EStop { eDrain, eClear };
183  void Stop(EStop stop)
184  {
185  m_Stopped.store(true);
186  if (stop == eClear) this->GetLock()->clear();
187  this->NotifyAll();
188  }
189 
190  const atomic_bool& Stopped() const { return m_Stopped; }
191  bool Empty() const { return m_Stopped && this->GetLock()->empty(); }
192 
193 private:
194  atomic_bool m_Stopped;
195 };
196 
197 template <class TParam>
199 {
200  using TValue = typename TParam::TValueType;
201 
202  // Getting default incurs some performance penalty, so this ctor is explicit
205  explicit SPSG_ParamValue(function<TValue(TValue)> adjust) : SPSG_ParamValue( adjust(TParam::GetDefault())) {}
206 
207  operator TValue() const { return m_Value; }
208  TValue Get() const { return m_Value; }
209 
210  static TValue GetDefault() { return TParam::GetDefault(); }
211 
212  template <typename T>
213  static void SetDefault(const T& value)
214  {
215  // Forbid setting after it's already used
216  _ASSERT(!sm_Used);
217 
218  TParam::SetDefault(static_cast<TValue>(value));
219  }
220 
221  static void SetDefault(const string& value)
222  {
223  SetDefaultImpl(TParam(), value);
224  }
225 
226  // Overriding default but only if it's not configured explicitly
227  template <typename T>
228  static void SetImplicitDefault(const T& value)
229  {
230  bool sourcing_complete;
231  typename TParam::EParamSource param_source;
232  TParam::GetDefault();
233  TParam::GetState(&sourcing_complete, &param_source);
234 
235  if (sourcing_complete && (param_source == TParam::eSource_Default)) {
236  SetDefault(value);
237  }
238  }
239 
240 private:
242 
243  // TDescription is not publicly available in CParam, but it's needed for string to enum conversion.
244  // This templated method circumvents that shortcoming.
245  template <class TDescription>
246  static void SetDefaultImpl(const CParam<TDescription>&, const string& value)
247  {
248  SetDefault(CParam<TDescription>::TParamParser::StringToValue(value, TDescription::sm_ParamDescription));
249  }
250 
251  static TValue sm_Adjust(TValue value) { return value; }
252 
254  _DEBUG_ARG(static bool sm_Used);
255 };
256 
257 _DEBUG_ARG(template <class TParam> bool SPSG_ParamValue<TParam>::sm_Used = false);
258 
259 #define PSG_PARAM_VALUE_TYPE(section, name) SPSG_ParamValue<NCBI_PARAM_TYPE(section, name)>
260 
261 #define PSG_PARAM_VALUE_DEF_ADJUST(type, section, name, default_value) \
262  NCBI_PARAM_DEF(type, section, name, default_value); \
263  template <> \
264  typename SPSG_ParamValue<NCBI_PARAM_TYPE(section, name)>::TValue \
265  SPSG_ParamValue<NCBI_PARAM_TYPE(section, name)>::sm_Adjust(SPSG_ParamValue<NCBI_PARAM_TYPE(section, name)>::TValue value)
266 
267 #define PSG_PARAM_VALUE_DEF_MIN(type, section, name, default_value, min_value) \
268  PSG_PARAM_VALUE_DEF_ADJUST(type, section, name, default_value) \
269  { \
270  if (value >= min_value) return value; \
271  \
272  ERR_POST(Warning << "[" #section "] " #name " ('" << value << "')" \
273  " was increased to the minimum allowed value ('" #min_value "')"); \
274  return min_value; \
275  }
276 
277 NCBI_PARAM_DECL(unsigned, PSG, rd_buf_size);
278 typedef NCBI_PARAM_TYPE(PSG, rd_buf_size) TPSG_RdBufSize;
279 
280 NCBI_PARAM_DECL(size_t, PSG, wr_buf_size);
281 typedef NCBI_PARAM_TYPE(PSG, wr_buf_size) TPSG_WrBufSize;
282 
283 NCBI_PARAM_DECL(unsigned, PSG, max_concurrent_streams);
284 typedef NCBI_PARAM_TYPE(PSG, max_concurrent_streams) TPSG_MaxConcurrentStreams;
285 
286 NCBI_PARAM_DECL(unsigned, PSG, max_concurrent_submits);
287 using TPSG_MaxConcurrentSubmits = PSG_PARAM_VALUE_TYPE(PSG, max_concurrent_submits);
288 
289 NCBI_PARAM_DECL(unsigned, PSG, max_sessions);
290 typedef NCBI_PARAM_TYPE(PSG, max_sessions) TPSG_MaxSessions;
291 
292 NCBI_PARAM_DECL(unsigned, PSG, max_concurrent_requests_per_server);
293 using TPSG_MaxConcurrentRequestsPerServer = PSG_PARAM_VALUE_TYPE(PSG, max_concurrent_requests_per_server);
294 
295 NCBI_PARAM_DECL(unsigned, PSG, num_io);
296 typedef NCBI_PARAM_TYPE(PSG, num_io) TPSG_NumIo;
297 
298 NCBI_PARAM_DECL(unsigned, PSG, reader_timeout);
299 typedef NCBI_PARAM_TYPE(PSG, reader_timeout) TPSG_ReaderTimeout;
300 
301 NCBI_PARAM_DECL(double, PSG, rebalance_time);
302 typedef NCBI_PARAM_TYPE(PSG, rebalance_time) TPSG_RebalanceTime;
303 
304 NCBI_PARAM_DECL(double, PSG, io_timer_period);
305 using TPSG_IoTimerPeriod = PSG_PARAM_VALUE_TYPE(PSG, io_timer_period);
306 
307 NCBI_PARAM_DECL(double, PSG, request_timeout);
308 typedef NCBI_PARAM_TYPE(PSG, request_timeout) TPSG_RequestTimeout;
309 
310 NCBI_PARAM_DECL(double, PSG, competitive_after);
311 typedef NCBI_PARAM_TYPE(PSG, competitive_after) TPSG_CompetitiveAfter;
312 
313 NCBI_PARAM_DECL(size_t, PSG, requests_per_io);
314 using TPSG_RequestsPerIo = PSG_PARAM_VALUE_TYPE(PSG, requests_per_io);
315 
316 NCBI_PARAM_DECL(unsigned, PSG, request_retries);
317 using TPSG_RequestRetries = PSG_PARAM_VALUE_TYPE(PSG, request_retries);
318 
319 NCBI_PARAM_DECL(unsigned, PSG, refused_stream_retries);
320 using TPSG_RefusedStreamRetries = PSG_PARAM_VALUE_TYPE(PSG, refused_stream_retries);
321 
322 NCBI_PARAM_DECL(string, PSG, request_user_args);
323 typedef NCBI_PARAM_TYPE(PSG, request_user_args) TPSG_RequestUserArgs;
324 
325 NCBI_PARAM_DECL(bool, PSG, user_request_ids);
326 using TPSG_UserRequestIds = PSG_PARAM_VALUE_TYPE(PSG, user_request_ids);
327 
328 NCBI_PARAM_DECL(unsigned, PSG, localhost_preference);
329 typedef NCBI_PARAM_TYPE(PSG, localhost_preference) TPSG_LocalhostPreference;
330 
331 NCBI_PARAM_DECL(bool, PSG, fail_on_unknown_items);
332 typedef NCBI_PARAM_TYPE(PSG, fail_on_unknown_items) TPSG_FailOnUnknownItems;
333 
334 NCBI_PARAM_DECL(bool, PSG, fail_on_unknown_chunks);
335 typedef NCBI_PARAM_TYPE(PSG, fail_on_unknown_chunks) TPSG_FailOnUnknownChunks;
336 
337 NCBI_PARAM_DECL(bool, PSG, https);
338 typedef NCBI_PARAM_TYPE(PSG, https) TPSG_Https;
339 
340 NCBI_PARAM_DECL(double, PSG, no_servers_retry_delay);
341 typedef NCBI_PARAM_TYPE(PSG, no_servers_retry_delay) TPSG_NoServersRetryDelay;
342 
343 NCBI_PARAM_DECL(string, PSG, service);
344 using TPSG_Service = NCBI_PARAM_TYPE(PSG, service);
345 
346 NCBI_PARAM_DECL(string, PSG, auth_token_name);
347 using TPSG_AuthTokenName = PSG_PARAM_VALUE_TYPE(PSG, auth_token_name);
348 
349 NCBI_PARAM_DECL(string, PSG, auth_token);
350 using TPSG_AuthToken = PSG_PARAM_VALUE_TYPE(PSG, auth_token);
351 
352 NCBI_PARAM_DECL(bool, PSG, stats);
353 typedef NCBI_PARAM_TYPE(PSG, stats) TPSG_Stats;
354 
355 NCBI_PARAM_DECL(double, PSG, stats_period);
356 typedef NCBI_PARAM_TYPE(PSG, stats_period) TPSG_StatsPeriod;
357 
358 NCBI_PARAM_DECL(double, PSG, throttle_relaxation_period);
359 using TPSG_ThrottlePeriod = NCBI_PARAM_TYPE(PSG, throttle_relaxation_period);
360 
361 NCBI_PARAM_DECL(unsigned, PSG, throttle_by_consecutive_connection_failures);
362 using TPSG_ThrottleMaxFailures = PSG_PARAM_VALUE_TYPE(PSG, throttle_by_consecutive_connection_failures);
363 
364 NCBI_PARAM_DECL(bool, PSG, throttle_hold_until_active_in_lb);
365 using TPSG_ThrottleUntilDiscovery = PSG_PARAM_VALUE_TYPE(PSG, throttle_hold_until_active_in_lb);
366 
367 NCBI_PARAM_DECL(string, PSG, throttle_by_connection_error_rate);
368 using TPSG_ThrottleThreshold = NCBI_PARAM_TYPE(PSG, throttle_by_connection_error_rate);
369 
372 using TPSG_DebugPrintout = PSG_PARAM_VALUE_TYPE(PSG, debug_printout);
373 
374 enum class EPSG_UseCache { eDefault, eNo, eYes };
376 using TPSG_UseCache = PSG_PARAM_VALUE_TYPE(PSG, use_cache);
377 
378 // Performance reporting/request IDs for psg_client app
380 NCBI_PARAM_ENUM_DECL(EPSG_PsgClientMode, PSG, internal_psg_client_mode);
381 using TPSG_PsgClientMode = PSG_PARAM_VALUE_TYPE(PSG, internal_psg_client_mode);
382 
384 
385 #endif
#define false
Definition: bool.h:36
CDeadline.
Definition: ncbitime.hpp:1830
CParam.
Definition: ncbi_param.hpp:447
char value[7]
Definition: config.c:431
#define T(s)
Definition: common.h:230
#define _DEBUG_ARG(arg)
Definition: ncbidbg.hpp:134
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
bool IsInfinite(void) const
Check if the deadline is infinite.
Definition: ncbitime.hpp:1853
void GetExpirationTime(time_t *sec, unsigned int *nanosec) const
Get the number of seconds and nanoseconds (since 1/1/1970).
Definition: ncbitime.cpp:3841
@ eInfinite
Infinite deadline.
Definition: ncbitime.hpp:1845
NCBI_PARAM_TYPE(PSG, throttle_by_connection_error_rate) TPSG_ThrottleThreshold
Definition: misc.hpp:368
EPSG_UseCache
Definition: misc.hpp:374
NCBI_PARAM_ENUM_DECL(EPSG_DebugPrintout, PSG, debug_printout)
typedef NCBI_PARAM_TYPE(PSG, rd_buf_size) TPSG_RdBufSize
NCBI_PARAM_DECL(unsigned, PSG, rd_buf_size)
NCBI_PARAM_TYPE(PSG, service) TPSG_Service
Definition: misc.hpp:344
EPSG_DebugPrintout
Definition: misc.hpp:370
#define PSG_PARAM_VALUE_TYPE(section, name)
Definition: misc.hpp:259
EPSG_PsgClientMode
Definition: misc.hpp:379
NCBI_PARAM_TYPE(PSG, throttle_relaxation_period) TPSG_ThrottlePeriod
Definition: misc.hpp:359
chrono::system_clock::time_point time_point
unsigned int a
Definition: ncbi_localip.c:102
EIPRangeType t
Definition: ncbi_localip.c:101
Defines: CTimeFormat - storage class for time format.
T max(T x_, T y_)
atomic_bool m_Stopped
Definition: misc.hpp:194
const atomic_bool & Stopped() const
Definition: misc.hpp:190
bool Pop(TValue &value, const CDeadline &deadline=CDeadline::eInfinite)
Definition: misc.hpp:166
bool Empty() const
Definition: misc.hpp:191
void Stop(EStop stop)
Definition: misc.hpp:183
void Push(TValue value)
Definition: misc.hpp:158
Definition: misc.hpp:57
bool x_CvWait(unique_lock< mutex > &l)
Definition: misc.hpp:129
void NotifyOne() volatile
Definition: misc.hpp:59
bool x_WaitUntil(const volatile atomic< T > &a, const CDeadline &deadline, T v=false, bool rv=false)
Definition: misc.hpp:82
SPSG_CV & GetThis() volatile
Definition: misc.hpp:147
chrono::system_clock clock
Definition: misc.hpp:71
bool x_Wait(TArgs &&... args)
Definition: misc.hpp:114
void x_Signal()
Definition: misc.hpp:135
bool x_Reset()
Definition: misc.hpp:141
bool Reset() volatile
Definition: misc.hpp:68
void x_NotifyOne()
Definition: misc.hpp:73
static clock::time_point x_GetTP(const CDeadline &d)
Definition: misc.hpp:103
void NotifyAll() volatile
Definition: misc.hpp:60
bool x_CvWait(unique_lock< mutex > &l, const clock::time_point &t)
Definition: misc.hpp:124
condition_variable m_CV
Definition: misc.hpp:149
bool WaitUntil(TArgs &&... args) volatile
Definition: misc.hpp:63
bool x_WaitUntil(const CDeadline &deadline)
Definition: misc.hpp:76
int m_Signal
Definition: misc.hpp:150
void x_NotifyAll()
Definition: misc.hpp:74
static void SetDefault(const T &value)
Definition: misc.hpp:213
TValue Get() const
Definition: misc.hpp:208
TValue m_Value
Definition: misc.hpp:253
SPSG_ParamValue(TValue value)
Definition: misc.hpp:241
SPSG_ParamValue(function< TValue(TValue)> adjust)
Definition: misc.hpp:205
SPSG_ParamValue(EGetDefault)
Definition: misc.hpp:204
static TValue sm_Adjust(TValue value)
Definition: misc.hpp:251
static TValue GetDefault()
Definition: misc.hpp:210
static void SetDefault(const string &value)
Definition: misc.hpp:221
static void SetDefaultImpl(const CParam< TDescription > &, const string &value)
Definition: misc.hpp:246
static void SetImplicitDefault(const T &value)
Definition: misc.hpp:228
static bool sm_Used
Definition: misc.hpp:254
SLock< deque< TValue > > GetLock()
#define _ASSERT
Modified on Wed Nov 29 02:19:53 2023 by modify_doxy.py rev. 669887