#!/usr/bin/env python3 """ 批量分析JSON文件脚本 从指定目录递归搜索JSON文件,读取其中的文本内容,向后端发送分析请求, 并将结果保存到新文件。 """ import argparse import json import os import sys import time from pathlib import Path from typing import Optional, Tuple try: import requests except ImportError: print("错误: 需要安装 requests 库") print("请运行: pip install requests") sys.exit(1) # 后端API地址 # DEFAULT_API_BASE = "https://dqy08-inforadar.hf.space" DEFAULT_API_BASE = "http://localhost:5001" API_ENDPOINT = "/api/analyze" # Hugging Face Token(用于Private Space,可通过环境变量HF_TOKEN设置) HF_TOKEN_ENV = "HF_TOKEN" # 要搜索的目录列表 SEARCH_DIRS = [ "data/demo/未读" ] def find_json_files(base_dir: str, search_dirs: list) -> list[Path]: """递归搜索指定目录下的所有JSON文件""" json_files = [] base_path = Path(base_dir) for search_dir in search_dirs: search_path = base_path / search_dir if not search_path.exists(): print(f"⚠️ 目录不存在: {search_path}") continue # 递归搜索所有.json文件 for json_file in search_path.rglob("*.json"): json_files.append(json_file) return json_files def load_json_file(file_path: Path) -> Optional[dict]: """加载JSON文件""" try: with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) except json.JSONDecodeError as e: print(f"❌ JSON解析错误 {file_path}: {e}") return None except Exception as e: print(f"❌ 读取文件错误 {file_path}: {e}") return None def extract_text_from_json(data: dict) -> Optional[str]: """从JSON数据中提取文本内容""" if not isinstance(data, dict): return None # 尝试从 request.text 字段提取 request = data.get('request', {}) if isinstance(request, dict): text = request.get('text') if text: return text # 如果没有request.text,尝试直接获取text字段 text = data.get('text') if text: return text return None def get_model_from_json(data: dict) -> Optional[str]: """从JSON数据中提取模型名称(从result.model读取)""" if not isinstance(data, dict): return None result = data.get('result', {}) if isinstance(result, dict): model = result.get('model') if model: return model return None def generate_output_filename(input_path: Path, model_name: Optional[str] = None) -> Path: """生成输出文件名,处理后缀逻辑""" stem = input_path.stem # 不含扩展名的文件名 # 如果已有 _qwen2.5 后缀,则删除 if stem.endswith('_qwen2.5'): stem = stem[:-8] # 删除 '_qwen2.5' elif stem.endswith('_qwen2'): stem = stem[:-6] # 删除 '_qwen2' # 如果有模型名,添加模型名后缀 if model_name: # 清理模型名:只将空格替换为下划线,保留点和其他字符 clean_model = model_name.replace(' ', '_') stem = f"{stem}_{clean_model}" # 构建新路径 return input_path.parent / f"{stem}.json" def analyze_text(api_base: str, text: str, model: Optional[str] = None, token: Optional[str] = None, max_retries: int = 3) -> Optional[dict]: """向后端发送分析请求,支持自动重试""" # URL 拼接:确保正确拼接路径 api_base = api_base.rstrip('/') endpoint = API_ENDPOINT.lstrip('/') url = f"{api_base}/{endpoint}" payload = { "text": text, "model": model if model else "default" # 使用 "default" 让后端使用默认模型 } # 构建请求头 headers = {"Content-Type": "application/json"} if token: headers["Authorization"] = f"Bearer {token}" # 重试逻辑 last_error = None for attempt in range(1, max_retries + 1): try: response = requests.post( url, json=payload, headers=headers, timeout=300 # 5分钟超时 ) response.raise_for_status() if attempt > 1: print(f" ✅ 重试成功 (第 {attempt} 次尝试)") return response.json() except requests.exceptions.SSLError as e: last_error = e if attempt < max_retries: wait_time = attempt * 2 # 2秒、4秒、6秒 print(f" ⚠️ SSL错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...") time.sleep(wait_time) else: print(f"❌ SSL错误: {e}") print(f" 💡 提示: 网络连接不稳定,已重试 {max_retries} 次仍失败") except requests.exceptions.RequestException as e: last_error = e # 对于某些错误,不重试(如 404, 401, 400) if hasattr(e, 'response') and e.response is not None: status_code = e.response.status_code if status_code in [404, 401, 400]: print(f"❌ API请求错误: {e}") print(f" 响应状态码: {status_code}") if status_code == 404: print(f" 💡 提示: 如果是Private Space,请确保设置了HF Token") elif status_code == 401: print(f" 💡 提示: Token无效或已过期,请检查HF Token") return None # 对于其他错误,尝试重试 if attempt < max_retries: wait_time = attempt * 2 print(f" ⚠️ 请求错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...") time.sleep(wait_time) else: print(f"❌ API请求错误: {e}") if hasattr(e, 'response') and e.response is not None: try: error_detail = e.response.json() print(f" 错误详情: {error_detail}") except: if e.response.text: print(f" 响应内容: {e.response.text[:200]}") return None def save_result(output_path: Path, result: dict): """保存分析结果到文件""" try: # 确保输出目录存在 output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, 'w', encoding='utf-8') as f: json.dump(result, f, ensure_ascii=False, indent=2) print(f"✅ 已保存") except Exception as e: print(f"❌ 保存文件错误 {output_path}: {e}") def process_file( file_path: Path, api_base: str, dry_run: bool = False, no_write: bool = False, token: Optional[str] = None ) -> Tuple[bool, Optional[Path]]: """处理单个JSON文件""" print(f"\n📄 处理文件: {file_path}") # 加载JSON文件 data = load_json_file(file_path) if data is None: return False, None # 提取文本 text = extract_text_from_json(data) if not text: print(f"⚠️ 未找到文本内容,跳过") return False, None # 提取模型名(仅用于日志显示,实际请求使用默认模型) original_model = get_model_from_json(data) print(f" 文本长度: {len(text)} 字符") if original_model: print(f" 原文件模型: {original_model} (将使用默认模型)") else: print(f" 将使用默认模型") if dry_run: # dry-run模式下,无法知道实际使用的模型,使用占位符 print(f" [DRY RUN] 将发送分析请求(使用默认模型,不实际执行)") # 输出文件名会在实际运行时从响应中获取模型名后生成 return True, None # 发送分析请求(传递 "default" 让后端使用默认模型) print(f" 📤 发送分析请求(使用默认模型)...") result = analyze_text(api_base, text, "default", token) # 传递 "default" 使用默认模型 if result is None: print(f" ❌ 分析失败") return False, None # 检查是否有错误 if result.get('success') is False: error_msg = result.get('message', '未知错误') print(f" ❌ 分析失败: {error_msg}") return False, None # 从响应中提取实际使用的模型名(用于生成输出文件名) response_model = None result_info = result.get('result', {}) if isinstance(result_info, dict): response_model = result_info.get('model') if response_model: print(f" 实际使用模型: {response_model}") else: # 如果响应中没有模型名,使用 "default" 作为占位符 response_model = "default" print(f" ⚠️ 响应中未找到模型名,使用 'default' 作为文件名后缀") # 使用响应中的模型名生成输出文件名 output_path = generate_output_filename(file_path, response_model) print(f" 输出文件: {output_path}") # 保存结果(除非指定了 --no-write) if no_write: print(f" [NO WRITE] 跳过保存文件") else: save_result(output_path, result) return True, output_path def main(): parser = argparse.ArgumentParser( description="批量分析JSON文件", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 干运行模式(不实际分析) python analyze_json.py --dry-run # 实际分析但不保存文件 python analyze_json.py --no-write # 实际分析并保存文件 python analyze_json.py # 指定自定义API地址 python analyze_json.py --api-base http://localhost:5001 # 限制最多分析10个文件 python analyze_json.py --max-file 10 # 使用HF Token访问Private Space python analyze_json.py --hf-token hf_xxxxxxxxxxxxx # 或通过环境变量设置HF Token export HF_TOKEN=hf_xxxxxxxxxxxxx python analyze_json.py """ ) parser.add_argument( '--dry-run', action='store_true', help='干运行模式,不实际发送分析请求' ) parser.add_argument( '--no-write', action='store_true', help='不实际保存文件(仍会发送分析请求)' ) parser.add_argument( '--api-base', type=str, default=DEFAULT_API_BASE, help=f'后端API基础地址 (默认: {DEFAULT_API_BASE})' ) parser.add_argument( '--base-dir', type=str, default='.', help='项目根目录 (默认: 当前目录)' ) parser.add_argument( '--max-file', type=int, default=None, help='最多分析的文件数量 (默认: 无限制)' ) parser.add_argument( '--hf-token', type=str, default=None, help=f'Hugging Face Token(用于Private Space,也可通过环境变量{HF_TOKEN_ENV}设置)' ) args = parser.parse_args() # 获取HF Token(优先使用命令行参数,其次环境变量) hf_token = args.hf_token or os.environ.get(HF_TOKEN_ENV) # 显示配置 print("=" * 60) print("批量分析JSON文件") print("=" * 60) print(f"API地址: {args.api_base}") print(f"基础目录: {args.base_dir}") print(f"搜索目录: {', '.join(SEARCH_DIRS)}") print(f"模式: {'DRY RUN (不实际分析)' if args.dry_run else '实际分析'}") if args.max_file: print(f"最大文件数: {args.max_file}") if hf_token: token_preview = hf_token[:10] + "..." if len(hf_token) > 10 else hf_token print(f"HF Token: {token_preview} (已设置)") else: print(f"HF Token: 未设置 (如果是Private Space,请通过 --hf-token 或环境变量 {HF_TOKEN_ENV} 设置)") print("=" * 60) # 查找所有JSON文件 print(f"\n🔍 搜索JSON文件...") json_files = find_json_files(args.base_dir, SEARCH_DIRS) if not json_files: print("❌ 未找到任何JSON文件") return print(f"✅ 找到 {len(json_files)} 个JSON文件") # 根据 max_file 限制文件数量 if args.max_file and args.max_file > 0: original_count = len(json_files) json_files = json_files[:args.max_file] if len(json_files) < original_count: print(f"📌 限制处理数量: {len(json_files)} 个文件 (共找到 {original_count} 个)") # 处理每个文件 success_count = 0 failed_count = 0 total_to_process = len(json_files) for i, json_file in enumerate(json_files, 1): print(f"\n[{i}/{total_to_process}]") success, output_path = process_file(json_file, args.api_base, args.dry_run, args.no_write, hf_token) if success: success_count += 1 else: failed_count += 1 # 统计结果 print("\n" + "=" * 60) print("处理完成") print("=" * 60) print(f"成功: {success_count}") print(f"失败: {failed_count}") print(f"总计: {total_to_process}") print("=" * 60) if __name__ == '__main__': main()