Spaces:
Running
Running
| import secrets | |
| from datetime import timedelta, datetime, UTC | |
| from sqlalchemy.orm import Session | |
| from app.models.user_model import User | |
| from app.models.file_model import File | |
| from app.models.user_issue_model import UserIssue | |
| from app.utils.security import hash_password | |
| from app.services.file_delete_service import delete_file_service | |
def create_user(db: Session, email: str, username: str, password: str) -> tuple[User | None, str | None]:
    """Create an unverified user and issue a one-time verification code.

    Args:
        db: SQLAlchemy session used for the uniqueness lookups and the insert.
        email: Email for the new account; rejected if a user already has it.
        username: Username for the new account; rejected if already taken.
        password: Plain-text password; only its hash is stored on the user.

    Returns:
        A ``(user, raw_token)`` pair. ``raw_token`` is the plain 6-digit
        verification code; only its hash is persisted, so this return value
        is the only place the plain code is available. Although the
        annotation allows ``None`` members, this function never returns them.

    Raises:
        ValueError: ``"email_exists"`` or ``"username_exists"`` when the
            corresponding value is already present in the users table.
    """
    if db.query(User).filter(User.email == email).first():
        raise ValueError("email_exists")
    if db.query(User).filter(User.username == username).first():
        raise ValueError("username_exists")

    hashed_password = hash_password(password)

    # randbelow(900000) yields 0..899999, so the code is always 100000..999999
    # (exactly six digits, no leading zero).
    raw_token = f"{secrets.randbelow(900000) + 100000}"
    hashed_token = hash_password(raw_token)
    # The code is valid for ten minutes from now (timezone-aware, UTC).
    expire_date = datetime.now(UTC) + timedelta(minutes=10)

    new_user = User(
        email=email,
        username=username,
        password=hashed_password,
        is_verified=False,
        verification_token_hash=hashed_token,
        verification_token_expire_at=expire_date,
    )
    db.add(new_user)
    try:
        db.commit()
    except Exception:
        # The lookups above are not atomic with the insert, so the commit can
        # still fail (e.g. a concurrent signup, if the table enforces
        # uniqueness). Roll back so the session is usable again, then let the
        # original error propagate unchanged.
        db.rollback()
        raise
    db.refresh(new_user)
    return new_user, raw_token
def delete_user_account(db: Session, user_id: int) -> None:
    """Delete a user along with their files and reported issues.

    File removal is best-effort: a failure on one file is logged and the
    remaining files are still attempted. The user's issues and the user row
    are then deleted and committed together.

    Args:
        db: SQLAlchemy session used for all lookups and deletions.
        user_id: Primary key of the user to remove.

    Raises:
        ValueError: ``"User not found"`` when no user has ``user_id``.
        Exception: Whatever the final delete/commit raises, re-raised after
            the session has been rolled back.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)

    # 1. Fetch user
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise ValueError("User not found")

    # 2. Delete all files (and Cloudinary storage media)
    files = db.query(File).filter(File.owner_id == user_id).all()
    for file in files:
        file_id = None
        try:
            # Attribute access stays inside the try: it may hit the database
            # if the instance was expired by an earlier operation.
            file_id = file.id
            delete_file_service(db, file_id, user_id)
        except Exception:
            # Keep going so one bad file does not block the rest, but record
            # the failure instead of silently discarding it.
            logger.exception(
                "Failed to delete file %s while deleting user %s",
                file_id,
                user_id,
            )

    try:
        # 3. Delete all User Issues
        db.query(UserIssue).filter(UserIssue.user_id == user_id).delete()
        # 4. Delete the user
        db.delete(user)
        db.commit()
    except Exception:
        # Leave the session usable for the caller before propagating.
        db.rollback()
        raise