Spaces:
Paused
Paused
| from __future__ import annotations | |
| import sqlite3 | |
| from contextlib import contextmanager | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Iterator | |
| ACTIVE_TASK_STATUSES = {"pending", "running", "cancel_requested"} | |
| def utc_now() -> str: | |
| return datetime.now(timezone.utc).isoformat(timespec="seconds") | |
| class Database: | |
| def __init__(self, path: Path, default_parallel_limit: int = 2) -> None: | |
| self.path = Path(path) | |
| self.default_parallel_limit = default_parallel_limit | |
| self.path.parent.mkdir(parents=True, exist_ok=True) | |
| def _connect(self) -> sqlite3.Connection: | |
| connection = sqlite3.connect(self.path, timeout=30) | |
| connection.row_factory = sqlite3.Row | |
| connection.execute("PRAGMA foreign_keys = ON") | |
| connection.execute("PRAGMA journal_mode = WAL") | |
| return connection | |
| def _cursor(self) -> Iterator[tuple[sqlite3.Connection, sqlite3.Cursor]]: | |
| connection = self._connect() | |
| try: | |
| cursor = connection.cursor() | |
| yield connection, cursor | |
| connection.commit() | |
| finally: | |
| connection.close() | |
| def _rows_to_dicts(rows: list[sqlite3.Row]) -> list[dict[str, Any]]: | |
| return [dict(row) for row in rows] | |
| def init_db(self) -> None: | |
| with self._cursor() as (_connection, cursor): | |
| cursor.executescript( | |
| """ | |
| CREATE TABLE IF NOT EXISTS users ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| student_id TEXT NOT NULL UNIQUE, | |
| password_encrypted TEXT NOT NULL, | |
| display_name TEXT NOT NULL DEFAULT '', | |
| is_active INTEGER NOT NULL DEFAULT 1, | |
| created_at TEXT NOT NULL, | |
| updated_at TEXT NOT NULL | |
| ); | |
| CREATE TABLE IF NOT EXISTS course_targets ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id INTEGER NOT NULL, | |
| category TEXT NOT NULL DEFAULT 'free', | |
| course_id TEXT NOT NULL, | |
| course_index TEXT NOT NULL, | |
| created_at TEXT NOT NULL, | |
| UNIQUE(user_id, category, course_id, course_index), | |
| FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE | |
| ); | |
| CREATE TABLE IF NOT EXISTS admins ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| username TEXT NOT NULL UNIQUE, | |
| password_hash TEXT NOT NULL, | |
| created_at TEXT NOT NULL, | |
| updated_at TEXT NOT NULL | |
| ); | |
| CREATE TABLE IF NOT EXISTS app_settings ( | |
| key TEXT PRIMARY KEY, | |
| value TEXT NOT NULL, | |
| updated_at TEXT NOT NULL | |
| ); | |
| CREATE TABLE IF NOT EXISTS tasks ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id INTEGER NOT NULL, | |
| status TEXT NOT NULL, | |
| requested_by TEXT NOT NULL, | |
| requested_by_role TEXT NOT NULL, | |
| last_error TEXT NOT NULL DEFAULT '', | |
| created_at TEXT NOT NULL, | |
| started_at TEXT, | |
| finished_at TEXT, | |
| updated_at TEXT NOT NULL, | |
| FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE | |
| ); | |
| CREATE TABLE IF NOT EXISTS logs ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| task_id INTEGER, | |
| user_id INTEGER, | |
| scope TEXT NOT NULL, | |
| level TEXT NOT NULL, | |
| message TEXT NOT NULL, | |
| created_at TEXT NOT NULL, | |
| FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE, | |
| FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE | |
| ); | |
| """ | |
| ) | |
| if self.get_setting("parallel_limit") is None: | |
| self.set_setting("parallel_limit", str(self.default_parallel_limit)) | |
| def get_setting(self, key: str) -> str | None: | |
| with self._cursor() as (_connection, cursor): | |
| row = cursor.execute("SELECT value FROM app_settings WHERE key = ?", (key,)).fetchone() | |
| return None if row is None else str(row["value"]) | |
| def set_setting(self, key: str, value: str) -> None: | |
| now = utc_now() | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| INSERT INTO app_settings (key, value, updated_at) | |
| VALUES (?, ?, ?) | |
| ON CONFLICT(key) DO UPDATE SET | |
| value = excluded.value, | |
| updated_at = excluded.updated_at | |
| """, | |
| (key, value, now), | |
| ) | |
| def get_parallel_limit(self) -> int: | |
| raw_value = self.get_setting("parallel_limit") | |
| if raw_value is None: | |
| return self.default_parallel_limit | |
| try: | |
| return max(1, int(raw_value)) | |
| except ValueError: | |
| return self.default_parallel_limit | |
| def set_parallel_limit(self, limit: int) -> None: | |
| self.set_setting("parallel_limit", str(max(1, limit))) | |
| def create_user(self, student_id: str, password_encrypted: str, display_name: str = "") -> int: | |
| now = utc_now() | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| INSERT INTO users (student_id, password_encrypted, display_name, is_active, created_at, updated_at) | |
| VALUES (?, ?, ?, 1, ?, ?) | |
| """, | |
| (student_id.strip(), password_encrypted, display_name.strip(), now, now), | |
| ) | |
| return int(cursor.lastrowid) | |
| def update_user( | |
| self, | |
| user_id: int, | |
| *, | |
| password_encrypted: str | None = None, | |
| display_name: str | None = None, | |
| is_active: bool | None = None, | |
| ) -> None: | |
| assignments: list[str] = [] | |
| values: list[Any] = [] | |
| if password_encrypted is not None: | |
| assignments.append("password_encrypted = ?") | |
| values.append(password_encrypted) | |
| if display_name is not None: | |
| assignments.append("display_name = ?") | |
| values.append(display_name.strip()) | |
| if is_active is not None: | |
| assignments.append("is_active = ?") | |
| values.append(1 if is_active else 0) | |
| if not assignments: | |
| return | |
| assignments.append("updated_at = ?") | |
| values.append(utc_now()) | |
| values.append(user_id) | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute(f"UPDATE users SET {', '.join(assignments)} WHERE id = ?", tuple(values)) | |
| def toggle_user_active(self, user_id: int) -> dict[str, Any] | None: | |
| user = self.get_user(user_id) | |
| if not user: | |
| return None | |
| self.update_user(user_id, is_active=not bool(user["is_active"])) | |
| return self.get_user(user_id) | |
| def get_user(self, user_id: int) -> dict[str, Any] | None: | |
| with self._cursor() as (_connection, cursor): | |
| row = cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() | |
| return None if row is None else dict(row) | |
| def get_user_by_student_id(self, student_id: str) -> dict[str, Any] | None: | |
| with self._cursor() as (_connection, cursor): | |
| row = cursor.execute("SELECT * FROM users WHERE student_id = ?", (student_id.strip(),)).fetchone() | |
| return None if row is None else dict(row) | |
| def list_users(self) -> list[dict[str, Any]]: | |
| with self._cursor() as (_connection, cursor): | |
| rows = cursor.execute( | |
| """ | |
| SELECT | |
| u.*, | |
| COUNT(c.id) AS course_count, | |
| ( | |
| SELECT t.status | |
| FROM tasks t | |
| WHERE t.user_id = u.id | |
| ORDER BY t.created_at DESC | |
| LIMIT 1 | |
| ) AS latest_task_status, | |
| ( | |
| SELECT t.updated_at | |
| FROM tasks t | |
| WHERE t.user_id = u.id | |
| ORDER BY t.created_at DESC | |
| LIMIT 1 | |
| ) AS latest_task_updated_at | |
| FROM users u | |
| LEFT JOIN course_targets c ON c.user_id = u.id | |
| GROUP BY u.id | |
| ORDER BY u.student_id ASC | |
| """ | |
| ).fetchall() | |
| return self._rows_to_dicts(rows) | |
| def add_course(self, user_id: int, category: str, course_id: str, course_index: str) -> int | None: | |
| now = utc_now() | |
| normalized_category = "plan" if category == "plan" else "free" | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| INSERT OR IGNORE INTO course_targets (user_id, category, course_id, course_index, created_at) | |
| VALUES (?, ?, ?, ?, ?) | |
| """, | |
| (user_id, normalized_category, course_id.strip(), course_index.strip(), now), | |
| ) | |
| return int(cursor.lastrowid) if cursor.lastrowid else None | |
| def delete_course(self, course_target_id: int) -> None: | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute("DELETE FROM course_targets WHERE id = ?", (course_target_id,)) | |
| def remove_course_by_identity(self, user_id: int, category: str, course_id: str, course_index: str) -> None: | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| DELETE FROM course_targets | |
| WHERE user_id = ? AND category = ? AND course_id = ? AND course_index = ? | |
| """, | |
| (user_id, category, course_id, course_index), | |
| ) | |
| def list_courses_for_user(self, user_id: int) -> list[dict[str, Any]]: | |
| with self._cursor() as (_connection, cursor): | |
| rows = cursor.execute( | |
| """ | |
| SELECT * | |
| FROM course_targets | |
| WHERE user_id = ? | |
| ORDER BY category ASC, course_id ASC, course_index ASC | |
| """, | |
| (user_id,), | |
| ).fetchall() | |
| return self._rows_to_dicts(rows) | |
| def create_admin(self, username: str, password_hash: str) -> int: | |
| now = utc_now() | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| INSERT INTO admins (username, password_hash, created_at, updated_at) | |
| VALUES (?, ?, ?, ?) | |
| """, | |
| (username.strip(), password_hash, now, now), | |
| ) | |
| return int(cursor.lastrowid) | |
| def get_admin_by_username(self, username: str) -> dict[str, Any] | None: | |
| with self._cursor() as (_connection, cursor): | |
| row = cursor.execute("SELECT * FROM admins WHERE username = ?", (username.strip(),)).fetchone() | |
| return None if row is None else dict(row) | |
| def list_admins(self) -> list[dict[str, Any]]: | |
| with self._cursor() as (_connection, cursor): | |
| rows = cursor.execute( | |
| "SELECT id, username, created_at, updated_at FROM admins ORDER BY username ASC" | |
| ).fetchall() | |
| return self._rows_to_dicts(rows) | |
| def find_active_task_for_user(self, user_id: int) -> dict[str, Any] | None: | |
| with self._cursor() as (_connection, cursor): | |
| row = cursor.execute( | |
| """ | |
| SELECT * | |
| FROM tasks | |
| WHERE user_id = ? AND status IN ('pending', 'running', 'cancel_requested') | |
| ORDER BY created_at DESC | |
| LIMIT 1 | |
| """, | |
| (user_id,), | |
| ).fetchone() | |
| return None if row is None else dict(row) | |
| def create_task(self, user_id: int, requested_by: str, requested_by_role: str) -> int: | |
| now = utc_now() | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| INSERT INTO tasks (user_id, status, requested_by, requested_by_role, created_at, updated_at) | |
| VALUES (?, 'pending', ?, ?, ?, ?) | |
| """, | |
| (user_id, requested_by, requested_by_role, now, now), | |
| ) | |
| return int(cursor.lastrowid) | |
| def get_task(self, task_id: int) -> dict[str, Any] | None: | |
| with self._cursor() as (_connection, cursor): | |
| row = cursor.execute( | |
| """ | |
| SELECT | |
| t.*, | |
| u.student_id, | |
| u.display_name | |
| FROM tasks t | |
| JOIN users u ON u.id = t.user_id | |
| WHERE t.id = ? | |
| """, | |
| (task_id,), | |
| ).fetchone() | |
| return None if row is None else dict(row) | |
| def get_latest_task_for_user(self, user_id: int) -> dict[str, Any] | None: | |
| with self._cursor() as (_connection, cursor): | |
| row = cursor.execute( | |
| """ | |
| SELECT | |
| t.*, | |
| u.student_id, | |
| u.display_name | |
| FROM tasks t | |
| JOIN users u ON u.id = t.user_id | |
| WHERE t.user_id = ? | |
| ORDER BY t.created_at DESC | |
| LIMIT 1 | |
| """, | |
| (user_id,), | |
| ).fetchone() | |
| return None if row is None else dict(row) | |
| def list_pending_tasks(self, limit: int) -> list[dict[str, Any]]: | |
| with self._cursor() as (_connection, cursor): | |
| rows = cursor.execute( | |
| """ | |
| SELECT | |
| t.*, | |
| u.student_id, | |
| u.display_name, | |
| u.password_encrypted, | |
| u.is_active | |
| FROM tasks t | |
| JOIN users u ON u.id = t.user_id | |
| WHERE t.status = 'pending' | |
| ORDER BY t.created_at ASC | |
| LIMIT ? | |
| """, | |
| (limit,), | |
| ).fetchall() | |
| return self._rows_to_dicts(rows) | |
| def list_recent_tasks(self, limit: int = 20) -> list[dict[str, Any]]: | |
| with self._cursor() as (_connection, cursor): | |
| rows = cursor.execute( | |
| """ | |
| SELECT | |
| t.*, | |
| u.student_id, | |
| u.display_name | |
| FROM tasks t | |
| JOIN users u ON u.id = t.user_id | |
| ORDER BY t.created_at DESC | |
| LIMIT ? | |
| """, | |
| (limit,), | |
| ).fetchall() | |
| return self._rows_to_dicts(rows) | |
| def mark_task_running(self, task_id: int) -> None: | |
| now = utc_now() | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| UPDATE tasks | |
| SET status = 'running', | |
| started_at = COALESCE(started_at, ?), | |
| updated_at = ?, | |
| last_error = '' | |
| WHERE id = ? | |
| """, | |
| (now, now, task_id), | |
| ) | |
| def finish_task(self, task_id: int, status: str, last_error: str = "") -> None: | |
| now = utc_now() | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| UPDATE tasks | |
| SET status = ?, | |
| finished_at = ?, | |
| updated_at = ?, | |
| last_error = ? | |
| WHERE id = ? | |
| """, | |
| (status, now, now, last_error, task_id), | |
| ) | |
| def update_task_status(self, task_id: int, status: str, last_error: str = "") -> None: | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| UPDATE tasks | |
| SET status = ?, updated_at = ?, last_error = ? | |
| WHERE id = ? | |
| """, | |
| (status, utc_now(), last_error, task_id), | |
| ) | |
| def request_task_stop(self, task_id: int) -> bool: | |
| task = self.get_task(task_id) | |
| if not task or task["status"] not in ACTIVE_TASK_STATUSES: | |
| return False | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| UPDATE tasks | |
| SET status = 'cancel_requested', updated_at = ? | |
| WHERE id = ? | |
| """, | |
| (utc_now(), task_id), | |
| ) | |
| return True | |
| def reset_inflight_tasks(self) -> None: | |
| now = utc_now() | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| UPDATE tasks | |
| SET status = 'stopped', | |
| finished_at = COALESCE(finished_at, ?), | |
| updated_at = ? | |
| WHERE status IN ('pending', 'running', 'cancel_requested') | |
| """, | |
| (now, now), | |
| ) | |
| def add_log(self, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> int: | |
| now = utc_now() | |
| with self._cursor() as (_connection, cursor): | |
| cursor.execute( | |
| """ | |
| INSERT INTO logs (task_id, user_id, scope, level, message, created_at) | |
| VALUES (?, ?, ?, ?, ?, ?) | |
| """, | |
| (task_id, user_id, scope, level.upper(), message, now), | |
| ) | |
| return int(cursor.lastrowid) | |
| def list_recent_logs(self, *, user_id: int | None = None, limit: int = 120) -> list[dict[str, Any]]: | |
| if user_id is None: | |
| query = """ | |
| SELECT | |
| l.*, | |
| u.student_id, | |
| u.display_name | |
| FROM logs l | |
| LEFT JOIN users u ON u.id = l.user_id | |
| ORDER BY l.id DESC | |
| LIMIT ? | |
| """ | |
| params = (limit,) | |
| else: | |
| query = """ | |
| SELECT | |
| l.*, | |
| u.student_id, | |
| u.display_name | |
| FROM logs l | |
| LEFT JOIN users u ON u.id = l.user_id | |
| WHERE l.user_id = ? | |
| ORDER BY l.id DESC | |
| LIMIT ? | |
| """ | |
| params = (user_id, limit) | |
| with self._cursor() as (_connection, cursor): | |
| rows = cursor.execute(query, params).fetchall() | |
| return list(reversed(self._rows_to_dicts(rows))) | |
| def list_logs_after(self, after_id: int, *, user_id: int | None = None, limit: int = 100) -> list[dict[str, Any]]: | |
| if user_id is None: | |
| query = """ | |
| SELECT | |
| l.*, | |
| u.student_id, | |
| u.display_name | |
| FROM logs l | |
| LEFT JOIN users u ON u.id = l.user_id | |
| WHERE l.id > ? | |
| ORDER BY l.id ASC | |
| LIMIT ? | |
| """ | |
| params = (after_id, limit) | |
| else: | |
| query = """ | |
| SELECT | |
| l.*, | |
| u.student_id, | |
| u.display_name | |
| FROM logs l | |
| LEFT JOIN users u ON u.id = l.user_id | |
| WHERE l.id > ? AND l.user_id = ? | |
| ORDER BY l.id ASC | |
| LIMIT ? | |
| """ | |
| params = (after_id, user_id, limit) | |
| with self._cursor() as (_connection, cursor): | |
| rows = cursor.execute(query, params).fetchall() | |
| return self._rows_to_dicts(rows) | |
| def get_admin_stats(self) -> dict[str, int]: | |
| with self._cursor() as (_connection, cursor): | |
| users_count = cursor.execute("SELECT COUNT(*) AS total FROM users").fetchone()["total"] | |
| courses_count = cursor.execute("SELECT COUNT(*) AS total FROM course_targets").fetchone()["total"] | |
| admins_count = cursor.execute("SELECT COUNT(*) AS total FROM admins").fetchone()["total"] | |
| running_count = cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'running'").fetchone()["total"] | |
| pending_count = cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'pending'").fetchone()["total"] | |
| return { | |
| "users_count": int(users_count), | |
| "courses_count": int(courses_count), | |
| "admins_count": int(admins_count) + 1, | |
| "running_count": int(running_count), | |
| "pending_count": int(pending_count), | |
| } | |