| from __future__ import annotations |
|
|
| import hashlib |
| import random |
| import re |
| import threading |
| import time |
| from collections import Counter, deque |
| from datetime import datetime, timezone |
| import urllib.parse |
| from typing import Any, Dict |
|
|
| import requests |
| from loguru import logger |
| import asyncio |
| import os |
| import sys |
|
|
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| from orchestrator.agent_utils import get_llm, get_browser_with_auth |
| from browser_use import Agent |
|
|
| from .config import ServiceConfig |
| from .stability_controller import StabilityController, CaptchaSolverHook, CaptchaChallenge, CaptchaSolution |
| from .storage import LocalJsonStorage |
| from .tasks import CallbackState, CallbackStatus, TaskStatus |
| from engines import MediaCrawlerEngine, SpiderXHSEngine |
|
|
class AgenticCaptchaSolver(CaptchaSolverHook):
    """Captcha solver hook that hands the challenge to an LLM-driven browser agent.

    The agent receives a fixed instruction (see ``_PROMPT``) asking it to solve a
    slider or click-order captcha on the current page and to answer ``success``
    or ``failed``; that answer is parsed into a :class:`CaptchaSolution`.
    """

    # Instruction for the browser agent. English gloss: "A captcha appeared on the
    # page; look at the current page screenshot. If it is a slider captcha, drag the
    # slider to the gap. If it is a click captcha, click the requested characters in
    # order. Return success if the captcha was passed, otherwise return failed."
    _PROMPT = """
页面出现验证码,请观察当前页面的截图。
如果是滑块验证码,请拖动滑块到缺口位置。
如果是点选验证码,请按顺序点击图中要求的文字。
如果成功通过验证码,请返回 success,否则返回 failed。
"""

    # Matched against the lower-cased agent answer. The negative look-behind keeps
    # "unsuccessful" from counting as success while still allowing the token to
    # sit directly next to CJK text.
    _SUCCESS_RE = re.compile(r"(?<![a-z])success")
    _FAILURE_RE = re.compile(r"(?<![a-z])fail")

    def __init__(self):
        super().__init__()

    @classmethod
    def _parse_outcome(cls, text: str | None) -> bool:
        """Return True when the agent's answer reports success and no failure.

        An explicit failure token wins over a success token, so an answer such as
        "failed, no success" is treated as a failure.
        """
        lowered = (text or "").lower()
        if cls._FAILURE_RE.search(lowered):
            return False
        return cls._SUCCESS_RE.search(lowered) is not None

    async def solve(self, challenge: CaptchaChallenge) -> CaptchaSolution:
        """Run the browser agent against ``challenge`` and report whether it passed.

        ``challenge.payload`` may carry ``storage_state_path`` (or ``storage_state``),
        which is passed to ``get_browser_with_auth``. Errors raised by the agent are
        converted into ``CaptchaSolution(ok=False, meta={"error": ...})``. The
        browser is always closed, and a failure to close it is only logged.
        """
        logger.info(f"AgenticCaptchaSolver invoked for task {challenge.task_id}, engine: {challenge.engine_name}")

        payload = challenge.payload or {}
        storage_state_path = payload.get("storage_state_path") or payload.get("storage_state")

        llm = get_llm()
        browser = get_browser_with_auth(storage_state_path)
        try:
            # Built inside the try so the browser is closed even if Agent() raises.
            agent = Agent(task=self._PROMPT, llm=llm, browser=browser)
            result = await agent.run()
            final_text = result.final_result() if hasattr(result, "final_result") else str(result)
            # final_result() may be None when the agent produced no answer.
            final_text = "" if final_text is None else str(final_text)
            ok = self._parse_outcome(final_text)
            return CaptchaSolution(ok=ok, meta={"agent_result": final_text})
        except Exception as e:
            logger.error(f"AgenticCaptchaSolver failed: {e}")
            return CaptchaSolution(ok=False, meta={"error": str(e)})
        finally:
            # Do not let a close failure replace the solution computed above.
            try:
                await browser.close()
            except Exception as close_err:
                logger.warning(f"AgenticCaptchaSolver browser close failed: {close_err}")
|
|
|
|
| class _RunnerCounters: |
| def __init__(self, *, recent_window_seconds: int = 300, recent_maxlen: int = 2000): |
| self._lock = threading.Lock() |
| self._recent_window_seconds = int(recent_window_seconds) |
| self._recent: deque[tuple[float, bool]] = deque(maxlen=int(recent_maxlen)) |
| self._started_total = 0 |
| self._succeeded_total = 0 |
| self._failed_total = 0 |
| self._engine_usage_total: Counter[str] = Counter() |
| self._engine_succeeded_total: Counter[str] = Counter() |
| self._engine_failed_total: Counter[str] = Counter() |
|
|
| def record_started(self) -> None: |
| with self._lock: |
| self._started_total += 1 |
|
|
| def _prune_recent(self, now_ts: float) -> None: |
| window = float(self._recent_window_seconds) |
| while self._recent: |
| ts, _ = self._recent[0] |
| if now_ts - float(ts) <= window: |
| return |
| self._recent.popleft() |
|
|
| def record_finished(self, *, engine: str, ok: bool) -> None: |
| name = str(engine or "unknown") |
| now_ts = time.time() |
| with self._lock: |
| self._engine_usage_total[name] += 1 |
| if bool(ok): |
| self._succeeded_total += 1 |
| self._engine_succeeded_total[name] += 1 |
| else: |
| self._failed_total += 1 |
| self._engine_failed_total[name] += 1 |
| self._recent.append((now_ts, bool(ok))) |
| self._prune_recent(now_ts) |
|
|
| def snapshot(self) -> Dict[str, Any]: |
| now_ts = time.time() |
| with self._lock: |
| self._prune_recent(now_ts) |
| recent_total = len(self._recent) |
| recent_failed = sum(1 for _, ok in self._recent if not ok) |
| recent_rate = float(recent_failed) / float(recent_total) if recent_total > 0 else 0.0 |
| return { |
| "started_total": int(self._started_total), |
| "succeeded_total": int(self._succeeded_total), |
| "failed_total": int(self._failed_total), |
| "engine_usage_total": dict(self._engine_usage_total), |
| "engine_succeeded_total": dict(self._engine_succeeded_total), |
| "engine_failed_total": dict(self._engine_failed_total), |
| "recent_window_seconds": int(self._recent_window_seconds), |
| "recent_total": int(recent_total), |
| "recent_failed": int(recent_failed), |
| "recent_failure_rate": float(recent_rate), |
| } |
|
|
|
|
class TaskRunner:
    """Run crawl tasks through the API/browser engines and deliver their results.

    Visible responsibilities:

    * drive a task through ``StabilityController``'s attempt plans, retrying,
      falling back to another engine, or pausing as the controller decides;
    * invoke the captcha solver (a bounded number of times) when a pause is
      caused by a ``captcha``/``risk`` error;
    * persist the final result (plus a raw HTML snapshot for the browser engine);
    * POST results to ``config.callback_url`` with retries and an
      ``Idempotency-Key`` header;
    * keep in-process counters exposed through :meth:`get_ops_stats`.
    """

    # Callback HTTP statuses treated as transient (retried with backoff).
    _RETRYABLE_HTTP_STATUSES = frozenset({429, 500, 502, 503, 504})
    # POST attempts per callback delivery.
    _CALLBACK_MAX_ATTEMPTS = 5
    # (connect, read) timeout in seconds for callback POSTs.
    _CALLBACK_TIMEOUT = (5.0, 20.0)
    # Seconds added to "now" for ``next_retry_at`` after a failed delivery.
    _CALLBACK_RETRY_DELAY_S = 60.0
    # Default cap on captcha solve-and-retry cycles per run_task call; can be
    # overridden with ``config.captcha_max_solves``.
    _DEFAULT_CAPTCHA_MAX_SOLVES = 3
    # Payload keys consulted, in order, for a result's ``source_ref``.
    _SOURCE_REF_KEYS = ("note_url", "url", "query", "keyword", "user_id")
    # Secret-scrubbing patterns applied by _safe_error_text, in this order.
    _URL_RE = re.compile(r"https?://[^\s]+")
    _XSEC_TOKEN_RE = re.compile(r"(?i)(xsec_token=)[^&\s]+")
    _COOKIE_RE = re.compile(r"(?i)(cookies?=)[^&\s]+")
    _AUTH_RE = re.compile(r"(?i)(authorization:\s*)(bearer\s+)?[^\s]+")
    # Sanitized error text longer than this is truncated with "...".
    _ERROR_TEXT_MAX = 200

    def __init__(self, *, storage: LocalJsonStorage, config: ServiceConfig):
        self.storage = storage
        self.config = config
        self._engine_api = SpiderXHSEngine(proxy=config.proxy)
        self._engine_browser = MediaCrawlerEngine(
            proxy=config.proxy,
            storage_state_paths=config.mediacrawler_storage_state_paths,
        )
        self._ops_counters = _RunnerCounters()
        self._stability = StabilityController(
            engine_fallback_threshold=int(getattr(config, "engine_fallback_threshold", 3) or 3),
        )
        self._stability.set_captcha_solver(AgenticCaptchaSolver())

    def get_ops_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the runner's start/finish counters."""
        return self._ops_counters.snapshot()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _normalize_engine_choice(choice: str) -> str:
        """Map engine aliases to ``"api"``/``"browser"``/``"auto"``; other values pass through lower-cased."""
        value = (choice or "").strip().lower()
        if value in ("api", "spider_xhs", "xhs_api"):
            return "api"
        if value in ("browser", "mediacrawler", "xhs_browser"):
            return "browser"
        if value in ("auto", ""):
            return "auto"
        return value

    @staticmethod
    def _first_str(*values: Any) -> str | None:
        """Return the first argument whose stripped ``str()`` is non-empty, else None."""
        for value in values:
            if value is None:
                continue
            try:
                text = (value if isinstance(value, str) else str(value)).strip()
            except Exception:
                continue
            if text:
                return text
        return None

    @staticmethod
    def _extract_scope(task_type: str, payload: Dict[str, Any]) -> str:
        """Return the lower-cased host of the task's URL, else the lower-cased task type (or ``"unknown"``)."""
        payload = payload or {}
        url_candidate = TaskRunner._first_str(
            payload.get("note_url"),
            payload.get("url"),
            payload.get("user_url"),
        )
        if url_candidate:
            try:
                parsed = urllib.parse.urlparse(url_candidate)
                if parsed.hostname:
                    return parsed.hostname.lower()
            except Exception:
                pass
        return (task_type or "unknown").strip().lower() or "unknown"

    @staticmethod
    def _safe_url_for_log(url: str | None) -> str:
        """Reduce ``url`` to ``scheme://host[:port]/path`` (query, fragment and userinfo dropped).

        Returns ``""`` for empty or unparsable input, or when there is no host.
        IPv6 hosts are re-bracketed so the port stays unambiguous.
        """
        if url is None:
            return ""
        u = str(url).strip()
        if u == "":
            return ""
        try:
            parsed = urllib.parse.urlparse(u)
            host = parsed.hostname or ""
            if host == "":
                return ""
            if ":" in host:
                # urlparse strips the [] around IPv6 literals; put them back.
                host = f"[{host}]"
            if parsed.port:  # may raise ValueError for an out-of-range port
                host = f"{host}:{parsed.port}"
            scheme = parsed.scheme or "http"
            path = parsed.path or ""
            return f"{scheme}://{host}{path}"
        except Exception:
            return ""

    @staticmethod
    def _safe_error_text(text: Any) -> str:
        """Scrub URLs/tokens/cookies/auth headers from ``text`` and cap it for logging."""
        if text is None:
            return ""
        raw = str(text)
        if raw.strip() == "":
            return ""
        raw = TaskRunner._URL_RE.sub(lambda m: TaskRunner._safe_url_for_log(m.group(0)) or "", raw)
        raw = TaskRunner._XSEC_TOKEN_RE.sub(r"\1***", raw)
        raw = TaskRunner._COOKIE_RE.sub(r"\1***", raw)
        raw = TaskRunner._AUTH_RE.sub(r"\1***", raw)
        raw = raw.strip()
        limit = TaskRunner._ERROR_TEXT_MAX
        if len(raw) > limit:
            return raw[:limit] + "..."
        return raw

    @staticmethod
    def _source_ref(payload: Dict[str, Any]) -> str:
        """Return ``str()`` of the first truthy of note_url/url/query/keyword/user_id, else ``""``."""
        for key in TaskRunner._SOURCE_REF_KEYS:
            value = payload.get(key)
            if value:
                return str(value)
        return ""

    @staticmethod
    def _dedup_key(source_type: str, source_ref: str) -> str:
        """SHA-1 hex digest of ``"{source_type}:{source_ref}"`` used as a result's dedup key."""
        data = f"{source_type}:{source_ref}".encode("utf-8", errors="ignore")
        return hashlib.sha1(data).hexdigest()

    @staticmethod
    def _source_engine_label(engine_name: Any) -> str:
        """``"api"`` for the spider_xhs engine, ``"browser"`` for anything else."""
        return "api" if engine_name == "spider_xhs" else "browser"

    def _safe_run_engine(self, engine: Any, task: Any) -> Dict[str, Any]:
        """Run ``engine`` on ``task`` and return ``output.to_dict()``.

        Any exception is converted into a failed result envelope
        (``meta.ok=False``, ``error_kind="parse"``) instead of propagating.
        """
        try:
            return engine.run(task).to_dict()
        except Exception as e:
            payload = getattr(task, "payload", {}) or {}
            source_type = getattr(task, "task_type", "") or ""
            source_ref = self._source_ref(payload)
            return {
                "raw": None,
                "normalized": None,
                "meta": {
                    "task_id": getattr(task, "id", None) or "",
                    "source_engine": self._source_engine_label(getattr(engine, "name", "")),
                    "engine_name": getattr(engine, "name", None) or "",
                    "source_type": source_type,
                    "source_ref": source_ref,
                    "operator": "system",
                    "ingested_at": datetime.now(timezone.utc).isoformat(),
                    "dedup_key": self._dedup_key(source_type, source_ref),
                    "ok": False,
                    "error_kind": "parse",
                    "error_message": str(e),
                },
            }

    def _persist_raw_html_snapshot(self, task_id: str, engine: Any, result: Dict[str, Any]) -> str | None:
        """Store ``result["raw"]["html"]`` for browser-engine results.

        Returns the snapshot path (relative to ``storage.base_dir`` when possible),
        or None when not applicable.
        """
        engine_name = getattr(engine, "name", None)
        if engine_name != getattr(self._engine_browser, "name", "mediacrawler"):
            return None
        raw = result.get("raw")
        if not isinstance(raw, dict):
            return None
        html = raw.get("html")
        if not isinstance(html, str) or html.strip() == "":
            return None
        path = self.storage.write_raw_html_snapshot(task_id, html)
        try:
            return str(path.relative_to(self.storage.base_dir))
        except Exception:
            return str(path)

    @staticmethod
    def _to_dict(model: Any) -> Dict[str, Any]:
        """Convert a pydantic v2/v1 model (or mapping) to a plain dict."""
        if hasattr(model, "model_dump"):
            return model.model_dump()
        if hasattr(model, "dict"):
            return model.dict()
        return dict(model)

    @staticmethod
    def _callback_idempotency_key(task_id: str, result: Dict[str, Any]) -> str:
        """SHA-256 hex of ``"{task_id}:{meta.dedup_key}"`` (empty dedup key when absent)."""
        meta = result.get("meta") if isinstance(result, dict) else None
        dedup_key = meta.get("dedup_key") if isinstance(meta, dict) else None
        payload = f"{task_id}:{dedup_key or ''}".encode("utf-8", errors="ignore")
        return hashlib.sha256(payload).hexdigest()

    @staticmethod
    def _sleep_backoff(attempt: int, *, base_s: float = 0.6, max_s: float = 10.0) -> None:
        """Block for ``min(max_s, base_s * 2**attempt)`` plus up to 25% random jitter."""
        exp = min(max_s, base_s * (2**attempt))
        jitter = random.uniform(0.0, exp * 0.25)
        time.sleep(exp + jitter)

    # ---------------------------------------------------------------- callbacks

    def _deliver_callback(
        self,
        task_id: str,
        *,
        url: str,
        body: Dict[str, Any],
        idempotency_key: str,
        state: CallbackState,
        event: str,
    ) -> CallbackState:
        """POST ``body`` to ``url`` with bounded retries, persisting ``state`` after each step.

        Up to ``_CALLBACK_MAX_ATTEMPTS`` POSTs are made with exponential backoff
        between them.
        * A 2xx response marks the state succeeded and deletes the stored failure record.
        * A non-retryable status stops early.
        * When the attempts are exhausted, the state is marked failed with
          ``next_retry_at`` set, and a failure record is written for ``retry_callback``.

        ``event`` is the log-event prefix (``"callback"`` or ``"callback_retry"``).
        """
        headers = {"Idempotency-Key": idempotency_key}
        safe_url = self._safe_url_for_log(url)
        log = logger.bind(task_id=task_id, status="callback")
        log.info(f"{event}_started callback_url={{}} attempts={{}}", safe_url, int(state.attempts))

        max_attempts = self._CALLBACK_MAX_ATTEMPTS
        for attempt in range(max_attempts):
            state.attempts = int(state.attempts) + 1
            state.last_attempt_at = time.time()
            state.next_retry_at = None
            self.storage.update_task(task_id, callback=state)
            # Only the network call is guarded: storage errors must not be
            # mistaken for delivery failures (which would trigger a re-POST).
            try:
                resp = requests.post(url, json=body, headers=headers, timeout=self._CALLBACK_TIMEOUT)
                status_code = int(resp.status_code)
            except Exception as e:
                state.last_http_status = None
                state.last_error = str(e)
            else:
                state.last_http_status = status_code
                if 200 <= status_code < 300:
                    state.status = CallbackStatus.succeeded
                    state.last_error = None
                    state.next_retry_at = None
                    self.storage.update_task(task_id, callback=state)
                    self.storage.delete_callback_failure(task_id)
                    log.info(
                        f"{event}_succeeded callback_url={{}} http_status={{}} attempts={{}}",
                        safe_url,
                        status_code,
                        int(state.attempts),
                    )
                    return state
                state.last_error = f"http_status={resp.status_code}"
                if status_code not in self._RETRYABLE_HTTP_STATUSES:
                    break
            if attempt < max_attempts - 1:
                self._sleep_backoff(attempt)

        state.status = CallbackStatus.failed
        state.next_retry_at = time.time() + self._CALLBACK_RETRY_DELAY_S
        self.storage.update_task(task_id, callback=state)
        self.storage.write_callback_failure(
            task_id,
            {
                "task_id": task_id,
                "callback_url": url,
                "idempotency_key": idempotency_key,
                "body": body,
                "callback": self._to_dict(state),
                "saved_at": time.time(),
            },
        )
        log.warning(
            f"{event}_failed callback_url={{}} attempts={{}} last_http_status={{}} last_error={{}}",
            safe_url,
            int(state.attempts),
            str(state.last_http_status) if state.last_http_status is not None else "",
            self._safe_error_text(state.last_error),
        )
        return state

    def _push_callback(self, task_id: str, result: Dict[str, Any]) -> CallbackState | None:
        """Deliver ``result`` to the configured callback URL.

        Returns None when no callback URL is configured, and the existing state
        when a previous delivery for this task already succeeded.
        """
        callback_url = getattr(self.config, "callback_url", None)
        if callback_url is None or str(callback_url).strip() == "":
            return None

        existing = self.storage.get_task(task_id)
        existing_state = getattr(existing, "callback", None) if existing is not None else None
        if existing_state is not None and getattr(existing_state, "status", None) == CallbackStatus.succeeded:
            return existing_state

        idempotency_key = self._callback_idempotency_key(task_id, result)
        # Carry prior attempt bookkeeping forward so attempts accumulate across calls.
        state = CallbackState(
            status=CallbackStatus.pending,
            callback_url=callback_url,
            idempotency_key=idempotency_key,
            attempts=int(getattr(existing_state, "attempts", 0) or 0),
            last_attempt_at=getattr(existing_state, "last_attempt_at", None),
            last_http_status=getattr(existing_state, "last_http_status", None),
            last_error=getattr(existing_state, "last_error", None),
            next_retry_at=getattr(existing_state, "next_retry_at", None),
        )
        self.storage.update_task(task_id, callback=state)

        return self._deliver_callback(
            task_id,
            url=callback_url,
            body={"task_id": task_id, "result": result},
            idempotency_key=idempotency_key,
            state=state,
            event="callback",
        )

    def _push_callback_best_effort(self, task_id: str, result: Dict[str, Any]) -> None:
        """Call :meth:`_push_callback`, logging (not raising) any unexpected error."""
        try:
            self._push_callback(task_id, result)
        except Exception as e:
            logger.bind(task_id=task_id, status="callback").warning(
                "callback_push_error error={}",
                self._safe_error_text(e),
            )

    def import_extension_result(self, task_id: str, result: Dict[str, Any]) -> TaskRecord:
        """Store a result produced by the browser-extension RPA path and push the callback.

        Raises:
            KeyError: if the task does not exist.
        """
        # NOTE(review): TaskRecord is not in the visible imports; this annotation is
        # only resolved lazily (``from __future__ import annotations``).
        task = self.storage.get_task(task_id)
        if task is None:
            raise KeyError("task not found")

        self.storage.write_result(task_id, result)
        updated = self.storage.update_task(task_id, status=TaskStatus.rpa_imported, engine="extension_rpa", error=None)
        self._push_callback_best_effort(task_id, result)
        return updated or self.storage.get_task(task_id) or task

    def retry_callback(self, task_id: str) -> CallbackState:
        """Re-deliver a task's callback, reusing the stored failure record when present.

        Raises:
            ValueError: if no callback URL is configured.
            KeyError: if the task does not exist.
            RuntimeError: if the task has no stored result yet.
        """
        callback_url = getattr(self.config, "callback_url", None)
        if callback_url is None or str(callback_url).strip() == "":
            raise ValueError("callback_url not configured")

        task = self.storage.get_task(task_id)
        if task is None:
            raise KeyError("task not found")

        result = self.storage.read_result(task_id)
        if result is None:
            raise RuntimeError("result not ready")

        failure = self.storage.read_callback_failure(task_id) or {}
        url = str(failure.get("callback_url") or callback_url)
        body = failure.get("body")
        if not isinstance(body, dict):
            body = {"task_id": task_id, "result": result}
        idempotency_key = str(failure.get("idempotency_key") or self._callback_idempotency_key(task_id, result))
        body["task_id"] = task_id
        body.setdefault("result", result)

        state = CallbackState(
            status=CallbackStatus.pending,
            callback_url=url,
            idempotency_key=idempotency_key,
            attempts=int(getattr(getattr(task, "callback", None), "attempts", 0) or 0),
        )
        self.storage.update_task(task_id, callback=state)

        return self._deliver_callback(
            task_id,
            url=url,
            body=body,
            idempotency_key=idempotency_key,
            state=state,
            event="callback_retry",
        )

    # ------------------------------------------------------------- task running

    def run_task(self, task_id: str) -> None:
        """Execute a stored task end to end. Unknown task ids are ignored.

        Any exception escaping the engine flow is recorded as a failed result
        (``fallback_reason="runner_exception"``) rather than propagated.
        """
        task = self.storage.get_task(task_id)
        if task is None:
            return

        payload = getattr(task, "payload", {}) or {}
        task_type = str(getattr(task, "task_type", "") or "")
        scope = self._extract_scope(task_type, payload)
        engine_req = str(getattr(task, "engine", None) or "")
        logger.bind(task_id=task_id, task_type=task_type, engine=engine_req, status="running").info(
            "task_started scope={}",
            scope,
        )
        self._ops_counters.record_started()
        self.storage.update_task(task_id, status=TaskStatus.running)

        try:
            if task_type.strip().lower() == "selftest":
                self._run_selftest(task_id, payload, task_type)
                return
            self._run_engines(task, task_id, scope, task_type)
        except Exception as e:
            self._handle_runner_exception(task, task_id, task_type, e)

    def _run_selftest(self, task_id: str, payload: Dict[str, Any], task_type: str) -> None:
        """Write a synthetic successful result for a ``selftest`` task and push the callback."""
        meta: Dict[str, Any] = {
            "task_id": task_id,
            "source_engine": "selftest",
            "engine_name": "selftest",
            "source_type": "selftest",
            "source_ref": "",
            "operator": str(payload.get("operator") or "system"),
            "ingested_at": datetime.now(timezone.utc).isoformat(),
            "dedup_key": self._dedup_key("selftest", task_id),
            "ok": True,
            "primary_engine": "selftest",
            "final_engine": "selftest",
        }
        result: Dict[str, Any] = {
            "raw": {"message": "ok"},
            "normalized": {"kind": "selftest"},
            "meta": meta,
        }
        self.storage.update_task(task_id, engine="selftest")
        self.storage.write_result(task_id, result)
        self.storage.update_task(task_id, status=TaskStatus.succeeded, error=None)
        self._ops_counters.record_finished(engine="selftest", ok=True)
        logger.bind(task_id=task_id, task_type=task_type, engine="selftest", status="succeeded").info(
            "task_succeeded"
        )
        self._push_callback_best_effort(task_id, result)

    @staticmethod
    def _current_attempt(exec_state: Any) -> int:
        """Attempt count of the state's current engine key (at least 1), used for backoff."""
        return int(exec_state.attempts_by_key.get(exec_state.current_key, 1) or 1)

    def _max_captcha_solves(self) -> int:
        """Configured cap on captcha solve-and-retry cycles (``config.captcha_max_solves``)."""
        raw = getattr(self.config, "captcha_max_solves", None)
        if raw is None:
            return self._DEFAULT_CAPTCHA_MAX_SOLVES
        try:
            return max(0, int(raw))
        except (TypeError, ValueError):
            return self._DEFAULT_CAPTCHA_MAX_SOLVES

    def _try_solve_captcha(self, *, plan: Any, scope: str, task_id: str, error_kind: str | None) -> bool:
        """Run the stability controller's captcha solver once; return True if it reports success."""
        logger.info(f"Triggering AgenticCaptchaSolver for error_kind={error_kind}")
        challenge = self._stability.build_captcha_challenge(
            engine_name=plan.engine_name,
            scope=scope,
            task_id=task_id,
            payload=plan.payload,
        )
        try:
            # run_task is synchronous. asyncio.run raises RuntimeError if called
            # from a thread that already has a running loop; that error is logged below.
            solution = asyncio.run(self._stability.try_solve_captcha(challenge=challenge))
            if solution and solution.ok:
                logger.info("AgenticCaptchaSolver succeeded! Retrying task...")
                return True
            logger.warning(f"AgenticCaptchaSolver failed. Solution: {solution}")
        except Exception as e:
            logger.error(f"Failed to run AgenticCaptchaSolver: {e}")
        return False

    def _run_engines(self, task: Any, task_id: str, scope: str, task_type: str) -> None:
        """Drive ``task`` through the stability controller's retry/fallback/pause loop.

        Then persist the outcome: paused tasks keep their paused status and get no
        callback; succeeded or failed tasks update the counters and push the callback.
        """
        engine_choice = self._normalize_engine_choice(task.engine or self.config.engine_strategy or "auto")
        exec_state = self._stability.init_execution(
            task_engine_choice=engine_choice,
            scope=scope,
            engine_api=self._engine_api,
            engine_browser=self._engine_browser,
        )

        primary_name = str(getattr(exec_state.primary_engine, "name", exec_state.primary_key))
        retry_count = int(getattr(task, "retry_count", 0) or 0)

        final_engine = exec_state.primary_engine
        final_name = primary_name
        final_result: Dict[str, Any] | None = None
        paused_status: TaskStatus | None = None
        # Bound captcha cycles: without this, a solver that keeps "succeeding"
        # while the engine keeps failing would loop forever.
        captcha_solves = 0
        max_captcha_solves = self._max_captcha_solves()

        while True:
            payload_now = getattr(task, "payload", {}) or {}
            plan = self._stability.build_attempt_plan(state=exec_state, payload=payload_now)

            if plan.status in (TaskStatus.retrying, TaskStatus.fallback_running):
                retry_count += 1

            self.storage.update_task(
                task_id,
                status=plan.status,
                engine=plan.engine_name,
                retry_count=retry_count,
                payload=plan.payload,
            )
            task.payload = plan.payload
            task.engine = plan.engine_name

            result = self._safe_run_engine(plan.engine, task)
            meta = result.get("meta") if isinstance(result.get("meta"), dict) else {}
            ok = bool(meta.get("ok"))
            error_kind = str(meta.get("error_kind") or "").strip().lower() or None

            self._stability.observe_result(
                state=exec_state,
                engine_name=plan.engine_name,
                ok=ok,
                error_kind=error_kind,
            )

            if ok:
                self._stability.report_success(resources=plan.resources)
                final_engine, final_name, final_result = plan.engine, plan.engine_name, result
                break

            self._stability.report_failure(resources=plan.resources, error_kind=error_kind)
            decision = self._stability.decide_after_failure(
                state=exec_state,
                engine_name=plan.engine_name,
                error_kind=error_kind,
            )

            if decision.action == "pause" and error_kind in ("captcha", "risk"):
                if captcha_solves < max_captcha_solves:
                    captcha_solves += 1
                    if self._try_solve_captcha(plan=plan, scope=scope, task_id=task_id, error_kind=error_kind):
                        self._sleep_backoff(self._current_attempt(exec_state))
                        continue
                else:
                    logger.bind(task_id=task_id, status="captcha").warning(
                        "captcha_solve_limit_reached solves={}",
                        captcha_solves,
                    )

            if decision.action == "retry":
                self._sleep_backoff(self._current_attempt(exec_state))
                continue

            if decision.action == "fallback" and decision.next_key is not None:
                self._stability.switch_to(state=exec_state, key=decision.next_key, fallback_reason=decision.fallback_reason)
                continue

            final_engine, final_name, final_result = plan.engine, plan.engine_name, result
            if decision.action == "pause":
                paused_status = decision.status or TaskStatus.waiting_rpa
            break

        if final_result is None:
            raise RuntimeError("runner_no_result")

        meta = final_result.get("meta") if isinstance(final_result.get("meta"), dict) else {}
        meta = dict(meta or {})
        meta["primary_engine"] = primary_name
        meta["final_engine"] = str(final_name)
        if exec_state.fallback_reason is not None:
            meta["fallback_reason"] = exec_state.fallback_reason
        raw_html_path = self._persist_raw_html_snapshot(task.id, final_engine, final_result)
        if raw_html_path is not None:
            meta["raw_html_path"] = raw_html_path
        final_result["meta"] = meta

        self.storage.write_result(task.id, final_result)

        err_msg = str(meta.get("error_message") or "engine_failed")
        err_kind = str(meta.get("error_kind") or "parse")

        if paused_status is not None:
            # Paused tasks are not counted as finished and get no callback yet.
            self.storage.update_task(
                task_id,
                status=paused_status,
                error={"kind": err_kind, "message": err_msg, "engine": str(final_name)},
            )
            logger.bind(task_id=task_id, task_type=task_type, engine=str(final_name), status="paused").warning(
                "task_paused error_kind={} error_message={}",
                err_kind,
                self._safe_error_text(err_msg),
            )
            return

        if bool(meta.get("ok")):
            self.storage.update_task(task_id, status=TaskStatus.succeeded, error=None)
            self._ops_counters.record_finished(engine=str(final_name), ok=True)
            logger.bind(task_id=task_id, task_type=task_type, engine=str(final_name), status="succeeded").info(
                "task_succeeded"
            )
        else:
            self.storage.update_task(
                task_id,
                status=TaskStatus.failed,
                error={"kind": err_kind, "message": err_msg, "engine": str(final_name)},
            )
            self._ops_counters.record_finished(engine=str(final_name), ok=False)
            logger.bind(task_id=task_id, task_type=task_type, engine=str(final_name), status="failed").error(
                "task_failed error_kind={} error_message={}",
                str(meta.get("error_kind") or ""),
                self._safe_error_text(err_msg),
            )

        # Skip the callback if the task was moved to waiting_rpa in the meantime.
        try:
            existing = self.storage.get_task(task_id)
        except Exception as e:
            logger.bind(task_id=task_id, status="callback").warning(
                "callback_push_error error={}",
                self._safe_error_text(e),
            )
            return
        if existing is not None and getattr(existing, "status", None) != TaskStatus.waiting_rpa:
            self._push_callback_best_effort(task_id, final_result)

    def _handle_runner_exception(self, task: Any, task_id: str, task_type: str, exc: Exception) -> None:
        """Record an unexpected runner error as a failed result and push the callback."""
        source_type = getattr(task, "task_type", "") or ""
        try:
            engine_choice = self._normalize_engine_choice(
                getattr(task, "engine", None) or self.config.engine_strategy or "auto"
            )
        except Exception:
            engine_choice = "auto"
        primary_engine = self._engine_browser if engine_choice == "browser" else self._engine_api
        try:
            source_ref = self._source_ref(getattr(task, "payload", {}) or {})
        except Exception:
            source_ref = ""
        primary_name = getattr(primary_engine, "name", "spider_xhs")
        result = {
            "raw": None,
            "normalized": None,
            "meta": {
                "task_id": task_id,
                "source_engine": self._source_engine_label(primary_name),
                "engine_name": primary_name,
                "source_type": source_type,
                "source_ref": source_ref,
                "operator": "system",
                "ingested_at": datetime.now(timezone.utc).isoformat(),
                "dedup_key": self._dedup_key(source_type, source_ref),
                "ok": False,
                "primary_engine": primary_name,
                "final_engine": primary_name,
                "fallback_reason": "runner_exception",
                "error_kind": "parse",
                "error_message": str(exc),
            },
        }
        self.storage.write_result(task_id, result)
        self.storage.update_task(task_id, status=TaskStatus.failed, error={"kind": "parse", "message": str(exc)})
        self._ops_counters.record_finished(engine=str(primary_name), ok=False)
        logger.bind(task_id=task_id, task_type=task_type, engine=str(primary_name), status="failed").error(
            "task_failed error_kind=parse error_message={}",
            self._safe_error_text(str(exc)),
        )
        self._push_callback_best_effort(task_id, result)
|
|