Spaces:
Running
Running
| import datetime | |
| import html as html_lib | |
| import json | |
| import os | |
| import re | |
| from typing import Dict, List, Optional | |
| import gradio as gr | |
| from datasets import Dataset, load_dataset | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from content import ( | |
| SUBMISSION_TEXT, | |
| format_error, | |
| format_log, | |
| format_warning, | |
| ) | |
| from evaluator import SimpleEvaluator | |
# Hub auth token: prefer HF_TOKEN, fall back to TOKEN (common Spaces secret names).
TOKEN = os.getenv("HF_TOKEN") or os.getenv("TOKEN")
# Dataset repo holding questions/ground-truth answers, and the results repo
# the leaderboard entries are pushed to.
DATASET_REPO = "RUC-NLPIR/GISA"
RESULTS_REPO = "RUC-NLPIR/GISA-leaderboard"
META_FILE = "encrypted_question.jsonl"  # per-question metadata (id, answer_type)
ANSWER_DIR = "answer"  # folder of per-question ground-truth CSVs inside DATASET_REPO
CACHE_DIR = "cache/answers"  # local download cache for answer CSVs
SEED_FILE = os.path.join(os.path.dirname(__file__), "seed.json")
ASSETS_DIR = os.path.join(os.path.dirname(__file__), "assets")
INDEX_HTML = os.path.join(ASSETS_DIR, "index.html")
STYLES_CSS = os.path.join(ASSETS_DIR, "styles.css")
SCRIPT_JS = os.path.join(ASSETS_DIR, "script.js")
# Answer types the evaluator knows how to score.
ALLOWED_TYPES = {"item", "set", "list", "table"}
os.makedirs(CACHE_DIR, exist_ok=True)
api = HfApi()
evaluator = SimpleEvaluator()
def _extract_username(profile, request: Optional[gr.Request]) -> Optional[str]:
    """
    Best-effort extraction of the HF username across Gradio versions.

    On Hugging Face Spaces with hf_oauth enabled, Gradio can inject an OAuth
    profile object (usually exposing `.username`). Some versions also provide
    `request.username`, and proxies may inject user headers.
    """
    # 1) OAuth profile: attribute access first, then dict-like profiles.
    if profile is not None:
        attr_value = getattr(profile, "username", None)
        if attr_value:
            return str(attr_value)
        if isinstance(profile, dict):
            for candidate in ("username", "preferred_username", "name"):
                dict_value = profile.get(candidate)
                if dict_value:
                    return str(dict_value)
    # 2) The request object itself.
    if request is None:
        return None
    req_value = getattr(request, "username", None)
    if req_value:
        return str(req_value)
    # 3) Proxy-injected headers. Starlette Headers is case-insensitive;
    #    tolerate plain dicts (and non-mapping objects) as well.
    headers = getattr(request, "headers", None)
    if not headers:
        return None
    header_names = (
        "x-forwarded-user",
        "x-hf-user",
        "x-huggingface-user",
        "x-user",
    )
    for header_name in header_names:
        try:
            header_value = headers.get(header_name)
        except Exception:
            header_value = None
        if header_value:
            return str(header_value)
    return None
| def _safe_float(val): | |
| try: | |
| if val is None: | |
| return None | |
| if isinstance(val, str) and not val.strip(): | |
| return None | |
| return float(val) | |
| except Exception: | |
| return None | |
| def _to_percent(val: Optional[float]) -> float: | |
| if val is None: | |
| return 0.0 | |
| return round(float(val) * 100, 2) | |
| def _load_text(path: str) -> str: | |
| with open(path, "r", encoding="utf-8") as f: | |
| return f.read() | |
def load_meta_map() -> Dict[str, str]:
    """
    Download the question metadata file from the dataset repo and return a
    mapping of question id -> answer type.

    Raises:
        ValueError: if the file is empty, contains invalid JSON, is missing
            required fields, or declares an unsupported answer_type.
    """
    meta_path = hf_hub_download(
        repo_id=DATASET_REPO,
        filename=META_FILE,
        repo_type="dataset",
        token=TOKEN,
    )
    meta_map: Dict[str, str] = {}
    with open(meta_path, "r", encoding="utf-8") as f:
        for idx, line in enumerate(f, start=1):
            if not line.strip():
                continue  # tolerate blank lines
            try:
                item = json.loads(line)
            except json.JSONDecodeError as e:
                # Fix: reference the actual metadata file (META_FILE) instead of
                # a hard-coded "meta.jsonl", so errors point at the right file.
                raise ValueError(f"Invalid JSON in {META_FILE} at line {idx}: {e}")
            if "id" not in item or "answer_type" not in item:
                raise ValueError(f"{META_FILE} line {idx} missing id/answer_type")
            qid = str(item["id"])
            qtype = str(item["answer_type"]).lower().strip()
            if qtype not in ALLOWED_TYPES:
                raise ValueError(f"Unsupported answer_type '{qtype}' for id {qid}")
            meta_map[qid] = qtype
    if not meta_map:
        raise ValueError(f"{META_FILE} is empty")
    return meta_map
def download_answer(qid: str) -> str:
    """Fetch the ground-truth CSV for question *qid*; return its local path."""
    return hf_hub_download(
        repo_id=DATASET_REPO,
        repo_type="dataset",
        filename=f"{ANSWER_DIR}/{qid}.csv",
        token=TOKEN,
        cache_dir=CACHE_DIR,
    )
def load_results_dataset():
    """Load the leaderboard results split, or None if it is unavailable."""
    try:
        results = load_dataset(RESULTS_REPO, split="train", token=TOKEN)
    except Exception:
        # Best-effort: a missing/private/unreachable repo simply yields None.
        return None
    return results
def build_leaderboard_rows() -> List[dict]:
    """Return normalized leaderboard rows, falling back to local seed data."""
    results = load_results_dataset()
    if results is not None and len(results) > 0:
        return _rows_from_source(results)
    # Results repo empty or unreachable: fall back to the seed entries.
    seed_rows = load_seed_rows()
    return _rows_from_source(seed_rows) if seed_rows else []
def _rows_from_source(source) -> List[dict]:
    """Normalize raw result records (dataset rows or seed dicts) into row dicts."""
    metric_keys = (
        "item_em", "set_em", "set_f1",
        "list_em", "list_f1", "list_order",
        "table_em", "table_row_f1", "table_item_f1",
    )
    normalized: List[dict] = []
    for record in source:
        row = {
            "model": record.get("model", "-"),
            "org": record.get("org", "-"),
            "framework": record.get("framework", "-"),
            "date": record.get("date", "-"),
            # Seed rows use "overall"; pushed results use "overall_em".
            "overall": _safe_float(record.get("overall_em", record.get("overall"))),
        }
        for key in metric_keys:
            row[key] = _safe_float(record.get(key))
        normalized.append(row)
    return normalized
def load_seed_rows() -> List[dict]:
    """Load seed leaderboard entries from seed.json, else from the root script.js."""
    if os.path.exists(SEED_FILE):
        try:
            with open(SEED_FILE, "r", encoding="utf-8") as f:
                parsed = json.load(f)
        except Exception:
            return _load_seed_from_root_script()
        # Anything that is not a JSON array is treated as "no seed data".
        return parsed if isinstance(parsed, list) else []
    return _load_seed_from_root_script()
| def _load_seed_from_root_script() -> List[dict]: | |
| root_script = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "script.js")) | |
| if not os.path.exists(root_script): | |
| return [] | |
| try: | |
| text = _load_text(root_script) | |
| match = re.search(r"const\\s+data\\s*=\\s*(\\[.*?\\]);", text, re.S) | |
| if not match: | |
| return [] | |
| arr_text = match.group(1) | |
| arr_text = re.sub(r"(\\w+)\\s*:", r'\"\\1\":', arr_text) | |
| arr_text = re.sub(r",\\s*([}\\]])", r"\\1", arr_text) | |
| data = json.loads(arr_text) | |
| return data if isinstance(data, list) else [] | |
| except Exception: | |
| return [] | |
def render_page() -> str:
    """Load the static index.html and blank out its template placeholders."""
    html_text = _load_text(INDEX_HTML)
    # Data and script are injected elsewhere; the placeholders are removed here.
    for placeholder in ("__LEADERBOARD_DATA__", "__SCRIPT__"):
        html_text = html_text.replace(placeholder, "")
    return html_text
| def _format_score(val: Optional[float]) -> str: | |
| if val is None: | |
| return "-" | |
| try: | |
| return f"{float(val):.2f}" | |
| except Exception: | |
| return "-" | |
def _render_leaderboard_rows(data: List[dict]) -> str:
    """
    Render a static <tbody> for the leaderboard so the table is populated even
    if client JS runs before Gradio mounts the HTML.

    Rows are sorted by overall score (desc), then date (desc) as a tie-breaker.
    """
    if not data:
        return '<tr><td colspan="14">No submissions yet.</td></tr>'

    def _date_key(s: str) -> int:
        # Fix: the pattern was double-escaped (r"\\d" matches a literal
        # backslash), so every date collapsed to 0 and the date tie-break
        # never took effect.
        m = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", str(s or ""))
        if not m:
            return 0
        return int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3))

    sorted_rows = sorted(
        data,
        key=lambda r: (
            _safe_float(r.get("overall")) or 0.0,
            _date_key(r.get("date")),
        ),
        reverse=True,
    )
    # Metric columns after the highlighted "overall" cell, in table order.
    metric_keys = (
        "item_em", "set_em", "set_f1",
        "list_em", "list_f1", "list_order",
        "table_em", "table_row_f1", "table_item_f1",
    )
    out = []
    for idx, row in enumerate(sorted_rows, start=1):
        # Escape all user-supplied text before embedding it in HTML.
        model = html_lib.escape(str(row.get("model") or "-"))
        org = html_lib.escape(str(row.get("org") or "-"))
        framework = html_lib.escape(str(row.get("framework") or "-"))
        date = html_lib.escape(str(row.get("date") or "-"))
        cells = [
            "<tr>",
            f"  <td>{idx}</td>",
            '  <td class="model-cell">',
            f'    <div class="model-name">{model}</div>',
            f'    <div class="model-org">{org}</div>',
            "  </td>",
            f"  <td>{framework}</td>",
            f"  <td>{date}</td>",
            f'  <td class="highlight-em">{_format_score(_safe_float(row.get("overall")))}</td>',
        ]
        cells.extend(
            f"  <td>{_format_score(_safe_float(row.get(key)))}</td>"
            for key in metric_keys
        )
        cells.append("</tr>")
        out.append("\n".join(cells))
    return "\n".join(out)
def build_js(data: List[dict]) -> str:
    """Prepend the serialized leaderboard data to the client-side script."""
    payload = json.dumps(data, ensure_ascii=False)
    script_body = _load_text(SCRIPT_JS)
    return f"window.LEADERBOARD_DATA = {payload};\n{script_body}"
def ensure_results_repo():
    """Create the public results dataset repo if missing (no-op without a token)."""
    if not TOKEN:
        return
    api.create_repo(
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        private=False,
        exist_ok=True,  # idempotent: safe to call on every submission
        token=TOKEN,
    )
def seed_results_if_needed():
    """
    One-time bootstrap: when the results repo is empty or unreachable, publish
    the local seed rows so the leaderboard is never blank. Best-effort —
    failures are deliberately swallowed.
    """
    seed_rows = load_seed_rows()
    if not seed_rows:
        return
    existing = load_results_dataset()
    if existing is not None and len(existing) > 0:
        return  # already populated; nothing to seed
    if not TOKEN:
        return  # cannot push without credentials
    entries = [
        {
            "model": row.get("model", "-"),
            "org": row.get("org", "-"),
            "framework": row.get("framework", "N/A"),
            "date": row.get("date", "-"),
            "overall_em": _safe_float(row.get("overall")),
            "item_em": _safe_float(row.get("item_em")),
            "set_em": _safe_float(row.get("set_em")),
            "set_f1": _safe_float(row.get("set_f1")),
            "list_em": _safe_float(row.get("list_em")),
            "list_f1": _safe_float(row.get("list_f1")),
            "list_order": _safe_float(row.get("list_order")),
            "table_em": _safe_float(row.get("table_em")),
            "table_row_f1": _safe_float(row.get("table_row_f1")),
            "table_item_f1": _safe_float(row.get("table_item_f1")),
            "url": row.get("url", ""),
            "email": row.get("email", ""),
            "username": row.get("username", "seed"),
        }
        for row in seed_rows
    ]
    try:
        ensure_results_repo()
        Dataset.from_list(entries).push_to_hub(RESULTS_REPO, token=TOKEN)
    except Exception:
        pass  # best-effort seeding; the UI falls back to seed rows anyway
| def _get_metric(summary: dict, qtype: str, key: str, fallback: float = 0.0) -> float: | |
| return float(summary.get(qtype, {}).get(key, fallback) or 0.0) | |
def compute_leaderboard_metrics(score_list: List[dict]) -> Dict[str, float]:
    """Aggregate per-question scores into leaderboard percentages (0-100)."""
    summary = evaluator.gather_results(score_list)
    # item-level EM falls back to the item group's global EM when absent.
    item_fallback = _get_metric(summary, "item", "overall_global_em")
    return {
        "overall_em": _to_percent(summary.get("overall_global_em", 0.0)),
        "item_em": _to_percent(
            _get_metric(summary, "item", "overall_item_em", item_fallback)
        ),
        "set_em": _to_percent(_get_metric(summary, "set", "overall_global_em")),
        "set_f1": _to_percent(_get_metric(summary, "set", "overall_set_f1")),
        "list_em": _to_percent(_get_metric(summary, "list", "overall_global_em")),
        "list_f1": _to_percent(_get_metric(summary, "list", "overall_list_content_f1")),
        "list_order": _to_percent(_get_metric(summary, "list", "overall_list_order_score")),
        "table_em": _to_percent(_get_metric(summary, "table", "overall_global_em")),
        "table_row_f1": _to_percent(_get_metric(summary, "table", "overall_table_row_f1")),
        "table_item_f1": _to_percent(_get_metric(summary, "table", "overall_table_item_f1")),
    }
def parse_jsonl(file_path: str) -> Dict[str, str]:
    """
    Parse a submission JSONL file into an {id: prediction} mapping.

    Raises:
        ValueError: on invalid JSON, missing 'id'/'prediction' fields,
            duplicate ids, or an empty file.
    """
    predictions: Dict[str, str] = {}
    with open(file_path, "r", encoding="utf-8") as handle:
        for lineno, raw_line in enumerate(handle, start=1):
            if not raw_line.strip():
                continue  # skip blank lines
            try:
                record = json.loads(raw_line)
            except json.JSONDecodeError as e:
                raise ValueError(f"Line {lineno} is not valid JSON: {e}")
            if "id" not in record or "prediction" not in record:
                raise ValueError(f"Line {lineno} must contain 'id' and 'prediction'")
            qid = str(record["id"])
            if qid in predictions:
                raise ValueError(f"Duplicate id: {qid}")
            predictions[qid] = record["prediction"]
    if not predictions:
        raise ValueError("Empty submission file")
    return predictions
def add_new_eval(
    model: str,
    org: str,
    framework: str,
    url: str,
    email: str,
    file_obj,
    profile: Optional[gr.OAuthProfile] = None,
    request: Optional[gr.Request] = None,
):
    """
    Validate a leaderboard submission, evaluate it, and persist the result.

    Args:
        model: Model/system display name (required).
        org: Submitting organization (required).
        framework: Agent framework label; stored as "N/A" when blank.
        url: Optional model URL stored with the entry.
        email: Contact email stored with the entry.
        file_obj: Uploaded JSONL file of {"id", "prediction"} records.
        profile: OAuth profile — presumably injected by Gradio on Spaces
            with hf_oauth enabled; may be None. (TODO confirm for the
            pinned Gradio version.)
        request: Gradio request, used as a username fallback.

    Returns:
        A formatted status string (log on success, warning/error otherwise).
    """
    if not TOKEN:
        return format_error("Server misconfigured: HF_TOKEN is missing.")
    username = _extract_username(profile, request)
    if not username:
        return format_warning("Please log in with HuggingFace to submit.")
    if not model or not org:
        return format_warning("Please provide model name and organization.")
    if file_obj is None:
        return format_warning("Please upload a JSONL file.")
    today = datetime.date.today().isoformat()
    # Rate limit: at most one submission per user per calendar day.
    try:
        ds = load_results_dataset()
        if ds is not None:
            for row in ds:
                if row.get("username") == username and row.get("date") == today:
                    return format_warning("You already submitted today. Please try again tomorrow.")
    except Exception:
        return format_error("Failed to load leaderboard results. Please try again later.")
    try:
        meta_map = load_meta_map()
    except Exception as e:
        return format_error(f"Failed to load meta.jsonl: {e}")
    try:
        preds = parse_jsonl(file_obj.name)
    except Exception as e:
        return format_error(str(e))
    # The submission must cover exactly the question ids in the metadata:
    # reject both unknown and missing ids before any evaluation work.
    pred_ids = set(preds.keys())
    meta_ids = set(meta_map.keys())
    extra = sorted(pred_ids - meta_ids)
    missing = sorted(meta_ids - pred_ids)
    if extra:
        return format_error(f"Submission has {len(extra)} unknown ids (e.g., {extra[0]}).")
    if missing:
        return format_error(f"Submission missing {len(missing)} ids (e.g., {missing[0]}).")
    # Score each prediction against its downloaded ground-truth CSV.
    score_list: List[dict] = []
    for qid, prediction in preds.items():
        gt_path = download_answer(qid)
        qtype = meta_map[qid]
        metrics = evaluator.evaluate_one(
            prediction=str(prediction),
            gt_path=gt_path,
            question_type=qtype,
            qid=qid,
        )
        score_list.append(metrics)
    metrics = compute_leaderboard_metrics(score_list)
    entry = {
        "model": model,
        "org": org,
        "framework": framework or "N/A",
        "url": url or "",
        "email": email or "",
        "username": username,
        "date": today,
        **metrics,
    }
    # Append the entry to the results dataset. `ds` is the dataset loaded
    # during the rate-limit check above (None if the repo was empty/missing).
    try:
        ensure_results_repo()
        if ds is None:
            Dataset.from_list([entry]).push_to_hub(RESULTS_REPO, token=TOKEN)
        else:
            ds = ds.add_item(entry)
            ds.push_to_hub(RESULTS_REPO, token=TOKEN)
    except Exception:
        return format_error("Failed to save results. Please contact the maintainers.")
    return format_log("Submission received! Please refresh the leaderboard to see your score.")
# --- App bootstrap (runs at import time on the Space) ---
seed_results_if_needed()  # best-effort: populate the results repo from seed data
leaderboard_data = build_leaderboard_rows()
css = _load_text(STYLES_CSS)
page_html = render_page()
rows_html = _render_leaderboard_rows(leaderboard_data)
# Server-render the table body so the leaderboard is visible even before
# (or without) the client-side script running.
page_html = page_html.replace(
    '<tbody id="leaderboard-body"></tbody>',
    f'<tbody id="leaderboard-body">{rows_html}</tbody>',
)
js = build_js(leaderboard_data)
# Assemble the Gradio UI: static leaderboard HTML plus a submission form.
with gr.Blocks() as demo:
    gr.HTML(page_html)
    with gr.Accordion("Submit your results", open=True):
        gr.Markdown(SUBMISSION_TEXT)
        with gr.Row():
            with gr.Column():
                model_text = gr.Textbox(label="Model / System")
                org_text = gr.Textbox(label="Organization")
                framework_text = gr.Textbox(label="Framework", value="ReAct")
                url_text = gr.Textbox(label="Model URL", placeholder="Optional")
            with gr.Column():
                email_text = gr.Textbox(label="Contact email (public)")
                file_input = gr.File(label="Upload JSONL")
        with gr.Row():
            login_btn = gr.LoginButton()
            submit_btn = gr.Button("Submit")
        result_md = gr.Markdown()
        # NOTE(review): add_new_eval also takes profile/request parameters;
        # presumably Gradio injects them from the OAuth session — confirm
        # against the pinned Gradio version.
        submit_btn.click(
            add_new_eval,
            inputs=[
                model_text,
                org_text,
                framework_text,
                url_text,
                email_text,
                file_input,
            ],
            outputs=result_md,
        )
def _launch():
    """Start the Gradio app with request queuing enabled."""
    demo.queue()
    # NOTE(review): in most Gradio versions css/js are gr.Blocks(...) constructor
    # arguments, not launch() arguments — confirm this launch() signature against
    # the pinned Gradio version, otherwise move css/js to the Blocks constructor.
    demo.launch(css=css, js=js, ssr_mode=False)


if __name__ == "__main__":
    _launch()