| | #pragma once
|
| |
|
| | #include <atomic>
|
| | #include <utility>
|
| | #include <cstring>
|
| | #include <type_traits>
|
| | #include <cstdint>
|
| |
|
| | #include "libipc/def.h"
|
| |
|
| | #include "libipc/platform/detail.h"
|
| | #include "libipc/circ/elem_def.h"
|
| | #include "libipc/utility/log.h"
|
| | #include "libipc/utility/utility.h"
|
| |
|
| | namespace ipc {
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | template <typename Flag>
|
| | struct prod_cons_impl;
|
| |
|
| | template <>
|
| | struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
| |
|
| | template <std::size_t DataSize, std::size_t AlignSize>
|
| | struct elem_t {
|
| | std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
| | };
|
| |
|
| | alignas(cache_line_size) std::atomic<circ::u2_t> rd_;
|
| | alignas(cache_line_size) std::atomic<circ::u2_t> wt_;
|
| |
|
| | constexpr circ::u2_t cursor() const noexcept {
|
| | return 0;
|
| | }
|
| |
|
| | template <typename W, typename F, typename E>
|
| | bool push(W* , F&& f, E* elems) {
|
| | auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
|
| | if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) {
|
| | return false;
|
| | }
|
| | std::forward<F>(f)(&(elems[cur_wt].data_));
|
| | wt_.fetch_add(1, std::memory_order_release);
|
| | return true;
|
| | }
|
| |
|
| | |
| | |
| | |
| |
|
| | template <typename W, typename F, typename E>
|
| | bool force_push(W* wrapper, F&&, E*) {
|
| | wrapper->elems()->disconnect_receiver(~static_cast<circ::cc_t>(0u));
|
| | return false;
|
| | }
|
| |
|
| | template <typename W, typename F, typename R, typename E>
|
| | bool pop(W* , circ::u2_t& , F&& f, R&& out, E* elems) {
|
| | auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
|
| | if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
|
| | return false;
|
| | }
|
| | std::forward<F>(f)(&(elems[cur_rd].data_));
|
| | std::forward<R>(out)(true);
|
| | rd_.fetch_add(1, std::memory_order_release);
|
| | return true;
|
| | }
|
| | };
|
| |
|
| | template <>
|
| | struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
|
| | : prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
| |
|
| | template <typename W, typename F, typename E>
|
| | bool force_push(W* wrapper, F&&, E*) {
|
| | wrapper->elems()->disconnect_receiver(1);
|
| | return false;
|
| | }
|
| |
|
| | template <typename W, typename F, typename R,
|
| | template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
| | bool pop(W* , circ::u2_t& , F&& f, R&& out, E<DS, AS>* elems) {
|
| | byte_t buff[DS];
|
| | for (unsigned k = 0;;) {
|
| | auto cur_rd = rd_.load(std::memory_order_relaxed);
|
| | if (circ::index_of(cur_rd) ==
|
| | circ::index_of(wt_.load(std::memory_order_acquire))) {
|
| | return false;
|
| | }
|
| | std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
|
| | if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
| | std::forward<F>(f)(buff);
|
| | std::forward<R>(out)(true);
|
| | return true;
|
| | }
|
| | ipc::yield(k);
|
| | }
|
| | }
|
| | };
|
| |
|
| | template <>
|
| | struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
| | : prod_cons_impl<wr<relat::single, relat::multi, trans::unicast>> {
|
| |
|
| | using flag_t = std::uint64_t;
|
| |
|
| | template <std::size_t DataSize, std::size_t AlignSize>
|
| | struct elem_t {
|
| | std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
| | std::atomic<flag_t> f_ct_ { 0 };
|
| | };
|
| |
|
| | alignas(cache_line_size) std::atomic<circ::u2_t> ct_;
|
| |
|
| | template <typename W, typename F, typename E>
|
| | bool push(W* , F&& f, E* elems) {
|
| | circ::u2_t cur_ct, nxt_ct;
|
| | for (unsigned k = 0;;) {
|
| | cur_ct = ct_.load(std::memory_order_relaxed);
|
| | if (circ::index_of(nxt_ct = cur_ct + 1) ==
|
| | circ::index_of(rd_.load(std::memory_order_acquire))) {
|
| | return false;
|
| | }
|
| | if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_acq_rel)) {
|
| | break;
|
| | }
|
| | ipc::yield(k);
|
| | }
|
| | auto* el = elems + circ::index_of(cur_ct);
|
| | std::forward<F>(f)(&(el->data_));
|
| |
|
| | el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
|
| | while (1) {
|
| | auto cac_ct = el->f_ct_.load(std::memory_order_acquire);
|
| | if (cur_ct != wt_.load(std::memory_order_relaxed)) {
|
| | return true;
|
| | }
|
| | if ((~cac_ct) != cur_ct) {
|
| | return true;
|
| | }
|
| | if (!el->f_ct_.compare_exchange_strong(cac_ct, 0, std::memory_order_relaxed)) {
|
| | return true;
|
| | }
|
| | wt_.store(nxt_ct, std::memory_order_release);
|
| | cur_ct = nxt_ct;
|
| | nxt_ct = cur_ct + 1;
|
| | el = elems + circ::index_of(cur_ct);
|
| | }
|
| | return true;
|
| | }
|
| |
|
| | template <typename W, typename F, typename E>
|
| | bool force_push(W* wrapper, F&&, E*) {
|
| | wrapper->elems()->disconnect_receiver(1);
|
| | return false;
|
| | }
|
| |
|
| | template <typename W, typename F, typename R,
|
| | template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
| | bool pop(W* , circ::u2_t& , F&& f, R&& out, E<DS, AS>* elems) {
|
| | byte_t buff[DS];
|
| | for (unsigned k = 0;;) {
|
| | auto cur_rd = rd_.load(std::memory_order_relaxed);
|
| | auto cur_wt = wt_.load(std::memory_order_acquire);
|
| | auto id_rd = circ::index_of(cur_rd);
|
| | auto id_wt = circ::index_of(cur_wt);
|
| | if (id_rd == id_wt) {
|
| | auto* el = elems + id_wt;
|
| | auto cac_ct = el->f_ct_.load(std::memory_order_acquire);
|
| | if ((~cac_ct) != cur_wt) {
|
| | return false;
|
| | }
|
| | if (el->f_ct_.compare_exchange_weak(cac_ct, 0, std::memory_order_relaxed)) {
|
| | wt_.store(cur_wt + 1, std::memory_order_release);
|
| | }
|
| | k = 0;
|
| | }
|
| | else {
|
| | std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
|
| | if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
| | std::forward<F>(f)(buff);
|
| | std::forward<R>(out)(true);
|
| | return true;
|
| | }
|
| | ipc::yield(k);
|
| | }
|
| | }
|
| | }
|
| | };
|
| |
|
| | template <>
|
| | struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
| |
|
| | using rc_t = std::uint64_t;
|
| |
|
| | enum : rc_t {
|
| | ep_mask = 0x00000000ffffffffull,
|
| | ep_incr = 0x0000000100000000ull
|
| | };
|
| |
|
| | template <std::size_t DataSize, std::size_t AlignSize>
|
| | struct elem_t {
|
| | std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
| | std::atomic<rc_t> rc_ { 0 };
|
| | };
|
| |
|
| | alignas(cache_line_size) std::atomic<circ::u2_t> wt_;
|
| | alignas(cache_line_size) rc_t epoch_ { 0 };
|
| |
|
| | circ::u2_t cursor() const noexcept {
|
| | return wt_.load(std::memory_order_acquire);
|
| | }
|
| |
|
| | template <typename W, typename F, typename E>
|
| | bool push(W* wrapper, F&& f, E* elems) {
|
| | E* el;
|
| | for (unsigned k = 0;;) {
|
| | circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
|
| | if (cc == 0) return false;
|
| | el = elems + circ::index_of(wt_.load(std::memory_order_relaxed));
|
| |
|
| | auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
| | circ::cc_t rem_cc = cur_rc & ep_mask;
|
| | if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch_)) {
|
| | return false;
|
| | }
|
| |
|
| | if (el->rc_.compare_exchange_weak(
|
| | cur_rc, epoch_ | static_cast<rc_t>(cc), std::memory_order_release)) {
|
| | break;
|
| | }
|
| | ipc::yield(k);
|
| | }
|
| | std::forward<F>(f)(&(el->data_));
|
| | wt_.fetch_add(1, std::memory_order_release);
|
| | return true;
|
| | }
|
| |
|
| | template <typename W, typename F, typename E>
|
| | bool force_push(W* wrapper, F&& f, E* elems) {
|
| | E* el;
|
| | epoch_ += ep_incr;
|
| | for (unsigned k = 0;;) {
|
| | circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
|
| | if (cc == 0) return false;
|
| | el = elems + circ::index_of(wt_.load(std::memory_order_relaxed));
|
| |
|
| | auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
| | circ::cc_t rem_cc = cur_rc & ep_mask;
|
| | if (cc & rem_cc) {
|
| | ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc);
|
| | cc = wrapper->elems()->disconnect_receiver(rem_cc);
|
| | if (cc == 0) return false;
|
| | }
|
| |
|
| | if (el->rc_.compare_exchange_weak(
|
| | cur_rc, epoch_ | static_cast<rc_t>(cc), std::memory_order_release)) {
|
| | break;
|
| | }
|
| | ipc::yield(k);
|
| | }
|
| | std::forward<F>(f)(&(el->data_));
|
| | wt_.fetch_add(1, std::memory_order_release);
|
| | return true;
|
| | }
|
| |
|
| | template <typename W, typename F, typename R, typename E>
|
| | bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) {
|
| | if (cur == cursor()) return false;
|
| | auto* el = elems + circ::index_of(cur++);
|
| | std::forward<F>(f)(&(el->data_));
|
| | for (unsigned k = 0;;) {
|
| | auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
| | if ((cur_rc & ep_mask) == 0) {
|
| | std::forward<R>(out)(true);
|
| | return true;
|
| | }
|
| | auto nxt_rc = cur_rc & ~static_cast<rc_t>(wrapper->connected_id());
|
| | if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
|
| | std::forward<R>(out)((nxt_rc & ep_mask) == 0);
|
| | return true;
|
| | }
|
| | ipc::yield(k);
|
| | }
|
| | }
|
| | };
|
| |
|
| | template <>
|
| | struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
|
| |
|
| | using rc_t = std::uint64_t;
|
| | using flag_t = std::uint64_t;
|
| |
|
| | enum : rc_t {
|
| | rc_mask = 0x00000000ffffffffull,
|
| | ep_mask = 0x00ffffffffffffffull,
|
| | ep_incr = 0x0100000000000000ull,
|
| | ic_mask = 0xff000000ffffffffull,
|
| | ic_incr = 0x0000000100000000ull
|
| | };
|
| |
|
| | template <std::size_t DataSize, std::size_t AlignSize>
|
| | struct elem_t {
|
| | std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
| | std::atomic<rc_t > rc_ { 0 };
|
| | std::atomic<flag_t> f_ct_ { 0 };
|
| | };
|
| |
|
| | alignas(cache_line_size) std::atomic<circ::u2_t> ct_;
|
| | alignas(cache_line_size) std::atomic<rc_t> epoch_ { 0 };
|
| |
|
| | circ::u2_t cursor() const noexcept {
|
| | return ct_.load(std::memory_order_acquire);
|
| | }
|
| |
|
| | constexpr static rc_t inc_rc(rc_t rc) noexcept {
|
| | return (rc & ic_mask) | ((rc + ic_incr) & ~ic_mask);
|
| | }
|
| |
|
| | constexpr static rc_t inc_mask(rc_t rc) noexcept {
|
| | return inc_rc(rc) & ~rc_mask;
|
| | }
|
| |
|
| | template <typename W, typename F, typename E>
|
| | bool push(W* wrapper, F&& f, E* elems) {
|
| | E* el;
|
| | circ::u2_t cur_ct;
|
| | rc_t epoch = epoch_.load(std::memory_order_acquire);
|
| | for (unsigned k = 0;;) {
|
| | circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
|
| | if (cc == 0) return false;
|
| | el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
|
| |
|
| | auto cur_rc = el->rc_.load(std::memory_order_relaxed);
|
| | circ::cc_t rem_cc = cur_rc & rc_mask;
|
| | if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch)) {
|
| | return false;
|
| | }
|
| | else if (!rem_cc) {
|
| | auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
| | if ((cur_fl != cur_ct) && cur_fl) {
|
| | return false;
|
| | }
|
| | }
|
| |
|
| | if (el->rc_.compare_exchange_weak(
|
| | cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed) &&
|
| | epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) {
|
| | break;
|
| | }
|
| | ipc::yield(k);
|
| | }
|
| |
|
| | ct_.store(cur_ct + 1, std::memory_order_release);
|
| | std::forward<F>(f)(&(el->data_));
|
| |
|
| | el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
|
| | return true;
|
| | }
|
| |
|
| | template <typename W, typename F, typename E>
|
| | bool force_push(W* wrapper, F&& f, E* elems) {
|
| | E* el;
|
| | circ::u2_t cur_ct;
|
| | rc_t epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr;
|
| | for (unsigned k = 0;;) {
|
| | circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
|
| | if (cc == 0) return false;
|
| | el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
|
| |
|
| | auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
| | circ::cc_t rem_cc = cur_rc & rc_mask;
|
| | if (cc & rem_cc) {
|
| | ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc);
|
| | cc = wrapper->elems()->disconnect_receiver(rem_cc);
|
| | if (cc == 0) return false;
|
| | }
|
| |
|
| | if (el->rc_.compare_exchange_weak(
|
| | cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed)) {
|
| | if (epoch == epoch_.load(std::memory_order_acquire)) {
|
| | break;
|
| | }
|
| | else if (push(wrapper, std::forward<F>(f), elems)) {
|
| | return true;
|
| | }
|
| | epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr;
|
| | }
|
| | ipc::yield(k);
|
| | }
|
| |
|
| | ct_.store(cur_ct + 1, std::memory_order_release);
|
| | std::forward<F>(f)(&(el->data_));
|
| |
|
| | el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
|
| | return true;
|
| | }
|
| |
|
| | template <typename W, typename F, typename R, typename E, std::size_t N>
|
| | bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) {
|
| | auto* el = elems + circ::index_of(cur);
|
| | auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
| | if (cur_fl != ~static_cast<flag_t>(cur)) {
|
| | return false;
|
| | }
|
| | ++cur;
|
| | std::forward<F>(f)(&(el->data_));
|
| | for (unsigned k = 0;;) {
|
| | auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
| | if ((cur_rc & rc_mask) == 0) {
|
| | std::forward<R>(out)(true);
|
| | el->f_ct_.store(cur + N - 1, std::memory_order_release);
|
| | return true;
|
| | }
|
| | auto nxt_rc = inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id());
|
| | bool last_one = false;
|
| | if ((last_one = (nxt_rc & rc_mask) == 0)) {
|
| | el->f_ct_.store(cur + N - 1, std::memory_order_release);
|
| | }
|
| | if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
|
| | std::forward<R>(out)(last_one);
|
| | return true;
|
| | }
|
| | ipc::yield(k);
|
| | }
|
| | }
|
| | };
|
| |
|
| | }
|
| |
|