import random import time as _time from datetime import datetime from bcrypt import hashpw, gensalt, checkpw import logging from ..core.db_connector import users_collection from ..core.caching import ( cache_result, cache_user_data, cache_login_data, clear_user_cache as clear_user_cache_func, USER_DATA_CACHE_DURATION, LOGIN_CACHE_DURATION ) logger = logging.getLogger(__name__) def generate_unique_id(): """Generate a unique 6-digit ID for a user.""" while True: _id = random.randint(100000, 999999) if users_collection.find_one({"_id": _id}) is None: return _id def create_user(username, password, email=None): """Create a new user with a unique ID, including email support.""" _id = generate_unique_id() hashed_password = hashpw(password.encode('utf-8'), gensalt()) user_doc = { "_id": _id, "username": username, "password": hashed_password, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow() } # Add email if provided if email: user_doc["email"] = email users_collection.insert_one(user_doc) return _id # Return the new user's ID def create_anilist_user(anilist_user_info, access_token): """Create a new user from AniList OAuth data.""" _id = generate_unique_id() # Extract user information from AniList data username = anilist_user_info['name'] anilist_id = anilist_user_info['id'] avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium') # Prepare statistics if available stats = {} if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']: anime_stats = anilist_user_info['statistics']['anime'] stats = { 'anime_count': anime_stats.get('count', 0), 'mean_score': anime_stats.get('meanScore', 0), 'minutes_watched': anime_stats.get('minutesWatched', 0) } user_doc = { "_id": _id, "username": username, "anilist_id": anilist_id, "anilist_access_token": access_token, "avatar": avatar, "anilist_stats": stats, "banner_image": anilist_user_info.get('bannerImage'), "created_at": datetime.utcnow(), "updated_at": datetime.utcnow(), "auth_method": "anilist" } users_collection.insert_one(user_doc) return _id def update_anilist_user(user_id, anilist_user_info, access_token): """Update existing user with latest AniList information.""" avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium') # Prepare statistics if available stats = {} if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']: anime_stats = anilist_user_info['statistics']['anime'] stats = { 'anime_count': anime_stats.get('count', 0), 'mean_score': anime_stats.get('meanScore', 0), 'minutes_watched': anime_stats.get('minutesWatched', 0) } update_doc = { "$set": { "anilist_access_token": access_token, "avatar": avatar, "anilist_stats": stats, "banner_image": anilist_user_info.get('bannerImage'), "updated_at": datetime.utcnow() } } users_collection.update_one({"_id": user_id}, update_doc) clear_user_cache_func(user_id) return True def get_user_by_anilist_id(anilist_id): """Get user by AniList ID.""" return users_collection.find_one({"anilist_id": anilist_id}) def get_user(username, password): """Retrieve a user by username and password.""" user = users_collection.find_one({"username": username}) if user and user.get('password') and checkpw(password.encode('utf-8'), user['password']): return user return None @cache_user_data(duration=LOGIN_CACHE_DURATION) def get_user_by_id(_id): """Get user by ID (cached for 1 hour).""" return users_collection.find_one({"_id": _id}) def get_user_by_email(email): """Get user by email (case-insensitive).""" import re as _re return users_collection.find_one({"email": _re.compile(f'^{_re.escape(email)}$', _re.IGNORECASE)}) def user_exists(username): """Check if a user with the given username already exists.""" return users_collection.find_one({"username": username}) is not None def email_exists(email): """Check if a user with the given email already exists.""" if not email: return False return users_collection.find_one({"email": email}) is not None def update_user_avatar(_id, avatar_url): """Update user's avatar.""" users_collection.update_one( {"_id": _id}, { "$set": { "avatar": avatar_url, "updated_at": datetime.utcnow() } } ) clear_user_cache_func(_id) def update_user_email(_id, email): """Update user's email if it doesn't already exist.""" if email_exists(email): return False # Email already taken users_collection.update_one( {"_id": _id}, { "$set": { "email": email, "updated_at": datetime.utcnow() } } ) clear_user_cache_func(_id) return True def change_password(_id, old_password, new_password): """Change user's password after verifying old password.""" user = get_user_by_id(_id) if not user or not user.get('password'): return False # Verify old password if not checkpw(old_password.encode('utf-8'), user['password']): return False # Hash new password new_hashed_password = hashpw(new_password.encode('utf-8'), gensalt()) current_version = user.get('password_version', 0) # Update password users_collection.update_one( {"_id": _id}, { "$set": { "password": new_hashed_password, "password_version": current_version + 1, "updated_at": datetime.utcnow() } } ) clear_user_cache_func(_id) return True def delete_user(_id): """Delete a user by ID.""" result = users_collection.delete_one({"_id": _id}) return result.deleted_count > 0 def get_all_users(): """Get all users (for admin purposes - exclude passwords).""" return list(users_collection.find({}, {"password": 0})) def get_user_count(): """Get total number of users.""" return users_collection.count_documents({}) # Additional utility functions for user management def search_users(query, limit=10): """Search users by username or email.""" search_filter = { "$or": [ {"username": {"$regex": query, "$options": "i"}}, {"email": {"$regex": query, "$options": "i"}} ] } return list(users_collection.find(search_filter, {"password": 0}).limit(limit)) def get_recent_users(limit=10): """Get recently registered users.""" return list(users_collection.find({}, {"password": 0}) .sort("created_at", -1) .limit(limit)) def link_anilist_to_existing_user(user_id, anilist_user_info, access_token): """ Link an AniList account to an existing user. Args: user_id (int): Existing user's internal ID (_id). anilist_user_info (dict): AniList user information (from AniList API). access_token (str): AniList OAuth access token. Returns: bool: True if updated successfully, False otherwise. """ user = get_user_by_id(user_id) if not user: return False # User not found # Prevent linking if AniList ID is already linked to another account existing = get_user_by_anilist_id(anilist_user_info['id']) if existing and existing['_id'] != user_id: return False # AniList account already linked elsewhere avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium') stats = {} if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']: anime_stats = anilist_user_info['statistics']['anime'] stats = { 'anime_count': anime_stats.get('count', 0), 'mean_score': anime_stats.get('meanScore', 0), 'minutes_watched': anime_stats.get('minutesWatched', 0) } update_doc = { "$set": { "anilist_id": anilist_user_info['id'], "anilist_access_token": access_token, "avatar": avatar, "anilist_stats": stats, "banner_image": anilist_user_info.get('bannerImage'), "updated_at": datetime.utcnow(), "auth_method": "anilist_linked" } } users_collection.update_one({"_id": user_id}, update_doc) return True def unlink_anilist_from_user(user_id: str) -> bool: """Remove AniList credentials from a user.""" result = users_collection.update_one( {"_id": user_id}, {"$unset": { "anilist_access_token": "", "anilist_refresh_token": "", "anilist_expires_at": "" }} ) return result.modified_count > 0 def delete_anilist_data(user_id: int) -> bool: """Completely remove all AniList-related data from a user account.""" try: # First, get the current user data to log what we're removing user = get_user_by_id(user_id) if not user: logger.warning(f"User {user_id} not found when trying to delete AniList data") return False anilist_id = user.get('anilist_id') username = user.get('username', 'Unknown') # Remove all AniList-related fields from the user document result = users_collection.update_one( {"_id": user_id}, { "$unset": { "anilist_id": "", "anilist_access_token": "", "anilist_refresh_token": "", "anilist_expires_at": "", "anilist_stats": "", "banner_image": "", # Remove AniList banner }, "$set": { "updated_at": datetime.utcnow(), "auth_method": "local" # Reset to local authentication only } } ) if result.modified_count > 0: logger.info(f"Successfully deleted AniList data for user {username} (ID: {user_id}, AniList ID: {anilist_id})") return True else: logger.warning(f"No AniList data found to delete for user {username} (ID: {user_id})") # Return True anyway since the goal (no AniList data) is achieved return True except Exception as e: logger.error(f"Error deleting AniList data for user {user_id}: {e}") return False def connect_anilist_to_user(user_id: int, anilist_user_info: dict, access_token: str) -> bool: """Connect an AniList account to an existing user.""" try: user = get_user_by_id(user_id) if not user: logger.error(f"User {user_id} not found") return False # Check if AniList account is already connected to another user existing = get_user_by_anilist_id(anilist_user_info['id']) if existing and existing['_id'] != user_id: logger.error(f"AniList account {anilist_user_info['id']} already connected to user {existing['_id']}") return False avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium') stats = {} if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']: anime_stats = anilist_user_info['statistics']['anime'] stats = { 'anime_count': anime_stats.get('count', 0), 'mean_score': anime_stats.get('meanScore', 0), 'minutes_watched': anime_stats.get('minutesWatched', 0) } update_doc = { "$set": { "anilist_id": anilist_user_info['id'], "anilist_access_token": access_token, "avatar": avatar, "anilist_stats": stats, "banner_image": anilist_user_info.get('bannerImage'), "updated_at": datetime.utcnow(), "auth_method": "anilist_linked" } } result = users_collection.update_one({"_id": user_id}, update_doc) if result.modified_count > 0: logger.info(f"Successfully connected AniList account {anilist_user_info['id']} to user {user_id}") clear_user_cache_func(user_id) return True else: logger.error(f"Failed to update user {user_id} with AniList data") return False except Exception as e: logger.error(f"Error connecting AniList account to user {user_id}: {e}") return False def get_anilist_connection_info(user_id: int) -> dict: """Get detailed AniList connection information for a user (cached).""" return _get_anilist_connection_info_uncached(user_id) @cache_user_data(duration=LOGIN_CACHE_DURATION) def _get_anilist_connection_info_uncached(user_id: int) -> dict: """Internal uncached version of get_anilist_connection_info.""" try: user = users_collection.find_one({"_id": user_id}) if not user: return {'connected': False, 'error': 'User not found'} is_connected = bool(user.get('anilist_id')) if not is_connected: return { 'connected': False, 'user_id': user_id, 'username': user.get('username') } return { 'connected': True, 'user_id': user_id, 'username': user.get('username'), 'anilist_id': user.get('anilist_id'), 'avatar': user.get('avatar'), 'anilist_stats': user.get('anilist_stats', {}), 'banner_image': user.get('banner_image'), 'auth_method': user.get('auth_method'), 'connected_at': user.get('updated_at'), 'has_access_token': bool(user.get('anilist_access_token')) } except Exception as e: logger.error(f"Error getting AniList connection info for user {user_id}: {e}") return {'connected': False, 'error': str(e)} # ────────────────────────────────────────────── # Password Reset (Forgot Password) helpers # ────────────────────────────────────────────── def store_reset_code(email: str, hashed_code: bytes, expires_at: datetime) -> bool: """Store a bcrypt-hashed reset code and expiry on the user document.""" result = users_collection.update_one( {"email": email}, {"$set": { "reset_code": hashed_code, "reset_code_expires": expires_at, "updated_at": datetime.utcnow() }} ) return result.modified_count > 0 def verify_reset_code(email: str, code: str) -> bool: """Check that *code* matches the stored hash and has not expired.""" user = users_collection.find_one( {"email": email}, {"reset_code": 1, "reset_code_expires": 1} ) if not user or not user.get("reset_code") or not user.get("reset_code_expires"): return False if datetime.utcnow() > user["reset_code_expires"]: # Expired — clean up clear_reset_code(email) return False return checkpw(code.encode("utf-8"), user["reset_code"]) def clear_reset_code(email: str) -> None: """Remove reset-code fields from the user document.""" users_collection.update_one( {"email": email}, {"$unset": {"reset_code": "", "reset_code_expires": ""}} ) def reset_password(email: str, new_password: str) -> bool: """Set a new password for the user identified by *email* (no old password needed).""" hashed = hashpw(new_password.encode("utf-8"), gensalt()) user = users_collection.find_one({"email": email}) if not user: return False current_version = user.get('password_version', 0) result = users_collection.update_one( {"email": email}, {"$set": { "password": hashed, "password_version": current_version + 1, "updated_at": datetime.utcnow() }} ) if result.modified_count > 0: user = users_collection.find_one({"email": email}, {"_id": 1}) if user: clear_user_cache_func(user["_id"]) clear_reset_code(email) return True return False # ────────────────────────────────────────────── # MyAnimeList integration helpers # ────────────────────────────────────────────── def connect_mal_to_user(user_id: int, mal_user_info: dict, access_token: str, refresh_token: str, expires_in: int) -> bool: """Store MAL credentials on an existing user document.""" try: update_doc = { "$set": { "mal_id": mal_user_info.get("id"), "mal_username": mal_user_info.get("name"), "mal_access_token": access_token, "mal_refresh_token": refresh_token, "mal_token_expires_at": _time.time() + expires_in, "updated_at": datetime.utcnow(), } } result = users_collection.update_one({"_id": user_id}, update_doc) if result.modified_count > 0: logger.info(f"MAL account {mal_user_info.get('id')} connected to user {user_id}") clear_user_cache_func(user_id) return True return False except Exception as e: logger.error(f"Error connecting MAL to user {user_id}: {e}") return False def delete_mal_data(user_id: int) -> bool: """Remove all MAL-related fields from a user document.""" try: result = users_collection.update_one( {"_id": user_id}, { "$unset": { "mal_id": "", "mal_username": "", "mal_access_token": "", "mal_refresh_token": "", "mal_token_expires_at": "", }, "$set": {"updated_at": datetime.utcnow()}, } ) if result.modified_count > 0: logger.info(f"MAL data deleted for user {user_id}") clear_user_cache_func(user_id) return True except Exception as e: logger.error(f"Error deleting MAL data for user {user_id}: {e}") return False def get_mal_tokens(user_id: int) -> dict | None: """Return MAL tokens for a user, or None if not connected.""" try: user = users_collection.find_one( {"_id": user_id}, {"mal_access_token": 1, "mal_refresh_token": 1, "mal_token_expires_at": 1, "mal_id": 1} ) if not user or not user.get("mal_access_token"): return None return { "access_token": user["mal_access_token"], "refresh_token": user.get("mal_refresh_token"), "expires_at": user.get("mal_token_expires_at", 0), "mal_id": user.get("mal_id"), } except Exception as e: logger.error(f"Error getting MAL tokens for user {user_id}: {e}") return None def update_mal_tokens(user_id: int, access_token: str, refresh_token: str, expires_in: int) -> bool: """Update MAL tokens after a refresh.""" try: result = users_collection.update_one( {"_id": user_id}, {"$set": { "mal_access_token": access_token, "mal_refresh_token": refresh_token, "mal_token_expires_at": _time.time() + expires_in, "updated_at": datetime.utcnow(), }} ) if result.modified_count > 0: clear_user_cache_func(user_id) return True except Exception as e: logger.error(f"Error updating MAL tokens for user {user_id}: {e}") return False