NCBI C++ ToolKit
mpmc_w.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: mpmc_w.hpp 103120 2024-09-11 14:33:06Z saprykin $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitri Dmitrienko
27  *
28  * File Description:
29  *
30  * defines MPMC (multiple producers multiple consumers) queue class
31  * inspired by another no-lock queue implementation
32  * published at http://www.1024cores.net
33  *
34  */
35 
36 #ifndef _MPMC_W_HPP_
37 #define _MPMC_W_HPP_
38 
39 #include <atomic>
40 #include <cstddef>
43 
46 
47 template<typename T, size_t SZ>
49 public:
50  static constexpr const size_t kCpuCacheLineSz = 64;
51  static constexpr const size_t kPadSz = (-sizeof(std::atomic<size_t>) % kCpuCacheLineSz);
52  using cacheline_pad_t = char[kPadSz];
53  static constexpr const size_t kPadDataSz = ((-sizeof(T) - sizeof(std::atomic<size_t>)) % kCpuCacheLineSz);
55 
56  struct cell_t {
57  std::atomic<size_t> m_sequence;
60  };
61 
63  cell_t m_buffer[SZ];
65  std::atomic<size_t> m_push_pos;
67  std::atomic<size_t> m_pop_pos;
69 
72  atomic<size_t> m_size;
73 
74 public:
76  static_assert ((SZ & (SZ - 1)) == 0, "SZ template parameter value must be power of two");
77  static_assert (sizeof(intptr_t) == sizeof(size_t), "All of sudden size_t is of different size than intptr_t, you've to update sources");
78  clear();
79  }
84 
85  void clear() {
86  for (size_t i = 0; i < SZ; ++i)
87  m_buffer[i].m_sequence.store(i, std::memory_order_relaxed);
88  m_push_pos.store(0, std::memory_order_relaxed);
89  m_pop_pos.store(0, std::memory_order_relaxed);
90  }
91 
92  // advisory only (e.g. for monitoring)
93  size_t size() const {
94  return m_size.load();
95  }
96 
97  template<class TT>
98  bool push(TT&& data) {
99  static_assert(!is_const<typename remove_reference<TT>::type>::value, "argument for this method can not be const");
100  static_assert(is_same<typename remove_reference<TT>::type, typename remove_reference<T>::type>::value, "type of the argument must match the template class type");
101  cell_t* cell;
102  size_t pos = m_push_pos.load(std::memory_order_relaxed);
103  for (;;) {
104  cell = &m_buffer[pos & (SZ - 1)];
105  size_t seq = cell->m_sequence.load(std::memory_order_acquire);
106  intptr_t diff = (intptr_t)seq - (intptr_t)pos;
107  if (diff == 0) {
108  if (m_push_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
109  break;
110  }
111  else if (diff < 0)
112  return false;
113  else
114  pos = m_push_pos.load(std::memory_order_relaxed);
115  }
116  cell->m_data = std::move(data); // there is a "move", not "forward" and it is on purpose of grabbing original data for non-RValue references too
117  cell->m_sequence.store(pos + 1, std::memory_order_release);
118  ++m_size;
119  return true;
120  }
121 
122  template<class TT>
123  bool push_wait(TT&& data, int64_t timeoutmks) {
124  bool rv = false;
125  int prev_val = m_readypush_ev.Value();
126  rv = push(std::forward<TT>(data));
127  if (rv)
128  m_readypop_ev.Inc();
129  else if (timeoutmks > 0) {
130  CFutex::EWaitResult wr = m_readypush_ev.WaitWhile(prev_val, timeoutmks);
131  if (wr == CFutex::eWaitResultOk || wr == CFutex::eWaitResultOkFast)
132  rv = push(data);
133  }
134  return rv;
135  }
136 
137  template<class TT>
138  void push_wait(TT&& data) {
139  bool rslt = false;
140  while (!rslt) {
141  int prev_val = m_readypush_ev.Value();
142  rslt = push(std::forward<TT>(data));
143  if (rslt) {
144  m_readypop_ev.Inc();
145  break;
146  }
147  CFutex::EWaitResult wr = m_readypush_ev.WaitWhile(prev_val);
148  if (wr == CFutex::eWaitResultOk || wr == CFutex::eWaitResultOkFast)
149  rslt = push(std::forward<TT>(data));
150  };
151  }
152 
153  bool pop(T* data) {
154  cell_t* cell;
155  size_t pos = m_pop_pos.load(std::memory_order_relaxed);
156  for (;;) {
157  cell = &m_buffer[pos & (SZ - 1)];
158  size_t seq = cell->m_sequence.load(std::memory_order_acquire);
159  intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
160  if (diff == 0) {
161  if (m_pop_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
162  break;
163  }
164  else if (diff < 0)
165  return false;
166  else
167  pos = m_pop_pos.load(std::memory_order_relaxed);
168  }
169  *data = std::move(cell->m_data);
170  --m_size;
171  cell->m_sequence.store(pos + SZ, std::memory_order_release);
172  return true;
173  }
174 
175  bool pop_wait(T* data, int64_t timeoutmks) {
176  bool rv = false;
177  int prev_val = m_readypop_ev.Value();
178  rv = pop(data);
179  if (rv)
180  m_readypush_ev.Inc();
181  else if (timeoutmks > 0) {
182  CFutex::EWaitResult wr = m_readypop_ev.WaitWhile(prev_val, timeoutmks);
183  if (wr == CFutex::eWaitResultOk || wr == CFutex::eWaitResultOkFast)
184  rv = pop(data);
185  }
186  return rv;
187  }
188 
189  void pop_wait(T* data) {
190  bool rslt = false;
191  while (!rslt) {
192  int prev_val = m_readypop_ev.Value();
193  rslt = pop(data);
194  if (rslt) {
195  m_readypush_ev.Inc();
196  break;
197  }
198  CFutex::EWaitResult wr = m_readypop_ev.WaitWhile(prev_val);
199  if (wr == CFutex::eWaitResultOk || wr == CFutex::eWaitResultOkFast)
200  rslt = pop(data);
201  }
202  }
203 };
204 
206 
207 #endif
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
Wrapper around Linux's futex.
Definition: srv_sync.hpp:141
EWaitResult
Type of result returned from WaitValueChange()
Definition: srv_sync.hpp:162
mpmc_bounded_queue_w & operator=(const mpmc_bounded_queue_w &)=delete
std::atomic< size_t > m_push_pos
Definition: mpmc_w.hpp:65
cell_t m_buffer[SZ]
Definition: mpmc_w.hpp:63
cacheline_pad_t m_pad0
Definition: mpmc_w.hpp:62
mpmc_bounded_queue_w & operator=(mpmc_bounded_queue_w &&)=delete
bool pop_wait(T *data, int64_t timeoutmks)
Definition: mpmc_w.hpp:175
bool pop(T *data)
Definition: mpmc_w.hpp:153
static constexpr const size_t kCpuCacheLineSz
Definition: mpmc_w.hpp:50
bool push_wait(TT &&data, int64_t timeoutmks)
Definition: mpmc_w.hpp:123
CFutex m_readypush_ev
Definition: mpmc_w.hpp:70
cacheline_pad_t m_pad3
Definition: mpmc_w.hpp:68
void pop_wait(T *data)
Definition: mpmc_w.hpp:189
cacheline_pad_t m_pad2
Definition: mpmc_w.hpp:66
CFutex m_readypop_ev
Definition: mpmc_w.hpp:71
static constexpr const size_t kPadSz
Definition: mpmc_w.hpp:51
mpmc_bounded_queue_w(const mpmc_bounded_queue_w &)=delete
atomic< size_t > m_size
Definition: mpmc_w.hpp:72
static constexpr const size_t kPadDataSz
Definition: mpmc_w.hpp:53
std::atomic< size_t > m_pop_pos
Definition: mpmc_w.hpp:67
size_t size() const
Definition: mpmc_w.hpp:93
cacheline_pad_t m_pad1
Definition: mpmc_w.hpp:64
bool push(TT &&data)
Definition: mpmc_w.hpp:98
void push_wait(TT &&data)
Definition: mpmc_w.hpp:138
mpmc_bounded_queue_w(mpmc_bounded_queue_w &&)=delete
#define T(s)
Definition: common.h:230
static int type
Definition: getdata.c:31
char data[12]
Definition: iconv.c:80
Int8 int64_t
int intptr_t
Definition: ncbitype.h:185
int i
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
Definition: mpmc_w.hpp:45
const GenericPointer< typename T::ValueType > T2 value
Definition: pointer.h:1227
std::atomic< size_t > m_sequence
Definition: mpmc_w.hpp:57
cacheline_pad_data_t m_pad
Definition: mpmc_w.hpp:58
Modified on Fri Sep 20 14:57:43 2024 by modify_doxy.py rev. 669887