Spaces:
Paused
Paused
| """ | |
| Anthropic Tool Use Backend | |
| Custom agent loop using the Anthropic Messages API with tool_use. | |
| Defines Read/Edit/Write/Bash/Grep/Glob tools and executes them | |
| in the working directory. | |
| """ | |
| import json | |
| import logging | |
| import time | |
| import threading | |
| from typing import Any, Dict, Iterator, List, Optional | |
| from ..coding_agent_backend import ( | |
| CodingAgentBackend, | |
| CodingAgentEvent, | |
| CodingAgentEventType, | |
| CODING_TOOLS, | |
| execute_tool, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class AnthropicToolUseBackend(CodingAgentBackend): | |
| """Agent loop using Anthropic Messages API with tool_use.""" | |
| def __init__(self, config: dict): | |
| self._config = config | |
| ai = config.get("ai_config", {}) | |
| self._model = ai.get("model", "claude-sonnet-4-20250514") | |
| self._api_key = ai.get("api_key", "") | |
| self._max_tokens = ai.get("max_tokens", 8192) | |
| self._temperature = ai.get("temperature", 0.3) | |
| self._max_turns = config.get("max_turns", 50) | |
| self._state = "idle" | |
| self._working_dir = "" | |
| self._messages: List[Dict] = [] | |
| self._system_prompt = "" | |
| self._events: list = [] | |
| self._event_idx = 0 | |
| self._pause_event = threading.Event() | |
| self._pause_event.set() # Not paused initially | |
| self._stop_flag = False | |
| self._instruction_queue: list = [] | |
| self._lock = threading.Lock() | |
| self._client = None | |
| def start(self, task: str, working_dir: str, system_prompt: str = "") -> None: | |
| self._working_dir = working_dir | |
| self._system_prompt = system_prompt or ( | |
| "You are a coding agent. You have access to tools for reading, " | |
| "editing, and creating files, running bash commands, and searching code. " | |
| "Use these tools to complete the task. When you are done, stop calling tools " | |
| "and summarize what you did." | |
| ) | |
| self._messages = [{"role": "user", "content": task}] | |
| self._state = "running" | |
| self._stop_flag = False | |
| self._events = [] | |
| self._event_idx = 0 | |
| # Initialize Anthropic client | |
| try: | |
| import anthropic | |
| self._client = anthropic.Anthropic(api_key=self._api_key or None) | |
| except ImportError: | |
| self._emit(CodingAgentEventType.ERROR, {"message": "anthropic package not installed"}) | |
| self._state = "error" | |
| return | |
| # Run the agent loop in a thread | |
| thread = threading.Thread(target=self._run_loop, daemon=True) | |
| thread.start() | |
| def _run_loop(self): | |
| """Main agent loop: query LLM → execute tools → repeat.""" | |
| turn_index = 0 | |
| try: | |
| while not self._stop_flag and turn_index < self._max_turns: | |
| # Check for pause | |
| self._pause_event.wait() | |
| if self._stop_flag: | |
| break | |
| # Check for injected instructions | |
| with self._lock: | |
| if self._instruction_queue: | |
| instruction = self._instruction_queue.pop(0) | |
| self._messages.append({"role": "user", "content": instruction}) | |
| # Convert tools to Anthropic format | |
| tools = [ | |
| {"name": t["name"], "description": t["description"], | |
| "input_schema": t["input_schema"]} | |
| for t in CODING_TOOLS | |
| ] | |
| # Query LLM | |
| self._emit(CodingAgentEventType.THINKING, { | |
| "turn_index": turn_index, | |
| "text": "Thinking...", | |
| }) | |
| try: | |
| response = self._client.messages.create( | |
| model=self._model, | |
| max_tokens=self._max_tokens, | |
| temperature=self._temperature, | |
| system=self._system_prompt, | |
| messages=self._messages, | |
| tools=tools, | |
| ) | |
| except Exception as e: | |
| self._emit(CodingAgentEventType.ERROR, {"message": str(e)}) | |
| self._state = "error" | |
| return | |
| # Process response | |
| reasoning_parts = [] | |
| tool_calls = [] | |
| tool_use_blocks = [] | |
| for block in response.content: | |
| if block.type == "text": | |
| reasoning_parts.append(block.text) | |
| self._emit(CodingAgentEventType.THINKING, { | |
| "turn_index": turn_index, | |
| "text": block.text, | |
| }) | |
| elif block.type == "tool_use": | |
| tool_use_blocks.append(block) | |
| # Add assistant message to history | |
| self._messages.append({ | |
| "role": "assistant", | |
| "content": [b.model_dump() for b in response.content], | |
| }) | |
| # Execute tool calls | |
| tool_results = [] | |
| for block in tool_use_blocks: | |
| if self._stop_flag: | |
| break | |
| # Check for pause between tool executions | |
| self._pause_event.wait() | |
| if self._stop_flag: | |
| break | |
| tool_name = block.name | |
| tool_input = block.input if isinstance(block.input, dict) else {} | |
| self._emit(CodingAgentEventType.TOOL_CALL_START, { | |
| "turn_index": turn_index, | |
| "tool": tool_name, | |
| "input": tool_input, | |
| }) | |
| # Execute the tool | |
| output = execute_tool(tool_name, tool_input, self._working_dir) | |
| # Classify output type | |
| output_type = self._classify_output_type(tool_name) | |
| tc = { | |
| "tool": tool_name, | |
| "input": tool_input, | |
| "output": output, | |
| "output_type": output_type, | |
| } | |
| tool_calls.append(tc) | |
| self._emit(CodingAgentEventType.TOOL_CALL_END, { | |
| "turn_index": turn_index, | |
| "tool_index": len(tool_calls) - 1, | |
| **tc, | |
| }) | |
| tool_results.append({ | |
| "type": "tool_result", | |
| "tool_use_id": block.id, | |
| "content": output, | |
| }) | |
| # Add tool results to history | |
| if tool_results: | |
| self._messages.append({ | |
| "role": "user", | |
| "content": tool_results, | |
| }) | |
| # Emit turn_end | |
| self._emit(CodingAgentEventType.TURN_END, { | |
| "turn_index": turn_index, | |
| "content": "\n".join(reasoning_parts), | |
| "tool_calls": tool_calls, | |
| }) | |
| turn_index += 1 | |
| # If no tool calls, the agent is done | |
| if not tool_use_blocks or response.stop_reason == "end_turn": | |
| break | |
| self._state = "completed" | |
| self._emit(CodingAgentEventType.COMPLETE, {"total_turns": turn_index}) | |
| except Exception as e: | |
| logger.exception("Agent loop error") | |
| self._state = "error" | |
| self._emit(CodingAgentEventType.ERROR, {"message": str(e)}) | |
| def _classify_output_type(self, tool_name: str) -> str: | |
| name = tool_name.lower() | |
| if name in ("bash", "terminal", "shell"): | |
| return "terminal" | |
| if name in ("edit", "replace"): | |
| return "diff" | |
| if name in ("read", "grep", "glob", "search", "write"): | |
| return "code" | |
| return "generic" | |
| def _emit(self, event_type: CodingAgentEventType, data: dict): | |
| event = CodingAgentEvent( | |
| event_type=event_type, | |
| timestamp=time.time(), | |
| data=data, | |
| ) | |
| with self._lock: | |
| self._events.append(event) | |
| def get_events(self) -> Iterator[CodingAgentEvent]: | |
| while True: | |
| with self._lock: | |
| if self._event_idx < len(self._events): | |
| event = self._events[self._event_idx] | |
| self._event_idx += 1 | |
| yield event | |
| if event.event_type in (CodingAgentEventType.COMPLETE, CodingAgentEventType.ERROR): | |
| return | |
| continue | |
| # No new events, wait a bit | |
| if self._state in ("completed", "error"): | |
| return | |
| time.sleep(0.1) | |
| def pause(self) -> None: | |
| self._pause_event.clear() | |
| self._state = "paused" | |
| def resume(self) -> None: | |
| self._state = "running" | |
| self._pause_event.set() | |
| def inject_instruction(self, text: str) -> None: | |
| with self._lock: | |
| self._instruction_queue.append(text) | |
| def stop(self) -> None: | |
| self._stop_flag = True | |
| self._pause_event.set() # Unblock if paused | |
| self._state = "completed" | |
| def get_conversation_history(self) -> List[Dict]: | |
| with self._lock: | |
| return list(self._messages) | |
| def get_state(self) -> str: | |
| return self._state | |
| def truncate_history(self, to_step: int) -> None: | |
| """Truncate conversation to the given turn index.""" | |
| with self._lock: | |
| # Keep initial user message + 2 messages per turn (assistant + tool_results) | |
| keep = 1 + (to_step * 2) | |
| self._messages = self._messages[:keep] | |
| # Also truncate events | |
| new_events = [] | |
| for e in self._events: | |
| ti = e.data.get("turn_index", -1) | |
| if ti < to_step or ti == -1: | |
| new_events.append(e) | |
| self._events = new_events | |
| self._event_idx = min(self._event_idx, len(self._events)) | |