| from typing import Any, Dict |
| import asyncio |
| from datetime import datetime, timezone |
| import hashlib |
| import sys |
| import os |
|
|
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
| from .base import EngineRunOutput |
| from service.tasks import TaskRecord |
| from orchestrator.agent_utils import get_llm, get_browser_with_auth |
| from browser_use import Agent |
|
|
| class AgenticCrawlerEngine: |
| name = "agentic_crawler" |
|
|
| def __init__(self, proxy: str | None = None, storage_state_paths: list[str] | None = None): |
| self.proxy = proxy |
| self.storage_state_paths = storage_state_paths or [] |
|
|
| def _get_storage_state_path(self) -> str | None: |
| if self.storage_state_paths: |
| return self.storage_state_paths[0] |
| return None |
|
|
| def run(self, task: TaskRecord) -> EngineRunOutput: |
| return asyncio.run(self._run_async(task)) |
|
|
| async def _run_async(self, task: TaskRecord) -> EngineRunOutput: |
| task_type = task.task_type |
| payload = task.payload or {} |
| |
| llm = get_llm() |
| browser = get_browser_with_auth(self._get_storage_state_path()) |
| |
| prompt = "" |
| if task_type == "search_notes": |
| keyword = payload.get("keyword") |
| prompt = f"Go to Xiaohongshu (https://www.xiaohongshu.com/). Search for '{keyword}', click on the first 3 notes, and extract their title, author, likes, and content. Return the results in a valid JSON array format." |
| elif task_type == "note_detail": |
| url = payload.get("note_url") or payload.get("url") |
| prompt = f"Open this Xiaohongshu note: {url}. Extract its title, author, likes, and content. Return the result in a valid JSON format." |
| else: |
| await browser.close() |
| raise ValueError(f"Agentic Crawler does not support task type: {task_type}") |
|
|
| agent = Agent(task=prompt, llm=llm, browser=browser) |
| |
| try: |
| result = await agent.run() |
| final_text = result.final_result() if hasattr(result, 'final_result') else str(result) |
| |
| source_ref = payload.get("keyword") or payload.get("note_url") or payload.get("url") or "" |
| dedup_payload = f"{task_type}:{source_ref}".encode("utf-8", errors="ignore") |
| |
| meta = { |
| "task_id": task.id, |
| "source_engine": "browser", |
| "engine_name": self.name, |
| "source_type": task_type, |
| "source_ref": source_ref, |
| "operator": payload.get("operator") or "system", |
| "ingested_at": datetime.now(timezone.utc).isoformat(), |
| "dedup_key": hashlib.sha1(dedup_payload).hexdigest(), |
| "ok": True, |
| } |
| |
| return EngineRunOutput( |
| raw={"agent_result": final_text}, |
| normalized={"data": final_text}, |
| meta=meta |
| ) |
| |
| except Exception as e: |
| raise RuntimeError(f"Agentic Crawler failed: {e}") |
| finally: |
| await browser.close() |
|
|