Spaces:
Paused
Paused
| """Python facade over the Rust smolcode agent engine (smolcode_core).""" | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import tempfile | |
| from collections.abc import Awaitable, Callable | |
| from dataclasses import dataclass, field | |
| from typing import Any | |
| from .trace_collector import TraceCollector, TraceEvent | |
| try: | |
| import smolcode_core as _rust | |
| except ImportError: | |
| _rust = None # type: ignore | |
| def rust_available() -> bool: | |
| return _rust is not None | |
| ApprovalHandler = Callable[[str], Awaitable[bool]] | |
| class RustRunResult: | |
| final: str | |
| hit_max_steps: bool = False | |
| errored: bool = False | |
| class RustSession: | |
| """Thin wrapper around smolcode_core.Session.""" | |
| def __init__( | |
| self, | |
| *, | |
| workspace: str | None = None, | |
| agent: str = "build", | |
| yolo: bool = False, | |
| model: str | None = None, | |
| base_url: str | None = None, | |
| api_key: str | None = None, | |
| profile: str = "full", | |
| approval_handler: ApprovalHandler | None = None, | |
| ) -> None: | |
| if _rust is None: | |
| raise RuntimeError( | |
| "smolcode_core is not installed; build with " | |
| "`maturin develop --release` in smolcode-cli/crates/smolcode-py" | |
| ) | |
| if workspace is None: | |
| workspace = os.environ.get( | |
| "SMALLCODE_WORKSPACE", | |
| tempfile.mkdtemp(prefix="smolcode-"), | |
| ) | |
| self._session = _rust.Session( | |
| workspace=workspace, | |
| agent=agent, | |
| yolo=yolo, | |
| model=model, | |
| base_url=base_url, | |
| api_key=api_key, | |
| profile=profile, | |
| ) | |
| self.trace_collector = TraceCollector() | |
| self.approval_handler = approval_handler | |
| self.hit_max_steps = False | |
| self.errored = False | |
| self._steps: list[dict[str, Any]] = [] | |
| self._final: str = "" | |
| self._cancelled = False | |
| def request_cancel(self) -> None: | |
| self._cancelled = True | |
| self.cancel_turn() | |
| def cancelled(self) -> bool: | |
| return self._cancelled | |
| def clear_cancel(self) -> None: | |
| self._cancelled = False | |
| def session_id(self) -> str: | |
| return self._session.session_id | |
| def workspace_path(self) -> str: | |
| return self._session.workspace() | |
| def set_model(self, model: str) -> None: | |
| self._session.set_model(model) | |
| def set_agent(self, agent: str) -> None: | |
| self._session.set_agent(agent) | |
| def set_think(self, level: str) -> None: | |
| self._session.set_think(level) | |
| def register_tool(self, name: str, fn: Callable[[dict], dict]) -> None: | |
| self._session.register_tool(name, fn) | |
| def files(self) -> dict[str, str]: | |
| out: dict[str, str] = {} | |
| for path in self._session.workspace_files(): | |
| content = self._session.read_file(path) | |
| if content is not None: | |
| out[path] = content | |
| return out | |
| def run_shell(self, command: str) -> str: | |
| return self._session.run_shell(command) | |
| async def run( | |
| self, | |
| task: str, | |
| *, | |
| think: str | None = None, | |
| yolo: bool | None = None, | |
| ) -> RustRunResult: | |
| """Run one agent turn to completion.""" | |
| self.hit_max_steps = False | |
| self.errored = False | |
| self._final = "" | |
| self.clear_cancel() | |
| self._session.start_turn(task, think=think, yolo=yolo) | |
| final_text = "" | |
| while True: | |
| if self._cancelled: | |
| break | |
| ev = await asyncio.to_thread(self._session.poll_event) | |
| if ev is None: | |
| await asyncio.sleep(0.05) | |
| continue | |
| kind = ev.get("kind") | |
| if kind == "approval": | |
| approved = True | |
| if self.approval_handler is not None: | |
| approved = await self.approval_handler(ev.get("desc", "")) | |
| elif not (yolo if yolo is not None else False): | |
| approved = False | |
| self._session.approve(approved) | |
| continue | |
| self._ingest_event(ev) | |
| if kind == "final": | |
| final_text = ev.get("text", "") | |
| if kind == "done": | |
| break | |
| if kind == "error": | |
| self.errored = True | |
| self._final = final_text | |
| if "step" in self._final.lower() and "without finishing" in self._final.lower(): | |
| self.hit_max_steps = True | |
| self._session.record_turn(task, final_text) | |
| return RustRunResult( | |
| final=final_text, | |
| hit_max_steps=self.hit_max_steps, | |
| errored=self.errored, | |
| ) | |
| async def poll_events_once(self) -> list[dict[str, Any]]: | |
| """Non-blocking poll for live UI updates during a turn.""" | |
| events: list[dict[str, Any]] = [] | |
| while True: | |
| ev = await asyncio.to_thread(self._session.poll_event) | |
| if ev is None: | |
| break | |
| kind = ev.get("kind") | |
| if kind == "approval": | |
| approved = True | |
| if self.approval_handler is not None: | |
| approved = await self.approval_handler(ev.get("desc", "")) | |
| self._session.approve(approved) | |
| continue | |
| self._ingest_event(ev) | |
| events.append(ev) | |
| if kind in ("done",): | |
| break | |
| return events | |
| def _ingest_event(self, ev: dict[str, Any]) -> None: | |
| kind = ev.get("kind") | |
| if kind == "tool_call": | |
| args_raw = ev.get("args", "{}") | |
| try: | |
| args = json.loads(args_raw) if isinstance(args_raw, str) else args_raw | |
| except json.JSONDecodeError: | |
| args = {"raw": args_raw} | |
| self.trace_collector.record_tool_call(ev.get("name", ""), args) | |
| elif kind == "tool_result": | |
| text = ev.get("text", "") | |
| try: | |
| result = json.loads(text) | |
| except json.JSONDecodeError: | |
| result = {"output": text} | |
| self.trace_collector.record_tool_result(ev.get("name", ""), result) | |
| elif kind == "final": | |
| self.trace_collector.record_final(ev.get("text", "")) | |
| elif kind == "error": | |
| self.trace_collector.record_error(ev.get("text", "")) | |
| self.errored = True | |
| def save(self) -> None: | |
| self._session.save() | |
| def list_sessions() -> list[dict[str, Any]]: | |
| if _rust is None: | |
| return [] | |
| return _rust.Session.list_sessions() | |
| def load_session(self, session_id: str) -> bool: | |
| return self._session.load_session(session_id) | |
| def fork(self) -> str | None: | |
| return self._session.fork() | |
| def rename(self, title: str) -> bool: | |
| return self._session.rename(title) | |
| def delete(self) -> bool: | |
| return self._session.delete() | |
| def cancel_turn(self) -> None: | |
| self._session.cancel_turn() | |
| def render_config(self) -> str: | |
| return self._session.render_config() | |
| def render_config(session: RustSession) -> str: | |
| return session.render_config() | |
| def apply_settings(session: RustSession, settings: Any) -> None: | |
| """Apply UI settings to a live Rust session before each agent turn. | |
| The "auto" / "auto:<size>" pseudo-selections are NOT real model tags — the Router | |
| picks the model and sets it on the session (see router.run_live), so we must not | |
| push them via set_model. Only concrete pins are applied here. | |
| """ | |
| session.set_think(settings.think) | |
| model = settings.model or "" | |
| if model and model != "auto" and not model.startswith("auto:"): | |
| session.set_model(model) | |
| session.set_agent(settings.agent) | |
| def list_commands(workspace: str) -> list[str]: | |
| if _rust is None: | |
| return [] | |
| return _rust.list_commands(workspace) | |
| def expand_command(workspace: str, name: str, args: str = "") -> str | None: | |
| if _rust is None: | |
| return None | |
| return _rust.expand_command(workspace, name, args) | |
| def list_rules(workspace: str) -> list[dict[str, Any]]: | |
| if _rust is None: | |
| return [] | |
| return _rust.list_rules(workspace) | |
| def list_skills(workspace: str) -> list[dict[str, Any]]: | |
| if _rust is None: | |
| return [] | |
| return _rust.list_skills(workspace) | |
| def expand_skill(workspace: str, name: str, args: str = "") -> str | None: | |
| if _rust is None: | |
| return None | |
| return _rust.expand_skill(workspace, name, args) | |
| def list_mcp(session: RustSession) -> list[dict[str, Any]]: | |
| return session._session.list_mcp() | |
| def list_background_jobs() -> str: | |
| if _rust is None: | |
| return "" | |
| return _rust.list_background_jobs() | |
| def write_agents_md(workspace: str) -> str: | |
| if _rust is None: | |
| raise RuntimeError("smolcode_core not installed") | |
| return _rust.write_agents_md(workspace) | |
| def git_status(workspace: str) -> str: | |
| if _rust is None: | |
| return "" | |
| return _rust.git_status(workspace) | |
| def workspace_tree(workspace: str, depth: int = 3) -> str: | |
| if _rust is None: | |
| return "" | |
| return _rust.workspace_tree(workspace, depth=depth) | |
| UI_FILE_LIMIT = 1500 | |
| AUTOCOMPLETE_FILE_LIMIT = 200 | |
| ATTACH_FILE_MAX_BYTES = 8192 | |
| def read_workspace_file( | |
| workspace: str, | |
| path: str, | |
| *, | |
| max_bytes: int = ATTACH_FILE_MAX_BYTES, | |
| rust: RustSession | None = None, | |
| ) -> str | None: | |
| """Read a workspace file for @-attachment inlining. Returns None if missing.""" | |
| if _rust is None: | |
| return None | |
| try: | |
| session = rust if rust is not None else RustSession(workspace=workspace, yolo=True) | |
| content = session._session.read_file(path) | |
| if content is None: | |
| return None | |
| if len(content) > max_bytes: | |
| return content[:max_bytes] + "\n… (truncated)" | |
| return content | |
| except Exception: | |
| return None | |
| def workspace_paths(workspace: str, *, limit: int = UI_FILE_LIMIT) -> tuple[list[str], int]: | |
| """Workspace paths for UI sidebars (no file reads). Returns (paths, total_count).""" | |
| if _rust is None: | |
| return [], 0 | |
| session = RustSession(workspace=workspace, yolo=True) | |
| paths = sorted(session._session.workspace_files()) | |
| total = len(paths) | |
| if total > limit: | |
| paths = paths[:limit] | |
| return paths, total | |
| def workspace_files(workspace: str) -> dict[str, str]: | |
| session = RustSession(workspace=workspace, yolo=True) | |
| return session.files() | |
| def export_transcript(session_id: str, path: str | None = None) -> str: | |
| if _rust is None: | |
| raise RuntimeError("smolcode_core not installed") | |
| return _rust.export_transcript(session_id, path) | |
| def session_timeline(session_id: str) -> list[str]: | |
| if _rust is None: | |
| return [] | |
| return _rust.session_timeline(session_id) | |
| def get_session_chat(session_id: str) -> list[dict[str, str]]: | |
| if _rust is None: | |
| return [] | |
| return _rust.get_session_chat(session_id) | |
| def chat_from_stored(lines: list[dict[str, str]]) -> list[dict[str, str]]: | |
| """Convert stored session lines to Gradio chat messages.""" | |
| out: list[dict[str, str]] = [] | |
| for m in lines: | |
| role = m.get("role", "assistant") | |
| text = m.get("text", "") | |
| if role == "user": | |
| out.append({"role": "user", "content": text}) | |
| else: | |
| out.append({"role": "assistant", "content": text}) | |
| return out | |
| def session_choices() -> list[str]: | |
| """Dropdown labels: `title (id)`.""" | |
| return [ | |
| f"{r['title']} ({r['id']})" | |
| for r in RustSession.list_sessions() | |
| ] | |
| def parse_session_label(label: str) -> str | None: | |
| if not label or "(" not in label: | |
| return None | |
| return label.rsplit("(", 1)[-1].rstrip(")") | |
| def load_rust_config( | |
| *, | |
| model: str | None = None, | |
| base_url: str | None = None, | |
| api_key: str | None = None, | |
| agent: str | None = None, | |
| yolo: bool = False, | |
| ) -> dict[str, Any]: | |
| """Load layered config.toml via Rust Config.""" | |
| if _rust is None: | |
| return {} | |
| cfg = _rust.Config.load( | |
| model=model, | |
| base_url=base_url, | |
| api_key=api_key, | |
| agent=agent, | |
| yolo=yolo, | |
| ) | |
| return { | |
| "model": cfg.model, | |
| "base_url": cfg.base_url, | |
| "agent": cfg.agent, | |
| "yolo": cfg.yolo, | |
| } | |