| from __future__ import annotations |
|
|
| import json |
| import os |
| import sys |
| import tempfile |
| from pathlib import Path |
|
|
| from fastapi.testclient import TestClient |
|
|
| _repo_root = Path(__file__).resolve().parents[2] |
| if str(_repo_root) not in sys.path: |
| sys.path.insert(0, str(_repo_root)) |
|
|
| from Spider_XHS.service.app import create_app |
| from Spider_XHS.service.errors import ERROR_CODE_SUCCESS |
| from Spider_XHS.service.tasks import TaskStatus |
|
|
|
|
| def _assert(cond: bool, msg: str) -> None: |
| if not cond: |
| raise AssertionError(msg) |
|
|
|
|
| def verify_storage_migration_and_layout() -> None: |
| os.environ["ENABLE_LEGACY_ROUTES"] = "0" |
| with tempfile.TemporaryDirectory() as td: |
| root = Path(td) |
| tasks_dir = root / "tasks" |
| tasks_dir.mkdir(parents=True, exist_ok=True) |
| (tasks_dir / "t_existing.json").write_text( |
| json.dumps( |
| { |
| "id": "t_existing", |
| "status": "running", |
| "task_type": "selftest", |
| "target": "", |
| "payload": {"operator": "tester"}, |
| "engine": "selftest", |
| "callback": None, |
| "created": 1.0, |
| "started": 2.0, |
| "finished": None, |
| "retry_count": 0, |
| "error": None, |
| }, |
| ensure_ascii=False, |
| indent=2, |
| ), |
| encoding="utf-8", |
| ) |
|
|
| legacy_tasks = { |
| "t_existing": { |
| "id": "t_existing", |
| "status": "queued", |
| "task_type": "selftest", |
| "payload": {"operator": "tester"}, |
| "engine": "selftest", |
| "callback": None, |
| "created_at": 1.0, |
| "updated_at": 1.0, |
| "error": None, |
| }, |
| "t_new": { |
| "id": "t_new", |
| "status": "queued", |
| "task_type": "selftest", |
| "payload": {"operator": "tester"}, |
| "engine": "selftest", |
| "callback": None, |
| "created_at": 3.0, |
| "updated_at": 3.0, |
| "error": None, |
| }, |
| } |
| (root / "tasks.json").write_text( |
| json.dumps(legacy_tasks, ensure_ascii=False, indent=2), |
| encoding="utf-8", |
| ) |
|
|
| os.environ["STORAGE_ROOT"] = td |
| app = create_app() |
| with TestClient(app) as client: |
| resp = client.get("/api/v1/health") |
| _assert(resp.status_code == 200, f"expected 200, got={resp.status_code}, body={resp.text}") |
| _assert(resp.json().get("code") == ERROR_CODE_SUCCESS, "health response code mismatch") |
|
|
| _assert((root / "tasks.json.migrated").exists(), "tasks.json.migrated missing") |
| _assert(not (root / "tasks.json").exists(), "legacy tasks.json should be renamed") |
| _assert((root / "tasks" / "t_new.json").exists(), "migrated shard file missing") |
| _assert((root / "tasks" / "t_existing.json").exists(), "existing shard file missing") |
|
|
| existing = json.loads((root / "tasks" / "t_existing.json").read_text(encoding="utf-8")) |
| _assert(existing.get("status") == "running", "existing shard should not be overwritten") |
|
|
| storage = app.state.storage |
| task = storage.get_task("t_new") |
| _assert(task is not None and task.id == "t_new", "storage.get_task migrated task failed") |
|
|
| updated = storage.update_task("t_new", status=TaskStatus.succeeded) |
| _assert(updated is not None and updated.status == TaskStatus.succeeded, "storage.update_task failed") |
|
|
| storage.write_result("t_new", {"raw": {"ok": True}, "normalized": None, "meta": {"task_id": "t_new"}}) |
| _assert((root / "results" / "t_new.json").exists(), "result path mismatch") |
|
|
| storage.write_callback_failure("t_new", {"task_id": "t_new", "error": "x"}) |
| _assert((root / "callbacks" / "t_new.json").exists(), "callback failure path mismatch") |
|
|
|
|
| def main() -> None: |
| verify_storage_migration_and_layout() |
| print("ok") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|