manip / app.py
chrisleng's picture
Update app.py
6257b95 verified
import gradio as gr
import pandas as pd
import numpy as np
import os
import datetime
# ===================== 【仅需改这里,按需切换】 =====================
# 比赛评分指标配置 二选一即可,不用改其他任何地方
# 推荐:分类任务用 accuracy,不平衡数据集/多分类用 f1
SCORE_METRIC = "accuracy" # 准确率 ✅ 推荐新手用这个
# SCORE_METRIC = "f1" # F1分数(宏平均)
# 你的标准答案文件名称,上传csv后改这里,默认即可
GT_FILE_PATH = "test_ground_truth.csv"
# 存储提交记录的文件,自动创建,无需手动上传
SUBMISSION_CSV = "submissions.csv"
# ===================== 初始化配置,无需修改 =====================
# 初始化提交记录表
if not os.path.exists(SUBMISSION_CSV):
init_df = pd.DataFrame(columns=["hf_username", "score", "submit_time"])
init_df.to_csv(SUBMISSION_CSV, index=False, encoding="utf-8")
# 初始化标准答案文件(没上传也能正常启动页面,不影响使用)
try:
gt_df = pd.read_csv(GT_FILE_PATH)
gt_df = gt_df.sort_values("id").reset_index(drop=True)
except:
gt_df = pd.DataFrame({"id": [], "label": []})
# ===================== ✨ 原生Python实现 准确率/ F1计算 ✨ 无任何依赖 ✨ =====================
def calculate_accuracy(y_true, y_pred):
"""原生Python实现准确率计算,和sklearn accuracy_score完全一致"""
correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
total = len(y_true)
return correct / total if total > 0 else 0.0
def calculate_f1_macro(y_true, y_pred):
"""原生Python实现 宏平均F1分数,和sklearn f1_score(average='macro')完全一致"""
unique_labels = list(set(y_true + y_pred))
f1_scores = []
for label in unique_labels:
# 计算TP/FP/FN
tp = sum(1 for t, p in zip(y_true, y_pred) if t == label and p == label)
fp = sum(1 for t, p in zip(y_true, y_pred) if t != label and p == label)
fn = sum(1 for t, p in zip(y_true, y_pred) if t == label and p != label)
# 避免除零错误
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
# 计算F1
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
f1_scores.append(f1)
return sum(f1_scores) / len(f1_scores) if len(f1_scores) > 0 else 0.0
# ===================== 核心提交+评分逻辑 =====================
def submit_predictions(hf_username, submit_file):
# 校验:必须输入HF账号
if not hf_username or hf_username.strip() == "":
return "❌ 请输入你的HuggingFace账号名!", pd.DataFrame(columns=["排名", "HF账号", "最终得分"])
# 校验:必须上传文件
if submit_file is None:
return "❌ 请上传你的预测结果CSV文件!", pd.DataFrame(columns=["排名", "HF账号", "最终得分"])
try:
# 读取并校验文件格式:必须有id和label列
pred_df = pd.read_csv(submit_file.name)
if not all(col in pred_df.columns for col in ["id", "label"]):
return "❌ 文件格式错误!必须包含 id 和 label 两列!", pd.DataFrame(columns=["排名", "HF账号", "最终得分"])
# 按ID排序,防止选手提交的顺序和标准答案不一致,导致评分错误
pred_df = pred_df.sort_values("id").reset_index(drop=True)
references = gt_df["label"].tolist() if len(gt_df) > 0 else [0]*len(pred_df)
predictions = pred_df["label"].tolist()
# 选择评分指标计算分数
if SCORE_METRIC == "accuracy":
score = calculate_accuracy(references, predictions)
else:
score = calculate_f1_macro(references, predictions)
# 转为百分比,保留4位小数,展示更友好
score = round(score * 100, 4)
# 读取历史提交记录,只保留【最高分】核心规则
sub_df = pd.read_csv(SUBMISSION_CSV, encoding="utf-8")
user_history = sub_df[sub_df["hf_username"] == hf_username.strip()]
if len(user_history) > 0:
old_best = user_history["score"].max()
if score <= old_best:
return f"✅ 提交成功!本次得分:{score},历史最高分:{old_best}(未更新)", get_ranking()
# 删除该用户历史记录,只保留最高分
sub_df = sub_df[sub_df["hf_username"] != hf_username.strip()]
# 写入新的提交记录
new_row = pd.DataFrame({
"hf_username": [hf_username.strip()],
"score": [score],
"submit_time": [datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")]
})
sub_df = pd.concat([sub_df, new_row], ignore_index=True)
sub_df.to_csv(SUBMISSION_CSV, index=False, encoding="utf-8")
# 返回成功提示+最新排行榜
return f"🎉 提交成功!你的得分:{score} 分(已更新最高分)", get_ranking()
# 捕获所有异常,防止页面崩溃,给出具体错误信息
except Exception as e:
return f"❌ 提交失败:{str(e)}", pd.DataFrame(columns=["排名", "HF账号", "最终得分"])
# ===================== 生成实时排行榜 =====================
def get_ranking():
if not os.path.exists(SUBMISSION_CSV):
return pd.DataFrame(columns=["排名", "HF账号", "最终得分"])
sub_df = pd.read_csv(SUBMISSION_CSV, encoding="utf-8")
if len(sub_df) == 0:
return pd.DataFrame(columns=["排名", "HF账号", "最终得分"])
# 按分数降序排列,同分按提交时间早的排前面
rank_df = sub_df.sort_values(by=["score", "submit_time"], ascending=[False, True]).reset_index(drop=True)
rank_df["排名"] = range(1, len(rank_df)+1)
return rank_df[["排名", "hf_username", "score"]].rename(columns={"hf_username":"HF账号", "score":"最终得分"})
# ===================== 比赛可视化页面(美观简洁) =====================
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🚀 KrisL0 比赛提交系统 (正式版)
## ✅ 提交规则 & 要求
1. 输入你的 **HuggingFace账号名** 作为唯一身份标识
2. 仅支持上传 **CSV格式** 的预测结果文件
3. 文件必须包含 **id** 和 **label** 两列,无多余列
4. 每人可多次提交,系统自动保留你的【历史最高分】
5. 评分指标:准确率(Accuracy) | 满分 100 分
""")
with gr.Row():
hf_name = gr.Textbox(label="📌 你的HF账号名", placeholder="例如:KrisL0", max_lines=1)
upload_file = gr.File(label="📁 上传预测CSV文件", file_types=[".csv"])
submit_btn = gr.Button("🚀 提交预测结果", variant="primary")
result_text = gr.Textbox(label="✅ 提交结果反馈", interactive=False)
ranking_table = gr.DataFrame(label="🏆 实时排行榜", value=get_ranking(), interactive=False)
# 绑定按钮点击事件
submit_btn.click(submit_predictions, inputs=[hf_name, upload_file], outputs=[result_text, ranking_table])
# 启动应用
if __name__ == "__main__":
demo.queue().launch()