#pragma once #ifdef EUNEX_USE_KAFKA #include "persistence/PersistenceStore.hpp" #include #include #include #include #include #include namespace eunex::persistence { struct KafkaConfig { std::string brokers = "localhost:9092"; std::string topic = "eunex.recovery.fragments"; std::string groupId = "eunex-recovery"; int flushTimeoutMs = 5000; int batchSize = 100; }; // ── Kafka-backed persistence store ───────────────────────────────── class KafkaStore : public PersistenceStore { public: explicit KafkaStore(const KafkaConfig& cfg) : config_(cfg) { initProducer(); initConsumer(); } ~KafkaStore() override { if (producer_) producer_->flush(config_.flushTimeoutMs); } void append(const Fragment& frag) override { if (!producer_ || !topic_) return; std::vector buf = serialize(frag); std::string key = std::to_string(frag.originId) + ":" + std::to_string(frag.originKey); RdKafka::ErrorCode err = producer_->produce( topic_.get(), RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, buf.data(), buf.size(), key.data(), key.size(), 0, nullptr); if (err != RdKafka::ERR_NO_ERROR) { std::cerr << "[KafkaStore] produce failed: " << RdKafka::err2str(err) << "\n"; } producer_->poll(0); count_.fetch_add(1); } std::vector readAll() const override { if (!consumer_) return {}; std::vector result; while (true) { auto msg = std::unique_ptr( consumer_->consume(1000)); if (!msg || msg->err() == RdKafka::ERR__TIMED_OUT || msg->err() == RdKafka::ERR__PARTITION_EOF) { break; } if (msg->err() != RdKafka::ERR_NO_ERROR) break; Fragment frag = deserialize( static_cast(msg->payload()), msg->len()); result.push_back(frag); } return result; } size_t size() const override { return count_.load(); } void flush() override { if (producer_) producer_->flush(config_.flushTimeoutMs); } bool isConnected() const { return producer_ != nullptr; } private: KafkaConfig config_; std::unique_ptr producer_; std::unique_ptr topic_; mutable std::unique_ptr consumer_; std::atomic count_{0}; void initProducer() { std::string errstr; auto conf = std::unique_ptr( RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); conf->set("bootstrap.servers", config_.brokers, errstr); conf->set("queue.buffering.max.messages", "100000", errstr); conf->set("batch.size", std::to_string(config_.batchSize), errstr); producer_.reset(RdKafka::Producer::create(conf.get(), errstr)); if (!producer_) { std::cerr << "[KafkaStore] producer creation failed: " << errstr << "\n"; return; } auto tconf = std::unique_ptr( RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC)); topic_.reset(RdKafka::Topic::create(producer_.get(), config_.topic, tconf.get(), errstr)); } void initConsumer() { std::string errstr; auto conf = std::unique_ptr( RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); conf->set("bootstrap.servers", config_.brokers, errstr); conf->set("group.id", config_.groupId, errstr); conf->set("auto.offset.reset", "earliest", errstr); conf->set("enable.auto.commit", "false", errstr); consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr)); if (consumer_) { consumer_->subscribe({config_.topic}); } } static std::vector serialize(const Fragment& frag) { // Header: fixed fields, then payload size_t headerSize = sizeof(uint64_t) * 2 + sizeof(uint16_t) + sizeof(uint32_t) + sizeof(uint8_t) + sizeof(int) + sizeof(size_t); std::vector buf(headerSize + frag.payloadSize); uint8_t* p = buf.data(); std::memcpy(p, &frag.sequenceNumber, sizeof(uint64_t)); p += sizeof(uint64_t); std::memcpy(p, &frag.chainId, sizeof(uint64_t)); p += sizeof(uint64_t); std::memcpy(p, &frag.originId, sizeof(uint16_t)); p += sizeof(uint16_t); std::memcpy(p, &frag.originKey, sizeof(uint32_t)); p += sizeof(uint32_t); std::memcpy(p, &frag.persistenceId, sizeof(uint8_t)); p += sizeof(uint8_t); std::memcpy(p, &frag.nextCount, sizeof(int)); p += sizeof(int); std::memcpy(p, &frag.payloadSize, sizeof(size_t)); p += sizeof(size_t); std::memcpy(p, frag.payload, frag.payloadSize); return buf; } static Fragment deserialize(const uint8_t* data, size_t len) { Fragment frag{}; const uint8_t* p = data; std::memcpy(&frag.sequenceNumber, p, sizeof(uint64_t)); p += sizeof(uint64_t); std::memcpy(&frag.chainId, p, sizeof(uint64_t)); p += sizeof(uint64_t); std::memcpy(&frag.originId, p, sizeof(uint16_t)); p += sizeof(uint16_t); std::memcpy(&frag.originKey, p, sizeof(uint32_t)); p += sizeof(uint32_t); std::memcpy(&frag.persistenceId, p, sizeof(uint8_t)); p += sizeof(uint8_t); std::memcpy(&frag.nextCount, p, sizeof(int)); p += sizeof(int); std::memcpy(&frag.payloadSize, p, sizeof(size_t)); p += sizeof(size_t); size_t remaining = len - static_cast(p - data); size_t copySize = std::min(frag.payloadSize, remaining); copySize = std::min(copySize, sizeof(frag.payload)); std::memcpy(frag.payload, p, copySize); return frag; } }; } // namespace eunex::persistence #endif // EUNEX_USE_KAFKA