ICBCBench-Leaderboard / tabs /shared_data.py
imlrz's picture
Super-squash branch 'main' using huggingface_hub
f9c0202
Raw
History Blame Contribute Delete
5.5 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Lazy-loaded shared data cache for data viewer tabs.
Loads data_viewer.jsonl once on first access, not at import time.
"""
from __future__ import annotations
import json
import pandas as pd
from pathlib import Path
# Repository root: two directory levels above this module's resolved path.
BASE_DIR = Path(__file__).resolve().parent.parent

# HF Storage Bucket mount point (mounted at /data in the Space runtime).
_BUCKET_DIR = Path("/data")

# Use the bucket copy of the viewer data when it exists at import time;
# otherwise fall back to the copy stored inside the repository.
if (_BUCKET_DIR / "data_viewer.jsonl").exists():
    DATA_VIEWER_FILE = _BUCKET_DIR / "data_viewer.jsonl"
else:
    DATA_VIEWER_FILE = BASE_DIR / "data" / "data_viewer.jsonl"

# The index file is always resolved relative to the repository.
DATA_VIEWER_INDEX_FILE = BASE_DIR / "data" / "data_viewer_index.json"

# Columns that must all be present for the loaded records to be accepted.
_REQUIRED_COLS = [
    "model_name",
    "id",
    "prompt",
    "article",
    "overall_score",
    "comprehensiveness_score",
    "insight_score",
    "instruction_following_score",
    "readability_score",
]

# Module-level caches; both start unset (None).
_cache: pd.DataFrame | None = None
_index_cache: dict | None = None
def get_data() -> pd.DataFrame:
    """Return the data-viewer records as a DataFrame, loading them on first use.

    The JSONL file at ``DATA_VIEWER_FILE`` is read once; the result is kept in
    the module-level ``_cache`` and returned unchanged on later calls.

    Blank lines, lines that are not valid JSON, and lines whose JSON value is
    not an object are skipped. If no usable records are found, or any of
    ``_REQUIRED_COLS`` is missing from the resulting frame, an empty frame
    with exactly the required columns is cached and returned instead.

    Returns:
        The cached DataFrame. When records were loaded, its ``id`` column is
        converted to ``str``.
    """
    global _cache
    if _cache is not None:
        return _cache

    records = []
    if DATA_VIEWER_FILE.exists():
        with DATA_VIEWER_FILE.open(encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A line such as `null` or `[1, 2]` is valid JSON but not a
                # row; letting it through can break DataFrame construction.
                if isinstance(record, dict):
                    records.append(record)

    df = pd.DataFrame(records)
    if df.empty or not all(col in df.columns for col in _REQUIRED_COLS):
        _cache = pd.DataFrame(columns=_REQUIRED_COLS)
    else:
        # Normalise ids to strings so they compare equal to string ids
        # regardless of how they were serialised.
        df["id"] = df["id"].astype(str)
        _cache = df
    return _cache
def get_index() -> dict:
    """Return the data-viewer index, loading or building it on first use.

    The result is kept in the module-level ``_index_cache`` and returned
    unchanged on later calls.

    If ``DATA_VIEWER_INDEX_FILE`` exists and parses to a JSON object, that
    object is returned as-is. Otherwise a minimal index is built by scanning
    ``DATA_VIEWER_FILE``:

    * ``"models"``: sorted list of the distinct truthy ``model_name`` values.
    * ``"tasks"``: one ``{"id", "prompt"}`` entry per distinct id, keeping the
      prompt of the first record seen for that id (``""`` when the prompt is
      missing or falsy). Integer-like ids come first in numeric order; any
      other ids follow in lexicographic order.

    Blank lines, undecodable lines, non-object lines and records without an
    ``id`` contribute no task entry.
    """
    global _index_cache
    if _index_cache is not None:
        return _index_cache

    if DATA_VIEWER_INDEX_FILE.exists():
        try:
            loaded = json.loads(DATA_VIEWER_INDEX_FILE.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            loaded = None
        # Only a JSON object honours the `dict` return contract; anything
        # else (e.g. `null` or a list) is treated like a corrupt index.
        if isinstance(loaded, dict):
            _index_cache = loaded
            return _index_cache

    models = set()
    tasks = {}
    if DATA_VIEWER_FILE.exists():
        with DATA_VIEWER_FILE.open(encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(item, dict):
                    continue
                model = item.get("model_name")
                if model:
                    models.add(model)
                raw_id = item.get("id")
                if raw_id is None:
                    # str(None) would register a bogus task called "None".
                    continue
                item_id = str(raw_id)
                if item_id and item_id not in tasks:
                    tasks[item_id] = item.get("prompt") or ""

    def _task_order(value: str) -> tuple:
        # Integer-like ids keep their numeric order; anything else sorts
        # after them instead of raising ValueError from int().
        try:
            return (0, int(value), "")
        except ValueError:
            return (1, 0, value)

    _index_cache = {
        "models": sorted(models),
        "tasks": [
            {"id": item_id, "prompt": tasks[item_id]}
            for item_id in sorted(tasks, key=_task_order)
        ],
    }
    return _index_cache
def get_entry(model_name: str, item_id: str) -> dict | None:
    """Return the record for one (model, task id) pair, or ``None``.

    The index returned by ``get_index()`` is consulted first: if its
    ``"lookup"`` mapping holds an ``(offset, length)`` pair under the key
    ``"<model_name>\\t<item_id>"``, that byte range of ``DATA_VIEWER_FILE`` is
    read and parsed directly. The parsed record is only trusted when its own
    ``model_name`` and ``id`` match the request; any failure on this path
    falls back to a linear scan of the file, which returns the first match.

    Args:
        model_name: Model to look up; a falsy value yields ``None``.
        item_id: Task id; it is compared as ``str``, so ``0`` and ``"0"`` are
            equivalent. ``None`` or an empty string yields ``None``.

    Returns:
        The matching record, or ``None`` when the data file is missing or no
        record matches.
    """
    if item_id is None:
        return None
    # Convert before the emptiness test so that the integer id 0 is valid.
    item_id = str(item_id)
    if not model_name or not item_id or not DATA_VIEWER_FILE.exists():
        return None

    def _is_match(candidate: object) -> bool:
        return (
            isinstance(candidate, dict)
            and candidate.get("model_name") == model_name
            and str(candidate.get("id")) == item_id
        )

    lookup = get_index().get("lookup") or {}
    location = lookup.get(f"{model_name}\t{item_id}")
    if location:
        try:
            offset, length = location
            with DATA_VIEWER_FILE.open("rb") as fh:
                fh.seek(offset)
                item = json.loads(fh.read(length).decode("utf-8"))
        except (TypeError, ValueError, OSError):
            # A stale or malformed index entry must not be fatal: a bad
            # unpack, an invalid seek, a slice that starts mid-character
            # (UnicodeDecodeError) and invalid JSON all land here, and the
            # scan below takes over.
            item = None
        if _is_match(item):
            return item

    with DATA_VIEWER_FILE.open(encoding="utf-8") as fh:
        for line in fh:
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                continue
            if _is_match(item):
                return item
    return None
def get_entries_for_task(item_id: str, model_names: set[str]) -> dict[str, dict]:
    """Return the records of one task for several models, keyed by model name.

    For every requested model that has an ``(offset, length)`` entry in the
    ``"lookup"`` mapping of ``get_index()`` (key ``"<model>\\t<item_id>"``), the
    record is read directly from that byte range of ``DATA_VIEWER_FILE``. If
    every located record reads back correctly and matches its model and id,
    those records are returned. Otherwise, including when no model has a
    lookup entry, the whole file is scanned for matching records.

    Args:
        item_id: Task id; compared as ``str``, so ``0`` and ``"0"`` are
            equivalent. ``None`` or an empty string yields ``{}``.
        model_names: Models to fetch; an empty collection yields ``{}``.

    Returns:
        Mapping of model name to record. Models without a matching record are
        absent. ``{}`` when the data file does not exist.
    """
    if item_id is None or not model_names:
        return {}
    # Convert before the emptiness test so that the integer id 0 is valid.
    item_id = str(item_id)
    if not item_id or not DATA_VIEWER_FILE.exists():
        return {}

    lookup = get_index().get("lookup") or {}
    locations = {}
    for model in model_names:
        location = lookup.get(f"{model}\t{item_id}")
        if location:
            locations[model] = location

    if locations:
        found = {}
        try:
            with DATA_VIEWER_FILE.open("rb") as fh:
                for model, location in locations.items():
                    offset, length = location
                    fh.seek(offset)
                    item = json.loads(fh.read(length).decode("utf-8"))
                    if (
                        isinstance(item, dict)
                        and item.get("model_name") == model
                        and str(item.get("id")) == item_id
                    ):
                        found[model] = item
        except (TypeError, ValueError, OSError):
            # Stale or malformed index entry (bad unpack, invalid seek,
            # slice starting mid-character, invalid JSON). The failing model
            # is missing from `found`, so the size check below sends us to
            # the full scan.
            pass
        if len(found) == len(locations):
            return found

    found = {}
    with DATA_VIEWER_FILE.open(encoding="utf-8") as fh:
        for line in fh:
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(item, dict):
                continue
            model = item.get("model_name")
            if str(item.get("id")) == item_id and model in model_names:
                found[model] = item
                # Stop reading as soon as every requested model is covered.
                if len(found) == len(model_names):
                    break
    return found