AI Developer Agent
AI Developer Agent v1.0 backend
763ef0d
"""
Retry wrapper - executes a callable with exponential backoff and
optional classification-based repair callback.
"""
from __future__ import annotations
import time
import logging
from typing import Callable, Optional, Any
from .classifier import classify
logger = logging.getLogger("retry")
def retry_call(
fn: Callable[[], Any],
max_attempts: int = 3,
base_delay: float = 1.5,
on_error: Optional[Callable[[Exception, int], None]] = None,
repair_cb: Optional[Callable[[str], None]] = None,
) -> Any:
last_exc: Optional[Exception] = None
for attempt in range(1, max_attempts + 1):
try:
return fn()
except Exception as e:
last_exc = e
logger.warning("retry attempt %s failed: %s", attempt, e)
if on_error:
try:
on_error(e, attempt)
except Exception:
pass
if repair_cb:
try:
err_class = classify(str(e))
if err_class:
repair_cb(err_class.category)
except Exception:
pass
if attempt < max_attempts:
time.sleep(base_delay * (2 ** (attempt - 1)))
assert last_exc is not None
raise last_exc