rrr4 / app /core /database.py
minaewrw's picture
Initial backend deployment for Hugging Face Spaces
11757af
import asyncio
import logging
from contextlib import asynccontextmanager

import aiosqlite

from .config import settings
# Module-level logger; every message emitted from this file goes out under the "database" name.
logger = logging.getLogger("database")
class Database:
    """Async helper around a single SQLite file accessed through aiosqlite.

    By default every operation opens a short-lived connection (WAL journal,
    ``synchronous=NORMAL``, 30 s busy timeout).  After ``start_session()`` a
    single long-lived connection is reused for all operations until
    ``close_session()`` is called.
    """

    # Class-level default so the attribute always exists, even on instances
    # created without running __init__.
    _persistent_db = None

    # Tables and indexes created (idempotently) by init_db().
    _SCHEMA = (
        """CREATE TABLE IF NOT EXISTS movies (
            id TEXT PRIMARY KEY, title TEXT, poster TEXT, year TEXT,
            rating TEXT, description TEXT, category TEXT
        )""",
        """CREATE TABLE IF NOT EXISTS series (
            id TEXT PRIMARY KEY, title TEXT, poster TEXT, year TEXT,
            rating TEXT, description TEXT, category TEXT
        )""",
        """CREATE TABLE IF NOT EXISTS episodes (
            id INTEGER PRIMARY KEY AUTOINCREMENT, series_id TEXT,
            episode_number INTEGER, title TEXT, watch_link TEXT,
            FOREIGN KEY(series_id) REFERENCES series(id)
        )""",
        """CREATE TABLE IF NOT EXISTS users (
            id TEXT PRIMARY KEY,
            points INTEGER DEFAULT 0,
            watch_time_total INTEGER DEFAULT 0,
            last_watch_reward_time INTEGER DEFAULT 0,
            is_fan INTEGER DEFAULT 0,
            ad_free_until INTEGER DEFAULT 0,
            is_rewarded INTEGER DEFAULT 0,
            referrer_id TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""",
        """CREATE TABLE IF NOT EXISTS referrals (
            id INTEGER PRIMARY KEY AUTOINCREMENT, referrer_id TEXT, referred_id TEXT,
            status TEXT DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(referrer_id, referred_id)
        )""",
        """CREATE TABLE IF NOT EXISTS comments (
            id INTEGER PRIMARY KEY AUTOINCREMENT, content_id TEXT, user_id TEXT,
            text TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(user_id) REFERENCES users(id)
        )""",
        """CREATE TABLE IF NOT EXISTS course_progress (
            id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, course_id TEXT,
            lesson_id TEXT, completed INTEGER DEFAULT 0, last_position INTEGER DEFAULT 0,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(user_id, course_id, lesson_id),
            FOREIGN KEY(user_id) REFERENCES users(id)
        )""",
        "CREATE INDEX IF NOT EXISTS idx_users_referrer ON users(referrer_id)",
        "CREATE INDEX IF NOT EXISTS idx_referrals_referrer ON referrals(referrer_id)",
        "CREATE INDEX IF NOT EXISTS idx_comments_content ON comments(content_id)",
        "CREATE INDEX IF NOT EXISTS idx_course_progress_user ON course_progress(user_id)",
        "CREATE INDEX IF NOT EXISTS idx_course_progress_course ON course_progress(course_id)",
        """CREATE TABLE IF NOT EXISTS watch_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, content_id TEXT,
            content_type TEXT, episode_id TEXT, progress INTEGER DEFAULT 0,
            watched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(user_id) REFERENCES users(id)
        )""",
        "CREATE INDEX IF NOT EXISTS idx_watch_history_user ON watch_history(user_id)",
        "CREATE INDEX IF NOT EXISTS idx_watch_history_content ON watch_history(content_id)",
        """CREATE TABLE IF NOT EXISTS site_settings (
            key TEXT PRIMARY KEY,
            value TEXT
        )""",
        """CREATE TABLE IF NOT EXISTS promo_codes (
            code TEXT PRIMARY KEY,
            reward_type TEXT,
            duration_days INTEGER,
            max_uses INTEGER DEFAULT 1,
            current_uses INTEGER DEFAULT 0,
            is_active INTEGER DEFAULT 1
        )""",
        """CREATE TABLE IF NOT EXISTS user_promos (
            user_id TEXT,
            code TEXT,
            redeemed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (user_id, code)
        )""",
    )

    # Column additions for databases created before these columns existed.
    # Each statement is attempted independently by init_db().
    _MIGRATIONS = (
        "ALTER TABLE users ADD COLUMN is_rewarded INTEGER DEFAULT 0",
        "ALTER TABLE promo_codes ADD COLUMN duration_days INTEGER",
        "ALTER TABLE promo_codes ADD COLUMN max_uses INTEGER DEFAULT 1",
        "ALTER TABLE promo_codes ADD COLUMN current_uses INTEGER DEFAULT 0",
        "ALTER TABLE promo_codes ADD COLUMN is_active INTEGER DEFAULT 1",
    )

    def __init__(self, db_path: str):
        """Remember the database file path; no connection is opened here."""
        self.db_path = db_path
        self._persistent_db = None

    @asynccontextmanager
    async def get_connection(self):
        """Yield a connection to the database.

        If a persistent session is active, that connection is yielded as-is
        and left open afterwards.  Otherwise a fresh connection is opened,
        configured (Row factory, WAL, synchronous=NORMAL) and closed when the
        ``async with`` block exits.  Exceptions raised inside the block are
        logged and re-raised.
        """
        # Reuse the single persistent connection when one exists; the original
        # author introduced it for high-pressure workloads (300 req/min).
        if self._persistent_db is not None:
            yield self._persistent_db
            return

        # timeout=30.0 makes a busy database wait instead of failing at once.
        async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
            db.row_factory = aiosqlite.Row
            await db.execute("PRAGMA journal_mode=WAL")
            await db.execute("PRAGMA synchronous=NORMAL")
            try:
                yield db
            except Exception as e:
                logger.error(f"Database connection error: {e}")
                raise

    async def start_session(self):
        """Open the persistent connection if one is not already open."""
        if self._persistent_db is not None:
            return
        self._persistent_db = await aiosqlite.connect(self.db_path, timeout=60.0)
        self._persistent_db.row_factory = aiosqlite.Row
        await self._persistent_db.execute("PRAGMA journal_mode=WAL")
        await self._persistent_db.execute("PRAGMA synchronous=NORMAL")
        logger.info("Persistent DB session started with WAL mode")

    async def close_session(self):
        """Close the persistent connection, if any."""
        if self._persistent_db is None:
            return
        # Detach first so the attribute is cleared even if close() raises and
        # no caller can pick up a connection that is being closed.
        connection, self._persistent_db = self._persistent_db, None
        await connection.close()
        logger.info("Persistent DB session closed")

    async def execute_write(self, query: str, params: tuple = ()) -> int:
        """Execute a write statement, commit it and return ``lastrowid``.

        An ``OperationalError`` whose message contains "locked" is retried up
        to five attempts in total, waiting 0.5 s, 1.0 s, 1.5 s, ... between
        attempts.  Any other error is re-raised; for non-persistent
        connections the transaction is rolled back first.
        """
        max_retries = 5
        for attempt in range(max_retries):
            async with self.get_connection() as db:
                try:
                    cursor = await db.execute(query, params)
                    await db.commit()
                    return cursor.lastrowid
                except aiosqlite.OperationalError as e:
                    is_last_attempt = attempt >= max_retries - 1
                    if "locked" not in str(e).lower() or is_last_attempt:
                        raise
                    wait_time = 0.5 * (attempt + 1)
                    logger.warning(f"Database locked, retrying write in {wait_time}s... (Attempt {attempt+1}/{max_retries})")
                except Exception as e:
                    # Rollback is deliberately skipped on the shared
                    # persistent connection.
                    if self._persistent_db is None:
                        await db.rollback()
                    logger.error(f"Database write error: {e} | Query: {query}")
                    raise
            # Back off only after the connection scope has been left, so a
            # short-lived connection is not held open while sleeping.
            await asyncio.sleep(wait_time)

    async def fetch_one(self, query: str, params: tuple = ()):
        """Return the first row of *query*, or ``None`` if it fails or is empty.

        Errors raised while executing or fetching are logged, not raised.
        """
        async with self.get_connection() as db:
            try:
                async with db.execute(query, params) as cursor:
                    return await cursor.fetchone()
            except Exception as e:
                logger.error(f"Database fetch_one error: {e} | Query: {query}")
                return None

    async def fetch_all(self, query: str, params: tuple = ()):
        """Return all rows of *query*, or ``[]`` if executing/fetching fails.

        Errors raised while executing or fetching are logged, not raised.
        """
        async with self.get_connection() as db:
            try:
                async with db.execute(query, params) as cursor:
                    return await cursor.fetchall()
            except Exception as e:
                logger.error(f"Database fetch_all error: {e} | Query: {query}")
                return []

    async def init_db(self):
        """Create all tables and indexes and apply the column migrations.

        Every schema statement uses ``IF NOT EXISTS`` and every migration
        tolerates an already-present column, so calling this repeatedly is
        harmless.
        """
        async with self.get_connection() as db:
            await db.execute("PRAGMA journal_mode=WAL")
            # busy_timeout applies to this connection only, not globally.
            await db.execute("PRAGMA busy_timeout = 30000")

            for statement in self._SCHEMA:
                await db.execute(statement)

            # Run each ALTER on its own: one column already existing must not
            # prevent the remaining columns from being added.
            for statement in self._MIGRATIONS:
                try:
                    await db.execute(statement)
                except aiosqlite.OperationalError as e:
                    if "duplicate column name" in str(e).lower():
                        continue  # Column already exists
                    # Stay best-effort, but do not hide unexpected failures.
                    logger.warning(f"Migration skipped: {e} | Statement: {statement}")

            await db.commit()
            logger.info("Database initialized successfully (WAL Active)")
# Module-level Database instance created at import time from settings.DATABASE_NAME.
db_manager = Database(settings.DATABASE_NAME)