Spaces:
Paused
Paused
| """Live polling helper for Gradio streaming updates.""" | |
| from __future__ import annotations | |
| import asyncio | |
| from collections.abc import AsyncIterator, Awaitable, Callable | |
| from dataclasses import dataclass, field | |
| from typing import Any, TypeVar | |
| from .agent import SmallCodeAgent, Step | |
| from .trace_collector import TraceEvent | |
| T = TypeVar("T") | |
@dataclass
class LiveFrame:
    """One snapshot of an agent run, handed to the streaming consumer.

    Mid-run frames carry trace events and workspace files only; the final
    frame (``done=True``) additionally carries the agent steps and the result
    of the awaited coroutine. All fields have defaults, so ``LiveFrame()`` is
    a valid "nothing to show yet" frame.
    """

    # Agent steps; the snapshot helpers in this module leave this empty until
    # the run has finished.
    steps: list[Step] = field(default_factory=list)
    # Trace events captured from the agent's trace collector.
    events: list[TraceEvent] = field(default_factory=list)
    # Workspace files as returned by the agent's files() call.
    # NOTE(review): presumably path -> content; confirm against SmallCodeAgent.
    files: dict[str, str] = field(default_factory=dict)
    # True only on the last frame of a run.
    done: bool = False
    # Value returned by the awaited coroutine; populated on the final frame.
    result: Any = None
    # Not populated by the helpers visible in this module.
    # NOTE(review): looks like an optional raw payload for callers; confirm.
    raw_event: dict | None = None
async def run_with_live_updates(
    coro: Awaitable[T],
    agent: SmallCodeAgent,
    *,
    poll_interval: float = 0.35,
) -> AsyncIterator[LiveFrame]:
    """Yield snapshots while `coro` runs, then a final frame with the result.

    Args:
        coro: Awaitable that performs the run; it is scheduled as a background
            task on the running event loop.
        agent: Agent whose trace collector and workspace files are sampled for
            every mid-run frame, and whose steps are read for the final frame.
        poll_interval: Maximum number of seconds between two mid-run frames.

    Yields:
        Zero or more mid-run frames (``done=False``) followed by exactly one
        final frame (``done=True``) carrying the result of `coro`.

    Raises:
        Any exception raised by `coro` is re-raised once the task finishes;
        no final frame is yielded in that case.
    """
    # ensure_future() accepts every awaitable (coroutine, Future, or an object
    # with __await__), matching the annotation; create_task() only accepts
    # coroutines.
    task = asyncio.ensure_future(coro)
    try:
        while not task.done():
            yield _live_snapshot(agent)
            # Wake up as soon as the task finishes instead of always sleeping
            # the full interval. asyncio.wait() does not cancel the task when
            # the timeout elapses and does not raise the task's exception.
            await asyncio.wait({task}, timeout=poll_interval)
        result = await task
        yield _final_snapshot(agent, result=result)
    finally:
        # Runs on consumer cancellation, on early generator close
        # (GeneratorExit) and when a snapshot raises, so the background run is
        # never left orphaned. After a normal finish the task is already done.
        if not task.done():
            task.cancel()
async def stream_live(
    make_coro: Callable[[], Awaitable[T]],
    get_agent: Callable[[], SmallCodeAgent | None],
    *,
    poll_interval: float = 0.35,
) -> AsyncIterator[LiveFrame]:
    """Like run_with_live_updates but agent may appear only after coro starts.

    Args:
        make_coro: Zero-argument factory returning the awaitable to run; it is
            called once and the result is scheduled as a background task.
        get_agent: Zero-argument callable returning the current agent, or
            ``None`` while no agent exists yet. It is called again for every
            frame.
        poll_interval: Maximum number of seconds between two mid-run frames.

    Yields:
        Mid-run frames (an empty ``LiveFrame()`` while `get_agent` returns
        ``None``) followed by exactly one final frame with ``done=True`` and
        the result of the awaitable.

    Raises:
        Any exception raised by the awaitable is re-raised once the task
        finishes; no final frame is yielded in that case.
    """
    # ensure_future() accepts every awaitable (coroutine, Future, or an object
    # with __await__), matching the annotation; create_task() only accepts
    # coroutines.
    task = asyncio.ensure_future(make_coro())
    try:
        while not task.done():
            agent = get_agent()
            yield _live_snapshot(agent) if agent is not None else LiveFrame()
            # Wake up as soon as the task finishes instead of always sleeping
            # the full interval. asyncio.wait() does not cancel the task when
            # the timeout elapses and does not raise the task's exception.
            await asyncio.wait({task}, timeout=poll_interval)
        result = await task
        agent = get_agent()
        if agent is not None:
            yield _final_snapshot(agent, result=result)
        else:
            # The run finished without ever exposing an agent: report only the
            # result.
            yield LiveFrame(done=True, result=result)
    finally:
        # Runs on consumer cancellation, on early generator close
        # (GeneratorExit) and when a snapshot or get_agent() raises, so the
        # background run is never left orphaned. After a normal finish the
        # task is already done.
        if not task.done():
            task.cancel()
def _live_snapshot(agent: SmallCodeAgent) -> LiveFrame:
    """Build a frame for a run that is still in progress.

    IMPORTANT: the LiteForge agent object itself (history/state) must not be
    touched while a run is in flight. The Rust ToolCallingAgent is not
    reentrant and `run()` holds an internal lock for its whole duration, so
    calling `current_steps()` here would deadlock. Only two sources are read:
    the trace collector (a plain Python list the wrapped tools append to) and
    the workspace files (plain disk reads). `steps` is therefore always empty
    in a mid-run frame.
    """
    trace_events = agent.trace_collector.snapshot()
    workspace_files = agent.files()
    return LiveFrame(steps=[], events=trace_events, files=workspace_files, done=False)
def _final_snapshot(agent: SmallCodeAgent, *, result: Any = None) -> LiveFrame:
    """Build the closing frame of a run.

    `run()` has returned by the time this is called, so reading the agent's
    steps is safe here. The frame is marked ``done=True`` and carries `result`.
    """
    finished_steps = agent.current_steps()
    trace_events = agent.trace_collector.snapshot()
    workspace_files = agent.files()
    frame = LiveFrame(
        steps=finished_steps,
        events=trace_events,
        files=workspace_files,
        done=True,
        result=result,
    )
    return frame