File size: 7,758 Bytes
eb27803
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""
API helper utilities for reliable data fetching with retry logic
"""
import time
import logging
import functools
import numpy as np
from typing import Any, Dict, Optional, Callable, TypeVar, cast, Union
import pandas as pd
import requests
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    RetryError
)

# Set up logging
logger = logging.getLogger("api_helpers")

# Type variable for return type of functions
T = TypeVar('T')

def validate_dataframe(df: pd.DataFrame, required_columns: list, min_rows: int = 1) -> bool:
    """
    Check that a DataFrame has enough rows and all required columns

    A warning is logged describing the first check that fails.

    Args:
        df: DataFrame to inspect; None is treated as a failed validation
        required_columns: Column names that must all be present in df
        min_rows: Smallest acceptable number of rows

    Returns:
        True when every check passes, False otherwise
    """
    # Row check: None, an empty frame, or fewer than min_rows rows all fail
    too_small = df is None or df.empty or len(df) < min_rows
    if too_small:
        logger.warning(f"DataFrame validation failed: empty or too few rows (expected {min_rows}, got {0 if df is None or df.empty else len(df)})")
        return False

    # Column check: collect absent names in the order they were requested
    available = df.columns
    missing_columns = [name for name in required_columns if name not in available]
    if not missing_columns:
        return True

    logger.warning(f"DataFrame validation failed: missing columns {missing_columns}")
    return False

def convert_numpy_types(obj: Any) -> Any:
    """
    Convert numpy/pandas values to native Python types for JSON serialization

    Containers (dict, list, tuple) are walked recursively. Numpy scalar
    dictionary keys are converted as well, because json.dumps rejects
    keys such as np.int64.

    Args:
        obj: Object that might contain numpy types

    Returns:
        Object with numpy types converted to Python types. Values of any
        type not handled here are returned unchanged.
    """
    # np.bool_ is not a subclass of np.integer, so it needs its own branch
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        as_list = obj.tolist()
        # tolist() already yields Python scalars for numeric dtypes; only
        # object arrays can still hold numpy values that need another pass
        return convert_numpy_types(as_list) if obj.dtype == object else as_list
    if isinstance(obj, pd.DataFrame):
        # Re-walk the records in case to_dict left numpy scalars behind
        return convert_numpy_types(obj.to_dict(orient='records'))
    if isinstance(obj, pd.Series):
        return convert_numpy_types(obj.to_dict())
    if isinstance(obj, dict):
        return {
            (k.item() if isinstance(k, np.generic) else k): convert_numpy_types(v)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    if isinstance(obj, tuple):
        items = [convert_numpy_types(item) for item in obj]
        # namedtuples are rebuilt from positional fields; plain tuples from an iterable
        return type(obj)(*items) if hasattr(obj, "_fields") else tuple(items)
    return obj

def safe_api_call(
    func: Callable[..., T],
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    timeout: int = 30,
    expected_exceptions: tuple = (requests.exceptions.RequestException,),
    validation_func: Optional[Callable[[T], bool]] = None
) -> Callable[..., Dict[str, Any]]:
    """
    Wrap a function so it is retried on failure and reports errors as data

    The returned wrapper catches every Exception and always returns a dict
    with the keys "success", "data" and "error" instead of raising.

    Args:
        func: Function to wrap
        max_retries: Total number of attempts made before giving up
            (passed to tenacity's stop_after_attempt)
        backoff_factor: Lower bound, in seconds, of the exponentially
            growing wait between attempts; the upper bound is ten times
            this value
        timeout: Value substituted for the ``timeout`` keyword argument,
            but only when the caller passes ``timeout=None`` explicitly.
            It is not injected when the keyword is absent.
        expected_exceptions: Exception types that trigger a retry; any
            other exception fails immediately without retrying
        validation_func: Optional predicate applied to the raw result;
            a falsy return marks the call as failed (it is not retried)

    Returns:
        Wrapped function that returns a dict with either data or error
    """
    # Resolved once so logging also works for callables without __name__
    # (e.g. functools.partial objects)
    func_name = getattr(func, "__name__", repr(func))

    # Build the retrying callable once rather than on every invocation.
    # reraise is left at its default so that exhausting all attempts raises
    # RetryError, which the handler below turns into a distinct
    # "Max retries exceeded" result. Exceptions outside expected_exceptions
    # still propagate unchanged and land in the generic handler.
    retried_func = retry(
        stop=stop_after_attempt(max_retries),
        wait=wait_exponential(multiplier=1, min=backoff_factor, max=backoff_factor * 10),
        retry=retry_if_exception_type(expected_exceptions)
    )(func)

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Call the wrapped function with retries and package the outcome

        Returns:
            Dictionary with either successful data or error information
        """
        try:
            # Fill in the default timeout only when the caller explicitly
            # passed timeout=None; an absent or concrete value is untouched
            if 'timeout' in kwargs and kwargs['timeout'] is None:
                kwargs['timeout'] = timeout

            # Call the function with retries
            result = retried_func(*args, **kwargs)

            # Validate result if validation function is provided
            if validation_func and not validation_func(result):
                return {
                    "success": False,
                    "error": "Data validation failed",
                    "data": None
                }

            # Convert numpy types for JSON serialization
            result = convert_numpy_types(result)

            return {
                "success": True,
                "data": result,
                "error": None
            }

        except RetryError as e:
            # Every attempt failed with one of expected_exceptions; the last
            # underlying error is chained as the cause of the RetryError
            original_error = e.__cause__ if e.__cause__ is not None else e
            logger.error(f"Max retries exceeded in {func_name}: {str(original_error)}")
            return {
                "success": False,
                "error": f"Max retries exceeded: {str(original_error)}",
                "data": None
            }

        except Exception as e:
            # Non-retryable failure (or an error raised by validation/conversion)
            logger.error(f"Error in {func_name}: {str(e)}", exc_info=True)
            return {
                "success": False,
                "error": str(e),
                "data": None
            }

    return wrapper

def with_exponential_backoff(
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    expected_exceptions: tuple = (Exception,)
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator for adding exponential backoff retry logic to any function

    The wrapped function is always called at least once. After the n-th
    failed attempt (counting from 1) the wrapper sleeps for
    ``backoff_factor ** (n - 1)`` seconds before trying again. When the
    final attempt fails, its exception is re-raised unchanged.

    Args:
        max_retries: Total number of attempts; values below 1 are treated
            as 1 so the wrapped function is never silently skipped
        backoff_factor: Base of the exponential wait between attempts
        expected_exceptions: Exception types that trigger a retry; any
            other exception propagates immediately

    Returns:
        Decorator function
    """
    # Guard against max_retries <= 0, which would otherwise mean the wrapped
    # function is never invoked and None is returned without any error
    total_attempts = max(1, max_retries)

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            """
            Call the wrapped function, retrying on expected exceptions

            Returns:
                Result of the original function
            """
            failures = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except expected_exceptions as e:
                    failures += 1
                    if failures >= total_attempts:
                        # Last attempt, re-raise the exception
                        raise

                    # Calculate wait time with exponential backoff
                    wait_time = backoff_factor ** (failures - 1)
                    logger.warning(f"Attempt {failures}/{total_attempts} failed: {str(e)}. Retrying in {wait_time:.1f} seconds...")
                    time.sleep(wait_time)

        return wrapper

    return decorator

def handle_api_result(
    result: Dict[str, Any], 
    default_value: T, 
    error_prefix: str = "API Error"
) -> Union[T, Dict[str, Any]]:
    """
    Unpack the result dictionary produced by a safe_api_call wrapper

    Args:
        result: Dictionary carrying "success", "data" and "error" entries
        default_value: Fallback used when "data" is absent on success, and
            placed under "data" in the error dictionary on failure
        error_prefix: Text prepended to the error message

    Returns:
        The value stored under "data" when the call succeeded; otherwise
        a dictionary with the prefixed "error" message and "data" set to
        default_value. The failure is also logged at error level.
    """
    # Failure path first: a missing "success" key counts as a failure
    if not result.get("success", False):
        error_msg = f"{error_prefix}: {result.get('error', 'Unknown error')}"
        logger.error(error_msg)
        failure = {"error": error_msg, "data": default_value}
        return failure

    # Success: hand back the payload, falling back only if the key is missing
    return result.get("data", default_value)