Spaces:
Build error
Build error
File size: 4,632 Bytes
e6ec780 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
import logging
from typing import Any, Dict
from datetime import datetime
from fastapi import Request, HTTPException, status
from sqlalchemy import select
from google_auth_service.fastapi_hooks import AuthHooks
from core.database import async_session_maker
from core.dependencies import check_rate_limit
from services.audit_service import AuditService
from core.models import ClientUser, User
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class CoreAuthHooks(AuthHooks):
    """
    Custom authentication hooks for API Gateway.

    Handles: Rate Limiting, Audit Logging, Client User Linking, and Backups.
    """

    @staticmethod
    def _client_ip(request: Request) -> str:
        """Return the caller's IP address, or ``"unknown"`` if unavailable.

        ``request.client`` may be ``None`` (the ASGI server is not required
        to supply a peer address), so it is guarded before reading ``.host``.
        """
        client = request.client
        return client.host if client else "unknown"

    @staticmethod
    async def _read_temp_user_id(request: Request) -> Any:
        """Best-effort extraction of ``temp_user_id`` from the JSON body.

        Returns ``None`` when the body cannot be parsed as JSON or is not a
        JSON object; a malformed body must never fail a successful login.
        """
        try:
            # FastAPI/Starlette caches the parsed body, so re-reading is cheap.
            payload = await request.json()
        except Exception:
            # Deliberately broad: this is a best-effort read, but the failure
            # is recorded instead of being silently swallowed.
            logger.debug(
                "Could not read login request body as JSON", exc_info=True
            )
            return None
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (list, string, ...) has no keys.
            return None
        return payload.get("temp_user_id")

    @staticmethod
    async def _link_client_user(db: Any, user_id: Any, temp_user_id: Any, ip: str) -> None:
        """Create the (user, client user) mapping or refresh its last-seen time.

        Commits the session so the mapping is persisted on its own.
        """
        query = select(ClientUser).where(
            ClientUser.user_id == user_id,
            ClientUser.client_user_id == temp_user_id,
        )
        existing = (await db.execute(query)).scalar_one_or_none()
        # NOTE(review): naive UTC timestamp kept as in the original; the
        # column's timezone handling is not visible from this file.
        now = datetime.utcnow()
        if existing is None:
            db.add(
                ClientUser(
                    user_id=user_id,
                    client_user_id=temp_user_id,
                    ip_address=ip,
                    last_seen_at=now,
                )
            )
        else:
            existing.last_seen_at = now
        # Commit is needed for ClientUser changes
        await db.commit()

    @staticmethod
    async def _trigger_backup() -> None:
        """Run the backup service once."""
        # Imported inside the function, as the original hooks did
        # (presumably to avoid an import cycle -- TODO confirm).
        from services.backup_service import get_backup_service

        await get_backup_service().backup_async()

    async def before_login(self, request: Request):
        """Rate Limit Check

        Raises:
            HTTPException: 429 when ``check_rate_limit`` returns a falsy
                result for this client IP on ``/auth/google``.
        """
        ip = self._client_ip(request)
        async with async_session_maker() as db:
            if not await check_rate_limit(db, ip, "/auth/google", 10, 1):
                raise HTTPException(
                    status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                    detail="Too many authentication attempts",
                )

    async def on_login_success(
        self,
        user: Any,
        tokens: Dict[str, str],
        request: Request,
        is_new_user: bool = False,
    ):
        """Audit Log, Link Client, Trigger Backup

        Args:
            user: Authenticated user; only ``user.id`` is read here.
            tokens: Issued tokens (unused by this hook).
            request: Incoming request; its JSON body may carry
                ``temp_user_id``.
            is_new_user: Whether the user was just created (unused by this
                hook).
        """
        ip = self._client_ip(request)
        temp_user_id = await self._read_temp_user_id(request)

        async with async_session_maker() as db:
            # 1. Link Client User if temp_user_id provided
            if temp_user_id:
                await self._link_client_user(db, user.id, temp_user_id, ip)

            # 2. Log Success
            await AuditService.log_event(
                db=db,
                log_type="server",
                user_id=user.id,
                client_user_id=temp_user_id,
                action="google_auth",
                status="success",
                request=request,
            )
            await db.commit()

        # 3. Trigger Backup
        await self._trigger_backup()

    async def on_login_error(self, error: Exception, request: Request):
        """Audit Log Failure

        Args:
            error: The exception that aborted the login; its ``str()`` is
                stored as the audit error message.
            request: Incoming request, forwarded to the audit service.
        """
        async with async_session_maker() as db:
            await AuditService.log_event(
                db=db,
                log_type="server",
                action="google_auth",
                status="failed",
                error_message=str(error),
                request=request,
            )
            # Commit like on_login_success does after logging; a no-op if
            # the audit service already committed.
            await db.commit()

    async def on_logout(self, user: Any, request: Request):
        """Log Logout, Backup

        Args:
            user: The user logging out, or a falsy value when unknown; the
                audit entry is skipped in that case.
            request: Incoming request, forwarded to the audit service.
        """
        async with async_session_maker() as db:
            if user:
                # NOTE(review): reads ``user.id``, so ``user`` is expected to
                # be the object returned by the user store (a ``User`` model
                # per the original author's notes), not a plain dict.
                await AuditService.log_event(
                    db=db,
                    log_type="server",
                    user_id=user.id,
                    action="logout",
                    status="success",
                    request=request,
                )
                # Commit like on_login_success does after logging; a no-op
                # if the audit service already committed.
                await db.commit()

        await self._trigger_backup()
|