Chart / app.py
Pekku's picture
Upload folder using huggingface_hub
9a93f1f verified
import gradio as gr
import json
import base64
import re # 【新增】用于正则匹配本地脚本路径
from pathlib import Path # 【新增】用于处理相对路径
from data_manager import data_manager
# ============== 状态管理 (封装逻辑) ==============
class ReviewState:
    """Cursor over the flat list of chart paths used for prev/next navigation.

    Attributes are plain and public:
      * ``all_paths``   - the cached list of path entries (empty until synced)
      * ``current_idx`` - index of the entry currently shown, ``-1`` if none
    """

    def __init__(self):
        self.all_paths, self.current_idx = [], -1

    def sync_paths(self):
        """Replace the cached list with ``data_manager.get_all_chart_paths()``."""
        refreshed = data_manager.get_all_chart_paths()
        self.all_paths = refreshed

    def get_nav_target(self, direction):
        """Step the cursor by *direction* and return the entry it lands on.

        When the step would leave the list, the cursor is left where it was
        and ``None`` is returned.
        """
        candidate = self.current_idx + direction
        if candidate < 0 or candidate >= len(self.all_paths):
            return None
        self.current_idx = candidate
        return self.all_paths[candidate]
# Navigation cursor read and updated by handle_load() and by the prev/next
# button handlers.
# NOTE(review): as a module-level object it is presumably shared by every
# browser session served by this process - confirm that is intended.
nav_state = ReviewState()
# ============== 工具函数 (动态内联注入升级版) ==============
def to_html_frame(html_content, html_path):
    """Wrap chart HTML in a self-contained base64 ``data:`` iframe.

    Local ``<script src>`` and ``<link rel="stylesheet" href>`` references are
    replaced by inline ``<script>`` / ``<style>`` elements holding the file
    contents, because relative URLs cannot be resolved from inside a
    ``data:`` document. References that start with ``http://``, ``https://``,
    ``//`` or ``data:`` are left untouched, as is any reference whose file
    cannot be read (a warning is printed in that case).

    Args:
        html_content: The chart's HTML source.
        html_path: Path of the HTML file; relative references are resolved
            against its parent directory.

    Returns:
        An ``<iframe>`` HTML string, or a placeholder ``<div>`` when either
        argument is empty.
    """
    if not html_content or not html_path:
        return '<div style="padding:20px;text-align:center;">请选择数据进行加载</div>'

    base_dir = Path(html_path).parent
    remote_prefixes = ("http://", "https://", "//", "data:")

    # ``(?<![\w-])`` keeps attributes such as data-src / data-href from being
    # mistaken for the real src / href attribute.
    script_re = re.compile(
        r'<script\b[^>]*?(?<![\w-])src=["\']([^"\']+)["\'][^>]*>\s*</script\s*>',
        re.IGNORECASE,
    )
    link_re = re.compile(r'<link\b[^>]*>', re.IGNORECASE)
    rel_re = re.compile(r'(?<![\w-])rel=["\']stylesheet["\']', re.IGNORECASE)
    href_re = re.compile(r'(?<![\w-])href=["\']([^"\']+)["\']', re.IGNORECASE)

    def read_local_asset(ref, label):
        """Return the text of local file *ref*, or None to keep the tag as-is."""
        if ref.startswith(remote_prefixes):
            return None
        local_file = base_dir / ref
        try:
            local_file = local_file.resolve()
            return local_file.read_text(encoding="utf-8")
        except (OSError, ValueError) as exc:
            # Best effort: a missing or undecodable asset must not break the page.
            print(f"Warning: 未找到依赖的本地{label} {local_file} ({exc})")
            return None

    def escape_closer(text, tag):
        """Neutralise ``</tag`` inside inlined text, whatever its letter case."""
        # The HTML parser ends a raw-text element at the first case-insensitive
        # "</tag", so every spelling has to be escaped, not just the lowercase one.
        return re.sub(rf"</({tag})", r"<\\/\1", text, flags=re.IGNORECASE)

    def inline_script(match):
        content = read_local_asset(match.group(1), "脚本")
        if content is None:
            return match.group(0)
        return f"<script>\n{escape_closer(content, 'script')}\n</script>"

    def inline_stylesheet(match):
        tag = match.group(0)
        href = href_re.search(tag)
        if href is None or not rel_re.search(tag):
            return tag
        content = read_local_asset(href.group(1), "样式")
        if content is None:
            return tag
        return f"<style>\n{escape_closer(content, 'style')}\n</style>"

    # 1. Inline local JS files. A function replacement is used so backslashes
    #    in the file contents are inserted literally.
    html_content = script_re.sub(inline_script, html_content)

    # 2. Inline local CSS files; one pass handles either attribute order.
    html_content = link_re.sub(inline_stylesheet, html_content)

    # 3. Emit the document as a base64 data URI inside an iframe.
    b64_content = base64.b64encode(html_content.encode('utf-8')).decode('utf-8')
    return f'<iframe src="data:text/html;base64,{b64_content}" style="width:100%;height:600px;border:none;"></iframe>'
# ============== 交互逻辑 ==============
def handle_source_change(source):
    """Repopulate the chart-type dropdown after the data source changes.

    Reads ``sources -> <source> -> chart_types`` from the dataset structure
    and returns a ``gr.update`` whose choices are that mapping's keys and
    whose value is the first key (``None`` when there are none).
    """
    structure = data_manager.get_dataset_structure()
    source_entry = structure.get('sources', {}).get(source, {})
    type_names = list(source_entry.get('chart_types', {}).keys())
    if type_names:
        default_type = type_names[0]
    else:
        default_type = None
    return gr.update(choices=type_names, value=default_type)
def handle_type_change(source, c_type):
    """Repopulate the chart-id and model dropdowns after the chart type changes.

    Returns a 2-tuple of ``gr.update`` objects: the first carries the chart
    list for ``(source, c_type)``, the second the ``models`` list found under
    that chart type in the dataset structure. Each selects its first entry,
    or ``None`` when its list is empty.
    """
    chart_ids = data_manager.get_chart_list(source, c_type)
    structure = data_manager.get_dataset_structure()

    source_entry = structure.get('sources', {}).get(source, {})
    type_entry = source_entry.get('chart_types', {}).get(c_type, {})
    model_names = type_entry.get('models', [])

    first_chart = chart_ids[0] if chart_ids else None
    first_model = model_names[0] if model_names else None

    chart_update = gr.update(choices=chart_ids, value=first_chart)
    model_update = gr.update(choices=model_names, value=first_model)
    return (chart_update, model_update)
def handle_load(source, c_type, c_id, model):
    """Load one chart with its QA list and build the values for the load outputs.

    Args:
        source: Selected data source.
        c_type: Selected chart type.
        c_id: Selected chart id.
        model: Selected model entry.

    Returns:
        A list of 8 values, in order: iframe HTML for the chart, metadata as
        a Markdown bullet list, the QA list as a JSON string, the statistics
        line, the progress line, the ``source/type/id`` path, a ``gr.update``
        for the question radio, and ``"{}"`` to reset the review store.
        When any argument is empty, 8 no-op ``gr.update()`` objects are
        returned instead.
    """
    if not all([source, c_type, c_id, model]):
        return [gr.update()] * 8

    chart_data = data_manager.get_chart_data(source, c_type, c_id)
    qa_list = data_manager.get_qa_list(source, c_type, model, c_id)
    stats = data_manager.get_review_stats()

    # Re-locate the navigation cursor on the entry being shown. All four
    # fields are compared: path entries carry 'source' and 'chart_type' as
    # well, and a chart id reused under another source or type would
    # otherwise pin the cursor to the wrong entry.
    nav_state.sync_paths()
    for i, p in enumerate(nav_state.all_paths):
        if (
            p['chart_id'] == c_id
            and p['model'] == model
            and p['source'] == source
            and p['chart_type'] == c_type
        ):
            nav_state.current_idx = i
            break

    # html_path lets to_html_frame resolve the chart's local JS/CSS files.
    html_code = to_html_frame(
        chart_data.get('html_content', ''),
        chart_data.get('html_path', '')
    )
    meta_md = "\n".join([f"- **{k}**: {v}" for k, v in chart_data.get('label_info', {}).items()])
    qa_json = json.dumps([{"id": q.id, "q": q.question, "a": q.answer} for q in qa_list])
    stats_str = f"✅{stats['correct']} | ❌{stats['incorrect']} | 总{stats['total']}"
    prog_str = f"{nav_state.current_idx + 1} / {len(nav_state.all_paths)}"
    # The label format "Q<n>: <first 20 chars>..." is parsed back by the
    # question-selection and save handlers, so it must not change here alone.
    radio_choices = [f"Q{i+1}: {q.question[:20]}..." for i, q in enumerate(qa_list)]

    return [
        html_code,
        meta_md,
        qa_json,
        stats_str,
        prog_str,
        f"{source}/{c_type}/{c_id}",
        gr.update(choices=radio_choices, value=radio_choices[0] if radio_choices else None),
        json.dumps({})
    ]
# 严格匹配 6 个 outputs 数量
def handle_qa_select(selection, qa_json):
    """Resolve the selected radio label to its QA record and reset the form.

    Args:
        selection: Radio label of the form ``"Q<n>: <text>..."`` where ``n``
            is the 1-based question number.
        qa_json: JSON array of objects with ``"id"``, ``"q"`` and ``"a"`` keys.

    Returns:
        Six values: question id, question text, answer, then the form
        defaults ``"正确"``, ``"无"`` and ``""``. If the label or JSON cannot
        be parsed, or the number is out of range, the first three are empty
        strings.
    """
    blank = ["", "", "", "正确", "无", ""]
    if not selection or not qa_json:
        return blank
    try:
        qas = json.loads(qa_json)
        idx = int(selection.split(":")[0][1:]) - 1
        # Reject "Q0" and negatives explicitly: a negative index would
        # otherwise wrap around and silently return a question from the end.
        if not 0 <= idx < len(qas):
            return blank
        curr = qas[idx]
        return [curr['id'], curr['q'], curr['a'], "正确", "无", ""]
    except (ValueError, TypeError, LookupError, AttributeError):
        # Malformed label or payload: show an empty form instead of failing.
        return blank
# ============== UI 布局 ==============
def create_ui():
    """Build the Gradio Blocks app for reviewing chart question/answer pairs.

    Lays out the selection dropdowns, chart viewer, review form and records
    panel, then wires the event handlers: cascading dropdown updates, chart
    loading, question selection, prev/next chart navigation, and
    save-and-advance.

    Returns:
        The assembled ``gr.Blocks`` instance; the caller launches it.
    """
    # NOTE(review): the container nesting below was reconstructed from a copy
    # of this file whose indentation had been lost - confirm it against the
    # intended layout.
    with gr.Blocks(title="审核系统 V2", theme=gr.themes.Soft()) as demo:
        # JSON strings: the loaded QA list, and a review dict reset on each load.
        qa_store = gr.State(value="[]")
        review_store = gr.State(value="{}")

        gr.Markdown("## 📑 图表问答数据集审核")

        with gr.Row():
            with gr.Column(scale=4):
                with gr.Row():
                    src_dd = gr.Dropdown(label="数据源", choices=["None"])
                    typ_dd = gr.Dropdown(label="图表类型")
                    id_dd = gr.Dropdown(label="图表 ID")
                    mdl_dd = gr.Dropdown(label="类型")
                chart_view = gr.HTML(value='<div style="height:500px; background:#f0f0f0;"></div>')
                path_info = gr.Text(label="当前路径", interactive=False)

            with gr.Column(scale=2):
                with gr.Group():
                    stats_txt = gr.Text(label="统计信息", interactive=False)
                    prog_txt = gr.Text(label="审核进度", interactive=False)
                with gr.Accordion("元数据解析", open=False):
                    meta_md = gr.Markdown()
                gr.Markdown("---")
                qa_radio = gr.Radio(label="题目列表", choices=[])
                with gr.Group():
                    # Hidden field holding the id of the question being reviewed.
                    curr_qid = gr.Text(visible=False)
                    q_disp = gr.Text(label="问题内容", lines=2)
                    a_disp = gr.Text(label="标准答案")
                    status_opt = gr.Radio(
                        label="审核结论",
                        choices=["正确", "错误", "优化"],
                        value="正确"
                    )
                    err_type = gr.Dropdown(label="错误分类", choices=["无", "事实错误", "逻辑错误", "图表无法读取"])
                    comment = gr.Text(label="审核备注")
                    save_status = gr.Text(label="操作反馈", interactive=False)
                    save_btn = gr.Button("💾 提交本题并下一题", variant="primary")
                with gr.Row():
                    prev_btn = gr.Button("⬅️ 上一个图表")
                    next_btn = gr.Button("➡️ 下一个图表")

        gr.Markdown("---")
        gr.Markdown("### 📦 记录导出与预览")
        # A read-only gr.Code block shows every review record as JSON.
        records_code = gr.Code(
            label="当前所有审核记录 JSON (光标悬浮至代码块右上角可一键复制)",
            language="json",
            interactive=False,
            lines=15
        )

        # --- Event bindings ---
        # Fill the data-source dropdown from the dataset structure on page load.
        demo.load(
            fn=lambda: gr.update(choices=list(data_manager.get_dataset_structure().get('sources', {}).keys())),
            outputs=[src_dd]
        )
        src_dd.change(handle_source_change, inputs=[src_dd], outputs=[typ_dd])
        typ_dd.change(handle_type_change, inputs=[src_dd, typ_dd], outputs=[id_dd, mdl_dd])

        # Order must match the 8-item list returned by handle_load.
        load_event_outputs = [chart_view, meta_md, qa_store, stats_txt, prog_txt, path_info, qa_radio, review_store]
        id_dd.change(handle_load, inputs=[src_dd, typ_dd, id_dd, mdl_dd], outputs=load_event_outputs)
        mdl_dd.change(handle_load, inputs=[src_dd, typ_dd, id_dd, mdl_dd], outputs=load_event_outputs)

        qa_radio.change(
            handle_qa_select,
            inputs=[qa_radio, qa_store],
            outputs=[curr_qid, q_disp, a_disp, status_opt, err_type, comment]
        )

        def navigate(direction):
            """Move the shared cursor by *direction* and select that chart.

            Returns updates for the four dropdowns plus a cleared feedback
            box, or five no-op updates when there is no chart that way.
            """
            target = nav_state.get_nav_target(direction)
            if target:
                return [
                    gr.update(value=target['source']),
                    gr.update(value=target['chart_type']),
                    gr.update(value=target['chart_id']),
                    gr.update(value=target['model']),
                    gr.update(value="")
                ]
            return [gr.update(), gr.update(), gr.update(), gr.update(), gr.update()]

        prev_btn.click(lambda: navigate(-1), outputs=[src_dd, typ_dd, id_dd, mdl_dd, save_status])
        next_btn.click(lambda: navigate(1), outputs=[src_dd, typ_dd, id_dd, mdl_dd, save_status])

        def quick_save_and_next(qid, cid, src, status_label, cmt, current_qa_selection, qa_json_str):
            """Save the current review, refresh stats/records, advance one question.

            Returns four values: feedback text, statistics line, an update
            for the question radio, and all review records as JSON text.
            """
            if not qid:
                return "⚠️ 无效操作:未选择题目", gr.update(), gr.update(), gr.update()

            # 1. Map the UI label to the stored status and save.
            status_map = {
                "正确": "correct",
                "错误": "incorrect",
                "优化": "needs_modification"
            }
            mapped_status = status_map.get(status_label, "correct")
            # NOTE(review): the error-category dropdown (err_type) is not among
            # this handler's inputs, so it is not part of the saved record -
            # confirm that is intended.
            data_manager.save_review({
                "qa_id": qid, "chart_id": cid, "source": src,
                "status": mapped_status, "comment": cmt
            })

            # 2. Refresh the statistics line.
            stats = data_manager.get_review_stats()
            stats_str = f"✅{stats['correct']} | ❌{stats['incorrect']} | 总{stats['total']}"
            feedback = f"✅ 保存成功 (ID: {qid})"

            # 3. Dump every record as pretty-printed JSON for the code block.
            all_reviews = data_manager.get_all_reviews()
            reviews_json_text = json.dumps(all_reviews, ensure_ascii=False, indent=2)

            # 4. Advance to the next question of the current chart, if any.
            next_qa_update = gr.update()
            try:
                qas = json.loads(qa_json_str)
                # Label looks like "Q1: xxx..."; take the number after "Q".
                curr_q_num = int(current_qa_selection.split(":")[0][1:])
                # The 1-based number of this question is the 0-based index of the next.
                next_idx = curr_q_num
                if next_idx < len(qas):
                    next_q = qas[next_idx]
                    next_choice = f"Q{next_idx+1}: {next_q['q'][:20]}..."
                    # Changing the radio value fires qa_radio.change, which
                    # reloads the question fields.
                    next_qa_update = gr.update(value=next_choice)
                else:
                    feedback += " (当前图表题目已全部审核完毕)"
            except (ValueError, TypeError, LookupError, AttributeError):
                # The save above already succeeded; auto-advance is best
                # effort, so an unparsable label or payload just keeps the
                # current selection. Other errors are no longer hidden.
                pass

            return feedback, stats_str, next_qa_update, reviews_json_text

        save_btn.click(
            quick_save_and_next,
            inputs=[curr_qid, id_dd, src_dd, status_opt, comment, qa_radio, qa_store],
            outputs=[save_status, stats_txt, qa_radio, records_code]
        )

        # Also show the existing records once on page load.
        demo.load(
            fn=lambda: json.dumps(data_manager.get_all_reviews(), ensure_ascii=False, indent=2),
            outputs=[records_code]
        )

    return demo
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all interfaces, port 7860.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_api": False,
        "max_threads": 10,
    }
    app = create_ui()
    app.launch(**launch_options)