Spaces:
Sleeping
Sleeping
| """FastAPI backend for DeFi Agents simulation dashboard.""" | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| from typing import List, Optional, Dict, Any | |
| import json | |
| import traceback | |
| from api.supabase_client import SupabaseClient, ActionData | |
| from core.simulation import Simulation | |
| from core.analyzer import Analyzer | |
| from core.summarizer import Summarizer | |
| app = FastAPI( | |
| title="DeFi Agents API", | |
| description="Multi-agent LLM simulation in DeFi markets", | |
| version="0.1.0" | |
| ) | |
| # Global exception handler for better error messages | |
| async def global_exception_handler(request: Request, exc: Exception): | |
| error_msg = str(exc) | |
| tb_str = traceback.format_exception(type(exc), exc, exc.__traceback__) | |
| print(f"[GLOBAL ERROR] {error_msg}") | |
| print("".join(tb_str)) | |
| return JSONResponse( | |
| status_code=500, | |
| content={ | |
| "error": "Internal Server Error", | |
| "detail": error_msg, | |
| "type": type(exc).__name__ | |
| } | |
| ) | |
| # CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Initialize clients | |
| supabase = None | |
| try: | |
| supabase = SupabaseClient() | |
| except ValueError: | |
| print("Warning: Supabase not configured") | |
| # ==================== Pydantic Models ==================== | |
| class RunRequest(BaseModel): | |
| num_agents: int = 5 | |
| turns_per_run: int = 10 | |
| class RunResponse(BaseModel): | |
| run_number: int | |
| metrics: Dict[str, Any] | |
| agents: List[Dict[str, Any]] | |
| class AgentActionRequest(BaseModel): | |
| agent_name: str | |
| action: str | |
| payload: Dict = {} | |
| # ==================== Root Endpoint ==================== | |
| def root(): | |
| """Root endpoint - shows API info and links.""" | |
| return { | |
| "name": "Agent Arena API", | |
| "description": "Multi-agent LLM simulation in DeFi markets", | |
| "version": "0.1.0", | |
| "links": { | |
| "docs": "/docs", | |
| "health": "/health", | |
| "runs": "/api/runs", | |
| "trends": "/api/analysis/trends" | |
| }, | |
| "usage": { | |
| "start_run": "POST /api/runs with {\"num_agents\": 5, \"turns_per_run\": 10}", | |
| "list_runs": "GET /api/runs", | |
| "view_run": "GET /api/runs/{id}" | |
| } | |
| } | |
| # ==================== Health Endpoints ==================== | |
| def health_check(): | |
| """Health check endpoint.""" | |
| return { | |
| "status": "healthy", | |
| "supabase": "connected" if supabase and supabase.health_check() else "disconnected" | |
| } | |
| def debug_supabase(): | |
| """Debug endpoint to check supabase status.""" | |
| return { | |
| "supabase_is_none": supabase is None, | |
| "supabase_type": type(supabase).__name__ if supabase else None, | |
| } | |
| def test_save(): | |
| """Debug endpoint to test saving data.""" | |
| if not supabase: | |
| return {"error": "Supabase not configured"} | |
| try: | |
| # Get a real run_id | |
| runs = supabase.get_all_runs() | |
| if not runs: | |
| return {"error": "No runs found"} | |
| run_id = runs[0]['id'] | |
| # Test saving a simple action | |
| supabase.save_action(ActionData( | |
| run_id=run_id, | |
| turn=0, | |
| agent_name="debug_test", | |
| action_type="test", | |
| payload={"test": True}, | |
| reasoning_trace="debug test", | |
| thinking_trace="" | |
| )) | |
| return {"success": True, "message": f"Test action saved to run {run_id}"} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| # ==================== Run Endpoints ==================== | |
| def create_run(request: RunRequest): | |
| """Start a new simulation run.""" | |
| import traceback | |
| sim = None | |
| run_id = None | |
| print(f"[DEBUG] create_run: supabase={'connected' if supabase else 'None'}") | |
| try: | |
| sim = Simulation( | |
| num_agents=request.num_agents, | |
| turns_per_run=request.turns_per_run, | |
| supabase=supabase | |
| ) | |
| print(f"[DEBUG] After Simulation init: sim.supabase={'yes' if sim.supabase else 'NO'}") | |
| # Store run_id for error recovery | |
| run_id = sim.current_run_id | |
| metrics = sim.run() | |
| # Get agent states | |
| agent_data = [] | |
| for agent in sim.agents: | |
| agent_data.append({ | |
| "name": agent.name, | |
| "token_a": agent.token_a, | |
| "token_b": agent.token_b, | |
| "profit": agent.calculate_profit(), | |
| "strategy": agent.infer_strategy() | |
| }) | |
| return RunResponse( | |
| run_number=sim.current_run_number - 1, | |
| metrics=metrics, | |
| agents=agent_data | |
| ) | |
| except TimeoutError as e: | |
| error_msg = str(e) | |
| print(f"[ERROR] Run timed out: {error_msg}") | |
| if run_id and supabase: | |
| try: | |
| supabase.update_run_status(run_id, "timeout") | |
| print(f"[ERROR] Marked run {run_id} as timeout") | |
| except: | |
| pass | |
| raise HTTPException(status_code=504, detail=f"Run timed out: {error_msg}") | |
| except Exception as e: | |
| error_msg = str(e) | |
| print(f"[ERROR] Run failed: {error_msg}") | |
| traceback.print_exc() | |
| # Try to mark run as failed if we have a run_id | |
| if run_id and supabase: | |
| try: | |
| supabase.update_run_status(run_id, "failed") | |
| print(f"[ERROR] Marked run {run_id} as failed") | |
| except: | |
| pass | |
| # Include more detail in error for debugging | |
| import traceback as tb | |
| detail = f"Run failed: {error_msg}" | |
| if "MiniMax" in error_msg or "API" in error_msg or "API key" in error_msg: | |
| detail += " (MiniMax API error - check API key configuration)" | |
| raise HTTPException(status_code=500, detail=detail) | |
| def get_all_runs(): | |
| """Get all runs.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| runs = supabase.get_all_runs() | |
| return {"runs": runs} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_run_detail(run_id: int): | |
| """Get detailed run data.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| detail = supabase.get_run_detail(run_id) | |
| return detail | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def clear_stuck_runs(): | |
| """Clear all runs marked as 'running' - marks them as failed.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| from datetime import datetime | |
| # Get all running runs | |
| runs = supabase.get_all_runs() | |
| stuck_runs = [r for r in runs if r.get("status") == "running"] | |
| updated = 0 | |
| for run in stuck_runs: | |
| supabase.client.table("runs").update({ | |
| "status": "failed", | |
| "end_time": datetime.now().isoformat() | |
| }).eq("id", run["id"]).execute() | |
| updated += 1 | |
| return {"cleared": updated, "message": f"Marked {updated} stuck runs as failed"} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def reset_all(): | |
| """Clear ALL data and reset run counter to start fresh.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| from datetime import datetime | |
| # Delete all actions (need id.gt(0) to match all) | |
| supabase.client.table("actions").delete().gt("id", 0).execute() | |
| # Delete all agent states | |
| supabase.client.table("agent_states").delete().gt("id", 0).execute() | |
| # Delete all pool states | |
| supabase.client.table("pool_states").delete().gt("id", 0).execute() | |
| # Delete all run metrics | |
| supabase.client.table("run_metrics").delete().gt("id", 0).execute() | |
| # Delete all run summaries | |
| supabase.client.table("run_summaries").delete().gt("id", 0).execute() | |
| # Delete all runs | |
| supabase.client.table("runs").delete().gt("id", 0).execute() | |
| return { | |
| "message": "All data cleared. Ready to start fresh from Run 1.", | |
| "status": "reset" | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def fix_summary_runids(): | |
| """Fix summary run_id values to use run_number instead of internal ID.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| # Get all runs | |
| runs = supabase.get_all_runs() | |
| run_id_to_number = {r["id"]: r.get("run_number", r["id"]) for r in runs} | |
| # Get all summaries | |
| summaries = supabase.get_all_summaries() | |
| fixed = 0 | |
| for summary in summaries: | |
| old_run_id = summary.get("run_id") | |
| if old_run_id in run_id_to_number: | |
| correct_run_number = run_id_to_number[old_run_id] | |
| if old_run_id != correct_run_number: | |
| supabase.client.table("run_summaries").update({ | |
| "run_id": correct_run_number | |
| }).eq("id", summary["id"]).execute() | |
| fixed += 1 | |
| return { | |
| "message": f"Fixed {fixed} summary run_ids", | |
| "fixed": fixed | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def fix_gini_values(): | |
| """Fix old Gini coefficient values that exceed 1.0.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| # Get all run metrics | |
| runs = supabase.get_all_runs() | |
| fixed = 0 | |
| for run in runs: | |
| run_id = run["id"] | |
| metrics = supabase.get_metrics(run_id) | |
| if metrics: | |
| gini = metrics.get("gini_coefficient", 0) | |
| if gini > 1: | |
| clamped_gini = max(0, min(1, gini)) | |
| supabase.client.table("run_metrics").update({ | |
| "gini_coefficient": clamped_gini | |
| }).eq("id", metrics["id"]).execute() | |
| fixed += 1 | |
| return { | |
| "message": f"Fixed {fixed} Gini values", | |
| "fixed": fixed | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ==================== Metrics Endpoints ==================== | |
| def get_run_metrics(run_id: int): | |
| """Get metrics for a specific run.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| metrics = supabase.get_metrics(run_id) | |
| if not metrics: | |
| raise HTTPException(status_code=404, detail="Run not found") | |
| return metrics | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_trends(): | |
| """Get trend analysis across all runs.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| runs = supabase.get_all_runs() | |
| run_data = [r for r in runs if r.get("status") == "completed"] | |
| metrics = [] | |
| for r in run_data: | |
| run_metrics = supabase.get_metrics(r["id"]) | |
| if run_metrics: | |
| metrics.append(run_metrics) | |
| trends = Analyzer.detect_trends(metrics) | |
| return trends | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_action_distribution(): | |
| """Get action distribution across all runs.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| runs = supabase.get_all_runs() | |
| completed_runs = [r for r in runs if r.get("status") == "completed"] | |
| action_counts = {} | |
| for run in completed_runs: | |
| actions = supabase.get_actions(run["id"]) | |
| for action in actions: | |
| action_type = action.get("action_type", "unknown") | |
| # Filter out bonus actions for cleaner chart | |
| if not action_type.endswith("_bonus") and action_type != "alliance_success": | |
| action_counts[action_type] = action_counts.get(action_type, 0) + 1 | |
| data = [{"action_type": k, "count": v} for k, v in action_counts.items()] | |
| data.sort(key=lambda x: x["count"], reverse=True) | |
| return {"data": data} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_chaos_events(): | |
| """Get MarketMaker and ChaosAgent events.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| runs = supabase.get_all_runs() | |
| completed_runs = [r for r in runs if r.get("status") == "completed"] | |
| events = [] | |
| for run in completed_runs: | |
| actions = supabase.get_actions(run["id"]) | |
| for action in actions: | |
| action_type = action.get("action_type", "") | |
| if "marketmaker" in action_type.lower() or "chaos" in action_type.lower() or "price_shock" in action_type.lower(): | |
| events.append({ | |
| "run_id": run.get("run_number"), | |
| "turn": action.get("turn"), | |
| "action_type": action_type, | |
| "reasoning": action.get("reasoning_trace", "")[:100], | |
| "payload": action.get("payload", {}) | |
| }) | |
| # Sort by run, then by turn | |
| events.sort(key=lambda x: (x["run_id"], x["turn"]), reverse=True) | |
| return {"data": events[:50]} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ==================== Thinking/Reasoning Endpoints ==================== | |
| def get_thinking_trace(action_id: int): | |
| """Get the thinking trace for a specific action.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| thinking = supabase.get_thinking_trace(action_id) | |
| if thinking is None: | |
| raise HTTPException(status_code=404, detail="Action not found") | |
| return {"thinking": thinking} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ==================== Analysis Endpoints ==================== | |
| def get_arms_race_analysis(run_id: int): | |
| """Detect arms race patterns in a run.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| actions = supabase.get_actions(run_id) | |
| analysis = Analyzer.detect_arms_races(actions) | |
| return analysis | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ==================== Summary Endpoints ==================== | |
| def get_run_summary(run_id: int): | |
| """Get summary for a specific run.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| summary = supabase.get_run_summary(run_id) | |
| if not summary: | |
| return {"run_id": run_id, "summary": None, "message": "No summary generated yet"} | |
| return summary | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_all_summaries(): | |
| """Get all run summaries.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| summaries = supabase.get_all_summaries() | |
| return {"summaries": summaries} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def generate_run_summary(run_id: int): | |
| """Generate and save a summary for a run.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| summarizer = Summarizer(supabase=supabase) | |
| result = summarizer.summarize_and_save(run_id) | |
| return result | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ==================== Agent Profit Endpoints ==================== | |
| def get_all_agents(): | |
| """Get all unique agent names.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| agents = supabase.get_all_agent_names() | |
| return {"agents": agents} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_agent_profits(agent_name: str): | |
| """Get profit history for an agent across all runs.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| profits = supabase.get_agent_profits_all_runs(agent_name) | |
| # Format for chart: [{run: 1, profit: 0, turn: 0}, ...] | |
| chart_data = [] | |
| for p in profits: | |
| chart_data.append({ | |
| "run": p["run_id"], | |
| "turn": p["turn"], | |
| "profit": p["profit"], | |
| "strategy": p.get("strategy", "unknown") | |
| }) | |
| return { | |
| "agent_name": agent_name, | |
| "data": chart_data | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_all_agents_profits(): | |
| """Get profit history for ALL agents across all runs - combined for charting.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| agents = supabase.get_all_agent_names() | |
| all_profits = {} | |
| for agent_name in agents: | |
| profits = supabase.get_agent_profits_all_runs(agent_name) | |
| for p in profits: | |
| run_id = p["run_id"] | |
| if run_id not in all_profits: | |
| all_profits[run_id] = {"run": run_id} | |
| all_profits[run_id][agent_name] = p["profit"] | |
| # Convert to array and sort by run | |
| chart_data = sorted(all_profits.values(), key=lambda x: x["run"]) | |
| return { | |
| "agents": agents, | |
| "data": chart_data | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # ==================== Wealth Trajectory Endpoints ==================== | |
| def get_wealth_trajectory(run_id: int): | |
| """Get wealth trajectory for all agents in a run - shows token balances over turns.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| # Get all agent states for this run, ordered by turn | |
| agent_states = supabase.get_agent_states(run_id) | |
| # Group by turn, then by agent | |
| trajectory = {} | |
| for state in agent_states: | |
| turn = state["turn"] | |
| agent = state["agent_name"] | |
| total_tokens = state["token_a_balance"] + state["token_b_balance"] | |
| if turn not in trajectory: | |
| trajectory[turn] = {} | |
| trajectory[turn][agent] = round(total_tokens, 2) | |
| # Convert to array format sorted by turn | |
| turns = sorted(trajectory.keys()) | |
| chart_data = [{"turn": turn, "balances": trajectory[turn]} for turn in turns] | |
| # Get agent names | |
| agents = list(set(state["agent_name"] for state in agent_states)) | |
| return { | |
| "run_id": run_id, | |
| "agents": agents, | |
| "data": chart_data | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_all_wealth_trajectories(): | |
| """Get wealth trajectory summaries for all completed runs.""" | |
| if not supabase: | |
| raise HTTPException(status_code=503, detail="Supabase not configured") | |
| try: | |
| runs = supabase.get_all_runs() | |
| completed_runs = [r for r in runs if r.get("status") == "completed"] | |
| trajectories = [] | |
| for run in completed_runs: | |
| run_id = run["id"] | |
| run_number = run.get("run_number", run_id) | |
| # Get agent states for this run | |
| agent_states = supabase.get_agent_states(run_id) | |
| if not agent_states: | |
| continue | |
| # Calculate start and end wealth per agent | |
| agent_start = {} | |
| agent_end = {} | |
| for state in agent_states: | |
| agent = state["agent_name"] | |
| total = state["token_a_balance"] + state["token_b_balance"] | |
| if agent not in agent_start: | |
| agent_start[agent] = total | |
| agent_end[agent] = total | |
| # Calculate gains/losses | |
| gains = {} | |
| for agent in agent_start: | |
| gains[agent] = round(agent_end.get(agent, 0) - agent_start[agent], 2) | |
| trajectories.append({ | |
| "run_number": run_number, | |
| "agents": list(agent_start.keys()), | |
| "start_wealth": agent_start, | |
| "end_wealth": agent_end, | |
| "gains": gains, | |
| "winner": max(gains, key=gains.get) if gains else None, | |
| "winner_gain": max(gains.values()) if gains else 0 | |
| }) | |
| return {"trajectories": trajectories} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_version(): | |
| """Get server version and git info.""" | |
| import subprocess | |
| try: | |
| commit = subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, text=True) | |
| branch = subprocess.run(["git", "rev-parse", "--abbrev-ref", "HEAD"], capture_output=True, text=True) | |
| return { | |
| "commit": commit.stdout.strip()[:8], | |
| "branch": branch.stdout.strip(), | |
| "status": "running" | |
| } | |
| except Exception: | |
| return {"commit": "unknown", "branch": "unknown", "status": "running"} | |
| def restart_server(): | |
| """ | |
| Signal the server to restart. | |
| For HuggingFace Spaces, this works with their restart mechanism. | |
| """ | |
| import os | |
| import signal | |
| # Set environment variable to signal restart | |
| os.environ["RESTART_REQUESTED"] = "1" | |
| # Get PID for graceful shutdown | |
| pid = os.getpid() | |
| return {"status": "restarting", "message": "Restart signal sent", "pid": pid} | |
| def run_server(): | |
| """Run the FastAPI server.""" | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8000) | |
| if __name__ == "__main__": | |
| print("Starting DeFi Agents API server on http://0.0.0.0:8000") | |
| run_server() | |