File size: 3,686 Bytes
44cddac
8f4d2d0
 
 
 
44cddac
8f4d2d0
a3b10ab
06cfa59
 
8f4d2d0
 
 
06cfa59
8f4d2d0
a3b10ab
 
 
8f4d2d0
 
 
 
 
44cddac
8f4d2d0
 
 
 
 
 
 
 
 
5759f0b
8f4d2d0
06cfa59
 
5759f0b
 
 
06cfa59
5759f0b
 
 
 
 
 
 
 
 
 
8f4d2d0
 
 
 
44cddac
8f4d2d0
 
 
 
 
 
 
 
 
 
 
44cddac
8f4d2d0
 
 
 
 
 
 
 
 
 
 
44cddac
8f4d2d0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
06cfa59
 
 
 
 
 
 
 
8f4d2d0
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include "actors/MECoreActor.hpp"
#include <iostream>

namespace eunex {

/// Builds the matching-engine actor for a single symbol.
///
/// @param symbolIdx        Symbol index handed to the order book.
/// @param oeGatewayId      Actor that receives execution reports (order-entry pipe).
/// @param marketDataId     Actor that receives trade and book-update events.
/// @param clearingHouseId  Actor that receives trade events; a pipe to it is
///                         only created when its `id` field is non-zero.
/// @param kafkaBus         Stored as-is; the handlers in this file test it for
///                         null before every use.
MECoreActor::MECoreActor(SymbolIndex_t symbolIdx,
                         const tredzone::ActorId& oeGatewayId,
                         const tredzone::ActorId& marketDataId,
                         const tredzone::ActorId& clearingHouseId,
                         KafkaBus* kafkaBus)
    : book_(symbolIdx),
      oePipe_(*this, oeGatewayId),
      mdPipe_(*this, marketDataId),
      kafka_(kafkaBus)
{
    // The clearing-house pipe is optional: an id of 0 leaves it unset.
    const bool hasClearingHouse = (clearingHouseId.id != 0);
    if (hasClearingHouse) {
        chPipe_.emplace(*this, clearingHouseId);
    }

    // Subscribe to the three order-flow event types handled below.
    registerEventHandler<NewOrderEvent>(*this);
    registerEventHandler<CancelOrderEvent>(*this);
    registerEventHandler<ModifyOrderEvent>(*this);
}

void MECoreActor::onEvent(const NewOrderEvent& event) {
    Order order{};
    order.clOrdId    = event.clOrdId;
    order.symbolIdx  = event.symbolIdx;
    order.side       = event.side;
    order.ordType    = event.ordType;
    order.tif        = event.tif;
    order.price      = event.price;
    order.quantity   = event.quantity;
    order.sessionId  = event.sessionId;
    order.stopPrice  = event.stopPrice;

    if (kafka_) kafka_->publishOrder(order);

    auto onTrade = [this](const Trade& trade) {
        mdPipe_.push<TradeEvent>(trade);
        if (chPipe_) chPipe_->push<TradeEvent>(trade);
        if (kafka_) kafka_->publishTrade(trade);
    };
    auto onExec = [this, &event](const ExecutionReport& rpt) {
        oePipe_.push<ExecReportEvent>(rpt, event.sessionId);
    };

    book_.newOrder(order, onTrade, onExec);

    if (book_.stopOrderCount() > 0 && book_.lastTradePrice() != 0) {
        book_.triggerStopOrders(book_.lastTradePrice(), onTrade, onExec);
    }

    publishBookUpdate();
}

void MECoreActor::onEvent(const CancelOrderEvent& event) {
    ExecutionReport rpt{};
    if (book_.cancelOrder(event.orderId, rpt)) {
        oePipe_.push<ExecReportEvent>(rpt, event.sessionId);
        publishBookUpdate();
    } else {
        ExecutionReport reject{event.orderId, event.origClOrdId,
                               OrderStatus::Rejected, 0, 0, 0, 0, 0};
        oePipe_.push<ExecReportEvent>(reject, event.sessionId);
    }
}

void MECoreActor::onEvent(const ModifyOrderEvent& event) {
    ExecutionReport rpt{};
    if (book_.modifyOrder(event.orderId, event.newPrice, event.newQuantity, rpt)) {
        oePipe_.push<ExecReportEvent>(rpt, event.sessionId);
        publishBookUpdate();
    } else {
        ExecutionReport reject{event.orderId, event.origClOrdId,
                               OrderStatus::Rejected, 0, 0, 0, 0, 0};
        oePipe_.push<ExecReportEvent>(reject, event.sessionId);
    }
}

void MECoreActor::publishBookUpdate() {
    BookUpdateEvent update;
    update.symbolIdx = book_.symbolIndex();

    auto bidLevels = book_.getBids(10);
    update.bidDepth = static_cast<int>(bidLevels.size());
    for (int i = 0; i < update.bidDepth; ++i) {
        update.bids[i] = {bidLevels[i].price, bidLevels[i].totalQty};
    }

    auto askLevels = book_.getAsks(10);
    update.askDepth = static_cast<int>(askLevels.size());
    for (int i = 0; i < update.askDepth; ++i) {
        update.asks[i] = {askLevels[i].price, askLevels[i].totalQty};
    }

    mdPipe_.push<BookUpdateEvent>(update);

    if (kafka_) {
        Price_t bb = bidLevels.empty() ? 0 : bidLevels[0].price;
        Price_t ba = askLevels.empty() ? 0 : askLevels[0].price;
        Quantity_t bq = bidLevels.empty() ? 0 : bidLevels[0].totalQty;
        Quantity_t aq = askLevels.empty() ? 0 : askLevels[0].totalQty;
        kafka_->publishMarketData(book_.symbolIndex(), bb, ba, bq, aq);
    }
}

} // namespace eunex