from __future__ import annotations import json import os import sys import tempfile from pathlib import Path from typing import Any, Dict from fastapi.testclient import TestClient _repo_root = Path(__file__).resolve().parents[2] if str(_repo_root) not in sys.path: sys.path.insert(0, str(_repo_root)) from Spider_XHS.engines.base import EngineRunOutput from Spider_XHS.engines.mediacrawler import MediaCrawlerEngine from Spider_XHS.service.app import create_app from Spider_XHS.service.config import load_config from Spider_XHS.service.errors import ERROR_CODE_SUCCESS from Spider_XHS.service.runner import TaskRunner from Spider_XHS.service.stability_controller import StabilityController from Spider_XHS.service.storage import LocalJsonStorage from Spider_XHS.service.tasks import TaskRecord, TaskStatus def _assert(cond: bool, msg: str) -> None: if not cond: raise AssertionError(msg) class _FakeCaptchaEngine: name = "fake_engine" def run(self, task: TaskRecord) -> EngineRunOutput: raw = {"task_id": task.id} normalized = {"task_id": task.id} meta = { "task_id": task.id, "source_engine": "api", "engine_name": self.name, "source_type": task.task_type, "source_ref": task.target, "operator": (task.payload or {}).get("operator") or "tester", "ingested_at": "1970-01-01T00:00:00Z", "dedup_key": "x", "ok": False, "error_kind": "captcha", "error_message": "captcha_detected", } return EngineRunOutput(raw=raw, normalized=normalized, meta=meta) def verify_task_status_extension_persistence() -> None: with tempfile.TemporaryDirectory() as td: storage = LocalJsonStorage(Path(td)) storage.init() task_id = "t_status_ext" storage.create_task( TaskRecord( id=task_id, status=TaskStatus.queued, task_type="selftest", target="", payload={"operator": "tester"}, engine="selftest", ) ) statuses = ( TaskStatus.retrying, TaskStatus.fallback_running, TaskStatus.waiting_rpa, TaskStatus.rpa_running, TaskStatus.rpa_imported, TaskStatus.rpa_failed, TaskStatus.risk_paused, ) for status in statuses: updated = storage.update_task(task_id, status=status) _assert(updated is not None, f"update_task failed status={status}") got = storage.get_task(task_id) _assert(got is not None, f"get_task missing after update status={status}") _assert(got.status == status, f"status mismatch expected={status} got={got.status}") snap = storage.count_tasks_by_status_snapshot() _assert(int(snap.get(status.value) or 0) == 1, f"snapshot count mismatch for {status.value}") def verify_pools_and_stability_controller_available() -> None: with tempfile.TemporaryDirectory() as td: os.environ["STORAGE_ROOT"] = td cfg = load_config() storage = LocalJsonStorage(cfg.storage_dir) storage.init() runner = TaskRunner(storage=storage, config=cfg) resources_a = runner._stability.select_resources(engine_key="api", payload={}) resources_b = runner._stability.select_resources(engine_key="browser", payload={}) _assert(resources_a is not None and resources_b is not None, "stability_controller.select_resources failed") def verify_mediacrawler_stealth_and_captcha_detection() -> None: script = MediaCrawlerEngine._stealth_init_script() _assert("navigator" in script and "webdriver" in script, "stealth init script missing webdriver patch") _assert(MediaCrawlerEngine._is_captcha_html("captcha"), "captcha html detector failed on captcha token") _assert(MediaCrawlerEngine._is_captcha_html("验证码"), "captcha html detector failed on 中文验证码") engine = MediaCrawlerEngine(proxy=None, storage_state_paths=()) try: engine._raise_if_captcha("captcha") except Exception as e: _assert("captcha_detected" in str(e), "raise_if_captcha should raise captcha_detected") else: raise AssertionError("raise_if_captcha should raise") with tempfile.TemporaryDirectory() as td: cfg = load_config() storage = LocalJsonStorage(Path(td)) storage.init() runner = TaskRunner(storage=storage, config=cfg) runner._engine_api = _FakeCaptchaEngine() task_id = "t_pause_captcha" storage.create_task( TaskRecord( id=task_id, status=TaskStatus.queued, task_type="note_url", target="https://www.xiaohongshu.com/explore/note_123", payload={"note_url": "https://www.xiaohongshu.com/explore/note_123", "operator": "tester"}, engine="api", ) ) runner.run_task(task_id) paused = storage.get_task(task_id) _assert(paused is not None, "paused task missing") _assert(paused.status == TaskStatus.waiting_rpa, f"expected waiting_rpa, got={paused.status}") def verify_stability_controller_error_policies() -> None: controller = StabilityController(engine_fallback_threshold=3) class _Engine: def __init__(self, name: str): self.name = name api_engine = _Engine("spider_xhs") browser_engine = _Engine("mediacrawler") state_auto = controller.init_execution( task_engine_choice="auto", scope="www.xiaohongshu.com", engine_api=api_engine, engine_browser=browser_engine, ) d_timeout = controller.decide_after_failure(state=state_auto, engine_name=api_engine.name, error_kind="timeout") _assert(d_timeout.action == "retry", f"timeout should retry, got={d_timeout.action}") d_missing = controller.decide_after_failure( state=state_auto, engine_name=api_engine.name, error_kind="missing_dependency" ) _assert(d_missing.action == "fallback", f"missing_dependency should fallback, got={d_missing.action}") _assert(d_missing.next_key == "browser", f"missing_dependency next_key should be browser, got={d_missing.next_key}") d_rate = controller.decide_after_failure(state=state_auto, engine_name=api_engine.name, error_kind="rate") _assert(d_rate.action == "fallback", f"rate in auto should fallback first, got={d_rate.action}") state_api_only = controller.init_execution( task_engine_choice="api", scope="www.xiaohongshu.com", engine_api=api_engine, engine_browser=browser_engine, ) d_rate_api = controller.decide_after_failure(state=state_api_only, engine_name=api_engine.name, error_kind="rate") _assert(d_rate_api.action == "pause", f"rate in api-only should pause, got={d_rate_api.action}") _assert(d_rate_api.status == TaskStatus.risk_paused, "rate pause status should be risk_paused") d_captcha = controller.decide_after_failure(state=state_api_only, engine_name=api_engine.name, error_kind="captcha") _assert(d_captcha.action == "pause", f"captcha should pause, got={d_captcha.action}") _assert(d_captcha.status == TaskStatus.waiting_rpa, "captcha pause status should be waiting_rpa") def verify_import_extension_api_and_result_flow() -> None: os.environ["ENABLE_LEGACY_ROUTES"] = "0" with tempfile.TemporaryDirectory() as td: os.environ["STORAGE_ROOT"] = td app = create_app() with TestClient(app) as client: storage = app.state.storage task_id = "t_extension_import" storage.create_task( TaskRecord( id=task_id, status=TaskStatus.waiting_rpa, task_type="note_url", target="https://www.xiaohongshu.com/explore/note_123", payload={"note_url": "https://www.xiaohongshu.com/explore/note_123", "operator": "tester"}, engine="browser", ) ) raw = {"from": "extension"} normalized = {"ok": True} payload: Dict[str, Any] = {"task_id": task_id, "raw": raw, "normalized": normalized} resp = client.post("/api/v1/import/extension", json=payload) _assert(resp.status_code == 200, f"import extension failed: {resp.text}") body = resp.json() _assert(body.get("code") == ERROR_CODE_SUCCESS, f"unexpected code={body.get('code')}") task = ((body.get("data") or {}).get("task") or {}) _assert(task.get("status") == TaskStatus.rpa_imported.value, f"status mismatch: {task.get('status')}") resp2 = client.get(f"/api/v1/tasks/{task_id}/result") _assert(resp2.status_code == 200, f"get result failed: {resp2.text}") body2 = resp2.json() _assert(body2.get("code") == ERROR_CODE_SUCCESS, "result response code mismatch") data2 = body2.get("data") or {} _assert(data2.get("status") == TaskStatus.rpa_imported.value, "result status mismatch") _assert(data2.get("raw") == raw, "raw mismatch") _assert(data2.get("normalized") == normalized, "normalized mismatch") def verify_extension_directory_loadable() -> None: base = _repo_root / "Spider_XHS" / "extension" manifest_path = base / "manifest.json" _assert(manifest_path.exists(), "extension manifest missing") manifest = json.loads(manifest_path.read_text(encoding="utf-8")) _assert(int(manifest.get("manifest_version") or 0) == 3, "extension not MV3") bg = ((manifest.get("background") or {}) if isinstance(manifest.get("background"), dict) else {}) service_worker = str(bg.get("service_worker") or "") _assert(service_worker, "extension background service_worker missing") _assert((base / service_worker).exists(), "extension service_worker file missing") _assert((base / "content.js").exists(), "extension content.js missing") def verify_metrics_and_health_include_proxy_metrics() -> None: os.environ["ENABLE_LEGACY_ROUTES"] = "0" with tempfile.TemporaryDirectory() as td: os.environ["STORAGE_ROOT"] = td app = create_app() with TestClient(app) as client: resp = client.get("/api/v1/health") _assert(resp.status_code == 200, f"health failed: {resp.text}") _assert(resp.json().get("code") == ERROR_CODE_SUCCESS, "health response code mismatch") resp2 = client.get("/api/v1/metrics") _assert(resp2.status_code == 200, f"metrics failed: {resp2.text}") text = resp2.text _assert("spider_xhs_proxy_pool_size" in text, "missing spider_xhs_proxy_pool_size") _assert("spider_xhs_proxy_pool_avg_score" in text, "missing spider_xhs_proxy_pool_avg_score") _assert("spider_xhs_proxy_pool_ejected_total" in text, "missing spider_xhs_proxy_pool_ejected_total") _assert("spider_xhs_proxy_pool_failures_total" in text, "missing spider_xhs_proxy_pool_failures_total") def main() -> None: verify_task_status_extension_persistence() verify_pools_and_stability_controller_available() verify_mediacrawler_stealth_and_captcha_detection() verify_stability_controller_error_policies() verify_import_extension_api_and_result_flow() verify_extension_directory_loadable() verify_metrics_and_health_include_proxy_metrics() print("ok") if __name__ == "__main__": main()