# Naveedtechlab's picture
# feat: Update with all 12 gap fixes - SQLModel, Better Auth, OpenAI Agents, Urdu, Voice
# 25b2e6b
"""Authentication utilities for JWT and password management.
Uses BETTER_AUTH_SECRET for JWT verification (shared secret with Better Auth frontend).
"""
import os
from datetime import datetime, timedelta, timezone

import bcrypt
from dotenv import load_dotenv
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
# Run python-dotenv's loader before any of the settings below are read.
load_dotenv()

# JWT configuration - uses BETTER_AUTH_SECRET (shared with Better Auth frontend).
# Falls back to JWT_SECRET for backwards compatibility.
BETTER_AUTH_SECRET = os.getenv("BETTER_AUTH_SECRET") or os.getenv("JWT_SECRET")

# Remove one pair of surrounding double quotes if present. This happens BEFORE
# the emptiness check so that a value such as `""` (or a lone `"`) cannot slip
# through validation and end up as an empty signing secret.
if (
    BETTER_AUTH_SECRET
    and BETTER_AUTH_SECRET.startswith('"')
    and BETTER_AUTH_SECRET.endswith('"')
):
    BETTER_AUTH_SECRET = BETTER_AUTH_SECRET[1:-1]

# Fail fast at import time: without a secret no token can be signed or verified.
if not BETTER_AUTH_SECRET:
    raise ValueError("BETTER_AUTH_SECRET (or JWT_SECRET) environment variable must be set")

# Signing algorithm and access-token lifetime, both overridable via environment.
JWT_ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_HOURS = int(os.getenv("ACCESS_TOKEN_EXPIRE_HOURS", "24"))

# HTTP Bearer token scheme, used as a FastAPI dependency by get_current_user.
security = HTTPBearer()
def hash_password(password: str) -> str:
    """Return the bcrypt hash of ``password`` as text.

    The password is UTF-8 encoded, hashed with a freshly generated salt, and
    the resulting hash bytes are decoded back to ``str``.
    """
    return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Both arguments are UTF-8 encoded and handed to ``bcrypt.checkpw``; its
    result is returned unchanged.
    """
    return bcrypt.checkpw(
        plain_password.encode('utf-8'),
        hashed_password.encode('utf-8'),
    )
def create_access_token(user_id: int, email: str) -> str:
    """Create a signed JWT access token for a user.

    Args:
        user_id: User identifier; stored as a string in the ``sub`` claim.
        email: Email address; stored in the ``email`` claim.

    Returns:
        The encoded JWT, signed with ``BETTER_AUTH_SECRET`` using
        ``JWT_ALGORITHM``. Its ``exp`` claim is ``ACCESS_TOKEN_EXPIRE_HOURS``
        hours after the moment of creation.
    """
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    # since Python 3.12 and returns a naive value that is easy to misread as
    # local time. The resulting expiry instant is the same.
    expires_at = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
    claims = {
        "sub": str(user_id),
        "email": email,
        "exp": expires_at,
    }
    return jwt.encode(claims, BETTER_AUTH_SECRET, algorithm=JWT_ALGORITHM)
def verify_token(token: str) -> dict:
    """Decode a JWT and return its claims.

    The token is decoded with ``BETTER_AUTH_SECRET`` and restricted to
    ``JWT_ALGORITHM``.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError`` or when the
            decoded payload has no ``sub`` claim.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials"
    )

    try:
        claims = jwt.decode(token, BETTER_AUTH_SECRET, algorithms=[JWT_ALGORITHM])
    except JWTError:
        raise unauthorized

    # A token without a subject cannot be tied to a user.
    if claims.get("sub") is None:
        raise unauthorized
    return claims
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> int:
    """Dependency that extracts the current user's integer ID from a bearer JWT.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        The token's ``sub`` claim converted to ``int``.

    Raises:
        HTTPException: 401 when ``verify_token`` rejects the token, or when
            the ``sub`` claim cannot be converted to an integer.
    """
    payload = verify_token(credentials.credentials)
    try:
        return int(payload.get("sub"))
    except (TypeError, ValueError) as exc:
        # A validly signed token may still carry a non-numeric subject (for
        # example a string identifier issued by another service). Treat it as
        # an authentication failure rather than letting the conversion error
        # surface as an HTTP 500.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
        ) from exc