# XHS/verification/verify_upgrade_stability_controller_v1.py
# Trae Bot
# Upload Spider_XHS project
# c481f8a
from __future__ import annotations

import json
import os
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict
from unittest import mock

from fastapi.testclient import TestClient

# The repository root must be on sys.path *before* the Spider_XHS imports
# below, so this statement intentionally sits in the middle of the imports.
_repo_root = Path(__file__).resolve().parents[2]
if str(_repo_root) not in sys.path:
    sys.path.insert(0, str(_repo_root))

from Spider_XHS.engines.base import EngineRunOutput
from Spider_XHS.engines.mediacrawler import MediaCrawlerEngine
from Spider_XHS.service.app import create_app
from Spider_XHS.service.config import load_config
from Spider_XHS.service.errors import ERROR_CODE_SUCCESS
from Spider_XHS.service.runner import TaskRunner
from Spider_XHS.service.stability_controller import StabilityController
from Spider_XHS.service.storage import LocalJsonStorage
from Spider_XHS.service.tasks import TaskRecord, TaskStatus
def _assert(cond: object, msg: str) -> None:
    """Raise ``AssertionError(msg)`` when *cond* is falsy.

    Used instead of the ``assert`` statement so the checks still run when
    Python is started with ``-O``.

    Args:
        cond: Value tested for truthiness. Callers in this file pass both
            booleans and plain strings, hence the ``object`` annotation.
        msg: Message carried by the raised ``AssertionError``.

    Raises:
        AssertionError: if *cond* is falsy.
    """
    if not cond:
        raise AssertionError(msg)
class _FakeCaptchaEngine:
    """Engine stand-in whose ``run`` always reports a captcha failure.

    The returned metadata carries ``ok=False`` and ``error_kind="captcha"``
    so a runner using this engine sees a captcha outcome without any
    network access.
    """

    name = "fake_engine"

    def run(self, task: TaskRecord) -> EngineRunOutput:
        """Build a canned ``EngineRunOutput`` for *task* flagged as captcha."""
        task_id = task.id
        meta = dict(
            task_id=task_id,
            source_engine="api",
            engine_name=self.name,
            source_type=task.task_type,
            source_ref=task.target,
            # Fall back to "tester" when the payload is absent or has no operator.
            operator=(task.payload or {}).get("operator") or "tester",
            ingested_at="1970-01-01T00:00:00Z",
            dedup_key="x",
            ok=False,
            error_kind="captcha",
            error_message="captcha_detected",
        )
        # raw and normalized are deliberately two distinct dict objects.
        return EngineRunOutput(
            raw={"task_id": task_id},
            normalized={"task_id": task_id},
            meta=meta,
        )
def verify_task_status_extension_persistence() -> None:
    """Check that each extended ``TaskStatus`` value survives a storage round trip.

    A single task is created in a throwaway ``LocalJsonStorage``; for every
    extended status the task is updated, read back, and the per-status
    snapshot count is expected to be exactly 1.

    Raises:
        AssertionError: on the first update, read-back or count mismatch.
    """
    extended_statuses = (
        TaskStatus.retrying,
        TaskStatus.fallback_running,
        TaskStatus.waiting_rpa,
        TaskStatus.rpa_running,
        TaskStatus.rpa_imported,
        TaskStatus.rpa_failed,
        TaskStatus.risk_paused,
    )
    with tempfile.TemporaryDirectory() as tmp_dir:
        store = LocalJsonStorage(Path(tmp_dir))
        store.init()

        task_id = "t_status_ext"
        seed_task = TaskRecord(
            id=task_id,
            status=TaskStatus.queued,
            task_type="selftest",
            target="",
            payload={"operator": "tester"},
            engine="selftest",
        )
        store.create_task(seed_task)

        for status in extended_statuses:
            updated = store.update_task(task_id, status=status)
            _assert(updated is not None, f"update_task failed status={status}")

            got = store.get_task(task_id)
            _assert(got is not None, f"get_task missing after update status={status}")
            _assert(got.status == status, f"status mismatch expected={status} got={got.status}")

            # Only one task exists, so the bucket for the current status must hold exactly it.
            counts = store.count_tasks_by_status_snapshot()
            bucket_size = int(counts.get(status.value) or 0)
            _assert(bucket_size == 1, f"snapshot count mismatch for {status.value}")
def verify_pools_and_stability_controller_available() -> None:
    """Check that a ``TaskRunner`` can select resources for both engine keys.

    A runner is built on a throwaway storage root and its stability
    controller is asked for resources for the ``"api"`` and ``"browser"``
    engine keys with an empty payload.

    ``STORAGE_ROOT`` is overridden only while the check runs (presumably
    ``load_config()`` reads it — confirm against the config module). The
    previous environment is restored on exit so later checks do not inherit
    a path to an already-deleted temporary directory.

    Raises:
        AssertionError: if either selection returns ``None``.
    """
    with tempfile.TemporaryDirectory() as td:
        # patch.dict restores os.environ before the temp dir is removed.
        with mock.patch.dict(os.environ, {"STORAGE_ROOT": td}):
            cfg = load_config()
            storage = LocalJsonStorage(cfg.storage_dir)
            storage.init()
            runner = TaskRunner(storage=storage, config=cfg)
            resources_a = runner._stability.select_resources(engine_key="api", payload={})
            resources_b = runner._stability.select_resources(engine_key="browser", payload={})
            _assert(
                resources_a is not None and resources_b is not None,
                "stability_controller.select_resources failed",
            )
def verify_mediacrawler_stealth_and_captcha_detection() -> None:
    """Check MediaCrawler stealth/captcha helpers and the runner's captcha pause.

    Three things are verified, in order:

    1. the stealth init script mentions both ``navigator`` and ``webdriver``;
    2. the captcha HTML detector fires on an English and a Chinese marker, and
       ``_raise_if_captcha`` raises with ``captcha_detected`` in its message;
    3. a ``TaskRunner`` whose API engine is ``_FakeCaptchaEngine`` leaves the
       task in ``TaskStatus.waiting_rpa``.

    Raises:
        AssertionError: when any of the expectations above is not met.
    """
    captcha_page = "<html>captcha</html>"
    note_url = "https://www.xiaohongshu.com/explore/note_123"

    script = MediaCrawlerEngine._stealth_init_script()
    mentions_webdriver_patch = "navigator" in script and "webdriver" in script
    _assert(mentions_webdriver_patch, "stealth init script missing webdriver patch")

    _assert(
        MediaCrawlerEngine._is_captcha_html(captcha_page),
        "captcha html detector failed on captcha token",
    )
    _assert(
        MediaCrawlerEngine._is_captcha_html("<html>验证码</html>"),
        "captcha html detector failed on 中文验证码",
    )

    engine = MediaCrawlerEngine(proxy=None, storage_state_paths=())
    try:
        engine._raise_if_captcha(captcha_page)
    except Exception as e:
        _assert("captcha_detected" in str(e), "raise_if_captcha should raise captcha_detected")
    else:
        # Reaching here means no exception was raised at all.
        raise AssertionError("raise_if_captcha should raise")

    with tempfile.TemporaryDirectory() as tmp_dir:
        cfg = load_config()
        store = LocalJsonStorage(Path(tmp_dir))
        store.init()

        runner = TaskRunner(storage=store, config=cfg)
        # Swap in the fake so the run yields a captcha outcome without network access.
        runner._engine_api = _FakeCaptchaEngine()

        task_id = "t_pause_captcha"
        queued_task = TaskRecord(
            id=task_id,
            status=TaskStatus.queued,
            task_type="note_url",
            target=note_url,
            payload={"note_url": note_url, "operator": "tester"},
            engine="api",
        )
        store.create_task(queued_task)
        runner.run_task(task_id)

        paused = store.get_task(task_id)
        _assert(paused is not None, "paused task missing")
        _assert(paused.status == TaskStatus.waiting_rpa, f"expected waiting_rpa, got={paused.status}")
def verify_stability_controller_error_policies() -> None:
    """Check the action ``StabilityController`` picks for each failure kind.

    Expectations, all reported against the API engine:

    * ``auto`` engine choice: ``timeout`` -> retry, ``missing_dependency`` ->
      fallback to ``browser``, ``rate`` -> fallback.
    * ``api`` engine choice: ``rate`` -> pause with ``risk_paused``,
      ``captcha`` -> pause with ``waiting_rpa``.

    The decisions are requested in a fixed order because they share one
    execution state per engine choice.

    Raises:
        AssertionError: when a decision differs from the expectation.
    """

    class _Engine:
        """Bare object exposing only the ``name`` attribute."""

        def __init__(self, name: str):
            self.name = name

    controller = StabilityController(engine_fallback_threshold=3)
    api_engine = _Engine("spider_xhs")
    browser_engine = _Engine("mediacrawler")

    def start(choice):
        # Both execution states differ only in the engine choice.
        return controller.init_execution(
            task_engine_choice=choice,
            scope="www.xiaohongshu.com",
            engine_api=api_engine,
            engine_browser=browser_engine,
        )

    def decide(state, kind):
        # Every failure in this check is attributed to the API engine.
        return controller.decide_after_failure(
            state=state,
            engine_name=api_engine.name,
            error_kind=kind,
        )

    state_auto = start("auto")

    d_timeout = decide(state_auto, "timeout")
    _assert(d_timeout.action == "retry", f"timeout should retry, got={d_timeout.action}")

    d_missing = decide(state_auto, "missing_dependency")
    _assert(d_missing.action == "fallback", f"missing_dependency should fallback, got={d_missing.action}")
    _assert(d_missing.next_key == "browser", f"missing_dependency next_key should be browser, got={d_missing.next_key}")

    d_rate = decide(state_auto, "rate")
    _assert(d_rate.action == "fallback", f"rate in auto should fallback first, got={d_rate.action}")

    state_api_only = start("api")

    d_rate_api = decide(state_api_only, "rate")
    _assert(d_rate_api.action == "pause", f"rate in api-only should pause, got={d_rate_api.action}")
    _assert(d_rate_api.status == TaskStatus.risk_paused, "rate pause status should be risk_paused")

    d_captcha = decide(state_api_only, "captcha")
    _assert(d_captcha.action == "pause", f"captcha should pause, got={d_captcha.action}")
    _assert(d_captcha.status == TaskStatus.waiting_rpa, "captcha pause status should be waiting_rpa")
def verify_import_extension_api_and_result_flow() -> None:
    """Check the extension import endpoint and the follow-up result endpoint.

    A task waiting for RPA is seeded directly in the app's storage, then:

    1. ``POST /api/v1/import/extension`` must succeed and report the task as
       ``rpa_imported``;
    2. ``GET /api/v1/tasks/{task_id}/result`` must return the same status
       together with the exact ``raw`` and ``normalized`` payloads posted.

    ``ENABLE_LEGACY_ROUTES`` and ``STORAGE_ROOT`` are overridden only while
    the check runs; the previous environment is restored afterwards so the
    override (and the path to the deleted temp dir) does not leak into
    other checks.

    Raises:
        AssertionError: on any unexpected HTTP status, code or payload.
    """
    note_url = "https://www.xiaohongshu.com/explore/note_123"
    with tempfile.TemporaryDirectory() as td:
        env_overrides = {"ENABLE_LEGACY_ROUTES": "0", "STORAGE_ROOT": td}
        # patch.dict restores os.environ before the temp dir is removed.
        with mock.patch.dict(os.environ, env_overrides):
            app = create_app()
            with TestClient(app) as client:
                storage = app.state.storage
                task_id = "t_extension_import"
                storage.create_task(
                    TaskRecord(
                        id=task_id,
                        status=TaskStatus.waiting_rpa,
                        task_type="note_url",
                        target=note_url,
                        payload={"note_url": note_url, "operator": "tester"},
                        engine="browser",
                    )
                )

                raw = {"from": "extension"}
                normalized = {"ok": True}
                payload: Dict[str, Any] = {"task_id": task_id, "raw": raw, "normalized": normalized}

                resp = client.post("/api/v1/import/extension", json=payload)
                _assert(resp.status_code == 200, f"import extension failed: {resp.text}")
                body = resp.json()
                _assert(body.get("code") == ERROR_CODE_SUCCESS, f"unexpected code={body.get('code')}")
                task = (body.get("data") or {}).get("task") or {}
                _assert(task.get("status") == TaskStatus.rpa_imported.value, f"status mismatch: {task.get('status')}")

                # The imported payloads must be readable back unchanged.
                resp2 = client.get(f"/api/v1/tasks/{task_id}/result")
                _assert(resp2.status_code == 200, f"get result failed: {resp2.text}")
                body2 = resp2.json()
                _assert(body2.get("code") == ERROR_CODE_SUCCESS, "result response code mismatch")
                data2 = body2.get("data") or {}
                _assert(data2.get("status") == TaskStatus.rpa_imported.value, "result status mismatch")
                _assert(data2.get("raw") == raw, "raw mismatch")
                _assert(data2.get("normalized") == normalized, "normalized mismatch")
def verify_extension_directory_loadable() -> None:
    """Check that the bundled browser extension directory looks loadable.

    The manifest under ``<repo root>/Spider_XHS/extension`` must exist,
    declare ``manifest_version`` 3, and name a background ``service_worker``
    file that exists; ``content.js`` must exist as well.

    Raises:
        AssertionError: when a file is missing or the manifest is not MV3.
    """
    ext_dir = _repo_root / "Spider_XHS" / "extension"

    manifest_file = ext_dir / "manifest.json"
    _assert(manifest_file.exists(), "extension manifest missing")
    manifest = json.loads(manifest_file.read_text(encoding="utf-8"))

    declared_version = int(manifest.get("manifest_version") or 0)
    _assert(declared_version == 3, "extension not MV3")

    # Anything other than a dict under "background" is treated as absent.
    background = manifest.get("background")
    if not isinstance(background, dict):
        background = {}
    service_worker = str(background.get("service_worker") or "")
    _assert(service_worker, "extension background service_worker missing")

    worker_file = ext_dir / service_worker
    _assert(worker_file.exists(), "extension service_worker file missing")

    content_script = ext_dir / "content.js"
    _assert(content_script.exists(), "extension content.js missing")
def verify_metrics_and_health_include_proxy_metrics() -> None:
    """Check the health endpoint and the proxy-pool metrics exposition.

    ``GET /api/v1/health`` must return HTTP 200 with the success code, and
    the body of ``GET /api/v1/metrics`` must mention every proxy-pool metric
    name listed below.

    ``ENABLE_LEGACY_ROUTES`` and ``STORAGE_ROOT`` are overridden only while
    the check runs; the previous environment is restored afterwards so the
    override (and the path to the deleted temp dir) does not leak out of
    this function.

    Raises:
        AssertionError: on an unexpected response or a missing metric name.
    """
    required_metrics = (
        "spider_xhs_proxy_pool_size",
        "spider_xhs_proxy_pool_avg_score",
        "spider_xhs_proxy_pool_ejected_total",
        "spider_xhs_proxy_pool_failures_total",
    )
    with tempfile.TemporaryDirectory() as td:
        env_overrides = {"ENABLE_LEGACY_ROUTES": "0", "STORAGE_ROOT": td}
        # patch.dict restores os.environ before the temp dir is removed.
        with mock.patch.dict(os.environ, env_overrides):
            app = create_app()
            with TestClient(app) as client:
                resp = client.get("/api/v1/health")
                _assert(resp.status_code == 200, f"health failed: {resp.text}")
                _assert(resp.json().get("code") == ERROR_CODE_SUCCESS, "health response code mismatch")

                resp2 = client.get("/api/v1/metrics")
                _assert(resp2.status_code == 200, f"metrics failed: {resp2.text}")
                text = resp2.text
                # Plain substring match against the metrics response body.
                for metric in required_metrics:
                    _assert(metric in text, f"missing {metric}")
def main() -> None:
    """Run every verification in a fixed order and print ``ok`` at the end.

    The first failing check raises and stops the run, so ``ok`` is printed
    only when all checks pass.
    """
    checks = (
        verify_task_status_extension_persistence,
        verify_pools_and_stability_controller_available,
        verify_mediacrawler_stealth_and_captcha_detection,
        verify_stability_controller_error_policies,
        verify_import_extension_api_and_result_flow,
        verify_extension_directory_loadable,
        verify_metrics_and_health_include_proxy_metrics,
    )
    for check in checks:
        check()
    print("ok")


if __name__ == "__main__":
    main()