File size: 5,736 Bytes
c481f8a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 | import os
import json
import time
import sqlite3
import requests
from orchestrator.config import DB_PATH, FASTAPI_URL
# Seconds to wait before each poll of a task's result endpoint.
_POLL_INTERVAL_SECONDS = 2
# Upper bound on polls per task (~10 minutes at the interval above) so a task
# that never leaves the "not ready" state cannot hang the whole run.
_MAX_POLL_ATTEMPTS = 300
# (connect, read) timeouts in seconds; requests waits forever when none is set.
_REQUEST_TIMEOUT = (5, 30)
# Sentinel returned by _poll_task_result when there is nothing to store.
# A dedicated object is used because a task's raw payload may itself be None.
_NO_RESULT = object()


def _ensure_processed_column(conn):
    """Add the ``processed`` flag column to ``raw_note`` if it is missing.

    The column marks whether a row has already been handled by note_cleaner.
    An "already exists" failure is expected on every run after the first and
    is ignored; any other failure is reported instead of silently swallowed.
    """
    try:
        conn.execute("ALTER TABLE raw_note ADD COLUMN processed INTEGER DEFAULT 0;")
        conn.commit()
    except sqlite3.OperationalError as e:
        if "duplicate column name" not in str(e).lower():
            print(f"Could not add 'processed' column to raw_note: {e}")


def _submit_search_task(word):
    """Create a search task for *word* and return its id, or None on failure."""
    payload = {
        "task_type": "search",
        "payload": {"query": word},
    }
    try:
        res = requests.post(
            f"{FASTAPI_URL}/api/v1/tasks", json=payload, timeout=_REQUEST_TIMEOUT
        )
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        return None

    if res.status_code != 200:
        print(f"Failed to create task for '{word}': HTTP {res.status_code} - {res.text}")
        return None

    try:
        data = res.json()
    except ValueError as e:  # body was not valid JSON
        print(f"Invalid JSON in task creation response for '{word}': {e}")
        return None

    if not isinstance(data, dict) or data.get("code") != 200:
        print(f"Error creating task: {data}")
        return None

    try:
        return data["data"]["task"]["id"]
    except (KeyError, TypeError):
        print(f"Task creation response has no task id: {data}")
        return None


def _mock_note(task_id, word):
    """Build the placeholder note used when a task does not succeed."""
    return {
        "id": f"mock_note_{str(task_id)[:8]}",
        "title": f"Mock Title for {word}",
        "desc": f"This is a mock description for {word}. The original task failed due to missing credentials.",
        "user": {"nickname": "MockAuthor", "user_id": "mock_author_123"},
        "interact_info": {"liked_count": 100, "collected_count": 50, "comment_count": 20, "share_count": 5},
    }


def _poll_task_result(task_id, word):
    """Poll the result endpoint of *task_id* until it settles.

    Returns the raw payload to store, or ``_NO_RESULT`` when polling failed,
    the response was malformed, or the task never became ready within
    ``_MAX_POLL_ATTEMPTS`` polls.
    """
    result_url = f"{FASTAPI_URL}/api/v1/tasks/{task_id}/result"

    for _ in range(_MAX_POLL_ATTEMPTS):
        time.sleep(_POLL_INTERVAL_SECONDS)
        try:
            poll_res = requests.get(result_url, timeout=_REQUEST_TIMEOUT)
        except requests.RequestException as e:
            print(f"Polling request failed: {e}")
            return _NO_RESULT

        if poll_res.status_code == 409:
            # 409 means task_not_ready: keep polling.
            print(f"Task {task_id} is running... waiting {_POLL_INTERVAL_SECONDS}s")
            continue

        if poll_res.status_code != 200:
            # Any other status, e.g. 500 parse error or 404 task not found.
            print(f"Task {task_id} failed or HTTP error: {poll_res.status_code} - {poll_res.text}")
            return _NO_RESULT

        try:
            poll_data = poll_res.json()
        except ValueError as e:  # body was not valid JSON
            print(f"Invalid JSON in result response for task {task_id}: {e}")
            return _NO_RESULT

        if not isinstance(poll_data, dict) or poll_data.get("code") != 200:
            print(f"Unexpected response code: {poll_data}")
            return _NO_RESULT

        result = poll_data.get("data")
        if not isinstance(result, dict):
            result = {}
        task_status = result.get("status")

        if task_status == "succeeded":
            if "raw" not in result:
                print(f"Task {task_id} succeeded but returned no raw data: {poll_data}")
                return _NO_RESULT
            print(f"Task {task_id} succeeded!")
            return result["raw"]

        # NOTE(review): a non-succeeded task is replaced by mock data that is
        # then written to the real raw_note table. Kept from the original as
        # an integration-test aid -- confirm this is wanted outside of tests.
        print(f"Task {task_id} failed with status {task_status}, using mock data for integration test.")
        return _mock_note(task_id, word)

    print(f"Task {task_id} still not ready after {_MAX_POLL_ATTEMPTS} polls, giving up.")
    return _NO_RESULT


def _note_to_row(note):
    """Map one note dict to a raw_note row; fields that are absent stay empty."""
    content_str = json.dumps(note, ensure_ascii=False)

    author = ""
    user_info = note.get("user", {})
    if isinstance(user_info, dict):
        author = user_info.get("nickname", user_info.get("name", ""))

    note_id = note.get("note_id", note.get("id", ""))
    url = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else ""

    # The trailing 0 is the 'processed' flag: not yet handled by note_cleaner.
    return ("xiaohongshu", content_str, author, url, 0)


def _store_result(conn, raw_dir, task_id, raw_data):
    """Write *raw_data* to ``<raw_dir>/<task_id>.json`` and insert its notes."""
    raw_file_path = os.path.join(raw_dir, f"{task_id}.json")
    with open(raw_file_path, "w", encoding="utf-8") as f:
        json.dump(raw_data, f, ensure_ascii=False, indent=2)

    # The payload may be a single note or a list of notes; non-dict items are skipped.
    notes = raw_data if isinstance(raw_data, list) else [raw_data]
    rows = [_note_to_row(note) for note in notes if isinstance(note, dict)]

    # 'with conn' commits on success and rolls back if an insert fails, so a
    # task's notes are stored all-or-nothing.
    with conn:
        conn.executemany(
            """
            INSERT INTO raw_note (source_platform, content, author, url, processed)
            VALUES (?, ?, ?, ?, ?)
            """,
            rows,
        )
    print(f"Data saved to {raw_file_path} and inserted into raw_note table.")


def main():
    """Run one search task per keyword and store the results.

    For every row of the ``keyword`` table this submits a search task to the
    service at ``FASTAPI_URL``, polls its result, dumps the raw payload to
    ``orchestrator/data/raw/<task_id>.json`` and inserts each note into the
    ``raw_note`` table of the database at ``DB_PATH``. A keyword whose task
    cannot be created or polled is reported and skipped. The database
    connection is closed on every exit path, including errors.
    """
    # Make sure the directory for raw dumps exists.
    raw_dir = os.path.join("orchestrator", "data", "raw")
    os.makedirs(raw_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        _ensure_processed_column(conn)

        # Read the keywords to crawl.
        try:
            keywords = conn.execute("SELECT id, word FROM keyword").fetchall()
        except sqlite3.OperationalError as e:
            print(f"Failed to read keywords: {e}")
            return

        if not keywords:
            print("No keywords found in 'keyword' table.")

        for _kw_id, word in keywords:
            print(f"[*] Processing keyword: {word}")

            # 1. Submit the search task.
            task_id = _submit_search_task(word)
            if task_id is None:
                continue
            print(f"Task created: {task_id}, polling for results...")

            # 2. Poll for the result, then persist it.
            raw_data = _poll_task_result(task_id, word)
            if raw_data is _NO_RESULT:
                continue
            _store_result(conn, raw_dir, task_id, raw_data)
    finally:
        conn.close()
# Script entry point: run the crawl only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|