"""PostgreSQL persistence layer.

Owns the module-level connection pool (``db_pool``) and exposes small helper
functions for users, opened chapters, the manga index and its genres,
bookmarks, the manga queue, the chapter file cache, and weekly statistics.

Importing this module raises ``RuntimeError`` when ``DB_URL`` is empty and
otherwise creates the connection pool immediately.
"""

import time
from contextlib import contextmanager
from typing import Optional

import psycopg2
from psycopg2 import pool

from config import DB_URL, logger

# =========================
# CONNECTION POOL
# =========================
if not DB_URL:
    raise RuntimeError("DATABASE_URL is not set. Please configure environment variables.")

db_pool = psycopg2.pool.SimpleConnectionPool(1, 20, DB_URL)

_DAY_SECONDS = 86400
_WEEK_SECONDS = 7 * _DAY_SECONDS
_DEFAULT_FORMAT = "pdf"
_MEDALS = ["🥇", "🥈", "🥉", "4️⃣", "5️⃣"]


@contextmanager
def _cursor(commit: bool = False):
    """Yield a cursor on a pooled connection and always return the connection.

    Args:
        commit: When true, commit after the ``with`` body finishes without
            raising. When the body raises, nothing is committed and the
            exception propagates after the connection is returned to the pool.
    """
    conn = db_pool.getconn()
    try:
        with conn.cursor() as cur:
            yield cur
        if commit:
            conn.commit()
    finally:
        db_pool.putconn(conn)


# Base tables, created only when missing.
_SCHEMA_STATEMENTS = (
    """
    CREATE TABLE IF NOT EXISTS users (
        user_id BIGINT PRIMARY KEY,
        username TEXT,
        balance INTEGER DEFAULT 1000,
        vip_until BIGINT DEFAULT 0,
        ref_by BIGINT DEFAULT NULL,
        last_daily BIGINT DEFAULT 0,
        last_url TEXT DEFAULT NULL,
        created_at BIGINT DEFAULT 0
    )""",
    """
    CREATE TABLE IF NOT EXISTS opened_chapters (
        user_id BIGINT,
        chapter_id TEXT,
        opened_at BIGINT DEFAULT 0,
        manga_slug TEXT DEFAULT NULL,
        PRIMARY KEY (user_id, chapter_id)
    )""",
    """
    CREATE TABLE IF NOT EXISTS vouchers (
        code TEXT PRIMARY KEY,
        coins INTEGER DEFAULT 0,
        vip_days INTEGER DEFAULT 0,
        used_by BIGINT DEFAULT NULL
    )""",
    """
    CREATE TABLE IF NOT EXISTS manga_index (
        slug TEXT PRIMARY KEY,
        title TEXT,
        url TEXT,
        thumbnail TEXT
    )""",
    """
    CREATE TABLE IF NOT EXISTS manga_genres (
        manga_slug TEXT,
        genre_id TEXT,
        genre_name TEXT,
        PRIMARY KEY (manga_slug, genre_id)
    )""",
    """
    CREATE TABLE IF NOT EXISTS bookmarks (
        user_id BIGINT,
        manga_slug TEXT,
        manga_url TEXT,
        manga_title TEXT,
        last_chapter_id TEXT DEFAULT NULL,
        added_at BIGINT DEFAULT 0,
        PRIMARY KEY (user_id, manga_slug)
    )""",
    """
    CREATE TABLE IF NOT EXISTS chapter_cache (
        manga_slug TEXT,
        chapter_id TEXT,
        file_format TEXT DEFAULT 'pdf',
        file_id TEXT,
        PRIMARY KEY (manga_slug, chapter_id, file_format)
    )""",
    """
    CREATE TABLE IF NOT EXISTS manga_queue (
        url TEXT PRIMARY KEY,
        added_at BIGINT DEFAULT 0
    )""",
)

# Parameterless migrations, executed in this order after the base tables.
_MIGRATION_STATEMENTS = (
    "ALTER TABLE users ADD COLUMN IF NOT EXISTS username TEXT DEFAULT NULL",
    "ALTER TABLE users ADD COLUMN IF NOT EXISTS created_at BIGINT DEFAULT 0",
    "ALTER TABLE opened_chapters ADD COLUMN IF NOT EXISTS manga_slug TEXT DEFAULT NULL",
    "ALTER TABLE chapter_cache ADD COLUMN IF NOT EXISTS file_format TEXT DEFAULT 'pdf'",
    "ALTER TABLE bookmarks ADD COLUMN IF NOT EXISTS manga_slug TEXT DEFAULT NULL",
    # Backfill the slug for bookmark rows that only carry a URL.
    "UPDATE bookmarks "
    "SET manga_slug = split_part(trim(trailing '/' from manga_url), '/manga/', 2) "
    "WHERE (manga_slug IS NULL OR manga_slug = '') AND manga_url IS NOT NULL",
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_bookmarks_user_slug "
    "ON bookmarks (user_id, manga_slug)",
)


def init_db():
    """Create all tables and apply the migrations in a single transaction."""
    with _cursor(commit=True) as c:
        for statement in _SCHEMA_STATEMENTS:
            c.execute(statement)
        for statement in _MIGRATION_STATEMENTS:
            c.execute(statement)
        # Rows without a creation time are stamped with the current time.
        c.execute(
            "UPDATE users SET created_at = %s WHERE created_at = 0 OR created_at IS NULL",
            (int(time.time()),),
        )
        c.execute("ALTER TABLE users ADD COLUMN IF NOT EXISTS preferred_format TEXT DEFAULT 'pdf'")


# =========================
# USER HELPERS
# =========================
def get_user_preferred_format(user_id) -> str:
    """Return the user's stored file format, or ``'pdf'`` when none is stored.

    The fallback applies both when the user row is missing and when the
    column holds NULL or an empty string, so the result is always a string.
    """
    with _cursor() as c:
        c.execute("SELECT preferred_format FROM users WHERE user_id=%s", (user_id,))
        row = c.fetchone()
    return row[0] if row and row[0] else _DEFAULT_FORMAT


def set_user_preferred_format(user_id, format_type: str):
    """Store ``format_type`` as the user's preferred file format."""
    with _cursor(commit=True) as c:
        c.execute("UPDATE users SET preferred_format=%s WHERE user_id=%s", (format_type, user_id))


def get_user(user_id):
    """Fetch one user row.

    Returns:
        ``(user_id, username, balance, vip_until, ref_by, last_daily,
        last_url)``, or ``None`` when the user does not exist.
    """
    with _cursor() as c:
        c.execute(
            """
            SELECT user_id, username, balance, vip_until, ref_by, last_daily, last_url
            FROM users
            WHERE user_id=%s
            """,
            (user_id,),
        )
        return c.fetchone()


def create_user(user_id, username=None):
    """Insert a user with the starting balance, or refresh an existing username.

    When the user already exists only the username is touched, and a missing
    ``username`` argument keeps the stored one instead of overwriting it
    with NULL.
    """
    with _cursor(commit=True) as c:
        c.execute(
            "INSERT INTO users (user_id, username, balance, vip_until, created_at) "
            "VALUES (%s,%s,%s,%s,%s) ON CONFLICT (user_id) "
            "DO UPDATE SET username = COALESCE(EXCLUDED.username, users.username)",
            (user_id, username, 1000, 0, int(time.time())),
        )


def ensure_user(user_id, username=None):
    """Return the user's row, creating the user or syncing the username first.

    Returns:
        The same tuple shape as ``get_user``; the username in it reflects any
        update performed by this call.
    """
    user = get_user(user_id)
    if not user:
        create_user(user_id, username)
        return get_user(user_id)
    if username and user[1] != username:
        with _cursor(commit=True) as c:
            c.execute("UPDATE users SET username=%s WHERE user_id=%s", (username, user_id))
        # Hand back the row as it now is in the database, not the stale copy.
        user = (user[0], username, *user[2:])
    return user


def update_user_status(user_id, balance, vip_until=None):
    """Set the user's balance and, when ``vip_until`` is given, the VIP expiry."""
    with _cursor(commit=True) as c:
        if vip_until is not None:
            c.execute(
                "UPDATE users SET balance=%s, vip_until=%s WHERE user_id=%s",
                (balance, vip_until, user_id),
            )
        else:
            c.execute("UPDATE users SET balance=%s WHERE user_id=%s", (balance, user_id))


def update_last_daily(user_id, ts):
    """Store ``ts`` in the user's ``last_daily`` column."""
    with _cursor(commit=True) as c:
        c.execute("UPDATE users SET last_daily=%s WHERE user_id=%s", (ts, user_id))


def save_last_url(user_id, url):
    """Store ``url`` in the user's ``last_url`` column."""
    with _cursor(commit=True) as c:
        c.execute("UPDATE users SET last_url=%s WHERE user_id=%s", (url, user_id))


def is_vip(user) -> bool:
    """Return True when the row's ``vip_until`` (index 3) is in the future.

    Args:
        user: A row shaped like the result of ``get_user``; ``None`` and rows
            shorter than four items count as not VIP.
    """
    if not user or len(user) < 4:
        return False
    vip_until = user[3]
    return vip_until is not None and vip_until > int(time.time())


def vip_days_left(user) -> int:
    """Return the number of whole days of VIP remaining (0 when not VIP)."""
    if not is_vip(user):
        return 0
    return max(0, (user[3] - int(time.time())) // _DAY_SECONDS)


# =========================
# CHAPTERS / HISTORY
# =========================
def is_opened(user_id: int, chapter_id: str) -> bool:
    """Return True when the user opened the chapter within the last 7 days."""
    one_week_ago = int(time.time()) - _WEEK_SECONDS
    with _cursor() as c:
        c.execute(
            "SELECT 1 FROM opened_chapters WHERE user_id=%s AND chapter_id=%s AND opened_at >= %s",
            (user_id, str(chapter_id), one_week_ago),
        )
        return c.fetchone() is not None


def save_opened(user_id, ch_id, manga_slug=None):
    """Record that the user opened a chapter now, refreshing an existing entry."""
    with _cursor(commit=True) as c:
        c.execute(
            "INSERT INTO opened_chapters (user_id, chapter_id, opened_at, manga_slug) "
            "VALUES (%s,%s,%s,%s) ON CONFLICT (user_id, chapter_id) "
            "DO UPDATE SET opened_at=EXCLUDED.opened_at, manga_slug=EXCLUDED.manga_slug",
            (user_id, str(ch_id), int(time.time()), manga_slug),
        )


def get_user_history(user_id):
    """Return the user's 10 most recent ``(chapter_id, opened_at)`` rows."""
    with _cursor() as c:
        c.execute(
            "SELECT chapter_id, opened_at FROM opened_chapters "
            "WHERE user_id=%s ORDER BY opened_at DESC LIMIT 10",
            (user_id,),
        )
        return c.fetchall()


def get_chapters_bought_count(user_id):
    """Return how many ``opened_chapters`` rows the user has."""
    with _cursor() as c:
        c.execute("SELECT COUNT(*) FROM opened_chapters WHERE user_id=%s", (user_id,))
        return c.fetchone()[0]


def get_referral_count(user_id):
    """Return how many users have ``ref_by`` set to this user."""
    with _cursor() as c:
        c.execute("SELECT COUNT(*) FROM users WHERE ref_by=%s", (user_id,))
        return c.fetchone()[0]


def get_top_buyers(limit=5):
    """Return up to ``limit`` ``(user_id, username, count)`` rows, most chapters first."""
    with _cursor() as c:
        c.execute(
            """
            SELECT u.user_id, u.username, COUNT(oc.chapter_id) AS cnt
            FROM users u
            JOIN opened_chapters oc ON u.user_id = oc.user_id
            GROUP BY u.user_id, u.username
            ORDER BY cnt DESC
            LIMIT %s
            """,
            (limit,),
        )
        return c.fetchall()


# =========================
# MANGA INDEX / GENRES
# =========================
def upsert_manga_index(slug, title, url, thumbnail=""):
    """Insert a manga into the index, or overwrite its title, URL and thumbnail."""
    with _cursor(commit=True) as c:
        c.execute(
            """
            INSERT INTO manga_index (slug, title, url, thumbnail)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (slug) DO UPDATE
            SET title=EXCLUDED.title, url=EXCLUDED.url, thumbnail=EXCLUDED.thumbnail
            """,
            (slug, title, url, thumbnail),
        )


def _escape_like(text: str) -> str:
    """Escape LIKE metacharacters so ``text`` is matched literally."""
    # The backslash must be doubled first, or the escapes added for % and _
    # would themselves be escaped.
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def search_manga_index(query: str, limit: int = 10):
    """Return up to ``limit`` ``(slug, title, url)`` rows whose title contains ``query``.

    The match is case-insensitive and literal: ``%``, ``_`` and backslashes in
    ``query`` are escaped rather than treated as wildcards.
    """
    pattern = f"%{_escape_like(query)}%"
    with _cursor() as c:
        c.execute(
            """
            SELECT slug, title, url
            FROM manga_index
            WHERE LOWER(title) LIKE LOWER(%s)
            LIMIT %s
            """,
            (pattern, limit),
        )
        return c.fetchall()


def save_manga_genres(manga_slug: str, genres: list):
    """Store the genres of a manga, skipping entries without an id or a name.

    Each entry's id is read from ``term_id`` (falling back to ``id``) and its
    name from ``name``. Failures while building or inserting the rows are
    logged and not raised; nothing is committed in that case.
    """
    if not genres:
        logger.warning(f"⚠️ save_manga_genres called with empty list for {manga_slug}")
        return
    # The pool is used directly here rather than through _cursor so that a
    # failure to obtain a connection still propagates instead of being
    # swallowed by the handler below.
    conn = db_pool.getconn()
    try:
        saved = 0
        with conn.cursor() as c:
            for g in genres:
                gid = str(g.get("term_id", g.get("id", "")))
                gname = g.get("name", "")
                if gid and gname:
                    c.execute(
                        """
                        INSERT INTO manga_genres (manga_slug, genre_id, genre_name)
                        VALUES (%s, %s, %s)
                        ON CONFLICT DO NOTHING
                        """,
                        (manga_slug, gid, gname),
                    )
                    saved += 1
        conn.commit()
        # Report the rows actually sent, not the size of the input list.
        logger.info(f"✅ Saved {saved} genres for {manga_slug}")
    except Exception as e:
        logger.error(f"❌ save_manga_genres failed for {manga_slug}: {e}")
    finally:
        db_pool.putconn(conn)


def get_manga_by_genre(genre_id: str, limit: int = 30):
    """Return up to ``limit`` ``(slug, title, url)`` rows of indexed manga in a genre."""
    with _cursor() as c:
        c.execute(
            """
            SELECT mi.slug, mi.title, mi.url
            FROM manga_genres mg
            JOIN manga_index mi ON mg.manga_slug = mi.slug
            WHERE mg.genre_id = %s
            LIMIT %s
            """,
            (genre_id, limit),
        )
        return c.fetchall()


# =========================
# BOOKMARKS / FAVORITES
# =========================
def get_manga_slug(manga_url: str) -> str:
    """Return the part of ``manga_url`` after the last ``/manga/``.

    Trailing slashes are dropped. A URL without ``/manga/`` is returned whole
    (minus trailing slashes).
    """
    return manga_url.rstrip("/").split("/manga/")[-1]


def add_bookmark(user_id, manga_url, manga_title, last_chapter_id=None):
    """Bookmark a manga; an existing bookmark only gets its title and URL refreshed."""
    slug = get_manga_slug(manga_url)
    with _cursor(commit=True) as c:
        c.execute(
            """
            INSERT INTO bookmarks (user_id, manga_slug, manga_title, manga_url, last_chapter_id, added_at)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (user_id, manga_slug) DO UPDATE
            SET manga_title=EXCLUDED.manga_title, manga_url=EXCLUDED.manga_url
            """,
            (user_id, slug, manga_title, manga_url, last_chapter_id, int(time.time())),
        )


def remove_bookmark(user_id, manga_url):
    """Delete the user's bookmark for the manga identified by ``manga_url``."""
    slug = get_manga_slug(manga_url)
    with _cursor(commit=True) as c:
        c.execute("DELETE FROM bookmarks WHERE user_id=%s AND manga_slug=%s", (user_id, slug))


def get_bookmarks(user_id):
    """Return the user's bookmarks, newest first.

    Returns:
        Rows of ``(manga_url, manga_title, last_chapter_id, added_at,
        manga_slug)``.
    """
    with _cursor() as c:
        c.execute(
            """
            SELECT manga_url, manga_title, last_chapter_id, added_at, manga_slug
            FROM bookmarks
            WHERE user_id=%s
            ORDER BY added_at DESC
            """,
            (user_id,),
        )
        return c.fetchall()


def is_bookmarked(user_id, manga_url) -> bool:
    """Return True when the user has a bookmark for the manga at ``manga_url``."""
    slug = get_manga_slug(manga_url)
    with _cursor() as c:
        c.execute("SELECT 1 FROM bookmarks WHERE user_id=%s AND manga_slug=%s", (user_id, slug))
        return c.fetchone() is not None


def update_bookmark_chapter(user_id, manga_url, chapter_id):
    """Set ``last_chapter_id`` on the user's bookmark for the given manga."""
    slug = get_manga_slug(manga_url)
    with _cursor(commit=True) as c:
        c.execute(
            "UPDATE bookmarks SET last_chapter_id=%s WHERE user_id=%s AND manga_slug=%s",
            (chapter_id, user_id, slug),
        )


def get_all_bookmarks_for_notification():
    """Return ``(user_id, manga_url, manga_title, last_chapter_id)`` for every bookmark with a URL."""
    with _cursor() as c:
        c.execute(
            "SELECT user_id, manga_url, manga_title, last_chapter_id "
            "FROM bookmarks WHERE manga_url IS NOT NULL"
        )
        return c.fetchall()


# =========================
# MANGA QUEUE
# =========================
def save_manga_queue(urls: list):
    """Add URLs to the queue, refreshing ``added_at`` for URLs already queued.

    All URLs of one call share the same timestamp. An empty ``urls`` is a
    no-op.
    """
    if not urls:
        return
    now = int(time.time())
    with _cursor(commit=True) as c:
        for url in urls:
            c.execute(
                "INSERT INTO manga_queue (url, added_at) VALUES (%s, %s) "
                "ON CONFLICT (url) DO UPDATE SET added_at = EXCLUDED.added_at",
                (url, now),
            )


def get_manga_queue() -> list:
    """Return all queued URLs ordered by ``added_at``."""
    with _cursor() as c:
        c.execute("SELECT url FROM manga_queue ORDER BY added_at")
        return [row[0] for row in c.fetchall()]


def get_manga_queue_count() -> int:
    """Return the number of queued URLs."""
    with _cursor() as c:
        c.execute("SELECT COUNT(*) FROM manga_queue")
        return c.fetchone()[0]


# =========================
# CHAPTER CACHE
# =========================
def get_cached_file_id(manga_slug: str, chapter_id: str, file_format: str) -> Optional[str]:
    """Return the cached ``file_id`` for a chapter in a format, or ``None`` when absent."""
    with _cursor() as c:
        c.execute(
            "SELECT file_id FROM chapter_cache WHERE manga_slug=%s AND chapter_id=%s AND file_format=%s",
            (manga_slug, str(chapter_id), file_format),
        )
        row = c.fetchone()
    return row[0] if row else None


def save_cached_file_id(manga_slug: str, chapter_id: str, file_format: str, file_id: str):
    """Store or replace the cached ``file_id`` for a chapter in a format."""
    with _cursor(commit=True) as c:
        c.execute(
            "INSERT INTO chapter_cache (manga_slug, chapter_id, file_format, file_id) "
            "VALUES (%s, %s, %s, %s) "
            "ON CONFLICT (manga_slug, chapter_id, file_format) DO UPDATE SET file_id=EXCLUDED.file_id",
            (manga_slug, str(chapter_id), file_format, file_id),
        )


# =========================
# WEEKLY STATS
# =========================
def get_weekly_stats():
    """Collect usage statistics for the last 7 days.

    Returns:
        A dict with ``total_users``, ``new_users``, ``weekly_chapters`` and
        ``cached_chapters`` (counts), ``top_users`` (up to 5 rows of
        ``(user_id, username, count)``) and ``top_manga`` (up to 5 rows of
        ``(manga_slug, count)``).
    """
    one_week_ago = int(time.time()) - _WEEK_SECONDS
    with _cursor() as c:
        c.execute("SELECT COUNT(*) FROM users")
        total_users = c.fetchone()[0]

        c.execute("SELECT COUNT(*) FROM users WHERE created_at >= %s", (one_week_ago,))
        new_users = c.fetchone()[0]

        c.execute("SELECT COUNT(*) FROM opened_chapters WHERE opened_at >= %s", (one_week_ago,))
        weekly_chapters = c.fetchone()[0]

        c.execute(
            """
            SELECT u.user_id, u.username, COUNT(oc.chapter_id) AS cnt
            FROM users u
            JOIN opened_chapters oc ON u.user_id = oc.user_id
            WHERE oc.opened_at >= %s
            GROUP BY u.user_id, u.username
            ORDER BY cnt DESC
            LIMIT 5
            """,
            (one_week_ago,),
        )
        top_users = c.fetchall()

        c.execute(
            """
            SELECT manga_slug, COUNT(*) AS cnt
            FROM opened_chapters
            WHERE opened_at >= %s AND manga_slug IS NOT NULL
            GROUP BY manga_slug
            ORDER BY cnt DESC
            LIMIT 5
            """,
            (one_week_ago,),
        )
        top_manga = c.fetchall()

        c.execute("SELECT COUNT(*) FROM chapter_cache")
        cached_chapters = c.fetchone()[0]

    return {
        "total_users": total_users,
        "new_users": new_users,
        "weekly_chapters": weekly_chapters,
        "top_users": top_users,
        "top_manga": top_manga,
        "cached_chapters": cached_chapters,
    }


def _rank_label(index: int) -> str:
    """Return the medal for a zero-based rank, or a plain number past the medals."""
    return _MEDALS[index] if index < len(_MEDALS) else f"{index + 1}."


def build_weekly_report_text(stats: dict) -> str:
    """Render the dict produced by ``get_weekly_stats`` as a report message.

    Lists longer than the five available medals are ranked with plain
    numbers instead of raising.
    """
    top_users_lines = "\n".join(
        f"{_rank_label(i)} {('@'+uname) if uname else str(uid)} — *{cnt}* chapters"
        for i, (uid, uname, cnt) in enumerate(stats["top_users"])
    ) or "_No activity this week_"

    top_manga_lines = "\n".join(
        f"{_rank_label(i)} `{slug}` — *{cnt}* downloads"
        for i, (slug, cnt) in enumerate(stats["top_manga"])
    ) or "_No activity this week_"

    now_str = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime())
    return (
        f"📊 *Weekly Bot Report*\n_Generated: {now_str}_\n\n"
        f"👥 Total Registered Users: *{stats['total_users']}*\n"
        f"🆕 New Users This Week: *{stats['new_users']}*\n"
        f"📖 Chapters Unlocked This Week: *{stats['weekly_chapters']}*\n"
        f"💾 Cached Chapters: *{stats['cached_chapters']}*\n\n"
        f"🏆 *Top 5 Users This Week:*\n{top_users_lines}\n\n"
        f"📚 *Top 5 Manga This Week:*\n{top_manga_lines}"
    )