from __future__ import annotations import sqlite3 from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any, Iterator ACTIVE_TASK_STATUSES = {"pending", "running", "cancel_requested"} def utc_now() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds") class Database: def __init__(self, path: Path, default_parallel_limit: int = 2) -> None: self.path = Path(path) self.default_parallel_limit = default_parallel_limit self.path.parent.mkdir(parents=True, exist_ok=True) def _connect(self) -> sqlite3.Connection: connection = sqlite3.connect(self.path, timeout=30) connection.row_factory = sqlite3.Row connection.execute("PRAGMA foreign_keys = ON") connection.execute("PRAGMA journal_mode = WAL") return connection @contextmanager def _cursor(self) -> Iterator[tuple[sqlite3.Connection, sqlite3.Cursor]]: connection = self._connect() try: cursor = connection.cursor() yield connection, cursor connection.commit() finally: connection.close() @staticmethod def _rows_to_dicts(rows: list[sqlite3.Row]) -> list[dict[str, Any]]: return [dict(row) for row in rows] def init_db(self) -> None: with self._cursor() as (_connection, cursor): cursor.executescript( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, student_id TEXT NOT NULL UNIQUE, password_encrypted TEXT NOT NULL, display_name TEXT NOT NULL DEFAULT '', is_active INTEGER NOT NULL DEFAULT 1, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS course_targets ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, category TEXT NOT NULL DEFAULT 'free', course_id TEXT NOT NULL, course_index TEXT NOT NULL, created_at TEXT NOT NULL, UNIQUE(user_id, category, course_id, course_index), FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS admins ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS app_settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, status TEXT NOT NULL, requested_by TEXT NOT NULL, requested_by_role TEXT NOT NULL, last_error TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, started_at TEXT, finished_at TEXT, updated_at TEXT NOT NULL, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id INTEGER, user_id INTEGER, scope TEXT NOT NULL, level TEXT NOT NULL, message TEXT NOT NULL, created_at TEXT NOT NULL, FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); """ ) if self.get_setting("parallel_limit") is None: self.set_setting("parallel_limit", str(self.default_parallel_limit)) def get_setting(self, key: str) -> str | None: with self._cursor() as (_connection, cursor): row = cursor.execute("SELECT value FROM app_settings WHERE key = ?", (key,)).fetchone() return None if row is None else str(row["value"]) def set_setting(self, key: str, value: str) -> None: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( """ INSERT INTO app_settings (key, value, updated_at) VALUES (?, ?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at """, (key, value, now), ) def get_parallel_limit(self) -> int: raw_value = self.get_setting("parallel_limit") if raw_value is None: return self.default_parallel_limit try: return max(1, int(raw_value)) except ValueError: return self.default_parallel_limit def set_parallel_limit(self, limit: int) -> None: self.set_setting("parallel_limit", str(max(1, limit))) def create_user(self, student_id: str, password_encrypted: str, display_name: str = "") -> int: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( """ INSERT INTO users (student_id, password_encrypted, display_name, is_active, created_at, updated_at) VALUES (?, ?, ?, 1, ?, ?) """, (student_id.strip(), password_encrypted, display_name.strip(), now, now), ) return int(cursor.lastrowid) def update_user( self, user_id: int, *, password_encrypted: str | None = None, display_name: str | None = None, is_active: bool | None = None, ) -> None: assignments: list[str] = [] values: list[Any] = [] if password_encrypted is not None: assignments.append("password_encrypted = ?") values.append(password_encrypted) if display_name is not None: assignments.append("display_name = ?") values.append(display_name.strip()) if is_active is not None: assignments.append("is_active = ?") values.append(1 if is_active else 0) if not assignments: return assignments.append("updated_at = ?") values.append(utc_now()) values.append(user_id) with self._cursor() as (_connection, cursor): cursor.execute(f"UPDATE users SET {', '.join(assignments)} WHERE id = ?", tuple(values)) def toggle_user_active(self, user_id: int) -> dict[str, Any] | None: user = self.get_user(user_id) if not user: return None self.update_user(user_id, is_active=not bool(user["is_active"])) return self.get_user(user_id) def get_user(self, user_id: int) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): row = cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() return None if row is None else dict(row) def get_user_by_student_id(self, student_id: str) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): row = cursor.execute("SELECT * FROM users WHERE student_id = ?", (student_id.strip(),)).fetchone() return None if row is None else dict(row) def list_users(self) -> list[dict[str, Any]]: with self._cursor() as (_connection, cursor): rows = cursor.execute( """ SELECT u.*, COUNT(c.id) AS course_count, ( SELECT t.status FROM tasks t WHERE t.user_id = u.id ORDER BY t.created_at DESC LIMIT 1 ) AS latest_task_status, ( SELECT t.updated_at FROM tasks t WHERE t.user_id = u.id ORDER BY t.created_at DESC LIMIT 1 ) AS latest_task_updated_at FROM users u LEFT JOIN course_targets c ON c.user_id = u.id GROUP BY u.id ORDER BY u.student_id ASC """ ).fetchall() return self._rows_to_dicts(rows) def add_course(self, user_id: int, category: str, course_id: str, course_index: str) -> int | None: now = utc_now() normalized_category = "plan" if category == "plan" else "free" with self._cursor() as (_connection, cursor): cursor.execute( """ INSERT OR IGNORE INTO course_targets (user_id, category, course_id, course_index, created_at) VALUES (?, ?, ?, ?, ?) """, (user_id, normalized_category, course_id.strip(), course_index.strip(), now), ) return int(cursor.lastrowid) if cursor.lastrowid else None def delete_course(self, course_target_id: int) -> None: with self._cursor() as (_connection, cursor): cursor.execute("DELETE FROM course_targets WHERE id = ?", (course_target_id,)) def remove_course_by_identity(self, user_id: int, category: str, course_id: str, course_index: str) -> None: with self._cursor() as (_connection, cursor): cursor.execute( """ DELETE FROM course_targets WHERE user_id = ? AND category = ? AND course_id = ? AND course_index = ? """, (user_id, category, course_id, course_index), ) def list_courses_for_user(self, user_id: int) -> list[dict[str, Any]]: with self._cursor() as (_connection, cursor): rows = cursor.execute( """ SELECT * FROM course_targets WHERE user_id = ? ORDER BY category ASC, course_id ASC, course_index ASC """, (user_id,), ).fetchall() return self._rows_to_dicts(rows) def create_admin(self, username: str, password_hash: str) -> int: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( """ INSERT INTO admins (username, password_hash, created_at, updated_at) VALUES (?, ?, ?, ?) """, (username.strip(), password_hash, now, now), ) return int(cursor.lastrowid) def get_admin_by_username(self, username: str) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): row = cursor.execute("SELECT * FROM admins WHERE username = ?", (username.strip(),)).fetchone() return None if row is None else dict(row) def list_admins(self) -> list[dict[str, Any]]: with self._cursor() as (_connection, cursor): rows = cursor.execute( "SELECT id, username, created_at, updated_at FROM admins ORDER BY username ASC" ).fetchall() return self._rows_to_dicts(rows) def find_active_task_for_user(self, user_id: int) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): row = cursor.execute( """ SELECT * FROM tasks WHERE user_id = ? AND status IN ('pending', 'running', 'cancel_requested') ORDER BY created_at DESC LIMIT 1 """, (user_id,), ).fetchone() return None if row is None else dict(row) def create_task(self, user_id: int, requested_by: str, requested_by_role: str) -> int: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( """ INSERT INTO tasks (user_id, status, requested_by, requested_by_role, created_at, updated_at) VALUES (?, 'pending', ?, ?, ?, ?) """, (user_id, requested_by, requested_by_role, now, now), ) return int(cursor.lastrowid) def get_task(self, task_id: int) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): row = cursor.execute( """ SELECT t.*, u.student_id, u.display_name FROM tasks t JOIN users u ON u.id = t.user_id WHERE t.id = ? """, (task_id,), ).fetchone() return None if row is None else dict(row) def get_latest_task_for_user(self, user_id: int) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): row = cursor.execute( """ SELECT t.*, u.student_id, u.display_name FROM tasks t JOIN users u ON u.id = t.user_id WHERE t.user_id = ? ORDER BY t.created_at DESC LIMIT 1 """, (user_id,), ).fetchone() return None if row is None else dict(row) def list_pending_tasks(self, limit: int) -> list[dict[str, Any]]: with self._cursor() as (_connection, cursor): rows = cursor.execute( """ SELECT t.*, u.student_id, u.display_name, u.password_encrypted, u.is_active FROM tasks t JOIN users u ON u.id = t.user_id WHERE t.status = 'pending' ORDER BY t.created_at ASC LIMIT ? """, (limit,), ).fetchall() return self._rows_to_dicts(rows) def list_recent_tasks(self, limit: int = 20) -> list[dict[str, Any]]: with self._cursor() as (_connection, cursor): rows = cursor.execute( """ SELECT t.*, u.student_id, u.display_name FROM tasks t JOIN users u ON u.id = t.user_id ORDER BY t.created_at DESC LIMIT ? """, (limit,), ).fetchall() return self._rows_to_dicts(rows) def mark_task_running(self, task_id: int) -> None: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( """ UPDATE tasks SET status = 'running', started_at = COALESCE(started_at, ?), updated_at = ?, last_error = '' WHERE id = ? """, (now, now, task_id), ) def finish_task(self, task_id: int, status: str, last_error: str = "") -> None: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( """ UPDATE tasks SET status = ?, finished_at = ?, updated_at = ?, last_error = ? WHERE id = ? """, (status, now, now, last_error, task_id), ) def update_task_status(self, task_id: int, status: str, last_error: str = "") -> None: with self._cursor() as (_connection, cursor): cursor.execute( """ UPDATE tasks SET status = ?, updated_at = ?, last_error = ? WHERE id = ? """, (status, utc_now(), last_error, task_id), ) def request_task_stop(self, task_id: int) -> bool: task = self.get_task(task_id) if not task or task["status"] not in ACTIVE_TASK_STATUSES: return False with self._cursor() as (_connection, cursor): cursor.execute( """ UPDATE tasks SET status = 'cancel_requested', updated_at = ? WHERE id = ? """, (utc_now(), task_id), ) return True def reset_inflight_tasks(self) -> None: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( """ UPDATE tasks SET status = 'stopped', finished_at = COALESCE(finished_at, ?), updated_at = ? WHERE status IN ('pending', 'running', 'cancel_requested') """, (now, now), ) def add_log(self, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> int: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( """ INSERT INTO logs (task_id, user_id, scope, level, message, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (task_id, user_id, scope, level.upper(), message, now), ) return int(cursor.lastrowid) def list_recent_logs(self, *, user_id: int | None = None, limit: int = 120) -> list[dict[str, Any]]: if user_id is None: query = """ SELECT l.*, u.student_id, u.display_name FROM logs l LEFT JOIN users u ON u.id = l.user_id ORDER BY l.id DESC LIMIT ? """ params = (limit,) else: query = """ SELECT l.*, u.student_id, u.display_name FROM logs l LEFT JOIN users u ON u.id = l.user_id WHERE l.user_id = ? ORDER BY l.id DESC LIMIT ? """ params = (user_id, limit) with self._cursor() as (_connection, cursor): rows = cursor.execute(query, params).fetchall() return list(reversed(self._rows_to_dicts(rows))) def list_logs_after(self, after_id: int, *, user_id: int | None = None, limit: int = 100) -> list[dict[str, Any]]: if user_id is None: query = """ SELECT l.*, u.student_id, u.display_name FROM logs l LEFT JOIN users u ON u.id = l.user_id WHERE l.id > ? ORDER BY l.id ASC LIMIT ? """ params = (after_id, limit) else: query = """ SELECT l.*, u.student_id, u.display_name FROM logs l LEFT JOIN users u ON u.id = l.user_id WHERE l.id > ? AND l.user_id = ? ORDER BY l.id ASC LIMIT ? """ params = (after_id, user_id, limit) with self._cursor() as (_connection, cursor): rows = cursor.execute(query, params).fetchall() return self._rows_to_dicts(rows) def get_admin_stats(self) -> dict[str, int]: with self._cursor() as (_connection, cursor): users_count = cursor.execute("SELECT COUNT(*) AS total FROM users").fetchone()["total"] courses_count = cursor.execute("SELECT COUNT(*) AS total FROM course_targets").fetchone()["total"] admins_count = cursor.execute("SELECT COUNT(*) AS total FROM admins").fetchone()["total"] running_count = cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'running'").fetchone()["total"] pending_count = cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'pending'").fetchone()["total"] return { "users_count": int(users_count), "courses_count": int(courses_count), "admins_count": int(admins_count) + 1, "running_count": int(running_count), "pending_count": int(pending_count), }