ClassLens / chatkit /backend /app /status_tracker.py
chih.yikuan
🚀 ExamInsight: AI-powered exam analysis for teachers
054d73a
"""Real-time status tracking for AI agent reasoning."""
from __future__ import annotations

import asyncio
import logging
from collections import defaultdict
from datetime import datetime
from typing import Optional
def _blank_status() -> dict:
    """Build the initial status record for a session with no recorded steps."""
    return {
        "current_step": None,
        "steps": [],
        "started_at": None,
        "completed_at": None,
    }


# Global status store, keyed by session id. Because this is a defaultdict,
# looking up an unknown session id creates a fresh blank record for it.
_status_store: dict[str, dict] = defaultdict(_blank_status)

# Subscriber queues, keyed by session id; unknown ids start with an empty list.
_subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)
class WorkflowStep:
    """String constants naming the workflow steps.

    These are plain class attributes rather than an ``Enum``, so every member
    is an ordinary ``str`` and compares equal to its literal value.
    """

    FETCH = "fetch"
    NORMALIZE = "normalize"
    GRADE = "grade"
    EXPLAIN = "explain"
    GROUP = "group"
    REPORT = "report"
    EMAIL = "email"
async def add_reasoning_step(
    session_id: str,
    step_type: str,
    content: str,
    status: str = "completed",
):
    """Append a reasoning step for a session and broadcast the updated trace.

    Any step still marked ``"active"`` is flipped to ``"completed"`` before the
    new step is appended, so only the newest step can be active.

    Args:
        session_id: Key of the session whose status record is updated.
        step_type: Kind of step, e.g. "tool", "result", "error", "thinking".
        content: Text stored with the step.
        status: Status stored with the step. ``"active"`` also makes this
            step the session's ``current_step``; any other value clears it.
    """
    store = _status_store[session_id]

    # NOTE(review): utcnow() yields naive timestamps (no UTC offset in the ISO
    # string) and is deprecated as of Python 3.12. Switching to an aware
    # datetime would change the emitted format, so confirm consumers first.
    if store["started_at"] is None:
        store["started_at"] = datetime.utcnow().isoformat()

    # Mark any active steps as completed
    for existing in store["steps"]:
        if existing.get("status") == "active":
            existing["status"] = "completed"

    store["steps"].append(
        {
            "type": step_type,  # "tool", "result", "error", "thinking"
            "content": content,
            "status": status,
            "timestamp": datetime.utcnow().isoformat(),
        }
    )
    store["current_step"] = step_type if status == "active" else None

    # Emit with "reasoning" key for frontend.
    # Send a snapshot (new list, copied step dicts) rather than the live
    # store["steps"]: updates already sitting in a subscriber queue must not be
    # rewritten when a later call appends a step or flips "active" to
    # "completed".
    emit_data = {
        "reasoning": [dict(step) for step in store["steps"]],
        "started_at": store["started_at"],
        "completed_at": store["completed_at"],
    }
    await notify_subscribers(session_id, emit_data)
async def update_status(session_id: str, step: str, status: str = "active", detail: str = ""):
    """Legacy entry point: record a workflow status change as a reasoning step.

    ``step`` is forwarded as the step type and ``detail`` as the step content.
    """
    await add_reasoning_step(
        session_id,
        step_type=step,
        content=detail,
        status=status,
    )
async def notify_subscribers(session_id: str, status: dict):
    """Deliver a status payload to every queue subscribed to a session.

    Each subscriber receives its own shallow copy of ``status``. Delivery is
    best-effort: a failure on one queue is logged and the remaining
    subscribers are still notified.

    Args:
        session_id: Session whose subscribers should be notified.
        status: Payload to enqueue for each subscriber.
    """
    # Iterate over a snapshot so a subscribe/unsubscribe that happens while
    # this coroutine is suspended cannot mutate the list mid-iteration.
    # .get() (instead of indexing the defaultdict) avoids creating an empty
    # entry for sessions that have no subscribers.
    for queue in list(_subscribers.get(session_id, ())):
        try:
            await queue.put(status.copy())
        except Exception:
            # Deliberately not a bare ``except``: asyncio.CancelledError and
            # KeyboardInterrupt derive from BaseException and must propagate
            # so task cancellation and shutdown keep working.
            logging.getLogger(__name__).warning(
                "Failed to deliver status update for session %s",
                session_id,
                exc_info=True,
            )
def get_status(session_id: str) -> dict:
    """Return a snapshot of the current status record for a session.

    The result is detached from the internal store: the ``steps`` list and
    each step dict are copied, so callers cannot alter tracked state by
    mutating it, and later updates do not change a snapshot already returned.

    Note that the store is a defaultdict, so asking for an unknown session id
    creates (and returns a copy of) a blank record.
    """
    store = _status_store[session_id]
    snapshot = dict(store)
    # dict(store) alone is shallow and would still share the steps list.
    snapshot["steps"] = [dict(step) for step in store["steps"]]
    return snapshot
def reset_status(session_id: str):
    """Replace the session's status record with a blank one for a new analysis."""
    # Every field starts as None except "steps", which gets its own empty list.
    fresh = dict.fromkeys(("current_step", "steps", "started_at", "completed_at"))
    fresh["steps"] = []
    _status_store[session_id] = fresh
async def subscribe(session_id: str) -> asyncio.Queue:
    """Create a queue, register it for the session's updates, and return it."""
    listeners = _subscribers[session_id]
    new_queue = asyncio.Queue()
    listeners.append(new_queue)
    return new_queue
def unsubscribe(session_id: str, queue: asyncio.Queue):
    """Stop delivering a session's status updates to ``queue``.

    Calling this for a queue that is not subscribed, or for an unknown
    session, is a no-op.

    Args:
        session_id: Session the queue was subscribed to.
        queue: Queue previously returned by ``subscribe``.
    """
    # .get() rather than indexing: indexing the defaultdict would create an
    # empty entry for a session that never had subscribers.
    queues = _subscribers.get(session_id)
    if queues is None:
        return

    try:
        queues.remove(queue)
    except ValueError:
        # Not subscribed (or already removed): unsubscribing is idempotent.
        pass

    if not queues:
        # Drop the now-empty entry so finished sessions do not accumulate in
        # the registry; a later subscribe recreates it on demand.
        _subscribers.pop(session_id, None)