anicove2 / api /models /user.py
mwask's picture
Upload 124 files
19ecc0d verified
Raw
History Blame Contribute Delete
21.8 kB
import random
import time as _time
import re as _re
from datetime import datetime
from bcrypt import hashpw, gensalt, checkpw
import logging
from ..core.db_connector import users_collection
from ..core.caching import (
cache_result, cache_user_data, cache_login_data,
clear_user_cache as clear_user_cache_func,
USER_DATA_CACHE_DURATION, LOGIN_CACHE_DURATION
)
logger = logging.getLogger(__name__)
def generate_unique_id():
"""Generate a unique 6-digit ID for a user."""
while True:
_id = random.randint(100000, 999999)
if users_collection.find_one({"_id": _id}) is None:
return _id
def create_user(username, password, email=None):
"""Create a new user with a unique ID, including email support."""
_id = generate_unique_id()
hashed_password = hashpw(password.encode('utf-8'), gensalt())
user_doc = {
"_id": _id,
"username": username,
"password": hashed_password,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
# Add email if provided
if email:
user_doc["email"] = email
users_collection.insert_one(user_doc)
return _id # Return the new user's ID
def create_anilist_user(anilist_user_info, access_token):
"""Create a new user from AniList OAuth data."""
_id = generate_unique_id()
# Extract user information from AniList data and ensure no spaces
raw_username = anilist_user_info['name']
username = raw_username.replace(' ', '_')
# Ensure username uniqueness
base_name = username
counter = 1
while users_collection.find_one({"username": username}) is not None:
username = f"{base_name}_{counter}"
counter += 1
anilist_id = anilist_user_info['id']
avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium')
# Prepare statistics if available
stats = {}
if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']:
anime_stats = anilist_user_info['statistics']['anime']
stats = {
'anime_count': anime_stats.get('count', 0),
'mean_score': anime_stats.get('meanScore', 0),
'minutes_watched': anime_stats.get('minutesWatched', 0)
}
user_doc = {
"_id": _id,
"username": username,
"anilist_id": anilist_id,
"anilist_access_token": access_token,
"avatar": avatar,
"anilist_stats": stats,
"banner_image": anilist_user_info.get('bannerImage'),
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
"auth_method": "anilist"
}
users_collection.insert_one(user_doc)
return _id
def update_anilist_user(user_id, anilist_user_info, access_token):
"""Update existing user with latest AniList information."""
avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium')
# Prepare statistics if available
stats = {}
if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']:
anime_stats = anilist_user_info['statistics']['anime']
stats = {
'anime_count': anime_stats.get('count', 0),
'mean_score': anime_stats.get('meanScore', 0),
'minutes_watched': anime_stats.get('minutesWatched', 0)
}
update_doc = {
"$set": {
"anilist_access_token": access_token,
"avatar": avatar,
"anilist_stats": stats,
"banner_image": anilist_user_info.get('bannerImage'),
"updated_at": datetime.utcnow()
}
}
users_collection.update_one({"_id": user_id}, update_doc)
clear_user_cache_func(user_id)
return True
def get_user_by_anilist_id(anilist_id):
"""Get user by AniList ID."""
return users_collection.find_one({"anilist_id": anilist_id})
def get_user(username, password):
"""Retrieve a user by username and password."""
user = users_collection.find_one({"username": username})
if user and user.get('password') and checkpw(password.encode('utf-8'), user['password']):
return user
return None
@cache_user_data(duration=LOGIN_CACHE_DURATION)
def get_user_by_id(_id):
"""Get user by ID (cached for 1 hour)."""
return users_collection.find_one({"_id": _id})
def get_user_by_email(email):
"""Get user by email (case-insensitive)."""
return users_collection.find_one({"email": _re.compile(f'^{_re.escape(email)}$', _re.IGNORECASE)})
def user_exists(username):
"""Check if a user with the given username already exists."""
return users_collection.find_one({"username": username}) is not None
def email_exists(email):
"""Check if a user with the given email already exists."""
if not email:
return False
return users_collection.find_one({"email": email}) is not None
def update_user_avatar(_id, avatar_url):
"""Update user's avatar."""
users_collection.update_one(
{"_id": _id},
{
"$set": {
"avatar": avatar_url,
"updated_at": datetime.utcnow()
}
}
)
clear_user_cache_func(_id)
def update_user_email(_id, email):
"""Update user's email if it doesn't already exist."""
if email_exists(email):
return False # Email already taken
users_collection.update_one(
{"_id": _id},
{
"$set": {
"email": email,
"updated_at": datetime.utcnow()
}
}
)
clear_user_cache_func(_id)
return True
def change_password(_id, old_password, new_password):
"""Change user's password after verifying old password."""
user = get_user_by_id(_id)
if not user or not user.get('password'):
return False
# Verify old password
if not checkpw(old_password.encode('utf-8'), user['password']):
return False
# Hash new password
new_hashed_password = hashpw(new_password.encode('utf-8'), gensalt())
current_version = user.get('password_version', 0)
# Update password
users_collection.update_one(
{"_id": _id},
{
"$set": {
"password": new_hashed_password,
"password_version": current_version + 1,
"updated_at": datetime.utcnow()
}
}
)
clear_user_cache_func(_id)
return True
def delete_user(_id):
"""Delete a user by ID."""
result = users_collection.delete_one({"_id": _id})
return result.deleted_count > 0
def get_all_users():
"""Get all users (for admin purposes - exclude passwords)."""
return list(users_collection.find({}, {"password": 0}))
def get_user_count():
"""Get total number of users."""
return users_collection.count_documents({})
# Additional utility functions for user management
def search_users(query, limit=10):
"""Search users by username or email."""
search_filter = {
"$or": [
{"username": {"$regex": query, "$options": "i"}},
{"email": {"$regex": query, "$options": "i"}}
]
}
return list(users_collection.find(search_filter, {"password": 0}).limit(limit))
def get_recent_users(limit=10):
"""Get recently registered users."""
return list(users_collection.find({}, {"password": 0})
.sort("created_at", -1)
.limit(limit))
def link_anilist_to_existing_user(user_id, anilist_user_info, access_token):
"""
Link an AniList account to an existing user.
Args:
user_id (int): Existing user's internal ID (_id).
anilist_user_info (dict): AniList user information (from AniList API).
access_token (str): AniList OAuth access token.
Returns:
bool: True if updated successfully, False otherwise.
"""
user = get_user_by_id(user_id)
if not user:
return False # User not found
# Prevent linking if AniList ID is already linked to another account
existing = get_user_by_anilist_id(anilist_user_info['id'])
if existing and existing['_id'] != user_id:
return False # AniList account already linked elsewhere
avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium')
stats = {}
if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']:
anime_stats = anilist_user_info['statistics']['anime']
stats = {
'anime_count': anime_stats.get('count', 0),
'mean_score': anime_stats.get('meanScore', 0),
'minutes_watched': anime_stats.get('minutesWatched', 0)
}
update_doc = {
"$set": {
"anilist_id": anilist_user_info['id'],
"anilist_access_token": access_token,
"avatar": avatar,
"anilist_stats": stats,
"banner_image": anilist_user_info.get('bannerImage'),
"updated_at": datetime.utcnow(),
"auth_method": "anilist_linked"
}
}
users_collection.update_one({"_id": user_id}, update_doc)
return True
def unlink_anilist_from_user(user_id: str) -> bool:
"""Remove AniList credentials from a user."""
result = users_collection.update_one(
{"_id": user_id},
{"$unset": {
"anilist_access_token": "",
"anilist_refresh_token": "",
"anilist_expires_at": ""
}}
)
return result.modified_count > 0
def delete_anilist_data(user_id: int) -> bool:
"""Completely remove all AniList-related data from a user account."""
try:
# First, get the current user data to log what we're removing
user = get_user_by_id(user_id)
if not user:
logger.warning(f"User {user_id} not found when trying to delete AniList data")
return False
anilist_id = user.get('anilist_id')
username = user.get('username', 'Unknown')
# Remove all AniList-related fields from the user document
result = users_collection.update_one(
{"_id": user_id},
{
"$unset": {
"anilist_id": "",
"anilist_access_token": "",
"anilist_refresh_token": "",
"anilist_expires_at": "",
"anilist_stats": "",
"banner_image": "", # Remove AniList banner
},
"$set": {
"updated_at": datetime.utcnow(),
"auth_method": "local" # Reset to local authentication only
}
}
)
if result.modified_count > 0:
logger.info(f"Successfully deleted AniList data for user {username} (ID: {user_id}, AniList ID: {anilist_id})")
return True
else:
logger.warning(f"No AniList data found to delete for user {username} (ID: {user_id})")
# Return True anyway since the goal (no AniList data) is achieved
return True
except Exception as e:
logger.error(f"Error deleting AniList data for user {user_id}: {e}")
return False
def connect_anilist_to_user(user_id: int, anilist_user_info: dict, access_token: str) -> bool:
"""Connect an AniList account to an existing user."""
try:
user = get_user_by_id(user_id)
if not user:
logger.error(f"User {user_id} not found")
return False
# Check if AniList account is already connected to another user
existing = get_user_by_anilist_id(anilist_user_info['id'])
if existing and existing['_id'] != user_id:
logger.error(f"AniList account {anilist_user_info['id']} already connected to user {existing['_id']}")
return False
avatar = anilist_user_info.get('avatar', {}).get('large') or anilist_user_info.get('avatar', {}).get('medium')
stats = {}
if 'statistics' in anilist_user_info and 'anime' in anilist_user_info['statistics']:
anime_stats = anilist_user_info['statistics']['anime']
stats = {
'anime_count': anime_stats.get('count', 0),
'mean_score': anime_stats.get('meanScore', 0),
'minutes_watched': anime_stats.get('minutesWatched', 0)
}
update_doc = {
"$set": {
"anilist_id": anilist_user_info['id'],
"anilist_access_token": access_token,
"avatar": avatar,
"anilist_stats": stats,
"banner_image": anilist_user_info.get('bannerImage'),
"updated_at": datetime.utcnow(),
"auth_method": "anilist_linked"
}
}
result = users_collection.update_one({"_id": user_id}, update_doc)
if result.modified_count > 0:
logger.info(f"Successfully connected AniList account {anilist_user_info['id']} to user {user_id}")
clear_user_cache_func(user_id)
return True
else:
logger.error(f"Failed to update user {user_id} with AniList data")
return False
except Exception as e:
logger.error(f"Error connecting AniList account to user {user_id}: {e}")
return False
def get_anilist_connection_info(user_id: int) -> dict:
"""Get detailed AniList connection information for a user (cached)."""
return _get_anilist_connection_info_uncached(user_id)
@cache_user_data(duration=LOGIN_CACHE_DURATION)
def _get_anilist_connection_info_uncached(user_id: int) -> dict:
"""Internal uncached version of get_anilist_connection_info."""
try:
user = users_collection.find_one({"_id": user_id})
if not user:
return {'connected': False, 'error': 'User not found'}
is_connected = bool(user.get('anilist_id'))
if not is_connected:
return {
'connected': False,
'user_id': user_id,
'username': user.get('username')
}
return {
'connected': True,
'user_id': user_id,
'username': user.get('username'),
'anilist_id': user.get('anilist_id'),
'avatar': user.get('avatar'),
'anilist_stats': user.get('anilist_stats', {}),
'banner_image': user.get('banner_image'),
'auth_method': user.get('auth_method'),
'connected_at': user.get('updated_at'),
'has_access_token': bool(user.get('anilist_access_token'))
}
except Exception as e:
logger.error(f"Error getting AniList connection info for user {user_id}: {e}")
return {'connected': False, 'error': str(e)}
# ──────────────────────────────────────────────
# Password Reset (Forgot Password) helpers
# ──────────────────────────────────────────────
def store_reset_code(email: str, hashed_code: bytes, expires_at: datetime) -> bool:
"""Store a bcrypt-hashed reset code and expiry on the user document."""
result = users_collection.update_one(
{"email": email},
{"$set": {
"reset_code": hashed_code,
"reset_code_expires": expires_at,
"updated_at": datetime.utcnow()
}}
)
return result.modified_count > 0
def verify_reset_code(email: str, code: str) -> bool:
"""Check that *code* matches the stored hash and has not expired."""
user = users_collection.find_one(
{"email": email},
{"reset_code": 1, "reset_code_expires": 1}
)
if not user or not user.get("reset_code") or not user.get("reset_code_expires"):
return False
if datetime.utcnow() > user["reset_code_expires"]:
# Expired — clean up
clear_reset_code(email)
return False
return checkpw(code.encode("utf-8"), user["reset_code"])
def clear_reset_code(email: str) -> None:
"""Remove reset-code fields from the user document."""
users_collection.update_one(
{"email": email},
{"$unset": {"reset_code": "", "reset_code_expires": ""}}
)
def reset_password(email: str, new_password: str) -> bool:
"""Set a new password for the user identified by *email* (no old password needed)."""
hashed = hashpw(new_password.encode("utf-8"), gensalt())
user = users_collection.find_one({"email": email})
if not user:
return False
current_version = user.get('password_version', 0)
result = users_collection.update_one(
{"email": email},
{"$set": {
"password": hashed,
"password_version": current_version + 1,
"updated_at": datetime.utcnow()
}}
)
if result.modified_count > 0:
user = users_collection.find_one({"email": email}, {"_id": 1})
if user:
clear_user_cache_func(user["_id"])
clear_reset_code(email)
return True
return False
# ──────────────────────────────────────────────
# MyAnimeList integration helpers
# ──────────────────────────────────────────────
def connect_mal_to_user(user_id: int, mal_user_info: dict, access_token: str,
refresh_token: str, expires_in: int) -> bool:
"""Store MAL credentials on an existing user document."""
try:
update_doc = {
"$set": {
"mal_id": mal_user_info.get("id"),
"mal_username": mal_user_info.get("name"),
"mal_avatar": mal_user_info.get("picture"),
"mal_access_token": access_token,
"mal_refresh_token": refresh_token,
"mal_token_expires_at": _time.time() + expires_in,
"updated_at": datetime.utcnow(),
}
}
result = users_collection.update_one({"_id": user_id}, update_doc)
if result.modified_count > 0:
logger.info(f"MAL account {mal_user_info.get('id')} connected to user {user_id}")
clear_user_cache_func(user_id)
return True
return False
except Exception as e:
logger.error(f"Error connecting MAL to user {user_id}: {e}")
return False
def delete_mal_data(user_id: int) -> bool:
"""Remove all MAL-related fields from a user document."""
try:
result = users_collection.update_one(
{"_id": user_id},
{
"$unset": {
"mal_id": "",
"mal_username": "",
"mal_avatar": "",
"mal_access_token": "",
"mal_refresh_token": "",
"mal_token_expires_at": "",
},
"$set": {"updated_at": datetime.utcnow()},
}
)
if result.modified_count > 0:
logger.info(f"MAL data deleted for user {user_id}")
clear_user_cache_func(user_id)
return True
except Exception as e:
logger.error(f"Error deleting MAL data for user {user_id}: {e}")
return False
def get_mal_tokens(user_id: int) -> dict | None:
"""Return MAL tokens for a user, or None if not connected."""
try:
user = users_collection.find_one(
{"_id": user_id},
{"mal_access_token": 1, "mal_refresh_token": 1, "mal_token_expires_at": 1, "mal_id": 1}
)
if not user or not user.get("mal_access_token"):
return None
return {
"access_token": user["mal_access_token"],
"refresh_token": user.get("mal_refresh_token"),
"expires_at": user.get("mal_token_expires_at", 0),
"mal_id": user.get("mal_id"),
}
except Exception as e:
logger.error(f"Error getting MAL tokens for user {user_id}: {e}")
return None
def update_mal_tokens(user_id: int, access_token: str, refresh_token: str, expires_in: int) -> bool:
"""Update MAL tokens after a refresh."""
try:
result = users_collection.update_one(
{"_id": user_id},
{"$set": {
"mal_access_token": access_token,
"mal_refresh_token": refresh_token,
"mal_token_expires_at": _time.time() + expires_in,
"updated_at": datetime.utcnow(),
}}
)
if result.modified_count > 0:
clear_user_cache_func(user_id)
return True
except Exception as e:
logger.error(f"Error updating MAL tokens for user {user_id}: {e}")
return False