| """ | |
| project/database.py | |
| MongoDB persistence layer with Borg singleton pattern. | |
| Collections: | |
| evidence β scraped text for FAISS/NLI pipeline | |
| users β registered accounts (hashed + peppered passwords) | |
| history β per-user claim history | |
| revoked_tokens β JWT blocklist (TTL-indexed, self-cleaning) | |
| """ | |
from datetime import datetime, timedelta, timezone
from typing import Optional

from bson import ObjectId
from pymongo import MongoClient, ASCENDING, DESCENDING
import certifi

from project import config
# ── Borg Singleton ───────────────────────────────────────────────────────────
# All DatabaseManager instances share the same __dict__, giving us a singleton
# without a class-level lock. This is fine for Flask's threaded model; note,
# however, that MongoClient is not fork-safe, so forked workers should each
# connect after the fork rather than inherit a pre-fork client.
class DatabaseManager:
    _shared_state: dict = {}

    def __init__(self):
        self.__dict__ = self._shared_state
    def _connect(self):
        if getattr(self, '_db', None) is None:
            self._client = MongoClient(
                config.MONGO_URI,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=5000,
                # certifi supplies the CA bundle; certificate validation stays on
                # (tlsAllowInvalidCertificates would silently defeat it).
                tlsCAFile=certifi.where()
            )
            self._db = self._client[config.MONGO_DB_NAME]
        return self._db
    @property
    def db(self):
        # Property, not a method: get_db() must yield the Database object
        # itself, not a bound method.
        return self._connect()


_manager = DatabaseManager()


def get_db():
    return _manager.db
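
# Illustrative usage (a minimal sketch; collection names are the ones listed
# in the module docstring):
#
#     db = get_db()
#     db.evidence.find_one({"source": "example.com"})
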
# ── Initialise indexes ───────────────────────────────────────────────────────
def init_db():
    db = get_db()

    # evidence: TTL index auto-removes docs older than 30 days
    db.evidence.create_index(
        [("created_at", ASCENDING)],
        expireAfterSeconds=30 * 24 * 3600,
        background=True,
        name="evidence_ttl"
    )
    db.evidence.create_index([("source", ASCENDING)], background=True, name="source_idx")

    # users: unique email
    db.users.create_index([("email", ASCENDING)], unique=True, background=True, name="email_unique")
    db.users.create_index([("username", ASCENDING)], background=True, name="username_idx")

    # history: fast per-user lookup, newest first
    db.history.create_index(
        [("user_id", ASCENDING), ("created_at", DESCENDING)],
        background=True,
        name="user_history_idx"
    )

    # revoked_tokens: TTL auto-removes expired JTIs
    db.revoked_tokens.create_index(
        [("exp", ASCENDING)],
        expireAfterSeconds=0,
        background=True,
        name="token_ttl"
    )
    db.revoked_tokens.create_index([("jti", ASCENDING)], unique=True, background=True, name="jti_unique")

    # cached_results: exact claim cache, indexed by normalized claim
    db.cached_results.create_index([("normalized_claim", ASCENDING)], unique=True, background=True, name="claim_cache_idx")

    print("[DB] MongoDB indexes ensured.")
# ── Evidence helpers ─────────────────────────────────────────────────────────
def clear_db():
    get_db().evidence.delete_many({})


def save_evidence(text, source, embedding=None):
    try:
        get_db().evidence.insert_one({
            "text": text,
            "source": source,
            "embedding": embedding,  # store the vector list directly
            "created_at": datetime.now(timezone.utc)
        })
    except Exception as e:
        print(f"[DB] save_evidence error: {e}")


def load_all_evidence():
    """Returns a list of (id, text, source, embedding) tuples, the shape the FAISS pipeline expects."""
    docs = list(get_db().evidence.find({}, {"_id": 1, "text": 1, "source": 1, "embedding": 1}))
    return [(str(d["_id"]), d["text"], d["source"], d.get("embedding")) for d in docs]


def get_total_evidence_count():
    return get_db().evidence.count_documents({})


def prune_old_evidence(days=30):
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    result = get_db().evidence.delete_many({"created_at": {"$lt": cutoff}})
    return result.deleted_count
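
# Example round trip (illustrative values):
#
#     save_evidence("The sky is blue.", "example.com", embedding=[0.1, 0.2])
#     rows = load_all_evidence()  # [(id, text, source, embedding), ...]
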
# ── User helpers ─────────────────────────────────────────────────────────────
def create_user(username, email, password_hash, is_admin=False):
    """Returns the inserted ObjectId as a string, or None on duplicate/error."""
    try:
        result = get_db().users.insert_one({
            "username": username,
            "email": email,
            "password_hash": password_hash,
            "is_admin": is_admin,
            "created_at": datetime.now(timezone.utc)
        })
        return str(result.inserted_id)
    except Exception as e:
        # Log only the exception type to avoid leaking the email address.
        print(f"[DB] create_user error: {type(e).__name__}")
        return None


def find_user_by_email(email):
    return get_db().users.find_one({"email": email})


def find_user_by_id(user_id):
    try:
        return get_db().users.find_one({"_id": ObjectId(user_id)})
    except Exception:
        return None
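
# This layer stores only the final password_hash; hashing and peppering happen
# in the auth layer. A minimal sketch, assuming werkzeug and a config.PEPPER
# secret (hypothetical name):
#
#     from werkzeug.security import generate_password_hash
#     create_user(username, email, generate_password_hash(password + config.PEPPER))
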
# ── JWT Blocklist helpers ────────────────────────────────────────────────────
def add_token_to_blocklist(jti: str, exp: datetime):
    """Persist a revoked JWT jti. The TTL index auto-removes it after exp."""
    try:
        get_db().revoked_tokens.insert_one({
            "jti": jti,
            "exp": exp,
            "revoked_at": datetime.now(timezone.utc)
        })
    except Exception as e:
        print(f"[DB] add_token_to_blocklist error: {type(e).__name__}")


def is_token_revoked(jti: str) -> bool:
    return get_db().revoked_tokens.find_one({"jti": jti}) is not None
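
# Wiring sketch, assuming flask-jwt-extended with jwt = JWTManager(app):
#
#     @jwt.token_in_blocklist_loader
#     def check_if_revoked(jwt_header, jwt_payload):
#         return is_token_revoked(jwt_payload["jti"])
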
# ── History helpers ──────────────────────────────────────────────────────────
def save_history(user_id, claim, verdict, confidence, evidence_count):
    try:
        get_db().history.insert_one({
            "user_id": user_id,
            "claim": claim,
            "verdict": verdict,
            "confidence": confidence,
            "evidence_count": evidence_count,
            "created_at": datetime.now(timezone.utc)
        })
    except Exception as e:
        print(f"[DB] save_history error: {type(e).__name__}")


def get_user_history(user_id, limit=50):
    return list(
        get_db().history
        .find({"user_id": user_id})
        .sort("created_at", DESCENDING)
        .limit(limit)
    )


def delete_history_item(user_id, item_id):
    try:
        result = get_db().history.delete_one({
            "_id": ObjectId(item_id),
            "user_id": user_id  # ownership filter: users can only delete their own items
        })
        return result.deleted_count == 1
    except Exception as e:
        print(f"[DB] delete_history_item error: {type(e).__name__}")
        return False


def clear_user_history(user_id):
    try:
        result = get_db().history.delete_many({"user_id": user_id})
        return result.deleted_count
    except Exception as e:
        print(f"[DB] clear_user_history error: {type(e).__name__}")
        return 0
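
# Note: user_id is stored exactly as passed in (typically the string form of
# the user's ObjectId, e.g. the JWT identity), so history queries must use the
# same string representation rather than a raw ObjectId.
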
# ── EXACT CLAIM CACHE ────────────────────────────────────────────────────────
def _normalize_claim(claim: str) -> str:
    """Lowercase and strip surrounding whitespace for exact matching."""
    return claim.strip().lower()


def get_cached_result(claim: str) -> Optional[dict]:
    """Returns the cached, fully structured API dictionary, or None on a miss."""
    norm = _normalize_claim(claim)
    try:
        doc = get_db().cached_results.find_one({"normalized_claim": norm})
        if doc and "result" in doc:
            return doc["result"]
    except Exception as e:
        print(f"[DB] get_cached_result error: {type(e).__name__}")
    return None


def save_cached_result(claim: str, result: dict):
    """Saves a successful API run's result structure into the cache."""
    if not result.get("success"):
        return
    norm = _normalize_claim(claim)
    try:
        get_db().cached_results.update_one(
            {"normalized_claim": norm},
            {
                "$set": {
                    "result": result,
                    "updated_at": datetime.now(timezone.utc)
                },
                "$setOnInsert": {
                    "created_at": datetime.now(timezone.utc)
                }
            },
            upsert=True
        )
    except Exception as e:
        print(f"[DB] save_cached_result error: {type(e).__name__}")
# ── ADMIN HELPERS ────────────────────────────────────────────────────────────
def get_system_stats():
    """Aggregate high-level system metrics plus chart data for God Mode."""
    db = get_db()
    try:
        total_users = db.users.count_documents({})
        total_checks = db.history.count_documents({})
        total_evidence = db.evidence.count_documents({})
        total_cached = db.cached_results.count_documents({})

        day_ago = datetime.now(timezone.utc) - timedelta(days=1)
        recent_checks = db.history.count_documents({"created_at": {"$gt": day_ago}})

        # ── Verdict breakdown (all checks) ────────────────────────────────
        verdict_pipeline = [
            {"$group": {"_id": "$verdict", "count": {"$sum": 1}}},
        ]
        verdict_raw = list(db.history.aggregate(verdict_pipeline))
        verdict_counts = {r["_id"]: r["count"] for r in verdict_raw}

        # ── Daily checks over the last 7 days ─────────────────────────────
        seven_days_ago = datetime.now(timezone.utc) - timedelta(days=7)
        daily_pipeline = [
            {"$match": {"created_at": {"$gt": seven_days_ago}}},
            {"$group": {
                "_id": {
                    "year": {"$year": "$created_at"},
                    "month": {"$month": "$created_at"},
                    "day": {"$dayOfMonth": "$created_at"},
                },
                "count": {"$sum": 1}
            }},
            {"$sort": {"_id.year": 1, "_id.month": 1, "_id.day": 1}}
        ]
        daily_raw = list(db.history.aggregate(daily_pipeline))

        # Build a filled 7-day array (missing days become 0)
        daily_map = {}
        for r in daily_raw:
            d = r["_id"]
            key = f"{d['year']}-{str(d['month']).zfill(2)}-{str(d['day']).zfill(2)}"
            daily_map[key] = r["count"]

        daily_labels, daily_data = [], []
        for i in range(6, -1, -1):
            day = datetime.now(timezone.utc) - timedelta(days=i)
            label = day.strftime("%b %d")
            key = day.strftime("%Y-%m-%d")
            daily_labels.append(label)
            daily_data.append(daily_map.get(key, 0))

        # ── Top 5 users by check count ────────────────────────────────────
        top_users_pipeline = [
            {"$group": {"_id": "$user_id", "count": {"$sum": 1}}},
            {"$sort": {"count": -1}},
            {"$limit": 5},
        ]
        top_users_raw = list(db.history.aggregate(top_users_pipeline))
        top_users = []
        for r in top_users_raw:
            user = find_user_by_id(r["_id"])
            top_users.append({
                "username": user.get("username", "Unknown") if user else "Unknown",
                "count": r["count"]
            })

        # Ratio of distinct cached claims to total checks; a rough proxy for
        # cache usefulness, not a true per-request hit rate.
        cache_hit_rate = round((total_cached / total_checks * 100), 1) if total_checks else 0

        return {
            "total_users": total_users,
            "total_checks": total_checks,
            "total_evidence": total_evidence,
            "total_cached": total_cached,
            "recent_checks_24h": recent_checks,
            "cache_hit_rate": cache_hit_rate,
            "verdict_counts": verdict_counts,
            "daily_labels": daily_labels,
            "daily_data": daily_data,
            "top_users": top_users,
        }
    except Exception as e:
        print(f"[DB] get_system_stats error: {e}")
        return {}
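
# The verdict, daily, and top-user aggregations each make a separate round
# trip to the history collection; if this endpoint ever becomes hot, they
# could be merged into a single $facet pipeline. Sketch of the idea:
#
#     db.history.aggregate([{"$facet": {
#         "verdicts": [{"$group": {"_id": "$verdict", "count": {"$sum": 1}}}],
#         "top_users": [
#             {"$group": {"_id": "$user_id", "count": {"$sum": 1}}},
#             {"$sort": {"count": -1}},
#             {"$limit": 5},
#         ],
#     }}])
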
def list_all_users(limit=100):
    """Returns up to `limit` users for the admin dashboard, newest first, without password hashes."""
    return list(get_db().users.find({}, {"password_hash": 0}).sort("created_at", DESCENDING).limit(limit))


def get_global_history(limit=500):
    """Returns the most recent fact checks across all users, enriched with usernames."""
    history = list(get_db().history.find({}).sort("created_at", DESCENDING).limit(limit))
    user_map = {}  # memoize user_id → username to avoid repeated lookups
    for h in history:
        uid = h.get("user_id")
        if uid not in user_map:
            user = find_user_by_id(uid)
            user_map[uid] = user.get("username", "Unknown") if user else "Unknown"
        h["username"] = user_map[uid]
    return history
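

if __name__ == "__main__":
    # Manual smoke test; requires a reachable MongoDB at config.MONGO_URI.
    init_db()
    print("evidence docs:", get_total_evidence_count())
    print("stats keys:", sorted(get_system_stats().keys()))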