import json
import os
import sqlite3
import time

import requests

from orchestrator.config import DB_PATH, FASTAPI_URL

# Upper bound (seconds) for any single HTTP call so a stalled server cannot
# hang the whole run.
_REQUEST_TIMEOUT_SECONDS = 30
# Delay between two polls of the task result endpoint.
_POLL_INTERVAL_SECONDS = 2
# Give up on a task after this many polls (~10 minutes) instead of looping
# forever when the task never leaves the "not ready" state.
_MAX_POLL_ATTEMPTS = 300


def _ensure_processed_column(conn):
    """Add the ``processed`` flag column to ``raw_note`` if it is missing.

    The column marks whether a row has already been handled by note_cleaner.
    A "duplicate column" error means the column already exists and is
    ignored silently; any other operational error is reported but does not
    abort the run.
    """
    try:
        conn.execute("ALTER TABLE raw_note ADD COLUMN processed INTEGER DEFAULT 0;")
        conn.commit()
    except sqlite3.OperationalError as e:
        if "duplicate column name" not in str(e).lower():
            print(f"Could not add 'processed' column to raw_note: {e}")


def _create_search_task(word):
    """Submit a search task for *word* and return its task id.

    Returns ``None`` (after printing the reason) when the request fails, the
    server answers with a non-200 status, or the response body does not have
    the expected shape.
    """
    payload = {
        "task_type": "search",
        "payload": {"query": word},
    }
    try:
        res = requests.post(
            f"{FASTAPI_URL}/api/v1/tasks",
            json=payload,
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        return None

    if res.status_code != 200:
        print(f"Failed to create task for '{word}': HTTP {res.status_code} - {res.text}")
        return None

    try:
        data = res.json()
    except ValueError as e:
        # Every JSON decode error raised by requests derives from ValueError.
        print(f"Invalid JSON in task creation response for '{word}': {e}")
        return None

    if not isinstance(data, dict) or data.get("code") != 200:
        print(f"Error creating task: {data}")
        return None

    try:
        task_id = data["data"]["task"]["id"]
    except (KeyError, TypeError):
        task_id = None
    if task_id is None:
        print(f"Task id missing in response: {data}")
    return task_id


def _build_mock_note(task_id, word):
    """Build the placeholder note used when a task did not succeed.

    NOTE(review): this stores synthetic data in ``raw_note``; it looks like
    an integration-test aid -- confirm it is wanted outside of tests.
    """
    return {
        # str() so a non-string task id cannot break the slice.
        "id": f"mock_note_{str(task_id)[:8]}",
        "title": f"Mock Title for {word}",
        "desc": f"This is a mock description for {word}. The original task failed due to missing credentials.",
        "user": {"nickname": "MockAuthor", "user_id": "mock_author_123"},
        "interact_info": {
            "liked_count": 100,
            "collected_count": 50,
            "comment_count": 20,
            "share_count": 5,
        },
    }


def _poll_task_result(task_id, word):
    """Poll the result endpoint of *task_id* until it yields a result.

    Returns a ``(found, raw_data)`` tuple. ``found`` is ``False`` when
    polling was abandoned (request error, unexpected HTTP status or payload,
    or the poll budget ran out); the reason is printed. When the task
    finished with a status other than ``"succeeded"``, mock data built from
    *word* is returned in place of the real result.
    """
    for _ in range(_MAX_POLL_ATTEMPTS):
        time.sleep(_POLL_INTERVAL_SECONDS)
        try:
            poll_res = requests.get(
                f"{FASTAPI_URL}/api/v1/tasks/{task_id}/result",
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as e:
            print(f"Polling request failed: {e}")
            return False, None

        if poll_res.status_code == 409:
            # HTTP 409 means task_not_ready: keep polling.
            print(f"Task {task_id} is running... waiting 2s")
            continue

        if poll_res.status_code != 200:
            # Any other status, e.g. 500 parse error or 404 task not found.
            print(f"Task {task_id} failed or HTTP error: {poll_res.status_code} - {poll_res.text}")
            return False, None

        try:
            poll_data = poll_res.json()
        except ValueError as e:
            print(f"Invalid JSON in result response for task {task_id}: {e}")
            return False, None

        if not isinstance(poll_data, dict) or poll_data.get("code") != 200:
            print(f"Unexpected response code: {poll_data}")
            return False, None

        task_data = poll_data.get("data")
        if not isinstance(task_data, dict):
            # Covers both a missing and a null "data" field.
            task_data = {}
        task_status = task_data.get("status")

        if task_status == "succeeded":
            if "raw" not in task_data:
                print(f"Task {task_id} succeeded but returned no 'raw' field: {poll_data}")
                return False, None
            print(f"Task {task_id} succeeded!")
            return True, task_data["raw"]

        print(f"Task {task_id} failed with status {task_status}, using mock data for integration test.")
        return True, _build_mock_note(task_id, word)

    print(f"Task {task_id} did not finish after {_MAX_POLL_ATTEMPTS} polls, giving up.")
    return False, None


def _note_to_row(note):
    """Map one raw note dict to a ``raw_note`` row tuple.

    Fields that cannot be extracted are left as empty strings. The full note
    is stored as JSON in the ``content`` column.
    """
    content_str = json.dumps(note, ensure_ascii=False)
    source_platform = "xiaohongshu"

    # Author: prefer "nickname", fall back to "name".
    author = ""
    user_info = note.get("user", {})
    if isinstance(user_info, dict):
        author = user_info.get("nickname", user_info.get("name", ""))

    # URL: built from "note_id", falling back to "id".
    note_id = note.get("note_id", note.get("id", ""))
    url = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else ""

    return (source_platform, content_str, author, url, 0)


def _store_raw_data(conn, raw_dir, task_id, raw_data):
    """Write *raw_data* to ``<raw_dir>/<task_id>.json`` and insert it into ``raw_note``.

    *raw_data* may be a single note or a list of notes; entries that are not
    dicts are skipped. All rows of one task are committed together.
    """
    raw_file_path = os.path.join(raw_dir, f"{task_id}.json")
    with open(raw_file_path, "w", encoding="utf-8") as f:
        json.dump(raw_data, f, ensure_ascii=False, indent=2)

    notes = raw_data if isinstance(raw_data, list) else [raw_data]
    rows = [_note_to_row(note) for note in notes if isinstance(note, dict)]
    conn.executemany(
        """
        INSERT INTO raw_note (source_platform, content, author, url, processed)
        VALUES (?, ?, ?, ?, ?)
        """,
        rows,
    )
    conn.commit()
    print(f"Data saved to {raw_file_path} and inserted into raw_note table.")


def main():
    """Run one search task per keyword and store the raw results.

    For every row of the ``keyword`` table, a search task is submitted to the
    service at ``FASTAPI_URL`` and polled until it finishes. The raw result
    is written to ``orchestrator/data/raw/<task_id>.json`` and inserted into
    the ``raw_note`` table of the database at ``DB_PATH``. A keyword whose
    task cannot be created or polled is skipped with a printed message.
    """
    # Make sure the directory for the raw JSON dumps exists.
    raw_dir = os.path.join("orchestrator", "data", "raw")
    os.makedirs(raw_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        _ensure_processed_column(conn)

        # Read the keywords to crawl.
        try:
            keywords = conn.execute("SELECT id, word FROM keyword").fetchall()
        except sqlite3.OperationalError as e:
            print(f"Failed to read keywords: {e}")
            return

        if not keywords:
            print("No keywords found in 'keyword' table.")
            return

        for _kw_id, word in keywords:
            print(f"[*] Processing keyword: {word}")

            # 1. Submit the search task.
            task_id = _create_search_task(word)
            if task_id is None:
                continue
            print(f"Task created: {task_id}, polling for results...")

            # 2. Poll for the result and persist it.
            found, raw_data = _poll_task_result(task_id, word)
            if found:
                _store_raw_data(conn, raw_dir, task_id, raw_data)
    finally:
        # Always release the connection, even when an unexpected error escapes.
        conn.close()


if __name__ == "__main__":
    main()