"""
API helper utilities for reliable data fetching with retry logic.
"""
|
|
import functools
import logging
import time
from typing import Any, Dict, Optional, Callable, TypeVar, cast, Union

import numpy as np
import pandas as pd
import requests
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    RetryError
)
|
|
|
|
|
|
|
|
# Module-level logger used by the helpers in this file. The fixed name lets an
# application configure it through logging.getLogger("api_helpers").
logger = logging.getLogger("api_helpers")

# Generic type variable for the return type of wrapped callables.
T = TypeVar('T')
|
|
|
|
|
def validate_dataframe(df: pd.DataFrame, required_columns: list, min_rows: int = 1) -> bool:
    """
    Validate that a pandas DataFrame meets minimum requirements

    Args:
        df: DataFrame to validate; ``None`` is always treated as invalid
        required_columns: List of column names that must be present
        min_rows: Minimum number of rows required. With ``min_rows <= 0`` an
            empty DataFrame passes the row check and only its columns are
            validated.

    Returns:
        True if valid, False otherwise (the reason is logged as a warning)
    """
    # ``DataFrame.empty`` is True when either axis has length 0, so a frame
    # with no usable data counts as 0 rows. Comparing this single count with
    # ``min_rows`` keeps the behaviour for ``min_rows >= 1`` while no longer
    # rejecting empty frames when the caller explicitly allows zero rows.
    row_count = 0 if df is None or df.empty else len(df)

    if df is None or row_count < min_rows:
        logger.warning(
            "DataFrame validation failed: empty or too few rows (expected %s, got %s)",
            min_rows,
            row_count,
        )
        return False

    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        logger.warning("DataFrame validation failed: missing columns %s", missing_columns)
        return False

    return True
|
|
|
|
|
def convert_numpy_types(obj: Any) -> Any:
    """
    Convert numpy types to native Python types for JSON serialization

    Handles numpy booleans, integers and floats, numpy arrays, pandas
    DataFrames (as a list of row dicts) and Series (as a dict), and recurses
    through dicts (keys and values), lists and tuples. Anything else is
    returned unchanged.

    Args:
        obj: Object that might contain numpy types

    Returns:
        Object with numpy types converted to Python types
    """
    # np.bool_ is not an np.integer subclass, so it needs its own branch;
    # without it json.dumps raises TypeError on numpy booleans.
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        as_list = obj.tolist()
        # tolist() already yields native scalars for numeric dtypes; only
        # object arrays can still hold numpy values and need a second pass.
        return convert_numpy_types(as_list) if obj.dtype == object else as_list
    if isinstance(obj, pd.DataFrame):
        return convert_numpy_types(obj.to_dict(orient='records'))
    if isinstance(obj, pd.Series):
        return convert_numpy_types(obj.to_dict())
    if isinstance(obj, dict):
        # Keys are converted too: json.dumps rejects np.int64 keys. Hashable
        # keys stay hashable, and numpy scalars hash like their Python
        # equivalents, so no entries are merged that were not already equal.
        return {convert_numpy_types(k): convert_numpy_types(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    if isinstance(obj, tuple):
        converted = [convert_numpy_types(item) for item in obj]
        # Rebuild namedtuples with their own constructor to keep field names.
        return type(obj)(*converted) if hasattr(obj, "_fields") else tuple(converted)
    return obj
|
|
|
|
|
def safe_api_call(
    func: Callable[..., T],
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    timeout: int = 30,
    expected_exceptions: tuple = (requests.exceptions.RequestException,),
    validation_func: Optional[Callable[[T], bool]] = None
) -> Callable[..., Dict[str, Any]]:
    """
    Decorator for safely making API calls with retries and error handling

    ``func`` is the first positional parameter, so this works as a bare
    ``@safe_api_call`` or as ``safe_api_call(func, max_retries=5)``; it is
    not a decorator factory.

    Args:
        func: Function to wrap
        max_retries: Maximum number of attempts in total (passed to
            ``stop_after_attempt``), including the first call
        backoff_factor: Lower bound, in seconds, of the exponential wait
            between attempts; the upper bound is ``backoff_factor * 10``
        timeout: Request timeout in seconds. It is only substituted when the
            caller passes ``timeout=None`` explicitly; it is never injected
            into calls that omit the keyword.
        expected_exceptions: Exceptions to retry on
        validation_func: Optional function to validate the response; it
            receives the raw result before numpy conversion

    Returns:
        Wrapped function that returns a dict with the keys ``success``,
        ``data`` and ``error``. Exceptions derived from ``Exception`` are
        reported through that dict instead of being raised.
    """
    # The retry policy depends only on the decorator arguments, so build the
    # retrying callable once instead of on every invocation.
    retried_func = retry(
        stop=stop_after_attempt(max_retries),
        wait=wait_exponential(multiplier=1, min=backoff_factor, max=backoff_factor * 10),
        retry=retry_if_exception_type(expected_exceptions),
        # reraise must be False: with reraise=True tenacity re-raises the last
        # exception itself, and the RetryError handler below can never run.
        reraise=False
    )(func)

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Wrapper function that adds retry logic and error handling

        Returns:
            Dictionary with either successful data or error information
        """
        try:
            # Fill in the default only when the caller passed timeout=None.
            if 'timeout' in kwargs and kwargs['timeout'] is None:
                kwargs['timeout'] = timeout

            result = retried_func(*args, **kwargs)

            if validation_func and not validation_func(result):
                return {
                    "success": False,
                    "error": "Data validation failed",
                    "data": None
                }

            # Make the payload JSON-friendly before handing it back.
            result = convert_numpy_types(result)

            return {
                "success": True,
                "data": result,
                "error": None
            }

        except RetryError as e:
            # Every attempt failed with a retryable exception; report the
            # exception raised by the final attempt.
            original_error = e.last_attempt.exception()
            logger.error(f"Max retries exceeded in {func.__name__}: {str(original_error)}")
            return {
                "success": False,
                "error": f"Max retries exceeded: {str(original_error)}",
                "data": None
            }

        except Exception as e:
            # Non-retryable failures (including errors raised by
            # validation_func) are logged with a traceback and reported.
            logger.error(f"Error in {func.__name__}: {str(e)}", exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "data": None
            }

    return wrapper
|
|
|
|
|
def with_exponential_backoff(
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    expected_exceptions: tuple = (Exception,)
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator for adding exponential backoff retry logic to any function

    Args:
        max_retries: Maximum number of attempts in total, including the first
            call. Values below 1 are treated as 1, so the wrapped function is
            always called at least once.
        backoff_factor: Base of the exponential wait; the pause after the
            n-th failed attempt is ``backoff_factor ** (n - 1)`` seconds
        expected_exceptions: Exceptions to retry on; any other exception
            propagates immediately

    Returns:
        Decorator function
    """
    # Guard against max_retries <= 0: an empty retry loop would skip the call
    # entirely and silently return None.
    total_attempts = max(1, max_retries)

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            """
            Wrapper function that adds retry logic

            Returns:
                Result of the original function

            Raises:
                The exception from the final attempt once all attempts failed
            """
            failures = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except expected_exceptions as e:
                    failures += 1
                    if failures >= total_attempts:
                        # Out of attempts: surface the last error unchanged.
                        raise

                    wait_time = backoff_factor ** (failures - 1)
                    logger.warning(f"Attempt {failures}/{total_attempts} failed: {str(e)}. Retrying in {wait_time:.1f} seconds...")
                    time.sleep(wait_time)

        return wrapper

    return decorator
|
|
|
|
|
def handle_api_result(
    result: Dict[str, Any],
    default_value: T,
    error_prefix: str = "API Error"
) -> Union[T, Dict[str, Any]]:
    """
    Handle the result from a safe_api_call wrapped function

    Args:
        result: The result dictionary from safe_api_call; ``None`` is treated
            as a failed call
        default_value: Value returned when a successful result has no
            ``data`` key, and embedded as ``data`` in the error dictionary
            when the call failed
        error_prefix: Prefix for error message

    Returns:
        Either the successful data or an error dictionary with the keys
        ``error`` and ``data``
    """
    # Tolerate a missing result instead of failing with AttributeError.
    result = result or {}

    if result.get("success", False):
        return result.get("data", default_value)

    # `or` also covers an explicit None or empty error string (for example
    # str() of an exception raised without a message).
    error_msg = f"{error_prefix}: {result.get('error') or 'Unknown error'}"
    logger.error(error_msg)
    return {
        "error": error_msg,
        "data": default_value
    }