"""Self-check for legacy task migration and the on-disk storage layout.

The script seeds a temporary storage root with a legacy ``tasks.json`` file
and one pre-existing per-task shard, starts the service application against
that root, and then checks that:

* the legacy file has been renamed to ``tasks.json.migrated``;
* a shard file exists for the task that only lived in the legacy file;
* the pre-existing shard was left untouched;
* results and callback failures are written under ``results/`` and
  ``callbacks/`` respectively.

Run it directly (``python <this file>``); it prints ``ok`` on success and
raises ``AssertionError`` on the first failed check.
"""

from __future__ import annotations

import json
import os
import sys
import tempfile
from pathlib import Path

from fastapi.testclient import TestClient

# Put the third ancestor directory of this file on ``sys.path`` so that the
# ``Spider_XHS`` package imports below resolve when the script is executed
# directly. NOTE(review): presumably this is the directory that contains the
# ``Spider_XHS`` package - confirm against the repository layout.
_repo_root = Path(__file__).resolve().parents[2]
if str(_repo_root) not in sys.path:
    sys.path.insert(0, str(_repo_root))

# These imports must stay below the ``sys.path`` bootstrap above.
from Spider_XHS.service.app import create_app
from Spider_XHS.service.errors import ERROR_CODE_SUCCESS
from Spider_XHS.service.tasks import TaskStatus

# Environment variables this script overrides while it runs; their previous
# values are restored afterwards.
_OVERRIDDEN_ENV_VARS = ("ENABLE_LEGACY_ROUTES", "STORAGE_ROOT")


def _assert(cond: bool, msg: str) -> None:
    """Raise ``AssertionError(msg)`` when *cond* is false.

    An explicit raise is used instead of the ``assert`` statement, so the
    check is not stripped when Python runs with ``-O``.
    """
    if not cond:
        raise AssertionError(msg)


def _seed_legacy_storage(root: Path) -> None:
    """Write the pre-migration fixture files under *root*.

    Creates ``tasks/t_existing.json`` (a shard whose status is ``running``)
    and a legacy ``tasks.json`` holding two tasks: ``t_existing`` (with a
    conflicting ``queued`` status) and ``t_new`` (present only in the legacy
    file).
    """
    tasks_dir = root / "tasks"
    tasks_dir.mkdir(parents=True, exist_ok=True)

    existing_shard = {
        "id": "t_existing",
        "status": "running",
        "task_type": "selftest",
        "target": "",
        "payload": {"operator": "tester"},
        "engine": "selftest",
        "callback": None,
        "created": 1.0,
        "started": 2.0,
        "finished": None,
        "retry_count": 0,
        "error": None,
    }
    (tasks_dir / "t_existing.json").write_text(
        json.dumps(existing_shard, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    legacy_tasks = {
        # Same id as the shard above but a different status, so the test can
        # tell whether the shard got overwritten.
        "t_existing": {
            "id": "t_existing",
            "status": "queued",
            "task_type": "selftest",
            "payload": {"operator": "tester"},
            "engine": "selftest",
            "callback": None,
            "created_at": 1.0,
            "updated_at": 1.0,
            "error": None,
        },
        "t_new": {
            "id": "t_new",
            "status": "queued",
            "task_type": "selftest",
            "payload": {"operator": "tester"},
            "engine": "selftest",
            "callback": None,
            "created_at": 3.0,
            "updated_at": 3.0,
            "error": None,
        },
    }
    (root / "tasks.json").write_text(
        json.dumps(legacy_tasks, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


def verify_storage_migration_and_layout() -> None:
    """Verify legacy-task migration and the storage directory layout.

    Seeds a temporary storage root, points the application at it through the
    ``STORAGE_ROOT`` environment variable, starts the app, and asserts on the
    resulting files and on the storage object exposed at
    ``app.state.storage``.

    The ``ENABLE_LEGACY_ROUTES`` and ``STORAGE_ROOT`` environment variables
    are overridden only for the duration of the call and restored afterwards,
    so ``STORAGE_ROOT`` is never left pointing at the deleted temporary
    directory.

    Raises:
        AssertionError: if any check fails.
    """
    saved_env = {name: os.environ.get(name) for name in _OVERRIDDEN_ENV_VARS}
    try:
        os.environ["ENABLE_LEGACY_ROUTES"] = "0"
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            _seed_legacy_storage(root)

            # The storage root must be set before the app is created.
            os.environ["STORAGE_ROOT"] = td
            app = create_app()

            # NOTE(review): the original indentation of this file was lost, so
            # it is unclear whether the checks after the health request ran
            # inside or after the client context. They are kept inside it so
            # the application is still started while they run - confirm.
            with TestClient(app) as client:
                resp = client.get("/api/v1/health")
                _assert(
                    resp.status_code == 200,
                    f"expected 200, got={resp.status_code}, body={resp.text}",
                )
                _assert(
                    resp.json().get("code") == ERROR_CODE_SUCCESS,
                    "health response code mismatch",
                )

                # Legacy file renamed, one shard per task present.
                _assert(
                    (root / "tasks.json.migrated").exists(),
                    "tasks.json.migrated missing",
                )
                _assert(
                    not (root / "tasks.json").exists(),
                    "legacy tasks.json should be renamed",
                )
                _assert(
                    (root / "tasks" / "t_new.json").exists(),
                    "migrated shard file missing",
                )
                _assert(
                    (root / "tasks" / "t_existing.json").exists(),
                    "existing shard file missing",
                )

                # The seeded shard said "running"; the legacy entry said
                # "queued". The shard must keep its own value.
                existing = json.loads(
                    (root / "tasks" / "t_existing.json").read_text(encoding="utf-8")
                )
                _assert(
                    existing.get("status") == "running",
                    "existing shard should not be overwritten",
                )

                # Exercise the storage API on the migrated task.
                storage = app.state.storage
                task = storage.get_task("t_new")
                _assert(
                    task is not None and task.id == "t_new",
                    "storage.get_task migrated task failed",
                )

                updated = storage.update_task("t_new", status=TaskStatus.succeeded)
                _assert(
                    updated is not None and updated.status == TaskStatus.succeeded,
                    "storage.update_task failed",
                )

                storage.write_result(
                    "t_new",
                    {"raw": {"ok": True}, "normalized": None, "meta": {"task_id": "t_new"}},
                )
                _assert(
                    (root / "results" / "t_new.json").exists(),
                    "result path mismatch",
                )

                storage.write_callback_failure("t_new", {"task_id": "t_new", "error": "x"})
                _assert(
                    (root / "callbacks" / "t_new.json").exists(),
                    "callback failure path mismatch",
                )
    finally:
        # Put the environment back exactly as it was, removing variables that
        # did not exist before the call.
        for name, previous in saved_env.items():
            if previous is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = previous


def main() -> None:
    """Run the verification and print ``ok`` when every check passes."""
    verify_storage_migration_and_layout()
    print("ok")


if __name__ == "__main__":
    main()