NCBI C++ ToolKit
time_series_stat.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: time_series_stat.cpp 102864 2024-08-01 13:06:21Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description:
29  * PSG server request time series statistics
30  *
31  */
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbistd.hpp>
34 #include <cmath>
35 
36 #include "time_series_stat.hpp"
37 
38 
// JSON keys under which a serialized series reports the time range it
// actually covered. The code visible in this file only reads them (they are
// passed as keys to SetInteger), so they are declared const to rule out
// accidental modification.
static const string kActualTimeRangeStart("TimeRangeStart");
static const string kActualTimeRangeEnd("TimeRangeEnd");
41 
42 
// NOTE(review): the line that opens this constructor (listing line 43) is
// absent from this copy, so the owning class is not visible here - confirm
// against the repository.
// Initial state: minute counter at 1, no wrap-around yet, current slot 0.
44  m_TotalMinutesCollected(1),
45  m_Loop(false),
46  m_CurrentIndex(0)
47 {}
48 
49 
51 CTimeSeriesBase::CheckToSkip(int most_ancient_time,
52  int most_recent_time,
53  ssize_t current_values_start_sec,
54  ssize_t current_values_end_sec) const
55 {
56  if (most_recent_time >= 0) {
57  // most_recent_time is defined
58  if (most_recent_time > current_values_end_sec) {
59  return EPSGS_SkipCheckResult::ePSGS_SkipBegin;
60  }
61  }
62 
63  if (most_ancient_time >= 0) {
64  // most_ancient_time is defined
65  if (most_ancient_time < current_values_start_sec) {
66  return EPSGS_SkipCheckResult::ePSGS_SkipEnd;
67  }
68  }
69 
70  return EPSGS_SkipCheckResult::ePSGS_DontSkip;
71 }
72 
73 
75 {
// NOTE(review): the signature line (listing line 74) and listing lines 94
// and 106 are absent from this copy; confirm them against the repository.
76  // Rotate should:
77  // - calculate average
78  // - store it in the current index cell
79  // - rotate the current index
80 
81  size_t current_index = m_CurrentIndex.load();
82 
// An interval without any samples is stored as 0.0 instead of dividing by 0
83  if (m_AccumulatedCount == 0) {
84  m_Values[current_index] = 0.0;
85  } else {
86  m_Values[current_index] = double(m_Accumulated) / double(m_AccumulatedCount);
87  }
// Keep the running total and the running maximum up to date
88  m_TotalValues += m_Values[current_index];
89  if (m_Values[current_index] > m_MaxValue) {
90  m_MaxValue = m_Values[current_index];
91  }
92 
93  m_Accumulated = 0;
95 
// Step to the next slot; after the last slot the index wraps to 0
96  size_t new_current_index = m_CurrentIndex.load();
97  if (new_current_index == kSeriesIntervals - 1) {
98  new_current_index = 0;
99  } else {
100  ++new_current_index;
101  }
102 
// The slot is cleared before it is published as the current one
103  m_Values[new_current_index] = 0.0;
104 
105  m_CurrentIndex.store(new_current_index);
// Coming back to slot 0 means the buffer has wrapped around
107  if (new_current_index == 0) {
108  m_Loop = true;
109  }
110 }
111 
112 
114 {
// NOTE(review): the signature line (listing line 113) is absent from this
// copy; confirm it against the repository.
// Zero every stored interval value and the running aggregates
115  for (size_t k = 0; k < kSeriesIntervals; ++k)
116  m_Values[k] = 0.0;
117  m_TotalValues = 0.0;
118  m_MaxValue = 0.0;
119 
120  m_Accumulated = 0;
121  m_AccumulatedCount = 0;
122 
// Back to the initial position: slot 0, minute counter 1, no wrap-around
123  m_CurrentIndex.store(0);
124  m_TotalMinutesCollected.store(1);
125  m_Loop = false;
126 }
127 
128 
// Returns a node in which the result of x_SerializeOneSeries() is stored
// under the "AverageValues" key. All the arguments are forwarded unchanged.
// NOTE(review): the declaration of 'ret' (listing line 135) is absent from
// this copy; confirm it against the repository.
129 CJsonNode
130 CMomentousCounterSeries::Serialize(const vector<pair<int, int>> & time_series,
131  int most_ancient_time,
132  int most_recent_time,
133  bool loop, size_t current_index) const
134 {
136 
137  ret.SetByKey("AverageValues",
138  x_SerializeOneSeries(time_series,
139  most_ancient_time, most_recent_time,
140  loop, current_index));
141  return ret;
142 }
143 
144 
// Serializes the stored per-interval averages (m_Values).
//
// The slots are visited starting from the one right before current_index
// down to slot 0 and then, if 'loop' is set, from the last slot down to
// current_index + 2. Each visited slot is given a 60 second window counted
// from the first visited slot; CheckToSkip() decides if the window is within
// [most_recent_time, most_ancient_time] (a negative boundary is not checked).
// The accepted values are grouped as described by 'time_series' and the
// average of each group is appended to the "time_series" array.
//
// time_series        pairs of <mins to accumulate>:<last sequential data index>
// most_ancient_time  passed to CheckToSkip()
// most_recent_time   passed to CheckToSkip()
// loop               whether to continue from the end of the array
// current_index      the slot which is being collected now; it is not visited
//
// Returns 'ret' which stays without the keys set below if nothing has been
// collected yet or no slot was accepted.
// NOTE(review): the declaration of 'ret' (listing line 152) is absent from
// this copy; confirm it against the repository.
145 CJsonNode
146 CMomentousCounterSeries::x_SerializeOneSeries(const vector<pair<int, int>> & time_series,
147  int most_ancient_time,
148  int most_recent_time,
149  bool loop,
150  size_t current_index) const
151 {
153 
154  if (current_index == 0 && loop == false) {
155  // There is no data collected yet
156  return ret;
157  }
158 
159  CJsonNode output_series(CJsonNode::NewArrayNode());
160 
161  // Index in the array where the data are collected
162  size_t raw_index;
163  if (current_index == 0) {
164  raw_index = kSeriesIntervals - 1;
165  loop = false; // to avoid going the second time over
166  } else {
167  raw_index = current_index - 1;
168  }
169 
170  size_t current_accumulated_mins = 0;
171  double current_accumulated_vals = 0;
172  size_t output_data_index = 0;
173  double total_processed_vals = 0.0;
174  double max_observed_val = 0.0;
175  size_t observed_minutes = 0;
176  double total_observed_vals = 0.0;
177 
178  // The current index in the 'time_series', i.e. a pair of
179  // <mins to accumulate>:<last sequential data index>
180  // It is guaranteed they are both > 0.
181  size_t range_index = 0;
182  size_t current_mins_to_accumulate = time_series[range_index].first;
183  size_t current_last_seq_index = time_series[range_index].second;
184 
185  ssize_t actual_range_start_sec = -1;
186  ssize_t actual_range_end_sec = -1;
187  ssize_t past_minute = -1;
188 
// First pass: from raw_index down to slot 0
189  for ( ;; ) {
190  total_processed_vals += m_Values[raw_index];
191 
192  ++past_minute;
193  ssize_t current_values_start_sec = past_minute * 60;
194  ssize_t current_values_end_sec = current_values_start_sec + 59;
195 
196  auto skip_check = CheckToSkip(most_ancient_time, most_recent_time,
197  current_values_start_sec,
198  current_values_end_sec);
199  if (skip_check != EPSGS_SkipCheckResult::ePSGS_DontSkip) {
200  // Regardless how it was skipped (at the beginning or at the end)
201  // it needs to continue due to a necessity to calculate
202  // total_processed_vals
203  if (raw_index == 0)
204  break;
205  --raw_index;
206  continue;
207  }
208 
209  // Update the actual covered time frame
210  if (actual_range_start_sec < 0) {
211  actual_range_start_sec = current_values_start_sec;
212  }
213  actual_range_end_sec = current_values_end_sec;
214 
215 
216  double val = m_Values[raw_index];
217  if (val > max_observed_val) {
218  max_observed_val = val;
219  }
220  ++observed_minutes;
221  total_observed_vals += val;
222 
223  ++current_accumulated_mins;
224  current_accumulated_vals += val;
225 
// A complete group is emitted as the average of its values
226  if (current_accumulated_mins >= current_mins_to_accumulate) {
227  output_series.AppendDouble(double(current_accumulated_vals) /
228  double(current_accumulated_mins));
229  current_accumulated_mins = 0;
230  current_accumulated_vals = 0.0;
231  }
232 
// NOTE(review): range_index is not checked against time_series.size();
// presumably the last item has a last index which is never exceeded - confirm
233  ++output_data_index;
234  if (output_data_index > current_last_seq_index) {
235  ++range_index;
236  current_mins_to_accumulate = time_series[range_index].first;
237  current_last_seq_index = time_series[range_index].second;
238  }
239 
240  if (raw_index == 0)
241  break;
242  --raw_index;
243  }
244 
// Second pass (only after a wrap-around): from the last slot down to
// current_index + 2
245  if (loop) {
246  raw_index = kSeriesIntervals - 1;
247  while (raw_index > current_index + 1) {
248  total_processed_vals += m_Values[raw_index];
249 
250  ++past_minute;
251  ssize_t current_values_start_sec = past_minute * 60;
252  ssize_t current_values_end_sec = current_values_start_sec + 59;
253  auto skip_check = CheckToSkip(most_ancient_time, most_recent_time,
254  current_values_start_sec,
255  current_values_end_sec);
256  if (skip_check != EPSGS_SkipCheckResult::ePSGS_DontSkip) {
257  // Regardless how it was skipped (at the beginning or at the end)
258  // it needs to continue due to a necessity to calculate
259  // total_processed_vals
260  --raw_index;
261  continue;
262  }
263 
264  // Update the actual covered time frame
265  if (actual_range_start_sec < 0) {
266  actual_range_start_sec = current_values_start_sec;
267  }
268  actual_range_end_sec = current_values_end_sec;
269 
270  double val = m_Values[raw_index];
271  if (val > max_observed_val) {
272  max_observed_val = val;
273  }
274  ++observed_minutes;
275  total_observed_vals += val;
276 
277  --raw_index;
278 
279  ++current_accumulated_mins;
280  current_accumulated_vals += val;
281 
282  if (current_accumulated_mins >= current_mins_to_accumulate) {
283  output_series.AppendDouble(double(current_accumulated_vals) /
284  double(current_accumulated_mins));
285  current_accumulated_mins = 0;
286  current_accumulated_vals = 0;
287  }
288 
289  ++output_data_index;
290  if (output_data_index > current_last_seq_index) {
291  ++range_index;
292  current_mins_to_accumulate = time_series[range_index].first;
293  current_last_seq_index = time_series[range_index].second;
294  }
295  }
296  }
297 
298  if (actual_range_start_sec == -1 && actual_range_end_sec == -1) {
299  // No data points were picked
300  return ret;
301  }
302 
303  ret.SetInteger(kActualTimeRangeStart, actual_range_start_sec);
304  ret.SetInteger(kActualTimeRangeEnd, actual_range_end_sec);
305 
// Emit the last group even if it is incomplete
306  if (current_accumulated_mins > 0) {
307  output_series.AppendDouble(double(current_accumulated_vals) /
308  double(current_accumulated_mins));
309  }
310 
// What is left of the all time total after the visited slots and the current
// slot are subtracted; reported only when the ancient boundary is -1
311  if (loop && most_ancient_time == -1) {
312  // The current minute is not participating in the calculation
313  int64_t rest_mins = m_TotalMinutesCollected.load() - kSeriesIntervals - 1;
314  double rest_vals = m_TotalValues - total_processed_vals - m_Values[current_index];
315 
316  if (rest_mins > 0) {
317  ret.SetDouble("RestAvgPerSec", rest_vals / (rest_mins * 60.0));
318  }
319  }
320 
321  ret.SetDouble("MaxAllTimePerSec", m_MaxValue / 60.0);
322  ret.SetDouble("AvgAllTimePerSec", m_TotalValues /
323  ((m_TotalMinutesCollected.load() - 1) * 60.0));
324 
325  ret.SetDouble("MaxObservedPerSec", max_observed_val / 60.0);
326  ret.SetDouble("AvgObservedPerSec", total_observed_vals / (observed_minutes * 60.0));
327 
328  ret.SetByKey("time_series", output_series);
329  return ret;
330 }
331 
332 
334 {
// NOTE(review): the signature line (listing line 333) and listing line 358
// are absent from this copy; confirm them against the repository.
335  // Rotate should:
336  // - store it in the current index cell
337  // - rotate the current index
338 
339  size_t current_index = m_CurrentIndex.load();
340 
// The sum and the count are stored separately; the average is calculated
// only at the time of serialization
341  m_IntervalSum[current_index] = m_Accumulated;
342  m_IntervalCOunt[current_index] = m_AccumulatedCount;
343 
344  m_Accumulated = 0;
345  m_AccumulatedCount = 0;
346 
// Step to the next slot; after the last slot the index wraps to 0
347  size_t new_current_index = m_CurrentIndex.load();
348  if (new_current_index == kSeriesIntervals - 1) {
349  new_current_index = 0;
350  } else {
351  ++new_current_index;
352  }
353 
// The slot is cleared before it is published as the current one
354  m_IntervalSum[new_current_index] = 0;
355  m_IntervalCOunt[new_current_index] = 0;
356 
357  m_CurrentIndex.store(new_current_index);
// Coming back to slot 0 means the buffer has wrapped around
359  if (new_current_index == 0) {
360  m_Loop = true;
361  }
362 }
363 
364 
366 {
// NOTE(review): the signature line (listing line 365) and listing lines 375
// and 376 are absent from this copy; confirm them against the repository.
// Zero the per-interval sums and counts
367  for (size_t k = 0; k < kSeriesIntervals; ++k) {
368  m_IntervalSum[k] = 0;
369  m_IntervalCOunt[k] = 0;
370  }
371 
372  m_Accumulated = 0;
373  m_AccumulatedCount = 0;
374 
377 
// Back to the initial position: slot 0, minute counter 1, no wrap-around
378  m_CurrentIndex.store(0);
379  m_TotalMinutesCollected.store(1);
380  m_Loop = false;
381 }
382 
383 
// Returns a node in which the result of x_SerializeOneSeries() is stored
// under the "AverageValues" key. All the arguments are forwarded unchanged.
// NOTE(review): the declaration of 'ret' (listing line 390) is absent from
// this copy; confirm it against the repository.
384 CJsonNode
385 CAvgPerformanceSeries::Serialize(const vector<pair<int, int>> & time_series,
386  int most_ancient_time,
387  int most_recent_time,
388  bool loop, size_t current_index) const
389 {
391 
392  ret.SetByKey("AverageValues",
393  x_SerializeOneSeries(time_series,
394  most_ancient_time, most_recent_time,
395  loop, current_index));
396  return ret;
397 }
398 
399 
// Serializes the stored per-interval sums and counts (m_IntervalSum and
// m_IntervalCOunt).
//
// The slots are visited starting from the one right before current_index
// down to slot 0 and then, if 'loop' is set, from the last slot down to
// current_index + 2. Each visited slot is given a 60 second window counted
// from the first visited slot; CheckToSkip() decides if the window is within
// [most_recent_time, most_ancient_time] (a negative boundary is not checked).
// The accepted sums and counts are grouped as described by 'time_series' and
// the rounded sum/count of each group (0 for an empty group) is appended to
// the "time_series" array.
//
// The AllTime[Max/Min]Avg values cover every visited slot, the
// Observed[Max/Min]Avg values cover only the accepted ones. A zero average
// never becomes a minimum: 0.0 is used as the 'not set yet' value.
//
// time_series        pairs of <mins to accumulate>:<last sequential data index>
// most_ancient_time  passed to CheckToSkip()
// most_recent_time   passed to CheckToSkip()
// loop               whether to continue from the end of the array
// current_index      the slot which is being collected now; it is not visited
//
// Returns 'ret' which stays without the keys set below if nothing has been
// collected yet or no slot was accepted.
// NOTE(review): the declaration of 'ret' (listing line 407) is absent from
// this copy; confirm it against the repository.
400 CJsonNode
401 CAvgPerformanceSeries::x_SerializeOneSeries(const vector<pair<int, int>> & time_series,
402  int most_ancient_time,
403  int most_recent_time,
404  bool loop,
405  size_t current_index) const
406 {
408 
409  if (current_index == 0 && loop == false) {
410  // There is no data collected yet
411  return ret;
412  }
413 
414  CJsonNode output_series(CJsonNode::NewArrayNode());
415 
416  // Index in the array where the data are collected
417  size_t raw_index;
418  if (current_index == 0) {
419  raw_index = kSeriesIntervals - 1;
420  loop = false; // to avoid going the second time over
421  } else {
422  raw_index = current_index - 1;
423  }
424 
425  size_t current_accumulated_mins = 0;
426 
427  uint64_t current_accumulated_sums = 0;
428  size_t current_accumulated_count = 0;
429 
430  double total_max_avg = 0.0;
431  double total_min_avg = 0.0;
432  double observed_max_avg = 0.0;
433  double observed_min_avg = 0.0;
434 
435  size_t output_data_index = 0;
436 
437  // The current index in the 'time_series', i.e. a pair of
438  // <mins to accumulate>:<last sequential data index>
439  // It is guaranteed they are both > 0.
440  size_t range_index = 0;
441  size_t current_mins_to_accumulate = time_series[range_index].first;
442  size_t current_last_seq_index = time_series[range_index].second;
443 
444  ssize_t actual_range_start_sec = -1;
445  ssize_t actual_range_end_sec = -1;
446  ssize_t past_minute = -1;
447 
// First pass: from raw_index down to slot 0
448  for ( ;; ) {
// A slot without samples has the average of 0.0
449  double current_avg = 0.0;
450  if (m_IntervalCOunt[raw_index] > 0) {
451  current_avg = (double)(m_IntervalSum[raw_index]) / (double)(m_IntervalCOunt[raw_index]);
452  }
453  if (current_avg > total_max_avg) {
454  total_max_avg = current_avg;
455  }
456  if (current_avg > 0.0) {
457  if (current_avg < total_min_avg || total_min_avg == 0.0) {
458  total_min_avg = current_avg;
459  }
460  }
461 
462  ++past_minute;
463  ssize_t current_values_start_sec = past_minute * 60;
464  ssize_t current_values_end_sec = current_values_start_sec + 59;
465 
466  auto skip_check = CheckToSkip(most_ancient_time, most_recent_time,
467  current_values_start_sec,
468  current_values_end_sec);
469  if (skip_check != EPSGS_SkipCheckResult::ePSGS_DontSkip) {
470  // Regardless how it was skipped (at the beginning or at the end)
471  // it needs to continue due to a necessity to calculate
472  // total_[min/max]_avg
473  if (raw_index == 0)
474  break;
475  --raw_index;
476  continue;
477  }
478 
479  // Update the actual covered time frame
480  if (actual_range_start_sec < 0) {
481  actual_range_start_sec = current_values_start_sec;
482  }
483  actual_range_end_sec = current_values_end_sec;
484 
485 
486  uint64_t val_sum = m_IntervalSum[raw_index];
487  size_t val_cnt = m_IntervalCOunt[raw_index];
488 
489  if (current_avg > observed_max_avg) {
490  observed_max_avg = current_avg;
491  }
492  if (current_avg > 0.0) {
493  if (current_avg < observed_min_avg || observed_min_avg == 0.0) {
494  observed_min_avg = current_avg;
495  }
496  }
497 
498  ++current_accumulated_mins;
499  current_accumulated_sums += val_sum;
500  current_accumulated_count += val_cnt;
501 
// A complete group is emitted as the rounded sum/count; 0 if no samples
502  if (current_accumulated_mins >= current_mins_to_accumulate) {
503  if (current_accumulated_count > 0) {
504  output_series.AppendInteger(
505  lround(double(current_accumulated_sums) /
506  double(current_accumulated_count)));
507  } else {
508  output_series.AppendInteger(0);
509  }
510  current_accumulated_sums = 0;
511  current_accumulated_count = 0;
512  current_accumulated_mins = 0;
513  }
514 
// NOTE(review): range_index is not checked against time_series.size();
// presumably the last item has a last index which is never exceeded - confirm
515  ++output_data_index;
516  if (output_data_index > current_last_seq_index) {
517  ++range_index;
518  current_mins_to_accumulate = time_series[range_index].first;
519  current_last_seq_index = time_series[range_index].second;
520  }
521 
522  if (raw_index == 0)
523  break;
524  --raw_index;
525  }
526 
// Second pass (only after a wrap-around): from the last slot down to
// current_index + 2
527  if (loop) {
528  raw_index = kSeriesIntervals - 1;
529  while (raw_index > current_index + 1) {
530  double current_avg = 0.0;
531  if (m_IntervalCOunt[raw_index] > 0) {
532  current_avg = (double)(m_IntervalSum[raw_index]) / (double)(m_IntervalCOunt[raw_index]);
533  }
534  if (current_avg > total_max_avg) {
535  total_max_avg = current_avg;
536  }
537  if (current_avg > 0.0) {
538  if (current_avg < total_min_avg || total_min_avg == 0.0) {
539  total_min_avg = current_avg;
540  }
541  }
542 
543  ++past_minute;
544  ssize_t current_values_start_sec = past_minute * 60;
545  ssize_t current_values_end_sec = current_values_start_sec + 59;
546  auto skip_check = CheckToSkip(most_ancient_time, most_recent_time,
547  current_values_start_sec,
548  current_values_end_sec);
549  if (skip_check != EPSGS_SkipCheckResult::ePSGS_DontSkip) {
550  // Regardless how it was skipped (at the beginning or at the end)
551  // it needs to continue due to a necessity to calculate
552  // total_[min/max]_avg
553  --raw_index;
554  continue;
555  }
556 
557  // Update the actual covered time frame
558  if (actual_range_start_sec < 0) {
559  actual_range_start_sec = current_values_start_sec;
560  }
561  actual_range_end_sec = current_values_end_sec;
562 
563  uint64_t val_sum = m_IntervalSum[raw_index];
564  size_t val_cnt = m_IntervalCOunt[raw_index];
565 
566  if (current_avg > observed_max_avg) {
567  observed_max_avg = current_avg;
568  }
569  if (current_avg > 0.0) {
570  if (current_avg < observed_min_avg || observed_min_avg == 0.0) {
571  observed_min_avg = current_avg;
572  }
573  }
574 
575  --raw_index;
576 
577  ++current_accumulated_mins;
578  current_accumulated_sums += val_sum;
579  current_accumulated_count += val_cnt;
580 
581  if (current_accumulated_mins >= current_mins_to_accumulate) {
582  if (current_accumulated_count > 0) {
583  output_series.AppendInteger(
584  lround(double(current_accumulated_sums) /
585  double(current_accumulated_count)));
586  } else {
587  output_series.AppendInteger(0);
588  }
589  current_accumulated_sums = 0;
590  current_accumulated_count = 0;
591  current_accumulated_mins = 0;
592  }
593 
594  ++output_data_index;
595  if (output_data_index > current_last_seq_index) {
596  ++range_index;
597  current_mins_to_accumulate = time_series[range_index].first;
598  current_last_seq_index = time_series[range_index].second;
599  }
600  }
601  }
602 
603  if (actual_range_start_sec == -1 && actual_range_end_sec == -1) {
604  // No data points were picked
605  return ret;
606  }
607 
608  ret.SetInteger(kActualTimeRangeStart, actual_range_start_sec);
609  ret.SetInteger(kActualTimeRangeEnd, actual_range_end_sec);
610 
// Emit the last group even if it is incomplete
611  if (current_accumulated_mins > 0) {
612  if (current_accumulated_count > 0) {
613  output_series.AppendInteger(
614  lround(double(current_accumulated_sums) /
615  double(current_accumulated_count)));
616  } else {
617  output_series.AppendInteger(0);
618  }
619  }
620 
621  ret.SetString("Description", "The time_series array contains "
622  "average mks required to complete an operation");
623  ret.SetInteger("AllTimeMaxAvg", lround(total_max_avg));
624  ret.SetInteger("AllTimeMinAvg", lround(total_min_avg));
625  ret.SetInteger("ObservedMaxAvg", lround(observed_max_avg));
626  ret.SetInteger("ObservedMinAvg", lround(observed_min_avg));
627  ret.SetInteger("AllTimeAbsoluteMax", m_AllTimeAbsoluteMaxMks);
628  ret.SetInteger("AllTimeAbsoluteMin", m_AllTimeAbsoluteMinMks);
629 
630  ret.SetByKey("time_series", output_series);
631  return ret;
632 }
633 
634 
636 {
// NOTE(review): the signature line (listing line 635) is absent from this
// copy; confirm it against the repository.
// Counts one more event in the current slot and in the all time total
637  size_t current_index = m_CurrentIndex.load();
638  ++m_Values[current_index];
639  ++m_TotalValues;
640 
// The maximum is tracked at the time of counting
641  if (m_Values[current_index] > m_MaxValue) {
642  m_MaxValue = m_Values[current_index];
643  }
644 }
645 
646 
648 {
// NOTE(review): the signature line (listing line 647) and listing line 659
// are absent from this copy; confirm them against the repository.
// Step to the next slot; after the last slot the index wraps to 0
649  size_t new_current_index = m_CurrentIndex.load();
650  if (new_current_index == kSeriesIntervals - 1) {
651  new_current_index = 0;
652  } else {
653  ++new_current_index;
654  }
655 
// The slot is cleared before it is published as the current one
656  m_Values[new_current_index] = 0;
657 
658  m_CurrentIndex.store(new_current_index);
// Coming back to slot 0 means the buffer has wrapped around
660  if (new_current_index == 0) {
661  m_Loop = true;
662  }
663 }
664 
665 
667 {
// NOTE(review): the signature line (listing line 666) is absent from this
// copy; confirm it against the repository.
// Zero all the slots and the running aggregates
668  memset(m_Values, 0, sizeof(m_Values));
669  m_TotalValues = 0;
670  m_MaxValue = 0;
671 
// Back to the initial position: slot 0, minute counter 1, no wrap-around
672  m_CurrentIndex.store(0);
673  m_TotalMinutesCollected.store(1);
674  m_Loop = false;
675 }
676 
677 
// Returns a node in which a serialized series is stored under the "Values"
// key.
// NOTE(review): the declaration of 'ret' (listing line 684) and the
// beginning of the call expression (listing line 687) are absent from this
// copy; confirm them against the repository.
678 CJsonNode
679 CMonotonicCounterSeries::Serialize(const vector<pair<int, int>> & time_series,
680  int most_ancient_time,
681  int most_recent_time,
682  bool loop, size_t current_index) const
683 {
685 
686  ret.SetByKey("Values",
688  time_series,
689  most_ancient_time, most_recent_time,
690  loop, current_index));
691  return ret;
692 }
693 
694 
// Serializes the per-interval counters provided as 'values'.
//
// The slots are visited starting from the one right before current_index
// down to slot 0 and then, if 'loop' is set, from the last slot down to
// current_index + 2. Each visited slot is given a 60 second window counted
// from the first visited slot; CheckToSkip() decides if the window is within
// [most_recent_time, most_ancient_time] (a negative boundary is not checked).
// The accepted counters are grouped as described by 'time_series' and the
// sum of each group is appended to the "time_series" array.
//
// grand_total        the all time total used for the rest/average values
// time_series        pairs of <mins to accumulate>:<last sequential data index>
// most_ancient_time  passed to CheckToSkip()
// most_recent_time   passed to CheckToSkip()
// loop               whether to continue from the end of the array
// current_index      the slot which is being collected now; it is not visited
//
// Returns 'ret' which stays without the keys set below if nothing has been
// collected yet or no slot was accepted.
// NOTE(review): the line with the function name and the 'values' parameter
// (listing line 696) and the declaration of 'ret' (listing line 704) are
// absent from this copy; confirm them against the repository.
695 CJsonNode
697  uint64_t grand_total,
698  const vector<pair<int, int>> & time_series,
699  int most_ancient_time,
700  int most_recent_time,
701  bool loop,
702  size_t current_index) const
703 {
705 
706  if (current_index == 0 && loop == false) {
707  // There is no data collected yet
708  return ret;
709  }
710 
711  CJsonNode output_series(CJsonNode::NewArrayNode());
712 
713  // Index in the array where the data are collected
714  size_t raw_index;
715  if (current_index == 0) {
716  raw_index = kSeriesIntervals - 1;
717  loop = false; // to avoid going the second time over
718  } else {
719  raw_index = current_index - 1;
720  }
721 
722  size_t current_accumulated_mins = 0;
723  uint64_t current_accumulated_vals = 0;
724  size_t output_data_index = 0;
725  uint64_t total_processed_vals = 0;
726  uint64_t max_observed_val = 0;
727  size_t observed_minutes = 0;
728  uint64_t total_observed_vals = 0;
729 
730  // The current index in the 'time_series', i.e. a pair of
731  // <mins to accumulate>:<last sequential data index>
732  // It is guaranteed they are both > 0.
733  size_t range_index = 0;
734  size_t current_mins_to_accumulate = time_series[range_index].first;
735  size_t current_last_seq_index = time_series[range_index].second;
736 
737  ssize_t actual_range_start_sec = -1;
738  ssize_t actual_range_end_sec = -1;
739  ssize_t past_minute = -1;
740 
// First pass: from raw_index down to slot 0
741  for ( ;; ) {
742  total_processed_vals += values[raw_index];
743 
744  ++past_minute;
745  ssize_t current_values_start_sec = past_minute * 60;
746  ssize_t current_values_end_sec = current_values_start_sec + 59;
747 
748  auto skip_check = CheckToSkip(most_ancient_time, most_recent_time,
749  current_values_start_sec,
750  current_values_end_sec);
751  if (skip_check != EPSGS_SkipCheckResult::ePSGS_DontSkip) {
752  // Regardless how it was skipped (at the beginning or at the end)
753  // it needs to continue due to a necessity to calculate
754  // total_processed_vals
755  if (raw_index == 0)
756  break;
757  --raw_index;
758  continue;
759  }
760 
761  // Update the actual covered time frame
762  if (actual_range_start_sec < 0) {
763  actual_range_start_sec = current_values_start_sec;
764  }
765  actual_range_end_sec = current_values_end_sec;
766 
767 
768  uint64_t vals = values[raw_index];
769  if (vals > max_observed_val) {
770  max_observed_val = vals;
771  }
772  ++observed_minutes;
773  total_observed_vals += vals;
774 
775  ++current_accumulated_mins;
776  current_accumulated_vals += vals;
777 
// A complete group is emitted as the sum of its counters
778  if (current_accumulated_mins >= current_mins_to_accumulate) {
779  output_series.AppendInteger(current_accumulated_vals);
780  current_accumulated_mins = 0;
781  current_accumulated_vals = 0;
782  }
783 
// NOTE(review): range_index is not checked against time_series.size();
// presumably the last item has a last index which is never exceeded - confirm
784  ++output_data_index;
785  if (output_data_index > current_last_seq_index) {
786  ++range_index;
787  current_mins_to_accumulate = time_series[range_index].first;
788  current_last_seq_index = time_series[range_index].second;
789  }
790 
791  if (raw_index == 0)
792  break;
793  --raw_index;
794  }
795 
// Second pass (only after a wrap-around): from the last slot down to
// current_index + 2
796  if (loop) {
797  raw_index = kSeriesIntervals - 1;
798  while (raw_index > current_index + 1) {
799  total_processed_vals += values[raw_index];
800 
801  ++past_minute;
802  ssize_t current_values_start_sec = past_minute * 60;
803  ssize_t current_values_end_sec = current_values_start_sec + 59;
804  auto skip_check = CheckToSkip(most_ancient_time, most_recent_time,
805  current_values_start_sec,
806  current_values_end_sec);
807  if (skip_check != EPSGS_SkipCheckResult::ePSGS_DontSkip) {
808  // Regardless how it was skipped (at the beginning or at the end)
809  // it needs to continue due to a necessity to calculate
810  // total_processed_vals
811  --raw_index;
812  continue;
813  }
814 
815  // Update the actual covered time frame
816  if (actual_range_start_sec < 0) {
817  actual_range_start_sec = current_values_start_sec;
818  }
819  actual_range_end_sec = current_values_end_sec;
820 
821  uint64_t vals = values[raw_index];
822  if (vals > max_observed_val) {
823  max_observed_val = vals;
824  }
825  ++observed_minutes;
826  total_observed_vals += vals;
827 
828  --raw_index;
829 
830  ++current_accumulated_mins;
831  current_accumulated_vals += vals;
832 
833  if (current_accumulated_mins >= current_mins_to_accumulate) {
834  output_series.AppendInteger(current_accumulated_vals);
835  current_accumulated_mins = 0;
836  current_accumulated_vals = 0;
837  }
838 
839  ++output_data_index;
840  if (output_data_index > current_last_seq_index) {
841  ++range_index;
842  current_mins_to_accumulate = time_series[range_index].first;
843  current_last_seq_index = time_series[range_index].second;
844  }
845  }
846  }
847 
848  if (actual_range_start_sec == -1 && actual_range_end_sec == -1) {
849  // No data points were picked
850  return ret;
851  }
852 
853  ret.SetInteger(kActualTimeRangeStart, actual_range_start_sec);
854  ret.SetInteger(kActualTimeRangeEnd, actual_range_end_sec);
855 
// Emit the last group even if it is incomplete
856  if (current_accumulated_mins > 0) {
857  output_series.AppendInteger(current_accumulated_vals);
858  }
859 
// What is left of grand_total after the visited slots and the current slot
// are subtracted; reported only when the ancient boundary is -1.
// NOTE(review): both subtractions are unsigned and would wrap around if the
// subtrahend were larger - confirm the callers guarantee it is not
860  if (loop && most_ancient_time == -1) {
861  // The current minute and the last minute in case of a loop are not
862  // sent to avoid unreliable data
863  uint64_t rest_mins = m_TotalMinutesCollected.load() - kSeriesIntervals - 1;
864  uint64_t rest_vals = grand_total - total_processed_vals - values[current_index];
865 
866  if (rest_mins > 0) {
867  ret.SetDouble("RestAvgPerSec", rest_vals / (rest_mins * 60.0));
868  }
869  }
870 
871  ret.SetDouble("MaxAllTimePerSec", m_MaxValue / 60.0);
872  ret.SetDouble("AvgAllTimePerSec", grand_total /
873  ((m_TotalMinutesCollected.load() - 1) * 60.0));
874 
875  ret.SetDouble("MaxObservedPerSec", max_observed_val / 60.0);
876  ret.SetDouble("AvgObservedPerSec", total_observed_vals / (observed_minutes * 60.0));
877 
878  ret.SetInteger("ValObserved", total_observed_vals);
879  ret.SetInteger("ValAllTime", grand_total - values[current_index]);
880 
881  ret.SetByKey("time_series", output_series);
882  return ret;
883 }
884 
885 
887 {
// NOTE(review): the line that opens this definition (listing line 886) is
// absent from this copy; confirm it against the repository.
// All the initialization is delegated to Reset()
888  Reset();
889 }
890 
891 
893 {
// NOTE(review): the signature line (listing line 892) is absent from this
// copy; confirm it against the repository.
// Counts one more request in the current slot and in the all time total
894  size_t current_index = m_CurrentIndex.load();
895  ++m_Requests[current_index];
896  ++m_TotalRequests;
897 
// The maximum is tracked at the time of counting
898  if (m_Requests[current_index] > m_MaxValue) {
899  m_MaxValue = m_Requests[current_index];
900  }
901 }
902 
903 
905 {
// NOTE(review): the signature line (listing line 904) and listing line 916
// are absent from this copy; confirm them against the repository.
// Step to the next slot; after the last slot the index wraps to 0
906  size_t new_current_index = m_CurrentIndex.load();
907  if (new_current_index == kSeriesIntervals - 1) {
908  new_current_index = 0;
909  } else {
910  ++new_current_index;
911  }
912 
// The slot is cleared before it is published as the current one
913  m_Requests[new_current_index] = 0;
914 
915  m_CurrentIndex.store(new_current_index);
// Coming back to slot 0 means the buffer has wrapped around
917  if (new_current_index == 0) {
918  m_Loop = true;
919  }
920 }
921 
922 
924 {
// NOTE(review): the signature line (listing line 923) is absent from this
// copy; confirm it against the repository.
// Zero all the slots and the running aggregates
925  memset(m_Requests, 0, sizeof(m_Requests));
926  m_TotalRequests = 0;
927  m_MaxValue = 0;
928 
// Back to the initial position: slot 0, minute counter 1, no wrap-around
929  m_CurrentIndex.store(0);
930  m_TotalMinutesCollected.store(1);
931  m_Loop = false;
932 }
933 
934 
// Returns a node in which a serialized series is stored under the
// "Requests" key.
// NOTE(review): the declaration of 'ret' (listing line 940) and the
// beginning of the call expression (listing line 943) are absent from this
// copy; confirm them against the repository.
935 CJsonNode
936 CProcessorRequestTimeSeries::Serialize(const vector<pair<int, int>> & time_series,
937  int most_ancient_time, int most_recent_time,
938  bool loop, size_t current_index) const
939 {
941 
942  ret.SetByKey("Requests",
944  time_series, most_ancient_time,
945  most_recent_time, loop, current_index));
946  return ret;
947 }
948 
949 
// Serializes the per-interval request counters provided as 'values'.
//
// The slots are visited starting from the one right before current_index
// down to slot 0 and then, if 'loop' is set, from the last slot down to
// current_index + 2. Each visited slot is given a 60 second window counted
// from the first visited slot; CheckToSkip() decides if the window is within
// [most_recent_time, most_ancient_time] (a negative boundary is not checked).
// The accepted counters are grouped as described by 'time_series' and each
// group is appended to the "time_series" array as requests per second, i.e.
// the group sum divided by (group minutes * 60).
//
// grand_total        the all time total used for the rest/average values
// grand_max          the all time per-slot maximum ("MaxAllTimePerSec")
// time_series        pairs of <mins to accumulate>:<last sequential data index>
// most_ancient_time  passed to CheckToSkip()
// most_recent_time   passed to CheckToSkip()
// loop               whether to continue from the end of the array
// current_index      the slot which is being collected now; it is not visited
//
// Returns 'ret' which stays without the keys set below if nothing has been
// collected yet or no slot was accepted.
// NOTE(review): the line with the function name and the 'values' parameter
// (listing line 951) and the declaration of 'ret' (listing line 960) are
// absent from this copy; confirm them against the repository.
950 CJsonNode
952  uint64_t grand_total,
953  uint64_t grand_max,
954  const vector<pair<int, int>> & time_series,
955  int most_ancient_time,
956  int most_recent_time,
957  bool loop,
958  size_t current_index) const
959 {
961 
962  if (current_index == 0 && loop == false) {
963  // There is no data collected yet
964  return ret;
965  }
966 
967  CJsonNode output_series(CJsonNode::NewArrayNode());
968 
969  // Needed to calculate max and average reqs/sec
970  uint64_t max_observed_val = 0;
971  uint64_t total_observed_vals = 0;
972  uint64_t observed_minutes = 0;
973 
974  // Index in the array where the data are collected
975  size_t raw_index;
976  if (current_index == 0) {
977  raw_index = kSeriesIntervals - 1;
978  loop = false; // to avoid going the second time over
979  } else {
980  raw_index = current_index - 1;
981  }
982 
983  size_t current_accumulated_mins = 0;
984  uint64_t current_accumulated_reqs = 0;
985  size_t output_data_index = 0;
986  uint64_t total_processed_vals = 0;
987 
988  // The current index in the 'time_series', i.e. a pair of
989  // <mins to accumulate>:<last sequential data index>
990  // It is guaranteed they are both > 0.
991  size_t range_index = 0;
992  size_t current_mins_to_accumulate = time_series[range_index].first;
993  size_t current_last_seq_index = time_series[range_index].second;
994 
995 
996  ssize_t actual_range_start_sec = -1;
997  ssize_t actual_range_end_sec = -1;
998  ssize_t past_minute = -1;
999 
// First pass: from raw_index down to slot 0
1000  for ( ;; ) {
1001  total_processed_vals += values[raw_index];
1002 
1003  ++past_minute;
1004  ssize_t current_values_start_sec = past_minute * 60;
1005  ssize_t current_values_end_sec = current_values_start_sec + 59;
1006 
1007  auto skip_check = CheckToSkip(most_ancient_time, most_recent_time,
1008  current_values_start_sec,
1009  current_values_end_sec);
1010  if (skip_check != EPSGS_SkipCheckResult::ePSGS_DontSkip) {
1011  // Regardless how it was skipped (at the beginning or at the end)
1012  // it needs to continue due to a necessity to calculate
1013  // total_processed_vals
1014  if (raw_index == 0)
1015  break;
1016  --raw_index;
1017  continue;
1018  }
1019 
1020  // Update the actual covered time frame
1021  if (actual_range_start_sec < 0) {
1022  actual_range_start_sec = current_values_start_sec;
1023  }
1024  actual_range_end_sec = current_values_end_sec;
1025 
1026 
1027  uint64_t reqs = values[raw_index];
1028  if (reqs > max_observed_val) {
1029  max_observed_val = reqs;
1030  }
1031  ++observed_minutes;
1032  total_observed_vals += reqs;
1033 
1034  ++current_accumulated_mins;
1035  current_accumulated_reqs += reqs;
1036 
// A complete group is emitted as requests per second
1037  if (current_accumulated_mins >= current_mins_to_accumulate) {
1038  output_series.AppendDouble(double(current_accumulated_reqs) /
1039  (double(current_accumulated_mins) * 60.0));
1040  current_accumulated_mins = 0;
1041  current_accumulated_reqs = 0;
1042  }
1043 
// NOTE(review): range_index is not checked against time_series.size();
// presumably the last item has a last index which is never exceeded - confirm
1044  ++output_data_index;
1045  if (output_data_index > current_last_seq_index) {
1046  ++range_index;
1047  current_mins_to_accumulate = time_series[range_index].first;
1048  current_last_seq_index = time_series[range_index].second;
1049  }
1050 
1051  if (raw_index == 0)
1052  break;
1053  --raw_index;
1054  }
1055 
// Second pass (only after a wrap-around): from the last slot down to
// current_index + 2
1056  if (loop) {
1057  raw_index = kSeriesIntervals - 1;
1058  while (raw_index > current_index + 1) {
1059  total_processed_vals += values[raw_index];
1060 
1061  ++past_minute;
1062  ssize_t current_values_start_sec = past_minute * 60;
1063  ssize_t current_values_end_sec = current_values_start_sec + 59;
1064  auto skip_check = CheckToSkip(most_ancient_time, most_recent_time,
1065  current_values_start_sec,
1066  current_values_end_sec);
1067  if (skip_check != EPSGS_SkipCheckResult::ePSGS_DontSkip) {
1068  // Regardless how it was skipped (at the beginning or at the end)
1069  // it needs to continue due to a necessity to calculate
1070  // total_processed_vals
1071  --raw_index;
1072  continue;
1073  }
1074 
1075  // Update the actual covered time frame
1076  if (actual_range_start_sec < 0) {
1077  actual_range_start_sec = current_values_start_sec;
1078  }
1079  actual_range_end_sec = current_values_end_sec;
1080 
1081  uint64_t reqs = values[raw_index];
1082  --raw_index;
1083 
1084  if (reqs > max_observed_val) {
1085  max_observed_val = reqs;
1086  }
1087  ++observed_minutes;
1088  total_observed_vals += reqs;
1089 
1090  ++current_accumulated_mins;
1091  current_accumulated_reqs += reqs;
1092 
1093  if (current_accumulated_mins >= current_mins_to_accumulate) {
1094  output_series.AppendDouble(double(current_accumulated_reqs) /
1095  (double(current_accumulated_mins) * 60.0));
1096  current_accumulated_mins = 0;
1097  current_accumulated_reqs = 0;
1098  }
1099 
1100  ++output_data_index;
1101  if (output_data_index > current_last_seq_index) {
1102  ++range_index;
1103  current_mins_to_accumulate = time_series[range_index].first;
1104  current_last_seq_index = time_series[range_index].second;
1105  }
1106  }
1107  }
1108 
1109  if (actual_range_start_sec == -1 && actual_range_end_sec == -1) {
1110  // No data points were picked
1111  return ret;
1112  }
1113 
1114  ret.SetInteger(kActualTimeRangeStart, actual_range_start_sec);
1115  ret.SetInteger(kActualTimeRangeEnd, actual_range_end_sec);
1116 
// Emit the last group even if it is incomplete
1117  if (current_accumulated_mins > 0) {
1118  output_series.AppendDouble(double(current_accumulated_reqs) /
1119  (double(current_accumulated_mins) * 60.0));
1120  }
1121 
// What is left of grand_total after the visited slots and the current slot
// are subtracted; reported only when the ancient boundary is -1.
// NOTE(review): both subtractions are unsigned and would wrap around if the
// subtrahend were larger - confirm the callers guarantee it is not
1122  if (loop && most_ancient_time == -1) {
1123  // The current minute is not participating in the calculation
1124  uint64_t rest_mins = m_TotalMinutesCollected.load() - kSeriesIntervals - 1;
1125  uint64_t rest_reqs = grand_total - total_processed_vals - values[current_index];
1126 
1127  if (rest_mins > 0) {
1128  ret.SetDouble("RestAvgPerSec", rest_reqs / (rest_mins * 60.0));
1129  }
1130  }
1131 
1132  ret.SetDouble("MaxAllTimePerSec", grand_max / 60.0);
1133  ret.SetDouble("AvgAllTimePerSec", grand_total /
1134  ((m_TotalMinutesCollected.load() - 1) * 60.0));
1135 
1136  ret.SetDouble("MaxObservedPerSec", max_observed_val / 60.0);
1137  ret.SetDouble("AvgObservedPerSec", total_observed_vals / (observed_minutes * 60.0));
1138 
1139  ret.SetInteger("ValObserved", total_observed_vals);
1140  ret.SetInteger("ValAllTime", grand_total - values[current_index]);
1141 
1142  ret.SetByKey("time_series", output_series);
1143  return ret;
1144 }
1145 
1146 
1147 
1148 
1149 // Converts a request status to the counter in the time series
1150 // The logic matches the logic in GRID dashboard
// Visible mapping, evaluated top to bottom:
//   - status equal to e404_NotFound        -> eNotFound
//   - the branch ending in 'return eError' -> eError
//   - status at or above e400_BadRequest   -> eWarning
//   - anything else                        -> eRequest
// The 404 comparison comes first, so a 404 is never classified by the broader
// '>= e400_BadRequest' comparison further down.
// NOTE(review): the listing's own numbering jumps 1150 -> 1153 and
// 1156 -> 1158, so the function signature and the condition guarding
// 'return eError' are not visible in this extract. Presumably the missing
// guard is a 5xx (server error) threshold -- confirm against the real file.
1153 {
1154  if (status == CRequestStatus::e404_NotFound)
1155  return eNotFound;
1156 
// NOTE(review): listing line 1157 (the 'if' for this return) is absent here;
// as shown, the return would be unconditional and the code below unreachable.
1158  return eError;
1159 
1160  if (status >= CRequestStatus::e400_BadRequest)
1161  return eWarning;
1162 
1163  return eRequest;
1164 }
1165 
1166 
1168 {
    // NOTE(review): the signature (listing line 1167) is not visible in this
    // extract; presumably this is the CRequestTimeSeries constructor --
    // confirm. The visible body does nothing except call Reset().
1169  Reset();
1170 }
1171 
1172 
1174 {
    // NOTE(review): the signature (listing line 1173) is not visible in this
    // extract; the body dispatches on a value named 'counter'.
    //
    // Registers one event of the given counter kind. For the matching series
    // it does three things:
    //   1. increments the slot of the current interval,
    //   2. increments the running total for that series,
    //   3. raises the stored maximum if the slot now exceeds it.
    // The current index is loaded once and that same value is used for every
    // array access below.
1175  size_t current_index = m_CurrentIndex.load();
1176  switch (counter) {
1177  case eRequest:
1178  ++m_Requests[current_index];
1179  ++m_TotalRequests;
    // m_MaxValue plays the same role for requests as m_MaxErrors,
    // m_MaxWarnings and m_MaxNotFound do for the other series.
1180  if (m_Requests[current_index] > m_MaxValue) {
1181  m_MaxValue = m_Requests[current_index];
1182  }
1183  break;
1184  case eError:
1185  ++m_Errors[current_index];
1186  ++m_TotalErrors;
1187  if (m_Errors[current_index] > m_MaxErrors) {
1188  m_MaxErrors = m_Errors[current_index];
1189  }
1190  break;
1191  case eWarning:
1192  ++m_Warnings[current_index];
1193  ++m_TotalWarnings;
1194  if (m_Warnings[current_index] > m_MaxWarnings) {
1195  m_MaxWarnings = m_Warnings[current_index];
1196  }
1197  break;
1198  case eNotFound:
1199  ++m_NotFound[current_index];
1200  ++m_TotalNotFound;
1201  if (m_NotFound[current_index] > m_MaxNotFound) {
1202  m_MaxNotFound = m_NotFound[current_index];
1203  }
1204  break;
    // Any other counter value is ignored: nothing is incremented.
1205  default:
1206  break;
1207  }
1208 }
1209 
1210 
1212 {
    // NOTE(review): the signature (listing line 1211) is not visible in this
    // extract.
    //
    // Advances the current slot by one, wrapping from the last slot
    // (kSeriesIntervals - 1) back to 0, i.e. the arrays are used circularly.
1213  size_t new_current_index = m_CurrentIndex.load();
1214  if (new_current_index == kSeriesIntervals - 1) {
1215  new_current_index = 0;
1216  } else {
1217  ++new_current_index;
1218  }
1219 
    // The slot about to become current is zeroed in all four series before
    // the new index is stored below.
    // NOTE(review): presumably this ordering keeps counts left over from a
    // previous pass from showing up under the new index -- confirm.
1220  m_Requests[new_current_index] = 0;
1221  m_Errors[new_current_index] = 0;
1222  m_Warnings[new_current_index] = 0;
1223  m_NotFound[new_current_index] = 0;
1224 
1225  m_CurrentIndex.store(new_current_index);
    // NOTE(review): listing line 1226 is absent from this extract; presumably
    // it updates m_TotalMinutesCollected -- confirm against the real file.
    // Landing on slot 0 means the index has wrapped at least once; record it.
1227  if (new_current_index == 0) {
1228  m_Loop = true;
1229  }
1230 }
1231 
1232 
1234 {
    // NOTE(review): the signature (listing line 1233) is not visible in this
    // extract; presumably this is the Reset() called a few definitions above.
    //
    // Returns the object to its initial state: every per-interval array is
    // zero-filled over its full size, and each series' running total and
    // maximum are set to 0.
1235  memset(m_Requests, 0, sizeof(m_Requests));
1236  m_TotalRequests = 0;
1237  m_MaxValue = 0;
1238  memset(m_Errors, 0, sizeof(m_Errors));
1239  m_TotalErrors = 0;
1240  m_MaxErrors = 0;
1241  memset(m_Warnings, 0, sizeof(m_Warnings));
1242  m_TotalWarnings = 0;
1243  m_MaxWarnings = 0;
1244  memset(m_NotFound, 0, sizeof(m_NotFound));
1245  m_TotalNotFound = 0;
1246  m_MaxNotFound = 0;
1247 
    // Collection restarts at slot 0 with the wrap flag cleared.
    // The minutes counter restarts at 1 rather than 0.
    // NOTE(review): presumably the in-progress minute is counted -- confirm.
1248  m_CurrentIndex.store(0);
1249  m_TotalMinutesCollected.store(1);
1250  m_Loop = false;
1251 }
1252 
1253 
// Builds one JSON node with four entries keyed "Requests", "Errors",
// "Warnings" and "NotFound", and returns it.
// Every argument (time_series, most_ancient_time, most_recent_time, loop,
// current_index) is forwarded unchanged to each of the four per-series calls.
// NOTE(review): the listing's own numbering skips lines 1259, 1262, 1267,
// 1272 and 1277, so the declaration of 'ret' and the first line of each
// per-series call are not visible in this extract. Presumably 'ret' is a new
// object node and each call serializes the matching m_* array -- confirm
// against the real file.
1254 CJsonNode
1255 CRequestTimeSeries::Serialize(const vector<pair<int, int>> & time_series,
1256  int most_ancient_time, int most_recent_time,
1257  bool loop, size_t current_index) const
1258 {
1260 
    // Each SetByKey below stores the result of one per-series call; only the
    // trailing arguments of that call are visible here.
1261  ret.SetByKey("Requests",
1263  time_series,
1264  most_ancient_time, most_recent_time,
1265  loop, current_index));
1266  ret.SetByKey("Errors",
1268  time_series,
1269  most_ancient_time, most_recent_time,
1270  loop, current_index));
1271  ret.SetByKey("Warnings",
1273  time_series,
1274  most_ancient_time, most_recent_time,
1275  loop, current_index));
1276  ret.SetByKey("NotFound",
1278  time_series,
1279  most_ancient_time, most_recent_time,
1280  loop, current_index));
1281  return ret;
1282 }
1283 
uint64_t m_IntervalSum[kSeriesIntervals]
CJsonNode Serialize(const vector< pair< int, int >> &time_series, int most_ancient_time, int most_recent_time, bool loop, size_t current_index) const
size_t m_IntervalCOunt[kSeriesIntervals]
CJsonNode x_SerializeOneSeries(const vector< pair< int, int >> &time_series, int most_ancient_time, int most_recent_time, bool loop, size_t current_index) const
JSON node abstraction.
static CJsonNode NewArrayNode()
Create a new JSON array node.
void SetDouble(const string &key, double value)
Set a JSON object element to the specified floating point value.
void SetString(const string &key, const string &value)
Set a JSON object element to the specified string value.
void AppendInteger(Int8 value)
For an array node, add an integer node at the end of the array.
void SetInteger(const string &key, Int8 value)
Set a JSON object element to the specified integer value.
void SetByKey(const string &key, CJsonNode::TInstance value)
For a JSON object node, insert a new element or update an existing element.
void AppendDouble(double value)
For an array node, add a floating point node at the end of the array.
static CJsonNode NewObjectNode()
Create a new JSON object node.
CJsonNode x_SerializeOneSeries(const vector< pair< int, int >> &time_series, int most_ancient_time, int most_recent_time, bool loop, size_t current_index) const
CJsonNode Serialize(const vector< pair< int, int >> &time_series, int most_ancient_time, int most_recent_time, bool loop, size_t current_index) const
uint64_t m_Values[kSeriesIntervals]
CJsonNode Serialize(const vector< pair< int, int >> &time_series, int most_ancient_time, int most_recent_time, bool loop, size_t current_index) const
CJsonNode x_SerializeOneSeries(const uint64_t *values, uint64_t grand_total, const vector< pair< int, int >> &time_series, int most_ancient_time, int most_recent_time, bool loop, size_t current_index) const
uint64_t m_Values[kSeriesIntervals]
CJsonNode x_SerializeOneSeries(const uint64_t *values, uint64_t grand_total, uint64_t grand_max, const vector< pair< int, int >> &time_series, int most_ancient_time, int most_recent_time, bool loop, size_t current_index) const
uint64_t m_Requests[kSeriesIntervals]
CJsonNode Serialize(const vector< pair< int, int >> &time_series, int most_ancient_time, int most_recent_time, bool loop, size_t current_index) const
CJsonNode Serialize(const vector< pair< int, int >> &time_series, int most_ancient_time, int most_recent_time, bool loop, size_t current_index) const
static EPSGSCounter RequestStatusToCounter(CRequestStatus::ECode status)
uint64_t m_Warnings[kSeriesIntervals]
uint64_t m_NotFound[kSeriesIntervals]
uint64_t m_Errors[kSeriesIntervals]
atomic_uint_fast64_t m_CurrentIndex
atomic_uint_fast64_t m_TotalMinutesCollected
EPSGS_SkipCheckResult CheckToSkip(int most_ancient_time, int most_recent_time, ssize_t current_values_start_sec, ssize_t current_values_end_sec) const
Include a standard set of the NCBI C++ Toolkit most basic headers.
#define false
Definition: bool.h:36
Uint8 uint64_t
Int8 int64_t
int ssize_t
Definition: ncbiconf_msvc.h:93
static string kActualTimeRangeStart("TimeRangeStart")
static string kActualTimeRangeEnd("TimeRangeEnd")
static constexpr size_t kSeriesIntervals
Modified on Fri Sep 20 14:57:03 2024 by modify_doxy.py rev. 669887