Spaces:
Running
Running
File size: 3,679 Bytes
ba2ada2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | """
API Forge — dynamically integrates with external REST APIs.
Allows agents to discover, authenticate, and call REST APIs
on the fly. Supports automatic parameter extraction and response parsing.
"""
import asyncio
import json
import os
from typing import Optional
from schemas.agent import ToolOutput
_ENABLE_API = os.getenv("ADAM_ENABLE_API", "true").lower() == "true"
class APIForge:
"""
Dynamic API integration tool.
Agents can call any REST API using HTTP methods.
Includes automatic JSON response parsing, error handling,
and timeout management.
"""
def __init__(self):
self._call_count = 0
async def integrate(self, task: str) -> str:
"""
Execute an API call based on a natural language description.
The task description should include the API endpoint and parameters.
"""
if not _ENABLE_API:
return f"API calls disabled. Task: {task[:200]}"
# Extract API details from the task description
url, method, headers, body = self._parse_api_request(task)
if not url:
return f"Cannot parse API request from: {task[:200]}"
return await self._execute_call(url, method, headers, body)
def _parse_api_request(self, task: str) -> tuple:
"""
Parse an API request from a natural language description.
Returns (url, method, headers, body).
"""
import re
task_lower = task.lower()
# Extract URL
url_match = re.search(r'https?://[^\s,;)]+', task)
url = url_match.group(0) if url_match else ""
# Determine HTTP method
method = "GET"
if any(w in task_lower for w in ["post", "create", "submit", "send"]):
method = "POST"
elif any(w in task_lower for w in ["put", "update"]):
method = "PUT"
elif any(w in task_lower for w in ["delete", "remove"]):
method = "DELETE"
elif any(w in task_lower for w in ["patch"]):
method = "PATCH"
# Extract JSON body (if present)
body = None
body_match = re.search(r'\{[^{}]*\}', task, re.DOTALL)
if body_match:
try:
body = json.loads(body_match.group(0))
except json.JSONDecodeError:
body = body_match.group(0)
return url, method, {}, body
async def _execute_call(self, url: str, method: str = "GET",
headers: dict = None, body: dict = None) -> str:
"""Execute an HTTP API call."""
self._call_count += 1
try:
import aiohttp
async with aiohttp.ClientSession(headers=headers or {}) as session:
try:
async with session.request(
method, url,
json=body,
timeout=aiohttp.ClientTimeout(total=15),
) as resp:
status = resp.status
try:
data = await resp.json()
formatted = json.dumps(data, indent=2)[:3000]
except Exception:
text = await resp.text()
formatted = text[:3000]
return f"HTTP {status}\n\n{formatted}"
except asyncio.TimeoutError:
return f"API call timed out (15s): {url[:100]}"
except Exception as e:
return f"API call failed: {str(e)[:200]}"
except ImportError:
return f"aiohttp not available for API call: {url[:100]}"
|