Spaces:
Running
Running
| import base64 | |
| import json | |
| import os | |
| import time | |
| from typing import List, Optional | |
| from uuid import uuid4 | |
| from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from langchain_core.messages import HumanMessage | |
| from pydantic import BaseModel | |
| from prometheus_client import make_asgi_app, Counter, Histogram, Gauge | |
| from src.infrastructure.logging import setup_logging, get_logger | |
| from src.infrastructure.rate_limiter import setup_rate_limiter, limiter | |
| from src.agents.config import Config | |
| from src.agents.supervisor.agent import Supervisor | |
| from src.models.chatMessage import ChatMessage | |
| from src.routes.chat_manager_routes import router as chat_manager_router | |
| from src.service.chat_manager import chat_manager_instance | |
| from src.agents.crypto_data.tools import get_coingecko_id, get_tradingview_symbol | |
| from src.agents.metadata import metadata | |
| # Setup structured logging | |
| log_level = os.getenv("LOG_LEVEL", "DEBUG") | |
| log_format = os.getenv("LOG_FORMAT", "color") | |
| setup_logging(level=log_level, format_type=log_format) | |
| logger = get_logger(__name__) | |
| logger.info("Starting Zico Agent API") | |
| # Initialize FastAPI app | |
| app = FastAPI( | |
| title="Zico Agent API", | |
| version="2.0", | |
| description="Multi-agent AI assistant with streaming support", | |
| ) | |
| # Setup rate limiting | |
| setup_rate_limiter(app) | |
| # Enable CORS for local/frontend dev | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Metrics | |
| REQUEST_COUNT = Counter( | |
| "http_requests_total", "Total HTTP requests", ["method", "endpoint", "status"] | |
| ) | |
| REQUEST_LATENCY = Histogram( | |
| "http_request_duration_seconds", "HTTP request latency", ["method", "endpoint"] | |
| ) | |
| ACTIVE_CONVERSATIONS = Gauge( | |
| "active_conversations", "Number of active conversations" | |
| ) | |
| # Add metrics endpoint | |
| metrics_app = make_asgi_app() | |
| app.mount("/metrics", metrics_app) | |
| async def log_request_timing(request: Request, call_next): | |
| request_id = request.headers.get("x-request-id") or str(uuid4()) | |
| start = time.perf_counter() | |
| status_code = 500 | |
| method = request.method | |
| path = request.url.path | |
| try: | |
| response = await call_next(request) | |
| status_code = response.status_code | |
| response.headers["x-request-id"] = request_id | |
| return response | |
| finally: | |
| duration = time.perf_counter() - start | |
| duration_ms = duration * 1000 | |
| # Update metrics | |
| REQUEST_COUNT.labels(method=method, endpoint=path, status=status_code).inc() | |
| REQUEST_LATENCY.labels(method=method, endpoint=path).observe(duration) | |
| logger.info( | |
| "HTTP %s %s -> %s in %.1fms request_id=%s", | |
| method, | |
| path, | |
| status_code, | |
| duration_ms, | |
| request_id, | |
| ) | |
| # Instantiate Supervisor agent (singleton LLM with cost tracking) | |
| supervisor = Supervisor(Config.get_llm(with_cost_tracking=True)) | |
| class ChatRequest(BaseModel): | |
| message: ChatMessage | |
| chain_id: str = "default" | |
| wallet_address: str = "default" | |
| conversation_id: str = "default" | |
| user_id: str = "anonymous" | |
| # Lightweight in-memory agent config for frontend integrations | |
| AVAILABLE_AGENTS = [ | |
| {"name": "default", "human_readable_name": "Default General Purpose", "description": "General chat and meta-queries about agents."}, | |
| {"name": "crypto data", "human_readable_name": "Crypto Data Fetcher", "description": "Real-time cryptocurrency prices, market cap, FDV, TVL."}, | |
| {"name": "token swap", "human_readable_name": "Token Swap Agent", "description": "Swap tokens using supported DEX APIs."}, | |
| {"name": "realtime search", "human_readable_name": "Real-Time Search", "description": "Search the web for recent information."}, | |
| {"name": "dexscreener", "human_readable_name": "DexScreener Analyst", "description": "Fetches and analyzes DEX trading data."}, | |
| {"name": "rugcheck", "human_readable_name": "Token Safety Analyzer", "description": "Analyzes token safety and trends (Solana)."}, | |
| {"name": "imagen", "human_readable_name": "Image Generator", "description": "Generate images from text prompts."}, | |
| {"name": "rag", "human_readable_name": "Document Assistant", "description": "Answer questions about uploaded documents."}, | |
| {"name": "tweet sizzler", "human_readable_name": "Tweet / X-Post Generator", "description": "Generate engaging tweets."}, | |
| {"name": "dca", "human_readable_name": "DCA Strategy Manager", "description": "Plan and manage DCA strategies."}, | |
| {"name": "base", "human_readable_name": "Base Transaction Manager", "description": "Handle transactions on Base network."}, | |
| {"name": "mor rewards", "human_readable_name": "MOR Rewards Tracker", "description": "Track MOR rewards and balances."}, | |
| {"name": "mor claims", "human_readable_name": "MOR Claims Agent", "description": "Claim MOR tokens."}, | |
| {"name": "lending", "human_readable_name": "Lending Agent", "description": "Supply, borrow, repay, or withdraw assets."}, | |
| ] | |
| # Default to a small, reasonable subset | |
| SELECTED_AGENTS = [agent["name"] for agent in AVAILABLE_AGENTS[:6]] | |
| # Commands exposed to the ChatInput autocomplete | |
| AGENT_COMMANDS = [ | |
| {"command": "morpheus", "name": "Default General Purpose", "description": "General assistant for simple queries and meta-questions."}, | |
| {"command": "crypto", "name": "Crypto Data Fetcher", "description": "Get prices, market cap, FDV, TVL and more."}, | |
| {"command": "document", "name": "Document Assistant", "description": "Ask questions about uploaded documents."}, | |
| {"command": "tweet", "name": "Tweet / X-Post Generator", "description": "Create engaging tweets about crypto and web3."}, | |
| {"command": "search", "name": "Real-Time Search", "description": "Search the web for recent events or updates."}, | |
| {"command": "dexscreener", "name": "DexScreener Analyst", "description": "Analyze DEX trading data on supported chains."}, | |
| {"command": "rugcheck", "name": "Token Safety Analyzer", "description": "Check token safety and view trending tokens."}, | |
| {"command": "dca", "name": "DCA Strategy Manager", "description": "Plan a dollar-cost averaging strategy."}, | |
| {"command": "base", "name": "Base Transaction Manager", "description": "Send tokens and swap on Base."}, | |
| {"command": "rewards", "name": "MOR Rewards Tracker", "description": "Check rewards balance and accrual."}, | |
| {"command": "lending", "name": "Lending Agent", "description": "Supply, borrow, repay, or withdraw assets."}, | |
| ] | |
| # Agents endpoints expected by the frontend | |
| def get_available_agents(): | |
| return { | |
| "selected_agents": SELECTED_AGENTS, | |
| "available_agents": AVAILABLE_AGENTS, | |
| } | |
| async def set_selected_agents(request: Request): | |
| global SELECTED_AGENTS | |
| data = await request.json() | |
| agents = data.get("agents", []) | |
| # Validate provided names against available agents | |
| available_names = {a["name"] for a in AVAILABLE_AGENTS} | |
| valid_agents = [a for a in agents if a in available_names] | |
| if not valid_agents: | |
| # Keep previous selection if nothing valid provided | |
| return {"status": "no_change", "agents": SELECTED_AGENTS} | |
| # Update selection | |
| SELECTED_AGENTS = valid_agents[:6] | |
| return {"status": "success", "agents": SELECTED_AGENTS} | |
| def get_agent_commands(): | |
| return {"commands": AGENT_COMMANDS} | |
| # Map agent runtime names to high-level types for storage/analytics | |
| def _map_agent_type(agent_name: str) -> str: | |
| mapping = { | |
| "crypto_agent": "crypto data", | |
| "default_agent": "default", | |
| "database_agent": "analysis", | |
| "search_agent": "realtime search", | |
| "swap_agent": "token swap", | |
| "lending_agent": "lending", | |
| "staking_agent": "staking", | |
| "supervisor": "supervisor", | |
| } | |
| return mapping.get(agent_name, "supervisor") | |
| def _sanitize_user_message_content(content: str | None) -> str | None: | |
| """Strip wrapper prompts (e.g., 'User Message: ...') the frontend might send.""" | |
| if not content: | |
| return content | |
| text = content.strip() | |
| marker = "user message:" | |
| lowered = text.lower() | |
| idx = lowered.rfind(marker) | |
| if idx != -1: | |
| candidate = text[idx + len(marker):].strip() | |
| if candidate: | |
| return candidate | |
| return text | |
| def _resolve_identity(request: ChatRequest) -> tuple[str, str]: | |
| """Ensure each request has a stable user and conversation identifier.""" | |
| user_id = (request.user_id or "").strip() | |
| if not user_id or user_id.lower() == "anonymous": | |
| wallet = (request.wallet_address or "").strip() | |
| if wallet and wallet.lower() != "default": | |
| user_id = f"wallet::{wallet.lower()}" | |
| else: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="A stable 'user_id' or wallet_address is required for swap operations.", | |
| ) | |
| conversation_id = (request.conversation_id or "").strip() or "default" | |
| return user_id, conversation_id | |
| def health_check(): | |
| return {"status": "ok"} | |
| def get_costs(): | |
| """Get current LLM cost summary.""" | |
| cost_tracker = Config.get_cost_tracker() | |
| return cost_tracker.get_summary() | |
| def get_detailed_costs(): | |
| """Get detailed LLM cost report.""" | |
| cost_tracker = Config.get_cost_tracker() | |
| return cost_tracker.get_detailed_report() | |
| def get_conversation_costs(request: Request): | |
| """Get accumulated LLM costs for a specific conversation.""" | |
| params = request.query_params | |
| conversation_id = params.get("conversation_id") | |
| user_id = params.get("user_id") | |
| if not conversation_id or not user_id: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Both 'conversation_id' and 'user_id' query parameters are required.", | |
| ) | |
| costs = chat_manager_instance.get_conversation_costs( | |
| conversation_id=conversation_id, | |
| user_id=user_id, | |
| ) | |
| return { | |
| "conversation_id": conversation_id, | |
| "user_id": user_id, | |
| "costs": costs, | |
| } | |
| def get_available_models(): | |
| """List available LLM models.""" | |
| return { | |
| "models": Config.list_available_models(), | |
| "providers": Config.list_available_providers(), | |
| "default": Config.DEFAULT_MODEL, | |
| } | |
| def get_messages(request: Request): | |
| params = request.query_params | |
| conversation_id = params.get("conversation_id", "default") | |
| user_id = params.get("user_id", "anonymous") | |
| return {"messages": chat_manager_instance.get_messages(conversation_id, user_id)} | |
| def get_conversations(request: Request): | |
| params = request.query_params | |
| user_id = params.get("user_id", "anonymous") | |
| return {"conversation_ids": chat_manager_instance.get_all_conversation_ids(user_id)} | |
| async def chat(request: ChatRequest): | |
| user_id: str | None = None | |
| conversation_id: str | None = None | |
| try: | |
| logger.debug("Received chat payload: %s", request.model_dump()) | |
| user_id, conversation_id = _resolve_identity(request) | |
| logger.debug( | |
| "Resolved chat identity user=%s conversation=%s wallet=%s", | |
| user_id, | |
| conversation_id, | |
| (request.wallet_address or "").strip() if request.wallet_address else None, | |
| ) | |
| wallet = request.wallet_address.strip() if request.wallet_address else None | |
| if wallet and wallet.lower() == "default": | |
| wallet = None | |
| display_name = None | |
| if isinstance(request.message.metadata, dict): | |
| display_name = request.message.metadata.get("display_name") | |
| chat_manager_instance.ensure_session( | |
| user_id, | |
| conversation_id, | |
| wallet_address=wallet, | |
| display_name=display_name, | |
| ) | |
| # Track active conversation | |
| ACTIVE_CONVERSATIONS.inc() | |
| if request.message.role == "user": | |
| clean_content = _sanitize_user_message_content(request.message.content) | |
| if clean_content is not None: | |
| request.message.content = clean_content | |
| # Add the user message to the conversation | |
| chat_manager_instance.add_message( | |
| message=request.message.dict(), | |
| conversation_id=conversation_id, | |
| user_id=user_id | |
| ) | |
| # Get all messages from the conversation to pass to the agent | |
| conversation_messages = chat_manager_instance.get_messages( | |
| conversation_id=conversation_id, | |
| user_id=user_id | |
| ) | |
| # Take cost snapshot before invoking | |
| cost_tracker = Config.get_cost_tracker() | |
| cost_snapshot = cost_tracker.get_snapshot() | |
| # Invoke the supervisor agent with the conversation | |
| result = await supervisor.invoke( | |
| conversation_messages, | |
| conversation_id=conversation_id, | |
| user_id=user_id, | |
| ) | |
| # Calculate and save cost delta for this request | |
| cost_delta = cost_tracker.calculate_delta(cost_snapshot) | |
| if cost_delta.get("cost", 0) > 0 or cost_delta.get("calls", 0) > 0: | |
| chat_manager_instance.update_conversation_costs( | |
| cost_delta, | |
| conversation_id=conversation_id, | |
| user_id=user_id, | |
| ) | |
| logger.debug( | |
| "Supervisor returned result for user=%s conversation=%s: %s", | |
| user_id, | |
| conversation_id, | |
| result, | |
| ) | |
| # Add the agent's response to the conversation | |
| if result and isinstance(result, dict): | |
| agent_name = result.get("agent", "supervisor") | |
| agent_name = _map_agent_type(agent_name) | |
| # Build response metadata and enrich with coin info for crypto price queries | |
| response_metadata = {"supervisor_result": result} | |
| swap_meta_snapshot = None | |
| # Prefer supervisor-provided metadata | |
| if isinstance(result, dict) and result.get("metadata"): | |
| response_metadata.update(result.get("metadata") or {}) | |
| elif agent_name == "token swap": | |
| swap_meta = metadata.get_swap_agent( | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if swap_meta: | |
| response_metadata.update(swap_meta) | |
| swap_meta_snapshot = swap_meta | |
| elif agent_name == "lending": | |
| lending_meta = metadata.get_lending_agent( | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if lending_meta: | |
| response_metadata.update(lending_meta) | |
| elif agent_name == "staking": | |
| staking_meta = metadata.get_staking_agent( | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if staking_meta: | |
| response_metadata.update(staking_meta) | |
| logger.debug( | |
| "Response metadata for user=%s conversation=%s: %s", | |
| user_id, | |
| conversation_id, | |
| response_metadata, | |
| ) | |
| # Create a ChatMessage from the supervisor response | |
| response_message = ChatMessage( | |
| role="assistant", | |
| content=result.get("response", "No response available"), | |
| agent_name=agent_name, | |
| agent_type=_map_agent_type(agent_name), | |
| metadata=result.get("metadata", {}), | |
| conversation_id=conversation_id, | |
| user_id=user_id, | |
| requires_action=True if agent_name in ["token swap", "lending", "staking"] else False, | |
| action_type="swap" if agent_name == "token swap" else "lending" if agent_name == "lending" else "staking" if agent_name == "staking" else None | |
| ) | |
| # Add the response message to the conversation | |
| chat_manager_instance.add_message( | |
| message=response_message.dict(), | |
| conversation_id=conversation_id, | |
| user_id=user_id | |
| ) | |
| # Return only the clean response | |
| response_payload = { | |
| "response": result.get("response", "No response available"), | |
| "agentName": agent_name, | |
| } | |
| response_meta = result.get("metadata") or {} | |
| if agent_name == "token swap" and not response_meta: | |
| if swap_meta_snapshot: | |
| response_meta = swap_meta_snapshot | |
| else: | |
| swap_meta = metadata.get_swap_agent( | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if swap_meta: | |
| response_meta = swap_meta | |
| if response_meta: | |
| response_payload["metadata"] = response_meta | |
| if agent_name == "token swap": | |
| should_clear = False | |
| if response_meta: | |
| status = response_meta.get("status") if isinstance(response_meta, dict) else None | |
| event = response_meta.get("event") if isinstance(response_meta, dict) else None | |
| should_clear = status == "ready" or event == "swap_intent_ready" | |
| if should_clear: | |
| metadata.set_swap_agent( | |
| {}, | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if agent_name == "lending": | |
| should_clear = False | |
| if response_meta: | |
| status = response_meta.get("status") if isinstance(response_meta, dict) else None | |
| event = response_meta.get("event") if isinstance(response_meta, dict) else None | |
| should_clear = status == "ready" or event == "lending_intent_ready" | |
| if should_clear: | |
| metadata.set_lending_agent( | |
| {}, | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if agent_name == "staking": | |
| should_clear = False | |
| if response_meta: | |
| status = response_meta.get("status") if isinstance(response_meta, dict) else None | |
| event = response_meta.get("event") if isinstance(response_meta, dict) else None | |
| should_clear = status == "ready" or event == "staking_intent_ready" | |
| if should_clear: | |
| metadata.set_staking_agent( | |
| {}, | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| return response_payload | |
| return {"response": "No response available", "agent": "supervisor"} | |
| except Exception as e: | |
| logger.exception( | |
| "Chat handler failed for user=%s conversation=%s", | |
| user_id, | |
| conversation_id, | |
| ) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # Supported audio MIME types | |
| AUDIO_MIME_TYPES = { | |
| ".mp3": "audio/mpeg", | |
| ".wav": "audio/wav", | |
| ".flac": "audio/flac", | |
| ".ogg": "audio/ogg", | |
| ".webm": "audio/webm", | |
| ".m4a": "audio/mp4", | |
| ".aac": "audio/aac", | |
| } | |
| # Max audio file size (20MB) | |
| MAX_AUDIO_SIZE = 20 * 1024 * 1024 | |
| def _get_audio_mime_type(filename: str, content_type: str | None) -> str: | |
| """Determine the MIME type for an audio file.""" | |
| # Try from filename extension first | |
| if filename: | |
| ext = os.path.splitext(filename.lower())[1] | |
| if ext in AUDIO_MIME_TYPES: | |
| return AUDIO_MIME_TYPES[ext] | |
| # Fall back to content type from upload | |
| if content_type and content_type.startswith("audio/"): | |
| return content_type | |
| # Default to mpeg | |
| return "audio/mpeg" | |
| async def chat_audio( | |
| audio: UploadFile = File(..., description="Audio file (mp3, wav, flac, ogg, webm, m4a)"), | |
| user_id: str = Form(..., description="User ID"), | |
| conversation_id: str = Form(..., description="Conversation ID"), | |
| wallet_address: str = Form("default", description="Wallet address"), | |
| ): | |
| """ | |
| Process audio input through the agent pipeline. | |
| The audio is first transcribed using Gemini, then the transcription | |
| is passed to the supervisor agent for processing (just like text input). | |
| """ | |
| request_user_id: str | None = user_id | |
| request_conversation_id: str | None = conversation_id | |
| try: | |
| # Validate user_id | |
| if not user_id or user_id.lower() == "anonymous": | |
| wallet = (wallet_address or "").strip() | |
| if wallet and wallet.lower() != "default": | |
| request_user_id = f"wallet::{wallet.lower()}" | |
| else: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="A stable 'user_id' or wallet_address is required.", | |
| ) | |
| logger.debug( | |
| "Received audio chat request user=%s conversation=%s filename=%s", | |
| request_user_id, | |
| request_conversation_id, | |
| audio.filename, | |
| ) | |
| # Validate file size | |
| audio_content = await audio.read() | |
| if len(audio_content) > MAX_AUDIO_SIZE: | |
| raise HTTPException( | |
| status_code=413, | |
| detail=f"Audio file too large. Maximum size is {MAX_AUDIO_SIZE // (1024*1024)}MB.", | |
| ) | |
| if len(audio_content) == 0: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Audio file is empty.", | |
| ) | |
| # Get MIME type | |
| mime_type = _get_audio_mime_type(audio.filename or "", audio.content_type) | |
| logger.debug("Audio MIME type: %s, size: %d bytes", mime_type, len(audio_content)) | |
| # Encode audio to base64 | |
| encoded_audio = base64.b64encode(audio_content).decode("utf-8") | |
| # Ensure session exists | |
| wallet = wallet_address.strip() if wallet_address else None | |
| if wallet and wallet.lower() == "default": | |
| wallet = None | |
| chat_manager_instance.ensure_session( | |
| request_user_id, | |
| request_conversation_id, | |
| wallet_address=wallet, | |
| ) | |
| # Track active conversation | |
| ACTIVE_CONVERSATIONS.inc() | |
| # Take cost snapshot before invoking | |
| cost_tracker = Config.get_cost_tracker() | |
| cost_snapshot = cost_tracker.get_snapshot() | |
| # Step 1: Transcribe the audio using Gemini | |
| transcription_message = HumanMessage( | |
| content=[ | |
| {"type": "text", "text": "Transcribe exactly what is being said in this audio. Return ONLY the transcription, nothing else."}, | |
| {"type": "media", "data": encoded_audio, "mime_type": mime_type}, | |
| ] | |
| ) | |
| llm = Config.get_llm(with_cost_tracking=True) | |
| transcription_response = llm.invoke([transcription_message]) | |
| # Extract transcription text | |
| transcribed_text = transcription_response.content | |
| if isinstance(transcribed_text, list): | |
| text_parts = [] | |
| for part in transcribed_text: | |
| if isinstance(part, dict) and part.get("text"): | |
| text_parts.append(part["text"]) | |
| elif isinstance(part, str): | |
| text_parts.append(part) | |
| transcribed_text = " ".join(text_parts).strip() | |
| if not transcribed_text: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Could not transcribe the audio. Please try again with a clearer recording.", | |
| ) | |
| logger.info("Audio transcribed: %s", transcribed_text[:200]) | |
| # Step 2: Store the user message with the transcription | |
| user_message = ChatMessage( | |
| role="user", | |
| content=transcribed_text, | |
| metadata={ | |
| "source": "audio", | |
| "audio_filename": audio.filename, | |
| "audio_size": len(audio_content), | |
| "audio_mime_type": mime_type, | |
| }, | |
| ) | |
| chat_manager_instance.add_message( | |
| message=user_message.dict(), | |
| conversation_id=request_conversation_id, | |
| user_id=request_user_id, | |
| ) | |
| # Step 3: Get conversation history and invoke supervisor | |
| conversation_messages = chat_manager_instance.get_messages( | |
| conversation_id=request_conversation_id, | |
| user_id=request_user_id, | |
| ) | |
| result = await supervisor.invoke( | |
| conversation_messages, | |
| conversation_id=request_conversation_id, | |
| user_id=request_user_id, | |
| ) | |
| # Calculate and save cost delta | |
| cost_delta = cost_tracker.calculate_delta(cost_snapshot) | |
| if cost_delta.get("cost", 0) > 0 or cost_delta.get("calls", 0) > 0: | |
| chat_manager_instance.update_conversation_costs( | |
| cost_delta, | |
| conversation_id=request_conversation_id, | |
| user_id=request_user_id, | |
| ) | |
| logger.debug( | |
| "Supervisor returned result for audio user=%s conversation=%s: %s", | |
| request_user_id, | |
| request_conversation_id, | |
| result, | |
| ) | |
| # Step 4: Process and store the agent response (same as /chat endpoint) | |
| if result and isinstance(result, dict): | |
| agent_name = result.get("agent", "supervisor") | |
| agent_name = _map_agent_type(agent_name) | |
| response_metadata = {"supervisor_result": result, "source": "audio"} | |
| swap_meta_snapshot = None | |
| if isinstance(result, dict) and result.get("metadata"): | |
| response_metadata.update(result.get("metadata") or {}) | |
| elif agent_name == "token swap": | |
| swap_meta = metadata.get_swap_agent( | |
| user_id=request_user_id, | |
| conversation_id=request_conversation_id, | |
| ) | |
| if swap_meta: | |
| response_metadata.update(swap_meta) | |
| swap_meta_snapshot = swap_meta | |
| elif agent_name == "lending": | |
| lending_meta = metadata.get_lending_agent( | |
| user_id=request_user_id, | |
| conversation_id=request_conversation_id, | |
| ) | |
| if lending_meta: | |
| response_metadata.update(lending_meta) | |
| elif agent_name == "staking": | |
| staking_meta = metadata.get_staking_agent( | |
| user_id=request_user_id, | |
| conversation_id=request_conversation_id, | |
| ) | |
| if staking_meta: | |
| response_metadata.update(staking_meta) | |
| response_message = ChatMessage( | |
| role="assistant", | |
| content=result.get("response", "No response available"), | |
| agent_name=agent_name, | |
| agent_type=_map_agent_type(agent_name), | |
| metadata=result.get("metadata", {}), | |
| conversation_id=request_conversation_id, | |
| user_id=request_user_id, | |
| requires_action=True if agent_name in ["token swap", "lending", "staking"] else False, | |
| action_type="swap" if agent_name == "token swap" else "lending" if agent_name == "lending" else "staking" if agent_name == "staking" else None, | |
| ) | |
| chat_manager_instance.add_message( | |
| message=response_message.dict(), | |
| conversation_id=request_conversation_id, | |
| user_id=request_user_id, | |
| ) | |
| # Build response payload | |
| response_payload = { | |
| "response": result.get("response", "No response available"), | |
| "agentName": agent_name, | |
| "transcription": transcribed_text, | |
| } | |
| response_meta = result.get("metadata") or {} | |
| if agent_name == "token swap" and not response_meta: | |
| if swap_meta_snapshot: | |
| response_meta = swap_meta_snapshot | |
| else: | |
| swap_meta = metadata.get_swap_agent( | |
| user_id=request_user_id, | |
| conversation_id=request_conversation_id, | |
| ) | |
| if swap_meta: | |
| response_meta = swap_meta | |
| if response_meta: | |
| response_payload["metadata"] = response_meta | |
| # Clear metadata after ready events (same as /chat) | |
| if agent_name == "token swap": | |
| should_clear = False | |
| if response_meta: | |
| status = response_meta.get("status") if isinstance(response_meta, dict) else None | |
| event = response_meta.get("event") if isinstance(response_meta, dict) else None | |
| should_clear = status == "ready" or event == "swap_intent_ready" | |
| if should_clear: | |
| metadata.set_swap_agent({}, user_id=request_user_id, conversation_id=request_conversation_id) | |
| if agent_name == "lending": | |
| should_clear = False | |
| if response_meta: | |
| status = response_meta.get("status") if isinstance(response_meta, dict) else None | |
| event = response_meta.get("event") if isinstance(response_meta, dict) else None | |
| should_clear = status == "ready" or event == "lending_intent_ready" | |
| if should_clear: | |
| metadata.set_lending_agent({}, user_id=request_user_id, conversation_id=request_conversation_id) | |
| if agent_name == "staking": | |
| should_clear = False | |
| if response_meta: | |
| status = response_meta.get("status") if isinstance(response_meta, dict) else None | |
| event = response_meta.get("event") if isinstance(response_meta, dict) else None | |
| should_clear = status == "ready" or event == "staking_intent_ready" | |
| if should_clear: | |
| metadata.set_staking_agent({}, user_id=request_user_id, conversation_id=request_conversation_id) | |
| return response_payload | |
| return { | |
| "response": "No response available", | |
| "agentName": "supervisor", | |
| "transcription": transcribed_text, | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.exception( | |
| "Audio chat handler failed for user=%s conversation=%s", | |
| request_user_id, | |
| request_conversation_id, | |
| ) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # Include chat manager router | |
| app.include_router(chat_manager_router) | |