zhoujiaangyao committed on
Commit
e8d1a53
·
1 Parent(s): 5dbd5b2

feat: 外部下载任务队列(网页一键走家用 worker)

Browse files
backend/app/__init__.py CHANGED
@@ -22,7 +22,7 @@ async def verify_web_access_password(
22
  return True
23
 
24
  def create_app(lifespan) -> FastAPI:
25
- from .routers import note, notification, provider, model, config, chat, flashcard, hot_videos, article, trend_subscription, feishu
26
  from .utils.response import ResponseWrapper as R
27
 
28
  app = FastAPI(title="VideoMemo",lifespan=lifespan)
@@ -43,5 +43,6 @@ def create_app(lifespan) -> FastAPI:
43
  app.include_router(trend_subscription.router, prefix="/api", dependencies=protected)
44
  app.include_router(notification.router, prefix="/api", dependencies=protected)
45
  app.include_router(feishu.router, prefix="/api", dependencies=protected)
 
46
 
47
  return app
 
22
  return True
23
 
24
  def create_app(lifespan) -> FastAPI:
25
+ from .routers import note, notification, provider, model, config, chat, flashcard, hot_videos, article, trend_subscription, feishu, worker
26
  from .utils.response import ResponseWrapper as R
27
 
28
  app = FastAPI(title="VideoMemo",lifespan=lifespan)
 
43
  app.include_router(trend_subscription.router, prefix="/api", dependencies=protected)
44
  app.include_router(notification.router, prefix="/api", dependencies=protected)
45
  app.include_router(feishu.router, prefix="/api", dependencies=protected)
46
+ app.include_router(worker.router, prefix="/api", dependencies=protected)
47
 
48
  return app
backend/app/db/download_job_dao.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """外部下载任务队列的读写。"""
2
+ import uuid
3
+ from datetime import datetime
4
+ from typing import Any, Dict, Optional
5
+
6
+ from app.db.engine import SessionLocal
7
+ from app.db.models.download_job import DownloadJob
8
+
9
+
10
def create_job(task_id: str, url: str, platform: str, want_video: bool,
               params: Dict[str, Any]) -> str:
    """Insert a new ``pending`` download job and return its generated id.

    Args:
        task_id: Id of the note task this download belongs to.
        url: Source URL to be downloaded.
        platform: Platform name stored with the job.
        want_video: Stored as the job's ``want_video`` flag.
        params: Extra parameters persisted in the job's ``params`` column.

    Returns:
        The new job id (``uuid.uuid4().hex``).
    """
    new_id = uuid.uuid4().hex
    record = DownloadJob(
        id=new_id,
        task_id=task_id,
        url=url,
        platform=platform,
        want_video=want_video,
        params=params,
        status="pending",
    )
    with SessionLocal() as session:
        session.add(record)
        session.commit()
    return new_id
20
+
21
+
22
def claim_next() -> Optional[Dict[str, Any]]:
    """Claim the oldest ``pending`` job and return a summary of it.

    The job is marked ``claimed`` and its ``claimed_at`` is set to the current
    local time. Returns ``None`` when no pending job exists.

    This is a plain select-then-update with no locking: personal deployments
    usually run a single worker, so that is considered sufficient.
    """
    with SessionLocal() as session:
        pending = session.query(DownloadJob).filter(DownloadJob.status == "pending")
        oldest = pending.order_by(DownloadJob.created_at.asc()).first()
        if oldest is None:
            return None

        # Snapshot the fields into a plain dict before committing, so the
        # return value does not depend on the ORM instance afterwards.
        summary = dict(
            job_id=oldest.id,
            task_id=oldest.task_id,
            url=oldest.url,
            want_video=bool(oldest.want_video),
        )

        oldest.status = "claimed"
        oldest.claimed_at = datetime.now()
        session.commit()
        return summary
44
+
45
+
46
def get_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Look up a job by primary key and return it as a plain dict.

    Returns ``None`` when no job with ``job_id`` exists. A missing or empty
    ``params`` column is returned as ``{}``.
    """
    with SessionLocal() as session:
        row = session.get(DownloadJob, job_id)
        if row is None:
            return None
        return dict(
            id=row.id,
            task_id=row.task_id,
            url=row.url,
            platform=row.platform,
            want_video=bool(row.want_video),
            params=row.params or {},
            status=row.status,
        )
60
+
61
+
62
def complete_job(job_id: str, file_url: str) -> None:
    """Mark the job as ``done`` and record the uploaded file's URL.

    Does nothing when no job with ``job_id`` exists.
    """
    with SessionLocal() as session:
        row = session.get(DownloadJob, job_id)
        if row is None:
            return
        row.status = "done"
        row.file_url = file_url
        session.commit()
69
+
70
+
71
def fail_job(job_id: str, error: str) -> None:
    """Mark the job as ``failed`` and store the error text.

    The stored message is cut to at most 1000 characters; a falsy ``error``
    is stored as an empty string. Does nothing when the job does not exist.
    """
    with SessionLocal() as session:
        row = session.get(DownloadJob, job_id)
        if row is None:
            return
        message = error or ""
        row.status = "failed"
        row.error = message[:1000]
        session.commit()
backend/app/db/init_db.py CHANGED
@@ -4,6 +4,7 @@ import os
4
 
5
  from app.db.models.app_config import AppConfig
6
  from app.db.models.articles import ArticleItem, ArticleSubscription, ArticleSubscriptionItem
 
7
  from app.db.models.models import Model
8
  from app.db.models.note import Note
9
  from app.db.models.providers import Provider
 
4
 
5
  from app.db.models.app_config import AppConfig
6
  from app.db.models.articles import ArticleItem, ArticleSubscription, ArticleSubscriptionItem
7
+ from app.db.models.download_job import DownloadJob
8
  from app.db.models.models import Model
9
  from app.db.models.note import Note
10
  from app.db.models.providers import Provider
backend/app/db/models/download_job.py ADDED
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from sqlalchemy import Column, String, JSON, Boolean, DateTime, func
2
+
3
+ from app.db.engine import Base
4
+
5
+
6
class DownloadJob(Base):
    """External download job queue.

    Downloads started from the web UI for certain platforms (YouTube by
    default) are handed to a home worker running on a residential IP, which
    avoids the risk controls applied to data-center IPs.

    Flow: the home worker polls and claims a pending job -> downloads ->
    uploads the file back -> the server runs the local pipeline using the
    original task_id.
    """

    __tablename__ = "download_jobs"

    id = Column(String, primary_key=True)  # job uuid
    task_id = Column(String, nullable=False)  # associated note task (reused to run the rest of the pipeline)
    url = Column(String, nullable=False)
    platform = Column(String, nullable=False)
    want_video = Column(Boolean, default=False)  # True = screenshots wanted -> download video; False = audio only
    params = Column(JSON, nullable=True)  # remaining run_note_task arguments, replayed as-is after upload
    status = Column(String, default="pending")  # pending / claimed / done / failed
    file_url = Column(String, nullable=True)  # /uploads/xxx path recorded after upload
    error = Column(String, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    claimed_at = Column(DateTime, nullable=True)
backend/app/routers/note.py CHANGED
@@ -12,6 +12,7 @@ from dataclasses import asdict
12
 
13
  from app.db.note_dao import save_note, load_note, get_status
14
  from app.db.video_task_dao import get_task_by_video
 
15
  from app.enmus.exception import NoteErrorEnum
16
  from app.enmus.note_enums import DownloadQuality
17
  from app.exceptions.note import NoteError
@@ -494,6 +495,39 @@ def generate_note(data: VideoRequest, background_tasks: BackgroundTasks):
494
  except Exception as e:
495
  logger.warning(f"写入预取字幕失败 (task_id={task_id}): {e}")
496
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
497
  background_tasks.add_task(run_note_task, task_id, data.video_url, data.platform, data.quality, data.link,
498
  data.screenshot, data.model_name, data.provider_id, data.format, data.style,
499
  data.extras, data.video_understanding, data.video_interval, data.grid_size)
 
12
 
13
  from app.db.note_dao import save_note, load_note, get_status
14
  from app.db.video_task_dao import get_task_by_video
15
+ from app.services import external_download
16
  from app.enmus.exception import NoteErrorEnum
17
  from app.enmus.note_enums import DownloadQuality
18
  from app.exceptions.note import NoteError
 
495
  except Exception as e:
496
  logger.warning(f"写入预取字幕失败 (task_id={task_id}): {e}")
497
 
498
+ # 外部下载:某些平台(默认 YouTube)机房 IP 被风控,改交给跑在住宅 IP 上的家用 worker。
499
+ # 没有预取字幕(确实要下载)且命中外部下载平台时,入队等 worker 拉取,不在服务器本地下。
500
+ if not data.prefetched_transcript and external_download.should_external(data.platform):
501
+ if not external_download.worker_alive():
502
+ return R.error(
503
+ msg="本地下载器未运行:请在你自己的机器上启动 downloader-worker 守护进程后重试"
504
+ "(见 downloader-worker/README)。",
505
+ code=300103,
506
+ )
507
+ from app.db import download_job_dao
508
+ download_job_dao.create_job(
509
+ task_id=task_id,
510
+ url=data.video_url,
511
+ platform=data.platform,
512
+ want_video=bool(data.screenshot),
513
+ params={
514
+ "quality": data.quality.value if hasattr(data.quality, "value") else data.quality,
515
+ "link": data.link,
516
+ "screenshot": data.screenshot,
517
+ "model_name": data.model_name,
518
+ "provider_id": data.provider_id,
519
+ "format": data.format,
520
+ "style": data.style,
521
+ "extras": data.extras,
522
+ "video_understanding": data.video_understanding,
523
+ "video_interval": data.video_interval,
524
+ "grid_size": data.grid_size,
525
+ },
526
+ )
527
+ NoteGenerator()._update_status(task_id, TaskStatus.DOWNLOADING, message="等待本地下载器获取…")
528
+ logger.info(f"任务 {task_id} 走外部下载,已入队等待家用 worker")
529
+ return R.success({"task_id": task_id})
530
+
531
  background_tasks.add_task(run_note_task, task_id, data.video_url, data.platform, data.quality, data.link,
532
  data.screenshot, data.model_name, data.provider_id, data.format, data.style,
533
  data.extras, data.video_understanding, data.video_interval, data.grid_size)
backend/app/routers/worker.py ADDED
@@ -0,0 +1,98 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """家用下载 worker 的对接接口(都在 /api 下,复用全局访问密码鉴权)。
2
+
3
+ GET /worker/next worker 轮询认领一个 pending 下载任务(顺带上报心跳)
4
+ POST /worker/complete worker 下载完回传文件 → 服务器用原 task_id 跑本地管线
5
+ POST /worker/fail worker 下载失败上报 → 任务标记失败
6
+ GET /worker/config 查看外部下载配置(前端可选用)
7
+ POST /worker/config 改外部下载配置(enabled / platforms)
8
+ """
9
+ import os
10
+
11
+ from fastapi import APIRouter, BackgroundTasks, File, Form, UploadFile
12
+ from pydantic import BaseModel
13
+
14
+ from app.db import download_job_dao as jobs
15
+ from app.enmus.note_enums import DownloadQuality
16
+ from app.enmus.task_status_enums import TaskStatus
17
+ from app.services import external_download
18
+ from app.utils.response import ResponseWrapper as R
19
+
20
# Router holding the /worker/* endpoints defined in this module.
router = APIRouter()
21
+
22
+
23
@router.get("/worker/next")
def worker_next():
    """Record a worker heartbeat and hand out the next pending job.

    Responds with the claimed job's summary, or an empty object when no job
    is pending.
    """
    external_download.mark_worker_seen()
    claimed = jobs.claim_next()
    payload = claimed or {}
    return R.success(payload)
28
+
29
+
30
@router.post("/worker/complete")
async def worker_complete(
    background_tasks: BackgroundTasks,
    job_id: str = Form(...),
    file: UploadFile = File(...),
):
    """Receive a downloaded file from the worker and resume the note task.

    Records a worker heartbeat, stores the upload under ``UPLOAD_DIR``, marks
    the job done, then schedules ``run_note_task`` with the original task id,
    ``platform="local"`` and the parameters saved when the job was created.

    Args:
        background_tasks: FastAPI background task queue.
        job_id: Id of the download job being completed (form field).
        file: The downloaded media file.

    Returns:
        A success response carrying ``task_id``; an error response with code
        404 when the job does not exist, or 400 when the uploaded filename is
        unusable.
    """
    from app.routers.note import UPLOAD_DIR, run_note_task

    external_download.mark_worker_seen()
    job = jobs.get_job(job_id)
    if job is None:
        return R.error(msg="下载任务不存在", code=404)

    # The filename is client-controlled. Keep only its last path component so
    # values such as "../../x" or an absolute path cannot write outside
    # UPLOAD_DIR. Backslashes are normalised first so Windows-style paths are
    # reduced as well.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        return R.error(msg="文件名不合法", code=400)

    os.makedirs(UPLOAD_DIR, exist_ok=True)
    dest = os.path.join(UPLOAD_DIR, safe_name)
    # Stream in 1 MiB chunks rather than loading the whole media file in memory.
    with open(dest, "wb") as f:
        while True:
            chunk = await file.read(1024 * 1024)
            if not chunk:
                break
            f.write(chunk)
    file_url = f"/uploads/{safe_name}"
    jobs.complete_job(job_id, file_url)

    # Run the local pipeline (platform=local) with the original task_id,
    # replaying the parameters captured when the web request was made.
    p = job["params"] or {}
    try:
        quality = DownloadQuality(p.get("quality", "fast"))
    except Exception:
        # Deliberate best-effort: any unusable stored value falls back to fast.
        quality = DownloadQuality.fast
    background_tasks.add_task(
        run_note_task,
        job["task_id"], file_url, "local", quality,
        p.get("link", False), p.get("screenshot", False),
        p.get("model_name"), p.get("provider_id"), p.get("format", []),
        p.get("style"), p.get("extras"), p.get("video_understanding", False),
        p.get("video_interval", 0), p.get("grid_size", []),
    )
    return R.success({"task_id": job["task_id"]})
65
+
66
+
67
class WorkerFailRequest(BaseModel):
    # Request body for POST /worker/fail.
    # job_id: id of the download job that failed.
    job_id: str
    # error: failure description; defaults to a generic "download failed" text.
    error: str = "下载失败"
70
+
71
+
72
@router.post("/worker/fail")
def worker_fail(data: WorkerFailRequest):
    """Record a download failure reported by the worker.

    Records a worker heartbeat and marks the job as failed. When the job
    exists, its note task is also moved to ``TaskStatus.FAILED`` with the
    reported error as the message. A success response is returned either way.
    """
    from app.services.note import NoteGenerator

    external_download.mark_worker_seen()
    # Read the job before marking it failed; only its task_id is needed.
    known = jobs.get_job(data.job_id)
    jobs.fail_job(data.job_id, data.error)
    if known is None:
        return R.success()
    NoteGenerator()._update_status(known["task_id"], TaskStatus.FAILED, message=data.error)
    return R.success()
82
+
83
+
84
@router.get("/worker/config")
def get_worker_config():
    """Return the external-download settings plus a ``worker_online`` flag."""
    settings = external_download.get_config()
    online = external_download.worker_alive()
    settings["worker_online"] = online
    return R.success(settings)
89
+
90
+
91
class WorkerConfigRequest(BaseModel):
    # Request body for POST /worker/config.
    # enabled: whether external download is switched on.
    enabled: bool
    # platforms: platform names routed to the external worker.
    platforms: list = ["youtube"]
94
+
95
+
96
@router.post("/worker/config")
def set_worker_config(data: WorkerConfigRequest):
    """Persist the external-download settings and return the stored result."""
    updated = external_download.set_config(data.enabled, data.platforms)
    return R.success(updated)
backend/app/services/external_download.py ADDED
@@ -0,0 +1,44 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """外部下载(家用 worker)路由策略 + worker 心跳。
2
+
3
+ 某些平台(默认 YouTube)在机房 IP 上会被风控,改由跑在住宅 IP 上的家用 worker
4
+ 拉取下载。这里集中:是否该走外部下载、worker 是否在线。
5
+
6
+ 配置存 app_config(key="external_downloader"):{enabled: bool, platforms: [str]}。
7
+ 默认开启、只对 youtube 生效——启动了 worker 就能用;没启动则入口会提示先开 worker。
8
+ """
9
+ import time
10
+ from typing import Dict, Any
11
+
12
+ from app.db import app_config_dao
13
+
14
# app_config key under which the external-downloader settings are stored.
_KEY = "external_downloader"
# Fallback values used when a setting is missing from the stored config.
_DEFAULT = {"enabled": True, "platforms": ["youtube"]}

# Timestamp of the worker's most recent poll. Held in process memory only, so
# it resets to zero on restart (e.g. an HF restart); the worker polls every
# few seconds and refills it quickly.
_last_seen = {"ts": 0.0}
19
+
20
+
21
def get_config() -> Dict[str, Any]:
    """Return the effective external-download settings.

    Reads the stored value for ``_KEY`` and fills in defaults for anything
    missing. A missing or empty ``platforms`` value falls back to the default
    platform list.

    Returns:
        A fresh dict with ``enabled`` (bool) and ``platforms``.
    """
    stored = app_config_dao.get_value(_KEY) or {}
    # Copy the default list: callers mutate the returned dict, and handing out
    # the module-level list itself would let them corrupt _DEFAULT.
    platforms = stored.get("platforms") or list(_DEFAULT["platforms"])
    return {
        "enabled": bool(stored.get("enabled", _DEFAULT["enabled"])),
        "platforms": platforms,
    }
27
+
28
+
29
def set_config(enabled: bool, platforms) -> Dict[str, Any]:
    """Store the external-download settings and return the effective config.

    ``enabled`` is coerced to bool and ``platforms`` to a list (a falsy value
    becomes an empty list) before being written under ``_KEY``. The return
    value is read back through ``get_config()``.
    """
    payload = {
        "enabled": bool(enabled),
        "platforms": list(platforms or []),
    }
    app_config_dao.set_value(_KEY, payload)
    return get_config()
32
+
33
+
34
def should_external(platform: str) -> bool:
    """Tell whether downloads for ``platform`` go to the external worker.

    True only when external download is enabled and ``platform`` is one of
    the configured platforms.
    """
    settings = get_config()
    if not settings["enabled"]:
        return False
    return platform in settings["platforms"]
37
+
38
+
39
def mark_worker_seen() -> None:
    """Record the current time as the worker's latest heartbeat."""
    now = time.time()
    _last_seen["ts"] = now
41
+
42
+
43
def worker_alive(within: float = 90.0) -> bool:
    """Report whether the worker was seen less than ``within`` seconds ago.

    Args:
        within: Maximum heartbeat age, in seconds, still counted as alive.
    """
    idle_seconds = time.time() - _last_seen["ts"]
    return idle_seconds < within