Spaces:
Running
Running
| # --- START OF FILE db.py --- | |
| import logging | |
| import asyncio | |
| import asyncpg | |
| import orjson as json | |
| from credentials import DATABASE_URL | |
| from bot_handlers.redis_manager import get_redis_client_for_user, get_redis_client_for_chat | |
| from datetime import datetime, timezone | |
| logger = logging.getLogger(__name__) | |
| # --- TABLE SETUP --- | |
| async def setup_database(pool): | |
| if not pool: return | |
| try: | |
| async with pool.acquire() as conn: | |
| # Users Table | |
| await conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS users ( | |
| user_id BIGINT PRIMARY KEY, first_name TEXT, username TEXT, | |
| added_on TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, language TEXT DEFAULT 'en', | |
| has_started_bot BOOLEAN DEFAULT FALSE, | |
| message_count INTEGER DEFAULT 0, | |
| last_active_on TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, | |
| warning_count INTEGER DEFAULT 0, | |
| trust_score INTEGER DEFAULT 100 | |
| )""") | |
| # --- db.py এর setup_database ফাংশনের ভেতরে --- | |
| # মেম্বার ট্র্যাকিং টেবিল (নতুন যোগ করুন) | |
| await conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS chat_members ( | |
| chat_id BIGINT, | |
| user_id BIGINT, | |
| last_seen TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, | |
| PRIMARY KEY (chat_id, user_id) | |
| )""") | |
| # ইনডেক্স যাতে সার্চ ফাস্ট হয় | |
| await conn.execute("CREATE INDEX IF NOT EXISTS idx_chat_members_chat_id ON chat_members (chat_id)") | |
| # Missing columns fix for users | |
| for col, dtype in [("message_count", "INTEGER DEFAULT 0"), ("last_active_on", "TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP"), ("warning_count", "INTEGER DEFAULT 0"), ("trust_score", "INTEGER DEFAULT 100")]: | |
| try: await conn.execute(f"ALTER TABLE users ADD COLUMN {col} {dtype}") | |
| except: pass | |
| # Group Settings Table | |
| await conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS group_settings ( | |
| chat_id BIGINT PRIMARY KEY, forbidden_words JSONB DEFAULT '[]'::jsonb, | |
| anti_duplicate BOOLEAN DEFAULT FALSE, max_message_length INTEGER DEFAULT 0, | |
| auto_delete_seconds INTEGER DEFAULT 0, delete_join_messages BOOLEAN DEFAULT TRUE, | |
| block_channel_forwards BOOLEAN DEFAULT TRUE, enable_spamscan BOOLEAN DEFAULT FALSE, | |
| mute_on_link BOOLEAN DEFAULT FALSE, | |
| allow_admins_manage BOOLEAN DEFAULT FALSE, | |
| block_bio_links BOOLEAN DEFAULT TRUE, enable_ocr_scan BOOLEAN DEFAULT FALSE, | |
| allow_usernames BOOLEAN DEFAULT FALSE, welcome_data JSONB DEFAULT NULL, | |
| welcome_delete_seconds INTEGER DEFAULT 0, | |
| welcome_mode TEXT DEFAULT 'text', | |
| enable_sentiment BOOLEAN DEFAULT FALSE, | |
| enable_support_ticket BOOLEAN DEFAULT FALSE, | |
| enable_trust_score BOOLEAN DEFAULT FALSE, | |
| enable_traffic_control BOOLEAN DEFAULT FALSE, | |
| enable_smart_notes BOOLEAN DEFAULT FALSE, | |
| warn_limit INTEGER DEFAULT 0, | |
| warn_time_window INTEGER DEFAULT 10, | |
| warn_action TEXT DEFAULT 'mute' | |
| )""") | |
| # 🔥 FIX: Missing columns fix for group_settings (Auto-Warn Feature) | |
| # এটি পুরাতন ডাটাবেসে নতুন কলামগুলো যোগ করে দিবে যাতে /setwarn কাজ করে | |
| for col, dtype in [ | |
| ("warn_limit", "INTEGER DEFAULT 0"), | |
| ("warn_time_window", "INTEGER DEFAULT 10"), | |
| ("warn_action", "TEXT DEFAULT 'mute'") | |
| ]: | |
| try: await conn.execute(f"ALTER TABLE group_settings ADD COLUMN {col} {dtype}") | |
| except: pass | |
| # Filters & Notes | |
| await conn.execute("CREATE TABLE IF NOT EXISTS filters (chat_id BIGINT, keyword TEXT, reply_text TEXT, file_id TEXT, file_type TEXT, PRIMARY KEY (chat_id, keyword))") | |
| await conn.execute("CREATE TABLE IF NOT EXISTS notes (chat_id BIGINT, slug TEXT, content TEXT, file_id TEXT, file_type TEXT, PRIMARY KEY (chat_id, slug))") | |
| await conn.execute("CREATE TABLE IF NOT EXISTS chats (chat_id BIGINT PRIMARY KEY, added_on TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)") | |
| # VIP SYSTEM | |
| await conn.execute(""" | |
| CREATE TABLE IF NOT EXISTS vip_trials ( | |
| chat_id BIGINT, user_id BIGINT, expiry_date TIMESTAMPTZ, | |
| joined_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, is_safe BOOLEAN DEFAULT FALSE, | |
| PRIMARY KEY (chat_id, user_id) | |
| )""") | |
| try: await conn.execute("ALTER TABLE vip_trials ADD COLUMN is_safe BOOLEAN DEFAULT FALSE") | |
| except: pass | |
| # Indexes | |
| await conn.execute("CREATE INDEX IF NOT EXISTS idx_users_username ON users (username)") | |
| # 🔥🔥 Performance Fix Index 🔥🔥 | |
| await conn.execute("CREATE INDEX IF NOT EXISTS idx_vip_expiry ON vip_trials (expiry_date)") | |
| except Exception as e: logger.error(f"DB Setup Error: {e}") | |
| # --- 🔥 REDIS BUFFER FUNCTIONS (BATCH UPDATE) 🔥 --- | |
| async def buffer_user_stats(context, user_id, first_name, username): | |
| """ | |
| Redis-এ ইউজারের মেসেজ কাউন্ট এবং ইনফো বাফার করে রাখে। | |
| """ | |
| redis_clients = context.bot_data.get('redis_clients') | |
| # প্রথম Redis ক্লায়েন্ট ব্যবহার করা হচ্ছে | |
| redis = redis_clients[0] if redis_clients else None | |
| if not redis: return | |
| try: | |
| key = f"stats_buffer:{user_id}" | |
| async with redis.pipeline() as pipe: | |
| # ১. মেসেজ কাউন্ট ১ বাড়ান | |
| pipe.hincrby(key, "msg_count", 1) | |
| # ২. নাম এবং ইউজারনেম আপডেট করুন (শুধুমাত্র যদি non-empty হয়) | |
| if first_name: pipe.hset(key, "first_name", first_name) | |
| if username: pipe.hset(key, "username", username) | |
| # ৩. এই ইউজারকে 'dirty' লিস্টে রাখুন যাতে পরে DB তে সেভ করা যায় | |
| pipe.sadd("stats_dirty_users", user_id) | |
| # ৪. সেফটির জন্য এক্সপায়ার টাইম | |
| pipe.expire(key, 86400) | |
| await pipe.execute() | |
| except Exception as e: | |
| logger.error(f"Buffer Stats Error: {e}") | |
| # --- db.py ফাইলের flush_user_stats_to_db ফাংশনটি এভাবে পরিবর্তন করুন --- | |
| async def flush_user_stats_to_db(app): | |
| pool = app.bot_data.get('db_pool') | |
| redis_clients = app.bot_data.get('redis_clients') | |
| redis = redis_clients[0] if redis_clients else None | |
| if not pool or not redis: return | |
| try: | |
| # ১. সব 'dirty' ইউজার আইডি বের করা | |
| user_ids_raw = await redis.smembers("stats_dirty_users") | |
| if not user_ids_raw: return | |
| user_ids = [int(uid) for uid in user_ids_raw] | |
| batch_data = [] | |
| processed_users = [] | |
| for uid in user_ids: | |
| old_key = f"stats_buffer:{uid}" | |
| process_key = f"sync_temp:{uid}" | |
| # ২. কিউতে থাকা ডাটা মুভ করা (Isolation) | |
| try: | |
| # rename ব্যবহার করলে ওই মুহূর্তের ডাটা লক হয়ে যায় | |
| await redis.rename(old_key, process_key) | |
| except: | |
| continue # যদি কি না থাকে তবে স্কিপ | |
| data = await redis.hgetall(process_key) | |
| if data: | |
| msg_inc = int(data.get("msg_count", 0)) | |
| fname = data.get("first_name") | |
| uname = data.get("username") | |
| if msg_inc > 0 or fname or uname: | |
| batch_data.append((uid, fname, uname, msg_inc)) | |
| processed_users.append(uid) | |
| # কাজ শেষ, টেম্পোরারি কি ডিলিট | |
| await redis.delete(process_key) | |
| if batch_data: | |
| async with pool.acquire() as conn: | |
| await conn.executemany(""" | |
| INSERT INTO users (user_id, first_name, username, message_count, last_active_on) | |
| VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP) | |
| ON CONFLICT (user_id) DO UPDATE SET | |
| first_name = COALESCE(EXCLUDED.first_name, users.first_name), | |
| username = COALESCE(EXCLUDED.username, users.username), | |
| message_count = users.message_count + EXCLUDED.message_count, | |
| last_active_on = CURRENT_TIMESTAMP | |
| """, batch_data) | |
| # ৩. ডার্টি লিস্ট থেকে সফলভাবে সেভ হওয়া আইডিগুলো বাদ দেওয়া | |
| await redis.srem("stats_dirty_users", *processed_users) | |
| logger.info(f"✅ Flushed {len(batch_data)} users safely to DB.") | |
| except Exception as e: | |
| logger.error(f"DB Flush Error: {e}") | |
| # --- CORE DB FUNCTIONS --- | |
| async def _add_user_to_db_core(pool, user_id, first_name, username): | |
| # এখন এটি শুধু নতুন ইউজার চেক করার জন্য ব্যবহার হবে | |
| if not pool: return | |
| try: | |
| async with pool.acquire() as conn: | |
| await conn.execute(""" | |
| INSERT INTO users (user_id, first_name, username, last_active_on) | |
| VALUES ($1, $2, $3, CURRENT_TIMESTAMP) | |
| ON CONFLICT (user_id) DO NOTHING | |
| """, user_id, first_name, username) | |
| except: pass | |
| async def get_user_id_by_username(pool, username): | |
| if not pool: return None | |
| clean_username = username.strip().replace("@", "") | |
| try: | |
| async with pool.acquire() as conn: | |
| row = await conn.fetchrow("SELECT user_id FROM users WHERE LOWER(REPLACE(username, '@', '')) = LOWER($1)", clean_username) | |
| return row['user_id'] if row else None | |
| except: return None | |
| async def increment_user_message_count(pool, user_id): | |
| # This is now handled by buffer_user_stats, kept as fallback | |
| if not pool: return | |
| try: | |
| async with pool.acquire() as conn: | |
| await conn.execute("UPDATE users SET message_count = message_count + 1 WHERE user_id = $1", user_id) | |
| except: pass | |
| async def increment_user_warning_count(pool, user_id): | |
| if not pool: return | |
| try: | |
| async with pool.acquire() as conn: | |
| await conn.execute(""" | |
| INSERT INTO users (user_id, warning_count, trust_score, last_active_on) | |
| VALUES ($1, 1, 90, CURRENT_TIMESTAMP) | |
| ON CONFLICT (user_id) DO UPDATE SET | |
| warning_count = users.warning_count + 1, | |
| trust_score = GREATEST(0, users.trust_score - 10), | |
| last_active_on = CURRENT_TIMESTAMP | |
| """, user_id) | |
| except: pass | |
| async def get_user_stats(pool, user_id): | |
| if not pool: return None | |
| try: | |
| async with pool.acquire() as conn: | |
| return await conn.fetchrow("SELECT * FROM users WHERE user_id = $1", user_id) | |
| except: return None | |
| async def _add_chat_to_db_core(pool, chat_id: int): | |
| if not pool: return | |
| try: | |
| async with pool.acquire() as conn: | |
| await conn.execute("INSERT INTO chats (chat_id) VALUES ($1) ON CONFLICT DO NOTHING", chat_id) | |
| except: pass | |
| async def get_db_stats(pool): | |
| if not pool: return 0, 0, 0 | |
| try: | |
| async with pool.acquire() as conn: | |
| total_users = await conn.fetchval("SELECT COUNT(*) FROM users") | |
| total_groups = await conn.fetchval("SELECT COUNT(*) FROM chats") | |
| pm_users = await conn.fetchval("SELECT COUNT(*) FROM users WHERE has_started_bot = TRUE") | |
| return total_users, total_groups, pm_users | |
| except: return 0, 0, 0 | |
| async def get_user_lang_from_db(context, user_id): | |
| redis = get_redis_client_for_user(context) | |
| try: | |
| if l := await redis.get(f"user_lang:{user_id}"): return l | |
| except: pass | |
| pool = context.bot_data.get('db_pool') | |
| if not pool: return "en" | |
| try: | |
| async with pool.acquire() as conn: | |
| l = await conn.fetchval("SELECT language FROM users WHERE user_id = $1", user_id) | |
| if l: await redis.set(f"user_lang:{user_id}", l, ex=86400) | |
| return l or "en" | |
| except: return "en" | |
| async def get_all_settings(context, chat_id): | |
| redis = get_redis_client_for_chat(context, chat_id) | |
| pool = context.bot_data.get('db_pool') | |
| defaults = { | |
| 'forbidden_words': [], 'anti_duplicate': False, 'max_message_length': 0, | |
| 'auto_delete_seconds': 0, 'delete_join_messages': True, 'block_channel_forwards': True, | |
| 'enable_spamscan': False, 'mute_on_link': False, | |
| 'allow_admins_manage': False, | |
| 'block_bio_links': True, 'enable_ocr_scan': False, 'allow_usernames': False, 'welcome_data': None, | |
| 'welcome_delete_seconds': 0, 'welcome_mode': 'text', 'enable_sentiment': False, | |
| 'enable_support_ticket': False, 'enable_trust_score': False, 'enable_traffic_control': False, | |
| 'enable_smart_notes': False, | |
| 'warn_limit': 0, 'warn_time_window': 10, 'warn_action': 'mute' | |
| } | |
| try: | |
| if cached := await redis.get(f"group_settings:{chat_id}"): return json.loads(cached) | |
| except: pass | |
| if not pool: return defaults.copy() | |
| try: | |
| async with pool.acquire() as conn: | |
| row = await conn.fetchrow("SELECT * FROM group_settings WHERE chat_id = $1", chat_id) | |
| if not row: return defaults.copy() | |
| final = defaults.copy() | |
| final.update(dict(row)) | |
| if isinstance(final.get('forbidden_words'), str): final['forbidden_words'] = json.loads(final['forbidden_words']) | |
| if isinstance(final.get('welcome_data'), str): final['welcome_data'] = json.loads(final['welcome_data']) | |
| await redis.set(f"group_settings:{chat_id}", json.dumps(final).decode('utf-8'), ex=1800) | |
| return final | |
| except: return defaults.copy() | |
| async def invalidate_settings_cache(context, chat_id): | |
| try: await get_redis_client_for_chat(context, chat_id).delete(f"group_settings:{chat_id}") | |
| except: pass | |
| async def set_user_lang_in_db(context, user_id, lang): | |
| pool = context.bot_data.get('db_pool') | |
| if not pool: return | |
| try: | |
| async with pool.acquire() as conn: | |
| await conn.execute("INSERT INTO users (user_id) VALUES ($1) ON CONFLICT (user_id) DO NOTHING", user_id) | |
| await conn.execute("UPDATE users SET language = $1 WHERE user_id = $2", lang, user_id) | |
| await get_redis_client_for_user(context).set(f"user_lang:{user_id}", lang, ex=86400) | |
| except: pass | |
| async def update_setting_in_db(context, chat_id, key, value): | |
| pool = context.bot_data.get('db_pool') | |
| if not pool: return | |
| val = json.dumps(value).decode('utf-8') if isinstance(value, (list, dict)) else value | |
| try: | |
| async with pool.acquire() as conn: | |
| await conn.execute(f"INSERT INTO group_settings (chat_id, {key}) VALUES ($1, $2) ON CONFLICT (chat_id) DO UPDATE SET {key} = EXCLUDED.{key}", chat_id, val) | |
| await invalidate_settings_cache(context, chat_id) | |
| except: pass | |
| async def reset_settings_in_db(context, chat_id): | |
| pool = context.bot_data.get('db_pool') | |
| if not pool: return False | |
| try: | |
| async with pool.acquire() as conn: | |
| await conn.execute("DELETE FROM group_settings WHERE chat_id = $1", chat_id) | |
| await conn.execute("DELETE FROM filters WHERE chat_id = $1", chat_id) | |
| await conn.execute("DELETE FROM notes WHERE chat_id = $1", chat_id) | |
| await invalidate_settings_cache(context, chat_id) | |
| try: | |
| r = get_redis_client_for_chat(context, chat_id) | |
| await r.delete(f"filters:{chat_id}") | |
| await r.delete(f"notes:{chat_id}") | |
| except: pass | |
| return True | |
| except: return False | |
| async def save_note_to_db(context, chat_id, slug, content, file_id=None, file_type=None): | |
| pool = context.bot_data.get('db_pool') | |
| if not pool: return | |
| try: | |
| async with pool.acquire() as conn: | |
| await conn.execute(""" | |
| INSERT INTO notes (chat_id, slug, content, file_id, file_type) | |
| VALUES ($1, $2, $3, $4, $5) | |
| ON CONFLICT (chat_id, slug) DO UPDATE | |
| SET content = EXCLUDED.content, file_id = EXCLUDED.file_id, file_type = EXCLUDED.file_type | |
| """, chat_id, slug.lower(), content, file_id, file_type) | |
| await get_redis_client_for_chat(context, chat_id).delete(f"notes:{chat_id}") | |
| except: pass | |
| async def get_note_from_db(context, chat_id, slug): | |
| redis = get_redis_client_for_chat(context, chat_id) | |
| slug = slug.lower() | |
| try: | |
| if cached := await redis.hget(f"notes:{chat_id}", slug): return json.loads(cached) | |
| except: pass | |
| pool = context.bot_data.get('db_pool') | |
| if not pool: return None | |
| try: | |
| async with pool.acquire() as conn: | |
| r = await conn.fetchrow("SELECT content, file_id, file_type FROM notes WHERE chat_id = $1 AND slug = $2", chat_id, slug) | |
| if r: | |
| data = {'text': r['content'], 'file_id': r['file_id'], 'type': r['file_type']} | |
| # Updated: Added .decode('utf-8') to avoid bytes in string-mode Redis | |
| await redis.hset(f"notes:{chat_id}", slug, json.dumps(data).decode('utf-8')) | |
| await redis.expire(f"notes:{chat_id}", 3600) | |
| return data | |
| except: return None | |
| async def add_filter_to_db(context, chat_id, keyword, reply_text, file_id=None, file_type=None): | |
| pool = context.bot_data.get('db_pool') | |
| if not pool: return | |
| try: | |
| async with pool.acquire() as conn: | |
| await conn.execute(""" | |
| INSERT INTO filters (chat_id, keyword, reply_text, file_id, file_type) | |
| VALUES ($1, $2, $3, $4, $5) | |
| ON CONFLICT (chat_id, keyword) DO UPDATE | |
| SET reply_text = EXCLUDED.reply_text, file_id = EXCLUDED.file_id, file_type = EXCLUDED.file_type | |
| """, chat_id, keyword.lower(), reply_text, file_id, file_type) | |
| await get_redis_client_for_chat(context, chat_id).delete(f"filters:{chat_id}") | |
| except: pass | |
| async def remove_filter_from_db(context, chat_id, keyword): | |
| pool = context.bot_data.get('db_pool') | |
| if not pool: return False | |
| try: | |
| async with pool.acquire() as conn: | |
| res = await conn.execute("DELETE FROM filters WHERE chat_id = $1 AND keyword = $2", chat_id, keyword.lower()) | |
| await get_redis_client_for_chat(context, chat_id).delete(f"filters:{chat_id}") | |
| return "DELETE 0" not in res | |
| except: return False | |
| async def get_chat_filters(context, chat_id): | |
| redis = get_redis_client_for_chat(context, chat_id) | |
| try: | |
| if cached := await redis.get(f"filters:{chat_id}"): return json.loads(cached) | |
| except: pass | |
| pool = context.bot_data.get('db_pool') | |
| if not pool: return {} | |
| try: | |
| async with pool.acquire() as conn: | |
| rows = await conn.fetch("SELECT keyword, reply_text, file_id, file_type FROM filters WHERE chat_id = $1", chat_id) | |
| data = {r['keyword']: {'text': r['reply_text'], 'file_id': r['file_id'], 'type': r['file_type']} for r in rows} | |
| # Updated: Added .decode('utf-8') to avoid bytes in string-mode Redis | |
| await redis.set(f"filters:{chat_id}", json.dumps(data).decode('utf-8'), ex=3600) | |
| return data | |
| except: return {} | |
| async def get_all_group_ids(pool): | |
| if not pool: return [] | |
| try: | |
| async with pool.acquire() as conn: | |
| records = await conn.fetch("SELECT chat_id FROM chats") | |
| return [r['chat_id'] for r in records] | |
| except: return [] | |
| async def get_all_user_ids(pool): | |
| if not pool: return [] | |
| try: | |
| async with pool.acquire() as conn: | |
| records = await conn.fetch("SELECT user_id FROM users WHERE has_started_bot = TRUE") | |
| return [r['user_id'] for r in records] | |
| except: return [] | |
| # --- db.py এর একদম শেষে এই ফাংশনটি যোগ করুন --- | |
| async def create_db_pool_with_retry(max_retries=5, delay=5): | |
| db_url = DATABASE_URL | |
| if not db_url: return None | |
| for attempt in range(max_retries): | |
| try: | |
| pool = await asyncpg.create_pool( | |
| dsn=db_url, | |
| min_size=2, | |
| max_size=10, | |
| command_timeout=30, | |
| statement_cache_size=0, | |
| max_inactive_connection_lifetime=30 # PGBouncer এর জন্য ভালো | |
| ) | |
| logger.info("✅ DB Connected successfully.") | |
| return pool | |
| except Exception as e: | |
| logger.error(f"❌ DB Connection failed (Attempt {attempt+1}): {e}") | |
| await asyncio.sleep(delay) | |
| return None | |
| async def track_chat_member(pool, chat_id, user_id): | |
| if not pool: return | |
| try: | |
| async with pool.acquire() as conn: | |
| # টেবিলটি ইনশিওর করার জন্য ON CONFLICT ব্যবহার | |
| await conn.execute(""" | |
| INSERT INTO chat_members (chat_id, user_id, last_seen) | |
| VALUES ($1, $2, CURRENT_TIMESTAMP) | |
| ON CONFLICT (chat_id, user_id) | |
| DO UPDATE SET last_seen = CURRENT_TIMESTAMP | |
| """, chat_id, user_id) | |
| except Exception as e: | |
| logger.error(f"❌ Track Member DB Error: {e}") | |
| # --- END OF FILE db.py --- |