import gradio as gr import json import base64 import re # 【新增】用于正则匹配本地脚本路径 from pathlib import Path # 【新增】用于处理相对路径 from data_manager import data_manager # ============== 状态管理 (封装逻辑) ============== class ReviewState: def __init__(self): self.all_paths = [] self.current_idx = -1 def sync_paths(self): self.all_paths = data_manager.get_all_chart_paths() def get_nav_target(self, direction): new_idx = self.current_idx + direction if 0 <= new_idx < len(self.all_paths): self.current_idx = new_idx return self.all_paths[new_idx] return None nav_state = ReviewState() # ============== 工具函数 (动态内联注入升级版) ============== def to_html_frame(html_content, html_path): if not html_content or not html_path: return '
请选择数据进行加载
' # 获取当前 html 文件所在的本地绝对路径文件夹 base_dir = Path(html_path).parent # 1. 动态内联本地 JS 文件 def script_replacer(match): src_path = match.group(1) # 如果已经是互联网 CDN 链接,则跳过 if src_path.startswith("http://") or src_path.startswith("https://") or src_path.startswith("//"): return match.group(0) # 拼接并解析出本地 JS 文件的真实路径 local_file = (base_dir / src_path).resolve() try: with open(local_file, 'r', encoding='utf-8') as f: content = f.read() # 关键替换:防止 JS 源码中的 导致 HTML 提前闭合崩溃 content = content.replace('', '<\\/script>') return f'' except Exception as e: print(f"Warning: 未找到依赖的本地脚本 {local_file}") return match.group(0) # 失败则保持原样 # 正则匹配所有 标签 html_content = re.sub(r']*?src=["\']([^"\']+)["\'][^>]*>', script_replacer, html_content) # 2. 动态内联本地 CSS 文件 def css_replacer(match): href_path = match.group(1) if href_path.startswith("http://") or href_path.startswith("https://") or href_path.startswith("//"): return match.group(0) local_file = (base_dir / href_path).resolve() try: with open(local_file, 'r', encoding='utf-8') as f: content = f.read() return f'' except Exception as e: return match.group(0) # 匹配 html_content = re.sub(r']*?rel=["\']stylesheet["\'][^>]*?href=["\']([^"\']+)["\'][^>]*?>', css_replacer, html_content) html_content = re.sub(r']*?href=["\']([^"\']+)["\'][^>]*?rel=["\']stylesheet["\'][^>]*?>', css_replacer, html_content) # 3. 转换为最终的自我包裹式 Base64 Iframe b64_content = base64.b64encode(html_content.encode('utf-8')).decode('utf-8') return f'' # ============== 交互逻辑 ============== def handle_source_change(source): struct = data_manager.get_dataset_structure() types = list(struct.get('sources', {}).get(source, {}).get('chart_types', {}).keys()) return gr.update(choices=types, value=types[0] if types else None) def handle_type_change(source, c_type): charts = data_manager.get_chart_list(source, c_type) struct = data_manager.get_dataset_structure() models = struct.get('sources', {}).get(source, {}).get('chart_types', {}).get(c_type, {}).get('models', []) return ( gr.update(choices=charts, value=charts[0] if charts else None), gr.update(choices=models, value=models[0] if models else None) ) def handle_load(source, c_type, c_id, model): if not all([source, c_type, c_id, model]): return [gr.update()] * 8 chart_data = data_manager.get_chart_data(source, c_type, c_id) qa_list = data_manager.get_qa_list(source, c_type, model, c_id) stats = data_manager.get_review_stats() nav_state.sync_paths() for i, p in enumerate(nav_state.all_paths): if p['chart_id'] == c_id and p['model'] == model: nav_state.current_idx = i break # 【传入参数升级】传入 html_path 用于定位本地依赖 html_code = to_html_frame( chart_data.get('html_content', ''), chart_data.get('html_path', '') ) meta_md = "\n".join([f"- **{k}**: {v}" for k, v in chart_data.get('label_info', {}).items()]) qa_json = json.dumps([{"id": q.id, "q": q.question, "a": q.answer} for q in qa_list]) stats_str = f"✅{stats['correct']} | ❌{stats['incorrect']} | 总{stats['total']}" prog_str = f"{nav_state.current_idx + 1} / {len(nav_state.all_paths)}" radio_choices = [f"Q{i+1}: {q.question[:20]}..." for i, q in enumerate(qa_list)] return [ html_code, meta_md, qa_json, stats_str, prog_str, f"{source}/{c_type}/{c_id}", gr.update(choices=radio_choices, value=radio_choices[0] if radio_choices else None), json.dumps({}) ] # 严格匹配 6 个 outputs 数量 def handle_qa_select(selection, qa_json): if not selection or not qa_json: return ["", "", "", "正确", "无", ""] try: qas = json.loads(qa_json) idx = int(selection.split(":")[0][1:]) - 1 curr = qas[idx] return [curr['id'], curr['q'], curr['a'], "正确", "无", ""] except: return ["", "", "", "正确", "无", ""] # ============== UI 布局 ============== def create_ui(): with gr.Blocks(title="审核系统 V2", theme=gr.themes.Soft()) as demo: qa_store = gr.State(value="[]") review_store = gr.State(value="{}") gr.Markdown("## 📑 图表问答数据集审核") with gr.Row(): with gr.Column(scale=4): with gr.Row(): src_dd = gr.Dropdown(label="数据源", choices=["None"]) typ_dd = gr.Dropdown(label="图表类型") id_dd = gr.Dropdown(label="图表 ID") mdl_dd = gr.Dropdown(label="类型") chart_view = gr.HTML(value='
') path_info = gr.Text(label="当前路径", interactive=False) with gr.Column(scale=2): with gr.Group(): stats_txt = gr.Text(label="统计信息", interactive=False) prog_txt = gr.Text(label="审核进度", interactive=False) with gr.Accordion("元数据解析", open=False): meta_md = gr.Markdown() gr.Markdown("---") qa_radio = gr.Radio(label="题目列表", choices=[]) with gr.Group(): curr_qid = gr.Text(visible=False) q_disp = gr.Text(label="问题内容", lines=2) a_disp = gr.Text(label="标准答案") status_opt = gr.Radio( label="审核结论", choices=["正确", "错误", "优化"], value="正确" ) err_type = gr.Dropdown(label="错误分类", choices=["无", "事实错误", "逻辑错误", "图表无法读取"]) comment = gr.Text(label="审核备注") save_status = gr.Text(label="操作反馈", interactive=False) save_btn = gr.Button("💾 提交本题并下一题", variant="primary") with gr.Row(): prev_btn = gr.Button("⬅️ 上一个图表") next_btn = gr.Button("➡️ 下一个图表") gr.Markdown("---") gr.Markdown("### 📦 记录导出与预览") # 【新增替代方案】使用 gr.Code 展示所有记录,自带右上角复制按钮 records_code = gr.Code( label="当前所有审核记录 JSON (光标悬浮至代码块右上角可一键复制)", language="json", interactive=False, lines=15 ) # --- 事件绑定 --- demo.load( fn=lambda: gr.update(choices=list(data_manager.get_dataset_structure().get('sources', {}).keys())), outputs=[src_dd] ) src_dd.change(handle_source_change, inputs=[src_dd], outputs=[typ_dd]) typ_dd.change(handle_type_change, inputs=[src_dd, typ_dd], outputs=[id_dd, mdl_dd]) load_event_outputs = [chart_view, meta_md, qa_store, stats_txt, prog_txt, path_info, qa_radio, review_store] id_dd.change(handle_load, inputs=[src_dd, typ_dd, id_dd, mdl_dd], outputs=load_event_outputs) mdl_dd.change(handle_load, inputs=[src_dd, typ_dd, id_dd, mdl_dd], outputs=load_event_outputs) qa_radio.change( handle_qa_select, inputs=[qa_radio, qa_store], outputs=[curr_qid, q_disp, a_disp, status_opt, err_type, comment] ) def navigate(direction): target = nav_state.get_nav_target(direction) if target: return [ gr.update(value=target['source']), gr.update(value=target['chart_type']), gr.update(value=target['chart_id']), gr.update(value=target['model']), gr.update(value="") ] return [gr.update(), gr.update(), gr.update(), gr.update(), gr.update()] prev_btn.click(lambda: navigate(-1), outputs=[src_dd, typ_dd, id_dd, mdl_dd, save_status]) next_btn.click(lambda: navigate(1), outputs=[src_dd, typ_dd, id_dd, mdl_dd, save_status]) # 【核心逻辑升级】保存 + 自动跳转 + 刷新记录展示 def quick_save_and_next(qid, cid, src, status_label, cmt, current_qa_selection, qa_json_str): if not qid: return "⚠️ 无效操作:未选择题目", gr.update(), gr.update(), gr.update() # 1. 状态映射与保存 status_map = { "正确": "correct", "错误": "incorrect", "优化": "needs_modification" } mapped_status = status_map.get(status_label, "correct") data_manager.save_review({ "qa_id": qid, "chart_id": cid, "source": src, "status": mapped_status, "comment": cmt }) # 2. 刷新统计 stats = data_manager.get_review_stats() stats_str = f"✅{stats['correct']} | ❌{stats['incorrect']} | 总{stats['total']}" feedback = f"✅ 保存成功 (ID: {qid})" # 3. 提取所有记录并格式化为 JSON 字符串给代码块展示 all_reviews = data_manager.get_all_reviews() reviews_json_text = json.dumps(all_reviews, ensure_ascii=False, indent=2) # 4. 自动跳转下一题逻辑 next_qa_update = gr.update() try: qas = json.loads(qa_json_str) # 解析当前选中的题号,比如 "Q1: xxx..." 提取数字 1 curr_q_num = int(current_qa_selection.split(":")[0][1:]) next_idx = curr_q_num # 下一题的索引正好等于当前题号(基于0的索引) if next_idx < len(qas): next_q = qas[next_idx] next_choice = f"Q{next_idx+1}: {next_q['q'][:20]}..." # 通知 Radio 组件切换到下一题,这会自动触发 qa_radio.change 事件刷新题目内容 next_qa_update = gr.update(value=next_choice) else: feedback += " (当前图表题目已全部审核完毕)" except Exception as e: pass return feedback, stats_str, next_qa_update, reviews_json_text # 绑定升级后的保存事件 save_btn.click( quick_save_and_next, inputs=[curr_qid, id_dd, src_dd, status_opt, comment, qa_radio, qa_store], outputs=[save_status, stats_txt, qa_radio, records_code] ) # 初始化时也加载一次记录 demo.load( fn=lambda: json.dumps(data_manager.get_all_reviews(), ensure_ascii=False, indent=2), outputs=[records_code] ) return demo if __name__ == "__main__": app = create_ui() app.launch( server_name="0.0.0.0", server_port=7860, show_api=False, max_threads=10 )