Spaces:
Sleeping
Sleeping
File size: 7,387 Bytes
f0023cf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 | // ============================================================================
// auditor.cpp β Nanobind Python Bindings for the ReconciliationEngine
// ============================================================================
//
// This file is the SOLE bridge between C++ and Python. It contains
// ONLY binding definitions β no business logic. All engine code lives
// in the header files:
// - order_pool.hpp β O(1) flat-array trade storage
// - timer_wheel.hpp β O(1) hierarchical timer wheel
// - spsc_ring_buffer.hpp β Lock-free SPSC ring buffer
// - reconciliation_engine.hpp β Orchestrator
//
// The compiled .pyd module exposes a single class:
// hft_auditor.ReconciliationEngine
//
// ============================================================================
#include "reconciliation_engine.hpp"
NB_MODULE(hft_auditor, m) {
nb::enum_<Difficulty>(m, "Difficulty")
.value("EASY", Difficulty::EASY)
.value("MEDIUM", Difficulty::MEDIUM)
.value("HARD", Difficulty::HARD)
.export_values();
m.doc() = "High-Frequency Trading Reconciliation Engine\n"
"C++20 zero-allocation backend for RL-based trade auditing.\n"
"Processes 1M+ trades/sec with O(1) matching, O(1) expiration,\n"
"and lock-free concurrent ingestion.";
nb::class_<ReconciliationEngine>(m, "ReconciliationEngine")
// ββ Constructor ββββββββββββββββββββββββββββββββββββββββββββββ
.def(nb::init<size_t>(), nb::arg("capacity") = 1048576,
"Initialize the engine with a pre-allocated trade pool.\n"
"Capacity is rounded up to the next power of 2.\n"
"Default: 1,048,576 (2^20) slots.")
// ββ Producer API (lock-free, thread-safe) ββββββββββββββββββββ
.def("submit_trade", &ReconciliationEngine::submit_trade,
nb::arg("trade_id"), nb::arg("price"), nb::arg("quantity"),
nb::arg("counterparty_id"), nb::arg("timestamp_ns"),
"Enqueue a trade into the lock-free SPSC ring buffer.\n"
"Safe to call from a separate ingestion thread.\n"
"Returns False if the buffer is full (back-pressure).")
// ββ Engine API (single-threaded) βββββββββββββββββββββββββββββ
.def("tick", &ReconciliationEngine::tick, nb::arg("new_watermark_ns"),
"Main processing loop. Call once per RL step.\n"
"1. Drains the SPSC ring buffer into the OrderPool\n"
"2. Advances the timer wheel watermark\n"
"3. Expires trades older than Ξ_max (5.0 seconds)\n"
"Returns: number of trades ingested from the ring buffer.")
.def("reconcile", &ReconciliationEngine::reconcile, nb::arg("trade_id"),
nb::arg("expected_price"), nb::arg("expected_qty"),
"Match a trade against an external bank receipt.\n"
"Returns: 0=MATCH, 1=PRICE_MISMATCH, 2=QTY_MISMATCH,\n"
" 3=NOT_FOUND, 4=ALREADY_DONE")
.def("ingest_trade", &ReconciliationEngine::ingest_trade,
nb::arg("trade_id"), nb::arg("price"), nb::arg("quantity"),
nb::arg("counterparty_id"), nb::arg("timestamp_ns"),
"Direct insertion bypassing the ring buffer.\n"
"Use in single-threaded mode (no separate ingestion thread).")
.def("ingest_trade_labeled", &ReconciliationEngine::ingest_trade_labeled,
nb::arg("trade_id"), nb::arg("price"), nb::arg("quantity"),
nb::arg("counterparty_id"), nb::arg("timestamp_ns"),
nb::arg("is_anomaly"),
"Ingest a trade with a known ground-truth label (0=safe, 1=anomaly).")
.def("generate_batch", &ReconciliationEngine::generate_batch,
nb::arg("difficulty"), nb::arg("batch_size"), nb::arg("timestamp_ns"),
"Generate a batch of trades with difficulty-appropriate anomaly labels.\n"
"Returns: number of anomalies in the batch.")
.def("compute_reward",
[](ReconciliationEngine& self, nb::ndarray<uint8_t, nb::ndim<1>> actions) -> float {
return self.compute_reward(actions.data(), actions.shape(0));
},
nb::arg("agent_actions"),
"Compute asymmetric reward for the agent's decisions on expired trades.\n"
"Cost matrix: TP=+1.0, TN=+0.5, FP=+0.1, FN=0.0.")
.def("set_seed", &ReconciliationEngine::set_seed, nb::arg("seed"),
"Set the PRNG seed for reproducible batch generation.")
// ββ Observation (zero-copy) ββββββββββββββββββββββββββββββββββ
.def("get_observation_matrix",
&ReconciliationEngine::get_observation_matrix,
nb::rv_policy::reference_internal,
"Zero-copy (N, 4) numpy matrix of active trade features:\n"
" [0] time_elapsed: (watermark - trade_ts) / Ξ_max\n"
" [1] price_delta: placeholder (0.0)\n"
" [2] missing_frequency: total_expired / total_ingested\n"
" [3] risk_score: (counterparty_id %% 100) / 100.0\n"
"Memory is owned by C++; Python must not outlive the engine.")
.def("get_anomaly_matrix",
&ReconciliationEngine::get_anomaly_matrix,
nb::rv_policy::reference_internal,
"Zero-copy (N, 4) numpy matrix of EXPIRED trade features.\n"
"Memory owned by C++; do not retain reference beyond next tick().")
// ββ Read-only properties βββββββββββββββββββββββββββββββββββββ
.def_prop_ro("total_ingested", &ReconciliationEngine::total_ingested,
"Total trades ingested since engine creation.")
.def_prop_ro("total_reconciled", &ReconciliationEngine::total_reconciled,
"Total trades successfully reconciled.")
.def_prop_ro("total_expired", &ReconciliationEngine::total_expired,
"Total trades expired (exceeded Ξ_max threshold).")
.def_prop_ro("last_expired_count", &ReconciliationEngine::last_expired_count,
"Number of trades expired in the most recent tick() call.")
.def_prop_ro("last_tp", &ReconciliationEngine::last_tp, "True Positives in last reward call.")
.def_prop_ro("last_tn", &ReconciliationEngine::last_tn, "True Negatives in last reward call.")
.def_prop_ro("last_fp", &ReconciliationEngine::last_fp, "False Positives in last reward call.")
.def_prop_ro("last_fn", &ReconciliationEngine::last_fn, "False Negatives in last reward call.")
.def_prop_ro("active_count", &ReconciliationEngine::active_count,
"Number of currently active (unreconciled) trades.")
.def_prop_ro("ring_buffer_size", &ReconciliationEngine::ring_buffer_size,
"Current number of entries in the SPSC ring buffer.")
.def_prop_ro("watermark", &ReconciliationEngine::watermark,
"Current event-time watermark in nanoseconds.")
.def_prop_ro("pool_capacity", &ReconciliationEngine::pool_capacity,
"Total pool capacity (power of 2).");
} |