# XHS/orchestrator/crawl_sync.py
# Uploaded by: Trae Bot
# Commit message: Upload Spider_XHS project
# Commit: c481f8a
import os
import json
import time
import sqlite3
import requests
from orchestrator.config import DB_PATH, FASTAPI_URL
# Upper bound, in seconds, on any single HTTP call to the task API so that a
# stalled server cannot hang the whole sync run.
_REQUEST_TIMEOUT_SECONDS = 30
# Delay, in seconds, between two polls of a task's result endpoint.
_POLL_INTERVAL_SECONDS = 2
# Maximum number of polls per task (roughly ten minutes at the interval above)
# before the task is abandoned and the next keyword is processed.
_MAX_POLL_ATTEMPTS = 300


def _ensure_processed_column(conn, cursor):
    """Add the ``processed`` column to ``raw_note`` if it is missing.

    The column flags whether a row has already been handled by note_cleaner.
    This is best-effort: a failure is reported but never aborts the run.
    """
    try:
        cursor.execute("ALTER TABLE raw_note ADD COLUMN processed INTEGER DEFAULT 0;")
        conn.commit()
    except sqlite3.OperationalError as e:
        # SQLite reports an already-present column as "duplicate column name";
        # that is the expected outcome on every run after the first one.
        if "duplicate column name" not in str(e).lower():
            print(f"Could not add 'processed' column to raw_note: {e}")


def _submit_search_task(word):
    """Create a search task for *word*; return its id, or ``None`` on failure."""
    payload = {
        "task_type": "search",
        "payload": {"query": word},
    }
    try:
        res = requests.post(
            f"{FASTAPI_URL}/api/v1/tasks",
            json=payload,
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        return None

    if res.status_code != 200:
        print(f"Failed to create task for '{word}': HTTP {res.status_code} - {res.text}")
        return None

    try:
        data = res.json()
    except ValueError as e:
        # A 200 response with a non-JSON body must not abort the other keywords.
        print(f"Failed to create task for '{word}': invalid JSON response - {e}")
        return None

    if not isinstance(data, dict) or data.get("code") != 200:
        print(f"Error creating task: {data}")
        return None

    try:
        return data["data"]["task"]["id"]
    except (KeyError, TypeError):
        print(f"Error creating task: {data}")
        return None


def _build_mock_note(task_id, word):
    """Return the placeholder note stored when a task did not succeed."""
    return {
        # str() keeps the slice valid even if the API returns a numeric id.
        "id": f"mock_note_{str(task_id)[:8]}",
        "title": f"Mock Title for {word}",
        "desc": f"This is a mock description for {word}. The original task failed due to missing credentials.",
        "user": {"nickname": "MockAuthor", "user_id": "mock_author_123"},
        "interact_info": {"liked_count": 100, "collected_count": 50, "comment_count": 20, "share_count": 5},
    }


def _poll_task_result(task_id, word):
    """Poll the result endpoint of *task_id* until it yields data or fails.

    Returns ``(True, raw_data)`` when there is something to store and
    ``(False, None)`` when the keyword should be skipped. A finished task
    whose status is not ``"succeeded"`` yields mock data (integration-test
    behaviour kept from the original implementation).
    """
    for _ in range(_MAX_POLL_ATTEMPTS):
        time.sleep(_POLL_INTERVAL_SECONDS)
        try:
            poll_res = requests.get(
                f"{FASTAPI_URL}/api/v1/tasks/{task_id}/result",
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as e:
            print(f"Polling request failed: {e}")
            return False, None

        if poll_res.status_code == 409:
            # HTTP 409 means task_not_ready: keep polling.
            print(f"Task {task_id} is running... waiting 2s")
            continue

        if poll_res.status_code != 200:
            # Any other status (e.g. 500 parse error, 404 task not found).
            print(f"Task {task_id} failed or HTTP error: {poll_res.status_code} - {poll_res.text}")
            return False, None

        try:
            poll_data = poll_res.json()
        except ValueError as e:
            print(f"Task {task_id} returned invalid JSON: {e}")
            return False, None

        if not isinstance(poll_data, dict) or poll_data.get("code") != 200:
            print(f"Unexpected response code: {poll_data}")
            return False, None

        result = poll_data.get("data")
        if not isinstance(result, dict):
            result = {}
        task_status = result.get("status")

        if task_status == "succeeded":
            print(f"Task {task_id} succeeded!")
            if "raw" not in result:
                print(f"Task {task_id} succeeded but returned no 'raw' payload: {poll_data}")
                return False, None
            return True, result["raw"]

        print(f"Task {task_id} failed with status {task_status}, using mock data for integration test.")
        return True, _build_mock_note(task_id, word)

    print(f"Task {task_id} did not finish after {_MAX_POLL_ATTEMPTS} polls, giving up.")
    return False, None


def _insert_notes(cursor, raw_data):
    """Insert every dict found in *raw_data* into ``raw_note`` (no commit)."""
    notes = raw_data if isinstance(raw_data, list) else [raw_data]
    for note in notes:
        if not isinstance(note, dict):
            continue

        # The full note is stored as JSON; the extracted fields below are
        # left empty when the note does not carry them.
        content_str = json.dumps(note, ensure_ascii=False)
        source_platform = "xiaohongshu"

        author = ""
        user_info = note.get("user", {})
        if isinstance(user_info, dict):
            author = user_info.get("nickname", user_info.get("name", ""))

        note_id = note.get("note_id", note.get("id", ""))
        url = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else ""

        cursor.execute(
            "INSERT INTO raw_note (source_platform, content, author, url, processed) "
            "VALUES (?, ?, ?, ?, ?)",
            (source_platform, content_str, author, url, 0),
        )


def main():
    """Run one crawl-and-sync pass over every keyword in the ``keyword`` table.

    For each keyword a search task is submitted to the task API, its result
    is polled, the raw payload is written to ``orchestrator/data/raw`` as
    ``<task_id>.json`` and every note in it is inserted into ``raw_note``
    with ``processed = 0``. A failure for one keyword is printed and the
    next keyword is processed; the database connection is always closed.
    """
    # Make sure the directory for raw payload dumps exists.
    raw_dir = os.path.join("orchestrator", "data", "raw")
    os.makedirs(raw_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        _ensure_processed_column(conn, cursor)

        # Read the keywords that need to be crawled.
        try:
            cursor.execute("SELECT id, word FROM keyword")
            keywords = cursor.fetchall()
        except sqlite3.OperationalError as e:
            print(f"Failed to read keywords: {e}")
            return

        if not keywords:
            print("No keywords found in 'keyword' table.")
            return

        for _kw_id, word in keywords:
            print(f"[*] Processing keyword: {word}")

            # 1. Submit the search task.
            task_id = _submit_search_task(word)
            if task_id is None:
                continue
            print(f"Task created: {task_id}, polling for results...")

            # 2. Poll for the task result.
            has_data, raw_data = _poll_task_result(task_id, word)
            if not has_data:
                continue

            # 3. Dump the raw payload to a JSON file.
            raw_file_path = os.path.join(raw_dir, f"{task_id}.json")
            with open(raw_file_path, "w", encoding="utf-8") as f:
                json.dump(raw_data, f, ensure_ascii=False, indent=2)

            # 4. Store the notes; one commit per task.
            _insert_notes(cursor, raw_data)
            conn.commit()
            print(f"Data saved to {raw_file_path} and inserted into raw_note table.")
    finally:
        # Release the connection even if a keyword raised unexpectedly.
        conn.close()
# Script entry point: run the sync only when executed directly, not on import.
if __name__ == "__main__":
    main()