# Repository path: dfr/bot/database.py
# Hosting-page residue (not code): "UNUSUALxd's picture"
# Last commit: "Update bot/database.py" (da0675a, verified)
import os
from typing import Any
from motor.motor_asyncio import AsyncIOMotorClient
from utils.logger import log
class Database:
    """Async MongoDB access layer built on Motor.

    A freshly constructed instance holds no connection: ``client`` and ``db``
    are ``None`` until :meth:`connect` (or :meth:`init_db`) runs. Every data
    method checks ``self.db`` first; while it is ``None`` writers do nothing
    and readers return an empty container.
    """

    # Name of the MongoDB database that holds every collection used here.
    _DB_NAME = "username_sniper"

    # ``_id`` of the single document that stores all bot settings.
    _SETTINGS_ID = "bot_settings"

    # Maximum number of operations sent in one ``bulk_write`` call.
    _BULK_CHUNK_SIZE = 10000

    # Only these keys are copied from the settings document into the state.
    _SETTING_KEYS = (
        "checking_active", "sniping_active", "cycle_paused",
        "check_interval", "snipe_interval", "autoclaim_on", "db_filter_on",
        "total_checked", "total_available", "total_claimed",
        "channel_claim_priority", "preferred_channel_account",
        "watchdog_poll_interval", "fragment_sniping_active",
        "fragment_autoclaim_on", "fragment_workers", "custom_claim_message",
        "mute_minor_alerts", "smart_menu_on", "camper_interval",
        "custom_claim_gif", "custom_success_text",
    )

    def __init__(self):
        self.client = None
        self.db = None

    async def init_db(self):
        """Connect and verify the connection with a ``ping`` command.

        Raises:
            ValueError: propagated from :meth:`connect` when ``MONGO_URI``
                is not set.
            ConnectionError: when the ``ping`` command fails; the original
                exception is attached as ``__cause__``.
        """
        await self.connect()
        try:
            await self.db.command("ping")
        except Exception as e:
            raise ConnectionError(f"🚨 Database failed to ping: {e}") from e
        # Logged outside the ``try`` so a logging failure is never reported
        # as a database failure.
        log(f"✅ Database connection established and verified to '{self._DB_NAME}'.")

    async def connect(self):
        """Create the Motor client from the ``MONGO_URI`` environment variable.

        Raises:
            ValueError: when ``MONGO_URI`` is unset or empty.
        """
        uri = os.environ.get("MONGO_URI")
        if not uri:
            raise ValueError("MONGO_URI not set.")
        # Reconnecting must not leave the previous client (and its pool) open.
        if self.client is not None:
            self.client.close()
        self.client = AsyncIOMotorClient(uri)
        self.db = self.client[self._DB_NAME]

    # --- SHARED HELPERS ---
    @staticmethod
    async def _list_usernames(collection) -> list:
        """Return the ``username`` of every document in *collection*.

        Documents without a ``username`` field are skipped instead of
        aborting the whole read with ``KeyError``.
        """
        docs = await collection.find({}, {"username": 1}).to_list(length=None)
        return [doc["username"] for doc in docs if "username" in doc]

    @staticmethod
    async def _upsert_username(collection, username, extra=None):
        """Insert or update the document keyed by *username*.

        *extra* is an optional mapping of additional fields to ``$set``.
        """
        fields = {"username": username}
        if extra:
            fields.update(extra)
        await collection.update_one({"username": username}, {"$set": fields}, upsert=True)

    async def _bulk_upsert_usernames(self, collection, usernames):
        """Upsert many usernames into *collection* in chunked bulk writes.

        Duplicates are dropped first (first occurrence wins) so the same
        upsert is not sent twice. Operations are built one chunk at a time
        rather than all up front. Note that ``bulk_write`` is not a
        transaction: a failure can leave earlier operations applied.
        """
        from pymongo import UpdateOne

        unique = list(dict.fromkeys(usernames))
        step = self._BULK_CHUNK_SIZE
        for start in range(0, len(unique), step):
            batch = [
                UpdateOne({"username": name}, {"$set": {"username": name}}, upsert=True)
                for name in unique[start:start + step]
            ]
            await collection.bulk_write(batch)

    # --- SETTINGS MANAGER ---
    async def load_settings(self, state_dict: dict):
        """Copy persisted settings into *state_dict* in place.

        Only keys listed in ``_SETTING_KEYS`` that are present in the stored
        document are copied; every other entry of *state_dict* is left
        untouched. Does nothing when there is no connection or no stored
        settings document.
        """
        if self.db is None:
            return
        doc = await self.db.settings.find_one({"_id": self._SETTINGS_ID})
        if not doc:
            return
        for key in self._SETTING_KEYS:
            if key in doc:
                state_dict[key] = doc[key]

    async def save_setting(self, key: str, value):
        """Persist one setting, creating the settings document if needed."""
        if self.db is None:
            return
        await self.db.settings.update_one(
            {"_id": self._SETTINGS_ID}, {"$set": {key: value}}, upsert=True
        )

    # --- ADMIN MANAGER ---
    async def add_admin(self, user_id: int):
        """Record *user_id* as an admin (upsert, so repeats are harmless)."""
        if self.db is None:
            return
        await self.db.admins.update_one(
            {"user_id": user_id}, {"$set": {"user_id": user_id}}, upsert=True
        )

    async def get_admins(self) -> set:
        """Return the set of stored admin user ids (empty if not connected)."""
        if self.db is None:
            return set()
        docs = await self.db.admins.find({}, {"user_id": 1}).to_list(length=None)
        # Skip malformed documents rather than failing the whole load.
        return {doc["user_id"] for doc in docs if "user_id" in doc}

    # --- UNAVAILABLE SET ---
    async def get_unavailable_set(self) -> set:
        """Return every username in the ``unavailable`` collection."""
        if self.db is None:
            return set()
        return set(await self._list_usernames(self.db.unavailable))

    async def add_unavailable(self, username: str):
        """Upsert *username* into the ``unavailable`` collection."""
        if self.db is None:
            return
        await self._upsert_username(self.db.unavailable, username)

    async def remove_unavailable(self, username: str):
        """Delete *username* from the ``unavailable`` collection."""
        if self.db is None:
            return
        await self.db.unavailable.delete_one({"username": username})

    async def clear_unavailable(self):
        """Delete every document in the ``unavailable`` collection."""
        if self.db is None:
            return
        await self.db.unavailable.delete_many({})

    # --- CUSTOM SNIPES ---
    async def get_custom_snipes(self) -> list:
        """Return every username in the ``custom_snipes`` collection."""
        if self.db is None:
            return []
        return await self._list_usernames(self.db.custom_snipes)

    async def add_custom_snipe(self, username: str):
        """Upsert *username* into the ``custom_snipes`` collection."""
        if self.db is None:
            return
        await self._upsert_username(self.db.custom_snipes, username)

    # Bulk custom-snipe write (chunked; not transactional)
    async def add_custom_snipe_bulk(self, usernames: list):
        """Upsert many usernames into ``custom_snipes`` in chunks."""
        if self.db is None or not usernames:
            return
        await self._bulk_upsert_usernames(self.db.custom_snipes, usernames)

    async def remove_custom_snipe(self, username: str):
        """Delete *username* from the ``custom_snipes`` collection."""
        if self.db is None:
            return
        await self.db.custom_snipes.delete_one({"username": username})

    # --- WATCHDOG ---
    async def get_watchdog(self) -> dict:
        """Return a ``{username: added_by}`` mapping from ``watchdog``.

        ``added_by`` is ``None`` for documents that lack the field; documents
        without a ``username`` are skipped.
        """
        if self.db is None:
            return {}
        cursor = self.db.watchdog.find({}, {"username": 1, "added_by": 1})
        docs = await cursor.to_list(length=None)
        return {doc["username"]: doc.get("added_by") for doc in docs if "username" in doc}

    async def add_watchdog(self, username: str, user_id: Any):
        """Upsert *username* into ``watchdog`` with *user_id* as ``added_by``."""
        if self.db is None:
            return
        await self._upsert_username(self.db.watchdog, username, {"added_by": user_id})

    async def remove_watchdog(self, username: str):
        """Delete *username* from the ``watchdog`` collection."""
        if self.db is None:
            return
        await self.db.watchdog.delete_one({"username": username})

    # --- FRAGMENT CHECK ---
    async def get_fragcheck(self) -> list:
        """Return every username in the ``fragcheck`` collection."""
        if self.db is None:
            return []
        return await self._list_usernames(self.db.fragcheck)

    async def add_fragcheck(self, username: str):
        """Upsert *username* into the ``fragcheck`` collection."""
        if self.db is None:
            return
        await self._upsert_username(self.db.fragcheck, username)

    # Bulk fragment-check write (chunked; not transactional)
    async def add_fragcheck_bulk(self, usernames: list):
        """Upsert many usernames into ``fragcheck`` in chunks."""
        if self.db is None or not usernames:
            return
        await self._bulk_upsert_usernames(self.db.fragcheck, usernames)

    async def remove_fragcheck(self, username: str):
        """Delete *username* from the ``fragcheck`` collection."""
        if self.db is None:
            return
        await self.db.fragcheck.delete_one({"username": username})
# Shared module-level instance; its client/db stay None until connect() or init_db() runs.
db = Database()