Spaces:
Running
Running
| from __future__ import annotations | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any | |
| import bcrypt | |
| from jose import JWTError, jwt | |
| from loguru import logger | |
| from sqlalchemy.orm import Session | |
| from config import settings | |
| from db.models import User | |
| def _encode_pw(plain: str) -> bytes: | |
| # bcrypt truncates to 72 bytes silently in some builds and hard-errors in others. | |
| # Truncate explicitly so behavior is deterministic across versions. | |
| return plain.encode("utf-8")[:72] | |
| def hash_password(plain: str) -> str: | |
| return bcrypt.hashpw(_encode_pw(plain), bcrypt.gensalt()).decode("utf-8") | |
| def verify_password(plain: str, hashed: str) -> bool: | |
| try: | |
| return bcrypt.checkpw(_encode_pw(plain), hashed.encode("utf-8")) | |
| except Exception as exc: | |
| logger.warning(f"Password verification failed due to malformed hash: {exc}") | |
| return False | |
| def create_access_token(user_id: int, email: str) -> str: | |
| now = datetime.now(timezone.utc) | |
| payload = { | |
| "sub": str(user_id), | |
| "email": email, | |
| "iat": int(now.timestamp()), | |
| "exp": int((now + timedelta(minutes=settings.JWT_EXPIRATION_MINUTES)).timestamp()), | |
| } | |
| return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) | |
| def decode_token(token: str) -> dict[str, Any] | None: | |
| try: | |
| return jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) | |
| except JWTError: | |
| return None | |
| def register_user(db: Session, email: str, password: str, name: str | None) -> User: | |
| email = email.strip().lower() | |
| user = User(email=email, password_hash=hash_password(password), name=(name or None)) | |
| db.add(user) | |
| db.commit() | |
| db.refresh(user) | |
| return user | |
| def authenticate(db: Session, email: str, password: str) -> User | None: | |
| email = email.strip().lower() | |
| user = db.query(User).filter(User.email == email).first() | |
| if not user or not verify_password(password, user.password_hash): | |
| return None | |
| return user | |
| def get_user(db: Session, user_id: int) -> User | None: | |
| return db.query(User).filter(User.id == user_id).first() | |