Spaces:
Sleeping
Sleeping
| """HTTP handler for LLM providers. | |
| This module provides a generic HTTP handler for making requests to LLM providers | |
| when direct SDK usage is not desired. | |
| """ | |
| import json | |
| import asyncio | |
| import random | |
| import time | |
| from typing import Any, Dict, List, Optional, Union, Generator, AsyncGenerator | |
| import requests | |
| from requests import HTTPError | |
| from aworld.logs.util import logger | |
| from aworld.utils import import_package | |
class LLMHTTPHandler:
    """HTTP handler for LLM providers.

    This class provides methods to make HTTP requests to LLM providers
    instead of using their SDKs directly.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str,
        model_name: str,
        headers: Optional[Dict[str, str]] = None,
        timeout: int = 180,
        max_retries: int = 3,
    ) -> None:
        """Initialize the HTTP handler.

        Args:
            base_url: Base URL for the LLM API (trailing slashes are stripped).
            api_key: API key used for Bearer authentication.
            model_name: Name of the model to use.
            headers: Additional headers merged over the defaults.
            timeout: Request timeout in seconds.
            max_retries: Maximum number of retries for failed requests.
        """
        # Fail fast if the async HTTP dependency is missing.
        import_package("aiohttp")

        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.model_name = model_name
        self.timeout = timeout
        self.max_retries = max_retries

        # Defaults first, then caller-supplied headers win on key conflicts.
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
            **(headers or {}),
        }
| def _parse_sse_line(self, line: bytes) -> Optional[Dict[str, Any]]: | |
| """Parse a Server-Sent Events (SSE) line. | |
| Args: | |
| line: Raw SSE line. | |
| Returns: | |
| Parsed JSON data if successful, None otherwise. | |
| """ | |
| try: | |
| # Remove 'data: ' prefix if present | |
| line_str = line.decode('utf-8').strip() | |
| if line_str.startswith('data: '): | |
| line_str = line_str[6:] | |
| # Skip empty lines | |
| if not line_str: | |
| return None | |
| return json.loads(line_str) | |
| except (json.JSONDecodeError, UnicodeDecodeError) as e: | |
| logger.warning(f"Failed to parse SSE line: {line}, error: {str(e)}") | |
| return None | |
| def _make_request( | |
| self, | |
| endpoint: str, | |
| data: Dict[str, Any], | |
| stream: bool = False, | |
| headers: Optional[Dict[str, str]] = None, | |
| ) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]: | |
| """Make a synchronous HTTP request. | |
| Args: | |
| endpoint: API endpoint to call. | |
| data: Request data to send. | |
| stream: Whether to stream the response. | |
| Returns: | |
| Response data or generator of response chunks. | |
| Raises: | |
| requests.exceptions.RequestException: If the request fails. | |
| """ | |
| url = f"{self.base_url}/{endpoint.lstrip('/')}" | |
| request_headers = self.headers.copy() | |
| if headers: | |
| request_headers.update(headers) | |
| try: | |
| if stream: | |
| response = requests.post( | |
| url, | |
| headers=request_headers, | |
| json=data, | |
| stream=True, | |
| timeout=self.timeout, | |
| ) | |
| response.raise_for_status() | |
| def generate_chunks(): | |
| for line in response.iter_lines(): | |
| if line: | |
| line_str = line.decode('utf-8').strip() | |
| if line_str.startswith('data: '): | |
| line_content = line_str[6:] | |
| if line_content == "[DONE]": | |
| yield {"status": "done", "message": "Stream completed"} | |
| break | |
| elif line_content == "[REVOKE]": | |
| yield {"status": "revoke", "message": "Content should be revoked"} | |
| continue | |
| elif line_content == "[FAIL]": | |
| yield {"status": "fail", "message": "Request failed"} | |
| break | |
| elif line_content.startswith("[FAIL]_stream was reset: CANCEL"): | |
| yield {"status": "cancel", "message": "Stream was cancelled"} | |
| break | |
| chunk = self._parse_sse_line(line) | |
| if chunk is not None: | |
| yield chunk | |
| return generate_chunks() | |
| else: | |
| response = requests.post( | |
| url, | |
| headers=request_headers, | |
| json=data, | |
| timeout=self.timeout, | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"Error in HttpHandler: {str(e)}") | |
| raise | |
| async def _make_async_request_stream( | |
| self, | |
| endpoint: str, | |
| data: Dict[str, Any], | |
| headers: Optional[Dict[str, str]] = None, | |
| ) -> AsyncGenerator[Dict[str, Any], None]: | |
| """Make an asynchronous streaming HTTP request. | |
| Args: | |
| endpoint: API endpoint to call. | |
| data: Request data to send. | |
| Yields: | |
| Response chunks. | |
| Raises: | |
| aiohttp.ClientError: If the request fails. | |
| """ | |
| import aiohttp | |
| url = f"{self.base_url}/{endpoint.lstrip('/')}" | |
| request_headers = self.headers.copy() | |
| if headers: | |
| request_headers.update(headers) | |
| # Create an independent session and keep it open | |
| session = aiohttp.ClientSession() | |
| try: | |
| response = await session.post( | |
| url, | |
| headers=request_headers, | |
| json=data, | |
| timeout=self.timeout, | |
| ) | |
| response.raise_for_status() | |
| # Implement async generator directly | |
| async for line in response.content: | |
| if line: | |
| line_str = line.decode('utf-8').strip() | |
| if line_str.startswith('data: '): | |
| line_content = line_str[6:] | |
| if line_content == "[DONE]": | |
| yield {"status": "done", "message": "Stream completed"} | |
| break | |
| elif line_content == "[REVOKE]": | |
| yield {"status": "revoke", "message": "Content should be revoked"} | |
| continue | |
| elif line_content == "[FAIL]": | |
| yield {"status": "fail", "message": "Request failed"} | |
| break | |
| elif line_content.startswith("[FAIL]_stream was reset: CANCEL"): | |
| yield {"status": "cancel", "message": "Stream was cancelled"} | |
| break | |
| chunk = self._parse_sse_line(line) | |
| if chunk is not None: | |
| yield chunk | |
| except Exception as e: | |
| logger.error(f"Error in stream: {str(e)}") | |
| raise | |
| finally: | |
| # Ensure the session is eventually closed | |
| await session.close() | |
| async def _make_async_request( | |
| self, | |
| endpoint: str, | |
| data: Dict[str, Any], | |
| headers: Optional[Dict[str, str]] = None, | |
| ) -> Dict[str, Any]: | |
| """Make an asynchronous non-streaming HTTP request. | |
| Args: | |
| endpoint: API endpoint to call. | |
| data: Request data to send. | |
| Returns: | |
| Response data. | |
| Raises: | |
| aiohttp.ClientError: If the request fails. | |
| """ | |
| import aiohttp | |
| url = f"{self.base_url}/{endpoint.lstrip('/')}" | |
| request_headers = self.headers.copy() | |
| if headers: | |
| request_headers.update(headers) | |
| async with aiohttp.ClientSession() as session: | |
| async with session.post( | |
| url, | |
| headers=request_headers, | |
| json=data, | |
| timeout=self.timeout, | |
| ) as response: | |
| response.raise_for_status() | |
| return await response.json() | |
| def sync_call( | |
| self, | |
| data: Dict[str, Any], | |
| endpoint: str = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| ) -> Dict[str, Any]: | |
| """Make a synchronous completion request. | |
| Args: | |
| data: Request data. | |
| Returns: | |
| Response data. | |
| """ | |
| logger.debug(f"sync_call request data: {data}") | |
| if not endpoint: | |
| endpoint = "chat/completions" | |
| retries = 0 | |
| while retries < self.max_retries: | |
| try: | |
| response = self._make_request(endpoint, data, headers=headers) | |
| return response | |
| except Exception as e: | |
| last_error = e | |
| retries += 1 | |
| if retries < self.max_retries: | |
| logger.warning(f"Request failed, retrying ({retries}/{self.max_retries}): {str(e)}") | |
| # Exponential backoff with jitter | |
| backoff = min(2 ** retries + random.uniform(0, 1), 10) | |
| time.sleep(backoff) | |
| else: | |
| logger.error(f"Request failed after {self.max_retries} retries: {str(e)}") | |
| raise last_error | |
| async def async_call( | |
| self, | |
| data: Dict[str, Any], | |
| endpoint: str = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| ) -> Dict[str, Any]: | |
| """Make an asynchronous completion request. | |
| Args: | |
| data: Request data. | |
| Returns: | |
| Response data. | |
| """ | |
| import aiohttp | |
| logger.info(f"async_call request data: {data}") | |
| retries = 0 | |
| last_error = None | |
| if not endpoint: | |
| endpoint = "chat/completions" | |
| while retries < self.max_retries: | |
| try: | |
| response = await self._make_async_request(endpoint, data, headers=headers) | |
| return response | |
| except (aiohttp.ClientError, asyncio.TimeoutError) as e: | |
| last_error = e | |
| retries += 1 | |
| if retries < self.max_retries: | |
| logger.warning(f"Request failed, retrying ({retries}/{self.max_retries}): {str(e)}") | |
| # Exponential backoff with jitter | |
| backoff = min(2 ** retries + random.uniform(0, 1), 10) | |
| await asyncio.sleep(backoff) | |
| else: | |
| logger.error(f"Request failed after {self.max_retries} retries: {str(e)}") | |
| raise last_error | |
| def sync_stream_call( | |
| self, | |
| data: Dict[str, Any], | |
| endpoint: str = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| ) -> Generator[Dict[str, Any], None, None]: | |
| """Make a synchronous streaming completion request. | |
| Args: | |
| data: Request data. | |
| Yields: | |
| Response chunks. | |
| """ | |
| data["stream"] = True | |
| logger.info(f"sync_stream_call request data: {data}") | |
| retries = 0 | |
| while retries < self.max_retries: | |
| try: | |
| for chunk in self._make_request(endpoint or "chat/completions", data, stream=True, headers=headers): | |
| yield chunk | |
| return # Exit after completing stream processing | |
| except Exception as e: | |
| last_error = e | |
| retries += 1 | |
| if retries < self.max_retries: | |
| logger.warning(f"Stream connection failed, retrying ({retries}/{self.max_retries}): {str(e)}") | |
| else: | |
| logger.error(f"Stream connection failed after {self.max_retries} retries: {str(e)}") | |
| raise last_error | |
| async def async_stream_call( | |
| self, | |
| data: Dict[str, Any], | |
| endpoint: str = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| ) -> AsyncGenerator[Dict[str, Any], None]: | |
| """Make an asynchronous streaming completion request. | |
| Args: | |
| data: Request data. | |
| Yields: | |
| Response chunks. | |
| """ | |
| import aiohttp | |
| data["stream"] = True | |
| logger.info(f"async_stream_call request data: {data}") | |
| retries = 0 | |
| last_error = None | |
| while retries < self.max_retries: | |
| try: | |
| async for chunk in self._make_async_request_stream(endpoint or "chat/completions", data, headers=headers): | |
| yield chunk | |
| return # Exit after completing stream processing | |
| except (aiohttp.ClientError, asyncio.TimeoutError) as e: | |
| last_error = e | |
| retries += 1 | |
| if retries < self.max_retries: | |
| logger.warning(f"Stream connection failed, retrying ({retries}/{self.max_retries}): {str(e)}") | |
| await asyncio.sleep(1) # Wait one second before retrying | |
| else: | |
| logger.error(f"Stream connection failed after {self.max_retries} retries: {str(e)}") | |
| raise last_error | |