import os import jwt from datetime import datetime, timedelta from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from .db import SessionLocal from .models import User # JWT Configuration SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "your-secret-key-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days security = HTTPBearer() def get_db(): """Database dependency.""" db = SessionLocal() try: yield db finally: db.close() def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """Create a JWT access token.""" to_encode = data.copy() # Ensure 'sub' (subject) is a string, not an integer if "sub" in to_encode: to_encode["sub"] = str(to_encode["sub"]) if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(token: str) -> dict: """Verify and decode a JWT token.""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", ) except jwt.InvalidTokenError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", ) def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> User: """Get the current authenticated user from JWT token.""" token = credentials.credentials payload = verify_token(token) user_id: int = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", ) # Convert user_id back to integer for database query try: user_id_int = int(user_id) except (ValueError, TypeError): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid user ID in token", ) user = db.query(User).filter(User.id == user_id_int).first() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", ) return user