videoNote / backend /app /routers /worker.py
zhoujiaangyao
feat: 外部下载任务队列(网页一键走家用 worker)
e8d1a53
Raw
History Blame Contribute Delete
3.26 kB
"""家用下载 worker 的对接接口(都在 /api 下,复用全局访问密码鉴权)。
GET /worker/next worker 轮询认领一个 pending 下载任务(顺带上报心跳)
POST /worker/complete worker 下载完回传文件 → 服务器用原 task_id 跑本地管线
POST /worker/fail worker 下载失败上报 → 任务标记失败
GET /worker/config 查看外部下载配置(前端可选用)
POST /worker/config 改外部下载配置(enabled / platforms)
"""
import os
from fastapi import APIRouter, BackgroundTasks, File, Form, UploadFile
from pydantic import BaseModel
from app.db import download_job_dao as jobs
from app.enmus.note_enums import DownloadQuality
from app.enmus.task_status_enums import TaskStatus
from app.services import external_download
from app.utils.response import ResponseWrapper as R
router = APIRouter()
@router.get("/worker/next")
def worker_next():
external_download.mark_worker_seen()
job = jobs.claim_next()
return R.success(job or {})
@router.post("/worker/complete")
async def worker_complete(
background_tasks: BackgroundTasks,
job_id: str = Form(...),
file: UploadFile = File(...),
):
from app.routers.note import UPLOAD_DIR, run_note_task
external_download.mark_worker_seen()
job = jobs.get_job(job_id)
if job is None:
return R.error(msg="下载任务不存在", code=404)
os.makedirs(UPLOAD_DIR, exist_ok=True)
dest = os.path.join(UPLOAD_DIR, file.filename)
with open(dest, "wb+") as f:
f.write(await file.read())
file_url = f"/uploads/{file.filename}"
jobs.complete_job(job_id, file_url)
# 用原 task_id 跑本地管线(platform=local),把网页发起时的参数原样重放
p = job["params"] or {}
try:
quality = DownloadQuality(p.get("quality", "fast"))
except Exception:
quality = DownloadQuality.fast
background_tasks.add_task(
run_note_task,
job["task_id"], file_url, "local", quality,
p.get("link", False), p.get("screenshot", False),
p.get("model_name"), p.get("provider_id"), p.get("format", []),
p.get("style"), p.get("extras"), p.get("video_understanding", False),
p.get("video_interval", 0), p.get("grid_size", []),
)
return R.success({"task_id": job["task_id"]})
class WorkerFailRequest(BaseModel):
job_id: str
error: str = "下载失败"
@router.post("/worker/fail")
def worker_fail(data: WorkerFailRequest):
from app.services.note import NoteGenerator
external_download.mark_worker_seen()
job = jobs.get_job(data.job_id)
jobs.fail_job(data.job_id, data.error)
if job is not None:
NoteGenerator()._update_status(job["task_id"], TaskStatus.FAILED, message=data.error)
return R.success()
@router.get("/worker/config")
def get_worker_config():
cfg = external_download.get_config()
cfg["worker_online"] = external_download.worker_alive()
return R.success(cfg)
class WorkerConfigRequest(BaseModel):
enabled: bool
platforms: list = ["youtube"]
@router.post("/worker/config")
def set_worker_config(data: WorkerConfigRequest):
return R.success(external_download.set_config(data.enabled, data.platforms))