File size: 9,373 Bytes
11757af | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 | import aiosqlite
from .config import settings
from contextlib import asynccontextmanager
import logging
logger = logging.getLogger("database")
class Database:
def __init__(self, db_path: str):
self.db_path = db_path
@asynccontextmanager
async def get_connection(self):
# We reuse a single connection if it exists to handle high-pressure tasks (300 req/min)
if hasattr(self, '_persistent_db') and self._persistent_db:
yield self._persistent_db
return
# Added timeout=30.0 to handle busy database instead of failing immediately
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
db.row_factory = aiosqlite.Row
await db.execute("PRAGMA journal_mode=WAL") # Enable WAL mode for high concurrency
await db.execute("PRAGMA synchronous=NORMAL")
try:
yield db
except Exception as e:
logger.error(f"Database connection error: {e}")
raise e
async def start_session(self):
"""Starts a persistent session for high-performance bulk operations."""
if not hasattr(self, '_persistent_db') or not self._persistent_db:
self._persistent_db = await aiosqlite.connect(self.db_path, timeout=60.0)
self._persistent_db.row_factory = aiosqlite.Row
await self._persistent_db.execute("PRAGMA journal_mode=WAL")
await self._persistent_db.execute("PRAGMA synchronous=NORMAL")
logger.info("Persistent DB session started with WAL mode")
async def close_session(self):
"""Closes the persistent session."""
if hasattr(self, '_persistent_db') and self._persistent_db:
await self._persistent_db.close()
self._persistent_db = None
logger.info("Persistent DB session closed")
async def execute_write(self, query: str, params: tuple = ()) -> int:
"""Executes a write operation with retry logic for locked databases."""
max_retries = 5
for attempt in range(max_retries):
async with self.get_connection() as db:
try:
cursor = await db.execute(query, params)
await db.commit()
return cursor.lastrowid
except aiosqlite.OperationalError as e:
if "locked" in str(e).lower() and attempt < max_retries - 1:
wait_time = 0.5 * (attempt + 1)
logger.warning(f"Database locked, retrying write in {wait_time}s... (Attempt {attempt+1}/{max_retries})")
await asyncio.sleep(wait_time)
continue
raise e
except Exception as e:
if not hasattr(self, '_persistent_db') or not self._persistent_db:
await db.rollback()
logger.error(f"Database write error: {e} | Query: {query}")
raise e
async def fetch_one(self, query: str, params: tuple = ()):
"""Safely fetches a single row."""
async with self.get_connection() as db:
try:
async with db.execute(query, params) as cursor:
return await cursor.fetchone()
except Exception as e:
logger.error(f"Database fetch_one error: {e} | Query: {query}")
return None
async def fetch_all(self, query: str, params: tuple = ()):
"""Safely fetches all rows."""
async with self.get_connection() as db:
try:
async with db.execute(query, params) as cursor:
return await cursor.fetchall()
except Exception as e:
logger.error(f"Database fetch_all error: {e} | Query: {query}")
return []
async def init_db(self):
async with self.get_connection() as db:
# Enable WAL mode and set global safe timeout
await db.execute("PRAGMA journal_mode=WAL")
await db.execute("PRAGMA busy_timeout = 30000") # 30 seconds global wait
# Reusing the existing schema logic but organized
schema = [
"""CREATE TABLE IF NOT EXISTS movies (
id TEXT PRIMARY KEY, title TEXT, poster TEXT, year TEXT,
rating TEXT, description TEXT, category TEXT
)""",
"""CREATE TABLE IF NOT EXISTS series (
id TEXT PRIMARY KEY, title TEXT, poster TEXT, year TEXT,
rating TEXT, description TEXT, category TEXT
)""",
"""CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY AUTOINCREMENT, series_id TEXT,
episode_number INTEGER, title TEXT, watch_link TEXT,
FOREIGN KEY(series_id) REFERENCES series(id)
)""",
"""CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
points INTEGER DEFAULT 0,
watch_time_total INTEGER DEFAULT 0,
last_watch_reward_time INTEGER DEFAULT 0,
is_fan INTEGER DEFAULT 0,
ad_free_until INTEGER DEFAULT 0,
is_rewarded INTEGER DEFAULT 0,
referrer_id TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""",
"""CREATE TABLE IF NOT EXISTS referrals (
id INTEGER PRIMARY KEY AUTOINCREMENT, referrer_id TEXT, referred_id TEXT,
status TEXT DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(referrer_id, referred_id)
)""",
"""CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT, content_id TEXT, user_id TEXT,
text TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(user_id) REFERENCES users(id)
)""",
"""CREATE TABLE IF NOT EXISTS course_progress (
id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, course_id TEXT,
lesson_id TEXT, completed INTEGER DEFAULT 0, last_position INTEGER DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, course_id, lesson_id),
FOREIGN KEY(user_id) REFERENCES users(id)
)""",
"CREATE INDEX IF NOT EXISTS idx_users_referrer ON users(referrer_id)",
"CREATE INDEX IF NOT EXISTS idx_referrals_referrer ON referrals(referrer_id)",
"CREATE INDEX IF NOT EXISTS idx_comments_content ON comments(content_id)",
"CREATE INDEX IF NOT EXISTS idx_course_progress_user ON course_progress(user_id)",
"CREATE INDEX IF NOT EXISTS idx_course_progress_course ON course_progress(course_id)",
"""CREATE TABLE IF NOT EXISTS watch_history (
id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, content_id TEXT,
content_type TEXT, episode_id TEXT, progress INTEGER DEFAULT 0,
watched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(user_id) REFERENCES users(id)
)""",
"CREATE INDEX IF NOT EXISTS idx_watch_history_user ON watch_history(user_id)",
"CREATE INDEX IF NOT EXISTS idx_watch_history_content ON watch_history(content_id)",
"""CREATE TABLE IF NOT EXISTS site_settings (
key TEXT PRIMARY KEY,
value TEXT
)""",
"""CREATE TABLE IF NOT EXISTS promo_codes (
code TEXT PRIMARY KEY,
reward_type TEXT,
duration_days INTEGER,
max_uses INTEGER DEFAULT 1,
current_uses INTEGER DEFAULT 0,
is_active INTEGER DEFAULT 1
)""",
"""CREATE TABLE IF NOT EXISTS user_promos (
user_id TEXT,
code TEXT,
redeemed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, code)
)"""
]
for statement in schema:
await db.execute(statement)
# Manual migrations to handle existing tables
try:
await db.execute("ALTER TABLE users ADD COLUMN is_rewarded INTEGER DEFAULT 0")
except: pass # Column already exists
try:
await db.execute("ALTER TABLE promo_codes ADD COLUMN duration_days INTEGER")
await db.execute("ALTER TABLE promo_codes ADD COLUMN max_uses INTEGER DEFAULT 1")
await db.execute("ALTER TABLE promo_codes ADD COLUMN current_uses INTEGER DEFAULT 0")
await db.execute("ALTER TABLE promo_codes ADD COLUMN is_active INTEGER DEFAULT 1")
except: pass
await db.commit()
logger.info("Database initialized successfully (WAL Active)")
db_manager = Database(settings.DATABASE_NAME)
|