import json
import os
import sys
import copy
from config_loader import get_paths
from datetime import datetime
from export_todolist import export_todolist_to_json
from receiving_useful_messages import main
# --- 配置与辅助函数 ---
# Force UTF-8 on stdout so the Chinese status messages printed by this script
# are encoded consistently regardless of the console's default encoding.
# The call is skipped when stdout is None or has been replaced by an object
# that does not provide reconfigure(), instead of failing at import time.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding='utf-8')
def convert_datetime(obj):
    """Custom ``default`` hook for ``json.dump`` / ``json.dumps``.

    Returns the ISO-8601 string of a ``datetime`` instance. Any other type is
    rejected with ``TypeError`` so the JSON encoder reports it as
    unserializable.
    """
    if not isinstance(obj, datetime):
        raise TypeError(f"Type {type(obj)} not serializable")
    return obj.isoformat()
def load_json_data(file_path):
    """Load a JSON file and return its content only if the top level is a list.

    Every failure is reported on stdout and signalled by returning ``None``:
    the path does not exist, the file is not valid JSON, the top-level value
    is not a list, or any other exception is raised while reading.
    """
    if not os.path.exists(file_path):
        print(f"[错误] 文件未找到: {file_path}")
        return None

    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
        if not isinstance(payload, list):
            print(f"[错误] 文件格式不正确,预期为列表: {file_path}")
            return None
        return payload
    except json.JSONDecodeError:
        print(f"[错误] JSON解码失败: {file_path}")
    except Exception as exc:
        # Deliberate catch-all: loading is best-effort and callers only test for None.
        print(f"[错误] 加载文件时发生未知错误 {file_path}: {str(exc)}")
    return None
def generate_unique_id(base_id, existing_ids_set):
    """Derive an ID from ``base_id`` that is not in ``existing_ids_set``.

    Tries ``<base_id>_upd`` first, then ``<base_id>_upd_1``,
    ``<base_id>_upd_2``, ... and returns the first candidate that is not
    already taken. The set itself is not modified.
    """
    stem = str(base_id) + "_upd"
    if stem not in existing_ids_set:
        return stem

    suffix = 1
    while f"{stem}_{suffix}" in existing_ids_set:
        suffix += 1
    return f"{stem}_{suffix}"
# --- 核心处理逻辑函数 ---
def process_record(item_r, existing_message_ids, existing_todo_contents, existing_message_id_to_record, all_known_message_ids, stats):
    """Decide what to do with a single record taken from result1.json.

    Rules, keyed on the record's ``message_id`` and ``todo_content`` (both
    compared as strings):

    * new message_id, new todo_content      -> keep the record (returned as-is)
    * new message_id, known todo_content    -> print the record, keep nothing
    * known message_id, different content   -> keep a deep copy of the record
    * known message_id, identical content   -> print the record, keep nothing

    Args:
        item_r: The incoming record; must support ``item_r['message_id']``
            and ``item_r['todo_content']``.
        existing_message_ids: Set of message IDs (str) already present.
        existing_todo_contents: Set of todo contents (str) already present.
        existing_message_id_to_record: Dict mapping message ID (str) to the
            existing record.
        all_known_message_ids: Set of every ID seen so far. Mutated: the ID
            of a kept "new ID / new content" record is added to it.
        stats: Dict of counters. Mutated: the counter matching the outcome
            is incremented.

    Returns:
        The record to append to compare.json, or ``None`` when nothing
        should be saved for this item.
    """
    try:
        r_message_id_str = str(item_r['message_id'])
        r_todo_content_str = str(item_r['todo_content'])
    except KeyError as e:
        print(f"[警告] result1.json 中的记录缺少键 {e},已跳过: {item_r}")
        stats['skipped_missing_keys'] += 1
        return None
    except TypeError as e:
        # item_r is not subscriptable by string keys (e.g. None or a list).
        print(f"[警告] result1.json 中的记录键值类型错误 {e},已跳过: {item_r}")
        stats['skipped_missing_keys'] += 1
        return None

    # --- Case 1: message_id not seen in the existing data ---
    if r_message_id_str not in existing_message_ids:
        if r_todo_content_str not in existing_todo_contents:
            # 1.1: brand-new content -> save and remember the new ID.
            all_known_message_ids.add(r_message_id_str)
            stats['saved_new_id_new_content'] += 1
            return item_r

        # 1.2: content already exists under another ID -> only report it.
        print("-" * 30)
        print(f"打印 (新 message_id: {r_message_id_str}, 但 todo_content 已存在):")
        print(json.dumps(item_r, indent=2, ensure_ascii=False, default=convert_datetime))
        print("-" * 30)
        stats['printed_new_id_existing_content'] += 1
        return None

    # --- Case 2: message_id already exists ---
    record_e = existing_message_id_to_record.get(r_message_id_str)
    if record_e is None:
        # Only reachable when the caller's set and dict are out of sync.
        print(f"[警告] ID {r_message_id_str} 在集合中但在字典中找不到?跳过。")
        # .get() keeps this working for stats dicts created without this key.
        stats['skipped_internal_error'] = stats.get('skipped_internal_error', 0) + 1
        return None

    e_todo_content_str = str(record_e.get('todo_content', ''))
    if r_todo_content_str != e_todo_content_str:
        # 2.1: same ID, changed content -> save an independent copy.
        # The copy keeps its original message_id: no replacement ID is
        # generated and all_known_message_ids is left untouched here.
        stats['saved_modified_id_diff_content'] += 1
        return copy.deepcopy(item_r)

    # 2.2: same ID, same content -> only report it.
    print("-" * 30)
    print(f"打印 (message_id: {r_message_id_str} 已存在, todo_content 相同):")
    print("来自 result1.json:")
    print(json.dumps(item_r, indent=2, ensure_ascii=False, default=convert_datetime))
    print("-" * 30)
    stats['printed_existing_id_same_content'] += 1
    return None
# --- 主函数 ---
def compare_and_generate_updates():
    """Run the whole pipeline: export, collect, compare, write, report.

    Steps, in order:
      1. Resolve the data directory from ``get_paths()``; compare.json is
         written there.
      2. Call ``export_todolist_to_json()`` and ``main()``; each result is
         used as the path of a JSON file that must exist.
      3. Load both files with ``load_json_data`` and build lookup structures
         from the exported todo list.
      4. Pass every result1.json record through ``process_record``.
      5. Write the kept records to compare.json and print the statistics.

    Returns:
        The list of records selected for compare.json (returned even when
        writing the file failed), or ``None`` when the run is aborted before
        the comparison starts.
    """
    output_path = os.path.join(get_paths()['data_dir'], "compare.json")

    # Step 1: export the current todo list to JSON.
    extracted_list_path = export_todolist_to_json()
    if not (extracted_list_path and os.path.exists(extracted_list_path)):
        print("[错误] 导出 todolist 数据失败,流程终止。")
        return None

    # Step 2: run the message-processing step that produces result1.json.
    result1_path = main()
    if not (result1_path and os.path.exists(result1_path)):
        print("[错误] 生成 result1.json 失败,流程终止。")
        return None

    # Step 3: load both inputs before checking, so each failure gets reported.
    incoming_records = load_json_data(result1_path)
    exported_records = load_json_data(extracted_list_path)
    if incoming_records is None or exported_records is None:
        print("[错误] 数据加载失败,流程终止。")
        return None

    # Lookup structures over the existing (exported) records.
    try:
        id_to_record = {
            str(entry['message_id']): entry
            for entry in exported_records
            if 'message_id' in entry
        }
        known_contents = {
            str(entry['todo_content'])
            for entry in exported_records
            if 'todo_content' in entry
        }
    except (KeyError, TypeError) as exc:
        print(f"[错误] extracted_list.json 文件处理失败: {exc}。请检查文件内容和格式。")
        return None
    existing_ids = set(id_to_record)

    # A separate copy: process_record adds newly saved IDs to this set, while
    # existing_ids must keep describing only the exported data.
    all_known_ids = set(existing_ids)
    selected = []
    stats = dict.fromkeys(
        (
            'processed',
            'skipped_missing_keys',
            'saved_new_id_new_content',
            'printed_new_id_existing_content',
            'saved_modified_id_diff_content',
            'printed_existing_id_same_content',
            'skipped_internal_error',
        ),
        0,
    )

    print("[信息] 开始比较和处理数据...")
    for incoming in incoming_records:
        stats['processed'] += 1
        kept = process_record(
            incoming,
            existing_ids,
            known_contents,
            id_to_record,
            all_known_ids,
            stats,
        )
        if kept is not None:
            selected.append(kept)

    # Write the result; a failure is reported but does not abort the report.
    try:
        with open(output_path, 'w', encoding='utf-8') as out_file:
            json.dump(selected, out_file, indent=2, ensure_ascii=False, default=convert_datetime)
        print(f"✅ 成功生成 compare.json 文件,包含 {len(selected)} 条记录。")
    except Exception as exc:
        print(f"[错误] 写入 compare.json 文件时发生错误: {str(exc)}")

    # Statistics report, printed line by line.
    report = [
        "=" * 40,
        "处理统计:",
        f" 处理 result1.json 记录总数: {stats['processed']}",
        f" 跳过 (缺少关键键或类型错误): {stats['skipped_missing_keys']}",
    ]
    if stats['skipped_internal_error'] > 0:
        report.append(f" 跳过 (内部逻辑错误): {stats['skipped_internal_error']}")
    report.extend([
        "-" * 20,
        " 写入 compare.json:",
        f" - 新 message_id, 新 todo_content: {stats['saved_new_id_new_content']}",
        f" - 修改后 message_id (因冲突且 todo_content 不同): {stats['saved_modified_id_diff_content']}",
        "-" * 20,
        " 打印到控制台:",
        f" - 新 message_id, 但 todo_content 已存在: {stats['printed_new_id_existing_content']}",
        f" - message_id 已存在, todo_content 相同: {stats['printed_existing_id_same_content']}",
        "=" * 40,
    ])
    for line in report:
        print(line)

    return selected
if __name__ == "__main__":
    # Script entry point: trigger the whole export -> compare -> write flow.
    compare_and_generate_updates()