File size: 6,150 Bytes
20adca1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import asyncio
import functools
from typing import Type, Tuple, Optional, TypeVar, Callable, Any
from ..utils.logger import logger

T = TypeVar('T')

def with_retry(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
) -> Callable:
    """
    Decorator that implements retry logic with exponential backoff.

    The decorated coroutine function is awaited once and, if it raises one of
    ``exceptions``, awaited again up to ``max_retries`` more times. The pause
    before each retry starts at ``delay`` seconds and is multiplied by
    ``backoff_factor`` after every retry.

    Args:
        max_retries (int): Maximum number of retry attempts after the initial
            call. Negative values are treated as 0, so the function is always
            attempted at least once.
        delay (float): Initial delay between retries in seconds
        backoff_factor (float): Multiplier for delay after each retry
        exceptions (tuple): Tuple of exceptions to catch and retry on

    Returns:
        Callable: A decorator that wraps an async function with the retry loop.

    Raises:
        The exception raised by the final attempt once every attempt has
        failed. Exceptions not listed in ``exceptions`` propagate immediately
        without a retry.
    """
    # Clamp the retry budget: with a negative value the loop below would never
    # run and the wrapped function would not be called at all.
    total_attempts = max(max_retries, 0) + 1

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            current_delay = delay

            # total_attempts >= 1, and the last iteration always returns or
            # raises, so control never falls out of the bottom of this loop.
            for attempt in range(1, total_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    if attempt == total_attempts:
                        # Out of attempts: log and re-raise the original
                        # exception with its traceback intact.
                        logger.error(
                            f"All {total_attempts} attempts failed for {func.__name__}: {e}"
                        )
                        raise

                    logger.warning(
                        f"Attempt {attempt} failed for {func.__name__}: {e}. "
                        f"Retrying in {current_delay} seconds..."
                    )
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff_factor

        return wrapper
    return decorator

def retry_with_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    backoff_factor: float = 2.0,
    exceptions: Optional[Tuple[Type[Exception], ...]] = None
) -> Callable:
    """
    More advanced retry decorator with capped exponential backoff and jitter.

    The decorated coroutine function is awaited once and, if it raises one of
    ``exceptions``, awaited again up to ``max_retries`` more times. Before each
    retry the wrapper sleeps for the current delay plus a random jitter in
    [0, 1) seconds, capped at ``max_delay``; the delay is then multiplied by
    ``backoff_factor`` and capped at ``max_delay`` as well.

    Args:
        max_retries (int): Maximum number of retry attempts
        initial_delay (float): Initial delay between retries in seconds
        max_delay (float): Maximum delay between retries in seconds
        backoff_factor (float): Multiplier for delay after each retry
        exceptions (tuple): Tuple of exceptions to catch and retry on;
            defaults to ``(Exception,)`` when None

    Returns:
        Callable: A decorator that wraps an async function with the retry loop.

    Raises:
        The exception raised by the wrapped function once it has failed more
        than ``max_retries`` times. Exceptions not listed in ``exceptions``
        propagate immediately without a retry.
    """
    # Function-scope import keeps this block self-contained.
    import random

    if exceptions is None:
        exceptions = (Exception,)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            retry_count = 0
            current_delay = initial_delay
            operation_name = func.__name__

            while True:
                try:
                    return await func(*args, **kwargs)

                except exceptions as e:
                    retry_count += 1
                    if retry_count > max_retries:
                        logger.error(
                            f"Operation {operation_name} failed after {max_retries} retries: {str(e)}"
                        )
                        raise

                    # Add jitter to prevent thundering herd. The value comes
                    # from the RNG rather than the event-loop clock: tasks that
                    # fail within the same clock tick would otherwise compute
                    # the same "jitter" and retry in lockstep.
                    jitter = random.random()
                    sleep_time = min(current_delay + jitter, max_delay)

                    logger.warning(
                        f"Operation {operation_name} failed (attempt {retry_count}/{max_retries}): "
                        f"{str(e)}. Retrying in {sleep_time:.2f} seconds..."
                    )

                    await asyncio.sleep(sleep_time)
                    current_delay = min(current_delay * backoff_factor, max_delay)

        return wrapper
    return decorator

def circuit_breaker(
    failure_threshold: int = 5,
    reset_timeout: float = 60.0
) -> Callable:
    """
    Circuit breaker decorator to prevent repeated calls to failing services.

    Each decorated function gets its own failure counter. Once
    ``failure_threshold`` consecutive calls have raised, the circuit opens and
    further calls are rejected without invoking the function. After more than
    ``reset_timeout`` seconds have passed since the last recorded failure, the
    circuit closes again and the failure counter restarts from zero.

    Args:
        failure_threshold (int): Number of failures before opening circuit
        reset_timeout (float): Time in seconds before attempting to close circuit

    Returns:
        Callable: A decorator that wraps an async function with the breaker.

    Raises:
        Exception: From the wrapper when it is called while the circuit is
            open. Exceptions raised by the wrapped function are counted and
            then re-raised unchanged.
    """
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # State for the circuit breaker, shared by every call to this wrapper
        state = {
            'failures': 0,
            'last_failure_time': 0,
            'is_open': False
        }

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            loop = asyncio.get_event_loop()

            # Check if circuit is open
            if state['is_open']:
                elapsed = loop.time() - state['last_failure_time']
                if elapsed > reset_timeout:
                    # Try to close the circuit
                    state['is_open'] = False
                    state['failures'] = 0
                else:
                    # Raised outside the try below, so a rejected call is not
                    # counted as another failure.
                    raise Exception(
                        f"Circuit breaker is open for {func.__name__}. "
                        f"Try again in {reset_timeout - elapsed:.1f} seconds"
                    )

            try:
                result = await func(*args, **kwargs)
            except Exception:
                # Record failure. The clock is read here, not before the call:
                # a slow failing call must not eat into the open window.
                state['failures'] += 1
                state['last_failure_time'] = loop.time()

                # Check if we need to open the circuit
                if state['failures'] >= failure_threshold:
                    state['is_open'] = True
                    logger.error(
                        f"Circuit breaker opened for {func.__name__} after {failure_threshold} failures"
                    )

                raise

            # Success - reset failure count
            state['failures'] = 0
            return result

        return wrapper
    return decorator