Spotix-API / backend /app /services /auth_service.py
Anish-530
Added email code verif
c041d0d
import secrets
from datetime import timedelta, datetime, UTC
from sqlalchemy.orm import Session
from app.models.user_model import User
from app.utils.security import verify_password, hash_password
from app.utils.jwt_handler import create_access_token
from fastapi import Request
from app.services.audit_service import log_audit_event
def login_user(db: Session, identifier: str, password: str, request: Request):
user = db.query(User).filter(
(User.email == identifier) | (User.username == identifier)
).first()
if not user:
log_audit_event(db, "login_failed", None, request, {"identifier_attempted": identifier, "reason": "user_not_found"})
return None
if user.locked_until:
if datetime.now(UTC).replace(tzinfo=None) < user.locked_until:
time_left = user.locked_until - datetime.now(UTC).replace(tzinfo=None)
minutes_left = int(time_left.total_seconds() / 60) + 1
log_audit_event(db, "login_blocked", user.id, request, {"reason": "account_locked", "minutes_left": minutes_left})
raise ValueError(f"locked_{minutes_left}")
else:
user.locked_until = None
user.failed_attempts = 0
db.commit()
if not verify_password(password, user.password):
user.failed_attempts += 1
if user.failed_attempts >= 5:
user.locked_until = datetime.now(UTC).replace(tzinfo=None) + timedelta(minutes=15)
db.commit()
log_audit_event(db, "account_locked", user.id, request, {"reason": "max_failed_attempts"})
raise ValueError("locked_15")
db.commit()
attempts_left = 5 - user.failed_attempts
log_audit_event(db, "login_failed", user.id, request, {"reason": "bad_password", "attempts_left": attempts_left})
raise ValueError(f"attempt_{attempts_left}")
user.failed_attempts = 0
user.locked_until = None
db.commit()
if not user.is_verified:
log_audit_event(db, "login_blocked", user.id, request, {"reason": "unverified_email"})
raise ValueError("unverified")
token = create_access_token({"user_id": user.id})
log_audit_event(db, "login_success", user.id, request, {"provider": "local"})
return token
def request_password_reset(db: Session, email: str) -> str | None:
user = db.query(User).filter(User.email == email).first()
if not user:
return None
raw_token = secrets.token_urlsafe(32)
hashed_token = hash_password(raw_token)
expire_date = datetime.now(UTC) + timedelta(minutes=15)
user.reset_token_hash = hashed_token
user.reset_token_expire_at = expire_date
db.commit()
return raw_token
def confirm_password_reset(db: Session, token: str, new_password: str, request: Request) -> bool:
active_reset_users = db.query(User).filter(User.reset_token_hash.isnot(None)).all()
target_user = None
for user in active_reset_users:
if verify_password(token, user.reset_token_hash):
target_user = user
break
if not target_user:
return False
if datetime.now(UTC) > target_user.reset_token_expire_at.replace(tzinfo=UTC):
return False
target_user.password = hash_password(new_password)
target_user.reset_token_hash = None
target_user.reset_token_expire_at = None
db.commit()
log_audit_event(db, "password_reset", target_user.id, request)
return True
def confirm_email_verification(db: Session, email: str, code: str) -> User | None:
target_user = db.query(User).filter(User.email == email, User.is_verified == False).first()
if not target_user:
return None
if not target_user.verification_token_hash or not verify_password(code, target_user.verification_token_hash):
return None
if datetime.now(UTC) > target_user.verification_token_expire_at.replace(tzinfo=UTC):
return None
target_user.is_verified = True
target_user.verification_token_hash = None
target_user.verification_token_expire_at = None
db.commit()
return target_user
def resend_verification(db: Session, email: str) -> str | None:
user = db.query(User).filter(User.email == email).first()
if not user or user.is_verified:
return None
raw_token = f"{secrets.randbelow(900000) + 100000}"
hashed_token = hash_password(raw_token)
expire_date = datetime.now(UTC) + timedelta(minutes=10)
user.verification_token_hash = hashed_token
user.verification_token_expire_at = expire_date
db.commit()
return raw_token