|
|
|
|
|
""" |
|
|
批量分析JSON文件脚本 |
|
|
|
|
|
从指定目录递归搜索JSON文件,读取其中的文本内容,向后端发送分析请求, |
|
|
并将结果保存到新文件。 |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import json |
|
|
import os |
|
|
import sys |
|
|
import time |
|
|
from pathlib import Path |
|
|
from typing import Optional, Tuple |
|
|
|
|
|
try: |
|
|
import requests |
|
|
except ImportError: |
|
|
print("错误: 需要安装 requests 库") |
|
|
print("请运行: pip install requests") |
|
|
sys.exit(1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DEFAULT_API_BASE = "http://localhost:5001" |
|
|
API_ENDPOINT = "/api/analyze" |
|
|
|
|
|
|
|
|
HF_TOKEN_ENV = "HF_TOKEN" |
|
|
|
|
|
|
|
|
SEARCH_DIRS = [ |
|
|
"data/demo/未读" |
|
|
] |
|
|
|
|
|
|
|
|
def find_json_files(base_dir: str, search_dirs: list) -> list[Path]: |
|
|
"""递归搜索指定目录下的所有JSON文件""" |
|
|
json_files = [] |
|
|
base_path = Path(base_dir) |
|
|
|
|
|
for search_dir in search_dirs: |
|
|
search_path = base_path / search_dir |
|
|
if not search_path.exists(): |
|
|
print(f"⚠️ 目录不存在: {search_path}") |
|
|
continue |
|
|
|
|
|
|
|
|
for json_file in search_path.rglob("*.json"): |
|
|
json_files.append(json_file) |
|
|
|
|
|
return json_files |
|
|
|
|
|
|
|
|
def load_json_file(file_path: Path) -> Optional[dict]: |
|
|
"""加载JSON文件""" |
|
|
try: |
|
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
|
return json.load(f) |
|
|
except json.JSONDecodeError as e: |
|
|
print(f"❌ JSON解析错误 {file_path}: {e}") |
|
|
return None |
|
|
except Exception as e: |
|
|
print(f"❌ 读取文件错误 {file_path}: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def extract_text_from_json(data: dict) -> Optional[str]: |
|
|
"""从JSON数据中提取文本内容""" |
|
|
if not isinstance(data, dict): |
|
|
return None |
|
|
|
|
|
|
|
|
request = data.get('request', {}) |
|
|
if isinstance(request, dict): |
|
|
text = request.get('text') |
|
|
if text: |
|
|
return text |
|
|
|
|
|
|
|
|
text = data.get('text') |
|
|
if text: |
|
|
return text |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def get_model_from_json(data: dict) -> Optional[str]: |
|
|
"""从JSON数据中提取模型名称(从result.model读取)""" |
|
|
if not isinstance(data, dict): |
|
|
return None |
|
|
|
|
|
result = data.get('result', {}) |
|
|
if isinstance(result, dict): |
|
|
model = result.get('model') |
|
|
if model: |
|
|
return model |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def generate_output_filename(input_path: Path, model_name: Optional[str] = None) -> Path: |
|
|
"""生成输出文件名,处理后缀逻辑""" |
|
|
stem = input_path.stem |
|
|
|
|
|
|
|
|
if stem.endswith('_qwen2.5'): |
|
|
stem = stem[:-8] |
|
|
elif stem.endswith('_qwen2'): |
|
|
stem = stem[:-6] |
|
|
|
|
|
|
|
|
if model_name: |
|
|
|
|
|
clean_model = model_name.replace(' ', '_') |
|
|
stem = f"{stem}_{clean_model}" |
|
|
|
|
|
|
|
|
return input_path.parent / f"{stem}.json" |
|
|
|
|
|
|
|
|
def analyze_text(api_base: str, text: str, model: Optional[str] = None, token: Optional[str] = None, max_retries: int = 3) -> Optional[dict]: |
|
|
"""向后端发送分析请求,支持自动重试""" |
|
|
|
|
|
api_base = api_base.rstrip('/') |
|
|
endpoint = API_ENDPOINT.lstrip('/') |
|
|
url = f"{api_base}/{endpoint}" |
|
|
|
|
|
payload = { |
|
|
"text": text, |
|
|
"model": model if model else "default" |
|
|
} |
|
|
|
|
|
|
|
|
headers = {"Content-Type": "application/json"} |
|
|
if token: |
|
|
headers["Authorization"] = f"Bearer {token}" |
|
|
|
|
|
|
|
|
last_error = None |
|
|
for attempt in range(1, max_retries + 1): |
|
|
try: |
|
|
response = requests.post( |
|
|
url, |
|
|
json=payload, |
|
|
headers=headers, |
|
|
timeout=300 |
|
|
) |
|
|
response.raise_for_status() |
|
|
if attempt > 1: |
|
|
print(f" ✅ 重试成功 (第 {attempt} 次尝试)") |
|
|
return response.json() |
|
|
|
|
|
except requests.exceptions.SSLError as e: |
|
|
last_error = e |
|
|
if attempt < max_retries: |
|
|
wait_time = attempt * 2 |
|
|
print(f" ⚠️ SSL错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...") |
|
|
time.sleep(wait_time) |
|
|
else: |
|
|
print(f"❌ SSL错误: {e}") |
|
|
print(f" 💡 提示: 网络连接不稳定,已重试 {max_retries} 次仍失败") |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
last_error = e |
|
|
|
|
|
if hasattr(e, 'response') and e.response is not None: |
|
|
status_code = e.response.status_code |
|
|
if status_code in [404, 401, 400]: |
|
|
print(f"❌ API请求错误: {e}") |
|
|
print(f" 响应状态码: {status_code}") |
|
|
if status_code == 404: |
|
|
print(f" 💡 提示: 如果是Private Space,请确保设置了HF Token") |
|
|
elif status_code == 401: |
|
|
print(f" 💡 提示: Token无效或已过期,请检查HF Token") |
|
|
return None |
|
|
|
|
|
|
|
|
if attempt < max_retries: |
|
|
wait_time = attempt * 2 |
|
|
print(f" ⚠️ 请求错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...") |
|
|
time.sleep(wait_time) |
|
|
else: |
|
|
print(f"❌ API请求错误: {e}") |
|
|
if hasattr(e, 'response') and e.response is not None: |
|
|
try: |
|
|
error_detail = e.response.json() |
|
|
print(f" 错误详情: {error_detail}") |
|
|
except: |
|
|
if e.response.text: |
|
|
print(f" 响应内容: {e.response.text[:200]}") |
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
def save_result(output_path: Path, result: dict): |
|
|
"""保存分析结果到文件""" |
|
|
try: |
|
|
|
|
|
output_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
with open(output_path, 'w', encoding='utf-8') as f: |
|
|
json.dump(result, f, ensure_ascii=False, indent=2) |
|
|
print(f"✅ 已保存") |
|
|
except Exception as e: |
|
|
print(f"❌ 保存文件错误 {output_path}: {e}") |
|
|
|
|
|
|
|
|
def process_file( |
|
|
file_path: Path, |
|
|
api_base: str, |
|
|
dry_run: bool = False, |
|
|
no_write: bool = False, |
|
|
token: Optional[str] = None |
|
|
) -> Tuple[bool, Optional[Path]]: |
|
|
"""处理单个JSON文件""" |
|
|
print(f"\n📄 处理文件: {file_path}") |
|
|
|
|
|
|
|
|
data = load_json_file(file_path) |
|
|
if data is None: |
|
|
return False, None |
|
|
|
|
|
|
|
|
text = extract_text_from_json(data) |
|
|
if not text: |
|
|
print(f"⚠️ 未找到文本内容,跳过") |
|
|
return False, None |
|
|
|
|
|
|
|
|
original_model = get_model_from_json(data) |
|
|
|
|
|
print(f" 文本长度: {len(text)} 字符") |
|
|
if original_model: |
|
|
print(f" 原文件模型: {original_model} (将使用默认模型)") |
|
|
else: |
|
|
print(f" 将使用默认模型") |
|
|
|
|
|
if dry_run: |
|
|
|
|
|
print(f" [DRY RUN] 将发送分析请求(使用默认模型,不实际执行)") |
|
|
|
|
|
return True, None |
|
|
|
|
|
|
|
|
print(f" 📤 发送分析请求(使用默认模型)...") |
|
|
result = analyze_text(api_base, text, "default", token) |
|
|
|
|
|
if result is None: |
|
|
print(f" ❌ 分析失败") |
|
|
return False, None |
|
|
|
|
|
|
|
|
if result.get('success') is False: |
|
|
error_msg = result.get('message', '未知错误') |
|
|
print(f" ❌ 分析失败: {error_msg}") |
|
|
return False, None |
|
|
|
|
|
|
|
|
response_model = None |
|
|
result_info = result.get('result', {}) |
|
|
if isinstance(result_info, dict): |
|
|
response_model = result_info.get('model') |
|
|
|
|
|
if response_model: |
|
|
print(f" 实际使用模型: {response_model}") |
|
|
else: |
|
|
|
|
|
response_model = "default" |
|
|
print(f" ⚠️ 响应中未找到模型名,使用 'default' 作为文件名后缀") |
|
|
|
|
|
|
|
|
output_path = generate_output_filename(file_path, response_model) |
|
|
print(f" 输出文件: {output_path}") |
|
|
|
|
|
|
|
|
if no_write: |
|
|
print(f" [NO WRITE] 跳过保存文件") |
|
|
else: |
|
|
save_result(output_path, result) |
|
|
|
|
|
return True, output_path |
|
|
|
|
|
|
|
|
def main(): |
|
|
parser = argparse.ArgumentParser( |
|
|
description="批量分析JSON文件", |
|
|
formatter_class=argparse.RawDescriptionHelpFormatter, |
|
|
epilog=""" |
|
|
示例: |
|
|
# 干运行模式(不实际分析) |
|
|
python analyze_json.py --dry-run |
|
|
|
|
|
# 实际分析但不保存文件 |
|
|
python analyze_json.py --no-write |
|
|
|
|
|
# 实际分析并保存文件 |
|
|
python analyze_json.py |
|
|
|
|
|
# 指定自定义API地址 |
|
|
python analyze_json.py --api-base http://localhost:5001 |
|
|
|
|
|
# 限制最多分析10个文件 |
|
|
python analyze_json.py --max-file 10 |
|
|
|
|
|
# 使用HF Token访问Private Space |
|
|
python analyze_json.py --hf-token hf_xxxxxxxxxxxxx |
|
|
|
|
|
# 或通过环境变量设置HF Token |
|
|
export HF_TOKEN=hf_xxxxxxxxxxxxx |
|
|
python analyze_json.py |
|
|
""" |
|
|
) |
|
|
|
|
|
parser.add_argument( |
|
|
'--dry-run', |
|
|
action='store_true', |
|
|
help='干运行模式,不实际发送分析请求' |
|
|
) |
|
|
|
|
|
parser.add_argument( |
|
|
'--no-write', |
|
|
action='store_true', |
|
|
help='不实际保存文件(仍会发送分析请求)' |
|
|
) |
|
|
|
|
|
parser.add_argument( |
|
|
'--api-base', |
|
|
type=str, |
|
|
default=DEFAULT_API_BASE, |
|
|
help=f'后端API基础地址 (默认: {DEFAULT_API_BASE})' |
|
|
) |
|
|
|
|
|
parser.add_argument( |
|
|
'--base-dir', |
|
|
type=str, |
|
|
default='.', |
|
|
help='项目根目录 (默认: 当前目录)' |
|
|
) |
|
|
|
|
|
parser.add_argument( |
|
|
'--max-file', |
|
|
type=int, |
|
|
default=None, |
|
|
help='最多分析的文件数量 (默认: 无限制)' |
|
|
) |
|
|
|
|
|
parser.add_argument( |
|
|
'--hf-token', |
|
|
type=str, |
|
|
default=None, |
|
|
help=f'Hugging Face Token(用于Private Space,也可通过环境变量{HF_TOKEN_ENV}设置)' |
|
|
) |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
hf_token = args.hf_token or os.environ.get(HF_TOKEN_ENV) |
|
|
|
|
|
|
|
|
print("=" * 60) |
|
|
print("批量分析JSON文件") |
|
|
print("=" * 60) |
|
|
print(f"API地址: {args.api_base}") |
|
|
print(f"基础目录: {args.base_dir}") |
|
|
print(f"搜索目录: {', '.join(SEARCH_DIRS)}") |
|
|
print(f"模式: {'DRY RUN (不实际分析)' if args.dry_run else '实际分析'}") |
|
|
if args.max_file: |
|
|
print(f"最大文件数: {args.max_file}") |
|
|
if hf_token: |
|
|
token_preview = hf_token[:10] + "..." if len(hf_token) > 10 else hf_token |
|
|
print(f"HF Token: {token_preview} (已设置)") |
|
|
else: |
|
|
print(f"HF Token: 未设置 (如果是Private Space,请通过 --hf-token 或环境变量 {HF_TOKEN_ENV} 设置)") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
print(f"\n🔍 搜索JSON文件...") |
|
|
json_files = find_json_files(args.base_dir, SEARCH_DIRS) |
|
|
|
|
|
if not json_files: |
|
|
print("❌ 未找到任何JSON文件") |
|
|
return |
|
|
|
|
|
print(f"✅ 找到 {len(json_files)} 个JSON文件") |
|
|
|
|
|
|
|
|
if args.max_file and args.max_file > 0: |
|
|
original_count = len(json_files) |
|
|
json_files = json_files[:args.max_file] |
|
|
if len(json_files) < original_count: |
|
|
print(f"📌 限制处理数量: {len(json_files)} 个文件 (共找到 {original_count} 个)") |
|
|
|
|
|
|
|
|
success_count = 0 |
|
|
failed_count = 0 |
|
|
total_to_process = len(json_files) |
|
|
|
|
|
for i, json_file in enumerate(json_files, 1): |
|
|
print(f"\n[{i}/{total_to_process}]") |
|
|
success, output_path = process_file(json_file, args.api_base, args.dry_run, args.no_write, hf_token) |
|
|
|
|
|
if success: |
|
|
success_count += 1 |
|
|
else: |
|
|
failed_count += 1 |
|
|
|
|
|
|
|
|
print("\n" + "=" * 60) |
|
|
print("处理完成") |
|
|
print("=" * 60) |
|
|
print(f"成功: {success_count}") |
|
|
print(f"失败: {failed_count}") |
|
|
print(f"总计: {total_to_process}") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
main() |
|
|
|
|
|
|