# XHS / verification / verify_storage_migration.py
# Trae Bot
# Upload Spider_XHS project
# c481f8a
from __future__ import annotations
import json
import os
import sys
import tempfile
from pathlib import Path
from fastapi.testclient import TestClient
# Put the directory two levels above this file at the front of sys.path so the
# absolute ``Spider_XHS.*`` imports below resolve when this file is run as a
# script (presumably that directory contains the ``Spider_XHS`` package).
_repo_root = Path(__file__).resolve().parents[2]
if not any(entry == str(_repo_root) for entry in sys.path):
    sys.path.insert(0, str(_repo_root))
from Spider_XHS.service.app import create_app
from Spider_XHS.service.errors import ERROR_CODE_SUCCESS
from Spider_XHS.service.tasks import TaskStatus
def _assert(cond: bool, msg: str) -> None:
    """Raise ``AssertionError(msg)`` unless *cond* is truthy.

    Unlike a bare ``assert`` statement, this check is not stripped when
    Python runs with ``-O``.
    """
    if cond:
        return
    raise AssertionError(msg)
def _write_legacy_fixtures(root: Path) -> None:
    """Seed *root* with one existing task shard and a legacy ``tasks.json``.

    ``t_existing`` is written both as a shard under ``tasks/`` (status
    ``"running"``) and in the legacy file (status ``"queued"``), so later
    checks can tell whether migration overwrote an existing shard. ``t_new``
    appears only in the legacy file and is expected to be migrated into a new
    shard.
    """
    tasks_dir = root / "tasks"
    tasks_dir.mkdir(parents=True, exist_ok=True)
    (tasks_dir / "t_existing.json").write_text(
        json.dumps(
            {
                "id": "t_existing",
                "status": "running",
                "task_type": "selftest",
                "target": "",
                "payload": {"operator": "tester"},
                "engine": "selftest",
                "callback": None,
                "created": 1.0,
                "started": 2.0,
                "finished": None,
                "retry_count": 0,
                "error": None,
            },
            ensure_ascii=False,
            indent=2,
        ),
        encoding="utf-8",
    )
    legacy_tasks = {
        "t_existing": {
            "id": "t_existing",
            "status": "queued",
            "task_type": "selftest",
            "payload": {"operator": "tester"},
            "engine": "selftest",
            "callback": None,
            "created_at": 1.0,
            "updated_at": 1.0,
            "error": None,
        },
        "t_new": {
            "id": "t_new",
            "status": "queued",
            "task_type": "selftest",
            "payload": {"operator": "tester"},
            "engine": "selftest",
            "callback": None,
            "created_at": 3.0,
            "updated_at": 3.0,
            "error": None,
        },
    }
    (root / "tasks.json").write_text(
        json.dumps(legacy_tasks, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


def _check_migrated_files(root: Path) -> None:
    """Assert the on-disk layout left behind by migrating the legacy ``tasks.json``."""
    _assert((root / "tasks.json.migrated").exists(), "tasks.json.migrated missing")
    _assert(not (root / "tasks.json").exists(), "legacy tasks.json should be renamed")
    _assert((root / "tasks" / "t_new.json").exists(), "migrated shard file missing")
    _assert((root / "tasks" / "t_existing.json").exists(), "existing shard file missing")
    existing = json.loads((root / "tasks" / "t_existing.json").read_text(encoding="utf-8"))
    # The legacy copy says "queued"; the pre-existing shard must keep "running".
    _assert(existing.get("status") == "running", "existing shard should not be overwritten")


def _check_storage_api(storage, root: Path) -> None:
    """Exercise get/update/result/callback calls on *storage* and check file locations under *root*."""
    task = storage.get_task("t_new")
    _assert(task is not None and task.id == "t_new", "storage.get_task migrated task failed")
    updated = storage.update_task("t_new", status=TaskStatus.succeeded)
    _assert(updated is not None and updated.status == TaskStatus.succeeded, "storage.update_task failed")
    storage.write_result("t_new", {"raw": {"ok": True}, "normalized": None, "meta": {"task_id": "t_new"}})
    _assert((root / "results" / "t_new.json").exists(), "result path mismatch")
    storage.write_callback_failure("t_new", {"task_id": "t_new", "error": "x"})
    _assert((root / "callbacks" / "t_new.json").exists(), "callback failure path mismatch")


def verify_storage_migration_and_layout() -> None:
    """Check that the app migrates a legacy ``tasks.json`` into per-task shards.

    The check runs in these steps:

    1. Build a throwaway storage root with a legacy ``tasks.json`` and one
       already-sharded task.
    2. Point the app at it via ``STORAGE_ROOT`` (with
       ``ENABLE_LEGACY_ROUTES=0``).
    3. Call the health endpoint.
    4. Check the migrated file layout and the paths that results and
       callback failures are written to.

    Both environment variables are restored on exit, including on failure.
    This keeps ``STORAGE_ROOT`` from pointing at the deleted temporary
    directory for the rest of the process.

    Raises:
        AssertionError: if any check fails.
    """
    env_keys = ("ENABLE_LEGACY_ROUTES", "STORAGE_ROOT")
    saved_env = {key: os.environ.get(key) for key in env_keys}
    try:
        os.environ["ENABLE_LEGACY_ROUTES"] = "0"
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            _write_legacy_fixtures(root)
            os.environ["STORAGE_ROOT"] = td
            app = create_app()
            with TestClient(app) as client:
                resp = client.get("/api/v1/health")
                _assert(resp.status_code == 200, f"expected 200, got={resp.status_code}, body={resp.text}")
                _assert(resp.json().get("code") == ERROR_CODE_SUCCESS, "health response code mismatch")
                _check_migrated_files(root)
                _check_storage_api(app.state.storage, root)
    finally:
        # Put the process environment back exactly as it was before the call.
        for key, value in saved_env.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value
def main() -> None:
    """Script entry point: run the storage-migration check, then print ``ok``.

    ``ok`` is printed only if the check returns without raising.
    """
    for check in (verify_storage_migration_and_layout,):
        check()
    print("ok")


if __name__ == "__main__":
    main()