import sys

# Must run before the third-party / project imports below so that packages
# living under /app (e.g. ``shared``) are resolvable.
sys.path.insert(0, "/app")

import json
import math
import threading
import time
import uuid

import quickfix as fix
import quickfix44 as fix44

from shared.config import Config
from shared.kafka_utils import create_producer, create_consumer


# Routing table for execution reports: client order id -> (sessionID, message
# dict) recorded when an order/amend is accepted from a FIX session.
# It is written from the FIX callbacks and read by the trades-consumer thread,
# so every access goes through ``order_sessions_lock``.
order_sessions_lock = threading.Lock()
order_sessions = dict()


class Application(fix.Application):
    """QuickFIX application that bridges FIX order entry to Kafka.

    Inbound NewOrderSingle (35=D), OrderCancelRequest (35=F) and
    OrderCancelReplaceRequest (35=G) messages are converted to plain dicts
    and published to ``Config.ORDERS_TOPIC``. Rejects (35=3) and
    ExecutionReports (35=8) are sent back on the originating session.
    """

    def __init__(self):
        super().__init__()
        self.producer = create_producer(component_name="FIX-OEG")
        # Sessions seen by onCreate, keyed by their SessionID.
        self.sessions = {}

    @staticmethod
    def _get_field(message, tag, default=None, cast=str):
        """Return ``cast(message.getField(tag))``, or *default* on any failure.

        The catch is deliberately broad: a missing tag raises a QuickFIX
        exception while a bad value raises from *cast* (e.g. ``ValueError``
        from ``float``); both cases mean "no usable value".
        """
        try:
            return cast(message.getField(tag))
        except Exception:
            return default

    def onCreate(self, sessionID):
        """Log the new session and remember it in ``self.sessions``."""
        print("FIX-OEG onCreate:", sessionID)
        self.sessions[sessionID] = sessionID

    def onLogon(self, sessionID):
        """Log a successful logon."""
        print("FIX-OEG onLogon:", sessionID)

    def onLogout(self, sessionID):
        """Log the logout and forget the session."""
        print("FIX-OEG onLogout:", sessionID)
        self.sessions.pop(sessionID, None)

    def toAdmin(self, message, sessionID):
        """No-op hook for outbound admin messages."""

    def fromAdmin(self, message, sessionID):
        """No-op hook for inbound admin messages."""

    def toApp(self, message, sessionID):
        """No-op hook for outbound application messages."""

    def fromApp(self, message, sessionID):
        """Dispatch an inbound application message by its MsgType (35).

        Unsupported message types are logged and answered with a Reject.
        """
        msgType = fix.MsgType()
        message.getHeader().getField(msgType)
        mtype = msgType.getValue()

        handlers = {
            fix.MsgType_NewOrderSingle: self.onNewOrderSingle,
            fix.MsgType_OrderCancelRequest: self.onOrderCancelRequest,
            fix.MsgType_OrderCancelReplaceRequest: self.onOrderCancelReplaceRequest,
        }
        handler = handlers.get(mtype)
        if handler is None:
            print("FIX-OEG: Unsupported MsgType:", mtype)
            self.sendReject(sessionID, message, f"Unsupported message type: {mtype}")
            return
        handler(message, sessionID)

    def validateNewOrderSingle(self, message):
        """Validate required fields for NewOrderSingle.

        Checks ClOrdID (11), Symbol (55), Side (54, must be "1" or "2") and
        OrderQty (38, positive finite number). When OrdType (40) is "2"
        (limit), Price (44) must also be a positive finite number.

        Returns:
            ``(valid, error_msg)`` where *error_msg* joins every problem
            found with "; " and is empty when *valid* is True.
        """
        errors = []
        get = self._get_field

        if not get(message, 11):
            errors.append("Missing required field ClOrdID (11)")
        if not get(message, 55):
            errors.append("Missing required field Symbol (55)")

        side = get(message, 54)
        if not side:
            errors.append("Missing required field Side (54)")
        elif side not in ("1", "2"):
            errors.append(f"Invalid Side (54): must be 1 (Buy) or 2 (Sell), got {side}")

        qty = get(message, 38)
        if not qty:
            errors.append("Missing required field OrderQty (38)")
        else:
            try:
                qty_val = float(qty)
            except ValueError:
                errors.append(f"Invalid OrderQty (38): {qty}")
            else:
                # float() happily parses "nan"/"inf", and nan <= 0 is False,
                # so finiteness has to be checked explicitly.
                if not (math.isfinite(qty_val) and qty_val > 0):
                    errors.append(f"OrderQty (38) must be positive, got {qty_val}")

        if get(message, 40) == "2":
            price = get(message, 44)
            if not price:
                errors.append("Missing Price (44) for limit order")
            else:
                try:
                    price_val = float(price)
                except ValueError:
                    errors.append(f"Invalid Price (44): {price}")
                else:
                    if not (math.isfinite(price_val) and price_val > 0):
                        errors.append(f"Price (44) must be positive, got {price}")

        return (len(errors) == 0, "; ".join(errors))

    def sendReject(self, sessionID, original_msg, reason):
        """Send a Reject message (35=3) for invalid messages.

        RefSeqNum (45) echoes the MsgSeqNum (34) of *original_msg* so the
        counterparty can tell which message was refused; it falls back to 0
        when the sequence number cannot be read. Send failures are logged,
        never raised.
        """
        try:
            ref_seq_num = int(original_msg.getHeader().getField(34))
        except Exception:
            ref_seq_num = 0

        reject = fix.Message()
        reject.getHeader().setField(fix.MsgType(fix.MsgType_Reject))
        reject.setField(fix.RefSeqNum(ref_seq_num))
        reject.setField(fix.Text(reason))
        try:
            fix.Session.sendToTarget(reject, sessionID)
            print(f"FIX-OEG: Sent Reject: {reason}")
        except Exception as e:
            print(f"FIX-OEG: Failed to send Reject: {e}")

    def sendExecutionReport(self, sessionID, order, exec_type, ord_status,
                            exec_qty=0, exec_price=0, cum_qty=0, leaves_qty=0):
        """Send an ExecutionReport (35=8).

        Args:
            sessionID: FIX session to send the report on.
            order: dict describing the order; the keys read here are
                ``order_id``, ``cl_ord_id``, ``symbol``, ``side`` (or the
                legacy ``type``), ``quantity`` and ``price``.
            exec_type: value for ExecType.
            ord_status: value for OrdStatus.
            exec_qty: quantity of this execution (LastShares).
            exec_price: price of this execution (LastPx).
            cum_qty: cumulative executed quantity (CumQty).
            leaves_qty: quantity still open (LeavesQty).

        Send failures are logged, never raised.
        """
        # The dicts built by this gateway carry the direction under "side"
        # ("BUY"/"SELL" for new orders, "buy"/"sell" for amends). The legacy
        # check on order["type"] == "buy" is kept for callers that use it.
        is_buy = (
            order.get('type') == 'buy'
            or str(order.get('side', '')).upper() == 'BUY'
        )

        exec_report = fix.Message()
        exec_report.getHeader().setField(fix.MsgType(fix.MsgType_ExecutionReport))

        exec_report.setField(fix.OrderID(order.get('order_id', str(uuid.uuid4()))))
        exec_report.setField(fix.ClOrdID(order.get('cl_ord_id', '')))
        exec_report.setField(fix.ExecID(str(uuid.uuid4())))
        exec_report.setField(fix.ExecType(exec_type))
        exec_report.setField(fix.OrdStatus(ord_status))
        exec_report.setField(fix.Symbol(order.get('symbol', '')))
        exec_report.setField(fix.Side(fix.Side_BUY if is_buy else fix.Side_SELL))
        exec_report.setField(fix.OrderQty(order.get('quantity', 0)))

        # A falsy price (missing or 0) is not reported.
        if order.get('price'):
            exec_report.setField(fix.Price(order.get('price')))

        exec_report.setField(fix.LastShares(exec_qty))
        exec_report.setField(fix.LastPx(exec_price))
        exec_report.setField(fix.CumQty(cum_qty))
        exec_report.setField(fix.LeavesQty(leaves_qty))
        # NOTE: AvgPx is approximated by the price of this execution.
        exec_report.setField(fix.AvgPx(exec_price if cum_qty > 0 else 0))

        try:
            fix.Session.sendToTarget(exec_report, sessionID)
            print(f"FIX-OEG: Sent ExecutionReport: ExecType={exec_type}, OrdStatus={ord_status}")
        except Exception as e:
            print(f"FIX-OEG: Failed to send ExecutionReport: {e}")

    def onNewOrderSingle(self, message, sessionID):
        """Handle New Order Single (35=D).

        Invalid orders are answered with a Reject. Valid orders are recorded
        in ``order_sessions`` (so later fills can be routed back), published
        to ``Config.ORDERS_TOPIC`` and acknowledged with an ExecutionReport
        (ExecType/OrdStatus NEW).
        """
        valid, error_msg = self.validateNewOrderSingle(message)
        if not valid:
            self.sendReject(sessionID, message, error_msg)
            return

        get = self._get_field
        # Validation above guarantees ClOrdID, Symbol, Side and OrderQty.
        cl_ord_id = get(message, 11)
        symbol = get(message, 55)
        side_val = get(message, 54, "1")
        qty = get(message, 38, 0.0, float)
        # Price is optional for non-limit orders; absent/unparsable -> 0.0.
        price = get(message, 44, 0.0, float)

        order_id = cl_ord_id or f"fix-{int(time.time()*1000)}"
        order = {
            "order_id": order_id,
            "cl_ord_id": cl_ord_id,
            "symbol": symbol,
            "side": "BUY" if str(side_val) == "1" else "SELL",
            "quantity": qty,
            "price": price,
            "timestamp": time.time(),
            "source": "fix-oeg"
        }

        # Register before publishing so a fill can never arrive for an
        # order this gateway does not yet know how to route.
        with order_sessions_lock:
            order_sessions[order_id] = (sessionID, order)

        try:
            meta = self.producer.send(Config.ORDERS_TOPIC, value=order).get(timeout=10)
            print(f"📥 FIX → Kafka: {order}")
            print(json.dumps({"component":"fix-oeg","event":"order_received","payload":{"order":order,"topic":meta.topic,"partition":meta.partition,"offset":meta.offset}}))

            self.sendExecutionReport(
                sessionID, order,
                exec_type=fix.ExecType_NEW,
                ord_status=fix.OrdStatus_NEW,
                leaves_qty=qty
            )
        except Exception as e:
            # NOTE(review): the client gets no response on this path. A
            # timeout does not prove the publish failed, so the routing
            # entry is kept and no reject is sent - confirm desired policy.
            print(json.dumps({"component":"fix-oeg","event":"produce_failed","payload":{"order":order,"error":str(e)}}))

    def onOrderCancelRequest(self, message, sessionID):
        """Handle Order Cancel Request (35=F).

        Requires OrigClOrdID (41); otherwise a Reject is sent. The cancel is
        published to ``Config.ORDERS_TOPIC``; publish failures are logged.
        """
        get = self._get_field
        orig_cl_ord_id = get(message, 41)
        cl_ord_id = get(message, 11)
        symbol = get(message, 55)

        if not orig_cl_ord_id:
            self.sendReject(sessionID, message, "Missing OrigClOrdID (41)")
            return

        cancel_msg = {
            "type": "cancel",
            "orig_cl_ord_id": orig_cl_ord_id,
            "cl_ord_id": cl_ord_id,
            "symbol": symbol,
            "timestamp": time.time(),
            "source": "fix-oeg"
        }

        try:
            self.producer.send(Config.ORDERS_TOPIC, value=cancel_msg).get(timeout=10)
            print(f"📥 FIX Cancel → Kafka: {cancel_msg}")
        except Exception as e:
            print(f"FIX-OEG: Failed to send cancel: {e}")

    def onOrderCancelReplaceRequest(self, message, sessionID):
        """Handle Order Cancel/Replace Request (35=G) - amend order price/quantity.

        Requires OrigClOrdID (41); otherwise a Reject is sent. The amend is
        recorded in ``order_sessions`` under its new ClOrdID (when present)
        and published to ``Config.ORDERS_TOPIC``; publish failures are logged.
        """
        get = self._get_field
        orig_cl_ord_id = get(message, 41)
        cl_ord_id = get(message, 11)
        symbol = get(message, 55)
        side = get(message, 54)
        new_qty = get(message, 38, 0.0, float)
        new_price = get(message, 44, 0.0, float)

        if not orig_cl_ord_id:
            self.sendReject(sessionID, message, "Missing OrigClOrdID (41)")
            return

        amend_msg = {
            "type": "amend",
            "orig_cl_ord_id": orig_cl_ord_id,
            "cl_ord_id": cl_ord_id,
            "symbol": symbol,
            # NOTE(review): lower-case here vs "BUY"/"SELL" on new orders;
            # left untouched because the topic's consumers are not visible.
            "side": "buy" if str(side) == "1" else "sell",
            "quantity": new_qty,
            "price": new_price,
            "timestamp": time.time(),
            "source": "fix-oeg"
        }

        # Without a ClOrdID there is no key a trade could ever match, so do
        # not pollute the routing table with a ``None`` entry.
        if cl_ord_id:
            with order_sessions_lock:
                order_sessions[cl_ord_id] = (sessionID, amend_msg)

        try:
            self.producer.send(Config.ORDERS_TOPIC, value=amend_msg).get(timeout=10)
            print(f"📥 FIX Amend → Kafka: {amend_msg}")
        except Exception as e:
            print(f"FIX-OEG: Failed to send amend: {e}")


def _fill_progress(order_qty, prev_cum, exec_qty):
    """Return ``(cum_qty, leaves_qty)`` after applying one execution.

    When the order size is unknown or not positive, the execution is treated
    as a complete fill: ``(exec_qty, 0)``.
    """
    try:
        total = float(order_qty)
    except (TypeError, ValueError):
        total = 0.0
    if not total > 0:
        return exec_qty, 0

    cum_qty = prev_cum + exec_qty
    leaves_qty = total - cum_qty
    # Absorb float rounding noise and over-fills so a completed order is
    # reported with LeavesQty exactly 0.
    if leaves_qty <= 1e-9:
        leaves_qty = 0
    return cum_qty, leaves_qty


def start_trades_consumer(app):
    """Consume trades from Kafka and send execution reports to FIX clients.

    For each trade, the ``buy_id`` and ``sell_id`` are looked up in
    ``order_sessions``; every side that originated from this gateway gets an
    ExecutionReport (ExecType TRADE). The report is PARTIALLY_FILLED while
    the cumulative traded quantity is below the order quantity and FILLED
    once it is reached.

    Args:
        app: object exposing ``sendExecutionReport`` (the FIX application).

    Returns when the consumer cannot be created; otherwise runs for as long
    as the consumer yields messages. Per-trade errors are logged and the
    loop continues.
    """
    try:
        consumer = create_consumer(
            topics=[Config.TRADES_TOPIC],
            group_id="fix-oeg-execreports",
            component_name="FIX-OEG-TradesConsumer"
        )
    except Exception as e:
        print(f"FIX-OEG: Failed to create trades consumer: {e}")
        return

    # Cumulative executed quantity per order id. Only this function touches
    # it, so it needs no lock. In-memory only.
    cum_by_order = {}

    for msg in consumer:
        try:
            trade = msg.value
            price = trade.get('price', 0)
            qty = trade.get('quantity', 0)
            parties = (
                ("buyer", trade.get('buy_id')),
                ("seller", trade.get('sell_id')),
            )

            for role, order_key in parties:
                if not order_key:
                    continue

                # Hold the lock only for the lookup: sending a FIX message
                # is blocking I/O and must not stall the FIX callbacks that
                # register new orders under the same lock.
                with order_sessions_lock:
                    entry = order_sessions.get(order_key)
                if entry is None:
                    continue
                session_id, order = entry

                cum_qty, leaves_qty = _fill_progress(
                    order.get('quantity'), cum_by_order.get(order_key, 0), qty
                )
                fully_filled = leaves_qty == 0
                if fully_filled:
                    cum_by_order.pop(order_key, None)
                else:
                    cum_by_order[order_key] = cum_qty

                app.sendExecutionReport(
                    session_id, order,
                    exec_type=fix.ExecType_TRADE,
                    ord_status=(fix.OrdStatus_FILLED if fully_filled
                                else fix.OrdStatus_PARTIALLY_FILLED),
                    exec_qty=qty,
                    exec_price=price,
                    cum_qty=cum_qty,
                    leaves_qty=leaves_qty
                )
                print(f"FIX-OEG: Sent fill report to {role} {order_key}")

        except Exception as e:
            print(f"FIX-OEG: Error processing trade: {e}")


if __name__ == "__main__":
    # Wire up the acceptor: settings file -> application -> store/log
    # factories -> socket acceptor. The factories stay in named variables so
    # they remain referenced for as long as the acceptor runs.
    session_settings = fix.SessionSettings("fix_server.cfg")
    gateway = Application()
    message_store = fix.FileStoreFactory(session_settings)
    session_log = fix.FileLogFactory(session_settings)
    acceptor = fix.SocketAcceptor(gateway, message_store, session_settings, session_log)
    acceptor.start()
    print("FIX OEG Gateway listening on 0.0.0.0:5001 (FIX 4.4)")

    # Daemon thread: turns trades read from Kafka into execution reports and
    # does not keep the process alive on shutdown.
    report_worker = threading.Thread(
        target=start_trades_consumer,
        args=(gateway,),
        daemon=True,
    )
    report_worker.start()
    print("FIX-OEG: Started trades consumer for execution reports")

    # Park the main thread until Ctrl-C; always stop the acceptor on exit.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        acceptor.stop()