Spaces:
Sleeping
Sleeping
| # log.py | |
| """ | |
| Auth router for cookie-based JWT authentication (form-data). | |
| - POST /api/auth/signup -> form: name, email, password; sets JWT cookie | |
| - POST /api/auth/login -> form: email, password; sets JWT cookie | |
| - POST /api/auth/logout -> clears JWT cookie | |
| - GET /api/auth/me -> current user from cookie | |
| Storage: | |
| - Uses Mongo collection 'log_details' via mongo_store.py helpers. | |
| Usage in app.py: | |
| from log import get_auth_router | |
| app.include_router(get_auth_router()) | |
| """ | |
| import os | |
| import uuid | |
| import jwt | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Dict, Any, Annotated | |
| from fastapi import APIRouter, HTTPException, Response, Request, Depends, status, Form | |
| from pydantic import BaseModel, EmailStr | |
| from passlib.context import CryptContext | |
| from pymongo.errors import DuplicateKeyError | |
| # Auth-specific Mongo helpers for log_details collection | |
| from mongo_store import ( | |
| get_user_by_email, | |
| get_user_by_id, | |
| insert_user, | |
| update_user, | |
| ) | |
| # ================= | |
| # CONFIG | |
| # ================= | |
| ALGORITHM = "HS256" | |
| JWT_SECRET = os.getenv("JWT_SECRET", "dev-secret-change-me") # set in env for production | |
| ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("JWT_EXPIRE_MINUTES", "60")) | |
| JWT_COOKIE_NAME = os.getenv("JWT_COOKIE_NAME", "access_token") | |
| # For cross-site setups: | |
| # - COOKIE_SAMESITE="none" and COOKIE_SECURE=true (HTTPS required) | |
| COOKIE_SAMESITE = os.getenv("COOKIE_SAMESITE", "lax") # "lax" | "strict" | "none" | |
| COOKIE_SECURE = os.getenv("COOKIE_SECURE", "true").lower() == "true" | |
| # Use PBKDF2-SHA256 to avoid bcrypt's 72-byte limit and backend quirks | |
| # Rounds ~310k+ is a solid default; adjust if you need faster hashing. | |
| pwd_context = CryptContext( | |
| schemes=["pbkdf2_sha256"], | |
| deprecated="auto", | |
| pbkdf2_sha256__rounds=int(os.getenv("PBKDF2_ROUNDS", "310000")), | |
| ) | |
| # ================= | |
| # RESPONSE SCHEMAS | |
| # ================= | |
| class UserOut(BaseModel): | |
| id: str | |
| name: str | |
| email: EmailStr | |
| # ================= | |
| # HELPERS | |
| # ================= | |
| def create_access_token(sub: str, email: str, minutes: int = ACCESS_TOKEN_EXPIRE_MINUTES) -> str: | |
| now = datetime.now(timezone.utc) | |
| exp = now + timedelta(minutes=minutes) | |
| payload = { | |
| "sub": sub, | |
| "email": email, | |
| "type": "access", | |
| "iat": int(now.timestamp()), | |
| "exp": int(exp.timestamp()), | |
| } | |
| return jwt.encode(payload, JWT_SECRET, algorithm=ALGORITHM) | |
| def set_auth_cookie(response: Response, token: str): | |
| max_age = ACCESS_TOKEN_EXPIRE_MINUTES * 60 | |
| response.set_cookie( | |
| key=JWT_COOKIE_NAME, | |
| value=token, | |
| max_age=max_age, | |
| expires=max_age, | |
| path="/", | |
| secure=COOKIE_SECURE, | |
| httponly=True, | |
| samesite=COOKIE_SAMESITE, | |
| ) | |
| def clear_auth_cookie(response: Response): | |
| response.delete_cookie(key=JWT_COOKIE_NAME, path="/") | |
| def verify_password(plain: str, hashed: str) -> bool: | |
| return pwd_context.verify(plain, hashed) | |
| def hash_password(plain: str) -> str: | |
| return pwd_context.hash(plain) | |
| # ================= | |
| # ROUTER | |
| # ================= | |
| def get_auth_router() -> APIRouter: | |
| router = APIRouter(prefix="/api/auth", tags=["auth"]) | |
| # Dependency to get current user from cookie | |
| def get_current_user(request: Request) -> Dict[str, Any]: | |
| token = request.cookies.get(JWT_COOKIE_NAME) | |
| if not token: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") | |
| try: | |
| payload = jwt.decode(token, JWT_SECRET, algorithms=[ALGORITHM]) | |
| except jwt.ExpiredSignatureError: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired") | |
| except jwt.InvalidTokenError: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") | |
| user_id = payload.get("sub") | |
| if not user_id: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload") | |
| user = get_user_by_id(user_id) | |
| if not user: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") | |
| return {"id": user["id"], "name": user["name"], "email": user["email"]} | |
| # ------------- | |
| # SIGNUP (form-data) | |
| # ------------- | |
| def signup( | |
| response: Response, | |
| name: Annotated[str, Form(min_length=2, max_length=80)], | |
| email: Annotated[EmailStr, Form()], | |
| password: Annotated[str, Form(min_length=8, max_length=1024)], | |
| ): | |
| email_norm = email.strip().lower() | |
| name_norm = name.strip() | |
| existing = get_user_by_email(email_norm) | |
| if existing: | |
| raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Email already registered") | |
| try: | |
| pwd_hash = hash_password(password) | |
| except Exception as e: | |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid password: {e}") | |
| user_doc = { | |
| "id": str(uuid.uuid4()), | |
| "name": name_norm, | |
| "email": email_norm, | |
| "password_hash": pwd_hash, | |
| "created_at": datetime.now(timezone.utc), | |
| "updated_at": datetime.now(timezone.utc), | |
| "last_login_at": None, | |
| } | |
| try: | |
| insert_user(user_doc) | |
| except DuplicateKeyError: | |
| raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Email already registered") | |
| token = create_access_token(sub=user_doc["id"], email=user_doc["email"]) | |
| set_auth_cookie(response, token) | |
| return {"id": user_doc["id"], "name": user_doc["name"], "email": user_doc["email"]} | |
| # ------------- | |
| # LOGIN (form-data) | |
| # ------------- | |
| def login( | |
| response: Response, | |
| email: Annotated[EmailStr, Form()], | |
| password: Annotated[str, Form(min_length=1, max_length=1024)], | |
| ): | |
| email_norm = email.strip().lower() | |
| user = get_user_by_email(email_norm) | |
| if not user or not verify_password(password, user.get("password_hash", "")): | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") | |
| token = create_access_token(sub=user["id"], email=user["email"]) | |
| set_auth_cookie(response, token) | |
| # best-effort update timestamps | |
| try: | |
| now = datetime.now(timezone.utc) | |
| update_user(user["id"], {"last_login_at": now, "updated_at": now}) | |
| except Exception: | |
| pass | |
| return {"id": user["id"], "name": user["name"], "email": user["email"]} | |
| # ------------- | |
| # LOGOUT | |
| # ------------- | |
| def logout(response: Response): | |
| clear_auth_cookie(response) | |
| return {"ok": True} | |
| # ------------- | |
| # CURRENT USER | |
| # ------------- | |
| def me(current_user: Dict[str, Any] = Depends(get_current_user)): | |
| return current_user | |
| return router |