# ============================================ # database/users.py — المستخدمون والمشتركون # ============================================ import logging from database.connection import get_connection logger = logging.getLogger(__name__) # ════════════════════════════════════════ # المستخدمون # ════════════════════════════════════════ def upsert_user(user_id: int, username: str, first_name: str) -> None: """ تسجيل مستخدم جديد أو تحديث بياناته إذا كان موجوداً. يُستدعى في كل تفاعل جديد مع البوت. """ conn = get_connection() try: conn.execute(""" INSERT INTO users (user_id, username, first_name) VALUES (?, ?, ?) ON CONFLICT(user_id) DO UPDATE SET username = excluded.username, first_name = excluded.first_name """, (user_id, username or "", first_name or "")) conn.commit() except Exception as e: logger.error(f"خطأ في upsert_user uid={user_id}: {e}", exc_info=True) finally: conn.close() def get_user(user_id: int) -> dict | None: """جلب بيانات مستخدم""" conn = get_connection() try: c = conn.cursor() c.execute("SELECT * FROM users WHERE user_id = ?", (user_id,)) row = c.fetchone() return dict(row) if row else None except Exception as e: logger.error(f"خطأ في get_user uid={user_id}: {e}", exc_info=True) return None finally: conn.close() def is_user_banned(user_id: int) -> bool: """التحقق من حظر المستخدم""" conn = get_connection() try: c = conn.cursor() c.execute( "SELECT is_banned FROM users WHERE user_id = ?", (user_id,) ) row = c.fetchone() return bool(row["is_banned"]) if row else False except Exception as e: logger.error(f"خطأ في is_user_banned uid={user_id}: {e}", exc_info=True) return False finally: conn.close() def ban_user(user_id: int) -> bool: """حظر مستخدم""" conn = get_connection() try: conn.execute( "UPDATE users SET is_banned = 1 WHERE user_id = ?", (user_id,) ) conn.commit() affected = conn.execute( "SELECT changes() AS c" ).fetchone()["c"] return affected > 0 except Exception as e: logger.error(f"خطأ في ban_user uid={user_id}: {e}", exc_info=True) return False finally: conn.close() def unban_user(user_id: int) -> bool: """رفع الحظر عن مستخدم""" conn = get_connection() try: conn.execute( "UPDATE users SET is_banned = 0 WHERE user_id = ?", (user_id,) ) conn.commit() affected = conn.execute( "SELECT changes() AS c" ).fetchone()["c"] return affected > 0 except Exception as e: logger.error(f"خطأ في unban_user uid={user_id}: {e}", exc_info=True) return False finally: conn.close() def get_total_users() -> int: """إجمالي المستخدمين المسجلين""" conn = get_connection() try: c = conn.cursor() c.execute("SELECT COUNT(*) AS cnt FROM users WHERE is_banned = 0") return c.fetchone()["cnt"] except Exception as e: logger.error(f"خطأ في get_total_users: {e}", exc_info=True) return 0 finally: conn.close() # ════════════════════════════════════════ # المشتركون في التنبيهات # ════════════════════════════════════════ def subscribe_user(user_id: int, platform: str = "all") -> bool: """ اشتراك مستخدم في التنبيهات. يُرجع True إذا تم الاشتراك (لم يكن مشتركاً)، False إذا كان مشتركاً مسبقاً. """ conn = get_connection() try: c = conn.cursor() c.execute(""" INSERT OR IGNORE INTO notification_subscribers (user_id, platform) VALUES (?, ?) """, (user_id, platform)) conn.commit() return c.rowcount > 0 except Exception as e: logger.error(f"خطأ في subscribe_user uid={user_id}: {e}", exc_info=True) return False finally: conn.close() def unsubscribe_user(user_id: int) -> bool: """ إلغاء اشتراك مستخدم من التنبيهات. يُرجع True إذا تم الإلغاء. """ conn = get_connection() try: c = conn.cursor() c.execute( "DELETE FROM notification_subscribers WHERE user_id = ?", (user_id,) ) conn.commit() return c.rowcount > 0 except Exception as e: logger.error(f"خطأ في unsubscribe_user uid={user_id}: {e}", exc_info=True) return False finally: conn.close() def is_subscribed(user_id: int) -> bool: """التحقق من اشتراك مستخدم في التنبيهات""" conn = get_connection() try: c = conn.cursor() c.execute( "SELECT user_id FROM notification_subscribers WHERE user_id = ?", (user_id,) ) return c.fetchone() is not None except Exception as e: logger.error(f"خطأ في is_subscribed uid={user_id}: {e}", exc_info=True) return False finally: conn.close() def get_all_subscriber_ids() -> list[int]: """ جلب كل IDs المشتركين في التنبيهات (غير المحظورين). يُستخدم من broadcaster.py لإرسال الإذاعات. """ conn = get_connection() try: c = conn.cursor() c.execute(""" SELECT ns.user_id FROM notification_subscribers ns JOIN users u ON ns.user_id = u.user_id WHERE u.is_banned = 0 ORDER BY ns.subscribed_at ASC """) return [row["user_id"] for row in c.fetchall()] except Exception as e: logger.error(f"خطأ في get_all_subscriber_ids: {e}", exc_info=True) return [] finally: conn.close() def get_subscriber_count() -> int: """عدد المشتركين النشطين في التنبيهات""" conn = get_connection() try: c = conn.cursor() c.execute(""" SELECT COUNT(*) AS cnt FROM notification_subscribers ns JOIN users u ON ns.user_id = u.user_id WHERE u.is_banned = 0 """) return c.fetchone()["cnt"] except Exception as e: logger.error(f"خطأ في get_subscriber_count: {e}", exc_info=True) return 0 finally: conn.close() def remove_subscriber(user_id: int) -> None: """ حذف مشترك من التنبيهات (يُستخدم من broadcaster عند خطأ دائم). مثال: المستخدم حظر البوت أو حذف حسابه. """ conn = get_connection() try: conn.execute( "DELETE FROM notification_subscribers WHERE user_id = ?", (user_id,) ) conn.commit() logger.info(f"تم إزالة المشترك {user_id} من التنبيهات (حظر أو حساب محذوف)") except Exception as e: logger.error(f"خطأ في remove_subscriber uid={user_id}: {e}", exc_info=True) finally: conn.close() # ════════════════════════════════════════ # سجل الإذاعات # ════════════════════════════════════════ def log_broadcast( admin_id: int, message_text: str, broadcast_type: str, game_title: str, game_link: str, sent_count: int, failed_count: int, blocked_count: int ) -> int: """تسجيل نتيجة إذاعة في قاعدة البيانات. يُرجع ID السجل.""" conn = get_connection() try: c = conn.cursor() c.execute(""" INSERT INTO broadcasts (admin_id, message_text, broadcast_type, game_title, game_link, sent_count, failed_count, blocked_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( admin_id, message_text, broadcast_type, game_title, game_link, sent_count, failed_count, blocked_count )) conn.commit() return c.lastrowid except Exception as e: logger.error(f"خطأ في log_broadcast: {e}", exc_info=True) return 0 finally: conn.close() def get_broadcast_history(limit: int = 10) -> list[dict]: """آخر N إذاعة (للمشرف)""" conn = get_connection() try: c = conn.cursor() c.execute(""" SELECT id, broadcast_type, game_title, sent_count, failed_count, blocked_count, sent_at FROM broadcasts ORDER BY id DESC LIMIT ? """, (limit,)) return [dict(r) for r in c.fetchall()] except Exception as e: logger.error(f"خطأ في get_broadcast_history: {e}", exc_info=True) return [] finally: conn.close()