codebook / potato /coding_agent_backends /claude_sdk_backend.py
davidjurgens's picture
Deploy: Potato — Codebook Annotation
aceb1b2 verified
Raw
History Blame Contribute Delete
9.99 kB
"""
Claude Agent SDK Backend
Spawns Claude Code as a subprocess via the official Agent SDK.
Communicates via JSON-lines over stdin/stdout.
Inherits CLAUDE.md, hooks, MCP servers from the working directory.
Requires: pip install claude-agent-sdk
"""
import json
import logging
import subprocess
import threading
import time
from typing import Any, Dict, Iterator, List, Optional
from ..coding_agent_backend import (
CodingAgentBackend,
CodingAgentEvent,
CodingAgentEventType,
)
logger = logging.getLogger(__name__)
class ClaudeSDKBackend(CodingAgentBackend):
"""Backend using Claude Agent SDK (subprocess with JSON-lines IPC)."""
def __init__(self, config: dict):
self._config = config
self._state = "idle"
self._working_dir = ""
self._events: list = []
self._event_idx = 0
self._lock = threading.Lock()
self._process: Optional[subprocess.Popen] = None
self._stop_flag = False
self._pause_event = threading.Event()
self._pause_event.set()
self._messages: List[Dict] = []
def start(self, task: str, working_dir: str, system_prompt: str = "") -> None:
self._working_dir = working_dir
self._state = "running"
self._stop_flag = False
self._events = []
self._event_idx = 0
thread = threading.Thread(target=self._run_sdk, args=(task,), daemon=True)
thread.start()
def _run_sdk(self, task: str):
"""Run Claude Code via the Agent SDK."""
try:
# Try to use the Agent SDK
try:
from claude_agent_sdk import query
self._run_with_sdk(task)
return
except ImportError:
pass
# Fallback: spawn claude CLI directly
self._run_with_cli(task)
except Exception as e:
logger.exception("Claude SDK backend error")
self._state = "error"
self._emit(CodingAgentEventType.ERROR, {"message": str(e)})
def _run_with_sdk(self, task: str):
"""Use the claude-agent-sdk Python package."""
import asyncio
from claude_agent_sdk import query
async def _run():
turn_index = 0
current_reasoning = []
current_tool_calls = []
async for message in query(prompt=task, options={"cwd": self._working_dir}):
if self._stop_flag:
break
self._pause_event.wait()
if self._stop_flag:
break
msg_type = message.get("type", "")
if msg_type == "assistant":
# Assistant reasoning
content = message.get("message", {}).get("content", [])
for block in content:
if isinstance(block, dict):
if block.get("type") == "text":
text = block.get("text", "")
current_reasoning.append(text)
self._emit(CodingAgentEventType.THINKING, {
"turn_index": turn_index,
"text": text,
})
elif block.get("type") == "tool_use":
tool_name = block.get("name", "unknown")
tool_input = block.get("input", {})
self._emit(CodingAgentEventType.TOOL_CALL_START, {
"turn_index": turn_index,
"tool": tool_name,
"input": tool_input,
})
elif msg_type == "result":
# Tool result
result = message.get("result", "")
tool_name = message.get("tool_name", "")
tool_input = message.get("tool_input", {})
output_type = self._classify_output_type(tool_name)
tc = {
"tool": tool_name,
"input": tool_input,
"output": str(result),
"output_type": output_type,
}
current_tool_calls.append(tc)
self._emit(CodingAgentEventType.TOOL_CALL_END, {
"turn_index": turn_index,
"tool_index": len(current_tool_calls) - 1,
**tc,
})
# Check if turn ended (no more tool calls pending)
if msg_type in ("assistant",) and not message.get("message", {}).get("content", []):
if current_reasoning or current_tool_calls:
self._emit(CodingAgentEventType.TURN_END, {
"turn_index": turn_index,
"content": "\n".join(current_reasoning),
"tool_calls": current_tool_calls,
})
turn_index += 1
current_reasoning = []
current_tool_calls = []
# Final turn
if current_reasoning or current_tool_calls:
self._emit(CodingAgentEventType.TURN_END, {
"turn_index": turn_index,
"content": "\n".join(current_reasoning),
"tool_calls": current_tool_calls,
})
turn_index += 1
self._state = "completed"
self._emit(CodingAgentEventType.COMPLETE, {"total_turns": turn_index})
asyncio.run(_run())
def _run_with_cli(self, task: str):
"""Fallback: spawn claude CLI as subprocess."""
try:
self._process = subprocess.Popen(
["claude", "--bare", "-p", task],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=self._working_dir,
text=True,
)
except FileNotFoundError:
self._emit(CodingAgentEventType.ERROR, {
"message": "claude CLI not found. Install Claude Code or claude-agent-sdk."
})
self._state = "error"
return
turn_index = 0
output_lines = []
for line in self._process.stdout:
if self._stop_flag:
break
line = line.strip()
if not line:
continue
# Try to parse as JSON-lines
try:
msg = json.loads(line)
msg_type = msg.get("type", "")
if msg_type in ("text", "content"):
text = msg.get("text", msg.get("content", ""))
self._emit(CodingAgentEventType.THINKING, {
"turn_index": turn_index,
"text": text,
})
elif msg_type == "tool_use":
self._emit(CodingAgentEventType.TOOL_CALL_START, {
"turn_index": turn_index,
"tool": msg.get("name", ""),
"input": msg.get("input", {}),
})
elif msg_type == "tool_result":
self._emit(CodingAgentEventType.TOOL_CALL_END, {
"turn_index": turn_index,
"tool": msg.get("name", ""),
"input": msg.get("input", {}),
"output": msg.get("output", ""),
"output_type": self._classify_output_type(msg.get("name", "")),
})
except json.JSONDecodeError:
# Plain text output
output_lines.append(line)
self._process.wait()
self._state = "completed"
self._emit(CodingAgentEventType.COMPLETE, {"total_turns": turn_index + 1})
def _classify_output_type(self, tool_name: str) -> str:
name = (tool_name or "").lower()
if name in ("bash", "terminal", "shell"):
return "terminal"
if name in ("edit", "replace"):
return "diff"
return "code"
def _emit(self, event_type: CodingAgentEventType, data: dict):
event = CodingAgentEvent(event_type=event_type, timestamp=time.time(), data=data)
with self._lock:
self._events.append(event)
def get_events(self) -> Iterator[CodingAgentEvent]:
while True:
with self._lock:
if self._event_idx < len(self._events):
event = self._events[self._event_idx]
self._event_idx += 1
yield event
if event.event_type in (CodingAgentEventType.COMPLETE, CodingAgentEventType.ERROR):
return
continue
if self._state in ("completed", "error"):
return
time.sleep(0.1)
def pause(self) -> None:
self._pause_event.clear()
self._state = "paused"
def resume(self) -> None:
self._state = "running"
self._pause_event.set()
def inject_instruction(self, text: str) -> None:
# Claude SDK doesn't support mid-session instruction injection
# This would require restarting with appended context
logger.warning("inject_instruction not fully supported in Claude SDK backend")
def stop(self) -> None:
self._stop_flag = True
self._pause_event.set()
if self._process and self._process.poll() is None:
self._process.terminate()
self._state = "completed"
def get_conversation_history(self) -> List[Dict]:
return list(self._messages)
def get_state(self) -> str:
return self._state