"""
Data Loader: Load from HuggingFace, parse JSON files, and build tables.
"""
import json
from bisect import bisect_left
from pathlib import Path

import pandas as pd

try:
    from datasets import load_dataset
except ImportError:
    # `datasets` is only needed for the HuggingFace path; the local-file
    # fallback must keep working without it.
    load_dataset = None

# Global caches
HF_DATASET_CACHE = {}
LEADERBOARD_CACHE = {}

# Compact search index: tuples (model_lower, model_name, leaderboard_lower),
# kept sorted by model_lower.
MODEL_SEARCH_INDEX = []
# Prefix map for fast narrowing by model prefix (first two lowercase chars).
# Each bucket preserves the sort order of MODEL_SEARCH_INDEX.
MODEL_PREFIX_MAP = {}

# Lightweight incremental cache
LAST_QUERY = ""
LAST_RESULTS = []

DATA_DIR = Path("leaderboard_data")

# Columns describing the model itself (as opposed to evaluation scores).
_MODEL_COLUMNS = ("Model", "Developer", "Params (B)", "Arch", "Precision")


def _parse_hf_row(row):
    """Convert one HuggingFace dataset row into the module's parsed-item dict.

    ``row`` is a mapping-like row (a pandas Series from ``iterrows``) whose
    ``evaluation_results`` / ``source_data`` / ``additional_details`` fields
    hold JSON-encoded strings.
    """
    evaluation_results = json.loads(row['evaluation_results'])

    results = {}
    for eval_result in evaluation_results:
        eval_name = eval_result.get("evaluation_name")
        # `score_details` may be present but null; treat that like missing.
        score = (eval_result.get("score_details") or {}).get("score")
        if eval_name and score is not None:
            results[eval_name] = score

    additional_details = {}
    if pd.notna(row.get('additional_details')):
        # A JSON `null` payload decodes to None; normalise it to a dict.
        additional_details = json.loads(row['additional_details']) or {}

    source_metadata = {
        "source_organization_name": row['source_organization_name'],
        "evaluator_relationship": row['evaluator_relationship'],
    }
    model_info = {
        "name": row['model_name'],
        "id": row['model_id'],
        "developer": row['model_developer'],
    }

    # Optional columns are only copied over when they carry a value.
    if pd.notna(row.get('source_organization_url')):
        source_metadata["source_organization_url"] = row['source_organization_url']
    if pd.notna(row.get('source_organization_logo_url')):
        source_metadata["source_organization_logo_url"] = row['source_organization_logo_url']
    if pd.notna(row.get('model_inference_platform')):
        model_info["inference_platform"] = row['model_inference_platform']

    return {
        "leaderboard": row['_leaderboard'],
        "provider": row['source_organization_name'],
        "model": row['model_id'],
        "developer": row['model_developer'],
        "params": additional_details.get('params_billions'),
        "architecture": additional_details.get('architecture', 'Unknown'),
        "precision": additional_details.get('precision', 'Unknown'),
        "results": results,
        "raw_data": {
            "schema_version": row['schema_version'],
            "evaluation_id": row['evaluation_id'],
            "retrieved_timestamp": row['retrieved_timestamp'],
            "source_data": json.loads(row['source_data']),
            "evaluation_source": {
                "evaluation_source_name": row['evaluation_source_name'],
                "evaluation_source_type": row['evaluation_source_type'],
            },
            "source_metadata": source_metadata,
            "model_info": model_info,
            "evaluation_results": evaluation_results,
            "additional_details": additional_details,
        },
    }


def load_hf_dataset_on_startup():
    """Load all splits from HuggingFace dataset at startup.

    Each split becomes one entry of ``HF_DATASET_CACHE`` (split name -> list
    of parsed items) and the search index is rebuilt afterwards.

    Returns:
        True on success; False when anything fails (missing ``datasets``
        package, download error, malformed row), in which case callers are
        expected to use the local file system instead.
    """
    print("Loading dataset from HuggingFace...")
    try:
        if load_dataset is None:
            raise ImportError("the 'datasets' package is not installed")

        dataset = load_dataset("evaleval/every_eval_ever")

        # Parse into a scratch dict first so that a failure half-way through
        # never leaves a partially filled HF_DATASET_CACHE behind (which
        # would otherwise shadow the local-file fallback).
        loaded = {}
        for split_name, split_data in dataset.items():
            print(f"Loading split: {split_name} ({len(split_data)} rows)")
            df = split_data.to_pandas()
            loaded[split_name] = [_parse_hf_row(row) for _, row in df.iterrows()]

        HF_DATASET_CACHE.update(loaded)
        print(f"Loaded {len(HF_DATASET_CACHE)} leaderboard(s) from HuggingFace")
        _build_search_index()
        return True
    except Exception as e:
        # Deliberate best-effort boundary: any failure means "use local files".
        print(f"Warning: Could not load HuggingFace dataset: {e}")
        print("Falling back to local file system...")
        return False


def parse_eval_json(file_path):
    """Parses a single JSON file to extract model, provider, and results.

    Returns:
        A parsed-item dict (same top-level keys as the HuggingFace items,
        with ``raw_data`` holding the decoded JSON document), or None when
        the file cannot be read or parsed.
    """
    try:
        # JSON interchange files are UTF-8; do not depend on the locale.
        with open(file_path, 'r', encoding="utf-8") as f:
            data = json.load(f)

        model_info = data.get("model_info", {})

        precision = data.get("additional_details", {}).get("precision", "Unknown")
        if precision == "Unknown":
            precision = model_info.get("precision", "Unknown")

        results = {}
        for res in data.get("evaluation_results", []):
            eval_name = res.get("evaluation_name", "Unknown Metric")
            score = res.get("score_details", {}).get("score", None)
            if score is not None:
                results[eval_name] = score

        return {
            "leaderboard": data.get("evaluation_source", {}).get(
                "evaluation_source_name", "Unknown Leaderboard"),
            "provider": data.get("source_metadata", {}).get(
                "source_organization_name", "Unknown Provider"),
            "model": model_info.get("id", "Unknown Model"),
            "developer": model_info.get("developer", "Unknown Developer"),
            "params": model_info.get("params_billions", None),
            "architecture": model_info.get("architecture", "Unknown"),
            "precision": precision,
            "results": results,
            "raw_data": data,
        }
    except Exception as e:
        # Best-effort: one bad file must not abort a directory scan.
        print(f"Error parsing {file_path}: {e}")
        return None


def get_available_leaderboards():
    """Returns available leaderboards from HF cache or local directory."""
    if HF_DATASET_CACHE:
        return list(HF_DATASET_CACHE.keys())
    if not DATA_DIR.exists():
        return []
    return [d.name for d in DATA_DIR.iterdir() if d.is_dir()]


def walk_eval_files(leaderboard_name):
    """Generator that walks through Leaderboard directory recursively."""
    lb_path = DATA_DIR / leaderboard_name
    if not lb_path.exists():
        return
    yield from lb_path.rglob("*.json")


def _source_info_from(raw_data):
    """Build the ``source_info`` dict (organization, relationship, url)."""
    source_meta = raw_data.get("source_metadata", {})
    source_data_list = raw_data.get("source_data", [])
    # The first source entry is used as the link; "#" is the placeholder.
    has_source = isinstance(source_data_list, list) and source_data_list
    return {
        "organization": source_meta.get("source_organization_name", "Unknown"),
        "relationship": source_meta.get("evaluator_relationship", "Unknown"),
        "url": source_data_list[0] if has_source else "#",
    }


def _add_eval_descriptions(evals, raw_data):
    """Add one metadata entry per not-yet-seen evaluation name to ``evals``."""
    for res in raw_data.get("evaluation_results") or []:
        eval_name = res.get("evaluation_name", "Unknown Metric")
        if eval_name in evals:
            continue
        metric_config = res.get("metric_config") or {}
        evals[eval_name] = {
            "description": metric_config.get("evaluation_description", "No description available"),
            "score_type": metric_config.get("score_type", "unknown"),
            "lower_is_better": metric_config.get("lower_is_better", False),
            "min_score": metric_config.get("min_score"),
            "max_score": metric_config.get("max_score"),
            "level_names": metric_config.get("level_names", []),
            "level_metadata": metric_config.get("level_metadata", []),
            "has_unknown_level": metric_config.get("has_unknown_level", False),
        }


def get_eval_metadata(selected_leaderboard):
    """Extracts evaluation metadata from the leaderboard data.

    Only the first available item (first cached item, or first JSON file
    that parses) is inspected.

    Returns:
        ``{}`` when no leaderboard is selected, otherwise
        ``{"evals": {name: {...}}, "source_info": {...}}`` (both possibly
        empty when no data is available).
    """
    if not selected_leaderboard:
        return {}

    eval_metadata = {"evals": {}, "source_info": {}}

    if selected_leaderboard in HF_DATASET_CACHE:
        parsed_items = HF_DATASET_CACHE[selected_leaderboard]
        if parsed_items:
            raw_data = parsed_items[0]["raw_data"]
            eval_metadata["source_info"] = _source_info_from(raw_data)
            _add_eval_descriptions(eval_metadata["evals"], raw_data)
        return eval_metadata

    # Fall back to file system
    for json_file in walk_eval_files(selected_leaderboard):
        parsed = parse_eval_json(json_file)
        if not parsed:
            continue
        raw_data = parsed["raw_data"]
        eval_metadata["source_info"] = _source_info_from(raw_data)
        _add_eval_descriptions(eval_metadata["evals"], raw_data)
        break  # the first parseable file is representative

    return eval_metadata


def _table_row(parsed):
    """Flatten one parsed item into a table row (model columns + scores)."""
    row = {
        "Model": parsed["model"],
        "Developer": parsed["developer"],
        "Params (B)": parsed["params"],
        "Arch": parsed["architecture"],
        "Precision": parsed["precision"],
    }
    row.update(parsed["results"])
    return row


def _collect_table_rows(selected_leaderboard, progress_callback):
    """Gather table rows from the HF cache, or from local JSON files."""
    rows = []

    if selected_leaderboard in HF_DATASET_CACHE:
        if progress_callback:
            progress_callback(0, desc=f"Loading {selected_leaderboard} from cache...")
        parsed_items = HF_DATASET_CACHE[selected_leaderboard]
        total = len(parsed_items)
        for i, parsed in enumerate(parsed_items):
            if i % 100 == 0 and progress_callback:
                progress_callback((i / total), desc=f"Processing {selected_leaderboard}...")
            rows.append(_table_row(parsed))
        return rows

    # Fall back to file system
    if progress_callback:
        progress_callback(0, desc=f"Scanning {selected_leaderboard}...")
    all_files = list(walk_eval_files(selected_leaderboard))
    total_files = len(all_files)
    for i, json_file in enumerate(all_files):
        if i % 100 == 0 and progress_callback:
            progress_callback((i / total_files), desc=f"Loading {selected_leaderboard}...")
        parsed = parse_eval_json(json_file)
        if parsed:
            rows.append(_table_row(parsed))
    return rows


def _assemble_table(rows):
    """Turn raw rows into the rounded, averaged, ordered and sorted table."""
    if not rows:
        return pd.DataFrame(columns=list(_MODEL_COLUMNS))

    df = pd.DataFrame(rows).dropna(axis=1, how='all')
    if df.empty:
        return df

    numeric_cols = df.select_dtypes(include=['float', 'int']).columns
    df[numeric_cols] = df[numeric_cols].round(2)

    # Add Average Score (parameter count is not an evaluation score)
    eval_only_cols = [c for c in numeric_cols if c != "Params (B)"]
    if eval_only_cols:
        df["Average"] = df[eval_only_cols].mean(axis=1).round(2)

    # Base columns: Model, Developer, Params, Average
    # Eval columns: all evaluation scores
    # Model detail columns: Arch, Precision (moved to end)
    base_cols = ["Model", "Developer", "Params (B)", "Average"]
    model_detail_cols = ["Arch", "Precision"]
    eval_cols = [c for c in df.columns
                 if c not in base_cols and c not in model_detail_cols]
    base_cols = [c for c in base_cols if c in df.columns]
    model_detail_cols = [c for c in model_detail_cols if c in df.columns]

    df = df[base_cols + sorted(eval_cols) + model_detail_cols]
    if "Average" in df.columns:
        df = df.sort_values("Average", ascending=False)
    return df


def build_leaderboard_table(selected_leaderboard, search_query="", progress_callback=None):
    """Builds the leaderboard DataFrame from cache or files.

    Args:
        selected_leaderboard: Leaderboard (split / directory) name.
        search_query: Accepted for interface compatibility; the table is not
            filtered here.
        progress_callback: Optional callable invoked as
            ``progress_callback(fraction, desc=...)`` while rows are loaded.

    Returns:
        The table, sorted by "Average" (descending) when scores exist. The
        result is memoised in ``LEADERBOARD_CACHE`` and the cached object
        itself is returned on later calls.
    """
    if not selected_leaderboard:
        return pd.DataFrame()

    if selected_leaderboard in LEADERBOARD_CACHE:
        df, _ = LEADERBOARD_CACHE[selected_leaderboard]
        return df

    rows = _collect_table_rows(selected_leaderboard, progress_callback)
    df = _assemble_table(rows)
    LEADERBOARD_CACHE[selected_leaderboard] = (df, None)
    return df


def clear_cache():
    """Clears the built leaderboard-table cache.

    The HuggingFace dataset cache and the search index are left untouched.
    """
    LEADERBOARD_CACHE.clear()


def _build_search_index():
    """Build compact sorted search index for fast prefix/substring matching."""
    global LAST_QUERY, LAST_RESULTS

    entries = []
    for leaderboard_name, parsed_items in HF_DATASET_CACHE.items():
        lb_lower = leaderboard_name.lower()
        for item in parsed_items:
            model_name = item.get("model", "")
            if not isinstance(model_name, str):
                # Missing ids (None / NaN) cannot be searched by name.
                continue
            entries.append((model_name.lower(), model_name, lb_lower))

    # Sort by model_lower for prefix binary search. Both containers are
    # updated in place so references obtained elsewhere stay valid.
    MODEL_SEARCH_INDEX[:] = sorted(entries, key=lambda x: x[0])

    # Build small prefix map (first 2 chars of model) to narrow searches
    MODEL_PREFIX_MAP.clear()
    for entry in MODEL_SEARCH_INDEX:
        MODEL_PREFIX_MAP.setdefault(entry[0][:2], []).append(entry)

    # Reset incremental cache
    LAST_QUERY = ""
    LAST_RESULTS = []
    print(f"Built search index with {len(MODEL_SEARCH_INDEX)} entries")


def get_model_suggestions_fast(query, limit=15):
    """Fast search with prefix narrowing and incremental reuse (substring only).

    Args:
        query: User-typed text; queries shorter than 2 characters yield [].
        limit: Maximum number of suggestions returned.

    Returns:
        Unique model names. Prefix matches rank first, then substring
        matches by position; a match found only in the leaderboard name
        ranks just behind a model-name match at the same position. Ties are
        broken by shorter model name.
    """
    global LAST_QUERY, LAST_RESULTS

    if not query or len(query) < 2 or not MODEL_SEARCH_INDEX:
        return []

    query_lower = query.lower()

    # Incremental reuse: if user keeps typing the same prefix, reuse last pool
    if LAST_QUERY and query_lower.startswith(LAST_QUERY) and LAST_RESULTS:
        base_pool = LAST_RESULTS
    else:
        base_pool = MODEL_PREFIX_MAP.get(query_lower[:2], MODEL_SEARCH_INDEX)

    # name -> (match position, name length); a dict keeps one entry per model
    # even when the model appears on several leaderboards.
    scored = {}

    # 1) Prefix match on model names. Every pool is sorted by lowercase name,
    #    so the matches form one contiguous run starting at the bisect point.
    idx = bisect_left(base_pool, (query_lower,))
    while idx < len(base_pool) and len(scored) < limit:
        name_lower, name_orig, _ = base_pool[idx]
        if not name_lower.startswith(query_lower):
            break
        scored.setdefault(name_orig, (0, len(name_lower)))
        idx += 1

    # 2) Substring fallback. Use full index to catch leaderboard-name matches.
    if len(scored) < limit:
        for name_lower, name_orig, lb_lower in MODEL_SEARCH_INDEX:
            pos = name_lower.find(query_lower)
            if pos == -1:
                pos_lb = lb_lower.find(query_lower)
                if pos_lb == -1:
                    continue
                # Prefer model matches; leaderboard-only matches still allowed
                pos = pos_lb + 1
            score = (pos, len(name_lower))
            previous = scored.get(name_orig)
            if previous is not None:
                if score < previous:
                    scored[name_orig] = score
                continue
            scored[name_orig] = score
            if len(scored) >= limit * 2:
                break

    # Stable sort: equal scores keep discovery (alphabetical index) order.
    ranked = sorted(scored.items(), key=lambda kv: kv[1])

    # Update incremental cache
    LAST_QUERY = query_lower
    LAST_RESULTS = base_pool

    return [name for name, _ in ranked[:limit]]


def search_model_across_leaderboards(model_query):
    """Search for a model across all leaderboards and return aggregated results.

    Returns:
        ``(results, matches)`` where ``matches`` is the ranked list of model
        names and ``results`` maps model name -> leaderboard name -> details.
        ``({}, [])`` when the query or the HF cache is empty.
    """
    if not model_query or not HF_DATASET_CACHE:
        return {}, []

    # Prefix/substring search for candidate model names
    matches = get_model_suggestions_fast(model_query, limit=20)
    wanted = set(matches)  # O(1) membership inside the scan below

    # Get detailed results only for matched models
    results = {}
    for leaderboard_name, parsed_items in HF_DATASET_CACHE.items():
        for item in parsed_items:
            model_id = item.get("model", "")
            if model_id not in wanted:
                continue
            results.setdefault(model_id, {})[leaderboard_name] = {
                "developer": item.get("developer"),
                "params": item.get("params"),
                "architecture": item.get("architecture"),
                "precision": item.get("precision"),
                "results": item.get("results", {}),
            }

    return results, matches


def get_all_model_names():
    """Get all unique model names across all leaderboards."""
    if not HF_DATASET_CACHE:
        return []
    models = {
        item.get("model", "")
        for parsed_items in HF_DATASET_CACHE.values()
        for item in parsed_items
    }
    return sorted(models)