lvkaokao commited on
Commit
a09ebea
·
1 Parent(s): 05268c0

enhance retry logic.

Browse files

Signed-off-by: lkk12014402 <kaokao.lv@intel.com>

docs/eta_logic.md ADDED
@@ -0,0 +1,101 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # ETA 预估等待时间 — 运行逻辑与计算公式
2
+
3
+ ## 时序图
4
+
5
+ ```mermaid
6
+ sequenceDiagram
7
+ participant User as 用户 (Browser)
8
+ participant App as app.py
9
+ participant Submit as submit.py
10
+ participant Upload as _upload_to_hub()
11
+ participant StatusDir as status/ 目录
12
+ participant ETA as queue_eta.py
13
+
14
+ User->>App: 点击 Submit 按钮
15
+ App->>Submit: submit_model() / submit_quant()
16
+
17
+ Note over Submit: 1. 验证模型存在性
18
+ Note over Submit: 2. 构建 eval_entry (status="Pending")
19
+ Note over Submit: 3. Dedup 检查
20
+
21
+ Submit->>Upload: _upload_to_hub(eval_entry)
22
+ Upload->>StatusDir: 写入 status/{user}/{prefix}.json<br/>{"status": "Pending", "script": "auto_eval", ...}
23
+ Upload-->>Submit: 写入成功
24
+
25
+ Note over Submit: 4. 更新内存 dedup 缓存<br/>_EVAL_REQUESTED.add(dedup_key)
26
+
27
+ Submit->>ETA: compute_single_eta(status_path, model_params, concurrency=2)
28
+
29
+ ETA->>StatusDir: os.walk(status/) 扫描所有 JSON 文件
30
+ StatusDir-->>ETA: 返回所有 status 条目
31
+
32
+ Note over ETA: 分类计数:<br/>• active_count: Running/Waiting/Triggered<br/>• pending_count: Pending + auto_*<br/>(包含刚写入的自身)
33
+
34
+ Note over ETA: 计算 ETA(见下方公式)
35
+
36
+ ETA-->>Submit: 返回 eta_hours
37
+ Submit->>Submit: format_eta(eta_hours) → "~3h"
38
+ Submit-->>User: "已提交!预计完成时间: ~3h"
39
+ ```
40
+
41
+ ## 计算公式
42
+
43
+ ```
44
+ ETA = running_remaining + ⌈queue_pos / concurrency⌉ × task_duration
45
+ ```
46
+
47
+ ### 各变量含义
48
+
49
+ | 变量 | 含义 | 计算方式 |
50
+ |------|------|----------|
51
+ | `queue_pos` | 当前模型在 Pending 队列中的位置 | `max(pending_count, 1)` — 自身已包含在内 |
52
+ | `concurrency` | 最大并发数 | 固定值 `2` |
53
+ | `task_duration` | 单任务预估时长 | 模型 ≤ 30B → 3h;模型 > 30B → 5h |
54
+ | `running_remaining` | 当前运行任务的剩余时间 | 有活跃任务: `sum(各活跃任务时长) / active_count`<br/>无活跃任务: `0` |
55
+ | `active_count` | 正在运行的任务数 | status 为 Running/Waiting/Triggered 的条目数 |
56
+ | `pending_count` | 排队等待的任务数 | status=Pending 且 script=auto_* 的条目数 |
57
+
58
+ ### 示例推演
59
+
60
+ **场景:空队列,concurrency=2,连续提交两个 ≤30B 模型**
61
+
62
+ ```
63
+ ┌─────────────┬────────┬──────────┬───────────┬─────────────────────┬─────┐
64
+ │ 提交顺序 │ active │ pending │ queue_pos │ 公式 │ ETA │
65
+ ├─────────────┼────────┼──────────┼───────────┼─────────────────────┼─────┤
66
+ │ 第1个模型 │ 0 │ 1(自身) │ 1 │ 0 + ⌈1/2⌉×3 = 1×3 │ 3h │
67
+ │ 第2个模型 │ 0 │ 2 │ 2 │ 0 + ⌈2/2⌉×3 = 1×3 │ 3h │
68
+ │ 第3个模型 │ 0 │ 3 │ 3 │ 0 + ⌈3/2⌉×3 = 2×3 │ 6h │
69
+ │ 第4个模型 │ 0 │ 4 │ 4 │ 0 + ⌈4/2⌉×3 = 2×3 │ 6h │
70
+ │ 第5个模型 │ 0 │ 5 │ 5 │ 0 + ⌈5/2⌉×3 = 3×3 │ 9h │
71
+ └─────────────┴────────┴──────────┴───────────┴─────────────────────┴─────┘
72
+ ```
73
+
74
+ **场景:1个任务正在运行,1个 Pending,提交新模型**
75
+
76
+ ```
77
+ active_count = 1, active 任务预估 3h
78
+ pending_count = 2 (已有1个 + 自身)
79
+ queue_pos = 2
80
+ running_remaining = 3/1 = 3h
81
+
82
+ ETA = 3 + ⌈2/2⌉×3 = 3 + 3 = 6h
83
+ ```
84
+
85
+ ## 关键代码路径
86
+
87
+ ```
88
+ submit.py: submit_model() / submit_quant()
89
+
90
+ ├── _upload_to_hub() ← 先写 status/ 文件
91
+ │ └── status/{user}/eval_request_xxx.json
92
+
93
+ ├── _EVAL_REQUESTED.add() ← 更新内存 dedup 缓存
94
+
95
+ └── compute_single_eta() ← 再计算 ETA
96
+
97
+ ├── os.walk(status/) ← 扫描(此时自身文件已存在)
98
+ ├── 分类: active vs pending
99
+ ├── queue_pos = max(pending_count, 1)
100
+ └── ETA = running_remaining + ⌈pos/concurrency⌉ × task_hours
101
+ ```
docs/retry_logic.md ADDED
@@ -0,0 +1,251 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # CI Dispatcher Retry 机制
2
+
3
+ ## 核心概念
4
+
5
+ Retry **不是立即重新执行**,而是把 status 重置为 `Pending`,重新进入队列排队。下一次被调度时才会执行,需要等待并发槽位空闲 + 30分钟 cooldown。
6
+
7
+ ### 状态数据源
8
+
9
+ | 数据源 | 位置 | 用途 |
10
+ |---|---|---|
11
+ | 本地 status 文件 | `lb_eval/status/<owner>/<model>.json` | 持久化层,git 管理,leaderboard UI 读取展示 |
12
+ | Azure REST API | `GET /pipelines/{id}/runs/{runId}` | 实时查询 pipeline 真实运行状态 |
13
+
14
+ 每个 scan 周期:`git pull` 拉最新 status → 对 active 条目调 Azure API 校验 → 修正 status → `git push`。
15
+
16
+ ---
17
+
18
+ ## 状态流转图
19
+
20
+ ```mermaid
21
+ stateDiagram-v2
22
+ [*] --> Pending: 用户提交
23
+ Pending --> Triggered: dispatcher 调 Azure API 触发 pipeline
24
+ Triggered --> Running: Azure 报 inProgress
25
+ Running --> Finished: CI 内 upload_results_github.py 写回
26
+
27
+ Triggered --> Pending: Azure 报 failed/canceled (retry)
28
+ Running --> Pending: Azure 报 failed/canceled (retry)
29
+ Running --> Pending: Azure succeeded 但超过1h无 Finished 写回 (retry)
30
+ Running --> Pending: Azure inProgress 超过8h (retry)
31
+ Triggered --> Pending: Azure API 不可达超过6h (retry)
32
+ Triggered --> Pending: 无 ci_run_id 超过6h (retry)
33
+
34
+ Pending --> Failed: retry_count >= 3
35
+
36
+ note right of Pending
37
+ retry 回到 Pending 后需等待:
38
+ 1. 30分钟 cooldown
39
+ 2. 并发槽位空闲
40
+ 3. 按提交时间排队
41
+ end note
42
+
43
+ note right of Failed
44
+ 终态,不再重试
45
+ 用户不可重复提交
46
+ 需人工介入
47
+ end note
48
+ ```
49
+
50
+ ---
51
+
52
+ ## Retry 时序图
53
+
54
+ ```mermaid
55
+ sequenceDiagram
56
+ participant S as APScheduler (每60s)
57
+ participant D as CIDispatcher
58
+ participant SF as lb_eval/status/ (git)
59
+ participant AZ as Azure DevOps API
60
+ participant CI as CI Pipeline
61
+
62
+ Note over S: ── Scan Cycle Start ──
63
+
64
+ S->>D: scan_and_dispatch()
65
+ D->>SF: git pull (同步最新)
66
+
67
+ rect rgb(255, 240, 240)
68
+ Note over D: Step 2: Reconcile Active Entries
69
+ D->>SF: 读取所有 active 条目 (Running/Triggered/Waiting)
70
+
71
+ loop 每个 active 条目
72
+ D->>AZ: GET /runs/{ci_run_id}
73
+ alt Azure: completed + failed/canceled
74
+ Note over D: 立即重置
75
+ D->>D: _apply_retry_limit(retry_count++)
76
+ alt retry_count < 3
77
+ D->>SF: status = Pending + last_failed_time = now
78
+ else retry_count >= 3
79
+ D->>SF: status = Failed (终态)
80
+ end
81
+ else Azure: completed + succeeded
82
+ alt triggered_time 距今 < 1h
83
+ Note over D: 在 grace period 内,等 CI 写回 Finished
84
+ else triggered_time 距今 > 1h
85
+ Note over D: 写回失败,按 retry 处理
86
+ D->>D: _apply_retry_limit(retry_count++)
87
+ D->>SF: status = Pending + last_failed_time = now
88
+ end
89
+ else Azure: inProgress
90
+ alt triggered_time 距今 < 8h
91
+ D->>SF: status = Running (正常)
92
+ else triggered_time 距今 > 8h
93
+ Note over D: Pipeline 挂起,按 retry 处理
94
+ D->>D: _apply_retry_limit(retry_count++)
95
+ D->>SF: status = Pending + last_failed_time = now
96
+ end
97
+ else Azure API 不可达
98
+ alt triggered_time 距今 < 6h
99
+ Note over D: 保持现状,等待恢复
100
+ else triggered_time 距今 > 6h
101
+ D->>D: _apply_retry_limit(retry_count++)
102
+ D->>SF: status = Pending + last_failed_time = now
103
+ end
104
+ end
105
+ end
106
+
107
+ D->>SF: git push (批量写回)
108
+ end
109
+
110
+ rect rgb(240, 255, 240)
111
+ Note over D: Step 3-4: Dispatch Pending
112
+ D->>SF: 读取 Pending 条目
113
+ Note over D: 过滤: retry_count > 0 且<br/>last_failed_time 距今 < 30min 的跳过
114
+ D->>D: 按 submitted_time 排序
115
+ D->>D: 取 min(pending数, 剩余并发槽位) 个
116
+
117
+ loop 每个待调度条目
118
+ D->>AZ: POST /runs (触发 pipeline)
119
+ AZ-->>D: run_id
120
+ D->>SF: status = Triggered, ci_run_id = run_id
121
+ end
122
+ D->>SF: git push
123
+ end
124
+
125
+ Note over S: ── 60s 后下一轮 ──
126
+ ```
127
+
128
+ ---
129
+
130
+ ## Retry 触发场景详解
131
+
132
+ ### 场景 1:Azure 报 `completed + failed`
133
+
134
+ **原因**:Pod 启动失败、脚本执行报错、OOM 等。
135
+
136
+ ```
137
+ 时间线:
138
+ T+0 Pending → Triggered (dispatcher 触发 CI)
139
+ T+5min Triggered → Running (Azure 报 inProgress)
140
+ T+30min Azure 报 completed+failed
141
+ → dispatcher 重置为 Pending (retry_count=1, last_failed_time=T+30min)
142
+ T+60min cooldown 未结束,跳过 (30min cooldown)
143
+ T+61min cooldown 结束��有空位 → 重新触发
144
+ ```
145
+
146
+ ### 场景 2:Azure 报 `completed + canceled`
147
+
148
+ **原因**:人工取消、Azure 调度取消。处理方式与 failed 完全一致。
149
+
150
+ ### 场景 3:Azure 报 `completed + succeeded`,但无 Finished 写回
151
+
152
+ **原因**:`upload_results_github.py` 执行失败(git push 冲突、网络问题、脚本 bug)。
153
+
154
+ ```
155
+ 时间线:
156
+ T+0 Pending → Triggered
157
+ T+5min Triggered → Running
158
+ T+3h Azure 报 succeeded,CI pipeline 已结束
159
+ 但 status 仍为 Running (upload_results_github.py 没写回 Finished)
160
+ → dispatcher 检查: triggered_time 距今 > 1h (SUCCEEDED_GRACE_HOURS)
161
+ → 重置为 Pending (retry_count=1)
162
+ ```
163
+
164
+ **为什么不立即重置?** 因为 `upload_results_github.py` 在 CI pipeline 结束前执行,
165
+ Azure 报 succeeded 时 git push 可能还在进行中,给 1 小时 grace period。
166
+
167
+ ### 场景 4:Pipeline 挂起 (Azure 持续报 `inProgress`)
168
+
169
+ **原因**:Pod 分配后进程卡死、GPU 驱动问题、无限循环。
170
+
171
+ ```
172
+ 时间线:
173
+ T+0 Pending → Triggered → Running
174
+ T+8h Azure 仍报 inProgress
175
+ → dispatcher 检查: triggered_time 距今 > 8h (MAX_ACTIVE_HOURS)
176
+ → 重置为 Pending (retry_count=1)
177
+ ```
178
+
179
+ ### 场景 5:Azure API 不可达
180
+
181
+ **原因**:Azure 服务端故障、网络分区、PAT 过期。
182
+
183
+ ```
184
+ 时间线:
185
+ T+0 Pending → Triggered (正常触发)
186
+ T+1min dispatcher 调 Azure API → 超时/连接失败
187
+ → 保持现状 (可能是临时问题)
188
+ T+6h Azure API 仍不可达
189
+ → dispatcher 检查: triggered_time 距今 > 6h
190
+ → 重置为 Pending (retry_count=1)
191
+ ```
192
+
193
+ ### 场景 6:无 ci_run_id
194
+
195
+ **原因**:触发 Azure API 调用成功返回,但写 status 文件时崩溃;或触发 API 调用本身
196
+ 超时,status 已写为 Triggered 但没拿到 run_id。
197
+
198
+ 处理方式同场景 5 (6小时超时)。
199
+
200
+ ---
201
+
202
+ ## 常量配置
203
+
204
+ | 常量 | 值 | 含义 |
205
+ |---|---|---|
206
+ | `_SUCCEEDED_GRACE_HOURS` | 1h | Azure succeeded 后等写回 Finished 的宽限期 |
207
+ | `_MAX_ACTIVE_HOURS` | 8h | 任何 active 条目的绝对超时 |
208
+ | `_API_UNREACHABLE_HOURS` | 6h | Azure API 不可达的降级超时 |
209
+ | `_RETRY_COOLDOWN_HOURS` | 0.5h (30min) | retry 回 Pending 后的冷却期 |
210
+ | `_MAX_RETRIES` | 3 | 最大重试次数,超过后标记 Failed |
211
+
212
+ ---
213
+
214
+ ## 多并发安全性
215
+
216
+ ### 保护机制
217
+
218
+ 1. **单线程调度**: APScheduler `max_instances=1 + coalesce=True`,不会有两个 scan 并行
219
+ 2. **Git 互斥**: `git_lock` (threading.Condition) 保护所有 git 操作
220
+ 3. **Non-fast-forward 保护**: 如果 CI 同时 push 了 Finished,dispatcher 的 push 会冲突
221
+ 4. **Push 重试**: dispatcher reconcile 的 push 有 3 次重试 (pull → retry push)
222
+
223
+ ### 可能的竞争场景
224
+
225
+ ```
226
+ Dispatcher CI Pipeline
227
+ │ │
228
+ ├─ git pull │
229
+ ├─ 读取 status: Running │
230
+ ├─ 查 Azure: succeeded │
231
+ ├─ 在 grace period 内,保持 Running │
232
+ │ ├─ upload_results_github.py
233
+ │ ├─ status = Finished
234
+ │ ├─ git push ✓
235
+ ├─ git push (其他条目的修改) │
236
+ │ └─ 冲突!pull → retry push ✓ │
237
+ │ │
238
+ ├─ 下次 scan: git pull │
239
+ ├─ 读取 status: Finished │
240
+ ├─ 不是 active,跳过 ✓ │
241
+ ```
242
+
243
+ **结论**:即使并发 push 冲突,数据不会丢失。最坏情况是本轮 reconcile 结果延迟到下轮生效。
244
+
245
+ ---
246
+
247
+ ## 用户不可重复提交
248
+
249
+ 即使模型被标记为 `Failed`(retry 耗尽),用户也不能重新提交同一模型。
250
+ `already_submitted_models()` 扫描 `status/` 和 `pending_requests/` 下所有 JSON 文件,
251
+ 不区分状态。Failed 模型需要管理员人工介入处理。
src/ci_dispatcher.py CHANGED
@@ -14,10 +14,14 @@ Concurrency control:
14
 
15
  Active status reconciliation:
16
  Each scan cycle queries the Azure DevOps Runs API for entries that have a
17
- ``ci_run_id``. The real pipeline state is mapped back to local status so
18
- that failed/canceled runs are detected immediately instead of waiting for a
19
- fixed timeout. A short fallback timeout (``_STALE_FALLBACK_HOURS``) still
20
- exists for edge cases where the Azure API is unreachable.
 
 
 
 
21
  """
22
 
23
  import base64
@@ -39,9 +43,23 @@ _AZP_BRANCH = "main"
39
  # Statuses that count towards active concurrency slots
40
  _ACTIVE_STATUSES = frozenset({"Running", "Waiting", "Triggered"})
41
 
42
- # Fallback: if Azure API is unreachable for this long, reset entry to Pending.
43
- # This is a safety net, NOT the primary reconciliation mechanism.
44
- _STALE_FALLBACK_HOURS = 6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
 
46
  # Maximum number of pipeline retries before marking an entry as Failed.
47
  _MAX_RETRIES = 3
@@ -188,13 +206,32 @@ class CIDispatcher:
188
  return running, triggered, pending
189
 
190
  def _find_pending(self) -> list[tuple[str, dict]]:
191
- """Return ``[(status_file_path, entry_dict), ...]`` sorted oldest-first."""
 
 
 
 
192
  pending = []
193
  for fpath, data in self._iter_status_files():
194
  if (
195
  data.get("status") == "Pending"
196
  and data.get("script") in ("auto_quant", "auto_eval")
197
  ):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
198
  pending.append((fpath, data))
199
  pending.sort(key=lambda x: x[1].get("submitted_time", ""))
200
  return pending
@@ -327,17 +364,16 @@ class CIDispatcher:
327
  def _reconcile_active_runs(self):
328
  """Query Azure for all active entries and sync local status to reality.
329
 
330
- For each status file in Triggered/Waiting/Running that carries a
331
- ``ci_run_id``, we call the Azure Runs API to get the real pipeline state
332
- and map it back to a local status update.
333
-
334
- Fallback: entries whose Azure status cannot be determined AND whose
335
- ``triggered_time`` exceeds ``_STALE_FALLBACK_HOURS`` are reset to
336
- Pending as a safety net.
337
  """
338
  active_entries: list[tuple[str, dict]] = []
339
  for fpath, data in self._iter_status_files():
340
- if data.get("status") in _ACTIVE_STATUSES and data.get("ci_run_id"):
341
  active_entries.append((fpath, data))
342
 
343
  if not active_entries:
@@ -346,51 +382,90 @@ class CIDispatcher:
346
  updates: list[tuple[str, dict]] = [] # entries that need git write-back
347
 
348
  for fpath, data in active_entries:
349
- run_id = data["ci_run_id"]
350
  model = data.get("model", "?")
351
  old_status = data["status"]
 
352
 
 
 
 
 
 
 
 
 
 
 
 
 
 
353
  azure_state = self._query_azure_run(run_id)
354
 
355
  if azure_state is None:
356
  # API unreachable — apply fallback timeout
357
- new_status = self._fallback_stale_check(data)
358
- if new_status and new_status != old_status:
359
- new_status = self._apply_retry_limit(data, new_status, model)
360
- data["status"] = new_status
361
- if new_status == "Pending":
362
- data.pop("ci_run_id", None)
363
- data.pop("triggered_time", None)
364
- elif new_status == "Failed":
365
- data.pop("ci_run_id", None)
366
- data.pop("triggered_time", None)
367
  updates.append((fpath, data))
368
  logger.warning(
369
- "[dispatcher] Fallback: %s → %s (Azure unreachable)",
370
- model, new_status,
371
  )
372
  continue
373
 
374
- # Map Azure state → local status
375
- new_status = self._map_azure_state(azure_state)
376
-
377
- if new_status == old_status:
378
- continue # nothing changed
379
-
380
- # Apply retry limit when resetting to Pending
381
- if new_status == "Pending":
382
- new_status = self._apply_retry_limit(data, new_status, model)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
383
 
384
- logger.info(
385
- "[dispatcher] Reconcile %s: %s → %s (azure: state=%s, result=%s)",
386
- model, old_status, new_status,
387
- azure_state.get("state"), azure_state.get("result"),
388
- )
389
- data["status"] = new_status
390
- if new_status in ("Pending", "Failed"):
391
- data.pop("ci_run_id", None)
392
- data.pop("triggered_time", None)
393
- updates.append((fpath, data))
 
 
 
 
 
 
 
 
 
394
 
395
  if not updates:
396
  return
@@ -410,10 +485,26 @@ class CIDispatcher:
410
  self._repo.index.commit(
411
  f"[dispatcher] Reconcile {len(staged)} entries from Azure status"
412
  )
413
- self._repo.remotes.origin.push(self._repo.active_branch.name)
414
- logger.info(
415
- "[dispatcher] Pushed reconciled status for %d entries", len(staged)
416
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
417
  except Exception:
418
  logger.error(
419
  "[dispatcher] Failed to push reconciled status updates", exc_info=True
@@ -444,49 +535,32 @@ class CIDispatcher:
444
  return None
445
 
446
  @staticmethod
447
- def _map_azure_state(azure_state: dict) -> str:
448
- """Map Azure pipeline run state/result to local status string.
449
-
450
- Azure states:
451
- state: "inProgress" | "completed" | "canceling"
452
- result (when completed): "succeeded" | "failed" | "canceled"
453
-
454
- Local mapping:
455
- inProgress → Running (CI is actively executing)
456
- completed+succeeded → Running (keep Running; CI template will write Finished)
457
- completed+failed → Pending (reset for retry)
458
- completed+canceled → Pending (reset for retry)
459
- canceling → Running (still active, wait for completion)
460
- """
461
- state = azure_state.get("state", "")
462
- result = azure_state.get("result", "")
463
-
464
- if state == _AZP_STATE_COMPLETED:
465
- if result == _AZP_RESULT_SUCCEEDED:
466
- # Don't touch — the CI pipeline itself writes Finished via
467
- # git-status-template.yml. Keep as Running so dispatcher
468
- # doesn't re-trigger.
469
- return "Running"
470
- # failed or canceled → reset to Pending for retry
471
- return "Pending"
472
-
473
- # inProgress / canceling → mark as Running
474
- return "Running"
475
-
476
- @staticmethod
477
- def _fallback_stale_check(data: dict) -> str | None:
478
- """Return new status if stale beyond fallback threshold, else None."""
479
  tstr = data.get("triggered_time", "")
480
  if not tstr:
481
  return None
482
  try:
483
- triggered_at = datetime.strptime(tstr, "%Y-%m-%dT%H:%M:%SZ")
484
  except ValueError:
485
  return None
486
- threshold = datetime.utcnow() - timedelta(hours=_STALE_FALLBACK_HOURS)
487
- if triggered_at < threshold:
488
- return "Pending"
489
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
490
 
491
  @staticmethod
492
  def _apply_retry_limit(data: dict, candidate_status: str, model: str) -> str:
 
14
 
15
  Active status reconciliation:
16
  Each scan cycle queries the Azure DevOps Runs API for entries that have a
17
+ ``ci_run_id``. The real pipeline state is mapped back to local status.
18
+ Time-based guards cover ALL failure modes:
19
+
20
+ 1. Azure ``completed+failed/canceled`` retry (immediate)
21
+ 2. Azure ``completed+succeeded`` but no write-back after grace period → retry
22
+ 3. Azure ``inProgress`` beyond max runtime → retry (pipeline hung)
23
+ 4. Azure API unreachable beyond fallback timeout → retry
24
+ 5. All retries go through ``_MAX_RETRIES`` limit → ``Failed``
25
  """
26
 
27
  import base64
 
43
  # Statuses that count towards active concurrency slots
44
  _ACTIVE_STATUSES = frozenset({"Running", "Waiting", "Triggered"})
45
 
46
+ # ── Timeout & retry constants ────────────────────────────────────────────
47
+
48
+ # After Azure reports succeeded, wait this long for CI to write back Finished.
49
+ # If status is still Running after this, assume write-back failed and retry.
50
+ _SUCCEEDED_GRACE_HOURS = 1
51
+
52
+ # Absolute max hours any entry can stay in Running/Triggered/Waiting.
53
+ # Covers pipelines that hang indefinitely (Azure shows inProgress forever).
54
+ _MAX_ACTIVE_HOURS = 8
55
+
56
+ # Fallback when Azure API is unreachable — if triggered_time exceeds this,
57
+ # assume something went wrong and reset.
58
+ _API_UNREACHABLE_HOURS = 6
59
+
60
+ # Cooldown: minimum hours between retries of the same entry.
61
+ # Prevents rapid-fire retries for deterministic failures (OOM, bad model).
62
+ _RETRY_COOLDOWN_HOURS = 0.5 # 30 minutes
63
 
64
  # Maximum number of pipeline retries before marking an entry as Failed.
65
  _MAX_RETRIES = 3
 
206
  return running, triggered, pending
207
 
208
  def _find_pending(self) -> list[tuple[str, dict]]:
209
+ """Return ``[(status_file_path, entry_dict), ...]`` sorted oldest-first.
210
+
211
+ Entries that have been retried recently (within ``_RETRY_COOLDOWN_HOURS``)
212
+ are excluded so deterministic failures don't burn through retries instantly.
213
+ """
214
  pending = []
215
  for fpath, data in self._iter_status_files():
216
  if (
217
  data.get("status") == "Pending"
218
  and data.get("script") in ("auto_quant", "auto_eval")
219
  ):
220
+ # Enforce cooldown for retried entries
221
+ if data.get("retry_count", 0) > 0:
222
+ lft = data.get("last_failed_time", "")
223
+ if lft:
224
+ try:
225
+ failed_at = datetime.strptime(lft, "%Y-%m-%dT%H:%M:%SZ")
226
+ if self._hours_since(failed_at) < _RETRY_COOLDOWN_HOURS:
227
+ logger.debug(
228
+ "[dispatcher] %s: cooldown (%.0f min remaining)",
229
+ data.get("model", "?"),
230
+ (_RETRY_COOLDOWN_HOURS - self._hours_since(failed_at)) * 60,
231
+ )
232
+ continue
233
+ except ValueError:
234
+ pass # bad timestamp, proceed with dispatch
235
  pending.append((fpath, data))
236
  pending.sort(key=lambda x: x[1].get("submitted_time", ""))
237
  return pending
 
364
  def _reconcile_active_runs(self):
365
  """Query Azure for all active entries and sync local status to reality.
366
 
367
+ Handles ALL failure scenarios:
368
+ 1. Azure completed+failed/canceled retry immediately
369
+ 2. Azure completed+succeeded but no Finished write-back retry after grace period
370
+ 3. Azure inProgress beyond max runtime → retry (pipeline hung)
371
+ 4. Azure API unreachable beyond fallback timeout → retry
372
+ 5. Entries without ci_run_id stuck in active state → retry after timeout
 
373
  """
374
  active_entries: list[tuple[str, dict]] = []
375
  for fpath, data in self._iter_status_files():
376
+ if data.get("status") in _ACTIVE_STATUSES:
377
  active_entries.append((fpath, data))
378
 
379
  if not active_entries:
 
382
  updates: list[tuple[str, dict]] = [] # entries that need git write-back
383
 
384
  for fpath, data in active_entries:
385
+ run_id = data.get("ci_run_id")
386
  model = data.get("model", "?")
387
  old_status = data["status"]
388
+ triggered_at = self._parse_triggered_time(data)
389
 
390
+ # ── No ci_run_id (e.g. Triggered but API call never returned) ──
391
+ if not run_id:
392
+ if triggered_at and self._hours_since(triggered_at) > _API_UNREACHABLE_HOURS:
393
+ new_status = self._apply_retry_limit(data, "Pending", model)
394
+ self._reset_active_fields(data, new_status)
395
+ updates.append((fpath, data))
396
+ logger.warning(
397
+ "[dispatcher] %s: no ci_run_id, stale %s → %s",
398
+ model, old_status, new_status,
399
+ )
400
+ continue
401
+
402
+ # ── Query Azure ──
403
  azure_state = self._query_azure_run(run_id)
404
 
405
  if azure_state is None:
406
  # API unreachable — apply fallback timeout
407
+ if triggered_at and self._hours_since(triggered_at) > _API_UNREACHABLE_HOURS:
408
+ new_status = self._apply_retry_limit(data, "Pending", model)
409
+ self._reset_active_fields(data, new_status)
 
 
 
 
 
 
 
410
  updates.append((fpath, data))
411
  logger.warning(
412
+ "[dispatcher] Fallback: %s → %s (Azure unreachable for %.1fh)",
413
+ model, new_status, self._hours_since(triggered_at),
414
  )
415
  continue
416
 
417
+ state = azure_state.get("state", "")
418
+ result = azure_state.get("result", "")
419
+
420
+ # ── Azure: completed ──
421
+ if state == _AZP_STATE_COMPLETED:
422
+ if result == _AZP_RESULT_SUCCEEDED:
423
+ # CI pipeline succeeded. Normally upload_results_github.py
424
+ # writes back Finished. Give it a grace period.
425
+ if triggered_at and self._hours_since(triggered_at) > _SUCCEEDED_GRACE_HOURS:
426
+ # Grace period expired — write-back likely failed
427
+ new_status = self._apply_retry_limit(data, "Pending", model)
428
+ self._reset_active_fields(data, new_status)
429
+ updates.append((fpath, data))
430
+ logger.warning(
431
+ "[dispatcher] %s: Azure succeeded but no Finished write-back "
432
+ "after %.1fh — %s → %s",
433
+ model, self._hours_since(triggered_at), old_status, new_status,
434
+ )
435
+ elif old_status != "Running":
436
+ # Within grace period, ensure status is Running
437
+ data["status"] = "Running"
438
+ updates.append((fpath, data))
439
+ else:
440
+ # failed or canceled → retry immediately
441
+ new_status = self._apply_retry_limit(data, "Pending", model)
442
+ self._reset_active_fields(data, new_status)
443
+ updates.append((fpath, data))
444
+ logger.info(
445
+ "[dispatcher] %s: Azure %s → %s → %s",
446
+ model, result, old_status, new_status,
447
+ )
448
+ continue
449
 
450
+ # ── Azure: inProgress / canceling ──
451
+ # Check absolute max runtime
452
+ if triggered_at and self._hours_since(triggered_at) > _MAX_ACTIVE_HOURS:
453
+ new_status = self._apply_retry_limit(data, "Pending", model)
454
+ self._reset_active_fields(data, new_status)
455
+ updates.append((fpath, data))
456
+ logger.warning(
457
+ "[dispatcher] %s: exceeded max runtime (%.1fh > %dh) — %s → %s",
458
+ model, self._hours_since(triggered_at), _MAX_ACTIVE_HOURS,
459
+ old_status, new_status,
460
+ )
461
+ elif old_status != "Running":
462
+ # Azure shows inProgress, update local to Running
463
+ data["status"] = "Running"
464
+ updates.append((fpath, data))
465
+ logger.info(
466
+ "[dispatcher] %s: Azure inProgress — %s → Running",
467
+ model, old_status,
468
+ )
469
 
470
  if not updates:
471
  return
 
485
  self._repo.index.commit(
486
  f"[dispatcher] Reconcile {len(staged)} entries from Azure status"
487
  )
488
+ # Push with retry: CI may have pushed concurrently
489
+ branch = self._repo.active_branch.name
490
+ for attempt in range(3):
491
+ try:
492
+ self._repo.remotes.origin.push(branch)
493
+ logger.info(
494
+ "[dispatcher] Pushed reconciled status for %d entries",
495
+ len(staged),
496
+ )
497
+ break
498
+ except Exception:
499
+ if attempt < 2:
500
+ logger.warning(
501
+ "[dispatcher] Push conflict (attempt %d/3), "
502
+ "pulling and retrying",
503
+ attempt + 1,
504
+ )
505
+ self._repo.remotes.origin.pull(branch)
506
+ else:
507
+ raise
508
  except Exception:
509
  logger.error(
510
  "[dispatcher] Failed to push reconciled status updates", exc_info=True
 
535
  return None
536
 
537
  @staticmethod
538
+ def _parse_triggered_time(data: dict) -> datetime | None:
539
+ """Parse triggered_time string to datetime, or None."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
540
  tstr = data.get("triggered_time", "")
541
  if not tstr:
542
  return None
543
  try:
544
+ return datetime.strptime(tstr, "%Y-%m-%dT%H:%M:%SZ")
545
  except ValueError:
546
  return None
547
+
548
+ @staticmethod
549
+ def _hours_since(dt: datetime) -> float:
550
+ """Return hours elapsed since the given UTC datetime."""
551
+ return (datetime.utcnow() - dt).total_seconds() / 3600
552
+
553
+ @staticmethod
554
+ def _reset_active_fields(data: dict, new_status: str):
555
+ """Set new status and clean up active-state fields."""
556
+ data["status"] = new_status
557
+ if new_status in ("Pending", "Failed"):
558
+ data.pop("ci_run_id", None)
559
+ data.pop("triggered_time", None)
560
+ # Record when this reset happened for cooldown enforcement
561
+ data["last_failed_time"] = (
562
+ datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
563
+ )
564
 
565
  @staticmethod
566
  def _apply_retry_limit(data: dict, candidate_status: str, model: str) -> str:
src/queue_eta.py CHANGED
@@ -132,8 +132,9 @@ def compute_single_eta(
132
  elif status == "Pending" and script in ("auto_quant", "auto_eval"):
133
  pending_count += 1
134
 
135
- # The new submission will be at position = pending_count + 1
136
- queue_pos = pending_count + 1
 
137
  task_hours = estimate_task_hours(model_params)
138
 
139
  if active_count > 0:
 
132
  elif status == "Pending" and script in ("auto_quant", "auto_eval"):
133
  pending_count += 1
134
 
135
+ # The model's own status file has already been written by _upload_to_hub,
136
+ # so it is already included in pending_count.
137
+ queue_pos = max(pending_count, 1)
138
  task_hours = estimate_task_hours(model_params)
139
 
140
  if active_count > 0:
tests/test_ci_dispatcher.py CHANGED
@@ -30,12 +30,15 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
30
  from src.ci_dispatcher import (
31
  CIDispatcher,
32
  _ACTIVE_STATUSES,
 
33
  _AZP_RESULT_CANCELED,
34
  _AZP_RESULT_FAILED,
35
  _AZP_RESULT_SUCCEEDED,
36
  _AZP_STATE_COMPLETED,
 
37
  _MAX_RETRIES,
38
- _STALE_FALLBACK_HOURS,
 
39
  )
40
 
41
 
@@ -98,52 +101,162 @@ def _mock_azure_response(status_code=200, body=None):
98
 
99
 
100
  # ═══════════════════════════════════════════════════════════════════════
101
- # 1. _map_azure_state
102
  # ═══════════════════════════════════════════════════════════════════════
103
 
104
- class TestMapAzureState:
105
- """Azure (state, result) local status string."""
106
 
107
- @pytest.mark.parametrize("state, result, expected", [
108
- ("inProgress", "", "Running"),
109
- ("canceling", "", "Running"),
110
- ("completed", "succeeded", "Running"), # CI template writes Finished
111
- ("completed", "failed", "Pending"),
112
- ("completed", "canceled", "Pending"),
113
- ("unknown", "", "Running"), # fallback for unknown
114
- ])
115
- def test_mapping(self, state, result, expected):
116
- assert CIDispatcher._map_azure_state({"state": state, "result": result}) == expected
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
 
118
 
119
  # ═══════════════════════════════════════════════════════════════════════
120
- # 2. _fallback_stale_check
121
  # ═══════════════════════════════════════════════════════════════════════
122
 
123
- class TestFallbackStaleCheck:
124
- """Fallback when Azure API is unreachable."""
125
 
126
- def test_no_triggered_time(self):
127
- assert CIDispatcher._fallback_stale_check({"status": "Triggered"}) is None
 
 
128
 
129
- def test_recent_entry_stays(self):
130
- data = {"status": "Triggered", "triggered_time": _old_str(1)}
131
- assert CIDispatcher._fallback_stale_check(data) is None
 
132
 
133
- def test_exactly_at_threshold_stays(self):
134
- data = {"status": "Triggered", "triggered_time": _old_str(_STALE_FALLBACK_HOURS)}
135
- # at boundary not strictly less-than
136
- result = CIDispatcher._fallback_stale_check(data)
137
- # might be None or Pending depending on seconds; either is acceptable
138
- assert result in (None, "Pending")
 
 
 
 
 
 
139
 
140
- def test_stale_entry_resets(self):
141
- data = {"status": "Triggered", "triggered_time": _old_str(_STALE_FALLBACK_HOURS + 2)}
142
- assert CIDispatcher._fallback_stale_check(data) == "Pending"
 
 
 
 
 
 
 
 
 
 
 
143
 
144
- def test_bad_timestamp(self):
145
- data = {"status": "Triggered", "triggered_time": "garbage"}
146
- assert CIDispatcher._fallback_stale_check(data) is None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
 
148
 
149
  # ═══════════════════════════════════════════════════════════════════════
@@ -303,7 +416,7 @@ class TestReconcileActiveRuns:
303
  def test_unreachable_stale_entry_resets(self):
304
  fp = _write_status(self._status_dir, "org", "h.json", {
305
  "model": "org/h", "status": "Triggered", "script": "auto_eval",
306
- "ci_run_id": 90, "triggered_time": _old_str(_STALE_FALLBACK_HOURS + 2),
307
  })
308
  d = self._dispatcher()
309
  with patch("src.ci_dispatcher.http_requests.get", side_effect=ConnectionError):
@@ -313,12 +426,13 @@ class TestReconcileActiveRuns:
313
  assert "ci_run_id" not in data
314
  assert "triggered_time" not in data
315
 
316
- # -- Entries without ci_run_id are skipped ------------------------------
317
 
318
- def test_pending_without_run_id_untouched(self):
319
- fp = _write_status(self._status_dir, "org", "i.json", {
320
- "model": "org/i", "status": "Pending", "script": "auto_quant",
321
- "submitted_time": "2026-04-01T00:00:00Z",
 
322
  })
323
  d = self._dispatcher()
324
  with patch("src.ci_dispatcher.http_requests.get") as mock_get:
@@ -488,6 +602,41 @@ class TestFindPending:
488
  d._repo.working_dir = self._tmpdir
489
  assert d._find_pending() == []
490
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
491
 
492
  # ═══════════════════════════════════════════════════════════════════════
493
  # 7. _batch_update_status
@@ -744,7 +893,7 @@ class TestRetryLimitIntegration:
744
  def test_fallback_stale_also_counts_retry(self):
745
  fp = _write_status(self._status_dir, "org", "f.json", {
746
  "model": "org/f", "status": "Triggered", "script": "auto_quant",
747
- "ci_run_id": 50, "triggered_time": _old_str(_STALE_FALLBACK_HOURS + 2),
748
  "retry_count": 2,
749
  })
750
  d = self._dispatcher()
 
30
  from src.ci_dispatcher import (
31
  CIDispatcher,
32
  _ACTIVE_STATUSES,
33
+ _API_UNREACHABLE_HOURS,
34
  _AZP_RESULT_CANCELED,
35
  _AZP_RESULT_FAILED,
36
  _AZP_RESULT_SUCCEEDED,
37
  _AZP_STATE_COMPLETED,
38
+ _MAX_ACTIVE_HOURS,
39
  _MAX_RETRIES,
40
+ _RETRY_COOLDOWN_HOURS,
41
+ _SUCCEEDED_GRACE_HOURS,
42
  )
43
 
44
 
 
101
 
102
 
103
  # ═══════════════════════════════════════════════════════════════════════
104
+ # 1. _parse_triggered_time / _hours_since / _reset_active_fields
105
  # ═══════════════════════════════════════════════════════════════════════
106
 
107
+ class TestHelpers:
108
+ """Test the static helper methods on CIDispatcher."""
109
 
110
+ def test_parse_triggered_time_valid(self):
111
+ data = {"triggered_time": "2026-04-01T12:00:00Z"}
112
+ result = CIDispatcher._parse_triggered_time(data)
113
+ assert result == datetime(2026, 4, 1, 12, 0, 0)
114
+
115
+ def test_parse_triggered_time_missing(self):
116
+ assert CIDispatcher._parse_triggered_time({}) is None
117
+
118
+ def test_parse_triggered_time_garbage(self):
119
+ assert CIDispatcher._parse_triggered_time({"triggered_time": "garbage"}) is None
120
+
121
+ def test_hours_since(self):
122
+ two_hours_ago = datetime.utcnow() - timedelta(hours=2)
123
+ h = CIDispatcher._hours_since(two_hours_ago)
124
+ assert 1.9 < h < 2.1
125
+
126
+ def test_reset_active_fields_pending(self):
127
+ data = {"status": "Running", "ci_run_id": 42, "triggered_time": "2026-04-01T00:00:00Z", "model": "a/b"}
128
+ CIDispatcher._reset_active_fields(data, "Pending")
129
+ assert data["status"] == "Pending"
130
+ assert "ci_run_id" not in data
131
+ assert "triggered_time" not in data
132
+ assert "last_failed_time" in data # records when reset happened
133
+ assert data["model"] == "a/b" # preserved
134
+
135
+ def test_reset_active_fields_failed(self):
136
+ data = {"status": "Running", "ci_run_id": 42, "triggered_time": "x"}
137
+ CIDispatcher._reset_active_fields(data, "Failed")
138
+ assert data["status"] == "Failed"
139
+ assert "ci_run_id" not in data
140
+ assert "last_failed_time" in data
141
+
142
+ def test_reset_active_fields_running_keeps_fields(self):
143
+ data = {"status": "Triggered", "ci_run_id": 42, "triggered_time": "x"}
144
+ CIDispatcher._reset_active_fields(data, "Running")
145
+ assert data["status"] == "Running"
146
+ assert data["ci_run_id"] == 42 # not removed
147
 
148
 
149
  # ═══════════════════════════════════════════════════════════════════════
150
+ # 2. Reconcile: succeeded grace period & max active timeout
151
  # ═══════════════════════════════════════════════════════════════════════
152
 
153
+ class TestReconcileTimeouts:
154
+ """Test the time-based guards added to _reconcile_active_runs."""
155
 
156
+ def setup_method(self):
157
+ self._tmpdir = tempfile.mkdtemp(prefix="test_timeout_")
158
+ self._status_dir = os.path.join(self._tmpdir, "status")
159
+ os.makedirs(self._status_dir)
160
 
161
+ def _dispatcher(self, **kw):
162
+ d = _make_dispatcher(self._status_dir, **kw)
163
+ d._repo.working_dir = self._tmpdir
164
+ return d
165
 
166
+ def test_succeeded_within_grace_period_stays_running(self):
167
+ """Azure succeeded recently — stay Running, wait for write-back."""
168
+ fp = _write_status(self._status_dir, "org", "a.json", {
169
+ "model": "org/a", "status": "Running", "script": "auto_quant",
170
+ "ci_run_id": 10, "triggered_time": _now_str(),
171
+ })
172
+ d = self._dispatcher()
173
+ resp = _mock_azure_response(200, {"state": "completed", "result": "succeeded"})
174
+ with patch("src.ci_dispatcher.http_requests.get", return_value=resp):
175
+ d._reconcile_active_runs()
176
+ assert _read_status(fp)["status"] == "Running"
177
+ assert _read_status(fp)["ci_run_id"] == 10
178
 
179
+ def test_succeeded_past_grace_period_resets(self):
180
+ """Azure succeeded long ago but no Finished write-back — retry."""
181
+ fp = _write_status(self._status_dir, "org", "b.json", {
182
+ "model": "org/b", "status": "Running", "script": "auto_quant",
183
+ "ci_run_id": 20, "triggered_time": _old_str(_SUCCEEDED_GRACE_HOURS + 1),
184
+ })
185
+ d = self._dispatcher()
186
+ resp = _mock_azure_response(200, {"state": "completed", "result": "succeeded"})
187
+ with patch("src.ci_dispatcher.http_requests.get", return_value=resp):
188
+ d._reconcile_active_runs()
189
+ data = _read_status(fp)
190
+ assert data["status"] == "Pending"
191
+ assert "ci_run_id" not in data
192
+ assert data["retry_count"] == 1
193
 
194
+ def test_in_progress_within_max_active_stays(self):
195
+ """Azure inProgress within max active hours — keep Running."""
196
+ fp = _write_status(self._status_dir, "org", "c.json", {
197
+ "model": "org/c", "status": "Running", "script": "auto_quant",
198
+ "ci_run_id": 30, "triggered_time": _old_str(2),
199
+ })
200
+ d = self._dispatcher()
201
+ resp = _mock_azure_response(200, {"state": "inProgress", "result": ""})
202
+ with patch("src.ci_dispatcher.http_requests.get", return_value=resp):
203
+ d._reconcile_active_runs()
204
+ assert _read_status(fp)["status"] == "Running"
205
+
206
+ def test_in_progress_past_max_active_resets(self):
207
+ """Azure inProgress beyond max active hours — pipeline hung, retry."""
208
+ fp = _write_status(self._status_dir, "org", "d.json", {
209
+ "model": "org/d", "status": "Running", "script": "auto_quant",
210
+ "ci_run_id": 40, "triggered_time": _old_str(_MAX_ACTIVE_HOURS + 1),
211
+ })
212
+ d = self._dispatcher()
213
+ resp = _mock_azure_response(200, {"state": "inProgress", "result": ""})
214
+ with patch("src.ci_dispatcher.http_requests.get", return_value=resp):
215
+ d._reconcile_active_runs()
216
+ data = _read_status(fp)
217
+ assert data["status"] == "Pending"
218
+ assert data["retry_count"] == 1
219
+
220
+ def test_no_ci_run_id_stale_resets(self):
221
+ """Entry stuck in Triggered without ci_run_id — API call never returned."""
222
+ fp = _write_status(self._status_dir, "org", "e.json", {
223
+ "model": "org/e", "status": "Triggered", "script": "auto_quant",
224
+ "triggered_time": _old_str(_API_UNREACHABLE_HOURS + 1),
225
+ })
226
+ d = self._dispatcher()
227
+ with patch("src.ci_dispatcher.http_requests.get") as mock_get:
228
+ d._reconcile_active_runs()
229
+ mock_get.assert_not_called() # no ci_run_id, no API call
230
+ data = _read_status(fp)
231
+ assert data["status"] == "Pending"
232
+ assert data["retry_count"] == 1
233
+
234
+ def test_no_ci_run_id_recent_stays(self):
235
+ """Entry recently Triggered without ci_run_id — still within timeout."""
236
+ fp = _write_status(self._status_dir, "org", "f.json", {
237
+ "model": "org/f", "status": "Triggered", "script": "auto_quant",
238
+ "triggered_time": _now_str(),
239
+ })
240
+ d = self._dispatcher()
241
+ with patch("src.ci_dispatcher.http_requests.get") as mock_get:
242
+ d._reconcile_active_runs()
243
+ mock_get.assert_not_called()
244
+ assert _read_status(fp)["status"] == "Triggered"
245
+
246
+ def test_succeeded_grace_period_exhausts_retries(self):
247
+ """Succeeded but stuck after multiple retries → Failed."""
248
+ fp = _write_status(self._status_dir, "org", "g.json", {
249
+ "model": "org/g", "status": "Running", "script": "auto_quant",
250
+ "ci_run_id": 70, "triggered_time": _old_str(_SUCCEEDED_GRACE_HOURS + 1),
251
+ "retry_count": 2,
252
+ })
253
+ d = self._dispatcher()
254
+ resp = _mock_azure_response(200, {"state": "completed", "result": "succeeded"})
255
+ with patch("src.ci_dispatcher.http_requests.get", return_value=resp):
256
+ d._reconcile_active_runs()
257
+ data = _read_status(fp)
258
+ assert data["status"] == "Failed"
259
+ assert data["retry_count"] == 3
260
 
261
 
262
  # ═══════════════════════════════════════════════════════════════════════
 
416
  def test_unreachable_stale_entry_resets(self):
417
  fp = _write_status(self._status_dir, "org", "h.json", {
418
  "model": "org/h", "status": "Triggered", "script": "auto_eval",
419
+ "ci_run_id": 90, "triggered_time": _old_str(_API_UNREACHABLE_HOURS + 2),
420
  })
421
  d = self._dispatcher()
422
  with patch("src.ci_dispatcher.http_requests.get", side_effect=ConnectionError):
 
426
  assert "ci_run_id" not in data
427
  assert "triggered_time" not in data
428
 
429
+ # -- Entries without ci_run_id: active ones get timeout check -----------
430
 
431
+ def test_active_without_run_id_gets_timeout_check(self):
432
+ """Active entry with no ci_run_id but stale → resets to Pending."""
433
+ fp = _write_status(self._status_dir, "org", "norun.json", {
434
+ "model": "org/norun", "status": "Triggered", "script": "auto_quant",
435
+ "triggered_time": _old_str(_API_UNREACHABLE_HOURS + 1),
436
  })
437
  d = self._dispatcher()
438
  with patch("src.ci_dispatcher.http_requests.get") as mock_get:
 
602
  d._repo.working_dir = self._tmpdir
603
  assert d._find_pending() == []
604
 
605
+ def test_cooldown_excludes_recent_retry(self):
606
+ """Retried entry within cooldown period is NOT picked up."""
607
+ _write_status(self._status_dir, "a", "retry.json", {
608
+ "model": "a/retry", "status": "Pending", "script": "auto_quant",
609
+ "submitted_time": "2026-04-01T00:00:00Z",
610
+ "retry_count": 1,
611
+ "last_failed_time": _now_str(),
612
+ })
613
+ d = _make_dispatcher(self._status_dir)
614
+ d._repo.working_dir = self._tmpdir
615
+ assert d._find_pending() == []
616
+
617
+ def test_cooldown_allows_old_retry(self):
618
+ """Retried entry past cooldown period IS picked up."""
619
+ _write_status(self._status_dir, "a", "retry.json", {
620
+ "model": "a/retry", "status": "Pending", "script": "auto_quant",
621
+ "submitted_time": "2026-04-01T00:00:00Z",
622
+ "retry_count": 1,
623
+ "last_failed_time": _old_str(_RETRY_COOLDOWN_HOURS + 1),
624
+ })
625
+ d = _make_dispatcher(self._status_dir)
626
+ d._repo.working_dir = self._tmpdir
627
+ pending = d._find_pending()
628
+ assert len(pending) == 1
629
+
630
+ def test_first_time_pending_has_no_cooldown(self):
631
+ """Fresh Pending entry (no retry_count) is always picked up."""
632
+ _write_status(self._status_dir, "a", "fresh.json", {
633
+ "model": "a/fresh", "status": "Pending", "script": "auto_quant",
634
+ "submitted_time": "2026-04-01T00:00:00Z",
635
+ })
636
+ d = _make_dispatcher(self._status_dir)
637
+ d._repo.working_dir = self._tmpdir
638
+ assert len(d._find_pending()) == 1
639
+
640
 
641
  # ═══════════════════════════════════════════════════════════════════════
642
  # 7. _batch_update_status
 
893
  def test_fallback_stale_also_counts_retry(self):
894
  fp = _write_status(self._status_dir, "org", "f.json", {
895
  "model": "org/f", "status": "Triggered", "script": "auto_quant",
896
+ "ci_run_id": 50, "triggered_time": _old_str(_API_UNREACHABLE_HOURS + 2),
897
  "retry_count": 2,
898
  })
899
  d = self._dispatcher()