""" API Forge — dynamically integrates with external REST APIs. Allows agents to discover, authenticate, and call REST APIs on the fly. Supports automatic parameter extraction and response parsing. """ import asyncio import json import os from typing import Optional from schemas.agent import ToolOutput _ENABLE_API = os.getenv("ADAM_ENABLE_API", "true").lower() == "true" class APIForge: """ Dynamic API integration tool. Agents can call any REST API using HTTP methods. Includes automatic JSON response parsing, error handling, and timeout management. """ def __init__(self): self._call_count = 0 async def integrate(self, task: str) -> str: """ Execute an API call based on a natural language description. The task description should include the API endpoint and parameters. """ if not _ENABLE_API: return f"API calls disabled. Task: {task[:200]}" # Extract API details from the task description url, method, headers, body = self._parse_api_request(task) if not url: return f"Cannot parse API request from: {task[:200]}" return await self._execute_call(url, method, headers, body) def _parse_api_request(self, task: str) -> tuple: """ Parse an API request from a natural language description. Returns (url, method, headers, body). """ import re task_lower = task.lower() # Extract URL url_match = re.search(r'https?://[^\s,;)]+', task) url = url_match.group(0) if url_match else "" # Determine HTTP method method = "GET" if any(w in task_lower for w in ["post", "create", "submit", "send"]): method = "POST" elif any(w in task_lower for w in ["put", "update"]): method = "PUT" elif any(w in task_lower for w in ["delete", "remove"]): method = "DELETE" elif any(w in task_lower for w in ["patch"]): method = "PATCH" # Extract JSON body (if present) body = None body_match = re.search(r'\{[^{}]*\}', task, re.DOTALL) if body_match: try: body = json.loads(body_match.group(0)) except json.JSONDecodeError: body = body_match.group(0) return url, method, {}, body async def _execute_call(self, url: str, method: str = "GET", headers: dict = None, body: dict = None) -> str: """Execute an HTTP API call.""" self._call_count += 1 try: import aiohttp async with aiohttp.ClientSession(headers=headers or {}) as session: try: async with session.request( method, url, json=body, timeout=aiohttp.ClientTimeout(total=15), ) as resp: status = resp.status try: data = await resp.json() formatted = json.dumps(data, indent=2)[:3000] except Exception: text = await resp.text() formatted = text[:3000] return f"HTTP {status}\n\n{formatted}" except asyncio.TimeoutError: return f"API call timed out (15s): {url[:100]}" except Exception as e: return f"API call failed: {str(e)[:200]}" except ImportError: return f"aiohttp not available for API call: {url[:100]}"