File size: 2,866 Bytes
054d73a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""Real-time status tracking for AI agent reasoning."""

from __future__ import annotations

import asyncio
from datetime import datetime
from typing import Optional
from collections import defaultdict

# Global status store
_status_store: dict[str, dict] = defaultdict(lambda: {
    "current_step": None,
    "steps": [],
    "started_at": None,
    "completed_at": None,
})

_subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)


class WorkflowStep:
    FETCH = "fetch"
    NORMALIZE = "normalize"
    GRADE = "grade"
    EXPLAIN = "explain"
    GROUP = "group"
    REPORT = "report"
    EMAIL = "email"


async def add_reasoning_step(session_id: str, step_type: str, content: str, status: str = "completed"):
    """Add a reasoning step to show what the AI is doing."""
    store = _status_store[session_id]
    
    if store["started_at"] is None:
        store["started_at"] = datetime.utcnow().isoformat()
    
    # Mark any active steps as completed
    for s in store["steps"]:
        if s.get("status") == "active":
            s["status"] = "completed"
    
    step_data = {
        "type": step_type,  # "tool", "result", "error", "thinking"
        "content": content,
        "status": status,
        "timestamp": datetime.utcnow().isoformat(),
    }
    
    store["steps"].append(step_data)
    store["current_step"] = step_type if status == "active" else None
    
    # Emit with "reasoning" key for frontend
    emit_data = {
        "reasoning": store["steps"],
        "started_at": store["started_at"],
        "completed_at": store["completed_at"],
    }
    
    await notify_subscribers(session_id, emit_data)


async def update_status(session_id: str, step: str, status: str = "active", detail: str = ""):
    """Update the current workflow status (legacy support)."""
    await add_reasoning_step(session_id, step, detail, status)


async def notify_subscribers(session_id: str, status: dict):
    """Notify all subscribers of status change."""
    for queue in _subscribers[session_id]:
        try:
            await queue.put(status.copy())
        except:
            pass


def get_status(session_id: str) -> dict:
    """Get current status for a session."""
    return dict(_status_store[session_id])


def reset_status(session_id: str):
    """Reset status for a new analysis."""
    _status_store[session_id] = {
        "current_step": None,
        "steps": [],
        "started_at": None,
        "completed_at": None,
    }


async def subscribe(session_id: str) -> asyncio.Queue:
    """Subscribe to status updates for a session."""
    queue = asyncio.Queue()
    _subscribers[session_id].append(queue)
    return queue


def unsubscribe(session_id: str, queue: asyncio.Queue):
    """Unsubscribe from status updates."""
    if queue in _subscribers[session_id]:
        _subscribers[session_id].remove(queue)