Spaces:
Build error
Build error
| """ | |
| JWT authentication β register, login, and token verification. | |
| """ | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| import jwt | |
| import bcrypt | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from sqlalchemy.orm import Session | |
| from app.config import get_settings | |
| from app.database import get_db | |
| from app.models import User | |
| settings = get_settings() | |
| security = HTTPBearer() | |
| # ββ Password Hashing βββββββββββββββββββββββββββββββββ | |
| def hash_password(password: str) -> str: | |
| return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") | |
| def verify_password(plain: str, hashed: str) -> bool: | |
| return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) | |
| # ββ JWT Token ββββββββββββββββββββββββββββββββββββββββ | |
| def create_access_token(user_id: str) -> str: | |
| """Create a JWT access token with user_id as the subject.""" | |
| payload = { | |
| "sub": user_id, | |
| "type": "access", | |
| "exp": datetime.now(timezone.utc) + timedelta(minutes=settings.JWT_ACCESS_EXPIRY_MINUTES), | |
| "iat": datetime.now(timezone.utc), | |
| } | |
| return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM) | |
| def create_refresh_token(user_id: str) -> str: | |
| """Create a JWT refresh token with user_id as the subject.""" | |
| payload = { | |
| "sub": user_id, | |
| "type": "refresh", | |
| "exp": datetime.now(timezone.utc) + timedelta(days=settings.JWT_REFRESH_EXPIRY_DAYS), | |
| "iat": datetime.now(timezone.utc), | |
| } | |
| return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM) | |
| def decode_token(token: str, token_type: str = "access") -> Optional[str]: | |
| """Decode JWT and return user_id, or None if invalid.""" | |
| try: | |
| payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) | |
| if payload.get("type") != token_type: | |
| return None | |
| return payload.get("sub") | |
| except jwt.ExpiredSignatureError: | |
| return None | |
| except jwt.InvalidTokenError: | |
| return None | |
| # ββ FastAPI Dependencies βββββββββββββββββββββββββββββ | |
| def get_current_user( | |
| credentials: HTTPAuthorizationCredentials = Depends(security), | |
| db: Session = Depends(get_db), | |
| ) -> User: | |
| """Dependency: extract and validate user from JWT bearer token.""" | |
| token = credentials.credentials | |
| user_id = decode_token(token) | |
| if not user_id: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid or expired token", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| user = db.query(User).filter(User.id == user_id).first() | |
| if not user: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="User not found", | |
| ) | |
| return user | |
| def get_admin_user(user: User = Depends(get_current_user)) -> User: | |
| """Dependency: require admin privileges.""" | |
| if not user.is_admin: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Admin access required", | |
| ) | |
| return user | |