import asyncio
import logging
from contextlib import asynccontextmanager

import aiosqlite

from .config import settings

# Module-level logger for this file, registered under the name "database".
logger = logging.getLogger("database")

class Database:
    """Async SQLite access layer built on ``aiosqlite``.

    By default every method opens its own short-lived connection. After
    ``start_session()`` a single persistent connection is reused by all
    methods until ``close_session()`` is called.
    """

    # Statements executed in order by ``init_db``. Every one is idempotent
    # (``IF NOT EXISTS``), so running them against an existing file is safe.
    _SCHEMA = (
        """CREATE TABLE IF NOT EXISTS movies (
            id TEXT PRIMARY KEY, title TEXT, poster TEXT, year TEXT,
            rating TEXT, description TEXT, category TEXT
        )""",
        """CREATE TABLE IF NOT EXISTS series (
            id TEXT PRIMARY KEY, title TEXT, poster TEXT, year TEXT,
            rating TEXT, description TEXT, category TEXT
        )""",
        """CREATE TABLE IF NOT EXISTS episodes (
            id INTEGER PRIMARY KEY AUTOINCREMENT, series_id TEXT,
            episode_number INTEGER, title TEXT, watch_link TEXT,
            FOREIGN KEY(series_id) REFERENCES series(id)
        )""",
        """CREATE TABLE IF NOT EXISTS users (
            id TEXT PRIMARY KEY,
            points INTEGER DEFAULT 0,
            watch_time_total INTEGER DEFAULT 0,
            last_watch_reward_time INTEGER DEFAULT 0,
            is_fan INTEGER DEFAULT 0,
            ad_free_until INTEGER DEFAULT 0,
            is_rewarded INTEGER DEFAULT 0,
            referrer_id TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""",
        """CREATE TABLE IF NOT EXISTS referrals (
            id INTEGER PRIMARY KEY AUTOINCREMENT, referrer_id TEXT, referred_id TEXT,
            status TEXT DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(referrer_id, referred_id)
        )""",
        """CREATE TABLE IF NOT EXISTS comments (
            id INTEGER PRIMARY KEY AUTOINCREMENT, content_id TEXT, user_id TEXT,
            text TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(user_id) REFERENCES users(id)
        )""",
        """CREATE TABLE IF NOT EXISTS course_progress (
            id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, course_id TEXT,
            lesson_id TEXT, completed INTEGER DEFAULT 0, last_position INTEGER DEFAULT 0,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(user_id, course_id, lesson_id),
            FOREIGN KEY(user_id) REFERENCES users(id)
        )""",
        "CREATE INDEX IF NOT EXISTS idx_users_referrer ON users(referrer_id)",
        "CREATE INDEX IF NOT EXISTS idx_referrals_referrer ON referrals(referrer_id)",
        "CREATE INDEX IF NOT EXISTS idx_comments_content ON comments(content_id)",
        "CREATE INDEX IF NOT EXISTS idx_course_progress_user ON course_progress(user_id)",
        "CREATE INDEX IF NOT EXISTS idx_course_progress_course ON course_progress(course_id)",
        """CREATE TABLE IF NOT EXISTS watch_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, content_id TEXT,
            content_type TEXT, episode_id TEXT, progress INTEGER DEFAULT 0,
            watched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(user_id) REFERENCES users(id)
        )""",
        "CREATE INDEX IF NOT EXISTS idx_watch_history_user ON watch_history(user_id)",
        "CREATE INDEX IF NOT EXISTS idx_watch_history_content ON watch_history(content_id)",
        """CREATE TABLE IF NOT EXISTS site_settings (
            key TEXT PRIMARY KEY,
            value TEXT
        )""",
        """CREATE TABLE IF NOT EXISTS promo_codes (
            code TEXT PRIMARY KEY,
            reward_type TEXT,
            duration_days INTEGER,
            max_uses INTEGER DEFAULT 1,
            current_uses INTEGER DEFAULT 0,
            is_active INTEGER DEFAULT 1
        )""",
        """CREATE TABLE IF NOT EXISTS user_promos (
            user_id TEXT,
            code TEXT,
            redeemed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (user_id, code)
        )""",
    )

    # (table, column, definition) triples that ``init_db`` adds to tables that
    # already exist without them. Each column is checked on its own, so a
    # table that has only some of the columns still receives the rest.
    _COLUMN_MIGRATIONS = (
        ("users", "is_rewarded", "INTEGER DEFAULT 0"),
        ("promo_codes", "duration_days", "INTEGER"),
        ("promo_codes", "max_uses", "INTEGER DEFAULT 1"),
        ("promo_codes", "current_uses", "INTEGER DEFAULT 0"),
        ("promo_codes", "is_active", "INTEGER DEFAULT 1"),
    )

    def __init__(self, db_path: str):
        """Store the database file path; no connection is opened here."""
        self.db_path = db_path
        # Set by start_session(); None means "open a fresh connection per call".
        self._persistent_db = None

    @asynccontextmanager
    async def get_connection(self):
        """Yield a connection for the duration of the ``async with`` block.

        When a persistent session is active, that connection is yielded as-is
        and is NOT closed on exit. Otherwise a new connection is opened with a
        30 second timeout, ``aiosqlite.Row`` rows, WAL journaling and
        ``synchronous=NORMAL``, and is closed when the block exits.

        Raises:
            Exception: anything raised inside the block is re-raised; on the
                per-call path it is logged first.
        """
        if self._persistent_db is not None:
            yield self._persistent_db
            return

        async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
            db.row_factory = aiosqlite.Row
            await db.execute("PRAGMA journal_mode=WAL")
            await db.execute("PRAGMA synchronous=NORMAL")
            try:
                yield db
            except Exception as e:
                logger.error("Database connection error: %s", e)
                # Bare raise keeps the original traceback intact.
                raise

    async def start_session(self):
        """Starts a persistent session for high-performance bulk operations.

        Does nothing if a session is already open. The connection uses a
        60 second timeout, ``aiosqlite.Row`` rows, WAL journaling and
        ``synchronous=NORMAL``.
        """
        if self._persistent_db is not None:
            return

        db = await aiosqlite.connect(self.db_path, timeout=60.0)
        try:
            db.row_factory = aiosqlite.Row
            await db.execute("PRAGMA journal_mode=WAL")
            await db.execute("PRAGMA synchronous=NORMAL")
        except BaseException:
            # Do not leak the connection or publish a half-configured session.
            await db.close()
            raise
        self._persistent_db = db
        logger.info("Persistent DB session started with WAL mode")

    async def close_session(self):
        """Closes the persistent session; a no-op when none is open."""
        db = self._persistent_db
        if db is None:
            return
        # Drop the reference first so a failing close() cannot leave a dead
        # connection registered as the active session.
        self._persistent_db = None
        await db.close()
        logger.info("Persistent DB session closed")

    async def execute_write(self, query: str, params: tuple = ()) -> int:
        """Executes a write operation with retry logic for locked databases.

        The statement is executed and committed. If SQLite reports an
        ``OperationalError`` whose message contains "locked", the write is
        retried up to 5 times in total, waiting 0.5s, 1.0s, 1.5s, 2.0s between
        attempts.

        Args:
            query: SQL statement to run.
            params: Values bound to the statement's placeholders.

        Returns:
            ``cursor.lastrowid`` of the executed statement.

        Raises:
            aiosqlite.OperationalError: for non-lock errors, or when the
                database is still locked on the final attempt.
            Exception: any other error, after logging it.
        """
        max_retries = 5
        for attempt in range(max_retries):
            wait_time = 0.0
            async with self.get_connection() as db:
                try:
                    cursor = await db.execute(query, params)
                    await db.commit()
                    return cursor.lastrowid
                except aiosqlite.OperationalError as e:
                    out_of_attempts = attempt >= max_retries - 1
                    if "locked" not in str(e).lower() or out_of_attempts:
                        raise
                    wait_time = 0.5 * (attempt + 1)
                    logger.warning(
                        "Database locked, retrying write in %ss... (Attempt %d/%d)",
                        wait_time,
                        attempt + 1,
                        max_retries,
                    )
                except Exception as e:
                    # Rollback is only issued on per-call connections; the
                    # persistent session is left untouched, as before.
                    # NOTE(review): confirm that skipping rollback on the
                    # persistent session is intentional.
                    if self._persistent_db is None:
                        await db.rollback()
                    logger.error("Database write error: %s | Query: %s", e, query)
                    raise
            # Back off only after the per-call connection has been released,
            # so it is not held open while waiting.
            await asyncio.sleep(wait_time)

    async def fetch_one(self, query: str, params: tuple = ()):
        """Safely fetches a single row.

        Returns:
            The first result row, or ``None`` when there is no row or the
            query fails (the failure is logged, not raised).
        """
        async with self.get_connection() as db:
            try:
                async with db.execute(query, params) as cursor:
                    return await cursor.fetchone()
            except Exception as e:
                # Deliberate best-effort: callers get None instead of an error.
                logger.error("Database fetch_one error: %s | Query: %s", e, query)
                return None

    async def fetch_all(self, query: str, params: tuple = ()):
        """Safely fetches all rows.

        Returns:
            All result rows, or ``[]`` when the query fails (the failure is
            logged, not raised).
        """
        async with self.get_connection() as db:
            try:
                async with db.execute(query, params) as cursor:
                    return await cursor.fetchall()
            except Exception as e:
                # Deliberate best-effort: callers get [] instead of an error.
                logger.error("Database fetch_all error: %s | Query: %s", e, query)
                return []

    @staticmethod
    async def _add_column_if_missing(db, table: str, column: str, definition: str):
        """Add ``column`` to ``table`` unless ``PRAGMA table_info`` lists it."""
        # Identifiers cannot be bound as SQL parameters; the values come only
        # from the _COLUMN_MIGRATIONS constant, never from user input.
        async with db.execute(f"PRAGMA table_info({table})") as cursor:
            # Index 1 of each table_info row is the column name.
            existing = {row[1] for row in await cursor.fetchall()}
        if column not in existing:
            await db.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")

    async def init_db(self):
        """Create all tables and indexes, then add any missing columns.

        Runs every statement in ``_SCHEMA``, applies ``_COLUMN_MIGRATIONS``
        column by column, and commits. Errors are propagated rather than
        silently ignored.
        """
        async with self.get_connection() as db:
            await db.execute("PRAGMA journal_mode=WAL")
            await db.execute("PRAGMA busy_timeout = 30000")

            for statement in self._SCHEMA:
                await db.execute(statement)

            for table, column, definition in self._COLUMN_MIGRATIONS:
                await self._add_column_if_missing(db, table, column, definition)

            await db.commit()
            logger.info("Database initialized successfully (WAL Active)")


# Shared module-level Database instance, bound to the path in settings.DATABASE_NAME.
db_manager = Database(settings.DATABASE_NAME)