smolcode / engine /live_run.py
seanpoyner's picture
Upload folder using huggingface_hub
daea45b verified
Raw
History Blame Contribute Delete
2.94 kB
"""Live polling helper for Gradio streaming updates."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any, TypeVar
from .agent import SmallCodeAgent, Step
from .trace_collector import TraceEvent
T = TypeVar("T")
@dataclass
class LiveFrame:
steps: list[Step] = field(default_factory=list)
events: list[TraceEvent] = field(default_factory=list)
files: dict[str, str] = field(default_factory=dict)
done: bool = False
result: Any = None
raw_event: dict | None = None
async def run_with_live_updates(
coro: Awaitable[T],
agent: SmallCodeAgent,
*,
poll_interval: float = 0.35,
) -> AsyncIterator[LiveFrame]:
"""Yield snapshots while `coro` runs, then a final frame with the result."""
task = asyncio.create_task(coro)
try:
while not task.done():
yield _live_snapshot(agent)
await asyncio.sleep(poll_interval)
result = await task
yield _final_snapshot(agent, result=result)
except asyncio.CancelledError:
task.cancel()
raise
async def stream_live(
make_coro: Callable[[], Awaitable[T]],
get_agent: Callable[[], SmallCodeAgent | None],
*,
poll_interval: float = 0.35,
) -> AsyncIterator[LiveFrame]:
"""Like run_with_live_updates but agent may appear only after coro starts."""
task = asyncio.create_task(make_coro())
try:
while not task.done():
agent = get_agent()
yield _live_snapshot(agent) if agent is not None else LiveFrame()
await asyncio.sleep(poll_interval)
result = await task
agent = get_agent()
if agent is not None:
yield _final_snapshot(agent, result=result)
else:
yield LiveFrame(done=True, result=result)
except asyncio.CancelledError:
task.cancel()
raise
def _live_snapshot(agent: SmallCodeAgent) -> LiveFrame:
"""A mid-run snapshot.
IMPORTANT: never touch the LiteForge agent object (history/state) while a run
is in flight — the Rust ToolCallingAgent is not reentrant and `run()` holds an
internal lock for its whole duration, so `current_steps()` would deadlock. We
read only the trace collector (a plain Python list the wrapped tools append to)
and the workspace files (plain disk reads).
"""
return LiveFrame(
steps=[],
events=agent.trace_collector.snapshot(),
files=agent.files(),
done=False,
)
def _final_snapshot(agent: SmallCodeAgent, *, result: Any = None) -> LiveFrame:
"""A post-run snapshot — safe to read the agent now that `run()` has returned."""
return LiveFrame(
steps=agent.current_steps(),
events=agent.trace_collector.snapshot(),
files=agent.files(),
done=True,
result=result,
)