"""Live polling helper for Gradio streaming updates.""" from __future__ import annotations import asyncio from collections.abc import AsyncIterator, Awaitable, Callable from dataclasses import dataclass, field from typing import Any, TypeVar from .agent import SmallCodeAgent, Step from .trace_collector import TraceEvent T = TypeVar("T") @dataclass class LiveFrame: steps: list[Step] = field(default_factory=list) events: list[TraceEvent] = field(default_factory=list) files: dict[str, str] = field(default_factory=dict) done: bool = False result: Any = None raw_event: dict | None = None async def run_with_live_updates( coro: Awaitable[T], agent: SmallCodeAgent, *, poll_interval: float = 0.35, ) -> AsyncIterator[LiveFrame]: """Yield snapshots while `coro` runs, then a final frame with the result.""" task = asyncio.create_task(coro) try: while not task.done(): yield _live_snapshot(agent) await asyncio.sleep(poll_interval) result = await task yield _final_snapshot(agent, result=result) except asyncio.CancelledError: task.cancel() raise async def stream_live( make_coro: Callable[[], Awaitable[T]], get_agent: Callable[[], SmallCodeAgent | None], *, poll_interval: float = 0.35, ) -> AsyncIterator[LiveFrame]: """Like run_with_live_updates but agent may appear only after coro starts.""" task = asyncio.create_task(make_coro()) try: while not task.done(): agent = get_agent() yield _live_snapshot(agent) if agent is not None else LiveFrame() await asyncio.sleep(poll_interval) result = await task agent = get_agent() if agent is not None: yield _final_snapshot(agent, result=result) else: yield LiveFrame(done=True, result=result) except asyncio.CancelledError: task.cancel() raise def _live_snapshot(agent: SmallCodeAgent) -> LiveFrame: """A mid-run snapshot. IMPORTANT: never touch the LiteForge agent object (history/state) while a run is in flight — the Rust ToolCallingAgent is not reentrant and `run()` holds an internal lock for its whole duration, so `current_steps()` would deadlock. We read only the trace collector (a plain Python list the wrapped tools append to) and the workspace files (plain disk reads). """ return LiveFrame( steps=[], events=agent.trace_collector.snapshot(), files=agent.files(), done=False, ) def _final_snapshot(agent: SmallCodeAgent, *, result: Any = None) -> LiveFrame: """A post-run snapshot — safe to read the agent now that `run()` has returned.""" return LiveFrame( steps=agent.current_steps(), events=agent.trace_collector.snapshot(), files=agent.files(), done=True, result=result, )