File size: 8,398 Bytes
d8bad25 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 | """
FastAPI Server for Gemini 3 Flash AI Trading Platform.
Connects the MCP MT5 bridge, Gemini Agent, and Next.js frontend via WebSockets.
"""
import os
import asyncio
import json
from contextlib import asynccontextmanager
from typing import List, Optional
from datetime import datetime
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
from models import (
CandleData, TickData, PositionInfo, AccountInfo, AgentDecision,
TradeRequest, WSMessage
)
from mt5_mcp import MT5Bridge
from agent import TradingAgent
from ws_manager import ConnectionManager
load_dotenv()
# Initialize components
mt5_bridge = MT5Bridge()
agent = TradingAgent()
ws_manager = ConnectionManager()
# Global state
agent_running = False
background_task_handle = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifecycle events: connect details on startup, cleanup on shutdown."""
print("[Main] Starting up...")
# 1. Connect to MT5
result = mt5_bridge.initialize()
if not result["success"]:
print(f"[Main] SEVERE: {result['message']}")
# In strict mode, we should ideally stop here
if os.getenv("FORCE_MT5_DATA", "false").lower() == "true":
raise RuntimeError(f"STRICT MODE: Failed to connect to MT5. {result['message']}")
else:
print(f"[Main] {result['message']}")
# 2. Start background data loop (non-blocking)
asyncio.create_task(market_data_loop())
yield
print("[Main] Shutting down...")
mt5_bridge.shutdown()
app = FastAPI(title="Gemini 3 Flash Trader", lifespan=lifespan)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify frontend URL
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ββ REST API ββββββββββββββββββββββββββββββββββββββββββββββββ
@app.get("/api/health")
async def health_check():
"""Check system health and connection status."""
return {
"status": "online",
"mt5_connected": mt5_bridge.connected,
"agent_running": agent_running,
"mode": "simulation" if mt5_bridge.simulation_mode else "live"
}
@app.get("/api/account", response_model=AccountInfo)
async def get_account():
"""Get current account information."""
info = mt5_bridge.get_account_info()
if "error" in info:
raise HTTPException(status_code=500, detail=info["error"])
return info
@app.get("/api/positions", response_model=List[PositionInfo])
async def get_positions():
"""Get all open positions."""
return mt5_bridge.get_positions()
@app.get("/api/candles", response_model=List[CandleData])
async def get_candles(timeframe: str = "M5", count: int = 200):
"""Get historical candle data."""
return mt5_bridge.get_rates(timeframe=timeframe, count=count)
@app.post("/api/trade")
async def execute_trade(trade: TradeRequest):
"""Execute a manual trade."""
if trade.action == "close":
if not trade.ticket:
raise HTTPException(status_code=400, detail="Ticket required for close")
result = mt5_bridge.close_position(trade.ticket)
else:
result = mt5_bridge.place_order(
action=trade.action,
symbol=trade.symbol,
volume=trade.volume,
sl=trade.sl or 0.0,
tp=trade.tp or 0.0
)
if not result.get("success"):
raise HTTPException(status_code=400, detail=result.get("message"))
# Broadcast trade event
await ws_manager.broadcast("trade_event", result)
return result
@app.post("/api/agent/toggle")
async def toggle_agent(enable: bool):
"""Start or stop the autonomous trading agent."""
global agent_running
agent_running = enable
status = "running" if agent_running else "stopped"
print(f"[Main] Agent {status}")
await ws_manager.broadcast("agent_status", {"status": status})
return {"status": status}
# ββ WebSocket βββββββββββββββββββββββββββββββββββββββββββββββ
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await ws_manager.connect(websocket)
try:
# Send initial state
await ws_manager.send_personal(websocket, "status", {
"mt5": mt5_bridge.connected,
"agent": agent_running
})
while True:
# Keep connection alive + handle incoming messages (e.g. ping)
data = await websocket.receive_text()
# Parse if needed, currently we just listen
except WebSocketDisconnect:
ws_manager.disconnect(websocket)
# ββ Background Loops ββββββββββββββββββββββββββββββββββββββββ
async def market_data_loop():
"""
Main loop:
1. Stream prices (candles/ticks) to frontend
2. Invoke Agent if enabled
"""
print("[Main] Market data loop started")
last_candle_time = 0
while True:
try:
# 1. Fetch & Broadcast Market Data
current_tick = mt5_bridge.get_tick()
candles = mt5_bridge.get_rates(count=2) # Just need latest for updates
if candles:
latest_candle = candles[-1]
await ws_manager.broadcast("candle_update", latest_candle)
if current_tick:
await ws_manager.broadcast("tick_update", current_tick)
# 2. Agent Logic (if running)
if agent_running:
# Run agent roughly every new candle or every N seconds
# For this demo, we'll run it every 10s to see activity
if int(datetime.now().timestamp()) % 10 == 0:
await run_agent_cycle(current_tick)
# 3. Broadcast Account/Positions periodically
if int(datetime.now().timestamp()) % 2 == 0:
positions = mt5_bridge.get_positions()
account = mt5_bridge.get_account_info()
await ws_manager.broadcast("positions", positions)
await ws_manager.broadcast("account", account)
await asyncio.sleep(1) # 1Hz update rate
except Exception as e:
print(f"[Loop Error] {e}")
await asyncio.sleep(5)
async def run_agent_cycle(tick: dict):
"""One cycle of the AI agent: Analyze -> Reason -> Act."""
print("[Agent] Analyzing market...")
# Gather context
candles = mt5_bridge.get_rates(count=50) # Give agent more history
positions = mt5_bridge.get_positions()
account = mt5_bridge.get_account_info()
# 1. Ask Gemini
decision = await agent.analyze(candles, tick, account, positions)
# 2. Broadcast Reasoning
await ws_manager.broadcast("reasoning", {
"action": decision["action"],
"reasoning": decision["reasoning"],
"confidence": decision["confidence"],
"timestamp": datetime.now().isoformat()
})
# 3. Execute Decision
if decision["action"] in ["BUY", "SELL"]:
# Check confidence threshold
if decision["confidence"] >= 0.7:
print(f"[Agent] Executing {decision['action']} ({decision['confidence']})")
result = mt5_bridge.place_order(
action=decision["action"],
volume=decision.get("volume", 0.01),
sl=decision.get("sl", 0.0),
tp=decision.get("tp", 0.0)
)
await ws_manager.broadcast("trade_event", result)
elif decision["action"] == "CLOSE":
# Close all relevant positions
for p in positions:
res = mt5_bridge.close_position(p["ticket"])
await ws_manager.broadcast("trade_event", res)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
|