Gaykar's picture
changes
33ffec6
Raw
History Blame Contribute Delete
2.6 kB
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
import hashlib
import base64
from app.core.config import settings
from app.database.connection import get_session
from app.database.models import User
# CONFIG
SECRET_KEY = settings.SECRET_KEY
ALGORITHM = settings.ALGORITHM
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# auto_error=False → allows us to return 401 instead of FastAPI's 403
security = HTTPBearer(auto_error=False)
# TOKEN LOGIC
def create_access_token(data: dict) -> str:
"""
Create a JWT access token with UTC-aware expiration
"""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(
minutes=ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({
"exp": expire,
"sub": str(data.get("id")) # JWT best practice
})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
"""
Verify JWT token and return payload
"""
if credentials is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated"
)
try:
token = credentials.credentials
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("id")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload"
)
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token"
)
# CURRENT USER DEPENDENCY
async def get_current_user(
token_data: dict = Depends(verify_token),
db: Session = Depends(get_session)
) -> User:
"""
Fetch the logged-in user from DB using JWT payload
"""
user_id = token_data.get("id")
user = (
db.query(User)
.filter(User.id == user_id)
.first()
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
return user
# PASSWORD LOGIC (bcrypt-safe, unlimited length)