| import gradio as gr |
| import json |
| import base64 |
| import re |
| from pathlib import Path |
| from data_manager import data_manager |
|
|
| |
class ReviewState:
    """Cursor over the flat list of chart paths used for prev/next navigation."""

    def __init__(self):
        # The cursor starts one step before the first entry, so a +1 move
        # lands on index 0.
        self.current_idx = -1
        self.all_paths = []

    def sync_paths(self):
        """Replace the cached path list with data_manager's current listing."""
        self.all_paths = data_manager.get_all_chart_paths()

    def get_nav_target(self, direction):
        """Move the cursor by ``direction`` and return the entry it lands on.

        Returns None, leaving the cursor where it was, when the move would
        step outside the cached list.
        """
        candidate = self.current_idx + direction
        if candidate < 0 or candidate >= len(self.all_paths):
            return None
        self.current_idx = candidate
        return self.all_paths[candidate]
|
|
# Navigation cursor used by handle_load and by the prev/next handlers in
# create_ui.
# NOTE(review): being module-level, this single instance is shared by every
# caller in the process — confirm that is intended for multi-user use.
nav_state = ReviewState()
|
|
| |
def to_html_frame(html_content, html_path):
    """Wrap chart HTML in an ``<iframe>`` whose source is a base64 data URI.

    Local ``<script src=...>`` and ``<link rel="stylesheet" href=...>``
    references are resolved against the directory of ``html_path`` and
    replaced by inline ``<script>`` / ``<style>`` elements, so the embedded
    page no longer depends on files sitting next to it. References starting
    with ``http://``, ``https://``, ``//`` or ``data:`` are left untouched,
    as is any reference whose file cannot be read (a warning is printed).

    Args:
        html_content: The chart's HTML markup.
        html_path: Path of the HTML file; only its parent directory is used.

    Returns:
        An ``<iframe>`` HTML string, or a placeholder ``<div>`` when either
        argument is empty.
    """
    if not html_content or not html_path:
        return '<div style="padding:20px;text-align:center;">请选择数据进行加载</div>'

    base_dir = Path(html_path).parent
    passthrough_prefixes = ("http://", "https://", "//", "data:")

    def read_local(ref, kind):
        """Return the text behind a local reference, or None to keep the tag."""
        if ref.lower().startswith(passthrough_prefixes):
            return None
        target = base_dir / ref
        try:
            target = target.resolve()
            return target.read_text(encoding='utf-8')
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError: undecodable bytes
            # (UnicodeDecodeError) or an invalid path such as an embedded NUL.
            print(f"Warning: 未找到依赖的本地{kind} {target}")
            return None

    def script_replacer(match):
        content = read_local(match.group(1), "脚本")
        if content is None:
            return match.group(0)
        # A literal "</script" inside the inlined code would close the
        # element early, so break it up (any letter case).
        content = re.sub(
            r'</(script)',
            lambda m: '<\\/' + m.group(1),
            content,
            flags=re.IGNORECASE,
        )
        return f'<script>\n{content}\n</script>'

    def css_replacer(match):
        content = read_local(match.group(1), "样式表")
        if content is None:
            return match.group(0)
        return f'<style>\n{content}\n</style>'

    # Tolerate upper-case tags/attributes and whitespace before the end tag.
    html_content = re.sub(
        r'<script\b[^>]*?src=["\']([^"\']+)["\'][^>]*>\s*</script>',
        script_replacer,
        html_content,
        flags=re.IGNORECASE,
    )

    # Stylesheet links are matched in both attribute orders: rel before href,
    # then href before rel.
    link_patterns = (
        r'<link\b[^>]*?rel=["\']stylesheet["\'][^>]*?href=["\']([^"\']+)["\'][^>]*?>',
        r'<link\b[^>]*?href=["\']([^"\']+)["\'][^>]*?rel=["\']stylesheet["\'][^>]*?>',
    )
    for pattern in link_patterns:
        html_content = re.sub(pattern, css_replacer, html_content, flags=re.IGNORECASE)

    b64_content = base64.b64encode(html_content.encode('utf-8')).decode('utf-8')
    return f'<iframe src="data:text/html;base64,{b64_content}" style="width:100%;height:600px;border:none;"></iframe>'
|
|
| |
def handle_source_change(source):
    """Build the chart-type dropdown update for the chosen data source.

    Looks up ``sources -> <source> -> chart_types`` in the dataset structure
    and offers its keys as choices, preselecting the first one (or None when
    there are no chart types).
    """
    structure = data_manager.get_dataset_structure()
    source_entry = structure.get('sources', {}).get(source, {})
    type_names = list(source_entry.get('chart_types', {}).keys())
    if type_names:
        default_type = type_names[0]
    else:
        default_type = None
    return gr.update(choices=type_names, value=default_type)
|
|
def handle_type_change(source, c_type):
    """Build the chart-ID and model dropdown updates for a source/type pair.

    Returns a 2-tuple of updates: chart IDs from ``get_chart_list`` and the
    ``models`` list found under the chart type in the dataset structure. Each
    dropdown preselects its first choice, or None when it has no choices.
    """
    chart_ids = data_manager.get_chart_list(source, c_type)
    structure = data_manager.get_dataset_structure()
    type_entry = (
        structure.get('sources', {})
        .get(source, {})
        .get('chart_types', {})
        .get(c_type, {})
    )
    model_names = type_entry.get('models', [])

    def first_selected(options):
        # Offer the options and preselect the first one when there is one.
        return gr.update(choices=options, value=options[0] if options else None)

    return first_selected(chart_ids), first_selected(model_names)
|
|
def handle_load(source, c_type, c_id, model):
    """Load one chart and its QA list and produce the values for the UI.

    Also refreshes ``nav_state`` and moves its cursor to the loaded chart so
    that prev/next navigation continues from here.

    Args:
        source: Selected data source.
        c_type: Selected chart type.
        c_id: Selected chart ID.
        model: Selected model.

    Returns:
        A list of 8 values, in order: iframe HTML for the chart, metadata
        markdown, QA list as a JSON string, statistics text, progress text,
        ``source/c_type/c_id`` path text, an update for the question radio,
        and an empty JSON object string. When any argument is empty, 8 no-op
        updates are returned instead.
    """
    if not all([source, c_type, c_id, model]):
        return [gr.update()] * 8

    # Fall back to empty containers so a missing chart or QA list renders an
    # empty view instead of raising on None.
    chart_data = data_manager.get_chart_data(source, c_type, c_id) or {}
    qa_list = data_manager.get_qa_list(source, c_type, model, c_id) or []
    stats = data_manager.get_review_stats()

    nav_state.sync_paths()
    for i, p in enumerate(nav_state.all_paths):
        # Match on source and chart type as well: a chart ID alone may occur
        # under more than one source/type and would pick the wrong entry.
        if (
            p['source'] == source
            and p['chart_type'] == c_type
            and p['chart_id'] == c_id
            and p['model'] == model
        ):
            nav_state.current_idx = i
            break

    html_code = to_html_frame(
        chart_data.get('html_content', ''),
        chart_data.get('html_path', '')
    )

    label_info = chart_data.get('label_info') or {}
    meta_md = "\n".join(f"- **{k}**: {v}" for k, v in label_info.items())
    qa_json = json.dumps([{"id": q.id, "q": q.question, "a": q.answer} for q in qa_list])
    stats_str = f"✅{stats['correct']} | ❌{stats['incorrect']} | 总{stats['total']}"
    prog_str = f"{nav_state.current_idx + 1} / {len(nav_state.all_paths)}"
    # Labels start with "Q<n>:"; the selection handlers parse <n> back out.
    radio_choices = [f"Q{i+1}: {q.question[:20]}..." for i, q in enumerate(qa_list)]

    return [
        html_code,
        meta_md,
        qa_json,
        stats_str,
        prog_str,
        f"{source}/{c_type}/{c_id}",
        gr.update(choices=radio_choices, value=radio_choices[0] if radio_choices else None),
        json.dumps({})
    ]
|
|
| |
def handle_qa_select(selection, qa_json):
    """Resolve a question radio label to the fields of the review form.

    Args:
        selection: Radio label of the form ``"Q<n>: ..."`` (1-based ``n``).
        qa_json: JSON string holding a list of ``{"id", "q", "a"}`` objects.

    Returns:
        ``[id, question, answer, "正确", "无", ""]`` for the selected
        question; the last three items reset the status, error type and
        comment inputs. When the selection cannot be resolved, the first
        three items are empty strings.
    """
    blank = ["", "", "", "正确", "无", ""]
    if not selection or not qa_json:
        return blank
    try:
        qas = json.loads(qa_json)
        idx = int(selection.split(":")[0][1:]) - 1
        # "Q0" would give -1, which Python indexing maps to the LAST
        # question; treat it as an invalid selection instead.
        if idx < 0:
            return blank
        curr = qas[idx]
        return [curr['id'], curr['q'], curr['a'], "正确", "无", ""]
    except (ValueError, TypeError, AttributeError, LookupError):
        # Malformed JSON or label, index out of range, or a missing key.
        return blank
|
|
| |
def create_ui():
    """Build the Gradio Blocks app for reviewing chart QA records.

    Lays out the chart viewer, the selection dropdowns, the per-question
    review form and the JSON preview of all reviews, then wires the events
    between them.

    Returns:
        The assembled ``gr.Blocks`` instance; the caller launches it.
    """
    with gr.Blocks(title="审核系统 V2", theme=gr.themes.Soft()) as demo:
        # QA list of the loaded chart, kept as a JSON string.
        qa_store = gr.State(value="[]")
        # NOTE(review): written by handle_load but never read in this function.
        review_store = gr.State(value="{}")

        gr.Markdown("## 📑 图表问答数据集审核")

        with gr.Row():
            with gr.Column(scale=4):
                with gr.Row():
                    src_dd = gr.Dropdown(label="数据源", choices=["None"])
                    typ_dd = gr.Dropdown(label="图表类型")
                    id_dd = gr.Dropdown(label="图表 ID")
                    mdl_dd = gr.Dropdown(label="类型")

                chart_view = gr.HTML(value='<div style="height:500px; background:#f0f0f0;"></div>')
                path_info = gr.Text(label="当前路径", interactive=False)

            with gr.Column(scale=2):
                with gr.Group():
                    stats_txt = gr.Text(label="统计信息", interactive=False)
                    prog_txt = gr.Text(label="审核进度", interactive=False)

                with gr.Accordion("元数据解析", open=False):
                    meta_md = gr.Markdown()

                gr.Markdown("---")
                qa_radio = gr.Radio(label="题目列表", choices=[])

                with gr.Group():
                    curr_qid = gr.Text(visible=False)
                    q_disp = gr.Text(label="问题内容", lines=2)
                    a_disp = gr.Text(label="标准答案")

                    status_opt = gr.Radio(
                        label="审核结论",
                        choices=["正确", "错误", "优化"],
                        value="正确"
                    )
                    err_type = gr.Dropdown(label="错误分类", choices=["无", "事实错误", "逻辑错误", "图表无法读取"])

                    comment = gr.Text(label="审核备注")
                    save_status = gr.Text(label="操作反馈", interactive=False)
                    save_btn = gr.Button("💾 提交本题并下一题", variant="primary")

                with gr.Row():
                    prev_btn = gr.Button("⬅️ 上一个图表")
                    next_btn = gr.Button("➡️ 下一个图表")

        gr.Markdown("---")
        gr.Markdown("### 📦 记录导出与预览")

        records_code = gr.Code(
            label="当前所有审核记录 JSON (光标悬浮至代码块右上角可一键复制)",
            language="json",
            interactive=False,
            lines=15
        )

        # Fill the source dropdown once the page has loaded.
        demo.load(
            fn=lambda: gr.update(choices=list(data_manager.get_dataset_structure().get('sources', {}).keys())),
            outputs=[src_dd]
        )

        # Cascade: source -> chart types -> chart IDs and models.
        src_dd.change(handle_source_change, inputs=[src_dd], outputs=[typ_dd])
        typ_dd.change(handle_type_change, inputs=[src_dd, typ_dd], outputs=[id_dd, mdl_dd])

        # Changing either the chart ID or the model reloads the chart view.
        load_event_outputs = [chart_view, meta_md, qa_store, stats_txt, prog_txt, path_info, qa_radio, review_store]
        id_dd.change(handle_load, inputs=[src_dd, typ_dd, id_dd, mdl_dd], outputs=load_event_outputs)
        mdl_dd.change(handle_load, inputs=[src_dd, typ_dd, id_dd, mdl_dd], outputs=load_event_outputs)

        qa_radio.change(
            handle_qa_select,
            inputs=[qa_radio, qa_store],
            outputs=[curr_qid, q_disp, a_disp, status_opt, err_type, comment]
        )

        def navigate(direction):
            """Return dropdown updates selecting the chart ``direction`` steps away.

            Produces no-op updates when the move would leave the path list.
            NOTE(review): presumably the ``.change`` handlers wired above
            also fire for these programmatic updates and may replace the
            chart ID / model with the first choice when the source or type
            changes — verify against Gradio's ``change`` semantics.
            """
            target = nav_state.get_nav_target(direction)
            if target:
                return [
                    gr.update(value=target['source']),
                    gr.update(value=target['chart_type']),
                    gr.update(value=target['chart_id']),
                    gr.update(value=target['model']),
                    gr.update(value="")
                ]
            return [gr.update(), gr.update(), gr.update(), gr.update(), gr.update()]

        prev_btn.click(lambda: navigate(-1), outputs=[src_dd, typ_dd, id_dd, mdl_dd, save_status])
        next_btn.click(lambda: navigate(1), outputs=[src_dd, typ_dd, id_dd, mdl_dd, save_status])

        def quick_save_and_next(qid, cid, src, status_label, cmt, current_qa_selection, qa_json_str):
            """Save the review of the current question, then select the next one.

            Returns a 4-tuple: feedback text, statistics text, an update for
            the question radio, and all reviews as pretty-printed JSON.
            """
            if not qid:
                return "⚠️ 无效操作:未选择题目", gr.update(), gr.update(), gr.update()

            # UI label -> stored status value.
            status_map = {
                "正确": "correct",
                "错误": "incorrect",
                "优化": "needs_modification"
            }
            mapped_status = status_map.get(status_label, "correct")

            # NOTE(review): the err_type dropdown is not among this handler's
            # inputs, so the chosen error category is not part of the saved
            # record — confirm whether save_review should receive it.
            data_manager.save_review({
                "qa_id": qid, "chart_id": cid, "source": src,
                "status": mapped_status, "comment": cmt
            })

            stats = data_manager.get_review_stats()
            stats_str = f"✅{stats['correct']} | ❌{stats['incorrect']} | 总{stats['total']}"
            feedback = f"✅ 保存成功 (ID: {qid})"

            all_reviews = data_manager.get_all_reviews()
            reviews_json_text = json.dumps(all_reviews, ensure_ascii=False, indent=2)

            # Advancing is best-effort: the review is already saved, so a
            # malformed label or QA payload must not fail the whole handler.
            next_qa_update = gr.update()
            try:
                qas = json.loads(qa_json_str)
                # "Q3: ..." -> 3, which is also the 0-based index of the
                # question that follows the current one.
                next_idx = int(current_qa_selection.split(":")[0][1:])

                if next_idx < len(qas):
                    next_q = qas[next_idx]
                    # Must match the label format built in handle_load.
                    next_choice = f"Q{next_idx+1}: {next_q['q'][:20]}..."
                    next_qa_update = gr.update(value=next_choice)
                else:
                    feedback += " (当前图表题目已全部审核完毕)"
            except (ValueError, TypeError, AttributeError, LookupError) as exc:
                # Report the problem instead of hiding it; the selection
                # simply stays where it is.
                print(f"Warning: 无法自动跳转到下一题: {exc}")

            return feedback, stats_str, next_qa_update, reviews_json_text

        save_btn.click(
            quick_save_and_next,
            inputs=[curr_qid, id_dd, src_dd, status_opt, comment, qa_radio, qa_store],
            outputs=[save_status, stats_txt, qa_radio, records_code]
        )

        # Show the existing review records as soon as the page loads.
        demo.load(
            fn=lambda: json.dumps(data_manager.get_all_reviews(), ensure_ascii=False, indent=2),
            outputs=[records_code]
        )

    return demo
|
|
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it with fixed settings.
    app = create_ui()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_api": False,
        "max_threads": 10,
    }
    app.launch(**launch_options)
|
|