qqqsfasdf's picture
Upload 60 files
8d21059 verified
# database/users.py
import logging
from database.connection import get_conn
logger = logging.getLogger(__name__)
def register_user(
    user_id: int,
    username: str,
    first_name: str,
    last_name: str,
    referred_by: int | None = None,
) -> bool:
    """Insert a row into ``users`` for ``user_id`` unless one already exists.

    The statement is ``INSERT OR IGNORE``, so an already-registered user's
    stored data is not modified. Falsy ``username`` / ``first_name`` /
    ``last_name`` values are stored as empty strings. A ``referred_by`` equal
    to ``user_id`` is replaced with ``None`` before the insert.

    Returns:
        True when a new row was inserted. False when nothing was inserted,
        or when any exception was raised while talking to the database (the
        exception is logged, not propagated).
    """
    conn = get_conn()
    try:
        # A user cannot be recorded as their own referrer.
        is_self_referral = referred_by and referred_by == user_id
        referrer = None if is_self_referral else referred_by
        values = (user_id, username or "", first_name or "", last_name or "", referrer)
        cursor = conn.execute("""
INSERT OR IGNORE INTO users
(user_id, username, first_name, last_name, referred_by)
VALUES (?, ?, ?, ?, ?)
""", values)
        conn.commit()
        # rowcount stays at 0 when the insert was ignored.
        if not cursor.rowcount > 0:
            return False
        logger.info(f"[Users] مستخدم جديد: {user_id} (@{username})")
        return True
    except Exception as e:
        logger.error(f"register_user({user_id}): {e}", exc_info=True)
        return False
    finally:
        conn.close()
def get_user(user_id: int) -> dict | None:
    """Look up the ``users`` row for ``user_id``.

    Returns:
        The row converted with ``dict(...)``, or None when no row matches or
        when any exception is raised (the exception is logged).
        NOTE(review): ``dict(row)`` presumably relies on the connection using
        a mapping-style row factory — confirm in ``get_conn``.
    """
    conn = get_conn()
    try:
        record = conn.execute(
            "SELECT * FROM users WHERE user_id=?", (user_id,)
        ).fetchone()
        if not record:
            return None
        return dict(record)
    except Exception as e:
        logger.error(f"get_user({user_id}): {e}", exc_info=True)
        return None
    finally:
        conn.close()
def update_username(user_id: int, username: str, first_name: str, last_name: str) -> None:
    """Overwrite the stored username and names of ``user_id``.

    Per the original note, this is meant to run on every ``/start``. Falsy
    values are written as empty strings. Any exception is logged and
    swallowed; nothing is returned.
    """
    conn = get_conn()
    try:
        new_values = (username or "", first_name or "", last_name or "", user_id)
        cursor = conn.cursor()
        cursor.execute("""
UPDATE users SET username=?, first_name=?, last_name=?
WHERE user_id=?
""", new_values)
        conn.commit()
    except Exception as e:
        logger.error(f"update_username({user_id}): {e}", exc_info=True)
    finally:
        conn.close()
def increment_search_count(user_id: int, last_game: str = "") -> None:
    """Add one to ``search_count`` for ``user_id``.

    When ``last_game`` is non-empty, its first 100 characters are also
    written to the ``last_game`` column; otherwise that column is left as is.
    Any exception is logged and swallowed; nothing is returned.
    """
    conn = get_conn()
    try:
        # Choose the statement and its parameters first, then run it once.
        if last_game:
            statement = """
UPDATE users
SET search_count = search_count + 1, last_game = ?
WHERE user_id = ?
"""
            args = (last_game[:100], user_id)
        else:
            statement = """
UPDATE users
SET search_count = search_count + 1
WHERE user_id = ?
"""
            args = (user_id,)
        conn.execute(statement, args)
        conn.commit()
    except Exception as e:
        logger.error(f"increment_search_count({user_id}): {e}", exc_info=True)
    finally:
        conn.close()
def get_total_users() -> int:
    """Return the number of rows in the ``users`` table.

    Returns:
        The ``COUNT(*)`` result, or 0 when the query raises. A failure is
        logged (with traceback) so that a database error can be told apart
        from a genuinely empty table; it is not propagated.
    """
    conn = get_conn()
    try:
        return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    except Exception as e:
        # Keep the best-effort 0 fallback, but record the failure like the
        # other helpers in this module do instead of hiding it.
        logger.error(f"get_total_users(): {e}", exc_info=True)
        return 0
    finally:
        conn.close()
def is_banned(user_id: int) -> bool:
    """Report whether the ``is_banned`` column is truthy for ``user_id``.

    Returns:
        True only when a row exists for ``user_id`` and its ``is_banned``
        value is truthy. False when there is no such row, when the value is
        falsy, or when the query raises.

    NOTE: on a database error this fails open (the user is reported as not
    banned). The error is logged with traceback so the condition is visible;
    it is not propagated.
    """
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("SELECT is_banned FROM users WHERE user_id=?", (user_id,))
        row = c.fetchone()
        return bool(row and row[0])
    except Exception as e:
        # Same fallback as before, but no longer silent: log the failure the
        # way the other helpers in this module do.
        logger.error(f"is_banned({user_id}): {e}", exc_info=True)
        return False
    finally:
        conn.close()