""" API helper utilities for reliable data fetching with retry logic """ import time import logging import functools import numpy as np from typing import Any, Dict, Optional, Callable, TypeVar, cast, Union import pandas as pd import requests from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryError ) # Set up logging logger = logging.getLogger("api_helpers") # Type variable for return type of functions T = TypeVar('T') def validate_dataframe(df: pd.DataFrame, required_columns: list, min_rows: int = 1) -> bool: """ Validate that a pandas DataFrame meets minimum requirements Args: df: DataFrame to validate required_columns: List of column names that must be present min_rows: Minimum number of rows required Returns: True if valid, False otherwise """ # Check if DataFrame is empty if df is None or df.empty or len(df) < min_rows: logger.warning(f"DataFrame validation failed: empty or too few rows (expected {min_rows}, got {0 if df is None or df.empty else len(df)})") return False # Check for required columns missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: logger.warning(f"DataFrame validation failed: missing columns {missing_columns}") return False return True def convert_numpy_types(obj: Any) -> Any: """ Convert numpy types to native Python types for JSON serialization Args: obj: Object that might contain numpy types Returns: Object with numpy types converted to Python types """ if isinstance(obj, np.integer): return int(obj) elif isinstance(obj, np.floating): return float(obj) elif isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, pd.DataFrame): return obj.to_dict(orient='records') elif isinstance(obj, pd.Series): return obj.to_dict() elif isinstance(obj, dict): return {k: convert_numpy_types(v) for k, v in obj.items()} elif isinstance(obj, list): return [convert_numpy_types(item) for item in obj] else: return obj def safe_api_call( func: Callable[..., T], max_retries: int = 3, backoff_factor: float = 2.0, timeout: int = 30, expected_exceptions: tuple = (requests.exceptions.RequestException,), validation_func: Optional[Callable[[T], bool]] = None ) -> Callable[..., Dict[str, Any]]: """ Decorator for safely making API calls with retries and error handling Args: func: Function to wrap max_retries: Maximum number of retry attempts backoff_factor: Exponential backoff factor timeout: Request timeout in seconds expected_exceptions: Exceptions to retry on validation_func: Optional function to validate the response Returns: Wrapped function that returns a dict with either data or error """ @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]: """ Wrapper function that adds retry logic and error handling Returns: Dictionary with either successful data or error information """ try: # Add the timeout parameter if it's a keyword argument in the original function if 'timeout' in kwargs: # Only override if not explicitly provided if kwargs['timeout'] is None: kwargs['timeout'] = timeout # Apply the retry decorator dynamically retried_func = retry( stop=stop_after_attempt(max_retries), wait=wait_exponential(multiplier=1, min=backoff_factor, max=backoff_factor * 10), retry=retry_if_exception_type(expected_exceptions), reraise=True )(func) # Call the function with retries result = retried_func(*args, **kwargs) # Validate result if validation function is provided if validation_func and not validation_func(result): return { "success": False, "error": "Data validation failed", "data": None } # Convert numpy types for JSON serialization result = convert_numpy_types(result) return { "success": True, "data": result, "error": None } except RetryError as e: # This means we exceeded max retries original_error = e.__cause__ logger.error(f"Max retries exceeded in {func.__name__}: {str(original_error)}") return { "success": False, "error": f"Max retries exceeded: {str(original_error)}", "data": None } except Exception as e: logger.error(f"Error in {func.__name__}: {str(e)}", exc_info=True) return { "success": False, "error": str(e), "data": None } return wrapper def with_exponential_backoff( max_retries: int = 3, backoff_factor: float = 2.0, expected_exceptions: tuple = (Exception,) ) -> Callable[[Callable[..., T]], Callable[..., T]]: """ Decorator for adding exponential backoff retry logic to any function Args: max_retries: Maximum number of retry attempts backoff_factor: Exponential backoff factor expected_exceptions: Exceptions to retry on Returns: Decorator function """ def decorator(func: Callable[..., T]) -> Callable[..., T]: @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any) -> T: """ Wrapper function that adds retry logic Returns: Result of the original function """ for attempt in range(max_retries): try: return func(*args, **kwargs) except expected_exceptions as e: if attempt == max_retries - 1: # Last attempt, re-raise the exception raise # Calculate wait time with exponential backoff wait_time = backoff_factor ** attempt logger.warning(f"Attempt {attempt + 1}/{max_retries} failed: {str(e)}. Retrying in {wait_time:.1f} seconds...") time.sleep(wait_time) # This should not be reached, but return a sensible default return cast(T, None) return wrapper return decorator def handle_api_result( result: Dict[str, Any], default_value: T, error_prefix: str = "API Error" ) -> Union[T, Dict[str, Any]]: """ Handle the result from a safe_api_call wrapped function Args: result: The result dictionary from safe_api_call default_value: Default value to return if the API call failed error_prefix: Prefix for error message Returns: Either the successful data or an error dictionary """ if result.get("success", False): return result.get("data", default_value) else: error_msg = f"{error_prefix}: {result.get('error', 'Unknown error')}" logger.error(error_msg) return { "error": error_msg, "data": default_value }