| from __future__ import annotations |
|
|
| import json |
| import os |
| import sys |
| import tempfile |
| from pathlib import Path |
| from typing import Any, Dict |
|
|
| from fastapi.testclient import TestClient |
|
|
# Resolve the directory two levels above this file's own directory and put it
# at the front of sys.path (unless it is already listed).  This has to happen
# before the ``Spider_XHS`` package imports that follow.
_repo_root = Path(__file__).resolve().parents[2]
if str(_repo_root) not in sys.path:
    sys.path.insert(0, str(_repo_root))
|
|
| from Spider_XHS.engines.base import EngineRunOutput |
| from Spider_XHS.engines.mediacrawler import MediaCrawlerEngine |
| from Spider_XHS.service.app import create_app |
| from Spider_XHS.service.config import load_config |
| from Spider_XHS.service.errors import ERROR_CODE_SUCCESS |
| from Spider_XHS.service.runner import TaskRunner |
| from Spider_XHS.service.stability_controller import StabilityController |
| from Spider_XHS.service.storage import LocalJsonStorage |
| from Spider_XHS.service.tasks import TaskRecord, TaskStatus |
|
|
|
|
| def _assert(cond: bool, msg: str) -> None: |
| if not cond: |
| raise AssertionError(msg) |
|
|
|
|
| class _FakeCaptchaEngine: |
| name = "fake_engine" |
|
|
| def run(self, task: TaskRecord) -> EngineRunOutput: |
| raw = {"task_id": task.id} |
| normalized = {"task_id": task.id} |
| meta = { |
| "task_id": task.id, |
| "source_engine": "api", |
| "engine_name": self.name, |
| "source_type": task.task_type, |
| "source_ref": task.target, |
| "operator": (task.payload or {}).get("operator") or "tester", |
| "ingested_at": "1970-01-01T00:00:00Z", |
| "dedup_key": "x", |
| "ok": False, |
| "error_kind": "captcha", |
| "error_message": "captcha_detected", |
| } |
| return EngineRunOutput(raw=raw, normalized=normalized, meta=meta) |
|
|
|
|
def verify_task_status_extension_persistence() -> None:
    """Check that the extended task statuses survive a storage round trip.

    One task is created in a throwaway storage directory and then moved
    through each extended status in turn.  After every update the task is
    read back and compared, and the per-status snapshot must report exactly
    one task for that status.

    Raises:
        AssertionError: if an update, a read-back or a snapshot count is wrong.
    """
    task_id = "t_status_ext"
    extended_statuses = (
        TaskStatus.retrying,
        TaskStatus.fallback_running,
        TaskStatus.waiting_rpa,
        TaskStatus.rpa_running,
        TaskStatus.rpa_imported,
        TaskStatus.rpa_failed,
        TaskStatus.risk_paused,
    )

    with tempfile.TemporaryDirectory() as tmp_dir:
        storage = LocalJsonStorage(Path(tmp_dir))
        storage.init()
        seed_task = TaskRecord(
            id=task_id,
            status=TaskStatus.queued,
            task_type="selftest",
            target="",
            payload={"operator": "tester"},
            engine="selftest",
        )
        storage.create_task(seed_task)

        for status in extended_statuses:
            update_result = storage.update_task(task_id, status=status)
            _assert(update_result is not None, f"update_task failed status={status}")

            reloaded = storage.get_task(task_id)
            _assert(reloaded is not None, f"get_task missing after update status={status}")
            _assert(
                reloaded.status == status,
                f"status mismatch expected={status} got={reloaded.status}",
            )

            # NOTE(review): the original indentation was lost; the snapshot
            # check is assumed to belong to the loop body - confirm.
            counts = storage.count_tasks_by_status_snapshot()
            current_count = int(counts.get(status.value) or 0)
            _assert(current_count == 1, f"snapshot count mismatch for {status.value}")
|
|
|
|
def verify_pools_and_stability_controller_available() -> None:
    """Check that the runner's stability controller hands out resources.

    A ``TaskRunner`` is built on storage rooted in a temporary directory and
    its stability controller is asked for resources for the ``"api"`` and
    ``"browser"`` engine keys; both answers must be non-``None``.

    Raises:
        AssertionError: if either selection returns ``None``.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        # NOTE(review): STORAGE_ROOT stays set after this function returns,
        # pointing at a directory that no longer exists - confirm whether
        # later checks rely on that before changing it.
        os.environ["STORAGE_ROOT"] = tmp_dir
        config = load_config()
        storage = LocalJsonStorage(config.storage_dir)
        storage.init()
        runner = TaskRunner(storage=storage, config=config)

        selections = [
            runner._stability.select_resources(engine_key=engine_key, payload={})
            for engine_key in ("api", "browser")
        ]
        _assert(
            all(selection is not None for selection in selections),
            "stability_controller.select_resources failed",
        )
|
|
|
|
def verify_mediacrawler_stealth_and_captcha_detection() -> None:
    """Check MediaCrawler's stealth script, captcha detection and pause flow.

    Three things are verified:

    1. the stealth init script mentions both ``navigator`` and ``webdriver``;
    2. the captcha HTML detector accepts an English and a Chinese marker, and
       ``_raise_if_captcha`` raises an error mentioning ``captcha_detected``;
    3. a task run through ``TaskRunner`` with the API engine replaced by
       ``_FakeCaptchaEngine`` ends in ``TaskStatus.waiting_rpa``.

    Raises:
        AssertionError: if any of the expectations above does not hold.
    """
    stealth_script = MediaCrawlerEngine._stealth_init_script()
    _assert(
        all(token in stealth_script for token in ("navigator", "webdriver")),
        "stealth init script missing webdriver patch",
    )

    looks_like_captcha = MediaCrawlerEngine._is_captcha_html
    _assert(
        looks_like_captcha("<html>captcha</html>"),
        "captcha html detector failed on captcha token",
    )
    _assert(
        looks_like_captcha("<html>验证码</html>"),
        "captcha html detector failed on 中文验证码",
    )

    crawler = MediaCrawlerEngine(proxy=None, storage_state_paths=())
    try:
        crawler._raise_if_captcha("<html>captcha</html>")
    except Exception as exc:
        # Any exception type is accepted; only the message is checked.
        _assert("captcha_detected" in str(exc), "raise_if_captcha should raise captcha_detected")
    else:
        raise AssertionError("raise_if_captcha should raise")

    note_url = "https://www.xiaohongshu.com/explore/note_123"
    task_id = "t_pause_captcha"
    with tempfile.TemporaryDirectory() as tmp_dir:
        config = load_config()
        storage = LocalJsonStorage(Path(tmp_dir))
        storage.init()
        runner = TaskRunner(storage=storage, config=config)
        # Swap in the stub so the run fails with a captcha error.
        runner._engine_api = _FakeCaptchaEngine()

        queued_task = TaskRecord(
            id=task_id,
            status=TaskStatus.queued,
            task_type="note_url",
            target=note_url,
            payload={"note_url": note_url, "operator": "tester"},
            engine="api",
        )
        storage.create_task(queued_task)
        runner.run_task(task_id)

        after_run = storage.get_task(task_id)
        _assert(after_run is not None, "paused task missing")
        _assert(
            after_run.status == TaskStatus.waiting_rpa,
            f"expected waiting_rpa, got={after_run.status}",
        )
|
|
|
|
def verify_stability_controller_error_policies() -> None:
    """Check the decisions ``StabilityController`` makes after engine failures.

    Expected decisions for failures reported by the API engine:

    * ``auto`` engine choice: ``timeout`` -> retry, ``missing_dependency`` ->
      fallback to ``browser``, ``rate`` -> fallback;
    * ``api`` engine choice: ``rate`` -> pause with ``risk_paused``,
      ``captcha`` -> pause with ``waiting_rpa``.

    Raises:
        AssertionError: if any decision differs from the expectation.
    """

    class _Engine:
        """Minimal engine stand-in exposing only a ``name``."""

        def __init__(self, name: str):
            self.name = name

    controller = StabilityController(engine_fallback_threshold=3)
    api_engine = _Engine("spider_xhs")
    browser_engine = _Engine("mediacrawler")

    def _start(engine_choice):
        # Open an execution state for the given engine choice.
        return controller.init_execution(
            task_engine_choice=engine_choice,
            scope="www.xiaohongshu.com",
            engine_api=api_engine,
            engine_browser=browser_engine,
        )

    def _api_failure(state, error_kind):
        # Ask the controller what to do after the API engine failed.
        return controller.decide_after_failure(
            state=state,
            engine_name=api_engine.name,
            error_kind=error_kind,
        )

    # NOTE(review): the calls below keep the original order; the controller
    # may track state between calls - confirm before reordering.
    auto_state = _start("auto")

    on_timeout = _api_failure(auto_state, "timeout")
    _assert(on_timeout.action == "retry", f"timeout should retry, got={on_timeout.action}")

    on_missing = _api_failure(auto_state, "missing_dependency")
    _assert(
        on_missing.action == "fallback",
        f"missing_dependency should fallback, got={on_missing.action}",
    )
    _assert(
        on_missing.next_key == "browser",
        f"missing_dependency next_key should be browser, got={on_missing.next_key}",
    )

    on_rate_auto = _api_failure(auto_state, "rate")
    _assert(
        on_rate_auto.action == "fallback",
        f"rate in auto should fallback first, got={on_rate_auto.action}",
    )

    api_only_state = _start("api")

    on_rate_api = _api_failure(api_only_state, "rate")
    _assert(
        on_rate_api.action == "pause",
        f"rate in api-only should pause, got={on_rate_api.action}",
    )
    _assert(
        on_rate_api.status == TaskStatus.risk_paused,
        "rate pause status should be risk_paused",
    )

    on_captcha = _api_failure(api_only_state, "captcha")
    _assert(on_captcha.action == "pause", f"captcha should pause, got={on_captcha.action}")
    _assert(
        on_captcha.status == TaskStatus.waiting_rpa,
        "captcha pause status should be waiting_rpa",
    )
|
|
|
|
def verify_import_extension_api_and_result_flow() -> None:
    """Check the extension import endpoint and the follow-up result lookup.

    A task waiting for RPA is stored directly, then an extension result is
    posted to ``/api/v1/import/extension``.  The response must report the
    task as ``rpa_imported``, and ``/api/v1/tasks/{task_id}/result`` must
    return the same status together with the posted raw and normalized data.

    Raises:
        AssertionError: if a response status, code or body field is wrong.
    """
    os.environ["ENABLE_LEGACY_ROUTES"] = "0"
    task_id = "t_extension_import"
    note_url = "https://www.xiaohongshu.com/explore/note_123"
    imported_status = TaskStatus.rpa_imported.value

    with tempfile.TemporaryDirectory() as tmp_dir:
        os.environ["STORAGE_ROOT"] = tmp_dir
        app = create_app()
        with TestClient(app) as client:
            # The storage is read from the app only after the client context
            # has been entered, as in the original.
            storage = app.state.storage
            waiting_task = TaskRecord(
                id=task_id,
                status=TaskStatus.waiting_rpa,
                task_type="note_url",
                target=note_url,
                payload={"note_url": note_url, "operator": "tester"},
                engine="browser",
            )
            storage.create_task(waiting_task)

            raw = {"from": "extension"}
            normalized = {"ok": True}
            import_request: Dict[str, Any] = {
                "task_id": task_id,
                "raw": raw,
                "normalized": normalized,
            }
            import_resp = client.post("/api/v1/import/extension", json=import_request)
            _assert(import_resp.status_code == 200, f"import extension failed: {import_resp.text}")
            import_body = import_resp.json()
            _assert(
                import_body.get("code") == ERROR_CODE_SUCCESS,
                f"unexpected code={import_body.get('code')}",
            )
            import_data = import_body.get("data") or {}
            imported_task = import_data.get("task") or {}
            _assert(
                imported_task.get("status") == imported_status,
                f"status mismatch: {imported_task.get('status')}",
            )

            result_resp = client.get(f"/api/v1/tasks/{task_id}/result")
            _assert(result_resp.status_code == 200, f"get result failed: {result_resp.text}")
            result_body = result_resp.json()
            _assert(result_body.get("code") == ERROR_CODE_SUCCESS, "result response code mismatch")
            result_data = result_body.get("data") or {}
            _assert(result_data.get("status") == imported_status, "result status mismatch")
            _assert(result_data.get("raw") == raw, "raw mismatch")
            _assert(result_data.get("normalized") == normalized, "normalized mismatch")
|
|
|
|
def verify_extension_directory_loadable() -> None:
    """Check that the bundled extension directory has the expected files.

    The manifest under ``Spider_XHS/extension`` must exist, declare
    ``manifest_version`` 3 and name a background ``service_worker`` whose file
    is present; ``content.js`` must be present as well.

    Raises:
        AssertionError: if the manifest or one of the files is missing or the
            manifest does not match.
    """
    extension_dir = _repo_root / "Spider_XHS" / "extension"
    manifest_file = extension_dir / "manifest.json"
    _assert(manifest_file.exists(), "extension manifest missing")

    manifest = json.loads(manifest_file.read_text(encoding="utf-8"))
    manifest_version = int(manifest.get("manifest_version") or 0)
    _assert(manifest_version == 3, "extension not MV3")

    # Anything other than a dict under "background" is treated as empty.
    background = manifest.get("background")
    if not isinstance(background, dict):
        background = {}
    service_worker = str(background.get("service_worker") or "")
    _assert(bool(service_worker), "extension background service_worker missing")

    worker_file = extension_dir / service_worker
    _assert(worker_file.exists(), "extension service_worker file missing")
    content_script = extension_dir / "content.js"
    _assert(content_script.exists(), "extension content.js missing")
|
|
|
|
def verify_metrics_and_health_include_proxy_metrics() -> None:
    """Check the health endpoint and the proxy-pool names in the metrics text.

    ``/api/v1/health`` must answer 200 with the success code, and the body of
    ``/api/v1/metrics`` must contain each ``spider_xhs_proxy_pool_*`` name
    listed below.

    Raises:
        AssertionError: if a request fails or a metric name is absent.
    """
    required_metrics = (
        "spider_xhs_proxy_pool_size",
        "spider_xhs_proxy_pool_avg_score",
        "spider_xhs_proxy_pool_ejected_total",
        "spider_xhs_proxy_pool_failures_total",
    )

    os.environ["ENABLE_LEGACY_ROUTES"] = "0"
    with tempfile.TemporaryDirectory() as tmp_dir:
        os.environ["STORAGE_ROOT"] = tmp_dir
        app = create_app()
        with TestClient(app) as client:
            health_resp = client.get("/api/v1/health")
            _assert(health_resp.status_code == 200, f"health failed: {health_resp.text}")
            health_code = health_resp.json().get("code")
            _assert(health_code == ERROR_CODE_SUCCESS, "health response code mismatch")

            metrics_resp = client.get("/api/v1/metrics")
            _assert(metrics_resp.status_code == 200, f"metrics failed: {metrics_resp.text}")
            metrics_text = metrics_resp.text
            for metric_name in required_metrics:
                _assert(metric_name in metrics_text, f"missing {metric_name}")
|
|
|
|
def main() -> None:
    """Run every verification in order and print ``ok`` when all pass.

    The first failing verification raises and stops the run, so ``ok`` is
    printed only after all of them have completed.
    """
    checks = (
        verify_task_status_extension_persistence,
        verify_pools_and_stability_controller_available,
        verify_mediacrawler_stealth_and_captcha_detection,
        verify_stability_controller_error_policies,
        verify_import_extension_api_and_result_flow,
        verify_extension_directory_loadable,
        verify_metrics_and_health_include_proxy_metrics,
    )
    for check in checks:
        check()
    print("ok")
|
|
|
|
# Run all verifications only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|