# XHS/service/stability_controller.py
# Commit c481f8a (Trae Bot): "Upload Spider_XHS project"
from __future__ import annotations

import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, Tuple

from .account_pool import AccountPool
from .session_pool import SessionPool
from .tasks import TaskStatus
from .proxy_pool import proxy_pool
@dataclass(frozen=True)
class SelectedResources:
    """Credentials and network settings chosen for a single attempt.

    Each slot is optional; ``None`` means nothing was selected for it.
    """

    account_id: str | None  # id of the account picked from the account pool
    session_id: str | None  # id of the leased session, if one was acquired
    cookies: str | None  # cookie string taken from the leased session
    storage_state_path: str | None  # storage-state file taken from the leased session
    proxy: str | None  # proxy string for this attempt
@dataclass(frozen=True)
class AttemptPlan:
    """Everything needed to run one attempt of a task on one engine."""

    # Engine slot ("api" / "browser") for this attempt and the engine object itself.
    engine_key: str
    engine: Any
    engine_name: str  # display/streak name of the engine
    status: TaskStatus  # task status to publish while this attempt runs
    is_fallback: bool  # True when this attempt runs as a fallback
    fallback_reason: str | None  # why the fallback happened; None otherwise
    resources: SelectedResources  # account/session/proxy selected for the attempt
    payload: Dict[str, Any]  # request payload with the selected resources merged in
@dataclass
class ExecutionState:
    """Mutable per-task bookkeeping for engine selection and attempt counting.

    Built by ``StabilityController.init_execution`` and updated in place by the
    controller while attempts are planned and engines are switched.
    """

    scope: str  # key under which failure streaks are tracked
    engine_choice: str  # normalized engine choice
    # Engine the task started on.
    primary_key: str
    primary_engine: Any
    # Optional fallback engine; both are None when no fallback is configured.
    fallback_key: str | None
    fallback_engine: Any | None
    fallback_reason: str | None  # reason recorded for a fallback, if any
    # Engine the next attempt will run on.
    current_key: str
    current_engine: Any
    used_fallback: bool  # whether the fallback engine has been switched to
    attempts_total: int  # attempts planned so far across all engines
    attempts_by_key: Dict[str, int]  # attempts planned so far, per engine key
@dataclass(frozen=True)
class FailureDecision:
    """What to do after an attempt failed."""

    action: str  # "retry", "fallback", "pause" or "fail"
    next_key: str | None  # engine key to run next; None when not continuing
    status: TaskStatus | None  # task status to publish; may be None
    fallback_reason: str | None  # reason text when action is "fallback"
@dataclass(frozen=True)
class ErrorPolicy:
    """How the controller reacts to one kind of engine error."""

    action: str  # "retry", "pause" or "fallback"
    max_retries: int  # retries allowed after the first attempt when action is "retry"
    allow_fallback: bool  # whether switching to the fallback engine is permitted
    pause_status: TaskStatus | None = None  # status published when action is "pause"
@dataclass(frozen=True)
class CaptchaChallenge:
    """A captcha encountered by an engine, handed to a CaptchaSolverHook."""

    engine_name: str  # engine that hit the captcha
    scope: str  # scope of the task that hit it
    task_id: str | None  # task id, when known
    payload: Dict[str, Any]  # engine-specific challenge data
@dataclass(frozen=True)
class CaptchaSolution:
    """Result returned by a CaptchaSolverHook."""

    ok: bool  # whether the solver succeeded
    token: str | None = None  # solver-provided token, if any
    meta: Dict[str, Any] | None = None  # optional extra solver data
class CaptchaSolverHook:
    """Extension point for plugging an external captcha solver into the controller.

    Subclasses override :meth:`solve`; this base implementation always raises.
    """

    async def solve(self, challenge: CaptchaChallenge) -> CaptchaSolution:
        """Try to solve *challenge* and report the outcome.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
class StabilityController:
    """Coordinate engine choice, retries, fallbacks and resource leasing for tasks.

    State kept by the controller:

    * ``_failure_streaks`` -- consecutive counted failures per
      ``(engine name, scope)``. In ``"auto"`` mode, once the API engine's streak
      for a scope reaches ``engine_fallback_threshold`` the browser engine is
      used as the primary engine for that scope.
    * ``account_pool`` / ``session_pool`` -- pools built from the environment
      that supply accounts, cookies and storage-state files per attempt.
    * ``_policy`` -- error kind -> :class:`ErrorPolicy` table consulted by
      :meth:`decide_after_failure`; unknown kinds use the ``"parse"`` policy.
    """

    # Failure kinds that extend the primary engine's streak in "auto" mode;
    # any other failure kind resets it (see observe_result).
    _STREAK_ERROR_KINDS = frozenset(
        {"auth", "rate", "risk", "captcha", "redirect", "redirect_to_login"}
    )

    # Used to report (instead of raising) failures of best-effort pool bookkeeping.
    _logger = logging.getLogger(__name__)

    def __init__(self, *, engine_fallback_threshold: int = 3):
        """Create a controller whose account/session pools are configured from the environment.

        Args:
            engine_fallback_threshold: consecutive counted failures of the API
                engine (per scope) after which ``"auto"`` mode starts on the
                browser engine. ``0`` or less disables the switch.
        """
        self.engine_fallback_threshold = int(engine_fallback_threshold)
        self._failure_streaks: Dict[Tuple[str, str], int] = {}
        self.account_pool = AccountPool.from_env()
        self.session_pool = SessionPool.from_env()
        # CAPTCHA_SOLVER_ENABLED defaults to "1" (enabled); only the listed
        # spellings (case-insensitive, surrounding whitespace ignored) enable it.
        self.captcha_solver_enabled = (
            str(os.getenv("CAPTCHA_SOLVER_ENABLED", "1") or "").strip().lower()
            in ("1", "true", "yes", "y", "on")
        )
        self.captcha_solver: CaptchaSolverHook | None = None
        self._policy: Dict[str, ErrorPolicy] = {
            "timeout": ErrorPolicy(action="retry", max_retries=2, allow_fallback=True),
            "proxy_failed": ErrorPolicy(action="retry", max_retries=2, allow_fallback=True),
            "rate": ErrorPolicy(action="pause", max_retries=0, allow_fallback=True, pause_status=TaskStatus.risk_paused),
            "auth": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
            "redirect": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
            "redirect_to_login": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
            "risk": ErrorPolicy(action="pause", max_retries=0, allow_fallback=False, pause_status=TaskStatus.waiting_rpa),
            "captcha": ErrorPolicy(action="pause", max_retries=0, allow_fallback=False, pause_status=TaskStatus.waiting_rpa),
            "missing_dependency": ErrorPolicy(action="fallback", max_retries=0, allow_fallback=True),
            "parse": ErrorPolicy(action="retry", max_retries=1, allow_fallback=True),
        }

    def set_captcha_solver(self, solver: CaptchaSolverHook | None) -> None:
        """Install (or, with ``None``, remove) the captcha solver hook."""
        self.captcha_solver = solver

    def build_captcha_challenge(
        self,
        *,
        engine_name: str,
        scope: str,
        task_id: str | None,
        payload: Dict[str, Any],
    ) -> CaptchaChallenge:
        """Build a CaptchaChallenge, substituting ``"unknown"`` for empty names.

        The payload is shallow-copied so later changes by the caller do not leak
        into the challenge.
        """
        return CaptchaChallenge(
            engine_name=str(engine_name or "unknown"),
            scope=str(scope or "unknown"),
            task_id=str(task_id) if task_id is not None else None,
            payload=dict(payload or {}),
        )

    async def try_solve_captcha(self, *, challenge: CaptchaChallenge) -> CaptchaSolution | None:
        """Run the installed solver on *challenge*.

        Returns ``None`` when solving is disabled or no solver is installed.
        Exceptions raised by the solver propagate to the caller.
        """
        solver = self.captcha_solver
        if not self.captcha_solver_enabled or solver is None:
            return None
        return await solver.solve(challenge)

    def _get_streak(self, engine_name: str, scope: str) -> int:
        """Return the current failure streak for ``(engine_name, scope)`` (0 if none)."""
        return int(self._failure_streaks.get((str(engine_name), str(scope)), 0))

    def _set_streak(self, engine_name: str, scope: str, value: int) -> None:
        """Store a streak value; values <= 0 delete the entry to keep the map small."""
        key = (str(engine_name), str(scope))
        if int(value) <= 0:
            self._failure_streaks.pop(key, None)
            return
        self._failure_streaks[key] = int(value)

    @staticmethod
    def _engine_name(engine: Any, default: str) -> str:
        """Return ``engine.name`` as a string, or *default* when the engine has no name.

        Shared by init_execution and build_attempt_plan so the streak key read
        at start-up matches the one observe_result later updates.
        """
        return str(getattr(engine, "name", default))

    @staticmethod
    def _normalize_engine_choice(choice: str) -> str:
        """Map the accepted engine aliases to ``"api"``, ``"browser"`` or ``"auto"``.

        Unrecognised values are returned lower-cased and stripped, unchanged otherwise.
        """
        value = (choice or "").strip().lower()
        if value in ("api", "spider_xhs", "xhs_api"):
            return "api"
        if value in ("browser", "mediacrawler", "xhs_browser"):
            return "browser"
        if value in ("auto", ""):
            return "auto"
        return value

    def init_execution(
        self,
        *,
        task_engine_choice: str,
        scope: str,
        engine_api: Any,
        engine_browser: Any,
    ) -> ExecutionState:
        """Create the per-task execution state for the requested engine choice.

        ``"api"`` / ``"browser"`` pin that engine with no fallback. ``"auto"``
        starts on the API engine with the browser engine as fallback, unless the
        API engine's failure streak for *scope* has reached the threshold, in
        which case it starts directly on the browser engine with no fallback.

        Raises:
            ValueError: if the normalized choice is not api/browser/auto.
        """
        choice = self._normalize_engine_choice(task_engine_choice)
        threshold = int(self.engine_fallback_threshold)
        fallback_key: str | None = None
        fallback_reason: str | None = None
        # Same default name as build_attempt_plan, so an engine without a
        # ``name`` attribute reads the streak that observe_result writes.
        api_name = self._engine_name(engine_api, "api")
        streak = self._get_streak(api_name, scope)
        if choice in ("api", "browser"):
            primary_key = choice
        elif choice == "auto":
            if threshold > 0 and streak >= threshold:
                # NOTE(review): the API streak is only reset by an API attempt,
                # so once this threshold is crossed "auto" keeps starting on the
                # browser for this scope until something runs the API engine
                # explicitly -- confirm this permanent switch is intended.
                primary_key = "browser"
                fallback_reason = f"threshold_exceeded scope={scope} streak={streak} threshold={threshold}"
            else:
                primary_key = "api"
                fallback_key = "browser"
        else:
            raise ValueError(f"unsupported_engine={choice}")
        primary_engine = engine_api if primary_key == "api" else engine_browser
        fallback_engine = engine_browser if fallback_key == "browser" else None
        return ExecutionState(
            scope=str(scope),
            engine_choice=choice,
            primary_key=primary_key,
            primary_engine=primary_engine,
            fallback_key=fallback_key,
            fallback_engine=fallback_engine,
            fallback_reason=fallback_reason,
            current_key=primary_key,
            current_engine=primary_engine,
            used_fallback=False,
            attempts_total=0,
            attempts_by_key={},
        )

    def select_resources(self, *, engine_key: str, payload: Dict[str, Any]) -> SelectedResources:
        """Pick an account, a session (if the payload lacks credentials) and a proxy.

        For the ``"api"`` engine a cookie session is acquired unless the payload
        already carries cookies; for any other engine a storage-state session
        is acquired unless the payload already carries a storage state. A
        non-blank string proxy in the payload is used as-is; otherwise a random
        proxy is taken from the proxy pool. A ``None`` payload is treated as empty.
        """
        payload = payload or {}
        acc = self.account_pool.pick_available()
        cookies_in_payload = payload.get("cookies") or payload.get("cookie") or payload.get("cookies_str")
        storage_state_in_payload = payload.get("storage_state_path") or payload.get("storage_state")
        proxy_in_payload = payload.get("proxy") or payload.get("proxies")
        session = None
        if engine_key == "api":
            if not cookies_in_payload:
                session = self.session_pool.acquire_cookie_session()
        elif not storage_state_in_payload:
            session = self.session_pool.acquire_storage_state_session()
        cookies = str(session.cookies) if session is not None and session.cookies else None
        storage_state_path = (
            str(session.storage_state_path) if session is not None and session.storage_state_path else None
        )
        proxy = None
        if isinstance(proxy_in_payload, str):
            proxy = str(proxy_in_payload).strip() or None
        if proxy is None:
            # NOTE(review): when the payload carries a non-string proxy (e.g. a
            # ``proxies`` mapping) a pool proxy is still reported here, although
            # apply_resources_to_payload will keep the payload's own value.
            proxy = proxy_pool.get_random_proxy()
        return SelectedResources(
            account_id=str(acc.id) if acc is not None else None,
            session_id=str(session.id) if session is not None else None,
            cookies=cookies,
            storage_state_path=storage_state_path,
            proxy=proxy,
        )

    def apply_resources_to_payload(self, *, payload: Dict[str, Any], resources: SelectedResources) -> Dict[str, Any]:
        """Return a shallow copy of *payload* with selected resources filled in.

        Values already present in the payload (under any of their alias keys)
        are never overwritten.
        """
        out = dict(payload or {})
        if resources.cookies is not None and not (out.get("cookies") or out.get("cookie") or out.get("cookies_str")):
            out["cookies"] = resources.cookies
        if resources.storage_state_path is not None and not (out.get("storage_state_path") or out.get("storage_state")):
            out["storage_state_path"] = resources.storage_state_path
        if resources.proxy is not None and not (out.get("proxy") or out.get("proxies")):
            out["proxy"] = resources.proxy
        return out

    def build_attempt_plan(self, *, state: ExecutionState, payload: Dict[str, Any]) -> AttemptPlan:
        """Select resources, count the attempt on *state* and describe how to run it.

        The attempt counters are incremented before the plan is returned, so the
        counts include the attempt being planned. Status is ``fallback_running``
        for fallback attempts, ``retrying`` for repeat attempts on the same
        engine, and ``running`` otherwise.
        """
        resources = self.select_resources(engine_key=state.current_key, payload=payload)
        patched = self.apply_resources_to_payload(payload=payload, resources=resources)
        state.attempts_total = int(state.attempts_total) + 1
        state.attempts_by_key[state.current_key] = int(state.attempts_by_key.get(state.current_key, 0)) + 1
        engine = state.current_engine
        engine_name = self._engine_name(engine, state.current_key)
        is_fallback = bool(state.used_fallback or (state.current_key != state.primary_key))
        if is_fallback:
            status = TaskStatus.fallback_running
        elif state.attempts_by_key.get(state.current_key, 0) > 1:
            status = TaskStatus.retrying
        else:
            status = TaskStatus.running
        return AttemptPlan(
            engine_key=state.current_key,
            engine=engine,
            engine_name=engine_name,
            status=status,
            is_fallback=is_fallback,
            fallback_reason=state.fallback_reason if is_fallback else None,
            resources=resources,
            payload=patched,
        )

    @staticmethod
    def _can_fallback(state: ExecutionState) -> bool:
        """True when a fallback engine exists, is unused, and the task is still on its primary."""
        return (
            state.fallback_engine is not None
            and not state.used_fallback
            and state.current_key == state.primary_key
        )

    @staticmethod
    def _fallback_decision(state: ExecutionState, kind: str) -> FailureDecision:
        """Build the decision that moves the task to its fallback engine."""
        reason = state.fallback_reason or f"primary_failed error_kind={kind}"
        return FailureDecision(
            action="fallback",
            next_key=state.fallback_key,
            status=TaskStatus.fallback_running,
            fallback_reason=reason,
        )

    def decide_after_failure(
        self,
        *,
        state: ExecutionState,
        engine_name: str,
        error_kind: str | None,
    ) -> FailureDecision:
        """Decide whether to retry, fall back, pause or fail after a failed attempt.

        *error_kind* is normalized (stripped, lower-cased; empty -> ``"parse"``)
        and looked up in the policy table. The per-engine attempt count already
        includes the failed attempt, so ``max_retries`` retries follow the first
        attempt. *engine_name* is accepted for interface compatibility and is
        not used in the decision.
        """
        kind = (error_kind or "").strip().lower() or "parse"
        policy = self._policy.get(kind) or self._policy["parse"]
        attempts = int(state.attempts_by_key.get(state.current_key, 0))
        can_fallback = self._can_fallback(state)
        if policy.action == "pause":
            # Pausing policies still prefer an available fallback when allowed.
            if policy.allow_fallback and can_fallback:
                return self._fallback_decision(state, kind)
            return FailureDecision(action="pause", next_key=None, status=policy.pause_status, fallback_reason=None)
        if policy.action == "retry" and attempts <= int(policy.max_retries):
            return FailureDecision(action="retry", next_key=state.current_key, status=TaskStatus.retrying, fallback_reason=None)
        # Retries exhausted (or no retry policy): fall back if the policy permits it
        # either via allow_fallback or by its action being "fallback".
        if can_fallback and (policy.allow_fallback or policy.action == "fallback"):
            return self._fallback_decision(state, kind)
        return FailureDecision(action="fail", next_key=None, status=None, fallback_reason=None)

    def report_success(self, *, resources: SelectedResources) -> None:
        """Tell the account and session pools that the attempt succeeded.

        Best-effort: pool errors are logged and never raised to the caller.
        """
        if resources.account_id:
            try:
                self.account_pool.report_success(resources.account_id)
            except Exception:
                self._logger.warning(
                    "account_pool.report_success failed account_id=%s", resources.account_id, exc_info=True
                )
        if resources.session_id:
            try:
                self.session_pool.report_success(resources.session_id)
            except Exception:
                self._logger.warning(
                    "session_pool.report_success failed session_id=%s", resources.session_id, exc_info=True
                )

    def report_failure(self, *, resources: SelectedResources, error_kind: str | None) -> None:
        """Tell the account and session pools that the attempt failed.

        *error_kind* is stripped and lower-cased (empty -> ``None``), matching
        the normalization used by decide_after_failure and observe_result.
        Best-effort: pool errors are logged and never raised to the caller.
        """
        kind = (error_kind or "").strip().lower() or None
        if resources.account_id:
            try:
                self.account_pool.report_failure(resources.account_id, error_kind=kind)
            except Exception:
                self._logger.warning(
                    "account_pool.report_failure failed account_id=%s", resources.account_id, exc_info=True
                )
        if resources.session_id:
            try:
                self.session_pool.report_failure(resources.session_id, error_kind=kind)
            except Exception:
                self._logger.warning(
                    "session_pool.report_failure failed session_id=%s", resources.session_id, exc_info=True
                )

    def observe_result(
        self,
        *,
        state: ExecutionState,
        engine_name: str,
        ok: bool,
        error_kind: str | None,
    ) -> None:
        """Update the failure streak of *engine_name* for the task's scope.

        Success resets the streak. In ``"auto"`` mode a failure of the primary
        engine with a kind in ``_STREAK_ERROR_KINDS`` extends it; every other
        failure resets it.
        """
        name = str(engine_name or state.current_key)
        if ok:
            self._set_streak(name, state.scope, 0)
            return
        kind = (error_kind or "").strip().lower()
        counts = (
            state.engine_choice == "auto"
            and state.current_key == state.primary_key
            and kind in self._STREAK_ERROR_KINDS
        )
        if counts:
            self._set_streak(name, state.scope, self._get_streak(name, state.scope) + 1)
        else:
            self._set_streak(name, state.scope, 0)

    def switch_to(self, *, state: ExecutionState, key: str, fallback_reason: str | None) -> None:
        """Move *state* onto the engine identified by *key*.

        Switching to the fallback key marks the fallback as used and records
        *fallback_reason* (keeping the previous reason when it is empty).
        Switching to the current key is a no-op.

        Raises:
            ValueError: if *key* is neither the primary nor the fallback key,
                instead of leaving current_key and current_engine out of sync.
        """
        if key == state.current_key:
            return
        if key == state.fallback_key:
            state.used_fallback = True
            state.fallback_reason = fallback_reason or state.fallback_reason
            state.current_engine = state.fallback_engine
        elif key == state.primary_key:
            state.current_engine = state.primary_engine
        else:
            raise ValueError(f"unknown_engine_key={key}")
        state.current_key = key