from __future__ import annotations import hashlib import random import re import threading import time from collections import Counter, deque from datetime import datetime, timezone import urllib.parse from typing import Any, Dict import requests from loguru import logger import asyncio import os import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from orchestrator.agent_utils import get_llm, get_browser_with_auth from browser_use import Agent from .config import ServiceConfig from .stability_controller import StabilityController, CaptchaSolverHook, CaptchaChallenge, CaptchaSolution from .storage import LocalJsonStorage from .tasks import CallbackState, CallbackStatus, TaskStatus from engines import MediaCrawlerEngine, SpiderXHSEngine class AgenticCaptchaSolver(CaptchaSolverHook): def __init__(self): super().__init__() async def solve(self, challenge: CaptchaChallenge) -> CaptchaSolution: logger.info(f"AgenticCaptchaSolver invoked for task {challenge.task_id}, engine: {challenge.engine_name}") # Determine the storage state path based on payload payload = challenge.payload storage_state_path = payload.get("storage_state_path") or payload.get("storage_state") llm = get_llm() browser = get_browser_with_auth(storage_state_path) prompt = """ 页面出现验证码,请观察当前页面的截图。 如果是滑块验证码,请拖动滑块到缺口位置。 如果是点选验证码,请按顺序点击图中要求的文字。 如果成功通过验证码,请返回 success,否则返回 failed。 """ agent = Agent(task=prompt, llm=llm, browser=browser) try: result = await agent.run() final_text = result.final_result() if hasattr(result, 'final_result') else str(result) ok = "success" in final_text.lower() return CaptchaSolution(ok=ok, meta={"agent_result": final_text}) except Exception as e: logger.error(f"AgenticCaptchaSolver failed: {e}") return CaptchaSolution(ok=False, meta={"error": str(e)}) finally: await browser.close() class _RunnerCounters: def __init__(self, *, recent_window_seconds: int = 300, recent_maxlen: int = 2000): self._lock = threading.Lock() self._recent_window_seconds = int(recent_window_seconds) self._recent: deque[tuple[float, bool]] = deque(maxlen=int(recent_maxlen)) self._started_total = 0 self._succeeded_total = 0 self._failed_total = 0 self._engine_usage_total: Counter[str] = Counter() self._engine_succeeded_total: Counter[str] = Counter() self._engine_failed_total: Counter[str] = Counter() def record_started(self) -> None: with self._lock: self._started_total += 1 def _prune_recent(self, now_ts: float) -> None: window = float(self._recent_window_seconds) while self._recent: ts, _ = self._recent[0] if now_ts - float(ts) <= window: return self._recent.popleft() def record_finished(self, *, engine: str, ok: bool) -> None: name = str(engine or "unknown") now_ts = time.time() with self._lock: self._engine_usage_total[name] += 1 if bool(ok): self._succeeded_total += 1 self._engine_succeeded_total[name] += 1 else: self._failed_total += 1 self._engine_failed_total[name] += 1 self._recent.append((now_ts, bool(ok))) self._prune_recent(now_ts) def snapshot(self) -> Dict[str, Any]: now_ts = time.time() with self._lock: self._prune_recent(now_ts) recent_total = len(self._recent) recent_failed = sum(1 for _, ok in self._recent if not ok) recent_rate = float(recent_failed) / float(recent_total) if recent_total > 0 else 0.0 return { "started_total": int(self._started_total), "succeeded_total": int(self._succeeded_total), "failed_total": int(self._failed_total), "engine_usage_total": dict(self._engine_usage_total), "engine_succeeded_total": dict(self._engine_succeeded_total), "engine_failed_total": dict(self._engine_failed_total), "recent_window_seconds": int(self._recent_window_seconds), "recent_total": int(recent_total), "recent_failed": int(recent_failed), "recent_failure_rate": float(recent_rate), } class TaskRunner: def __init__(self, *, storage: LocalJsonStorage, config: ServiceConfig): self.storage = storage self.config = config self._engine_api = SpiderXHSEngine(proxy=config.proxy) self._engine_browser = MediaCrawlerEngine( proxy=config.proxy, storage_state_paths=config.mediacrawler_storage_state_paths, ) self._ops_counters = _RunnerCounters() self._stability = StabilityController( engine_fallback_threshold=int(getattr(config, "engine_fallback_threshold", 3) or 3), ) self._stability.set_captcha_solver(AgenticCaptchaSolver()) def get_ops_stats(self) -> Dict[str, Any]: return self._ops_counters.snapshot() @staticmethod def _normalize_engine_choice(choice: str) -> str: value = (choice or "").strip().lower() if value in ("api", "spider_xhs", "xhs_api"): return "api" if value in ("browser", "mediacrawler", "xhs_browser"): return "browser" if value in ("auto", ""): return "auto" return value @staticmethod def _first_str(*values: Any) -> str | None: for value in values: if value is None: continue if isinstance(value, str): v = value.strip() if v != "": return v try: v = str(value).strip() if v != "": return v except Exception: continue return None @staticmethod def _extract_scope(task_type: str, payload: Dict[str, Any]) -> str: url_candidate = TaskRunner._first_str( payload.get("note_url"), payload.get("url"), payload.get("user_url"), ) if url_candidate: try: parsed = urllib.parse.urlparse(url_candidate) if parsed.hostname: return parsed.hostname.lower() except Exception: pass return (task_type or "unknown").strip().lower() or "unknown" @staticmethod def _safe_url_for_log(url: str | None) -> str: if url is None: return "" u = str(url).strip() if u == "": return "" try: parsed = urllib.parse.urlparse(u) host = parsed.hostname or "" if parsed.port: host = f"{host}:{parsed.port}" if host == "": return "" scheme = parsed.scheme or "http" path = parsed.path or "" return f"{scheme}://{host}{path}" except Exception: return "" @staticmethod def _safe_error_text(text: Any) -> str: if text is None: return "" raw = str(text) if raw.strip() == "": return "" raw = re.sub( r"https?://[^\s]+", lambda m: TaskRunner._safe_url_for_log(m.group(0)) or "", raw, ) raw = re.sub(r"(?i)(xsec_token=)[^&\s]+", r"\1***", raw) raw = re.sub(r"(?i)(cookies?=)[^&\s]+", r"\1***", raw) raw = re.sub(r"(?i)(authorization:\s*)(bearer\s+)?[^\s]+", r"\1***", raw) raw = raw.strip() if len(raw) > 200: return raw[:200] + "..." return raw def _safe_run_engine(self, engine: Any, task: Any) -> Dict[str, Any]: try: output = engine.run(task) return output.to_dict() except Exception as e: payload = getattr(task, "payload", {}) or {} source_type = getattr(task, "task_type", "") or "" source_ref = str( payload.get("note_url") or payload.get("url") or payload.get("query") or payload.get("keyword") or payload.get("user_id") or "" ) dedup_payload = f"{source_type}:{source_ref}".encode("utf-8", errors="ignore") return { "raw": None, "normalized": None, "meta": { "task_id": getattr(task, "id", None) or "", "source_engine": "api" if getattr(engine, "name", "") == "spider_xhs" else "browser", "engine_name": getattr(engine, "name", None) or "", "source_type": source_type, "source_ref": source_ref, "operator": "system", "ingested_at": datetime.now(timezone.utc).isoformat(), "dedup_key": hashlib.sha1(dedup_payload).hexdigest(), "ok": False, "error_kind": "parse", "error_message": str(e), }, } def _persist_raw_html_snapshot(self, task_id: str, engine: Any, result: Dict[str, Any]) -> str | None: engine_name = getattr(engine, "name", None) if engine_name != getattr(self._engine_browser, "name", "mediacrawler"): return None raw = result.get("raw") if not isinstance(raw, dict): return None html = raw.get("html") if not isinstance(html, str) or html.strip() == "": return None path = self.storage.write_raw_html_snapshot(task_id, html) try: rel = path.relative_to(self.storage.base_dir) return str(rel) except Exception: return str(path) @staticmethod def _to_dict(model: Any) -> Dict[str, Any]: if hasattr(model, "model_dump"): return model.model_dump() if hasattr(model, "dict"): return model.dict() return dict(model) @staticmethod def _callback_idempotency_key(task_id: str, result: Dict[str, Any]) -> str: meta = result.get("meta") if isinstance(result, dict) else None dedup_key = meta.get("dedup_key") if isinstance(meta, dict) else None payload = f"{task_id}:{dedup_key or ''}".encode("utf-8", errors="ignore") return hashlib.sha256(payload).hexdigest() @staticmethod def _sleep_backoff(attempt: int, *, base_s: float = 0.6, max_s: float = 10.0) -> None: exp = min(max_s, base_s * (2**attempt)) jitter = random.uniform(0.0, exp * 0.25) time.sleep(exp + jitter) def _push_callback(self, task_id: str, result: Dict[str, Any]) -> CallbackState | None: callback_url = getattr(self.config, "callback_url", None) if callback_url is None or str(callback_url).strip() == "": return None existing = self.storage.get_task(task_id) existing_state = getattr(existing, "callback", None) if existing is not None else None if existing_state is not None and getattr(existing_state, "status", None) == CallbackStatus.succeeded: return existing_state idempotency_key = self._callback_idempotency_key(task_id, result) state = CallbackState( status=CallbackStatus.pending, callback_url=callback_url, idempotency_key=idempotency_key, attempts=int(getattr(existing_state, "attempts", 0) or 0), last_attempt_at=getattr(existing_state, "last_attempt_at", None), last_http_status=getattr(existing_state, "last_http_status", None), last_error=getattr(existing_state, "last_error", None), next_retry_at=getattr(existing_state, "next_retry_at", None), ) self.storage.update_task(task_id, callback=state) body = {"task_id": task_id, "result": result} headers = {"Idempotency-Key": idempotency_key} max_attempts = 5 safe_callback_url = self._safe_url_for_log(callback_url) logger.bind(task_id=task_id, status="callback").info( "callback_started callback_url={} attempts={}", safe_callback_url, int(state.attempts), ) for attempt in range(max_attempts): state.attempts = int(state.attempts) + 1 state.last_attempt_at = time.time() state.next_retry_at = None self.storage.update_task(task_id, callback=state) try: resp = requests.post(callback_url, json=body, headers=headers, timeout=(5.0, 20.0)) state.last_http_status = int(resp.status_code) if 200 <= int(resp.status_code) < 300: state.status = CallbackStatus.succeeded state.last_error = None state.next_retry_at = None self.storage.update_task(task_id, callback=state) self.storage.delete_callback_failure(task_id) logger.bind(task_id=task_id, status="callback").info( "callback_succeeded callback_url={} http_status={} attempts={}", safe_callback_url, int(resp.status_code), int(state.attempts), ) return state state.last_error = f"http_status={resp.status_code}" if int(resp.status_code) not in (429, 500, 502, 503, 504) and attempt >= 0: break except Exception as e: state.last_http_status = None state.last_error = str(e) if attempt < max_attempts - 1: self._sleep_backoff(attempt) state.status = CallbackStatus.failed state.next_retry_at = time.time() + 60.0 self.storage.update_task(task_id, callback=state) self.storage.write_callback_failure( task_id, { "task_id": task_id, "callback_url": callback_url, "idempotency_key": idempotency_key, "body": body, "callback": self._to_dict(state), "saved_at": time.time(), }, ) logger.bind(task_id=task_id, status="callback").warning( "callback_failed callback_url={} attempts={} last_http_status={} last_error={}", safe_callback_url, int(state.attempts), str(state.last_http_status) if state.last_http_status is not None else "", self._safe_error_text(state.last_error), ) return state def import_extension_result(self, task_id: str, result: Dict[str, Any]) -> TaskRecord: task = self.storage.get_task(task_id) if task is None: raise KeyError("task not found") self.storage.write_result(task_id, result) updated = self.storage.update_task(task_id, status=TaskStatus.rpa_imported, engine="extension_rpa", error=None) try: self._push_callback(task_id, result) except Exception: pass return updated or self.storage.get_task(task_id) or task def retry_callback(self, task_id: str) -> CallbackState: callback_url = getattr(self.config, "callback_url", None) if callback_url is None or str(callback_url).strip() == "": raise ValueError("callback_url not configured") task = self.storage.get_task(task_id) if task is None: raise KeyError("task not found") result = self.storage.read_result(task_id) if result is None: raise RuntimeError("result not ready") failure = self.storage.read_callback_failure(task_id) or {} url = str(failure.get("callback_url") or callback_url) body = failure.get("body") if not isinstance(body, dict): body = {"task_id": task_id, "result": result} idempotency_key = str( failure.get("idempotency_key") or self._callback_idempotency_key(task_id, result) ) body_task_id = body.get("task_id") if body_task_id != task_id: body["task_id"] = task_id if "result" not in body: body["result"] = result state = CallbackState( status=CallbackStatus.pending, callback_url=url, idempotency_key=idempotency_key, attempts=int(getattr(getattr(task, "callback", None), "attempts", 0) or 0), ) self.storage.update_task(task_id, callback=state) headers = {"Idempotency-Key": idempotency_key} max_attempts = 5 safe_url = self._safe_url_for_log(url) logger.bind(task_id=task_id, status="callback").info( "callback_retry_started callback_url={} attempts={}", safe_url, int(state.attempts), ) for attempt in range(max_attempts): state.attempts = int(state.attempts) + 1 state.last_attempt_at = time.time() state.next_retry_at = None self.storage.update_task(task_id, callback=state) try: resp = requests.post(url, json=body, headers=headers, timeout=(5.0, 20.0)) state.last_http_status = int(resp.status_code) if 200 <= int(resp.status_code) < 300: state.status = CallbackStatus.succeeded state.last_error = None state.next_retry_at = None self.storage.update_task(task_id, callback=state) self.storage.delete_callback_failure(task_id) logger.bind(task_id=task_id, status="callback").info( "callback_retry_succeeded callback_url={} http_status={} attempts={}", safe_url, int(resp.status_code), int(state.attempts), ) return state state.last_error = f"http_status={resp.status_code}" if int(resp.status_code) not in (429, 500, 502, 503, 504) and attempt >= 0: break except Exception as e: state.last_http_status = None state.last_error = str(e) if attempt < max_attempts - 1: self._sleep_backoff(attempt) state.status = CallbackStatus.failed state.next_retry_at = time.time() + 60.0 self.storage.update_task(task_id, callback=state) self.storage.write_callback_failure( task_id, { "task_id": task_id, "callback_url": url, "idempotency_key": idempotency_key, "body": body, "callback": self._to_dict(state), "saved_at": time.time(), }, ) logger.bind(task_id=task_id, status="callback").warning( "callback_retry_failed callback_url={} attempts={} last_http_status={} last_error={}", safe_url, int(state.attempts), str(state.last_http_status) if state.last_http_status is not None else "", self._safe_error_text(state.last_error), ) return state def run_task(self, task_id: str) -> None: task = self.storage.get_task(task_id) if task is None: return payload = getattr(task, "payload", {}) or {} scope = self._extract_scope(getattr(task, "task_type", "") or "", payload) task_type = str(getattr(task, "task_type", "") or "") engine_req = str(getattr(task, "engine", None) or "") logger.bind(task_id=task_id, task_type=task_type, engine=engine_req, status="running").info( "task_started scope={}", scope, ) self._ops_counters.record_started() self.storage.update_task(task_id, status=TaskStatus.running) try: if (getattr(task, "task_type", "") or "").strip().lower() == "selftest": operator = str(payload.get("operator") or "system") dedup_payload = f"selftest:{task_id}".encode("utf-8", errors="ignore") meta: Dict[str, Any] = { "task_id": task_id, "source_engine": "selftest", "engine_name": "selftest", "source_type": "selftest", "source_ref": "", "operator": operator, "ingested_at": datetime.now(timezone.utc).isoformat(), "dedup_key": hashlib.sha1(dedup_payload).hexdigest(), "ok": True, "primary_engine": "selftest", "final_engine": "selftest", } result: Dict[str, Any] = { "raw": {"message": "ok"}, "normalized": {"kind": "selftest"}, "meta": meta, } self.storage.update_task(task_id, engine="selftest") self.storage.write_result(task_id, result) self.storage.update_task(task_id, status=TaskStatus.succeeded, error=None) self._ops_counters.record_finished(engine="selftest", ok=True) logger.bind(task_id=task_id, task_type=task_type, engine="selftest", status="succeeded").info( "task_succeeded" ) try: self._push_callback(task_id, result) except Exception: pass return engine_choice = self._normalize_engine_choice(task.engine or self.config.engine_strategy or "auto") exec_state = self._stability.init_execution( task_engine_choice=engine_choice, scope=scope, engine_api=self._engine_api, engine_browser=self._engine_browser, ) primary_name = str(getattr(exec_state.primary_engine, "name", exec_state.primary_key)) retry_count = int(getattr(task, "retry_count", 0) or 0) final_engine = exec_state.primary_engine final_name = primary_name final_result: Dict[str, Any] | None = None paused_status: TaskStatus | None = None while True: payload_now = getattr(task, "payload", {}) or {} plan = self._stability.build_attempt_plan(state=exec_state, payload=payload_now) if plan.status in (TaskStatus.retrying, TaskStatus.fallback_running): retry_count += 1 self.storage.update_task( task_id, status=plan.status, engine=plan.engine_name, retry_count=retry_count, payload=plan.payload, ) task.payload = plan.payload task.engine = plan.engine_name result = self._safe_run_engine(plan.engine, task) meta = result.get("meta") if isinstance(result.get("meta"), dict) else {} ok = bool(meta.get("ok")) error_kind = str(meta.get("error_kind") or "").strip().lower() or None self._stability.observe_result( state=exec_state, engine_name=plan.engine_name, ok=ok, error_kind=error_kind, ) if ok: self._stability.report_success(resources=plan.resources) final_engine = plan.engine final_name = plan.engine_name final_result = result break self._stability.report_failure(resources=plan.resources, error_kind=error_kind) decision = self._stability.decide_after_failure( state=exec_state, engine_name=plan.engine_name, error_kind=error_kind, ) if decision.action == "pause" and error_kind in ("captcha", "risk"): logger.info(f"Triggering AgenticCaptchaSolver for error_kind={error_kind}") challenge = self._stability.build_captcha_challenge( engine_name=plan.engine_name, scope=scope, task_id=task_id, payload=plan.payload, ) try: solution = asyncio.run(self._stability.try_solve_captcha(challenge=challenge)) if solution and solution.ok: logger.info("AgenticCaptchaSolver succeeded! Retrying task...") TaskRunner._sleep_backoff(int(exec_state.attempts_by_key.get(exec_state.current_key, 1) or 1)) continue else: logger.warning(f"AgenticCaptchaSolver failed. Solution: {solution}") except Exception as e: logger.error(f"Failed to run AgenticCaptchaSolver: {e}") if decision.action == "retry": TaskRunner._sleep_backoff(int(exec_state.attempts_by_key.get(exec_state.current_key, 1) or 1)) continue if decision.action == "fallback" and decision.next_key is not None: self._stability.switch_to(state=exec_state, key=decision.next_key, fallback_reason=decision.fallback_reason) continue final_engine = plan.engine final_name = plan.engine_name final_result = result if decision.action == "pause": paused_status = decision.status or TaskStatus.waiting_rpa break if final_result is None: raise RuntimeError("runner_no_result") meta = final_result.get("meta") if isinstance(final_result.get("meta"), dict) else {} meta = dict(meta or {}) meta["primary_engine"] = primary_name meta["final_engine"] = str(final_name) if exec_state.fallback_reason is not None: meta["fallback_reason"] = exec_state.fallback_reason raw_html_path = self._persist_raw_html_snapshot(task.id, final_engine, final_result) if raw_html_path is not None: meta["raw_html_path"] = raw_html_path final_result["meta"] = meta self.storage.write_result(task.id, final_result) if paused_status is not None: err_msg = str(meta.get("error_message") or "engine_failed") err_kind = str(meta.get("error_kind") or "parse") self.storage.update_task( task_id, status=paused_status, error={"kind": err_kind, "message": err_msg, "engine": str(final_name)}, ) return if bool(meta.get("ok")): self.storage.update_task(task_id, status=TaskStatus.succeeded, error=None) self._ops_counters.record_finished(engine=str(final_name), ok=True) logger.bind(task_id=task_id, task_type=task_type, engine=str(final_name), status="succeeded").info( "task_succeeded" ) else: err_msg = str(meta.get("error_message") or "engine_failed") err_kind = str(meta.get("error_kind") or "parse") self.storage.update_task( task_id, status=TaskStatus.failed, error={"kind": err_kind, "message": err_msg, "engine": str(final_name)}, ) self._ops_counters.record_finished(engine=str(final_name), ok=False) logger.bind(task_id=task_id, task_type=task_type, engine=str(final_name), status="failed").error( "task_failed error_kind={} error_message={}", str(meta.get("error_kind") or ""), self._safe_error_text(err_msg), ) try: existing = self.storage.get_task(task_id) if paused_status is None and existing is not None and existing.status != TaskStatus.waiting_rpa: self._push_callback(task_id, final_result) except Exception: pass except Exception as e: source_type = getattr(task, "task_type", "") if task is not None else "" source_ref = "" engine_choice = "auto" try: engine_choice = self._normalize_engine_choice( getattr(task, "engine", None) or self.config.engine_strategy or "auto" ) except Exception: engine_choice = "auto" primary_engine = self._engine_api if engine_choice == "browser": primary_engine = self._engine_browser try: payload = getattr(task, "payload", {}) or {} source_ref = str( payload.get("note_url") or payload.get("url") or payload.get("query") or payload.get("keyword") or payload.get("user_id") or "" ) except Exception: source_ref = "" dedup_payload = f"{source_type}:{source_ref}".encode("utf-8", errors="ignore") primary_name = getattr(primary_engine, "name", "spider_xhs") result = { "raw": None, "normalized": None, "meta": { "task_id": task_id, "source_engine": "api" if primary_name == "spider_xhs" else "browser", "engine_name": primary_name, "source_type": source_type, "source_ref": source_ref, "operator": "system", "ingested_at": datetime.now(timezone.utc).isoformat(), "dedup_key": hashlib.sha1(dedup_payload).hexdigest(), "ok": False, "primary_engine": primary_name, "final_engine": primary_name, "fallback_reason": "runner_exception", "error_kind": "parse", "error_message": str(e), }, } self.storage.write_result(task_id, result) self.storage.update_task(task_id, status=TaskStatus.failed, error={"kind": "parse", "message": str(e)}) self._ops_counters.record_finished(engine=str(primary_name), ok=False) logger.bind(task_id=task_id, task_type=task_type, engine=str(primary_name), status="failed").error( "task_failed error_kind=parse error_message={}", self._safe_error_text(str(e)), ) try: self._push_callback(task_id, result) except Exception: pass