# --- START OF FILE db.py ---
"""PostgreSQL (asyncpg) and Redis persistence helpers for the bot.

The module creates the schema, buffers per-user message statistics in Redis
and flushes them to PostgreSQL in batches, and exposes cached accessors for
group settings, notes and filters.

Every public coroutine is best-effort: failures are logged and a neutral
fallback (``None``, ``False``, ``[]``, ``{}`` or the defaults) is returned
instead of raising, so callers never have to guard against DB/Redis outages.
"""

import asyncio
import logging
from datetime import datetime, timezone

import asyncpg
import orjson as json

from bot_handlers.redis_manager import get_redis_client_for_user, get_redis_client_for_chat
from credentials import DATABASE_URL

logger = logging.getLogger(__name__)

# Redis set holding the ids of users whose buffered stats are not yet in the DB.
_DIRTY_USERS_KEY = "stats_dirty_users"
# Safety TTL (seconds) applied to every per-user stats buffer hash.
_STATS_BUFFER_TTL = 86400


# --- INTERNAL HELPERS ---

def _primary_redis(bot_data):
    """Return the first configured Redis client, or ``None`` if there is none."""
    clients = bot_data.get('redis_clients')
    return clients[0] if clients else None


def _decode_hash(raw):
    """Return a copy of a Redis hash with any ``bytes`` keys/values decoded.

    Clients created without ``decode_responses`` return bytes; decoding here
    lets the callers use plain string field names either way.
    """
    def _text(value):
        if isinstance(value, (bytes, bytearray)):
            return value.decode("utf-8", "replace")
        return value

    return {_text(k): _text(v) for k, v in (raw or {}).items()}


def _default_group_settings():
    """Return a fresh dict with the default value of every group setting."""
    return {
        'forbidden_words': [],
        'anti_duplicate': False,
        'max_message_length': 0,
        'auto_delete_seconds': 0,
        'delete_join_messages': True,
        'block_channel_forwards': True,
        'enable_spamscan': False,
        'mute_on_link': False,
        'allow_admins_manage': False,
        'block_bio_links': True,
        'enable_ocr_scan': False,
        'allow_usernames': False,
        'welcome_data': None,
        'welcome_delete_seconds': 0,
        'welcome_mode': 'text',
        'enable_sentiment': False,
        'enable_support_ticket': False,
        'enable_trust_score': False,
        'enable_traffic_control': False,
        'enable_smart_notes': False,
        'warn_limit': 0,
        'warn_time_window': 10,
        'warn_action': 'mute',
    }


async def _add_column_if_missing(conn, table, column, definition):
    """Add ``column`` to ``table`` unless it already exists.

    ``table``, ``column`` and ``definition`` are only ever the hard-coded
    constants from ``setup_database``, so interpolating them is safe.
    A failure is logged and swallowed so the rest of the setup still runs.
    """
    try:
        await conn.execute(
            f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {column} {definition}"
        )
    except Exception as e:
        logger.warning("Could not add column %s.%s: %s", table, column, e)


async def _delete_cache_keys(context, chat_id, *keys):
    """Best-effort removal of per-chat cache keys; failures are only logged."""
    try:
        await get_redis_client_for_chat(context, chat_id).delete(*keys)
    except Exception as e:
        logger.warning("Cache invalidation failed for chat %s: %s", chat_id, e)


# --- TABLE SETUP ---

async def setup_database(pool):
    """Create all tables, indexes and late-added columns if they are missing.

    Safe to run on every start-up: every statement is idempotent. Does nothing
    when ``pool`` is falsy. Errors are logged, never raised.
    """
    if not pool:
        return
    try:
        async with pool.acquire() as conn:
            # Users table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    user_id BIGINT PRIMARY KEY,
                    first_name TEXT,
                    username TEXT,
                    added_on TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
                    language TEXT DEFAULT 'en',
                    has_started_bot BOOLEAN DEFAULT FALSE,
                    message_count INTEGER DEFAULT 0,
                    last_active_on TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
                    warning_count INTEGER DEFAULT 0,
                    trust_score INTEGER DEFAULT 100
                )""")

            # Member tracking table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS chat_members (
                    chat_id BIGINT,
                    user_id BIGINT,
                    last_seen TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (chat_id, user_id)
                )""")
            # Index so that lookups by chat are fast
            await conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_chat_members_chat_id ON chat_members (chat_id)"
            )

            # Columns added to `users` after the first release
            for col, dtype in [
                ("message_count", "INTEGER DEFAULT 0"),
                ("last_active_on", "TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP"),
                ("warning_count", "INTEGER DEFAULT 0"),
                ("trust_score", "INTEGER DEFAULT 100"),
            ]:
                await _add_column_if_missing(conn, "users", col, dtype)

            # Group settings table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS group_settings (
                    chat_id BIGINT PRIMARY KEY,
                    forbidden_words JSONB DEFAULT '[]'::jsonb,
                    anti_duplicate BOOLEAN DEFAULT FALSE,
                    max_message_length INTEGER DEFAULT 0,
                    auto_delete_seconds INTEGER DEFAULT 0,
                    delete_join_messages BOOLEAN DEFAULT TRUE,
                    block_channel_forwards BOOLEAN DEFAULT TRUE,
                    enable_spamscan BOOLEAN DEFAULT FALSE,
                    mute_on_link BOOLEAN DEFAULT FALSE,
                    allow_admins_manage BOOLEAN DEFAULT FALSE,
                    block_bio_links BOOLEAN DEFAULT TRUE,
                    enable_ocr_scan BOOLEAN DEFAULT FALSE,
                    allow_usernames BOOLEAN DEFAULT FALSE,
                    welcome_data JSONB DEFAULT NULL,
                    welcome_delete_seconds INTEGER DEFAULT 0,
                    welcome_mode TEXT DEFAULT 'text',
                    enable_sentiment BOOLEAN DEFAULT FALSE,
                    enable_support_ticket BOOLEAN DEFAULT FALSE,
                    enable_trust_score BOOLEAN DEFAULT FALSE,
                    enable_traffic_control BOOLEAN DEFAULT FALSE,
                    enable_smart_notes BOOLEAN DEFAULT FALSE,
                    warn_limit INTEGER DEFAULT 0,
                    warn_time_window INTEGER DEFAULT 10,
                    warn_action TEXT DEFAULT 'mute'
                )""")

            # Auto-warn columns: added to pre-existing databases so that
            # /setwarn works there too.
            for col, dtype in [
                ("warn_limit", "INTEGER DEFAULT 0"),
                ("warn_time_window", "INTEGER DEFAULT 10"),
                ("warn_action", "TEXT DEFAULT 'mute'"),
            ]:
                await _add_column_if_missing(conn, "group_settings", col, dtype)

            # Filters & notes
            await conn.execute(
                "CREATE TABLE IF NOT EXISTS filters (chat_id BIGINT, keyword TEXT, reply_text TEXT, "
                "file_id TEXT, file_type TEXT, PRIMARY KEY (chat_id, keyword))"
            )
            await conn.execute(
                "CREATE TABLE IF NOT EXISTS notes (chat_id BIGINT, slug TEXT, content TEXT, "
                "file_id TEXT, file_type TEXT, PRIMARY KEY (chat_id, slug))"
            )
            await conn.execute(
                "CREATE TABLE IF NOT EXISTS chats (chat_id BIGINT PRIMARY KEY, "
                "added_on TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)"
            )

            # VIP system
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS vip_trials (
                    chat_id BIGINT,
                    user_id BIGINT,
                    expiry_date TIMESTAMPTZ,
                    joined_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
                    is_safe BOOLEAN DEFAULT FALSE,
                    PRIMARY KEY (chat_id, user_id)
                )""")
            await _add_column_if_missing(conn, "vip_trials", "is_safe", "BOOLEAN DEFAULT FALSE")

            # Indexes
            await conn.execute("CREATE INDEX IF NOT EXISTS idx_users_username ON users (username)")
            # Performance: index on the VIP expiry date
            await conn.execute("CREATE INDEX IF NOT EXISTS idx_vip_expiry ON vip_trials (expiry_date)")
    except Exception as e:
        logger.error(f"DB Setup Error: {e}")


# --- REDIS BUFFER FUNCTIONS (BATCH UPDATE) ---

async def buffer_user_stats(context, user_id, first_name, username):
    """Buffer one message's worth of user statistics in Redis.

    Increments the user's buffered message count, stores the latest non-empty
    name/username and marks the user as dirty so that
    ``flush_user_stats_to_db`` picks the buffer up later. Uses the first
    configured Redis client; does nothing when none is available.
    """
    redis = _primary_redis(context.bot_data)
    if not redis:
        return

    try:
        key = f"stats_buffer:{user_id}"
        async with redis.pipeline() as pipe:
            # 1. Count this message
            pipe.hincrby(key, "msg_count", 1)

            # 2. Refresh name and username (only when non-empty)
            if first_name:
                pipe.hset(key, "first_name", first_name)
            if username:
                pipe.hset(key, "username", username)

            # 3. Mark the user dirty so the buffer gets saved to the DB later
            pipe.sadd(_DIRTY_USERS_KEY, user_id)

            # 4. Expiry as a safety net against abandoned buffers
            pipe.expire(key, _STATS_BUFFER_TTL)

            await pipe.execute()
    except Exception as e:
        logger.error(f"Buffer Stats Error: {e}")


async def _restore_stats_buffer(redis, rows):
    """Put rows that could not be saved back into the Redis stats buffer.

    Counts are added on top of whatever was buffered in the meantime; names
    are restored with HSETNX so that newer values are not overwritten. The
    user is re-marked dirty and the temporary key is dropped. Best-effort:
    a failure for one user is logged and the others are still attempted.
    """
    for uid, first_name, username, msg_inc in rows:
        buffer_key = f"stats_buffer:{uid}"
        try:
            async with redis.pipeline() as pipe:
                if msg_inc:
                    pipe.hincrby(buffer_key, "msg_count", msg_inc)
                if first_name:
                    pipe.hsetnx(buffer_key, "first_name", first_name)
                if username:
                    pipe.hsetnx(buffer_key, "username", username)
                pipe.sadd(_DIRTY_USERS_KEY, uid)
                pipe.expire(buffer_key, _STATS_BUFFER_TTL)
                pipe.delete(f"sync_temp:{uid}")
                await pipe.execute()
        except Exception as e:
            logger.error("Could not restore stats buffer for user %s: %s", uid, e)


async def flush_user_stats_to_db(app):
    """Write all buffered user statistics from Redis to the ``users`` table.

    For every dirty user the buffer hash is renamed to a temporary key so that
    messages arriving during the flush start a fresh buffer. The temporary
    keys are deleted only after the batch has been written; if the write
    fails, the collected rows are merged back into the buffers so no counts
    are lost. Does nothing when the DB pool or Redis is unavailable.
    """
    pool = app.bot_data.get('db_pool')
    redis = _primary_redis(app.bot_data)
    if not pool or not redis:
        return

    batch_data = []
    try:
        # 1. Collect every dirty user id
        dirty_members = await redis.smembers(_DIRTY_USERS_KEY)
        if not dirty_members:
            return

        for member in dirty_members:
            try:
                uid = int(member)
            except (TypeError, ValueError):
                # A malformed member must not block every future flush.
                await redis.srem(_DIRTY_USERS_KEY, member)
                continue

            buffer_key = f"stats_buffer:{uid}"
            temp_key = f"sync_temp:{uid}"

            # Un-mark first: a message arriving after this point re-adds the
            # user to the dirty set, so its new buffer is never orphaned.
            await redis.srem(_DIRTY_USERS_KEY, uid)

            # 2. Isolate the queued data by moving it to a temporary key
            try:
                await redis.rename(buffer_key, temp_key)
            except Exception:
                continue  # Buffer key no longer exists: nothing to flush

            data = _decode_hash(await redis.hgetall(temp_key))
            try:
                msg_inc = int(data.get("msg_count") or 0)
            except (TypeError, ValueError):
                msg_inc = 0
            fname = data.get("first_name")
            uname = data.get("username")

            if msg_inc > 0 or fname or uname:
                batch_data.append((uid, fname, uname, msg_inc))
            else:
                # Nothing worth saving; drop the temporary key right away.
                await redis.delete(temp_key)

        if not batch_data:
            return

        async with pool.acquire() as conn:
            await conn.executemany("""
                INSERT INTO users (user_id, first_name, username, message_count, last_active_on)
                VALUES ($1, $2, $3, $4, CURRENT_TIMESTAMP)
                ON CONFLICT (user_id) DO UPDATE SET
                    first_name = COALESCE(EXCLUDED.first_name, users.first_name),
                    username = COALESCE(EXCLUDED.username, users.username),
                    message_count = users.message_count + EXCLUDED.message_count,
                    last_active_on = CURRENT_TIMESTAMP
            """, batch_data)
    except Exception as e:
        logger.error(f"DB Flush Error: {e}")
        await _restore_stats_buffer(redis, batch_data)
        return

    # 3. The batch is saved: the temporary keys are no longer needed
    try:
        await redis.delete(*[f"sync_temp:{row[0]}" for row in batch_data])
    except Exception as e:
        logger.warning("Could not delete temporary stats keys: %s", e)
    logger.info(f"✅ Flushed {len(batch_data)} users safely to DB.")


# --- CORE DB FUNCTIONS ---

async def _add_user_to_db_core(pool, user_id, first_name, username):
    """Insert a user row if it does not exist yet; existing rows are untouched."""
    if not pool:
        return
    try:
        async with pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO users (user_id, first_name, username, last_active_on)
                VALUES ($1, $2, $3, CURRENT_TIMESTAMP)
                ON CONFLICT (user_id) DO NOTHING
            """, user_id, first_name, username)
    except Exception as e:
        logger.error("Add user failed for %s: %s", user_id, e)


async def get_user_id_by_username(pool, username):
    """Return the ``user_id`` stored for ``username``, or ``None``.

    The comparison is case-insensitive and ignores ``@``. Returns ``None``
    when the pool or the username is missing, nothing matches, or the query
    fails.
    """
    if not pool or not username:
        return None
    clean_username = username.strip().replace("@", "")
    if not clean_username:
        return None
    try:
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT user_id FROM users WHERE LOWER(REPLACE(username, '@', '')) = LOWER($1)",
                clean_username,
            )
            return row['user_id'] if row else None
    except Exception as e:
        logger.error("Username lookup failed for %r: %s", username, e)
        return None


async def increment_user_message_count(pool, user_id):
    """Add one to a user's message count directly in the DB.

    Normal traffic goes through ``buffer_user_stats``; this is kept as a
    fallback.
    """
    if not pool:
        return
    try:
        async with pool.acquire() as conn:
            await conn.execute(
                "UPDATE users SET message_count = message_count + 1 WHERE user_id = $1",
                user_id,
            )
    except Exception as e:
        logger.error("Message count update failed for %s: %s", user_id, e)


async def increment_user_warning_count(pool, user_id):
    """Record one warning for a user, creating the row if necessary.

    Each warning lowers ``trust_score`` by 10 (never below 0); a newly created
    row starts with one warning and a score of 90.
    """
    if not pool:
        return
    try:
        async with pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO users (user_id, warning_count, trust_score, last_active_on)
                VALUES ($1, 1, 90, CURRENT_TIMESTAMP)
                ON CONFLICT (user_id) DO UPDATE SET
                    warning_count = users.warning_count + 1,
                    trust_score = GREATEST(0, users.trust_score - 10),
                    last_active_on = CURRENT_TIMESTAMP
            """, user_id)
    except Exception as e:
        logger.error("Warning count update failed for %s: %s", user_id, e)


async def get_user_stats(pool, user_id):
    """Return the full ``users`` row for ``user_id``, or ``None`` if absent or on error."""
    if not pool:
        return None
    try:
        async with pool.acquire() as conn:
            return await conn.fetchrow("SELECT * FROM users WHERE user_id = $1", user_id)
    except Exception as e:
        logger.error("User stats lookup failed for %s: %s", user_id, e)
        return None


async def _add_chat_to_db_core(pool, chat_id: int):
    """Register a chat in the ``chats`` table if it is not there yet."""
    if not pool:
        return
    try:
        async with pool.acquire() as conn:
            await conn.execute(
                "INSERT INTO chats (chat_id) VALUES ($1) ON CONFLICT DO NOTHING",
                chat_id,
            )
    except Exception as e:
        logger.error("Add chat failed for %s: %s", chat_id, e)


async def get_db_stats(pool):
    """Return ``(total_users, total_groups, pm_users)``; ``(0, 0, 0)`` on failure."""
    if not pool:
        return 0, 0, 0
    try:
        async with pool.acquire() as conn:
            total_users = await conn.fetchval("SELECT COUNT(*) FROM users")
            total_groups = await conn.fetchval("SELECT COUNT(*) FROM chats")
            pm_users = await conn.fetchval("SELECT COUNT(*) FROM users WHERE has_started_bot = TRUE")
            return total_users, total_groups, pm_users
    except Exception as e:
        logger.error("DB stats query failed: %s", e)
        return 0, 0, 0


async def get_user_lang_from_db(context, user_id):
    """Return the user's language code, falling back to ``"en"``.

    Redis is consulted first; on a miss the value is read from the DB and
    cached for a day. A cache failure never hides a value the DB returned.
    """
    cache_key = f"user_lang:{user_id}"
    redis = get_redis_client_for_user(context)
    try:
        if cached := await redis.get(cache_key):
            return cached
    except Exception as e:
        logger.debug("Language cache read failed for %s: %s", user_id, e)

    pool = context.bot_data.get('db_pool')
    if not pool:
        return "en"
    try:
        async with pool.acquire() as conn:
            lang = await conn.fetchval("SELECT language FROM users WHERE user_id = $1", user_id)
    except Exception as e:
        logger.error("Language lookup failed for %s: %s", user_id, e)
        return "en"

    if lang:
        try:
            await redis.set(cache_key, lang, ex=86400)
        except Exception as e:
            logger.debug("Language cache write failed for %s: %s", user_id, e)
    return lang or "en"


async def get_all_settings(context, chat_id):
    """Return the settings dict of a chat, with defaults for anything unset.

    Served from Redis when cached (30 minutes), otherwise read from
    ``group_settings`` and cached. Falls back to the defaults when the chat
    has no row or the DB is unavailable.
    """
    defaults = _default_group_settings()
    cache_key = f"group_settings:{chat_id}"
    redis = get_redis_client_for_chat(context, chat_id)
    pool = context.bot_data.get('db_pool')

    try:
        if cached := await redis.get(cache_key):
            loaded = json.loads(cached)
            # Fill in settings introduced after this entry was cached.
            return {**defaults, **loaded} if isinstance(loaded, dict) else loaded
    except Exception as e:
        logger.debug("Settings cache read failed for chat %s: %s", chat_id, e)

    if not pool:
        return defaults
    try:
        async with pool.acquire() as conn:
            row = await conn.fetchrow("SELECT * FROM group_settings WHERE chat_id = $1", chat_id)
        if not row:
            return defaults

        final = {**defaults, **dict(row)}
        # JSONB columns arrive as text unless a codec is registered.
        for json_column in ('forbidden_words', 'welcome_data'):
            if isinstance(final.get(json_column), str):
                final[json_column] = json.loads(final[json_column])
    except Exception as e:
        logger.error("Settings lookup failed for chat %s: %s", chat_id, e)
        return defaults

    try:
        await redis.set(cache_key, json.dumps(final).decode('utf-8'), ex=1800)
    except Exception as e:
        logger.debug("Settings cache write failed for chat %s: %s", chat_id, e)
    return final


async def invalidate_settings_cache(context, chat_id):
    """Drop the cached settings of a chat; failures are only logged."""
    try:
        await get_redis_client_for_chat(context, chat_id).delete(f"group_settings:{chat_id}")
    except Exception as e:
        logger.warning("Settings cache invalidation failed for chat %s: %s", chat_id, e)


async def set_user_lang_in_db(context, user_id, lang):
    """Store the user's language (creating the row if needed) and cache it for a day."""
    pool = context.bot_data.get('db_pool')
    if not pool:
        return
    try:
        async with pool.acquire() as conn:
            # Single upsert instead of INSERT-then-UPDATE: one round trip, atomic.
            await conn.execute("""
                INSERT INTO users (user_id, language) VALUES ($1, $2)
                ON CONFLICT (user_id) DO UPDATE SET language = EXCLUDED.language
            """, user_id, lang)
    except Exception as e:
        logger.error("Language update failed for %s: %s", user_id, e)
        return

    try:
        await get_redis_client_for_user(context).set(f"user_lang:{user_id}", lang, ex=86400)
    except Exception as e:
        logger.debug("Language cache write failed for %s: %s", user_id, e)


async def update_setting_in_db(context, chat_id, key, value):
    """Upsert one ``group_settings`` column for a chat and drop its cache.

    ``key`` is a column name and therefore cannot be passed as a query
    parameter; it is accepted only if it is a plain ASCII identifier, which
    rules out SQL injection through it. Lists and dicts are stored as JSON
    text.
    """
    pool = context.bot_data.get('db_pool')
    if not pool:
        return
    if not (isinstance(key, str) and key.isascii() and key.isidentifier()):
        logger.error("Rejected invalid settings column name: %r", key)
        return

    val = json.dumps(value).decode('utf-8') if isinstance(value, (list, dict)) else value
    try:
        async with pool.acquire() as conn:
            await conn.execute(
                f"INSERT INTO group_settings (chat_id, {key}) VALUES ($1, $2) "
                f"ON CONFLICT (chat_id) DO UPDATE SET {key} = EXCLUDED.{key}",
                chat_id, val,
            )
    except Exception as e:
        logger.error("Setting update failed (chat %s, key %s): %s", chat_id, key, e)
        return
    await invalidate_settings_cache(context, chat_id)


async def reset_settings_in_db(context, chat_id):
    """Delete a chat's settings, filters and notes and clear their caches.

    The three deletes run in one transaction so a failure cannot leave a
    partially reset chat. Returns ``True`` on success, ``False`` otherwise.
    """
    pool = context.bot_data.get('db_pool')
    if not pool:
        return False
    try:
        async with pool.acquire() as conn:
            async with conn.transaction():
                await conn.execute("DELETE FROM group_settings WHERE chat_id = $1", chat_id)
                await conn.execute("DELETE FROM filters WHERE chat_id = $1", chat_id)
                await conn.execute("DELETE FROM notes WHERE chat_id = $1", chat_id)
    except Exception as e:
        logger.error("Settings reset failed for chat %s: %s", chat_id, e)
        return False

    await invalidate_settings_cache(context, chat_id)
    await _delete_cache_keys(context, chat_id, f"filters:{chat_id}", f"notes:{chat_id}")
    return True


async def save_note_to_db(context, chat_id, slug, content, file_id=None, file_type=None):
    """Create or replace a note (slug is lower-cased) and drop the chat's note cache."""
    pool = context.bot_data.get('db_pool')
    if not pool:
        return
    try:
        async with pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO notes (chat_id, slug, content, file_id, file_type)
                VALUES ($1, $2, $3, $4, $5)
                ON CONFLICT (chat_id, slug) DO UPDATE SET
                    content = EXCLUDED.content,
                    file_id = EXCLUDED.file_id,
                    file_type = EXCLUDED.file_type
            """, chat_id, slug.lower(), content, file_id, file_type)
    except Exception as e:
        logger.error("Saving note failed (chat %s, slug %r): %s", chat_id, slug, e)
        return
    await _delete_cache_keys(context, chat_id, f"notes:{chat_id}")


async def get_note_from_db(context, chat_id, slug):
    """Return a note as ``{'text', 'file_id', 'type'}``, or ``None`` if not found.

    Looks in the per-chat Redis hash first; on a miss the note is read from
    the DB and cached (the hash expires after an hour).
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"notes:{chat_id}"
    slug = slug.lower()
    try:
        if cached := await redis.hget(cache_key, slug):
            return json.loads(cached)
    except Exception as e:
        logger.debug("Note cache read failed for chat %s: %s", chat_id, e)

    pool = context.bot_data.get('db_pool')
    if not pool:
        return None
    try:
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT content, file_id, file_type FROM notes WHERE chat_id = $1 AND slug = $2",
                chat_id, slug,
            )
    except Exception as e:
        logger.error("Note lookup failed (chat %s, slug %r): %s", chat_id, slug, e)
        return None
    if not row:
        return None

    data = {'text': row['content'], 'file_id': row['file_id'], 'type': row['file_type']}
    try:
        # orjson returns bytes; decode so the value is stored as text.
        await redis.hset(cache_key, slug, json.dumps(data).decode('utf-8'))
        await redis.expire(cache_key, 3600)
    except Exception as e:
        logger.debug("Note cache write failed for chat %s: %s", chat_id, e)
    return data


async def add_filter_to_db(context, chat_id, keyword, reply_text, file_id=None, file_type=None):
    """Create or replace a filter (keyword is lower-cased) and drop the chat's filter cache."""
    pool = context.bot_data.get('db_pool')
    if not pool:
        return
    try:
        async with pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO filters (chat_id, keyword, reply_text, file_id, file_type)
                VALUES ($1, $2, $3, $4, $5)
                ON CONFLICT (chat_id, keyword) DO UPDATE SET
                    reply_text = EXCLUDED.reply_text,
                    file_id = EXCLUDED.file_id,
                    file_type = EXCLUDED.file_type
            """, chat_id, keyword.lower(), reply_text, file_id, file_type)
    except Exception as e:
        logger.error("Saving filter failed (chat %s, keyword %r): %s", chat_id, keyword, e)
        return
    await _delete_cache_keys(context, chat_id, f"filters:{chat_id}")


async def remove_filter_from_db(context, chat_id, keyword):
    """Delete a filter; return ``True`` only if a row was actually removed.

    The filter cache is dropped afterwards; a cache failure does not change
    the returned result.
    """
    pool = context.bot_data.get('db_pool')
    if not pool:
        return False
    try:
        async with pool.acquire() as conn:
            status = await conn.execute(
                "DELETE FROM filters WHERE chat_id = $1 AND keyword = $2",
                chat_id, keyword.lower(),
            )
    except Exception as e:
        logger.error("Removing filter failed (chat %s, keyword %r): %s", chat_id, keyword, e)
        return False

    await _delete_cache_keys(context, chat_id, f"filters:{chat_id}")
    # asyncpg returns the command tag, e.g. "DELETE 1".
    return "DELETE 0" not in status


async def get_chat_filters(context, chat_id):
    """Return all filters of a chat as ``{keyword: {'text', 'file_id', 'type'}}``.

    Served from Redis when cached (one hour), otherwise read from the DB and
    cached. Returns ``{}`` when the DB is unavailable or the query fails.
    """
    redis = get_redis_client_for_chat(context, chat_id)
    cache_key = f"filters:{chat_id}"
    try:
        if cached := await redis.get(cache_key):
            return json.loads(cached)
    except Exception as e:
        logger.debug("Filter cache read failed for chat %s: %s", chat_id, e)

    pool = context.bot_data.get('db_pool')
    if not pool:
        return {}
    try:
        async with pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT keyword, reply_text, file_id, file_type FROM filters WHERE chat_id = $1",
                chat_id,
            )
    except Exception as e:
        logger.error("Filter lookup failed for chat %s: %s", chat_id, e)
        return {}

    data = {
        r['keyword']: {'text': r['reply_text'], 'file_id': r['file_id'], 'type': r['file_type']}
        for r in rows
    }
    try:
        # orjson returns bytes; decode so the value is stored as text.
        await redis.set(cache_key, json.dumps(data).decode('utf-8'), ex=3600)
    except Exception as e:
        logger.debug("Filter cache write failed for chat %s: %s", chat_id, e)
    return data


async def get_all_group_ids(pool):
    """Return the ids of every known chat; ``[]`` on failure."""
    if not pool:
        return []
    try:
        async with pool.acquire() as conn:
            records = await conn.fetch("SELECT chat_id FROM chats")
            return [r['chat_id'] for r in records]
    except Exception as e:
        logger.error("Fetching group ids failed: %s", e)
        return []


async def get_all_user_ids(pool):
    """Return the ids of every user who has started the bot; ``[]`` on failure."""
    if not pool:
        return []
    try:
        async with pool.acquire() as conn:
            records = await conn.fetch("SELECT user_id FROM users WHERE has_started_bot = TRUE")
            return [r['user_id'] for r in records]
    except Exception as e:
        logger.error("Fetching user ids failed: %s", e)
        return []


async def create_db_pool_with_retry(max_retries=5, delay=5):
    """Create the asyncpg pool, retrying on failure.

    Tries up to ``max_retries`` times, waiting ``delay`` seconds between
    attempts (not after the last one). Returns the pool, or ``None`` when
    ``DATABASE_URL`` is empty or every attempt failed.
    """
    db_url = DATABASE_URL
    if not db_url:
        return None

    for attempt in range(max_retries):
        try:
            pool = await asyncpg.create_pool(
                dsn=db_url,
                min_size=2,
                max_size=10,
                command_timeout=30,
                statement_cache_size=0,
                max_inactive_connection_lifetime=30,  # Good for PgBouncer
            )
            logger.info("✅ DB Connected successfully.")
            return pool
        except Exception as e:
            logger.error(f"❌ DB Connection failed (Attempt {attempt+1}): {e}")
            if attempt + 1 < max_retries:
                await asyncio.sleep(delay)
    return None


async def track_chat_member(pool, chat_id, user_id):
    """Record that ``user_id`` was just seen in ``chat_id``."""
    if not pool:
        return
    try:
        async with pool.acquire() as conn:
            # Upsert: insert the member or refresh last_seen if already tracked
            await conn.execute("""
                INSERT INTO chat_members (chat_id, user_id, last_seen)
                VALUES ($1, $2, CURRENT_TIMESTAMP)
                ON CONFLICT (chat_id, user_id)
                DO UPDATE SET last_seen = CURRENT_TIMESTAMP
            """, chat_id, user_id)
    except Exception as e:
        logger.error(f"❌ Track Member DB Error: {e}")
# --- END OF FILE db.py ---