from __future__ import annotations import secrets import sqlite3 import string from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any, Iterator from urllib.parse import parse_qs, unquote, urlparse try: import pymysql from pymysql.cursors import DictCursor as MySQLDictCursor except ImportError: # pragma: no cover - optional dependency in local SQLite mode pymysql = None MySQLDictCursor = None ACTIVE_TASK_STATUSES = {"pending", "running", "cancel_requested"} DEFAULT_REFRESH_INTERVAL_SECONDS = 10 MIN_REFRESH_INTERVAL_SECONDS = 1 MAX_REFRESH_INTERVAL_SECONDS = 120 DEFAULT_REGISTRATION_CODE_MAX_USES = 1 REGISTRATION_CODE_ALPHABET = string.ascii_uppercase + string.digits def utc_now() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds") def clamp_refresh_interval_seconds(value: int | str | None, default: int = DEFAULT_REFRESH_INTERVAL_SECONDS) -> int: try: normalized = int(value or default) except (TypeError, ValueError): normalized = default return max(MIN_REFRESH_INTERVAL_SECONDS, min(MAX_REFRESH_INTERVAL_SECONDS, normalized)) def normalize_registration_code(value: str) -> str: return str(value or "").strip().upper() class Database: def __init__( self, path: Path | str, default_parallel_limit: int = 4, *, mysql_ssl_ca_path: Path | str | None = None, ) -> None: self.default_parallel_limit = default_parallel_limit self._mysql_ssl_ca_path = Path(mysql_ssl_ca_path).resolve() if mysql_ssl_ca_path else None self._dialect = "sqlite" self._sqlite_path: Path | None = None self._mysql_options: dict[str, Any] = {} self._mysql_database_name = "" raw_path = str(path) if isinstance(path, Path) or "://" not in raw_path: self._sqlite_path = Path(path).resolve() self._sqlite_path.parent.mkdir(parents=True, exist_ok=True) self.path = self._sqlite_path elif raw_path.startswith("sqlite:///"): self._sqlite_path = Path(raw_path[len("sqlite:///") :]).resolve() self._sqlite_path.parent.mkdir(parents=True, exist_ok=True) self.path = self._sqlite_path elif raw_path.startswith("mysql://") or raw_path.startswith("mysql+pymysql://"): self._dialect = "mysql" self._mysql_options = self._parse_mysql_options(raw_path) self._mysql_database_name = str(self._mysql_options["database"]) self.path = self._mysql_display_label() else: raise ValueError(f"Unsupported database target: {raw_path}") @property def is_mysql(self) -> bool: return self._dialect == "mysql" def _mysql_display_label(self) -> str: if not self._mysql_options: return "mysql://unknown" return ( f"mysql://{self._mysql_options.get('user', '')}@" f"{self._mysql_options.get('host', '')}:{self._mysql_options.get('port', '')}/" f"{self._mysql_options.get('database', '')}" ) @staticmethod def _parse_mysql_options(raw_url: str) -> dict[str, Any]: normalized_url = raw_url.replace("mysql+pymysql://", "mysql://", 1) parsed = urlparse(normalized_url) return { "host": parsed.hostname or "localhost", "port": parsed.port or 3306, "user": unquote(parsed.username or ""), "password": unquote(parsed.password or ""), "database": (parsed.path or "/").lstrip("/"), "query": parse_qs(parsed.query), } def _connect(self): if not self.is_mysql: connection = sqlite3.connect(self._sqlite_path, timeout=30, check_same_thread=False) connection.row_factory = sqlite3.Row connection.execute("PRAGMA foreign_keys = ON") connection.execute("PRAGMA journal_mode = WAL") return connection if pymysql is None or MySQLDictCursor is None: raise RuntimeError("PyMySQL is required when DATABASE_URL/SQL_PASSWORD enables MySQL persistence.") ssl_config: dict[str, str] | None = None ssl_mode = str(self._mysql_options.get("query", {}).get("ssl-mode", [""])[0]).upper() if ssl_mode == "REQUIRED" or self._mysql_ssl_ca_path is not None: if self._mysql_ssl_ca_path is None or not self._mysql_ssl_ca_path.exists(): raise RuntimeError("MySQL SSL is enabled but ca.pem was not found.") ssl_config = {"ca": str(self._mysql_ssl_ca_path)} return pymysql.connect( host=self._mysql_options["host"], port=int(self._mysql_options["port"]), user=self._mysql_options["user"], password=self._mysql_options["password"], database=self._mysql_options["database"], charset="utf8mb4", cursorclass=MySQLDictCursor, autocommit=False, ssl=ssl_config, connect_timeout=10, read_timeout=30, write_timeout=30, ) @contextmanager def _cursor(self) -> Iterator[tuple[Any, Any]]: connection = self._connect() try: cursor = connection.cursor() yield connection, cursor connection.commit() finally: connection.close() def _placeholder(self, name: str) -> str: return f":{name}" if not self.is_mysql else f"%({name})s" @staticmethod def _rows_to_dicts(rows: list[Any]) -> list[dict[str, Any]]: return [dict(row) for row in rows] @staticmethod def _normalize_course_identity(value: str) -> str: return str(value or "").strip().upper() def _column_exists(self, cursor: Any, table_name: str, column_name: str) -> bool: if not self.is_mysql: rows = cursor.execute(f"PRAGMA table_info({table_name})").fetchall() return any(str(row[1]) == column_name for row in rows) query = ( f"SELECT COUNT(*) AS total FROM information_schema.columns " f"WHERE table_schema = {self._placeholder('schema_name')} " f"AND table_name = {self._placeholder('table_name')} " f"AND column_name = {self._placeholder('column_name')}" ) cursor.execute( query, { "schema_name": self._mysql_database_name, "table_name": table_name, "column_name": column_name, }, ) row = cursor.fetchone() return int(dict(row).get("total", 0)) > 0 def _ensure_column(self, table_name: str, column_name: str, definition: str) -> None: with self._cursor() as (_connection, cursor): if self._column_exists(cursor, table_name, column_name): return cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {definition}") def _create_tables_sqlite(self, cursor: Any) -> None: cursor.executescript( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, student_id TEXT NOT NULL UNIQUE, password_encrypted TEXT NOT NULL, display_name TEXT NOT NULL DEFAULT '', is_active INTEGER NOT NULL DEFAULT 1, refresh_interval_seconds INTEGER NOT NULL DEFAULT 10, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS course_targets ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, category TEXT NOT NULL DEFAULT 'free', course_id TEXT NOT NULL, course_index TEXT NOT NULL, created_at TEXT NOT NULL, UNIQUE(user_id, category, course_id, course_index), FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS admins ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS app_settings ( `key` TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, status TEXT NOT NULL, requested_by TEXT NOT NULL, requested_by_role TEXT NOT NULL, last_error TEXT NOT NULL DEFAULT '', total_attempts INTEGER NOT NULL DEFAULT 0, total_errors INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL, started_at TEXT, finished_at TEXT, updated_at TEXT NOT NULL, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id INTEGER, user_id INTEGER, scope TEXT NOT NULL, level TEXT NOT NULL, message TEXT NOT NULL, created_at TEXT NOT NULL, FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS user_schedules ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL UNIQUE, is_enabled INTEGER NOT NULL DEFAULT 0, start_date TEXT, end_date TEXT, daily_start_time TEXT, daily_stop_time TEXT, last_auto_start_on TEXT NOT NULL DEFAULT '', last_auto_stop_on TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS registration_codes ( id INTEGER PRIMARY KEY AUTOINCREMENT, code TEXT NOT NULL UNIQUE, note TEXT NOT NULL DEFAULT '', is_active INTEGER NOT NULL DEFAULT 1, max_uses INTEGER NOT NULL DEFAULT 1, used_count INTEGER NOT NULL DEFAULT 0, created_by TEXT NOT NULL DEFAULT '', used_by_user_id INTEGER, used_at TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY(used_by_user_id) REFERENCES users(id) ON DELETE SET NULL ); """ ) def _create_tables_mysql(self, cursor: Any) -> None: statements = [ """ CREATE TABLE IF NOT EXISTS users ( id BIGINT PRIMARY KEY AUTO_INCREMENT, student_id VARCHAR(32) NOT NULL UNIQUE, password_encrypted TEXT NOT NULL, display_name VARCHAR(255) NOT NULL DEFAULT '', is_active TINYINT(1) NOT NULL DEFAULT 1, refresh_interval_seconds INT NOT NULL DEFAULT 10, created_at VARCHAR(32) NOT NULL, updated_at VARCHAR(32) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, """ CREATE TABLE IF NOT EXISTS course_targets ( id BIGINT PRIMARY KEY AUTO_INCREMENT, user_id BIGINT NOT NULL, category VARCHAR(32) NOT NULL DEFAULT 'free', course_id VARCHAR(64) NOT NULL, course_index VARCHAR(32) NOT NULL, created_at VARCHAR(32) NOT NULL, UNIQUE KEY uq_course_targets_identity (user_id, category, course_id, course_index), CONSTRAINT fk_course_targets_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, """ CREATE TABLE IF NOT EXISTS admins ( id BIGINT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(255) NOT NULL UNIQUE, password_hash TEXT NOT NULL, created_at VARCHAR(32) NOT NULL, updated_at VARCHAR(32) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, """ CREATE TABLE IF NOT EXISTS app_settings ( `key` VARCHAR(191) PRIMARY KEY, value TEXT NOT NULL, updated_at VARCHAR(32) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, """ CREATE TABLE IF NOT EXISTS tasks ( id BIGINT PRIMARY KEY AUTO_INCREMENT, user_id BIGINT NOT NULL, status VARCHAR(32) NOT NULL, requested_by VARCHAR(255) NOT NULL, requested_by_role VARCHAR(32) NOT NULL, last_error TEXT NOT NULL, total_attempts INT NOT NULL DEFAULT 0, total_errors INT NOT NULL DEFAULT 0, created_at VARCHAR(32) NOT NULL, started_at VARCHAR(32), finished_at VARCHAR(32), updated_at VARCHAR(32) NOT NULL, CONSTRAINT fk_tasks_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, """ CREATE TABLE IF NOT EXISTS logs ( id BIGINT PRIMARY KEY AUTO_INCREMENT, task_id BIGINT NULL, user_id BIGINT NULL, scope VARCHAR(64) NOT NULL, level VARCHAR(32) NOT NULL, message TEXT NOT NULL, created_at VARCHAR(32) NOT NULL, CONSTRAINT fk_logs_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE, CONSTRAINT fk_logs_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, """ CREATE TABLE IF NOT EXISTS user_schedules ( id BIGINT PRIMARY KEY AUTO_INCREMENT, user_id BIGINT NOT NULL UNIQUE, is_enabled TINYINT(1) NOT NULL DEFAULT 0, start_date VARCHAR(10) NULL, end_date VARCHAR(10) NULL, daily_start_time VARCHAR(5) NULL, daily_stop_time VARCHAR(5) NULL, last_auto_start_on VARCHAR(10) NOT NULL DEFAULT '', last_auto_stop_on VARCHAR(10) NOT NULL DEFAULT '', created_at VARCHAR(32) NOT NULL, updated_at VARCHAR(32) NOT NULL, CONSTRAINT fk_user_schedules_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, """ CREATE TABLE IF NOT EXISTS registration_codes ( id BIGINT PRIMARY KEY AUTO_INCREMENT, code VARCHAR(64) NOT NULL UNIQUE, note TEXT NOT NULL, is_active TINYINT(1) NOT NULL DEFAULT 1, max_uses INT NOT NULL DEFAULT 1, used_count INT NOT NULL DEFAULT 0, created_by VARCHAR(255) NOT NULL DEFAULT '', used_by_user_id BIGINT NULL, used_at VARCHAR(32) NULL, created_at VARCHAR(32) NOT NULL, updated_at VARCHAR(32) NOT NULL, CONSTRAINT fk_registration_codes_user FOREIGN KEY (used_by_user_id) REFERENCES users(id) ON DELETE SET NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """, ] for statement in statements: cursor.execute(statement) def init_db(self) -> None: with self._cursor() as (_connection, cursor): if self.is_mysql: self._create_tables_mysql(cursor) else: self._create_tables_sqlite(cursor) self._ensure_column( "users", "refresh_interval_seconds", f"INT NOT NULL DEFAULT {DEFAULT_REFRESH_INTERVAL_SECONDS}", ) self._ensure_column("tasks", "total_attempts", "INT NOT NULL DEFAULT 0") self._ensure_column("tasks", "total_errors", "INT NOT NULL DEFAULT 0") if self.get_setting("parallel_limit") is None: self.set_setting("parallel_limit", str(self.default_parallel_limit)) def get_setting(self, key: str) -> str | None: query = f"SELECT value FROM app_settings WHERE `key` = {self._placeholder('key')}" with self._cursor() as (_connection, cursor): cursor.execute(query, {"key": key}) row = cursor.fetchone() return None if row is None else str(dict(row)["value"]) def set_setting(self, key: str, value: str) -> None: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT value FROM app_settings WHERE `key` = {self._placeholder('key')}", {"key": key}, ) existing = cursor.fetchone() if existing is None: cursor.execute( f"INSERT INTO app_settings (`key`, value, updated_at) VALUES ({self._placeholder('key')}, {self._placeholder('value')}, {self._placeholder('updated_at')})", {"key": key, "value": value, "updated_at": now}, ) else: cursor.execute( f"UPDATE app_settings SET value = {self._placeholder('value')}, updated_at = {self._placeholder('updated_at')} WHERE `key` = {self._placeholder('key')}", {"key": key, "value": value, "updated_at": now}, ) def get_parallel_limit(self) -> int: raw_value = self.get_setting("parallel_limit") if raw_value is None: return self.default_parallel_limit try: return max(1, int(raw_value)) except ValueError: return self.default_parallel_limit def set_parallel_limit(self, limit: int) -> None: self.set_setting("parallel_limit", str(max(1, limit))) def create_user( self, student_id: str, password_encrypted: str, display_name: str = "", *, refresh_interval_seconds: int = DEFAULT_REFRESH_INTERVAL_SECONDS, ) -> int: now = utc_now() refresh_interval = clamp_refresh_interval_seconds(refresh_interval_seconds) with self._cursor() as (_connection, cursor): cursor.execute( f"INSERT INTO users (student_id, password_encrypted, display_name, is_active, refresh_interval_seconds, created_at, updated_at) VALUES ({self._placeholder('student_id')}, {self._placeholder('password_encrypted')}, {self._placeholder('display_name')}, {self._placeholder('is_active')}, {self._placeholder('refresh_interval_seconds')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", { "student_id": student_id.strip(), "password_encrypted": password_encrypted, "display_name": display_name.strip(), "is_active": 1, "refresh_interval_seconds": refresh_interval, "created_at": now, "updated_at": now, }, ) return int(cursor.lastrowid) def register_user_with_code( self, registration_code: str, student_id: str, password_encrypted: str, display_name: str = "", *, refresh_interval_seconds: int = DEFAULT_REFRESH_INTERVAL_SECONDS, ) -> int: now = utc_now() normalized_code = normalize_registration_code(registration_code) normalized_student_id = str(student_id or "").strip() refresh_interval = clamp_refresh_interval_seconds(refresh_interval_seconds) with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT * FROM registration_codes WHERE code = {self._placeholder('code')}", {"code": normalized_code}, ) code_row = cursor.fetchone() if code_row is None: raise ValueError("注册码不存在。") code_payload = dict(code_row) if not bool(code_payload.get("is_active", 0)): raise ValueError("注册码已停用。") if int(code_payload.get("used_count", 0)) >= int(code_payload.get("max_uses", 1)): raise ValueError("注册码已使用完毕。") cursor.execute( f"SELECT id FROM users WHERE student_id = {self._placeholder('student_id')}", {"student_id": normalized_student_id}, ) if cursor.fetchone() is not None: raise ValueError("该学号已经注册。") cursor.execute( f"INSERT INTO users (student_id, password_encrypted, display_name, is_active, refresh_interval_seconds, created_at, updated_at) VALUES ({self._placeholder('student_id')}, {self._placeholder('password_encrypted')}, {self._placeholder('display_name')}, {self._placeholder('is_active')}, {self._placeholder('refresh_interval_seconds')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", { "student_id": normalized_student_id, "password_encrypted": password_encrypted, "display_name": display_name.strip(), "is_active": 1, "refresh_interval_seconds": refresh_interval, "created_at": now, "updated_at": now, }, ) user_id = int(cursor.lastrowid) used_count = int(code_payload.get("used_count", 0)) + 1 max_uses = int(code_payload.get("max_uses", 1)) cursor.execute( f"UPDATE registration_codes SET used_count = {self._placeholder('used_count')}, used_by_user_id = {self._placeholder('used_by_user_id')}, used_at = {self._placeholder('used_at')}, is_active = {self._placeholder('is_active')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('id')}", { "id": int(code_payload["id"]), "used_count": used_count, "used_by_user_id": user_id, "used_at": now, "is_active": 0 if used_count >= max_uses else 1, "updated_at": now, }, ) return user_id def update_user( self, user_id: int, *, password_encrypted: str | None = None, display_name: str | None = None, is_active: bool | None = None, refresh_interval_seconds: int | None = None, ) -> None: assignments: list[str] = [] values: dict[str, Any] = {"user_id": user_id, "updated_at": utc_now()} if password_encrypted is not None: assignments.append(f"password_encrypted = {self._placeholder('password_encrypted')}") values["password_encrypted"] = password_encrypted if display_name is not None: assignments.append(f"display_name = {self._placeholder('display_name')}") values["display_name"] = display_name.strip() if is_active is not None: assignments.append(f"is_active = {self._placeholder('is_active')}") values["is_active"] = 1 if is_active else 0 if refresh_interval_seconds is not None: assignments.append(f"refresh_interval_seconds = {self._placeholder('refresh_interval_seconds')}") values["refresh_interval_seconds"] = clamp_refresh_interval_seconds(refresh_interval_seconds) if not assignments: return assignments.append(f"updated_at = {self._placeholder('updated_at')}") with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE users SET {', '.join(assignments)} WHERE id = {self._placeholder('user_id')}", values, ) def delete_user(self, user_id: int) -> None: with self._cursor() as (_connection, cursor): cursor.execute( f"DELETE FROM users WHERE id = {self._placeholder('user_id')}", {"user_id": user_id}, ) def toggle_user_active(self, user_id: int) -> dict[str, Any] | None: user = self.get_user(user_id) if not user: return None self.update_user(user_id, is_active=not bool(user["is_active"])) return self.get_user(user_id) def get_user(self, user_id: int) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT * FROM users WHERE id = {self._placeholder('user_id')}", {"user_id": user_id}, ) row = cursor.fetchone() return None if row is None else dict(row) def get_user_by_student_id(self, student_id: str) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT * FROM users WHERE student_id = {self._placeholder('student_id')}", {"student_id": student_id.strip()}, ) row = cursor.fetchone() return None if row is None else dict(row) def list_users(self) -> list[dict[str, Any]]: query = f""" SELECT u.*, (SELECT COUNT(*) FROM course_targets c WHERE c.user_id = u.id) AS course_count, (SELECT t.status FROM tasks t WHERE t.user_id = u.id ORDER BY t.created_at DESC LIMIT 1) AS latest_task_status, (SELECT t.updated_at FROM tasks t WHERE t.user_id = u.id ORDER BY t.created_at DESC LIMIT 1) AS latest_task_updated_at FROM users u ORDER BY u.student_id ASC """ with self._cursor() as (_connection, cursor): cursor.execute(query) return self._rows_to_dicts(cursor.fetchall()) def add_course(self, user_id: int, category: str, course_id: str, course_index: str) -> int | None: now = utc_now() normalized_category = "plan" if category == "plan" else "free" normalized_course_id = self._normalize_course_identity(course_id) normalized_course_index = self._normalize_course_identity(course_index) with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT id FROM course_targets WHERE user_id = {self._placeholder('user_id')} AND category = {self._placeholder('category')} AND course_id = {self._placeholder('course_id')} AND course_index = {self._placeholder('course_index')}", { "user_id": user_id, "category": normalized_category, "course_id": normalized_course_id, "course_index": normalized_course_index, }, ) if cursor.fetchone() is not None: return None cursor.execute( f"INSERT INTO course_targets (user_id, category, course_id, course_index, created_at) VALUES ({self._placeholder('user_id')}, {self._placeholder('category')}, {self._placeholder('course_id')}, {self._placeholder('course_index')}, {self._placeholder('created_at')})", { "user_id": user_id, "category": normalized_category, "course_id": normalized_course_id, "course_index": normalized_course_index, "created_at": now, }, ) return int(cursor.lastrowid) def delete_course(self, course_target_id: int) -> None: with self._cursor() as (_connection, cursor): cursor.execute( f"DELETE FROM course_targets WHERE id = {self._placeholder('course_target_id')}", {"course_target_id": course_target_id}, ) def remove_course_by_identity(self, user_id: int, category: str, course_id: str, course_index: str) -> None: with self._cursor() as (_connection, cursor): cursor.execute( f"DELETE FROM course_targets WHERE user_id = {self._placeholder('user_id')} AND category = {self._placeholder('category')} AND course_id = {self._placeholder('course_id')} AND course_index = {self._placeholder('course_index')}", { "user_id": user_id, "category": category, "course_id": self._normalize_course_identity(course_id), "course_index": self._normalize_course_identity(course_index), }, ) def list_courses_for_user(self, user_id: int) -> list[dict[str, Any]]: with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT * FROM course_targets WHERE user_id = {self._placeholder('user_id')} ORDER BY category ASC, course_id ASC, course_index ASC", {"user_id": user_id}, ) return self._rows_to_dicts(cursor.fetchall()) def create_admin(self, username: str, password_hash: str) -> int: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( f"INSERT INTO admins (username, password_hash, created_at, updated_at) VALUES ({self._placeholder('username')}, {self._placeholder('password_hash')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", { "username": username.strip(), "password_hash": password_hash, "created_at": now, "updated_at": now, }, ) return int(cursor.lastrowid) def get_admin_by_username(self, username: str) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT * FROM admins WHERE username = {self._placeholder('username')}", {"username": username.strip()}, ) row = cursor.fetchone() return None if row is None else dict(row) def list_admins(self) -> list[dict[str, Any]]: with self._cursor() as (_connection, cursor): cursor.execute("SELECT id, username, created_at, updated_at FROM admins ORDER BY username ASC") return self._rows_to_dicts(cursor.fetchall()) def find_active_task_for_user(self, user_id: int) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT * FROM tasks WHERE user_id = {self._placeholder('user_id')} AND status IN ('pending', 'running', 'cancel_requested') ORDER BY created_at DESC LIMIT 1", {"user_id": user_id}, ) row = cursor.fetchone() return None if row is None else dict(row) def create_task(self, user_id: int, requested_by: str, requested_by_role: str) -> int: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( f"INSERT INTO tasks (user_id, status, requested_by, requested_by_role, last_error, total_attempts, total_errors, created_at, updated_at) VALUES ({self._placeholder('user_id')}, {self._placeholder('status')}, {self._placeholder('requested_by')}, {self._placeholder('requested_by_role')}, {self._placeholder('last_error')}, {self._placeholder('total_attempts')}, {self._placeholder('total_errors')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", { "user_id": user_id, "status": "pending", "requested_by": requested_by, "requested_by_role": requested_by_role, "last_error": "", "total_attempts": 0, "total_errors": 0, "created_at": now, "updated_at": now, }, ) return int(cursor.lastrowid) def get_task(self, task_id: int) -> dict[str, Any] | None: query = f""" SELECT t.*, u.student_id, u.display_name, u.refresh_interval_seconds FROM tasks t JOIN users u ON u.id = t.user_id WHERE t.id = {self._placeholder('task_id')} """ with self._cursor() as (_connection, cursor): cursor.execute(query, {"task_id": task_id}) row = cursor.fetchone() return None if row is None else dict(row) def get_latest_task_for_user(self, user_id: int) -> dict[str, Any] | None: query = f""" SELECT t.*, u.student_id, u.display_name, u.refresh_interval_seconds FROM tasks t JOIN users u ON u.id = t.user_id WHERE t.user_id = {self._placeholder('user_id')} ORDER BY t.created_at DESC LIMIT 1 """ with self._cursor() as (_connection, cursor): cursor.execute(query, {"user_id": user_id}) row = cursor.fetchone() return None if row is None else dict(row) def list_pending_tasks(self, limit: int) -> list[dict[str, Any]]: query = f""" SELECT t.*, u.student_id, u.display_name, u.password_encrypted, u.is_active, u.refresh_interval_seconds FROM tasks t JOIN users u ON u.id = t.user_id WHERE t.status = 'pending' ORDER BY t.created_at ASC LIMIT {self._placeholder('limit')} """ with self._cursor() as (_connection, cursor): cursor.execute(query, {"limit": int(limit)}) return self._rows_to_dicts(cursor.fetchall()) def list_recent_tasks(self, limit: int = 20) -> list[dict[str, Any]]: query = f""" SELECT t.*, u.student_id, u.display_name, u.refresh_interval_seconds FROM tasks t JOIN users u ON u.id = t.user_id ORDER BY t.created_at DESC LIMIT {self._placeholder('limit')} """ with self._cursor() as (_connection, cursor): cursor.execute(query, {"limit": int(limit)}) return self._rows_to_dicts(cursor.fetchall()) def mark_task_running(self, task_id: int) -> None: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE tasks SET status = {self._placeholder('status')}, started_at = COALESCE(started_at, {self._placeholder('started_at')}), updated_at = {self._placeholder('updated_at')}, last_error = {self._placeholder('last_error')} WHERE id = {self._placeholder('task_id')}", { "status": "running", "started_at": now, "updated_at": now, "last_error": "", "task_id": task_id, }, ) def finish_task(self, task_id: int, status: str, last_error: str = "") -> None: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE tasks SET status = {self._placeholder('status')}, finished_at = {self._placeholder('finished_at')}, updated_at = {self._placeholder('updated_at')}, last_error = {self._placeholder('last_error')} WHERE id = {self._placeholder('task_id')}", { "status": status, "finished_at": now, "updated_at": now, "last_error": last_error, "task_id": task_id, }, ) def update_task_status(self, task_id: int, status: str, last_error: str = "") -> None: with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE tasks SET status = {self._placeholder('status')}, updated_at = {self._placeholder('updated_at')}, last_error = {self._placeholder('last_error')} WHERE id = {self._placeholder('task_id')}", { "status": status, "updated_at": utc_now(), "last_error": last_error, "task_id": task_id, }, ) def increment_task_attempts(self, task_id: int, delta: int = 1) -> None: increment = max(0, int(delta)) if increment <= 0: return with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE tasks SET total_attempts = total_attempts + {self._placeholder('delta')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('task_id')}", {"delta": increment, "updated_at": utc_now(), "task_id": task_id}, ) def increment_task_errors(self, task_id: int, delta: int = 1) -> None: increment = max(0, int(delta)) if increment <= 0: return with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE tasks SET total_errors = total_errors + {self._placeholder('delta')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('task_id')}", {"delta": increment, "updated_at": utc_now(), "task_id": task_id}, ) def request_task_stop(self, task_id: int) -> bool: task = self.get_task(task_id) if not task or task["status"] not in ACTIVE_TASK_STATUSES: return False with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE tasks SET status = {self._placeholder('status')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('task_id')}", {"status": "cancel_requested", "updated_at": utc_now(), "task_id": task_id}, ) return True def reset_inflight_tasks(self) -> None: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE tasks SET status = {self._placeholder('status')}, finished_at = COALESCE(finished_at, {self._placeholder('finished_at')}), updated_at = {self._placeholder('updated_at')} WHERE status IN ('pending', 'running', 'cancel_requested')", {"status": "stopped", "finished_at": now, "updated_at": now}, ) def add_log(self, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> int: now = utc_now() with self._cursor() as (_connection, cursor): cursor.execute( f"INSERT INTO logs (task_id, user_id, scope, level, message, created_at) VALUES ({self._placeholder('task_id')}, {self._placeholder('user_id')}, {self._placeholder('scope')}, {self._placeholder('level')}, {self._placeholder('message')}, {self._placeholder('created_at')})", { "task_id": task_id, "user_id": user_id, "scope": scope, "level": level.upper(), "message": message, "created_at": now, }, ) return int(cursor.lastrowid) def list_recent_logs(self, *, user_id: int | None = None, limit: int = 120) -> list[dict[str, Any]]: if user_id is None: query = f""" SELECT l.*, u.student_id, u.display_name FROM logs l LEFT JOIN users u ON u.id = l.user_id ORDER BY l.id DESC LIMIT {self._placeholder('limit')} """ params = {"limit": int(limit)} else: query = f""" SELECT l.*, u.student_id, u.display_name FROM logs l LEFT JOIN users u ON u.id = l.user_id WHERE l.user_id = {self._placeholder('user_id')} ORDER BY l.id DESC LIMIT {self._placeholder('limit')} """ params = {"user_id": user_id, "limit": int(limit)} with self._cursor() as (_connection, cursor): cursor.execute(query, params) rows = cursor.fetchall() return list(reversed(self._rows_to_dicts(rows))) def list_logs_after(self, after_id: int, *, user_id: int | None = None, limit: int = 100) -> list[dict[str, Any]]: if user_id is None: query = f""" SELECT l.*, u.student_id, u.display_name FROM logs l LEFT JOIN users u ON u.id = l.user_id WHERE l.id > {self._placeholder('after_id')} ORDER BY l.id ASC LIMIT {self._placeholder('limit')} """ params = {"after_id": after_id, "limit": int(limit)} else: query = f""" SELECT l.*, u.student_id, u.display_name FROM logs l LEFT JOIN users u ON u.id = l.user_id WHERE l.id > {self._placeholder('after_id')} AND l.user_id = {self._placeholder('user_id')} ORDER BY l.id ASC LIMIT {self._placeholder('limit')} """ params = {"after_id": after_id, "user_id": user_id, "limit": int(limit)} with self._cursor() as (_connection, cursor): cursor.execute(query, params) return self._rows_to_dicts(cursor.fetchall()) def get_admin_stats(self) -> dict[str, int]: with self._cursor() as (_connection, cursor): cursor.execute("SELECT COUNT(*) AS total FROM users") users_count = int(dict(cursor.fetchone())["total"]) cursor.execute("SELECT COUNT(*) AS total FROM course_targets") courses_count = int(dict(cursor.fetchone())["total"]) cursor.execute("SELECT COUNT(*) AS total FROM admins") admins_count = int(dict(cursor.fetchone())["total"]) cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'running'") running_count = int(dict(cursor.fetchone())["total"]) cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'pending'") pending_count = int(dict(cursor.fetchone())["total"]) cursor.execute("SELECT COUNT(*) AS total FROM registration_codes") registration_code_count = int(dict(cursor.fetchone())["total"]) cursor.execute("SELECT COUNT(*) AS total FROM user_schedules WHERE is_enabled = 1") active_schedule_count = int(dict(cursor.fetchone())["total"]) return { "users_count": users_count, "courses_count": courses_count, "admins_count": admins_count + 1, "running_count": running_count, "pending_count": pending_count, "registration_code_count": registration_code_count, "active_schedule_count": active_schedule_count, } def get_user_schedule(self, user_id: int) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT * FROM user_schedules WHERE user_id = {self._placeholder('user_id')}", {"user_id": user_id}, ) row = cursor.fetchone() return None if row is None else dict(row) def upsert_user_schedule( self, user_id: int, *, is_enabled: bool, start_date: str | None, end_date: str | None, daily_start_time: str | None, daily_stop_time: str | None, ) -> None: now = utc_now() current = self.get_user_schedule(user_id) payload = { "user_id": user_id, "is_enabled": 1 if is_enabled else 0, "start_date": start_date, "end_date": end_date, "daily_start_time": daily_start_time, "daily_stop_time": daily_stop_time, "last_auto_start_on": "", "last_auto_stop_on": "", "created_at": now, "updated_at": now, "id": int(current["id"]) if current else None, } with self._cursor() as (_connection, cursor): if current is None: cursor.execute( f"INSERT INTO user_schedules (user_id, is_enabled, start_date, end_date, daily_start_time, daily_stop_time, last_auto_start_on, last_auto_stop_on, created_at, updated_at) VALUES ({self._placeholder('user_id')}, {self._placeholder('is_enabled')}, {self._placeholder('start_date')}, {self._placeholder('end_date')}, {self._placeholder('daily_start_time')}, {self._placeholder('daily_stop_time')}, {self._placeholder('last_auto_start_on')}, {self._placeholder('last_auto_stop_on')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", payload, ) else: cursor.execute( f"UPDATE user_schedules SET is_enabled = {self._placeholder('is_enabled')}, start_date = {self._placeholder('start_date')}, end_date = {self._placeholder('end_date')}, daily_start_time = {self._placeholder('daily_start_time')}, daily_stop_time = {self._placeholder('daily_stop_time')}, last_auto_start_on = {self._placeholder('last_auto_start_on')}, last_auto_stop_on = {self._placeholder('last_auto_stop_on')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('id')}", payload, ) def list_enabled_user_schedules(self) -> list[dict[str, Any]]: query = """ SELECT s.*, u.student_id, u.display_name, u.is_active AS user_is_active FROM user_schedules s JOIN users u ON u.id = s.user_id WHERE s.is_enabled = 1 ORDER BY s.user_id ASC """ with self._cursor() as (_connection, cursor): cursor.execute(query) return self._rows_to_dicts(cursor.fetchall()) def mark_schedule_auto_start(self, user_id: int, day_text: str) -> None: with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE user_schedules SET last_auto_start_on = {self._placeholder('day_text')}, updated_at = {self._placeholder('updated_at')} WHERE user_id = {self._placeholder('user_id')}", {"day_text": day_text, "updated_at": utc_now(), "user_id": user_id}, ) def mark_schedule_auto_stop(self, user_id: int, day_text: str) -> None: with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE user_schedules SET last_auto_stop_on = {self._placeholder('day_text')}, updated_at = {self._placeholder('updated_at')} WHERE user_id = {self._placeholder('user_id')}", {"day_text": day_text, "updated_at": utc_now(), "user_id": user_id}, ) def create_registration_code( self, *, created_by: str, note: str = "", max_uses: int = DEFAULT_REGISTRATION_CODE_MAX_USES, ) -> dict[str, Any]: normalized_max_uses = max(1, int(max_uses or DEFAULT_REGISTRATION_CODE_MAX_USES)) now = utc_now() for _ in range(24): code = "SACC-" + "".join(secrets.choice(REGISTRATION_CODE_ALPHABET) for _ in range(8)) if self.get_registration_code_by_code(code) is not None: continue with self._cursor() as (_connection, cursor): cursor.execute( f"INSERT INTO registration_codes (code, note, is_active, max_uses, used_count, created_by, used_by_user_id, used_at, created_at, updated_at) VALUES ({self._placeholder('code')}, {self._placeholder('note')}, {self._placeholder('is_active')}, {self._placeholder('max_uses')}, {self._placeholder('used_count')}, {self._placeholder('created_by')}, {self._placeholder('used_by_user_id')}, {self._placeholder('used_at')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", { "code": code, "note": note.strip(), "is_active": 1, "max_uses": normalized_max_uses, "used_count": 0, "created_by": created_by, "used_by_user_id": None, "used_at": None, "created_at": now, "updated_at": now, }, ) return self.get_registration_code_by_id(int(cursor.lastrowid)) or {"code": code} raise RuntimeError("生成注册码失败,请重试。") def get_registration_code_by_code(self, code: str) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT * FROM registration_codes WHERE code = {self._placeholder('code')}", {"code": normalize_registration_code(code)}, ) row = cursor.fetchone() return None if row is None else dict(row) def get_registration_code_by_id(self, registration_code_id: int) -> dict[str, Any] | None: with self._cursor() as (_connection, cursor): cursor.execute( f"SELECT * FROM registration_codes WHERE id = {self._placeholder('registration_code_id')}", {"registration_code_id": registration_code_id}, ) row = cursor.fetchone() return None if row is None else dict(row) def list_registration_codes(self, limit: int = 100) -> list[dict[str, Any]]: query = f""" SELECT rc.*, u.student_id AS used_by_student_id, u.display_name AS used_by_display_name FROM registration_codes rc LEFT JOIN users u ON u.id = rc.used_by_user_id ORDER BY rc.created_at DESC LIMIT {self._placeholder('limit')} """ with self._cursor() as (_connection, cursor): cursor.execute(query, {"limit": int(limit)}) return self._rows_to_dicts(cursor.fetchall()) def toggle_registration_code_active(self, registration_code_id: int) -> dict[str, Any] | None: code = self.get_registration_code_by_id(registration_code_id) if code is None: return None with self._cursor() as (_connection, cursor): cursor.execute( f"UPDATE registration_codes SET is_active = {self._placeholder('is_active')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('registration_code_id')}", { "is_active": 0 if bool(code.get('is_active')) else 1, "updated_at": utc_now(), "registration_code_id": registration_code_id, }, ) return self.get_registration_code_by_id(registration_code_id)