voiceforge / backend /app /core /security.py
lordofgaming
Initial VoiceForge deployment (clean)
673435a
"""
Security Utilities
Handles password hashing, JWT generation, and API key verification.
"""
from datetime import datetime, timedelta
from typing import Optional, Union, Any
from jose import jwt
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
from ..core.config import get_settings
from ..models import get_db, User, ApiKey
settings = get_settings()
# Password hashing (PBKDF2 is safer/easier on Windows than bcrypt sometimes)
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
# JWT configuration
SECRET_KEY = settings.secret_key
ALGORITHM = settings.algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes
# OAuth2 scheme - auto_error=False allows API key fallback
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/auth/login", auto_error=False)
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(subject: Union[str, Any], expires_delta: timedelta = None) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> Optional[User]:
"""Validate JWT and return user. Returns None if token missing/invalid."""
if not token:
return None # Allow API key fallback
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
return None
except Exception:
return None
user = db.query(User).filter(User.id == int(user_id)).first()
return user
async def get_current_active_user(current_user: Optional[User] = Depends(get_current_user)) -> User:
"""Get current active user. Raises 401 if not authenticated."""
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
async def verify_api_key(
api_key: str = Depends(api_key_header),
db: Session = Depends(get_db)
) -> Optional[User]:
"""
Validate API key from X-API-Key header.
Returns the associated user if valid, else None (or raises if enforcing).
"""
if not api_key:
return None # Or raise if strict
key_record = db.query(ApiKey).filter(ApiKey.key == api_key, ApiKey.is_active == True).first()
if key_record:
# Update usage stats
key_record.last_used_at = datetime.utcnow()
db.commit()
return key_record.user
return None # Invalid key
def get_api_user_or_jwt_user(
api_key_user: Optional[User] = Depends(verify_api_key),
jwt_user: Optional[User] = Depends(get_current_user)
) -> User:
"""Allow access via either API Key or JWT"""
if api_key_user:
return api_key_user
if jwt_user:
return jwt_user
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated"
)