# zico-agent / src/app.py
# Last commit acde124 by ColettoG: "add: fallback and monitoring"
import asyncio
import base64
import json
import os
import re
import time
from typing import List, Optional
from uuid import uuid4

from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from langchain_core.messages import HumanMessage
from prometheus_client import make_asgi_app, Counter, Histogram, Gauge
from pydantic import BaseModel

from src.infrastructure.logging import setup_logging, get_logger
from src.infrastructure.rate_limiter import setup_rate_limiter, limiter
from src.agents.config import Config
from src.agents.supervisor.agent import Supervisor
from src.models.chatMessage import ChatMessage
from src.routes.chat_manager_routes import router as chat_manager_router
from src.service.chat_manager import chat_manager_instance
from src.agents.crypto_data.tools import get_coingecko_id, get_tradingview_symbol
from src.agents.metadata import metadata
# Setup structured logging.
# Configured from the environment before anything else in this module logs.
# LOG_LEVEL defaults to DEBUG. At DEBUG, the chat handlers in this module log
# full request payloads and supervisor results.
# NOTE(review): consider a less verbose default for production deployments.
log_level = os.getenv("LOG_LEVEL", "DEBUG")
# Output format name passed through to setup_logging (defaults to "color").
log_format = os.getenv("LOG_FORMAT", "color")
setup_logging(level=log_level, format_type=log_format)
logger = get_logger(__name__)
logger.info("Starting Zico Agent API")
# Initialize FastAPI app.
# title / version / description are the metadata FastAPI publishes in the
# generated OpenAPI document.
app = FastAPI(
    title="Zico Agent API",
    version="2.0",
    description="Multi-agent AI assistant with streaming support",
)
# Setup rate limiting (project helper from src.infrastructure.rate_limiter).
setup_rate_limiter(app)
# Enable CORS for local/frontend dev.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True admits
# credentialed cross-origin requests from any site. Restrict allow_origins
# before exposing this API publicly, and confirm how the installed Starlette
# CORSMiddleware treats this combination.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Metrics
# Total requests per (method, endpoint, status). Incremented once per request
# by the HTTP middleware below.
REQUEST_COUNT = Counter(
    "http_requests_total", "Total HTTP requests", ["method", "endpoint", "status"]
)
# Request latency in seconds per (method, endpoint). Observed by the same
# middleware.
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds", "HTTP request latency", ["method", "endpoint"]
)
# Updated by the /chat and /chat/audio handlers.
ACTIVE_CONVERSATIONS = Gauge(
    "active_conversations", "Number of active conversations"
)
# Add metrics endpoint: serve prometheus_client's ASGI exposition app at /metrics.
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
def _metrics_endpoint_label(request: Request, status_code: int) -> str:
    """Return a bounded-cardinality ``endpoint`` label for request metrics.

    Prefers the template of the matched route (e.g. ``/items/{item_id}``), so
    path parameters do not create a new Prometheus time series per value.
    FastAPI records the matched route in ``request.scope["route"]`` while
    routing. NOTE(review): confirm this for the pinned FastAPI/Starlette
    version; if the key is missing, this falls back to the old behaviour.

    Requests that matched no route and ended in 404 share a single label, so
    random probes cannot grow the series count. Anything else without a route
    (e.g. the mounted ``/metrics`` app) keeps its raw URL path.
    """
    route_path = getattr(request.scope.get("route"), "path", None)
    if isinstance(route_path, str) and route_path:
        return route_path
    if status_code == 404:
        return "__unmatched__"
    return request.url.path


@app.middleware("http")
async def log_request_timing(request: Request, call_next):
    """Record Prometheus metrics and log one timing line per HTTP request.

    The caller's ``x-request-id`` header (or a freshly generated UUID4) is set
    on the response. The log line shows the raw URL path. Metrics are labelled
    via ``_metrics_endpoint_label`` to keep label cardinality bounded.
    """
    request_id = request.headers.get("x-request-id") or str(uuid4())
    start = time.perf_counter()
    # Assume a server error until a response exists. If call_next raises, the
    # finally block still records the request as a 500.
    status_code = 500
    method = request.method
    path = request.url.path
    try:
        response = await call_next(request)
        status_code = response.status_code
        response.headers["x-request-id"] = request_id
        return response
    finally:
        duration = time.perf_counter() - start
        endpoint = _metrics_endpoint_label(request, status_code)
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status_code).inc()
        REQUEST_LATENCY.labels(method=method, endpoint=endpoint).observe(duration)
        logger.info(
            "HTTP %s %s -> %s in %.1fms request_id=%s",
            method,
            path,
            status_code,
            duration * 1000,
            request_id,
        )
# Instantiate Supervisor agent (singleton LLM with cost tracking).
# Created once at import time. The same instance serves every /chat and
# /chat/audio request.
supervisor = Supervisor(Config.get_llm(with_cost_tracking=True))
# Request body for POST /chat.
class ChatRequest(BaseModel):
    # Incoming message; the handlers read its role, content and metadata.
    message: ChatMessage
    # Not otherwise read in this module (it only appears in the logged payload).
    chain_id: str = "default"
    # "default" means "no wallet". Otherwise the lower-cased address derives
    # the user id when user_id is blank/"anonymous".
    wallet_address: str = "default"
    # Blank values are normalised to "default" by _resolve_identity.
    conversation_id: str = "default"
    # Blank or "anonymous" (any case) means no stable user id was supplied.
    user_id: str = "anonymous"
# Lightweight in-memory agent config for frontend integrations.
# Each entry has:
#   "name"                - the identifier accepted by POST /agents/selected;
#   "human_readable_name" - display text;
#   "description"         - display text.
AVAILABLE_AGENTS = [
    {"name": "default", "human_readable_name": "Default General Purpose", "description": "General chat and meta-queries about agents."},
    {"name": "crypto data", "human_readable_name": "Crypto Data Fetcher", "description": "Real-time cryptocurrency prices, market cap, FDV, TVL."},
    {"name": "token swap", "human_readable_name": "Token Swap Agent", "description": "Swap tokens using supported DEX APIs."},
    {"name": "realtime search", "human_readable_name": "Real-Time Search", "description": "Search the web for recent information."},
    {"name": "dexscreener", "human_readable_name": "DexScreener Analyst", "description": "Fetches and analyzes DEX trading data."},
    {"name": "rugcheck", "human_readable_name": "Token Safety Analyzer", "description": "Analyzes token safety and trends (Solana)."},
    {"name": "imagen", "human_readable_name": "Image Generator", "description": "Generate images from text prompts."},
    {"name": "rag", "human_readable_name": "Document Assistant", "description": "Answer questions about uploaded documents."},
    {"name": "tweet sizzler", "human_readable_name": "Tweet / X-Post Generator", "description": "Generate engaging tweets."},
    {"name": "dca", "human_readable_name": "DCA Strategy Manager", "description": "Plan and manage DCA strategies."},
    {"name": "base", "human_readable_name": "Base Transaction Manager", "description": "Handle transactions on Base network."},
    {"name": "mor rewards", "human_readable_name": "MOR Rewards Tracker", "description": "Track MOR rewards and balances."},
    {"name": "mor claims", "human_readable_name": "MOR Claims Agent", "description": "Claim MOR tokens."},
    {"name": "lending", "human_readable_name": "Lending Agent", "description": "Supply, borrow, repay, or withdraw assets."},
]
# Default to a small, reasonable subset: the first six entries above.
# POST /agents/selected rebinds this module-level list. The selection is
# per-process and is not persisted.
SELECTED_AGENTS = [agent["name"] for agent in AVAILABLE_AGENTS[:6]]
# Commands exposed to the ChatInput autocomplete (served by GET /agents/commands).
# NOTE(review): there is no command whose name matches "Token Swap Agent" or
# "MOR Claims Agent" from AVAILABLE_AGENTS. Confirm this is intentional.
AGENT_COMMANDS = [
    {"command": "morpheus", "name": "Default General Purpose", "description": "General assistant for simple queries and meta-questions."},
    {"command": "crypto", "name": "Crypto Data Fetcher", "description": "Get prices, market cap, FDV, TVL and more."},
    {"command": "document", "name": "Document Assistant", "description": "Ask questions about uploaded documents."},
    {"command": "tweet", "name": "Tweet / X-Post Generator", "description": "Create engaging tweets about crypto and web3."},
    {"command": "search", "name": "Real-Time Search", "description": "Search the web for recent events or updates."},
    {"command": "dexscreener", "name": "DexScreener Analyst", "description": "Analyze DEX trading data on supported chains."},
    {"command": "rugcheck", "name": "Token Safety Analyzer", "description": "Check token safety and view trending tokens."},
    {"command": "dca", "name": "DCA Strategy Manager", "description": "Plan a dollar-cost averaging strategy."},
    {"command": "base", "name": "Base Transaction Manager", "description": "Send tokens and swap on Base."},
    {"command": "rewards", "name": "MOR Rewards Tracker", "description": "Check rewards balance and accrual."},
    {"command": "lending", "name": "Lending Agent", "description": "Supply, borrow, repay, or withdraw assets."},
]
# Agents endpoints expected by the frontend
@app.get("/agents/available")
def get_available_agents():
    """Return the currently selected agent names plus the full agent catalog."""
    catalog_view = dict(
        selected_agents=SELECTED_AGENTS,
        available_agents=AVAILABLE_AGENTS,
    )
    return catalog_view
@app.post("/agents/selected")
async def set_selected_agents(request: Request):
    """Replace the in-memory agent selection.

    Expects a JSON object body ``{"agents": [<agent name>, ...]}``:

    * Names not present in ``AVAILABLE_AGENTS`` are dropped.
    * Duplicates are collapsed, keeping the first occurrence.
    * At most six agents are kept.

    If no valid name remains, the previous selection is left untouched and
    ``{"status": "no_change", ...}`` is returned.

    Raises:
        HTTPException: 400 if the body is not valid JSON, is not a JSON object,
            or its ``agents`` value is not a list.
    """
    global SELECTED_AGENTS
    try:
        data = await request.json()
    except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")
    agents = data.get("agents", [])
    if not isinstance(agents, list):
        raise HTTPException(status_code=400, detail="'agents' must be a list of agent names.")

    # Validate provided names against available agents. The str check also
    # keeps unhashable entries (nested objects/lists) out of the set lookup.
    # dict.fromkeys de-duplicates while preserving first-seen order.
    available_names = {a["name"] for a in AVAILABLE_AGENTS}
    valid_agents = list(
        dict.fromkeys(a for a in agents if isinstance(a, str) and a in available_names)
    )
    if not valid_agents:
        # Keep previous selection if nothing valid provided.
        return {"status": "no_change", "agents": SELECTED_AGENTS}
    SELECTED_AGENTS = valid_agents[:6]
    return {"status": "success", "agents": SELECTED_AGENTS}
@app.get("/agents/commands")
def get_agent_commands():
    """Return the command shortcuts offered to the chat input autocomplete."""
    return dict(commands=AGENT_COMMANDS)
# Map agent runtime names to high-level types for storage/analytics
def _map_agent_type(agent_name: str) -> str:
mapping = {
"crypto_agent": "crypto data",
"default_agent": "default",
"database_agent": "analysis",
"search_agent": "realtime search",
"swap_agent": "token swap",
"lending_agent": "lending",
"staking_agent": "staking",
"supervisor": "supervisor",
}
return mapping.get(agent_name, "supervisor")
def _sanitize_user_message_content(content: str | None) -> str | None:
"""Strip wrapper prompts (e.g., 'User Message: ...') the frontend might send."""
if not content:
return content
text = content.strip()
marker = "user message:"
lowered = text.lower()
idx = lowered.rfind(marker)
if idx != -1:
candidate = text[idx + len(marker):].strip()
if candidate:
return candidate
return text
def _resolve_identity(request: ChatRequest) -> tuple[str, str]:
    """Ensure each request has a stable user and conversation identifier.

    The explicit ``user_id`` (stripped) is used unless it is blank or
    ``"anonymous"`` (any case). In that case a real wallet address, i.e.
    anything but blank or ``"default"``, becomes the user id as
    ``"wallet::<lower-cased address>"``. A blank conversation id falls back to
    ``"default"``.

    Returns:
        ``(user_id, conversation_id)``.

    Raises:
        HTTPException: 400 when neither a usable user_id nor a wallet address
            is provided.
    """
    user_id = (request.user_id or "").strip()
    if not user_id or user_id.lower() == "anonymous":
        wallet = (request.wallet_address or "").strip()
        if not wallet or wallet.lower() == "default":
            # This guard runs for every chat request, not only swaps, so the
            # message must not suggest it is swap-specific.
            raise HTTPException(
                status_code=400,
                detail="A stable 'user_id' or wallet_address is required.",
            )
        user_id = f"wallet::{wallet.lower()}"
    conversation_id = (request.conversation_id or "").strip() or "default"
    return user_id, conversation_id
@app.get("/health")
def health_check():
    """Report that the API process is up; the payload is always ``{"status": "ok"}``."""
    return dict(status="ok")
@app.get("/costs")
def get_costs():
    """Return the summary view of the global LLM cost tracker."""
    return Config.get_cost_tracker().get_summary()
@app.get("/costs/detailed")
def get_detailed_costs():
    """Return the detailed report of the global LLM cost tracker."""
    return Config.get_cost_tracker().get_detailed_report()
@app.get("/costs/conversation")
def get_conversation_costs(request: Request):
    """Return the LLM costs accumulated for one conversation.

    Both ``conversation_id`` and ``user_id`` query parameters are mandatory.
    A 400 is raised when either one is missing or empty.
    """
    query = request.query_params
    conversation_id, user_id = query.get("conversation_id"), query.get("user_id")
    if not (conversation_id and user_id):
        raise HTTPException(
            status_code=400,
            detail="Both 'conversation_id' and 'user_id' query parameters are required.",
        )
    return {
        "conversation_id": conversation_id,
        "user_id": user_id,
        "costs": chat_manager_instance.get_conversation_costs(
            conversation_id=conversation_id,
            user_id=user_id,
        ),
    }
@app.get("/models")
def get_available_models():
    """List available LLM models and providers, plus the configured default model."""
    models = Config.list_available_models()
    providers = Config.list_available_providers()
    return {"models": models, "providers": providers, "default": Config.DEFAULT_MODEL}
@app.get("/chat/messages")
def get_messages(request: Request):
    """Return the stored messages of one conversation.

    The ``conversation_id`` and ``user_id`` query parameters default to
    ``"default"`` and ``"anonymous"``.

    NOTE(review): this handler performs no authentication. Any caller can read
    a user's history by supplying that user's id.
    """
    query = request.query_params
    history = chat_manager_instance.get_messages(
        query.get("conversation_id", "default"),
        query.get("user_id", "anonymous"),
    )
    return {"messages": history}
@app.get("/chat/conversations")
def get_conversations(request: Request):
    """List the conversation ids stored for a user.

    The user comes from the ``user_id`` query parameter, defaulting to
    ``"anonymous"``.
    """
    owner = request.query_params.get("user_id", "anonymous")
    conversation_ids = chat_manager_instance.get_all_conversation_ids(owner)
    return {"conversation_ids": conversation_ids}
@app.post("/chat")
async def chat(request: ChatRequest):
    """Handle one text chat turn through the supervisor agent.

    Flow:
    1. Resolve the caller's identity and ensure a chat session exists.
    2. Store the (sanitized) incoming message.
    3. Invoke the supervisor on the whole stored conversation.
    4. Record this turn's LLM cost delta.
    5. Persist the assistant reply.
    6. Return ``{"response", "agentName"[, "metadata"]}``.

    For the action agents ("token swap", "lending", "staking"), the stored
    reply is flagged ``requires_action``. Once the returned metadata reports a
    ready intent, that agent's stored intent metadata is reset.

    Raises:
        HTTPException: errors raised as HTTPException (e.g. the 400 from
            ``_resolve_identity``) propagate unchanged. Any other failure is
            logged and reported as a 500.
    """
    user_id: str | None = None
    conversation_id: str | None = None
    # True once ACTIVE_CONVERSATIONS has been incremented for this turn.
    in_flight = False
    try:
        logger.debug("Received chat payload: %s", request.model_dump())
        user_id, conversation_id = _resolve_identity(request)
        wallet = request.wallet_address.strip() if request.wallet_address else None
        logger.debug(
            "Resolved chat identity user=%s conversation=%s wallet=%s",
            user_id,
            conversation_id,
            wallet,
        )
        if wallet and wallet.lower() == "default":
            wallet = None
        display_name = None
        if isinstance(request.message.metadata, dict):
            display_name = request.message.metadata.get("display_name")
        chat_manager_instance.ensure_session(
            user_id,
            conversation_id,
            wallet_address=wallet,
            display_name=display_name,
        )

        # Gauge of chat turns currently being processed; decremented in finally.
        ACTIVE_CONVERSATIONS.inc()
        in_flight = True

        if request.message.role == "user":
            clean_content = _sanitize_user_message_content(request.message.content)
            if clean_content is not None:
                request.message.content = clean_content
        chat_manager_instance.add_message(
            message=request.message.dict(),
            conversation_id=conversation_id,
            user_id=user_id,
        )

        # The supervisor sees the full stored history, including the new message.
        conversation_messages = chat_manager_instance.get_messages(
            conversation_id=conversation_id,
            user_id=user_id,
        )

        # Snapshot the global cost tracker so this turn's delta can be stored.
        cost_tracker = Config.get_cost_tracker()
        cost_snapshot = cost_tracker.get_snapshot()
        result = await supervisor.invoke(
            conversation_messages,
            conversation_id=conversation_id,
            user_id=user_id,
        )
        cost_delta = cost_tracker.calculate_delta(cost_snapshot)
        if cost_delta.get("cost", 0) > 0 or cost_delta.get("calls", 0) > 0:
            chat_manager_instance.update_conversation_costs(
                cost_delta,
                conversation_id=conversation_id,
                user_id=user_id,
            )
        logger.debug(
            "Supervisor returned result for user=%s conversation=%s: %s",
            user_id,
            conversation_id,
            result,
        )

        if not result or not isinstance(result, dict):
            # Use the same key as the normal payload below (and as /chat/audio).
            return {"response": "No response available", "agentName": "supervisor"}

        agent_name = _map_agent_type(result.get("agent", "supervisor"))
        result_meta = result.get("metadata")

        # If the supervisor returned no metadata, fall back to the intent state
        # the action agent stored for this conversation.
        stored_meta = None
        if not result_meta:
            if agent_name == "token swap":
                stored_meta = metadata.get_swap_agent(
                    user_id=user_id,
                    conversation_id=conversation_id,
                )
            elif agent_name == "lending":
                stored_meta = metadata.get_lending_agent(
                    user_id=user_id,
                    conversation_id=conversation_id,
                )
            elif agent_name == "staking":
                stored_meta = metadata.get_staking_agent(
                    user_id=user_id,
                    conversation_id=conversation_id,
                )
        response_metadata = {"supervisor_result": result}
        response_metadata.update(result_meta or stored_meta or {})
        logger.debug(
            "Response metadata for user=%s conversation=%s: %s",
            user_id,
            conversation_id,
            response_metadata,
        )

        action_type = {
            "token swap": "swap",
            "lending": "lending",
            "staking": "staking",
        }.get(agent_name)
        response_message = ChatMessage(
            role="assistant",
            content=result.get("response", "No response available"),
            agent_name=agent_name,
            # agent_name already holds the mapped high-level type.
            agent_type=agent_name,
            metadata=result.get("metadata", {}),
            conversation_id=conversation_id,
            user_id=user_id,
            requires_action=action_type is not None,
            action_type=action_type,
        )
        chat_manager_instance.add_message(
            message=response_message.dict(),
            conversation_id=conversation_id,
            user_id=user_id,
        )

        # Return only the clean response.
        response_payload = {
            "response": result.get("response", "No response available"),
            "agentName": agent_name,
        }
        response_meta = result_meta or {}
        if agent_name == "token swap" and not response_meta:
            response_meta = stored_meta or metadata.get_swap_agent(
                user_id=user_id,
                conversation_id=conversation_id,
            ) or {}
        if response_meta:
            response_payload["metadata"] = response_meta

        # A ready intent has been handed to the client, so reset the agent's
        # stored intent state.
        ready_event = {
            "token swap": "swap_intent_ready",
            "lending": "lending_intent_ready",
            "staking": "staking_intent_ready",
        }.get(agent_name)
        if ready_event and isinstance(response_meta, dict) and (
            response_meta.get("status") == "ready"
            or response_meta.get("event") == ready_event
        ):
            if agent_name == "token swap":
                metadata.set_swap_agent({}, user_id=user_id, conversation_id=conversation_id)
            elif agent_name == "lending":
                metadata.set_lending_agent({}, user_id=user_id, conversation_id=conversation_id)
            else:
                metadata.set_staking_agent({}, user_id=user_id, conversation_id=conversation_id)
        return response_payload
    except HTTPException:
        # Deliberate client errors (e.g. 400 from _resolve_identity) keep their status.
        raise
    except Exception as e:
        logger.exception(
            "Chat handler failed for user=%s conversation=%s",
            user_id,
            conversation_id,
        )
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if in_flight:
            ACTIVE_CONVERSATIONS.dec()
# Supported audio MIME types
AUDIO_MIME_TYPES = {
".mp3": "audio/mpeg",
".wav": "audio/wav",
".flac": "audio/flac",
".ogg": "audio/ogg",
".webm": "audio/webm",
".m4a": "audio/mp4",
".aac": "audio/aac",
}
# Max audio file size (20MB)
MAX_AUDIO_SIZE = 20 * 1024 * 1024
def _get_audio_mime_type(filename: str, content_type: str | None) -> str:
"""Determine the MIME type for an audio file."""
# Try from filename extension first
if filename:
ext = os.path.splitext(filename.lower())[1]
if ext in AUDIO_MIME_TYPES:
return AUDIO_MIME_TYPES[ext]
# Fall back to content type from upload
if content_type and content_type.startswith("audio/"):
return content_type
# Default to mpeg
return "audio/mpeg"
@app.post("/chat/audio")
async def chat_audio(
    audio: UploadFile = File(..., description="Audio file (mp3, wav, flac, ogg, webm, m4a)"),
    user_id: str = Form(..., description="User ID"),
    conversation_id: str = Form(..., description="Conversation ID"),
    wallet_address: str = Form("default", description="Wallet address"),
):
    """
    Process audio input through the agent pipeline.

    The audio is first transcribed by the LLM from ``Config.get_llm`` (a Gemini
    model, per the original in-code comment). The transcription is then passed
    to the supervisor agent exactly like text input to ``/chat``. The response
    mirrors ``/chat`` and additionally carries ``"transcription"``.

    Raises:
        HTTPException:
            400 for a missing identity, an empty upload, or an empty
            transcription;
            413 for uploads larger than ``MAX_AUDIO_SIZE``;
            500 wrapping any other failure.
    """
    # Normalise identifiers like /chat does: surrounding whitespace is ignored
    # and a blank conversation id falls back to "default".
    request_user_id: str | None = (user_id or "").strip()
    request_conversation_id: str | None = (conversation_id or "").strip() or "default"
    # True once ACTIVE_CONVERSATIONS has been incremented for this turn.
    in_flight = False
    try:
        if not request_user_id or request_user_id.lower() == "anonymous":
            wallet = (wallet_address or "").strip()
            if not wallet or wallet.lower() == "default":
                raise HTTPException(
                    status_code=400,
                    detail="A stable 'user_id' or wallet_address is required.",
                )
            request_user_id = f"wallet::{wallet.lower()}"

        logger.debug(
            "Received audio chat request user=%s conversation=%s filename=%s",
            request_user_id,
            request_conversation_id,
            audio.filename,
        )

        # Read at most one byte past the limit. That is enough to detect an
        # oversized upload without loading an arbitrarily large file into memory.
        audio_content = await audio.read(MAX_AUDIO_SIZE + 1)
        if len(audio_content) > MAX_AUDIO_SIZE:
            raise HTTPException(
                status_code=413,
                detail=f"Audio file too large. Maximum size is {MAX_AUDIO_SIZE // (1024*1024)}MB.",
            )
        if len(audio_content) == 0:
            raise HTTPException(
                status_code=400,
                detail="Audio file is empty.",
            )

        mime_type = _get_audio_mime_type(audio.filename or "", audio.content_type)
        logger.debug("Audio MIME type: %s, size: %d bytes", mime_type, len(audio_content))
        encoded_audio = base64.b64encode(audio_content).decode("utf-8")

        # Ensure session exists ("default" wallet means no wallet).
        wallet = wallet_address.strip() if wallet_address else None
        if wallet and wallet.lower() == "default":
            wallet = None
        chat_manager_instance.ensure_session(
            request_user_id,
            request_conversation_id,
            wallet_address=wallet,
        )

        # Gauge of chat turns currently being processed; decremented in finally.
        ACTIVE_CONVERSATIONS.inc()
        in_flight = True

        # Snapshot before transcription so its cost is part of this turn's delta.
        cost_tracker = Config.get_cost_tracker()
        cost_snapshot = cost_tracker.get_snapshot()

        # Step 1: transcribe the audio.
        transcription_message = HumanMessage(
            content=[
                {"type": "text", "text": "Transcribe exactly what is being said in this audio. Return ONLY the transcription, nothing else."},
                {"type": "media", "data": encoded_audio, "mime_type": mime_type},
            ]
        )
        llm = Config.get_llm(with_cost_tracking=True)
        # invoke() is synchronous. Run it in a worker thread so the event loop
        # keeps serving other requests during the transcription call.
        transcription_response = await asyncio.to_thread(llm.invoke, [transcription_message])

        # The model may return plain text or a list of content parts.
        raw_content = transcription_response.content
        if isinstance(raw_content, list):
            text_parts = []
            for part in raw_content:
                if isinstance(part, dict) and part.get("text"):
                    text_parts.append(part["text"])
                elif isinstance(part, str):
                    text_parts.append(part)
            transcribed_text = " ".join(text_parts)
        elif isinstance(raw_content, str):
            transcribed_text = raw_content
        else:
            transcribed_text = ""
        # Strip in every case so a whitespace-only answer counts as a failure.
        transcribed_text = transcribed_text.strip()
        if not transcribed_text:
            raise HTTPException(
                status_code=400,
                detail="Could not transcribe the audio. Please try again with a clearer recording.",
            )
        logger.info("Audio transcribed: %s", transcribed_text[:200])

        # Step 2: store the user message with the transcription.
        user_message = ChatMessage(
            role="user",
            content=transcribed_text,
            metadata={
                "source": "audio",
                "audio_filename": audio.filename,
                "audio_size": len(audio_content),
                "audio_mime_type": mime_type,
            },
        )
        chat_manager_instance.add_message(
            message=user_message.dict(),
            conversation_id=request_conversation_id,
            user_id=request_user_id,
        )

        # Step 3: get conversation history and invoke the supervisor.
        conversation_messages = chat_manager_instance.get_messages(
            conversation_id=request_conversation_id,
            user_id=request_user_id,
        )
        result = await supervisor.invoke(
            conversation_messages,
            conversation_id=request_conversation_id,
            user_id=request_user_id,
        )
        cost_delta = cost_tracker.calculate_delta(cost_snapshot)
        if cost_delta.get("cost", 0) > 0 or cost_delta.get("calls", 0) > 0:
            chat_manager_instance.update_conversation_costs(
                cost_delta,
                conversation_id=request_conversation_id,
                user_id=request_user_id,
            )
        logger.debug(
            "Supervisor returned result for audio user=%s conversation=%s: %s",
            request_user_id,
            request_conversation_id,
            result,
        )

        if not result or not isinstance(result, dict):
            return {
                "response": "No response available",
                "agentName": "supervisor",
                "transcription": transcribed_text,
            }

        # Step 4: process and store the agent response (same as /chat).
        agent_name = _map_agent_type(result.get("agent", "supervisor"))
        result_meta = result.get("metadata")

        # If the supervisor returned no metadata, fall back to the intent state
        # the action agent stored for this conversation.
        stored_meta = None
        if not result_meta:
            if agent_name == "token swap":
                stored_meta = metadata.get_swap_agent(
                    user_id=request_user_id,
                    conversation_id=request_conversation_id,
                )
            elif agent_name == "lending":
                stored_meta = metadata.get_lending_agent(
                    user_id=request_user_id,
                    conversation_id=request_conversation_id,
                )
            elif agent_name == "staking":
                stored_meta = metadata.get_staking_agent(
                    user_id=request_user_id,
                    conversation_id=request_conversation_id,
                )
        response_metadata = {"supervisor_result": result, "source": "audio"}
        response_metadata.update(result_meta or stored_meta or {})
        logger.debug(
            "Response metadata for audio user=%s conversation=%s: %s",
            request_user_id,
            request_conversation_id,
            response_metadata,
        )

        action_type = {
            "token swap": "swap",
            "lending": "lending",
            "staking": "staking",
        }.get(agent_name)
        response_message = ChatMessage(
            role="assistant",
            content=result.get("response", "No response available"),
            agent_name=agent_name,
            # agent_name already holds the mapped high-level type.
            agent_type=agent_name,
            metadata=result.get("metadata", {}),
            conversation_id=request_conversation_id,
            user_id=request_user_id,
            requires_action=action_type is not None,
            action_type=action_type,
        )
        chat_manager_instance.add_message(
            message=response_message.dict(),
            conversation_id=request_conversation_id,
            user_id=request_user_id,
        )

        response_payload = {
            "response": result.get("response", "No response available"),
            "agentName": agent_name,
            "transcription": transcribed_text,
        }
        response_meta = result_meta or {}
        if agent_name == "token swap" and not response_meta:
            response_meta = stored_meta or metadata.get_swap_agent(
                user_id=request_user_id,
                conversation_id=request_conversation_id,
            ) or {}
        if response_meta:
            response_payload["metadata"] = response_meta

        # Clear the agent's stored intent once a ready intent is returned (same as /chat).
        ready_event = {
            "token swap": "swap_intent_ready",
            "lending": "lending_intent_ready",
            "staking": "staking_intent_ready",
        }.get(agent_name)
        if ready_event and isinstance(response_meta, dict) and (
            response_meta.get("status") == "ready"
            or response_meta.get("event") == ready_event
        ):
            if agent_name == "token swap":
                metadata.set_swap_agent({}, user_id=request_user_id, conversation_id=request_conversation_id)
            elif agent_name == "lending":
                metadata.set_lending_agent({}, user_id=request_user_id, conversation_id=request_conversation_id)
            else:
                metadata.set_staking_agent({}, user_id=request_user_id, conversation_id=request_conversation_id)
        return response_payload
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(
            "Audio chat handler failed for user=%s conversation=%s",
            request_user_id,
            request_conversation_id,
        )
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if in_flight:
            ACTIVE_CONVERSATIONS.dec()
# Include chat manager router (routes from src.routes.chat_manager_routes).
# It is registered after every route declared above in this module.
# NOTE(review): Starlette matches routes in registration order, so on any
# overlapping path the routes above would win. Confirm there is no overlap.
app.include_router(chat_manager_router)