File size: 2,247 Bytes
de0f1ef | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | """CSRF Protection Middleware."""
import secrets
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from app.core.config import settings
class CSRFMiddleware(BaseHTTPMiddleware):
def __init__(self, app, exempt_paths: list[str] | None = None, cookie_name: str = "csrf_token"):
super().__init__(app)
self.exempt_paths = exempt_paths or []
self.cookie_name = cookie_name
self.header_name = "X-CSRF-Token"
async def dispatch(self, request: Request, call_next):
# Safe methods — تعيين الـ cookie إذا لم يكن موجوداً
if request.method in ("GET", "HEAD", "OPTIONS"):
response = await call_next(request)
if self.cookie_name not in request.cookies:
token = secrets.token_urlsafe(32)
response.set_cookie(
self.cookie_name,
token,
httponly=False,
secure=settings.ENV == "production",
samesite="lax",
max_age=86400,
path="/",
)
return response
# المسارات المعفاة
for path in self.exempt_paths:
if request.url.path.startswith(path):
return await call_next(request)
# ✅ Next.js Server Actions — user-agent: node
user_agent = request.headers.get("user-agent", "")
if user_agent == "node":
return await call_next(request)
# التحقق من الـ CSRF token
cookie_token = request.cookies.get(self.cookie_name)
header_token = request.headers.get(self.header_name)
if not cookie_token or not header_token:
return JSONResponse(
status_code=403,
content={"error": "CSRF_MISSING", "detail": "CSRF token missing"},
)
if not secrets.compare_digest(cookie_token, header_token):
return JSONResponse(
status_code=403,
content={"error": "CSRF_INVALID", "detail": "CSRF token invalid"},
)
return await call_next(request) |