File size: 5,736 Bytes
c481f8a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import json
import time
import sqlite3
import requests

from orchestrator.config import DB_PATH, FASTAPI_URL

def _ensure_processed_column(conn):
    """Make sure ``raw_note`` has the ``processed`` flag column.

    The flag (INTEGER DEFAULT 0) marks whether a row has been handled by
    note_cleaner. ``PRAGMA table_info`` is used instead of a blind
    ``ALTER TABLE`` + ``except OperationalError: pass`` so that a missing
    ``raw_note`` table is reported up front rather than silently ignored.

    Returns:
        False if the ``raw_note`` table does not exist, True otherwise.
    """
    columns = {row[1] for row in conn.execute("PRAGMA table_info(raw_note)")}
    if not columns:
        # PRAGMA table_info yields no rows for a table that does not exist.
        return False
    if "processed" not in columns:
        conn.execute("ALTER TABLE raw_note ADD COLUMN processed INTEGER DEFAULT 0;")
        conn.commit()
    return True


def _submit_search_task(word, request_timeout):
    """Create a ``search`` task for *word*; return its id, or None on any failure."""
    payload = {
        "task_type": "search",
        "payload": {"query": word},
    }
    try:
        res = requests.post(
            f"{FASTAPI_URL}/api/v1/tasks", json=payload, timeout=request_timeout
        )
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        return None

    if res.status_code != 200:
        print(f"Failed to create task for '{word}': HTTP {res.status_code} - {res.text}")
        return None

    try:
        data = res.json()
    except ValueError:
        # Non-JSON body (e.g. a proxy error page) must not abort the whole run.
        print(f"Error creating task: non-JSON response {res.text}")
        return None

    if not isinstance(data, dict) or data.get("code") != 200:
        print(f"Error creating task: {data}")
        return None

    try:
        return data["data"]["task"]["id"]
    except (KeyError, TypeError):
        print(f"Error creating task: no task id in {data}")
        return None


def _mock_note(task_id, word):
    """Build the placeholder note used when the task does not succeed."""
    return {
        "id": f"mock_note_{str(task_id)[:8]}",
        "title": f"Mock Title for {word}",
        "desc": f"This is a mock description for {word}. The original task failed due to missing credentials.",
        "user": {"nickname": "MockAuthor", "user_id": "mock_author_123"},
        "interact_info": {"liked_count": 100, "collected_count": 50, "comment_count": 20, "share_count": 5},
    }


def _poll_task_result(task_id, word, poll_interval, max_polls, request_timeout):
    """Poll ``/api/v1/tasks/{task_id}/result`` until the result is available.

    HTTP 409 means the task is not ready yet and triggers another poll.

    Returns:
        ``(True, raw_data)`` when a result should be stored, ``(False, None)``
        when the keyword should be skipped (HTTP/network error, bad payload,
        or ``max_polls`` exhausted).
    """
    url = f"{FASTAPI_URL}/api/v1/tasks/{task_id}/result"
    attempt = 0
    while max_polls is None or attempt < max_polls:
        attempt += 1
        time.sleep(poll_interval)
        try:
            poll_res = requests.get(url, timeout=request_timeout)
        except requests.RequestException as e:
            print(f"Polling request failed: {e}")
            return False, None

        if poll_res.status_code == 409:
            # 409 = task_not_ready: keep polling.
            print(f"Task {task_id} is running... waiting {poll_interval:g}s")
            continue
        if poll_res.status_code != 200:
            # Other error statuses, e.g. 500 parse error or 404 task not found.
            print(f"Task {task_id} failed or HTTP error: {poll_res.status_code} - {poll_res.text}")
            return False, None

        try:
            poll_data = poll_res.json()
        except ValueError:
            print(f"Task {task_id} returned a non-JSON result: {poll_res.text}")
            return False, None

        if not isinstance(poll_data, dict) or poll_data.get("code") != 200:
            print(f"Unexpected response code: {poll_data}")
            return False, None

        # "data" may be null in a malformed payload; treat it as empty.
        result = poll_data.get("data") or {}
        task_status = result.get("status")
        if task_status == "succeeded":
            print(f"Task {task_id} succeeded!")
            raw_data = result.get("raw")
            if raw_data is None:
                print(f"Task {task_id} succeeded but returned no 'raw' payload; skipping.")
                return False, None
            return True, raw_data

        # NOTE(review): this writes fake notes into the real raw_note table
        # whenever a task does not succeed. It is kept for integration testing,
        # but confirm it is wanted outside of test environments.
        print(f"Task {task_id} failed with status {task_status}, using mock data for integration test.")
        return True, _mock_note(task_id, word)

    print(f"Task {task_id} not ready after {max_polls} polls; giving up.")
    return False, None


def _note_to_row(note):
    """Map one note dict to a ``raw_note`` row.

    Returns a ``(source_platform, content, author, url, processed)`` tuple.
    Fields that cannot be found are left as empty strings.
    """
    author = ""
    user_info = note.get("user", {})
    if isinstance(user_info, dict):
        author = user_info.get("nickname", user_info.get("name", ""))

    note_id = note.get("note_id", note.get("id", ""))
    url = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else ""

    return ("xiaohongshu", json.dumps(note, ensure_ascii=False), author, url, 0)


def _store_raw_data(conn, raw_dir, task_id, raw_data):
    """Dump *raw_data* to ``{raw_dir}/{task_id}.json`` and insert its notes.

    *raw_data* may be a single note dict or a list of notes. Non-dict
    entries are skipped.
    """
    raw_file_path = os.path.join(raw_dir, f"{task_id}.json")
    with open(raw_file_path, "w", encoding="utf-8") as f:
        json.dump(raw_data, f, ensure_ascii=False, indent=2)

    notes = raw_data if isinstance(raw_data, list) else [raw_data]
    rows = [_note_to_row(note) for note in notes if isinstance(note, dict)]
    conn.executemany(
        """
        INSERT INTO raw_note (source_platform, content, author, url, processed)
        VALUES (?, ?, ?, ?, ?)
        """,
        rows,
    )
    conn.commit()
    print(f"Data saved to {raw_file_path} and inserted into raw_note table.")


def main(*, poll_interval=2.0, max_polls=150, request_timeout=30.0):
    """Run a search task for every keyword and store the results as raw notes.

    For each ``(id, word)`` row of the ``keyword`` table, this function:

    1. Submits a ``search`` task to ``FASTAPI_URL``.
    2. Polls the task result.
    3. Writes the raw payload to ``orchestrator/data/raw/{task_id}.json``.
    4. Inserts one ``raw_note`` row per note, with ``processed = 0``.

    Args:
        poll_interval: Seconds to sleep before each poll request.
        max_polls: Maximum polls per task before the keyword is skipped.
            Pass ``None`` to poll without limit.
        request_timeout: Per-request timeout (seconds) passed to ``requests``.
            Without it, a stalled connection would block the run forever.
    """
    # Make sure the directory for raw payload dumps exists.
    raw_dir = os.path.join("orchestrator", "data", "raw")
    os.makedirs(raw_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        if not _ensure_processed_column(conn):
            print("Table 'raw_note' does not exist; nothing to insert into.")
            return

        # Read the keywords to crawl.
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT id, word FROM keyword")
            keywords = cursor.fetchall()
        except sqlite3.OperationalError as e:
            print(f"Failed to read keywords: {e}")
            return

        if not keywords:
            print("No keywords found in 'keyword' table.")

        for _kw_id, word in keywords:
            print(f"[*] Processing keyword: {word}")

            task_id = _submit_search_task(word, request_timeout)
            if task_id is None:
                continue
            print(f"Task created: {task_id}, polling for results...")

            ok, raw_data = _poll_task_result(
                task_id, word, poll_interval, max_polls, request_timeout
            )
            if ok:
                _store_raw_data(conn, raw_dir, task_id, raw_data)
    finally:
        # Close the connection even if an unexpected error escapes.
        conn.close()

if __name__ == "__main__":
    # Run the keyword crawl only when executed as a script, not when imported.
    main()