Spaces:
Runtime error
Runtime error
| """Real-time status tracking for AI agent reasoning.""" | |
| from __future__ import annotations | |
| import asyncio | |
| from datetime import datetime | |
| from typing import Optional | |
| from collections import defaultdict | |
def _blank_status() -> dict:
    """Build the empty per-session record used as the store's default."""
    return {
        "current_step": None,
        "steps": [],
        "started_at": None,
        "completed_at": None,
    }


# Global status store: one record per session id. Looking up an unknown
# session id creates a fresh blank record for it.
_status_store: dict[str, dict] = defaultdict(_blank_status)

# Subscriber queues per session id. Looking up an unknown session id creates
# an empty list for it.
_subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)
class WorkflowStep:
    """String identifiers for the workflow steps.

    Each attribute is a plain ``str`` constant; the class is only a namespace.
    """

    FETCH = "fetch"
    NORMALIZE = "normalize"
    GRADE = "grade"
    EXPLAIN = "explain"
    GROUP = "group"
    REPORT = "report"
    EMAIL = "email"
async def add_reasoning_step(session_id: str, step_type: str, content: str, status: str = "completed"):
    """Add a reasoning step to show what the AI is doing.

    Any step still marked "active" is flipped to "completed", the new step is
    appended to the session's history, and the whole history is pushed to the
    session's subscribers under the "reasoning" key.

    Args:
        session_id: Key of the session record in the status store.
        step_type: Label stored on the step, e.g. "tool", "result", "error",
            "thinking".
        content: Text stored with the step.
        status: Status stored on the new step. "active" also makes this step
            the session's ``current_step``; any other value clears it.
    """
    store = _status_store[session_id]
    now = datetime.utcnow().isoformat()

    # The first step recorded for a session marks the start of the run.
    if store["started_at"] is None:
        store["started_at"] = now

    # Mark any active steps as completed: at most one step is in flight.
    steps = store["steps"]
    for step in steps:
        if step.get("status") == "active":
            step["status"] = "completed"

    steps.append({
        "type": step_type,
        "content": content,
        "status": status,
        "timestamp": now,
    })
    store["current_step"] = step_type if status == "active" else None

    # Emit with "reasoning" key for frontend. Send a snapshot (new list, new
    # step dicts) rather than the live history: queued messages must not
    # change when later calls append steps or flip an "active" status.
    emit_data = {
        "reasoning": [dict(step) for step in steps],
        "started_at": store["started_at"],
        "completed_at": store["completed_at"],
    }
    await notify_subscribers(session_id, emit_data)
async def update_status(session_id: str, step: str, status: str = "active", detail: str = ""):
    """Update the current workflow status (legacy support).

    Forwards to ``add_reasoning_step``: ``step`` is used as the step type and
    ``detail`` as the step content.
    """
    await add_reasoning_step(
        session_id,
        step_type=step,
        content=detail,
        status=status,
    )
async def notify_subscribers(session_id: str, status: dict):
    """Notify all subscribers of status change.

    Every queue registered for ``session_id`` receives its own shallow copy
    of ``status``. Delivery is best-effort: a failure on one queue is logged
    and the remaining queues are still served.

    Args:
        session_id: Session whose subscribers should be notified.
        status: Payload to deliver; it is copied per subscriber.
    """
    import logging  # function-scope import keeps this block self-contained

    # .get() avoids creating an empty entry for sessions nobody subscribed
    # to. The list() snapshot keeps iteration well-defined if a subscriber is
    # removed while this coroutine is suspended in put().
    for queue in list(_subscribers.get(session_id, ())):
        try:
            await queue.put(status.copy())
        except Exception:
            # Catch Exception, not everything: asyncio.CancelledError must
            # propagate so the calling task can still be cancelled.
            logging.getLogger(__name__).warning(
                "Failed to deliver status update for session %s",
                session_id,
                exc_info=True,
            )
def get_status(session_id: str) -> dict:
    """Get current status for a session.

    Returns a copy of the session's record; the ``steps`` list and each step
    dict are copied too, so callers cannot modify the stored history through
    the result. An unknown session yields a blank record and is not added to
    the store.

    Args:
        session_id: Key of the session record in the status store.

    Returns:
        A dict with the keys "current_step", "steps", "started_at" and
        "completed_at".
    """
    # .get() instead of indexing: a read must not insert a record for every
    # session id that is merely queried.
    record = _status_store.get(session_id)
    if record is None:
        return {
            "current_step": None,
            "steps": [],
            "started_at": None,
            "completed_at": None,
        }

    snapshot = dict(record)
    snapshot["steps"] = [dict(step) for step in record["steps"]]
    return snapshot
def reset_status(session_id: str):
    """Reset status for a new analysis.

    Replaces the session's record with a blank one: every field is ``None``
    except ``steps``, which is a new empty list.
    """
    fresh = dict.fromkeys(("current_step", "steps", "started_at", "completed_at"))
    fresh["steps"] = []
    _status_store[session_id] = fresh
async def subscribe(session_id: str) -> asyncio.Queue:
    """Subscribe to status updates for a session.

    Creates a new unbounded queue, registers it under ``session_id`` and
    returns it to the caller.
    """
    listeners = _subscribers[session_id]
    new_queue: asyncio.Queue = asyncio.Queue()
    listeners.append(new_queue)
    return new_queue
def unsubscribe(session_id: str, queue: asyncio.Queue):
    """Unsubscribe from status updates.

    Removes ``queue`` from the session's subscriber list. Unknown sessions
    and queues that are not registered are ignored. Once a session has no
    subscribers left, its entry is dropped so the registry does not keep one
    empty list per session.

    Args:
        session_id: Session the queue was registered under.
        queue: Queue previously returned by ``subscribe``.
    """
    # .get() instead of indexing: looking up an unknown session must not
    # create an empty entry for it.
    queues = _subscribers.get(session_id)
    if queues is None:
        return

    try:
        queues.remove(queue)
    except ValueError:
        pass  # queue was not registered for this session; nothing to remove

    if not queues:
        _subscribers.pop(session_id, None)