Spaces:
Paused
Paused
| """ | |
| Generic HTTP Agent Proxy | |
| POSTs to any REST endpoint with configurable field mapping. | |
| Supports sending full conversation history and custom headers. | |
| Configuration: | |
| agent_proxy: | |
| type: http | |
| url: "http://localhost:8080/chat" | |
| headers: | |
| Authorization: "Bearer YOUR_KEY" | |
| message_key: "message" # key in request body for user message | |
| response_key: "response" # key in response JSON for agent reply | |
| session_id_key: "session_id" # key in request/response for session tracking | |
| send_history: false # whether to send full conversation history | |
| history_key: "messages" # key for history array in request body | |
| """ | |
| import logging | |
| import uuid | |
| import requests | |
| from .base import BaseAgentProxy, AgentMessage, AgentResponse, AgentProxyFactory | |
| logger = logging.getLogger(__name__) | |
| class GenericHTTPProxy(BaseAgentProxy): | |
| """Generic REST API proxy with configurable field mapping.""" | |
| proxy_type = "http" | |
| def _initialize(self): | |
| self.url = self.config.get("url") | |
| if not self.url: | |
| raise ValueError("http proxy requires 'url' in agent_proxy config") | |
| self.headers = self.config.get("headers", {}) | |
| self.message_key = self.config.get("message_key", "message") | |
| self.response_key = self.config.get("response_key", "response") | |
| self.session_id_key = self.config.get("session_id_key", "session_id") | |
| self.send_history = self.config.get("send_history", False) | |
| self.history_key = self.config.get("history_key", "messages") | |
| self.timeout = self.config.get("sandbox", {}).get( | |
| "request_timeout_seconds", 60 | |
| ) | |
| def start_session(self, task_description: str) -> dict: | |
| return { | |
| "session_id": str(uuid.uuid4()), | |
| "task_description": task_description, | |
| "history": [], | |
| } | |
| def send_message(self, message: str, session_context: dict) -> AgentResponse: | |
| payload = { | |
| self.message_key: message, | |
| self.session_id_key: session_context["session_id"], | |
| } | |
| if self.send_history: | |
| payload[self.history_key] = session_context.get("history", []) | |
| try: | |
| resp = requests.post( | |
| self.url, | |
| json=payload, | |
| headers=self.headers, | |
| timeout=self.timeout, | |
| ) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| response_text = data.get(self.response_key, "") | |
| if not response_text and isinstance(data, str): | |
| response_text = data | |
| # Update history | |
| session_context.setdefault("history", []).append( | |
| {"role": "user", "content": message} | |
| ) | |
| session_context["history"].append( | |
| {"role": "agent", "content": response_text} | |
| ) | |
| return AgentResponse( | |
| message=AgentMessage(role="agent", content=str(response_text)) | |
| ) | |
| except requests.Timeout: | |
| return AgentResponse( | |
| message=AgentMessage(role="error", content="Agent request timed out."), | |
| error="timeout", | |
| ) | |
| except requests.RequestException as e: | |
| logger.error(f"HTTP proxy request failed: {e}") | |
| return AgentResponse( | |
| message=AgentMessage( | |
| role="error", content=f"Agent communication error: {e}" | |
| ), | |
| error=str(e), | |
| ) | |
| # Register with factory | |
| AgentProxyFactory.register("http", GenericHTTPProxy) | |