import json
import os
from datetime import datetime


def filter_by_duration(input_file, output_file, min_duration=30, max_duration=90):
    """
    Filter a JSON file, keeping only entries whose ``total_duration`` lies in
    the inclusive range [min_duration, max_duration], and write the removed
    entries to a timestamped log file.

    :param input_file: path to the input JSON file (a dict of entries)
    :param output_file: path where the filtered JSON is written
    :param min_duration: minimum duration in seconds (inclusive)
    :param max_duration: maximum duration in seconds (inclusive)
    :return: stats dict with keys 'kept', 'removed', 'removed_entries',
             'log_file' (path of the per-run removal log)
    """
    # Per-run removal logs live in a "filter_logs" dir next to the output file.
    log_dir = os.path.join(os.path.dirname(output_file), "filter_logs")
    os.makedirs(log_dir, exist_ok=True)  # exist_ok avoids a check-then-create race

    # One log file per run, named by timestamp.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = os.path.join(log_dir, f"removed_entries_{timestamp}.log")

    # Load the original JSON file.
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    filtered_data = {}
    removed_entries = []

    # Keep entries with a numeric duration inside the range; everything else
    # (too short, too long, missing, or non-numeric) is removed and logged.
    # The isinstance guard prevents a TypeError when total_duration is not a
    # number, which the reason classification below already anticipates.
    for key, value in data.items():
        duration = value.get('total_duration')
        if isinstance(duration, (int, float)) and min_duration <= duration <= max_duration:
            filtered_data[key] = value
        else:
            logged_duration = value.get('total_duration', 'N/A')
            if isinstance(logged_duration, (int, float)) and logged_duration < min_duration:
                reason = 'too_short'
            elif isinstance(logged_duration, (int, float)) and logged_duration > max_duration:
                reason = 'too_long'
            else:
                reason = 'missing_or_invalid'
            removed_entries.append({
                'key': key,
                'duration': logged_duration,
                'original_dialog_id': value.get('original_dialog_id', 'N/A'),
                'reason': reason,
            })

    # Save the filtered result.
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(filtered_data, f, indent=2, ensure_ascii=False)

    # Save the removal log (format kept stable: one "Key: " line per entry).
    with open(log_file, 'w', encoding='utf-8') as f:
        f.write(f"Filtering log - {timestamp}\n")
        f.write(f"Input file: {input_file}\n")
        f.write(f"Output file: {output_file}\n")
        f.write(f"Duration range: {min_duration}s to {max_duration}s\n\n")
        f.write("Removed Entries:\n")
        f.write("=" * 50 + "\n")
        for entry in removed_entries:
            f.write(f"Key: {entry['key']}\n")
            f.write(f"Original Dialog ID: {entry['original_dialog_id']}\n")
            f.write(f"Duration: {entry['duration']}s\n")
            f.write(f"Reason: {entry['reason']}\n")
            f.write("-" * 50 + "\n")

    print(f"\n处理结果: {os.path.basename(input_file)}")
    print(f"原始条目数: {len(data)}")
    print(f"过滤后条目数: {len(filtered_data)}")
    print(f"已删除 {len(removed_entries)} 个不符合时长要求的条目")
    print(f"过滤后的数据已保存到: {output_file}")
    print(f"删除条目日志已保存到: {log_file}")

    # Returning stats lets callers (e.g. process_directory) aggregate without
    # re-parsing the log file. Backward-compatible: the old return was None.
    return {
        'kept': len(filtered_data),
        'removed': len(removed_entries),
        'removed_entries': removed_entries,
        'log_file': log_file,
    }


def process_directory(input_dir, output_dir, min_duration=30, max_duration=90):
    """
    Filter every ``.json`` file in *input_dir* into *output_dir* and write a
    summary log of how many entries were removed per file.

    :param input_dir: directory containing input JSON files
    :param output_dir: directory for filtered outputs and the summary log
    :param min_duration: minimum duration in seconds (inclusive)
    :param max_duration: maximum duration in seconds (inclusive)
    """
    os.makedirs(output_dir, exist_ok=True)

    # One summary log for the whole run, named by timestamp.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_log = os.path.join(output_dir, f"summary_removed_entries_{timestamp}.log")

    total_removed = 0
    total_processed = 0

    with open(summary_log, 'w', encoding='utf-8') as summary_f:
        summary_f.write(f"Summary Filtering Log - {timestamp}\n")
        summary_f.write(f"Input directory: {input_dir}\n")
        summary_f.write(f"Output directory: {output_dir}\n")
        summary_f.write(f"Duration range: {min_duration}s to {max_duration}s\n\n")

        # sorted() makes the processing order (and the summary log) deterministic.
        for filename in sorted(os.listdir(input_dir)):
            if not filename.endswith('.json'):
                continue
            input_path = os.path.join(input_dir, filename)
            output_path = os.path.join(output_dir, filename)

            print(f"\n处理文件: {filename}")
            # Use the returned stats directly instead of re-reading the newest
            # log file from disk (the old approach was race-prone and fragile).
            stats = filter_by_duration(input_path, output_path,
                                       min_duration, max_duration)
            removed_count = stats['removed']

            summary_f.write(f"\nFile: {filename}\n")
            summary_f.write(f"Removed entries: {removed_count}\n")
            summary_f.write("-" * 40 + "\n")

            total_removed += removed_count
            total_processed += 1

        summary_f.write(f"\nTotal files processed: {total_processed}\n")
        summary_f.write(f"Total entries removed: {total_removed}\n")

    print(f"\n处理完成!所有文件的总日志已保存到: {summary_log}")


if __name__ == "__main__":
    # Example usage - process a single file
    input_json = "silence.json"  # replace with your input file path
    output_json = "silence_filtered_output.json"  # output file path
    filter_by_duration(input_json, output_json)

    # Example usage - process a whole directory
    # input_directory = "./input_4JOB_overlap"   # replace with your input directory
    # output_directory = "./filtered_output"     # replace with your output directory
    # process_directory(input_directory, output_directory)