| import random
|
| import time as _time
|
| import re as _re
|
| from datetime import datetime
|
| from bcrypt import hashpw, gensalt, checkpw
|
| import logging
|
| from ..core.db_connector import users_collection
|
| from ..core.caching import (
|
| cache_result, cache_user_data, cache_login_data,
|
| clear_user_cache as clear_user_cache_func,
|
| USER_DATA_CACHE_DURATION, LOGIN_CACHE_DURATION
|
| )
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
| def generate_unique_id():
|
| """Generate a unique 6-digit ID for a user."""
|
| while True:
|
| _id = random.randint(100000, 999999)
|
| if users_collection.find_one({"_id": _id}) is None:
|
| return _id
|
|
|
| def create_user(username, password, email=None):
|
| """Create a new user with a unique ID, including email support."""
|
| _id = generate_unique_id()
|
| hashed_password = hashpw(password.encode('utf-8'), gensalt())
|
|
|
| user_doc = {
|
| "_id": _id,
|
| "username": username,
|
| "password": hashed_password,
|
| "created_at": datetime.utcnow(),
|
| "updated_at": datetime.utcnow()
|
| }
|
|
|
|
|
| if email:
|
| user_doc["email"] = email
|
|
|
| users_collection.insert_one(user_doc)
|
| return _id
|
|
|
| def create_anilist_user(anilist_user_info, access_token):
|
| """Create a new user from AniList OAuth data."""
|
| _id = generate_unique_id()
|
|
|
|
|
| raw_username = anilist_user_info['name']
|
| username = raw_username.replace(' ', '_')
|
|
|
|
|
| base_name = username
|
| counter = 1
|
| while users_collection.find_one({"username": username}) is not None:
|
| username = f"{base_name}_{counter}"
|
| counter += 1
|
|
|
| anilist_id = anilist_user_info['id']
|
| avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium')
|
|
|
|
|
| stats = {}
|
| if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']:
|
| anime_stats = anilist_user_info['statistics']['anime']
|
| stats = {
|
| 'anime_count': anime_stats.get('count', 0),
|
| 'mean_score': anime_stats.get('meanScore', 0),
|
| 'minutes_watched': anime_stats.get('minutesWatched', 0)
|
| }
|
|
|
| user_doc = {
|
| "_id": _id,
|
| "username": username,
|
| "anilist_id": anilist_id,
|
| "anilist_access_token": access_token,
|
| "avatar": avatar,
|
| "anilist_stats": stats,
|
| "banner_image": anilist_user_info.get('bannerImage'),
|
| "created_at": datetime.utcnow(),
|
| "updated_at": datetime.utcnow(),
|
| "auth_method": "anilist"
|
| }
|
|
|
| users_collection.insert_one(user_doc)
|
| return _id
|
|
|
| def update_anilist_user(user_id, anilist_user_info, access_token):
|
| """Update existing user with latest AniList information."""
|
| avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium')
|
|
|
|
|
| stats = {}
|
| if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']:
|
| anime_stats = anilist_user_info['statistics']['anime']
|
| stats = {
|
| 'anime_count': anime_stats.get('count', 0),
|
| 'mean_score': anime_stats.get('meanScore', 0),
|
| 'minutes_watched': anime_stats.get('minutesWatched', 0)
|
| }
|
|
|
| update_doc = {
|
| "$set": {
|
| "anilist_access_token": access_token,
|
| "avatar": avatar,
|
| "anilist_stats": stats,
|
| "banner_image": anilist_user_info.get('bannerImage'),
|
| "updated_at": datetime.utcnow()
|
| }
|
| }
|
|
|
| users_collection.update_one({"_id": user_id}, update_doc)
|
| clear_user_cache_func(user_id)
|
| return True
|
|
|
| def get_user_by_anilist_id(anilist_id):
|
| """Get user by AniList ID."""
|
| return users_collection.find_one({"anilist_id": anilist_id})
|
|
|
| def get_user(username, password):
|
| """Retrieve a user by username and password."""
|
| user = users_collection.find_one({"username": username})
|
| if user and user.get('password') and checkpw(password.encode('utf-8'), user['password']):
|
| return user
|
| return None
|
|
|
| @cache_user_data(duration=LOGIN_CACHE_DURATION)
|
| def get_user_by_id(_id):
|
| """Get user by ID (cached for 1 hour)."""
|
| return users_collection.find_one({"_id": _id})
|
|
|
| def get_user_by_email(email):
|
| """Get user by email (case-insensitive)."""
|
| return users_collection.find_one({"email": _re.compile(f'^{_re.escape(email)}$', _re.IGNORECASE)})
|
|
|
| def user_exists(username):
|
| """Check if a user with the given username already exists."""
|
| return users_collection.find_one({"username": username}) is not None
|
|
|
| def email_exists(email):
|
| """Check if a user with the given email already exists."""
|
| if not email:
|
| return False
|
| return users_collection.find_one({"email": email}) is not None
|
|
|
| def update_user_avatar(_id, avatar_url):
|
| """Update user's avatar."""
|
| users_collection.update_one(
|
| {"_id": _id},
|
| {
|
| "$set": {
|
| "avatar": avatar_url,
|
| "updated_at": datetime.utcnow()
|
| }
|
| }
|
| )
|
| clear_user_cache_func(_id)
|
|
|
| def update_user_email(_id, email):
|
| """Update user's email if it doesn't already exist."""
|
| if email_exists(email):
|
| return False
|
|
|
| users_collection.update_one(
|
| {"_id": _id},
|
| {
|
| "$set": {
|
| "email": email,
|
| "updated_at": datetime.utcnow()
|
| }
|
| }
|
| )
|
| clear_user_cache_func(_id)
|
| return True
|
|
|
| def change_password(_id, old_password, new_password):
|
| """Change user's password after verifying old password."""
|
| user = get_user_by_id(_id)
|
| if not user or not user.get('password'):
|
| return False
|
|
|
|
|
| if not checkpw(old_password.encode('utf-8'), user['password']):
|
| return False
|
|
|
|
|
| new_hashed_password = hashpw(new_password.encode('utf-8'), gensalt())
|
|
|
| current_version = user.get('password_version', 0)
|
|
|
|
|
| users_collection.update_one(
|
| {"_id": _id},
|
| {
|
| "$set": {
|
| "password": new_hashed_password,
|
| "password_version": current_version + 1,
|
| "updated_at": datetime.utcnow()
|
| }
|
| }
|
| )
|
| clear_user_cache_func(_id)
|
| return True
|
|
|
| def delete_user(_id):
|
| """Delete a user by ID."""
|
| result = users_collection.delete_one({"_id": _id})
|
| return result.deleted_count > 0
|
|
|
| def get_all_users():
|
| """Get all users (for admin purposes - exclude passwords)."""
|
| return list(users_collection.find({}, {"password": 0}))
|
|
|
| def get_user_count():
|
| """Get total number of users."""
|
| return users_collection.count_documents({})
|
|
|
|
|
|
|
| def search_users(query, limit=10):
|
| """Search users by username or email."""
|
| search_filter = {
|
| "$or": [
|
| {"username": {"$regex": query, "$options": "i"}},
|
| {"email": {"$regex": query, "$options": "i"}}
|
| ]
|
| }
|
| return list(users_collection.find(search_filter, {"password": 0}).limit(limit))
|
|
|
| def get_recent_users(limit=10):
|
| """Get recently registered users."""
|
| return list(users_collection.find({}, {"password": 0})
|
| .sort("created_at", -1)
|
| .limit(limit))
|
|
|
| def link_anilist_to_existing_user(user_id, anilist_user_info, access_token):
|
| """
|
| Link an AniList account to an existing user.
|
|
|
| Args:
|
| user_id (int): Existing user's internal ID (_id).
|
| anilist_user_info (dict): AniList user information (from AniList API).
|
| access_token (str): AniList OAuth access token.
|
|
|
| Returns:
|
| bool: True if updated successfully, False otherwise.
|
| """
|
| user = get_user_by_id(user_id)
|
| if not user:
|
| return False
|
|
|
|
|
| existing = get_user_by_anilist_id(anilist_user_info['id'])
|
| if existing and existing['_id'] != user_id:
|
| return False
|
|
|
| avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium')
|
|
|
| stats = {}
|
| if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']:
|
| anime_stats = anilist_user_info['statistics']['anime']
|
| stats = {
|
| 'anime_count': anime_stats.get('count', 0),
|
| 'mean_score': anime_stats.get('meanScore', 0),
|
| 'minutes_watched': anime_stats.get('minutesWatched', 0)
|
| }
|
|
|
| update_doc = {
|
| "$set": {
|
| "anilist_id": anilist_user_info['id'],
|
| "anilist_access_token": access_token,
|
| "avatar": avatar,
|
| "anilist_stats": stats,
|
| "banner_image": anilist_user_info.get('bannerImage'),
|
| "updated_at": datetime.utcnow(),
|
| "auth_method": "anilist_linked"
|
| }
|
| }
|
|
|
| users_collection.update_one({"_id": user_id}, update_doc)
|
| return True
|
|
|
| def unlink_anilist_from_user(user_id: str) -> bool:
|
| """Remove AniList credentials from a user."""
|
| result = users_collection.update_one(
|
| {"_id": user_id},
|
| {"$unset": {
|
| "anilist_access_token": "",
|
| "anilist_refresh_token": "",
|
| "anilist_expires_at": ""
|
| }}
|
| )
|
| return result.modified_count > 0
|
|
|
| def delete_anilist_data(user_id: int) -> bool:
|
| """Completely remove all AniList-related data from a user account."""
|
| try:
|
|
|
| user = get_user_by_id(user_id)
|
| if not user:
|
| logger.warning(f"User {user_id} not found when trying to delete AniList data")
|
| return False
|
|
|
| anilist_id = user.get('anilist_id')
|
| username = user.get('username', 'Unknown')
|
|
|
|
|
| result = users_collection.update_one(
|
| {"_id": user_id},
|
| {
|
| "$unset": {
|
| "anilist_id": "",
|
| "anilist_access_token": "",
|
| "anilist_refresh_token": "",
|
| "anilist_expires_at": "",
|
| "anilist_stats": "",
|
| "banner_image": "",
|
| },
|
| "$set": {
|
| "updated_at": datetime.utcnow(),
|
| "auth_method": "local"
|
| }
|
| }
|
| )
|
|
|
| if result.modified_count > 0:
|
| logger.info(f"Successfully deleted AniList data for user {username} (ID: {user_id}, AniList ID: {anilist_id})")
|
| return True
|
| else:
|
| logger.warning(f"No AniList data found to delete for user {username} (ID: {user_id})")
|
|
|
| return True
|
|
|
| except Exception as e:
|
| logger.error(f"Error deleting AniList data for user {user_id}: {e}")
|
| return False
|
|
|
| def connect_anilist_to_user(user_id: int, anilist_user_info: dict, access_token: str) -> bool:
|
| """Connect an AniList account to an existing user."""
|
| try:
|
| user = get_user_by_id(user_id)
|
| if not user:
|
| logger.error(f"User {user_id} not found")
|
| return False
|
|
|
|
|
| existing = get_user_by_anilist_id(anilist_user_info['id'])
|
| if existing and existing['_id'] != user_id:
|
| logger.error(f"AniList account {anilist_user_info['id']} already connected to user {existing['_id']}")
|
| return False
|
|
|
| avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium')
|
|
|
| stats = {}
|
| if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']:
|
| anime_stats = anilist_user_info['statistics']['anime']
|
| stats = {
|
| 'anime_count': anime_stats.get('count', 0),
|
| 'mean_score': anime_stats.get('meanScore', 0),
|
| 'minutes_watched': anime_stats.get('minutesWatched', 0)
|
| }
|
|
|
| update_doc = {
|
| "$set": {
|
| "anilist_id": anilist_user_info['id'],
|
| "anilist_access_token": access_token,
|
| "avatar": avatar,
|
| "anilist_stats": stats,
|
| "banner_image": anilist_user_info.get('bannerImage'),
|
| "updated_at": datetime.utcnow(),
|
| "auth_method": "anilist_linked"
|
| }
|
| }
|
|
|
| result = users_collection.update_one({"_id": user_id}, update_doc)
|
|
|
| if result.modified_count > 0:
|
| logger.info(f"Successfully connected AniList account {anilist_user_info['id']} to user {user_id}")
|
| clear_user_cache_func(user_id)
|
| return True
|
| else:
|
| logger.error(f"Failed to update user {user_id} with AniList data")
|
| return False
|
|
|
| except Exception as e:
|
| logger.error(f"Error connecting AniList account to user {user_id}: {e}")
|
| return False
|
|
|
| def get_anilist_connection_info(user_id: int) -> dict:
|
| """Get detailed AniList connection information for a user (cached)."""
|
| return _get_anilist_connection_info_uncached(user_id)
|
|
|
| @cache_user_data(duration=LOGIN_CACHE_DURATION)
|
| def _get_anilist_connection_info_uncached(user_id: int) -> dict:
|
| """Internal uncached version of get_anilist_connection_info."""
|
| try:
|
| user = users_collection.find_one({"_id": user_id})
|
| if not user:
|
| return {'connected': False, 'error': 'User not found'}
|
|
|
| is_connected = bool(user.get('anilist_id'))
|
|
|
| if not is_connected:
|
| return {
|
| 'connected': False,
|
| 'user_id': user_id,
|
| 'username': user.get('username')
|
| }
|
|
|
| return {
|
| 'connected': True,
|
| 'user_id': user_id,
|
| 'username': user.get('username'),
|
| 'anilist_id': user.get('anilist_id'),
|
| 'avatar': user.get('avatar'),
|
| 'anilist_stats': user.get('anilist_stats', {}),
|
| 'banner_image': user.get('banner_image'),
|
| 'auth_method': user.get('auth_method'),
|
| 'connected_at': user.get('updated_at'),
|
| 'has_access_token': bool(user.get('anilist_access_token'))
|
| }
|
|
|
| except Exception as e:
|
| logger.error(f"Error getting AniList connection info for user {user_id}: {e}")
|
| return {'connected': False, 'error': str(e)}
|
|
|
|
|
|
|
|
|
|
|
|
|
| def store_reset_code(email: str, hashed_code: bytes, expires_at: datetime) -> bool:
|
| """Store a bcrypt-hashed reset code and expiry on the user document."""
|
| result = users_collection.update_one(
|
| {"email": email},
|
| {"$set": {
|
| "reset_code": hashed_code,
|
| "reset_code_expires": expires_at,
|
| "updated_at": datetime.utcnow()
|
| }}
|
| )
|
| return result.modified_count > 0
|
|
|
|
|
| def verify_reset_code(email: str, code: str) -> bool:
|
| """Check that *code* matches the stored hash and has not expired."""
|
| user = users_collection.find_one(
|
| {"email": email},
|
| {"reset_code": 1, "reset_code_expires": 1}
|
| )
|
| if not user or not user.get("reset_code") or not user.get("reset_code_expires"):
|
| return False
|
|
|
| if datetime.utcnow() > user["reset_code_expires"]:
|
|
|
| clear_reset_code(email)
|
| return False
|
|
|
| return checkpw(code.encode("utf-8"), user["reset_code"])
|
|
|
|
|
| def clear_reset_code(email: str) -> None:
|
| """Remove reset-code fields from the user document."""
|
| users_collection.update_one(
|
| {"email": email},
|
| {"$unset": {"reset_code": "", "reset_code_expires": ""}}
|
| )
|
|
|
|
|
| def reset_password(email: str, new_password: str) -> bool:
|
| """Set a new password for the user identified by *email* (no old password needed)."""
|
| hashed = hashpw(new_password.encode("utf-8"), gensalt())
|
|
|
| user = users_collection.find_one({"email": email})
|
| if not user:
|
| return False
|
| current_version = user.get('password_version', 0)
|
|
|
| result = users_collection.update_one(
|
| {"email": email},
|
| {"$set": {
|
| "password": hashed,
|
| "password_version": current_version + 1,
|
| "updated_at": datetime.utcnow()
|
| }}
|
| )
|
| if result.modified_count > 0:
|
| user = users_collection.find_one({"email": email}, {"_id": 1})
|
| if user:
|
| clear_user_cache_func(user["_id"])
|
| clear_reset_code(email)
|
| return True
|
| return False
|
|
|
|
|
|
|
|
|
|
|
|
|
| def connect_mal_to_user(user_id: int, mal_user_info: dict, access_token: str,
|
| refresh_token: str, expires_in: int) -> bool:
|
| """Store MAL credentials on an existing user document."""
|
| try:
|
| update_doc = {
|
| "$set": {
|
| "mal_id": mal_user_info.get("id"),
|
| "mal_username": mal_user_info.get("name"),
|
| "mal_avatar": mal_user_info.get("picture"),
|
| "mal_access_token": access_token,
|
| "mal_refresh_token": refresh_token,
|
| "mal_token_expires_at": _time.time() + expires_in,
|
| "updated_at": datetime.utcnow(),
|
| }
|
| }
|
| result = users_collection.update_one({"_id": user_id}, update_doc)
|
| if result.modified_count > 0:
|
| logger.info(f"MAL account {mal_user_info.get('id')} connected to user {user_id}")
|
| clear_user_cache_func(user_id)
|
| return True
|
| return False
|
| except Exception as e:
|
| logger.error(f"Error connecting MAL to user {user_id}: {e}")
|
| return False
|
|
|
|
|
| def delete_mal_data(user_id: int) -> bool:
|
| """Remove all MAL-related fields from a user document."""
|
| try:
|
| result = users_collection.update_one(
|
| {"_id": user_id},
|
| {
|
| "$unset": {
|
| "mal_id": "",
|
| "mal_username": "",
|
| "mal_avatar": "",
|
| "mal_access_token": "",
|
| "mal_refresh_token": "",
|
| "mal_token_expires_at": "",
|
| },
|
| "$set": {"updated_at": datetime.utcnow()},
|
| }
|
| )
|
| if result.modified_count > 0:
|
| logger.info(f"MAL data deleted for user {user_id}")
|
| clear_user_cache_func(user_id)
|
| return True
|
| except Exception as e:
|
| logger.error(f"Error deleting MAL data for user {user_id}: {e}")
|
| return False
|
|
|
|
|
| def get_mal_tokens(user_id: int) -> dict | None:
|
| """Return MAL tokens for a user, or None if not connected."""
|
| try:
|
| user = users_collection.find_one(
|
| {"_id": user_id},
|
| {"mal_access_token": 1, "mal_refresh_token": 1, "mal_token_expires_at": 1, "mal_id": 1}
|
| )
|
| if not user or not user.get("mal_access_token"):
|
| return None
|
| return {
|
| "access_token": user["mal_access_token"],
|
| "refresh_token": user.get("mal_refresh_token"),
|
| "expires_at": user.get("mal_token_expires_at", 0),
|
| "mal_id": user.get("mal_id"),
|
| }
|
| except Exception as e:
|
| logger.error(f"Error getting MAL tokens for user {user_id}: {e}")
|
| return None
|
|
|
|
|
| def update_mal_tokens(user_id: int, access_token: str, refresh_token: str, expires_in: int) -> bool:
|
| """Update MAL tokens after a refresh."""
|
| try:
|
| result = users_collection.update_one(
|
| {"_id": user_id},
|
| {"$set": {
|
| "mal_access_token": access_token,
|
| "mal_refresh_token": refresh_token,
|
| "mal_token_expires_at": _time.time() + expires_in,
|
| "updated_at": datetime.utcnow(),
|
| }}
|
| )
|
| if result.modified_count > 0:
|
| clear_user_cache_func(user_id)
|
| return True
|
| except Exception as e:
|
| logger.error(f"Error updating MAL tokens for user {user_id}: {e}")
|
| return False
|
|
|
|
|