# database/users.py import logging from database.connection import get_conn logger = logging.getLogger(__name__) def register_user( user_id: int, username: str, first_name: str, last_name: str, referred_by: int | None = None, ) -> bool: """ تسجيل المستخدم إذا لم يكن موجوداً. لا يُعدِّل بيانات المستخدم الحالي إن كان مسجلاً. يُعيد True إذا كان مستخدماً جديداً. """ conn = get_conn() try: c = conn.cursor() if referred_by and referred_by == user_id: referred_by = None c.execute(""" INSERT OR IGNORE INTO users (user_id, username, first_name, last_name, referred_by) VALUES (?, ?, ?, ?, ?) """, (user_id, username or "", first_name or "", last_name or "", referred_by)) conn.commit() is_new = c.rowcount > 0 if is_new: logger.info(f"[Users] مستخدم جديد: {user_id} (@{username})") return is_new except Exception as e: logger.error(f"register_user({user_id}): {e}", exc_info=True) return False finally: conn.close() def get_user(user_id: int) -> dict | None: conn = get_conn() try: c = conn.cursor() c.execute("SELECT * FROM users WHERE user_id=?", (user_id,)) row = c.fetchone() return dict(row) if row else None except Exception as e: logger.error(f"get_user({user_id}): {e}", exc_info=True) return None finally: conn.close() def update_username(user_id: int, username: str, first_name: str, last_name: str) -> None: """تحديث بيانات المستخدم عند كل /start.""" conn = get_conn() try: conn.execute(""" UPDATE users SET username=?, first_name=?, last_name=? WHERE user_id=? """, (username or "", first_name or "", last_name or "", user_id)) conn.commit() except Exception as e: logger.error(f"update_username({user_id}): {e}", exc_info=True) finally: conn.close() def increment_search_count(user_id: int, last_game: str = "") -> None: conn = get_conn() try: if last_game: conn.execute(""" UPDATE users SET search_count = search_count + 1, last_game = ? WHERE user_id = ? """, (last_game[:100], user_id)) else: conn.execute(""" UPDATE users SET search_count = search_count + 1 WHERE user_id = ? """, (user_id,)) conn.commit() except Exception as e: logger.error(f"increment_search_count({user_id}): {e}", exc_info=True) finally: conn.close() def get_total_users() -> int: conn = get_conn() try: return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0] except Exception: return 0 finally: conn.close() def is_banned(user_id: int) -> bool: conn = get_conn() try: c = conn.cursor() c.execute("SELECT is_banned FROM users WHERE user_id=?", (user_id,)) row = c.fetchone() return bool(row and row[0]) except Exception: return False finally: conn.close()