# Authentication service helpers: local login with lockout, password reset
# and e-mail verification.

import secrets
from datetime import timedelta, datetime, UTC

from fastapi import Request
from sqlalchemy.orm import Session

from app.models.user_model import User
from app.services.audit_service import log_audit_event
from app.utils.jwt_handler import create_access_token
from app.utils.security import verify_password, hash_password

# Lockout policy used by login_user.
_MAX_FAILED_ATTEMPTS = 5
_LOCKOUT_MINUTES = 15

# Lifetimes of the one-time secrets issued by this module.
_RESET_TOKEN_TTL_MINUTES = 15
_VERIFICATION_CODE_TTL_MINUTES = 10


def _utcnow_naive() -> datetime:
    """Return the current UTC time with ``tzinfo`` stripped.

    ``login_user`` compares against and writes ``User.locked_until`` as a
    naive datetime, so the lockout logic works in naive UTC throughout.
    """
    return datetime.now(UTC).replace(tzinfo=None)


def _is_expired(expire_at: datetime | None) -> bool:
    """Return True when *expire_at* is missing or lies in the past.

    A naive value is interpreted as UTC; an aware value is compared as-is so
    a non-UTC offset is not silently relabelled.
    """
    if expire_at is None:
        # A secret without an expiry cannot be validated; treat it as expired
        # rather than crashing on ``None.replace``.
        return True
    if expire_at.tzinfo is None:
        expire_at = expire_at.replace(tzinfo=UTC)
    return datetime.now(UTC) > expire_at


def login_user(db: Session, identifier: str, password: str, request: Request):
    """Authenticate a user by e-mail or username and issue an access token.

    Every outcome is recorded through ``log_audit_event``.

    Args:
        db: Database session used for the lookup and for persisting the
            failed-attempt counter / lockout state.
        identifier: Matched against ``User.email`` or ``User.username``.
        password: Candidate password, checked with ``verify_password``
            against ``user.password``.
        request: Incoming request, forwarded to ``log_audit_event``.

    Returns:
        The value returned by ``create_access_token`` on success, or ``None``
        when no user matches *identifier*.

    Raises:
        ValueError: with one of these machine-readable messages:
            ``"locked_<minutes>"`` - the account is locked (either already,
            or as a result of this attempt);
            ``"attempt_<n>"`` - wrong password, *n* attempts remain;
            ``"unverified"`` - credentials are correct but
            ``user.is_verified`` is falsy.
    """
    user = (
        db.query(User)
        .filter((User.email == identifier) | (User.username == identifier))
        .first()
    )
    if not user:
        log_audit_event(
            db,
            "login_failed",
            None,
            request,
            {"identifier_attempted": identifier, "reason": "user_not_found"},
        )
        return None

    if user.locked_until:
        # Read the clock once so the comparison and the remaining-time
        # computation agree with each other.
        now = _utcnow_naive()
        if now < user.locked_until:
            time_left = user.locked_until - now
            # Round up so the caller never sees "0 minutes left".
            minutes_left = int(time_left.total_seconds() / 60) + 1
            log_audit_event(
                db,
                "login_blocked",
                user.id,
                request,
                {"reason": "account_locked", "minutes_left": minutes_left},
            )
            raise ValueError(f"locked_{minutes_left}")

        # The lock has expired: start again with a clean counter.
        user.locked_until = None
        user.failed_attempts = 0
        db.commit()

    if not verify_password(password, user.password):
        # Tolerate a NULL counter instead of raising TypeError on ``None + 1``.
        user.failed_attempts = (user.failed_attempts or 0) + 1

        if user.failed_attempts >= _MAX_FAILED_ATTEMPTS:
            user.locked_until = _utcnow_naive() + timedelta(minutes=_LOCKOUT_MINUTES)
            db.commit()
            log_audit_event(
                db,
                "account_locked",
                user.id,
                request,
                {"reason": "max_failed_attempts"},
            )
            raise ValueError(f"locked_{_LOCKOUT_MINUTES}")

        db.commit()
        attempts_left = _MAX_FAILED_ATTEMPTS - user.failed_attempts
        log_audit_event(
            db,
            "login_failed",
            user.id,
            request,
            {"reason": "bad_password", "attempts_left": attempts_left},
        )
        raise ValueError(f"attempt_{attempts_left}")

    # Correct password: clear any lockout state before further checks.
    user.failed_attempts = 0
    user.locked_until = None
    db.commit()

    if not user.is_verified:
        log_audit_event(
            db,
            "login_blocked",
            user.id,
            request,
            {"reason": "unverified_email"},
        )
        raise ValueError("unverified")

    token = create_access_token({"user_id": user.id})
    log_audit_event(db, "login_success", user.id, request, {"provider": "local"})
    return token


def request_password_reset(db: Session, email: str) -> str | None:
    """Create a password-reset token for the user with the given e-mail.

    Only the output of ``hash_password`` is stored on the user; the raw token
    is returned to the caller.

    Returns:
        The raw reset token, or ``None`` when no user has this e-mail.
    """
    user = db.query(User).filter(User.email == email).first()
    if not user:
        return None

    raw_token = secrets.token_urlsafe(32)
    user.reset_token_hash = hash_password(raw_token)
    # NOTE(review): this stores an aware datetime while ``locked_until`` is
    # written naive; confirm the column type handles both as intended.
    user.reset_token_expire_at = datetime.now(UTC) + timedelta(
        minutes=_RESET_TOKEN_TTL_MINUTES
    )
    db.commit()
    return raw_token


def confirm_password_reset(
    db: Session, token: str, new_password: str, request: Request
) -> bool:
    """Set a new password for the user whose pending reset token matches.

    Args:
        db: Database session.
        token: Raw reset token as returned by ``request_password_reset``.
        new_password: Replacement password; stored via ``hash_password``.
        request: Incoming request, forwarded to ``log_audit_event``.

    Returns:
        ``True`` when the password was changed; ``False`` when no pending
        token matches or the matching token is expired / has no expiry.
    """
    # NOTE(review): the raw token carries no user identifier, so every user
    # with a pending reset is checked in turn. If ``verify_password`` is a
    # deliberately slow hash this scales linearly with pending resets.
    pending_users = db.query(User).filter(User.reset_token_hash.isnot(None)).all()
    target_user = next(
        (
            candidate
            for candidate in pending_users
            if verify_password(token, candidate.reset_token_hash)
        ),
        None,
    )
    if target_user is None:
        return False

    if _is_expired(target_user.reset_token_expire_at):
        return False

    target_user.password = hash_password(new_password)
    # Invalidate the token so it cannot be replayed.
    target_user.reset_token_hash = None
    target_user.reset_token_expire_at = None
    db.commit()

    log_audit_event(db, "password_reset", target_user.id, request)
    return True


def confirm_email_verification(db: Session, email: str, code: str) -> User | None:
    """Mark a not-yet-verified user as verified when *code* is valid.

    Returns:
        The updated ``User``, or ``None`` when there is no unverified user
        with this e-mail, no code is pending, the code does not match, or the
        code is expired / has no expiry.
    """
    target_user = (
        db.query(User)
        # ``== False`` is intentional: it builds a SQL expression, so the
        # Python ``not`` / ``is`` operators cannot be used here.
        .filter(User.email == email, User.is_verified == False)  # noqa: E712
        .first()
    )
    if not target_user:
        return None

    stored_hash = target_user.verification_token_hash
    if not stored_hash or not verify_password(code, stored_hash):
        return None

    if _is_expired(target_user.verification_token_expire_at):
        return None

    target_user.is_verified = True
    # The code is single-use.
    target_user.verification_token_hash = None
    target_user.verification_token_expire_at = None
    db.commit()
    return target_user


def resend_verification(db: Session, email: str) -> str | None:
    """Issue a fresh six-digit verification code for an unverified user.

    Only the output of ``hash_password`` is stored on the user; the raw code
    is returned to the caller.

    Returns:
        The raw code as a string, or ``None`` when no user has this e-mail or
        the user is already verified.
    """
    user = db.query(User).filter(User.email == email).first()
    if not user or user.is_verified:
        return None

    # Uniform in [100000, 999999], so the code always has six digits.
    raw_token = f"{secrets.randbelow(900000) + 100000}"
    user.verification_token_hash = hash_password(raw_token)
    user.verification_token_expire_at = datetime.now(UTC) + timedelta(
        minutes=_VERIFICATION_CODE_TTL_MINUTES
    )
    db.commit()
    return raw_token