Spaces:
Running
Running
| import secrets | |
| from datetime import timedelta, datetime, UTC | |
| from sqlalchemy.orm import Session | |
| from app.models.user_model import User | |
| from app.utils.security import verify_password, hash_password | |
| from app.utils.jwt_handler import create_access_token | |
| from fastapi import Request | |
| from app.services.audit_service import log_audit_event | |
# Lockout policy: number of consecutive bad passwords that triggers a lock,
# and how long (in minutes) the lock lasts.
_MAX_FAILED_ATTEMPTS = 5
_LOCKOUT_MINUTES = 15


def login_user(db: Session, identifier: str, password: str, request: Request):
    """Authenticate a user by email or username and return an access token.

    Args:
        db: Session used to look up the user and persist lockout state.
        identifier: Value matched against ``User.email`` or ``User.username``.
        password: Candidate password, checked with ``verify_password``
            against ``user.password``.
        request: Passed through to ``log_audit_event`` for every audit entry.

    Returns:
        The token from ``create_access_token({"user_id": user.id})`` on
        success, or ``None`` when no user matches ``identifier``.

    Raises:
        ValueError: ``"locked_<minutes>"`` when the account is locked (either
            already locked, or locked by this attempt);
            ``"attempt_<n>"`` when the password is wrong and ``n`` attempts
            remain; ``"unverified"`` when the password is correct but
            ``user.is_verified`` is falsy.
    """
    user = db.query(User).filter(
        (User.email == identifier) | (User.username == identifier)
    ).first()
    if not user:
        log_audit_event(
            db, "login_failed", None, request,
            {"identifier_attempted": identifier, "reason": "user_not_found"},
        )
        return None

    if user.locked_until:
        # locked_until is written below as a naive UTC timestamp, so compare
        # it with a naive UTC "now". The clock is read once so the comparison
        # and the remaining-time calculation use the same instant.
        now = datetime.now(UTC).replace(tzinfo=None)
        if now < user.locked_until:
            seconds_left = (user.locked_until - now).total_seconds()
            # Truncate to whole minutes, then add one so the caller is never
            # told "0 minutes" while the lock is still active.
            minutes_left = int(seconds_left / 60) + 1
            log_audit_event(
                db, "login_blocked", user.id, request,
                {"reason": "account_locked", "minutes_left": minutes_left},
            )
            raise ValueError(f"locked_{minutes_left}")
        # The lock has expired: clear it and start counting from zero again.
        user.locked_until = None
        user.failed_attempts = 0
        db.commit()

    if not verify_password(password, user.password):
        # `or 0` tolerates a NULL counter instead of raising TypeError.
        # NOTE(review): confirm whether User.failed_attempts is nullable.
        user.failed_attempts = (user.failed_attempts or 0) + 1
        if user.failed_attempts >= _MAX_FAILED_ATTEMPTS:
            user.locked_until = (
                datetime.now(UTC).replace(tzinfo=None)
                + timedelta(minutes=_LOCKOUT_MINUTES)
            )
            db.commit()
            log_audit_event(
                db, "account_locked", user.id, request,
                {"reason": "max_failed_attempts"},
            )
            raise ValueError(f"locked_{_LOCKOUT_MINUTES}")
        db.commit()
        attempts_left = _MAX_FAILED_ATTEMPTS - user.failed_attempts
        log_audit_event(
            db, "login_failed", user.id, request,
            {"reason": "bad_password", "attempts_left": attempts_left},
        )
        raise ValueError(f"attempt_{attempts_left}")

    # Correct password: reset the lockout state before any further checks.
    user.failed_attempts = 0
    user.locked_until = None
    db.commit()

    if not user.is_verified:
        log_audit_event(
            db, "login_blocked", user.id, request,
            {"reason": "unverified_email"},
        )
        raise ValueError("unverified")

    token = create_access_token({"user_id": user.id})
    log_audit_event(db, "login_success", user.id, request, {"provider": "local"})
    return token
def request_password_reset(db: Session, email: str) -> str | None:
    """Issue a password-reset token for the account registered under ``email``.

    A URL-safe random token is generated; only its ``hash_password`` digest is
    stored on the user, together with an expiry 15 minutes from now (UTC).

    Args:
        db: Session used to look up the user and commit the new token state.
        email: Address matched against ``User.email``.

    Returns:
        The raw (unhashed) token, or ``None`` when no user has that email.
    """
    account = db.query(User).filter(User.email == email).first()
    if account:
        plain_token = secrets.token_urlsafe(32)
        # Persist only the digest; the plain token is returned to the caller.
        account.reset_token_hash = hash_password(plain_token)
        account.reset_token_expire_at = datetime.now(UTC) + timedelta(minutes=15)
        db.commit()
        return plain_token
    return None
def confirm_password_reset(db: Session, token: str, new_password: str, request: Request) -> bool:
    """Set a new password for the user holding a valid, unexpired reset token.

    Every user with a non-NULL ``reset_token_hash`` is a candidate, because
    the raw token carries no user identifier. Candidates whose token has
    expired, or has no expiry recorded, are skipped before ``verify_password``
    is called, so stale rows never trigger a hash verification and a missing
    expiry is treated as expired rather than raising.

    Args:
        db: Session used to query candidates and commit the new password.
        token: Raw reset token supplied by the caller.
        new_password: Plaintext password; stored as ``hash_password(new_password)``.
        request: Passed through to ``log_audit_event``.

    Returns:
        ``True`` when the password was changed, ``False`` when no unexpired
        stored token matches ``token``.
    """
    now = datetime.now(UTC)
    candidates = db.query(User).filter(User.reset_token_hash.isnot(None)).all()

    target_user = None
    for user in candidates:
        expires_at = user.reset_token_expire_at
        # replace(tzinfo=UTC) re-attaches the zone before comparing with the
        # aware `now` — presumably because the database may hand the value
        # back naive; request_password_reset writes it as aware UTC.
        if expires_at is None or now > expires_at.replace(tzinfo=UTC):
            continue
        if verify_password(token, user.reset_token_hash):
            target_user = user
            break

    if target_user is None:
        return False

    target_user.password = hash_password(new_password)
    # Clear the token so it cannot be used a second time.
    target_user.reset_token_hash = None
    target_user.reset_token_expire_at = None
    db.commit()
    log_audit_event(db, "password_reset", target_user.id, request)
    return True
def confirm_email_verification(db: Session, email: str, code: str) -> User | None:
    """Mark the unverified user with ``email`` as verified if ``code`` is valid.

    Args:
        db: Session used to look up the user and commit the verified state.
        email: Address matched against ``User.email``; only users whose
            ``is_verified`` is false are considered.
        code: Verification code checked with ``verify_password`` against
            ``verification_token_hash``.

    Returns:
        The updated user on success. ``None`` when there is no matching
        unverified user, no stored code, no stored expiry, the code has
        expired, or the code does not match.
    """
    # SQLAlchemy needs the `==` operator here to build the SQL comparison.
    target_user = db.query(User).filter(
        User.email == email, User.is_verified == False  # noqa: E712
    ).first()
    if not target_user:
        return None

    token_hash = target_user.verification_token_hash
    expires_at = target_user.verification_token_expire_at
    # Fail closed: a missing hash or missing expiry means no usable code,
    # rather than an AttributeError on `None.replace(...)`.
    if not token_hash or expires_at is None:
        return None
    # Cheap expiry check first. replace(tzinfo=UTC) re-attaches the zone
    # before comparing with an aware "now" — presumably because the database
    # may return the value naive.
    if datetime.now(UTC) > expires_at.replace(tzinfo=UTC):
        return None
    if not verify_password(code, token_hash):
        return None

    target_user.is_verified = True
    # Clear the code so it cannot be replayed.
    target_user.verification_token_hash = None
    target_user.verification_token_expire_at = None
    db.commit()
    return target_user
def resend_verification(db: Session, email: str) -> str | None:
    """Generate a fresh email-verification code for a not-yet-verified user.

    The code is a six-digit number (100000-999999) drawn with ``secrets``.
    Only its ``hash_password`` digest is stored, with an expiry 10 minutes
    from now (UTC).

    Args:
        db: Session used to look up the user and commit the new code.
        email: Address matched against ``User.email``.

    Returns:
        The raw code as a string, or ``None`` when no user has that email or
        the user is already verified.
    """
    account = db.query(User).filter(User.email == email).first()
    if account and not account.is_verified:
        code = str(secrets.randbelow(900000) + 100000)
        # Persist only the digest; the plain code is returned to the caller.
        account.verification_token_hash = hash_password(code)
        account.verification_token_expire_at = datetime.now(UTC) + timedelta(minutes=10)
        db.commit()
        return code
    return None