Spaces:
Running
Running
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| import bcrypt | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from jose import JWTError, jwt | |
| from app.core.config import settings | |
| bearer_scheme = HTTPBearer() | |
| # ββ Password helpers ββββββββββββββββββββββββββββββββββββββββββββββ | |
| def hash_password(plain: str) -> str: | |
| return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode() | |
| def verify_password(plain: str, hashed: str) -> bool: | |
| return bcrypt.checkpw(plain.encode(), hashed.encode()) | |
| # ββ JWT helpers βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def create_token(username: str) -> str: | |
| expire = datetime.utcnow() + timedelta(days=settings.JWT_EXPIRE_DAYS) | |
| payload = {"sub": username, "exp": expire} | |
| return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) | |
| def decode_token(token: str) -> Optional[str]: | |
| """Return username from a valid token, or None.""" | |
| try: | |
| payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) | |
| return payload.get("sub") | |
| except JWTError: | |
| return None | |
| # ββ FastAPI dependency ββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme)) -> str: | |
| """ | |
| Validates the Bearer token and returns the username (used as user_id). | |
| Raises 401 if the token is missing, expired, or invalid. | |
| """ | |
| username = decode_token(credentials.credentials) | |
| if not username: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid or expired token", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| return username | |