anicove2 / api /models /admin.py
mwask's picture
Upload 124 files
19ecc0d verified
Raw
History Blame Contribute Delete
20.2 kB
"""
Admin & Moderation model layer.
Handles reports, audit logs, dashboard stats, and role management.
"""
from datetime import datetime, timezone, timedelta
from bson import ObjectId
from pymongo import ASCENDING, DESCENDING
from ..core.db_connector import (
users_collection,
comments_collection,
reports_collection,
audit_log_collection,
)
# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
VALID_ROLES = ("user", "mod", "admin")
REPORT_REASONS = (
"spam", "harassment", "nsfw", "hate_speech", "misinformation", "other"
)
REPORT_STATUSES = ("pending", "resolved", "ignored")
MAX_REPORTS_PAGE = 50
MAX_USERS_PAGE = 50
MAX_LOGS_PAGE = 50
_indexes_ready = False
def _ensure_indexes():
global _indexes_ready
if _indexes_ready:
return
try:
reports_collection.create_index(
[("status", ASCENDING), ("created_at", DESCENDING)],
name="reports_status_date",
)
reports_collection.create_index(
[("comment_id", ASCENDING)],
name="reports_comment",
)
reports_collection.create_index(
[("reported_user_id", ASCENDING)],
name="reports_reported_user",
)
audit_log_collection.create_index(
[("created_at", DESCENDING)],
name="audit_log_date",
)
audit_log_collection.create_index(
[("actor_id", ASCENDING)],
name="audit_log_actor",
)
except Exception:
pass
_indexes_ready = True
def utcnow():
return datetime.now(timezone.utc)
def iso(dt):
if not dt:
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
# ─────────────────────────────────────────────────────────────────────────────
# Role helpers
# ─────────────────────────────────────────────────────────────────────────────
def get_user_role(user_id):
"""Return the role string for a user (defaults to 'user')."""
user = users_collection.find_one({"_id": user_id}, {"role": 1})
if not user:
return None
return user.get("role", "user")
def set_user_role(user_id, new_role):
"""Set a user's role. Returns True on success."""
if new_role not in VALID_ROLES:
return False
result = users_collection.update_one(
{"_id": user_id},
{"$set": {"role": new_role, "updated_at": utcnow()}},
)
return result.modified_count > 0
def is_staff(user_id):
"""Check if a user is mod or admin."""
role = get_user_role(user_id)
return role in ("mod", "admin")
def is_admin(user_id):
"""Check if a user is admin."""
return get_user_role(user_id) == "admin"
def can_moderate(role):
"""Check if a role has moderation permissions."""
return role in ("mod", "admin")
# ─────────────────────────────────────────────────────────────────────────────
# Ban / Mute
# ─────────────────────────────────────────────────────────────────────────────
def ban_user(user_id):
"""Ban a user."""
result = users_collection.update_one(
{"_id": user_id},
{"$set": {"is_banned": True, "updated_at": utcnow()}},
)
return result.modified_count > 0
def unban_user(user_id):
"""Unban a user."""
result = users_collection.update_one(
{"_id": user_id},
{"$set": {"is_banned": False, "updated_at": utcnow()}},
)
return result.modified_count > 0
def mute_user(user_id, duration_hours=24):
"""Mute a user for a duration."""
muted_until = utcnow() + timedelta(hours=duration_hours)
result = users_collection.update_one(
{"_id": user_id},
{"$set": {"muted_until": muted_until, "updated_at": utcnow()}},
)
return result.modified_count > 0
def unmute_user(user_id):
"""Remove mute from a user."""
result = users_collection.update_one(
{"_id": user_id},
{"$unset": {"muted_until": ""}, "$set": {"updated_at": utcnow()}},
)
return result.modified_count > 0
def is_user_banned(user_id):
"""Check if a user is banned."""
user = users_collection.find_one({"_id": user_id}, {"is_banned": 1})
return bool(user and user.get("is_banned"))
def is_user_muted(user_id):
"""Check if a user is currently muted."""
user = users_collection.find_one({"_id": user_id}, {"muted_until": 1})
if not user or not user.get("muted_until"):
return False
muted_until = user["muted_until"]
if muted_until.tzinfo is None:
muted_until = muted_until.replace(tzinfo=timezone.utc)
return utcnow() < muted_until
# ─────────────────────────────────────────────────────────────────────────────
# Reports
# ─────────────────────────────────────────────────────────────────────────────
def create_report(
comment_id,
reported_user_id,
reported_username,
reporter_id,
reporter_username,
reason,
comment_body="",
anime_id="",
episode_number=0,
details="",
):
"""Create a new report on a comment."""
_ensure_indexes()
if reason not in REPORT_REASONS:
reason = "other"
doc = {
"comment_id": str(comment_id),
"reported_user_id": str(reported_user_id),
"reported_username": reported_username or "Unknown",
"reporter_id": str(reporter_id),
"reporter_username": reporter_username or "Unknown",
"reason": reason,
"details": (details or "")[:500],
"comment_body": (comment_body or "")[:1000],
"anime_id": str(anime_id),
"episode_number": int(episode_number) if episode_number else 0,
"status": "pending",
"moderator_id": None,
"moderator_username": None,
"moderator_note": None,
"action_taken": None,
"created_at": utcnow(),
"resolved_at": None,
}
# Bump report_count on the comment
try:
comments_collection.update_one(
{"_id": ObjectId(comment_id)},
{"$inc": {"report_count": 1}},
)
except Exception:
pass
result = reports_collection.insert_one(doc)
doc["_id"] = result.inserted_id
return serialize_report(doc)
def get_reports(status=None, reason=None, page=1, limit=MAX_REPORTS_PAGE):
"""Get paginated reports with optional filters."""
_ensure_indexes()
query = {}
if status and status in REPORT_STATUSES:
query["status"] = status
if reason and reason in REPORT_REASONS:
query["reason"] = reason
skip = max(0, (page - 1) * limit)
total = reports_collection.count_documents(query)
docs = list(
reports_collection.find(query)
.sort("created_at", DESCENDING)
.skip(skip)
.limit(limit)
)
return {
"reports": [serialize_report(d) for d in docs],
"total": total,
"page": page,
"pages": max(1, (total + limit - 1) // limit),
}
def get_report(report_id):
"""Get a single report by ID."""
try:
doc = reports_collection.find_one({"_id": ObjectId(report_id)})
return serialize_report(doc) if doc else None
except Exception:
return None
def resolve_report(report_id, moderator_id, moderator_username, action, note=""):
"""Resolve a report with an action."""
try:
oid = ObjectId(report_id)
except Exception:
return None
update = {
"$set": {
"status": "resolved",
"moderator_id": str(moderator_id),
"moderator_username": moderator_username,
"moderator_note": (note or "")[:500],
"action_taken": action,
"resolved_at": utcnow(),
}
}
doc = reports_collection.find_one_and_update(
{"_id": oid},
update,
return_document=True,
)
return serialize_report(doc) if doc else None
def ignore_report(report_id, moderator_id, moderator_username, note=""):
"""Ignore a report."""
try:
oid = ObjectId(report_id)
except Exception:
return None
update = {
"$set": {
"status": "ignored",
"moderator_id": str(moderator_id),
"moderator_username": moderator_username,
"moderator_note": (note or "")[:500],
"action_taken": "ignored",
"resolved_at": utcnow(),
}
}
doc = reports_collection.find_one_and_update(
{"_id": oid},
update,
return_document=True,
)
return serialize_report(doc) if doc else None
def get_report_counts():
"""Get report status counts."""
pipeline = [
{"$group": {"_id": "$status", "count": {"$sum": 1}}},
]
results = list(reports_collection.aggregate(pipeline))
counts = {"pending": 0, "resolved": 0, "ignored": 0, "total": 0}
for r in results:
status = r["_id"]
if status in counts:
counts[status] = r["count"]
counts["total"] += r["count"]
return counts
def serialize_report(doc):
"""Serialize a report document."""
if not doc:
return None
return {
"_id": str(doc["_id"]),
"comment_id": doc.get("comment_id"),
"reported_user_id": doc.get("reported_user_id"),
"reported_username": doc.get("reported_username", "Unknown"),
"reporter_id": doc.get("reporter_id"),
"reporter_username": doc.get("reporter_username", "Unknown"),
"reason": doc.get("reason", "other"),
"details": doc.get("details", ""),
"comment_body": doc.get("comment_body", ""),
"anime_id": doc.get("anime_id", ""),
"episode_number": doc.get("episode_number", 0),
"status": doc.get("status", "pending"),
"moderator_id": doc.get("moderator_id"),
"moderator_username": doc.get("moderator_username"),
"moderator_note": doc.get("moderator_note"),
"action_taken": doc.get("action_taken"),
"created_at": iso(doc.get("created_at")),
"resolved_at": iso(doc.get("resolved_at")),
}
# ─────────────────────────────────────────────────────────────────────────────
# Audit Log
# ─────────────────────────────────────────────────────────────────────────────
def log_action(actor_id, actor_username, action, target_id=None, target_username=None, details=""):
"""Log a moderation action."""
_ensure_indexes()
doc = {
"actor_id": str(actor_id),
"actor_username": actor_username or "Unknown",
"action": action,
"target_id": str(target_id) if target_id else None,
"target_username": target_username,
"details": (details or "")[:500],
"created_at": utcnow(),
}
audit_log_collection.insert_one(doc)
return doc
def get_audit_logs(page=1, limit=MAX_LOGS_PAGE, actor_id=None):
"""Get paginated audit logs."""
_ensure_indexes()
query = {}
if actor_id:
query["actor_id"] = str(actor_id)
skip = max(0, (page - 1) * limit)
total = audit_log_collection.count_documents(query)
docs = list(
audit_log_collection.find(query)
.sort("created_at", DESCENDING)
.skip(skip)
.limit(limit)
)
return {
"logs": [serialize_log(d) for d in docs],
"total": total,
"page": page,
"pages": max(1, (total + limit - 1) // limit),
}
def serialize_log(doc):
if not doc:
return None
return {
"_id": str(doc["_id"]),
"actor_id": doc.get("actor_id"),
"actor_username": doc.get("actor_username"),
"action": doc.get("action"),
"target_id": doc.get("target_id"),
"target_username": doc.get("target_username"),
"details": doc.get("details", ""),
"created_at": iso(doc.get("created_at")),
}
# ─────────────────────────────────────────────────────────────────────────────
# Dashboard stats
# ─────────────────────────────────────────────────────────────────────────────
def get_dashboard_stats():
"""Get dashboard statistics for the admin panel."""
now = utcnow()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
week_ago = now - timedelta(days=7)
total_users = users_collection.count_documents({})
new_users_today = users_collection.count_documents({"created_at": {"$gte": today_start}})
new_users_week = users_collection.count_documents({"created_at": {"$gte": week_ago}})
banned_users = users_collection.count_documents({"is_banned": True})
total_comments = comments_collection.count_documents({"deleted": False})
report_counts = get_report_counts()
# Role distribution
role_pipeline = [
{"$group": {"_id": {"$ifNull": ["$role", "user"]}, "count": {"$sum": 1}}},
]
role_results = list(users_collection.aggregate(role_pipeline))
roles = {"user": 0, "mod": 0, "admin": 0}
for r in role_results:
role = r["_id"]
if role in roles:
roles[role] = r["count"]
# Recent users
recent_users = list(
users_collection.find({}, {"password": 0, "anilist_access_token": 0, "mal_access_token": 0, "mal_refresh_token": 0})
.sort("created_at", DESCENDING)
.limit(10)
)
# Recent mod actions
recent_logs = list(
audit_log_collection.find()
.sort("created_at", DESCENDING)
.limit(10)
)
# Signups per day (last 7 days)
signup_pipeline = [
{"$match": {"created_at": {"$gte": week_ago}}},
{
"$group": {
"_id": {
"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}
},
"count": {"$sum": 1},
}
},
{"$sort": {"_id": 1}},
]
signup_chart = list(users_collection.aggregate(signup_pipeline))
return {
"total_users": total_users,
"new_users_today": new_users_today,
"new_users_week": new_users_week,
"banned_users": banned_users,
"total_comments": total_comments,
"reports": report_counts,
"roles": roles,
"recent_users": [_serialize_user_brief(u) for u in recent_users],
"recent_logs": [serialize_log(l) for l in recent_logs],
"signup_chart": signup_chart,
}
def _serialize_user_brief(user):
"""Serialize a user document for admin display."""
if not user:
return None
return {
"_id": str(user["_id"]),
"username": user.get("username", "Unknown"),
"email": user.get("email", ""),
"avatar": user.get("avatar"),
"role": user.get("role", "user"),
"is_banned": bool(user.get("is_banned")),
"muted_until": iso(user.get("muted_until")),
"auth_method": user.get("auth_method", "local"),
"created_at": iso(user.get("created_at")),
}
# ─────────────────────────────────────────────────────────────────────────────
# User search / management for admin
# ─────────────────────────────────────────────────────────────────────────────
def search_users_admin(query="", role_filter=None, page=1, limit=MAX_USERS_PAGE):
"""Search users with filters for admin panel."""
mongo_query = {}
if query:
# Try numeric ID search
try:
numeric_id = int(query)
mongo_query["_id"] = numeric_id
except (ValueError, TypeError):
mongo_query["$or"] = [
{"username": {"$regex": query, "$options": "i"}},
{"email": {"$regex": query, "$options": "i"}},
]
if role_filter and role_filter in VALID_ROLES:
if role_filter == "user":
# Users without role field or with role="user"
mongo_query["$and"] = mongo_query.get("$and", []) + [
{"$or": [{"role": "user"}, {"role": {"$exists": False}}]}
]
else:
mongo_query["role"] = role_filter
skip = max(0, (page - 1) * limit)
total = users_collection.count_documents(mongo_query)
docs = list(
users_collection.find(
mongo_query,
{"password": 0, "anilist_access_token": 0, "mal_access_token": 0, "mal_refresh_token": 0},
)
.sort("created_at", DESCENDING)
.skip(skip)
.limit(limit)
)
return {
"users": [_serialize_user_brief(u) for u in docs],
"total": total,
"page": page,
"pages": max(1, (total + limit - 1) // limit),
}
def get_user_admin_detail(user_id):
"""Get full user detail for admin panel."""
try:
uid = int(user_id)
except (ValueError, TypeError):
return None
user = users_collection.find_one(
{"_id": uid},
{"password": 0, "anilist_access_token": 0, "mal_access_token": 0, "mal_refresh_token": 0},
)
if not user:
return None
# Count user's comments
comment_count = comments_collection.count_documents(
{"author_id": str(uid), "deleted": False}
)
# Count reports against this user
reports_against = reports_collection.count_documents(
{"reported_user_id": str(uid)}
)
detail = _serialize_user_brief(user)
detail["comment_count"] = comment_count
detail["reports_against"] = reports_against
detail["anilist_id"] = user.get("anilist_id")
detail["mal_id"] = user.get("mal_id")
detail["mal_username"] = user.get("mal_username")
return detail