File size: 13,580 Bytes
2463616 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 |
#!/usr/bin/env python3
"""
批量分析JSON文件脚本
从指定目录递归搜索JSON文件,读取其中的文本内容,向后端发送分析请求,
并将结果保存到新文件。
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
from typing import Optional, Tuple
try:
import requests
except ImportError:
print("错误: 需要安装 requests 库")
print("请运行: pip install requests")
sys.exit(1)
# 后端API地址
# DEFAULT_API_BASE = "https://dqy08-inforadar.hf.space"
DEFAULT_API_BASE = "http://localhost:5001"
API_ENDPOINT = "/api/analyze"
# Hugging Face Token(用于Private Space,可通过环境变量HF_TOKEN设置)
HF_TOKEN_ENV = "HF_TOKEN"
# 要搜索的目录列表
SEARCH_DIRS = [
"data/demo/未读"
]
def find_json_files(base_dir: str, search_dirs: list) -> list[Path]:
"""递归搜索指定目录下的所有JSON文件"""
json_files = []
base_path = Path(base_dir)
for search_dir in search_dirs:
search_path = base_path / search_dir
if not search_path.exists():
print(f"⚠️ 目录不存在: {search_path}")
continue
# 递归搜索所有.json文件
for json_file in search_path.rglob("*.json"):
json_files.append(json_file)
return json_files
def load_json_file(file_path: Path) -> Optional[dict]:
"""加载JSON文件"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except json.JSONDecodeError as e:
print(f"❌ JSON解析错误 {file_path}: {e}")
return None
except Exception as e:
print(f"❌ 读取文件错误 {file_path}: {e}")
return None
def extract_text_from_json(data: dict) -> Optional[str]:
"""从JSON数据中提取文本内容"""
if not isinstance(data, dict):
return None
# 尝试从 request.text 字段提取
request = data.get('request', {})
if isinstance(request, dict):
text = request.get('text')
if text:
return text
# 如果没有request.text,尝试直接获取text字段
text = data.get('text')
if text:
return text
return None
def get_model_from_json(data: dict) -> Optional[str]:
"""从JSON数据中提取模型名称(从result.model读取)"""
if not isinstance(data, dict):
return None
result = data.get('result', {})
if isinstance(result, dict):
model = result.get('model')
if model:
return model
return None
def generate_output_filename(input_path: Path, model_name: Optional[str] = None) -> Path:
"""生成输出文件名,处理后缀逻辑"""
stem = input_path.stem # 不含扩展名的文件名
# 如果已有 _qwen2.5 后缀,则删除
if stem.endswith('_qwen2.5'):
stem = stem[:-8] # 删除 '_qwen2.5'
elif stem.endswith('_qwen2'):
stem = stem[:-6] # 删除 '_qwen2'
# 如果有模型名,添加模型名后缀
if model_name:
# 清理模型名:只将空格替换为下划线,保留点和其他字符
clean_model = model_name.replace(' ', '_')
stem = f"{stem}_{clean_model}"
# 构建新路径
return input_path.parent / f"{stem}.json"
def analyze_text(api_base: str, text: str, model: Optional[str] = None, token: Optional[str] = None, max_retries: int = 3) -> Optional[dict]:
"""向后端发送分析请求,支持自动重试"""
# URL 拼接:确保正确拼接路径
api_base = api_base.rstrip('/')
endpoint = API_ENDPOINT.lstrip('/')
url = f"{api_base}/{endpoint}"
payload = {
"text": text,
"model": model if model else "default" # 使用 "default" 让后端使用默认模型
}
# 构建请求头
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
# 重试逻辑
last_error = None
for attempt in range(1, max_retries + 1):
try:
response = requests.post(
url,
json=payload,
headers=headers,
timeout=300 # 5分钟超时
)
response.raise_for_status()
if attempt > 1:
print(f" ✅ 重试成功 (第 {attempt} 次尝试)")
return response.json()
except requests.exceptions.SSLError as e:
last_error = e
if attempt < max_retries:
wait_time = attempt * 2 # 2秒、4秒、6秒
print(f" ⚠️ SSL错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...")
time.sleep(wait_time)
else:
print(f"❌ SSL错误: {e}")
print(f" 💡 提示: 网络连接不稳定,已重试 {max_retries} 次仍失败")
except requests.exceptions.RequestException as e:
last_error = e
# 对于某些错误,不重试(如 404, 401, 400)
if hasattr(e, 'response') and e.response is not None:
status_code = e.response.status_code
if status_code in [404, 401, 400]:
print(f"❌ API请求错误: {e}")
print(f" 响应状态码: {status_code}")
if status_code == 404:
print(f" 💡 提示: 如果是Private Space,请确保设置了HF Token")
elif status_code == 401:
print(f" 💡 提示: Token无效或已过期,请检查HF Token")
return None
# 对于其他错误,尝试重试
if attempt < max_retries:
wait_time = attempt * 2
print(f" ⚠️ 请求错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...")
time.sleep(wait_time)
else:
print(f"❌ API请求错误: {e}")
if hasattr(e, 'response') and e.response is not None:
try:
error_detail = e.response.json()
print(f" 错误详情: {error_detail}")
except:
if e.response.text:
print(f" 响应内容: {e.response.text[:200]}")
return None
def save_result(output_path: Path, result: dict):
"""保存分析结果到文件"""
try:
# 确保输出目录存在
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f"✅ 已保存")
except Exception as e:
print(f"❌ 保存文件错误 {output_path}: {e}")
def process_file(
file_path: Path,
api_base: str,
dry_run: bool = False,
no_write: bool = False,
token: Optional[str] = None
) -> Tuple[bool, Optional[Path]]:
"""处理单个JSON文件"""
print(f"\n📄 处理文件: {file_path}")
# 加载JSON文件
data = load_json_file(file_path)
if data is None:
return False, None
# 提取文本
text = extract_text_from_json(data)
if not text:
print(f"⚠️ 未找到文本内容,跳过")
return False, None
# 提取模型名(仅用于日志显示,实际请求使用默认模型)
original_model = get_model_from_json(data)
print(f" 文本长度: {len(text)} 字符")
if original_model:
print(f" 原文件模型: {original_model} (将使用默认模型)")
else:
print(f" 将使用默认模型")
if dry_run:
# dry-run模式下,无法知道实际使用的模型,使用占位符
print(f" [DRY RUN] 将发送分析请求(使用默认模型,不实际执行)")
# 输出文件名会在实际运行时从响应中获取模型名后生成
return True, None
# 发送分析请求(传递 "default" 让后端使用默认模型)
print(f" 📤 发送分析请求(使用默认模型)...")
result = analyze_text(api_base, text, "default", token) # 传递 "default" 使用默认模型
if result is None:
print(f" ❌ 分析失败")
return False, None
# 检查是否有错误
if result.get('success') is False:
error_msg = result.get('message', '未知错误')
print(f" ❌ 分析失败: {error_msg}")
return False, None
# 从响应中提取实际使用的模型名(用于生成输出文件名)
response_model = None
result_info = result.get('result', {})
if isinstance(result_info, dict):
response_model = result_info.get('model')
if response_model:
print(f" 实际使用模型: {response_model}")
else:
# 如果响应中没有模型名,使用 "default" 作为占位符
response_model = "default"
print(f" ⚠️ 响应中未找到模型名,使用 'default' 作为文件名后缀")
# 使用响应中的模型名生成输出文件名
output_path = generate_output_filename(file_path, response_model)
print(f" 输出文件: {output_path}")
# 保存结果(除非指定了 --no-write)
if no_write:
print(f" [NO WRITE] 跳过保存文件")
else:
save_result(output_path, result)
return True, output_path
def main():
parser = argparse.ArgumentParser(
description="批量分析JSON文件",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
# 干运行模式(不实际分析)
python analyze_json.py --dry-run
# 实际分析但不保存文件
python analyze_json.py --no-write
# 实际分析并保存文件
python analyze_json.py
# 指定自定义API地址
python analyze_json.py --api-base http://localhost:5001
# 限制最多分析10个文件
python analyze_json.py --max-file 10
# 使用HF Token访问Private Space
python analyze_json.py --hf-token hf_xxxxxxxxxxxxx
# 或通过环境变量设置HF Token
export HF_TOKEN=hf_xxxxxxxxxxxxx
python analyze_json.py
"""
)
parser.add_argument(
'--dry-run',
action='store_true',
help='干运行模式,不实际发送分析请求'
)
parser.add_argument(
'--no-write',
action='store_true',
help='不实际保存文件(仍会发送分析请求)'
)
parser.add_argument(
'--api-base',
type=str,
default=DEFAULT_API_BASE,
help=f'后端API基础地址 (默认: {DEFAULT_API_BASE})'
)
parser.add_argument(
'--base-dir',
type=str,
default='.',
help='项目根目录 (默认: 当前目录)'
)
parser.add_argument(
'--max-file',
type=int,
default=None,
help='最多分析的文件数量 (默认: 无限制)'
)
parser.add_argument(
'--hf-token',
type=str,
default=None,
help=f'Hugging Face Token(用于Private Space,也可通过环境变量{HF_TOKEN_ENV}设置)'
)
args = parser.parse_args()
# 获取HF Token(优先使用命令行参数,其次环境变量)
hf_token = args.hf_token or os.environ.get(HF_TOKEN_ENV)
# 显示配置
print("=" * 60)
print("批量分析JSON文件")
print("=" * 60)
print(f"API地址: {args.api_base}")
print(f"基础目录: {args.base_dir}")
print(f"搜索目录: {', '.join(SEARCH_DIRS)}")
print(f"模式: {'DRY RUN (不实际分析)' if args.dry_run else '实际分析'}")
if args.max_file:
print(f"最大文件数: {args.max_file}")
if hf_token:
token_preview = hf_token[:10] + "..." if len(hf_token) > 10 else hf_token
print(f"HF Token: {token_preview} (已设置)")
else:
print(f"HF Token: 未设置 (如果是Private Space,请通过 --hf-token 或环境变量 {HF_TOKEN_ENV} 设置)")
print("=" * 60)
# 查找所有JSON文件
print(f"\n🔍 搜索JSON文件...")
json_files = find_json_files(args.base_dir, SEARCH_DIRS)
if not json_files:
print("❌ 未找到任何JSON文件")
return
print(f"✅ 找到 {len(json_files)} 个JSON文件")
# 根据 max_file 限制文件数量
if args.max_file and args.max_file > 0:
original_count = len(json_files)
json_files = json_files[:args.max_file]
if len(json_files) < original_count:
print(f"📌 限制处理数量: {len(json_files)} 个文件 (共找到 {original_count} 个)")
# 处理每个文件
success_count = 0
failed_count = 0
total_to_process = len(json_files)
for i, json_file in enumerate(json_files, 1):
print(f"\n[{i}/{total_to_process}]")
success, output_path = process_file(json_file, args.api_base, args.dry_run, args.no_write, hf_token)
if success:
success_count += 1
else:
failed_count += 1
# 统计结果
print("\n" + "=" * 60)
print("处理完成")
print("=" * 60)
print(f"成功: {success_count}")
print(f"失败: {failed_count}")
print(f"总计: {total_to_process}")
print("=" * 60)
if __name__ == '__main__':
main()
|