| | import pandas as pd
|
| | import numpy as np
|
| | import os
|
| | import time
|
| | import json
|
| | import logging
|
| | from datetime import datetime, timedelta
|
| | from io import StringIO
|
| | import requests
|
| | from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
|
| | from config import *
|
| |
|
| |
|
# Module-level logging setup: INFO level for the whole process,
# with a logger named after this module (standard per-module pattern).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
| |
|
class DataLoader:
    """Loads, caches, and processes the model-metrics CSV.

    Data-source priority:
      1. ``force_refresh=True``  -> download fresh data.
      2. stale or missing cache  -> download fresh data.
      3. fresh cache             -> load from cache.
      4. download failed         -> fall back to cache (even a stale one).
    """

    def __init__(self):
        # Processed DataFrame; populated by load_data().
        self.df = None
        # Human-readable timestamp of the last successful refresh.
        self.last_updated = "Unknown"

    def load_data(self, force_refresh=False):
        """Return the processed DataFrame, refreshing the cache if needed.

        Args:
            force_refresh: when True, always re-download the source CSV.

        Returns:
            pd.DataFrame: processed data; an empty DataFrame when neither
            the remote source nor any cache is usable.
        """
        if force_refresh or self._needs_update():
            logger.info("🔄 Cache expired or missing. Fetching fresh data...")
            try:
                raw_df = self._fetch_remote_csv()
                self._validate_input_data(raw_df)
                self.df = self._process_data(raw_df)
                self._save_cache()
                logger.info(f"✅ Data processed. Rows: {len(self.df)}")
            except Exception as e:
                logger.error(f"⚠️ Error fetching/processing data: {e}")
                # Fall back to whatever cache exists, even a stale one.
                if os.path.exists(CACHE_FILE):
                    logger.warning("⚠️ Falling back to cached data due to fetch error.")
                    try:
                        self.df = pd.read_parquet(CACHE_FILE)
                        self._load_meta()
                    except Exception as cache_err:
                        # BUGFIX: a corrupt fallback cache previously raised out
                        # of this handler and crashed the caller instead of
                        # degrading to an empty DataFrame.
                        logger.error(f"❌ Fallback cache unreadable: {cache_err}")
                        self.df = pd.DataFrame()
                else:
                    logger.error("❌ No cache available. Returning empty DataFrame.")
                    self.df = pd.DataFrame()
        else:
            logger.info("⚡ Loading from cache.")
            try:
                self.df = pd.read_parquet(CACHE_FILE)
                self._load_meta()
            except Exception as e:
                # Corrupt cache: force a one-shot refresh. Recursion depth is
                # bounded at 1 because the refresh branch never re-enters here.
                logger.error(f"❌ Corrupt cache, forcing refresh: {e}")
                return self.load_data(force_refresh=True)

        return self.df

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type(requests.RequestException)
    )
    def _fetch_remote_csv(self):
        """Download the source CSV with retry/backoff on network errors.

        Returns:
            pd.DataFrame: raw parsed CSV (malformed rows are skipped rather
            than aborting the whole load).

        Raises:
            requests.RequestException: after 3 failed attempts (via tenacity).
        """
        logger.info(f"⬇️ Downloading CSV from {CSV_URL}...")
        response = requests.get(CSV_URL, timeout=30)
        response.raise_for_status()
        return pd.read_csv(StringIO(response.text), on_bad_lines='skip')

    def _validate_input_data(self, df):
        """Basic structural validation of the downloaded CSV.

        Raises:
            ValueError: if required columns are missing or the CSV is empty.
        """
        required_cols = ['author/model_name', 'Total Parameters']
        missing = [col for col in required_cols if col not in df.columns]
        if missing:
            raise ValueError(f"Input CSV is missing required columns: {missing}")
        if df.empty:
            raise ValueError("Input CSV is empty")

    def _needs_update(self):
        """Return True when the cache is missing, unreadable, or expired."""
        if not os.path.exists(CACHE_FILE) or not os.path.exists(META_FILE):
            return True
        try:
            with open(META_FILE, 'r') as f:
                data = json.load(f)
            timestamp = data.get('timestamp', 0)
            return (time.time() - timestamp) > CACHE_DURATION
        except (OSError, json.JSONDecodeError, ValueError):
            # Unreadable/garbage meta file counts as "needs update".
            return True

    def clear_cache(self):
        """Force-delete all cache files.

        Returns:
            list[str]: paths that were actually deleted.
        """
        deleted = []
        for file in [CACHE_FILE, META_FILE]:
            if os.path.exists(file):
                try:
                    os.remove(file)
                    deleted.append(file)
                except OSError as e:
                    # Best-effort: log and continue with the remaining files.
                    logger.error(f"⚠️ Failed to delete {file}: {e}")
        if deleted:
            logger.info(f"🗑️ Cleared cache: {', '.join(deleted)}")
        return deleted

    def _save_cache(self):
        """Persist the processed DataFrame plus a timestamp meta file."""
        try:
            self.df.to_parquet(CACHE_FILE)
            with open(META_FILE, 'w') as f:
                json.dump({'timestamp': time.time()}, f)
            # Equivalent to fromtimestamp(time.time()), just clearer.
            self.last_updated = datetime.now().strftime("%Y-%m-%d %H:%M")
        except OSError as e:
            logger.error(f"⚠️ Failed to save cache: {e}")

    def _load_meta(self):
        """Restore last_updated from the meta file; 'Unknown' on any failure."""
        try:
            with open(META_FILE, 'r') as f:
                ts = json.load(f)['timestamp']
            self.last_updated = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
        except (OSError, json.JSONDecodeError, KeyError, ValueError):
            # ValueError added: fromtimestamp raises it on out-of-range values.
            self.last_updated = "Unknown"

    def _clean_column(self, series, scale=1.0):
        """Safely coerce a column to numeric, stripping '%' and rescaling.

        Args:
            series: raw column (strings or numerics).
            scale: divisor applied when > 1 (e.g. 100.0 for percentage columns).

        Returns:
            pd.Series: numeric series with unparseable entries as NaN.
        """
        if pd.api.types.is_string_dtype(series) or series.dtype == 'object':
            # astype(str) also turns NaN into 'nan', which to_numeric below
            # coerces back to NaN — the round-trip is safe.
            series = series.astype(str).str.rstrip('%')

        series = pd.to_numeric(series, errors='coerce')

        if scale > 1:
            return series / scale
        return series

    def _get_model_type(self, row):
        """Classify a model row for sorting.

        Returns:
            tuple: (sort_key, short_code, display_name).
        """
        params = row.get('Total Parameters', np.nan)
        # Missing/non-positive parameter counts are treated as proprietary.
        if pd.isna(params) or params <= 0:
            return (3, 'P', 'Proprietary')

        is_foundation = row.get('Is Foundation', False)
        is_merged = row.get('Is Merged', False)
        is_finetuned = row.get('Is Finetuned', False)

        # Order matters: merged wins over finetuned, base excludes merged.
        if is_foundation and not is_merged:
            return (0, 'B', 'Base')
        if is_merged:
            return (2, 'M', 'Merge')
        if is_finetuned:
            # 'not is_merged' is implied: merged rows already returned above.
            return (1, 'F', 'Finetune')

        return (4, '', 'Unknown')

    def _process_data(self, df):
        """Main processing pipeline: clean, normalize, and derive scores."""
        logger.info("⚙️ Processing pipeline started...")
        df.columns = df.columns.str.strip()

        # Column groups with the divisor applied to each member.
        col_groups = {
            'percentage': (['Textbook', 'Pop Culture', 'Dialogue_Percentage', 'Verb_to_Noun_Ratio',
                            'Show Rec Correlation', 'avg_length_error_pct'], 100.0),
            'already_norm': (['avg_writing_style_score', 'originality_score', 'internal_semantic_redundancy',
                              'lexical_stuckness', 'wm_recipe_percent_error_score', 'wm_geoguessr_mae_score',
                              'wm_weight_percent_error_score', 'wm_music_mae_score'], 1.0),
            'numeric': (['Total Parameters', 'Active Parameters', 'Repetition Interrupts', 'Avg Thinking Chars'], 1.0),
            'scale_10': (['avg_nsfw_score', 'avg_dark_score', 'Hazardous', 'Entertainment',
                          'SocPol', 'W/10-Direct', 'W/10-Adherence'], 10.0)
        }

        for group, (cols, scale) in col_groups.items():
            for col in cols:
                if col in df.columns:
                    df[col] = self._clean_column(df[col], scale)
                    if group == 'already_norm':
                        df[col] = df[col].clip(0, 1.0)
                else:
                    # Missing columns become all-NaN so downstream code can
                    # rely on their presence.
                    df[col] = np.nan

        if 'Is Thinking Model' in df.columns:
            # Accept common truthy spellings; everything else is False.
            df['Is Thinking Model'] = df['Is Thinking Model'].astype(str).str.strip().str.upper().isin(['TRUE', 'YES', '1'])
        else:
            df['Is Thinking Model'] = False

        # BUGFIX: df.get('Architecture', 'Unknown') returns the plain string
        # 'Unknown' when the column is absent, and str has no .fillna() ->
        # AttributeError. Guard on column presence instead.
        if 'Architecture' in df.columns:
            df['Architecture'] = df['Architecture'].fillna('Unknown').replace('null', 'Unknown')
        else:
            df['Architecture'] = 'Unknown'

        # Model type classification for sorting/display.
        type_data = df.apply(self._get_model_type, axis=1)
        df['_type_sort'] = type_data.apply(lambda x: x[0])
        df['Type_Code'] = type_data.apply(lambda x: x[1])
        df['Type_Name'] = type_data.apply(lambda x: x[2])

        if 'Test Date' in df.columns:
            df['Test Date'] = pd.to_datetime(df['Test Date'], format='%m/%d/%Y', errors='coerce')
            week_ago = datetime.now() - timedelta(days=7)
            # Vectorized replacement for the old row-wise apply; NaT compares
            # False under >=, matching the previous pd.notna() guard.
            df['Is_New'] = df['Test Date'].notna() & (df['Test Date'] >= week_ago)
            df['Test Date'] = df['Test Date'].dt.strftime('%Y-%m-%d')
        else:
            df['Is_New'] = False

        # Exponential penalty: each repetition interrupt multiplies the score
        # by REPETITION_BASE (assumed < 1 — defined in config).
        df['penalty_repetition'] = REPETITION_BASE ** df['Repetition Interrupts'].fillna(0)

        chars = df['Avg Thinking Chars'].fillna(0)

        # Thinking models that exceed the threshold get a sub-1.0 multiplier;
        # the epsilon avoids division by zero when chars == 0.
        df['penalty_thinking'] = np.where(
            df['Is Thinking Model'] & (chars > THINKING_THRESHOLD),
            np.power(THINKING_THRESHOLD / (chars + 1e-6), THINKING_PENALTY_POWER).clip(upper=1.0),
            1.0
        )

        # Bell-curve scores: peak at the configured target value.
        df['gauss_Dialogue'] = self._gaussian_score(df['Dialogue_Percentage'], GAUSSIAN_DIALOGUE_TARGET, GAUSSIAN_DIALOGUE_SIGMA)
        df['gauss_VerbNoun'] = self._gaussian_score(df['Verb_to_Noun_Ratio'], GAUSSIAN_VERBNOUN_TARGET, GAUSSIAN_VERBNOUN_SIGMA)

        # Destination column -> (source column, normalization mode).
        norm_config = {
            'norm_Textbook': ('Textbook', 'direct'),
            'norm_PopCulture': ('Pop Culture', 'direct'),
            'norm_ShowRec': ('Show Rec Correlation', 'direct'),
            'norm_Style': ('avg_writing_style_score', 'direct'),
            'norm_Originality': ('originality_score', 'direct'),
            'norm_NSFW': ('avg_nsfw_score', 'direct'),
            'norm_Dark': ('avg_dark_score', 'direct'),
            'norm_Hazardous': ('Hazardous', 'direct'),
            'norm_Entertainment': ('Entertainment', 'direct'),
            'norm_Instruction': ('W/10-Adherence', 'direct'),
            'norm_Unbound_Direct': ('W/10-Direct', 'direct'),

            'norm_Recipe': ('wm_recipe_percent_error_score', 'direct'),
            'norm_Geo': ('wm_geoguessr_mae_score', 'direct'),
            'norm_Weight': ('wm_weight_percent_error_score', 'direct'),
            'norm_Music': ('wm_music_mae_score', 'direct'),

            # 'inverse' mode: lower raw value is better.
            'inv_Semantic': ('internal_semantic_redundancy', 'inverse'),
            'inv_Lexical': ('lexical_stuckness', 'inverse'),
            'inv_LengthErr': ('avg_length_error_pct', 'inverse')
        }

        for dest, (src, mode) in norm_config.items():
            if src in df.columns:
                df[dest] = self._inverse_normalize(df[src]) if mode == 'inverse' else self._robust_normalize(df[src])
            else:
                df[dest] = np.nan

        # Composite scores: mean of available components, NaNs skipped.
        composites = {
            'Composite_WorldModel': ['norm_Recipe', 'norm_Geo', 'norm_Weight', 'norm_Music'],
            'Composite_Unbound': ['norm_Unbound_Direct', 'norm_Entertainment', 'norm_Hazardous'],
            'Composite_Redundancy': ['inv_Semantic', 'inv_Lexical']
        }
        for comp, cols in composites.items():
            valid_cols = [c for c in cols if c in df.columns]
            if valid_cols:
                df[comp] = df[valid_cols].mean(axis=1, skipna=True)
            else:
                df[comp] = np.nan

        logger.info("✅ Processing complete!")
        return df

    def _robust_normalize(self, series):
        """Robust 0..1 normalization with divide-by-zero protection.

        Clips to the [q05, q95] quantile range before scaling, so outliers
        cannot dominate. Returns all-NaN when the series is degenerate.
        """
        valid = series.dropna()
        if valid.empty or valid.std() < MIN_STD_THRESHOLD:
            return pd.Series(np.nan, index=series.index)
        q05, q95 = valid.quantile(ROBUST_QUANTILE_LOW), valid.quantile(ROBUST_QUANTILE_HIGH)
        denominator = q95 - q05
        if abs(denominator) < MIN_STD_THRESHOLD:
            return pd.Series(np.nan, index=series.index)
        return (series.clip(q05, q95) - q05) / denominator

    def _inverse_normalize(self, series):
        """Inverse robust normalization: lower raw values map toward 1."""
        valid = series.dropna()
        if valid.empty or valid.std() < MIN_STD_THRESHOLD:
            return pd.Series(np.nan, index=series.index)
        p5, p95 = valid.quantile(ROBUST_QUANTILE_LOW), valid.quantile(ROBUST_QUANTILE_HIGH)
        denominator = p95 - p5
        if abs(denominator) < MIN_STD_THRESHOLD:
            return pd.Series(np.nan, index=series.index)
        return (p95 - series.clip(p5, p95)) / denominator

    def _gaussian_score(self, series, target, sigma):
        """Gaussian bell-curve score: 1.0 at `target`, decaying with distance.

        NOTE(review): assumes sigma != 0 — callers pass config constants;
        a zero sigma would divide by zero here.
        """
        return np.exp(-((series - target) ** 2) / (2 * sigma ** 2))