import asyncio
import base64
import json
import os
import time
from typing import List, NamedTuple, Optional
from uuid import uuid4

from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from langchain_core.messages import HumanMessage
from pydantic import BaseModel
from prometheus_client import make_asgi_app, Counter, Histogram, Gauge

from src.infrastructure.logging import setup_logging, get_logger
from src.infrastructure.rate_limiter import setup_rate_limiter, limiter
from src.agents.config import Config
from src.agents.supervisor.agent import Supervisor
from src.models.chatMessage import ChatMessage
from src.routes.chat_manager_routes import router as chat_manager_router
from src.service.chat_manager import chat_manager_instance
from src.agents.crypto_data.tools import get_coingecko_id, get_tradingview_symbol
from src.agents.metadata import metadata

# Setup structured logging
log_level = os.getenv("LOG_LEVEL", "DEBUG")
log_format = os.getenv("LOG_FORMAT", "color")
setup_logging(level=log_level, format_type=log_format)
logger = get_logger(__name__)

logger.info("Starting Zico Agent API")

# Initialize FastAPI app
app = FastAPI(
    title="Zico Agent API",
    version="2.0",
    description="Multi-agent AI assistant with streaming support",
)

# Setup rate limiting
setup_rate_limiter(app)


def _parse_cors_origins(raw: str | None) -> list[str]:
    """Split a comma-separated origin list, falling back to ``["*"]`` when it is empty."""
    origins = [origin.strip() for origin in (raw or "").split(",") if origin.strip()]
    return origins or ["*"]


# Enable CORS. Allowed origins come from CORS_ALLOW_ORIGINS (comma-separated) and
# default to "*" for local/frontend dev.
# NOTE(review): with allow_credentials=True and "*", Starlette reflects the caller's
# Origin, so any site can send credentialed requests. Set CORS_ALLOW_ORIGINS in prod.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_parse_cors_origins(os.getenv("CORS_ALLOW_ORIGINS")),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Metrics
REQUEST_COUNT = Counter(
    "http_requests_total", "Total HTTP requests", ["method", "endpoint", "status"]
)
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds", "HTTP request latency", ["method", "endpoint"]
)
# Incremented while a /chat or /chat/audio request is being processed and
# decremented when it finishes (see `track_inprogress` in the handlers).
ACTIVE_CONVERSATIONS = Gauge(
    "active_conversations", "Number of conversation requests currently being processed"
)

# Add metrics endpoint
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)


def _metrics_endpoint_label(request: Request) -> str:
    """Return a lower-cardinality ``endpoint`` label for the HTTP metrics.

    Uses the matched route template (e.g. ``/items/{item_id}``) when the router
    recorded one in ``request.scope["route"]``; otherwise the raw URL path.
    """
    template = getattr(request.scope.get("route"), "path", None)
    return template if isinstance(template, str) else request.url.path


@app.middleware("http")
async def log_request_timing(request: Request, call_next):
    """Tag each response with ``x-request-id`` and record count/latency metrics.

    The incoming ``x-request-id`` header is reused when present; otherwise a UUID4
    is generated. The status defaults to 500 so requests whose downstream handler
    raises are still counted and logged.
    """
    request_id = request.headers.get("x-request-id") or str(uuid4())
    start = time.perf_counter()
    status_code = 500
    method = request.method
    path = request.url.path
    try:
        response = await call_next(request)
        status_code = response.status_code
        response.headers["x-request-id"] = request_id
        return response
    finally:
        duration = time.perf_counter() - start
        duration_ms = duration * 1000
        # Label by route template rather than raw path so path parameters don't
        # create a new Prometheus time series for every distinct URL.
        endpoint = _metrics_endpoint_label(request)
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status_code).inc()
        REQUEST_LATENCY.labels(method=method, endpoint=endpoint).observe(duration)
        logger.info(
            "HTTP %s %s -> %s in %.1fms request_id=%s",
            method,
            path,
            status_code,
            duration_ms,
            request_id,
        )


# Instantiate Supervisor agent (singleton LLM with cost tracking)
supervisor = Supervisor(Config.get_llm(with_cost_tracking=True))


class ChatRequest(BaseModel):
    """Body of ``POST /chat``; ``"default"``/``"anonymous"`` act as "not provided" placeholders."""

    message: ChatMessage
    chain_id: str = "default"
    wallet_address: str = "default"
    conversation_id: str = "default"
    user_id: str = "anonymous"


# Lightweight in-memory agent config for frontend integrations
AVAILABLE_AGENTS = [
    {"name": "default", "human_readable_name": "Default General Purpose", "description": "General chat and meta-queries about agents."},
    {"name": "crypto data", "human_readable_name": "Crypto Data Fetcher", "description": "Real-time cryptocurrency prices, market cap, FDV, TVL."},
    {"name": "token swap", "human_readable_name": "Token Swap Agent", "description": "Swap tokens using supported DEX APIs."},
    {"name": "realtime search", "human_readable_name": "Real-Time Search", "description": "Search the web for recent information."},
    {"name": "dexscreener", "human_readable_name": "DexScreener Analyst", "description": "Fetches and analyzes DEX trading data."},
    {"name": "rugcheck", "human_readable_name": "Token Safety Analyzer", "description": "Analyzes token safety and trends (Solana)."},
    {"name": "imagen", "human_readable_name": "Image Generator", "description": "Generate images from text prompts."},
    {"name": "rag", "human_readable_name": "Document Assistant", "description": "Answer questions about uploaded documents."},
    {"name": "tweet sizzler", "human_readable_name": "Tweet / X-Post Generator", "description": "Generate engaging tweets."},
    {"name": "dca", "human_readable_name": "DCA Strategy Manager", "description": "Plan and manage DCA strategies."},
    {"name": "base", "human_readable_name": "Base Transaction Manager", "description": "Handle transactions on Base network."},
    {"name": "mor rewards", "human_readable_name": "MOR Rewards Tracker", "description": "Track MOR rewards and balances."},
    {"name": "mor claims", "human_readable_name": "MOR Claims Agent", "description": "Claim MOR tokens."},
    {"name": "lending", "human_readable_name": "Lending Agent", "description": "Supply, borrow, repay, or withdraw assets."},
]

# Upper bound on how many agents SELECTED_AGENTS may hold.
MAX_SELECTED_AGENTS = 6

# Default to a small, reasonable subset
SELECTED_AGENTS = [agent["name"] for agent in AVAILABLE_AGENTS[:MAX_SELECTED_AGENTS]]

# Commands exposed to the ChatInput autocomplete
AGENT_COMMANDS = [
    {"command": "morpheus", "name": "Default General Purpose", "description": "General assistant for simple queries and meta-questions."},
    {"command": "crypto", "name": "Crypto Data Fetcher", "description": "Get prices, market cap, FDV, TVL and more."},
    {"command": "document", "name": "Document Assistant", "description": "Ask questions about uploaded documents."},
    {"command": "tweet", "name": "Tweet / X-Post Generator", "description": "Create engaging tweets about crypto and web3."},
    {"command": "search", "name": "Real-Time Search", "description": "Search the web for recent events or updates."},
    {"command": "dexscreener", "name": "DexScreener Analyst", "description": "Analyze DEX trading data on supported chains."},
    {"command": "rugcheck", "name": "Token Safety Analyzer", "description": "Check token safety and view trending tokens."},
    {"command": "dca", "name": "DCA Strategy Manager", "description": "Plan a dollar-cost averaging strategy."},
    {"command": "base", "name": "Base Transaction Manager", "description": "Send tokens and swap on Base."},
    {"command": "rewards", "name": "MOR Rewards Tracker", "description": "Check rewards balance and accrual."},
    {"command": "lending", "name": "Lending Agent", "description": "Supply, borrow, repay, or withdraw assets."},
]


# Agents endpoints expected by the frontend
@app.get("/agents/available")
def get_available_agents():
    """Return the current agent selection together with the full agent catalogue."""
    return {
        "selected_agents": SELECTED_AGENTS,
        "available_agents": AVAILABLE_AGENTS,
    }


@app.post("/agents/selected")
async def set_selected_agents(request: Request):
    """Replace the agent selection with the valid names from ``{"agents": [...]}``.

    The selection is a module-level global, so it is shared by every client of
    this process. Unknown names, non-string entries and duplicates are dropped,
    and at most ``MAX_SELECTED_AGENTS`` names are kept. If nothing valid remains,
    the previous selection is left untouched.

    Raises:
        HTTPException: 400 when the request body is not valid JSON.
    """
    global SELECTED_AGENTS
    try:
        data = await request.json()
    except ValueError as exc:  # json.JSONDecodeError / UnicodeDecodeError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from exc

    requested = data.get("agents", []) if isinstance(data, dict) else []
    if not isinstance(requested, list):
        requested = []

    # Validate provided names against available agents; the isinstance check also
    # keeps unhashable items (dicts/lists) out of the set-membership test.
    available_names = {agent["name"] for agent in AVAILABLE_AGENTS}
    valid_agents = list(
        dict.fromkeys(
            name for name in requested if isinstance(name, str) and name in available_names
        )
    )

    if not valid_agents:
        # Keep previous selection if nothing valid provided
        return {"status": "no_change", "agents": SELECTED_AGENTS}

    SELECTED_AGENTS = valid_agents[:MAX_SELECTED_AGENTS]
    return {"status": "success", "agents": SELECTED_AGENTS}


@app.get("/agents/commands")
def get_agent_commands():
    """Return the slash-commands offered by the chat input autocomplete."""
    return {"commands": AGENT_COMMANDS}


# Map agent runtime names to high-level types for storage/analytics
_AGENT_TYPE_BY_RUNTIME_NAME = {
    "crypto_agent": "crypto data",
    "default_agent": "default",
    "database_agent": "analysis",
    "search_agent": "realtime search",
    "swap_agent": "token swap",
    "lending_agent": "lending",
    "staking_agent": "staking",
    "supervisor": "supervisor",
}


def _map_agent_type(agent_name: str) -> str:
    """Translate a supervisor runtime agent name; unknown names map to ``"supervisor"``."""
    return _AGENT_TYPE_BY_RUNTIME_NAME.get(agent_name, "supervisor")


class _IntentAgent(NamedTuple):
    """Hooks for an agent that keeps per-conversation intent state in ``metadata``."""

    action_type: str  # value stored in ChatMessage.action_type
    getter: str  # name of the `metadata` method that reads the intent state
    setter: str  # name of the `metadata` method that overwrites the intent state
    ready_event: str  # metadata `event` value that marks the intent as complete


# Agent types (as returned by `_map_agent_type`) whose replies require user action.
_INTENT_AGENTS: dict[str, _IntentAgent] = {
    "token swap": _IntentAgent("swap", "get_swap_agent", "set_swap_agent", "swap_intent_ready"),
    "lending": _IntentAgent("lending", "get_lending_agent", "set_lending_agent", "lending_intent_ready"),
    "staking": _IntentAgent("staking", "get_staking_agent", "set_staking_agent", "staking_intent_ready"),
}


def _sanitize_user_message_content(content: str | None) -> str | None:
    """Strip wrapper prompts (e.g., 'User Message: ...') the frontend might send.

    Keeps only the text after the LAST case-insensitive ``user message:`` marker,
    unless nothing follows it. Falsy input (``None``/``""``) is returned unchanged.
    """
    if not content:
        return content
    text = content.strip()
    marker = "user message:"
    idx = text.lower().rfind(marker)
    if idx != -1:
        candidate = text[idx + len(marker):].strip()
        if candidate:
            return candidate
    return text


def _resolve_user_id(user_id: str | None, wallet_address: str | None) -> str:
    """Return a stable user id, deriving ``wallet::<address>`` for anonymous callers.

    Raises:
        HTTPException: 400 when neither a real user id nor a wallet address is given.
    """
    candidate = (user_id or "").strip()
    if candidate and candidate.lower() != "anonymous":
        return candidate
    wallet = (wallet_address or "").strip()
    if wallet and wallet.lower() != "default":
        return f"wallet::{wallet.lower()}"
    raise HTTPException(
        status_code=400,
        detail="A stable 'user_id' or wallet_address is required.",
    )


def _normalize_conversation_id(conversation_id: str | None) -> str:
    """Strip the conversation id, using ``"default"`` when it is blank."""
    return (conversation_id or "").strip() or "default"


def _normalize_wallet(wallet_address: str | None) -> str | None:
    """Return the stripped wallet, or ``None`` when blank or the ``"default"`` placeholder."""
    wallet = (wallet_address or "").strip()
    if not wallet or wallet.lower() == "default":
        return None
    return wallet


def _resolve_identity(request: ChatRequest) -> tuple[str, str]:
    """Ensure each request has a stable user and conversation identifier.

    Returns:
        ``(user_id, conversation_id)``.

    Raises:
        HTTPException: 400 when no usable user id or wallet address is supplied.
    """
    user_id = _resolve_user_id(request.user_id, request.wallet_address)
    return user_id, _normalize_conversation_id(request.conversation_id)


def _record_cost_delta(cost_tracker, cost_snapshot, *, user_id: str, conversation_id: str) -> None:
    """Persist the LLM cost accrued since ``cost_snapshot`` on the conversation, if any.

    NOTE(review): the tracker comes from ``Config.get_cost_tracker()`` with no
    per-request scope, so concurrent requests may be attributed each other's
    costs. Confirm whether per-request tracking is needed.
    """
    cost_delta = cost_tracker.calculate_delta(cost_snapshot)
    if cost_delta.get("cost", 0) > 0 or cost_delta.get("calls", 0) > 0:
        chat_manager_instance.update_conversation_costs(
            cost_delta,
            conversation_id=conversation_id,
            user_id=user_id,
        )


def _finalize_agent_response(
    result: dict,
    *,
    user_id: str,
    conversation_id: str,
    log_metadata: dict | None = None,
    payload_extras: dict | None = None,
) -> dict:
    """Store the supervisor's reply in the conversation and build the HTTP payload.

    Shared by ``/chat`` and ``/chat/audio``. For intent agents (swap, lending,
    staking), the assistant message is flagged ``requires_action``. The agent's
    pending intent state is cleared once the returned metadata reports
    ``status == "ready"`` or the agent's ``*_intent_ready`` event.

    Args:
        result: Non-empty dict returned by ``supervisor.invoke``.
        user_id: Resolved user id.
        conversation_id: Resolved conversation id.
        log_metadata: Extra keys merged into the debug-logged metadata.
        payload_extras: Extra keys placed in the payload after ``agentName``.

    Returns:
        ``{"response", "agentName", **payload_extras}`` plus ``"metadata"`` when
        any metadata is available.
    """
    agent_type = _map_agent_type(result.get("agent", "supervisor"))
    hooks = _INTENT_AGENTS.get(agent_type)
    result_meta = result.get("metadata")

    # Diagnostic view (logged only): the supervisor output plus either its own
    # metadata or, when it supplied none, the agent's stored intent state.
    response_metadata = {"supervisor_result": result, **(log_metadata or {})}
    intent_snapshot = None
    if result_meta:
        if isinstance(result_meta, dict):
            response_metadata.update(result_meta)
    elif hooks is not None:
        intent_snapshot = getattr(metadata, hooks.getter)(
            user_id=user_id,
            conversation_id=conversation_id,
        )
        if intent_snapshot:
            response_metadata.update(intent_snapshot)
    logger.debug(
        "Response metadata for user=%s conversation=%s: %s",
        user_id,
        conversation_id,
        response_metadata,
    )

    response_text = result.get("response", "No response available")
    response_message = ChatMessage(
        role="assistant",
        content=response_text,
        agent_name=agent_type,
        # `agent_type` is already the mapped high-level type; mapping it again
        # (as before) collapsed every value to "supervisor".
        agent_type=agent_type,
        metadata=result.get("metadata", {}),
        conversation_id=conversation_id,
        user_id=user_id,
        requires_action=hooks is not None,
        action_type=hooks.action_type if hooks is not None else None,
    )
    chat_manager_instance.add_message(
        message=response_message.dict(),
        conversation_id=conversation_id,
        user_id=user_id,
    )

    payload = {
        "response": response_text,
        "agentName": agent_type,
        **(payload_extras or {}),
    }

    response_meta = result_meta or {}
    # Only the swap agent falls back to its stored intent state for the payload.
    if agent_type == "token swap" and not response_meta:
        response_meta = intent_snapshot or getattr(metadata, hooks.getter)(
            user_id=user_id,
            conversation_id=conversation_id,
        ) or {}
    if response_meta:
        payload["metadata"] = response_meta

    # Clear the pending intent once the agent reports it is ready to execute.
    if hooks is not None and response_meta and isinstance(response_meta, dict):
        is_ready = (
            response_meta.get("status") == "ready"
            or response_meta.get("event") == hooks.ready_event
        )
        if is_ready:
            getattr(metadata, hooks.setter)(
                {},
                user_id=user_id,
                conversation_id=conversation_id,
            )

    return payload


@app.get("/health")
def health_check():
    """Liveness probe."""
    return {"status": "ok"}


@app.get("/costs")
def get_costs():
    """Get current LLM cost summary."""
    cost_tracker = Config.get_cost_tracker()
    return cost_tracker.get_summary()


@app.get("/costs/detailed")
def get_detailed_costs():
    """Get detailed LLM cost report."""
    cost_tracker = Config.get_cost_tracker()
    return cost_tracker.get_detailed_report()


@app.get("/costs/conversation")
def get_conversation_costs(request: Request):
    """Get accumulated LLM costs for a specific conversation.

    Raises:
        HTTPException: 400 when ``conversation_id`` or ``user_id`` is missing.
    """
    params = request.query_params
    conversation_id = params.get("conversation_id")
    user_id = params.get("user_id")

    if not conversation_id or not user_id:
        raise HTTPException(
            status_code=400,
            detail="Both 'conversation_id' and 'user_id' query parameters are required.",
        )

    costs = chat_manager_instance.get_conversation_costs(
        conversation_id=conversation_id,
        user_id=user_id,
    )
    return {
        "conversation_id": conversation_id,
        "user_id": user_id,
        "costs": costs,
    }


@app.get("/models")
def get_available_models():
    """List available LLM models."""
    return {
        "models": Config.list_available_models(),
        "providers": Config.list_available_providers(),
        "default": Config.DEFAULT_MODEL,
    }


@app.get("/chat/messages")
def get_messages(request: Request):
    """Return the stored messages of one conversation (query params default to placeholders)."""
    params = request.query_params
    conversation_id = params.get("conversation_id", "default")
    user_id = params.get("user_id", "anonymous")
    return {"messages": chat_manager_instance.get_messages(conversation_id, user_id)}


@app.get("/chat/conversations")
def get_conversations(request: Request):
    """Return every conversation id stored for ``user_id`` (default ``"anonymous"``)."""
    params = request.query_params
    user_id = params.get("user_id", "anonymous")
    return {"conversation_ids": chat_manager_instance.get_all_conversation_ids(user_id)}


@app.post("/chat")
async def chat(request: ChatRequest):
    """Append the user's message to the conversation and return the supervisor's reply.

    Raises:
        HTTPException: 400 for missing identity, 500 for any unexpected failure.
    """
    user_id: str | None = None
    conversation_id: str | None = None
    try:
        logger.debug("Received chat payload: %s", request.model_dump())
        user_id, conversation_id = _resolve_identity(request)
        wallet = _normalize_wallet(request.wallet_address)
        logger.debug(
            "Resolved chat identity user=%s conversation=%s wallet=%s",
            user_id,
            conversation_id,
            wallet,
        )

        message_meta = request.message.metadata
        display_name = message_meta.get("display_name") if isinstance(message_meta, dict) else None

        chat_manager_instance.ensure_session(
            user_id,
            conversation_id,
            wallet_address=wallet,
            display_name=display_name,
        )

        with ACTIVE_CONVERSATIONS.track_inprogress():
            if request.message.role == "user":
                clean_content = _sanitize_user_message_content(request.message.content)
                if clean_content is not None:
                    request.message.content = clean_content

            chat_manager_instance.add_message(
                message=request.message.dict(),
                conversation_id=conversation_id,
                user_id=user_id,
            )
            conversation_messages = chat_manager_instance.get_messages(
                conversation_id=conversation_id,
                user_id=user_id,
            )

            # Snapshot before invoking so only this request's LLM usage is attributed.
            cost_tracker = Config.get_cost_tracker()
            cost_snapshot = cost_tracker.get_snapshot()

            result = await supervisor.invoke(
                conversation_messages,
                conversation_id=conversation_id,
                user_id=user_id,
            )
            _record_cost_delta(
                cost_tracker,
                cost_snapshot,
                user_id=user_id,
                conversation_id=conversation_id,
            )

            logger.debug(
                "Supervisor returned result for user=%s conversation=%s: %s",
                user_id,
                conversation_id,
                result,
            )

            if isinstance(result, dict) and result:
                return _finalize_agent_response(
                    result,
                    user_id=user_id,
                    conversation_id=conversation_id,
                )

            # "agent" is kept for backward compatibility; "agentName" matches the
            # key used by every other chat response.
            return {
                "response": "No response available",
                "agentName": "supervisor",
                "agent": "supervisor",
            }
    except HTTPException:
        # Preserve deliberate client errors (e.g. 400 from identity resolution).
        raise
    except Exception as e:
        logger.exception(
            "Chat handler failed for user=%s conversation=%s",
            user_id,
            conversation_id,
        )
        raise HTTPException(status_code=500, detail=str(e)) from e


# Supported audio MIME types
AUDIO_MIME_TYPES = {
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
    ".flac": "audio/flac",
    ".ogg": "audio/ogg",
    ".webm": "audio/webm",
    ".m4a": "audio/mp4",
    ".aac": "audio/aac",
}

# Max audio file size (20MB)
MAX_AUDIO_SIZE = 20 * 1024 * 1024


def _get_audio_mime_type(filename: str, content_type: str | None) -> str:
    """Determine the MIME type for an audio file.

    Prefers a known file extension, then an ``audio/*`` upload content type, and
    finally falls back to ``audio/mpeg``.
    """
    if filename:
        ext = os.path.splitext(filename.lower())[1]
        if ext in AUDIO_MIME_TYPES:
            return AUDIO_MIME_TYPES[ext]
    if content_type and content_type.startswith("audio/"):
        return content_type
    return "audio/mpeg"


def _extract_text(content) -> str:
    """Flatten an LLM message ``content`` (str or list of str/``{"text": ...}`` parts) to text."""
    if isinstance(content, list):
        parts = []
        for part in content:
            if isinstance(part, dict) and part.get("text"):
                parts.append(part["text"])
            elif isinstance(part, str):
                parts.append(part)
        return " ".join(parts).strip()
    if isinstance(content, str):
        return content.strip()
    return ""


async def _transcribe_audio(encoded_audio: str, mime_type: str) -> str:
    """Ask the configured LLM to transcribe base64 audio; returns ``""`` if it yields no text."""
    transcription_message = HumanMessage(
        content=[
            {"type": "text", "text": "Transcribe exactly what is being said in this audio. Return ONLY the transcription, nothing else."},
            {"type": "media", "data": encoded_audio, "mime_type": mime_type},
        ]
    )
    llm = Config.get_llm(with_cost_tracking=True)
    # `invoke` is synchronous; run it in a worker thread so the event loop keeps
    # serving other requests during the transcription call.
    transcription_response = await asyncio.to_thread(llm.invoke, [transcription_message])
    return _extract_text(transcription_response.content)


@app.post("/chat/audio")
async def chat_audio(
    audio: UploadFile = File(..., description="Audio file (mp3, wav, flac, ogg, webm, m4a)"),
    user_id: str = Form(..., description="User ID"),
    conversation_id: str = Form(..., description="Conversation ID"),
    wallet_address: str = Form("default", description="Wallet address"),
):
    """
    Process audio input through the agent pipeline.

    The audio is first transcribed using Gemini, then the transcription is passed
    to the supervisor agent for processing (just like text input).

    Raises:
        HTTPException: 400 for missing identity, empty or untranscribable audio;
            413 for files over ``MAX_AUDIO_SIZE``; 500 for unexpected failures.
    """
    request_user_id: str | None = user_id
    request_conversation_id: str | None = conversation_id
    try:
        request_user_id = _resolve_user_id(user_id, wallet_address)
        request_conversation_id = _normalize_conversation_id(conversation_id)

        logger.debug(
            "Received audio chat request user=%s conversation=%s filename=%s",
            request_user_id,
            request_conversation_id,
            audio.filename,
        )

        # Read at most one byte past the limit: enough to detect an oversized
        # upload without pulling an arbitrarily large file into memory.
        audio_content = await audio.read(MAX_AUDIO_SIZE + 1)
        if len(audio_content) > MAX_AUDIO_SIZE:
            raise HTTPException(
                status_code=413,
                detail=f"Audio file too large. Maximum size is {MAX_AUDIO_SIZE // (1024*1024)}MB.",
            )
        if not audio_content:
            raise HTTPException(
                status_code=400,
                detail="Audio file is empty.",
            )

        mime_type = _get_audio_mime_type(audio.filename or "", audio.content_type)
        logger.debug("Audio MIME type: %s, size: %d bytes", mime_type, len(audio_content))
        encoded_audio = base64.b64encode(audio_content).decode("utf-8")

        chat_manager_instance.ensure_session(
            request_user_id,
            request_conversation_id,
            wallet_address=_normalize_wallet(wallet_address),
        )

        with ACTIVE_CONVERSATIONS.track_inprogress():
            # Snapshot before transcription so its LLM cost is attributed too.
            cost_tracker = Config.get_cost_tracker()
            cost_snapshot = cost_tracker.get_snapshot()

            # Step 1: Transcribe the audio
            transcribed_text = await _transcribe_audio(encoded_audio, mime_type)
            if not transcribed_text:
                raise HTTPException(
                    status_code=400,
                    detail="Could not transcribe the audio. Please try again with a clearer recording.",
                )
            logger.info("Audio transcribed: %s", transcribed_text[:200])

            # Step 2: Store the user message with the transcription
            user_message = ChatMessage(
                role="user",
                content=transcribed_text,
                metadata={
                    "source": "audio",
                    "audio_filename": audio.filename,
                    "audio_size": len(audio_content),
                    "audio_mime_type": mime_type,
                },
            )
            chat_manager_instance.add_message(
                message=user_message.dict(),
                conversation_id=request_conversation_id,
                user_id=request_user_id,
            )

            # Step 3: Get conversation history and invoke supervisor
            conversation_messages = chat_manager_instance.get_messages(
                conversation_id=request_conversation_id,
                user_id=request_user_id,
            )
            result = await supervisor.invoke(
                conversation_messages,
                conversation_id=request_conversation_id,
                user_id=request_user_id,
            )
            _record_cost_delta(
                cost_tracker,
                cost_snapshot,
                user_id=request_user_id,
                conversation_id=request_conversation_id,
            )

            logger.debug(
                "Supervisor returned result for audio user=%s conversation=%s: %s",
                request_user_id,
                request_conversation_id,
                result,
            )

            # Step 4: Process and store the agent response (same as /chat endpoint)
            if isinstance(result, dict) and result:
                return _finalize_agent_response(
                    result,
                    user_id=request_user_id,
                    conversation_id=request_conversation_id,
                    log_metadata={"source": "audio"},
                    payload_extras={"transcription": transcribed_text},
                )

            return {
                "response": "No response available",
                "agentName": "supervisor",
                "transcription": transcribed_text,
            }
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(
            "Audio chat handler failed for user=%s conversation=%s",
            request_user_id,
            request_conversation_id,
        )
        raise HTTPException(status_code=500, detail=str(e)) from e


# Include chat manager router
app.include_router(chat_manager_router)