Guilherme34's picture
Upload folder using huggingface_hub
aa15bce verified
"""Interaction Agent Runtime - handles LLM calls for user and agent turns."""
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
from .agent import build_system_prompt, prepare_message_with_history
from .tools import ToolResult, get_tool_schemas, handle_tool_call
from ...config import get_settings
from ...services.conversation import get_conversation_log, get_working_memory_log
from ...openrouter_client import request_chat_completion
from ...logging_config import logger
@dataclass
class InteractionResult:
    """Outcome of a single interaction-agent turn, returned to the caller."""

    # True when the turn completed; False when an exception was caught.
    success: bool
    # Text chosen as the final reply (empty string on failure).
    response: str
    # Stringified exception when the turn failed, otherwise None.
    error: Optional[str] = None
    # Number of distinct execution agents that were messaged during the turn.
    execution_agents_used: int = 0
@dataclass
class _ToolCall:
    """A single tool invocation extracted from an LLM response."""

    # Provider-supplied call id; may be absent.
    identifier: Optional[str]
    # Name of the tool the model asked for.
    name: str
    # Decoded keyword arguments for the tool.
    arguments: Dict[str, Any]
@dataclass
class _LoopSummary:
    """Accumulated facts about one run of the interaction loop."""

    # Most recent non-empty assistant text seen during the loop.
    last_assistant_text: str = ""
    # Messages that tools reported as addressed to the user, in order.
    user_messages: List[str] = field(default_factory=list)
    # Name of every tool that was invoked, in call order (duplicates kept).
    tool_names: List[str] = field(default_factory=list)
    # Distinct execution-agent names that were messaged.
    execution_agents: Set[str] = field(default_factory=set)
class InteractionAgentRuntime:
    """Manages the interaction agent's request processing.

    One turn consists of loading the conversation transcript, recording the
    incoming message, and then repeatedly calling the LLM, executing any tool
    calls it requests and feeding the results back, until the model answers
    without tool calls or ``MAX_TOOL_ITERATIONS`` is exhausted.
    """

    # Upper bound on LLM round-trips within a single turn.
    MAX_TOOL_ITERATIONS = 8

    # Reserved argument key used to carry an argument-parsing error from
    # ``_parse_tool_calls`` to ``_execute_tool``; never shown to the LLM or logs.
    _INVALID_ARGUMENTS_KEY = "__invalid_arguments__"

    # Initialize interaction agent runtime with settings and service dependencies
    def __init__(self) -> None:
        """Read settings and acquire the logs and tool schemas.

        Raises:
            ValueError: if no API key is configured.
        """
        settings = get_settings()
        self.api_key = settings.api_key
        self.model = settings.interaction_agent_model
        self.settings = settings

        # Fail fast: nothing below is useful without credentials.
        if not self.api_key:
            raise ValueError(
                "API key not configured. Set API_KEY environment variable."
            )

        self.conversation_log = get_conversation_log()
        self.working_memory_log = get_working_memory_log()
        self.tool_schemas = get_tool_schemas()

    # Main entry point for processing user messages through the LLM interaction loop
    async def execute(self, user_message: str) -> InteractionResult:
        """Handle a user-authored message.

        Never raises: any exception is logged and reported through the
        returned ``InteractionResult`` (``success=False``, ``error`` set).
        """
        return await self._process_message(
            user_message,
            message_type="user",
            start_log="Processing user message through interaction agent",
            failure_log="Interaction agent failed",
        )

    # Handle incoming messages from execution agents and generate appropriate responses
    async def handle_agent_message(self, agent_message: str) -> InteractionResult:
        """Process a status update emitted by an execution agent.

        Never raises: any exception is logged and reported through the
        returned ``InteractionResult`` (``success=False``, ``error`` set).
        """
        return await self._process_message(
            agent_message,
            message_type="agent",
            start_log="Processing execution agent results",
            failure_log="Interaction agent (agent message) failed",
        )

    # Shared turn pipeline behind execute() and handle_agent_message()
    async def _process_message(
        self,
        message: str,
        *,
        message_type: str,
        start_log: str,
        failure_log: str,
    ) -> InteractionResult:
        """Run one full turn for ``message`` and wrap the outcome.

        Args:
            message: The incoming text.
            message_type: ``"user"`` records the message as a user message;
                any other value records it as an agent message. The value is
                also forwarded to ``prepare_message_with_history``.
            start_log: Info-level log line emitted before the loop starts.
            failure_log: Error-level log line emitted when the turn fails.
        """
        try:
            # Snapshot the transcript BEFORE recording the new message so the
            # message is not duplicated in the prompt history.
            transcript_before = self._load_conversation_transcript()
            if message_type == "user":
                self.conversation_log.record_user_message(message)
            else:
                self.conversation_log.record_agent_message(message)

            system_prompt = build_system_prompt()
            messages = prepare_message_with_history(
                message, transcript_before, message_type=message_type
            )

            logger.info(start_log)
            summary = await self._run_interaction_loop(system_prompt, messages)

            final_response = self._finalize_response(summary)
            # Only record the reply here when no tool produced a user message;
            # NOTE(review): presumably such tools record their own output.
            if final_response and not summary.user_messages:
                self.conversation_log.record_reply(final_response)

            return InteractionResult(
                success=True,
                response=final_response,
                execution_agents_used=len(summary.execution_agents),
            )
        except Exception as exc:  # top-level boundary: report, never propagate
            logger.error(failure_log, extra={"error": str(exc)})
            return InteractionResult(
                success=False,
                response="",
                error=str(exc),
            )

    # Core interaction loop that handles LLM calls and tool executions until completion
    async def _run_interaction_loop(
        self,
        system_prompt: str,
        messages: List[Dict[str, Any]],
    ) -> _LoopSummary:
        """Iteratively query the LLM until it issues a final response.

        ``messages`` is extended in place with each assistant turn and each
        tool result.

        Raises:
            RuntimeError: if every one of the ``MAX_TOOL_ITERATIONS`` rounds
                still ended with tool calls.
        """
        summary = _LoopSummary()

        for _ in range(self.MAX_TOOL_ITERATIONS):
            response = await self._make_llm_call(system_prompt, messages)
            assistant_message = self._extract_assistant_message(response)

            raw_content = assistant_message.get("content") or ""
            stripped_content = raw_content.strip()
            if stripped_content:
                summary.last_assistant_text = stripped_content

            raw_tool_calls = assistant_message.get("tool_calls") or []
            parsed_tool_calls = self._parse_tool_calls(raw_tool_calls)

            assistant_entry: Dict[str, Any] = {
                "role": "assistant",
                "content": raw_content,
            }
            # Echo back only the tool calls that will receive a tool message.
            # Calls dropped by the parser (no usable name) would otherwise be
            # left unanswered in the next request.
            answerable_calls = [
                raw
                for raw in raw_tool_calls
                if self._resolve_tool_name(raw) is not None
            ]
            if answerable_calls:
                assistant_entry["tool_calls"] = answerable_calls
            messages.append(assistant_entry)

            if not parsed_tool_calls:
                break

            for tool_call in parsed_tool_calls:
                summary.tool_names.append(tool_call.name)

                if tool_call.name == "send_message_to_agent":
                    agent_name = tool_call.arguments.get("agent_name")
                    if isinstance(agent_name, str) and agent_name:
                        summary.execution_agents.add(agent_name)

                result = self._execute_tool(tool_call)
                if result.user_message:
                    summary.user_messages.append(result.user_message)

                messages.append(
                    {
                        "role": "tool",
                        # Fall back to the tool name when no call id was given.
                        "tool_call_id": tool_call.identifier or tool_call.name,
                        "content": self._format_tool_result(tool_call, result),
                    }
                )
        else:
            # Loop ran out of iterations without hitting ``break``.
            raise RuntimeError("Reached tool iteration limit without final response")

        if not summary.user_messages and not summary.last_assistant_text:
            logger.warning("Interaction loop exited without assistant content")

        return summary

    # Load conversation history, preferring summarized version if available
    def _load_conversation_transcript(self) -> str:
        """Return the transcript used as prompt history.

        When summarization is enabled and the working-memory log renders a
        non-blank transcript, that is returned; otherwise the conversation
        log's transcript is loaded.
        """
        if self.settings.summarization_enabled:
            rendered = self.working_memory_log.render_transcript()
            if rendered.strip():
                return rendered
        return self.conversation_log.load_transcript()

    # Execute API call with system prompt, messages, and tool schemas
    async def _make_llm_call(
        self,
        system_prompt: str,
        messages: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Make an LLM call via API and return the raw response payload."""
        logger.debug(
            "Interaction agent calling LLM",
            extra={"model": self.model, "tools": len(self.tool_schemas)},
        )
        return await request_chat_completion(
            model=self.model,
            messages=messages,
            system=system_prompt,
            api_key=self.api_key,
            tools=self.tool_schemas,
        )

    # Extract the assistant's message from the API response structure
    def _extract_assistant_message(self, response: Dict[str, Any]) -> Dict[str, Any]:
        """Return the assistant message from the raw response payload.

        Raises:
            RuntimeError: if the first choice carries no dict ``message``.
        """
        # A missing or empty ``choices`` list is treated as one empty choice.
        choice = (response.get("choices") or [{}])[0]
        message = choice.get("message")
        if not isinstance(message, dict):
            raise RuntimeError("LLM response did not include an assistant message")
        return message

    # Read the function name out of a raw tool call, tolerating malformed payloads
    @staticmethod
    def _resolve_tool_name(raw: Any) -> Optional[str]:
        """Return the tool name of a raw tool call, or None if it has none.

        A non-None result guarantees that ``raw`` and ``raw["function"]`` are
        both dicts.
        """
        if not isinstance(raw, dict):
            return None
        function_block = raw.get("function")
        if not isinstance(function_block, dict):
            return None
        name = function_block.get("name")
        if isinstance(name, str) and name:
            return name
        return None

    # Convert raw LLM tool calls into structured _ToolCall objects with validation
    def _parse_tool_calls(self, raw_tool_calls: List[Dict[str, Any]]) -> List[_ToolCall]:
        """Normalize tool call payloads from the LLM.

        Entries without a usable function name are skipped with a warning.
        Entries whose arguments cannot be decoded are kept, with the error
        stored under the reserved invalid-arguments key so the failure can be
        reported back to the model as a tool error.
        """
        parsed: List[_ToolCall] = []
        for raw in raw_tool_calls:
            name = self._resolve_tool_name(raw)
            if name is None:
                logger.warning("Skipping tool call without name", extra={"tool": raw})
                continue

            arguments, error = self._parse_tool_arguments(
                raw["function"].get("arguments")
            )
            if error:
                logger.warning(
                    "Tool call arguments invalid",
                    extra={"tool": name, "error": error},
                )
                arguments = {self._INVALID_ARGUMENTS_KEY: error}

            parsed.append(
                _ToolCall(identifier=raw.get("id"), name=name, arguments=arguments)
            )
        return parsed

    # Parse and validate tool arguments from various formats (dict, JSON string, etc.)
    def _parse_tool_arguments(
        self, raw_arguments: Any
    ) -> tuple[Dict[str, Any], Optional[str]]:
        """Convert tool arguments into a dictionary, reporting errors.

        Returns:
            ``(arguments, None)`` on success, or ``({}, message)`` when the
            input cannot be interpreted as a JSON object. ``None`` and blank
            strings count as "no arguments".
        """
        if raw_arguments is None:
            return {}, None

        if isinstance(raw_arguments, dict):
            return raw_arguments, None

        if not isinstance(raw_arguments, str):
            return {}, f"unsupported argument type: {type(raw_arguments).__name__}"

        if not raw_arguments.strip():
            return {}, None

        try:
            decoded = json.loads(raw_arguments)
        except json.JSONDecodeError as exc:
            return {}, f"invalid json: {exc}"

        if isinstance(decoded, dict):
            return decoded, None
        return {}, "decoded arguments were not an object"

    # Execute tool calls with error handling and logging, returning standardized results
    def _execute_tool(self, tool_call: _ToolCall) -> ToolResult:
        """Execute a tool call and convert low-level errors into structured results.

        Never raises for tool failures: rejected arguments and exceptions
        raised by the tool both come back as ``ToolResult(success=False)``.
        """
        if self._INVALID_ARGUMENTS_KEY in tool_call.arguments:
            error = tool_call.arguments[self._INVALID_ARGUMENTS_KEY]
            self._log_tool_invocation(tool_call, stage="rejected", detail={"error": error})
            return ToolResult(success=False, payload={"error": error})

        try:
            self._log_tool_invocation(tool_call, stage="start")
            result = handle_tool_call(tool_call.name, tool_call.arguments)
        except Exception as exc:  # pragma: no cover - defensive
            logger.error(
                "Tool execution crashed",
                extra={"tool": tool_call.name, "error": str(exc)},
            )
            self._log_tool_invocation(
                tool_call,
                stage="error",
                detail={"error": str(exc)},
            )
            return ToolResult(success=False, payload={"error": str(exc)})

        if not isinstance(result, ToolResult):
            # Tools are expected to return ToolResult; treat anything else as
            # a successful payload.
            logger.warning(
                "Tool did not return ToolResult; coercing",
                extra={"tool": tool_call.name},
            )
            wrapped = ToolResult(success=True, payload=result)
            self._log_tool_invocation(tool_call, stage="done", result=wrapped)
            return wrapped

        logger.debug(
            "Tool executed",
            extra={
                "tool": tool_call.name,
                "status": "success" if result.success else "error",
            },
        )
        self._log_tool_invocation(tool_call, stage="done", result=result)
        return result

    # Strip the internal invalid-arguments marker before exposing arguments
    def _public_arguments(self, tool_call: _ToolCall) -> Dict[str, Any]:
        """Return the tool call's arguments without the reserved error key."""
        return {
            key: value
            for key, value in tool_call.arguments.items()
            if key != self._INVALID_ARGUMENTS_KEY
        }

    # Format tool execution results into JSON for LLM consumption
    def _format_tool_result(self, tool_call: _ToolCall, result: ToolResult) -> str:
        """Render a tool execution result back to the LLM.

        The JSON object has ``tool``, ``status`` and ``arguments`` keys; when
        the result has a payload it is added under ``result`` (success) or
        ``error`` (failure).
        """
        payload: Dict[str, Any] = {
            "tool": tool_call.name,
            "status": "success" if result.success else "error",
            "arguments": self._public_arguments(tool_call),
        }
        if result.payload is not None:
            payload["result" if result.success else "error"] = result.payload
        return self._safe_json_dump(payload)

    # Safely serialize objects to JSON with fallback to string representation
    def _safe_json_dump(self, payload: Any) -> str:
        """Serialize payload to JSON, falling back to repr on failure.

        ``json.dumps`` raises ``TypeError`` for unsupported dict keys and
        ``ValueError`` for circular references; both fall back to ``repr`` so
        an unusual tool payload cannot abort the turn.
        """
        try:
            return json.dumps(payload, default=str)
        except (TypeError, ValueError):
            return repr(payload)

    # Log tool execution stages (start, done, error) with structured metadata
    def _log_tool_invocation(
        self,
        tool_call: _ToolCall,
        *,
        stage: str,
        result: Optional[ToolResult] = None,
        detail: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Emit structured logs for tool lifecycle events.

        ``"done"`` is logged at info level, ``"error"`` and ``"rejected"`` at
        warning level, and every other stage at debug level. The structured
        fields are attached to the record through ``extra``.
        """
        log_payload: Dict[str, Any] = {
            "tool": tool_call.name,
            "stage": stage,
            "arguments": self._public_arguments(tool_call),
        }

        if result is not None:
            log_payload["success"] = result.success
            if result.payload is not None:
                log_payload["payload"] = result.payload

        if detail:
            log_payload.update(detail)

        if stage == "done":
            logger.info(f"Tool '{tool_call.name}' completed", extra=log_payload)
        elif stage in {"error", "rejected"}:
            logger.warning(f"Tool '{tool_call.name}' {stage}", extra=log_payload)
        else:
            logger.debug(f"Tool '{tool_call.name}' {stage}", extra=log_payload)

    # Determine final user-facing response from interaction loop summary
    def _finalize_response(self, summary: _LoopSummary) -> str:
        """Decide what text should be exposed to the user as the final reply.

        The last tool-produced user message wins; otherwise the last
        assistant text is used (possibly an empty string).
        """
        if summary.user_messages:
            return summary.user_messages[-1]
        return summary.last_assistant_text