File size: 1,605 Bytes
8f4d2d0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include "iaca/IacaAggregator.hpp"
#include <algorithm>

namespace eunex::iaca {

bool FragmentChain::checkComplete() const {
    for (auto& frag : fragments) {
        int childCount = 0;
        for (auto& other : fragments) {
            if (other.previousOrigin == frag.origin)
                ++childCount;
        }
        if (childCount != frag.nextCount)
            return false;
    }
    return !fragments.empty();
}

void IacaAggregator::addFragment(const IacaFragment& fragment) {
    auto& chain = chains_[fragment.chainId];
    chain.chainId = fragment.chainId;
    chain.fragments.push_back(fragment);
    tryComplete(fragment.chainId);
}

void IacaAggregator::registerHandler(std::shared_ptr<FragmentHandler> handler) {
    handlers_.push_back(std::move(handler));
}

void IacaAggregator::tryComplete(ChainId_t chainId) {
    auto it = chains_.find(chainId);
    if (it == chains_.end()) return;

    auto& chain = it->second;
    if (!chain.checkComplete()) return;

    chain.complete = true;
    ++completedCount_;

    for (auto& handler : handlers_) {
        if (handler->canProcess(chain)) {
            handler->process(chain);
        }
    }

    chains_.erase(it);
}

bool NewOrderHandler::canProcess(const FragmentChain& chain) const {
    return std::any_of(chain.fragments.begin(), chain.fragments.end(),
        [](const IacaFragment& f) {
            return f.causeId == CAUSE_NEW_ORDER_BUY || f.causeId == CAUSE_NEW_ORDER_SELL;
        });
}

void NewOrderHandler::process(const FragmentChain& chain) {
    if (callback_) callback_(chain);
}

} // namespace eunex::iaca