47 template<
typename T,
size_t SZ>
76 static_assert ((SZ & (SZ - 1)) == 0,
"SZ template parameter value must be power of two");
77 static_assert (
sizeof(
intptr_t) ==
sizeof(
size_t),
"All of sudden size_t is of different size than intptr_t, you've to update sources");
86 for (
size_t i = 0;
i < SZ; ++
i)
87 m_buffer[
i].m_sequence.store(
i, std::memory_order_relaxed);
88 m_push_pos.store(0, std::memory_order_relaxed);
89 m_pop_pos.store(0, std::memory_order_relaxed);
102 size_t pos =
m_push_pos.load(std::memory_order_relaxed);
105 size_t seq = cell->
m_sequence.load(std::memory_order_acquire);
108 if (
m_push_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
114 pos =
m_push_pos.load(std::memory_order_relaxed);
116 cell->m_data = std::move(
data);
117 cell->m_sequence.store(pos + 1, std::memory_order_release);
129 else if (timeoutmks > 0) {
131 if (wr == CFutex::eWaitResultOk || wr == CFutex::eWaitResultOkFast)
142 rslt =
push(std::forward<TT>(
data));
148 if (wr == CFutex::eWaitResultOk || wr == CFutex::eWaitResultOkFast)
149 rslt =
push(std::forward<TT>(
data));
155 size_t pos =
m_pop_pos.load(std::memory_order_relaxed);
158 size_t seq = cell->
m_sequence.load(std::memory_order_acquire);
161 if (
m_pop_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
167 pos =
m_pop_pos.load(std::memory_order_relaxed);
169 *
data = std::move(cell->m_data);
171 cell->m_sequence.store(pos + SZ, std::memory_order_release);
181 else if (timeoutmks > 0) {
183 if (wr == CFutex::eWaitResultOk || wr == CFutex::eWaitResultOkFast)
199 if (wr == CFutex::eWaitResultOk || wr == CFutex::eWaitResultOkFast)
#define BEGIN_IDBLOB_SCOPE
Wrapper around Linux's futex.
EWaitResult
Type of result returned from WaitValueChange()
char[kPadDataSz] cacheline_pad_data_t
mpmc_bounded_queue_w & operator=(const mpmc_bounded_queue_w &)=delete
std::atomic< size_t > m_push_pos
char[kPadSz] cacheline_pad_t
mpmc_bounded_queue_w & operator=(mpmc_bounded_queue_w &&)=delete
bool pop_wait(T *data, int64_t timeoutmks)
static constexpr const size_t kCpuCacheLineSz
bool push_wait(TT &&data, int64_t timeoutmks)
static constexpr const size_t kPadSz
mpmc_bounded_queue_w(const mpmc_bounded_queue_w &)=delete
static constexpr const size_t kPadDataSz
std::atomic< size_t > m_pop_pos
void push_wait(TT &&data)
mpmc_bounded_queue_w(mpmc_bounded_queue_w &&)=delete
BEGIN_IDBLOB_SCOPE USING_NCBI_SCOPE
const GenericPointer< typename T::ValueType > T2 value
std::atomic< size_t > m_sequence
cacheline_pad_data_t m_pad