| from __future__ import annotations |
|
|
| import secrets |
| import sqlite3 |
| import string |
| from contextlib import contextmanager |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Iterator |
| from urllib.parse import parse_qs, unquote, urlparse |
|
|
| try: |
| import pymysql |
| from pymysql.cursors import DictCursor as MySQLDictCursor |
| except ImportError: |
| pymysql = None |
| MySQLDictCursor = None |
|
|
|
|
| ACTIVE_TASK_STATUSES = {"pending", "running", "cancel_requested"} |
| DEFAULT_REFRESH_INTERVAL_SECONDS = 10 |
| MIN_REFRESH_INTERVAL_SECONDS = 1 |
| MAX_REFRESH_INTERVAL_SECONDS = 120 |
| DEFAULT_REGISTRATION_CODE_MAX_USES = 1 |
| REGISTRATION_CODE_ALPHABET = string.ascii_uppercase + string.digits |
|
|
|
|
| def utc_now() -> str: |
| return datetime.now(timezone.utc).isoformat(timespec="seconds") |
|
|
|
|
| def clamp_refresh_interval_seconds(value: int | str | None, default: int = DEFAULT_REFRESH_INTERVAL_SECONDS) -> int: |
| try: |
| normalized = int(value or default) |
| except (TypeError, ValueError): |
| normalized = default |
| return max(MIN_REFRESH_INTERVAL_SECONDS, min(MAX_REFRESH_INTERVAL_SECONDS, normalized)) |
|
|
|
|
| def normalize_registration_code(value: str) -> str: |
| return str(value or "").strip().upper() |
|
|
|
|
| class Database: |
| def __init__( |
| self, |
| path: Path | str, |
| default_parallel_limit: int = 4, |
| *, |
| mysql_ssl_ca_path: Path | str | None = None, |
| ) -> None: |
| self.default_parallel_limit = default_parallel_limit |
| self._mysql_ssl_ca_path = Path(mysql_ssl_ca_path).resolve() if mysql_ssl_ca_path else None |
| self._dialect = "sqlite" |
| self._sqlite_path: Path | None = None |
| self._mysql_options: dict[str, Any] = {} |
| self._mysql_database_name = "" |
|
|
| raw_path = str(path) |
| if isinstance(path, Path) or "://" not in raw_path: |
| self._sqlite_path = Path(path).resolve() |
| self._sqlite_path.parent.mkdir(parents=True, exist_ok=True) |
| self.path = self._sqlite_path |
| elif raw_path.startswith("sqlite:///"): |
| self._sqlite_path = Path(raw_path[len("sqlite:///") :]).resolve() |
| self._sqlite_path.parent.mkdir(parents=True, exist_ok=True) |
| self.path = self._sqlite_path |
| elif raw_path.startswith("mysql://") or raw_path.startswith("mysql+pymysql://"): |
| self._dialect = "mysql" |
| self._mysql_options = self._parse_mysql_options(raw_path) |
| self._mysql_database_name = str(self._mysql_options["database"]) |
| self.path = self._mysql_display_label() |
| else: |
| raise ValueError(f"Unsupported database target: {raw_path}") |
|
|
| @property |
| def is_mysql(self) -> bool: |
| return self._dialect == "mysql" |
|
|
| def _mysql_display_label(self) -> str: |
| if not self._mysql_options: |
| return "mysql://unknown" |
| return ( |
| f"mysql://{self._mysql_options.get('user', '')}@" |
| f"{self._mysql_options.get('host', '')}:{self._mysql_options.get('port', '')}/" |
| f"{self._mysql_options.get('database', '')}" |
| ) |
|
|
| @staticmethod |
| def _parse_mysql_options(raw_url: str) -> dict[str, Any]: |
| normalized_url = raw_url.replace("mysql+pymysql://", "mysql://", 1) |
| parsed = urlparse(normalized_url) |
| return { |
| "host": parsed.hostname or "localhost", |
| "port": parsed.port or 3306, |
| "user": unquote(parsed.username or ""), |
| "password": unquote(parsed.password or ""), |
| "database": (parsed.path or "/").lstrip("/"), |
| "query": parse_qs(parsed.query), |
| } |
|
|
| def _connect(self): |
| if not self.is_mysql: |
| connection = sqlite3.connect(self._sqlite_path, timeout=30, check_same_thread=False) |
| connection.row_factory = sqlite3.Row |
| connection.execute("PRAGMA foreign_keys = ON") |
| connection.execute("PRAGMA journal_mode = WAL") |
| return connection |
|
|
| if pymysql is None or MySQLDictCursor is None: |
| raise RuntimeError("PyMySQL is required when DATABASE_URL/SQL_PASSWORD enables MySQL persistence.") |
|
|
| ssl_config: dict[str, str] | None = None |
| ssl_mode = str(self._mysql_options.get("query", {}).get("ssl-mode", [""])[0]).upper() |
| if ssl_mode == "REQUIRED" or self._mysql_ssl_ca_path is not None: |
| if self._mysql_ssl_ca_path is None or not self._mysql_ssl_ca_path.exists(): |
| raise RuntimeError("MySQL SSL is enabled but ca.pem was not found.") |
| ssl_config = {"ca": str(self._mysql_ssl_ca_path)} |
|
|
| return pymysql.connect( |
| host=self._mysql_options["host"], |
| port=int(self._mysql_options["port"]), |
| user=self._mysql_options["user"], |
| password=self._mysql_options["password"], |
| database=self._mysql_options["database"], |
| charset="utf8mb4", |
| cursorclass=MySQLDictCursor, |
| autocommit=False, |
| ssl=ssl_config, |
| connect_timeout=10, |
| read_timeout=30, |
| write_timeout=30, |
| ) |
|
|
| @contextmanager |
| def _cursor(self) -> Iterator[tuple[Any, Any]]: |
| connection = self._connect() |
| try: |
| cursor = connection.cursor() |
| yield connection, cursor |
| connection.commit() |
| finally: |
| connection.close() |
|
|
| def _placeholder(self, name: str) -> str: |
| return f":{name}" if not self.is_mysql else f"%({name})s" |
|
|
| @staticmethod |
| def _rows_to_dicts(rows: list[Any]) -> list[dict[str, Any]]: |
| return [dict(row) for row in rows] |
|
|
| @staticmethod |
| def _normalize_course_identity(value: str) -> str: |
| return str(value or "").strip().upper() |
|
|
| def _column_exists(self, cursor: Any, table_name: str, column_name: str) -> bool: |
| if not self.is_mysql: |
| rows = cursor.execute(f"PRAGMA table_info({table_name})").fetchall() |
| return any(str(row[1]) == column_name for row in rows) |
|
|
| query = ( |
| f"SELECT COUNT(*) AS total FROM information_schema.columns " |
| f"WHERE table_schema = {self._placeholder('schema_name')} " |
| f"AND table_name = {self._placeholder('table_name')} " |
| f"AND column_name = {self._placeholder('column_name')}" |
| ) |
| cursor.execute( |
| query, |
| { |
| "schema_name": self._mysql_database_name, |
| "table_name": table_name, |
| "column_name": column_name, |
| }, |
| ) |
| row = cursor.fetchone() |
| return int(dict(row).get("total", 0)) > 0 |
|
|
| def _ensure_column(self, table_name: str, column_name: str, definition: str) -> None: |
| with self._cursor() as (_connection, cursor): |
| if self._column_exists(cursor, table_name, column_name): |
| return |
| cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {definition}") |
|
|
| def _create_tables_sqlite(self, cursor: Any) -> None: |
| cursor.executescript( |
| """ |
| CREATE TABLE IF NOT EXISTS users ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| student_id TEXT NOT NULL UNIQUE, |
| password_encrypted TEXT NOT NULL, |
| display_name TEXT NOT NULL DEFAULT '', |
| is_active INTEGER NOT NULL DEFAULT 1, |
| refresh_interval_seconds INTEGER NOT NULL DEFAULT 10, |
| created_at TEXT NOT NULL, |
| updated_at TEXT NOT NULL |
| ); |
| CREATE TABLE IF NOT EXISTS course_targets ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| user_id INTEGER NOT NULL, |
| category TEXT NOT NULL DEFAULT 'free', |
| course_id TEXT NOT NULL, |
| course_index TEXT NOT NULL, |
| created_at TEXT NOT NULL, |
| UNIQUE(user_id, category, course_id, course_index), |
| FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE |
| ); |
| CREATE TABLE IF NOT EXISTS admins ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| username TEXT NOT NULL UNIQUE, |
| password_hash TEXT NOT NULL, |
| created_at TEXT NOT NULL, |
| updated_at TEXT NOT NULL |
| ); |
| CREATE TABLE IF NOT EXISTS app_settings ( |
| `key` TEXT PRIMARY KEY, |
| value TEXT NOT NULL, |
| updated_at TEXT NOT NULL |
| ); |
| CREATE TABLE IF NOT EXISTS tasks ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| user_id INTEGER NOT NULL, |
| status TEXT NOT NULL, |
| requested_by TEXT NOT NULL, |
| requested_by_role TEXT NOT NULL, |
| last_error TEXT NOT NULL DEFAULT '', |
| total_attempts INTEGER NOT NULL DEFAULT 0, |
| total_errors INTEGER NOT NULL DEFAULT 0, |
| created_at TEXT NOT NULL, |
| started_at TEXT, |
| finished_at TEXT, |
| updated_at TEXT NOT NULL, |
| FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE |
| ); |
| CREATE TABLE IF NOT EXISTS logs ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| task_id INTEGER, |
| user_id INTEGER, |
| scope TEXT NOT NULL, |
| level TEXT NOT NULL, |
| message TEXT NOT NULL, |
| created_at TEXT NOT NULL, |
| FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE, |
| FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE |
| ); |
| CREATE TABLE IF NOT EXISTS user_schedules ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| user_id INTEGER NOT NULL UNIQUE, |
| is_enabled INTEGER NOT NULL DEFAULT 0, |
| start_date TEXT, |
| end_date TEXT, |
| daily_start_time TEXT, |
| daily_stop_time TEXT, |
| last_auto_start_on TEXT NOT NULL DEFAULT '', |
| last_auto_stop_on TEXT NOT NULL DEFAULT '', |
| created_at TEXT NOT NULL, |
| updated_at TEXT NOT NULL, |
| FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE |
| ); |
| CREATE TABLE IF NOT EXISTS registration_codes ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| code TEXT NOT NULL UNIQUE, |
| note TEXT NOT NULL DEFAULT '', |
| is_active INTEGER NOT NULL DEFAULT 1, |
| max_uses INTEGER NOT NULL DEFAULT 1, |
| used_count INTEGER NOT NULL DEFAULT 0, |
| created_by TEXT NOT NULL DEFAULT '', |
| used_by_user_id INTEGER, |
| used_at TEXT, |
| created_at TEXT NOT NULL, |
| updated_at TEXT NOT NULL, |
| FOREIGN KEY(used_by_user_id) REFERENCES users(id) ON DELETE SET NULL |
| ); |
| """ |
| )
|
| def _create_tables_mysql(self, cursor: Any) -> None: |
| statements = [ |
| """ |
| CREATE TABLE IF NOT EXISTS users ( |
| id BIGINT PRIMARY KEY AUTO_INCREMENT, |
| student_id VARCHAR(32) NOT NULL UNIQUE, |
| password_encrypted TEXT NOT NULL, |
| display_name VARCHAR(255) NOT NULL DEFAULT '', |
| is_active TINYINT(1) NOT NULL DEFAULT 1, |
| refresh_interval_seconds INT NOT NULL DEFAULT 10, |
| created_at VARCHAR(32) NOT NULL, |
| updated_at VARCHAR(32) NOT NULL |
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 |
| """, |
| """ |
| CREATE TABLE IF NOT EXISTS course_targets ( |
| id BIGINT PRIMARY KEY AUTO_INCREMENT, |
| user_id BIGINT NOT NULL, |
| category VARCHAR(32) NOT NULL DEFAULT 'free', |
| course_id VARCHAR(64) NOT NULL, |
| course_index VARCHAR(32) NOT NULL, |
| created_at VARCHAR(32) NOT NULL, |
| UNIQUE KEY uq_course_targets_identity (user_id, category, course_id, course_index), |
| CONSTRAINT fk_course_targets_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE |
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 |
| """, |
| """ |
| CREATE TABLE IF NOT EXISTS admins ( |
| id BIGINT PRIMARY KEY AUTO_INCREMENT, |
| username VARCHAR(255) NOT NULL UNIQUE, |
| password_hash TEXT NOT NULL, |
| created_at VARCHAR(32) NOT NULL, |
| updated_at VARCHAR(32) NOT NULL |
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 |
| """, |
| """ |
| CREATE TABLE IF NOT EXISTS app_settings ( |
| `key` VARCHAR(191) PRIMARY KEY, |
| value TEXT NOT NULL, |
| updated_at VARCHAR(32) NOT NULL |
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 |
| """, |
| """ |
| CREATE TABLE IF NOT EXISTS tasks ( |
| id BIGINT PRIMARY KEY AUTO_INCREMENT, |
| user_id BIGINT NOT NULL, |
| status VARCHAR(32) NOT NULL, |
| requested_by VARCHAR(255) NOT NULL, |
| requested_by_role VARCHAR(32) NOT NULL, |
| last_error TEXT NOT NULL, |
| total_attempts INT NOT NULL DEFAULT 0, |
| total_errors INT NOT NULL DEFAULT 0, |
| created_at VARCHAR(32) NOT NULL, |
| started_at VARCHAR(32), |
| finished_at VARCHAR(32), |
| updated_at VARCHAR(32) NOT NULL, |
| CONSTRAINT fk_tasks_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE |
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 |
| """, |
| """ |
| CREATE TABLE IF NOT EXISTS logs ( |
| id BIGINT PRIMARY KEY AUTO_INCREMENT, |
| task_id BIGINT NULL, |
| user_id BIGINT NULL, |
| scope VARCHAR(64) NOT NULL, |
| level VARCHAR(32) NOT NULL, |
| message TEXT NOT NULL, |
| created_at VARCHAR(32) NOT NULL, |
| CONSTRAINT fk_logs_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE, |
| CONSTRAINT fk_logs_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE |
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 |
| """, |
| """ |
| CREATE TABLE IF NOT EXISTS user_schedules ( |
| id BIGINT PRIMARY KEY AUTO_INCREMENT, |
| user_id BIGINT NOT NULL UNIQUE, |
| is_enabled TINYINT(1) NOT NULL DEFAULT 0, |
| start_date VARCHAR(10) NULL, |
| end_date VARCHAR(10) NULL, |
| daily_start_time VARCHAR(5) NULL, |
| daily_stop_time VARCHAR(5) NULL, |
| last_auto_start_on VARCHAR(10) NOT NULL DEFAULT '', |
| last_auto_stop_on VARCHAR(10) NOT NULL DEFAULT '', |
| created_at VARCHAR(32) NOT NULL, |
| updated_at VARCHAR(32) NOT NULL, |
| CONSTRAINT fk_user_schedules_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE |
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 |
| """, |
| """ |
| CREATE TABLE IF NOT EXISTS registration_codes ( |
| id BIGINT PRIMARY KEY AUTO_INCREMENT, |
| code VARCHAR(64) NOT NULL UNIQUE, |
| note TEXT NOT NULL, |
| is_active TINYINT(1) NOT NULL DEFAULT 1, |
| max_uses INT NOT NULL DEFAULT 1, |
| used_count INT NOT NULL DEFAULT 0, |
| created_by VARCHAR(255) NOT NULL DEFAULT '', |
| used_by_user_id BIGINT NULL, |
| used_at VARCHAR(32) NULL, |
| created_at VARCHAR(32) NOT NULL, |
| updated_at VARCHAR(32) NOT NULL, |
| CONSTRAINT fk_registration_codes_user FOREIGN KEY (used_by_user_id) REFERENCES users(id) ON DELETE SET NULL |
| ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 |
| """, |
| ] |
| for statement in statements: |
| cursor.execute(statement) |
|
|
| def init_db(self) -> None: |
| with self._cursor() as (_connection, cursor): |
| if self.is_mysql: |
| self._create_tables_mysql(cursor) |
| else: |
| self._create_tables_sqlite(cursor) |
|
|
| self._ensure_column( |
| "users", |
| "refresh_interval_seconds", |
| f"INT NOT NULL DEFAULT {DEFAULT_REFRESH_INTERVAL_SECONDS}", |
| ) |
| self._ensure_column("tasks", "total_attempts", "INT NOT NULL DEFAULT 0") |
| self._ensure_column("tasks", "total_errors", "INT NOT NULL DEFAULT 0") |
|
|
| if self.get_setting("parallel_limit") is None: |
| self.set_setting("parallel_limit", str(self.default_parallel_limit)) |
|
|
| def get_setting(self, key: str) -> str | None: |
| query = f"SELECT value FROM app_settings WHERE `key` = {self._placeholder('key')}" |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query, {"key": key}) |
| row = cursor.fetchone() |
| return None if row is None else str(dict(row)["value"]) |
|
|
| def set_setting(self, key: str, value: str) -> None: |
| now = utc_now() |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT value FROM app_settings WHERE `key` = {self._placeholder('key')}", |
| {"key": key}, |
| ) |
| existing = cursor.fetchone() |
| if existing is None: |
| cursor.execute( |
| f"INSERT INTO app_settings (`key`, value, updated_at) VALUES ({self._placeholder('key')}, {self._placeholder('value')}, {self._placeholder('updated_at')})", |
| {"key": key, "value": value, "updated_at": now}, |
| ) |
| else: |
| cursor.execute( |
| f"UPDATE app_settings SET value = {self._placeholder('value')}, updated_at = {self._placeholder('updated_at')} WHERE `key` = {self._placeholder('key')}", |
| {"key": key, "value": value, "updated_at": now}, |
| ) |
|
|
| def get_parallel_limit(self) -> int: |
| raw_value = self.get_setting("parallel_limit") |
| if raw_value is None: |
| return self.default_parallel_limit |
| try: |
| return max(1, int(raw_value)) |
| except ValueError: |
| return self.default_parallel_limit |
|
|
| def set_parallel_limit(self, limit: int) -> None: |
| self.set_setting("parallel_limit", str(max(1, limit))) |
|
|
| def create_user( |
| self, |
| student_id: str, |
| password_encrypted: str, |
| display_name: str = "", |
| *, |
| refresh_interval_seconds: int = DEFAULT_REFRESH_INTERVAL_SECONDS, |
| ) -> int: |
| now = utc_now() |
| refresh_interval = clamp_refresh_interval_seconds(refresh_interval_seconds) |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"INSERT INTO users (student_id, password_encrypted, display_name, is_active, refresh_interval_seconds, created_at, updated_at) VALUES ({self._placeholder('student_id')}, {self._placeholder('password_encrypted')}, {self._placeholder('display_name')}, {self._placeholder('is_active')}, {self._placeholder('refresh_interval_seconds')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", |
| { |
| "student_id": student_id.strip(), |
| "password_encrypted": password_encrypted, |
| "display_name": display_name.strip(), |
| "is_active": 1, |
| "refresh_interval_seconds": refresh_interval, |
| "created_at": now, |
| "updated_at": now, |
| }, |
| ) |
| return int(cursor.lastrowid) |
|
|
| def register_user_with_code( |
| self, |
| registration_code: str, |
| student_id: str, |
| password_encrypted: str, |
| display_name: str = "", |
| *, |
| refresh_interval_seconds: int = DEFAULT_REFRESH_INTERVAL_SECONDS, |
| ) -> int: |
| now = utc_now() |
| normalized_code = normalize_registration_code(registration_code) |
| normalized_student_id = str(student_id or "").strip() |
| refresh_interval = clamp_refresh_interval_seconds(refresh_interval_seconds) |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT * FROM registration_codes WHERE code = {self._placeholder('code')}", |
| {"code": normalized_code}, |
| ) |
| code_row = cursor.fetchone() |
| if code_row is None: |
| raise ValueError("注册码不存在。") |
| code_payload = dict(code_row) |
| if not bool(code_payload.get("is_active", 0)): |
| raise ValueError("注册码已停用。") |
| if int(code_payload.get("used_count", 0)) >= int(code_payload.get("max_uses", 1)): |
| raise ValueError("注册码已使用完毕。") |
| cursor.execute( |
| f"SELECT id FROM users WHERE student_id = {self._placeholder('student_id')}", |
| {"student_id": normalized_student_id}, |
| ) |
| if cursor.fetchone() is not None: |
| raise ValueError("该学号已经注册。") |
| cursor.execute( |
| f"INSERT INTO users (student_id, password_encrypted, display_name, is_active, refresh_interval_seconds, created_at, updated_at) VALUES ({self._placeholder('student_id')}, {self._placeholder('password_encrypted')}, {self._placeholder('display_name')}, {self._placeholder('is_active')}, {self._placeholder('refresh_interval_seconds')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", |
| { |
| "student_id": normalized_student_id, |
| "password_encrypted": password_encrypted, |
| "display_name": display_name.strip(), |
| "is_active": 1, |
| "refresh_interval_seconds": refresh_interval, |
| "created_at": now, |
| "updated_at": now, |
| }, |
| ) |
| user_id = int(cursor.lastrowid) |
| used_count = int(code_payload.get("used_count", 0)) + 1 |
| max_uses = int(code_payload.get("max_uses", 1)) |
| cursor.execute( |
| f"UPDATE registration_codes SET used_count = {self._placeholder('used_count')}, used_by_user_id = {self._placeholder('used_by_user_id')}, used_at = {self._placeholder('used_at')}, is_active = {self._placeholder('is_active')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('id')}", |
| { |
| "id": int(code_payload["id"]), |
| "used_count": used_count, |
| "used_by_user_id": user_id, |
| "used_at": now, |
| "is_active": 0 if used_count >= max_uses else 1, |
| "updated_at": now, |
| }, |
| ) |
| return user_id
|
| def update_user( |
| self, |
| user_id: int, |
| *, |
| password_encrypted: str | None = None, |
| display_name: str | None = None, |
| is_active: bool | None = None, |
| refresh_interval_seconds: int | None = None, |
| ) -> None: |
| assignments: list[str] = [] |
| values: dict[str, Any] = {"user_id": user_id, "updated_at": utc_now()} |
| if password_encrypted is not None: |
| assignments.append(f"password_encrypted = {self._placeholder('password_encrypted')}") |
| values["password_encrypted"] = password_encrypted |
| if display_name is not None: |
| assignments.append(f"display_name = {self._placeholder('display_name')}") |
| values["display_name"] = display_name.strip() |
| if is_active is not None: |
| assignments.append(f"is_active = {self._placeholder('is_active')}") |
| values["is_active"] = 1 if is_active else 0 |
| if refresh_interval_seconds is not None: |
| assignments.append(f"refresh_interval_seconds = {self._placeholder('refresh_interval_seconds')}") |
| values["refresh_interval_seconds"] = clamp_refresh_interval_seconds(refresh_interval_seconds) |
| if not assignments: |
| return |
| assignments.append(f"updated_at = {self._placeholder('updated_at')}") |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE users SET {', '.join(assignments)} WHERE id = {self._placeholder('user_id')}", |
| values, |
| ) |
|
|
| def delete_user(self, user_id: int) -> None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"DELETE FROM users WHERE id = {self._placeholder('user_id')}", |
| {"user_id": user_id}, |
| ) |
|
|
| def toggle_user_active(self, user_id: int) -> dict[str, Any] | None: |
| user = self.get_user(user_id) |
| if not user: |
| return None |
| self.update_user(user_id, is_active=not bool(user["is_active"])) |
| return self.get_user(user_id) |
|
|
| def get_user(self, user_id: int) -> dict[str, Any] | None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT * FROM users WHERE id = {self._placeholder('user_id')}", |
| {"user_id": user_id}, |
| ) |
| row = cursor.fetchone() |
| return None if row is None else dict(row) |
|
|
| def get_user_by_student_id(self, student_id: str) -> dict[str, Any] | None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT * FROM users WHERE student_id = {self._placeholder('student_id')}", |
| {"student_id": student_id.strip()}, |
| ) |
| row = cursor.fetchone() |
| return None if row is None else dict(row) |
|
|
| def list_users(self) -> list[dict[str, Any]]: |
| query = f""" |
| SELECT |
| u.*, |
| (SELECT COUNT(*) FROM course_targets c WHERE c.user_id = u.id) AS course_count, |
| (SELECT t.status FROM tasks t WHERE t.user_id = u.id ORDER BY t.created_at DESC LIMIT 1) AS latest_task_status, |
| (SELECT t.updated_at FROM tasks t WHERE t.user_id = u.id ORDER BY t.created_at DESC LIMIT 1) AS latest_task_updated_at |
| FROM users u |
| ORDER BY u.student_id ASC |
| """ |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query) |
| return self._rows_to_dicts(cursor.fetchall()) |
|
|
| def add_course(self, user_id: int, category: str, course_id: str, course_index: str) -> int | None: |
| now = utc_now() |
| normalized_category = "plan" if category == "plan" else "free" |
| normalized_course_id = self._normalize_course_identity(course_id) |
| normalized_course_index = self._normalize_course_identity(course_index) |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT id FROM course_targets WHERE user_id = {self._placeholder('user_id')} AND category = {self._placeholder('category')} AND course_id = {self._placeholder('course_id')} AND course_index = {self._placeholder('course_index')}", |
| { |
| "user_id": user_id, |
| "category": normalized_category, |
| "course_id": normalized_course_id, |
| "course_index": normalized_course_index, |
| }, |
| ) |
| if cursor.fetchone() is not None: |
| return None |
| cursor.execute( |
| f"INSERT INTO course_targets (user_id, category, course_id, course_index, created_at) VALUES ({self._placeholder('user_id')}, {self._placeholder('category')}, {self._placeholder('course_id')}, {self._placeholder('course_index')}, {self._placeholder('created_at')})", |
| { |
| "user_id": user_id, |
| "category": normalized_category, |
| "course_id": normalized_course_id, |
| "course_index": normalized_course_index, |
| "created_at": now, |
| }, |
| ) |
| return int(cursor.lastrowid) |
|
|
| def delete_course(self, course_target_id: int) -> None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"DELETE FROM course_targets WHERE id = {self._placeholder('course_target_id')}", |
| {"course_target_id": course_target_id}, |
| ) |
|
|
| def remove_course_by_identity(self, user_id: int, category: str, course_id: str, course_index: str) -> None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"DELETE FROM course_targets WHERE user_id = {self._placeholder('user_id')} AND category = {self._placeholder('category')} AND course_id = {self._placeholder('course_id')} AND course_index = {self._placeholder('course_index')}", |
| { |
| "user_id": user_id, |
| "category": category, |
| "course_id": self._normalize_course_identity(course_id), |
| "course_index": self._normalize_course_identity(course_index), |
| }, |
| ) |
|
|
| def list_courses_for_user(self, user_id: int) -> list[dict[str, Any]]: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT * FROM course_targets WHERE user_id = {self._placeholder('user_id')} ORDER BY category ASC, course_id ASC, course_index ASC", |
| {"user_id": user_id}, |
| ) |
| return self._rows_to_dicts(cursor.fetchall()) |
|
|
| def create_admin(self, username: str, password_hash: str) -> int: |
| now = utc_now() |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"INSERT INTO admins (username, password_hash, created_at, updated_at) VALUES ({self._placeholder('username')}, {self._placeholder('password_hash')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", |
| { |
| "username": username.strip(), |
| "password_hash": password_hash, |
| "created_at": now, |
| "updated_at": now, |
| }, |
| ) |
| return int(cursor.lastrowid) |
|
|
| def get_admin_by_username(self, username: str) -> dict[str, Any] | None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT * FROM admins WHERE username = {self._placeholder('username')}", |
| {"username": username.strip()}, |
| ) |
| row = cursor.fetchone() |
| return None if row is None else dict(row) |
|
|
| def list_admins(self) -> list[dict[str, Any]]: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute("SELECT id, username, created_at, updated_at FROM admins ORDER BY username ASC") |
| return self._rows_to_dicts(cursor.fetchall()) |
|
|
| def find_active_task_for_user(self, user_id: int) -> dict[str, Any] | None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT * FROM tasks WHERE user_id = {self._placeholder('user_id')} AND status IN ('pending', 'running', 'cancel_requested') ORDER BY created_at DESC LIMIT 1", |
| {"user_id": user_id}, |
| ) |
| row = cursor.fetchone() |
| return None if row is None else dict(row) |
|
|
| def create_task(self, user_id: int, requested_by: str, requested_by_role: str) -> int: |
| now = utc_now() |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"INSERT INTO tasks (user_id, status, requested_by, requested_by_role, last_error, total_attempts, total_errors, created_at, updated_at) VALUES ({self._placeholder('user_id')}, {self._placeholder('status')}, {self._placeholder('requested_by')}, {self._placeholder('requested_by_role')}, {self._placeholder('last_error')}, {self._placeholder('total_attempts')}, {self._placeholder('total_errors')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", |
| { |
| "user_id": user_id, |
| "status": "pending", |
| "requested_by": requested_by, |
| "requested_by_role": requested_by_role, |
| "last_error": "", |
| "total_attempts": 0, |
| "total_errors": 0, |
| "created_at": now, |
| "updated_at": now, |
| }, |
| ) |
| return int(cursor.lastrowid) |
|
|
| def get_task(self, task_id: int) -> dict[str, Any] | None: |
| query = f""" |
| SELECT t.*, u.student_id, u.display_name, u.refresh_interval_seconds |
| FROM tasks t |
| JOIN users u ON u.id = t.user_id |
| WHERE t.id = {self._placeholder('task_id')} |
| """ |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query, {"task_id": task_id}) |
| row = cursor.fetchone() |
| return None if row is None else dict(row) |
|
|
| def get_latest_task_for_user(self, user_id: int) -> dict[str, Any] | None: |
| query = f""" |
| SELECT t.*, u.student_id, u.display_name, u.refresh_interval_seconds |
| FROM tasks t |
| JOIN users u ON u.id = t.user_id |
| WHERE t.user_id = {self._placeholder('user_id')} |
| ORDER BY t.created_at DESC |
| LIMIT 1 |
| """ |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query, {"user_id": user_id}) |
| row = cursor.fetchone() |
| return None if row is None else dict(row) |
|
|
| def list_pending_tasks(self, limit: int) -> list[dict[str, Any]]: |
| query = f""" |
| SELECT t.*, u.student_id, u.display_name, u.password_encrypted, u.is_active, u.refresh_interval_seconds |
| FROM tasks t |
| JOIN users u ON u.id = t.user_id |
| WHERE t.status = 'pending' |
| ORDER BY t.created_at ASC |
| LIMIT {self._placeholder('limit')} |
| """ |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query, {"limit": int(limit)}) |
| return self._rows_to_dicts(cursor.fetchall()) |
|
|
| def list_recent_tasks(self, limit: int = 20) -> list[dict[str, Any]]: |
| query = f""" |
| SELECT t.*, u.student_id, u.display_name, u.refresh_interval_seconds |
| FROM tasks t |
| JOIN users u ON u.id = t.user_id |
| ORDER BY t.created_at DESC |
| LIMIT {self._placeholder('limit')} |
| """ |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query, {"limit": int(limit)}) |
| return self._rows_to_dicts(cursor.fetchall())
|
| def mark_task_running(self, task_id: int) -> None: |
| now = utc_now() |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE tasks SET status = {self._placeholder('status')}, started_at = COALESCE(started_at, {self._placeholder('started_at')}), updated_at = {self._placeholder('updated_at')}, last_error = {self._placeholder('last_error')} WHERE id = {self._placeholder('task_id')}", |
| { |
| "status": "running", |
| "started_at": now, |
| "updated_at": now, |
| "last_error": "", |
| "task_id": task_id, |
| }, |
| ) |
|
|
| def finish_task(self, task_id: int, status: str, last_error: str = "") -> None: |
| now = utc_now() |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE tasks SET status = {self._placeholder('status')}, finished_at = {self._placeholder('finished_at')}, updated_at = {self._placeholder('updated_at')}, last_error = {self._placeholder('last_error')} WHERE id = {self._placeholder('task_id')}", |
| { |
| "status": status, |
| "finished_at": now, |
| "updated_at": now, |
| "last_error": last_error, |
| "task_id": task_id, |
| }, |
| ) |
|
|
| def update_task_status(self, task_id: int, status: str, last_error: str = "") -> None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE tasks SET status = {self._placeholder('status')}, updated_at = {self._placeholder('updated_at')}, last_error = {self._placeholder('last_error')} WHERE id = {self._placeholder('task_id')}", |
| { |
| "status": status, |
| "updated_at": utc_now(), |
| "last_error": last_error, |
| "task_id": task_id, |
| }, |
| ) |
|
|
| def increment_task_attempts(self, task_id: int, delta: int = 1) -> None: |
| increment = max(0, int(delta)) |
| if increment <= 0: |
| return |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE tasks SET total_attempts = total_attempts + {self._placeholder('delta')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('task_id')}", |
| {"delta": increment, "updated_at": utc_now(), "task_id": task_id}, |
| ) |
|
|
| def increment_task_errors(self, task_id: int, delta: int = 1) -> None: |
| increment = max(0, int(delta)) |
| if increment <= 0: |
| return |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE tasks SET total_errors = total_errors + {self._placeholder('delta')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('task_id')}", |
| {"delta": increment, "updated_at": utc_now(), "task_id": task_id}, |
| ) |
|
|
| def request_task_stop(self, task_id: int) -> bool: |
| task = self.get_task(task_id) |
| if not task or task["status"] not in ACTIVE_TASK_STATUSES: |
| return False |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE tasks SET status = {self._placeholder('status')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('task_id')}", |
| {"status": "cancel_requested", "updated_at": utc_now(), "task_id": task_id}, |
| ) |
| return True |
|
|
| def reset_inflight_tasks(self) -> None: |
| now = utc_now() |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE tasks SET status = {self._placeholder('status')}, finished_at = COALESCE(finished_at, {self._placeholder('finished_at')}), updated_at = {self._placeholder('updated_at')} WHERE status IN ('pending', 'running', 'cancel_requested')", |
| {"status": "stopped", "finished_at": now, "updated_at": now}, |
| ) |
|
|
| def add_log(self, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> int: |
| now = utc_now() |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"INSERT INTO logs (task_id, user_id, scope, level, message, created_at) VALUES ({self._placeholder('task_id')}, {self._placeholder('user_id')}, {self._placeholder('scope')}, {self._placeholder('level')}, {self._placeholder('message')}, {self._placeholder('created_at')})", |
| { |
| "task_id": task_id, |
| "user_id": user_id, |
| "scope": scope, |
| "level": level.upper(), |
| "message": message, |
| "created_at": now, |
| }, |
| ) |
| return int(cursor.lastrowid) |
|
|
| def list_recent_logs(self, *, user_id: int | None = None, limit: int = 120) -> list[dict[str, Any]]: |
| if user_id is None: |
| query = f""" |
| SELECT l.*, u.student_id, u.display_name |
| FROM logs l |
| LEFT JOIN users u ON u.id = l.user_id |
| ORDER BY l.id DESC |
| LIMIT {self._placeholder('limit')} |
| """ |
| params = {"limit": int(limit)} |
| else: |
| query = f""" |
| SELECT l.*, u.student_id, u.display_name |
| FROM logs l |
| LEFT JOIN users u ON u.id = l.user_id |
| WHERE l.user_id = {self._placeholder('user_id')} |
| ORDER BY l.id DESC |
| LIMIT {self._placeholder('limit')} |
| """ |
| params = {"user_id": user_id, "limit": int(limit)} |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query, params) |
| rows = cursor.fetchall() |
| return list(reversed(self._rows_to_dicts(rows))) |
|
|
| def list_logs_after(self, after_id: int, *, user_id: int | None = None, limit: int = 100) -> list[dict[str, Any]]: |
| if user_id is None: |
| query = f""" |
| SELECT l.*, u.student_id, u.display_name |
| FROM logs l |
| LEFT JOIN users u ON u.id = l.user_id |
| WHERE l.id > {self._placeholder('after_id')} |
| ORDER BY l.id ASC |
| LIMIT {self._placeholder('limit')} |
| """ |
| params = {"after_id": after_id, "limit": int(limit)} |
| else: |
| query = f""" |
| SELECT l.*, u.student_id, u.display_name |
| FROM logs l |
| LEFT JOIN users u ON u.id = l.user_id |
| WHERE l.id > {self._placeholder('after_id')} AND l.user_id = {self._placeholder('user_id')} |
| ORDER BY l.id ASC |
| LIMIT {self._placeholder('limit')} |
| """ |
| params = {"after_id": after_id, "user_id": user_id, "limit": int(limit)} |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query, params) |
| return self._rows_to_dicts(cursor.fetchall()) |
|
|
| def get_admin_stats(self) -> dict[str, int]: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute("SELECT COUNT(*) AS total FROM users") |
| users_count = int(dict(cursor.fetchone())["total"]) |
| cursor.execute("SELECT COUNT(*) AS total FROM course_targets") |
| courses_count = int(dict(cursor.fetchone())["total"]) |
| cursor.execute("SELECT COUNT(*) AS total FROM admins") |
| admins_count = int(dict(cursor.fetchone())["total"]) |
| cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'running'") |
| running_count = int(dict(cursor.fetchone())["total"]) |
| cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'pending'") |
| pending_count = int(dict(cursor.fetchone())["total"]) |
| cursor.execute("SELECT COUNT(*) AS total FROM registration_codes") |
| registration_code_count = int(dict(cursor.fetchone())["total"]) |
| cursor.execute("SELECT COUNT(*) AS total FROM user_schedules WHERE is_enabled = 1") |
| active_schedule_count = int(dict(cursor.fetchone())["total"]) |
| return { |
| "users_count": users_count, |
| "courses_count": courses_count, |
| "admins_count": admins_count + 1, |
| "running_count": running_count, |
| "pending_count": pending_count, |
| "registration_code_count": registration_code_count, |
| "active_schedule_count": active_schedule_count, |
| } |
|
|
| def get_user_schedule(self, user_id: int) -> dict[str, Any] | None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT * FROM user_schedules WHERE user_id = {self._placeholder('user_id')}", |
| {"user_id": user_id}, |
| ) |
| row = cursor.fetchone() |
| return None if row is None else dict(row) |
|
|
| def upsert_user_schedule( |
| self, |
| user_id: int, |
| *, |
| is_enabled: bool, |
| start_date: str | None, |
| end_date: str | None, |
| daily_start_time: str | None, |
| daily_stop_time: str | None, |
| ) -> None: |
| now = utc_now() |
| current = self.get_user_schedule(user_id) |
| payload = { |
| "user_id": user_id, |
| "is_enabled": 1 if is_enabled else 0, |
| "start_date": start_date, |
| "end_date": end_date, |
| "daily_start_time": daily_start_time, |
| "daily_stop_time": daily_stop_time, |
| "last_auto_start_on": "", |
| "last_auto_stop_on": "", |
| "created_at": now, |
| "updated_at": now, |
| "id": int(current["id"]) if current else None, |
| } |
| with self._cursor() as (_connection, cursor): |
| if current is None: |
| cursor.execute( |
| f"INSERT INTO user_schedules (user_id, is_enabled, start_date, end_date, daily_start_time, daily_stop_time, last_auto_start_on, last_auto_stop_on, created_at, updated_at) VALUES ({self._placeholder('user_id')}, {self._placeholder('is_enabled')}, {self._placeholder('start_date')}, {self._placeholder('end_date')}, {self._placeholder('daily_start_time')}, {self._placeholder('daily_stop_time')}, {self._placeholder('last_auto_start_on')}, {self._placeholder('last_auto_stop_on')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", |
| payload, |
| ) |
| else: |
| cursor.execute( |
| f"UPDATE user_schedules SET is_enabled = {self._placeholder('is_enabled')}, start_date = {self._placeholder('start_date')}, end_date = {self._placeholder('end_date')}, daily_start_time = {self._placeholder('daily_start_time')}, daily_stop_time = {self._placeholder('daily_stop_time')}, last_auto_start_on = {self._placeholder('last_auto_start_on')}, last_auto_stop_on = {self._placeholder('last_auto_stop_on')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('id')}", |
| payload, |
| ) |
|
|
| def list_enabled_user_schedules(self) -> list[dict[str, Any]]: |
| query = """ |
| SELECT s.*, u.student_id, u.display_name, u.is_active AS user_is_active |
| FROM user_schedules s |
| JOIN users u ON u.id = s.user_id |
| WHERE s.is_enabled = 1 |
| ORDER BY s.user_id ASC |
| """ |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query) |
| return self._rows_to_dicts(cursor.fetchall()) |
|
|
| def mark_schedule_auto_start(self, user_id: int, day_text: str) -> None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE user_schedules SET last_auto_start_on = {self._placeholder('day_text')}, updated_at = {self._placeholder('updated_at')} WHERE user_id = {self._placeholder('user_id')}", |
| {"day_text": day_text, "updated_at": utc_now(), "user_id": user_id}, |
| ) |
|
|
| def mark_schedule_auto_stop(self, user_id: int, day_text: str) -> None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE user_schedules SET last_auto_stop_on = {self._placeholder('day_text')}, updated_at = {self._placeholder('updated_at')} WHERE user_id = {self._placeholder('user_id')}", |
| {"day_text": day_text, "updated_at": utc_now(), "user_id": user_id}, |
| )
|
| def create_registration_code( |
| self, |
| *, |
| created_by: str, |
| note: str = "", |
| max_uses: int = DEFAULT_REGISTRATION_CODE_MAX_USES, |
| ) -> dict[str, Any]: |
| normalized_max_uses = max(1, int(max_uses or DEFAULT_REGISTRATION_CODE_MAX_USES)) |
| now = utc_now() |
| for _ in range(24): |
| code = "SACC-" + "".join(secrets.choice(REGISTRATION_CODE_ALPHABET) for _ in range(8)) |
| if self.get_registration_code_by_code(code) is not None: |
| continue |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"INSERT INTO registration_codes (code, note, is_active, max_uses, used_count, created_by, used_by_user_id, used_at, created_at, updated_at) VALUES ({self._placeholder('code')}, {self._placeholder('note')}, {self._placeholder('is_active')}, {self._placeholder('max_uses')}, {self._placeholder('used_count')}, {self._placeholder('created_by')}, {self._placeholder('used_by_user_id')}, {self._placeholder('used_at')}, {self._placeholder('created_at')}, {self._placeholder('updated_at')})", |
| { |
| "code": code, |
| "note": note.strip(), |
| "is_active": 1, |
| "max_uses": normalized_max_uses, |
| "used_count": 0, |
| "created_by": created_by, |
| "used_by_user_id": None, |
| "used_at": None, |
| "created_at": now, |
| "updated_at": now, |
| }, |
| ) |
| return self.get_registration_code_by_id(int(cursor.lastrowid)) or {"code": code} |
| raise RuntimeError("生成注册码失败,请重试。") |
|
|
| def get_registration_code_by_code(self, code: str) -> dict[str, Any] | None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT * FROM registration_codes WHERE code = {self._placeholder('code')}", |
| {"code": normalize_registration_code(code)}, |
| ) |
| row = cursor.fetchone() |
| return None if row is None else dict(row) |
|
|
| def get_registration_code_by_id(self, registration_code_id: int) -> dict[str, Any] | None: |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"SELECT * FROM registration_codes WHERE id = {self._placeholder('registration_code_id')}", |
| {"registration_code_id": registration_code_id}, |
| ) |
| row = cursor.fetchone() |
| return None if row is None else dict(row) |
|
|
| def list_registration_codes(self, limit: int = 100) -> list[dict[str, Any]]: |
| query = f""" |
| SELECT rc.*, u.student_id AS used_by_student_id, u.display_name AS used_by_display_name |
| FROM registration_codes rc |
| LEFT JOIN users u ON u.id = rc.used_by_user_id |
| ORDER BY rc.created_at DESC |
| LIMIT {self._placeholder('limit')} |
| """ |
| with self._cursor() as (_connection, cursor): |
| cursor.execute(query, {"limit": int(limit)}) |
| return self._rows_to_dicts(cursor.fetchall()) |
|
|
| def toggle_registration_code_active(self, registration_code_id: int) -> dict[str, Any] | None: |
| code = self.get_registration_code_by_id(registration_code_id) |
| if code is None: |
| return None |
| with self._cursor() as (_connection, cursor): |
| cursor.execute( |
| f"UPDATE registration_codes SET is_active = {self._placeholder('is_active')}, updated_at = {self._placeholder('updated_at')} WHERE id = {self._placeholder('registration_code_id')}", |
| { |
| "is_active": 0 if bool(code.get('is_active')) else 1, |
| "updated_at": utc_now(), |
| "registration_code_id": registration_code_id, |
| }, |
| ) |
| return self.get_registration_code_by_id(registration_code_id)
|
|
|