|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import os |
|
|
import datetime |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Module configuration and start-up state
# ---------------------------------------------------------------------------

# Metric used to score a submission. "accuracy" selects calculate_accuracy;
# any other value makes submit_predictions use calculate_f1_macro.
SCORE_METRIC = "accuracy"

# CSV holding the reference labels; it is read below and sorted by "id".
GT_FILE_PATH = "test_ground_truth.csv"

# CSV that persists the leaderboard (one row per stored submission).
SUBMISSION_CSV = "submissions.csv"

# Create an empty leaderboard file on first start so later reads never hit a
# missing file.
if not os.path.exists(SUBMISSION_CSV):
    init_df = pd.DataFrame(columns=["hf_username", "score", "submit_time"])
    init_df.to_csv(SUBMISSION_CSV, index=False, encoding="utf-8")

# Load the ground truth once at import time. Loading is best-effort: on any
# ordinary failure (missing/unreadable/malformed file, no "id" column) the app
# still starts with an empty table. Unlike a bare ``except:``, this does not
# swallow KeyboardInterrupt/SystemExit, and the reason is reported instead of
# being silently discarded.
try:
    gt_df = pd.read_csv(GT_FILE_PATH)
    gt_df = gt_df.sort_values("id").reset_index(drop=True)
except Exception as exc:
    print(
        f"[warning] could not load ground truth from {GT_FILE_PATH!r}: {exc!r}; "
        "falling back to an empty ground-truth table"
    )
    gt_df = pd.DataFrame({"id": [], "label": []})
|
|
|
|
|
|
|
|
def calculate_accuracy(y_true, y_pred):
    """Return the share of reference labels that were predicted correctly.

    Pure-Python accuracy with no sklearn dependency. Labels are paired with
    ``zip``, so pairing stops at the shorter input, while the denominator is
    always ``len(y_true)``. An empty ``y_true`` yields ``0.0``.
    """
    n_samples = len(y_true)
    if n_samples <= 0:
        return 0.0

    hits = 0
    for expected, predicted in zip(y_true, y_pred):
        if expected == predicted:
            hits += 1
    return hits / n_samples
|
|
|
|
|
def calculate_f1_macro(y_true, y_pred):
    """Return the macro-averaged F1 score in pure Python.

    The score is the unweighted mean of the per-label F1 over every label that
    occurs in either input. Precision, recall or F1 with a zero denominator
    count as ``0.0``. Intended as a dependency-free stand-in for sklearn's
    ``f1_score(average="macro")``.

    Args:
        y_true: Reference labels (any iterable of hashable values).
        y_pred: Predicted labels, paired with ``y_true`` position by position
            via ``zip`` (pairing stops at the shorter input).

    Returns:
        The macro F1 as a float in ``[0.0, 1.0]``; ``0.0`` when there are no
        labels at all.
    """
    from collections import Counter

    # Collect the label universe without ``y_true + y_pred``, which only works
    # when both inputs are lists.
    labels = set(y_true)
    labels.update(y_pred)

    # One pass over the pairs instead of three passes per label.
    true_pos, false_pos, false_neg = Counter(), Counter(), Counter()
    for expected, predicted in zip(y_true, y_pred):
        if expected == predicted:
            true_pos[expected] += 1
        else:
            # A mismatch is a miss for the expected label and a false alarm
            # for the predicted one.
            false_neg[expected] += 1
            false_pos[predicted] += 1

    f1_scores = []
    for label in labels:
        tp = true_pos[label]
        fp = false_pos[label]
        fn = false_neg[label]

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0

        denom = precision + recall
        f1_scores.append(2 * precision * recall / denom if denom > 0 else 0.0)

    return sum(f1_scores) / len(f1_scores) if f1_scores else 0.0
|
|
|
|
|
|
|
|
def _empty_ranking():
    """Return an empty leaderboard frame with the display column headers."""
    return pd.DataFrame(columns=["排名", "HF账号", "最终得分"])


def submit_predictions(hf_username, submit_file):
    """Score an uploaded prediction CSV and update the persisted leaderboard.

    Args:
        hf_username: Account name identifying the participant; surrounding
            whitespace is ignored.
        submit_file: The uploaded file as handed over by the Gradio File
            component: either an object exposing the path as ``.name`` or a
            plain path string. ``None`` when nothing was uploaded.

    Returns:
        A ``(message, ranking)`` tuple: a user-facing status string and a
        DataFrame for the leaderboard widget. Validation and processing
        failures return an empty ranking frame; successful submissions return
        the current ranking from ``get_ranking()``.

    Only a score strictly higher than the user's stored best replaces the
    stored row. Any unexpected exception is converted into an error message
    rather than propagated, so the UI callback never crashes.
    """
    username = hf_username.strip() if hf_username else ""
    if not username:
        return "❌ 请输入你的HuggingFace账号名!", _empty_ranking()

    if submit_file is None:
        return "❌ 请上传你的预测结果CSV文件!", _empty_ranking()

    try:
        # Depending on the Gradio version/configuration the upload arrives as a
        # temp-file wrapper with ``.name`` or directly as a path string.
        csv_path = getattr(submit_file, "name", submit_file)
        pred_df = pd.read_csv(csv_path)
        if not {"id", "label"}.issubset(pred_df.columns):
            return "❌ 文件格式错误!必须包含 id 和 label 两列!", _empty_ranking()

        if len(gt_df) > 0:
            # Align by id rather than by sorted position: a missing, extra or
            # duplicated id would otherwise shift every following row and the
            # submission would be scored against the wrong references.
            pred_ids = pred_df["id"]
            if pred_ids.duplicated().any() or set(pred_ids) != set(gt_df["id"]):
                return (
                    "❌ id 列与测试集不一致!每个测试样本的 id 必须且只能出现一次!",
                    _empty_ranking(),
                )
            references = gt_df["label"].tolist()
            predictions = pred_df.set_index("id")["label"].reindex(gt_df["id"]).tolist()
        else:
            # No ground truth was loaded: keep the original placeholder
            # behaviour of scoring against all-zero references.
            # NOTE(review): scores produced in this mode are not meaningful.
            pred_df = pred_df.sort_values("id").reset_index(drop=True)
            references = [0] * len(pred_df)
            predictions = pred_df["label"].tolist()

        if SCORE_METRIC == "accuracy":
            score = calculate_accuracy(references, predictions)
        else:
            score = calculate_f1_macro(references, predictions)

        # Stored and displayed on a 0-100 scale.
        score = round(score * 100, 4)

        # Force usernames to be read as text; otherwise a numeric-looking name
        # such as "123" comes back as a number and never matches its history.
        sub_df = pd.read_csv(
            SUBMISSION_CSV, encoding="utf-8", dtype={"hf_username": str}
        )
        is_user = sub_df["hf_username"] == username

        if is_user.any():
            old_best = sub_df.loc[is_user, "score"].max()
            if score <= old_best:
                return f"✅ 提交成功!本次得分:{score},历史最高分:{old_best}(未更新)", get_ranking()
            # New personal best: drop the old row(s) before appending.
            sub_df = sub_df[~is_user]

        new_row = pd.DataFrame({
            "hf_username": [username],
            "score": [score],
            "submit_time": [datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
        })
        sub_df = pd.concat([sub_df, new_row], ignore_index=True)
        sub_df.to_csv(SUBMISSION_CSV, index=False, encoding="utf-8")

        return f"🎉 提交成功!你的得分:{score} 分(已更新最高分)", get_ranking()

    except Exception as e:
        # UI boundary: report the failure to the user instead of raising.
        return f"❌ 提交失败:{str(e)}", _empty_ranking()
|
|
|
|
|
|
|
|
def get_ranking():
    """Build the leaderboard table from the persisted submissions file.

    Rows are ordered by score (highest first); ties are broken by the earlier
    ``submit_time``. The result has the display columns rank, account name and
    score. An empty frame with those columns is returned when the submissions
    file is missing or contains no rows.
    """
    display_columns = ["排名", "HF账号", "最终得分"]

    if not os.path.exists(SUBMISSION_CSV):
        return pd.DataFrame(columns=display_columns)

    records = pd.read_csv(SUBMISSION_CSV, encoding="utf-8")
    if records.shape[0] == 0:
        return pd.DataFrame(columns=display_columns)

    ordered = records.sort_values(
        by=["score", "submit_time"],
        ascending=[False, True],
    )
    ordered = ordered.reset_index(drop=True)

    board = ordered[["hf_username", "score"]].rename(
        columns={"hf_username": "HF账号", "score": "最终得分"}
    )
    # Ranks are 1-based positions in the sorted order.
    board.insert(0, "排名", range(1, len(board) + 1))
    return board
|
|
|
|
|
|
|
|
# Gradio UI: rules text, the submission form, a feedback box and the
# leaderboard table, wired to submit_predictions.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # NOTE: rule 5 below names accuracy explicitly; update the text if
    # SCORE_METRIC is ever switched to another metric.
    gr.Markdown("""
    # 🚀 KrisL0 比赛提交系统 (正式版)
    ## ✅ 提交规则 & 要求
    1. 输入你的 **HuggingFace账号名** 作为唯一身份标识
    2. 仅支持上传 **CSV格式** 的预测结果文件
    3. 文件必须包含 **id** 和 **label** 两列,无多余列
    4. 每人可多次提交,系统自动保留你的【历史最高分】
    5. 评分指标:准确率(Accuracy) | 满分 100 分
    """)

    with gr.Row():
        hf_name = gr.Textbox(label="📌 你的HF账号名", placeholder="例如:KrisL0", max_lines=1)
        upload_file = gr.File(label="📁 上传预测CSV文件", file_types=[".csv"])

    submit_btn = gr.Button("🚀 提交预测结果", variant="primary")
    result_text = gr.Textbox(label="✅ 提交结果反馈", interactive=False)

    # Pass the function itself rather than its result: Gradio then calls it on
    # every page load, so a reload shows the current leaderboard instead of the
    # snapshot taken when the app was started.
    ranking_table = gr.DataFrame(label="🏆 实时排行榜", value=get_ranking, interactive=False)

    # A submission updates both the feedback text and the leaderboard.
    submit_btn.click(submit_predictions, inputs=[hf_name, upload_file], outputs=[result_text, ranking_table])


# Start the app (with request queueing) only when run as a script.
if __name__ == "__main__":
    demo.queue().launch()