| """ |
| HTTP utility functions for making synchronous and asynchronous HTTP requests. |
| |
| This module provides a unified interface for HTTP operations using httpx, |
| with proper error handling, timeout configuration, and retry logic. |
| """ |
|
|
| import asyncio |
| import os |
| import time |
| import uuid |
| from typing import Any, Dict, Optional |
|
|
| import httpx |
|
|
| from utils.bio_logger import bio_logger as logger |
|
|
|
|
class HTTPError(Exception):
    """Custom exception for HTTP-related errors.

    Attributes are set from the constructor arguments:

    * ``status_code`` - HTTP status code of the failed response.
    * ``message`` - human-readable description of the failure.
    * ``url`` - URL the request was sent to.

    ``str(exc)`` renders as ``"HTTP <status_code>: <message> for <url>"``.
    """

    def __init__(self, status_code: int, message: str, url: str):
        self.status_code = status_code
        self.message = message
        self.url = url
        super().__init__(f"HTTP {status_code}: {message} for {url}")

    def __reduce__(self):
        # The default exception reduce protocol rebuilds the object from
        # ``self.args``, which holds only the single formatted string and
        # therefore does not match the three-argument constructor. Supplying
        # the real constructor arguments keeps pickle / copy / multiprocessing
        # round-trips working.
        return (type(self), (self.status_code, self.message, self.url))
|
|
|
|
def _create_timeout(timeout: float = 10.0) -> httpx.Timeout:
    """Build the httpx timeout object used by the request helpers.

    ``timeout`` is passed as the general timeout value, while the connect
    timeout is always set to 5 seconds.
    """
    timeout_settings = httpx.Timeout(timeout, connect=5.0)
    return timeout_settings
|
|
|
|
def _handle_response(response: httpx.Response, url: str) -> Any:
    """Return the decoded JSON body of a 200 response, otherwise raise.

    Any status code other than 200 is logged as an error and reported to the
    caller as an HTTPError carrying that status code and ``url``.
    """
    request_failed = response.status_code != 200

    if request_failed:
        logger.error(f"HTTP request failed: {response.status_code} for {url}")
        raise HTTPError(
            status_code=response.status_code,
            message=f"Request failed with status {response.status_code}",
            url=url,
        )

    return response.json()
|
|
|
|
async def async_http_get(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    timeout: float = 10.0,
    headers: Optional[Dict[str, str]] = None,
) -> Any:
    """
    Make an asynchronous HTTP GET request.

    Args:
        url: The URL to make the request to
        params: Query parameters to include in the request
        timeout: Request timeout in seconds
        headers: Optional headers to include in the request

    Returns:
        The JSON response from the server

    Raises:
        HTTPError: If the response status code is not 200
        httpx.RequestError: If there's a network error
    """
    timeout_config = _create_timeout(timeout)
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or turn negative) when the system wall clock is adjusted mid-request.
    start_time = time.perf_counter()

    async with httpx.AsyncClient(timeout=timeout_config) as client:
        response = await client.get(url=url, params=params, headers=headers)
        duration = time.perf_counter() - start_time

    # Record the call before status handling so failed responses are logged too.
    logger.log_api_call("GET", url, response.status_code, duration)

    return _handle_response(response, url)
|
|
|
|
def http_get(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    timeout: float = 10.0,
    headers: Optional[Dict[str, str]] = None,
) -> Any:
    """
    Make a synchronous HTTP GET request.

    Args:
        url: The URL to make the request to
        params: Query parameters to include in the request
        timeout: Request timeout in seconds
        headers: Optional headers to include in the request

    Returns:
        The JSON response from the server

    Raises:
        HTTPError: If the response status code is not 200
        httpx.RequestError: If there's a network error
    """
    timeout_config = _create_timeout(timeout)
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or turn negative) when the system wall clock is adjusted mid-request.
    start_time = time.perf_counter()

    with httpx.Client(timeout=timeout_config) as client:
        response = client.get(url=url, params=params, headers=headers)
        duration = time.perf_counter() - start_time

    # Record the call before status handling so failed responses are logged too.
    logger.log_api_call("GET", url, response.status_code, duration)

    return _handle_response(response, url)
|
|
|
|
def http_post(
    url: str, data: Any, headers: Optional[Dict[str, Any]] = None, timeout: float = 10.0
) -> Any:
    """
    Make a synchronous HTTP POST request.

    Args:
        url: The URL to make the request to
        data: The data to send in the request body (sent as JSON)
        headers: Optional headers to include in the request
        timeout: Request timeout in seconds

    Returns:
        The JSON response from the server

    Raises:
        HTTPError: If the response status code is not 200
        httpx.RequestError: If there's a network error
    """
    timeout_config = _create_timeout(timeout)
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or turn negative) when the system wall clock is adjusted mid-request.
    start_time = time.perf_counter()

    with httpx.Client(timeout=timeout_config) as client:
        response = client.post(url=url, json=data, headers=headers)
        duration = time.perf_counter() - start_time

    # Record the call before status handling so failed responses are logged too.
    logger.log_api_call("POST", url, response.status_code, duration)

    return _handle_response(response, url)
|
|
|
|
async def async_http_post(
    url: str,
    data: Any,
    headers: Optional[Dict[str, Any]] = None,
    timeout: float = 10.0,
    max_retries: int = 3,
    retry_delay: float = 0.5,
) -> Any:
    """
    Make an asynchronous HTTP POST request with retry logic.

    Every non-200 response and every network error counts as a failed
    attempt. Failed attempts are retried after ``retry_delay`` seconds until
    ``max_retries`` attempts have been made.

    Args:
        url: The URL to make the request to
        data: The data to send in the request body (sent as JSON)
        headers: Optional headers to include in the request
        timeout: Request timeout in seconds
        max_retries: Maximum number of attempts; if less than 1, no request
            is sent and HTTPError is raised immediately
        retry_delay: Delay between retries in seconds

    Returns:
        The JSON response from the server

    Raises:
        HTTPError: If the last attempt returns a non-200 status (carrying
            that status code), or if the last attempt fails with a network
            error. Network errors (httpx.RequestError) are not propagated
            directly: they are wrapped in an HTTPError with status_code 0
            and chained as its ``__cause__``.
    """
    timeout_config = _create_timeout(timeout)

    async with httpx.AsyncClient(timeout=timeout_config) as client:
        for attempt in range(1, max_retries + 1):
            is_last_attempt = attempt >= max_retries

            # Only the request itself can raise httpx.RequestError, so keep
            # the try body limited to it.
            try:
                # perf_counter() is monotonic; wall-clock adjustments cannot
                # distort the measured duration.
                start_time = time.perf_counter()
                response = await client.post(url=url, json=data, headers=headers)
                duration = time.perf_counter() - start_time
            except httpx.RequestError as e:
                logger.error(f"Network error on attempt {attempt}: {e}")
                if is_last_attempt:
                    raise HTTPError(
                        status_code=0,
                        message=f"Network error after {max_retries} attempts: {str(e)}",
                        url=url,
                    ) from e
                await asyncio.sleep(retry_delay)
                continue

            logger.log_api_call("POST", url, response.status_code, duration)

            if response.status_code == 200:
                return response.json()

            logger.error(
                f"HTTP POST failed (attempt {attempt}/{max_retries}): "
                f"{response.status_code} for {url}"
            )
            if is_last_attempt:
                raise HTTPError(
                    status_code=response.status_code,
                    message=f"Request failed after {max_retries} attempts",
                    url=url,
                )
            await asyncio.sleep(retry_delay)

    # Reached only when the loop body never ran (max_retries < 1).
    raise HTTPError(
        status_code=0,
        message=f"Failed to fetch data from {url} after {max_retries} attempts",
        url=url,
    )
|
|
|
|
def download_file(
    file_url: str, directory_path: str, timeout: int = 60, verify_ssl: bool = True
) -> Optional[str]:
    """
    Download a file from a URL to a local directory.

    The file is saved under a random UUID-based name. Its extension is taken
    from the path component of the URL only, so query strings, fragments and
    dots in the host name never end up in the filename. A URL without an
    extension produces a filename without one. If the download fails after
    the local file was opened, the partial file is removed.

    Args:
        file_url: The URL of the file to download
        directory_path: The directory to save the file in (created if missing)
        timeout: Request timeout in seconds
        verify_ssl: Whether to verify SSL certificates

    Returns:
        The path to the downloaded file, or None if download failed

    Raises:
        OSError: If the target directory cannot be created
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    try:
        url_path = urlsplit(file_url).path
    except ValueError:
        # Malformed URL: fall back to no extension and let the request below
        # fail and report the problem through the normal error path.
        url_path = ""
    # splitext() only inspects the last path segment, so the extension can
    # never contain a "/" that would redirect the file into a subdirectory.
    file_extension = os.path.splitext(url_path)[1]
    random_filename = f"{uuid.uuid4()}{file_extension}"

    os.makedirs(directory_path, exist_ok=True)
    file_path = os.path.join(directory_path, random_filename)

    try:
        with httpx.Client(timeout=timeout, verify=verify_ssl) as client:
            with client.stream("GET", file_url) as response:
                if response.status_code != 200:
                    logger.error(
                        f"Download failed with status code: {response.status_code}"
                    )
                    # Nothing has been written yet, so there is nothing to clean up.
                    return None
                with open(file_path, "wb") as file:
                    for chunk in response.iter_bytes(chunk_size=8192):
                        file.write(chunk)
        logger.info(f"Successfully downloaded file to {file_path}")
        return file_path
    except httpx.TimeoutException:
        logger.error("Download request timed out")
    except httpx.RequestError as e:
        logger.error(f"Download request failed: {e}")
    except Exception as e:
        # Deliberate catch-all: this helper reports failure by returning None.
        logger.error(f"Unexpected error during download: {e}")

    # Only reached after an exception. The filename is a fresh UUID, so this
    # can only ever delete a partial file written by this call.
    try:
        os.remove(file_path)
    except OSError:
        # Best-effort cleanup; the file may never have been created.
        pass
    return None
|
|
|
|
| |
async def async_http_post_legacy(url: str, params: dict) -> Any:
    """
    Backward-compatible wrapper around async_http_post.

    Keeps the old ``(url, params)`` call shape: ``params`` is forwarded as the
    ``data`` argument and every other option keeps async_http_post's default.
    """
    result = await async_http_post(url=url, data=params)
    return result
|
|