| import os | |
| import httpx | |
| from typing import Dict, Any, Optional | |
| from ..domain.exceptions import HttpClientException, AuthenticationException, RateLimitException | |
class HttpClient:
    """Async HTTP client that authenticates against the Pipedream API.

    An OAuth access token is obtained lazily with the client-credentials
    grant, using the ``PIPEDREAM_PROJECT_ID``, ``PIPEDREAM_CLIENT_ID`` and
    ``PIPEDREAM_CLIENT_SECRET`` environment variables, and is cached on the
    instance. The cached token is discarded when a request is answered with
    HTTP 401 so that the following call re-authenticates.
    """

    def __init__(self):
        self.base_url = "https://api.pipedream.com/v1"
        # Created on first use by ``_get_session``.
        self.session: Optional[httpx.AsyncClient] = None
        # Cached bearer token; ``None`` until ``_ensure_access_token`` runs.
        self.access_token: Optional[str] = None
        # When set, sent as the ``x-pd-rate-limit`` header on every request.
        self.rate_limit_token: Optional[str] = None

    async def _get_session(self) -> httpx.AsyncClient:
        """Return the shared ``httpx.AsyncClient``, (re)creating it if needed."""
        if self.session is None or self.session.is_closed:
            self.session = httpx.AsyncClient(
                timeout=httpx.Timeout(30.0),
                headers={"User-Agent": "Suna-Pipedream-Client/1.0"},
            )
        return self.session

    async def _ensure_access_token(self) -> str:
        """Return the cached access token, requesting one if none is cached.

        Raises:
            AuthenticationException: if a required environment variable is
                missing, the token endpoint answers with an error status, or
                its response does not contain a usable ``access_token``.
            RateLimitException: if the token endpoint answers with HTTP 429.
        """
        if self.access_token:
            return self.access_token

        project_id = os.getenv("PIPEDREAM_PROJECT_ID")
        client_id = os.getenv("PIPEDREAM_CLIENT_ID")
        client_secret = os.getenv("PIPEDREAM_CLIENT_SECRET")
        # project_id is not sent in the token request but is still required
        # to be configured, as before.
        if not all([project_id, client_id, client_secret]):
            raise AuthenticationException("Missing required environment variables")

        session = await self._get_session()
        try:
            response = await session.post(
                f"{self.base_url}/oauth/token",
                data={
                    "grant_type": "client_credentials",
                    "client_id": client_id,
                    "client_secret": client_secret,
                },
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                raise RateLimitException() from e
            raise AuthenticationException(f"Failed to obtain access token: {e}") from e

        # A malformed body must surface as an authentication failure rather
        # than a raw ValueError/KeyError.
        try:
            payload = response.json()
        except ValueError as e:
            raise AuthenticationException(
                f"Failed to obtain access token: invalid JSON in token response: {e}"
            ) from e
        token = payload.get("access_token") if isinstance(payload, dict) else None
        if not token:
            raise AuthenticationException(
                "Failed to obtain access token: response did not contain an access_token"
            )

        self.access_token = token
        return token

    def _build_headers(
        self, access_token: str, headers: Optional[Dict[str, str]]
    ) -> Dict[str, str]:
        """Merge default, caller-supplied and rate-limit headers for a request."""
        request_headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
        if headers:
            request_headers.update(headers)
        # Applied last so it takes precedence over a caller-supplied value.
        if self.rate_limit_token:
            request_headers["x-pd-rate-limit"] = self.rate_limit_token
        return request_headers

    async def _request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send an authenticated request and return the decoded JSON body.

        Raises:
            RateLimitException: on HTTP 429.
            HttpClientException: on any other HTTP error status.
        """
        session = await self._get_session()
        access_token = await self._ensure_access_token()
        request_headers = self._build_headers(access_token, headers)
        try:
            response = await session.request(
                method, url, headers=request_headers, params=params, json=json
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            status_code = e.response.status_code
            if status_code == 429:
                raise RateLimitException() from e
            if status_code == 401:
                # The cached token is no longer accepted; forget it so the
                # next call requests a fresh one.
                self.access_token = None
            raise HttpClientException(url, status_code, str(e)) from e

    async def get(
        self,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Issue an authenticated GET to ``url`` and return the JSON response.

        Args:
            url: Full URL to request.
            headers: Extra headers merged over the defaults.
            params: Query-string parameters.

        Raises:
            RateLimitException: on HTTP 429.
            HttpClientException: on any other HTTP error status.
        """
        return await self._request("GET", url, headers=headers, params=params)

    async def post(
        self,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        json: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Issue an authenticated POST to ``url`` and return the JSON response.

        Args:
            url: Full URL to request.
            headers: Extra headers merged over the defaults.
            json: Payload sent as the JSON request body.

        Raises:
            RateLimitException: on HTTP 429.
            HttpClientException: on any other HTTP error status.
        """
        return await self._request("POST", url, headers=headers, json=json)

    async def close(self) -> None:
        """Close the underlying HTTP session if it is open."""
        if self.session and not self.session.is_closed:
            await self.session.aclose()