| """ |
| Generic HTTP Agent Proxy |
| |
| POSTs to any REST endpoint with configurable field mapping. |
| Supports sending full conversation history and custom headers. |
| |
| Configuration: |
| agent_proxy: |
| type: http |
| url: "http://localhost:8080/chat" |
| headers: |
| Authorization: "Bearer YOUR_KEY" |
| message_key: "message" # key in request body for user message |
| response_key: "response" # key in response JSON for agent reply |
| session_id_key: "session_id" # key in request/response for session tracking |
| send_history: false # whether to send full conversation history |
| history_key: "messages" # key for history array in request body |
| """ |
|
|
| import logging |
| import uuid |
|
|
| import requests |
|
|
| from .base import BaseAgentProxy, AgentMessage, AgentResponse, AgentProxyFactory |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class GenericHTTPProxy(BaseAgentProxy): |
| """Generic REST API proxy with configurable field mapping.""" |
|
|
| proxy_type = "http" |
|
|
| def _initialize(self): |
| self.url = self.config.get("url") |
| if not self.url: |
| raise ValueError("http proxy requires 'url' in agent_proxy config") |
|
|
| self.headers = self.config.get("headers", {}) |
| self.message_key = self.config.get("message_key", "message") |
| self.response_key = self.config.get("response_key", "response") |
| self.session_id_key = self.config.get("session_id_key", "session_id") |
| self.send_history = self.config.get("send_history", False) |
| self.history_key = self.config.get("history_key", "messages") |
| self.timeout = self.config.get("sandbox", {}).get( |
| "request_timeout_seconds", 60 |
| ) |
|
|
| def start_session(self, task_description: str) -> dict: |
| return { |
| "session_id": str(uuid.uuid4()), |
| "task_description": task_description, |
| "history": [], |
| } |
|
|
| def send_message(self, message: str, session_context: dict) -> AgentResponse: |
| payload = { |
| self.message_key: message, |
| self.session_id_key: session_context["session_id"], |
| } |
|
|
| if self.send_history: |
| payload[self.history_key] = session_context.get("history", []) |
|
|
| try: |
| resp = requests.post( |
| self.url, |
| json=payload, |
| headers=self.headers, |
| timeout=self.timeout, |
| ) |
| resp.raise_for_status() |
| data = resp.json() |
|
|
| response_text = data.get(self.response_key, "") |
| if not response_text and isinstance(data, str): |
| response_text = data |
|
|
| |
| session_context.setdefault("history", []).append( |
| {"role": "user", "content": message} |
| ) |
| session_context["history"].append( |
| {"role": "agent", "content": response_text} |
| ) |
|
|
| return AgentResponse( |
| message=AgentMessage(role="agent", content=str(response_text)) |
| ) |
|
|
| except requests.Timeout: |
| return AgentResponse( |
| message=AgentMessage(role="error", content="Agent request timed out."), |
| error="timeout", |
| ) |
| except requests.RequestException as e: |
| logger.error(f"HTTP proxy request failed: {e}") |
| return AgentResponse( |
| message=AgentMessage( |
| role="error", content=f"Agent communication error: {e}" |
| ), |
| error=str(e), |
| ) |
|
|
|
|
| |
| AgentProxyFactory.register("http", GenericHTTPProxy) |
|
|