vLAR commited on
Commit
86fd0d5
·
1 Parent(s): 54708e8
DESIGN_v0.5_archive.md ADDED
@@ -0,0 +1,674 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # 📋 PhysInOne Benchmark 多任务自动评测系统 — 设计文档 (v0.5)
2
+
3
+ > **维护方**:[vLAR Group](https://vlar-group.github.io/) (HK PolyU)
4
+
5
+ ## 0. v0.5 关键变化(相对 v0.4)
6
+
7
+ | 变化点 | v0.4 | v0.5 |
8
+ |---|---|---|
9
+ | Worker Space | 待创建 | **已创建**:[`vLAR/PhysInOne-Leaderboard-Worker`](https://huggingface.co/spaces/vLAR/PhysInOne-Leaderboard-Worker) |
10
+ | 持久存储 | 无;状态全部塞进 HF dataset | **Worker 挂载了永久 bucket `/data`**(Spaces persistent storage),承担所有 worker 端状态 |
11
+ | GT / progress / lease / logs / archive | 全部存在 HF dataset | **全部迁到 Worker `/data`**,HF dataset 不再承担此职 |
12
+ | HF dataset 角色 | 对象存储 + 状态库 + 日志(三合一) | 仅作为前后端**通信邮箱**:`submissions/<sid>.json`(pending 入箱)+ `public/queue.json`(Worker 镜像快照)+ `results/<task>/<team>_best.json` |
13
+ | 用户 ZIP 下载落地 | `/tmp/<sid>/...` | **`/tmp` 内的临时目录**(评测后立即删),与 `/data` 严格隔离 |
14
+ | 仓库划分 | 单仓库(Frontend + Worker 代码共存) | **本仓库 = Frontend Space 专用**;Worker 代码在独立仓库 |
15
+ | 队列扫描 | Frontend 自己 list + 逐文件 download 全部 `submissions/*.json` + `*.progress.json` | Worker 每个 tick 写一份 `public/queue.json`;**Frontend 只下载这一个文件** |
16
+
17
+ 详见 §11.5 (cross-Space `/data` 隔离) 与 §10 (代码结构)。
18
+
19
+ ---
20
+
21
+ ## 0.4. v0.4 关键变化(相对 v0.3)
22
+
23
+ | 变化点 | v0.3 | v0.4 |
24
+ |---|---|---|
25
+ | 评测粒度 | 一次提交 = 一个 ZIP,整体评测 | **一次提交 = N 个 scene**;每个 scene 一个 ZIP,**逐 scene** 下载/评测/回写 |
26
+ | 用户身份 | 自填 `team_name` | **以 `user_dataset` 的 owner 作为身份 ID**;`display_name` 仅用于展示 |
27
+ | GT 组织 | 单文件 `ground_truth/<gt_file>` | **按 scene 分目录** `ground_truth/<scene_id>/...`;任务通过 `task_manifest/<task>.json` 声明所用 scene 子集 |
28
+ | 断点续跑 | 无;Worker 死掉 → submission 卡 `running` | **Worker 重启即续**,scene 粒度。新增 `submissions/<sid>.progress.json` |
29
+ | 失败租约 | 无 | `running` 带 `lease_expires_at`,超时可被重新接管 |
30
+ | 日志膨胀 | `logs/execution_logs.jsonl` 单文件 | 按月分片 + 大小轮转;done 记录定期归档 |
31
+ | 多 Worker | 单 Worker | 支持横向多开(CAS 防抢同一 sid),可选 |
32
+ | 前端刷新 | 手动按钮 | Queue tab 上加 `gr.Timer(15s)` 自动刷新 |
33
+
34
+ ---
35
+
36
+ ## 1. 三件套架构
37
+
38
+ > **v0.5 读者注意**:以下 ASCII 拓扑仍描述 v0.4 时 dataset 【代三职】(GT / progress / logs / archive 都在 dataset)的状态。**v0.5 后**这些物住都迁到 Worker `/data`,dataset 仅保留 `submissions/<sid>.json` / `public/queue.json` / `results/`。详见 §0 与 §11.5。
39
+
40
+ ```
41
+ ┌──────────────────────────────────────────────────────────────────────┐
42
+ │ 用户浏览器 │
43
+ └──────────────────────────┬───────────────────────────────────────────┘
44
+
45
+ ┌──────────────────────────────────────────────────────────────────────┐
46
+ │ ① Frontend Space (PUBLIC, Gradio) │
47
+ │ vLAR/PhysInOne-Leaderboard │
48
+ │ - 提交表单:display_name / task / user_dataset │
49
+ │ - 写 submissions/<sid>.json (status=pending) 到 ③ │
50
+ │ - 读 results/ + submissions/ + progress/ 渲染榜单 / 队列 / 进度 │
51
+ └──────────────────────────┬───────────────────────────────────────────┘
52
+ │ huggingface_hub API
53
+
54
+ ┌──────────────────────────────────────────────────────────────────────┐
55
+ │ ③ Private Dataset (PRIVATE) — 唯一持久层 / 通信总线 │
56
+ │ vLAR/PhysInOne-Leaderboard (HF dataset, PRIVATE) │
57
+ │ submissions/<sid>.json ← 任务表 (pending/running/done/failed) │
58
+ │ submissions/<sid>.progress.json ← scene 级中间结果(CAS 整体替换) │
59
+ │ task_manifest/<task>.json ← 该任务所用 scene_id 列表 │
60
+ │ ground_truth/<scene_id>/... ← 按 scene 组织��真值 │
61
+ │ results/<task>/<team_id>_best.json ← 历史最佳快照(聚合后) │
62
+ │ logs/exec-YYYY-MM[.partNN].jsonl ← 日志,按月分片 + 大小轮转 │
63
+ │ archive/submissions-YYYY-MM/ ← 老的 done/failed 归档 │
64
+ └──────────────────────────▲───────────────────────────────────────────┘
65
+ │ huggingface_hub API(读+写)
66
+ ┌──────────────────────────┴───────────────────────────────────────────┐
67
+ │ ② Worker Space (PRIVATE, Docker, 纯后端:APScheduler + stdlib HTTP) │
68
+ │ vLAR/PhysInOne-Leaderboard-Worker │
69
+ │ - 启动:扫 running 且 lease 过期 → 接管(断点续跑) │
70
+ │ - tick:扫 pending → CAS 接管 (running + lease) │
71
+ │ - 对待办 scene 列表逐个: │
72
+ │ hf_hub_download(user_dataset, <scene_id>.zip) → /tmp │
73
+ │ unzip → task.evaluate_scene(scene_dir, gt) → metrics │
74
+ │ CAS 追加到 progress.json(含 lease 续期)→ 删 /tmp │
75
+ │ - 所有 scene 处理完毕 → 聚合 → status=done + 更新 results/ │
76
+ │ - GET :7860/status 返回 JSON(worker_id / in_flight / 状态) │
77
+ └──────────────────────────────────────────────────────────────────────┘
78
+ ```
79
+
80
+ ### 1.1 设计原则
81
+ - **③ 是唯一通信总线**:① 与 ② 之间不直接通信,全部走 ③。
82
+ - **scene 是评测的最小原子单位**:所有任务都按 scene 切片,天然支持断点续跑、流式进度、可选并行。
83
+ - **Worker 假定随时被杀**:无内存状态依赖;进程死亡 → lease 过期 → 重启 / 另一个 Worker 自动接管。
84
+ - **存储分离**:用户的大 ZIP 留在用户自己的 HF dataset;我们仅记录指针。
85
+ - **不归档用户数据**:评测完单 scene 立刻删 `/tmp`;不缓存用户上传内容。
86
+ - **身份 = dataset owner**:天然防撞名、防冒充,无需 OAuth。
87
+
88
+ ---
89
+
90
+ ## 2. 三件套资源清单
91
+
92
+ | # | 资源 | 类型 | 可见性 | 名称 | 状态 |
93
+ |---|---|---|---|---|---|
94
+ | 1 | Frontend Space | Space (Gradio SDK) | **PUBLIC** | `vLAR/PhysInOne-Leaderboard` | ✅ 已建 |
95
+ | 2 | Worker Space | Space (**Docker SDK,纯后端**) + **persistent `/data`** | **PRIVATE** | [`vLAR/PhysInOne-Leaderboard-Worker`](https://huggingface.co/spaces/vLAR/PhysInOne-Leaderboard-Worker) | ✅ 已建 |
96
+ | 3 | Private Dataset (**通信邮箱**) | Dataset | **PRIVATE** | `vLAR/PhysInOne-Leaderboard` | ✅ 已建 |
97
+
98
+ ### 2.1 Secrets
99
+
100
+ | Space | Secret 名 | 权限要求 |
101
+ |---|---|---|
102
+ | Frontend | `HF_TOKEN` | 对 ③ write |
103
+ | Worker | `HF_TOKEN` | 对 ③ **write** + 对用户 dataset **read** |
104
+
105
+ ### 2.2 Worker 入口
106
+
107
+ Worker Space 使用 **`sdk: docker`**(不是 gradio),README YAML:
108
+
109
+ ```yaml
110
+ sdk: docker
111
+ app_port: 7860
112
+ ```
113
+
114
+ 根目录提供 [`Dockerfile.worker`](Dockerfile.worker)(HF Spaces 会自动识别 `Dockerfile` 名;如使用 `.worker` 后缀需在 Space README 中显式 `dockerfile_path:`)。
115
+
116
+ Worker 进程:
117
+ - APScheduler 后台循环 `tick()` / `archive` / `restart_space`
118
+ - stdlib `http.server` 在 `:7860/status` 暴露 JSON 健康状态(HF Spaces 需要进程监听端口才不会被判定为崩溃)
119
+ - 不依赖 Gradio、不渲染任何 UI;所有可视化都在 Frontend Space
120
+
121
+ > **关于 "HF 有 bucket 吗"**:HF **没有 S3 风格 bucket**。只有 model / dataset / space 三种 git repo(大文件走 git-lfs)。我们的 dataset 同时承担"对象存储 + 状态库 + 日志"三个角色——因此 §9 专门定义**抗膨胀策略**。
122
+
123
+ ---
124
+
125
+ ## 3. 用户身份模型
126
+
127
+ | 概念 | 来源 | 用途 | 是否可变 |
128
+ |---|---|---|---|
129
+ | `team_id` | `user_dataset.split("/")[0]` (HF account / org) | 历史最佳去重 key、results 路径 | **不可变** |
130
+ | `display_name` | 表单填写 | 榜单展示文本 | 可变;每次提交覆盖 |
131
+
132
+ 榜单显示 `Awesome Team (userA)`,排序键是 `team_id`。
133
+
134
+ > 彻底解决"两个用户都叫 Team_A 互相覆盖"的问题,且**无需 OAuth**——dataset 的 owner 是 HF 强保证的。
135
+
136
+ ---
137
+
138
+ ## 4. 用户 dataset 约定结构
139
+
140
+ ```
141
+ <user_dataset>/ ← 用户自己的 HF dataset (建议 public)
142
+ ├── README.md ← HF dataset card
143
+ └── predictions/
144
+ └── <task_name>/
145
+ ├── scene_001.zip
146
+ ├── scene_002.zip
147
+ └── ... ← 每个 scene 一个 ZIP
148
+ ```
149
+
150
+ - 用户**只��为想刷的 task 准备 ZIP**;未提交的 scene 在 `progress.json` 里记 `missing`,并体现在聚合 metric 里(按聚合策略,比如缺失算 0 分 / 跳过等)。
151
+ - 单 scene ZIP 推荐 < 200MB,上限可配。
152
+ - ZIP 内部结构由各 task 的 `expected_scene_layout` 自描述(§6)。
153
+
154
+ ---
155
+
156
+ ## 5. ③ 的数据 Schema
157
+
158
+ > **v0.5 读者注意**:下述 5.1–5.7 描述 v0.4 dataset 全量存储的布局。**v0.5 下**:
159
+ > * `5.1 submissions/<sid>.json` 仅在 pending 阶段存于 dataset;Worker 接管后迁到 `/data/inflight/<sid>.json` 并从 dataset 删除。Frontend 提交时只写 envelope(`submission_id` / `team_id` / `display_name` / `task_name` / `user_dataset` / `submitted_at` / `status=pending` / `primary_metric`)。
160
+ > * `5.2 progress.json` / `5.3 task_manifest/` / `5.4 ground_truth/` / `5.6 logs/` / `5.7 archive/` 在 dataset 中**不再存在**——等价文件由 Worker 在 `/data` 下本地维护。
161
+ > * 新增 `5.8 public/queue.json`——Worker 每 tick 重写的 UI 快照。详见 §11.5。
162
+
163
+ ### 5.1 `submissions/<sid>.json` — 提交记录
164
+
165
+ ```json
166
+ {
167
+ "submission_id": "8e5f6g7h",
168
+ "team_id": "userA",
169
+ "display_name": "Awesome Team",
170
+ "task_name": "task_future_prediction",
171
+ "user_dataset": "userA/my-results",
172
+ "submitted_at": "2026-05-20T10:00:00Z",
173
+ "status": "pending",
174
+ "started_at": null,
175
+ "completed_at": null,
176
+ "lease_expires_at": null,
177
+ "primary_metric": "psnr",
178
+ "score": null,
179
+ "metrics": null,
180
+ "scenes_total": 50,
181
+ "scenes_done": 0,
182
+ "scenes_failed": 0,
183
+ "error_message": null,
184
+ "worker_attempt": 0
185
+ }
186
+ ```
187
+
188
+ **状态机**:
189
+ ```
190
+ [pending] ──CAS claim──▶ [running] ──all scenes settled──▶ [done]
191
+
192
+ ├──fatal err──▶ [failed]
193
+
194
+ └──lease expired──▶ 仍是 [running]
195
+ ↑ │
196
+ └────CAS reclaim─┘ 另一 worker / 重启接管
197
+ ```
198
+
199
+ ### 5.2 `submissions/<sid>.progress.json` — scene 级中间结果
200
+
201
+ ```json
202
+ {
203
+ "submission_id": "8e5f6g7h",
204
+ "scenes_total": 50,
205
+ "scene_results": {
206
+ "scene_001": {"metrics": {"psnr": 31.2}, "ts": "2026-05-20T10:01:00Z"},
207
+ "scene_002": {"metrics": {"psnr": 29.8}, "ts": "2026-05-20T10:02:00Z"}
208
+ },
209
+ "scene_failures": {
210
+ "scene_003": "missing: scene_003.zip in user dataset"
211
+ },
212
+ "last_worker": "worker-<uuid>",
213
+ "last_updated_at": "2026-05-20T10:02:00Z"
214
+ }
215
+ ```
216
+
217
+ > 每完成一个 scene → CAS 覆盖写。"一个 scene 一次 commit",但所有 scene 数据都在 **同一文件**——不会把目录撑爆。
218
+
219
+ ### 5.3 `task_manifest/<task>.json`
220
+
221
+ ```json
222
+ {
223
+ "task_name": "task_future_prediction",
224
+ "scene_ids": ["scene_001", "scene_002", "..."],
225
+ "primary_metric": "psnr",
226
+ "scene_layout": "predictions/<task>/<scene_id>.zip"
227
+ }
228
+ ```
229
+
230
+ 由任务负责人维护;Worker 启动每个 submission 时读取,决定要拉哪些 scene。
231
+
232
+ ### 5.4 `ground_truth/<scene_id>/...`
233
+
234
+ 任务负责人上传。Worker 按需 `hf_hub_download(filename=f"ground_truth/{scene_id}/...")`,配 **进程内 LRU**(容量可配,例如 16 个 scene),避免重复下载。
235
+
236
+ ### 5.5 `results/<task>/<team_id>_best.json`
237
+
238
+ ```json
239
+ {
240
+ "team_id": "userA",
241
+ "display_name": "Awesome Team",
242
+ "task_name": "task_future_prediction",
243
+ "score": 32.41,
244
+ "primary_metric": "psnr",
245
+ "metrics": {"psnr": 32.41, "ssim": 0.91, "lpips": 0.12},
246
+ "scenes_count": 50,
247
+ "updated_at": "2026-05-20T10:05:00Z",
248
+ "submission_id": "8e5f6g7h"
249
+ }
250
+ ```
251
+
252
+ 默认聚合:各 scene metric 算术平均。任务可以在 plugin 内重写 `aggregate_fn`。
253
+
254
+ ### 5.6 `logs/exec-YYYY-MM[.partNN].jsonl` — 日志
255
+
256
+ - 按 **年-月** 切文件;当月超过 50MB 自动 `.part02.jsonl` / `.part03.jsonl`。
257
+ - 仅 append(CAS 读 → 拼接 → 写)。
258
+ - 旧月份只读,不再触碰。
259
+
260
+ ### 5.7 `archive/submissions-YYYY-MM/`
261
+
262
+ 定时任务把 30 天前的 `done` / `failed` `submissions/<sid>.{json,progress.json}` 移到归档目录,主目录保持小而快。
263
+
264
+ ### 5.8 `public/queue.json` — Worker → Frontend 镜像(**v0.5 新增**)
265
+
266
+ ```json
267
+ {
268
+ "generated_at": "2026-05-20T10:05:00Z",
269
+ "worker_id": "worker-<uuid>",
270
+ "rows": [
271
+ {
272
+ "Submitted At": "2026-05-20T10:00:00Z",
273
+ "ID": "8e5f6g7h",
274
+ "Team": "Awesome Team",
275
+ "Owner": "userA",
276
+ "Task": "task_future_prediction",
277
+ "Status": "running",
278
+ "Position": "running",
279
+ "Scenes": "23/50 (+1 fail)",
280
+ "Score": null,
281
+ "Error": ""
282
+ }
283
+ ]
284
+ }
285
+ ```
286
+
287
+ - Worker 每个 tick 把 `/data` 中所有进行中 + 最近 N 条已完成的 submission 序列化进这一个文件,CAS 覆盖写。
288
+ - Frontend `src/leaderboard/read_evals.py::load_queue_df` 只下载这一个文件,按 `Submitted At` 倒序展示。
289
+ - 字段语义与 §7.3 `Position` 计算保持一致。
290
+
291
+ ---
292
+
293
+ ## 6. 任务插件接口(v0.4 调整)
294
+
295
+ ```python
296
+ @dataclass
297
+ class TaskPlugin:
298
+ name: str
299
+ display_name: str
300
+ description: str
301
+ primary_metric: str
302
+ higher_is_better: bool = True
303
+ leaderboard_columns: List[str] = field(default_factory=list)
304
+
305
+ # scene 级评测的契约 -------------------------------------------------
306
+ expected_scene_layout: str # 给参赛者看的单 scene ZIP 内部说明
307
+ validate_scene_fn: Callable[[str], None] # validate(scene_sandbox_dir)
308
+ evaluate_scene_fn: Callable[[str, Any], Dict[str, float]]
309
+ # evaluate(scene_sandbox_dir, gt_scene) -> metrics
310
+ load_gt_scene_fn: Callable[[str, str], Any]
311
+ # (scene_id, gt_dir) -> in-memory GT object
312
+
313
+ # 聚合 -------------------------------------------------------------
314
+ aggregate_fn: Optional[Callable[[Dict[str, Dict[str, float]]], Dict[str, float]]] = None
315
+ # 默认按 metric key 算术平均;任务可重写(e.g. 加权 / 取中位 / 缺失补零)
316
+ ```
317
+
318
+ > **取消** v0.3 的整体 `validate(sandbox_dir)` / `evaluate(sandbox_dir, gt)`,现在以 scene 为最小单元。
319
+
320
+ ---
321
+
322
+ ## 7. Worker 处理流程(含断点续跑)
323
+
324
+ ```python
325
+ # ====== boot ======
326
+ on_startup():
327
+ # 接管所有 lease 过期的 running(前任 worker 死了)
328
+ for sub in list_submissions(status="running"):
329
+ if sub.lease_expires_at < now:
330
+ CAS_reclaim(sub)
331
+
332
+ # ====== periodic tick (每 POLL_INTERVAL_SECONDS) ======
333
+ tick():
334
+ for sub in list_submissions(status="pending", order_by="submitted_at"):
335
+ if CAS_claim(sub): # pending → running + lease
336
+ dispatch(sub)
337
+
338
+ # ====== dispatch (per submission) ======
339
+ dispatch(sub):
340
+ manifest = read task_manifest/<sub.task>.json
341
+ progress = read submissions/<sub.sid>.progress.json (or empty)
342
+ settled = set(progress.scene_results) | set(progress.scene_failures)
343
+ todo = [s for s in manifest.scene_ids if s not in settled]
344
+
345
+ for scene_id in todo:
346
+ if shutdown_requested(): return # 优雅退出;下次启动续跑
347
+ try:
348
+ zip_path = hf_hub_download(
349
+ sub.user_dataset, f"predictions/{sub.task}/{scene_id}.zip"
350
+ )
351
+ with TempDir() as sandbox:
352
+ safe_extract_zip(zip_path, sandbox)
353
+ task.validate_scene(sandbox)
354
+ gt = lru_load_gt_scene(sub.task, scene_id)
355
+ metrics = task.evaluate_scene(sandbox, gt)
356
+ CAS_progress_add_result(sub.sid, scene_id, metrics) # 同时续 lease
357
+ except EntryNotFoundError:
358
+ CAS_progress_add_failure(sub.sid, scene_id, "missing zip")
359
+ except Exception as e:
360
+ CAS_progress_add_failure(sub.sid, scene_id, sanitize(e))
361
+ finally:
362
+ try: os.remove(zip_path)
363
+ except: pass
364
+
365
+ # 全部 settled → 聚合并 finalize
366
+ finalize(sub):
367
+ agg = task.aggregate(progress.scene_results)
368
+ CAS_update_submission(status="done", score=agg[primary], metrics=agg, ...)
369
+ CAS_update_best_if_improved(sub.team_id, sub.task, agg)
370
+ append_log(...)
371
+ ```
372
+
373
+ ### 7.1 Lease 机制
374
+ - Claim 时写 `lease_expires_at = now + LEASE_SECONDS`(默认 5min)。
375
+ - 每完成一个 scene → 续到 `now + LEASE_SECONDS`。
376
+ - 其他 worker tick 时若发现 `running && lease_expires_at < now` → CAS 改回自己接管。
377
+
378
+ ### 7.2 多 Worker
379
+ - 直接多开 Worker Space 实例即可;CAS 保证不会两个 worker 抢同一 sid(claim 失败)。
380
+ - 单个 worker 内通过 `ThreadPoolExecutor(max_workers=WORKER_CONCURRENCY)` 做**任务间并行**(默认 2);同一 sid 由 `_in_flight` set + CAS 双保险防重入。
381
+ - 任务**内**并行(一个 submission 多 scene 同时跑)当前**不做**;HF Space CPU 配额有限,意义不大。
382
+
383
+ ### 7.4 流程图(含断点续跑路径)
384
+
385
+ ```mermaid
386
+ flowchart TD
387
+ BOOT([Space 启动]) --> BR[boot_reclaim:<br/>扫 status=running<br/>且 lease 已过期]
388
+ BR -- "CAS 续 lease<br/>worker_attempt++" --> DISPATCH
389
+
390
+ SCHED([APScheduler tick<br/>每 POLL_INTERVAL_SECONDS]) --> SNAP
391
+ SNAP[snapshot_download<br/>submissions/*.json] --> PEND{有 pending?}
392
+ PEND -- 否 --> SCHED
393
+ PEND -- "是<br/>FIFO by submitted_at" --> CAS1
394
+
395
+ CAS1[/"CAS: pending → running<br/>lease_expires_at = now+5min"/]
396
+ CAS1 -- "409/412 别人抢先" --> PEND
397
+ CAS1 -- OK --> DISPATCH
398
+
399
+ DISPATCH[ThreadPoolExecutor<br/>max=WORKER_CONCURRENCY]
400
+ DISPATCH --> PM
401
+
402
+ subgraph process_submission["process_submission(sid) ◇ 可并发多个 sid"]
403
+ direction TB
404
+ PM[读 task_manifest<br/>scene_ids 列表] --> PI[load_progress<br/>**断点恢复入口**]
405
+ PI --> LOOP{遍历 scene_ids}
406
+ LOOP -- "已在 scene_results<br/>或 scene_failures" --> LOOP
407
+ LOOP -- 未处理 --> DL
408
+
409
+ DL[hf_hub_download<br/>user_dataset 中的 scene.zip]
410
+ DL -- 404/403 --> FAIL
411
+ DL -- OK --> VZ
412
+ VZ[validate_zip_filesize<br/>safe_extract_zip → sandbox]
413
+ VZ -- 超限/路径穿越 --> FAIL
414
+ VZ -- OK --> EVAL
415
+ EVAL[task.validate_scene<br/>get_gt_scene LRU 缓存<br/>task.evaluate_scene → metrics]
416
+ EVAL -- 异常 --> FAIL
417
+ EVAL -- OK --> OK1
418
+ OK1[add_scene_result<br/>CAS 写 progress.json] --> RENEW
419
+ FAIL[add_scene_failure<br/>CAS 写 progress.json] --> RENEW
420
+ RENEW[renew_lease<br/>CAS 续 lease +5min] --> LOOP
421
+
422
+ LOOP -- 全部 settled --> AGG
423
+ AGG[aggregate per-scene metrics] --> FIN
424
+ FIN[finalize_submission<br/>status=done/failed<br/>清空 lease]
425
+ FIN --> BEST[CAS update<br/>results/task/team_best.json<br/>仅当更优]
426
+ BEST --> LOG[append_log<br/>logs/exec-YYYY-MM.jsonl]
427
+ end
428
+
429
+ LOG --> SCHED
430
+
431
+ %% 优雅退出
432
+ SIG([SIGTERM/SIGINT]) -.-> SD[request_shutdown<br/>设置 _shutdown_event]
433
+ SD -.-> LOOP
434
+ LOOP -.-> |"shutdown_requested<br/>直接 return"| EXIT([进程退出<br/>status 保持 running<br/>下次 boot_reclaim 接管])
435
+ ```
436
+
437
+ **断点恢复的关键路径**(图中标 `**断点恢复入口**`):`load_progress` 读到的 `scene_results` / `scene_failures` 都是历史落地结果;`LOOP` 跳过任何已 settled 的 scene,所以同一 sid 被任何 worker 重入都是幂等的。
438
+
439
+ ### 7.3 用户视角的"前面还有几个 pending"
440
+ 前端 Queue 表为每条记录额外算一列 `Position`:
441
+ ```
442
+ position(my) = #(other pending with submitted_at < my.submitted_at) + #(all running)
443
+ ```
444
+ 并对单 submission 渲染"已完成 X / N scene"进度条(读 progress.json)。
445
+
446
+ ---
447
+
448
+ ## 8. 安全
449
+
450
+ | 风险 | 处置 |
451
+ |---|---|
452
+ | ZIP 炸弹(per scene) | 条目/字节双限 + 流式校验 |
453
+ | 路径穿越 / 软链接 | 全部拒绝 |
454
+ | 用户 dataset 不存在 / 文件缺失 | 单 scene 级 `scene_failures`,不影响其他 scene |
455
+ | GT 泄露 | scene 评测异常 → 仅写 "internal evaluation error",traceback 仅入 worker 日志 |
456
+ | 并发冲突 | per-file CAS(`parent_commit`);单 sid 只有一个 worker 持 lease |
457
+ | Worker 崩溃 | lease 过期机制 + scene 级幂等续跑 |
458
+ | Token 泄露 | 仅 Space Secrets,不进入任何日志 |
459
+ | 撞名 / 冒名 | `team_id = dataset owner` 替代自填名 |
460
+ | 输入校验(前端) | 正则:`user_dataset` 形如 `user/repo`,`display_name` 长度 2-32,`task_name` 在已注册任务里 |
461
+
462
+ ---
463
+
464
+ ## 9. 抗膨胀策略(HF dataset = git repo 的现实约束)
465
+
466
+ > 没有 bucket,所以我们必须主动管理 commit 历史和文件数。
467
+
468
+ | 策略 | 实施 |
469
+ |---|---|
470
+ | 单文件 CAS 覆盖 | `progress.json` 一份文件承载一个 submission 的所有 scene 结果,**不**每 scene 单独存文件 |
471
+ | 日志按月切片 | 主路径 `logs/exec-YYYY-MM.jsonl`,跨月新建 |
472
+ | 日志大小轮转 | 当月文件 > 50MB → `exec-YYYY-MM.part02.jsonl` |
473
+ | 主目录归档 | cron 把 30 天前的 done/failed `submissions/<sid>.*` 移到 `archive/submissions-YYYY-MM/` |
474
+ | 短 commit message | 便于按 path 过滤 git log |
475
+ | Frontend 读 ③ | 走 HF CDN,缓存层会吸收绝大多数读流量 |
476
+ | 紧急退路(v0.5 再做) | 把 logs/archive 迁到 Cloudflare R2 / Backblaze B2;③ 只保留权威结果与最佳快照 |
477
+
478
+ ---
479
+
480
+ ## 10. 代码结构
481
+
482
+ 自 v0.5 起,**本仓库 = Frontend Space 专用**,Worker 在独立仓库 `vLAR/PhysInOne-Leaderboard-Worker`。
483
+
484
+ ### 10.1 本仓库(Frontend Space)
485
+
486
+ ```
487
+ .
488
+ ├── app.py # Gradio 入口(Leaderboard / Submit / Queue / About)
489
+ ├── requirements.txt # gradio + huggingface-hub + pandas + apscheduler
490
+ ├── DESIGN.md
491
+ ├── README.md
492
+ └── src/
493
+ ├── envs.py # 仅前端配置:仓库 ID + 邮箱路径 + 表单正则
494
+ ├── about.py # UI 文案
495
+ ├── populate.py # UI 层薄包装
496
+ ├── display/ # CSS / 表格格式化
497
+ ├── tasks/ # 任务清单(前端用其 display_name / primary_metric)
498
+ │ ├── base.py
499
+ │ ├── _template/
500
+ │ ├── task_future_prediction/
501
+ │ └── task_property_estimation/
502
+ ├── submission/
503
+ │ └── frontend.py # 校验 + 把 pending submission 写入邮箱
504
+ ├── storage/
505
+ │ └── hub.py # 极简 HF I/O:download_file / list / upload_json
506
+ └── leaderboard/
507
+ └── read_evals.py # 读 results/ + 读 public/queue.json 快照
508
+ ```
509
+
510
+ **已删除(v0.4 → v0.5)**:`worker.py`、`Dockerfile.worker`、`requirements.worker.txt`、`src/worker/`、`src/storage/progress.py`、`src/storage/archive.py`、`src/submission/check_validity.py`。这些都搬去 Worker 仓库了。
511
+
512
+ ### 10.2 Worker 仓库(另一个 repo)
513
+
514
+ ```
515
+ .
516
+ ├── worker.py # stdlib http.server + APScheduler
517
+ ├── Dockerfile # HF Spaces Docker SDK 入口
518
+ ���── requirements.txt # huggingface-hub + apscheduler(无 gradio)
519
+ └── src/
520
+ ├── envs.py # /data 路径 + lease/poll/limit 全套配置
521
+ ├── worker/
522
+ │ ├── loop.py # tick + reclaim;扫 dataset 的 submissions/ 入箱
523
+ │ ├── runner.py # scene 流水线(写 /data,下载到 /tmp)
524
+ │ └── lease.py # 基于 /data 本地文件的 lease
525
+ ├── storage/
526
+ │ ├── local.py # /data 上的 progress / lease / archive
527
+ │ ├── mirror.py # 每 tick 写 public/queue.json + results/ 到 dataset
528
+ │ └── gt.py # /data 上常驻 GT,无需重复下载
529
+ └── submission/
530
+ └── check_validity.py # 安全解压(在 /tmp 中操作)
531
+ ```
532
+
533
+ ---
534
+
535
+ ## 11. 设计决策 FAQ
536
+
537
+ 本节记录开发过程中对架构设计的反复推敲,便于后续维护者理解"为什么不这样做"。
538
+
539
+ ### 11.1 HF dataset 能修改文件,还是只能增删?
540
+
541
+ **能修改(原地覆盖)。** HF dataset 本质是 git 仓库,`upload_file` 支持对同一路径重复写入(每次产生一个新 commit 覆盖旧内容)。本系统"状态迁移"的实现就是对 `submissions/<sid>.json` 反复调用 `upload_file` 改 `status` 字段。唯一现实约束:每次写都是一次 git push(HTTP 请求),频率不宜过高。
542
+
543
+ ### 11.2 为什么不用"多队列"设计(4 个独立列表文件)?
544
+
545
+ 一个直觉上更"清晰"的设计是维护 4 个列表文件:
546
+
547
+ ```
548
+ pending_queue.json [sid, sid, ...]
549
+ processing_list.json [sid, sid]
550
+ complete_list.json [sid (done), sid (failed), ...]
551
+ status/<sid>.json 详细进度
552
+ ```
553
+
554
+ **问题根源:原子性缺失。**
555
+
556
+ "把 sid 从 pending 移到 processing"需要两步:
557
+ 1. 改 `pending_queue.json`(删掉 sid)
558
+ 2. 改 `processing_list.json`(加入 sid)
559
+
560
+ 两步之间若 Worker 崩溃 / 并发写,sid 可能凭空消失或被双重领取。要修复就需要分布式事务——在 git 存储上实现代价极高。
561
+
562
+ **此外还有写放大问题:** `complete_list.json` 随时间无限膨胀;每次追加一条都要"读全表 + append + 写回",写入量随历史提交数线性增长(O(N²) 写操作)。
563
+
564
+ 当前设计把"4 个逻辑队列"压缩进每个 sid 的单一文件,用 `status` 字段区分视图,CAS 在单文件粒度上做原子操作,彻底避免了上述问题。
565
+
566
+ ### 11.3 当前架构用"四队列"语言重新描述
567
+
568
+ | 四队列概念 | 当前实现对应物 |
569
+ |---|---|
570
+ | pending queue | 所有 `status=="pending"` 的 `submissions/<sid>.json` |
571
+ | processing list | 所有 `status=="running"` 的 `submissions/<sid>.json` |
572
+ | complete list | 所有 `status in {"done","failed"}` 的 `submissions/<sid>.json`;30 天后归档至 `archive/submissions-YYYY-MM/` |
573
+ | processing task status | `submissions/<sid>.progress.json`(scene 级中间结果,CAS 整体替换) |
574
+
575
+ 操作映射:
576
+
577
+ | 四队列操作 | 当前实现 |
578
+ |---|---|
579
+ | append 到 pending queue | `upload_file("submissions/<sid>.json", {status: "pending"})` |
580
+ | 从 pending 取出加入 processing | CAS `status: "pending" → "running"` + 写 `lease_expires_at` |
581
+ | 处理完毕加到 complete list | CAS `status: "running" → "done"/"failed"` |
582
+ | 重启优先处理 processing | `boot_reclaim`:扫 `status=="running"` 且 lease 过期 |
583
+
584
+ `progress.json` 与主 `submissions/<sid>.json` 拆成两个文件的原因:前者每完成一个 scene 写一次(高频),后者只在三次状态转换时写(低频)。拆开后高频写不会污染低频文件的 commit history,也不会产生 CAS 伪冲突。
585
+
586
+ ### 11.4 Worker 扫描效率优化(已实现)
587
+
588
+ **原始设计**:每次 tick 调用 `snapshot_download(submissions/**)` 全量下载所有文件(包括历史 done/failed)。
589
+
590
+ **问题**:随历史提交增多,每次 tick 的 HTTP etag 检查数线性增长,大部分是无用的已完成任务。
591
+
592
+ **现有优化**(`src/worker/loop.py`):
593
+
594
+ ```
595
+ 维护内存集合 _settled_sids(done/failed 的 sid 集合)
596
+
597
+ tick():
598
+ API.list_repo_files() ← 1次 API 调用,只传文件名,无数据
599
+ 过滤 _settled_sids ← 纯内存
600
+ 对剩余 sid: hf_hub_download(force=False) ← etag 缓存,无变化不传数据
601
+ finalize() 时: _mark_settled(sid)
602
+ ```
603
+
604
+ 稳态下(大量历史 done/failed),实际数据下载量收敛为 O(pending + running),而不是 O(所有历史提交)。Worker 重启后 `_settled_sids` 清空,第一次 tick 重新下载所有文件并自动填充集合,之后恢复高效模式。
605
+
606
+ ### 11.5 跨 Space 的 `/data` 隔离 → 邮箱镜像策略(v0.5)
607
+
608
+ **事实**:HF Spaces 的 persistent storage 是**单 Space 私有**的——Frontend Space 看不到 Worker Space 的 `/data`。
609
+
610
+ **问题**:v0.5 把 GT / progress / lease / logs / archive 全搬进 Worker `/data` 之后,Frontend 怎么渲染队列?
611
+
612
+ **选择 A(采纳)**:**Worker 单向镜像**。Worker 每个 tick 把当前队列状态序列化成 `public/queue.json` 写回 HF dataset;Frontend 只需 `download_file("public/queue.json", force=False)`,etag 命中即零字节传输。
613
+
614
+ * 优点:保留 "③ 是唯一通信总线" 不变;前端代码退化为单文件读;Worker 离线时 Frontend 自动显示陈旧快照而不是报错。
615
+ * 缺点:队列最多滞后一个 tick(~30s)。可接受。
616
+
617
+ **选择 B(未采纳)**:让 Frontend HTTP 调 Worker 的 `:7860/queue` 端点。
618
+
619
+ * 缺点:Frontend → Worker Space 的私有访问要带 token,且 Worker 重启时前端会立刻 5xx;调试复杂度显著上升。
620
+
621
+ **邮箱(dataset)schema 收敛后**:
622
+
623
+ | 路径 | 写入方 | 读取方 | 备注 |
624
+ |---|---|---|---|
625
+ | `submissions/<sid>.json` | Frontend | Worker | 只在 pending 阶段存在;Worker 接管后立刻拷到 `/data/inflight/` 并从 dataset 删除 |
626
+ | `public/queue.json` | Worker | Frontend | 整张表的 UI 快照(含 status / progress / position) |
627
+ | `results/<task>/<team>_best.json` | Worker | Frontend | 历史最佳;仅在提分时被改写 |
628
+
629
+ 所有 v0.4 的 `submissions/<sid>.progress.json` / `task_manifest/` / `ground_truth/` / `logs/` / `archive/` 在 dataset 中**不再存在**——它们的等价物住在 Worker 的 `/data` 下。
630
+
631
+ ---
632
+
633
+ ## 12. ✅ TODO 总清单
634
+
635
+ ### A. 平台资源
636
+ - [x] Frontend Space `vLAR/PhysInOne-Leaderboard` — **Public**,Gradio SDK。
637
+ - [x] Private Dataset `vLAR/PhysInOne-Leaderboard` — **Private**。
638
+ - [x] Worker Space `vLAR/PhysInOne-Leaderboard-Worker` — **Private**,**Docker SDK**,**persistent `/data` 已挂载**。
639
+ - [ ] Worker Space → Secrets:`HF_TOKEN`(write 私有 dataset + read 用户 dataset)。
640
+ - [ ] Frontend Space → Secrets:`HF_TOKEN`(write 私有 dataset)。
641
+
642
+ ### B. 私有 dataset 初始化
643
+ - [ ] 上传占位 `submissions/.gitkeep` / `archive/.gitkeep` / `logs/.gitkeep`。
644
+ - [ ] 上传 `task_manifest/<task>.json`(每任务一份)。
645
+ - [ ] 上传 `ground_truth/<scene_id>/...`(任务负责人)。
646
+
647
+ ### C. 代码(v0.4 重写 — **本次待实施**)
648
+ - [ ] `src/tasks/base.py`:改为 scene 级接口(`evaluate_scene` / `aggregate_fn`)。
649
+ - [ ] `src/storage/progress.py`:progress.json 的 CAS helper(含 lease 续期)。
650
+ - [ ] `src/worker/lease.py`:claim / renew / reclaim。
651
+ - [ ] `src/worker/runner.py`:scene 流水线 + `SIGTERM` 优雅退出。
652
+ - [ ] `src/worker/loop.py`:tick + 启动钩子(接管过期 lease)。
653
+ - [ ] `src/storage/archive.py`:归档 + 日志轮转的 cron job。
654
+ - [ ] `src/submission/frontend.py`:去掉 `filename`,新增 `display_name`,以 owner 推 `team_id`。
655
+ - [ ] `src/leaderboard/read_evals.py`:加 progress + position 计算。
656
+ - [ ] `app.py`:表单字段调整 + 每 submission 进度条 + Timer 自动刷新。
657
+ - [ ] 现有 `_template` / 两个占位任务改为 scene 级。
658
+
659
+ ### D. 任务侧(合作者)
660
+ - [ ] 复制 `src/tasks/_template/`,实现 `validate_scene` / `evaluate_scene` / `load_gt_scene`。
661
+ - [ ] 上传 `task_manifest/<task>.json` 声明所用 scene_id。
662
+ - [ ] 上传 `ground_truth/<scene_id>/...`。
663
+ - [ ] 提供"用户 ZIP 内部结构"的 markdown 说明 (`expected_scene_layout`)。
664
+
665
+ ### E. 留白 / 待 user 确认
666
+ - [ ] **Worker GPU**:默认 CPU basic。
667
+ - [ ] **POLL_INTERVAL_SECONDS**:默认 30s。
668
+ - [ ] **LEASE_SECONDS**:默认 300s(5min)。
669
+ - [ ] **scene 失败自动重试**:默认不重试;运维可改 progress.json 删掉条目让 worker 续跑。
670
+ - [ ] **单 scene 评测超时**:默认无;如需,加 `multiprocessing` 子进程 timeout。
671
+ - [ ] **用户 dataset 允许 private**:默认仅 public。
672
+ - [ ] **多 Worker**:默认 1 个;按队列长度手动扩。
673
+ - [ ] **聚合策略**:默认算术平均(缺失 scene 跳过 / 算 0?待定)。
674
+ - [ ] **归档触发**:默认 done/failed 满 30 天后;定时 cron 由 worker 跑。
Dockerfile.worker DELETED
@@ -1,19 +0,0 @@
1
- # Worker Space — pure backend container.
2
- # Configure the HF Space with `sdk: docker` and this Dockerfile at repo root.
3
- FROM python:3.11-slim
4
-
5
- ENV PYTHONUNBUFFERED=1 \
6
- PIP_NO_CACHE_DIR=1 \
7
- PIP_DISABLE_PIP_VERSION_CHECK=1 \
8
- HF_HOME=/data/hf
9
-
10
- WORKDIR /app
11
-
12
- # Minimal runtime deps for the worker (no gradio).
13
- COPY requirements.worker.txt /app/requirements.worker.txt
14
- RUN pip install --no-cache-dir -r /app/requirements.worker.txt
15
-
16
- COPY . /app
17
-
18
- EXPOSE 7860
19
- CMD ["python", "worker.py"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
requirements.worker.txt DELETED
@@ -1,5 +0,0 @@
1
- APScheduler>=3.10
2
- huggingface-hub>=0.18.0
3
- pandas
4
- numpy
5
- python-dateutil
 
 
 
 
 
 
src/envs.py CHANGED
@@ -1,7 +1,15 @@
1
- """Central configuration for v0.4 (scene-level + lease)."""
 
 
 
 
 
 
 
 
 
2
  import os
3
  import re
4
- import uuid
5
 
6
  from huggingface_hub import HfApi
7
 
@@ -15,48 +23,18 @@ FRONTEND_REPO_ID = os.environ.get("HF_FRONTEND_SPACE", f"{OWNER}/{BENCHMARK_NAME
15
  WORKER_REPO_ID = os.environ.get("HF_WORKER_SPACE", f"{OWNER}/{BENCHMARK_NAME}-Worker")
16
  RESULTS_REPO = os.environ.get("HF_RESULTS_REPO", f"{OWNER}/{BENCHMARK_NAME}")
17
 
18
- # ---------- dataset internal paths -------------------------------------------
19
- DS_SUBMISSIONS_DIR = "submissions"
20
- DS_TASK_MANIFEST_DIR = "task_manifest"
21
- DS_GROUND_TRUTH_DIR = "ground_truth"
22
- DS_RESULTS_DIR = "results"
23
- DS_LOGS_DIR = "logs"
24
- DS_ARCHIVE_DIR = "archive"
25
-
26
- # ---------- local cache ------------------------------------------------------
27
- CACHE_PATH = os.getenv("HF_HOME", ".")
28
- DATA_CACHE_PATH = os.path.join(CACHE_PATH, "benchmark-cache")
29
- for _sub in ("submissions", "results", "manifest", "downloads", "gt"):
30
- os.makedirs(os.path.join(DATA_CACHE_PATH, _sub), exist_ok=True)
31
- SUBMISSIONS_CACHE_PATH = os.path.join(DATA_CACHE_PATH, "submissions")
32
- RESULTS_CACHE_PATH = os.path.join(DATA_CACHE_PATH, "results")
33
- MANIFEST_CACHE_PATH = os.path.join(DATA_CACHE_PATH, "manifest")
34
- DOWNLOADS_CACHE_PATH = os.path.join(DATA_CACHE_PATH, "downloads")
35
- GT_CACHE_PATH = os.path.join(DATA_CACHE_PATH, "gt")
36
-
37
- # ---------- size limits (per scene) ------------------------------------------
38
- MAX_SCENE_ZIP_SIZE = int(os.environ.get("MAX_SCENE_ZIP_SIZE", 500 * 1024 * 1024)) # 500 MB
39
- MAX_EXTRACTED_BYTES = int(os.environ.get("MAX_EXTRACTED_BYTES", 2 * 1024 * 1024 * 1024)) # 2 GB
40
- MAX_ZIP_ENTRIES = int(os.environ.get("MAX_ZIP_ENTRIES", 100000))
41
-
42
- # ---------- worker -----------------------------------------------------------
43
- POLL_INTERVAL_SECONDS = int(os.environ.get("POLL_INTERVAL_SECONDS", 30))
44
- LEASE_SECONDS = int(os.environ.get("LEASE_SECONDS", 300)) # 5 min
45
- WORKER_CONCURRENCY = int(os.environ.get("WORKER_CONCURRENCY", 2)) # 任务间并行度
46
- WORKER_ID = os.environ.get("WORKER_ID", f"worker-{uuid.uuid4().hex[:8]}")
47
- GT_LRU_SIZE = int(os.environ.get("GT_LRU_SIZE", 16))
48
-
49
- # ---------- log + archive ----------------------------------------------------
50
- LOG_FILE_MAX_BYTES = int(os.environ.get("LOG_FILE_MAX_BYTES", 50 * 1024 * 1024)) # 50 MB
51
- ARCHIVE_AFTER_DAYS = int(os.environ.get("ARCHIVE_AFTER_DAYS", 30))
52
- ARCHIVE_INTERVAL_SECONDS = int(os.environ.get("ARCHIVE_INTERVAL_SECONDS", 6 * 3600)) # 6 h
53
 
54
  # ---------- validation regex -------------------------------------------------
55
  DISPLAY_NAME_REGEX = re.compile(r"^[\w\-. ]{2,40}$", re.UNICODE)
56
  HF_REPO_ID_REGEX = re.compile(r"^[A-Za-z0-9_.\-]{1,96}/[A-Za-z0-9_.\-]{1,96}$")
57
- SCENE_ID_REGEX = re.compile(r"^[A-Za-z0-9_\-]{1,64}$")
58
 
59
  # ---------- CAS --------------------------------------------------------------
60
  CAS_MAX_RETRIES = 6
61
 
62
  API = HfApi(token=TOKEN)
 
 
1
+ """Central configuration for Frontend Space (v0.5).
2
+
3
+ NOTE: all worker-side state (progress / lease / GT cache / archive / logs)
4
+ lives on the Worker Space's persistent disk (`/data`). The HF dataset is now
5
+ only a thin **mailbox** between Frontend and Worker:
6
+
7
+ submissions/<sid>.json (Frontend writes; Worker ingests + deletes)
8
+ public/queue.json (Worker mirrors snapshot; Frontend reads)
9
+ results/<task>/<team>_best.json (Worker writes; Frontend reads)
10
+ """
11
  import os
12
  import re
 
13
 
14
  from huggingface_hub import HfApi
15
 
 
23
  WORKER_REPO_ID = os.environ.get("HF_WORKER_SPACE", f"{OWNER}/{BENCHMARK_NAME}-Worker")
24
  RESULTS_REPO = os.environ.get("HF_RESULTS_REPO", f"{OWNER}/{BENCHMARK_NAME}")
25
 
26
+ # ---------- dataset paths (mailbox surface only) -----------------------------
27
+ DS_SUBMISSIONS_DIR = "submissions" # Frontend write-only, Worker consumes
28
+ DS_RESULTS_DIR = "results" # Worker write, Frontend read
29
+ DS_PUBLIC_DIR = "public" # Worker write snapshot, Frontend read
30
+ DS_QUEUE_SNAPSHOT = f"{DS_PUBLIC_DIR}/queue.json"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31
 
32
  # ---------- validation regex -------------------------------------------------
33
  DISPLAY_NAME_REGEX = re.compile(r"^[\w\-. ]{2,40}$", re.UNICODE)
34
  HF_REPO_ID_REGEX = re.compile(r"^[A-Za-z0-9_.\-]{1,96}/[A-Za-z0-9_.\-]{1,96}$")
 
35
 
36
  # ---------- CAS --------------------------------------------------------------
37
  CAS_MAX_RETRIES = 6
38
 
39
  API = HfApi(token=TOKEN)
40
+
src/leaderboard/read_evals.py CHANGED
@@ -1,15 +1,16 @@
1
- """Read leaderboards + queue with position + per-submission progress."""
2
  from __future__ import annotations
3
 
 
4
  import logging
5
  import os
6
- from typing import Dict, List, Optional
7
 
8
  import pandas as pd
9
 
10
  from src.envs import (
 
11
  DS_RESULTS_DIR,
12
- DS_SUBMISSIONS_DIR,
13
  )
14
  from src.storage.hub import download_file, list_repo_files_under
15
  from src.tasks.base import TaskPlugin
@@ -19,7 +20,6 @@ logger = logging.getLogger(__name__)
19
 
20
  def _load_remote_json(path_in_repo: str) -> Optional[dict]:
21
  """Download (etag-cached) and parse a JSON file from the private dataset."""
22
- import json
23
  local = download_file(path_in_repo, force=False)
24
  if local is None:
25
  return None
@@ -70,82 +70,29 @@ def load_task_leaderboard(task: TaskPlugin) -> pd.DataFrame:
70
  return df[["#", *cols]]
71
 
72
 
73
- # --------------------- queue with position + progress -----------------------
74
- def _scan_submissions() -> List[dict]:
75
- """Fetch all non-terminal submission JSONs via list + per-file download."""
76
- files = list_repo_files_under(DS_SUBMISSIONS_DIR)
77
- out: List[dict] = []
78
- for fpath in files:
79
- bn = os.path.basename(fpath)
80
- if bn.endswith(".progress.json") or not bn.endswith(".json"):
81
- continue
82
- data = _load_remote_json(fpath)
83
- if data:
84
- out.append(data)
85
- return out
86
-
87
-
88
- def _scan_progress() -> Dict[str, dict]:
89
- """Fetch all progress JSONs for submissions that are currently in-flight."""
90
- files = list_repo_files_under(DS_SUBMISSIONS_DIR)
91
- out: Dict[str, dict] = {}
92
- for fpath in files:
93
- if not fpath.endswith(".progress.json"):
94
- continue
95
- data = _load_remote_json(fpath)
96
- if data and data.get("submission_id"):
97
- out[data["submission_id"]] = data
98
- return out
99
 
100
 
101
  def load_queue_df(limit: int = 100) -> pd.DataFrame:
102
- subs = _scan_submissions()
103
- progs = _scan_progress()
104
-
105
- # position计算:在我之前还有几个 pending + 全部 running
106
- pending_sorted = sorted(
107
- [s for s in subs if s.get("status") == "pending"],
108
- key=lambda d: d.get("submitted_at", ""),
109
- )
110
- pending_pos_of: Dict[str, int] = {s["submission_id"]: i + 1
111
- for i, s in enumerate(pending_sorted)}
112
- running_total = sum(1 for s in subs if s.get("status") == "running")
113
 
114
- rows: List[dict] = []
115
- for s in subs:
116
- sid = s.get("submission_id", "")
117
- status = s.get("status", "?")
118
- prog = progs.get(sid, {})
119
- n_done = len(prog.get("scene_results", {})) or s.get("scenes_done", 0) or 0
120
- n_fail = len(prog.get("scene_failures", {})) or s.get("scenes_failed", 0) or 0
121
- n_total = prog.get("scenes_total") or s.get("scenes_total") or 0
122
-
123
- if status == "pending":
124
- position = pending_pos_of.get(sid, 0) + running_total
125
- position_str = f"#{position}"
126
- elif status == "running":
127
- position_str = "running"
128
- else:
129
- position_str = "—"
130
-
131
- progress_str = (f"{n_done}/{n_total}"
132
- + (f" (+{n_fail} fail)" if n_fail else ""))
133
-
134
- rows.append({
135
- "Submitted At": s.get("submitted_at"),
136
- "ID": sid,
137
- "Team": s.get("display_name") or s.get("team_id"),
138
- "Owner": s.get("team_id"),
139
- "Task": s.get("task_name"),
140
- "Status": status,
141
- "Position": position_str,
142
- "Scenes": progress_str,
143
- "Score": s.get("score"),
144
- "Error": (s.get("error_message") or "")[:120],
145
- })
146
  if not rows:
147
- return pd.DataFrame(columns=["Submitted At", "ID", "Team", "Owner", "Task",
148
- "Status", "Position", "Scenes", "Score", "Error"])
149
  df = pd.DataFrame.from_records(rows)
 
 
 
 
150
  df = df.sort_values(by="Submitted At", ascending=False, na_position="last").head(limit)
151
  return df.reset_index(drop=True)
 
1
+ """Read leaderboards + queue (v0.5: queue is a single Worker-mirrored snapshot)."""
2
  from __future__ import annotations
3
 
4
+ import json
5
  import logging
6
  import os
7
+ from typing import List, Optional
8
 
9
  import pandas as pd
10
 
11
  from src.envs import (
12
+ DS_QUEUE_SNAPSHOT,
13
  DS_RESULTS_DIR,
 
14
  )
15
  from src.storage.hub import download_file, list_repo_files_under
16
  from src.tasks.base import TaskPlugin
 
20
 
21
  def _load_remote_json(path_in_repo: str) -> Optional[dict]:
22
  """Download (etag-cached) and parse a JSON file from the private dataset."""
 
23
  local = download_file(path_in_repo, force=False)
24
  if local is None:
25
  return None
 
70
  return df[["#", *cols]]
71
 
72
 
73
+ # --------------------- queue (Worker-mirrored snapshot) ---------------------
74
+ _QUEUE_COLUMNS = ["Submitted At", "ID", "Team", "Owner", "Task",
75
+ "Status", "Position", "Scenes", "Score", "Error"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
 
77
 
78
  def load_queue_df(limit: int = 100) -> pd.DataFrame:
79
+ """Read the queue snapshot that the Worker mirrors at `public/queue.json`.
 
 
 
 
 
 
 
 
 
 
80
 
81
+ The Worker keeps the canonical state on its /data persistent disk and
82
+ rewrites this single file every tick. Frontend only needs one download.
83
+ """
84
+ snap = _load_remote_json(DS_QUEUE_SNAPSHOT)
85
+ if not snap or not isinstance(snap, dict):
86
+ return pd.DataFrame(columns=_QUEUE_COLUMNS)
87
+
88
+ rows = snap.get("rows") or []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
  if not rows:
90
+ return pd.DataFrame(columns=_QUEUE_COLUMNS)
91
+
92
  df = pd.DataFrame.from_records(rows)
93
+ for c in _QUEUE_COLUMNS:
94
+ if c not in df.columns:
95
+ df[c] = None
96
+ df = df[_QUEUE_COLUMNS]
97
  df = df.sort_values(by="Submitted At", ascending=False, na_position="last").head(limit)
98
  return df.reset_index(drop=True)
src/storage/archive.py DELETED
@@ -1,149 +0,0 @@
1
- """Log append (monthly + size rotation) + submission archive cron."""
2
- from __future__ import annotations
3
-
4
- import json
5
- import logging
6
- import os
7
- import time
8
- from datetime import datetime, timezone
9
- from typing import Optional
10
-
11
- from huggingface_hub import hf_hub_download
12
- from huggingface_hub.utils import EntryNotFoundError, HfHubHTTPError
13
-
14
- from src.envs import (
15
- ARCHIVE_AFTER_DAYS,
16
- CAS_MAX_RETRIES,
17
- DS_ARCHIVE_DIR,
18
- DS_LOGS_DIR,
19
- DS_SUBMISSIONS_DIR,
20
- LOG_FILE_MAX_BYTES,
21
- RESULTS_REPO,
22
- )
23
- from src.storage.hub import (
24
- API,
25
- delete_file,
26
- list_repo_files_under,
27
- read_json,
28
- upload_bytes,
29
- )
30
-
31
- logger = logging.getLogger(__name__)
32
-
33
-
34
- def _now() -> datetime:
35
- return datetime.now(timezone.utc)
36
-
37
-
38
- def _month_prefix() -> str:
39
- return _now().strftime("%Y-%m")
40
-
41
-
42
- def _candidate_log_paths(month: str) -> list[str]:
43
- """Existing log file candidates for this month, sorted ascending."""
44
- files = list_repo_files_under(DS_LOGS_DIR)
45
- pat = f"{DS_LOGS_DIR}/exec-{month}"
46
- return sorted(f for f in files if f.startswith(pat))
47
-
48
-
49
- def _resolve_log_target(month: str) -> tuple[str, bytes]:
50
- """Return (path, existing bytes) for current append target, applying rotation."""
51
- candidates = _candidate_log_paths(month)
52
- if not candidates:
53
- return (f"{DS_LOGS_DIR}/exec-{month}.jsonl", b"")
54
- latest = candidates[-1]
55
- try:
56
- local = hf_hub_download(repo_id=RESULTS_REPO, repo_type="dataset",
57
- filename=latest, force_download=True)
58
- with open(local, "rb") as f:
59
- body = f.read()
60
- except (EntryNotFoundError, HfHubHTTPError):
61
- body = b""
62
- if len(body) >= LOG_FILE_MAX_BYTES:
63
- # rotate to next part
64
- n_parts = sum(1 for c in candidates if ".part" in c) + 1
65
- return (f"{DS_LOGS_DIR}/exec-{month}.part{n_parts + 1:02d}.jsonl", b"")
66
- return (latest, body)
67
-
68
-
69
- def append_log(entry: dict) -> None:
70
- line = (json.dumps(entry, ensure_ascii=False) + "\n").encode("utf-8")
71
- month = _month_prefix()
72
- for attempt in range(CAS_MAX_RETRIES):
73
- path, body = _resolve_log_target(month)
74
- # parent SHA: relax — log append is best-effort.
75
- try:
76
- upload_bytes(path, body + line,
77
- f"append log: {entry.get('submission_id', '?')}")
78
- return
79
- except HfHubHTTPError as e:
80
- status = getattr(e.response, "status_code", None)
81
- if status in (409, 412):
82
- time.sleep(0.4 * (2 ** attempt))
83
- continue
84
- logger.warning("append_log failed: %s", e)
85
- return
86
- logger.warning("append_log retries exhausted")
87
-
88
-
89
- # ---------------------- archive cron ----------------------------------------
90
- def _parse_iso(s: Optional[str]) -> Optional[datetime]:
91
- if not s:
92
- return None
93
- try:
94
- return datetime.strptime(s, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
95
- except Exception:
96
- return None
97
-
98
-
99
- def archive_old_submissions() -> int:
100
- """Move done/failed submissions older than ARCHIVE_AFTER_DAYS to archive/.
101
-
102
- Implementation: download json -> upload under archive/ -> delete originals.
103
- Returns count archived.
104
- """
105
- cutoff_age = ARCHIVE_AFTER_DAYS * 86400
106
- now = _now()
107
- files = list_repo_files_under(DS_SUBMISSIONS_DIR)
108
- sids = set()
109
- for f in files:
110
- bn = os.path.basename(f)
111
- if bn.endswith(".json"):
112
- sids.add(bn.split(".")[0])
113
-
114
- moved = 0
115
- for sid in sids:
116
- sub_path = f"{DS_SUBMISSIONS_DIR}/{sid}.json"
117
- existing = read_json(sub_path)
118
- if existing is None:
119
- continue
120
- data, _sha = existing
121
- if data.get("status") not in ("done", "failed"):
122
- continue
123
- ts = _parse_iso(data.get("completed_at") or data.get("submitted_at"))
124
- if ts is None or (now - ts).total_seconds() < cutoff_age:
125
- continue
126
- month = ts.strftime("%Y-%m")
127
- archive_prefix = f"{DS_ARCHIVE_DIR}/submissions-{month}"
128
- try:
129
- # upload archived copies
130
- upload_bytes(
131
- f"{archive_prefix}/{sid}.json",
132
- json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8"),
133
- f"archive {sid}",
134
- )
135
- prog_path = f"{DS_SUBMISSIONS_DIR}/{sid}.progress.json"
136
- prog = read_json(prog_path)
137
- if prog is not None:
138
- upload_bytes(
139
- f"{archive_prefix}/{sid}.progress.json",
140
- json.dumps(prog[0], ensure_ascii=False, indent=2).encode("utf-8"),
141
- f"archive progress {sid}",
142
- )
143
- # delete originals
144
- delete_file(sub_path, f"archive cleanup {sid}")
145
- delete_file(prog_path, f"archive cleanup {sid} progress")
146
- moved += 1
147
- except Exception as e:
148
- logger.warning("archive %s failed: %s", sid, e)
149
- return moved
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/storage/hub.py CHANGED
@@ -1,37 +1,29 @@
1
- """Generic CAS-based read/write helpers for the private dataset.
2
 
3
- All hub access (other than user-dataset downloads) goes through this module.
 
 
 
 
 
 
 
4
  """
5
  from __future__ import annotations
6
 
7
  import json
8
  import logging
9
- import os
10
- import time
11
  from io import BytesIO
12
- from typing import Any, Callable, Dict, List, Optional, Tuple
13
 
14
- from huggingface_hub import hf_hub_download, snapshot_download
15
- from huggingface_hub.utils import EntryNotFoundError, HfHubHTTPError, RepositoryNotFoundError
16
 
17
- from src.envs import (
18
- API,
19
- CAS_MAX_RETRIES,
20
- DATA_CACHE_PATH,
21
- RESULTS_REPO,
22
- )
23
 
24
  logger = logging.getLogger(__name__)
25
 
26
 
27
- def _parent_sha() -> Optional[str]:
28
- try:
29
- return API.dataset_info(repo_id=RESULTS_REPO).sha
30
- except Exception:
31
- return None
32
-
33
-
34
- # --------------------------- download helpers --------------------------------
35
  def download_file(path_in_repo: str, force: bool = True) -> Optional[str]:
36
  try:
37
  return hf_hub_download(
@@ -48,35 +40,7 @@ def download_file(path_in_repo: str, force: bool = True) -> Optional[str]:
48
  raise
49
 
50
 
51
- def read_json(path_in_repo: str) -> Optional[Tuple[dict, Optional[str]]]:
52
- local = download_file(path_in_repo)
53
- if local is None:
54
- return None
55
- try:
56
- with open(local, "r", encoding="utf-8") as f:
57
- return json.load(f), _parent_sha()
58
- except json.JSONDecodeError as e:
59
- logger.warning("malformed json at %s: %s", path_in_repo, e)
60
- return None
61
-
62
-
63
- def snapshot_subdir(allow_patterns: List[str],
64
- local_dir: str = DATA_CACHE_PATH) -> str:
65
- try:
66
- return snapshot_download(
67
- repo_id=RESULTS_REPO,
68
- repo_type="dataset",
69
- local_dir=local_dir,
70
- allow_patterns=allow_patterns,
71
- etag_timeout=30,
72
- )
73
- except (RepositoryNotFoundError, HfHubHTTPError) as e:
74
- logger.warning("snapshot_subdir failed for %s: %s", allow_patterns, e)
75
- return local_dir
76
-
77
-
78
  def list_repo_files_under(prefix: str) -> List[str]:
79
- """List files under a path-prefix in the private dataset."""
80
  try:
81
  all_files = API.list_repo_files(repo_id=RESULTS_REPO, repo_type="dataset")
82
  except Exception as e:
@@ -86,62 +50,12 @@ def list_repo_files_under(prefix: str) -> List[str]:
86
  return [f for f in all_files if f.startswith(prefix)]
87
 
88
 
89
- # --------------------------- upload helpers ----------------------------------
90
- def upload_bytes(path_in_repo: str, body: bytes, commit_message: str,
91
- parent_commit: Optional[str] = None) -> None:
92
- kwargs: Dict[str, Any] = dict(
93
  path_or_fileobj=BytesIO(body),
94
  path_in_repo=path_in_repo,
95
  repo_id=RESULTS_REPO,
96
  repo_type="dataset",
97
  commit_message=commit_message,
98
  )
99
- if parent_commit:
100
- kwargs["parent_commit"] = parent_commit
101
- API.upload_file(**kwargs)
102
-
103
-
104
- def upload_json(path_in_repo: str, payload: dict, commit_message: str,
105
- parent_commit: Optional[str] = None) -> None:
106
- body = json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8")
107
- upload_bytes(path_in_repo, body, commit_message, parent_commit)
108
-
109
-
110
- def delete_file(path_in_repo: str, commit_message: str) -> bool:
111
- try:
112
- API.delete_file(
113
- path_in_repo=path_in_repo,
114
- repo_id=RESULTS_REPO,
115
- repo_type="dataset",
116
- commit_message=commit_message,
117
- )
118
- return True
119
- except (EntryNotFoundError, HfHubHTTPError) as e:
120
- logger.warning("delete_file %s failed: %s", path_in_repo, e)
121
- return False
122
-
123
-
124
- def cas_update_json(path_in_repo: str,
125
- mutate_fn: Callable[[Optional[dict]], Optional[dict]],
126
- commit_message: str) -> Optional[dict]:
127
- """Optimistic CAS: read -> mutate -> write with parent_commit.
128
-
129
- `mutate_fn(current_or_None)` returns the new payload or None to abort.
130
- Returns the persisted payload, or the current one if aborted.
131
- """
132
- for attempt in range(CAS_MAX_RETRIES):
133
- existing = read_json(path_in_repo)
134
- current, parent = (existing[0], existing[1]) if existing else (None, _parent_sha())
135
- new_payload = mutate_fn(current)
136
- if new_payload is None:
137
- return current
138
- try:
139
- upload_json(path_in_repo, new_payload, commit_message, parent)
140
- return new_payload
141
- except HfHubHTTPError as e:
142
- status = getattr(e.response, "status_code", None)
143
- if status in (409, 412):
144
- time.sleep(0.4 * (2 ** attempt))
145
- continue
146
- raise
147
- raise RuntimeError(f"CAS exhausted for {path_in_repo}")
 
1
+ """Minimal hub I/O for the Frontend Space (v0.5).
2
 
3
+ Frontend talks to the dataset only via a tiny mailbox:
4
+ * `upload_json(submissions/<sid>.json, ...)` — write pending submission
5
+ * `download_file(public/queue.json)` — read queue snapshot
6
+ * `download_file(results/<task>/<team>_best.json)` — read leaderboard rows
7
+ * `list_repo_files_under(results/<task>)` — enumerate leaderboard entries
8
+
9
+ All CAS / progress / lease logic lives on the Worker (against its /data disk),
10
+ so this module no longer ships those helpers.
11
  """
12
  from __future__ import annotations
13
 
14
  import json
15
  import logging
 
 
16
  from io import BytesIO
17
+ from typing import List, Optional
18
 
19
+ from huggingface_hub import hf_hub_download
20
+ from huggingface_hub.utils import EntryNotFoundError, HfHubHTTPError
21
 
22
+ from src.envs import API, RESULTS_REPO
 
 
 
 
 
23
 
24
  logger = logging.getLogger(__name__)
25
 
26
 
 
 
 
 
 
 
 
 
27
  def download_file(path_in_repo: str, force: bool = True) -> Optional[str]:
28
  try:
29
  return hf_hub_download(
 
40
  raise
41
 
42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  def list_repo_files_under(prefix: str) -> List[str]:
 
44
  try:
45
  all_files = API.list_repo_files(repo_id=RESULTS_REPO, repo_type="dataset")
46
  except Exception as e:
 
50
  return [f for f in all_files if f.startswith(prefix)]
51
 
52
 
53
+ def upload_json(path_in_repo: str, payload: dict, commit_message: str) -> None:
54
+ body = json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8")
55
+ API.upload_file(
 
56
  path_or_fileobj=BytesIO(body),
57
  path_in_repo=path_in_repo,
58
  repo_id=RESULTS_REPO,
59
  repo_type="dataset",
60
  commit_message=commit_message,
61
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/storage/progress.py DELETED
@@ -1,159 +0,0 @@
1
- """progress.json CAS helpers (per-submission scene results + lease)."""
2
- from __future__ import annotations
3
-
4
- from datetime import datetime, timedelta, timezone
5
- from typing import Dict, Optional
6
-
7
- from src.envs import DS_SUBMISSIONS_DIR, LEASE_SECONDS, WORKER_ID
8
- from src.storage.hub import cas_update_json, read_json
9
-
10
-
11
- def _now() -> datetime:
12
- return datetime.now(timezone.utc)
13
-
14
-
15
- def _iso(dt: datetime) -> str:
16
- return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
17
-
18
-
19
- def progress_path(sid: str) -> str:
20
- return f"{DS_SUBMISSIONS_DIR}/{sid}.progress.json"
21
-
22
-
23
- def submission_path(sid: str) -> str:
24
- return f"{DS_SUBMISSIONS_DIR}/{sid}.json"
25
-
26
-
27
- def load_progress(sid: str) -> Dict:
28
- """Returns the progress dict (possibly empty skeleton)."""
29
- existing = read_json(progress_path(sid))
30
- if existing is None:
31
- return {
32
- "submission_id": sid,
33
- "scenes_total": 0,
34
- "scene_results": {},
35
- "scene_failures": {},
36
- "last_worker": None,
37
- "last_updated_at": None,
38
- }
39
- return existing[0]
40
-
41
-
42
- def init_progress_if_absent(sid: str, scenes_total: int) -> None:
43
- def mutate(current):
44
- if current is not None:
45
- return None
46
- return {
47
- "submission_id": sid,
48
- "scenes_total": scenes_total,
49
- "scene_results": {},
50
- "scene_failures": {},
51
- "last_worker": WORKER_ID,
52
- "last_updated_at": _iso(_now()),
53
- }
54
- cas_update_json(progress_path(sid), mutate, f"init progress {sid}")
55
-
56
-
57
- def add_scene_result(sid: str, scene_id: str, metrics: Dict[str, float]) -> Dict:
58
- def mutate(current):
59
- cur = current or {
60
- "submission_id": sid, "scenes_total": 0,
61
- "scene_results": {}, "scene_failures": {},
62
- }
63
- cur["scene_results"] = dict(cur.get("scene_results", {}))
64
- cur["scene_failures"] = dict(cur.get("scene_failures", {}))
65
- cur["scene_results"][scene_id] = {"metrics": metrics, "ts": _iso(_now())}
66
- cur["scene_failures"].pop(scene_id, None)
67
- cur["last_worker"] = WORKER_ID
68
- cur["last_updated_at"] = _iso(_now())
69
- return cur
70
- return cas_update_json(progress_path(sid), mutate, f"scene {scene_id} OK ({sid})") or {}
71
-
72
-
73
- def add_scene_failure(sid: str, scene_id: str, reason: str) -> Dict:
74
- def mutate(current):
75
- cur = current or {
76
- "submission_id": sid, "scenes_total": 0,
77
- "scene_results": {}, "scene_failures": {},
78
- }
79
- cur["scene_results"] = dict(cur.get("scene_results", {}))
80
- cur["scene_failures"] = dict(cur.get("scene_failures", {}))
81
- cur["scene_failures"][scene_id] = reason[:240]
82
- cur["last_worker"] = WORKER_ID
83
- cur["last_updated_at"] = _iso(_now())
84
- return cur
85
- return cas_update_json(progress_path(sid), mutate, f"scene {scene_id} FAIL ({sid})") or {}
86
-
87
-
88
- # --------------------- submission CAS (status + lease) -----------------------
89
- def claim_submission(sid: str) -> Optional[dict]:
90
- """Atomic pending → running (+ lease). Returns running payload or None."""
91
- holder = {"payload": None}
92
-
93
- def mutate(current):
94
- if current is None or current.get("status") != "pending":
95
- return None
96
- new = dict(current)
97
- new["status"] = "running"
98
- new["started_at"] = _iso(_now())
99
- new["worker_attempt"] = int(current.get("worker_attempt", 0)) + 1
100
- new["lease_expires_at"] = _iso(_now() + timedelta(seconds=LEASE_SECONDS))
101
- new["last_worker"] = WORKER_ID
102
- holder["payload"] = new
103
- return new
104
-
105
- cas_update_json(submission_path(sid), mutate, f"claim {sid}")
106
- return holder["payload"]
107
-
108
-
109
- def reclaim_submission(sid: str) -> Optional[dict]:
110
- """Take over a running submission whose lease has expired."""
111
- holder = {"payload": None}
112
-
113
- def mutate(current):
114
- if current is None or current.get("status") != "running":
115
- return None
116
- lease = current.get("lease_expires_at")
117
- if lease and lease > _iso(_now()):
118
- return None # still leased by someone else
119
- new = dict(current)
120
- new["worker_attempt"] = int(current.get("worker_attempt", 0)) + 1
121
- new["lease_expires_at"] = _iso(_now() + timedelta(seconds=LEASE_SECONDS))
122
- new["last_worker"] = WORKER_ID
123
- holder["payload"] = new
124
- return new
125
-
126
- cas_update_json(submission_path(sid), mutate, f"reclaim {sid}")
127
- return holder["payload"]
128
-
129
-
130
- def renew_lease(sid: str, scene_counts: Optional[Dict[str, int]] = None) -> None:
131
- def mutate(current):
132
- if current is None or current.get("status") != "running":
133
- return None
134
- new = dict(current)
135
- new["lease_expires_at"] = _iso(_now() + timedelta(seconds=LEASE_SECONDS))
136
- new["last_worker"] = WORKER_ID
137
- if scene_counts:
138
- new["scenes_done"] = scene_counts.get("done", new.get("scenes_done", 0))
139
- new["scenes_failed"] = scene_counts.get("failed", new.get("scenes_failed", 0))
140
- return new
141
- cas_update_json(submission_path(sid), mutate, f"renew lease {sid}")
142
-
143
-
144
- def finalize_submission(sid: str, status: str, score: Optional[float],
145
- metrics: Optional[dict], error_message: Optional[str],
146
- scenes_done: int, scenes_failed: int) -> None:
147
- def mutate(current):
148
- base = current or {"submission_id": sid}
149
- new = dict(base)
150
- new["status"] = status
151
- new["completed_at"] = _iso(_now())
152
- new["lease_expires_at"] = None
153
- new["score"] = score
154
- new["metrics"] = metrics
155
- new["error_message"] = error_message
156
- new["scenes_done"] = scenes_done
157
- new["scenes_failed"] = scenes_failed
158
- return new
159
- cas_update_json(submission_path(sid), mutate, f"finalize {sid}={status}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/submission/check_validity.py DELETED
@@ -1,60 +0,0 @@
1
- """ZIP safe-extract — anti zip-bomb / path traversal."""
2
- from __future__ import annotations
3
-
4
- import os
5
- import zipfile
6
-
7
- from src.envs import MAX_EXTRACTED_BYTES, MAX_SCENE_ZIP_SIZE, MAX_ZIP_ENTRIES
8
-
9
-
10
- def validate_zip_filesize(zip_path: str) -> None:
11
- size = os.path.getsize(zip_path)
12
- if size > MAX_SCENE_ZIP_SIZE:
13
- raise ValueError(f"scene ZIP 大小 {size} 超过上限 {MAX_SCENE_ZIP_SIZE} 字节。")
14
-
15
-
16
- def safe_extract_zip(zip_path: str, dest_dir: str) -> None:
17
- dest_real = os.path.realpath(dest_dir)
18
- try:
19
- zf = zipfile.ZipFile(zip_path, "r")
20
- except zipfile.BadZipFile as e:
21
- raise ValueError(f"非法 ZIP 文件:{e}") from e
22
-
23
- with zf:
24
- infos = zf.infolist()
25
- if len(infos) > MAX_ZIP_ENTRIES:
26
- raise ValueError(f"ZIP 条目数 {len(infos)} 超过上限 {MAX_ZIP_ENTRIES}。")
27
- declared = sum(max(0, i.file_size) for i in infos)
28
- if declared > MAX_EXTRACTED_BYTES:
29
- raise ValueError(f"声明的解压字节 {declared} 超过上限 {MAX_EXTRACTED_BYTES}。")
30
-
31
- written = 0
32
- for info in infos:
33
- name = info.filename
34
- if name.startswith("/") or ".." in name.replace("\\", "/").split("/"):
35
- raise ValueError(f"非法路径条目:{name}")
36
- if (info.external_attr >> 16) & 0xF000 == 0xA000:
37
- raise ValueError(f"禁止符号链接:{name}")
38
- target = os.path.realpath(os.path.join(dest_dir, name))
39
- if not (target == dest_real or target.startswith(dest_real + os.sep)):
40
- raise ValueError(f"非法目标路径:{name}")
41
- if info.is_dir():
42
- os.makedirs(target, exist_ok=True)
43
- continue
44
- os.makedirs(os.path.dirname(target), exist_ok=True)
45
- with zf.open(info, "r") as src, open(target, "wb") as dst:
46
- while True:
47
- chunk = src.read(64 * 1024)
48
- if not chunk:
49
- break
50
- written += len(chunk)
51
- if written > MAX_EXTRACTED_BYTES:
52
- dst.close()
53
- try:
54
- os.remove(target)
55
- except OSError:
56
- pass
57
- raise ValueError(
58
- f"解压字节超过上限 {MAX_EXTRACTED_BYTES},疑似 zip 炸弹。"
59
- )
60
- dst.write(chunk)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/submission/frontend.py CHANGED
@@ -12,10 +12,9 @@ from src.envs import (
12
  API,
13
  DISPLAY_NAME_REGEX,
14
  DS_SUBMISSIONS_DIR,
15
- DS_TASK_MANIFEST_DIR,
16
  HF_REPO_ID_REGEX,
17
  )
18
- from src.storage.hub import read_json, upload_json
19
  from src.tasks import get_task
20
 
21
  logger = logging.getLogger(__name__)
@@ -53,14 +52,6 @@ def _check_user_dataset_exists(user_dataset: str) -> Optional[str]:
53
  return f"检查 dataset 失败:{e}"
54
 
55
 
56
- def _scenes_total_for(task_name: str) -> int:
57
- """Read scenes_total from task_manifest; 0 if absent."""
58
- existing = read_json(f"{DS_TASK_MANIFEST_DIR}/{task_name}.json")
59
- if existing is None:
60
- return 0
61
- return len(existing[0].get("scene_ids") or [])
62
-
63
-
64
  def submit_task(display_name: str, task_name: str, user_dataset: str) -> dict:
65
  display_name = (display_name or "").strip()
66
  task_name = (task_name or "").strip()
@@ -77,8 +68,9 @@ def submit_task(display_name: str, task_name: str, user_dataset: str) -> dict:
77
  task = get_task(task_name)
78
  team_id = user_dataset.split("/")[0]
79
  submission_id = uuid.uuid4().hex[:10]
80
- scenes_total = _scenes_total_for(task_name)
81
 
 
 
82
  payload = {
83
  "submission_id": submission_id,
84
  "team_id": team_id,
@@ -87,18 +79,7 @@ def submit_task(display_name: str, task_name: str, user_dataset: str) -> dict:
87
  "user_dataset": user_dataset,
88
  "submitted_at": _utc_now_iso(),
89
  "status": "pending",
90
- "started_at": None,
91
- "completed_at": None,
92
- "lease_expires_at": None,
93
  "primary_metric": task.primary_metric,
94
- "score": None,
95
- "metrics": None,
96
- "scenes_total": scenes_total,
97
- "scenes_done": 0,
98
- "scenes_failed": 0,
99
- "error_message": None,
100
- "worker_attempt": 0,
101
- "last_worker": None,
102
  }
103
  path = f"{DS_SUBMISSIONS_DIR}/{submission_id}.json"
104
  try:
@@ -109,13 +90,13 @@ def submit_task(display_name: str, task_name: str, user_dataset: str) -> dict:
109
  "message": f"❌ 提交写入失败:{e}",
110
  "submission_id": None}
111
 
 
112
  return {
113
  "status": "accepted",
114
  "submission_id": submission_id,
115
  "message": (
116
  f"✅ 已提交(id `{submission_id}`,状态:pending)。\n\n"
117
- f"队伍身份:`{team_id}` · 展示名:`{display_name}` · "
118
- f"该任务共 **{scenes_total}** 个 scene。\n\n"
119
  f"请在「Queue」标签页查看进度。"
120
  ),
121
  }
 
12
  API,
13
  DISPLAY_NAME_REGEX,
14
  DS_SUBMISSIONS_DIR,
 
15
  HF_REPO_ID_REGEX,
16
  )
17
+ from src.storage.hub import upload_json
18
  from src.tasks import get_task
19
 
20
  logger = logging.getLogger(__name__)
 
52
  return f"检查 dataset 失败:{e}"
53
 
54
 
 
 
 
 
 
 
 
 
55
  def submit_task(display_name: str, task_name: str, user_dataset: str) -> dict:
56
  display_name = (display_name or "").strip()
57
  task_name = (task_name or "").strip()
 
68
  task = get_task(task_name)
69
  team_id = user_dataset.split("/")[0]
70
  submission_id = uuid.uuid4().hex[:10]
 
71
 
72
+ # Frontend only writes the mailbox envelope. Worker fills in scenes_total
73
+ # and per-scene state into its own /data once it ingests this file.
74
  payload = {
75
  "submission_id": submission_id,
76
  "team_id": team_id,
 
79
  "user_dataset": user_dataset,
80
  "submitted_at": _utc_now_iso(),
81
  "status": "pending",
 
 
 
82
  "primary_metric": task.primary_metric,
 
 
 
 
 
 
 
 
83
  }
84
  path = f"{DS_SUBMISSIONS_DIR}/{submission_id}.json"
85
  try:
 
90
  "message": f"❌ 提交写入失败:{e}",
91
  "submission_id": None}
92
 
93
+
94
  return {
95
  "status": "accepted",
96
  "submission_id": submission_id,
97
  "message": (
98
  f"✅ 已提交(id `{submission_id}`,状态:pending)。\n\n"
99
+ f"队伍身份:`{team_id}` · 展示名:`{display_name}`\n\n"
 
100
  f"请在「Queue」标签页查看进度。"
101
  ),
102
  }
src/worker/__init__.py DELETED
File without changes
src/worker/loop.py DELETED
@@ -1,211 +0,0 @@
1
- """Worker poll loop:
2
-
3
- - on_startup: reclaim any `running` submission whose lease has expired.
4
- - tick: claim pending submissions and dispatch them to a thread pool.
5
- - dispatch concurrency = WORKER_CONCURRENCY (task-level parallelism).
6
-
7
- 扫描策略(避免全量重复下载)
8
- -------------------------------
9
- 维护一个内存级 _settled_sids (done/failed 的 sid 集合)。
10
- 每次 tick:
11
- 1. API.list_repo_files → 拿全部文件名 (单次 API call, 无数据传输)
12
- 2. 过滤掉 _settled_sids — 纯内存操作
13
- 3. 对剩余 sid 调用 hf_hub_download (本地 etag 缓存, 若文件未变则命中缓存不传数据)
14
- 4. finalize 时将 sid 加入 _settled_sids
15
-
16
- 随着时间推移, settled 集合越来越大, 实际下载量收敛到 O(pending + running), 而不是
17
- O(所有历史提交)。
18
- """
19
- from __future__ import annotations
20
-
21
- import json
22
- import logging
23
- import os
24
- import threading
25
- from concurrent.futures import ThreadPoolExecutor
26
- from datetime import datetime, timezone
27
- from typing import List, Optional
28
-
29
- from src.envs import (
30
- DATA_CACHE_PATH,
31
- DS_SUBMISSIONS_DIR,
32
- POLL_INTERVAL_SECONDS,
33
- WORKER_CONCURRENCY,
34
- WORKER_ID,
35
- )
36
- from src.storage import progress as progress_store
37
- from src.storage.hub import download_file, list_repo_files_under
38
- from src.worker.runner import process_submission, shutdown_requested
39
-
40
- logger = logging.getLogger(__name__)
41
-
42
- _executor = ThreadPoolExecutor(max_workers=max(1, WORKER_CONCURRENCY),
43
- thread_name_prefix="worker")
44
- _in_flight_lock = threading.Lock()
45
- _in_flight: set[str] = set()
46
-
47
- # ---- settled set: sids whose status is done/failed (terminal, never re-dispatch) ----
48
- _settled_lock = threading.Lock()
49
- _settled_sids: set[str] = set()
50
-
51
- _TERMINAL_STATUSES = {"done", "failed"}
52
-
53
-
54
- def _mark_settled(sid: str) -> None:
55
- with _settled_lock:
56
- _settled_sids.add(sid)
57
-
58
-
59
- def _is_settled(sid: str) -> bool:
60
- with _settled_lock:
61
- return sid in _settled_sids
62
-
63
-
64
- def _now_iso() -> str:
65
- return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
66
-
67
-
68
- def _parse_sid(filename: str) -> Optional[str]:
69
- """submissions/<sid>.json → sid. Returns None for progress files."""
70
- bn = os.path.basename(filename)
71
- if bn.endswith(".progress.json") or not bn.endswith(".json"):
72
- return None
73
- return bn[: -len(".json")]
74
-
75
-
76
- def _fetch_submission(sid: str) -> Optional[dict]:
77
- """Download (or serve from etag cache) one submission JSON. Returns None on error."""
78
- path = f"{DS_SUBMISSIONS_DIR}/{sid}.json"
79
- local = download_file(path, force=False) # force=False → use etag cache
80
- if local is None:
81
- return None
82
- try:
83
- with open(local, "r", encoding="utf-8") as f:
84
- return json.load(f)
85
- except Exception as e:
86
- logger.warning("malformed submission %s: %s", sid, e)
87
- return None
88
-
89
-
90
- def _scan_submissions() -> List[dict]:
91
- """
92
- 1. list_repo_files → all submission filenames (1 API call, no data download)
93
- 2. skip settled sids
94
- 3. hf_hub_download each remaining sid (etag-cached)
95
- """
96
- all_files = list_repo_files_under(DS_SUBMISSIONS_DIR)
97
- rows: List[dict] = []
98
- for fpath in all_files:
99
- sid = _parse_sid(fpath)
100
- if sid is None or _is_settled(sid):
101
- continue
102
- data = _fetch_submission(sid)
103
- if data is None:
104
- continue
105
- # Opportunistically update settled set from whatever we downloaded
106
- if data.get("status") in _TERMINAL_STATUSES:
107
- _mark_settled(sid)
108
- continue
109
- rows.append(data)
110
- return rows
111
-
112
-
113
- def _list_pending(rows: List[dict]) -> List[dict]:
114
- pend = [r for r in rows if r.get("status") == "pending"]
115
- pend.sort(key=lambda d: d.get("submitted_at", ""))
116
- return pend
117
-
118
-
119
- def _list_expired_running(rows: List[dict]) -> List[dict]:
120
- now_iso = _now_iso()
121
- out = []
122
- for r in rows:
123
- if r.get("status") != "running":
124
- continue
125
- lease = r.get("lease_expires_at")
126
- if lease and lease < now_iso:
127
- out.append(r)
128
- return out
129
-
130
-
131
- def _try_dispatch(claimed: dict) -> bool:
132
- sid = claimed["submission_id"]
133
- with _in_flight_lock:
134
- if sid in _in_flight:
135
- return False
136
- _in_flight.add(sid)
137
-
138
- def _run():
139
- try:
140
- logger.info("[%s] start sid=%s task=%s", WORKER_ID, sid, claimed.get("task_name"))
141
- process_submission(claimed)
142
- logger.info("[%s] end sid=%s", WORKER_ID, sid)
143
- # Mark settled so future ticks skip downloading this sid entirely.
144
- # (process_submission writes done/failed, so it IS terminal after this.)
145
- _mark_settled(sid)
146
- except Exception:
147
- logger.exception("[%s] dispatch crashed sid=%s", WORKER_ID, sid)
148
- finally:
149
- with _in_flight_lock:
150
- _in_flight.discard(sid)
151
-
152
- _executor.submit(_run)
153
- return True
154
-
155
-
156
- def boot_reclaim() -> int:
157
- """Reclaim any running-but-lease-expired submissions (call on startup)."""
158
- rows = _scan_submissions()
159
- expired = _list_expired_running(rows)
160
- n = 0
161
- for r in expired:
162
- if shutdown_requested():
163
- break
164
- claimed = progress_store.reclaim_submission(r["submission_id"])
165
- if claimed is None:
166
- continue
167
- if _try_dispatch(claimed):
168
- n += 1
169
- logger.info("[%s] boot_reclaim dispatched %d", WORKER_ID, n)
170
- return n
171
-
172
-
173
- def tick() -> int:
174
- """Periodic tick: claim pending up to free capacity, then dispatch."""
175
- if shutdown_requested():
176
- return 0
177
- with _in_flight_lock:
178
- free = max(0, WORKER_CONCURRENCY - len(_in_flight))
179
- if free == 0:
180
- logger.debug("[%s] tick: no free slots", WORKER_ID)
181
- return 0
182
-
183
- rows = _scan_submissions()
184
- pending = _list_pending(rows)
185
- if not pending:
186
- return 0
187
- logger.info("[%s] tick: %d pending, %d free slots", WORKER_ID, len(pending), free)
188
- dispatched = 0
189
- for r in pending:
190
- if dispatched >= free or shutdown_requested():
191
- break
192
- claimed = progress_store.claim_submission(r["submission_id"])
193
- if claimed is None:
194
- continue
195
- if _try_dispatch(claimed):
196
- dispatched += 1
197
- return dispatched
198
-
199
-
200
- def in_flight_snapshot() -> List[str]:
201
- with _in_flight_lock:
202
- return sorted(_in_flight)
203
-
204
-
205
- def shutdown_executor(wait: bool = True) -> None:
206
- _executor.shutdown(wait=wait)
207
-
208
-
209
- # poll interval re-export for the worker entry to schedule it
210
- __all__ = ["tick", "boot_reclaim", "in_flight_snapshot", "shutdown_executor",
211
- "POLL_INTERVAL_SECONDS"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/worker/runner.py DELETED
@@ -1,291 +0,0 @@
1
- """Per-submission scene pipeline.
2
-
3
- Assumptions:
4
- - The submission has already been CAS-claimed (status=running, lease set).
5
- - This module is responsible for:
6
- * loading the task manifest
7
- * iterating scenes (skipping settled ones — resume-friendly)
8
- * downloading & evaluating each scene
9
- * updating progress.json + renewing lease
10
- * finalizing the submission once all scenes are settled
11
- """
12
- from __future__ import annotations
13
-
14
- import logging
15
- import os
16
- import shutil
17
- import tempfile
18
- import threading
19
- from collections import OrderedDict
20
- from datetime import datetime, timezone
21
- from typing import Any, Dict, List, Optional, Tuple
22
-
23
- from huggingface_hub import hf_hub_download, snapshot_download
24
- from huggingface_hub.utils import EntryNotFoundError, HfHubHTTPError
25
-
26
- from src.envs import (
27
- DOWNLOADS_CACHE_PATH,
28
- DS_GROUND_TRUTH_DIR,
29
- DS_TASK_MANIFEST_DIR,
30
- GT_CACHE_PATH,
31
- GT_LRU_SIZE,
32
- RESULTS_REPO,
33
- TOKEN,
34
- WORKER_ID,
35
- )
36
- from src.storage import progress as progress_store
37
- from src.storage.archive import append_log
38
- from src.storage.hub import cas_update_json, read_json
39
- from src.submission.check_validity import safe_extract_zip, validate_zip_filesize
40
- from src.tasks import get_task
41
- from src.tasks.base import TaskPlugin
42
-
43
- logger = logging.getLogger(__name__)
44
-
45
- # Cooperative shutdown signal (set by worker entry on SIGTERM/SIGINT).
46
- _shutdown_event = threading.Event()
47
-
48
-
49
- def request_shutdown() -> None:
50
- _shutdown_event.set()
51
-
52
-
53
- def shutdown_requested() -> bool:
54
- return _shutdown_event.is_set()
55
-
56
-
57
- # --------------------------- GT cache (LRU) ----------------------------------
58
- _gt_lock = threading.Lock()
59
- _gt_lru: "OrderedDict[Tuple[str, str], Any]" = OrderedDict()
60
-
61
-
62
- def _load_gt_dir(task_name: str, scene_id: str) -> str:
63
- """Snapshot GT dir for this scene into local cache; return local path."""
64
- local_root = snapshot_download(
65
- repo_id=RESULTS_REPO,
66
- repo_type="dataset",
67
- local_dir=GT_CACHE_PATH,
68
- allow_patterns=[f"{DS_GROUND_TRUTH_DIR}/{scene_id}/**"],
69
- etag_timeout=30,
70
- )
71
- return os.path.join(local_root, DS_GROUND_TRUTH_DIR, scene_id)
72
-
73
-
74
- def get_gt_scene(task: TaskPlugin, scene_id: str) -> Any:
75
- key = (task.name, scene_id)
76
- with _gt_lock:
77
- if key in _gt_lru:
78
- _gt_lru.move_to_end(key)
79
- return _gt_lru[key]
80
- gt_dir = _load_gt_dir(task.name, scene_id)
81
- gt_obj = task.load_gt_scene(scene_id, gt_dir)
82
- with _gt_lock:
83
- _gt_lru[key] = gt_obj
84
- _gt_lru.move_to_end(key)
85
- while len(_gt_lru) > GT_LRU_SIZE:
86
- _gt_lru.popitem(last=False)
87
- return gt_obj
88
-
89
-
90
- # --------------------------- helpers ----------------------------------------
91
- def _utc_now_iso() -> str:
92
- return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
93
-
94
-
95
- def _read_manifest(task_name: str) -> Optional[dict]:
96
- res = read_json(f"{DS_TASK_MANIFEST_DIR}/{task_name}.json")
97
- return res[0] if res else None
98
-
99
-
100
- def _scene_filename(task_name: str, scene_id: str, layout: Optional[str]) -> str:
101
- if layout:
102
- return (layout
103
- .replace("<task>", task_name)
104
- .replace("<scene_id>", scene_id))
105
- return f"predictions/{task_name}/{scene_id}.zip"
106
-
107
-
108
- # --------------------------- main entry --------------------------------------
109
- def process_submission(sub: dict) -> None:
110
- """Run the scene pipeline for one (already-claimed) submission.
111
-
112
- Safe to call again on the same sub after a crash — it will resume.
113
- """
114
- sid = sub["submission_id"]
115
- task_name = sub["task_name"]
116
- team_id = sub.get("team_id") or sub.get("user_dataset", "/").split("/")[0]
117
- user_dataset = sub["user_dataset"]
118
-
119
- try:
120
- task = get_task(task_name)
121
- except ValueError as e:
122
- progress_store.finalize_submission(
123
- sid, "failed", None, None, f"unknown task: {e}", 0, 0,
124
- )
125
- append_log({"submission_id": sid, "ts": _utc_now_iso(),
126
- "status": "Failed", "error_message": str(e)})
127
- return
128
-
129
- manifest = _read_manifest(task_name)
130
- if manifest is None or not manifest.get("scene_ids"):
131
- msg = f"task_manifest/{task_name}.json 不存在或为空。"
132
- progress_store.finalize_submission(sid, "failed", None, None, msg, 0, 0)
133
- append_log({"submission_id": sid, "ts": _utc_now_iso(),
134
- "status": "Failed", "error_message": msg})
135
- return
136
-
137
- scene_ids: List[str] = list(manifest["scene_ids"])
138
- layout = manifest.get("scene_layout")
139
-
140
- progress_store.init_progress_if_absent(sid, len(scene_ids))
141
- prog = progress_store.load_progress(sid)
142
-
143
- # ---- per-scene loop ----
144
- for scene_id in scene_ids:
145
- if shutdown_requested():
146
- logger.info("[%s] %s shutdown requested, exiting", WORKER_ID, sid)
147
- return
148
- if scene_id in prog.get("scene_results", {}) or scene_id in prog.get("scene_failures", {}):
149
- continue
150
-
151
- zip_path: Optional[str] = None
152
- scene_target_dir = os.path.join(DOWNLOADS_CACHE_PATH, sid, scene_id)
153
- try:
154
- os.makedirs(scene_target_dir, exist_ok=True)
155
- zip_path = hf_hub_download(
156
- repo_id=user_dataset,
157
- repo_type="dataset",
158
- filename=_scene_filename(task_name, scene_id, layout),
159
- token=TOKEN,
160
- local_dir=scene_target_dir,
161
- force_download=True,
162
- )
163
- validate_zip_filesize(zip_path)
164
- with tempfile.TemporaryDirectory(prefix=f"bench_{sid}_{scene_id}_") as sandbox:
165
- safe_extract_zip(zip_path, sandbox)
166
- task.validate_scene(sandbox)
167
- gt = get_gt_scene(task, scene_id)
168
- metrics = task.evaluate_scene(sandbox, gt)
169
- prog = progress_store.add_scene_result(sid, scene_id, metrics)
170
- except EntryNotFoundError:
171
- prog = progress_store.add_scene_failure(
172
- sid, scene_id, "missing: scene zip not found in user dataset"
173
- )
174
- except HfHubHTTPError as e:
175
- status = getattr(e.response, "status_code", "?")
176
- prog = progress_store.add_scene_failure(
177
- sid, scene_id, f"download error (status {status})"
178
- )
179
- except ValueError as e:
180
- prog = progress_store.add_scene_failure(sid, scene_id, str(e))
181
- except Exception as e:
182
- logger.exception("[%s] scene eval crashed sid=%s scene=%s", WORKER_ID, sid, scene_id)
183
- prog = progress_store.add_scene_failure(
184
- sid, scene_id, "internal evaluation error",
185
- )
186
- finally:
187
- if zip_path:
188
- try:
189
- os.remove(zip_path)
190
- except OSError:
191
- pass
192
- shutil.rmtree(scene_target_dir, ignore_errors=True)
193
-
194
- # renew lease + update scene counters in submission json
195
- progress_store.renew_lease(sid, {
196
- "done": len(prog.get("scene_results", {})),
197
- "failed": len(prog.get("scene_failures", {})),
198
- })
199
-
200
- # ---- finalize ----
201
- settled = set(prog.get("scene_results", {})) | set(prog.get("scene_failures", {}))
202
- if not settled.issuperset(scene_ids):
203
- # incomplete (e.g. shutdown). Leave running; next worker picks up.
204
- return
205
-
206
- per_scene_metrics = {
207
- sid_: entry["metrics"]
208
- for sid_, entry in prog.get("scene_results", {}).items()
209
- }
210
- try:
211
- agg = task.aggregate(per_scene_metrics)
212
- except Exception as e:
213
- logger.exception("[%s] aggregate failed sid=%s", WORKER_ID, sid)
214
- progress_store.finalize_submission(
215
- sid, "failed", None, None, "aggregation error",
216
- len(prog.get("scene_results", {})),
217
- len(prog.get("scene_failures", {})),
218
- )
219
- append_log({"submission_id": sid, "ts": _utc_now_iso(),
220
- "status": "Failed", "error_message": f"aggregate: {e}"})
221
- return
222
-
223
- primary = agg.get(task.primary_metric) if agg else None
224
- failures = len(prog.get("scene_failures", {}))
225
- successes = len(prog.get("scene_results", {}))
226
- final_status = "done" if successes > 0 else "failed"
227
- err_msg = None if final_status == "done" else "no scene evaluated successfully"
228
-
229
- progress_store.finalize_submission(
230
- sid, final_status,
231
- float(primary) if primary is not None else None,
232
- agg if final_status == "done" else None,
233
- err_msg,
234
- successes, failures,
235
- )
236
-
237
- if final_status == "done":
238
- try:
239
- is_new, prev = _maybe_update_best(task, team_id, sub.get("display_name", team_id),
240
- float(primary), agg, sid)
241
- except Exception:
242
- logger.exception("[%s] best-update failed sid=%s", WORKER_ID, sid)
243
- is_new, prev = False, None
244
- append_log({
245
- "submission_id": sid, "ts": _utc_now_iso(),
246
- "status": "Success", "team_id": team_id, "task_name": task_name,
247
- "score": primary, "metrics": agg,
248
- "is_new_record": is_new, "previous_best": prev,
249
- "scenes_done": successes, "scenes_failed": failures,
250
- })
251
- else:
252
- append_log({
253
- "submission_id": sid, "ts": _utc_now_iso(),
254
- "status": "Failed", "team_id": team_id, "task_name": task_name,
255
- "error_message": err_msg,
256
- "scenes_done": successes, "scenes_failed": failures,
257
- })
258
-
259
-
260
- def _maybe_update_best(task: TaskPlugin, team_id: str, display_name: str,
261
- primary_score: float, metrics: dict, submission_id: str
262
- ) -> Tuple[bool, Optional[float]]:
263
- from src.envs import DS_RESULTS_DIR
264
- path = f"{DS_RESULTS_DIR}/{task.name}/{team_id}_best.json"
265
- holder = {"prev": None, "is_new": False}
266
-
267
- def mutate(current):
268
- if current is None:
269
- holder["prev"] = None
270
- beats = True
271
- else:
272
- holder["prev"] = float(current.get("score", 0))
273
- prev = holder["prev"]
274
- beats = (primary_score > prev) if task.higher_is_better else (primary_score < prev)
275
- if not beats:
276
- return None
277
- holder["is_new"] = True
278
- return {
279
- "team_id": team_id,
280
- "display_name": display_name,
281
- "task_name": task.name,
282
- "score": primary_score,
283
- "primary_metric": task.primary_metric,
284
- "metrics": metrics,
285
- "scenes_count": int(metrics.get("scenes_count", 0)) if isinstance(metrics, dict) else 0,
286
- "updated_at": _utc_now_iso(),
287
- "submission_id": submission_id,
288
- }
289
-
290
- cas_update_json(path, mutate, f"best {task.name}/{team_id}")
291
- return holder["is_new"], holder["prev"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
worker.py DELETED
@@ -1,173 +0,0 @@
1
- """Worker Space entry — pure backend.
2
-
3
- No Gradio: HF Spaces only requires the container to expose a port; we serve a
4
- minimal stdlib HTTP endpoint for health/status. Configure the Space with
5
- `sdk: docker` and a Dockerfile that runs `python worker.py`.
6
- """
7
- from __future__ import annotations
8
-
9
- import json
10
- import logging
11
- import signal
12
- import threading
13
- from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
14
-
15
- from apscheduler.schedulers.background import BackgroundScheduler
16
-
17
- from src.envs import (
18
- API,
19
- ARCHIVE_INTERVAL_SECONDS,
20
- POLL_INTERVAL_SECONDS,
21
- WORKER_CONCURRENCY,
22
- WORKER_ID,
23
- WORKER_REPO_ID,
24
- )
25
- from src.storage.archive import archive_old_submissions
26
- from src.tasks import TASKS
27
- from src.worker.loop import (
28
- boot_reclaim,
29
- in_flight_snapshot,
30
- shutdown_executor,
31
- tick,
32
- )
33
- from src.worker.runner import request_shutdown, shutdown_requested
34
-
35
- logging.basicConfig(
36
- level=logging.INFO,
37
- format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
38
- )
39
- logger = logging.getLogger(__name__)
40
-
41
- HTTP_PORT = 7860 # HF Spaces default exposed port
42
- _last_status = {"text": "scheduler initialising", "last_tick_dispatched": 0}
43
-
44
-
45
- # --------------------------- scheduled jobs ---------------------------------
46
- def _scheduled_tick() -> None:
47
- if shutdown_requested():
48
- return
49
- try:
50
- n = tick()
51
- _last_status["text"] = (
52
- f"last tick dispatched {n}; in-flight={in_flight_snapshot()}"
53
- )
54
- _last_status["last_tick_dispatched"] = n
55
- except Exception as e:
56
- logger.exception("tick crashed")
57
- _last_status["text"] = f"last tick crashed: {e}"
58
-
59
-
60
- def _scheduled_archive() -> None:
61
- if shutdown_requested():
62
- return
63
- try:
64
- n = archive_old_submissions()
65
- if n:
66
- logger.info("[%s] archived %d submissions", WORKER_ID, n)
67
- except Exception:
68
- logger.exception("archive cron crashed")
69
-
70
-
71
- def _restart_space() -> None:
72
- try:
73
- API.restart_space(repo_id=WORKER_REPO_ID)
74
- except Exception as e:
75
- logger.warning("restart_space failed: %s", e)
76
-
77
-
78
- # --------------------------- health endpoint --------------------------------
79
- class _HealthHandler(BaseHTTPRequestHandler):
80
- def _send(self, code: int, body: bytes, ctype: str = "application/json") -> None:
81
- self.send_response(code)
82
- self.send_header("Content-Type", ctype)
83
- self.send_header("Content-Length", str(len(body)))
84
- self.end_headers()
85
- self.wfile.write(body)
86
-
87
- def do_GET(self): # noqa: N802 (stdlib signature)
88
- if self.path in ("/", "/status", "/healthz"):
89
- body = {
90
- "worker_id": WORKER_ID,
91
- "concurrency": WORKER_CONCURRENCY,
92
- "poll_interval_seconds": POLL_INTERVAL_SECONDS,
93
- "in_flight": in_flight_snapshot(),
94
- "shutdown_requested": shutdown_requested(),
95
- "last_status": _last_status["text"],
96
- "registered_tasks": list(TASKS.keys()),
97
- }
98
- self._send(200, json.dumps(body, ensure_ascii=False, indent=2).encode("utf-8"))
99
- else:
100
- self._send(404, b'{"error":"not found"}')
101
-
102
- def log_message(self, format, *args): # silence default access logging
103
- return
104
-
105
-
106
- def _start_http_server() -> ThreadingHTTPServer:
107
- server = ThreadingHTTPServer(("0.0.0.0", HTTP_PORT), _HealthHandler)
108
- threading.Thread(
109
- target=server.serve_forever, name="health-http", daemon=True
110
- ).start()
111
- logger.info("[%s] health server on :%d", WORKER_ID, HTTP_PORT)
112
- return server
113
-
114
-
115
- # --------------------------- signal handlers --------------------------------
116
- def _install_signal_handlers() -> None:
117
- def _handler(signum, frame):
118
- logger.info(
119
- "[%s] received signal %s; requesting graceful shutdown",
120
- WORKER_ID, signum,
121
- )
122
- request_shutdown()
123
-
124
- for s in (signal.SIGTERM, signal.SIGINT):
125
- try:
126
- signal.signal(s, _handler)
127
- except (ValueError, OSError):
128
- pass
129
-
130
-
131
- # --------------------------- main -------------------------------------------
132
- def main() -> None:
133
- _install_signal_handlers()
134
-
135
- try:
136
- boot_reclaim()
137
- except Exception:
138
- logger.exception("boot_reclaim failed")
139
-
140
- scheduler = BackgroundScheduler()
141
- scheduler.add_job(_scheduled_tick, "interval", seconds=POLL_INTERVAL_SECONDS)
142
- scheduler.add_job(_scheduled_archive, "interval", seconds=ARCHIVE_INTERVAL_SECONDS)
143
- scheduler.add_job(_restart_space, "interval", seconds=6 * 3600)
144
- scheduler.start()
145
-
146
- server = _start_http_server()
147
- logger.info("[%s] worker up; poll=%ds concurrency=%d",
148
- WORKER_ID, POLL_INTERVAL_SECONDS, WORKER_CONCURRENCY)
149
-
150
- # Block main thread until shutdown is signalled.
151
- while not shutdown_requested():
152
- try:
153
- signal.pause() # POSIX: wake on signal
154
- except (AttributeError, KeyboardInterrupt):
155
- # Windows or Ctrl-C inside debugger: fall back to busy-wait
156
- import time
157
- time.sleep(1.0)
158
-
159
- logger.info("[%s] shutting down...", WORKER_ID)
160
- try:
161
- scheduler.shutdown(wait=False)
162
- except Exception:
163
- pass
164
- try:
165
- server.shutdown()
166
- except Exception:
167
- pass
168
- shutdown_executor(wait=True)
169
- logger.info("[%s] bye", WORKER_ID)
170
-
171
-
172
- if __name__ == "__main__":
173
- main()