Spaces:
Running
Running
ADAM v2.0: Advanced Agentic Mesh — DAG orchestrator, cognition, knowledge web, forge tools, runtime optimization
ba2ada2 | """ | |
| API Forge — dynamically integrates with external REST APIs. | |
| Allows agents to discover, authenticate, and call REST APIs | |
| on the fly. Supports automatic parameter extraction and response parsing. | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| from typing import Optional | |
| from schemas.agent import ToolOutput | |
| _ENABLE_API = os.getenv("ADAM_ENABLE_API", "true").lower() == "true" | |
| class APIForge: | |
| """ | |
| Dynamic API integration tool. | |
| Agents can call any REST API using HTTP methods. | |
| Includes automatic JSON response parsing, error handling, | |
| and timeout management. | |
| """ | |
| def __init__(self): | |
| self._call_count = 0 | |
| async def integrate(self, task: str) -> str: | |
| """ | |
| Execute an API call based on a natural language description. | |
| The task description should include the API endpoint and parameters. | |
| """ | |
| if not _ENABLE_API: | |
| return f"API calls disabled. Task: {task[:200]}" | |
| # Extract API details from the task description | |
| url, method, headers, body = self._parse_api_request(task) | |
| if not url: | |
| return f"Cannot parse API request from: {task[:200]}" | |
| return await self._execute_call(url, method, headers, body) | |
| def _parse_api_request(self, task: str) -> tuple: | |
| """ | |
| Parse an API request from a natural language description. | |
| Returns (url, method, headers, body). | |
| """ | |
| import re | |
| task_lower = task.lower() | |
| # Extract URL | |
| url_match = re.search(r'https?://[^\s,;)]+', task) | |
| url = url_match.group(0) if url_match else "" | |
| # Determine HTTP method | |
| method = "GET" | |
| if any(w in task_lower for w in ["post", "create", "submit", "send"]): | |
| method = "POST" | |
| elif any(w in task_lower for w in ["put", "update"]): | |
| method = "PUT" | |
| elif any(w in task_lower for w in ["delete", "remove"]): | |
| method = "DELETE" | |
| elif any(w in task_lower for w in ["patch"]): | |
| method = "PATCH" | |
| # Extract JSON body (if present) | |
| body = None | |
| body_match = re.search(r'\{[^{}]*\}', task, re.DOTALL) | |
| if body_match: | |
| try: | |
| body = json.loads(body_match.group(0)) | |
| except json.JSONDecodeError: | |
| body = body_match.group(0) | |
| return url, method, {}, body | |
| async def _execute_call(self, url: str, method: str = "GET", | |
| headers: dict = None, body: dict = None) -> str: | |
| """Execute an HTTP API call.""" | |
| self._call_count += 1 | |
| try: | |
| import aiohttp | |
| async with aiohttp.ClientSession(headers=headers or {}) as session: | |
| try: | |
| async with session.request( | |
| method, url, | |
| json=body, | |
| timeout=aiohttp.ClientTimeout(total=15), | |
| ) as resp: | |
| status = resp.status | |
| try: | |
| data = await resp.json() | |
| formatted = json.dumps(data, indent=2)[:3000] | |
| except Exception: | |
| text = await resp.text() | |
| formatted = text[:3000] | |
| return f"HTTP {status}\n\n{formatted}" | |
| except asyncio.TimeoutError: | |
| return f"API call timed out (15s): {url[:100]}" | |
| except Exception as e: | |
| return f"API call failed: {str(e)[:200]}" | |
| except ImportError: | |
| return f"aiohttp not available for API call: {url[:100]}" | |