rohde-Auth / app /services /auth_service.py
Hammad712's picture
Email based Auth Auth completed
b2aa058
# app/services/auth_service.py
from datetime import datetime, timedelta
from jose import jwt
from passlib.context import CryptContext
from fastapi import HTTPException, UploadFile
from bson import ObjectId
from app.core.config import SECRET_KEY, ACCESS_TOKEN_EXPIRE_MINUTES, REFRESH_TOKEN_EXPIRE_DAYS
from app.core.db import users_collection, fs
import logging
# Use a module-specific logger so logs are easy to filter
logger = logging.getLogger("app.services.auth_service")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
"""Hash a plaintext password (do NOT log the password)."""
logger.debug("hash_password called")
hashed = pwd_context.hash(password)
logger.debug("Password hashed successfully")
return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify plaintext password against stored hash."""
logger.debug("verify_password called")
try:
ok = pwd_context.verify(plain_password, hashed_password)
logger.debug("Password verification result: %s", ok)
return ok
except Exception as exc:
# Something unexpected happened during verify
logger.exception("Error verifying password: %s", exc)
return False
def create_token(data: dict, expires: timedelta):
"""Create a JWT token with expiry. `data` should contain 'sub'."""
sub = data.get("sub")
logger.debug("create_token called for subject=%s expires=%s", sub, expires)
payload = data.copy()
payload["exp"] = datetime.utcnow() + expires
try:
token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
logger.info("JWT created for subject=%s", sub)
return token
except Exception as exc:
logger.exception("Failed to create JWT for subject=%s: %s", sub, exc)
raise HTTPException(status_code=500, detail="Token generation failed") from exc
def create_access_token(email: str):
logger.debug("create_access_token called for email=%s", email)
token = create_token({"sub": email}, timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
logger.info("Access token generated for %s", email)
return token
def create_refresh_token(email: str):
logger.debug("create_refresh_token called for email=%s", email)
token = create_token({"sub": email}, timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS))
logger.info("Refresh token generated for %s", email)
return token
def _delete_avatar_file_if_exists(file_id: str) -> None:
"""Delete an avatar file from GridFS if it exists. Non-fatal on failure."""
if not file_id:
logger.debug("_delete_avatar_file_if_exists called with empty file_id; skipping")
return
try:
oid = ObjectId(file_id)
except Exception:
logger.warning("Invalid avatar file_id, skipping delete: %s", file_id)
return
try:
fs.delete(oid)
logger.info("Deleted old avatar from GridFS: %s", file_id)
except Exception as exc:
# Non-fatal: log and continue. Common causes: already deleted or invalid id.
logger.warning("Failed to delete avatar %s from GridFS: %s", file_id, exc)
def save_avatar(file: UploadFile) -> str:
"""Save a new avatar to GridFS and return its file id (string)."""
logger.debug("save_avatar called filename=%s content_type=%s", getattr(file, "filename", None), getattr(file, "content_type", None))
allowed = ["image/jpeg", "image/png", "image/gif"]
if file.content_type not in allowed:
logger.warning("Rejected avatar upload due to invalid content type: %s", file.content_type)
raise HTTPException(status_code=400, detail="Invalid image type.")
# read file content (UploadFile.file is a file-like object)
try:
contents = file.file.read()
except Exception as exc:
logger.exception("Failed to read uploaded avatar file: %s", exc)
raise HTTPException(status_code=400, detail="Failed to read uploaded file") from exc
try:
file_id = fs.put(contents, filename=file.filename, contentType=file.content_type)
logger.info("Saved avatar to GridFS with id: %s (size=%d bytes)", file_id, len(contents))
return str(file_id)
except Exception as exc:
logger.exception("Failed to save avatar to GridFS: %s", exc)
raise HTTPException(status_code=500, detail="Failed to store avatar") from exc
def fetch_user(email: str):
"""Return the raw user document (or None) by email."""
logger.debug("fetch_user called for email=%s", email)
if not email:
logger.debug("fetch_user called with empty email")
return None
try:
user = users_collection.find_one({"email": email})
logger.debug("fetch_user found: %s", bool(user))
return user
except Exception as exc:
logger.exception("Error fetching user %s: %s", email, exc)
raise HTTPException(status_code=500, detail="Failed to fetch user") from exc
def authenticate(email: str, password: str):
"""Authenticate user by email and password. Returns user dict or None."""
logger.debug("authenticate called for email=%s", email)
user = fetch_user(email)
if not user:
logger.warning("Authenticate failed: user not found for email=%s", email)
return None
if not verify_password(password, user.get("hashed_password", "")):
logger.warning("Authenticate failed: invalid credentials for email=%s", email)
return None
logger.info("User authenticated successfully: %s", email)
return user
def update_user_profile(current_email: str, name: str | None = None, new_email: str | None = None,
password: str | None = None, avatar: UploadFile | None = None):
"""
Update profile fields for the user identified by current_email.
If avatar is provided, the old avatar (if any) will be deleted from GridFS.
Returns the updated user document (with hashed_password removed).
"""
logger.info("update_user_profile called for current_email=%s", current_email)
if not current_email:
logger.error("update_user_profile missing current_email")
raise HTTPException(status_code=400, detail="Missing current user email")
# Fetch current user to get current avatar id (if any)
current_user = fetch_user(current_email)
if not current_user:
logger.warning("update_user_profile user not found: %s", current_email)
raise HTTPException(status_code=404, detail="User not found")
update_data = {}
if name:
update_data["name"] = name
logger.debug("Will update name for %s", current_email)
if new_email:
update_data["email"] = new_email
logger.debug("Will update email for %s -> %s", current_email, new_email)
if password:
update_data["hashed_password"] = hash_password(password)
logger.debug("Password updated for %s (hashed, not logged)", current_email)
if avatar:
logger.debug("Avatar upload detected for %s; processing replacement", current_email)
# Delete the old avatar first (if exists)
old_avatar_id = current_user.get("avatar")
if old_avatar_id:
logger.debug("Deleting old avatar id=%s for user=%s", old_avatar_id, current_email)
_delete_avatar_file_if_exists(old_avatar_id)
# Save the new avatar and set the new id
new_file_id = save_avatar(avatar)
update_data["avatar"] = new_file_id
logger.debug("New avatar saved with id=%s for user=%s", new_file_id, current_email)
if not update_data:
logger.info("No update parameters provided for %s", current_email)
raise HTTPException(status_code=400, detail="No update parameters provided")
try:
result = users_collection.update_one({"email": current_email}, {"$set": update_data})
except Exception as exc:
logger.exception("Failed to update user in DB for %s: %s", current_email, exc)
raise HTTPException(status_code=500, detail="Failed to update user") from exc
if result.matched_count == 0:
logger.warning("Update attempted but user not found during DB update for %s", current_email)
raise HTTPException(status_code=404, detail="User not found")
# fetch and return the updated document (prefer the new email if changed)
lookup_email = new_email if new_email else current_email
try:
updated_user = users_collection.find_one({"email": lookup_email})
except Exception as exc:
logger.exception("Failed to fetch updated user %s: %s", lookup_email, exc)
raise HTTPException(status_code=500, detail="Failed to fetch updated user") from exc
if not updated_user:
logger.error("Updated user document not found after update for %s", lookup_email)
raise HTTPException(status_code=500, detail="Updated user not found")
# Remove hashed_password before returning user object to any caller
if "hashed_password" in updated_user:
updated_user.pop("hashed_password", None)
logger.debug("Removed hashed_password from returned user object for %s", lookup_email)
logger.info("User profile updated successfully for %s", lookup_email)
return updated_user