| """任务相关的持久化操作与辅助函数。""" |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Any |
|
|
| import sqlite3 |
|
|
| import storage |
|
|
|
|
| |
|
|
| |
# Job lifecycle statuses, as written to / read from the ``jobs.status`` column.
STATUS_QUEUED = "queued"
STATUS_RUNNING = "running"
STATUS_SUCCEEDED = "succeeded"
STATUS_FAILED = "failed"
STATUS_CANCELLED = "cancelled"

# Closed set of statuses that transition_job is willing to operate on; a row
# holding any other value is treated as untransitionable.
ALLOWED_STATUSES: set[str] = set(
    (
        STATUS_QUEUED,
        STATUS_RUNNING,
        STATUS_SUCCEEDED,
        STATUS_FAILED,
        STATUS_CANCELLED,
    )
)
|
|
|
|
def row_to_job_dict(row: sqlite3.Row) -> dict[str, Any]:
    """Convert a ``jobs`` row into the dictionary exposed to API clients.

    Every column of *row* is copied unchanged. An extra ``artifact_urls``
    mapping is added with the keys ``mono``, ``dual`` and ``glossary`` (in
    that order). Each value is the download URL for that artifact when the
    matching ``*_path`` column is truthy, and ``None`` otherwise.

    The ``id`` column is read only when at least one artifact path is set.
    """
    result = dict(row)

    # (artifact kind used in the URL, column holding that artifact's path)
    path_columns = (
        ("mono", "mono_pdf_path"),
        ("dual", "dual_pdf_path"),
        ("glossary", "glossary_path"),
    )

    urls: dict[str, str | None] = {}
    for kind, column in path_columns:
        if result.get(column):
            urls[kind] = f"/api/jobs/{result['id']}/artifacts/{kind}"
        else:
            urls[kind] = None

    result["artifact_urls"] = urls
    return result
|
|
|
|
def update_job(job_id: str, **fields: Any) -> None:
    """Update the given columns of a job row and refresh ``updated_at``.

    Business code should prefer ``transition_job`` for state-machine-driven
    updates. Call this function directly only for fields unrelated to the
    status (for example ``cancel_requested``).

    Args:
        job_id: ``id`` of the job row to update.
        **fields: Mapping of column name to new value. Values are bound as
            SQL parameters, but column names are interpolated into the SQL
            text, so each name must be a plain ASCII identifier. Any
            caller-supplied ``updated_at`` is overwritten with the current
            timestamp.

    Raises:
        ValueError: If any field name is not a plain ASCII identifier. This
            guards against SQL injection through ``**dict`` unpacking.
    """
    if not fields:
        return

    # Validate before touching storage: names end up verbatim in the SQL text.
    invalid = [name for name in fields if not (name.isascii() and name.isidentifier())]
    if invalid:
        raise ValueError(f"Invalid column name(s) for jobs update: {invalid!r}")

    fields["updated_at"] = storage.now_iso()
    set_clause = ", ".join(f"{name} = ?" for name in fields)
    params = (*fields.values(), job_id)
    storage.db_execute(f"UPDATE jobs SET {set_clause} WHERE id = ?", params)
|
|
|
|
def get_job_row(job_id: str) -> sqlite3.Row | None:
    """Fetch the raw ``jobs`` row whose ``id`` equals *job_id*.

    Returns whatever ``storage.db_fetchone`` yields, which is ``None`` when
    no row matches (callers in this module rely on that).
    """
    query = "SELECT * FROM jobs WHERE id = ?"
    params = (job_id,)
    return storage.db_fetchone(query, params)
|
|
|
|
def get_job_for_user(job_id: str, username: str) -> dict[str, Any] | None:
    """Return the API view of a job, scoped to its owner.

    The lookup matches on both ``id`` and ``username``. A job that belongs
    to another user is therefore indistinguishable from a missing one: both
    yield ``None``.
    """
    found = storage.db_fetchone(
        "SELECT * FROM jobs WHERE id = ? AND username = ?",
        (job_id, username),
    )
    return None if found is None else row_to_job_dict(found)
|
|
|
|
def get_jobs_for_user(username: str, limit: int) -> list[dict[str, Any]]:
    """List at most *limit* jobs owned by *username*, newest ``created_at`` first.

    Each row is converted with ``row_to_job_dict``.
    """
    query = """
    SELECT * FROM jobs
    WHERE username = ?
    ORDER BY created_at DESC
    LIMIT ?
    """
    fetched = storage.db_fetchall(query, (username, limit))
    return list(map(row_to_job_dict, fetched))
|
|
|
|
def create_job_record(
    *,
    job_id: str,
    username: str,
    filename: str,
    input_path: Path,
    output_dir: Path,
    model: str,
    lang_in: str,
    lang_out: str,
) -> dict[str, Any]:
    """Insert a new job in the queued state and return its API dictionary.

    The row starts with status ``STATUS_QUEUED``, progress ``0.0``, message
    ``"Queued"``, no error, ``cancel_requested = 0``, and ``created_at`` /
    ``updated_at`` both set to the same current timestamp.

    Args:
        job_id: Identifier stored in the ``id`` column.
        username: Owner of the job.
        filename: Stored as-is in the ``filename`` column.
        input_path: Stored as its ``str()`` form.
        output_dir: Stored as its ``str()`` form.
        model: Stored as-is in the ``model`` column.
        lang_in: Stored as-is in the ``lang_in`` column.
        lang_out: Stored as-is in the ``lang_out`` column.

    Returns:
        The freshly inserted job, as produced by ``row_to_job_dict``.

    Raises:
        RuntimeError: If the row cannot be read back right after the insert.
    """
    now = storage.now_iso()
    storage.db_execute(
        """
        INSERT INTO jobs(
            id, username, filename, input_path, output_dir,
            status, progress, message, error,
            model, lang_in, lang_out,
            cancel_requested,
            created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            job_id,
            username,
            filename,
            str(input_path),
            str(output_dir),
            STATUS_QUEUED,  # status: use the shared constant, not a literal
            0.0,  # progress
            "Queued",  # message
            None,  # error
            model,
            lang_in,
            lang_out,
            0,  # cancel_requested
            now,  # created_at
            now,  # updated_at
        ),
    )
    # Reuse the module's single-row lookup instead of duplicating its query.
    row = get_job_row(job_id)
    if row is None:
        raise RuntimeError("Failed to fetch job after insert")
    return row_to_job_dict(row)
|
|
|
|
def resolve_artifact_path(raw_path: str | None, output_dir: Path) -> Path | None:
    """Resolve a stored artifact path, confined to *output_dir*.

    A relative *raw_path* is interpreted relative to *output_dir*; an
    absolute one is used as-is. Both the candidate and *output_dir* are
    resolved (made absolute, with symlinks and ``..`` collapsed) before
    they are compared. This keeps the containment check correct even when
    *output_dir* is relative, non-canonical, or lies under a symlink.

    Args:
        raw_path: Path as stored on the job row; may be empty or ``None``.
        output_dir: The job's output directory. A ``str`` is also accepted.

    Returns:
        The resolved path, or ``None`` when *raw_path* is empty, the path
        escapes *output_dir*, or nothing exists at that location.
    """
    if not raw_path:
        return None

    base = Path(output_dir).resolve()
    # Joining a base with an absolute path yields the absolute path itself,
    # so this covers both the relative and the absolute case.
    candidate = (base / raw_path).resolve()

    # Check containment first, so paths outside the directory are never
    # probed on disk.
    try:
        candidate.relative_to(base)
    except ValueError:
        return None

    if not candidate.exists():
        return None
    return candidate
|
|
|
|
def transition_job(job_id: str, event: str, **extra_fields: Any) -> dict[str, Any] | None:
    """Apply an event-driven state transition to a job.

    Responsibilities are limited to:
      * checking that the job's current status permits *event*;
      * deciding the target status (if the event changes it);
      * writing the update to the database;
      * returning the refreshed job dictionary (for pushing to the frontend).

    Statuses are fixed to queued/running/succeeded/failed/cancelled to keep
    the state space small.

    Args:
        job_id: ``id`` of the job to transition.
        event: One of the events in the transition table below.
        **extra_fields: Additional columns written together with the
            transition (passed through to ``update_job``). A ``status`` key
            is ignored: status is decided only by the transition table, so
            an event that keeps the status (``progress``) cannot be used to
            force an arbitrary status.

    Returns:
        The updated job dictionary, or ``None`` if any of these hold: the
        job does not exist, its current status is not in
        ``ALLOWED_STATUSES``, *event* is unknown, or *event* is not allowed
        from the current status.
    """
    row = get_job_row(job_id)
    if row is None:
        return None

    current_status = row["status"]
    if current_status not in ALLOWED_STATUSES:
        # Unrecognised status: refuse to transition rather than guess.
        return None

    # event -> (statuses it may be applied from, target status or None to keep it)
    transitions: dict[str, tuple[set[str], str | None]] = {
        "start": ({STATUS_QUEUED}, STATUS_RUNNING),
        "progress": ({STATUS_RUNNING}, None),
        "finish_ok": ({STATUS_RUNNING}, STATUS_SUCCEEDED),
        "finish_error": ({STATUS_QUEUED, STATUS_RUNNING}, STATUS_FAILED),
        "cancel_before_start": ({STATUS_QUEUED}, STATUS_CANCELLED),
        "cancel_running": ({STATUS_RUNNING}, STATUS_CANCELLED),
        # NOTE(review): presumably used to fail jobs left "running" across a
        # process restart — confirm with the caller.
        "restart_fail": ({STATUS_RUNNING}, STATUS_FAILED),
    }

    rule = transitions.get(event)
    if rule is None:
        return None
    allowed_from, target_status = rule
    if current_status not in allowed_from:
        return None

    # Drop any caller-supplied status so the table is the single authority.
    fields: dict[str, Any] = {k: v for k, v in extra_fields.items() if k != "status"}
    if target_status is not None:
        fields["status"] = target_status

    # NOTE(review): the status check above and this write are separate
    # statements, so two concurrent transitions can both pass the check.
    # Making it atomic would need a conditional UPDATE (``... AND status IN
    # (...)``) in the storage layer.
    update_job(job_id, **fields)
    new_row = get_job_row(job_id)
    if new_row is None:
        return None
    return row_to_job_dict(new_row)
|
|
|
|