InfoRadar / scripts /analyze_json.py
dqy08's picture
调整json中的model字段位置;增加批量分析脚本
2463616
#!/usr/bin/env python3
"""
批量分析JSON文件脚本
从指定目录递归搜索JSON文件,读取其中的文本内容,向后端发送分析请求,
并将结果保存到新文件。
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
from typing import Optional, Tuple
try:
import requests
except ImportError:
print("错误: 需要安装 requests 库")
print("请运行: pip install requests")
sys.exit(1)
# 后端API地址
# DEFAULT_API_BASE = "https://dqy08-inforadar.hf.space"
DEFAULT_API_BASE = "http://localhost:5001"
API_ENDPOINT = "/api/analyze"
# Hugging Face Token(用于Private Space,可通过环境变量HF_TOKEN设置)
HF_TOKEN_ENV = "HF_TOKEN"
# 要搜索的目录列表
SEARCH_DIRS = [
"data/demo/未读"
]
def find_json_files(base_dir: str, search_dirs: list) -> list[Path]:
"""递归搜索指定目录下的所有JSON文件"""
json_files = []
base_path = Path(base_dir)
for search_dir in search_dirs:
search_path = base_path / search_dir
if not search_path.exists():
print(f"⚠️ 目录不存在: {search_path}")
continue
# 递归搜索所有.json文件
for json_file in search_path.rglob("*.json"):
json_files.append(json_file)
return json_files
def load_json_file(file_path: Path) -> Optional[dict]:
"""加载JSON文件"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except json.JSONDecodeError as e:
print(f"❌ JSON解析错误 {file_path}: {e}")
return None
except Exception as e:
print(f"❌ 读取文件错误 {file_path}: {e}")
return None
def extract_text_from_json(data: dict) -> Optional[str]:
"""从JSON数据中提取文本内容"""
if not isinstance(data, dict):
return None
# 尝试从 request.text 字段提取
request = data.get('request', {})
if isinstance(request, dict):
text = request.get('text')
if text:
return text
# 如果没有request.text,尝试直接获取text字段
text = data.get('text')
if text:
return text
return None
def get_model_from_json(data: dict) -> Optional[str]:
"""从JSON数据中提取模型名称(从result.model读取)"""
if not isinstance(data, dict):
return None
result = data.get('result', {})
if isinstance(result, dict):
model = result.get('model')
if model:
return model
return None
def generate_output_filename(input_path: Path, model_name: Optional[str] = None) -> Path:
"""生成输出文件名,处理后缀逻辑"""
stem = input_path.stem # 不含扩展名的文件名
# 如果已有 _qwen2.5 后缀,则删除
if stem.endswith('_qwen2.5'):
stem = stem[:-8] # 删除 '_qwen2.5'
elif stem.endswith('_qwen2'):
stem = stem[:-6] # 删除 '_qwen2'
# 如果有模型名,添加模型名后缀
if model_name:
# 清理模型名:只将空格替换为下划线,保留点和其他字符
clean_model = model_name.replace(' ', '_')
stem = f"{stem}_{clean_model}"
# 构建新路径
return input_path.parent / f"{stem}.json"
def analyze_text(api_base: str, text: str, model: Optional[str] = None, token: Optional[str] = None, max_retries: int = 3) -> Optional[dict]:
"""向后端发送分析请求,支持自动重试"""
# URL 拼接:确保正确拼接路径
api_base = api_base.rstrip('/')
endpoint = API_ENDPOINT.lstrip('/')
url = f"{api_base}/{endpoint}"
payload = {
"text": text,
"model": model if model else "default" # 使用 "default" 让后端使用默认模型
}
# 构建请求头
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
# 重试逻辑
last_error = None
for attempt in range(1, max_retries + 1):
try:
response = requests.post(
url,
json=payload,
headers=headers,
timeout=300 # 5分钟超时
)
response.raise_for_status()
if attempt > 1:
print(f" ✅ 重试成功 (第 {attempt} 次尝试)")
return response.json()
except requests.exceptions.SSLError as e:
last_error = e
if attempt < max_retries:
wait_time = attempt * 2 # 2秒、4秒、6秒
print(f" ⚠️ SSL错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...")
time.sleep(wait_time)
else:
print(f"❌ SSL错误: {e}")
print(f" 💡 提示: 网络连接不稳定,已重试 {max_retries} 次仍失败")
except requests.exceptions.RequestException as e:
last_error = e
# 对于某些错误,不重试(如 404, 401, 400)
if hasattr(e, 'response') and e.response is not None:
status_code = e.response.status_code
if status_code in [404, 401, 400]:
print(f"❌ API请求错误: {e}")
print(f" 响应状态码: {status_code}")
if status_code == 404:
print(f" 💡 提示: 如果是Private Space,请确保设置了HF Token")
elif status_code == 401:
print(f" 💡 提示: Token无效或已过期,请检查HF Token")
return None
# 对于其他错误,尝试重试
if attempt < max_retries:
wait_time = attempt * 2
print(f" ⚠️ 请求错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...")
time.sleep(wait_time)
else:
print(f"❌ API请求错误: {e}")
if hasattr(e, 'response') and e.response is not None:
try:
error_detail = e.response.json()
print(f" 错误详情: {error_detail}")
except:
if e.response.text:
print(f" 响应内容: {e.response.text[:200]}")
return None
def save_result(output_path: Path, result: dict):
"""保存分析结果到文件"""
try:
# 确保输出目录存在
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f"✅ 已保存")
except Exception as e:
print(f"❌ 保存文件错误 {output_path}: {e}")
def process_file(
file_path: Path,
api_base: str,
dry_run: bool = False,
no_write: bool = False,
token: Optional[str] = None
) -> Tuple[bool, Optional[Path]]:
"""处理单个JSON文件"""
print(f"\n📄 处理文件: {file_path}")
# 加载JSON文件
data = load_json_file(file_path)
if data is None:
return False, None
# 提取文本
text = extract_text_from_json(data)
if not text:
print(f"⚠️ 未找到文本内容,跳过")
return False, None
# 提取模型名(仅用于日志显示,实际请求使用默认模型)
original_model = get_model_from_json(data)
print(f" 文本长度: {len(text)} 字符")
if original_model:
print(f" 原文件模型: {original_model} (将使用默认模型)")
else:
print(f" 将使用默认模型")
if dry_run:
# dry-run模式下,无法知道实际使用的模型,使用占位符
print(f" [DRY RUN] 将发送分析请求(使用默认模型,不实际执行)")
# 输出文件名会在实际运行时从响应中获取模型名后生成
return True, None
# 发送分析请求(传递 "default" 让后端使用默认模型)
print(f" 📤 发送分析请求(使用默认模型)...")
result = analyze_text(api_base, text, "default", token) # 传递 "default" 使用默认模型
if result is None:
print(f" ❌ 分析失败")
return False, None
# 检查是否有错误
if result.get('success') is False:
error_msg = result.get('message', '未知错误')
print(f" ❌ 分析失败: {error_msg}")
return False, None
# 从响应中提取实际使用的模型名(用于生成输出文件名)
response_model = None
result_info = result.get('result', {})
if isinstance(result_info, dict):
response_model = result_info.get('model')
if response_model:
print(f" 实际使用模型: {response_model}")
else:
# 如果响应中没有模型名,使用 "default" 作为占位符
response_model = "default"
print(f" ⚠️ 响应中未找到模型名,使用 'default' 作为文件名后缀")
# 使用响应中的模型名生成输出文件名
output_path = generate_output_filename(file_path, response_model)
print(f" 输出文件: {output_path}")
# 保存结果(除非指定了 --no-write)
if no_write:
print(f" [NO WRITE] 跳过保存文件")
else:
save_result(output_path, result)
return True, output_path
def main():
parser = argparse.ArgumentParser(
description="批量分析JSON文件",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 干运行模式(不实际分析)
python analyze_json.py --dry-run
# 实际分析但不保存文件
python analyze_json.py --no-write
# 实际分析并保存文件
python analyze_json.py
# 指定自定义API地址
python analyze_json.py --api-base http://localhost:5001
# 限制最多分析10个文件
python analyze_json.py --max-file 10
# 使用HF Token访问Private Space
python analyze_json.py --hf-token hf_xxxxxxxxxxxxx
# 或通过环境变量设置HF Token
export HF_TOKEN=hf_xxxxxxxxxxxxx
python analyze_json.py
"""
)
parser.add_argument(
'--dry-run',
action='store_true',
help='干运行模式,不实际发送分析请求'
)
parser.add_argument(
'--no-write',
action='store_true',
help='不实际保存文件(仍会发送分析请求)'
)
parser.add_argument(
'--api-base',
type=str,
default=DEFAULT_API_BASE,
help=f'后端API基础地址 (默认: {DEFAULT_API_BASE})'
)
parser.add_argument(
'--base-dir',
type=str,
default='.',
help='项目根目录 (默认: 当前目录)'
)
parser.add_argument(
'--max-file',
type=int,
default=None,
help='最多分析的文件数量 (默认: 无限制)'
)
parser.add_argument(
'--hf-token',
type=str,
default=None,
help=f'Hugging Face Token(用于Private Space,也可通过环境变量{HF_TOKEN_ENV}设置)'
)
args = parser.parse_args()
# 获取HF Token(优先使用命令行参数,其次环境变量)
hf_token = args.hf_token or os.environ.get(HF_TOKEN_ENV)
# 显示配置
print("=" * 60)
print("批量分析JSON文件")
print("=" * 60)
print(f"API地址: {args.api_base}")
print(f"基础目录: {args.base_dir}")
print(f"搜索目录: {', '.join(SEARCH_DIRS)}")
print(f"模式: {'DRY RUN (不实际分析)' if args.dry_run else '实际分析'}")
if args.max_file:
print(f"最大文件数: {args.max_file}")
if hf_token:
token_preview = hf_token[:10] + "..." if len(hf_token) > 10 else hf_token
print(f"HF Token: {token_preview} (已设置)")
else:
print(f"HF Token: 未设置 (如果是Private Space,请通过 --hf-token 或环境变量 {HF_TOKEN_ENV} 设置)")
print("=" * 60)
# 查找所有JSON文件
print(f"\n🔍 搜索JSON文件...")
json_files = find_json_files(args.base_dir, SEARCH_DIRS)
if not json_files:
print("❌ 未找到任何JSON文件")
return
print(f"✅ 找到 {len(json_files)} 个JSON文件")
# 根据 max_file 限制文件数量
if args.max_file and args.max_file > 0:
original_count = len(json_files)
json_files = json_files[:args.max_file]
if len(json_files) < original_count:
print(f"📌 限制处理数量: {len(json_files)} 个文件 (共找到 {original_count} 个)")
# 处理每个文件
success_count = 0
failed_count = 0
total_to_process = len(json_files)
for i, json_file in enumerate(json_files, 1):
print(f"\n[{i}/{total_to_process}]")
success, output_path = process_file(json_file, args.api_base, args.dry_run, args.no_write, hf_token)
if success:
success_count += 1
else:
failed_count += 1
# 统计结果
print("\n" + "=" * 60)
print("处理完成")
print("=" * 60)
print(f"成功: {success_count}")
print(f"失败: {failed_count}")
print(f"总计: {total_to_process}")
print("=" * 60)
if __name__ == '__main__':
main()