"""Interaction Agent Runtime - handles LLM calls for user and agent turns.""" import json from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Set from .agent import build_system_prompt, prepare_message_with_history from .tools import ToolResult, get_tool_schemas, handle_tool_call from ...config import get_settings from ...services.conversation import get_conversation_log, get_working_memory_log from ...openrouter_client import request_chat_completion from ...logging_config import logger @dataclass class InteractionResult: """Result from the interaction agent.""" success: bool response: str error: Optional[str] = None execution_agents_used: int = 0 @dataclass class _ToolCall: """Parsed tool invocation from an LLM response.""" identifier: Optional[str] name: str arguments: Dict[str, Any] @dataclass class _LoopSummary: """Aggregate information produced by the interaction loop.""" last_assistant_text: str = "" user_messages: List[str] = field(default_factory=list) tool_names: List[str] = field(default_factory=list) execution_agents: Set[str] = field(default_factory=set) class InteractionAgentRuntime: """Manages the interaction agent's request processing.""" MAX_TOOL_ITERATIONS = 8 # Initialize interaction agent runtime with settings and service dependencies def __init__(self) -> None: settings = get_settings() self.api_key = settings.api_key self.model = settings.interaction_agent_model self.settings = settings self.conversation_log = get_conversation_log() self.working_memory_log = get_working_memory_log() self.tool_schemas = get_tool_schemas() if not self.api_key: raise ValueError( "API key not configured. Set API_KEY environment variable." ) # Main entry point for processing user messages through the LLM interaction loop async def execute(self, user_message: str) -> InteractionResult: """Handle a user-authored message.""" try: transcript_before = self._load_conversation_transcript() self.conversation_log.record_user_message(user_message) system_prompt = build_system_prompt() messages = prepare_message_with_history( user_message, transcript_before, message_type="user" ) logger.info("Processing user message through interaction agent") summary = await self._run_interaction_loop(system_prompt, messages) final_response = self._finalize_response(summary) if final_response and not summary.user_messages: self.conversation_log.record_reply(final_response) return InteractionResult( success=True, response=final_response, execution_agents_used=len(summary.execution_agents), ) except Exception as exc: logger.error("Interaction agent failed", extra={"error": str(exc)}) return InteractionResult( success=False, response="", error=str(exc), ) # Handle incoming messages from execution agents and generate appropriate responses async def handle_agent_message(self, agent_message: str) -> InteractionResult: """Process a status update emitted by an execution agent.""" try: transcript_before = self._load_conversation_transcript() self.conversation_log.record_agent_message(agent_message) system_prompt = build_system_prompt() messages = prepare_message_with_history( agent_message, transcript_before, message_type="agent" ) logger.info("Processing execution agent results") summary = await self._run_interaction_loop(system_prompt, messages) final_response = self._finalize_response(summary) if final_response and not summary.user_messages: self.conversation_log.record_reply(final_response) return InteractionResult( success=True, response=final_response, execution_agents_used=len(summary.execution_agents), ) except Exception as exc: logger.error("Interaction agent (agent message) failed", extra={"error": str(exc)}) return InteractionResult( success=False, response="", error=str(exc), ) # Core interaction loop that handles LLM calls and tool executions until completion async def _run_interaction_loop( self, system_prompt: str, messages: List[Dict[str, Any]], ) -> _LoopSummary: """Iteratively query the LLM until it issues a final response.""" summary = _LoopSummary() for iteration in range(self.MAX_TOOL_ITERATIONS): response = await self._make_llm_call(system_prompt, messages) assistant_message = self._extract_assistant_message(response) assistant_content = (assistant_message.get("content") or "").strip() if assistant_content: summary.last_assistant_text = assistant_content raw_tool_calls = assistant_message.get("tool_calls") or [] parsed_tool_calls = self._parse_tool_calls(raw_tool_calls) assistant_entry: Dict[str, Any] = { "role": "assistant", "content": assistant_message.get("content", "") or "", } if raw_tool_calls: assistant_entry["tool_calls"] = raw_tool_calls messages.append(assistant_entry) if not parsed_tool_calls: break for tool_call in parsed_tool_calls: summary.tool_names.append(tool_call.name) if tool_call.name == "send_message_to_agent": agent_name = tool_call.arguments.get("agent_name") if isinstance(agent_name, str) and agent_name: summary.execution_agents.add(agent_name) result = self._execute_tool(tool_call) if result.user_message: summary.user_messages.append(result.user_message) tool_message = { "role": "tool", "tool_call_id": tool_call.identifier or tool_call.name, "content": self._format_tool_result(tool_call, result), } messages.append(tool_message) else: raise RuntimeError("Reached tool iteration limit without final response") if not summary.user_messages and not summary.last_assistant_text: logger.warning("Interaction loop exited without assistant content") return summary # Load conversation history, preferring summarized version if available def _load_conversation_transcript(self) -> str: if self.settings.summarization_enabled: rendered = self.working_memory_log.render_transcript() if rendered.strip(): return rendered return self.conversation_log.load_transcript() # Execute API call with system prompt, messages, and tool schemas async def _make_llm_call( self, system_prompt: str, messages: List[Dict[str, Any]], ) -> Dict[str, Any]: """Make an LLM call via API.""" logger.debug( "Interaction agent calling LLM", extra={"model": self.model, "tools": len(self.tool_schemas)}, ) return await request_chat_completion( model=self.model, messages=messages, system=system_prompt, api_key=self.api_key, tools=self.tool_schemas, ) # Extract the assistant's message from the API response structure def _extract_assistant_message(self, response: Dict[str, Any]) -> Dict[str, Any]: """Return the assistant message from the raw response payload.""" choice = (response.get("choices") or [{}])[0] message = choice.get("message") if not isinstance(message, dict): raise RuntimeError("LLM response did not include an assistant message") return message # Convert raw LLM tool calls into structured _ToolCall objects with validation def _parse_tool_calls(self, raw_tool_calls: List[Dict[str, Any]]) -> List[_ToolCall]: """Normalize tool call payloads from the LLM.""" parsed: List[_ToolCall] = [] for raw in raw_tool_calls: function_block = raw.get("function") or {} name = function_block.get("name") if not isinstance(name, str) or not name: logger.warning("Skipping tool call without name", extra={"tool": raw}) continue arguments, error = self._parse_tool_arguments(function_block.get("arguments")) if error: logger.warning("Tool call arguments invalid", extra={"tool": name, "error": error}) parsed.append( _ToolCall( identifier=raw.get("id"), name=name, arguments={"__invalid_arguments__": error}, ) ) continue parsed.append( _ToolCall(identifier=raw.get("id"), name=name, arguments=arguments) ) return parsed # Parse and validate tool arguments from various formats (dict, JSON string, etc.) def _parse_tool_arguments( self, raw_arguments: Any ) -> tuple[Dict[str, Any], Optional[str]]: """Convert tool arguments into a dictionary, reporting errors.""" if raw_arguments is None: return {}, None if isinstance(raw_arguments, dict): return raw_arguments, None if isinstance(raw_arguments, str): if not raw_arguments.strip(): return {}, None try: parsed = json.loads(raw_arguments) except json.JSONDecodeError as exc: return {}, f"invalid json: {exc}" if isinstance(parsed, dict): return parsed, None return {}, "decoded arguments were not an object" return {}, f"unsupported argument type: {type(raw_arguments).__name__}" # Execute tool calls with error handling and logging, returning standardized results def _execute_tool(self, tool_call: _ToolCall) -> ToolResult: """Execute a tool call and convert low-level errors into structured results.""" if "__invalid_arguments__" in tool_call.arguments: error = tool_call.arguments["__invalid_arguments__"] self._log_tool_invocation(tool_call, stage="rejected", detail={"error": error}) return ToolResult(success=False, payload={"error": error}) try: self._log_tool_invocation(tool_call, stage="start") result = handle_tool_call(tool_call.name, tool_call.arguments) except Exception as exc: # pragma: no cover - defensive logger.error( "Tool execution crashed", extra={"tool": tool_call.name, "error": str(exc)}, ) self._log_tool_invocation( tool_call, stage="error", detail={"error": str(exc)}, ) return ToolResult(success=False, payload={"error": str(exc)}) if not isinstance(result, ToolResult): logger.warning( "Tool did not return ToolResult; coercing", extra={"tool": tool_call.name}, ) wrapped = ToolResult(success=True, payload=result) self._log_tool_invocation(tool_call, stage="done", result=wrapped) return wrapped status = "success" if result.success else "error" logger.debug( "Tool executed", extra={ "tool": tool_call.name, "status": status, }, ) self._log_tool_invocation(tool_call, stage="done", result=result) return result # Format tool execution results into JSON for LLM consumption def _format_tool_result(self, tool_call: _ToolCall, result: ToolResult) -> str: """Render a tool execution result back to the LLM.""" payload: Dict[str, Any] = { "tool": tool_call.name, "status": "success" if result.success else "error", "arguments": { key: value for key, value in tool_call.arguments.items() if key != "__invalid_arguments__" }, } if result.payload is not None: key = "result" if result.success else "error" payload[key] = result.payload return self._safe_json_dump(payload) # Safely serialize objects to JSON with fallback to string representation def _safe_json_dump(self, payload: Any) -> str: """Serialize payload to JSON, falling back to repr on failure.""" try: return json.dumps(payload, default=str) except TypeError: return repr(payload) # Log tool execution stages (start, done, error) with structured metadata def _log_tool_invocation( self, tool_call: _ToolCall, *, stage: str, result: Optional[ToolResult] = None, detail: Optional[Dict[str, Any]] = None, ) -> None: """Emit structured logs for tool lifecycle events.""" cleaned_args = { key: value for key, value in tool_call.arguments.items() if key != "__invalid_arguments__" } log_payload: Dict[str, Any] = { "tool": tool_call.name, "stage": stage, "arguments": cleaned_args, } if result is not None: log_payload["success"] = result.success if result.payload is not None: log_payload["payload"] = result.payload if detail: log_payload.update(detail) if stage == "done": logger.info(f"Tool '{tool_call.name}' completed") elif stage in {"error", "rejected"}: logger.warning(f"Tool '{tool_call.name}' {stage}") else: logger.debug(f"Tool '{tool_call.name}' {stage}") # Determine final user-facing response from interaction loop summary def _finalize_response(self, summary: _LoopSummary) -> str: """Decide what text should be exposed to the user as the final reply.""" if summary.user_messages: return summary.user_messages[-1] return summary.last_assistant_text