File size: 10,760 Bytes
1cbe390
8f4d2d0
 
5412a90
8f4d2d0
44cddac
 
 
a3b10ab
8f4d2d0
 
 
 
a3b10ab
1cbe390
8f4d2d0
 
44cddac
 
 
a3b10ab
44cddac
a3b10ab
06cfa59
8f4d2d0
a3b10ab
 
 
 
06cfa59
f178600
8f4d2d0
 
 
 
a3b10ab
 
 
 
8f4d2d0
1cbe390
f178600
a3b10ab
1cbe390
8f4d2d0
a3b10ab
 
 
06cfa59
 
 
 
 
 
 
 
 
1cbe390
a3b10ab
 
 
f178600
 
 
 
a3b10ab
f178600
 
 
a3b10ab
1cbe390
44cddac
8f4d2d0
1cbe390
44cddac
a3b10ab
1cbe390
a3b10ab
 
 
 
 
 
 
 
 
 
 
 
8f4d2d0
1cbe390
06cfa59
f178600
 
 
 
 
 
 
 
 
a3b10ab
1cbe390
44cddac
a3b10ab
1cbe390
a3b10ab
 
1cbe390
a3b10ab
 
 
1cbe390
a3b10ab
44cddac
 
f178600
 
 
 
 
 
44cddac
 
a3b10ab
 
 
 
 
f178600
06cfa59
 
 
 
 
 
a3b10ab
1cbe390
a3b10ab
 
 
 
 
f178600
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8f4d2d0
 
a3b10ab
 
 
 
 
 
 
 
 
1cbe390
a3b10ab
 
 
 
 
 
 
 
 
1cbe390
a3b10ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f178600
 
 
a3b10ab
 
 
 
06cfa59
 
 
 
 
a3b10ab
8f4d2d0
a3b10ab
 
8f4d2d0
 
1cbe390
a3b10ab
 
8f4d2d0
1cbe390
a3b10ab
8f4d2d0
 
f178600
a3b10ab
8f4d2d0
a3b10ab
8f4d2d0
1cbe390
a3b10ab
 
 
 
 
 
 
8f4d2d0
 
a3b10ab
06cfa59
 
 
 
 
 
 
 
1cbe390
8f4d2d0
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
// ====================================================================
// EuNEx Matching Engine β€” Main Entry Point
//
// Multi-threaded actor topology (mirrors Optiq architecture):
//
//   Core 0: OEGActor + FIXAcceptorActor
//   Core 1: MECoreActor per symbol (matching engine)
//   Core 2: MDGActor
//   Core 3: ClearingHouseActor + AITraderActor
//
// Optiq equivalent topology:
//   OEActor β†’ LogicalCoreActor (Book) β†’ MDLimit β†’ MDIMP
//                                     β†’ OE Ack (back to OEActor)
//                                     β†’ ClearingHouse (via Kafka/PTB)
// ====================================================================

#include "engine/SimplxShim.hpp"
#include "actors/MECoreActor.hpp"
#include "actors/OEGActor.hpp"
#include "actors/MDGActor.hpp"
#include "actors/ClearingHouseActor.hpp"
#include "actors/FIXAcceptorActor.hpp"
#include "actors/AITraderActor.hpp"
#include "persistence/KafkaBus.hpp"
#include <iostream>
#include <thread>
#include <chrono>
#include <csignal>
#include <atomic>
#include <cstdlib>
#include <unordered_map>

using namespace tredzone;
using namespace eunex;

static std::atomic<bool> g_running{true};

void signalHandler(int) { g_running = false; }

int main() {
    std::cout << "===========================================\n";
    std::cout << "  EuNEx Matching Engine v0.6\n";
    std::cout << "  C++ Actor Architecture (Optiq model)\n";
    std::cout << "===========================================\n\n";

    std::signal(SIGINT, signalHandler);
    std::signal(SIGTERM, signalHandler);

    // --Kafka Bus (optional) --------------------------------------
    std::unique_ptr<KafkaBus> kafkaBus;
    const char* kafkaBrokers = std::getenv("EUNEX_KAFKA_BROKERS");
    if (kafkaBrokers && kafkaBrokers[0] != '\0') {
        KafkaBusConfig cfg;
        cfg.brokers = kafkaBrokers;
        kafkaBus = std::make_unique<KafkaBus>(cfg);
    }

    // --Symbol definitions ----------------------------------------
    constexpr SymbolIndex_t SYM_AAPL  = 1;
    constexpr SymbolIndex_t SYM_MSFT  = 2;
    constexpr SymbolIndex_t SYM_GOOGL = 3;
    constexpr SymbolIndex_t SYM_TSLA  = 4;
    constexpr SymbolIndex_t SYM_NVDA  = 5;
    constexpr SymbolIndex_t SYM_AMD   = 6;
    constexpr SymbolIndex_t SYM_ENX   = 7;

    std::vector<SymbolIndex_t> allSymbols = {
        SYM_AAPL, SYM_MSFT, SYM_GOOGL, SYM_TSLA, SYM_NVDA, SYM_AMD, SYM_ENX
    };

    // --Core 0: OE Gateway ----------------------------------------
    auto oeGateway = std::make_unique<OEGActor>();

    // --Core 2: Market Data ---------------------------------------
    auto mdActor = std::make_unique<MDGActor>();

    // --Core 3: Clearing House ------------------------------------
    auto chActor = std::make_unique<ClearingHouseActor>();

    // Map AI trader sessions to clearing house members
    for (int i = 0; i < 10; ++i) {
        chActor->mapSession(static_cast<SessionId_t>(200 + i),
                            static_cast<MemberId_t>(i + 1));
    }
    // Map FIX gateway sessions (100-109) to members too
    for (int i = 0; i < 10; ++i) {
        chActor->mapSession(static_cast<SessionId_t>(100 + i),
                            static_cast<MemberId_t>(i + 1));
    }

    // --Core 1: Order Books (per symbol) --------------------------
    KafkaBus* kb = kafkaBus.get();
    auto oeId = oeGateway->getActorId();
    auto mdId = mdActor->getActorId();
    auto chId = chActor->getActorId();

    std::unordered_map<SymbolIndex_t, std::unique_ptr<MECoreActor>> books;
    for (auto sym : allSymbols) {
        books[sym] = std::make_unique<MECoreActor>(sym, oeId, mdId, chId, kb);
        oeGateway->mapSymbol(sym, books[sym]->getActorId());
    }

    // --Core 0: FIX Gateway --------------------------------------
    auto fixGateway = std::make_unique<FIXAcceptorActor>(oeGateway->getActorId(), 9001);

    // --Core 3: AI Trader -----------------------------------------
    auto aiTrader = std::make_unique<AITraderActor>(oeGateway->getActorId(), allSymbols);

    // --Wire exec report subscribers ------------------------------
    oeGateway->addExecReportSubscriber(fixGateway->getActorId());
    oeGateway->addExecReportSubscriber(aiTrader->getActorId());

    // --Print topology --------------------------------------------
    std::cout << "Actor topology:\n";
    std::cout << "  Core 0: OEG (id=" << oeGateway->getActorId().id
              << "), FIXAcceptor (id=" << fixGateway->getActorId().id << ")\n";
    std::cout << "  Core 1: ";
    for (auto sym : allSymbols) {
        std::cout << FIXAcceptorActor::symbolToString(sym) << "(id="
                  << books[sym]->getActorId().id << ") ";
    }
    std::cout << "\n";
    std::cout << "  Core 2: MDG (id=" << mdActor->getActorId().id << ")\n";
    std::cout << "  Core 3: CH (id=" << chActor->getActorId().id
              << "), AITrader (id=" << aiTrader->getActorId().id << ")\n\n";

    std::cout << "Services:\n";
    std::cout << "  FIX Gateway:  TCP port 9001\n";
    std::cout << "  AI Traders:   10 members (MBR01-MBR10)\n";
    std::cout << "  Symbols:      AAPL, MSFT, GOOGL, TSLA, NVDA, AMD, ENX\n";
    if (kafkaBus && kafkaBus->isConnected()) {
        std::cout << "  Kafka Bus:    " << kafkaBrokers << " (connected)\n";
    } else {
        std::cout << "  Kafka Bus:    disabled (set EUNEX_KAFKA_BROKERS to enable)\n";
    }
    std::cout << "\n";

    // --Seed initial orders for AI to have market data ------------
    std::cout << "Seeding initial order book...\n";
    SessionId_t seedSession = 200;

    struct SeedOrder { SymbolIndex_t sym; Side side; double price; Quantity_t qty; };
    SeedOrder seeds[] = {
        // AAPL ~$154
        {SYM_AAPL,  Side::Sell, 155.00, 100}, {SYM_AAPL,  Side::Sell, 154.50, 200},
        {SYM_AAPL,  Side::Buy,  153.50, 150}, {SYM_AAPL,  Side::Buy,  153.00, 100},
        // MSFT ~$324
        {SYM_MSFT,  Side::Sell, 325.00, 100}, {SYM_MSFT,  Side::Sell, 324.50, 150},
        {SYM_MSFT,  Side::Buy,  323.50, 200}, {SYM_MSFT,  Side::Buy,  323.00, 100},
        // GOOGL ~$141
        {SYM_GOOGL, Side::Sell, 142.00, 100}, {SYM_GOOGL, Side::Sell, 141.50, 200},
        {SYM_GOOGL, Side::Buy,  140.50, 150}, {SYM_GOOGL, Side::Buy,  140.00, 100},
        // TSLA ~$375
        {SYM_TSLA,  Side::Sell, 376.00, 80},  {SYM_TSLA,  Side::Sell, 375.50, 120},
        {SYM_TSLA,  Side::Buy,  374.50, 100}, {SYM_TSLA,  Side::Buy,  374.00, 80},
        // NVDA ~$201
        {SYM_NVDA,  Side::Sell, 202.00, 100}, {SYM_NVDA,  Side::Sell, 201.50, 150},
        {SYM_NVDA,  Side::Buy,  200.50, 120}, {SYM_NVDA,  Side::Buy,  200.00, 100},
        // AMD ~$320
        {SYM_AMD,   Side::Sell, 321.00, 90},  {SYM_AMD,   Side::Sell, 320.50, 130},
        {SYM_AMD,   Side::Buy,  319.50, 110}, {SYM_AMD,   Side::Buy,  319.00, 80},
        // ENX ~EUR146
        {SYM_ENX,   Side::Sell, 147.00, 60},  {SYM_ENX,   Side::Sell, 146.50, 100},
        {SYM_ENX,   Side::Buy,  145.50, 80},  {SYM_ENX,   Side::Buy,  145.00, 60},
    };

    ClOrdId_t seedClOrd = 1;
    for (auto& s : seeds) {
        oeGateway->submitNewOrder(seedClOrd++, s.sym, s.side, OrderType::Limit,
                                   TimeInForce::Day, toFixedPrice(s.price), s.qty, seedSession);
    }
    oeGateway->clearReports();

    std::cout << "Initial orders seeded.\n\n";

    // --Run AI trading rounds -------------------------------------
    std::cout << "Running AI trading... (Ctrl+C to stop)\n";
    std::cout << "FIX clients can connect to localhost:9001\n\n";

    int round = 0;
    while (g_running) {
        aiTrader->onCallback();
        round++;

        if (round % 10 == 0) {
            std::cout << "--Round " << round << " --\n";

            auto leaderboard = chActor->getLeaderboard();
            std::cout << "  Leaderboard:\n";
            for (int i = 0; i < std::min(5, static_cast<int>(leaderboard.size())); ++i) {
                auto& e = leaderboard[i];
                std::cout << "    " << e.name
                          << "  Capital=" << static_cast<int>(e.capital)
                          << "  P&L=" << static_cast<int>(e.pnl)
                          << "  Trades=" << e.tradeCount << "\n";
            }

            auto printBBO = [&](SymbolIndex_t sym, const char* name) {
                auto* snap = mdActor->getSnapshot(sym);
                if (snap) {
                    std::cout << "  " << name << ":"
                              << " Bid=" << toDouble(snap->bestBid)
                              << " Ask=" << toDouble(snap->bestAsk)
                              << " Last=" << toDouble(snap->lastTradePrice)
                              << " Trades=" << snap->tradeCount << "\n";
                }
            };

            for (auto sym : allSymbols) {
                printBBO(sym, FIXAcceptorActor::symbolToString(sym).c_str());
            }

            if (fixGateway->isRunning()) {
                std::cout << "  FIX clients: " << fixGateway->clientCount() << "\n";
            }
            if (kafkaBus && kafkaBus->isConnected()) {
                std::cout << "  Kafka: orders=" << kafkaBus->orderCount()
                          << " trades=" << kafkaBus->tradeCount()
                          << " md=" << kafkaBus->mdCount() << "\n";
            }
            std::cout << "\n";
        }

        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    // --Shutdown --------------------------------------------------
    std::cout << "\nShutting down...\n";
    fixGateway->stop();

    std::cout << "\n--Final Market Data ---------------------\n";
    for (auto sym : allSymbols) {
        auto* snap = mdActor->getSnapshot(sym);
        if (snap) {
            std::cout << "  " << FIXAcceptorActor::symbolToString(sym) << ": "
                      << snap->tradeCount << " trades, last=" << toDouble(snap->lastTradePrice) << "\n";
        }
    }

    std::cout << "\n--Final Leaderboard ---------------------\n";
    auto lb = chActor->getLeaderboard();
    for (auto& e : lb) {
        std::cout << "  " << e.name
                  << "  Capital=" << static_cast<int>(e.capital)
                  << "  P&L=" << static_cast<int>(e.pnl)
                  << "  Trades=" << e.tradeCount
                  << "  Holdings=" << e.holdingCount << "\n";
    }

    std::cout << "\nTrades processed: " << mdActor->getRecentTrades().size() << "\n";

    if (kafkaBus) {
        kafkaBus->flush();
        std::cout << "Kafka totals: orders=" << kafkaBus->orderCount()
                  << " trades=" << kafkaBus->tradeCount()
                  << " md=" << kafkaBus->mdCount() << "\n";
    }

    std::cout << "===========================================\n";
    std::cout << "  Engine stopped.\n";
    return 0;
}