|
|
""" |
|
|
Data Transformation Module |
|
|
|
|
|
Handles DataFrame transformations and CSV loading. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import html |
|
|
import re |
|
|
from typing import List, Optional, Union |
|
|
from pathlib import Path |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
|
|
|
from ..core.columns import column_registry, ColumnType |
|
|
from ..core.config import settings |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
# A plain decimal number ("307", "1.7", ".5") optionally followed by a
# magnitude suffix. Matched against an upper-cased, stripped string.
_PARAM_PATTERN = re.compile(r'^(\d+(?:\.\d*)?|\.\d+)\s*([KMBT])?$')

# Scale factor for each magnitude suffix; no suffix leaves the number as-is.
_PARAM_MULTIPLIERS = {
    None: 1,
    'K': 1_000,
    'M': 1_000_000,
    'B': 1_000_000_000,
    'T': 1_000_000_000_000,
}

# Upper-cased spellings that are treated as "no value".
_PARAM_MISSING = frozenset(('', 'N/A', 'NA', 'NAN', 'NONE', '∞'))


def parse_parameter_string(value: Union[str, float, int]) -> Optional[float]:
    """
    Parse parameter strings like '307M', '1B', '1.7B', '4B' to numeric values.

    The suffix is expanded to an absolute count: '307M' becomes
    307_000_000.0 and '1.7B' becomes 1_700_000_000.0. Matching is
    case-insensitive and ignores surrounding whitespace as well as
    whitespace between the number and the suffix.

    Args:
        value: Parameter string (e.g., '307M', '1B', '1.7B') or numeric value.

    Returns:
        The absolute parameter count as a float. Numeric input is returned
        as ``float(value)`` without scaling. Returns None for missing values
        (anything ``pd.isna`` reports, or one of the placeholder spellings
        '', 'N/A', 'NA', 'NaN', 'None', '∞') and for strings that are not a
        number with an optional K/M/B/T suffix.
    """
    if pd.isna(value):
        return None

    # Already numeric: nothing to parse, just normalise the type.
    if isinstance(value, (int, float)):
        return float(value)

    text = str(value).strip().upper()
    if text in _PARAM_MISSING:
        return None

    match = _PARAM_PATTERN.match(text)
    if match is None:
        return None

    digits, suffix = match.groups()
    try:
        number = float(digits)
    except (ValueError, TypeError):
        # The pattern only admits well-formed numerals, so this is a
        # defensive fallback rather than an expected path.
        return None

    return number * _PARAM_MULTIPLIERS.get(suffix, 1)
|
|
|
|
|
|
|
|
def _format_whole_or_tenths(scaled: float, suffix: str) -> str:
    """Render ``scaled`` without decimals when it is integral, else with one decimal."""
    if scaled == int(scaled):
        return f"{int(scaled)}{suffix}"
    return f"{scaled:.1f}{suffix}"


def format_parameter_count(value: Union[float, int, None]) -> str:
    """
    Format a numeric parameter count to human-readable string.

    Args:
        value: Numeric parameter count.

    Returns:
        Formatted string like '307M', '1.7B', '4B'. Missing values (None/NaN)
        and non-finite values (inf, -inf) produce ''. A value that cannot be
        converted to float is returned as ``str(value)``. Values below 1,000
        (including negatives) are rendered as a truncated integer.
    """
    if value is None or pd.isna(value):
        return ''

    try:
        value = float(value)
    except (ValueError, TypeError):
        return str(value)

    # inf (or a string such as "nan" that float() accepted) has no sensible
    # rendering and would otherwise fail in int() below.
    if not np.isfinite(value):
        return ''

    if value >= 1_000_000_000_000:
        return _format_whole_or_tenths(value / 1_000_000_000_000, 'T')

    if value >= 1_000_000_000:
        return _format_whole_or_tenths(value / 1_000_000_000, 'B')

    if value >= 1_000_000:
        millions = value / 1_000_000
        if millions >= 10:
            return f"{millions:.0f}M"
        # Strip the trailing zero/dot from the NUMBER before adding the
        # suffix, so 1.5 -> '1.5M' and 5.0 -> '5M' (never '1.5MM').
        return f"{millions:.1f}".rstrip('0').rstrip('.') + 'M'

    if value >= 1_000:
        thousands = value / 1_000
        return f"{thousands:.0f}K" if thousands >= 10 else f"{thousands:.1f}K"

    return str(int(value))
|
|
|
|
|
|
|
|
class DataTransformer:
    """
    Transforms data between different formats.

    Handles CSV -> DataFrame conversions and display preparation.
    All methods are static or class methods; the class holds no instance state.
    """

    # Alternative CSV header spellings mapped to the standard column name.
    # Order matters when several aliases share a target: the first alias
    # present in the frame is renamed, later ones are left untouched.
    _COLUMN_ALIASES = {
        "Mean (TaskType)": "MTEB Score",
        "Score(Legal)": "Legal Score",
        "Embedding Dimensions": "Embed Dim",
        "Embedding Dim": "Embed Dim",
        "Max Tokens": "Max Sequence Length",
        "Max Seq Length": "Max Sequence Length",
        "Number of Parameters": "Parameters",
        "PairClassification": "Pair Classification",
        "Vocabulary Size": "Vocab Size",
        "Vocabulary": "Vocab Size",
    }

    @staticmethod
    def create_empty_dataframe() -> pd.DataFrame:
        """Create an empty DataFrame whose columns are ``column_registry.all_columns``."""
        return pd.DataFrame(columns=column_registry.all_columns)

    @staticmethod
    def load_from_csv(file_path: Optional[Path] = None) -> pd.DataFrame:
        """
        Load leaderboard data from CSV file.

        After reading, column names are normalized, the "Parameters" column
        is converted to numbers and the rows are sorted/ranked.

        Args:
            file_path: Path to CSV file (uses ``settings.data.csv_file`` if
                None). A plain string path is accepted as well.

        Returns:
            DataFrame with leaderboard data. If the file does not exist, or
            reading/processing raises, the problem is logged and an empty
            DataFrame (see ``create_empty_dataframe``) is returned instead.
        """
        path = file_path or settings.data.csv_file
        if isinstance(path, str):
            # `.exists()` below needs a Path; a str would raise AttributeError.
            path = Path(path)

        if not path.exists():
            logger.warning(f"CSV file not found: {path}")
            return DataTransformer.create_empty_dataframe()

        try:
            df = pd.read_csv(path)
            logger.info(f"Loaded {len(df)} records from {path}")

            df = DataTransformer._normalize_columns(df)
            df = DataTransformer._convert_parameters_to_numeric(df)
            df = DataTransformer._sort_by_rank(df)

            return df

        except Exception as e:
            # Deliberate best-effort boundary: callers always get a DataFrame.
            logger.error(f"Error loading CSV: {e}")
            return DataTransformer.create_empty_dataframe()

    @staticmethod
    def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize column names from CSV variations to standard names.

        An alias is renamed only when the standard name is not already
        present (either originally or through an earlier alias), so no
        duplicate column names are introduced. Returns a new DataFrame.
        """
        present = set(df.columns)
        renames = {}
        for old_name, new_name in DataTransformer._COLUMN_ALIASES.items():
            if old_name in present and new_name not in present:
                renames[old_name] = new_name
                # Track the rename so a later alias of the same target is skipped.
                present.discard(old_name)
                present.add(new_name)

        return df.rename(columns=renames)

    @staticmethod
    def _sort_by_rank(df: pd.DataFrame) -> pd.DataFrame:
        """
        Sort DataFrame by MTEB Score descending and recalculate ranks.

        With an "MTEB Score" column, rows are ordered by it (missing scores
        last) and "Rank" is overwritten with 1..N. Without it, an existing
        "Rank" column is used for ascending order. With neither column the
        input is returned unchanged.
        """
        if "MTEB Score" in df.columns:
            df = df.sort_values(
                "MTEB Score", ascending=False, na_position='last'
            ).reset_index(drop=True)
            df["Rank"] = range(1, len(df) + 1)
        elif "Rank" in df.columns:
            df = df.sort_values("Rank", ascending=True).reset_index(drop=True)
        return df

    @staticmethod
    def _convert_parameters_to_numeric(df: pd.DataFrame) -> pd.DataFrame:
        """
        Convert Parameters column from string format to numeric for proper sorting.

        Converts values like '307M', '1B', '1.7B' to numeric values by
        applying ``parse_parameter_string`` to each cell. Frames without a
        "Parameters" column are returned unchanged; otherwise a copy is
        returned.
        """
        if "Parameters" not in df.columns:
            return df

        df = df.copy()
        df["Parameters"] = df["Parameters"].apply(parse_parameter_string)
        return df

    @staticmethod
    def add_model_links(df: pd.DataFrame) -> pd.DataFrame:
        """
        Add clickable HuggingFace links to Model column.

        Each model name is HTML-escaped and wrapped in an ``<a>`` tag that
        points at ``https://huggingface.co/<name>``. Missing names (None/NaN)
        are left as they are rather than linked. Frames without a "Model"
        column are returned unchanged; otherwise a copy is returned.
        """
        if "Model" not in df.columns:
            return df

        def _as_link(name):
            # A missing name would otherwise become a link to ".../nan".
            if pd.isna(name):
                return name
            safe_name = html.escape(str(name))
            return (
                f'<a href="https://huggingface.co/{safe_name}" target="_blank" '
                f'style="color: #2563eb; text-decoration: underline;">{safe_name}</a>'
            )

        df = df.copy()
        df["Model"] = df["Model"].apply(_as_link)
        return df

    @staticmethod
    def ensure_numeric_columns(df: pd.DataFrame) -> pd.DataFrame:
        """
        Convert numeric columns to proper types.

        For every name in ``column_registry.numeric_columns`` that exists in
        the frame and has a registry definition, "N/A" cells are replaced by
        a missing value, the column is coerced with ``pd.to_numeric``
        (unparseable cells become NaN) and, unless the definition's
        ``decimals`` is 0, rounded to that many decimals. Returns a copy.
        """
        df = df.copy()

        for col_name in column_registry.numeric_columns:
            if col_name not in df.columns:
                continue

            col_def = column_registry.get(col_name)
            if col_def is None:
                continue

            df[col_name] = df[col_name].replace("N/A", pd.NA)
            df[col_name] = pd.to_numeric(df[col_name], errors='coerce')

            # Columns declared with 0 decimals are left unrounded.
            if col_def.decimals != 0:
                df[col_name] = df[col_name].round(col_def.decimals)

        return df

    @staticmethod
    def filter_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
        """
        Filter DataFrame to only include specified columns (preserves order).

        Names in ``columns`` that the frame does not have are ignored; the
        result follows the order of ``columns``, not of the frame.
        """
        available = [col for col in columns if col in df.columns]
        return df[available]

    @classmethod
    def prepare_for_display(
        cls,
        df: pd.DataFrame,
        columns: Optional[List[str]] = None,
        add_links: bool = True
    ) -> pd.DataFrame:
        """
        Prepare DataFrame for Gradio display.

        Works on a copy: optionally restricts to ``columns``, coerces the
        registry's numeric columns, then optionally links the model names.

        Args:
            df: Source DataFrame.
            columns: Columns to include (preserves order passed in). None or
                an empty list keeps all columns.
            add_links: Whether to add HuggingFace links.

        Returns:
            Prepared DataFrame, or an empty DataFrame with the registry's
            columns when ``df`` is None or empty.
        """
        if df is None or df.empty:
            return cls.create_empty_dataframe()

        result = df.copy()

        if columns:
            result = cls.filter_columns(result, columns)

        result = cls.ensure_numeric_columns(result)

        if add_links and "Model" in result.columns:
            result = cls.add_model_links(result)

        return result
|
|
|