| """ |
| API Forge — dynamically integrates with external REST APIs. |
| |
| Allows agents to discover, authenticate, and call REST APIs |
| on the fly. Supports automatic parameter extraction and response parsing. |
| """ |
| import asyncio |
| import json |
| import os |
| from typing import Optional |
| from schemas.agent import ToolOutput |
|
|
| _ENABLE_API = os.getenv("ADAM_ENABLE_API", "true").lower() == "true" |
|
|
|
|
| class APIForge: |
| """ |
| Dynamic API integration tool. |
| |
| Agents can call any REST API using HTTP methods. |
| Includes automatic JSON response parsing, error handling, |
| and timeout management. |
| """ |
|
|
| def __init__(self): |
| self._call_count = 0 |
|
|
| async def integrate(self, task: str) -> str: |
| """ |
| Execute an API call based on a natural language description. |
| The task description should include the API endpoint and parameters. |
| """ |
| if not _ENABLE_API: |
| return f"API calls disabled. Task: {task[:200]}" |
|
|
| |
| url, method, headers, body = self._parse_api_request(task) |
| if not url: |
| return f"Cannot parse API request from: {task[:200]}" |
|
|
| return await self._execute_call(url, method, headers, body) |
|
|
| def _parse_api_request(self, task: str) -> tuple: |
| """ |
| Parse an API request from a natural language description. |
| Returns (url, method, headers, body). |
| """ |
| import re |
| task_lower = task.lower() |
|
|
| |
| url_match = re.search(r'https?://[^\s,;)]+', task) |
| url = url_match.group(0) if url_match else "" |
|
|
| |
| method = "GET" |
| if any(w in task_lower for w in ["post", "create", "submit", "send"]): |
| method = "POST" |
| elif any(w in task_lower for w in ["put", "update"]): |
| method = "PUT" |
| elif any(w in task_lower for w in ["delete", "remove"]): |
| method = "DELETE" |
| elif any(w in task_lower for w in ["patch"]): |
| method = "PATCH" |
|
|
| |
| body = None |
| body_match = re.search(r'\{[^{}]*\}', task, re.DOTALL) |
| if body_match: |
| try: |
| body = json.loads(body_match.group(0)) |
| except json.JSONDecodeError: |
| body = body_match.group(0) |
|
|
| return url, method, {}, body |
|
|
| async def _execute_call(self, url: str, method: str = "GET", |
| headers: dict = None, body: dict = None) -> str: |
| """Execute an HTTP API call.""" |
| self._call_count += 1 |
| try: |
| import aiohttp |
| async with aiohttp.ClientSession(headers=headers or {}) as session: |
| try: |
| async with session.request( |
| method, url, |
| json=body, |
| timeout=aiohttp.ClientTimeout(total=15), |
| ) as resp: |
| status = resp.status |
| try: |
| data = await resp.json() |
| formatted = json.dumps(data, indent=2)[:3000] |
| except Exception: |
| text = await resp.text() |
| formatted = text[:3000] |
|
|
| return f"HTTP {status}\n\n{formatted}" |
|
|
| except asyncio.TimeoutError: |
| return f"API call timed out (15s): {url[:100]}" |
| except Exception as e: |
| return f"API call failed: {str(e)[:200]}" |
|
|
| except ImportError: |
| return f"aiohttp not available for API call: {url[:100]}" |
|
|