File size: 3,679 Bytes
ba2ada2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""
API Forge — dynamically integrates with external REST APIs.

Allows agents to discover, authenticate, and call REST APIs
on the fly. Supports automatic parameter extraction and response parsing.
"""
import asyncio
import json
import os
from typing import Optional
from schemas.agent import ToolOutput

_ENABLE_API = os.getenv("ADAM_ENABLE_API", "true").lower() == "true"


class APIForge:
    """
    Dynamic API integration tool.

    Agents can call any REST API using HTTP methods.
    Includes automatic JSON response parsing, error handling,
    and timeout management.
    """

    def __init__(self):
        self._call_count = 0

    async def integrate(self, task: str) -> str:
        """
        Execute an API call based on a natural language description.
        The task description should include the API endpoint and parameters.
        """
        if not _ENABLE_API:
            return f"API calls disabled. Task: {task[:200]}"

        # Extract API details from the task description
        url, method, headers, body = self._parse_api_request(task)
        if not url:
            return f"Cannot parse API request from: {task[:200]}"

        return await self._execute_call(url, method, headers, body)

    def _parse_api_request(self, task: str) -> tuple:
        """
        Parse an API request from a natural language description.
        Returns (url, method, headers, body).
        """
        import re
        task_lower = task.lower()

        # Extract URL
        url_match = re.search(r'https?://[^\s,;)]+', task)
        url = url_match.group(0) if url_match else ""

        # Determine HTTP method
        method = "GET"
        if any(w in task_lower for w in ["post", "create", "submit", "send"]):
            method = "POST"
        elif any(w in task_lower for w in ["put", "update"]):
            method = "PUT"
        elif any(w in task_lower for w in ["delete", "remove"]):
            method = "DELETE"
        elif any(w in task_lower for w in ["patch"]):
            method = "PATCH"

        # Extract JSON body (if present)
        body = None
        body_match = re.search(r'\{[^{}]*\}', task, re.DOTALL)
        if body_match:
            try:
                body = json.loads(body_match.group(0))
            except json.JSONDecodeError:
                body = body_match.group(0)

        return url, method, {}, body

    async def _execute_call(self, url: str, method: str = "GET",
                             headers: dict = None, body: dict = None) -> str:
        """Execute an HTTP API call."""
        self._call_count += 1
        try:
            import aiohttp
            async with aiohttp.ClientSession(headers=headers or {}) as session:
                try:
                    async with session.request(
                        method, url,
                        json=body,
                        timeout=aiohttp.ClientTimeout(total=15),
                    ) as resp:
                        status = resp.status
                        try:
                            data = await resp.json()
                            formatted = json.dumps(data, indent=2)[:3000]
                        except Exception:
                            text = await resp.text()
                            formatted = text[:3000]

                        return f"HTTP {status}\n\n{formatted}"

                except asyncio.TimeoutError:
                    return f"API call timed out (15s): {url[:100]}"
                except Exception as e:
                    return f"API call failed: {str(e)[:200]}"

        except ImportError:
            return f"aiohttp not available for API call: {url[:100]}"