SatyamPrakash09's picture
Feat: Add separate function to create refresh and access tokens
d610d41 unverified
"""
JWT authentication — register, login, and token verification.
"""
from datetime import datetime, timedelta, timezone
from typing import Optional
import jwt
import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.config import get_settings
from app.database import get_db
from app.models import User
# Application settings, fetched once when this module is imported; the token
# helpers below read SECRET_KEY, JWT_ALGORITHM and the expiry values from it.
settings = get_settings()
# HTTP Bearer scheme instance, injected via Depends() in get_current_user.
security = HTTPBearer()
# ── Password Hashing ─────────────────────────────────
def hash_password(password: str) -> str:
    """Hash a plaintext password with bcrypt using a freshly generated salt.

    Args:
        password: The plaintext password to hash.

    Returns:
        The bcrypt hash, decoded from bytes to a UTF-8 string.
    """
    raw_password = password.encode("utf-8")
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw_password, salt)
    return digest.decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain: The candidate plaintext password.
        hashed: The stored bcrypt hash, as a string.

    Returns:
        True if the password matches the hash. False if it does not match,
        or if bcrypt rejects the inputs with a ValueError.
    """
    try:
        return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
    except ValueError:
        # bcrypt signals an unusable stored hash (e.g. "Invalid salt") with
        # ValueError. Treat that as a failed verification so a corrupt or
        # empty hash yields a normal auth failure instead of an unhandled
        # exception in the login path.
        return False
# ── JWT Token ────────────────────────────────────────
def create_access_token(user_id: str) -> str:
    """Create a signed JWT access token with user_id as the subject.

    Args:
        user_id: Identifier stored in the token's "sub" claim.

    Returns:
        The encoded JWT. Its payload carries "type": "access", an "iat"
        issue time, and an "exp" set settings.JWT_ACCESS_EXPIRY_MINUTES
        minutes after "iat". It is signed with settings.SECRET_KEY using
        settings.JWT_ALGORITHM.
    """
    # Read the clock once so "exp" is exactly "iat" plus the configured
    # lifetime, rather than being derived from two separate instants.
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(minutes=settings.JWT_ACCESS_EXPIRY_MINUTES)
    payload = {
        "sub": user_id,
        "type": "access",
        "exp": issued_at + lifetime,
        "iat": issued_at,
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
def create_refresh_token(user_id: str) -> str:
    """Create a signed JWT refresh token with user_id as the subject.

    Args:
        user_id: Identifier stored in the token's "sub" claim.

    Returns:
        The encoded JWT. Its payload carries "type": "refresh", an "iat"
        issue time, and an "exp" set settings.JWT_REFRESH_EXPIRY_DAYS days
        after "iat". It is signed with settings.SECRET_KEY using
        settings.JWT_ALGORITHM.
    """
    # Read the clock once so "exp" is exactly "iat" plus the configured
    # lifetime, rather than being derived from two separate instants.
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(days=settings.JWT_REFRESH_EXPIRY_DAYS)
    payload = {
        "sub": user_id,
        "type": "refresh",
        "exp": issued_at + lifetime,
        "iat": issued_at,
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
def decode_token(token: str, token_type: str = "access") -> Optional[str]:
    """Validate a JWT and return the user id stored in its "sub" claim.

    Args:
        token: The encoded JWT.
        token_type: Required value of the token's "type" claim
            ("access" by default).

    Returns:
        The "sub" claim, or None when the token is expired or otherwise
        invalid, when its "type" claim differs from token_type, or when
        it has no "sub" claim.
    """
    try:
        claims = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except jwt.InvalidTokenError:
        # Expired tokens land here too: PyJWT's ExpiredSignatureError is a
        # subclass of InvalidTokenError.
        return None

    if claims.get("type") == token_type:
        return claims.get("sub")
    return None
# ── FastAPI Dependencies ─────────────────────────────
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    """Dependency: resolve the authenticated user from a JWT bearer token.

    Args:
        credentials: Bearer credentials supplied by the HTTPBearer scheme.
        db: Database session used to look the user up.

    Returns:
        The User row whose id matches the token's subject.

    Raises:
        HTTPException: 401 if decode_token rejects the token as an access
            token, or if no user with the token's subject exists. Both
            responses carry a "WWW-Authenticate: Bearer" header.
    """
    user_id = decode_token(credentials.credentials)
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        # A 401 response should advertise the expected auth scheme; send the
        # same challenge header as the invalid-token branch above.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
def get_admin_user(user: User = Depends(get_current_user)) -> User:
    """Dependency: allow the request only for users flagged as admin.

    Args:
        user: The authenticated user, resolved by get_current_user.

    Returns:
        The same user when user.is_admin is truthy.

    Raises:
        HTTPException: 403 when user.is_admin is falsy.
    """
    if user.is_admin:
        return user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required",
    )