# UGI-Leaderboard-Presets / data_loader.py
# Provenance: Hugging Face Space, commit a597782 ("Security & Stability Update", VOIDER)
import pandas as pd
import numpy as np
import os
import time
import json
import logging
from datetime import datetime, timedelta
from io import StringIO
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from config import *
# Logging setup (cache hits, downloads, and fallbacks are reported via this logger).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataLoader:
    """Downloads, caches and processes the UGI leaderboard CSV data."""

    def __init__(self):
        # Processed leaderboard DataFrame; populated by load_data().
        self.df = None
        # Human-readable timestamp of the last cache write/read.
        self.last_updated = "Unknown"
def load_data(self, force_refresh=False):
"""
Загрузка данных.
Приоритет:
1. Если force_refresh=True -> Скачать свежие.
2. Если кеш устарел -> Скачать свежие.
3. Если кеш свежий -> Загрузить из кеша.
4. Если скачать не удалось -> Fallback на кеш (даже старый).
"""
if force_refresh or self._needs_update():
logger.info("🔄 Cache expired or missing. Fetching fresh data...")
try:
raw_df = self._fetch_remote_csv()
self._validate_input_data(raw_df)
self.df = self._process_data(raw_df)
self._save_cache()
logger.info(f"✅ Data processed. Rows: {len(self.df)}")
except Exception as e:
logger.error(f"⚠️ Error fetching/processing data: {e}")
# Fallback to cache if available
if os.path.exists(CACHE_FILE):
logger.warning("⚠️ Falling back to cached data due to fetch error.")
self.df = pd.read_parquet(CACHE_FILE)
self._load_meta()
else:
logger.error("❌ No cache available. Returning empty DataFrame.")
self.df = pd.DataFrame()
else:
logger.info("⚡ Loading from cache.")
try:
self.df = pd.read_parquet(CACHE_FILE)
self._load_meta()
except Exception as e:
logger.error(f"❌ Corrupt cache, forcing refresh: {e}")
return self.load_data(force_refresh=True)
return self.df
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(requests.RequestException)
    )
    def _fetch_remote_csv(self):
        """Download the leaderboard CSV with retry logic.

        Retries up to 3 times with exponential backoff (2-10s) on network
        errors (requests.RequestException); any other exception propagates
        immediately.  Malformed CSV lines are skipped rather than raising.
        Returns the raw DataFrame; raises requests.HTTPError on bad status.
        """
        logger.info(f"⬇️ Downloading CSV from {CSV_URL}...")
        response = requests.get(CSV_URL, timeout=30)
        response.raise_for_status()
        return pd.read_csv(StringIO(response.text), on_bad_lines='skip')
def _validate_input_data(self, df):
"""Базовая валидация структуры CSV."""
required_cols = ['author/model_name', 'Total Parameters']
missing = [col for col in required_cols if col not in df.columns]
if missing:
raise ValueError(f"Input CSV is missing required columns: {missing}")
if df.empty:
raise ValueError("Input CSV is empty")
def _needs_update(self):
"""Проверка необходимости обновления кеша."""
if not os.path.exists(CACHE_FILE) or not os.path.exists(META_FILE):
return True
try:
with open(META_FILE, 'r') as f:
data = json.load(f)
timestamp = data.get('timestamp', 0)
return (time.time() - timestamp) > CACHE_DURATION
except (OSError, json.JSONDecodeError, ValueError):
return True
def clear_cache(self):
"""Принудительная очистка всех файлов кеша."""
deleted = []
for file in [CACHE_FILE, META_FILE]:
if os.path.exists(file):
try:
os.remove(file)
deleted.append(file)
except OSError as e:
logger.error(f"⚠️ Failed to delete {file}: {e}")
if deleted:
logger.info(f"🗑️ Cleared cache: {', '.join(deleted)}")
return deleted
def _save_cache(self):
try:
self.df.to_parquet(CACHE_FILE)
with open(META_FILE, 'w') as f:
json.dump({'timestamp': time.time()}, f)
self.last_updated = datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d %H:%M")
except OSError as e:
logger.error(f"⚠️ Failed to save cache: {e}")
def _load_meta(self):
try:
with open(META_FILE, 'r') as f:
self.last_updated = datetime.fromtimestamp(json.load(f)['timestamp']).strftime("%Y-%m-%d %H:%M")
except (OSError, json.JSONDecodeError, KeyError):
self.last_updated = "Unknown"
def _clean_column(self, series, scale=1.0):
"""Безопасная очистка и масштабирование числовых колонок."""
if pd.api.types.is_string_dtype(series) or series.dtype == 'object':
series = series.astype(str).str.rstrip('%')
# Coerce errors to NaN (NO sentinel values)
series = pd.to_numeric(series, errors='coerce')
if scale > 1:
return series / scale
return series
def _get_model_type(self, row):
"""Определение типа модели для сортировки."""
# Returns: (sort_value, short_code, full_name)
params = row.get('Total Parameters', np.nan)
if pd.isna(params) or params <= 0:
return (3, 'P', 'Proprietary')
is_foundation = row.get('Is Foundation', False)
is_merged = row.get('Is Merged', False)
is_finetuned = row.get('Is Finetuned', False)
if is_foundation and not is_merged:
return (0, 'B', 'Base')
if is_merged:
return (2, 'M', 'Merge')
if is_finetuned and not is_merged:
return (1, 'F', 'Finetune')
return (4, '', 'Unknown')
def _process_data(self, df):
"""Основной пайплайн обработки."""
logger.info("⚙️ Processing pipeline started...")
df.columns = df.columns.str.strip()
# === 1. COLUMN GROUPS ===
col_groups = {
'percentage': (['Textbook', 'Pop Culture', 'Dialogue_Percentage', 'Verb_to_Noun_Ratio',
'Show Rec Correlation', 'avg_length_error_pct'], 100.0),
'already_norm': (['avg_writing_style_score', 'originality_score', 'internal_semantic_redundancy',
'lexical_stuckness', 'wm_recipe_percent_error_score', 'wm_geoguessr_mae_score',
'wm_weight_percent_error_score', 'wm_music_mae_score'], 1.0),
'numeric': (['Total Parameters', 'Active Parameters', 'Repetition Interrupts', 'Avg Thinking Chars'], 1.0),
'scale_10': (['avg_nsfw_score', 'avg_dark_score', 'Hazardous', 'Entertainment',
'SocPol', 'W/10-Direct', 'W/10-Adherence'], 10.0)
}
for group, (cols, scale) in col_groups.items():
for col in cols:
if col in df.columns:
df[col] = self._clean_column(df[col], scale)
if group == 'already_norm':
df[col] = df[col].clip(0, 1.0)
else:
df[col] = np.nan
# === 2. BOOLEANS & STRINGS ===
if 'Is Thinking Model' in df.columns:
# Robust boolean conversion
df['Is Thinking Model'] = df['Is Thinking Model'].astype(str).str.strip().str.upper().isin(['TRUE', 'YES', '1'])
else:
df['Is Thinking Model'] = False
df['Architecture'] = df.get('Architecture', 'Unknown').fillna('Unknown').replace('null', 'Unknown')
# === 3. MODEL TYPES & DATES ===
type_data = df.apply(self._get_model_type, axis=1)
df['_type_sort'] = type_data.apply(lambda x: x[0])
df['Type_Code'] = type_data.apply(lambda x: x[1])
df['Type_Name'] = type_data.apply(lambda x: x[2])
if 'Test Date' in df.columns:
df['Test Date'] = pd.to_datetime(df['Test Date'], format='%m/%d/%Y', errors='coerce')
week_ago = datetime.now() - timedelta(days=7)
df['Is_New'] = df['Test Date'].apply(lambda x: True if pd.notna(x) and x >= week_ago else False)
df['Test Date'] = df['Test Date'].dt.strftime('%Y-%m-%d')
else:
df['Is_New'] = False
# === 4. PENALTIES ===
df['penalty_repetition'] = REPETITION_BASE ** df['Repetition Interrupts'].fillna(0)
chars = df['Avg Thinking Chars'].fillna(0)
# Avoid division by zero with safe implementation
df['penalty_thinking'] = np.where(
df['Is Thinking Model'] & (chars > THINKING_THRESHOLD),
np.power(THINKING_THRESHOLD / (chars + 1e-6), THINKING_PENALTY_POWER).clip(upper=1.0),
1.0
)
# === 5. GAUSSIAN SCORES ===
df['gauss_Dialogue'] = self._gaussian_score(df['Dialogue_Percentage'], GAUSSIAN_DIALOGUE_TARGET, GAUSSIAN_DIALOGUE_SIGMA)
df['gauss_VerbNoun'] = self._gaussian_score(df['Verb_to_Noun_Ratio'], GAUSSIAN_VERBNOUN_TARGET, GAUSSIAN_VERBNOUN_SIGMA)
# === 6. NORMALIZATION ===
norm_config = {
# Direct normalization (Higher = Better)
'norm_Textbook': ('Textbook', 'direct'),
'norm_PopCulture': ('Pop Culture', 'direct'),
'norm_ShowRec': ('Show Rec Correlation', 'direct'),
'norm_Style': ('avg_writing_style_score', 'direct'),
'norm_Originality': ('originality_score', 'direct'),
'norm_NSFW': ('avg_nsfw_score', 'direct'),
'norm_Dark': ('avg_dark_score', 'direct'),
'norm_Hazardous': ('Hazardous', 'direct'),
'norm_Entertainment': ('Entertainment', 'direct'),
'norm_Instruction': ('W/10-Adherence', 'direct'),
'norm_Unbound_Direct': ('W/10-Direct', 'direct'),
# World Model (Direct)
'norm_Recipe': ('wm_recipe_percent_error_score', 'direct'),
'norm_Geo': ('wm_geoguessr_mae_score', 'direct'),
'norm_Weight': ('wm_weight_percent_error_score', 'direct'),
'norm_Music': ('wm_music_mae_score', 'direct'),
# Inverse normalization (Higher = Worse)
'inv_Semantic': ('internal_semantic_redundancy', 'inverse'),
'inv_Lexical': ('lexical_stuckness', 'inverse'),
'inv_LengthErr': ('avg_length_error_pct', 'inverse')
}
for dest, (src, mode) in norm_config.items():
if src in df.columns:
df[dest] = self._inverse_normalize(df[src]) if mode == 'inverse' else self._robust_normalize(df[src])
else:
df[dest] = np.nan
# === 7. COMPOSITES ===
composites = {
'Composite_WorldModel': ['norm_Recipe', 'norm_Geo', 'norm_Weight', 'norm_Music'],
'Composite_Unbound': ['norm_Unbound_Direct', 'norm_Entertainment', 'norm_Hazardous'],
'Composite_Redundancy': ['inv_Semantic', 'inv_Lexical']
}
for comp, cols in composites.items():
# Calculate mean ignoring NaNs
valid_cols = [c for c in cols if c in df.columns]
if valid_cols:
df[comp] = df[valid_cols].mean(axis=1, skipna=True)
else:
df[comp] = np.nan
# REMOVED: Section 8 (Sentinel Value Filling).
# We now keep NaNs as NaNs to be handled by the scoring engine and UI sorting logic.
logger.info("✅ Processing complete!")
return df
def _robust_normalize(self, series):
"""Robust normalization with divide-by-zero protection."""
valid = series.dropna()
if valid.empty or valid.std() < MIN_STD_THRESHOLD:
return pd.Series(np.nan, index=series.index)
q05, q95 = valid.quantile(ROBUST_QUANTILE_LOW), valid.quantile(ROBUST_QUANTILE_HIGH)
denominator = q95 - q05
if abs(denominator) < MIN_STD_THRESHOLD:
return pd.Series(np.nan, index=series.index)
return (series.clip(q05, q95) - q05) / denominator
def _inverse_normalize(self, series):
"""Inverse robust normalization."""
valid = series.dropna()
if valid.empty or valid.std() < MIN_STD_THRESHOLD:
return pd.Series(np.nan, index=series.index)
p5, p95 = valid.quantile(ROBUST_QUANTILE_LOW), valid.quantile(ROBUST_QUANTILE_HIGH)
denominator = p95 - p5
if abs(denominator) < MIN_STD_THRESHOLD:
return pd.Series(np.nan, index=series.index)
return (p95 - series.clip(p5, p95)) / denominator
def _gaussian_score(self, series, target, sigma):
return np.exp(-((series - target) ** 2) / (2 * sigma ** 2))