import json
import os
import sqlite3
import time
from contextlib import closing

import requests

from orchestrator.config import DB_PATH, FASTAPI_URL


# Seconds to sleep before each poll of a task's result endpoint.
_POLL_INTERVAL_SECONDS = 2
# Upper bound on polls per task (300 * 2s = 10 minutes) so that a task which
# keeps answering HTTP 409 cannot block the remaining keywords forever.
_MAX_POLL_ATTEMPTS = 300
# (connect, read) timeout in seconds applied to every HTTP call; without one,
# requests waits indefinitely on an unresponsive server.
_HTTP_TIMEOUT = (5, 30)
# Value written to raw_note.source_platform for every stored note.
_SOURCE_PLATFORM = "xiaohongshu"


def _ensure_processed_column(conn):
    """Add the ``processed`` column to ``raw_note`` unless it already exists."""
    try:
        conn.execute("ALTER TABLE raw_note ADD COLUMN processed INTEGER DEFAULT 0;")
        conn.commit()
    except sqlite3.OperationalError as exc:
        # A "duplicate column name" error just means an earlier run already
        # added the column. Anything else (e.g. a missing table) is reported
        # instead of being silently swallowed, but stays non-fatal.
        if "duplicate column" not in str(exc).lower():
            print(f"Could not add 'processed' column to raw_note: {exc}")


def _create_search_task(word):
    """Create a search task for *word*.

    Returns the task id from the response, or ``None`` after printing the
    reason when the task could not be created.
    """
    payload = {
        "task_type": "search",
        "payload": {"query": word},
    }
    try:
        res = requests.post(
            f"{FASTAPI_URL}/api/v1/tasks", json=payload, timeout=_HTTP_TIMEOUT
        )
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        return None

    if res.status_code != 200:
        print(f"Failed to create task for '{word}': HTTP {res.status_code} - {res.text}")
        return None

    try:
        data = res.json()
    except ValueError as e:
        # A non-JSON body must skip this keyword, not abort the whole run.
        print(f"Invalid JSON in task creation response for '{word}': {e}")
        return None

    if not isinstance(data, dict) or data.get("code") != 200:
        print(f"Error creating task: {data}")
        return None

    try:
        task_id = data["data"]["task"]["id"]
    except (KeyError, TypeError):
        task_id = None
    if task_id is None:
        print(f"Task creation response contains no task id: {data}")
    return task_id


def _mock_note(task_id, word):
    """Build the placeholder note that is stored when a task did not succeed."""
    return {
        # str() keeps the slice valid even if the API returns a numeric id.
        "id": f"mock_note_{str(task_id)[:8]}",
        "title": f"Mock Title for {word}",
        "desc": f"This is a mock description for {word}. The original task failed due to missing credentials.",
        "user": {"nickname": "MockAuthor", "user_id": "mock_author_123"},
        "interact_info": {"liked_count": 100, "collected_count": 50, "comment_count": 20, "share_count": 5},
    }


def _poll_task_result(task_id, word):
    """Poll the result endpoint of *task_id* until it stops answering HTTP 409.

    Returns ``(True, raw_data)`` when there is something to store: the task's
    ``raw`` payload on success, or a mock note when the reported status is
    anything other than ``"succeeded"``. Returns ``(False, None)`` when polling
    was abandoned (request error, unexpected response, or poll limit reached).
    """
    url = f"{FASTAPI_URL}/api/v1/tasks/{task_id}/result"
    for _ in range(_MAX_POLL_ATTEMPTS):
        time.sleep(_POLL_INTERVAL_SECONDS)
        try:
            poll_res = requests.get(url, timeout=_HTTP_TIMEOUT)
        except requests.RequestException as e:
            print(f"Polling request failed: {e}")
            return False, None

        if poll_res.status_code == 409:
            # The code treats 409 as "still running": keep polling.
            print(f"Task {task_id} is running... waiting {_POLL_INTERVAL_SECONDS}s")
            continue

        if poll_res.status_code != 200:
            print(f"Task {task_id} failed or HTTP error: {poll_res.status_code} - {poll_res.text}")
            return False, None

        try:
            poll_data = poll_res.json()
        except ValueError as e:
            print(f"Invalid JSON in result response for task {task_id}: {e}")
            return False, None

        if not isinstance(poll_data, dict) or poll_data.get("code") != 200:
            print(f"Unexpected response code: {poll_data}")
            return False, None

        task_data = poll_data.get("data")
        if not isinstance(task_data, dict):
            # A null or malformed "data" is handled like a missing status.
            task_data = {}

        task_status = task_data.get("status")
        if task_status == "succeeded":
            if "raw" not in task_data:
                print(f"Task {task_id} succeeded but returned no 'raw' data: {poll_data}")
                return False, None
            print(f"Task {task_id} succeeded!")
            return True, task_data["raw"]

        print(f"Task {task_id} failed with status {task_status}, using mock data for integration test.")
        return True, _mock_note(task_id, word)

    print(f"Task {task_id} still running after {_MAX_POLL_ATTEMPTS} polls, giving up.")
    return False, None


def _note_to_row(note):
    """Map one note dict to a (source_platform, content, author, url, processed) row."""
    author = ""
    user_info = note.get("user", {})
    if isinstance(user_info, dict):
        author = user_info.get("nickname", user_info.get("name", ""))

    note_id = note.get("note_id", note.get("id", ""))
    url = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else ""

    # The full note is kept as JSON text; new rows always start unprocessed (0).
    return (_SOURCE_PLATFORM, json.dumps(note, ensure_ascii=False), author, url, 0)


def _store_raw_data(conn, raw_dir, task_id, raw_data):
    """Write *raw_data* to ``<raw_dir>/<task_id>.json`` and insert its notes into ``raw_note``."""
    raw_file_path = os.path.join(raw_dir, f"{task_id}.json")
    with open(raw_file_path, "w", encoding="utf-8") as f:
        json.dump(raw_data, f, ensure_ascii=False, indent=2)

    # The payload may be a single note or a list of notes; non-dict entries are skipped.
    notes = raw_data if isinstance(raw_data, list) else [raw_data]
    rows = [_note_to_row(note) for note in notes if isinstance(note, dict)]

    try:
        conn.executemany(
            """
            INSERT INTO raw_note (source_platform, content, author, url, processed)
            VALUES (?, ?, ?, ?, ?)
            """,
            rows,
        )
        conn.commit()
    except sqlite3.Error as exc:
        # Drop the partial batch and keep going with the next keyword.
        conn.rollback()
        print(f"Failed to insert notes for task {task_id} into raw_note: {exc}")
        return

    print(f"Data saved to {raw_file_path} and inserted into raw_note table.")


def main():
    """Run one search task per stored keyword and persist the returned notes.

    For every row of the ``keyword`` table a search task is created through the
    HTTP API at ``FASTAPI_URL``, its result is polled, the raw payload is
    written to ``orchestrator/data/raw/<task_id>.json`` and each note is
    inserted into ``raw_note`` with ``processed = 0``. A failure for one
    keyword is printed and the loop moves on to the next keyword.
    """
    raw_dir = os.path.join("orchestrator", "data", "raw")
    os.makedirs(raw_dir, exist_ok=True)

    # closing() guarantees the connection is released on every exit path,
    # including exceptions raised while a keyword is being processed.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        _ensure_processed_column(conn)

        try:
            keywords = conn.execute("SELECT id, word FROM keyword").fetchall()
        except sqlite3.OperationalError as e:
            print(f"Failed to read keywords: {e}")
            return

        if not keywords:
            print("No keywords found in 'keyword' table.")
            return

        for _kw_id, word in keywords:
            print(f"[*] Processing keyword: {word}")

            task_id = _create_search_task(word)
            if task_id is None:
                continue
            print(f"Task created: {task_id}, polling for results...")

            has_data, raw_data = _poll_task_result(task_id, word)
            if has_data:
                _store_raw_data(conn, raw_dir, task_id, raw_data)


# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()