deepshield / services /auth_service.py
ar07xd's picture
Sync from GitHub via hub-sync
26f3f24 verified
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
import bcrypt
from jose import JWTError, jwt
from loguru import logger
from sqlalchemy.orm import Session
from config import settings
from db.models import User
def _encode_pw(plain: str) -> bytes:
# bcrypt truncates to 72 bytes silently in some builds and hard-errors in others.
# Truncate explicitly so behavior is deterministic across versions.
return plain.encode("utf-8")[:72]
def hash_password(plain: str) -> str:
return bcrypt.hashpw(_encode_pw(plain), bcrypt.gensalt()).decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
try:
return bcrypt.checkpw(_encode_pw(plain), hashed.encode("utf-8"))
except Exception as exc:
logger.warning(f"Password verification failed due to malformed hash: {exc}")
return False
def create_access_token(user_id: int, email: str) -> str:
now = datetime.now(timezone.utc)
payload = {
"sub": str(user_id),
"email": email,
"iat": int(now.timestamp()),
"exp": int((now + timedelta(minutes=settings.JWT_EXPIRATION_MINUTES)).timestamp()),
}
return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
def decode_token(token: str) -> dict[str, Any] | None:
try:
return jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
except JWTError:
return None
def register_user(db: Session, email: str, password: str, name: str | None) -> User:
email = email.strip().lower()
user = User(email=email, password_hash=hash_password(password), name=(name or None))
db.add(user)
db.commit()
db.refresh(user)
return user
def authenticate(db: Session, email: str, password: str) -> User | None:
email = email.strip().lower()
user = db.query(User).filter(User.email == email).first()
if not user or not verify_password(password, user.password_hash):
return None
return user
def get_user(db: Session, user_id: int) -> User | None:
return db.query(User).filter(User.id == user_id).first()