Spaces:
Paused
Paused
File size: 3,646 Bytes
aceb1b2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | """
Generic HTTP Agent Proxy
POSTs to any REST endpoint with configurable field mapping.
Supports sending full conversation history and custom headers.
Configuration:
agent_proxy:
type: http
url: "http://localhost:8080/chat"
headers:
Authorization: "Bearer YOUR_KEY"
message_key: "message" # key in request body for user message
response_key: "response" # key in response JSON for agent reply
session_id_key: "session_id" # key in request/response for session tracking
send_history: false # whether to send full conversation history
history_key: "messages" # key for history array in request body
"""
import logging
import uuid
import requests
from .base import BaseAgentProxy, AgentMessage, AgentResponse, AgentProxyFactory
logger = logging.getLogger(__name__)
class GenericHTTPProxy(BaseAgentProxy):
"""Generic REST API proxy with configurable field mapping."""
proxy_type = "http"
def _initialize(self):
self.url = self.config.get("url")
if not self.url:
raise ValueError("http proxy requires 'url' in agent_proxy config")
self.headers = self.config.get("headers", {})
self.message_key = self.config.get("message_key", "message")
self.response_key = self.config.get("response_key", "response")
self.session_id_key = self.config.get("session_id_key", "session_id")
self.send_history = self.config.get("send_history", False)
self.history_key = self.config.get("history_key", "messages")
self.timeout = self.config.get("sandbox", {}).get(
"request_timeout_seconds", 60
)
def start_session(self, task_description: str) -> dict:
return {
"session_id": str(uuid.uuid4()),
"task_description": task_description,
"history": [],
}
def send_message(self, message: str, session_context: dict) -> AgentResponse:
payload = {
self.message_key: message,
self.session_id_key: session_context["session_id"],
}
if self.send_history:
payload[self.history_key] = session_context.get("history", [])
try:
resp = requests.post(
self.url,
json=payload,
headers=self.headers,
timeout=self.timeout,
)
resp.raise_for_status()
data = resp.json()
response_text = data.get(self.response_key, "")
if not response_text and isinstance(data, str):
response_text = data
# Update history
session_context.setdefault("history", []).append(
{"role": "user", "content": message}
)
session_context["history"].append(
{"role": "agent", "content": response_text}
)
return AgentResponse(
message=AgentMessage(role="agent", content=str(response_text))
)
except requests.Timeout:
return AgentResponse(
message=AgentMessage(role="error", content="Agent request timed out."),
error="timeout",
)
except requests.RequestException as e:
logger.error(f"HTTP proxy request failed: {e}")
return AgentResponse(
message=AgentMessage(
role="error", content=f"Agent communication error: {e}"
),
error=str(e),
)
# Register with factory
AgentProxyFactory.register("http", GenericHTTPProxy)
|