avfranco's picture
HF Space deploy snapshot (minimal allow-list)
d64fd55
import logging
import json
import time
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from .models import RouteDecision
from .hooks import RouterHook, NoOpHook
from .compact import compact_raw_context
from llm.base import LLMClient
from llm.errors import StructuredOutputError
from observability import logger as obs_logger
from observability import components as obs_components
logger = logging.getLogger(__name__)
ROUTER_INSTRUCTION = """
You are a routing agent for a running analysis system.
Your goal is to decide whether the user's message requires a visualization (CHART) or a conversational response (CHAT).
- ROUTE TO 'CHART' if the user explicitly asks for a plot, chart, graph, or visualization of their data.
- ROUTE TO 'CHAT' for general questions, advice, summaries, or text-based communication.
For 'CHART' routes, extract:
- 'metric': heart_rate, pace, volume, frequency, zones, etc.
- 'period':
- Use 'current_week', 'this_month', 'last_week', 'last_30_days'.
- Use 'all_time' ONLY if the user says "all", "history", "everything", "all my runs".
- If NO timeframe is mentioned, set 'period' to null.
- 'target_date':
- If the user refers to a specific week (e.g. 'last week'), resolve it to the START DATE (Monday) of that week in YYYY-MM-DD format based on today's date: {today}.
- For 'this month', you can leave 'period' as 'this_month' and 'target_date' as null.
EXAMPLES:
- "Plot my HR" -> metric='heart_rate', period=null, target_date=null
- "Show my pace this month" -> metric='pace', period='this_month', target_date=null
- "Show all my runs" -> metric=null, period='all_time', target_date=null
- "Last week's pace" -> metric='pace', period='last_week', target_date='[RELEVANT_MONDAY]'
Return a JSON object conforming to the RouteDecision schema.
"""
def route(
user_query: str,
raw_context: Dict[str, Any],
*,
llm_client: LLMClient,
hook: Optional[RouterHook] = None,
) -> RouteDecision:
"""
Determines the routing for a user query using an LLM.
Fails safely to 'CHAT' on any error.
"""
# This is a synchronous wrapper for an async operation if needed,
# but since our llm_client.chat is async, we should probably make this async too.
# Actually, the user requested 'def route' in implementation tasks 2,
# but the orchestrator chat is async. Let's make it async.
pass
async def async_route(
user_query: str,
raw_context: Dict[str, Any],
*,
llm_client: LLMClient,
hook: Optional[RouterHook] = None,
) -> RouteDecision:
"""
asynchronous version of route.
"""
with obs_logger.start_span("router.async_route", obs_components.ROUTER):
start_time = time.time()
hook = hook or NoOpHook()
compacted_context = compact_raw_context(raw_context)
prompt = (
f"User Query: {user_query}\n\nContext Summary: {json.dumps(compacted_context, default=str)}"
)
try:
from datetime import date
today_str = date.today().isoformat()
decision = await llm_client.chat(
messages=[{"role": "user", "content": prompt}],
instruction=ROUTER_INSTRUCTION.format(today=today_str),
schema=RouteDecision,
name="router",
)
# In case the client returns a dict (depending on implementation details)
if isinstance(decision, dict):
decision = RouteDecision(**decision)
hook.on_route_decision(
route=decision.route,
provider=getattr(llm_client, "provider", "unknown"),
model=getattr(llm_client, "model_name", "unknown"),
fallback=False,
)
obs_logger.log_event(
"info",
f"Router decision: {decision.route}",
component=obs_components.ROUTER,
fields={"route": decision.route},
)
return decision
except (StructuredOutputError, Exception) as e:
duration_ms = (time.time() - start_time) * 1000
reason = type(e).__name__
logger.warning(f"Router failed ({reason}), falling back to CHAT. Error: {e}")
fallback_decision = RouteDecision(route="CHAT")
hook.on_route_decision(
route=fallback_decision.route,
provider=getattr(llm_client, "provider", "unknown"),
model=getattr(llm_client, "model_name", "unknown"),
fallback=True,
reason=reason,
)
obs_logger.log_event(
"error",
f"Router failed: {e}",
event="error",
component=obs_components.ROUTER,
duration_ms=duration_ms,
)
return fallback_decision