| """
|
| Admin & Moderation model layer.
|
| Handles reports, audit logs, dashboard stats, and role management.
|
| """
|
| from datetime import datetime, timezone, timedelta
|
| from bson import ObjectId
|
| from pymongo import ASCENDING, DESCENDING
|
|
|
| from ..core.db_connector import (
|
| users_collection,
|
| comments_collection,
|
| reports_collection,
|
| audit_log_collection,
|
| )
|
|
|
|
|
|
|
|
|
|
|
# Role names accepted by set_user_role().
VALID_ROLES = ("user", "mod", "admin")

# Reasons a reporter may pick; create_report() maps anything else to "other".
REPORT_REASONS = (
    "spam",
    "harassment",
    "nsfw",
    "hate_speech",
    "misinformation",
    "other",
)

# Status values a report document can carry.
REPORT_STATUSES = ("pending", "resolved", "ignored")

# Default page sizes used by the paginated listing helpers in this module.
MAX_REPORTS_PAGE = MAX_USERS_PAGE = MAX_LOGS_PAGE = 50
|
|
|
# Latched to True after the first index-creation pass, whether or not every
# index could be created, so the pass runs at most once per process.
_indexes_ready = False


def _ensure_indexes():
    """Create the report and audit-log indexes on first use.

    Index creation is best-effort: a failure never propagates to the caller.
    Each index is attempted independently so that one failing index does not
    prevent the others from being created, and every failure is logged
    instead of being silently discarded.
    """
    global _indexes_ready
    if _indexes_ready:
        return

    import logging

    log = logging.getLogger(__name__)
    index_specs = (
        (
            reports_collection,
            [("status", ASCENDING), ("created_at", DESCENDING)],
            "reports_status_date",
        ),
        (reports_collection, [("comment_id", ASCENDING)], "reports_comment"),
        (
            reports_collection,
            [("reported_user_id", ASCENDING)],
            "reports_reported_user",
        ),
        (audit_log_collection, [("created_at", DESCENDING)], "audit_log_date"),
        (audit_log_collection, [("actor_id", ASCENDING)], "audit_log_actor"),
    )
    for collection, keys, index_name in index_specs:
        try:
            collection.create_index(keys, name=index_name)
        except Exception:
            # Deliberately non-fatal: queries still work without the index.
            log.warning("Could not create index %s", index_name, exc_info=True)

    # Latch even after failures so a persistent error is not retried on
    # every single call.
    _indexes_ready = True
|
|
|
|
|
def utcnow():
    """Return the current time as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(tz=utc)
|
|
|
|
|
def iso(dt):
    """Format a datetime as an ISO 8601 string.

    Returns None for a falsy value. A naive datetime is labelled as UTC
    before formatting; an aware datetime keeps its own offset.
    """
    if dt:
        aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
        return aware.isoformat()
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_user_role(user_id):
    """Look up a user's role.

    Returns None when no user document matches ``user_id``. Otherwise
    returns the stored ``role`` value, or "user" when the document has no
    ``role`` field.
    """
    record = users_collection.find_one({"_id": user_id}, {"role": 1})
    return record.get("role", "user") if record else None
|
|
|
|
|
def set_user_role(user_id, new_role):
    """Assign ``new_role`` to a user.

    Returns False without touching the database when ``new_role`` is not in
    VALID_ROLES; otherwise returns True only if the update modified a
    document.
    """
    if new_role in VALID_ROLES:
        changes = {"role": new_role, "updated_at": utcnow()}
        outcome = users_collection.update_one({"_id": user_id}, {"$set": changes})
        return outcome.modified_count > 0
    return False
|
|
|
|
|
def is_staff(user_id):
    """Report whether the user's stored role is "mod" or "admin"."""
    return can_moderate(get_user_role(user_id))
|
|
|
|
|
def is_admin(user_id):
    """Report whether the user's stored role is exactly "admin"."""
    role = get_user_role(user_id)
    return role == "admin"
|
|
|
|
|
def can_moderate(role):
    """Report whether ``role`` grants moderation rights ("mod" or "admin")."""
    return role == "mod" or role == "admin"
|
|
|
|
|
|
|
|
|
|
|
|
|
def ban_user(user_id):
    """Flag a user as banned.

    Returns True when the update modified a document.
    """
    selector = {"_id": user_id}
    patch = {"$set": {"is_banned": True, "updated_at": utcnow()}}
    outcome = users_collection.update_one(selector, patch)
    return outcome.modified_count > 0
|
|
|
|
|
def unban_user(user_id):
    """Clear a user's banned flag.

    Returns True when the update modified a document.
    """
    selector = {"_id": user_id}
    patch = {"$set": {"is_banned": False, "updated_at": utcnow()}}
    outcome = users_collection.update_one(selector, patch)
    return outcome.modified_count > 0
|
|
|
|
|
def mute_user(user_id, duration_hours=24):
    """Mute a user for ``duration_hours`` hours from now.

    Args:
        user_id: ``_id`` of the user document to update.
        duration_hours: Length of the mute in hours. Must be greater than
            zero; a zero or negative duration would store a mute that has
            already expired, so it is rejected.

    Returns:
        True when the update modified a document. False when the duration
        is not positive or no document was modified.
    """
    # Reject before touching the database, mirroring how set_user_role()
    # reports invalid input.
    if duration_hours <= 0:
        return False

    # Read the clock once so muted_until and updated_at share one baseline.
    now = utcnow()
    result = users_collection.update_one(
        {"_id": user_id},
        {
            "$set": {
                "muted_until": now + timedelta(hours=duration_hours),
                "updated_at": now,
            }
        },
    )
    return result.modified_count > 0
|
|
|
|
|
def unmute_user(user_id):
    """Remove the ``muted_until`` field from a user.

    Returns True when the update modified a document.
    """
    selector = {"_id": user_id}
    patch = {
        "$unset": {"muted_until": ""},
        "$set": {"updated_at": utcnow()},
    }
    outcome = users_collection.update_one(selector, patch)
    return outcome.modified_count > 0
|
|
|
|
|
def is_user_banned(user_id):
    """Report whether the user exists and has a truthy ``is_banned`` flag."""
    record = users_collection.find_one({"_id": user_id}, {"is_banned": 1})
    if not record:
        return False
    return bool(record.get("is_banned"))
|
|
|
|
|
def is_user_muted(user_id):
    """Report whether the user has a ``muted_until`` time still in the future.

    A missing user or a missing/empty ``muted_until`` counts as not muted.
    A naive ``muted_until`` value is interpreted as UTC before comparing.
    """
    record = users_collection.find_one({"_id": user_id}, {"muted_until": 1})
    expiry = record.get("muted_until") if record else None
    if not expiry:
        return False
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)
    return utcnow() < expiry
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_report(
    comment_id,
    reported_user_id,
    reported_username,
    reporter_id,
    reporter_username,
    reason,
    comment_body="",
    anime_id="",
    episode_number=0,
    details="",
):
    """Store a new pending report against a comment.

    Args:
        comment_id: ID of the reported comment. Stored as a string and also
            parsed as an ObjectId to bump the comment's ``report_count``.
        reported_user_id: ID of the comment's author; stored as a string.
        reported_username: Display name of the author ("Unknown" if falsy).
        reporter_id: ID of the reporting user; stored as a string.
        reporter_username: Display name of the reporter ("Unknown" if falsy).
        reason: One of REPORT_REASONS; any other value is stored as "other".
        comment_body: Copy of the comment text, truncated to 1000 characters.
        anime_id: Stored as a string.
        episode_number: Stored as an int; falsy or non-numeric values
            become 0.
        details: Free-text explanation, truncated to 500 characters.

    Returns:
        The inserted report as produced by serialize_report().
    """
    import logging

    _ensure_indexes()
    if reason not in REPORT_REASONS:
        reason = "other"

    # A malformed episode number must not prevent the report from being filed.
    try:
        episode = int(episode_number) if episode_number else 0
    except (TypeError, ValueError, OverflowError):
        episode = 0

    doc = {
        "comment_id": str(comment_id),
        "reported_user_id": str(reported_user_id),
        "reported_username": reported_username or "Unknown",
        "reporter_id": str(reporter_id),
        "reporter_username": reporter_username or "Unknown",
        "reason": reason,
        "details": (details or "")[:500],
        "comment_body": (comment_body or "")[:1000],
        "anime_id": str(anime_id),
        "episode_number": episode,
        "status": "pending",
        "moderator_id": None,
        "moderator_username": None,
        "moderator_note": None,
        "action_taken": None,
        "created_at": utcnow(),
        "resolved_at": None,
    }

    # Insert the report first: if the insert fails, the comment's counter
    # must not have been incremented for a report that does not exist.
    result = reports_collection.insert_one(doc)
    doc["_id"] = result.inserted_id

    # The counter is a best-effort denormalisation; the stored report is the
    # source of truth, so a failure here is logged and not raised.
    try:
        comments_collection.update_one(
            {"_id": ObjectId(comment_id)},
            {"$inc": {"report_count": 1}},
        )
    except Exception:
        logging.getLogger(__name__).warning(
            "Could not increment report_count for comment %r",
            comment_id,
            exc_info=True,
        )

    return serialize_report(doc)
|
|
|
|
|
def get_reports(status=None, reason=None, page=1, limit=MAX_REPORTS_PAGE):
    """Return one page of reports, newest first, with optional filters.

    Args:
        status: Only include reports with this status. Ignored unless it is
            one of REPORT_STATUSES.
        reason: Only include reports with this reason. Ignored unless it is
            one of REPORT_REASONS.
        page: 1-based page number. Values below 1 or non-numeric values are
            treated as page 1.
        limit: Page size, clamped to the range 1..MAX_REPORTS_PAGE.
            Non-numeric values fall back to MAX_REPORTS_PAGE.

    Returns:
        A dict with "reports" (serialized reports), "total" (matching
        document count), "page" (the normalised page number) and "pages"
        (page count, at least 1).
    """
    _ensure_indexes()

    # Normalise pagination: a limit of 0 would divide by zero in the page
    # count below, and zero/negative limits are not a plain "page size" to
    # the cursor.
    try:
        page = max(1, int(page))
    except (TypeError, ValueError):
        page = 1
    try:
        limit = min(MAX_REPORTS_PAGE, max(1, int(limit)))
    except (TypeError, ValueError):
        limit = MAX_REPORTS_PAGE

    query = {}
    if status and status in REPORT_STATUSES:
        query["status"] = status
    if reason and reason in REPORT_REASONS:
        query["reason"] = reason

    skip = (page - 1) * limit
    total = reports_collection.count_documents(query)
    docs = list(
        reports_collection.find(query)
        .sort("created_at", DESCENDING)
        .skip(skip)
        .limit(limit)
    )
    return {
        "reports": [serialize_report(d) for d in docs],
        "total": total,
        "page": page,
        "pages": max(1, (total + limit - 1) // limit),
    }
|
|
|
|
|
def get_report(report_id):
    """Fetch one report by its ID and serialize it.

    Returns None when the ID cannot be parsed, when no report matches, or
    when any error occurs during the lookup or serialization.
    """
    try:
        found = reports_collection.find_one({"_id": ObjectId(report_id)})
        if found:
            return serialize_report(found)
        return None
    except Exception:
        return None
|
|
|
|
|
def _close_report(report_id, status, moderator_id, moderator_username, action, note):
    """Stamp a report with a moderator decision and return it serialized.

    Shared implementation of resolve_report() and ignore_report(). Returns
    None when ``report_id`` cannot be parsed as an ObjectId or when no
    report matches it.
    """
    from pymongo import ReturnDocument

    try:
        oid = ObjectId(report_id)
    except Exception:
        return None

    updated = reports_collection.find_one_and_update(
        {"_id": oid},
        {
            "$set": {
                "status": status,
                "moderator_id": str(moderator_id),
                "moderator_username": moderator_username,
                "moderator_note": (note or "")[:500],
                "action_taken": action,
                "resolved_at": utcnow(),
            }
        },
        # Ask for the post-update document so the caller sees the new state.
        return_document=ReturnDocument.AFTER,
    )
    return serialize_report(updated) if updated else None


def resolve_report(report_id, moderator_id, moderator_username, action, note=""):
    """Mark a report as resolved and record the action taken.

    Args:
        report_id: ID of the report; must be parseable as an ObjectId.
        moderator_id: ID of the acting moderator; stored as a string.
        moderator_username: Display name of the acting moderator.
        action: Value stored in the report's ``action_taken`` field.
        note: Optional moderator note, truncated to 500 characters.

    Returns:
        The updated report as produced by serialize_report(), or None when
        the ID is invalid or no report matches.
    """
    return _close_report(
        report_id, "resolved", moderator_id, moderator_username, action, note
    )


def ignore_report(report_id, moderator_id, moderator_username, note=""):
    """Mark a report as ignored; ``action_taken`` is stored as "ignored".

    Args:
        report_id: ID of the report; must be parseable as an ObjectId.
        moderator_id: ID of the acting moderator; stored as a string.
        moderator_username: Display name of the acting moderator.
        note: Optional moderator note, truncated to 500 characters.

    Returns:
        The updated report as produced by serialize_report(), or None when
        the ID is invalid or no report matches.
    """
    return _close_report(
        report_id, "ignored", moderator_id, moderator_username, "ignored", note
    )
|
|
|
|
|
def get_report_counts():
    """Count reports per status.

    Returns a dict with "pending", "resolved", "ignored" and "total" keys.
    Every status group contributes to "total", including groups whose
    status is not one of the named keys.
    """
    counts = dict.fromkeys(("pending", "resolved", "ignored", "total"), 0)
    grouped = list(
        reports_collection.aggregate(
            [{"$group": {"_id": "$status", "count": {"$sum": 1}}}]
        )
    )
    for row in grouped:
        bucket = row["_id"]
        if bucket in counts:
            counts[bucket] = row["count"]
        counts["total"] += row["count"]
    return counts
|
|
|
|
|
def serialize_report(doc):
    """Convert a report document into a plain dict for API output.

    Returns None for a falsy ``doc``. The ``_id`` is stringified, the two
    timestamps go through iso(), and missing fields get the defaults listed
    below.
    """
    if not doc:
        return None

    # (field name, value used when the field is absent), in output order.
    field_defaults = (
        ("comment_id", None),
        ("reported_user_id", None),
        ("reported_username", "Unknown"),
        ("reporter_id", None),
        ("reporter_username", "Unknown"),
        ("reason", "other"),
        ("details", ""),
        ("comment_body", ""),
        ("anime_id", ""),
        ("episode_number", 0),
        ("status", "pending"),
        ("moderator_id", None),
        ("moderator_username", None),
        ("moderator_note", None),
        ("action_taken", None),
    )

    payload = {"_id": str(doc["_id"])}
    for key, fallback in field_defaults:
        payload[key] = doc.get(key, fallback)
    payload["created_at"] = iso(doc.get("created_at"))
    payload["resolved_at"] = iso(doc.get("resolved_at"))
    return payload
|
|
|
|
|
|
|
|
|
|
|
|
|
def log_action(actor_id, actor_username, action, target_id=None, target_username=None, details=""):
    """Append an entry to the audit log and return the inserted document.

    ``actor_id`` is stored as a string, a falsy ``actor_username`` becomes
    "Unknown", a falsy ``target_id`` is stored as None, and ``details`` is
    truncated to 500 characters.
    """
    _ensure_indexes()

    target = None
    if target_id:
        target = str(target_id)

    entry = {
        "actor_id": str(actor_id),
        "actor_username": actor_username or "Unknown",
        "action": action,
        "target_id": target,
        "target_username": target_username,
        "details": (details or "")[:500],
        "created_at": utcnow(),
    }
    audit_log_collection.insert_one(entry)
    return entry
|
|
|
|
|
def get_audit_logs(page=1, limit=MAX_LOGS_PAGE, actor_id=None):
    """Return one page of audit-log entries, newest first.

    Args:
        page: 1-based page number. Values below 1 or non-numeric values are
            treated as page 1.
        limit: Page size, clamped to the range 1..MAX_LOGS_PAGE.
            Non-numeric values fall back to MAX_LOGS_PAGE.
        actor_id: When truthy, only entries whose ``actor_id`` equals
            ``str(actor_id)`` are returned.

    Returns:
        A dict with "logs" (serialized entries), "total" (matching document
        count), "page" (the normalised page number) and "pages" (page count,
        at least 1).
    """
    _ensure_indexes()

    # Normalise pagination: a limit of 0 would divide by zero in the page
    # count below, and zero/negative limits are not a plain "page size" to
    # the cursor.
    try:
        page = max(1, int(page))
    except (TypeError, ValueError):
        page = 1
    try:
        limit = min(MAX_LOGS_PAGE, max(1, int(limit)))
    except (TypeError, ValueError):
        limit = MAX_LOGS_PAGE

    query = {}
    if actor_id:
        query["actor_id"] = str(actor_id)

    skip = (page - 1) * limit
    total = audit_log_collection.count_documents(query)
    docs = list(
        audit_log_collection.find(query)
        .sort("created_at", DESCENDING)
        .skip(skip)
        .limit(limit)
    )
    return {
        "logs": [serialize_log(d) for d in docs],
        "total": total,
        "page": page,
        "pages": max(1, (total + limit - 1) // limit),
    }
|
|
|
|
|
def serialize_log(doc):
    """Convert an audit-log document into a plain dict for API output.

    Returns None for a falsy ``doc``. The ``_id`` is stringified and
    ``created_at`` goes through iso(); a missing ``details`` becomes "".
    """
    if not doc:
        return None

    out = {"_id": str(doc["_id"])}
    for key in ("actor_id", "actor_username", "action", "target_id", "target_username"):
        out[key] = doc.get(key)
    out["details"] = doc.get("details", "")
    out["created_at"] = iso(doc.get("created_at"))
    return out
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_dashboard_stats():
    """Collect the figures shown on the admin dashboard.

    Returns a dict with user totals (overall, since UTC midnight, last seven
    days, banned), the count of comments with ``deleted`` set to False,
    report counts by status, user counts by role, the ten newest users, the
    ten newest audit-log entries, and per-day signup counts for the last
    seven days.
    """
    current = utcnow()
    midnight = current.replace(hour=0, minute=0, second=0, microsecond=0)
    seven_days_ago = current - timedelta(days=7)

    # Credential fields that must never leave the model layer.
    hidden_fields = {
        "password": 0,
        "anilist_access_token": 0,
        "mal_access_token": 0,
        "mal_refresh_token": 0,
    }

    stats = {
        "total_users": users_collection.count_documents({}),
        "new_users_today": users_collection.count_documents(
            {"created_at": {"$gte": midnight}}
        ),
        "new_users_week": users_collection.count_documents(
            {"created_at": {"$gte": seven_days_ago}}
        ),
        "banned_users": users_collection.count_documents({"is_banned": True}),
        "total_comments": comments_collection.count_documents({"deleted": False}),
        "reports": get_report_counts(),
    }

    # Users without a role field are grouped under "user".
    role_totals = {"user": 0, "mod": 0, "admin": 0}
    role_rows = list(
        users_collection.aggregate(
            [{"$group": {"_id": {"$ifNull": ["$role", "user"]}, "count": {"$sum": 1}}}]
        )
    )
    for row in role_rows:
        name = row["_id"]
        if name in role_totals:
            role_totals[name] = row["count"]
    stats["roles"] = role_totals

    newest_users = list(
        users_collection.find({}, hidden_fields)
        .sort("created_at", DESCENDING)
        .limit(10)
    )
    stats["recent_users"] = [_serialize_user_brief(u) for u in newest_users]

    newest_logs = list(
        audit_log_collection.find().sort("created_at", DESCENDING).limit(10)
    )
    stats["recent_logs"] = [serialize_log(entry) for entry in newest_logs]

    # One row per calendar day ("%Y-%m-%d") with its signup count, oldest first.
    day_key = {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}}
    stats["signup_chart"] = list(
        users_collection.aggregate(
            [
                {"$match": {"created_at": {"$gte": seven_days_ago}}},
                {"$group": {"_id": day_key, "count": {"$sum": 1}}},
                {"$sort": {"_id": 1}},
            ]
        )
    )
    return stats
|
|
|
|
|
| def _serialize_user_brief(user):
|
| """Serialize a user document for admin display."""
|
| if not user:
|
| return None
|
| return {
|
| "_id": str(user["_id"]),
|
| "username": user.get("username", "Unknown"),
|
| "email": user.get("email", ""),
|
| "avatar": user.get("avatar"),
|
| "role": user.get("role", "user"),
|
| "is_banned": bool(user.get("is_banned")),
|
| "muted_until": iso(user.get("muted_until")),
|
| "auth_method": user.get("auth_method", "local"),
|
| "created_at": iso(user.get("created_at")),
|
| }
|
|
|
|
|
|
|
|
|
|
|
|
|
def search_users_admin(query="", role_filter=None, page=1, limit=MAX_USERS_PAGE):
    """Search users for the admin panel, newest first.

    Args:
        query: Search text. If it parses as an integer it is matched against
            ``_id`` exactly; otherwise it is matched as a case-insensitive
            literal substring of ``username`` or ``email``.
        role_filter: Restrict to one of VALID_ROLES; other values are
            ignored. "user" also matches documents with no ``role`` field.
        page: 1-based page number. Values below 1 or non-numeric values are
            treated as page 1.
        limit: Page size, clamped to the range 1..MAX_USERS_PAGE.
            Non-numeric values fall back to MAX_USERS_PAGE.

    Returns:
        A dict with "users" (serialized summaries), "total" (matching
        document count), "page" (the normalised page number) and "pages"
        (page count, at least 1).
    """
    import re

    # Normalise pagination: a limit of 0 would divide by zero in the page
    # count below, and zero/negative limits are not a plain "page size" to
    # the cursor.
    try:
        page = max(1, int(page))
    except (TypeError, ValueError):
        page = 1
    try:
        limit = min(MAX_USERS_PAGE, max(1, int(limit)))
    except (TypeError, ValueError):
        limit = MAX_USERS_PAGE

    mongo_query = {}

    if query:
        try:
            mongo_query["_id"] = int(query)
        except (ValueError, TypeError):
            # Escape the text so regex metacharacters typed into the search
            # box are matched literally rather than interpreted as a pattern.
            pattern = re.escape(str(query))
            mongo_query["$or"] = [
                {"username": {"$regex": pattern, "$options": "i"}},
                {"email": {"$regex": pattern, "$options": "i"}},
            ]

    if role_filter and role_filter in VALID_ROLES:
        if role_filter == "user":
            # Kept under $and so it composes with the text search's $or.
            mongo_query["$and"] = [
                {"$or": [{"role": "user"}, {"role": {"$exists": False}}]}
            ]
        else:
            mongo_query["role"] = role_filter

    skip = (page - 1) * limit
    total = users_collection.count_documents(mongo_query)
    docs = list(
        users_collection.find(
            mongo_query,
            {"password": 0, "anilist_access_token": 0, "mal_access_token": 0, "mal_refresh_token": 0},
        )
        .sort("created_at", DESCENDING)
        .skip(skip)
        .limit(limit)
    )
    return {
        "users": [_serialize_user_brief(u) for u in docs],
        "total": total,
        "page": page,
        "pages": max(1, (total + limit - 1) // limit),
    }
|
|
|
|
|
def get_user_admin_detail(user_id):
    """Build the detailed admin view of one user.

    Returns None when ``user_id`` cannot be converted to an int or no user
    matches. Otherwise returns the brief user summary extended with the
    user's non-deleted comment count, the number of reports filed against
    them, and their AniList/MAL identifiers.
    """
    try:
        uid = int(user_id)
    except (ValueError, TypeError):
        return None

    hidden_fields = {
        "password": 0,
        "anilist_access_token": 0,
        "mal_access_token": 0,
        "mal_refresh_token": 0,
    }
    user = users_collection.find_one({"_id": uid}, hidden_fields)
    if not user:
        return None

    # Comments and reports reference the user by the string form of the id.
    uid_text = str(uid)
    authored = comments_collection.count_documents(
        {"author_id": uid_text, "deleted": False}
    )
    reported = reports_collection.count_documents({"reported_user_id": uid_text})

    detail = _serialize_user_brief(user)
    detail.update(
        {
            "comment_count": authored,
            "reports_against": reported,
            "anilist_id": user.get("anilist_id"),
            "mal_id": user.get("mal_id"),
            "mal_username": user.get("mal_username"),
        }
    )
    return detail
|
|
|
|
|
|
|
|
|