import gradio as gr import pandas as pd import numpy as np import os import datetime # ===================== 【仅需改这里,按需切换】 ===================== # 比赛评分指标配置 二选一即可,不用改其他任何地方 # 推荐:分类任务用 accuracy,不平衡数据集/多分类用 f1 SCORE_METRIC = "accuracy" # 准确率 ✅ 推荐新手用这个 # SCORE_METRIC = "f1" # F1分数(宏平均) # 你的标准答案文件名称,上传csv后改这里,默认即可 GT_FILE_PATH = "test_ground_truth.csv" # 存储提交记录的文件,自动创建,无需手动上传 SUBMISSION_CSV = "submissions.csv" # ===================== 初始化配置,无需修改 ===================== # 初始化提交记录表 if not os.path.exists(SUBMISSION_CSV): init_df = pd.DataFrame(columns=["hf_username", "score", "submit_time"]) init_df.to_csv(SUBMISSION_CSV, index=False, encoding="utf-8") # 初始化标准答案文件(没上传也能正常启动页面,不影响使用) try: gt_df = pd.read_csv(GT_FILE_PATH) gt_df = gt_df.sort_values("id").reset_index(drop=True) except: gt_df = pd.DataFrame({"id": [], "label": []}) # ===================== ✨ 原生Python实现 准确率/ F1计算 ✨ 无任何依赖 ✨ ===================== def calculate_accuracy(y_true, y_pred): """原生Python实现准确率计算,和sklearn accuracy_score完全一致""" correct = sum(1 for t, p in zip(y_true, y_pred) if t == p) total = len(y_true) return correct / total if total > 0 else 0.0 def calculate_f1_macro(y_true, y_pred): """原生Python实现 宏平均F1分数,和sklearn f1_score(average='macro')完全一致""" unique_labels = list(set(y_true + y_pred)) f1_scores = [] for label in unique_labels: # 计算TP/FP/FN tp = sum(1 for t, p in zip(y_true, y_pred) if t == label and p == label) fp = sum(1 for t, p in zip(y_true, y_pred) if t != label and p == label) fn = sum(1 for t, p in zip(y_true, y_pred) if t == label and p != label) # 避免除零错误 precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0 recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0 # 计算F1 f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0 f1_scores.append(f1) return sum(f1_scores) / len(f1_scores) if len(f1_scores) > 0 else 0.0 # ===================== 核心提交+评分逻辑 ===================== def submit_predictions(hf_username, submit_file): # 校验:必须输入HF账号 if not hf_username or hf_username.strip() == "": return "❌ 请输入你的HuggingFace账号名!", pd.DataFrame(columns=["排名", "HF账号", "最终得分"]) # 校验:必须上传文件 if submit_file is None: return "❌ 请上传你的预测结果CSV文件!", pd.DataFrame(columns=["排名", "HF账号", "最终得分"]) try: # 读取并校验文件格式:必须有id和label列 pred_df = pd.read_csv(submit_file.name) if not all(col in pred_df.columns for col in ["id", "label"]): return "❌ 文件格式错误!必须包含 id 和 label 两列!", pd.DataFrame(columns=["排名", "HF账号", "最终得分"]) # 按ID排序,防止选手提交的顺序和标准答案不一致,导致评分错误 pred_df = pred_df.sort_values("id").reset_index(drop=True) references = gt_df["label"].tolist() if len(gt_df) > 0 else [0]*len(pred_df) predictions = pred_df["label"].tolist() # 选择评分指标计算分数 if SCORE_METRIC == "accuracy": score = calculate_accuracy(references, predictions) else: score = calculate_f1_macro(references, predictions) # 转为百分比,保留4位小数,展示更友好 score = round(score * 100, 4) # 读取历史提交记录,只保留【最高分】核心规则 sub_df = pd.read_csv(SUBMISSION_CSV, encoding="utf-8") user_history = sub_df[sub_df["hf_username"] == hf_username.strip()] if len(user_history) > 0: old_best = user_history["score"].max() if score <= old_best: return f"✅ 提交成功!本次得分:{score},历史最高分:{old_best}(未更新)", get_ranking() # 删除该用户历史记录,只保留最高分 sub_df = sub_df[sub_df["hf_username"] != hf_username.strip()] # 写入新的提交记录 new_row = pd.DataFrame({ "hf_username": [hf_username.strip()], "score": [score], "submit_time": [datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")] }) sub_df = pd.concat([sub_df, new_row], ignore_index=True) sub_df.to_csv(SUBMISSION_CSV, index=False, encoding="utf-8") # 返回成功提示+最新排行榜 return f"🎉 提交成功!你的得分:{score} 分(已更新最高分)", get_ranking() # 捕获所有异常,防止页面崩溃,给出具体错误信息 except Exception as e: return f"❌ 提交失败:{str(e)}", pd.DataFrame(columns=["排名", "HF账号", "最终得分"]) # ===================== 生成实时排行榜 ===================== def get_ranking(): if not os.path.exists(SUBMISSION_CSV): return pd.DataFrame(columns=["排名", "HF账号", "最终得分"]) sub_df = pd.read_csv(SUBMISSION_CSV, encoding="utf-8") if len(sub_df) == 0: return pd.DataFrame(columns=["排名", "HF账号", "最终得分"]) # 按分数降序排列,同分按提交时间早的排前面 rank_df = sub_df.sort_values(by=["score", "submit_time"], ascending=[False, True]).reset_index(drop=True) rank_df["排名"] = range(1, len(rank_df)+1) return rank_df[["排名", "hf_username", "score"]].rename(columns={"hf_username":"HF账号", "score":"最终得分"}) # ===================== 比赛可视化页面(美观简洁) ===================== with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🚀 KrisL0 比赛提交系统 (正式版) ## ✅ 提交规则 & 要求 1. 输入你的 **HuggingFace账号名** 作为唯一身份标识 2. 仅支持上传 **CSV格式** 的预测结果文件 3. 文件必须包含 **id** 和 **label** 两列,无多余列 4. 每人可多次提交,系统自动保留你的【历史最高分】 5. 评分指标:准确率(Accuracy) | 满分 100 分 """) with gr.Row(): hf_name = gr.Textbox(label="📌 你的HF账号名", placeholder="例如:KrisL0", max_lines=1) upload_file = gr.File(label="📁 上传预测CSV文件", file_types=[".csv"]) submit_btn = gr.Button("🚀 提交预测结果", variant="primary") result_text = gr.Textbox(label="✅ 提交结果反馈", interactive=False) ranking_table = gr.DataFrame(label="🏆 实时排行榜", value=get_ranking(), interactive=False) # 绑定按钮点击事件 submit_btn.click(submit_predictions, inputs=[hf_name, upload_file], outputs=[result_text, ranking_table]) # 启动应用 if __name__ == "__main__": demo.queue().launch()