import json
import logging
from pathlib import Path
from typing import Any, Dict, List

import datasets
from fastapi import HTTPException

from app.config.base import HF_ORGANIZATION
from app.config.benchmarks import EEG_BENCHMARKS
from app.core.cache import cache_config
from app.core.formatting import LogFormatter

logger = logging.getLogger(__name__)
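
# NOTE: `EEG_BENCHMARKS` is consumed below via `benchmark.display_name` and
# `benchmark.accuracy_field`. Its definition lives in app.config.benchmarks
# and is not shown here; a minimal sketch of the assumed shape (field names
# in the sketch follow the usage below, example values are hypothetical):
#
#     from dataclasses import dataclass
#
#     @dataclass(frozen=True)
#     class EEGBenchmark:
#         display_name: str    # label shown in the UI, e.g. "BCIC-2a"
#         accuracy_field: str  # column name in the raw dataset
#
#     EEG_BENCHMARKS = {
#         "bcic2a": EEGBenchmark("BCIC-2a", "BCIC-2a Accuracy"),
#         "tuab": EEGBenchmark("TUAB", "TUAB Accuracy"),
#     }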

# Path to sample data bundled with the app
SAMPLE_DATA_PATH = Path(__file__).parent.parent / "data" / "sample_results.json"


def _build_entry_id(data: Dict[str, Any]) -> str:
    """Build a unique ID for a leaderboard entry."""
    return (
        f"{data.get('fullname', 'Unknown')}"
        f"_{data.get('adapter', 'Unknown')}"
        f"_{data.get('Precision', 'Unknown')}"
        f"_{data.get('Model sha', 'Unknown')}"
    )
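
# Example (hypothetical values): a LoRA fine-tune of "org/labram-base" at
# float32 precision with sha "a1b2c3d" yields the ID
# "org/labram-base_lora_float32_a1b2c3d".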


class LeaderboardService:
    def __init__(self):
        pass

    async def fetch_raw_data(self) -> List[Dict[str, Any]]:
        """Fetch raw leaderboard data from the HuggingFace dataset, with a local fallback."""
        try:
            logger.info(LogFormatter.section("FETCHING LEADERBOARD DATA"))

            # Try the HuggingFace dataset first
            try:
                logger.info(LogFormatter.info(f"Loading dataset from {HF_ORGANIZATION}/contents"))
                dataset = datasets.load_dataset(
                    f"{HF_ORGANIZATION}/contents",
                    cache_dir=cache_config.get_cache_path("datasets"),
                )["train"]
                df = dataset.to_pandas()
                data = df.to_dict(orient="records")

                stats = {
                    "Total_Entries": len(data),
                    "Dataset_Size": f"{df.memory_usage(deep=True).sum() / 1024 / 1024:.1f}MB",
                    "Source": "HuggingFace Hub",
                }
                for line in LogFormatter.stats(stats, "Dataset Statistics"):
                    logger.info(line)
                return data
            except Exception as hf_error:
                logger.warning(LogFormatter.warning(
                    f"Could not load HF dataset: {hf_error}. Using local sample data."
                ))

            # Fall back to local sample data
            if SAMPLE_DATA_PATH.exists():
                with open(SAMPLE_DATA_PATH, "r") as f:
                    data = json.load(f)
                stats = {
                    "Total_Entries": len(data),
                    "Source": "Local sample data",
                }
                for line in LogFormatter.stats(stats, "Dataset Statistics"):
                    logger.info(line)
                return data
            else:
                raise HTTPException(
                    status_code=500,
                    detail="No data source available: HF dataset not found and no local data.",
                )
        except HTTPException:
            raise
        except Exception as e:
            logger.error(LogFormatter.error("Failed to fetch leaderboard data", e))
            raise HTTPException(status_code=500, detail=str(e))
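
    # For reference, each raw record is a flat dict keyed by the dataset's
    # column names, as consumed by `transform_data` below. A hypothetical
    # example record:
    #
    #     {"fullname": "org/labram-base", "adapter": "lora",
    #      "Precision": "float32", "Model sha": "a1b2c3d",
    #      "Architecture": "labram", "Average ⬆️": 0.71, ...}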

    async def get_formatted_data(self) -> List[Dict[str, Any]]:
        """Get formatted leaderboard data."""
        try:
            logger.info(LogFormatter.section("FORMATTING LEADERBOARD DATA"))

            raw_data = await self.fetch_raw_data()
            formatted_data = []
            type_counts = {}
            error_count = 0

            # Initialize progress tracking
            total_items = len(raw_data)
            logger.info(LogFormatter.info(f"Processing {total_items:,} entries..."))

            for i, item in enumerate(raw_data, 1):
                try:
                    formatted_item = await self.transform_data(item)
                    formatted_data.append(formatted_item)

                    # Count model types (adapter methods)
                    model_type = formatted_item["model"]["type"]
                    type_counts[model_type] = type_counts.get(model_type, 0) + 1
                except Exception as e:
                    error_count += 1
                    logger.error(LogFormatter.error(f"Failed to format entry {i}/{total_items}", e))
                    continue

                # Log progress roughly every 10% of entries
                if i % max(1, total_items // 10) == 0:
                    logger.info(LogFormatter.info(f"Progress: {LogFormatter.progress_bar(i, total_items)}"))

            # Log final statistics
            stats = {
                "Total_Processed": total_items,
                "Successful": len(formatted_data),
                "Failed": error_count,
            }
            logger.info(LogFormatter.section("PROCESSING SUMMARY"))
            for line in LogFormatter.stats(stats, "Processing Statistics"):
                logger.info(line)

            # Log model type distribution
            type_stats = {f"Type_{k}": v for k, v in type_counts.items()}
            logger.info(LogFormatter.subsection("MODEL TYPE DISTRIBUTION"))
            for line in LogFormatter.stats(type_stats):
                logger.info(line)

            return formatted_data
        except Exception as e:
            logger.error(LogFormatter.error("Failed to format leaderboard data", e))
            raise HTTPException(status_code=500, detail=str(e))

    async def transform_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform raw data into the format expected by the frontend.

        Evaluations correspond to the EEG downstream datasets used in the
        Parameter-Efficient Fine-Tuning benchmark for EEG Foundation Models:
        - Motor Imagery: BCIC-2a (4-class), PhysioNet MI (4-class)
        - Sleep Staging: ISRUC-SLEEP (5-class)
        - Pathology Detection: TUAB (binary), TUEV (6-class)
        - Seizure Detection: CHB-MIT (binary)
        - Emotion Recognition: FACED (9-class), SEED-V (5-class)
        """
        try:
            model_name = data.get("fullname", "Unknown")
            logger.debug(LogFormatter.info(f"Transforming data for model: {model_name}"))

            unique_id = _build_entry_id(data)

            # EEG benchmark evaluations from the canonical registry
            evaluations = {}
            for key, benchmark in EEG_BENCHMARKS.items():
                score = data.get(benchmark.accuracy_field, 0)
                evaluations[key] = {
                    "name": benchmark.display_name,
                    "value": score,
                    # Raw scores are fractions; scale to a 0-100 percentage
                    "normalized_score": score * 100 if score else 0,
                }
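
            # A hypothetical resulting entry, assuming a registry key "bcic2a"
            # with accuracy_field "BCIC-2a Accuracy" and a raw score of 0.75:
            #     evaluations["bcic2a"] == {
            #         "name": "BCIC-2a", "value": 0.75, "normalized_score": 75.0
            #     }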

            features = {
                # The raw column reports availability, so negate it to match
                # the key's "is_not_available" semantics
                "is_not_available_on_hub": not data.get("Available on the hub", False),
            }

            metadata = {
                "upload_date": data.get("Upload To Hub Date"),
                "submission_date": data.get("Submission Date"),
                "base_model": data.get("Base Model"),
                "hub_license": data.get("Hub License"),
                "hub_hearts": data.get("Hub ❤️"),
                "params_millions": data.get("#Params (M)"),
                "adapter_method": data.get("adapter"),
                "embed_dim": data.get("embed_dim"),
                "trainable_params": data.get("trainable_params"),
            }

            # Adapter method / fine-tuning approach determines the model "type"
            original_adapter = data.get("adapter", "")
            adapter_lower = original_adapter.lower().strip() if original_adapter else ""
            adapter_type_mapping = {
                "lora": "lora",
                "ia3": "ia3",
                "adalora": "adalora",
                "dora": "dora",
                "oft": "oft",
                "probe": "probe",
                "full_finetune": "full_finetune",
                "full": "full_finetune",  # alias
            }
            # Unknown adapter methods pass through unchanged (lowercased)
            mapped_type = adapter_type_mapping.get(adapter_lower, adapter_lower)

            # Architecture is the foundation model name
            original_arch = data.get("Architecture", "") or data.get("model", "")
            arch_lower = original_arch.lower().strip() if original_arch else ""
            arch_mapping = {
                "labram": "LaBraM",
                "labram_small": "LaBraM-Small",
                "eegpt": "EEGPT",
                "biot": "BIOT",
                "bendr": "BENDR",
                "signal_jepa": "SignalJEPA",
                "signaljepa": "SignalJEPA",
                "cbramod": "CBraMod",
                "reve": "REVE",
            }
            # Unknown architectures keep their original spelling
            architecture = arch_mapping.get(arch_lower, original_arch)

            transformed_data = {
                "id": unique_id,
                "model": {
                    "name": data.get("fullname"),
                    "sha": data.get("Model sha"),
                    "precision": data.get("Precision"),
                    "type": mapped_type,
                    "weight_type": data.get("Weight type"),
                    "architecture": architecture,
                    "average_score": data.get("Average ⬆️"),
                },
                "evaluations": evaluations,
                "features": features,
                "metadata": metadata,
            }

            logger.debug(LogFormatter.success(f"Successfully transformed data for {model_name}"))
            return transformed_data
        except Exception as e:
            logger.error(LogFormatter.error(f"Failed to transform data for {data.get('fullname', 'Unknown')}", e))
            raise
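

# Minimal local smoke test: a sketch, assuming the app's config and cache
# modules are importable and either the HF dataset or the bundled
# sample_results.json is reachable. Not part of the service itself.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        service = LeaderboardService()
        entries = await service.get_formatted_data()
        print(f"Fetched {len(entries)} formatted entries")
        if entries:
            # Pretty-print the first transformed entry
            print(json.dumps(entries[0], indent=2, default=str))

    asyncio.run(_demo())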