#pragma once #include #include #include #include #include /** * @brief Lock-free Single-Producer Single-Consumer ring buffer. * Cache-line padded to eliminate false sharing. * Capacity must be power of 2. * * BUG FIX: Original stored (t+1) & Mask into tail_, which collapsed the * index to the range [0, Capacity-1]. This caused the "full" check * next == head_ * to trigger when next wrapped to 0 and head_ was also 0 (i.e. after * exactly Capacity pushes without a pop), but it also made size() return * wrong values because the raw distance calculation assumed monotonically * growing indices. The standard SPSC pattern stores raw (never-masked) * indices and only applies the mask when indexing into buf_. Fixed below. */ template class alignas(64) SPSCQueue { static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be power of 2"); static constexpr size_t Mask = Capacity - 1; struct alignas(64) PaddedAtomic { std::atomic val{0}; char pad[64 - sizeof(std::atomic)]{}; }; alignas(64) std::array buf_; PaddedAtomic head_; // written by consumer (monotonically increasing) PaddedAtomic tail_; // written by producer (monotonically increasing) public: SPSCQueue() = default; SPSCQueue(const SPSCQueue&) = delete; SPSCQueue& operator=(const SPSCQueue&) = delete; // Producer side bool push(const T& item) noexcept { const size_t t = tail_.val.load(std::memory_order_relaxed); const size_t next = t + 1; // Full check: the queue is full if the slot next would occupy is still // held by the consumer (i.e. next & Mask == head_ & Mask, and we've // lapped). Simplified: if (next - head) == Capacity → full. if (next - head_.val.load(std::memory_order_acquire) > Mask) return false; buf_[t & Mask] = item; tail_.val.store(next, std::memory_order_release); return true; } bool push(T&& item) noexcept { const size_t t = tail_.val.load(std::memory_order_relaxed); const size_t next = t + 1; if (next - head_.val.load(std::memory_order_acquire) > Mask) return false; buf_[t & Mask] = std::move(item); tail_.val.store(next, std::memory_order_release); return true; } // Consumer side std::optional pop() noexcept { const size_t h = head_.val.load(std::memory_order_relaxed); if (h == tail_.val.load(std::memory_order_acquire)) return std::nullopt; T item = std::move(buf_[h & Mask]); head_.val.store(h + 1, std::memory_order_release); return item; } bool empty() const noexcept { return head_.val.load(std::memory_order_acquire) == tail_.val.load(std::memory_order_acquire); } size_t size() const noexcept { const size_t t = tail_.val.load(std::memory_order_acquire); const size_t h = head_.val.load(std::memory_order_acquire); return t - h; // correct because indices are monotonic, not masked } }; /** * @brief Lock-free MPMC queue using atomic CAS. */ template class MPMCQueue { static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be power of 2"); struct Slot { alignas(64) std::atomic seq; T data; }; alignas(64) std::array slots_; alignas(64) std::atomic enqueuePos_{0}; alignas(64) std::atomic dequeuePos_{0}; static constexpr size_t Mask = Capacity - 1; public: MPMCQueue() { for (size_t i = 0; i < Capacity; ++i) slots_[i].seq.store(i, std::memory_order_relaxed); } bool push(T&& item) noexcept { size_t pos = enqueuePos_.load(std::memory_order_relaxed); while (true) { Slot& slot = slots_[pos & Mask]; size_t seq = slot.seq.load(std::memory_order_acquire); intptr_t diff = static_cast(seq) - static_cast(pos); if (diff == 0) { if (enqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { slot.data = std::move(item); slot.seq.store(pos + 1, std::memory_order_release); return true; } } else if (diff < 0) { return false; } else { pos = enqueuePos_.load(std::memory_order_relaxed); } } } bool pop(T& item) noexcept { size_t pos = dequeuePos_.load(std::memory_order_relaxed); while (true) { Slot& slot = slots_[pos & Mask]; size_t seq = slot.seq.load(std::memory_order_acquire); intptr_t diff = static_cast(seq) - static_cast(pos + 1); if (diff == 0) { if (dequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { item = std::move(slot.data); slot.seq.store(pos + Capacity, std::memory_order_release); return true; } } else if (diff < 0) { return false; } else { pos = dequeuePos_.load(std::memory_order_relaxed); } } } };