| from __future__ import annotations |
|
|
| import os |
| from dataclasses import dataclass |
| from typing import Any, Dict, Tuple |
|
|
| from .account_pool import AccountPool |
| from .session_pool import SessionPool |
| from .tasks import TaskStatus |
| from .proxy_pool import proxy_pool |
|
|
|
|
@dataclass(frozen=True)
class SelectedResources:
    """Immutable record of the pooled resources picked for one attempt.

    Instances are produced by ``StabilityController.select_resources``; any
    field is ``None`` when nothing was selected for it.
    """

    # Id of the account returned by the account pool, as a string.
    account_id: str | None
    # Id of the session acquired from the session pool, as a string.
    session_id: str | None
    # Cookie string taken from the acquired session.
    cookies: str | None
    # Storage-state path taken from the acquired session.
    storage_state_path: str | None
    # Proxy taken from the payload or drawn from the proxy pool.
    proxy: str | None
|
|
|
|
@dataclass(frozen=True)
class AttemptPlan:
    """Immutable description of a single attempt that is about to run.

    Built by ``StabilityController.build_attempt_plan`` from the current
    ``ExecutionState`` and the task payload.
    """

    # Key of the engine slot being used (the controller uses "api" / "browser").
    engine_key: str
    # The engine object that will run the attempt.
    engine: Any
    # ``engine.name`` when present, otherwise the engine key.
    engine_name: str
    # Task status to publish while this attempt runs.
    status: TaskStatus
    # True when the attempt does not run on the primary engine.
    is_fallback: bool
    # Why the fallback engine is in use; ``None`` for non-fallback attempts.
    fallback_reason: str | None
    # Pool resources selected for this attempt.
    resources: SelectedResources
    # Copy of the task payload with the selected resources filled in.
    payload: Dict[str, Any]
|
|
|
|
@dataclass
class ExecutionState:
    """Mutable bookkeeping for one task execution across its attempts.

    Created by ``StabilityController.init_execution``; the attempt counters are
    updated by ``build_attempt_plan`` and the current engine by ``switch_to``.
    """

    # Scope string used (with the engine name) to key failure streaks.
    scope: str
    # Normalised engine choice ("api", "browser" or "auto").
    engine_choice: str
    # Engine slot tried first, and its engine object.
    primary_key: str
    primary_engine: Any
    # Engine slot used after the primary gives up; ``None`` when there is none.
    fallback_key: str | None
    fallback_engine: Any | None
    # Human-readable reason recorded for using the fallback engine.
    fallback_reason: str | None
    # Engine slot / object the next attempt will use.
    current_key: str
    current_engine: Any
    # Set once the execution has switched to the fallback engine.
    used_fallback: bool
    # Number of attempts planned so far, in total and per engine key.
    attempts_total: int
    attempts_by_key: Dict[str, int]
|
|
|
|
@dataclass(frozen=True)
class FailureDecision:
    """Immutable verdict on what to do after a failed attempt.

    Returned by ``StabilityController.decide_after_failure``.
    """

    # One of "retry", "fallback", "pause" or "fail".
    action: str
    # Engine key to use next; ``None`` for "pause" and "fail".
    next_key: str | None
    # Task status to publish for this decision; ``None`` for "fail".
    status: TaskStatus | None
    # Reason text attached to "fallback" decisions; ``None`` otherwise.
    fallback_reason: str | None
|
|
|
|
@dataclass(frozen=True)
class ErrorPolicy:
    """Immutable rule describing how one kind of error is handled.

    ``StabilityController`` keeps one policy per error kind and consults it in
    ``decide_after_failure``.
    """

    # Preferred reaction: the controller's table uses "retry", "pause" and "fallback".
    action: str
    # Upper bound compared against the attempt count of the current engine.
    max_retries: int
    # Whether switching to the fallback engine is permitted for this error.
    allow_fallback: bool
    # Status reported when the decision is to pause.
    pause_status: TaskStatus | None = None
|
|
|
|
@dataclass(frozen=True)
class CaptchaChallenge:
    """Immutable input handed to a ``CaptchaSolverHook``.

    Built by ``StabilityController.build_captcha_challenge``.
    """

    # Name of the engine that hit the captcha ("unknown" when not supplied).
    engine_name: str
    # Scope of the task ("unknown" when not supplied).
    scope: str
    # Task id as a string, or ``None`` when the caller has none.
    task_id: str | None
    # Copy of the payload passed by the caller; its schema is not defined here.
    payload: Dict[str, Any]
|
|
|
|
@dataclass(frozen=True)
class CaptchaSolution:
    """Immutable result returned by ``CaptchaSolverHook.solve``."""

    # Whether the solver reports success.
    ok: bool
    # Solver-provided token, if any (presumably the captcha answer; confirm with the solver).
    token: str | None = None
    # Optional solver-specific extra data; its schema is not defined here.
    meta: Dict[str, Any] | None = None
|
|
|
|
class CaptchaSolverHook:
    """Base class for pluggable captcha solvers.

    Subclasses override :meth:`solve`; the base implementation always raises.
    """

    async def solve(self, challenge: CaptchaChallenge) -> CaptchaSolution:
        """Solve ``challenge`` and return the outcome.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
|
|
|
|
class StabilityController:
    """Pick the engine for a task, decide on retry / fallback / pause, and hand out pooled resources.

    The controller keeps an in-memory failure streak per ``(engine_name, scope)``
    pair and a table that maps error kinds to an ``ErrorPolicy``.
    """

    # Values of CAPTCHA_SOLVER_ENABLED (after strip + lower) that enable the solver.
    _TRUTHY_ENV_VALUES = ("1", "true", "yes", "y", "on")

    # Error kinds that extend the failure streak in ``observe_result``.
    _STREAK_ERROR_KINDS = ("auth", "rate", "risk", "captcha", "redirect", "redirect_to_login")

    def __init__(self, *, engine_fallback_threshold: int = 3):
        """Create a controller.

        Args:
            engine_fallback_threshold: Streak length at which ``init_execution``
                (auto mode) starts on the browser engine instead of the API
                engine. Values ``<= 0`` disable that switch.
        """
        self.engine_fallback_threshold = int(engine_fallback_threshold)
        self._failure_streaks: Dict[Tuple[str, str], int] = {}
        self.account_pool = AccountPool.from_env()
        self.session_pool = SessionPool.from_env()
        # Enabled by default; an empty or unrecognised value disables the solver.
        env_flag = str(os.getenv("CAPTCHA_SOLVER_ENABLED", "1") or "").strip().lower()
        self.captcha_solver_enabled = env_flag in self._TRUTHY_ENV_VALUES
        self.captcha_solver: CaptchaSolverHook | None = None
        # Unknown error kinds are handled with the "parse" policy (see decide_after_failure).
        self._policy: Dict[str, ErrorPolicy] = {
            "timeout": ErrorPolicy(action="retry", max_retries=2, allow_fallback=True),
            "proxy_failed": ErrorPolicy(action="retry", max_retries=2, allow_fallback=True),
            "rate": ErrorPolicy(action="pause", max_retries=0, allow_fallback=True, pause_status=TaskStatus.risk_paused),
            "auth": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
            "redirect": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
            "redirect_to_login": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
            "risk": ErrorPolicy(action="pause", max_retries=0, allow_fallback=False, pause_status=TaskStatus.waiting_rpa),
            "captcha": ErrorPolicy(action="pause", max_retries=0, allow_fallback=False, pause_status=TaskStatus.waiting_rpa),
            "missing_dependency": ErrorPolicy(action="fallback", max_retries=0, allow_fallback=True),
            "parse": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
        }

    def set_captcha_solver(self, solver: CaptchaSolverHook | None) -> None:
        """Install the solver used by ``try_solve_captcha`` (``None`` removes it)."""
        self.captcha_solver = solver

    def build_captcha_challenge(
        self,
        *,
        engine_name: str,
        scope: str,
        task_id: str | None,
        payload: Dict[str, Any],
    ) -> CaptchaChallenge:
        """Build a ``CaptchaChallenge`` from loosely-typed inputs.

        Falsy ``engine_name`` / ``scope`` become ``"unknown"``, ``task_id`` is
        stringified unless it is ``None``, and ``payload`` is shallow-copied
        (``None`` becomes an empty dict).
        """
        return CaptchaChallenge(
            engine_name=str(engine_name or "unknown"),
            scope=str(scope or "unknown"),
            task_id=str(task_id) if task_id is not None else None,
            payload=dict(payload or {}),
        )

    async def try_solve_captcha(self, *, challenge: CaptchaChallenge) -> CaptchaSolution | None:
        """Run the installed solver on ``challenge``.

        Returns:
            The solver's result, or ``None`` when solving is disabled or no
            solver is installed. Exceptions raised by the solver propagate.
        """
        if not bool(self.captcha_solver_enabled):
            return None
        solver = self.captcha_solver
        if solver is None:
            return None
        return await solver.solve(challenge)

    def _get_streak(self, engine_name: str, scope: str) -> int:
        """Return the current failure streak for ``(engine_name, scope)`` (0 if none)."""
        return int(self._failure_streaks.get((str(engine_name), str(scope)), 0))

    def _set_streak(self, engine_name: str, scope: str, value: int) -> None:
        """Store the failure streak; values ``<= 0`` remove the entry."""
        key = (str(engine_name), str(scope))
        if int(value) <= 0:
            self._failure_streaks.pop(key, None)
            return
        self._failure_streaks[key] = int(value)

    @staticmethod
    def _normalize_engine_choice(choice: str) -> str:
        """Map engine aliases to ``"api"``, ``"browser"`` or ``"auto"``.

        Matching is case-insensitive and ignores surrounding whitespace. Empty
        or ``None`` input means ``"auto"``; unrecognised values are returned
        normalised but otherwise unchanged.
        """
        value = (choice or "").strip().lower()
        if value in ("api", "spider_xhs", "xhs_api"):
            return "api"
        if value in ("browser", "mediacrawler", "xhs_browser"):
            return "browser"
        if value in ("auto", ""):
            return "auto"
        return value

    def init_execution(
        self,
        *,
        task_engine_choice: str,
        scope: str,
        engine_api: Any,
        engine_browser: Any,
    ) -> ExecutionState:
        """Create the ``ExecutionState`` for a new task execution.

        ``"api"`` and ``"browser"`` pin the primary engine with no fallback.
        ``"auto"`` starts on the API engine with the browser engine as fallback,
        unless the API engine's failure streak for ``scope`` has reached the
        threshold, in which case it starts on the browser engine with no
        fallback.

        NOTE(review): nothing in this class decays the streak over time, so
        after the threshold is reached auto mode keeps choosing the browser
        engine until ``observe_result`` is called for the API engine's name
        again. Confirm this is intended.

        Raises:
            ValueError: if the normalised choice is not api/browser/auto.
        """
        choice = self._normalize_engine_choice(task_engine_choice)
        threshold = int(self.engine_fallback_threshold)
        api_name = getattr(engine_api, "name", "spider_xhs")
        streak = self._get_streak(api_name, scope)

        fallback_key: str | None = None
        fallback_reason: str | None = None
        if choice == "api":
            primary_key = "api"
        elif choice == "browser":
            primary_key = "browser"
        elif choice == "auto":
            if threshold > 0 and streak >= threshold:
                primary_key = "browser"
                fallback_reason = f"threshold_exceeded scope={scope} streak={streak} threshold={threshold}"
            else:
                primary_key = "api"
                fallback_key = "browser"
        else:
            raise ValueError(f"unsupported_engine={choice}")

        primary_engine = engine_api if primary_key == "api" else engine_browser
        fb_engine = engine_browser if fallback_key == "browser" else None
        return ExecutionState(
            scope=str(scope),
            engine_choice=choice,
            primary_key=primary_key,
            primary_engine=primary_engine,
            fallback_key=fallback_key,
            fallback_engine=fb_engine,
            fallback_reason=fallback_reason,
            current_key=primary_key,
            current_engine=primary_engine,
            used_fallback=False,
            attempts_total=0,
            attempts_by_key={},
        )

    def select_resources(self, *, engine_key: str, payload: Dict[str, Any]) -> SelectedResources:
        """Pick account, session and proxy for one attempt.

        Values already present in ``payload`` take precedence: a session is
        acquired only when the payload lacks cookies (API engine) or a storage
        state (any other engine), and a pool proxy is drawn only when the
        payload carries no usable proxy setting.

        Args:
            engine_key: ``"api"`` selects a cookie session; anything else a
                storage-state session.
            payload: Task payload; ``None`` is treated as empty.
        """
        # Tolerate a missing payload, as apply_resources_to_payload does.
        payload = payload or {}
        acc = self.account_pool.pick_available()
        cookies_in_payload = payload.get("cookies") or payload.get("cookie") or payload.get("cookies_str")
        storage_state_in_payload = payload.get("storage_state_path") or payload.get("storage_state")
        proxy_in_payload = payload.get("proxy") or payload.get("proxies")

        session = None
        if engine_key == "api":
            if not cookies_in_payload:
                session = self.session_pool.acquire_cookie_session()
        elif not storage_state_in_payload:
            session = self.session_pool.acquire_storage_state_session()

        cookies = str(session.cookies) if session is not None and session.cookies else None
        storage_state_path = (
            str(session.storage_state_path) if session is not None and session.storage_state_path else None
        )

        proxy = None
        if isinstance(proxy_in_payload, str):
            proxy = proxy_in_payload.strip() or None
            draw_from_pool = proxy is None
        else:
            # A non-string proxy config (e.g. a ``proxies`` mapping) stays in the
            # payload untouched, so a pool proxy would never be applied; do not
            # draw one and do not report one in the result.
            draw_from_pool = not proxy_in_payload
        if draw_from_pool:
            proxy = proxy_pool.get_random_proxy()

        return SelectedResources(
            account_id=str(acc.id) if acc is not None else None,
            session_id=str(session.id) if session is not None else None,
            cookies=cookies,
            storage_state_path=storage_state_path,
            proxy=proxy,
        )

    def apply_resources_to_payload(self, *, payload: Dict[str, Any], resources: SelectedResources) -> Dict[str, Any]:
        """Return a shallow copy of ``payload`` with missing resources filled in.

        A resource is written only when it is not ``None`` and the payload has
        no truthy value under any of the equivalent keys. The input payload is
        not modified; ``None`` is treated as empty.
        """
        out = dict(payload or {})
        if resources.cookies is not None and not (out.get("cookies") or out.get("cookie") or out.get("cookies_str")):
            out["cookies"] = resources.cookies
        if resources.storage_state_path is not None and not (out.get("storage_state_path") or out.get("storage_state")):
            out["storage_state_path"] = resources.storage_state_path
        if resources.proxy is not None and not (out.get("proxy") or out.get("proxies")):
            out["proxy"] = resources.proxy
        return out

    def build_attempt_plan(self, *, state: ExecutionState, payload: Dict[str, Any]) -> AttemptPlan:
        """Select resources, bump the attempt counters on ``state`` and describe the attempt.

        The reported status is ``fallback_running`` when the attempt is not on
        the primary engine, ``retrying`` for a repeated attempt on the same
        engine, and ``running`` otherwise.
        """
        resources = self.select_resources(engine_key=state.current_key, payload=payload)
        patched = self.apply_resources_to_payload(payload=payload, resources=resources)
        # Counters are bumped only after resource selection succeeded.
        state.attempts_total = int(state.attempts_total) + 1
        state.attempts_by_key[state.current_key] = int(state.attempts_by_key.get(state.current_key, 0)) + 1

        engine = state.current_engine
        engine_name = str(getattr(engine, "name", state.current_key))
        is_fallback = bool(state.used_fallback or (state.current_key != state.primary_key))
        if is_fallback:
            status = TaskStatus.fallback_running
        elif state.attempts_by_key.get(state.current_key, 0) > 1:
            status = TaskStatus.retrying
        else:
            status = TaskStatus.running

        return AttemptPlan(
            engine_key=state.current_key,
            engine=engine,
            engine_name=engine_name,
            status=status,
            is_fallback=is_fallback,
            fallback_reason=state.fallback_reason if is_fallback else None,
            resources=resources,
            payload=patched,
        )

    @staticmethod
    def _can_fallback(state: ExecutionState) -> bool:
        """Tell whether ``state`` is on its primary engine with an unused fallback engine."""
        return (
            state.fallback_engine is not None
            and not state.used_fallback
            and state.current_key == state.primary_key
        )

    @staticmethod
    def _fallback_decision(state: ExecutionState, kind: str) -> FailureDecision:
        """Build the decision that moves ``state`` to its fallback engine."""
        reason = state.fallback_reason or f"primary_failed error_kind={kind}"
        return FailureDecision(
            action="fallback",
            next_key=state.fallback_key,
            status=TaskStatus.fallback_running,
            fallback_reason=reason,
        )

    def decide_after_failure(
        self,
        *,
        state: ExecutionState,
        engine_name: str,
        error_kind: str | None,
    ) -> FailureDecision:
        """Decide what happens after a failed attempt.

        Order of checks: a "pause" policy falls back if allowed and possible,
        else pauses; a "retry" policy retries while the attempt count on the
        current engine is within ``max_retries``; then fallback is tried if the
        policy allows it (or its action is "fallback"); otherwise the task
        fails.

        Args:
            state: Execution state; not modified here.
            engine_name: Currently unused.
            error_kind: Error classification; empty or unknown kinds use the
                "parse" policy.
        """
        kind = (error_kind or "").strip().lower() or "parse"
        policy = self._policy.get(kind) or self._policy["parse"]
        # attempts already includes the attempt that just failed.
        attempts = int(state.attempts_by_key.get(state.current_key, 0))
        can_fallback = self._can_fallback(state)

        if policy.action == "pause":
            if policy.allow_fallback and can_fallback:
                return self._fallback_decision(state, kind)
            return FailureDecision(action="pause", next_key=None, status=policy.pause_status, fallback_reason=None)

        if policy.action == "retry" and attempts <= int(policy.max_retries):
            return FailureDecision(
                action="retry",
                next_key=state.current_key,
                status=TaskStatus.retrying,
                fallback_reason=None,
            )

        if can_fallback and (policy.allow_fallback or policy.action == "fallback"):
            return self._fallback_decision(state, kind)

        return FailureDecision(action="fail", next_key=None, status=None, fallback_reason=None)

    @staticmethod
    def _log_report_error(pool_name: str, resource_id: str) -> None:
        """Log the exception currently being handled for a failed pool report."""
        import logging

        logging.getLogger(__name__).warning(
            "%s pool report failed for id=%s", pool_name, resource_id, exc_info=True
        )

    def report_success(self, *, resources: SelectedResources) -> None:
        """Tell the account and session pools that the attempt succeeded.

        Best-effort: an exception from either pool is logged and does not stop
        the other report or reach the caller.
        """
        if resources.account_id:
            try:
                self.account_pool.report_success(resources.account_id)
            except Exception:
                self._log_report_error("account", resources.account_id)
        if resources.session_id:
            try:
                self.session_pool.report_success(resources.session_id)
            except Exception:
                self._log_report_error("session", resources.session_id)

    def report_failure(self, *, resources: SelectedResources, error_kind: str | None) -> None:
        """Tell the account and session pools that the attempt failed.

        ``error_kind`` is stripped and passed on as ``None`` when empty.
        Best-effort: an exception from either pool is logged and does not stop
        the other report or reach the caller.
        """
        kind = (error_kind or "").strip() or None
        if resources.account_id:
            try:
                self.account_pool.report_failure(resources.account_id, error_kind=kind)
            except Exception:
                self._log_report_error("account", resources.account_id)
        if resources.session_id:
            try:
                self.session_pool.report_failure(resources.session_id, error_kind=kind)
            except Exception:
                self._log_report_error("session", resources.session_id)

    def observe_result(
        self,
        *,
        state: ExecutionState,
        engine_name: str,
        ok: bool,
        error_kind: str | None,
    ) -> None:
        """Update the failure streak for ``(engine_name, state.scope)``.

        Success clears the streak. A failure extends it only in auto mode, on
        the primary engine, and for the error kinds in ``_STREAK_ERROR_KINDS``;
        any other failure clears it.
        """
        name = str(engine_name or state.current_key)
        if ok:
            self._set_streak(name, state.scope, 0)
            return
        kind = (error_kind or "").strip().lower()
        counts_towards_streak = (
            state.engine_choice == "auto"
            and state.current_key == state.primary_key
            and kind in self._STREAK_ERROR_KINDS
        )
        if counts_towards_streak:
            self._set_streak(name, state.scope, self._get_streak(name, state.scope) + 1)
        else:
            self._set_streak(name, state.scope, 0)

    def switch_to(self, *, state: ExecutionState, key: str, fallback_reason: str | None) -> None:
        """Make ``key`` the current engine of ``state``.

        Does nothing when ``key`` is already current. Switching to the fallback
        key marks the fallback as used and records ``fallback_reason`` (keeping
        the existing reason when none is given). Any key other than the
        fallback key selects the primary engine object.
        """
        if key == state.current_key:
            return
        to_fallback = key == state.fallback_key
        if to_fallback:
            state.used_fallback = True
            state.fallback_reason = fallback_reason or state.fallback_reason
        state.current_key = key
        state.current_engine = state.fallback_engine if to_fallback else state.primary_engine
|
|