File size: 4,264 Bytes
c481f8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from __future__ import annotations

import json
import os
import sys
import tempfile
from pathlib import Path

from fastapi.testclient import TestClient

# Make the absolute `Spider_XHS.service...` imports below resolvable when this
# script is executed directly (not imported as part of an installed package).
# `parents[2]` is two directory levels above the directory containing this
# file. NOTE(review): assumes that directory is the one holding the
# `Spider_XHS` package — confirm if this file is ever moved.
_repo_root = Path(__file__).resolve().parents[2]
if str(_repo_root) not in sys.path:
    # Prepend (index 0) so this checkout wins over any other copy on sys.path;
    # the membership check avoids adding duplicate entries on re-import.
    sys.path.insert(0, str(_repo_root))

from Spider_XHS.service.app import create_app
from Spider_XHS.service.errors import ERROR_CODE_SUCCESS
from Spider_XHS.service.tasks import TaskStatus


def _assert(cond: bool, msg: str) -> None:
    """Raise ``AssertionError(msg)`` unless *cond* is truthy.

    Used instead of the ``assert`` statement so the checks in this script
    still run when Python is started with ``-O`` (which strips ``assert``).
    """
    if cond:
        return
    raise AssertionError(msg)


def _write_json(path: Path, obj: object) -> None:
    """Write *obj* to *path* as indented UTF-8 JSON, keeping non-ASCII as-is."""
    path.write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")


def _seed_legacy_storage(root: Path) -> None:
    """Populate *root* with one pre-sharded task and a legacy ``tasks.json``.

    ``t_existing`` is written both as a shard (status ``running``) and inside
    the legacy file (status ``queued``). The caller can therefore tell whether
    migration overwrote an existing shard. ``t_new`` exists only in the
    legacy file and is expected to be migrated into its own shard.
    """
    tasks_dir = root / "tasks"
    tasks_dir.mkdir(parents=True, exist_ok=True)
    _write_json(
        tasks_dir / "t_existing.json",
        {
            "id": "t_existing",
            "status": "running",
            "task_type": "selftest",
            "target": "",
            "payload": {"operator": "tester"},
            "engine": "selftest",
            "callback": None,
            "created": 1.0,
            "started": 2.0,
            "finished": None,
            "retry_count": 0,
            "error": None,
        },
    )

    legacy_tasks = {
        "t_existing": {
            "id": "t_existing",
            "status": "queued",
            "task_type": "selftest",
            "payload": {"operator": "tester"},
            "engine": "selftest",
            "callback": None,
            "created_at": 1.0,
            "updated_at": 1.0,
            "error": None,
        },
        "t_new": {
            "id": "t_new",
            "status": "queued",
            "task_type": "selftest",
            "payload": {"operator": "tester"},
            "engine": "selftest",
            "callback": None,
            "created_at": 3.0,
            "updated_at": 3.0,
            "error": None,
        },
    }
    _write_json(root / "tasks.json", legacy_tasks)


def _restore_env(saved: dict[str, str | None]) -> None:
    """Restore environment variables from *saved*; a ``None`` value means "unset"."""
    for key, value in saved.items():
        if value is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = value


def verify_storage_migration_and_layout() -> None:
    """Check legacy ``tasks.json`` migration and the on-disk storage layout.

    Seeds a temporary storage root via ``_seed_legacy_storage``, points
    ``STORAGE_ROOT`` at it (with ``ENABLE_LEGACY_ROUTES=0``), creates the app
    and opens a ``TestClient``. It then asserts that:

    * ``GET /api/v1/health`` returns 200 with ``ERROR_CODE_SUCCESS``;
    * ``tasks.json`` has been renamed to ``tasks.json.migrated``;
    * ``t_new`` now has its own shard, and the pre-existing ``t_existing``
      shard was not overwritten (its status is still ``running``);
    * ``app.state.storage`` can fetch and update the migrated task;
    * ``write_result`` and ``write_callback_failure`` write to
      ``results/<id>.json`` and ``callbacks/<id>.json`` respectively.

    Both environment variables are restored afterwards (or removed if they
    were previously unset). Otherwise ``STORAGE_ROOT`` would keep pointing
    at the already-deleted temporary directory for the rest of the process.

    Raises:
        AssertionError: if any expectation fails.
    """
    saved_env = {key: os.environ.get(key) for key in ("ENABLE_LEGACY_ROUTES", "STORAGE_ROOT")}
    try:
        os.environ["ENABLE_LEGACY_ROUTES"] = "0"
        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            _seed_legacy_storage(root)

            os.environ["STORAGE_ROOT"] = td
            app = create_app()
            with TestClient(app) as client:
                resp = client.get("/api/v1/health")
                _assert(resp.status_code == 200, f"expected 200, got={resp.status_code}, body={resp.text}")
                _assert(resp.json().get("code") == ERROR_CODE_SUCCESS, "health response code mismatch")

                # By the time the client is up, the legacy file must have been migrated.
                _assert((root / "tasks.json.migrated").exists(), "tasks.json.migrated missing")
                _assert(not (root / "tasks.json").exists(), "legacy tasks.json should be renamed")
                _assert((root / "tasks" / "t_new.json").exists(), "migrated shard file missing")
                _assert((root / "tasks" / "t_existing.json").exists(), "existing shard file missing")

                # The shard written before migration must win over the legacy entry.
                existing = json.loads((root / "tasks" / "t_existing.json").read_text(encoding="utf-8"))
                _assert(existing.get("status") == "running", "existing shard should not be overwritten")

                storage = app.state.storage
                task = storage.get_task("t_new")
                _assert(task is not None and task.id == "t_new", "storage.get_task migrated task failed")

                updated = storage.update_task("t_new", status=TaskStatus.succeeded)
                _assert(updated is not None and updated.status == TaskStatus.succeeded, "storage.update_task failed")

                storage.write_result("t_new", {"raw": {"ok": True}, "normalized": None, "meta": {"task_id": "t_new"}})
                _assert((root / "results" / "t_new.json").exists(), "result path mismatch")

                storage.write_callback_failure("t_new", {"task_id": "t_new", "error": "x"})
                _assert((root / "callbacks" / "t_new.json").exists(), "callback failure path mismatch")
    finally:
        _restore_env(saved_env)


def main() -> None:
    """Run each self-check in turn and print ``ok`` once all of them pass.

    A failing check raises ``AssertionError``; it propagates uncaught, so the
    script exits with a traceback instead of printing ``ok``.
    """
    for check in (verify_storage_migration_and_layout,):
        check()
    print("ok")


if __name__ == "__main__":
    main()