CandidateExplorer / utils /decorator.py
ishaq101's picture
[KM-383] [CEX] [AI] Deployment AI Engine / BE
f3bdba1
raw
history blame
2.97 kB
import asyncio
import asyncpg
import inspect
import time
from functools import wraps
from typing import Callable, Any
from sqlalchemy.exc import OperationalError, InterfaceError, PendingRollbackError
def trace_runtime(func: Callable) -> Callable:
@wraps(func)
async def async_wrapper(*args, **kwargs) -> Any:
# This wrapper runs if the original func was async
start_time = time.perf_counter()
# Await the coroutine returned by func(*args, **kwargs)
result = await func(*args, **kwargs)
end_time = time.perf_counter()
duration = end_time - start_time
print(f"⏱️ ASYNC Function '{func.__name__}' took {duration:.6f} seconds")
return result
@wraps(func)
def sync_wrapper(*args, **kwargs) -> Any:
# This wrapper runs if the original func was sync
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
duration = end_time - start_time
print(f"⏱️ SYNC Function '{func.__name__}' took {duration:.6f} seconds")
return result
# Check if the function being decorated is an async function
if inspect.iscoroutinefunction(func):
# If it's async, return the async wrapper
return async_wrapper
else:
# If it's sync, return the sync wrapper
return sync_wrapper
def retry_db(
retries: int = 3,
delay: float = 2.0,
backoff: float = 2.0,
) -> Callable:
def decorator(func: Callable) -> Callable:
@wraps(func)
async def async_wrapper(*args, **kwargs) -> Any:
current_delay = delay
for attempt in range(1, retries + 1):
try:
return await func(*args, **kwargs)
except (
OperationalError,
InterfaceError,
PendingRollbackError, # πŸ‘ˆ Add this
asyncpg.exceptions.PostgresConnectionError,
asyncpg.exceptions.CannotConnectNowError,
ConnectionError,
TimeoutError,
) as e:
if attempt == retries:
raise
print(
f"πŸ” Retry {attempt}/{retries} for '{func.__name__}' "
f"after {current_delay:.2f}s due to: {type(e).__name__}"
)
# πŸ‘‡ Roll back the broken session before retrying
db = args[0] if args else kwargs.get("db")
if db is not None:
try:
await db.rollback()
except Exception:
pass # If rollback itself fails, just continue
await asyncio.sleep(current_delay)
current_delay *= backoff
return async_wrapper
return decorator