codebook / potato /coding_agent_backends /anthropic_backend.py
davidjurgens's picture
Deploy: Potato — Codebook Annotation
aceb1b2 verified
Raw
History Blame Contribute Delete
10.3 kB
"""
Anthropic Tool Use Backend
Custom agent loop using the Anthropic Messages API with tool_use.
Defines Read/Edit/Write/Bash/Grep/Glob tools and executes them
in the working directory.
"""
import json
import logging
import time
import threading
from typing import Any, Dict, Iterator, List, Optional
from ..coding_agent_backend import (
CodingAgentBackend,
CodingAgentEvent,
CodingAgentEventType,
CODING_TOOLS,
execute_tool,
)
logger = logging.getLogger(__name__)
class AnthropicToolUseBackend(CodingAgentBackend):
"""Agent loop using Anthropic Messages API with tool_use."""
def __init__(self, config: dict):
self._config = config
ai = config.get("ai_config", {})
self._model = ai.get("model", "claude-sonnet-4-20250514")
self._api_key = ai.get("api_key", "")
self._max_tokens = ai.get("max_tokens", 8192)
self._temperature = ai.get("temperature", 0.3)
self._max_turns = config.get("max_turns", 50)
self._state = "idle"
self._working_dir = ""
self._messages: List[Dict] = []
self._system_prompt = ""
self._events: list = []
self._event_idx = 0
self._pause_event = threading.Event()
self._pause_event.set() # Not paused initially
self._stop_flag = False
self._instruction_queue: list = []
self._lock = threading.Lock()
self._client = None
def start(self, task: str, working_dir: str, system_prompt: str = "") -> None:
self._working_dir = working_dir
self._system_prompt = system_prompt or (
"You are a coding agent. You have access to tools for reading, "
"editing, and creating files, running bash commands, and searching code. "
"Use these tools to complete the task. When you are done, stop calling tools "
"and summarize what you did."
)
self._messages = [{"role": "user", "content": task}]
self._state = "running"
self._stop_flag = False
self._events = []
self._event_idx = 0
# Initialize Anthropic client
try:
import anthropic
self._client = anthropic.Anthropic(api_key=self._api_key or None)
except ImportError:
self._emit(CodingAgentEventType.ERROR, {"message": "anthropic package not installed"})
self._state = "error"
return
# Run the agent loop in a thread
thread = threading.Thread(target=self._run_loop, daemon=True)
thread.start()
def _run_loop(self):
"""Main agent loop: query LLM → execute tools → repeat."""
turn_index = 0
try:
while not self._stop_flag and turn_index < self._max_turns:
# Check for pause
self._pause_event.wait()
if self._stop_flag:
break
# Check for injected instructions
with self._lock:
if self._instruction_queue:
instruction = self._instruction_queue.pop(0)
self._messages.append({"role": "user", "content": instruction})
# Convert tools to Anthropic format
tools = [
{"name": t["name"], "description": t["description"],
"input_schema": t["input_schema"]}
for t in CODING_TOOLS
]
# Query LLM
self._emit(CodingAgentEventType.THINKING, {
"turn_index": turn_index,
"text": "Thinking...",
})
try:
response = self._client.messages.create(
model=self._model,
max_tokens=self._max_tokens,
temperature=self._temperature,
system=self._system_prompt,
messages=self._messages,
tools=tools,
)
except Exception as e:
self._emit(CodingAgentEventType.ERROR, {"message": str(e)})
self._state = "error"
return
# Process response
reasoning_parts = []
tool_calls = []
tool_use_blocks = []
for block in response.content:
if block.type == "text":
reasoning_parts.append(block.text)
self._emit(CodingAgentEventType.THINKING, {
"turn_index": turn_index,
"text": block.text,
})
elif block.type == "tool_use":
tool_use_blocks.append(block)
# Add assistant message to history
self._messages.append({
"role": "assistant",
"content": [b.model_dump() for b in response.content],
})
# Execute tool calls
tool_results = []
for block in tool_use_blocks:
if self._stop_flag:
break
# Check for pause between tool executions
self._pause_event.wait()
if self._stop_flag:
break
tool_name = block.name
tool_input = block.input if isinstance(block.input, dict) else {}
self._emit(CodingAgentEventType.TOOL_CALL_START, {
"turn_index": turn_index,
"tool": tool_name,
"input": tool_input,
})
# Execute the tool
output = execute_tool(tool_name, tool_input, self._working_dir)
# Classify output type
output_type = self._classify_output_type(tool_name)
tc = {
"tool": tool_name,
"input": tool_input,
"output": output,
"output_type": output_type,
}
tool_calls.append(tc)
self._emit(CodingAgentEventType.TOOL_CALL_END, {
"turn_index": turn_index,
"tool_index": len(tool_calls) - 1,
**tc,
})
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
# Add tool results to history
if tool_results:
self._messages.append({
"role": "user",
"content": tool_results,
})
# Emit turn_end
self._emit(CodingAgentEventType.TURN_END, {
"turn_index": turn_index,
"content": "\n".join(reasoning_parts),
"tool_calls": tool_calls,
})
turn_index += 1
# If no tool calls, the agent is done
if not tool_use_blocks or response.stop_reason == "end_turn":
break
self._state = "completed"
self._emit(CodingAgentEventType.COMPLETE, {"total_turns": turn_index})
except Exception as e:
logger.exception("Agent loop error")
self._state = "error"
self._emit(CodingAgentEventType.ERROR, {"message": str(e)})
def _classify_output_type(self, tool_name: str) -> str:
name = tool_name.lower()
if name in ("bash", "terminal", "shell"):
return "terminal"
if name in ("edit", "replace"):
return "diff"
if name in ("read", "grep", "glob", "search", "write"):
return "code"
return "generic"
def _emit(self, event_type: CodingAgentEventType, data: dict):
event = CodingAgentEvent(
event_type=event_type,
timestamp=time.time(),
data=data,
)
with self._lock:
self._events.append(event)
def get_events(self) -> Iterator[CodingAgentEvent]:
while True:
with self._lock:
if self._event_idx < len(self._events):
event = self._events[self._event_idx]
self._event_idx += 1
yield event
if event.event_type in (CodingAgentEventType.COMPLETE, CodingAgentEventType.ERROR):
return
continue
# No new events, wait a bit
if self._state in ("completed", "error"):
return
time.sleep(0.1)
def pause(self) -> None:
self._pause_event.clear()
self._state = "paused"
def resume(self) -> None:
self._state = "running"
self._pause_event.set()
def inject_instruction(self, text: str) -> None:
with self._lock:
self._instruction_queue.append(text)
def stop(self) -> None:
self._stop_flag = True
self._pause_event.set() # Unblock if paused
self._state = "completed"
def get_conversation_history(self) -> List[Dict]:
with self._lock:
return list(self._messages)
def get_state(self) -> str:
return self._state
def truncate_history(self, to_step: int) -> None:
"""Truncate conversation to the given turn index."""
with self._lock:
# Keep initial user message + 2 messages per turn (assistant + tool_results)
keep = 1 + (to_step * 2)
self._messages = self._messages[:keep]
# Also truncate events
new_events = []
for e in self._events:
ti = e.data.get("turn_index", -1)
if ti < to_step or ti == -1:
new_events.append(e)
self._events = new_events
self._event_idx = min(self._event_idx, len(self._events))