from __future__ import annotations

import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, Tuple

from .account_pool import AccountPool
from .session_pool import SessionPool
from .tasks import TaskStatus
from .proxy_pool import proxy_pool

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class SelectedResources:
    """Resources picked for one attempt; every field may be ``None``."""

    account_id: str | None
    session_id: str | None
    cookies: str | None
    storage_state_path: str | None
    proxy: str | None


@dataclass(frozen=True)
class AttemptPlan:
    """Everything needed to run one attempt, as built by ``build_attempt_plan``."""

    engine_key: str
    engine: Any
    engine_name: str
    status: TaskStatus
    is_fallback: bool
    fallback_reason: str | None
    resources: SelectedResources
    # Copy of the caller's payload with the selected resources filled in.
    payload: Dict[str, Any]


@dataclass
class ExecutionState:
    """Mutable per-task execution state (engine selection and attempt counts)."""

    scope: str
    engine_choice: str
    primary_key: str
    primary_engine: Any
    fallback_key: str | None
    fallback_engine: Any | None
    fallback_reason: str | None
    current_key: str
    current_engine: Any
    used_fallback: bool
    attempts_total: int
    # Number of attempts planned so far, keyed by engine key.
    attempts_by_key: Dict[str, int]


@dataclass(frozen=True)
class FailureDecision:
    """Outcome of ``decide_after_failure``.

    ``action`` is one of ``"retry"``, ``"fallback"``, ``"pause"`` or ``"fail"``.
    """

    action: str
    next_key: str | None
    status: TaskStatus | None
    fallback_reason: str | None


@dataclass(frozen=True)
class ErrorPolicy:
    """How one error kind is handled: base action, retry budget, fallback rule."""

    action: str
    max_retries: int
    allow_fallback: bool
    # Status reported when the decision is to pause.
    pause_status: TaskStatus | None = None


@dataclass(frozen=True)
class CaptchaChallenge:
    """Description of a captcha handed to a ``CaptchaSolverHook``."""

    engine_name: str
    scope: str
    task_id: str | None
    payload: Dict[str, Any]


@dataclass(frozen=True)
class CaptchaSolution:
    """Result returned by a ``CaptchaSolverHook``."""

    ok: bool
    token: str | None = None
    meta: Dict[str, Any] | None = None


class CaptchaSolverHook:
    """Base class for pluggable captcha solvers; subclasses override ``solve``."""

    async def solve(self, challenge: CaptchaChallenge) -> CaptchaSolution:
        """Solve ``challenge``; the base implementation is not implemented."""
        raise NotImplementedError()


class StabilityController:
    """Chooses engines and resources per attempt and decides how to react to failures.

    Keeps an in-memory failure streak per ``(engine_name, scope)`` that
    ``init_execution`` consults in ``auto`` mode to decide whether the
    browser engine should be used as primary instead of the API engine.
    """

    def __init__(self, *, engine_fallback_threshold: int = 3):
        """Build pools from the environment and install the error policy table.

        Args:
            engine_fallback_threshold: Streak length at which ``auto`` mode
                starts with the browser engine; values <= 0 disable the check.
        """
        self.engine_fallback_threshold = int(engine_fallback_threshold)
        self._failure_streaks: Dict[Tuple[str, str], int] = {}
        self.account_pool = AccountPool.from_env()
        self.session_pool = SessionPool.from_env()
        # Enabled by default; an empty or unrecognised value disables the solver.
        self.captcha_solver_enabled = (
            str(os.getenv("CAPTCHA_SOLVER_ENABLED", "1") or "").strip().lower()
            in ("1", "true", "yes", "y", "on")
        )
        self.captcha_solver: CaptchaSolverHook | None = None
        self._policy: Dict[str, ErrorPolicy] = {
            "timeout": ErrorPolicy(action="retry", max_retries=2, allow_fallback=True),
            "proxy_failed": ErrorPolicy(action="retry", max_retries=2, allow_fallback=True),
            "rate": ErrorPolicy(
                action="pause",
                max_retries=0,
                allow_fallback=True,
                pause_status=TaskStatus.risk_paused,
            ),
            "auth": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
            "redirect": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
            "redirect_to_login": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
            "risk": ErrorPolicy(
                action="pause",
                max_retries=0,
                allow_fallback=False,
                pause_status=TaskStatus.waiting_rpa,
            ),
            "captcha": ErrorPolicy(
                action="pause",
                max_retries=0,
                allow_fallback=False,
                pause_status=TaskStatus.waiting_rpa,
            ),
            "missing_dependency": ErrorPolicy(action="fallback", max_retries=0, allow_fallback=True),
            "parse": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
        }

    def set_captcha_solver(self, solver: CaptchaSolverHook | None) -> None:
        """Install (or clear, with ``None``) the captcha solver hook."""
        self.captcha_solver = solver

    def build_captcha_challenge(
        self,
        *,
        engine_name: str,
        scope: str,
        task_id: str | None,
        payload: Dict[str, Any],
    ) -> CaptchaChallenge:
        """Build a ``CaptchaChallenge`` from loosely-typed inputs.

        Falsy ``engine_name`` / ``scope`` become ``"unknown"``; the payload is
        shallow-copied and a falsy payload becomes an empty dict.
        """
        return CaptchaChallenge(
            engine_name=str(engine_name or "unknown"),
            scope=str(scope or "unknown"),
            task_id=str(task_id) if task_id is not None else None,
            payload=dict(payload or {}),
        )

    async def try_solve_captcha(self, *, challenge: CaptchaChallenge) -> CaptchaSolution | None:
        """Run the installed solver on ``challenge``.

        Returns ``None`` when solving is disabled or no solver is installed.
        Exceptions raised by the solver propagate to the caller.
        """
        if not bool(self.captcha_solver_enabled):
            return None
        solver = self.captcha_solver
        if solver is None:
            return None
        return await solver.solve(challenge)

    def _get_streak(self, engine_name: str, scope: str) -> int:
        """Return the current failure streak for ``(engine_name, scope)`` (0 if none)."""
        return int(self._failure_streaks.get((str(engine_name), str(scope)), 0))

    def _set_streak(self, engine_name: str, scope: str, value: int) -> None:
        """Store the streak; a value <= 0 removes the entry instead of storing it."""
        key = (str(engine_name), str(scope))
        if int(value) <= 0:
            self._failure_streaks.pop(key, None)
            return
        self._failure_streaks[key] = int(value)

    @staticmethod
    def _normalize_engine_choice(choice: str) -> str:
        """Map engine aliases onto ``"api"``, ``"browser"`` or ``"auto"``.

        Unrecognised values are returned stripped and lower-cased so the
        caller can reject them.
        """
        value = (choice or "").strip().lower()
        if value in ("api", "spider_xhs", "xhs_api"):
            return "api"
        if value in ("browser", "mediacrawler", "xhs_browser"):
            return "browser"
        if value in ("auto", ""):
            return "auto"
        return value

    def init_execution(
        self,
        *,
        task_engine_choice: str,
        scope: str,
        engine_api: Any,
        engine_browser: Any,
    ) -> ExecutionState:
        """Create the initial ``ExecutionState`` for a task.

        In ``auto`` mode the API engine is primary with the browser engine as
        fallback, unless the API engine's failure streak for ``scope`` has
        reached the threshold; then the browser engine is primary, there is
        no fallback, and ``fallback_reason`` records why.

        Raises:
            ValueError: If the normalised choice is not api/browser/auto.
        """
        choice = self._normalize_engine_choice(task_engine_choice)
        threshold = int(self.engine_fallback_threshold)
        primary_key = "api"
        fallback_key: str | None = None
        fallback_reason: str | None = None
        api_name = getattr(engine_api, "name", "spider_xhs")
        streak = self._get_streak(api_name, scope)

        if choice == "api":
            primary_key = "api"
        elif choice == "browser":
            primary_key = "browser"
        elif choice == "auto":
            if threshold > 0 and streak >= threshold:
                # NOTE(review): while browser is primary, observe_result only
                # updates the browser engine's streak, so the API streak that
                # triggered this branch is not reset by later successes.
                primary_key = "browser"
                fallback_reason = (
                    f"threshold_exceeded scope={scope} streak={streak} threshold={threshold}"
                )
            else:
                primary_key = "api"
                fallback_key = "browser"
        else:
            raise ValueError(f"unsupported_engine={choice}")

        primary_engine = engine_api if primary_key == "api" else engine_browser
        fb_engine = engine_browser if fallback_key == "browser" else None
        return ExecutionState(
            scope=str(scope),
            engine_choice=choice,
            primary_key=primary_key,
            primary_engine=primary_engine,
            fallback_key=fallback_key,
            fallback_engine=fb_engine,
            fallback_reason=fallback_reason,
            current_key=primary_key,
            current_engine=primary_engine,
            used_fallback=False,
            attempts_total=0,
            attempts_by_key={},
        )

    def select_resources(self, *, engine_key: str, payload: Dict[str, Any]) -> SelectedResources:
        """Pick an account, a session and a proxy for one attempt.

        A pooled session is acquired only when the payload does not already
        carry the credential the engine needs: cookies for ``"api"``, a
        storage state for any other engine key. A string proxy in the payload
        is used as-is (after stripping); otherwise one is drawn from the pool.
        """
        # Tolerate a missing payload, like apply_resources_to_payload does.
        payload = payload or {}
        acc = self.account_pool.pick_available()
        cookies_in_payload = (
            payload.get("cookies") or payload.get("cookie") or payload.get("cookies_str")
        )
        storage_state_in_payload = (
            payload.get("storage_state_path") or payload.get("storage_state")
        )
        proxy_in_payload = payload.get("proxy") or payload.get("proxies")

        session = None
        if engine_key == "api":
            if not cookies_in_payload:
                session = self.session_pool.acquire_cookie_session()
        else:
            if not storage_state_in_payload:
                session = self.session_pool.acquire_storage_state_session()

        cookies = str(session.cookies) if session is not None and session.cookies else None
        storage_state_path = (
            str(session.storage_state_path)
            if session is not None and session.storage_state_path
            else None
        )

        proxy = None
        if isinstance(proxy_in_payload, str):
            proxy = str(proxy_in_payload).strip() or None
        # NOTE(review): a non-string proxy value in the payload (e.g. a
        # ``proxies`` mapping) is ignored here, so a pool proxy is still drawn
        # even though apply_resources_to_payload will not inject it.
        if proxy is None:
            proxy = proxy_pool.get_random_proxy()

        return SelectedResources(
            account_id=str(acc.id) if acc is not None else None,
            session_id=str(session.id) if session is not None else None,
            cookies=cookies,
            storage_state_path=storage_state_path,
            proxy=proxy,
        )

    def apply_resources_to_payload(
        self, *, payload: Dict[str, Any], resources: SelectedResources
    ) -> Dict[str, Any]:
        """Return a copy of ``payload`` with selected resources filled in.

        Values already present in the payload (under any of their accepted
        keys) are never overwritten.
        """
        out = dict(payload or {})
        if resources.cookies is not None and not (
            out.get("cookies") or out.get("cookie") or out.get("cookies_str")
        ):
            out["cookies"] = resources.cookies
        if resources.storage_state_path is not None and not (
            out.get("storage_state_path") or out.get("storage_state")
        ):
            out["storage_state_path"] = resources.storage_state_path
        if resources.proxy is not None and not (out.get("proxy") or out.get("proxies")):
            out["proxy"] = resources.proxy
        return out

    def build_attempt_plan(self, *, state: ExecutionState, payload: Dict[str, Any]) -> AttemptPlan:
        """Select resources, bump the attempt counters and describe the next attempt.

        Mutates ``state.attempts_total`` and ``state.attempts_by_key``. The
        plan's status is ``fallback_running`` on the fallback engine,
        ``retrying`` from the second attempt on the current engine, and
        ``running`` otherwise.
        """
        resources = self.select_resources(engine_key=state.current_key, payload=payload)
        patched = self.apply_resources_to_payload(payload=payload, resources=resources)

        state.attempts_total = int(state.attempts_total) + 1
        state.attempts_by_key[state.current_key] = (
            int(state.attempts_by_key.get(state.current_key, 0)) + 1
        )

        engine = state.current_engine
        engine_name = str(getattr(engine, "name", state.current_key))
        is_fallback = bool(state.used_fallback or (state.current_key != state.primary_key))
        if is_fallback:
            status = TaskStatus.fallback_running
        elif state.attempts_by_key.get(state.current_key, 0) > 1:
            status = TaskStatus.retrying
        else:
            status = TaskStatus.running

        return AttemptPlan(
            engine_key=state.current_key,
            engine=engine,
            engine_name=engine_name,
            status=status,
            is_fallback=is_fallback,
            fallback_reason=state.fallback_reason if is_fallback else None,
            resources=resources,
            payload=patched,
        )

    @staticmethod
    def _fallback_decision(state: ExecutionState, kind: str) -> FailureDecision:
        """Build the decision that switches ``state`` to its fallback engine."""
        reason = state.fallback_reason or f"primary_failed error_kind={kind}"
        return FailureDecision(
            action="fallback",
            next_key=state.fallback_key,
            status=TaskStatus.fallback_running,
            fallback_reason=reason,
        )

    def decide_after_failure(
        self,
        *,
        state: ExecutionState,
        engine_name: str,
        error_kind: str | None,
    ) -> FailureDecision:
        """Decide what to do after a failed attempt.

        Empty or unknown error kinds use the ``"parse"`` policy. Order of
        precedence: a pause policy falls back if allowed and possible,
        otherwise pauses; a retry policy retries while the attempts on the
        current engine do not exceed ``max_retries``; then fallback if
        possible; otherwise fail. ``engine_name`` is accepted but not used.
        """
        kind = (error_kind or "").strip().lower() or "parse"
        policy = self._policy.get(kind) or self._policy["parse"]
        attempts = int(state.attempts_by_key.get(state.current_key, 0))

        # Fallback is possible only once, and only from the primary engine.
        fallback_available = (
            state.fallback_engine is not None
            and not state.used_fallback
            and state.current_key == state.primary_key
        )

        if policy.action == "pause":
            if policy.allow_fallback and fallback_available:
                return self._fallback_decision(state, kind)
            return FailureDecision(
                action="pause",
                next_key=None,
                status=policy.pause_status,
                fallback_reason=None,
            )

        # attempts already includes the failed one, so max_retries=N allows
        # N further attempts on the same engine.
        if policy.action == "retry" and attempts <= int(policy.max_retries):
            return FailureDecision(
                action="retry",
                next_key=state.current_key,
                status=TaskStatus.retrying,
                fallback_reason=None,
            )

        if fallback_available and (policy.allow_fallback or policy.action == "fallback"):
            return self._fallback_decision(state, kind)

        return FailureDecision(action="fail", next_key=None, status=None, fallback_reason=None)

    def report_success(self, *, resources: SelectedResources) -> None:
        """Report a successful attempt to the account and session pools.

        Reporting is best-effort: pool errors are logged, never raised.
        """
        if resources.account_id:
            try:
                self.account_pool.report_success(resources.account_id)
            except Exception:
                logger.warning(
                    "account_pool.report_success failed account_id=%s",
                    resources.account_id,
                    exc_info=True,
                )
        if resources.session_id:
            try:
                self.session_pool.report_success(resources.session_id)
            except Exception:
                logger.warning(
                    "session_pool.report_success failed session_id=%s",
                    resources.session_id,
                    exc_info=True,
                )

    def report_failure(self, *, resources: SelectedResources, error_kind: str | None) -> None:
        """Report a failed attempt to the account and session pools.

        A blank ``error_kind`` is passed on as ``None``. Reporting is
        best-effort: pool errors are logged, never raised.
        """
        kind = (error_kind or "").strip() or None
        if resources.account_id:
            try:
                self.account_pool.report_failure(resources.account_id, error_kind=kind)
            except Exception:
                logger.warning(
                    "account_pool.report_failure failed account_id=%s error_kind=%s",
                    resources.account_id,
                    kind,
                    exc_info=True,
                )
        if resources.session_id:
            try:
                self.session_pool.report_failure(resources.session_id, error_kind=kind)
            except Exception:
                logger.warning(
                    "session_pool.report_failure failed session_id=%s error_kind=%s",
                    resources.session_id,
                    kind,
                    exc_info=True,
                )

    def observe_result(
        self,
        *,
        state: ExecutionState,
        engine_name: str,
        ok: bool,
        error_kind: str | None,
    ) -> None:
        """Update the failure streak for ``(engine_name, state.scope)``.

        Success clears the streak. A failure extends it only in ``auto`` mode,
        on the primary engine, for the error kinds listed below; any other
        failure clears it.
        """
        name = str(engine_name or state.current_key)
        if ok:
            self._set_streak(name, state.scope, 0)
            return

        kind = (error_kind or "").strip().lower()
        counts_towards_streak = (
            state.engine_choice == "auto"
            and state.current_key == state.primary_key
            and kind in ("auth", "rate", "risk", "captcha", "redirect", "redirect_to_login")
        )
        if counts_towards_streak:
            self._set_streak(name, state.scope, self._get_streak(name, state.scope) + 1)
        else:
            self._set_streak(name, state.scope, 0)

    def switch_to(self, *, state: ExecutionState, key: str, fallback_reason: str | None) -> None:
        """Make ``key`` the current engine of ``state``.

        Does nothing when ``key`` is already current. Switching to the
        fallback key marks the fallback as used and records the reason,
        keeping the existing reason when none is given.
        """
        if key == state.current_key:
            return
        if key == state.fallback_key:
            state.used_fallback = True
            state.fallback_reason = fallback_reason or state.fallback_reason
        state.current_key = key
        # NOTE(review): key is not validated; any key other than fallback_key
        # selects the primary engine.
        state.current_engine = (
            state.fallback_engine if key == state.fallback_key else state.primary_engine
        )