File size: 9,373 Bytes
11757af
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import aiosqlite
from .config import settings
from contextlib import asynccontextmanager
import logging

logger = logging.getLogger("database")

class Database:
    def __init__(self, db_path: str):
        self.db_path = db_path

    @asynccontextmanager
    async def get_connection(self):
        # We reuse a single connection if it exists to handle high-pressure tasks (300 req/min)
        if hasattr(self, '_persistent_db') and self._persistent_db:
            yield self._persistent_db
            return

        # Added timeout=30.0 to handle busy database instead of failing immediately
        async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
            db.row_factory = aiosqlite.Row
            await db.execute("PRAGMA journal_mode=WAL") # Enable WAL mode for high concurrency
            await db.execute("PRAGMA synchronous=NORMAL")
            try:
                yield db
            except Exception as e:
                logger.error(f"Database connection error: {e}")
                raise e

    async def start_session(self):
        """Starts a persistent session for high-performance bulk operations."""
        if not hasattr(self, '_persistent_db') or not self._persistent_db:
            self._persistent_db = await aiosqlite.connect(self.db_path, timeout=60.0)
            self._persistent_db.row_factory = aiosqlite.Row
            await self._persistent_db.execute("PRAGMA journal_mode=WAL")
            await self._persistent_db.execute("PRAGMA synchronous=NORMAL")
            logger.info("Persistent DB session started with WAL mode")

    async def close_session(self):
        """Closes the persistent session."""
        if hasattr(self, '_persistent_db') and self._persistent_db:
            await self._persistent_db.close()
            self._persistent_db = None
            logger.info("Persistent DB session closed")

    async def execute_write(self, query: str, params: tuple = ()) -> int:
        """Executes a write operation with retry logic for locked databases."""
        max_retries = 5
        for attempt in range(max_retries):
            async with self.get_connection() as db:
                try:
                    cursor = await db.execute(query, params)
                    await db.commit()
                    return cursor.lastrowid
                except aiosqlite.OperationalError as e:
                    if "locked" in str(e).lower() and attempt < max_retries - 1:
                        wait_time = 0.5 * (attempt + 1)
                        logger.warning(f"Database locked, retrying write in {wait_time}s... (Attempt {attempt+1}/{max_retries})")
                        await asyncio.sleep(wait_time)
                        continue
                    raise e
                except Exception as e:
                    if not hasattr(self, '_persistent_db') or not self._persistent_db:
                        await db.rollback()
                    logger.error(f"Database write error: {e} | Query: {query}")
                    raise e

    async def fetch_one(self, query: str, params: tuple = ()):
        """Safely fetches a single row."""
        async with self.get_connection() as db:
            try:
                async with db.execute(query, params) as cursor:
                    return await cursor.fetchone()
            except Exception as e:
                logger.error(f"Database fetch_one error: {e} | Query: {query}")
                return None

    async def fetch_all(self, query: str, params: tuple = ()):
        """Safely fetches all rows."""
        async with self.get_connection() as db:
            try:
                async with db.execute(query, params) as cursor:
                    return await cursor.fetchall()
            except Exception as e:
                logger.error(f"Database fetch_all error: {e} | Query: {query}")
                return []

    async def init_db(self):
        async with self.get_connection() as db:
            # Enable WAL mode and set global safe timeout
            await db.execute("PRAGMA journal_mode=WAL")
            await db.execute("PRAGMA busy_timeout = 30000") # 30 seconds global wait
            
            # Reusing the existing schema logic but organized
            schema = [
                """CREATE TABLE IF NOT EXISTS movies (
                    id TEXT PRIMARY KEY, title TEXT, poster TEXT, year TEXT, 
                    rating TEXT, description TEXT, category TEXT
                )""",
                """CREATE TABLE IF NOT EXISTS series (
                    id TEXT PRIMARY KEY, title TEXT, poster TEXT, year TEXT, 
                    rating TEXT, description TEXT, category TEXT
                )""",
                """CREATE TABLE IF NOT EXISTS episodes (
                    id INTEGER PRIMARY KEY AUTOINCREMENT, series_id TEXT, 
                    episode_number INTEGER, title TEXT, watch_link TEXT,
                    FOREIGN KEY(series_id) REFERENCES series(id)
                )""",
                """CREATE TABLE IF NOT EXISTS users (
                    id TEXT PRIMARY KEY,
                    points INTEGER DEFAULT 0,
                    watch_time_total INTEGER DEFAULT 0,
                    last_watch_reward_time INTEGER DEFAULT 0,
                    is_fan INTEGER DEFAULT 0,
                    ad_free_until INTEGER DEFAULT 0,
                    is_rewarded INTEGER DEFAULT 0,
                    referrer_id TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )""",
                """CREATE TABLE IF NOT EXISTS referrals (
                    id INTEGER PRIMARY KEY AUTOINCREMENT, referrer_id TEXT, referred_id TEXT,
                    status TEXT DEFAULT 'pending', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(referrer_id, referred_id)
                )""",
                """CREATE TABLE IF NOT EXISTS comments (
                    id INTEGER PRIMARY KEY AUTOINCREMENT, content_id TEXT, user_id TEXT,
                    text TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY(user_id) REFERENCES users(id)
                )""",
                """CREATE TABLE IF NOT EXISTS course_progress (
                    id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, course_id TEXT,
                    lesson_id TEXT, completed INTEGER DEFAULT 0, last_position INTEGER DEFAULT 0,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(user_id, course_id, lesson_id),
                    FOREIGN KEY(user_id) REFERENCES users(id)
                )""",
                "CREATE INDEX IF NOT EXISTS idx_users_referrer ON users(referrer_id)",
                "CREATE INDEX IF NOT EXISTS idx_referrals_referrer ON referrals(referrer_id)",
                "CREATE INDEX IF NOT EXISTS idx_comments_content ON comments(content_id)",
                "CREATE INDEX IF NOT EXISTS idx_course_progress_user ON course_progress(user_id)",
                "CREATE INDEX IF NOT EXISTS idx_course_progress_course ON course_progress(course_id)",
                """CREATE TABLE IF NOT EXISTS watch_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, content_id TEXT,
                    content_type TEXT, episode_id TEXT, progress INTEGER DEFAULT 0,
                    watched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY(user_id) REFERENCES users(id)
                )""",
                "CREATE INDEX IF NOT EXISTS idx_watch_history_user ON watch_history(user_id)",
                "CREATE INDEX IF NOT EXISTS idx_watch_history_content ON watch_history(content_id)",
                """CREATE TABLE IF NOT EXISTS site_settings (
                    key TEXT PRIMARY KEY,
                    value TEXT
                )""",
                """CREATE TABLE IF NOT EXISTS promo_codes (
                    code TEXT PRIMARY KEY,
                    reward_type TEXT,
                    duration_days INTEGER,
                    max_uses INTEGER DEFAULT 1,
                    current_uses INTEGER DEFAULT 0,
                    is_active INTEGER DEFAULT 1
                )""",
                """CREATE TABLE IF NOT EXISTS user_promos (
                    user_id TEXT,
                    code TEXT,
                    redeemed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (user_id, code)
                )"""
            ]
            for statement in schema:
                await db.execute(statement)

            # Manual migrations to handle existing tables
            try:
                await db.execute("ALTER TABLE users ADD COLUMN is_rewarded INTEGER DEFAULT 0")
            except: pass # Column already exists
            
            try:
                await db.execute("ALTER TABLE promo_codes ADD COLUMN duration_days INTEGER")
                await db.execute("ALTER TABLE promo_codes ADD COLUMN max_uses INTEGER DEFAULT 1")
                await db.execute("ALTER TABLE promo_codes ADD COLUMN current_uses INTEGER DEFAULT 0")
                await db.execute("ALTER TABLE promo_codes ADD COLUMN is_active INTEGER DEFAULT 1")
            except: pass 

            await db.commit()
            logger.info("Database initialized successfully (WAL Active)")

db_manager = Database(settings.DATABASE_NAME)