| """ |
| Coding Agent Runner |
| |
| Manages the lifecycle of a coding agent session. Mirrors AgentRunner |
| but adapted for terminal-based coding agents (no Playwright). |
| |
| States: IDLE → RUNNING ⇄ PAUSED, ending in COMPLETED or ERROR (not a strictly linear chain) |
| Communication: SSE listener pattern (same as AgentRunner) |
| """ |
|
|
| import json |
| import logging |
| import os |
| import threading |
| import time |
| import uuid |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import Any, Callable, Dict, List, Optional |
|
|
| from .coding_agent_backend import ( |
| CodingAgentBackend, |
| CodingAgentEvent, |
| CodingAgentEventType, |
| create_backend, |
| ) |
| from .coding_agent_branch import BranchManager |
| from .coding_agent_checkpoint import CheckpointManager |
| from .coding_agent_sandbox import SandboxManager |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class CodingAgentState(str, Enum):
    """Lifecycle states of a coding agent session.

    The ``str`` mixin makes every member compare equal to its plain
    string value, so members can be used wherever a string is expected.
    """

    # No session has been started yet.
    IDLE = "idle"
    # The backend is actively producing events.
    RUNNING = "running"
    # The backend has been asked to pause.
    PAUSED = "paused"
    # The session finished or was stopped.
    COMPLETED = "completed"
    # The backend reported an error or event consumption failed.
    ERROR = "error"
|
|
|
|
@dataclass
class CodingAgentConfig:
    """Configuration for a coding agent session."""

    # Identifier handed to ``create_backend`` to select the agent backend.
    backend_type: str = "ollama_tool_use"
    # Backend-specific settings, passed through untouched.
    ai_config: Dict[str, Any] = field(default_factory=dict)
    # Base directory the sandbox is created from.
    working_dir: str = "."
    # Turn limit forwarded to the backend.
    max_turns: int = 50
    # System prompt forwarded to the backend on start.
    system_prompt: str = ""
    # Sandbox mode handed to the sandbox manager.
    sandbox_mode: str = "worktree"

    @classmethod
    def from_config(cls, config: dict) -> "CodingAgentConfig":
        """Create from YAML config dict.

        Reads the ``live_coding_agent`` section of *config*. A missing or
        empty section (YAML parses an empty key as ``None``), and any
        individual key that is missing or ``None``, falls back to the
        dataclass default so the defaults live in exactly one place.

        Args:
            config: Parsed configuration mapping; ``None`` is treated as
                an empty mapping.

        Returns:
            A populated ``CodingAgentConfig``.
        """
        section = (config or {}).get("live_coding_agent") or {}
        # A default instance is the single source of truth for fallbacks
        # (and yields a fresh ``ai_config`` dict on every call).
        defaults = cls()

        def pick(key: str) -> Any:
            value = section.get(key)
            return getattr(defaults, key) if value is None else value

        return cls(
            backend_type=pick("backend_type"),
            ai_config=pick("ai_config"),
            working_dir=pick("working_dir"),
            max_turns=pick("max_turns"),
            system_prompt=pick("system_prompt"),
            sandbox_mode=pick("sandbox_mode"),
        )
|
|
|
|
class CodingAgentRunner:
    """Manages a coding agent session with SSE event broadcasting.

    The runner owns the backend, sandbox, checkpoint manager and branch
    manager of one session. Backend events are consumed on a daemon
    thread and re-broadcast to registered listeners as
    ``(event_type, data)`` calls.
    """

    # Tools whose completion triggers a checkpoint (matched lower-cased).
    _CHECKPOINT_TOOLS = frozenset({"edit", "write", "bash", "create", "replace"})

    def __init__(self, session_id: str, config: "CodingAgentConfig",
                 trace_dir: str = ""):
        """Create an idle runner.

        Args:
            session_id: Identifier used for the sandbox, checkpoints,
                branches and the saved trace.
            config: Session configuration.
            trace_dir: Directory for ``trace.json``; an empty string
                disables trace saving.
        """
        self.session_id = session_id
        self.config = config
        self.trace_dir = trace_dir

        self._state = CodingAgentState.IDLE
        self._state_lock = threading.Lock()
        self._listeners: List[Callable] = []
        self._listener_lock = threading.Lock()

        self._backend: Optional[CodingAgentBackend] = None
        self._sandbox: Optional[SandboxManager] = None
        self._checkpoint_mgr: Optional[CheckpointManager] = None
        self._branch_mgr: Optional[BranchManager] = None
        self._structured_turns: List[Dict] = []
        self._task_description = ""
        self._started_at = 0.0
        self._event_thread: Optional[threading.Thread] = None

    @property
    def state(self) -> "CodingAgentState":
        """Current session state, read under the state lock."""
        with self._state_lock:
            return self._state

    def _set_state(self, new_state: "CodingAgentState"):
        """Store *new_state* and broadcast a ``state_change`` event."""
        with self._state_lock:
            old = self._state
            self._state = new_state
        # Emit outside the state lock so listeners may read ``state``.
        self._emit_event("state_change", {"old_state": old.value, "new_state": new_state.value})

    # ---- Listener management -------------------------------------------

    def add_listener(self, callback: Callable) -> None:
        """Register *callback* to be called as ``callback(event_type, data)``."""
        with self._listener_lock:
            self._listeners.append(callback)

    def remove_listener(self, callback: Callable) -> None:
        """Unregister every registration of *callback* (matched by identity)."""
        with self._listener_lock:
            self._listeners = [l for l in self._listeners if l is not callback]

    def _emit_event(self, event_type: str, data: dict):
        """Deliver an event to every listener registered right now.

        The listener list is snapshotted under the lock and the callbacks
        run outside it: the lock is not reentrant, so a listener calling
        ``add_listener``/``remove_listener`` (e.g. unsubscribing itself)
        would otherwise deadlock. A failing listener is logged and does
        not prevent delivery to the remaining ones.
        """
        with self._listener_lock:
            listeners = list(self._listeners)
        for listener in listeners:
            try:
                listener(event_type, data)
            except Exception:
                logger.exception("Listener failed while handling %r event", event_type)

    # ---- Lifecycle -------------------------------------------------------

    def start(self, task_description: str) -> None:
        """Start the coding agent session.

        Creates the sandbox, checkpoint manager, branch manager and
        backend, starts the backend, then launches the event consumer
        thread.

        Raises:
            RuntimeError: If the runner is not in the IDLE state.
            Exception: Whatever sandbox/backend setup raises; partially
                created resources are released first and the runner
                stays IDLE.
        """
        if self.state != CodingAgentState.IDLE:
            raise RuntimeError(f"Cannot start in state {self.state}")

        self._task_description = task_description
        self._started_at = time.time()
        self._structured_turns = []

        try:
            self._sandbox = SandboxManager(
                mode=self.config.sandbox_mode,
                base_dir=os.path.abspath(self.config.working_dir),
            )
            working_dir = self._sandbox.create(self.session_id)

            self._checkpoint_mgr = CheckpointManager(working_dir, self.session_id)
            self._checkpoint_mgr.init()
            self._branch_mgr = BranchManager(self.session_id, working_dir)

            backend_config = {
                "ai_config": self.config.ai_config,
                "max_turns": self.config.max_turns,
            }
            self._backend = create_backend(self.config.backend_type, backend_config)
            self._backend.start(task_description, working_dir, self.config.system_prompt)
        except Exception:
            # Do not leak a half-built sandbox/backend when setup fails.
            self.cleanup()
            self._backend = None
            self._sandbox = None
            self._checkpoint_mgr = None
            self._branch_mgr = None
            raise

        self._set_state(CodingAgentState.RUNNING)

        # Announce the session before the consumer thread exists so that
        # no backend event can reach listeners ahead of "started".
        self._emit_event("started", {
            "session_id": self.session_id,
            "task": task_description,
            "backend": self.config.backend_type,
        })

        self._event_thread = threading.Thread(target=self._consume_events, daemon=True)
        self._event_thread.start()

    def pause(self) -> None:
        """Pause the backend and enter PAUSED; no-op before ``start()``."""
        if not self._backend:
            # Without a backend there is nothing to pause; changing state
            # here would leave the runner unable to start.
            return
        self._backend.pause()
        self._set_state(CodingAgentState.PAUSED)

    def resume(self) -> None:
        """Resume the backend and enter RUNNING; no-op before ``start()``."""
        if not self._backend:
            return
        self._backend.resume()
        self._set_state(CodingAgentState.RUNNING)

    def inject_instruction(self, instruction: str) -> None:
        """Forward *instruction* to the backend (if any) and broadcast it."""
        if self._backend:
            self._backend.inject_instruction(instruction)
        self._emit_event("instruction_received", {"instruction": instruction})

    def stop(self) -> None:
        """Stop the backend (if any), mark the session COMPLETED, save the trace."""
        if self._backend:
            self._backend.stop()
        self._set_state(CodingAgentState.COMPLETED)
        self._save_trace()

    # ---- Event consumption -----------------------------------------------

    def _consume_events(self):
        """Consume events from the backend and broadcast via SSE.

        Runs on the event thread. Any exception raised while iterating or
        handling events moves the session to ERROR and is broadcast as an
        ``error`` event.
        """
        if not self._backend:
            return

        try:
            for event in self._backend.get_events():
                self._dispatch_backend_event(event.event_type, event.data)
        except Exception as e:
            logger.exception("Error consuming backend events")
            self._set_state(CodingAgentState.ERROR)
            self._emit_event("error", {"message": str(e)})

    def _dispatch_backend_event(self, et, data) -> None:
        """Translate one backend event into listener events and state changes."""
        if et == CodingAgentEventType.THINKING:
            self._emit_event("thinking", data)
        elif et == CodingAgentEventType.TOOL_CALL_START:
            self._emit_event("tool_call_start", data)
        elif et == CodingAgentEventType.TOOL_CALL_END:
            self._emit_event("tool_call", data)
            self._checkpoint_after_tool(data)
        elif et == CodingAgentEventType.TURN_END:
            self._record_turn(data)
        elif et == CodingAgentEventType.ERROR:
            self._set_state(CodingAgentState.ERROR)
            self._emit_event("error", data)
        elif et == CodingAgentEventType.COMPLETE:
            self._set_state(CodingAgentState.COMPLETED)
            self._emit_event("complete", {
                "total_turns": len(self._structured_turns),
            })
            self._save_trace()

    def _checkpoint_after_tool(self, data) -> None:
        """Create a checkpoint after a tool listed in ``_CHECKPOINT_TOOLS``."""
        # A missing or None tool name must not kill the consumer loop.
        tool_name = str(data.get("tool") or "")
        if tool_name.lower() not in self._CHECKPOINT_TOOLS or not self._checkpoint_mgr:
            return
        turn_idx = data.get("turn_index", len(self._structured_turns))
        cp_id = self._checkpoint_mgr.create_checkpoint(
            turn_idx, tool_name,
            f"{tool_name} at step {turn_idx}",
        )
        if cp_id:
            self._emit_event("checkpoint", {
                "step_index": turn_idx,
                "checkpoint_id": cp_id,
                "tool": tool_name,
            })

    def _record_turn(self, data) -> None:
        """Append a finished assistant turn and broadcast ``turn_end``."""
        turn = {
            "role": "assistant",
            "content": data.get("content", ""),
            "tool_calls": data.get("tool_calls", []),
        }
        self._structured_turns.append(turn)
        self._emit_event("turn_end", {
            "turn_index": data.get("turn_index", len(self._structured_turns) - 1),
            **turn,
        })

    # ---- Rollback and branching ------------------------------------------

    def rollback_to_step(self, step_index: int) -> bool:
        """Rollback files and conversation to the given step.

        Pauses the agent, restores files, truncates history.

        Args:
            step_index: Zero-based step to keep as the last one.

        Returns:
            True on success. False if *step_index* is negative (nothing
            is touched) or if the checkpoint manager fails to restore
            files (the agent is left paused).
        """
        if step_index < 0:
            # A negative index would make the slice below count from the
            # end and keep the wrong turns.
            return False

        if self._backend:
            self._backend.pause()
            self._set_state(CodingAgentState.PAUSED)

        if self._checkpoint_mgr and not self._checkpoint_mgr.rollback_to(step_index):
            return False

        self._structured_turns = self._structured_turns[:step_index + 1]

        if self._backend:
            self._backend.truncate_history(step_index + 1)

        self._emit_event("rollback", {
            "step_index": step_index,
            "remaining_turns": len(self._structured_turns),
        })
        return True

    def get_checkpoints(self) -> List[dict]:
        """Return list of checkpoint metadata."""
        if self._checkpoint_mgr:
            return self._checkpoint_mgr.list_checkpoints()
        return []

    def replay_from_step(self, step_index: int,
                         instructions: Optional[str] = None,
                         edited_actions: Optional[List[Dict]] = None) -> Optional[str]:
        """Create a new branch from step_index and resume the agent.

        Args:
            step_index: Step to branch from
            instructions: Optional new instructions to inject
            edited_actions: Optional modified tool calls to execute first;
                each dict receives an ``"output"`` key with the tool result.

        Returns:
            The new branch_id, or None on failure (negative step index,
            no branch manager, or branch creation failed).
        """
        # Validate before pausing so an impossible request has no effect.
        if step_index < 0 or not self._branch_mgr:
            return None

        if self._backend:
            self._backend.pause()
            self._set_state(CodingAgentState.PAUSED)

        active_id = self._branch_mgr.active_branch_id
        try:
            branch = self._branch_mgr.create_branch(
                active_id, step_index,
                instructions=instructions,
                edited_actions=edited_actions,
            )
        except Exception as e:
            logger.error(f"Failed to create branch: {e}")
            return None

        if self._checkpoint_mgr and not self._checkpoint_mgr.rollback_to(step_index):
            # The branch already exists, so keep going, but make the
            # inconsistency visible instead of ignoring the result.
            logger.warning(
                "File rollback to step %s failed; replay continues on current files",
                step_index,
            )

        self._structured_turns = list(branch.turns)
        if self._backend:
            self._backend.truncate_history(step_index + 1)

        if edited_actions:
            from .coding_agent_backend import execute_tool
            working_dir = self._sandbox.working_dir if self._sandbox else self.config.working_dir
            for action in edited_actions:
                tool_name = action.get("tool", "")
                tool_input = action.get("input", {})
                action["output"] = execute_tool(tool_name, tool_input, working_dir)

        if self._backend:
            if instructions:
                self._backend.inject_instruction(instructions)
            self._backend.resume()
            self._set_state(CodingAgentState.RUNNING)

        self._emit_event("branch_created", {
            "branch_id": branch.branch_id,
            "parent_branch": active_id,
            "branch_point": step_index,
            "instructions": instructions,
        })

        return branch.branch_id

    def get_branches(self) -> List[dict]:
        """List all branches."""
        if self._branch_mgr:
            return self._branch_mgr.list_branches()
        return []

    def switch_branch(self, branch_id: str) -> bool:
        """Switch to a different branch.

        Returns:
            True if the branch manager switched; the local turn list is
            replaced with the branch's turns when the branch is found.
        """
        if not self._branch_mgr:
            return False
        if not self._branch_mgr.switch_branch(branch_id):
            return False
        branch = self._branch_mgr.get_branch(branch_id)
        if branch:
            self._structured_turns = list(branch.turns)
        return True

    def get_diff_since_step(self, step_index: int) -> str:
        """Get diff from a step to current state."""
        if self._checkpoint_mgr:
            return self._checkpoint_mgr.get_diff_since(step_index)
        return ""

    # ---- Trace -----------------------------------------------------------

    def get_trace(self) -> Dict[str, Any]:
        """Get the full trace in CodingTraceDisplay format.

        Branch data is included only when more than one branch exists;
        checkpoint metadata is included whenever a checkpoint manager exists.
        """
        trace = {
            "session_id": self.session_id,
            "task_description": self._task_description,
            "structured_turns": list(self._structured_turns),
            "backend": self.config.backend_type,
            "model": self.config.ai_config.get("model", ""),
            "started_at": self._started_at,
            "sandbox_mode": self.config.sandbox_mode,
        }
        if self._branch_mgr and len(self._branch_mgr.list_branches()) > 1:
            trace["branches"] = self._branch_mgr.save_all()
        if self._checkpoint_mgr:
            trace["checkpoints"] = self._checkpoint_mgr.list_checkpoints()
        return trace

    def get_structured_turns(self) -> List[Dict]:
        """Return a shallow copy of the recorded turns."""
        return list(self._structured_turns)

    def get_state_summary(self) -> Dict[str, Any]:
        """Return a small status dict (session id, state, task, turn count, backend)."""
        return {
            "session_id": self.session_id,
            "state": self.state.value,
            "task": self._task_description,
            "turns": len(self._structured_turns),
            "backend": self.config.backend_type,
        }

    def _save_trace(self):
        """Save trace to disk.

        Best effort: every failure (including creating the directory) is
        logged rather than raised, so a save problem cannot turn a
        completed session into an errored one. The file is written to a
        uniquely named temporary path and moved into place, so readers
        never see a partially written ``trace.json``.
        """
        if not self.trace_dir:
            return

        trace_path = os.path.join(self.trace_dir, "trace.json")
        # Unique name: stop() and the event thread may both save.
        tmp_path = f"{trace_path}.{uuid.uuid4().hex}.tmp"
        try:
            os.makedirs(self.trace_dir, exist_ok=True)
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.get_trace(), f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, trace_path)
        except Exception as e:
            logger.error(f"Failed to save trace: {e}")
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # the temporary file may never have been created
            return
        logger.info(f"Saved coding agent trace to {trace_path}")

    # ---- Cleanup ---------------------------------------------------------

    def cleanup(self):
        """Clean up resources.

        Stops the backend and cleans up the sandbox. Each step is best
        effort: a failure is logged and the remaining steps still run.
        """
        if self._backend:
            try:
                self._backend.stop()
            except Exception:
                logger.warning("Failed to stop backend during cleanup", exc_info=True)
        if self._sandbox:
            try:
                self._sandbox.cleanup()
            except Exception:
                logger.warning("Failed to clean up sandbox", exc_info=True)
|
|