SACC-release / core /db.py
cacode's picture
Deploy updated SCU course catcher
e28c9e4 verified
from __future__ import annotations
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterator
ACTIVE_TASK_STATUSES = {"pending", "running", "cancel_requested"}
def utc_now() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
class Database:
def __init__(self, path: Path, default_parallel_limit: int = 2) -> None:
self.path = Path(path)
self.default_parallel_limit = default_parallel_limit
self.path.parent.mkdir(parents=True, exist_ok=True)
def _connect(self) -> sqlite3.Connection:
connection = sqlite3.connect(self.path, timeout=30)
connection.row_factory = sqlite3.Row
connection.execute("PRAGMA foreign_keys = ON")
connection.execute("PRAGMA journal_mode = WAL")
return connection
@contextmanager
def _cursor(self) -> Iterator[tuple[sqlite3.Connection, sqlite3.Cursor]]:
connection = self._connect()
try:
cursor = connection.cursor()
yield connection, cursor
connection.commit()
finally:
connection.close()
@staticmethod
def _rows_to_dicts(rows: list[sqlite3.Row]) -> list[dict[str, Any]]:
return [dict(row) for row in rows]
def init_db(self) -> None:
with self._cursor() as (_connection, cursor):
cursor.executescript(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id TEXT NOT NULL UNIQUE,
password_encrypted TEXT NOT NULL,
display_name TEXT NOT NULL DEFAULT '',
is_active INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS course_targets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
category TEXT NOT NULL DEFAULT 'free',
course_id TEXT NOT NULL,
course_index TEXT NOT NULL,
created_at TEXT NOT NULL,
UNIQUE(user_id, category, course_id, course_index),
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS admins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
status TEXT NOT NULL,
requested_by TEXT NOT NULL,
requested_by_role TEXT NOT NULL,
last_error TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
started_at TEXT,
finished_at TEXT,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER,
user_id INTEGER,
scope TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(task_id) REFERENCES tasks(id) ON DELETE CASCADE,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
);
"""
)
if self.get_setting("parallel_limit") is None:
self.set_setting("parallel_limit", str(self.default_parallel_limit))
def get_setting(self, key: str) -> str | None:
with self._cursor() as (_connection, cursor):
row = cursor.execute("SELECT value FROM app_settings WHERE key = ?", (key,)).fetchone()
return None if row is None else str(row["value"])
def set_setting(self, key: str, value: str) -> None:
now = utc_now()
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
INSERT INTO app_settings (key, value, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at
""",
(key, value, now),
)
def get_parallel_limit(self) -> int:
raw_value = self.get_setting("parallel_limit")
if raw_value is None:
return self.default_parallel_limit
try:
return max(1, int(raw_value))
except ValueError:
return self.default_parallel_limit
def set_parallel_limit(self, limit: int) -> None:
self.set_setting("parallel_limit", str(max(1, limit)))
def create_user(self, student_id: str, password_encrypted: str, display_name: str = "") -> int:
now = utc_now()
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
INSERT INTO users (student_id, password_encrypted, display_name, is_active, created_at, updated_at)
VALUES (?, ?, ?, 1, ?, ?)
""",
(student_id.strip(), password_encrypted, display_name.strip(), now, now),
)
return int(cursor.lastrowid)
def update_user(
self,
user_id: int,
*,
password_encrypted: str | None = None,
display_name: str | None = None,
is_active: bool | None = None,
) -> None:
assignments: list[str] = []
values: list[Any] = []
if password_encrypted is not None:
assignments.append("password_encrypted = ?")
values.append(password_encrypted)
if display_name is not None:
assignments.append("display_name = ?")
values.append(display_name.strip())
if is_active is not None:
assignments.append("is_active = ?")
values.append(1 if is_active else 0)
if not assignments:
return
assignments.append("updated_at = ?")
values.append(utc_now())
values.append(user_id)
with self._cursor() as (_connection, cursor):
cursor.execute(f"UPDATE users SET {', '.join(assignments)} WHERE id = ?", tuple(values))
def toggle_user_active(self, user_id: int) -> dict[str, Any] | None:
user = self.get_user(user_id)
if not user:
return None
self.update_user(user_id, is_active=not bool(user["is_active"]))
return self.get_user(user_id)
def get_user(self, user_id: int) -> dict[str, Any] | None:
with self._cursor() as (_connection, cursor):
row = cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
return None if row is None else dict(row)
def get_user_by_student_id(self, student_id: str) -> dict[str, Any] | None:
with self._cursor() as (_connection, cursor):
row = cursor.execute("SELECT * FROM users WHERE student_id = ?", (student_id.strip(),)).fetchone()
return None if row is None else dict(row)
def list_users(self) -> list[dict[str, Any]]:
with self._cursor() as (_connection, cursor):
rows = cursor.execute(
"""
SELECT
u.*,
COUNT(c.id) AS course_count,
(
SELECT t.status
FROM tasks t
WHERE t.user_id = u.id
ORDER BY t.created_at DESC
LIMIT 1
) AS latest_task_status,
(
SELECT t.updated_at
FROM tasks t
WHERE t.user_id = u.id
ORDER BY t.created_at DESC
LIMIT 1
) AS latest_task_updated_at
FROM users u
LEFT JOIN course_targets c ON c.user_id = u.id
GROUP BY u.id
ORDER BY u.student_id ASC
"""
).fetchall()
return self._rows_to_dicts(rows)
def add_course(self, user_id: int, category: str, course_id: str, course_index: str) -> int | None:
now = utc_now()
normalized_category = "plan" if category == "plan" else "free"
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
INSERT OR IGNORE INTO course_targets (user_id, category, course_id, course_index, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(user_id, normalized_category, course_id.strip(), course_index.strip(), now),
)
return int(cursor.lastrowid) if cursor.lastrowid else None
def delete_course(self, course_target_id: int) -> None:
with self._cursor() as (_connection, cursor):
cursor.execute("DELETE FROM course_targets WHERE id = ?", (course_target_id,))
def remove_course_by_identity(self, user_id: int, category: str, course_id: str, course_index: str) -> None:
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
DELETE FROM course_targets
WHERE user_id = ? AND category = ? AND course_id = ? AND course_index = ?
""",
(user_id, category, course_id, course_index),
)
def list_courses_for_user(self, user_id: int) -> list[dict[str, Any]]:
with self._cursor() as (_connection, cursor):
rows = cursor.execute(
"""
SELECT *
FROM course_targets
WHERE user_id = ?
ORDER BY category ASC, course_id ASC, course_index ASC
""",
(user_id,),
).fetchall()
return self._rows_to_dicts(rows)
def create_admin(self, username: str, password_hash: str) -> int:
now = utc_now()
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
INSERT INTO admins (username, password_hash, created_at, updated_at)
VALUES (?, ?, ?, ?)
""",
(username.strip(), password_hash, now, now),
)
return int(cursor.lastrowid)
def get_admin_by_username(self, username: str) -> dict[str, Any] | None:
with self._cursor() as (_connection, cursor):
row = cursor.execute("SELECT * FROM admins WHERE username = ?", (username.strip(),)).fetchone()
return None if row is None else dict(row)
def list_admins(self) -> list[dict[str, Any]]:
with self._cursor() as (_connection, cursor):
rows = cursor.execute(
"SELECT id, username, created_at, updated_at FROM admins ORDER BY username ASC"
).fetchall()
return self._rows_to_dicts(rows)
def find_active_task_for_user(self, user_id: int) -> dict[str, Any] | None:
with self._cursor() as (_connection, cursor):
row = cursor.execute(
"""
SELECT *
FROM tasks
WHERE user_id = ? AND status IN ('pending', 'running', 'cancel_requested')
ORDER BY created_at DESC
LIMIT 1
""",
(user_id,),
).fetchone()
return None if row is None else dict(row)
def create_task(self, user_id: int, requested_by: str, requested_by_role: str) -> int:
now = utc_now()
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
INSERT INTO tasks (user_id, status, requested_by, requested_by_role, created_at, updated_at)
VALUES (?, 'pending', ?, ?, ?, ?)
""",
(user_id, requested_by, requested_by_role, now, now),
)
return int(cursor.lastrowid)
def get_task(self, task_id: int) -> dict[str, Any] | None:
with self._cursor() as (_connection, cursor):
row = cursor.execute(
"""
SELECT
t.*,
u.student_id,
u.display_name
FROM tasks t
JOIN users u ON u.id = t.user_id
WHERE t.id = ?
""",
(task_id,),
).fetchone()
return None if row is None else dict(row)
def get_latest_task_for_user(self, user_id: int) -> dict[str, Any] | None:
with self._cursor() as (_connection, cursor):
row = cursor.execute(
"""
SELECT
t.*,
u.student_id,
u.display_name
FROM tasks t
JOIN users u ON u.id = t.user_id
WHERE t.user_id = ?
ORDER BY t.created_at DESC
LIMIT 1
""",
(user_id,),
).fetchone()
return None if row is None else dict(row)
def list_pending_tasks(self, limit: int) -> list[dict[str, Any]]:
with self._cursor() as (_connection, cursor):
rows = cursor.execute(
"""
SELECT
t.*,
u.student_id,
u.display_name,
u.password_encrypted,
u.is_active
FROM tasks t
JOIN users u ON u.id = t.user_id
WHERE t.status = 'pending'
ORDER BY t.created_at ASC
LIMIT ?
""",
(limit,),
).fetchall()
return self._rows_to_dicts(rows)
def list_recent_tasks(self, limit: int = 20) -> list[dict[str, Any]]:
with self._cursor() as (_connection, cursor):
rows = cursor.execute(
"""
SELECT
t.*,
u.student_id,
u.display_name
FROM tasks t
JOIN users u ON u.id = t.user_id
ORDER BY t.created_at DESC
LIMIT ?
""",
(limit,),
).fetchall()
return self._rows_to_dicts(rows)
def mark_task_running(self, task_id: int) -> None:
now = utc_now()
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
UPDATE tasks
SET status = 'running',
started_at = COALESCE(started_at, ?),
updated_at = ?,
last_error = ''
WHERE id = ?
""",
(now, now, task_id),
)
def finish_task(self, task_id: int, status: str, last_error: str = "") -> None:
now = utc_now()
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
UPDATE tasks
SET status = ?,
finished_at = ?,
updated_at = ?,
last_error = ?
WHERE id = ?
""",
(status, now, now, last_error, task_id),
)
def update_task_status(self, task_id: int, status: str, last_error: str = "") -> None:
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
UPDATE tasks
SET status = ?, updated_at = ?, last_error = ?
WHERE id = ?
""",
(status, utc_now(), last_error, task_id),
)
def request_task_stop(self, task_id: int) -> bool:
task = self.get_task(task_id)
if not task or task["status"] not in ACTIVE_TASK_STATUSES:
return False
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
UPDATE tasks
SET status = 'cancel_requested', updated_at = ?
WHERE id = ?
""",
(utc_now(), task_id),
)
return True
def reset_inflight_tasks(self) -> None:
now = utc_now()
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
UPDATE tasks
SET status = 'stopped',
finished_at = COALESCE(finished_at, ?),
updated_at = ?
WHERE status IN ('pending', 'running', 'cancel_requested')
""",
(now, now),
)
def add_log(self, task_id: int | None, user_id: int | None, scope: str, level: str, message: str) -> int:
now = utc_now()
with self._cursor() as (_connection, cursor):
cursor.execute(
"""
INSERT INTO logs (task_id, user_id, scope, level, message, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(task_id, user_id, scope, level.upper(), message, now),
)
return int(cursor.lastrowid)
def list_recent_logs(self, *, user_id: int | None = None, limit: int = 120) -> list[dict[str, Any]]:
if user_id is None:
query = """
SELECT
l.*,
u.student_id,
u.display_name
FROM logs l
LEFT JOIN users u ON u.id = l.user_id
ORDER BY l.id DESC
LIMIT ?
"""
params = (limit,)
else:
query = """
SELECT
l.*,
u.student_id,
u.display_name
FROM logs l
LEFT JOIN users u ON u.id = l.user_id
WHERE l.user_id = ?
ORDER BY l.id DESC
LIMIT ?
"""
params = (user_id, limit)
with self._cursor() as (_connection, cursor):
rows = cursor.execute(query, params).fetchall()
return list(reversed(self._rows_to_dicts(rows)))
def list_logs_after(self, after_id: int, *, user_id: int | None = None, limit: int = 100) -> list[dict[str, Any]]:
if user_id is None:
query = """
SELECT
l.*,
u.student_id,
u.display_name
FROM logs l
LEFT JOIN users u ON u.id = l.user_id
WHERE l.id > ?
ORDER BY l.id ASC
LIMIT ?
"""
params = (after_id, limit)
else:
query = """
SELECT
l.*,
u.student_id,
u.display_name
FROM logs l
LEFT JOIN users u ON u.id = l.user_id
WHERE l.id > ? AND l.user_id = ?
ORDER BY l.id ASC
LIMIT ?
"""
params = (after_id, user_id, limit)
with self._cursor() as (_connection, cursor):
rows = cursor.execute(query, params).fetchall()
return self._rows_to_dicts(rows)
def get_admin_stats(self) -> dict[str, int]:
with self._cursor() as (_connection, cursor):
users_count = cursor.execute("SELECT COUNT(*) AS total FROM users").fetchone()["total"]
courses_count = cursor.execute("SELECT COUNT(*) AS total FROM course_targets").fetchone()["total"]
admins_count = cursor.execute("SELECT COUNT(*) AS total FROM admins").fetchone()["total"]
running_count = cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'running'").fetchone()["total"]
pending_count = cursor.execute("SELECT COUNT(*) AS total FROM tasks WHERE status = 'pending'").fetchone()["total"]
return {
"users_count": int(users_count),
"courses_count": int(courses_count),
"admins_count": int(admins_count) + 1,
"running_count": int(running_count),
"pending_count": int(pending_count),
}