PDF / src /jobs.py
BirkhoffLee's picture
refactor: 完成了阶段二的修复
b2d8381 unverified
"""任务相关的持久化操作与辅助函数。"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import sqlite3
import storage
# ── 任务状态与状态机 ────────────────────────────────────────────────────────────
# 约定的任务状态枚举,避免在业务层随意写字符串
STATUS_QUEUED = "queued"
STATUS_RUNNING = "running"
STATUS_SUCCEEDED = "succeeded"
STATUS_FAILED = "failed"
STATUS_CANCELLED = "cancelled"
ALLOWED_STATUSES: set[str] = {
STATUS_QUEUED,
STATUS_RUNNING,
STATUS_SUCCEEDED,
STATUS_FAILED,
STATUS_CANCELLED,
}
def row_to_job_dict(row: sqlite3.Row) -> dict[str, Any]:
"""将任务行转换为对外暴露的字典结构。"""
job = dict(row)
job["artifact_urls"] = {
"mono": f"/api/jobs/{job['id']}/artifacts/mono"
if job.get("mono_pdf_path")
else None,
"dual": f"/api/jobs/{job['id']}/artifacts/dual"
if job.get("dual_pdf_path")
else None,
"glossary": f"/api/jobs/{job['id']}/artifacts/glossary"
if job.get("glossary_path")
else None,
}
return job
def update_job(job_id: str, **fields: Any) -> None:
"""更新任务记录指定字段。
注意:业务代码应该优先通过 transition_job 做状态机驱动更新,
直接调用本函数仅用于与状态无关的字段(例如 cancel_requested)。
"""
if not fields:
return
fields["updated_at"] = storage.now_iso()
set_clause = ", ".join(f"{k} = ?" for k in fields.keys())
params = tuple(fields.values()) + (job_id,)
storage.db_execute(f"UPDATE jobs SET {set_clause} WHERE id = ?", params)
def get_job_row(job_id: str) -> sqlite3.Row | None:
"""按 ID 获取任务原始行。"""
return storage.db_fetchone("SELECT * FROM jobs WHERE id = ?", (job_id,))
def get_job_for_user(job_id: str, username: str) -> dict[str, Any] | None:
"""获取用户可见的任务,如果不存在或不属于该用户返回 None。"""
row = storage.db_fetchone(
"SELECT * FROM jobs WHERE id = ? AND username = ?",
(job_id, username),
)
if row is None:
return None
return row_to_job_dict(row)
def get_jobs_for_user(username: str, limit: int) -> list[dict[str, Any]]:
"""列出用户的任务列表,按创建时间倒序。"""
rows = storage.db_fetchall(
"""
SELECT * FROM jobs
WHERE username = ?
ORDER BY created_at DESC
LIMIT ?
""",
(username, limit),
)
return [row_to_job_dict(row) for row in rows]
def create_job_record(
*,
job_id: str,
username: str,
filename: str,
input_path: Path,
output_dir: Path,
model: str,
lang_in: str,
lang_out: str,
) -> dict[str, Any]:
"""插入一条新任务并返回任务字典。"""
now = storage.now_iso()
storage.db_execute(
"""
INSERT INTO jobs(
id, username, filename, input_path, output_dir,
status, progress, message, error,
model, lang_in, lang_out,
cancel_requested,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
job_id,
username,
filename,
str(input_path),
str(output_dir),
"queued",
0.0,
"Queued",
None,
model,
lang_in,
lang_out,
0,
now,
now,
),
)
row = storage.db_fetchone("SELECT * FROM jobs WHERE id = ?", (job_id,))
if row is None:
raise RuntimeError("Failed to fetch job after insert")
return row_to_job_dict(row)
def resolve_artifact_path(raw_path: str | None, output_dir: Path) -> Path | None:
"""解析并校验任务产物路径,限制在 output_dir 内部。"""
if not raw_path:
return None
path = Path(raw_path)
if not path.is_absolute():
path = (output_dir / path).resolve()
else:
path = path.resolve()
if not path.exists():
return None
try:
path.relative_to(output_dir)
except ValueError:
return None
return path
def transition_job(job_id: str, event: str, **extra_fields: Any) -> dict[str, Any] | None:
"""基于事件驱动的任务状态迁移。
这里只负责:
* 校验当前状态是否允许执行给定事件
* 决定目标状态(如果有)
* 写入数据库
* 返回更新后的任务字典(用于推送给前端)
状态枚举固定为 queued/running/succeeded/failed/cancelled,避免状态空间爆炸。
"""
row = get_job_row(job_id)
if row is None:
return None
current_status = row["status"]
if current_status not in ALLOWED_STATUSES:
# 非法状态一律拒绝迁移,由调用方记录日志
return None
# 简单的事件 -> 允许来源状态集合、目标状态映射
# 对于 progress 这类事件,目标状态为 None,只更新进度等字段。
transitions: dict[str, dict[str, Any]] = {
"start": {
"from": {STATUS_QUEUED},
"to": STATUS_RUNNING,
},
"progress": {
"from": {STATUS_RUNNING},
"to": None,
},
"finish_ok": {
"from": {STATUS_RUNNING},
"to": STATUS_SUCCEEDED,
},
"finish_error": {
"from": {STATUS_QUEUED, STATUS_RUNNING},
"to": STATUS_FAILED,
},
"cancel_before_start": {
"from": {STATUS_QUEUED},
"to": STATUS_CANCELLED,
},
"cancel_running": {
"from": {STATUS_RUNNING},
"to": STATUS_CANCELLED,
},
# 预留重启失败事件,当前在 gateway 中直接 SQL 处理,不走这里
"restart_fail": {
"from": {STATUS_RUNNING},
"to": STATUS_FAILED,
},
}
cfg = transitions.get(event)
if cfg is None:
return None
if current_status not in cfg["from"]:
return None
fields: dict[str, Any] = dict(extra_fields)
target_status = cfg["to"]
if target_status is not None:
fields["status"] = target_status
update_job(job_id, **fields)
new_row = get_job_row(job_id)
if new_row is None:
return None
return row_to_job_dict(new_row)