""" Bug Reports model layer. Handles all MongoDB operations for the bug/issue reporting system. """ from datetime import datetime, timezone from bson import ObjectId from pymongo import DESCENDING from ..core.db_connector import bug_reports_collection def _ensure_indexes(): try: bug_reports_collection.create_index([("created_at", DESCENDING)]) bug_reports_collection.create_index([("status", DESCENDING)]) bug_reports_collection.create_index([("author_id", DESCENDING)]) except Exception: pass _ensure_indexes() def _serialize_report(doc): """Convert a MongoDB report document into a JSON-serialisable dict.""" if not doc: return None admin_replies = [] for r in doc.get("admin_replies", []): admin_replies.append({ "_id": str(r.get("_id", ObjectId())), "body": r.get("body", ""), "created_at": r["created_at"].isoformat() if r.get("created_at") else None, "is_broadcast": r.get("is_broadcast", False), "type": "admin", }) user_replies = [] for r in doc.get("user_replies", []): user_replies.append({ "_id": str(r.get("_id", ObjectId())), "body": r.get("body", ""), "created_at": r["created_at"].isoformat() if r.get("created_at") else None, "type": "user", }) return { "_id": str(doc["_id"]), "title": doc.get("title", ""), "body": doc.get("body", ""), "category": doc.get("category", "bug"), "page_url": doc.get("page_url", ""), "author": doc.get("author", "Anonymous"), "author_id": str(doc["author_id"]) if doc.get("author_id") is not None else None, "author_avatar": doc.get("author_avatar"), "status": doc.get("status", "open"), "created_at": doc["created_at"].isoformat() if doc.get("created_at") else None, "updated_at": doc["updated_at"].isoformat() if doc.get("updated_at") else None, "admin_replies": admin_replies, "user_replies": user_replies, "admin_reply_count": len(admin_replies), "user_reply_count": len(user_replies), "reply_count": len(admin_replies) + len(user_replies), } def create_report(title, body, category, author, author_id, author_avatar=None, page_url=None): """Create a new bug report.""" doc = { "title": title.strip(), "body": body.strip(), "category": category or "bug", "page_url": (page_url or "").strip(), "author": author, "author_id": str(author_id) if author_id is not None else None, "author_avatar": author_avatar, "status": "open", "created_at": datetime.now(timezone.utc), "updated_at": datetime.now(timezone.utc), "admin_replies": [], "user_replies": [], } result = bug_reports_collection.insert_one(doc) doc["_id"] = result.inserted_id return _serialize_report(doc) def get_reports(page=1, per_page=20, status=None, author_id=None): """Get bug reports with pagination and optional filtering.""" query = {} if status: query["status"] = status if author_id: query["author_id"] = str(author_id) total = bug_reports_collection.count_documents(query) skip = (page - 1) * per_page cursor = bug_reports_collection.find(query).sort("created_at", DESCENDING).skip(skip).limit(per_page) reports = [_serialize_report(doc) for doc in cursor] return reports, total def get_report_by_id(report_id): """Get a single report by its ID.""" try: oid = ObjectId(report_id) except Exception: return None doc = bug_reports_collection.find_one({"_id": oid}) return _serialize_report(doc) def add_admin_reply(report_id, body, is_broadcast=False): """Add an admin reply to a bug report.""" try: oid = ObjectId(report_id) except Exception: return False reply = { "_id": ObjectId(), "body": body.strip(), "created_at": datetime.now(timezone.utc), "is_broadcast": is_broadcast, } result = bug_reports_collection.update_one( {"_id": oid}, { "$push": {"admin_replies": reply}, "$set": { "status": "in_progress", "updated_at": datetime.now(timezone.utc), }, } ) return result.modified_count > 0 def add_broadcast_reply(report_ids, body): """Add the same admin reply to multiple reports (group reply).""" reply = { "_id": ObjectId(), "body": body.strip(), "created_at": datetime.now(timezone.utc), "is_broadcast": True, } oids = [] for rid in report_ids: try: oids.append(ObjectId(rid)) except Exception: continue if not oids: return False result = bug_reports_collection.update_many( {"_id": {"$in": oids}}, { "$push": {"admin_replies": reply}, "$set": { "status": "in_progress", "updated_at": datetime.now(timezone.utc), }, } ) return result.modified_count > 0 def update_report_status(report_id, new_status): """Update the status of a bug report.""" try: oid = ObjectId(report_id) except Exception: return False valid_statuses = ("open", "in_progress", "resolved", "closed") if new_status not in valid_statuses: return False result = bug_reports_collection.update_one( {"_id": oid}, {"$set": {"status": new_status, "updated_at": datetime.now(timezone.utc)}} ) return result.modified_count > 0 def add_user_reply(report_id, author, author_id, body): """Add a user reply to their own bug report. Verifies the user owns the report.""" try: oid = ObjectId(report_id) except Exception: return False reply = { "_id": ObjectId(), "author": author, "author_id": str(author_id) if author_id is not None else None, "body": body.strip(), "created_at": datetime.now(timezone.utc), } result = bug_reports_collection.update_one( {"_id": oid}, { "$push": {"user_replies": reply}, "$set": { "updated_at": datetime.now(timezone.utc), }, } ) return result.modified_count > 0 def get_stats(): """Get bug report statistics for the admin panel.""" total = bug_reports_collection.count_documents({}) open_count = bug_reports_collection.count_documents({"status": "open"}) in_progress_count = bug_reports_collection.count_documents({"status": "in_progress"}) resolved_count = bug_reports_collection.count_documents({"status": "resolved"}) return { "total": total, "open": open_count, "in_progress": in_progress_count, "resolved": resolved_count, }