BeatDebate / src /api /base_client.py
SulmanK's picture
Remove obsolete test files and refactor agent imports - Deleted unused test files for `test_fix.py` and `test_smart_context.py`. Updated agent imports in `__init__.py` and other files to streamline the codebase and improve maintainability. This cleanup supports ongoing refactoring efforts and enhances the overall structure of the project.
fec9e33
Raw
History Blame Contribute Delete
14.5 kB
"""
Base API Client
Provides unified HTTP request handling, rate limiting, and error handling
for all external API clients in the BeatDebate system.
Aligned with Phase 4 agent architecture using dependency injection patterns.
"""
import asyncio
import time
from typing import Dict, List, Optional, Any, Union
from abc import ABC, abstractmethod
import json
import aiohttp
import structlog
# Module-level structured logger; BaseAPIClient.__init__ binds per-service
# context (service, component, base_url) onto it for each client instance.
logger = structlog.get_logger(__name__)
class BaseAPIClient(ABC):
    """
    Base HTTP client with unified request handling, rate limiting, and error handling.

    All API clients (LastFM, Spotify, etc.) should inherit from this class to ensure
    consistent behavior across the system. Designed to work with dependency injection
    patterns used in our agent architecture.

    Subclasses must implement ``_extract_api_error`` and are expected to be used as
    an async context manager so the underlying ``aiohttp`` session is opened and
    closed deterministically.
    """

    class _TerminalRequestError(Exception):
        """Failure that must end the retry loop immediately (no further attempts)."""

    # HTTP methods whose parameters are sent as a JSON body instead of a query string.
    _BODY_METHODS = frozenset({"POST", "PUT", "PATCH"})

    # Upper bound (seconds) for any locally computed backoff delay.
    _MAX_BACKOFF_SECONDS = 60.0

    def __init__(
        self,
        base_url: str,
        rate_limiter: "UnifiedRateLimiter",
        timeout: int = 10,
        service_name: str = "api"
    ):
        """
        Initialize base API client.

        Args:
            base_url: Base URL for the API (a trailing slash is stripped)
            rate_limiter: Rate limiter instance for this client
            timeout: Request timeout in seconds
            service_name: Service name for logging and identification
        """
        self.base_url = base_url.rstrip('/')
        self.rate_limiter = rate_limiter
        self.timeout = timeout
        self.service_name = service_name
        # Created in __aenter__ and cleared again in __aexit__.
        self.session: Optional["aiohttp.ClientSession"] = None

        # Enhanced logging with service context (matches agent pattern)
        self.logger = logger.bind(
            service=service_name,
            component="BaseAPIClient",
            base_url=base_url
        )
        self.logger.debug("Base API client initialized", timeout=timeout)

    async def __aenter__(self):
        """Async context manager entry: open the HTTP session and return the client."""
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        )
        self.logger.debug("API client session started")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the HTTP session and forget it."""
        if self.session:
            try:
                await self.session.close()
            finally:
                # Drop the reference so later calls hit the "not initialized"
                # guard instead of using a closed session, and so
                # get_service_info() reports the session as inactive.
                self.session = None
            self.logger.debug("API client session closed")

    async def _make_request(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        method: str = "GET",
        headers: Optional[Dict[str, str]] = None,
        retries: int = 3
    ) -> Dict[str, Any]:
        """
        Make rate-limited HTTP request with error handling and retries.

        Parameters are sent as a JSON body for POST/PUT/PATCH and as query
        parameters for every other method. A 200 response to a HEAD request
        yields an empty dict because such responses carry no body.

        Args:
            endpoint: API endpoint (relative to base_url)
            params: Query parameters (or JSON body for POST/PUT/PATCH)
            method: HTTP method (GET, POST, etc.), case-insensitive
            headers: Additional headers; the mapping passed in is not modified
            retries: Number of retry attempts after the first try

        Returns:
            Parsed JSON response data

        Raises:
            RuntimeError: If the client is used outside its async context manager
            Exception: For unrecoverable errors (4xx other than 429, exhausted
                retries, timeouts or client errors on the final attempt)
        """
        if not self.session:
            error_msg = f"{self.service_name} client not initialized. Use async context manager."
            self.logger.error("Client not initialized")
            raise RuntimeError(error_msg)

        # Wait for rate limiter (shared pattern with agents)
        await self.rate_limiter.wait_if_needed()

        # Build full URL
        url = f"{self.base_url}/{endpoint.lstrip('/')}" if endpoint else self.base_url

        http_method = method.upper()
        sends_body = http_method in self._BODY_METHODS
        request_params = params or {}

        # Copy the headers so adding defaults never mutates the caller's dict.
        request_headers = dict(headers) if headers else {}
        request_headers.setdefault('User-Agent', f'BeatDebate-{self.service_name}/1.0')

        request_context = {
            "method": http_method,
            "endpoint": endpoint,
            "url": url,
            "param_count": len(request_params),
            "has_headers": bool(request_headers)
        }

        max_attempts = retries + 1
        exhausted_msg = f"{self.service_name} request failed after {max_attempts} attempts"

        for attempt in range(max_attempts):
            is_last_attempt = attempt == retries
            try:
                self.logger.debug(
                    "Making API request",
                    attempt=attempt + 1,
                    max_attempts=max_attempts,
                    **request_context
                )

                # Set only for 429 responses; None means "use exponential backoff".
                rate_limit_wait = None

                async with self.session.request(
                    method=http_method,
                    url=url,
                    params=None if sends_body else request_params,
                    json=request_params if sends_body else None,
                    headers=request_headers
                ) as response:
                    status = response.status

                    if status == 200:
                        if http_method == "HEAD":
                            # HEAD responses have no body, so there is nothing to parse.
                            self.logger.debug(
                                "API request successful",
                                endpoint=endpoint,
                                status=status,
                                response_size=0
                            )
                            return {}

                        data = await self._parse_response(response)

                        # Check for API-specific errors in response body.
                        # These go through the generic handler below and are
                        # retried, since some services report transient
                        # failures this way.
                        error_info = self._extract_api_error(data)
                        if error_info:
                            self.logger.error(
                                "API error in response body",
                                error=error_info,
                                endpoint=endpoint,
                                status=status
                            )
                            raise Exception(f"{self.service_name} API error: {error_info}")

                        self.logger.debug(
                            "API request successful",
                            endpoint=endpoint,
                            status=status,
                            response_size=len(str(data)) if data else 0
                        )
                        return data

                    if status == 429:
                        if is_last_attempt:
                            # No attempt is left, so sleeping would only delay the failure.
                            self.logger.error(
                                "Request failed after all retries",
                                final_status=status,
                                endpoint=endpoint,
                                total_attempts=max_attempts
                            )
                            raise self._TerminalRequestError(exhausted_msg)

                        rate_limit_wait = await self._calculate_backoff_time(response, attempt)
                        self.logger.warning(
                            "Rate limited - backing off",
                            attempt=attempt + 1,
                            wait_time=rate_limit_wait,
                            endpoint=endpoint,
                            retry_after=response.headers.get('Retry-After')
                        )
                    else:
                        # Raises a terminal error for non-retryable 4xx statuses.
                        await self._handle_http_error(response, endpoint, attempt, retries)
                        if is_last_attempt:
                            self.logger.error(
                                "Request failed after all retries",
                                final_status=status,
                                endpoint=endpoint,
                                total_attempts=max_attempts
                            )
                            raise self._TerminalRequestError(exhausted_msg)

                # The response has been released here, so the connection is not
                # held open while we wait for the next attempt.
                if rate_limit_wait is not None:
                    await asyncio.sleep(rate_limit_wait)
                else:
                    await self._exponential_backoff(attempt)

            except self._TerminalRequestError:
                # Must come first: these failures are final and must not be
                # swallowed and retried by the generic handlers below.
                raise

            except asyncio.TimeoutError as exc:
                self.logger.warning(
                    "Request timeout",
                    attempt=attempt + 1,
                    endpoint=endpoint,
                    timeout=self.timeout
                )
                if is_last_attempt:
                    raise Exception(
                        f"{self.service_name} request timed out after {max_attempts} attempts"
                    ) from exc
                await self._exponential_backoff(attempt)

            except aiohttp.ClientError as exc:
                self.logger.error(
                    "HTTP client error",
                    error=str(exc),
                    error_type=type(exc).__name__,
                    attempt=attempt + 1,
                    endpoint=endpoint
                )
                if is_last_attempt:
                    raise Exception(f"{self.service_name} client error: {str(exc)}") from exc
                await self._exponential_backoff(attempt)

            except Exception as exc:
                self.logger.error(
                    "Unexpected error during request",
                    error=str(exc),
                    error_type=type(exc).__name__,
                    attempt=attempt + 1,
                    endpoint=endpoint
                )
                if is_last_attempt:
                    raise
                await self._exponential_backoff(attempt)

        # Only reachable when `retries` is negative, i.e. no attempt was made.
        raise Exception(exhausted_msg)

    async def _parse_response(self, response: "aiohttp.ClientResponse") -> Dict[str, Any]:
        """
        Parse API response. Can be overridden by subclasses for custom parsing.

        Args:
            response: HTTP response object

        Returns:
            Parsed response data

        Raises:
            Exception: If the body is not valid JSON or is not served as JSON
        """
        try:
            return await response.json()
        except (json.JSONDecodeError, aiohttp.ContentTypeError) as exc:
            # aiohttp raises ContentTypeError when the response is not served
            # with a JSON content type; treat it like malformed JSON.
            self.logger.error(f"{self.service_name} invalid JSON response", error=str(exc))
            raise Exception(f"{self.service_name} returned invalid JSON") from exc

    @abstractmethod
    def _extract_api_error(self, data: Dict[str, Any]) -> Optional[str]:
        """
        Extract API-specific error information from response data.
        Must be implemented by subclasses.

        Args:
            data: Parsed response data

        Returns:
            Error message if found, None otherwise
        """
        pass

    async def _calculate_backoff_time(
        self,
        response: "aiohttp.ClientResponse",
        attempt: int
    ) -> float:
        """
        Calculate backoff time for rate limiting.

        A numeric ``Retry-After`` header is honoured when it is a finite,
        non-negative number of seconds. Anything else (missing header,
        HTTP-date form, negative, ``inf``/``nan``) falls back to exponential
        backoff capped at 60 seconds.

        Args:
            response: HTTP response with rate limit information
            attempt: Current attempt number (0-based)

        Returns:
            Wait time in seconds
        """
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            try:
                requested = float(retry_after)
            except (TypeError, ValueError):
                requested = None
            # The chained comparison is False for NaN and excludes +inf, so an
            # unusable value can never reach asyncio.sleep().
            if requested is not None and 0 <= requested < float("inf"):
                return requested

        # Default exponential backoff
        return float(min(2 ** attempt, self._MAX_BACKOFF_SECONDS))

    async def _handle_http_error(
        self,
        response: "aiohttp.ClientResponse",
        endpoint: str,
        attempt: int,
        max_retries: int
    ):
        """
        Handle HTTP error responses.

        Logs the failure and, for 4xx statuses other than 429, raises an error
        that ``_make_request`` treats as final (it is not retried).

        Args:
            response: HTTP response object
            endpoint: Request endpoint
            attempt: Current attempt number (0-based)
            max_retries: Maximum retry attempts

        Raises:
            Exception: For 4xx statuses other than 429
        """
        self.logger.warning(
            f"{self.service_name} HTTP error",
            status=response.status,
            endpoint=endpoint,
            attempt=attempt + 1,
            max_retries=max_retries + 1
        )

        # For 4xx errors (except 429), don't retry: repeating the same request
        # cannot succeed, so signal a terminal failure.
        if 400 <= response.status < 500 and response.status != 429:
            raise self._TerminalRequestError(
                f"{self.service_name} client error: {response.status}"
            )

    async def _exponential_backoff(self, attempt: int, base_delay: float = 1.0):
        """
        Implement exponential backoff with jitter.

        Sleeps for ``base_delay * 2**attempt`` plus 10-30% random jitter,
        capped at 60 seconds.

        Args:
            attempt: Current attempt number (0-based)
            base_delay: Base delay in seconds
        """
        import random

        delay = base_delay * (2 ** attempt)
        # Jitter spreads out retries from clients that failed at the same time.
        jitter = random.uniform(0.1, 0.3) * delay
        total_delay = min(delay + jitter, self._MAX_BACKOFF_SECONDS)

        self.logger.debug(
            "Backing off before retry",
            attempt=attempt + 1,
            delay=total_delay,
            base_delay=base_delay
        )
        await asyncio.sleep(total_delay)

    async def health_check(self) -> Dict[str, Any]:
        """
        Perform health check for this API client.
        Supports dependency injection pattern used by agents for service monitoring.

        Issues a HEAD request against the base URL. Subclasses should override
        this for service-specific health checks.

        Returns:
            Health status information; never raises, failures are reported in
            the returned dict
        """
        try:
            if not self.session:
                return {
                    "service": self.service_name,
                    "status": "not_initialized",
                    "healthy": False,
                    "message": "Client session not initialized"
                }

            # Monotonic clock: the measured duration is unaffected by
            # wall-clock adjustments.
            started = time.monotonic()
            await self._make_request("", method="HEAD", retries=1)
            elapsed = time.monotonic() - started

            return {
                "service": self.service_name,
                "status": "healthy",
                "healthy": True,
                "response_time_ms": int(elapsed * 1000),
                "base_url": self.base_url,
                "rate_limiter": "active"
            }
        except Exception as e:
            self.logger.warning("Health check failed", error=str(e))
            return {
                "service": self.service_name,
                "status": "unhealthy",
                "healthy": False,
                "error": str(e),
                "base_url": self.base_url
            }

    def get_service_info(self) -> Dict[str, Any]:
        """
        Get service information for dependency injection and monitoring.

        Returns:
            Service configuration and status information
        """
        return {
            "service_name": self.service_name,
            "base_url": self.base_url,
            "timeout": self.timeout,
            "session_active": self.session is not None,
            "rate_limiter_type": type(self.rate_limiter).__name__,
            "component_type": "BaseAPIClient"
        }