| |
| |
| """ |
| Lazy-loaded shared data cache for data viewer tabs. |
| Loads data_viewer.jsonl once on first access, not at import time. |
| """ |
|
|
| from __future__ import annotations |
| import json |
| import pandas as pd |
| from pathlib import Path |
|
|
# Directory two levels above this module's resolved location.
BASE_DIR = Path(__file__).resolve().parent.parent

# External data directory that is checked first for the viewer data file.
_BUCKET_DIR = Path("/data")

# The data file location is decided once, at import time: use the copy under
# _BUCKET_DIR when it exists, otherwise the one under BASE_DIR / "data".
if (_BUCKET_DIR / "data_viewer.jsonl").exists():
    DATA_VIEWER_FILE = _BUCKET_DIR / "data_viewer.jsonl"
else:
    DATA_VIEWER_FILE = BASE_DIR / "data" / "data_viewer.jsonl"

# The index file is always looked up under BASE_DIR / "data".
DATA_VIEWER_INDEX_FILE = BASE_DIR.joinpath("data", "data_viewer_index.json")

# Columns that must all be present for the loaded data to be used as-is.
_REQUIRED_COLS = [
    "model_name",
    "id",
    "prompt",
    "article",
    "overall_score",
    "comprehensiveness_score",
    "insight_score",
    "instruction_following_score",
    "readability_score",
]

# Module-level cache slots; None means "not loaded yet".
_cache: pd.DataFrame | None = None
_index_cache: dict | None = None
|
|
|
|
def get_data() -> pd.DataFrame:
    """Return all viewer records as a DataFrame, loading the file on first call.

    The result is stored in the module-level ``_cache`` and the same object
    is returned on every later call.

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. If the file is missing, holds no usable records, or
    the records lack any of ``_REQUIRED_COLS``, an empty DataFrame with
    exactly those columns is cached and returned instead.

    Returns:
        The cached DataFrame; when records were loaded, ``id`` is cast to str.
    """
    global _cache
    if _cache is not None:
        return _cache

    records = []
    if DATA_VIEWER_FILE.exists():
        with DATA_VIEWER_FILE.open(encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A line such as `42`, `null` or `[1, 2]` is valid JSON but is
                # not a record; mixing it with dict rows would corrupt or
                # break the DataFrame construction below.
                if isinstance(record, dict):
                    records.append(record)

    df = pd.DataFrame(records)
    if df.empty or not all(col in df.columns for col in _REQUIRED_COLS):
        _cache = pd.DataFrame(columns=_REQUIRED_COLS)
    else:
        # Ids are compared as strings by the lookup helpers in this module.
        df["id"] = df["id"].astype(str)
        _cache = df
    return _cache
|
|
|
|
def _task_sort_key(item_id: str) -> tuple:
    """Order integer-like ids numerically, then all other ids alphabetically."""
    try:
        return (0, int(item_id), "")
    except ValueError:
        return (1, 0, item_id)


def get_index() -> dict:
    """Return the viewer index, loading or building it on first call.

    The result is stored in the module-level ``_index_cache`` and reused.

    Resolution order:
      1. If ``DATA_VIEWER_INDEX_FILE`` exists and contains a JSON object,
         that object is returned as-is.
      2. Otherwise an index is built by scanning ``DATA_VIEWER_FILE``; it has
         the keys ``"models"`` (sorted model names) and ``"tasks"`` (a list
         of ``{"id", "prompt"}`` dicts, keeping the first prompt seen per id).

    While scanning, blank lines, invalid JSON, and JSON values that are not
    objects are skipped. Records without an ``id`` still contribute their
    model name but are not listed as tasks.

    Returns:
        The cached index dict.
    """
    global _index_cache
    if _index_cache is not None:
        return _index_cache

    if DATA_VIEWER_INDEX_FILE.exists():
        try:
            loaded = json.loads(DATA_VIEWER_INDEX_FILE.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            loaded = None
        # Callers use dict methods on the index, so a JSON array or scalar in
        # the index file is treated like a corrupt file: rebuild instead.
        if isinstance(loaded, dict):
            _index_cache = loaded
            return _index_cache

    models = set()
    tasks = {}
    if DATA_VIEWER_FILE.exists():
        with DATA_VIEWER_FILE.open(encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(item, dict):
                    continue

                model = item.get("model_name")
                if model:
                    models.add(model)

                # str(None) would be the truthy string "None", so a missing
                # id has to be rejected before the conversion.
                raw_id = item.get("id")
                if raw_id is None:
                    continue
                item_id = str(raw_id)
                if item_id and item_id not in tasks:
                    tasks[item_id] = item.get("prompt") or ""

    _index_cache = {
        "models": sorted(models),
        "tasks": [
            {"id": item_id, "prompt": tasks[item_id]}
            # A plain int() key raises on non-numeric ids; this key keeps the
            # numeric order and places the remaining ids after it.
            for item_id in sorted(tasks, key=_task_sort_key)
        ],
    }
    return _index_cache
|
|
|
|
def get_entry(model_name: str, item_id: str) -> dict | None:
    """Return the record for one (model, task id) pair, or None if not found.

    The index's optional ``"lookup"`` table is tried first: it maps
    ``"<model_name>\\t<item_id>"`` to an ``(offset, length)`` byte range in
    ``DATA_VIEWER_FILE``. The record read from that range is only trusted if
    its own ``model_name`` and ``id`` match the request. Any failure on that
    path (malformed location, undecodable bytes, invalid JSON, mismatching
    record) falls back to a linear scan of the file, which returns the first
    matching record.

    Args:
        model_name: Model name to match against the record's ``model_name``.
        item_id: Task id; compared as a string against the record's ``id``.

    Returns:
        The matching record dict, or None when an argument is empty, the data
        file does not exist, or no record matches.
    """
    if not model_name or not item_id or not DATA_VIEWER_FILE.exists():
        return None

    item_id = str(item_id)

    def is_match(candidate) -> bool:
        # Non-object JSON values (lists, numbers, null) can never be a record.
        return (
            isinstance(candidate, dict)
            and candidate.get("model_name") == model_name
            and str(candidate.get("id")) == item_id
        )

    index = get_index()
    lookup = index.get("lookup") if isinstance(index, dict) else None
    location = lookup.get(f"{model_name}\t{item_id}") if isinstance(lookup, dict) else None
    if location:
        try:
            offset, length = location
            with DATA_VIEWER_FILE.open("rb") as fh:
                fh.seek(offset)
                item = json.loads(fh.read(length).decode("utf-8"))
        except (ValueError, TypeError):
            # ValueError covers invalid JSON, a byte range that splits a
            # multi-byte character (UnicodeDecodeError) and a location of the
            # wrong length; TypeError covers non-sequence or non-integer
            # locations. A stale index must not prevent the scan below.
            item = None
        if is_match(item):
            return item

    with DATA_VIEWER_FILE.open(encoding="utf-8") as fh:
        for line in fh:
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                continue
            if is_match(item):
                return item
    return None
|
|
|
|
def get_entries_for_task(item_id: str, model_names: set[str]) -> dict[str, dict]:
    """Return the records of one task for several models, keyed by model name.

    For every requested model the index's optional ``"lookup"`` table is
    consulted for an ``(offset, length)`` byte range in ``DATA_VIEWER_FILE``.
    If every model that has a lookup entry yields a record whose own
    ``model_name`` and ``id`` match, those records are returned directly.
    Otherwise (no lookup entries, or any entry that is malformed, undecodable
    or mismatching) the whole file is scanned; the scan stops early once as
    many records as requested models have been collected.

    Args:
        item_id: Task id; compared as a string against each record's ``id``.
        model_names: Model names to collect records for.

    Returns:
        A dict mapping model name to its record. Models without a matching
        record are absent. Empty when an argument is empty or the data file
        does not exist.
    """
    if not item_id or not model_names or not DATA_VIEWER_FILE.exists():
        return {}

    item_id = str(item_id)
    index = get_index()
    lookup = index.get("lookup") if isinstance(index, dict) else None
    if not isinstance(lookup, dict):
        lookup = {}

    locations = {}
    for model in model_names:
        location = lookup.get(f"{model}\t{item_id}")
        if location:
            locations[model] = location

    if locations:
        found = {}
        with DATA_VIEWER_FILE.open("rb") as fh:
            for model, location in locations.items():
                try:
                    offset, length = location
                    fh.seek(offset)
                    item = json.loads(fh.read(length).decode("utf-8"))
                except (ValueError, TypeError):
                    # Invalid JSON, a byte range that splits a multi-byte
                    # character, or a malformed location: leave this model
                    # unresolved so the scan below takes over.
                    continue
                if (
                    isinstance(item, dict)
                    and item.get("model_name") == model
                    and str(item.get("id")) == item_id
                ):
                    found[model] = item
        # Only trust the index when every indexed location checked out.
        if len(found) == len(locations):
            return found

    found = {}
    with DATA_VIEWER_FILE.open(encoding="utf-8") as fh:
        for line in fh:
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(item, dict):
                continue
            model = item.get("model_name")
            if str(item.get("id")) == item_id and model in model_names:
                found[model] = item
                if len(found) == len(model_names):
                    break
    return found
|
|