from datetime import datetime, timedelta, timezone from jose import JWTError, jwt import bcrypt as _bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from backend.config import JWT_SECRET, JWT_ALGORITHM, JWT_EXPIRATION_HOURS security = HTTPBearer(auto_error=False) def hash_password(password: str) -> str: return _bcrypt.hashpw(password.encode("utf-8"), _bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: return _bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) def create_access_token(user_id: int, email: str, is_admin: bool = False) -> str: exp = datetime.now(timezone.utc) + timedelta(hours=JWT_EXPIRATION_HOURS) return jwt.encode( {"sub": str(user_id), "email": email, "is_admin": is_admin, "exp": exp, "iat": datetime.now(timezone.utc)}, JWT_SECRET, algorithm=JWT_ALGORITHM, ) def decode_token(token: str) -> dict: try: payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) return {"user_id": int(payload["sub"]), "email": payload["email"], "is_admin": payload.get("is_admin", False)} except (JWTError, KeyError): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token") async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict: if credentials is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") return decode_token(credentials.credentials) async def optional_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict | None: if credentials is None: return None try: return decode_token(credentials.credentials) except Exception: return None