Spaces:
Sleeping
Sleeping
| # app/services/auth_service.py | |
| from datetime import datetime, timedelta | |
| from jose import jwt | |
| from passlib.context import CryptContext | |
| from fastapi import HTTPException, UploadFile | |
| from bson import ObjectId | |
| from app.core.config import SECRET_KEY, ACCESS_TOKEN_EXPIRE_MINUTES, REFRESH_TOKEN_EXPIRE_DAYS | |
| from app.core.db import users_collection, fs | |
| import logging | |
# Dedicated logger name so this service's records can be filtered on their own.
logger = logging.getLogger("app.services.auth_service")

# Password hashing context: bcrypt is the only configured scheme.
pwd_context = CryptContext(
    deprecated="auto",
    schemes=["bcrypt"],
)
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``.

    The plaintext itself is never written to the log; only progress
    markers are emitted at debug level.
    """
    logger.debug("hash_password called")
    digest = pwd_context.hash(password)
    logger.debug("Password hashed successfully")
    return digest
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check ``plain_password`` against ``hashed_password``.

    Returns whatever ``pwd_context.verify`` reports. Any exception raised
    while verifying is logged with its traceback and treated as a failed
    match (``False``) rather than propagated.
    """
    logger.debug("verify_password called")
    try:
        matches = pwd_context.verify(plain_password, hashed_password)
    except Exception as exc:
        # Verification itself blew up; report it as a mismatch.
        logger.exception("Error verifying password: %s", exc)
        return False
    else:
        logger.debug("Password verification result: %s", matches)
        return matches
def create_token(data: dict, expires: timedelta):
    """Create a signed HS256 JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. Expected to contain ``"sub"``; here it is
            only read for log messages. The dict is not mutated — a
            shallow copy is what gets signed.
        expires: Token lifetime, added to the current UTC time to form
            the ``exp`` claim.

    Returns:
        The encoded token as returned by ``jwt.encode``.

    Raises:
        HTTPException: status 500 if encoding fails for any reason.
    """
    # Local import: the module header only pulls in datetime/timedelta.
    from datetime import timezone

    sub = data.get("sub")
    logger.debug("create_token called for subject=%s expires=%s", sub, expires)

    # Use an aware UTC timestamp instead of the deprecated, naive
    # datetime.utcnow(); the resulting epoch value is the same.
    payload = {**data, "exp": datetime.now(timezone.utc) + expires}

    try:
        token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    except Exception as exc:
        logger.exception("Failed to create JWT for subject=%s: %s", sub, exc)
        raise HTTPException(status_code=500, detail="Token generation failed") from exc

    logger.info("JWT created for subject=%s", sub)
    return token
def create_access_token(email: str):
    """Issue a JWT whose subject is ``email``.

    The token's lifetime is ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.
    """
    logger.debug("create_access_token called for email=%s", email)
    claims = {"sub": email}
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_token(claims, lifetime)
    logger.info("Access token generated for %s", email)
    return access_token
def create_refresh_token(email: str):
    """Issue a JWT whose subject is ``email``.

    The token's lifetime is ``REFRESH_TOKEN_EXPIRE_DAYS`` days.
    """
    logger.debug("create_refresh_token called for email=%s", email)
    claims = {"sub": email}
    lifetime = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    refresh_token = create_token(claims, lifetime)
    logger.info("Refresh token generated for %s", email)
    return refresh_token
def _delete_avatar_file_if_exists(file_id: str) -> None:
    """Best-effort removal of an avatar file from GridFS.

    Never raises: an empty id, an id that cannot be converted to an
    ``ObjectId``, or a failing ``fs.delete`` call is only logged.
    """
    if not file_id:
        logger.debug("_delete_avatar_file_if_exists called with empty file_id; skipping")
        return

    # Convert first so a malformed id is reported separately from a
    # failed delete.
    try:
        object_id = ObjectId(file_id)
    except Exception:
        logger.warning("Invalid avatar file_id, skipping delete: %s", file_id)
        return

    try:
        fs.delete(object_id)
    except Exception as exc:
        # Deliberately non-fatal: log and let the caller carry on.
        logger.warning("Failed to delete avatar %s from GridFS: %s", file_id, exc)
    else:
        logger.info("Deleted old avatar from GridFS: %s", file_id)
def save_avatar(file: UploadFile) -> str:
    """Store an uploaded avatar in GridFS and return the new file id as a string.

    Only uploads whose declared content type is JPEG, PNG or GIF are
    accepted.

    Raises:
        HTTPException: 400 for a disallowed content type or an unreadable
            upload; 500 if writing to GridFS fails.
    """
    logger.debug(
        "save_avatar called filename=%s content_type=%s",
        getattr(file, "filename", None),
        getattr(file, "content_type", None),
    )

    if file.content_type not in ("image/jpeg", "image/png", "image/gif"):
        logger.warning("Rejected avatar upload due to invalid content type: %s", file.content_type)
        raise HTTPException(status_code=400, detail="Invalid image type.")

    # UploadFile.file is the underlying file-like object; read it whole.
    try:
        payload = file.file.read()
    except Exception as exc:
        logger.exception("Failed to read uploaded avatar file: %s", exc)
        raise HTTPException(status_code=400, detail="Failed to read uploaded file") from exc

    try:
        stored_id = fs.put(payload, filename=file.filename, contentType=file.content_type)
        logger.info("Saved avatar to GridFS with id: %s (size=%d bytes)", stored_id, len(payload))
        return str(stored_id)
    except Exception as exc:
        logger.exception("Failed to save avatar to GridFS: %s", exc)
        raise HTTPException(status_code=500, detail="Failed to store avatar") from exc
def fetch_user(email: str):
    """Look up a user document by its ``email`` field.

    Returns the raw document from ``users_collection.find_one``, or
    ``None`` when ``email`` is empty or no document matches.

    Raises:
        HTTPException: 500 if the database query itself fails.
    """
    logger.debug("fetch_user called for email=%s", email)
    if not email:
        logger.debug("fetch_user called with empty email")
        return None

    try:
        document = users_collection.find_one({"email": email})
    except Exception as exc:
        logger.exception("Error fetching user %s: %s", email, exc)
        raise HTTPException(status_code=500, detail="Failed to fetch user") from exc
    else:
        logger.debug("fetch_user found: %s", bool(document))
        return document
def authenticate(email: str, password: str):
    """Authenticate a user by email and password.

    Args:
        email: Email used to look the user up via ``fetch_user``.
        password: Plaintext password to check against the stored
            ``hashed_password`` field.

    Returns:
        The user document on success, or ``None`` when the user does not
        exist or the password does not verify.

    Raises:
        HTTPException: propagated from ``fetch_user`` if the lookup fails.
    """
    logger.debug("authenticate called for email=%s", email)
    user = fetch_user(email)

    if not user:
        # Spend roughly the time a real hash check would take, so the
        # response time does not reveal whether the account exists.
        # Best-effort: if the helper is unavailable or fails, fall back
        # to the plain "not found" path.
        try:
            pwd_context.dummy_verify()
        except Exception:
            logger.debug("dummy_verify failed; skipping timing equalisation", exc_info=True)
        logger.warning("Authenticate failed: user not found for email=%s", email)
        return None

    # A missing hash is passed as "" and simply fails verification.
    if not verify_password(password, user.get("hashed_password", "")):
        logger.warning("Authenticate failed: invalid credentials for email=%s", email)
        return None

    logger.info("User authenticated successfully: %s", email)
    return user
def update_user_profile(current_email: str, name: str | None = None, new_email: str | None = None,
                        password: str | None = None, avatar: UploadFile | None = None):
    """Update profile fields for the user identified by ``current_email``.

    Only the arguments that are truthy are applied. When ``avatar`` is
    given, the new file is stored first and the previous avatar is removed
    from GridFS only after the database update has succeeded, so a rejected
    upload or failed write never leaves the user pointing at a deleted file.

    Args:
        current_email: Email of the user to update (required).
        name: New display name, if any.
        new_email: New email address, if any. Must not belong to another user.
        password: New plaintext password; stored hashed, never logged.
        avatar: New avatar upload, if any.

    Returns:
        The updated user document with ``hashed_password`` removed.

    Raises:
        HTTPException: 400 if ``current_email`` is missing or nothing was
            supplied to update (also propagated from ``save_avatar`` for a
            bad upload); 404 if the user does not exist; 409 if
            ``new_email`` is already used by a different user; 500 on
            database or storage failures.
    """
    logger.info("update_user_profile called for current_email=%s", current_email)
    if not current_email:
        logger.error("update_user_profile missing current_email")
        raise HTTPException(status_code=400, detail="Missing current user email")

    # Fetch the current document: needed for its _id and current avatar id.
    current_user = fetch_user(current_email)
    if not current_user:
        logger.warning("update_user_profile user not found: %s", current_email)
        raise HTTPException(status_code=404, detail="User not found")
    user_id = current_user.get("_id")

    update_data = {}
    if name:
        update_data["name"] = name
        logger.debug("Will update name for %s", current_email)
    if new_email:
        if new_email != current_email:
            # Refuse to take over an address that belongs to someone else.
            # NOTE: check-then-write is not atomic; a unique index on
            # "email" is still needed to close the race completely.
            other = fetch_user(new_email)
            if other and other.get("_id") != user_id:
                logger.warning("Email change rejected for %s: %s already in use", current_email, new_email)
                raise HTTPException(status_code=409, detail="Email already in use")
        update_data["email"] = new_email
        logger.debug("Will update email for %s -> %s", current_email, new_email)
    if password:
        update_data["hashed_password"] = hash_password(password)
        logger.debug("Password updated for %s (hashed, not logged)", current_email)

    # Reject an empty request before performing any storage side effects.
    if not update_data and not avatar:
        logger.info("No update parameters provided for %s", current_email)
        raise HTTPException(status_code=400, detail="No update parameters provided")

    new_file_id = None
    if avatar:
        logger.debug("Avatar upload detected for %s; processing replacement", current_email)
        # Store the new file first; save_avatar raises if the upload is bad,
        # in which case the existing avatar is left untouched.
        new_file_id = save_avatar(avatar)
        update_data["avatar"] = new_file_id
        logger.debug("New avatar saved with id=%s for user=%s", new_file_id, current_email)

    # Prefer the immutable _id so the filter still matches after an email change.
    user_filter = {"_id": user_id} if user_id is not None else {"email": current_email}

    try:
        result = users_collection.update_one(user_filter, {"$set": update_data})
    except Exception as exc:
        logger.exception("Failed to update user in DB for %s: %s", current_email, exc)
        if new_file_id:
            # Don't leave an orphaned file behind for an update that never landed.
            _delete_avatar_file_if_exists(new_file_id)
        raise HTTPException(status_code=500, detail="Failed to update user") from exc

    if result.matched_count == 0:
        logger.warning("Update attempted but user not found during DB update for %s", current_email)
        if new_file_id:
            _delete_avatar_file_if_exists(new_file_id)
        raise HTTPException(status_code=404, detail="User not found")

    # The document now references the new avatar; the old file can go.
    old_avatar_id = current_user.get("avatar")
    if new_file_id and old_avatar_id:
        logger.debug("Deleting old avatar id=%s for user=%s", old_avatar_id, current_email)
        _delete_avatar_file_if_exists(old_avatar_id)

    # Re-fetch the updated document (by _id when available, otherwise by
    # the email the document now carries).
    lookup_email = new_email if new_email else current_email
    refetch_filter = {"_id": user_id} if user_id is not None else {"email": lookup_email}
    try:
        updated_user = users_collection.find_one(refetch_filter)
    except Exception as exc:
        logger.exception("Failed to fetch updated user %s: %s", lookup_email, exc)
        raise HTTPException(status_code=500, detail="Failed to fetch updated user") from exc

    if not updated_user:
        logger.error("Updated user document not found after update for %s", lookup_email)
        raise HTTPException(status_code=500, detail="Updated user not found")

    # Never hand the password hash back to callers.
    if "hashed_password" in updated_user:
        updated_user.pop("hashed_password", None)
        logger.debug("Removed hashed_password from returned user object for %s", lookup_email)

    logger.info("User profile updated successfully for %s", lookup_email)
    return updated_user