| import os |
| from typing import Any |
| from motor.motor_asyncio import AsyncIOMotorClient |
| from utils.logger import log |
|
|
| class Database: |
| def __init__(self): |
| self.client = None |
| self.db = None |
|
|
| async def init_db(self): |
| await self.connect() |
| try: |
| await self.db.command("ping") |
| log("✅ Database connection established and verified to 'username_sniper'.") |
| except Exception as e: |
| raise ConnectionError(f"🚨 Database failed to ping: {e}") |
|
|
| async def connect(self): |
| uri = os.environ.get("MONGO_URI") |
| if not uri: |
| raise ValueError("MONGO_URI not set.") |
| self.client = AsyncIOMotorClient(uri) |
| self.db = self.client["username_sniper"] |
|
|
| |
| async def load_settings(self, state_dict: dict): |
| if self.db is None: return |
| doc = await self.db.settings.find_one({"_id": "bot_settings"}) |
| if not doc: return |
| |
| keys = [ |
| "checking_active", "sniping_active", "cycle_paused", |
| "check_interval", "snipe_interval", "autoclaim_on", "db_filter_on", |
| "total_checked", "total_available", "total_claimed", |
| "channel_claim_priority", "preferred_channel_account", |
| "watchdog_poll_interval", "fragment_sniping_active", |
| "fragment_autoclaim_on", "fragment_workers", "custom_claim_message", |
| "mute_minor_alerts", "smart_menu_on", "camper_interval", "custom_claim_gif", "custom_success_text" |
| ] |
| for key in keys: |
| if key in doc: |
| state_dict[key] = doc[key] |
|
|
| async def save_setting(self, key: str, value): |
| if self.db is not None: |
| await self.db.settings.update_one({"_id": "bot_settings"}, {"$set": {key: value}}, upsert=True) |
|
|
| |
| async def add_admin(self, user_id: int): |
| if self.db is not None: |
| await self.db.admins.update_one({"user_id": user_id}, {"$set": {"user_id": user_id}}, upsert=True) |
|
|
| async def get_admins(self) -> set: |
| if self.db is None: return set() |
| cursor = self.db.admins.find({}, {"user_id": 1}) |
| docs = await cursor.to_list(length=None) |
| return {doc["user_id"] for doc in docs} |
|
|
| |
| async def get_unavailable_set(self) -> set: |
| if self.db is None: return set() |
| cursor = self.db.unavailable.find({}, {"username": 1}) |
| docs = await cursor.to_list(length=None) |
| return {doc["username"] for doc in docs} |
|
|
| async def add_unavailable(self, username: str): |
| if self.db is not None: |
| await self.db.unavailable.update_one({"username": username}, {"$set": {"username": username}}, upsert=True) |
|
|
| async def remove_unavailable(self, username: str): |
| if self.db is not None: |
| await self.db.unavailable.delete_one({"username": username}) |
|
|
| async def clear_unavailable(self): |
| if self.db is not None: |
| await self.db.unavailable.delete_many({}) |
|
|
| |
| async def get_custom_snipes(self) -> list: |
| if self.db is None: return [] |
| cursor = self.db.custom_snipes.find({}, {"username": 1}) |
| docs = await cursor.to_list(length=None) |
| return [doc["username"] for doc in docs] |
|
|
| async def add_custom_snipe(self, username: str): |
| if self.db is not None: |
| await self.db.custom_snipes.update_one({"username": username}, {"$set": {"username": username}}, upsert=True) |
|
|
| |
| async def add_custom_snipe_bulk(self, usernames: list): |
| if self.db is not None and usernames: |
| from pymongo import UpdateOne |
| operations = [UpdateOne({"username": u}, {"$set": {"username": u}}, upsert=True) for u in usernames] |
| for i in range(0, len(operations), 10000): |
| await self.db.custom_snipes.bulk_write(operations[i:i+10000]) |
|
|
| async def remove_custom_snipe(self, username: str): |
| if self.db is not None: |
| await self.db.custom_snipes.delete_one({"username": username}) |
|
|
| |
| async def get_watchdog(self) -> dict: |
| if self.db is None: return {} |
| cursor = self.db.watchdog.find({}, {"username": 1, "added_by": 1}) |
| docs = await cursor.to_list(length=None) |
| return {doc["username"]: doc.get("added_by") for doc in docs} |
|
|
| async def add_watchdog(self, username: str, user_id: Any): |
| if self.db is not None: |
| await self.db.watchdog.update_one({"username": username}, {"$set": {"username": username, "added_by": user_id}}, upsert=True) |
|
|
| async def remove_watchdog(self, username: str): |
| if self.db is not None: |
| await self.db.watchdog.delete_one({"username": username}) |
|
|
| |
| async def get_fragcheck(self) -> list: |
| if self.db is None: return [] |
| cursor = self.db.fragcheck.find({}, {"username": 1}) |
| docs = await cursor.to_list(length=None) |
| return [doc["username"] for doc in docs] |
|
|
| async def add_fragcheck(self, username: str): |
| if self.db is not None: |
| await self.db.fragcheck.update_one({"username": username}, {"$set": {"username": username}}, upsert=True) |
|
|
| |
| async def add_fragcheck_bulk(self, usernames: list): |
| if self.db is not None and usernames: |
| from pymongo import UpdateOne |
| operations = [UpdateOne({"username": u}, {"$set": {"username": u}}, upsert=True) for u in usernames] |
| for i in range(0, len(operations), 10000): |
| await self.db.fragcheck.bulk_write(operations[i:i+10000]) |
|
|
| async def remove_fragcheck(self, username: str): |
| if self.db is not None: |
| await self.db.fragcheck.delete_one({"username": username}) |
|
|
| db = Database() |