Spaces:
Running
Running
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| from fastapi import Depends, HTTPException, status, Request | |
| from jose import JWTError, jwt | |
| from passlib.context import CryptContext | |
| from sqlalchemy.orm import Session | |
| from app.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES | |
| from app.database import get_db | |
| from app.models import User | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| def hash_password(password: str) -> str: | |
| return pwd_context.hash(password) | |
| def verify_password(plain: str, hashed: str) -> bool: | |
| return pwd_context.verify(plain, hashed) | |
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: | |
| to_encode = data.copy() | |
| expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) | |
| to_encode.update({"exp": expire}) | |
| return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| def decode_token(token: str) -> Optional[dict]: | |
| try: | |
| return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| except JWTError as e: | |
| print(f"[AUTH] Token decode failed: {e}") | |
| return None | |
| def get_current_user(request: Request, db: Session = Depends(get_db)) -> User: | |
| token = request.query_params.get("token") | |
| if not token: | |
| token = request.cookies.get("access_token") | |
| if not token: | |
| auth = request.headers.get("authorization", "") | |
| if auth.startswith("Bearer "): | |
| token = auth[7:] | |
| if not token: | |
| raise HTTPException(status_code=401, detail="Not authenticated") | |
| payload = decode_token(token) | |
| if not payload: | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| user_id = int(payload.get("sub")) | |
| user = db.query(User).filter(User.id == user_id).first() | |
| if not user or not user.is_active: | |
| raise HTTPException(status_code=401, detail="User not found") | |
| return user | |
| def require_admin(current_user: User = Depends(get_current_user)) -> User: | |
| if current_user.role not in ("admin", "superadmin"): | |
| raise HTTPException(status_code=403, detail="Admin access required") | |
| return current_user | |
| def require_superadmin(current_user: User = Depends(get_current_user)) -> User: | |
| if current_user.role != "superadmin": | |
| raise HTTPException(status_code=403, detail="Super Admin access required") | |
| return current_user | |