Spaces:
Runtime error
Runtime error
File size: 2,866 Bytes
054d73a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 | """Real-time status tracking for AI agent reasoning."""
from __future__ import annotations

import asyncio
import logging
from collections import defaultdict
from datetime import datetime
from typing import Optional
# Module-level, in-memory state.


def _new_session_state() -> dict:
    """Build the empty status record used for a session on first access."""
    return dict(
        current_step=None,
        steps=[],
        started_at=None,
        completed_at=None,
    )


# session_id -> status record; a missing key is filled in on first lookup.
_status_store: dict[str, dict] = defaultdict(_new_session_state)

# session_id -> queues registered to receive that session's updates.
_subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)
class WorkflowStep:
    """Named string constants for workflow steps."""

    FETCH: str = "fetch"
    NORMALIZE: str = "normalize"
    GRADE: str = "grade"
    EXPLAIN: str = "explain"
    GROUP: str = "group"
    REPORT: str = "report"
    EMAIL: str = "email"
async def add_reasoning_step(session_id: str, step_type: str, content: str, status: str = "completed") -> None:
    """Add a reasoning step to show what the AI is doing.

    Appends a step to the session's history, flips every step that is still
    "active" to "completed", and pushes the updated history to the session's
    subscribers.

    Args:
        session_id: Key of the session in the status store.
        step_type: Label of the step, e.g. "tool", "result", "error",
            "thinking".
        content: Text stored with the step.
        status: Status recorded for the new step. When it is "active" the
            step also becomes the session's ``current_step``; any other
            value clears ``current_step``.
    """
    store = _status_store[session_id]
    if store["started_at"] is None:
        # First step of this record: stamp the start time (naive UTC, ISO 8601).
        store["started_at"] = datetime.utcnow().isoformat()

    # Mark any active steps as completed
    for previous in store["steps"]:
        if previous.get("status") == "active":
            previous["status"] = "completed"

    store["steps"].append({
        "type": step_type,  # "tool", "result", "error", "thinking"
        "content": content,
        "status": status,
        "timestamp": datetime.utcnow().isoformat(),
    })
    store["current_step"] = step_type if status == "active" else None

    # Emit with "reasoning" key for frontend.
    # The steps are copied one by one: subscribers only receive a shallow
    # copy of this payload, so sharing the live list (or its dicts) would let
    # later calls rewrite updates that are still waiting in a queue.
    emit_data = {
        "reasoning": [dict(step) for step in store["steps"]],
        "started_at": store["started_at"],
        "completed_at": store["completed_at"],
    }
    await notify_subscribers(session_id, emit_data)
async def update_status(session_id: str, step: str, status: str = "active", detail: str = ""):
    """Legacy entry point: record *step* as a reasoning step.

    Forwards to ``add_reasoning_step`` with ``step`` as the step type and
    ``detail`` as the step content.
    """
    await add_reasoning_step(
        session_id=session_id,
        step_type=step,
        content=detail,
        status=status,
    )
async def notify_subscribers(session_id: str, status: dict) -> None:
    """Notify all subscribers of status change.

    Every queue registered for ``session_id`` receives its own shallow copy
    of ``status``. Delivery is best-effort: a failure on one queue is logged
    and the remaining queues are still notified.

    Args:
        session_id: Session whose subscribers should be notified.
        status: Payload to deliver; it is copied per subscriber.
    """
    # ``.get`` avoids creating an empty list for sessions nobody subscribed
    # to, and the tuple snapshot keeps the loop stable if a subscriber is
    # added or removed while a ``put`` is pending.
    for queue in tuple(_subscribers.get(session_id, ())):
        try:
            await queue.put(status.copy())
        except Exception:
            # Deliberately not a bare ``except``: asyncio.CancelledError is a
            # BaseException and must propagate so task cancellation works.
            logging.getLogger(__name__).exception(
                "Failed to deliver status update for session %s", session_id
            )
def get_status(session_id: str) -> dict:
    """Get current status for a session.

    Returns a copy of the session's record, including copies of its steps,
    so the caller can modify the result without touching the stored state.
    Looking up an unknown session returns an empty record and does not add
    anything to the store.

    Args:
        session_id: Key of the session in the status store.

    Returns:
        A dict with the keys ``current_step``, ``steps``, ``started_at`` and
        ``completed_at``.
    """
    record = _status_store.get(session_id)
    if record is None:
        # Same shape as a freshly created record, but not inserted: indexing
        # the store here would allocate an entry for every id ever queried.
        return {
            "current_step": None,
            "steps": [],
            "started_at": None,
            "completed_at": None,
        }

    snapshot = dict(record)
    snapshot["steps"] = [dict(step) for step in record["steps"]]
    return snapshot
def reset_status(session_id: str):
    """Replace the stored record of *session_id* with an empty one.

    Only the status record is replaced; subscriber queues are left as they
    are.
    """
    blank_record = dict(
        current_step=None,
        steps=[],
        started_at=None,
        completed_at=None,
    )
    _status_store[session_id] = blank_record
async def subscribe(session_id: str) -> asyncio.Queue:
    """Register a new queue for *session_id* and return it.

    The returned queue receives every payload passed to the notifier for
    this session until it is unsubscribed.
    """
    inbox: asyncio.Queue = asyncio.Queue()
    listeners = _subscribers[session_id]
    listeners.append(inbox)
    return inbox
def unsubscribe(session_id: str, queue: asyncio.Queue) -> None:
    """Unsubscribe from status updates.

    Removes ``queue`` from the session's subscriber list. Unknown sessions
    and queues that are not registered are ignored. When the last queue of a
    session is removed, the session's entry is dropped as well so finished
    sessions do not accumulate in the subscriber table.

    Args:
        session_id: Session the queue was subscribed to.
        queue: Queue previously returned by ``subscribe``.
    """
    # ``.get`` instead of indexing: indexing the defaultdict would create an
    # empty list for a session that never had subscribers.
    queues = _subscribers.get(session_id)
    if queues is None:
        return

    if queue in queues:
        queues.remove(queue)

    if not queues:
        _subscribers.pop(session_id, None)
|