XHS / engines /agentic_crawler.py
Trae Bot
Upload Spider_XHS project
c481f8a
from typing import Any, Dict
import asyncio
from datetime import datetime, timezone
import hashlib
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from .base import EngineRunOutput
from service.tasks import TaskRecord
from orchestrator.agent_utils import get_llm, get_browser_with_auth
from browser_use import Agent
class AgenticCrawlerEngine:
    """Crawl Xiaohongshu by handing a natural-language task to a browser_use ``Agent``.

    Supported task types are ``"search_notes"`` (payload key ``keyword``) and
    ``"note_detail"`` (payload key ``note_url`` or ``url``). The agent's final
    answer is returned verbatim (it is not parsed) in an ``EngineRunOutput``.
    """

    name = "agentic_crawler"

    def __init__(self, proxy: str | None = None, storage_state_paths: list[str] | None = None):
        """Store engine configuration.

        Args:
            proxy: Proxy address. NOTE(review): stored on the instance but not
                used by any method of this class — confirm whether it should be
                passed to ``get_browser_with_auth``.
            storage_state_paths: Storage-state file paths; only the first one is
                passed to ``get_browser_with_auth``.
        """
        self.proxy = proxy
        self.storage_state_paths = storage_state_paths or []

    def _get_storage_state_path(self) -> str | None:
        """Return the first configured storage-state path, or ``None`` if none are configured."""
        if self.storage_state_paths:
            return self.storage_state_paths[0]
        return None

    def run(self, task: TaskRecord) -> EngineRunOutput:
        """Execute *task* synchronously by driving ``_run_async`` with ``asyncio.run``.

        ``asyncio.run`` creates a new event loop and cannot be called from a
        thread that already has a running loop, so this is for sync callers.
        """
        return asyncio.run(self._run_async(task))

    @staticmethod
    def _build_prompt(task_type: str, payload: Dict[str, Any]) -> tuple[str, Any]:
        """Return ``(prompt, source_ref)`` for a task, validating its payload first.

        ``source_ref`` is the same payload value that is interpolated into the
        prompt, so the dedup key always identifies what was actually crawled.

        Raises:
            ValueError: if *task_type* is unsupported or the payload lacks the
                field that task type needs.
        """
        if task_type == "search_notes":
            keyword = payload.get("keyword")
            if not keyword:
                raise ValueError("Agentic Crawler task 'search_notes' requires a non-empty 'keyword' in payload")
            prompt = f"Go to Xiaohongshu (https://www.xiaohongshu.com/). Search for '{keyword}', click on the first 3 notes, and extract their title, author, likes, and content. Return the results in a valid JSON array format."
            return prompt, keyword
        if task_type == "note_detail":
            url = payload.get("note_url") or payload.get("url")
            if not url:
                raise ValueError("Agentic Crawler task 'note_detail' requires a non-empty 'note_url' or 'url' in payload")
            prompt = f"Open this Xiaohongshu note: {url}. Extract its title, author, likes, and content. Return the result in a valid JSON format."
            return prompt, url
        raise ValueError(f"Agentic Crawler does not support task type: {task_type}")

    def _build_meta(self, task: TaskRecord, task_type: str, payload: Dict[str, Any], source_ref: Any) -> Dict[str, Any]:
        """Build the provenance/dedup metadata attached to a successful run."""
        # The dedup key is a SHA-1 of "<task_type>:<source_ref>".
        dedup_payload = f"{task_type}:{source_ref}".encode("utf-8", errors="ignore")
        return {
            "task_id": task.id,
            "source_engine": "browser",
            "engine_name": self.name,
            "source_type": task_type,
            "source_ref": source_ref,
            "operator": payload.get("operator") or "system",
            "ingested_at": datetime.now(timezone.utc).isoformat(),
            "dedup_key": hashlib.sha1(dedup_payload).hexdigest(),
            # NOTE(review): True whenever agent.run() returned, even if
            # final_result() produced None — confirm this is intended.
            "ok": True,
        }

    async def _run_async(self, task: TaskRecord) -> EngineRunOutput:
        """Run the browser agent for *task* and package its final answer.

        Raises:
            ValueError: for unsupported task types or incomplete payloads
                (raised before any LLM or browser is acquired).
            RuntimeError: wrapping any exception raised while the agent runs
                or while its result is packaged.
        """
        task_type = task.task_type
        payload = task.payload or {}
        # Validate before acquiring resources so a bad task never launches a browser.
        prompt, source_ref = self._build_prompt(task_type, payload)

        llm = get_llm()
        browser = get_browser_with_auth(self._get_storage_state_path())
        try:
            # Constructed inside try/finally so the browser is closed even if
            # Agent construction fails.
            agent = Agent(task=prompt, llm=llm, browser=browser)
            try:
                result = await agent.run()
                final_text = result.final_result() if hasattr(result, "final_result") else str(result)
                return EngineRunOutput(
                    raw={"agent_result": final_text},
                    normalized={"data": final_text},
                    meta=self._build_meta(task, task_type, payload, source_ref),
                )
            except Exception as e:
                raise RuntimeError(f"Agentic Crawler failed: {e}") from e
        finally:
            await browser.close()