EuNEx / dashboard /app.py
RayMelius's picture
Fix C++ engine buy/sell and add Optiq actor stages to Message Flow
8f9591b
"""
EuNEx Dashboard β€” Web GUI for the matching engine.
Provides:
- Real-time order book view (bids/asks per symbol)
- Recent trades table with SQLite persistence
- Market data snapshots (BBO, last price, volume)
- Order submission (new, cancel, amend)
- OHLCV chart data (1h, 8h, 1d, 1w)
- Session controls (start/suspend/resume)
- SSE streaming for live updates
- Clearing House integration
- AI Analyst (Ollama/Groq/HuggingFace)
- Developer Message Flow Visualizer
Run: python dashboard/app.py
"""
import json
import time
import queue
import threading
import random
import sys
import os
import signal
import subprocess
import urllib.request
import urllib.error
from collections import deque
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from flask import Flask, render_template, request, jsonify, Response
from dashboard.database import (
init_db, save_order, save_trade, record_ohlcv,
get_ohlcv, get_recent_orders, get_recent_trades, get_active_orders,
save_daily_close, get_last_closing_prices, get_daily_closes,
)
from datetime import date
import socket
from shared.config import SYMBOLS, DASHBOARD_PORT, DASHBOARD_DB, CH_URL, FIX_PORT, SIM_INTERVAL, SIM_ORDERS_PER_ROUND
app = Flask(__name__)
# ── In-memory state ─────────────────────────────────────────────────
state_lock = threading.Lock()
symbols = SYMBOLS
orders = []
trades = []
snapshots = {}
session_status = "idle"
next_order_id = 1000
db_path = DASHBOARD_DB
sse_clients = []
# ── Matching engine bridge ──────────────────────────────────────────
class MatchingEngine:
def __init__(self):
self.books = {}
for sym_id in symbols:
self.books[sym_id] = {"bids": [], "asks": []}
def submit_order(self, symbol_id, side, order_type, price, quantity,
tif="Day", source="dashboard", cl_ord_id=""):
global next_order_id
oid = next_order_id
next_order_id += 1
order = {
"orderId": oid,
"clOrdId": cl_ord_id or f"D-{oid}",
"symbolIdx": symbol_id,
"symbol": symbols.get(symbol_id, {}).get("name", "???"),
"side": side,
"orderType": order_type,
"price": price,
"quantity": quantity,
"remainingQty": quantity,
"status": "New",
"tif": tif,
"source": source,
"timestamp": time.time(),
}
with state_lock:
matched_trades = self._match(order)
if order["remainingQty"] > 0 and order["status"] not in ("Cancelled", "Rejected"):
if order_type == "Market" or tif == "IOC":
order["status"] = "Cancelled"
elif tif == "FOK":
order["status"] = "Rejected"
else:
orders.append(order)
self._insert_resting(order)
elif order["remainingQty"] == 0:
order["status"] = "Filled"
self._update_snapshot(symbol_id)
save_order(db_path, order)
broadcast_event("order", order)
for t in matched_trades:
save_trade(db_path, t)
record_ohlcv(db_path, t["symbol"], t["price"], t["quantity"])
broadcast_event("trade", t)
broadcast_event("snapshot", snapshots.get(symbol_id, {}))
return order
def cancel_order(self, order_id):
with state_lock:
for o in orders:
if o["orderId"] == order_id and o["status"] in ("New", "PartiallyFilled"):
o["status"] = "Cancelled"
self._remove_from_book(o)
self._update_snapshot(o["symbolIdx"])
save_order(db_path, o)
broadcast_event("order", o)
return o
return None
def amend_order(self, order_id, new_price=None, new_quantity=None):
with state_lock:
for o in orders:
if o["orderId"] == order_id and o["status"] in ("New", "PartiallyFilled"):
self._remove_from_book(o)
if new_price is not None:
o["price"] = new_price
if new_quantity is not None:
o["quantity"] = new_quantity
o["remainingQty"] = new_quantity
self._insert_resting(o)
self._update_snapshot(o["symbolIdx"])
save_order(db_path, o)
broadcast_event("order", o)
return o
return None
def _match(self, incoming):
matched = []
sym = incoming["symbolIdx"]
book = self.books[sym]
if incoming["side"] == "Buy":
opposite = book["asks"]
else:
opposite = book["bids"]
if incoming["tif"] == "FOK":
available = 0
for level in opposite:
if incoming["side"] == "Buy":
if incoming["orderType"] == "Market" or level["price"] <= incoming["price"]:
available += level["remainingQty"]
else:
if incoming["orderType"] == "Market" or level["price"] >= incoming["price"]:
available += level["remainingQty"]
if available < incoming["quantity"]:
incoming["status"] = "Rejected"
return []
i = 0
while i < len(opposite) and incoming["remainingQty"] > 0:
resting = opposite[i]
price_ok = incoming["orderType"] == "Market"
if not price_ok:
if incoming["side"] == "Buy":
price_ok = resting["price"] <= incoming["price"]
else:
price_ok = resting["price"] >= incoming["price"]
if not price_ok:
break
fill_qty = min(incoming["remainingQty"], resting["remainingQty"])
fill_price = resting["price"]
incoming["remainingQty"] -= fill_qty
resting["remainingQty"] -= fill_qty
if incoming["remainingQty"] > 0:
incoming["status"] = "PartiallyFilled"
else:
incoming["status"] = "Filled"
if resting["remainingQty"] == 0:
resting["status"] = "Filled"
opposite.pop(i)
for o in orders:
if o["orderId"] == resting["orderId"]:
o["status"] = "Filled"
o["remainingQty"] = 0
save_order(db_path, o)
broadcast_event("order", o)
break
else:
resting["status"] = "PartiallyFilled"
for o in orders:
if o["orderId"] == resting["orderId"]:
o["status"] = "PartiallyFilled"
o["remainingQty"] = resting["remainingQty"]
save_order(db_path, o)
broadcast_event("order", o)
break
i += 1
trade = {
"tradeId": len(trades) + 1,
"symbolIdx": sym,
"symbol": symbols.get(sym, {}).get("name", "???"),
"price": fill_price,
"quantity": fill_qty,
"buyOrderId": incoming["orderId"] if incoming["side"] == "Buy" else resting["orderId"],
"sellOrderId": resting["orderId"] if incoming["side"] == "Buy" else incoming["orderId"],
"timestamp": time.time(),
}
trades.append(trade)
matched.append(trade)
return matched
def _insert_resting(self, order):
sym = order["symbolIdx"]
book = self.books[sym]
entry = {"orderId": order["orderId"], "price": order["price"],
"remainingQty": order["remainingQty"], "side": order["side"]}
if order["side"] == "Buy":
book["bids"].append(entry)
book["bids"].sort(key=lambda x: -x["price"])
else:
book["asks"].append(entry)
book["asks"].sort(key=lambda x: x["price"])
def _remove_from_book(self, order):
sym = order["symbolIdx"]
book = self.books[sym]
side_list = book["bids"] if order["side"] == "Buy" else book["asks"]
for i, e in enumerate(side_list):
if e["orderId"] == order["orderId"]:
side_list.pop(i)
break
def _update_snapshot(self, sym_id):
book = self.books[sym_id]
sym_trades = [t for t in trades if t["symbolIdx"] == sym_id]
snap = {
"symbolIdx": sym_id,
"symbol": symbols.get(sym_id, {}).get("name", "???"),
"segment": symbols.get(sym_id, {}).get("segment", ""),
"bestBid": book["bids"][0]["price"] if book["bids"] else 0,
"bestAsk": book["asks"][0]["price"] if book["asks"] else 0,
"bidDepth": len(book["bids"]),
"askDepth": len(book["asks"]),
"lastPrice": sym_trades[-1]["price"] if sym_trades else 0,
"lastQty": sym_trades[-1]["quantity"] if sym_trades else 0,
"tradeCount": len(sym_trades),
"volume": sum(t["quantity"] for t in sym_trades),
"bids": [{"price": b["price"], "qty": b["remainingQty"]} for b in book["bids"][:10]],
"asks": [{"price": a["price"], "qty": a["remainingQty"]} for a in book["asks"][:10]],
}
snapshots[sym_id] = snap
engine = MatchingEngine()
# ── C++ Engine Bridge (FIX 4.4 TCP) ──────────────────────────────────
engine_mode = "python" # "python" or "cpp"
SOH = "\x01"
class CppEngineBridge:
def __init__(self, host="localhost", port=FIX_PORT):
self.host = host
self.port = port
self._sock = None
self._lock = threading.RLock()
self._seq = 1
self._recv_thread = None
self._connected = False
self._pending = {}
def connect(self):
with self._lock:
if self._connected:
return True
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(5)
self._sock.connect((self.host, self.port))
self._sock.settimeout(None)
self._connected = True
self._seq = 1
self._send_logon()
self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
self._recv_thread.start()
print(f"[CPP] Connected to C++ engine at {self.host}:{self.port}")
return True
except Exception as e:
print(f"[CPP] Connection failed: {e}")
self._connected = False
self._sock = None
return False
def disconnect(self):
with self._lock:
if self._sock:
try:
self._send_logout()
self._sock.close()
except Exception:
pass
self._sock = None
self._connected = False
print("[CPP] Disconnected from C++ engine")
def is_healthy(self):
if self._connected:
return True
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(2)
s.connect((self.host, self.port))
s.close()
return True
except Exception:
return False
def submit_order(self, symbol_id, side, order_type, price, quantity,
tif="Day", source="dashboard", cl_ord_id=""):
global next_order_id
oid = next_order_id
next_order_id += 1
cl_ord_id = cl_ord_id or str(oid)
sym_name = symbols.get(symbol_id, {}).get("name", "???")
fix_side = "1" if side == "Buy" else "2"
fix_type = "1" if order_type == "Market" else "2"
fix_tif = {"Day": "0", "GTC": "1", "IOC": "3", "FOK": "4"}.get(tif, "0")
fields = {
35: "D",
49: f"DASHBOARD-{source}",
56: "EUNEX",
11: cl_ord_id,
55: sym_name,
54: fix_side,
40: fix_type,
44: f"{price:.2f}",
38: str(quantity),
59: fix_tif,
}
order = {
"orderId": oid,
"clOrdId": cl_ord_id,
"symbolIdx": symbol_id,
"symbol": sym_name,
"side": side,
"orderType": order_type,
"price": price,
"quantity": quantity,
"remainingQty": quantity,
"status": "New",
"tif": tif,
"source": source,
"timestamp": time.time(),
}
if self._connected:
self._pending[cl_ord_id] = order
log_message("OEG", f"[C++] NewOrder {sym_name} {side} {quantity}@{price:.2f} β†’ FIXAcceptor", order_id=oid)
log_message("Book", f"[C++] 35=D clOrdId={cl_ord_id} β†’ OEGActor β†’ MECoreActor", order_id=oid)
self._send_fix(fields)
else:
order["status"] = "Rejected"
order["remainingQty"] = quantity
log_message("OEG", f"[C++] NewOrder REJECTED β€” not connected", order_id=oid)
with state_lock:
orders.append(order)
save_order(db_path, order)
broadcast_event("order", order)
return order
def cancel_order(self, order_id):
with state_lock:
target = None
for o in orders:
if o["orderId"] == order_id and o["status"] in ("New", "PartiallyFilled"):
target = o
break
if not target:
return None
fields = {
35: "F",
49: "DASHBOARD",
56: "EUNEX",
11: f"CXL-{order_id}-{int(time.time()*1000)}",
41: target["clOrdId"],
55: target["symbol"],
54: "1" if target["side"] == "Buy" else "2",
38: str(target["remainingQty"]),
}
if self._connected:
self._send_fix(fields)
with state_lock:
target["status"] = "Cancelled"
save_order(db_path, target)
broadcast_event("order", target)
return target
def amend_order(self, order_id, new_price=None, new_quantity=None):
with state_lock:
target = None
for o in orders:
if o["orderId"] == order_id and o["status"] in ("New", "PartiallyFilled"):
target = o
break
if not target:
return None
fields = {
35: "G",
49: "DASHBOARD",
56: "EUNEX",
11: f"AMD-{order_id}-{int(time.time()*1000)}",
41: target["clOrdId"],
55: target["symbol"],
54: "1" if target["side"] == "Buy" else "2",
44: f"{(new_price if new_price is not None else target['price']):.2f}",
38: str(new_quantity if new_quantity is not None else target["quantity"]),
}
if self._connected:
self._send_fix(fields)
with state_lock:
if new_price is not None:
target["price"] = new_price
if new_quantity is not None:
target["quantity"] = new_quantity
target["remainingQty"] = new_quantity
save_order(db_path, target)
broadcast_event("order", target)
return target
def _send_fix(self, fields):
fields[34] = str(self._seq)
self._seq += 1
fields[52] = time.strftime("%Y%m%d-%H:%M:%S")
body_parts = []
for tag, val in fields.items():
if tag not in (8, 9, 10):
body_parts.append(f"{tag}={val}")
body = SOH.join(body_parts)
header = f"8=FIX.4.4{SOH}9={len(body) + 1}{SOH}"
msg = header + body + SOH
checksum = sum(ord(c) for c in msg) % 256
raw = msg + f"10={checksum:03d}{SOH}"
with self._lock:
try:
self._sock.sendall(raw.encode())
except Exception as e:
print(f"[CPP] Send error: {e}")
self._connected = False
def _send_logon(self):
self._send_fix({35: "A", 49: "DASHBOARD", 56: "EUNEX", 98: "0", 108: "30"})
def _send_logout(self):
self._send_fix({35: "5", 49: "DASHBOARD", 56: "EUNEX"})
def _recv_loop(self):
buf = ""
while self._connected:
try:
data = self._sock.recv(4096)
if not data:
break
buf += data.decode("ascii", errors="replace")
while "10=" in buf:
end = buf.find(SOH, buf.find("10="))
if end < 0:
break
msg_raw = buf[:end + 1]
buf = buf[end + 1:]
self._handle_fix_msg(msg_raw)
except Exception as e:
if self._connected:
print(f"[CPP] Recv error: {e}")
break
self._connected = False
def _handle_fix_msg(self, raw):
fields = {}
for pair in raw.split(SOH):
if "=" in pair:
tag, val = pair.split("=", 1)
try:
fields[int(tag)] = val
except ValueError:
pass
msg_type = fields.get(35)
if msg_type == "8": # ExecutionReport
self._handle_exec_report(fields)
elif msg_type == "0": # Heartbeat
pass
def _handle_exec_report(self, fields):
cl_ord_id = fields.get(11, "")
exec_type = fields.get(150, "")
ord_status = fields.get(39, "")
last_px = float(fields.get(31, 0))
last_qty = int(fields.get(32, 0))
leaves_qty = int(fields.get(151, 0))
order = self._pending.get(cl_ord_id)
if not order:
return
symbol = order.get("symbol", "?")
side = order.get("side", "?")
oid = order.get("orderId", "?")
order["remainingQty"] = leaves_qty
if exec_type == "F" or ord_status == "2":
if leaves_qty == 0:
order["status"] = "Filled"
else:
order["status"] = "PartiallyFilled"
elif exec_type == "0" or ord_status == "0":
order["status"] = "New"
elif exec_type == "4" or ord_status == "4":
order["status"] = "Cancelled"
elif exec_type == "8" or ord_status == "8":
order["status"] = "Rejected"
save_order(db_path, order)
broadcast_event("order", order)
log_message("MECore", f"[C++] ExecReport Order#{oid} β†’ {order['status']}", order_id=oid)
if exec_type == "F" and last_qty > 0:
sym_id = order.get("symbolIdx", 0)
trade = {
"tradeId": len(trades) + 1,
"symbolIdx": sym_id,
"symbol": symbol,
"price": last_px,
"quantity": last_qty,
"buyOrderId": oid if side == "Buy" else 0,
"sellOrderId": oid if side == "Sell" else 0,
"timestamp": time.time(),
"source": "cpp-engine",
}
with state_lock:
trades.append(trade)
save_trade(db_path, trade)
record_ohlcv(db_path, symbol, last_px, last_qty)
log_message("Match", f"[C++] Trade#{trade['tradeId']} {symbol} {last_qty}@{last_px:.2f}", order_id=oid)
log_message("Trade", f"[C++] Trade#{trade['tradeId']} persisted", order_id=oid, trade_id=trade["tradeId"])
log_message("DB", f"[C++] Trade#{trade['tradeId']} β†’ SQLite", trade_id=trade["tradeId"])
log_message("CH", f"[C++] Trade#{trade['tradeId']} β†’ clearing", order_id=oid, trade_id=trade["tradeId"])
broadcast_event("trade", trade)
if sym_id:
engine._update_snapshot(sym_id)
broadcast_event("snapshot", snapshots.get(sym_id, {}))
cpp_bridge = CppEngineBridge()
# ── C++ engine process management ──────────────────────────────────
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
_PID_DIR = os.path.join(_PROJECT_ROOT, ".pids")
_EUNEX_ME_PATHS = [
os.path.join(_PROJECT_ROOT, "eunex_me"),
os.path.join(_PROJECT_ROOT, "build", "Release", "eunex_me"),
os.path.join(_PROJECT_ROOT, "build", "eunex_me"),
"/app/eunex_me",
]
_cpp_process = None
_cpp_process_lock = threading.Lock()
def _read_log_tail(path, lines=20):
try:
with open(path, "r") as f:
all_lines = f.readlines()
return "".join(all_lines[-lines:]).strip()
except Exception:
return "(no log output)"
def _find_eunex_me():
for p in _EUNEX_ME_PATHS:
if os.path.isfile(p) and os.access(p, os.X_OK):
return p
return None
def _kill_pid_file(name):
pidfile = os.path.join(_PID_DIR, f"{name}.pid")
if not os.path.isfile(pidfile):
return
try:
pid = int(open(pidfile).read().strip())
os.kill(pid, signal.SIGTERM)
for _ in range(20):
time.sleep(0.25)
try:
os.kill(pid, 0)
except OSError:
break
print(f"[ENGINE] Stopped {name} (PID {pid})")
except (ValueError, OSError):
pass
try:
os.remove(pidfile)
except OSError:
pass
def _start_cpp_engine():
global _cpp_process
binary = _find_eunex_me()
if not binary:
return None, "eunex_me binary not found"
_kill_pid_file("fix_gateway")
time.sleep(0.5)
with _cpp_process_lock:
if _cpp_process and _cpp_process.poll() is None:
return _cpp_process, None
log_path = os.path.join(_PROJECT_ROOT, "logs", "eunex_me.log")
os.makedirs(os.path.dirname(log_path), exist_ok=True)
log_f = open(log_path, "w")
_cpp_process = subprocess.Popen(
[binary],
stdout=log_f, stderr=subprocess.STDOUT,
cwd=_PROJECT_ROOT,
)
print(f"[ENGINE] Started eunex_me (PID {_cpp_process.pid}) β€” log: {log_path}")
for i in range(30):
time.sleep(0.5)
rc = _cpp_process.poll()
if rc is not None:
log_tail = _read_log_tail(log_path, 20)
_cpp_process = None
_restart_fix_gateway()
return None, f"eunex_me exited with code {rc}\n\n{log_tail}"
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
s.connect(("localhost", FIX_PORT))
s.close()
print(f"[ENGINE] eunex_me ready on port {FIX_PORT}")
return _cpp_process, None
except OSError:
pass
log_tail = _read_log_tail(log_path, 20)
_stop_cpp_engine()
_restart_fix_gateway()
return None, f"eunex_me started but not listening within 15s\n\n{log_tail}"
def _stop_cpp_engine():
global _cpp_process
with _cpp_process_lock:
if _cpp_process and _cpp_process.poll() is None:
_cpp_process.terminate()
try:
_cpp_process.wait(timeout=5)
except subprocess.TimeoutExpired:
_cpp_process.kill()
print(f"[ENGINE] Stopped eunex_me (PID {_cpp_process.pid})")
_cpp_process = None
def _restart_fix_gateway():
python = sys.executable
fix_script = os.path.join(_PROJECT_ROOT, "fix_gateway", "fix_server.py")
if not os.path.isfile(fix_script):
return
log_path = os.path.join(_PROJECT_ROOT, "logs", "fix_gateway.log")
os.makedirs(os.path.dirname(log_path), exist_ok=True)
log_f = open(log_path, "w")
proc = subprocess.Popen(
[python, fix_script],
stdout=log_f, stderr=subprocess.STDOUT,
cwd=_PROJECT_ROOT,
)
os.makedirs(_PID_DIR, exist_ok=True)
with open(os.path.join(_PID_DIR, "fix_gateway.pid"), "w") as f:
f.write(str(proc.pid))
print(f"[ENGINE] Restarted fix_gateway (PID {proc.pid})")
# ── SSE broadcasting ────────────────────────────────────────────────
def broadcast_event(event_type, data):
msg = f"event: {event_type}\ndata: {json.dumps(data, default=str)}\n\n"
dead = []
for i, q in enumerate(sse_clients):
try:
q.put_nowait(msg)
except queue.Full:
dead.append(i)
for i in reversed(dead):
sse_clients.pop(i)
# ── Routes ──────────────────────────────────────────────────────────
@app.route("/")
def index():
return render_template("index.html")
@app.route("/health")
def health():
return jsonify({"status": "ok", "session": session_status,
"orders": len(orders), "trades": len(trades),
"engine_mode": engine_mode})
@app.route("/engine/mode", methods=["GET", "POST"])
def engine_mode_route():
global engine_mode
if request.method == "POST":
d = request.json
mode = d.get("mode", "python")
if mode not in ("python", "cpp"):
return jsonify({"error": "Invalid mode"}), 400
if mode == "cpp":
if not _find_eunex_me():
return jsonify({"error": "eunex_me binary not found",
"hint": "Build it: cd build && cmake .. && cmake --build ."}), 400
proc, err = _start_cpp_engine()
if err:
return jsonify({"error": err}), 503
if not cpp_bridge.connect():
_stop_cpp_engine()
_restart_fix_gateway()
return jsonify({"error": "C++ engine started but FIX handshake failed",
"host": cpp_bridge.host, "port": cpp_bridge.port}), 503
else:
cpp_bridge.disconnect()
_stop_cpp_engine()
_restart_fix_gateway()
engine_mode = mode
broadcast_event("engine_mode", {"mode": engine_mode})
print(f"[ENGINE] Switched to {mode.upper()} engine")
return jsonify({"mode": engine_mode})
return jsonify({"mode": engine_mode,
"cpp_available": _find_eunex_me() is not None,
"cpp_binary": _find_eunex_me(),
"cpp_host": cpp_bridge.host,
"cpp_port": cpp_bridge.port})
@app.route("/engine/log")
def engine_log():
log_path = os.path.join(_PROJECT_ROOT, "logs", "eunex_me.log")
tail = _read_log_tail(log_path, 50)
return Response(tail or "(no log)", mimetype="text/plain")
@app.route("/data")
def data():
with state_lock:
return jsonify({
"orders": orders[-50:],
"trades": trades[-100:],
"snapshots": {k: v for k, v in snapshots.items()},
"symbols": {k: v for k, v in symbols.items()},
"session": session_status,
"engine_mode": engine_mode,
})
@app.route("/stream")
def stream():
q = queue.Queue(maxsize=200)
sse_clients.append(q)
def generate():
try:
while True:
try:
msg = q.get(timeout=30)
yield msg
except queue.Empty:
yield "event: ping\ndata: {}\n\n"
except GeneratorExit:
if q in sse_clients:
sse_clients.remove(q)
return Response(generate(), mimetype="text/event-stream",
headers={"Cache-Control": "no-cache",
"X-Accel-Buffering": "no"})
def _active_engine():
return cpp_bridge if engine_mode == "cpp" else engine
@app.route("/order/new", methods=["POST"])
def new_order():
d = request.json
active = _active_engine()
order = active.submit_order(
symbol_id=int(d["symbolIdx"]),
side=d["side"],
order_type=d.get("orderType", "Limit"),
price=float(d.get("price", 0)),
quantity=int(d["quantity"]),
tif=d.get("tif", "Day"),
source=d.get("source", "dashboard"),
cl_ord_id=d.get("clOrdId", ""),
)
return jsonify(order)
@app.route("/order/cancel", methods=["POST"])
def cancel_order():
d = request.json
result = _active_engine().cancel_order(int(d["orderId"]))
if result:
return jsonify(result)
return jsonify({"error": "Order not found or not cancellable"}), 404
@app.route("/order/amend", methods=["POST"])
def amend_order():
d = request.json
result = _active_engine().amend_order(
order_id=int(d["orderId"]),
new_price=float(d["price"]) if "price" in d else None,
new_quantity=int(d["quantity"]) if "quantity" in d else None,
)
if result:
return jsonify(result)
return jsonify({"error": "Order not found or not amendable"}), 404
@app.route("/orderbook/<int:symbol_id>")
def orderbook(symbol_id):
with state_lock:
snap = snapshots.get(symbol_id, {})
return jsonify(snap)
@app.route("/chart/<symbol>")
def chart_data(symbol):
period = request.args.get("period", "1h")
period_map = {"1h": 3600, "8h": 28800, "1d": 86400, "1w": 604800}
seconds = period_map.get(period, 3600)
bars = get_ohlcv(db_path, symbol, seconds)
return jsonify(bars)
@app.route("/history/orders")
def history_orders():
limit = int(request.args.get("limit", 100))
rows = get_recent_orders(db_path, limit)
return jsonify(rows)
@app.route("/history/trades")
def history_trades():
limit = int(request.args.get("limit", 200))
rows = get_recent_trades(db_path, limit)
return jsonify(rows)
@app.route("/history/daily/<symbol>")
def history_daily(symbol):
limit = int(request.args.get("limit", 30))
rows = get_daily_closes(db_path, symbol, limit)
return jsonify(rows)
@app.route("/ch/leaderboard")
def ch_leaderboard():
try:
req = urllib.request.Request(f"{CH_URL}/api/leaderboard")
with urllib.request.urlopen(req, timeout=3) as resp:
return jsonify(json.loads(resp.read()))
except Exception:
return jsonify([])
# ── Session controls ────────────────────────────────────────────────
@app.route("/session/start", methods=["POST"])
def session_start():
global session_status
session_status = "active"
broadcast_event("session", {"status": session_status})
_notify_ch("start")
_seed_initial_orders()
return jsonify({"status": session_status})
@app.route("/session/suspend", methods=["POST"])
def session_suspend():
global session_status
session_status = "suspended"
broadcast_event("session", {"status": session_status})
_notify_ch("suspend")
return jsonify({"status": session_status})
@app.route("/session/resume", methods=["POST"])
def session_resume():
global session_status
session_status = "active"
broadcast_event("session", {"status": session_status})
_notify_ch("resume")
return jsonify({"status": session_status})
@app.route("/session/end", methods=["POST"])
def session_end():
global session_status
_save_closing_prices()
session_status = "idle"
broadcast_event("session", {"status": session_status})
_notify_ch("stop")
return jsonify({"status": session_status})
@app.route("/session/status")
def session_status_route():
return jsonify({"status": session_status})
def _seed_initial_orders():
prev_closes = get_last_closing_prices(db_path)
for sym_id, info in symbols.items():
sym_name = info["name"]
if sym_name in prev_closes:
ref = prev_closes[sym_name]["close_price"]
else:
ref = info["startPrice"]
for offset, qty in [(-0.50, 150), (-1.00, 100), (0.50, 200), (1.00, 100)]:
side = "Buy" if offset < 0 else "Sell"
engine.submit_order(
symbol_id=sym_id, side=side, order_type="Limit",
price=round(ref + offset, 2), quantity=qty,
tif="Day", source="seed",
)
def _save_closing_prices():
today = date.today().isoformat()
with state_lock:
for sym_id, info in symbols.items():
snap = snapshots.get(sym_id, {})
sym_trades = [t for t in trades if t["symbolIdx"] == sym_id]
close_price = snap.get("lastPrice", 0) or info["startPrice"]
bid = snap.get("bestBid", 0)
ask = snap.get("bestAsk", 0)
volume = sum(t["quantity"] for t in sym_trades)
trade_count = len(sym_trades)
save_daily_close(db_path, info["name"], today,
close_price, bid, ask, volume, trade_count)
def _notify_ch(action):
try:
data = json.dumps({"action": action}).encode()
req = urllib.request.Request(
f"{CH_URL}/api/control",
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
urllib.request.urlopen(req, timeout=2)
except Exception:
pass
# ── Market Simulation ──────────────────────────────────────────────
class MarketSimulator:
def __init__(self, engine_ref, interval=SIM_INTERVAL, orders_per_round=SIM_ORDERS_PER_ROUND):
self.engine = engine_ref
self.interval = interval
self.orders_per_round = orders_per_round
self._thread = None
self._running = False
self._ref_prices = {sid: info["startPrice"] for sid, info in symbols.items()}
self._load_closing_refs()
def start(self):
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
self._running = False
def _run(self):
while self._running:
if session_status == "active":
self._round()
time.sleep(self.interval)
def _round(self):
sym_ids = list(symbols.keys())
random.shuffle(sym_ids)
for sym_id in sym_ids:
ref = self._current_ref(sym_id)
for _ in range(self.orders_per_round):
side = random.choice(["Buy", "Sell"])
spread_pct = random.uniform(-0.005, 0.005)
price = round(ref * (1 + spread_pct), 2)
if price <= 0:
price = 0.01
qty = random.randint(10, 200)
self.engine.submit_order(
symbol_id=sym_id,
side=side,
order_type="Limit",
price=price,
quantity=qty,
tif="Day",
source="sim",
)
cross_side = random.choice(["Buy", "Sell"])
snap = snapshots.get(sym_id, {})
if cross_side == "Buy" and snap.get("bestAsk", 0) > 0:
cross_price = snap["bestAsk"] + 0.01
elif cross_side == "Sell" and snap.get("bestBid", 0) > 0:
cross_price = snap["bestBid"] - 0.01
else:
cross_price = ref
if cross_price <= 0:
cross_price = 0.01
cross_qty = random.randint(10, 50)
self.engine.submit_order(
symbol_id=sym_id,
side=cross_side,
order_type="Limit",
price=round(cross_price, 2),
quantity=cross_qty,
tif="Day",
source="sim",
)
def _load_closing_refs(self):
try:
prev = get_last_closing_prices(db_path)
for sid, info in symbols.items():
if info["name"] in prev:
self._ref_prices[sid] = prev[info["name"]]["close_price"]
except Exception:
pass
def _current_ref(self, sym_id):
snap = snapshots.get(sym_id, {})
bid = snap.get("bestBid", 0)
ask = snap.get("bestAsk", 0)
if bid > 0 and ask > 0:
return (bid + ask) / 2
if snap.get("lastPrice", 0) > 0:
return snap["lastPrice"]
return self._ref_prices.get(sym_id, 100.0)
simulator = MarketSimulator(engine)
# ── AI Analyst ─────────────────────────────────────────────────────
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "http://localhost:11434")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.2:3b")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
GROQ_MODEL = os.environ.get("GROQ_MODEL", "llama-3.1-8b-instant")
def _read_hf_token():
tok = os.environ.get("HF_TOKEN", "")
if tok:
return tok
for p in [
os.path.join(os.path.expanduser("~"), ".cache", "huggingface", "token"),
os.path.join(os.environ.get("HF_HOME", ""), "token"),
]:
if p and os.path.isfile(p):
try:
with open(p) as f:
tok = f.read().strip()
if tok:
print(f"[AI] HF_TOKEN loaded from {p}")
return tok
except Exception:
pass
return ""
HF_TOKEN = _read_hf_token()
HF_MODEL = os.environ.get("HF_MODEL", "Qwen/Qwen2.5-7B-Instruct")
ai_insights = deque(maxlen=20)
ai_provider = os.environ.get("LLM_PROVIDER", "auto")
ai_model_override = None
ai_generating = False
def _build_market_prompt():
now_str = time.strftime("%H:%M:%S")
session = session_status.upper()
trade_lines = []
with state_lock:
by_sym = {}
for t in trades[-200:]:
sym = t.get("symbol", "?")
by_sym.setdefault(sym, []).append(t)
for sym, ts in sorted(by_sym.items()):
prices = [t["price"] for t in ts if t.get("price")]
vol = sum(t.get("quantity", 0) for t in ts)
if prices:
trade_lines.append(
f" {sym}: {len(ts)} trade(s), range {min(prices):.2f}-{max(prices):.2f}, "
f"vol {vol}, last {prices[-1]:.2f}"
)
book_lines = []
for sid, snap in sorted(snapshots.items()):
bid = snap.get("bestBid", 0)
ask = snap.get("bestAsk", 0)
spread = ask - bid if bid > 0 and ask > 0 else 0
book_lines.append(
f" {snap.get('symbol','?')}: Bid {bid:.2f} / Ask {ask:.2f} (spread {spread:.2f})"
)
trades_text = "\n".join(trade_lines) if trade_lines else " No recent trades"
book_text = "\n".join(book_lines) if book_lines else " No order book data"
return (
"You are a concise financial market analyst for the EuNEx simulated exchange "
"(Euronext Optiq architecture). "
f"Time: {now_str} | Session: {session}\n\n"
f"Recent trades:\n{trades_text}\n\n"
f"Order book:\n{book_text}\n\n"
"In 3-4 sentences: activity level, notable price moves, market sentiment. "
"Plain prose, no headers, no bullet points."
)
def _try_ollama(prompt, model=None):
model = model or OLLAMA_MODEL
try:
data = json.dumps({
"model": model,
"messages": [{"role": "user", "content": prompt}],
"stream": False,
}).encode()
req = urllib.request.Request(
f"{OLLAMA_HOST}/api/chat",
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=120) as resp:
result = json.loads(resp.read())
return result.get("message", {}).get("content", "")
except Exception as e:
print(f"[AI] Ollama error: {e}")
return None
def _try_groq(prompt, model=None):
if not GROQ_API_KEY:
print("[AI] Groq skipped: no GROQ_API_KEY")
return None
model = model or GROQ_MODEL
try:
data = json.dumps({
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 300,
"temperature": 0.7,
}).encode()
req = urllib.request.Request(
"https://api.groq.com/openai/v1/chat/completions",
data=data,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {GROQ_API_KEY}",
},
)
with urllib.request.urlopen(req, timeout=30) as resp:
result = json.loads(resp.read())
return result["choices"][0]["message"]["content"]
except Exception as e:
print(f"[AI] Groq error: {e}")
return None
def _try_hf(prompt, model=None):
if not HF_TOKEN:
print("[AI] HuggingFace skipped: no HF_TOKEN")
return None
model = model or HF_MODEL
try:
data = json.dumps({
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 300,
"temperature": 0.7,
}).encode()
req = urllib.request.Request(
"https://router.huggingface.co/v1/chat/completions",
data=data,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {HF_TOKEN}",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=90) as resp:
result = json.loads(resp.read())
return result["choices"][0]["message"]["content"]
except Exception as e:
print(f"[AI] HuggingFace error: {e}")
return None
def _call_llm(prompt):
provider = ai_provider
model = ai_model_override
if provider == "ollama":
return _try_ollama(prompt, model), "ollama"
elif provider == "groq":
return _try_groq(prompt, model), "groq"
elif provider == "hf":
return _try_hf(prompt, model), "hf"
for name, func in [("ollama", _try_ollama), ("groq", _try_groq), ("hf", _try_hf)]:
text = func(prompt, model)
if text:
return text, name
return None, None
def _generate_insight():
global ai_generating
if ai_generating:
return
ai_generating = True
try:
prompt = _build_market_prompt()
text, source = _call_llm(prompt)
if text:
insight = {
"text": text.strip(),
"timestamp": time.time(),
"source": source or "unknown",
}
ai_insights.append(insight)
broadcast_event("ai_insight", insight)
else:
broadcast_event("ai_insight", {
"text": "LLM call failed. Check that Ollama is running, or set GROQ_API_KEY / HF_TOKEN.",
"timestamp": time.time(),
"source": "error",
})
except Exception as e:
print(f"[AI] Generate insight error: {e}")
broadcast_event("ai_insight", {
"text": f"Error: {e}",
"timestamp": time.time(),
"source": "error",
})
finally:
ai_generating = False
@app.route("/ai/generate", methods=["POST"])
def ai_generate():
threading.Thread(target=_generate_insight, daemon=True).start()
return jsonify({"status": "generating"})
@app.route("/ai/insights")
def ai_insights_list():
return jsonify(list(ai_insights))
@app.route("/ai/config")
def ai_config():
return jsonify({
"provider": ai_provider,
"model": ai_model_override,
"providers": {
"auto": {"label": "Auto (fallback)", "available": True},
"ollama": {"label": f"Ollama ({OLLAMA_MODEL})", "available": bool(OLLAMA_HOST)},
"groq": {"label": f"Groq ({GROQ_MODEL})", "available": bool(GROQ_API_KEY)},
"hf": {"label": f"HuggingFace ({HF_MODEL})", "available": bool(HF_TOKEN)},
},
})
@app.route("/ai/select", methods=["POST"])
def ai_select():
global ai_provider, ai_model_override
d = request.json
ai_provider = d.get("provider", "auto")
ai_model_override = d.get("model") or None
return jsonify({"provider": ai_provider, "model": ai_model_override})
@app.route("/ai/ollama/models")
def ai_ollama_models():
try:
req = urllib.request.Request(f"{OLLAMA_HOST}/api/tags", method="GET")
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read())
models = [m["name"] for m in data.get("models", [])]
return jsonify({"models": models, "current": ai_model_override or OLLAMA_MODEL,
"status": "connected"})
except Exception as e:
return jsonify({"models": [], "current": OLLAMA_MODEL, "status": "disconnected",
"error": str(e)})
@app.route("/ai/ollama/health")
def ai_ollama_health():
try:
with urllib.request.urlopen(f"{OLLAMA_HOST}/api/tags", timeout=3) as resp:
data = json.loads(resp.read())
models = [m["name"] for m in data.get("models", [])]
return jsonify({"ok": True, "models": models})
except Exception:
return jsonify({"ok": False, "models": []})
# ── Developer Message Flow Log ─────────────────────────────────────
message_log = deque(maxlen=500)
def log_message(stage, detail, order_id=None, trade_id=None):
entry = {
"timestamp": time.time(),
"stage": stage,
"detail": detail,
"orderId": order_id,
"tradeId": trade_id,
}
message_log.append(entry)
broadcast_event("msgflow", entry)
@app.route("/dev/messages")
def dev_messages():
limit = int(request.args.get("limit", 100))
items = list(message_log)[-limit:]
return jsonify(items)
# Patch engine to log message flow
_orig_submit = engine.submit_order
def _traced_submit(symbol_id, side, order_type, price, quantity,
tif="Day", source="dashboard", cl_ord_id=""):
sym_name = symbols.get(symbol_id, {}).get("name", "?")
result = _orig_submit(symbol_id, side, order_type, price, quantity, tif, source, cl_ord_id)
oid = result.get("orderId", "?")
status = result.get("status", "?")
log_message("OEG", f"NewOrder {sym_name} {side} {quantity}@{price:.2f} [{source}]", order_id=oid)
log_message("Book", f"Order#{oid} β†’ {status}", order_id=oid)
if status == "Filled":
log_message("Match", f"Order#{oid} fully filled", order_id=oid)
elif status == "PartiallyFilled":
log_message("Match", f"Order#{oid} partial fill, rem={result.get('remainingQty', '?')}", order_id=oid)
return result
engine.submit_order = _traced_submit
_orig_cancel = engine.cancel_order
def _traced_cancel(order_id):
log_message("OEG", f"CancelOrder #{order_id}", order_id=order_id)
result = _orig_cancel(order_id)
if result:
log_message("Book", f"Order#{order_id} cancelled", order_id=order_id)
return result
engine.cancel_order = _traced_cancel
# Patch trade saving to log trade and clearing steps
_orig_broadcast = broadcast_event
def _traced_broadcast(event_type, data):
if event_type == "trade":
tid = data.get("tradeId", "?")
sym = data.get("symbol", "?")
buy_oid = data.get("buyOrderId", "?")
sell_oid = data.get("sellOrderId", "?")
log_message("Trade", f"Trade#{tid} {sym} {data.get('quantity',0)}@{data.get('price',0):.2f}",
order_id=buy_oid, trade_id=tid)
log_message("Trade", f"Trade#{tid} {sym} {data.get('quantity',0)}@{data.get('price',0):.2f}",
order_id=sell_oid, trade_id=tid)
log_message("DB", f"Trade#{tid} persisted to SQLite", trade_id=tid)
log_message("CH", f"Trade#{tid} β†’ clearing (buy#{buy_oid}, sell#{sell_oid})", trade_id=tid)
_orig_broadcast(event_type, data)
broadcast_event = _traced_broadcast
if __name__ == "__main__":
os.makedirs(os.path.dirname(db_path), exist_ok=True)
init_db(db_path)
simulator.start()
port = DASHBOARD_PORT
print(f"EuNEx Dashboard starting on http://localhost:{port}")
print(f" Database: {db_path}")
print(f" Simulation: every {SIM_INTERVAL}s, {SIM_ORDERS_PER_ROUND} orders/symbol/round")
print(f" AI Analyst: provider={ai_provider} (Ollama={OLLAMA_HOST})")
app.run(host="0.0.0.0", port=port, debug=True, threaded=True)