from typing import Any, Dict
import asyncio
from datetime import datetime, timezone
import hashlib
import sys
import os

# Put the directory two levels above this file on sys.path before the
# project imports below (presumably so `service` and `orchestrator` resolve
# as top-level packages -- confirm against the project layout).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from .base import EngineRunOutput
from service.tasks import TaskRecord
from orchestrator.agent_utils import get_llm, get_browser_with_auth
from browser_use import Agent


class AgenticCrawlerEngine:
    """Crawl Xiaohongshu by handing a natural-language task to a browser agent.

    Supported task types are ``"search_notes"`` (payload key ``keyword``) and
    ``"note_detail"`` (payload key ``note_url`` or ``url``).
    """

    name = "agentic_crawler"

    def __init__(self, proxy: str | None = None, storage_state_paths: list[str] | None = None):
        """Store the engine configuration.

        Args:
            proxy: Proxy setting. NOTE(review): stored but not used anywhere
                in this class -- confirm whether it should be forwarded to
                the browser factory.
            storage_state_paths: Candidate browser storage-state files; only
                the first one is used. ``None`` is treated as an empty list.
        """
        self.proxy = proxy
        self.storage_state_paths = storage_state_paths or []

    def _get_storage_state_path(self) -> str | None:
        """Return the first configured storage-state path, or ``None``."""
        if self.storage_state_paths:
            return self.storage_state_paths[0]
        return None

    def run(self, task: TaskRecord) -> EngineRunOutput:
        """Run ``task`` synchronously by driving ``_run_async`` with ``asyncio.run``."""
        return asyncio.run(self._run_async(task))

    @staticmethod
    def _build_prompt(task_type: str, payload: dict) -> tuple[str, Any]:
        """Build the agent prompt and the source reference for a task.

        Returns:
            ``(prompt, source_ref)`` where ``source_ref`` is the keyword or
            URL that was interpolated into the prompt.

        Raises:
            ValueError: If the task type is unsupported or the payload lacks
                the keyword/URL the task type needs.
        """
        # NOTE(review): payload values are interpolated verbatim into the LLM
        # prompt; if they can come from untrusted users this is a
        # prompt-injection surface.
        if task_type == "search_notes":
            keyword = payload.get("keyword")
            if not keyword:
                raise ValueError(
                    "Agentic Crawler task 'search_notes' requires a non-empty 'keyword'"
                )
            prompt = f"Go to Xiaohongshu (https://www.xiaohongshu.com/). Search for '{keyword}', click on the first 3 notes, and extract their title, author, likes, and content. Return the results in a valid JSON array format."
            return prompt, keyword

        if task_type == "note_detail":
            url = payload.get("note_url") or payload.get("url")
            if not url:
                raise ValueError(
                    "Agentic Crawler task 'note_detail' requires 'note_url' or 'url'"
                )
            prompt = f"Open this Xiaohongshu note: {url}. Extract its title, author, likes, and content. Return the result in a valid JSON format."
            return prompt, url

        raise ValueError(f"Agentic Crawler does not support task type: {task_type}")

    async def _run_async(self, task: TaskRecord) -> EngineRunOutput:
        """Execute ``task`` with a browser agent and package the result.

        Returns:
            An ``EngineRunOutput`` whose ``raw`` and ``normalized`` fields
            carry the agent's final text and whose ``meta`` describes the run.

        Raises:
            ValueError: If the task type is unsupported or required payload
                fields are missing.
            RuntimeError: If the agent run or result packaging fails.
        """
        task_type = task.task_type
        payload = task.payload or {}

        # Validate before acquiring anything, so a bad task never opens a browser.
        prompt, source_ref = self._build_prompt(task_type, payload)

        llm = get_llm()
        browser = get_browser_with_auth(self._get_storage_state_path())
        try:
            # Constructed inside the try/finally so the browser is closed even
            # if the Agent constructor itself raises.
            agent = Agent(task=prompt, llm=llm, browser=browser)
            try:
                result = await agent.run()
                final_text = result.final_result() if hasattr(result, 'final_result') else str(result)

                # The dedup key is derived from the value actually crawled, so
                # two different notes never collapse onto the same key.
                dedup_payload = f"{task_type}:{source_ref}".encode("utf-8", errors="ignore")
                meta = {
                    "task_id": task.id,
                    "source_engine": "browser",
                    "engine_name": self.name,
                    "source_type": task_type,
                    "source_ref": source_ref,
                    "operator": payload.get("operator") or "system",
                    "ingested_at": datetime.now(timezone.utc).isoformat(),
                    "dedup_key": hashlib.sha1(dedup_payload).hexdigest(),
                    "ok": True,
                }

                return EngineRunOutput(
                    raw={"agent_result": final_text},
                    normalized={"data": final_text},
                    meta=meta,
                )
            except Exception as e:
                raise RuntimeError(f"Agentic Crawler failed: {e}") from e
        finally:
            await browser.close()