"""Clean unprocessed ``raw_note`` rows and store the result in ``cleaned_note``."""

import json
import sqlite3
from contextlib import closing

from orchestrator.config import DB_PATH

# (output key, key inside ``interact_info``) pairs, in output order.
# The output key is also the top-level key used as a fallback when the
# nested ``interact_info`` value is absent.
_INTERACTION_FIELDS = (
    ("likes", "liked_count"),
    ("collects", "collected_count"),
    ("comments", "comment_count"),
    ("shares", "share_count"),
)


def _ensure_processed_column(conn):
    """Add the ``processed`` flag column to ``raw_note`` if it is missing.

    The column list is inspected first instead of running ``ALTER TABLE``
    and swallowing ``sqlite3.OperationalError``: that exception also covers
    unrelated failures (missing table, locked or read-only database) which
    should not be hidden.

    Args:
        conn: Open SQLite connection.

    Raises:
        sqlite3.OperationalError: If the column has to be added and the
            ``ALTER TABLE`` statement fails.
    """
    # Each PRAGMA table_info row is (cid, name, type, notnull, dflt_value, pk).
    existing = {row[1] for row in conn.execute("PRAGMA table_info(raw_note)")}
    if "processed" not in existing:
        conn.execute("ALTER TABLE raw_note ADD COLUMN processed INTEGER DEFAULT 0")
        conn.commit()


def _clean_note(content_str):
    """Turn one raw note payload into the cleaned JSON string.

    Args:
        content_str: Value of ``raw_note.content``; expected to hold JSON
            text but may be ``None`` or malformed.

    Returns:
        A JSON string (non-ASCII characters kept as-is). When the payload
        decodes to a dict, or cannot be decoded at all, the result has the
        keys ``title``, ``content``, ``likes``, ``collects``, ``comments``,
        ``shares``, ``author`` and ``author_id``, with ``""`` / ``0``
        defaults for anything missing. When the payload decodes to a
        non-dict JSON value, the result is ``{"raw_fallback": str(value)}``.
    """
    try:
        note_data = json.loads(content_str)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised for undecodable bytes; TypeError covers
        # None (a NULL column) and other non-text values.
        note_data = {}

    if not isinstance(note_data, dict):
        # Valid JSON that is not an object: keep its string form as a fallback.
        return json.dumps({"raw_fallback": str(note_data)}, ensure_ascii=False)

    # Title and body text; "desc" takes precedence over "content".
    cleaned = {
        "title": note_data.get("title", ""),
        "content": note_data.get("desc", note_data.get("content", "")),
    }

    # Interaction counters: prefer the nested "interact_info" value, then the
    # top-level key, then 0. A non-dict "interact_info" is treated as empty.
    interact_info = note_data.get("interact_info", {})
    if not isinstance(interact_info, dict):
        interact_info = {}
    for output_key, nested_key in _INTERACTION_FIELDS:
        cleaned[output_key] = interact_info.get(
            nested_key, note_data.get(output_key, 0)
        )

    # Author details. A non-dict "user" is treated as empty so that the
    # author keys are always present in the output.
    user_info = note_data.get("user", {})
    if not isinstance(user_info, dict):
        user_info = {}
    cleaned["author"] = user_info.get("nickname", user_info.get("name", ""))
    cleaned["author_id"] = user_info.get("user_id", "")

    return json.dumps(cleaned, ensure_ascii=False)


def main():
    """Clean every unprocessed ``raw_note`` row into ``cleaned_note``.

    Opens the database at ``DB_PATH``, makes sure ``raw_note`` has a
    ``processed`` column, then for each row with ``processed = 0`` inserts
    the cleaned JSON into ``cleaned_note`` and sets ``processed = 1``.
    All inserts and updates are committed together; if any statement fails
    the batch is rolled back. The connection is closed in every case.

    Raises:
        sqlite3.Error: If a database statement fails.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        _ensure_processed_column(conn)

        unprocessed_notes = conn.execute(
            "SELECT id, content FROM raw_note WHERE processed = 0"
        ).fetchall()

        if not unprocessed_notes:
            print("No unprocessed notes found.")
            return

        # The connection context manager commits on success and rolls back
        # on an exception; it does not close the connection (closing() does).
        with conn:
            for note_id, content_str in unprocessed_notes:
                conn.execute(
                    "INSERT INTO cleaned_note (raw_note_id, cleaned_content) "
                    "VALUES (?, ?)",
                    (note_id, _clean_note(content_str)),
                )
                # Mark the source row as handled.
                conn.execute(
                    "UPDATE raw_note SET processed = 1 WHERE id = ?",
                    (note_id,),
                )

        print(f"Successfully processed and cleaned {len(unprocessed_notes)} notes.")


if __name__ == "__main__":
    main()