"""Password hashing, JWT issuing/decoding, and user lookup helpers."""

from __future__ import annotations

import secrets
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from typing import Any

import bcrypt
from jose import JWTError, jwt
from loguru import logger
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from config import settings
from db.models import User


def _encode_pw(plain: str) -> bytes:
    """Return *plain* as UTF-8 bytes, cut to at most 72 bytes for bcrypt."""
    # bcrypt truncates to 72 bytes silently in some builds and hard-errors in others.
    # Truncate explicitly so behavior is deterministic across versions.
    return plain.encode("utf-8")[:72]


def hash_password(plain: str) -> str:
    """Return a freshly salted bcrypt hash of *plain* as a UTF-8 string."""
    return bcrypt.hashpw(_encode_pw(plain), bcrypt.gensalt()).decode("utf-8")


def verify_password(plain: str, hashed: str) -> bool:
    """Return True when *plain* matches the bcrypt hash *hashed*.

    Any exception raised while checking (for example because *hashed* is not
    a usable bcrypt hash) is logged as a warning and reported as a failed
    verification instead of propagating to the caller.
    """
    try:
        return bcrypt.checkpw(_encode_pw(plain), hashed.encode("utf-8"))
    except Exception as exc:
        # Deliberate best-effort: a bad stored hash must not crash the login path.
        logger.warning(f"Password verification failed due to malformed hash: {exc}")
        return False


@lru_cache(maxsize=1)
def _dummy_password_hash() -> str:
    """Return a well-formed bcrypt hash of a random secret, created once."""
    return hash_password(secrets.token_urlsafe(32))


def create_access_token(user_id: int, email: str) -> str:
    """Return a signed JWT for the given user.

    The payload carries ``sub`` (the user id as a string), ``email``, ``iat``
    (current UTC time) and ``exp`` (``iat`` plus
    ``settings.JWT_EXPIRATION_MINUTES``), the two timestamps as integer
    seconds. It is signed with ``settings.JWT_SECRET_KEY`` using
    ``settings.JWT_ALGORITHM``.
    """
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(minutes=settings.JWT_EXPIRATION_MINUTES)
    payload: dict[str, Any] = {
        "sub": str(user_id),
        "email": email,
        "iat": int(issued_at.timestamp()),
        "exp": int(expires_at.timestamp()),
    }
    return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)


def decode_token(token: str) -> dict[str, Any] | None:
    """Return the decoded claims of *token*, or None if decoding fails.

    Only ``settings.JWT_ALGORITHM`` is accepted. Any ``JWTError`` raised by
    ``jwt.decode`` results in None.
    """
    try:
        return jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError:
        return None


def register_user(db: Session, email: str, password: str, name: str | None) -> User:
    """Create, persist, and return a new user.

    The email is stripped and lower-cased before storage, the password is
    stored as a bcrypt hash, and an empty *name* is stored as None.

    Raises:
        SQLAlchemyError: if adding or committing the row fails. The session
            is rolled back before the error is re-raised so the caller can
            keep using it.
    """
    email = email.strip().lower()
    user = User(email=email, password_hash=hash_password(password), name=(name or None))
    try:
        db.add(user)
        db.commit()
    except SQLAlchemyError:
        # Without a rollback the session stays in a failed transaction and
        # every later operation on it raises as well.
        db.rollback()
        raise
    db.refresh(user)
    return user


def authenticate(db: Session, email: str, password: str) -> User | None:
    """Return the user matching *email* and *password*, or None.

    The email is stripped and lower-cased before the lookup. None is returned
    both when no such user exists and when the password does not verify.
    """
    email = email.strip().lower()
    user = db.query(User).filter(User.email == email).first()
    if not user:
        # Do the same bcrypt work as for an existing account so response time
        # does not reveal whether the email is registered. The result is
        # intentionally ignored.
        verify_password(password, _dummy_password_hash())
        return None
    if not verify_password(password, user.password_hash):
        return None
    return user


def get_user(db: Session, user_id: int) -> User | None:
    """Return the user whose id equals *user_id*, or None if there is none."""
    return db.query(User).filter(User.id == user_id).first()