#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Lazy-loaded shared data cache for data viewer tabs.

Loads data_viewer.jsonl once on first access, not at import time.
"""
from __future__ import annotations

import json
from collections.abc import Iterator
from pathlib import Path
from typing import Any, BinaryIO

import pandas as pd

BASE_DIR = Path(__file__).resolve().parent.parent

# Bucket mount point (HF Storage Bucket mounted at /data in Space runtime)
_BUCKET_DIR = Path("/data")

# Prefer bucket path if available, fallback to repo-local path
DATA_VIEWER_FILE = (
    _BUCKET_DIR / "data_viewer.jsonl"
    if (_BUCKET_DIR / "data_viewer.jsonl").exists()
    else BASE_DIR / "data" / "data_viewer.jsonl"
)
DATA_VIEWER_INDEX_FILE = BASE_DIR / "data" / "data_viewer_index.json"

# Columns a loaded frame must contain for get_data() to return it as-is.
_REQUIRED_COLS = [
    "model_name",
    "id",
    "prompt",
    "article",
    "overall_score",
    "comprehensiveness_score",
    "insight_score",
    "instruction_following_score",
    "readability_score",
]

# Module-level caches, filled on first call to get_data() / get_index().
_cache: pd.DataFrame | None = None
_index_cache: dict | None = None


def _iter_records() -> Iterator[dict]:
    """Yield each JSON object stored in DATA_VIEWER_FILE, one per line.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects (lists, numbers, ...) are skipped. Yields nothing when the file
    does not exist.
    """
    if not DATA_VIEWER_FILE.exists():
        return
    with DATA_VIEWER_FILE.open(encoding="utf-8") as fh:
        for raw_line in fh:
            text = raw_line.strip()
            if not text:
                continue
            try:
                item = json.loads(text)
            except json.JSONDecodeError:
                continue
            # Every caller uses item.get(...); anything but a dict would crash.
            if isinstance(item, dict):
                yield item


def _matches(item: Any, model_name: str, item_id: str) -> bool:
    """Return True when *item* is a record for the given model and task id."""
    return (
        isinstance(item, dict)
        and item.get("model_name") == model_name
        and str(item.get("id")) == item_id
    )


def _task_sort_key(item_id: str) -> tuple[int, int, str]:
    """Sort key: numeric ids by integer value first, then other ids by text."""
    try:
        return (0, int(item_id), "")
    except ValueError:
        return (1, 0, item_id)


def _lookup_table() -> dict:
    """Return the index's "lookup" mapping, or an empty dict if unusable."""
    lookup = get_index().get("lookup")
    return lookup if isinstance(lookup, dict) else {}


def _read_indexed(
    fh: BinaryIO, location: Any, model_name: str, item_id: str
) -> dict | None:
    """Read the record at an index *location* from the open binary file.

    *location* is expected to unpack into ``(offset, length)``. Returns the
    decoded record only if it really belongs to *model_name* / *item_id*;
    returns None for a malformed location, undecodable bytes, invalid JSON,
    or a record that does not match, so the caller can fall back to a scan.
    """
    try:
        offset, length = location
        fh.seek(offset)
        item = json.loads(fh.read(length).decode("utf-8"))
    except (TypeError, ValueError, OSError, OverflowError):
        # ValueError also covers UnicodeDecodeError and json.JSONDecodeError.
        return None
    return item if _matches(item, model_name, item_id) else None


def get_data() -> pd.DataFrame:
    """Return all records of DATA_VIEWER_FILE as a cached DataFrame.

    The file is parsed on the first call only; later calls return the same
    cached frame. If no records could be loaded, or any column listed in
    ``_REQUIRED_COLS`` is missing, an empty frame with exactly those columns
    is cached and returned instead. The ``id`` column is converted to ``str``.
    """
    global _cache
    if _cache is not None:
        return _cache

    df = pd.DataFrame(list(_iter_records()))
    if df.empty or not set(_REQUIRED_COLS).issubset(df.columns):
        _cache = pd.DataFrame(columns=_REQUIRED_COLS)
    else:
        df["id"] = df["id"].astype(str)
        _cache = df
    return _cache


def get_index() -> dict:
    """Return the cached index of models and tasks.

    If DATA_VIEWER_INDEX_FILE exists and parses to a JSON object, that object
    is cached and returned unchanged. Otherwise the index is rebuilt by
    scanning DATA_VIEWER_FILE and has two keys:

    * ``"models"``: sorted list of the distinct ``model_name`` values.
    * ``"tasks"``: list of ``{"id", "prompt"}`` dicts, one per distinct id
      (first prompt seen wins), numeric ids ordered by integer value followed
      by non-numeric ids ordered as text.

    A rebuilt index has no ``"lookup"`` key.
    """
    global _index_cache
    if _index_cache is not None:
        return _index_cache

    if DATA_VIEWER_INDEX_FILE.exists():
        try:
            loaded = json.loads(DATA_VIEWER_INDEX_FILE.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            loaded = None
        # Callers use index.get(...), so only a JSON object is acceptable.
        if isinstance(loaded, dict):
            _index_cache = loaded
            return _index_cache

    models = set()
    tasks: dict[str, str] = {}
    for item in _iter_records():
        model = item.get("model_name")
        if model:
            models.add(model)
        raw_id = item.get("id")
        if raw_id is None:
            # str(None) would create a bogus "None" task entry.
            continue
        item_id = str(raw_id)
        if item_id and item_id not in tasks:
            tasks[item_id] = item.get("prompt") or ""

    _index_cache = {
        "models": sorted(models),
        "tasks": [
            {"id": item_id, "prompt": tasks[item_id]}
            for item_id in sorted(tasks, key=_task_sort_key)
        ],
    }
    return _index_cache


def get_entry(model_name: str, item_id: str) -> dict | None:
    """Return the record for *model_name* and *item_id*, or None.

    The index's ``"lookup"`` entry keyed ``"<model_name>\\t<item_id>"`` is
    tried first; if it is absent, malformed, or points at a record that does
    not match, the whole file is scanned and the first match is returned.
    Returns None when either argument is falsy, the data file does not exist,
    or no record matches.
    """
    if not model_name or not item_id or not DATA_VIEWER_FILE.exists():
        return None

    item_id = str(item_id)
    location = _lookup_table().get(f"{model_name}\t{item_id}")
    if location:
        with DATA_VIEWER_FILE.open("rb") as fh:
            item = _read_indexed(fh, location, model_name, item_id)
        if item is not None:
            return item

    for item in _iter_records():
        if _matches(item, model_name, item_id):
            return item
    return None


def get_entries_for_task(item_id: str, model_names: set[str]) -> dict[str, dict]:
    """Return ``{model_name: record}`` for *item_id* across *model_names*.

    If the index holds lookup locations for some of the models and every one
    of those locations yields a matching record, only those records are
    returned and the file is not scanned (models without a lookup entry are
    then absent from the result). Otherwise the whole file is scanned,
    stopping once every requested model has been found. Returns an empty dict
    when either argument is falsy or the data file does not exist.
    """
    if not item_id or not model_names or not DATA_VIEWER_FILE.exists():
        return {}

    item_id = str(item_id)
    lookup = _lookup_table()
    candidates = {model: lookup.get(f"{model}\t{item_id}") for model in model_names}
    locations = {model: loc for model, loc in candidates.items() if loc}

    if locations:
        found: dict[str, dict] = {}
        with DATA_VIEWER_FILE.open("rb") as fh:
            for model, location in locations.items():
                item = _read_indexed(fh, location, model, item_id)
                if item is not None:
                    found[model] = item
        if len(found) == len(locations):
            return found

    # Index missing, stale, or incomplete: fall back to a sequential scan.
    found = {}
    for item in _iter_records():
        model = item.get("model_name")
        if str(item.get("id")) == item_id and model in model_names:
            found[model] = item
            if len(found) == len(model_names):
                break
    return found