File size: 14,291 Bytes
c481f8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Any, Dict, Tuple

from .account_pool import AccountPool
from .session_pool import SessionPool
from .tasks import TaskStatus
from .proxy_pool import proxy_pool


@dataclass(frozen=True)
class SelectedResources:
    """Immutable bundle of resources chosen for a single engine attempt.

    Populated by ``StabilityController.select_resources``; any field may be
    ``None`` when the corresponding resource was not selected.
    """

    account_id: str | None  # id of the account returned by AccountPool.pick_available(), if any
    session_id: str | None  # id of the pooled session acquired for this attempt, if any
    cookies: str | None  # cookie string copied from the acquired session, if it has one
    storage_state_path: str | None  # storage-state path copied from the acquired session, if it has one
    proxy: str | None  # proxy string taken from the payload or drawn from proxy_pool


@dataclass(frozen=True)
class AttemptPlan:
    """Everything needed to run one attempt, as produced by ``build_attempt_plan``."""

    engine_key: str  # logical engine key for this attempt ("api" or "browser")
    engine: Any  # engine object to run
    engine_name: str  # the engine's ``name`` attribute, or engine_key when it has none
    status: TaskStatus  # running / retrying / fallback_running for this attempt
    is_fallback: bool  # True when not on the primary engine or after a fallback switch
    fallback_reason: str | None  # why the fallback happened; None for non-fallback attempts
    resources: SelectedResources  # resources selected for this attempt
    payload: Dict[str, Any]  # copy of the input payload with the selected resources filled in


@dataclass
class ExecutionState:
    """Mutable per-execution bookkeeping: available engines, the active one, attempt counts.

    Created by ``StabilityController.init_execution`` and updated in place by
    ``build_attempt_plan`` (attempt counters) and ``switch_to`` (active engine).
    """

    scope: str  # scope string used to key failure streaks
    engine_choice: str  # normalized engine choice: "api", "browser" or "auto"
    primary_key: str  # key of the engine tried first
    primary_engine: Any
    fallback_key: str | None  # "browser" in auto mode when the API engine is primary, else None
    fallback_engine: Any | None
    fallback_reason: str | None  # why the fallback (or threshold-driven primary) was chosen
    current_key: str  # key of the engine currently in use
    current_engine: Any
    used_fallback: bool  # set to True once switch_to moves to the fallback engine
    attempts_total: int  # attempts planned across all engines
    attempts_by_key: Dict[str, int]  # attempts planned per engine key


@dataclass(frozen=True)
class FailureDecision:
    """What to do after a failed attempt, as returned by ``decide_after_failure``."""

    action: str  # one of "retry", "fallback", "pause" or "fail"
    next_key: str | None  # engine key for the next attempt; None for "pause"/"fail"
    status: TaskStatus | None  # status to report; None for "fail"
    fallback_reason: str | None  # populated only for "fallback"


@dataclass(frozen=True)
class ErrorPolicy:
    """Per-error-kind handling rule consulted by ``decide_after_failure``."""

    action: str  # "retry", "pause" or "fallback"
    max_retries: int  # retries allowed on the same engine when action == "retry"
    allow_fallback: bool  # whether switching to the fallback engine is permitted
    pause_status: TaskStatus | None = None  # status reported when the task is paused


@dataclass(frozen=True)
class CaptchaChallenge:
    """Captcha encountered during an attempt, handed to a ``CaptchaSolverHook``."""

    engine_name: str  # engine that hit the captcha ("unknown" when built without one)
    scope: str  # task scope ("unknown" when built without one)
    task_id: str | None  # id of the task, if known
    payload: Dict[str, Any]  # engine-specific challenge data; schema is not defined in this module


@dataclass(frozen=True)
class CaptchaSolution:
    """Result returned by a ``CaptchaSolverHook``."""

    ok: bool  # whether the solver reports success
    token: str | None = None  # solver-provided token; presumably the solved value — confirm with solver implementations
    meta: Dict[str, Any] | None = None  # optional solver-specific metadata


class CaptchaSolverHook:
    """Extension point for captcha solving.

    Subclasses override :meth:`solve`; the base implementation always raises
    ``NotImplementedError``.
    """

    async def solve(self, challenge: CaptchaChallenge) -> CaptchaSolution:
        """Attempt to solve *challenge* and report the outcome."""
        raise NotImplementedError


class StabilityController:
    def __init__(self, *, engine_fallback_threshold: int = 3):
        """Create a controller with fresh pools, streak counters and error policies.

        Args:
            engine_fallback_threshold: failure-streak length at which "auto"
                executions start on the browser engine instead of the API.

        The captcha solver is enabled unless the ``CAPTCHA_SOLVER_ENABLED``
        environment variable (default ``"1"``) is something other than
        1/true/yes/y/on (case-insensitive). No solver is registered initially.
        """
        self.engine_fallback_threshold = int(engine_fallback_threshold)
        # (engine_name, scope) -> consecutive qualifying failures
        self._failure_streaks: Dict[Tuple[str, str], int] = {}
        self.account_pool = AccountPool.from_env()
        self.session_pool = SessionPool.from_env()

        raw_flag = os.getenv("CAPTCHA_SOLVER_ENABLED", "1") or ""
        self.captcha_solver_enabled = str(raw_flag).strip().lower() in {"1", "true", "yes", "y", "on"}
        self.captcha_solver: CaptchaSolverHook | None = None

        # Positional fields: (action, max_retries, allow_fallback, pause_status)
        self._policy: Dict[str, ErrorPolicy] = dict(
            timeout=ErrorPolicy("retry", 2, True),
            proxy_failed=ErrorPolicy("retry", 2, True),
            rate=ErrorPolicy("pause", 0, True, TaskStatus.risk_paused),
            auth=ErrorPolicy("retry", 1, True),
            redirect=ErrorPolicy("retry", 1, True),
            redirect_to_login=ErrorPolicy("retry", 1, True),
            risk=ErrorPolicy("pause", 0, False, TaskStatus.waiting_rpa),
            captcha=ErrorPolicy("pause", 0, False, TaskStatus.waiting_rpa),
            missing_dependency=ErrorPolicy("fallback", 0, True),
            parse=ErrorPolicy("retry", 1, True),
        )

    def set_captcha_solver(self, solver: CaptchaSolverHook | None) -> None:
        self.captcha_solver = solver

    def build_captcha_challenge(
        self,
        *,
        engine_name: str,
        scope: str,
        task_id: str | None,
        payload: Dict[str, Any],
    ) -> CaptchaChallenge:
        """Normalize raw captcha details into a :class:`CaptchaChallenge`.

        Empty engine names and scopes become ``"unknown"``, ``task_id`` is
        stringified unless it is ``None``, and ``payload`` is shallow-copied
        (``None`` becomes an empty dict).
        """
        name = str(engine_name or "unknown")
        scope_label = str(scope or "unknown")
        task_label = None if task_id is None else str(task_id)
        payload_copy = dict(payload or {})
        return CaptchaChallenge(
            engine_name=name,
            scope=scope_label,
            task_id=task_label,
            payload=payload_copy,
        )

    async def try_solve_captcha(self, *, challenge: CaptchaChallenge) -> CaptchaSolution | None:
        if not bool(self.captcha_solver_enabled):
            return None
        solver = self.captcha_solver
        if solver is None:
            return None
        return await solver.solve(challenge)

    def _get_streak(self, engine_name: str, scope: str) -> int:
        return int(self._failure_streaks.get((str(engine_name), str(scope)), 0))

    def _set_streak(self, engine_name: str, scope: str, value: int) -> None:
        key = (str(engine_name), str(scope))
        if int(value) <= 0:
            self._failure_streaks.pop(key, None)
            return
        self._failure_streaks[key] = int(value)

    @staticmethod
    def _normalize_engine_choice(choice: str) -> str:
        value = (choice or "").strip().lower()
        if value in ("api", "spider_xhs", "xhs_api"):
            return "api"
        if value in ("browser", "mediacrawler", "xhs_browser"):
            return "browser"
        if value in ("auto", ""):
            return "auto"
        return value

    def init_execution(
        self,
        *,
        task_engine_choice: str,
        scope: str,
        engine_api: Any,
        engine_browser: Any,
    ) -> ExecutionState:
        """Build the initial :class:`ExecutionState` for one task execution.

        - ``"api"`` / ``"browser"``: that engine is primary, with no fallback.
        - ``"auto"``: if the API engine's failure streak for *scope* has reached
          a positive ``engine_fallback_threshold``, the browser engine is
          primary (no fallback) and the reason is recorded. Otherwise the API
          engine is primary with the browser engine as fallback.

        Raises:
            ValueError: if the normalized choice is none of the above.
        """
        choice = self._normalize_engine_choice(task_engine_choice)
        threshold = int(self.engine_fallback_threshold)
        # NOTE(review): when engine_api has no ``name`` this default ("spider_xhs")
        # differs from build_attempt_plan's default (the key "api"); if callers feed
        # plan.engine_name into observe_result, streaks would be written under a
        # different key than the one read here — confirm engines always set ``name``.
        api_name = getattr(engine_api, "name", "spider_xhs")
        streak = self._get_streak(api_name, scope)

        fallback_key: str | None = None
        fallback_reason: str | None = None
        if choice in ("api", "browser"):
            primary_key = choice
        elif choice == "auto":
            breaker_tripped = threshold > 0 and streak >= threshold
            if breaker_tripped:
                primary_key = "browser"
                fallback_reason = f"threshold_exceeded scope={scope} streak={streak} threshold={threshold}"
            else:
                primary_key = "api"
                fallback_key = "browser"
        else:
            raise ValueError(f"unsupported_engine={choice}")

        primary_engine = engine_browser if primary_key == "browser" else engine_api
        fallback_engine = engine_browser if fallback_key == "browser" else None
        return ExecutionState(
            scope=str(scope),
            engine_choice=choice,
            primary_key=primary_key,
            primary_engine=primary_engine,
            fallback_key=fallback_key,
            fallback_engine=fallback_engine,
            fallback_reason=fallback_reason,
            current_key=primary_key,
            current_engine=primary_engine,
            used_fallback=False,
            attempts_total=0,
            attempts_by_key={},
        )

    def select_resources(self, *, engine_key: str, payload: Dict[str, Any]) -> SelectedResources:
        """Pick account, session and proxy for one attempt on *engine_key*.

        Values already present in *payload* take precedence:

        - a session is acquired only when the payload lacks the credential the
          engine needs (cookies for ``"api"``, a storage state otherwise);
        - a proxy is drawn from ``proxy_pool`` only when the payload carries no
          proxy at all.

        Args:
            engine_key: ``"api"`` or any other key (treated as the browser path).
            payload: task payload; ``None`` is treated as empty.

        Returns:
            The selected resources. Fields are ``None`` when nothing was selected.
        """
        source = payload or {}
        acc = self.account_pool.pick_available()
        cookies_in_payload = source.get("cookies") or source.get("cookie") or source.get("cookies_str")
        storage_state_in_payload = source.get("storage_state_path") or source.get("storage_state")
        proxy_in_payload = source.get("proxy") or source.get("proxies")

        session = None
        if engine_key == "api":
            if not cookies_in_payload:
                session = self.session_pool.acquire_cookie_session()
        elif not storage_state_in_payload:
            session = self.session_pool.acquire_storage_state_session()

        cookies = str(session.cookies) if session is not None and session.cookies else None
        storage_state_path = (
            str(session.storage_state_path) if session is not None and session.storage_state_path else None
        )

        if proxy_in_payload:
            # apply_resources_to_payload never overrides a proxy the payload already
            # carries, so a pool proxy drawn here would be recorded in the resources
            # but never used. Record the payload's own proxy when it is a plain string.
            if isinstance(proxy_in_payload, str):
                proxy = proxy_in_payload.strip() or None
            else:
                proxy = None
        else:
            proxy = proxy_pool.get_random_proxy()

        return SelectedResources(
            account_id=str(acc.id) if acc is not None else None,
            session_id=str(session.id) if session is not None else None,
            cookies=cookies,
            storage_state_path=storage_state_path,
            proxy=proxy,
        )

    def apply_resources_to_payload(self, *, payload: Dict[str, Any], resources: SelectedResources) -> Dict[str, Any]:
        out = dict(payload or {})
        if resources.cookies is not None and not (out.get("cookies") or out.get("cookie") or out.get("cookies_str")):
            out["cookies"] = resources.cookies
        if resources.storage_state_path is not None and not (out.get("storage_state_path") or out.get("storage_state")):
            out["storage_state_path"] = resources.storage_state_path
        if resources.proxy is not None and not (out.get("proxy") or out.get("proxies")):
            out["proxy"] = resources.proxy
        return out

    def build_attempt_plan(self, *, state: ExecutionState, payload: Dict[str, Any]) -> AttemptPlan:
        """Select resources for the active engine and describe the next attempt.

        Side effect: increments ``state.attempts_total`` and the per-key
        counter for ``state.current_key``.

        The plan's status is ``fallback_running`` when off the primary engine
        (or after a fallback switch), ``retrying`` on a repeat attempt on the
        same engine, and ``running`` otherwise.
        """
        key = state.current_key
        resources = self.select_resources(engine_key=key, payload=payload)
        patched = self.apply_resources_to_payload(payload=payload, resources=resources)

        state.attempts_total = int(state.attempts_total) + 1
        attempts_on_key = int(state.attempts_by_key.get(key, 0)) + 1
        state.attempts_by_key[key] = attempts_on_key

        engine = state.current_engine
        is_fallback = bool(state.used_fallback or key != state.primary_key)
        if is_fallback:
            status = TaskStatus.fallback_running
        elif attempts_on_key > 1:
            status = TaskStatus.retrying
        else:
            status = TaskStatus.running

        return AttemptPlan(
            engine_key=key,
            engine=engine,
            engine_name=str(getattr(engine, "name", key)),
            status=status,
            is_fallback=is_fallback,
            fallback_reason=state.fallback_reason if is_fallback else None,
            resources=resources,
            payload=patched,
        )

    def decide_after_failure(
        self,
        *,
        state: ExecutionState,
        engine_name: str,
        error_kind: str | None,
    ) -> FailureDecision:
        """Choose the next step after a failed attempt, per the error-kind policy.

        ``error_kind`` is stripped and lowercased; a blank or unknown kind uses
        the ``"parse"`` policy. Rules, in order:

        1. ``pause`` policies fall back if allowed and possible, else pause
           with the policy's ``pause_status``.
        2. ``retry`` policies retry on the same engine while the attempts made
           on it do not exceed ``max_retries``.
        3. Otherwise fall back when the policy allows it (or its action is
           ``"fallback"``) and a fallback is still possible.
        4. Otherwise fail.

        ``engine_name`` is currently unused; it is kept for interface compatibility.
        """
        kind = (error_kind or "").strip().lower() or "parse"
        policy = self._policy.get(kind) or self._policy["parse"]
        attempts = int(state.attempts_by_key.get(state.current_key, 0))

        if policy.action == "pause":
            if policy.allow_fallback and self._can_fallback(state):
                return self._fallback_decision(state, kind)
            return FailureDecision(action="pause", next_key=None, status=policy.pause_status, fallback_reason=None)

        if policy.action == "retry" and attempts <= int(policy.max_retries):
            return FailureDecision(action="retry", next_key=state.current_key, status=TaskStatus.retrying, fallback_reason=None)

        if (policy.allow_fallback or policy.action == "fallback") and self._can_fallback(state):
            return self._fallback_decision(state, kind)

        return FailureDecision(action="fail", next_key=None, status=None, fallback_reason=None)

    @staticmethod
    def _can_fallback(state: ExecutionState) -> bool:
        """True when a fallback engine exists, is unused, and we are still on the primary."""
        return (
            state.fallback_engine is not None
            and not state.used_fallback
            and state.current_key == state.primary_key
        )

    @staticmethod
    def _fallback_decision(state: ExecutionState, kind: str) -> FailureDecision:
        """Build the "switch to fallback engine" decision, keeping any existing reason."""
        reason = state.fallback_reason or f"primary_failed error_kind={kind}"
        return FailureDecision(
            action="fallback",
            next_key=state.fallback_key,
            status=TaskStatus.fallback_running,
            fallback_reason=reason,
        )

    def report_success(self, *, resources: SelectedResources) -> None:
        if resources.account_id:
            try:
                self.account_pool.report_success(resources.account_id)
            except Exception:
                pass
        if resources.session_id:
            try:
                self.session_pool.report_success(resources.session_id)
            except Exception:
                pass

    def report_failure(self, *, resources: SelectedResources, error_kind: str | None) -> None:
        kind = (error_kind or "").strip() or None
        if resources.account_id:
            try:
                self.account_pool.report_failure(resources.account_id, error_kind=kind)
            except Exception:
                pass
        if resources.session_id:
            try:
                self.session_pool.report_failure(resources.session_id, error_kind=kind)
            except Exception:
                pass

    def observe_result(
        self,
        *,
        state: ExecutionState,
        engine_name: str,
        ok: bool,
        error_kind: str | None,
    ) -> None:
        name = str(engine_name or state.current_key)
        if ok:
            self._set_streak(name, state.scope, 0)
            return
        kind = (error_kind or "").strip().lower()
        if state.engine_choice == "auto" and state.current_key == state.primary_key and kind in (
            "auth",
            "rate",
            "risk",
            "captcha",
            "redirect",
            "redirect_to_login",
        ):
            self._set_streak(name, state.scope, self._get_streak(name, state.scope) + 1)
        else:
            self._set_streak(name, state.scope, 0)

    def switch_to(self, *, state: ExecutionState, key: str, fallback_reason: str | None) -> None:
        if key == state.current_key:
            return
        if key == state.fallback_key:
            state.used_fallback = True
            state.fallback_reason = fallback_reason or state.fallback_reason
        state.current_key = key
        state.current_engine = state.fallback_engine if key == state.fallback_key else state.primary_engine