"""Real-time status tracking for AI agent reasoning.""" from __future__ import annotations import asyncio from datetime import datetime from typing import Optional from collections import defaultdict # Global status store _status_store: dict[str, dict] = defaultdict(lambda: { "current_step": None, "steps": [], "started_at": None, "completed_at": None, }) _subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list) class WorkflowStep: FETCH = "fetch" NORMALIZE = "normalize" GRADE = "grade" EXPLAIN = "explain" GROUP = "group" REPORT = "report" EMAIL = "email" async def add_reasoning_step(session_id: str, step_type: str, content: str, status: str = "completed"): """Add a reasoning step to show what the AI is doing.""" store = _status_store[session_id] if store["started_at"] is None: store["started_at"] = datetime.utcnow().isoformat() # Mark any active steps as completed for s in store["steps"]: if s.get("status") == "active": s["status"] = "completed" step_data = { "type": step_type, # "tool", "result", "error", "thinking" "content": content, "status": status, "timestamp": datetime.utcnow().isoformat(), } store["steps"].append(step_data) store["current_step"] = step_type if status == "active" else None # Emit with "reasoning" key for frontend emit_data = { "reasoning": store["steps"], "started_at": store["started_at"], "completed_at": store["completed_at"], } await notify_subscribers(session_id, emit_data) async def update_status(session_id: str, step: str, status: str = "active", detail: str = ""): """Update the current workflow status (legacy support).""" await add_reasoning_step(session_id, step, detail, status) async def notify_subscribers(session_id: str, status: dict): """Notify all subscribers of status change.""" for queue in _subscribers[session_id]: try: await queue.put(status.copy()) except: pass def get_status(session_id: str) -> dict: """Get current status for a session.""" return dict(_status_store[session_id]) def reset_status(session_id: str): """Reset status for a new analysis.""" _status_store[session_id] = { "current_step": None, "steps": [], "started_at": None, "completed_at": None, } async def subscribe(session_id: str) -> asyncio.Queue: """Subscribe to status updates for a session.""" queue = asyncio.Queue() _subscribers[session_id].append(queue) return queue def unsubscribe(session_id: str, queue: asyncio.Queue): """Unsubscribe from status updates.""" if queue in _subscribers[session_id]: _subscribers[session_id].remove(queue)