import os from typing import Any from motor.motor_asyncio import AsyncIOMotorClient from utils.logger import log class Database: def __init__(self): self.client = None self.db = None async def init_db(self): await self.connect() try: await self.db.command("ping") log("✅ Database connection established and verified to 'username_sniper'.") except Exception as e: raise ConnectionError(f"🚨 Database failed to ping: {e}") async def connect(self): uri = os.environ.get("MONGO_URI") if not uri: raise ValueError("MONGO_URI not set.") self.client = AsyncIOMotorClient(uri) self.db = self.client["username_sniper"] # --- SETTINGS MANAGER --- async def load_settings(self, state_dict: dict): if self.db is None: return doc = await self.db.settings.find_one({"_id": "bot_settings"}) if not doc: return keys = [ "checking_active", "sniping_active", "cycle_paused", "check_interval", "snipe_interval", "autoclaim_on", "db_filter_on", "total_checked", "total_available", "total_claimed", "channel_claim_priority", "preferred_channel_account", "watchdog_poll_interval", "fragment_sniping_active", "fragment_autoclaim_on", "fragment_workers", "custom_claim_message", "mute_minor_alerts", "smart_menu_on", "camper_interval", "custom_claim_gif", "custom_success_text" ] for key in keys: if key in doc: state_dict[key] = doc[key] async def save_setting(self, key: str, value): if self.db is not None: await self.db.settings.update_one({"_id": "bot_settings"}, {"$set": {key: value}}, upsert=True) # --- ADMIN MANAGER --- async def add_admin(self, user_id: int): if self.db is not None: await self.db.admins.update_one({"user_id": user_id}, {"$set": {"user_id": user_id}}, upsert=True) async def get_admins(self) -> set: if self.db is None: return set() cursor = self.db.admins.find({}, {"user_id": 1}) docs = await cursor.to_list(length=None) return {doc["user_id"] for doc in docs} # --- UNAVAILABLE SET --- async def get_unavailable_set(self) -> set: if self.db is None: return set() cursor = self.db.unavailable.find({}, {"username": 1}) docs = await cursor.to_list(length=None) return {doc["username"] for doc in docs} async def add_unavailable(self, username: str): if self.db is not None: await self.db.unavailable.update_one({"username": username}, {"$set": {"username": username}}, upsert=True) async def remove_unavailable(self, username: str): if self.db is not None: await self.db.unavailable.delete_one({"username": username}) async def clear_unavailable(self): if self.db is not None: await self.db.unavailable.delete_many({}) # --- CUSTOM SNIPES --- async def get_custom_snipes(self) -> list: if self.db is None: return [] cursor = self.db.custom_snipes.find({}, {"username": 1}) docs = await cursor.to_list(length=None) return [doc["username"] for doc in docs] async def add_custom_snipe(self, username: str): if self.db is not None: await self.db.custom_snipes.update_one({"username": username}, {"$set": {"username": username}}, upsert=True) # ⚡ NEW: Atomic Bulk Custom Snipe Write async def add_custom_snipe_bulk(self, usernames: list): if self.db is not None and usernames: from pymongo import UpdateOne operations = [UpdateOne({"username": u}, {"$set": {"username": u}}, upsert=True) for u in usernames] for i in range(0, len(operations), 10000): await self.db.custom_snipes.bulk_write(operations[i:i+10000]) async def remove_custom_snipe(self, username: str): if self.db is not None: await self.db.custom_snipes.delete_one({"username": username}) # --- WATCHDOG --- async def get_watchdog(self) -> dict: if self.db is None: return {} cursor = self.db.watchdog.find({}, {"username": 1, "added_by": 1}) docs = await cursor.to_list(length=None) return {doc["username"]: doc.get("added_by") for doc in docs} async def add_watchdog(self, username: str, user_id: Any): if self.db is not None: await self.db.watchdog.update_one({"username": username}, {"$set": {"username": username, "added_by": user_id}}, upsert=True) async def remove_watchdog(self, username: str): if self.db is not None: await self.db.watchdog.delete_one({"username": username}) # --- FRAGMENT CHECK --- async def get_fragcheck(self) -> list: if self.db is None: return [] cursor = self.db.fragcheck.find({}, {"username": 1}) docs = await cursor.to_list(length=None) return [doc["username"] for doc in docs] async def add_fragcheck(self, username: str): if self.db is not None: await self.db.fragcheck.update_one({"username": username}, {"$set": {"username": username}}, upsert=True) # ⚡ NEW: Atomic Bulk Fragment Write async def add_fragcheck_bulk(self, usernames: list): if self.db is not None and usernames: from pymongo import UpdateOne operations = [UpdateOne({"username": u}, {"$set": {"username": u}}, upsert=True) for u in usernames] for i in range(0, len(operations), 10000): await self.db.fragcheck.bulk_write(operations[i:i+10000]) async def remove_fragcheck(self, username: str): if self.db is not None: await self.db.fragcheck.delete_one({"username": username}) db = Database()