apigateway / core /dependencies /rate_limit.py
jebin2's picture
ref
a42ab7e
"""
Rate Limiting Dependencies
Functions for checking and enforcing rate limits on API endpoints.
"""
import logging
from datetime import datetime, timedelta
from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncSession
from core.models import RateLimit
logger = logging.getLogger(__name__)
async def check_rate_limit(
db: AsyncSession,
identifier: str,
endpoint: str,
limit: int,
window_minutes: int
) -> bool:
"""
Check if request is within rate limits.
Returns True if allowed, False if limit exceeded.
"""
now = datetime.utcnow()
window_start = now - timedelta(minutes=window_minutes)
# Check existing limit (get most recent if multiple exist)
query = select(RateLimit).where(
and_(
RateLimit.identifier == identifier,
RateLimit.endpoint == endpoint,
RateLimit.window_start >= window_start
)
).order_by(RateLimit.window_start.desc())
result = await db.execute(query)
rate_limit = result.scalars().first()
if rate_limit:
if rate_limit.attempts >= limit:
return False
# Increment attempts
rate_limit.attempts += 1
await db.commit()
return True
else:
# Create new rate limit record
new_limit = RateLimit(
identifier=identifier,
endpoint=endpoint,
attempts=1,
window_start=now,
expires_at=now + timedelta(minutes=window_minutes)
)
db.add(new_limit)
await db.commit()
return True