""" Data loading and processing module for RAG Analytics """ import pandas as pd import os from pathlib import Path from typing import Tuple, List from config import DATA_FOLDER, COLUMN_MAP, METRIC_COLUMNS, NUMERIC_CONFIG_COLUMNS, REQUIRED_COLUMNS, DEBUG def normalize_dataframe(df: pd.DataFrame) -> pd.DataFrame: """ 1. Renames columns by stripping special chars (spaces, =, -). 2. Forces metric columns to numeric (floats). 3. Retains all data without schema validation dropping rows. Args: df: Raw dataframe loaded from CSV Returns: Normalized dataframe with standardized column names and types """ rename_dict = {} for col in df.columns: # Aggressive clean: "RMSE=trace relevance" -> "rmsetracerelevance" # Remove spaces, underscores, hyphens, equals signs clean_col = "".join(ch for ch in str(col).lower() if ch.isalnum()) if clean_col in COLUMN_MAP: rename_dict[col] = COLUMN_MAP[clean_col] df = df.rename(columns=rename_dict) # Force ALL metric columns to float64 (Coerce errors to NaN then 0.0) # This ensures "Empty" strings or invalid values don't crash the graph # Using astype(float) explicitly ensures floating-point display for metric in METRIC_COLUMNS: if metric in df.columns: df[metric] = pd.to_numeric(df[metric], errors='coerce').fillna(0.0).astype(float) # Force ALL numeric configuration columns to float64 # This prevents integers like "256" from displaying as integers in graphs for config_col in NUMERIC_CONFIG_COLUMNS: if config_col in df.columns: # Convert to numeric, but preserve N/A as NaN (don't fill) df[config_col] = pd.to_numeric(df[config_col], errors='coerce').astype(float) return df def validate_dataframe(df: pd.DataFrame) -> Tuple[bool, str]: """ Validates that the dataframe has required columns. Args: df: Dataframe to validate Returns: Tuple of (is_valid, error_message) """ missing_cols = REQUIRED_COLUMNS - set(df.columns) if missing_cols: return False, f"Missing required columns: {', '.join(missing_cols)}" if df.empty: return False, "Dataframe is empty" return True, "Valid" def load_csv_from_folder(folder_path: str = None) -> Tuple[pd.DataFrame, str]: """ Loads all CSV files from the specified folder and combines them. Args: folder_path: Path to folder containing CSV files. If None, uses DATA_FOLDER from config. Returns: Tuple of (combined_dataframe, status_message) """ if folder_path is None: folder_path = DATA_FOLDER folder = Path(folder_path) if not folder.exists(): return pd.DataFrame(), f"Error: Data folder '{folder_path}' does not exist." if not folder.is_dir(): return pd.DataFrame(), f"Error: '{folder_path}' is not a directory." # Find all CSV files csv_files = list(folder.glob("*.csv")) if not csv_files: return pd.DataFrame(), f"Error: No CSV files found in '{folder_path}'." all_dfs = [] loaded_files = [] errors = [] for csv_file in csv_files: try: # Load raw CSV df_raw = pd.read_csv(csv_file, encoding='utf-8-sig') # Normalize column names and types df_clean = normalize_dataframe(df_raw) # Validate is_valid, error_msg = validate_dataframe(df_clean) if not is_valid: errors.append(f"{csv_file.name}: {error_msg}") continue all_dfs.append(df_clean) loaded_files.append(csv_file.name) except Exception as e: errors.append(f"{csv_file.name}: {str(e)}") if not all_dfs: error_summary = "\n".join(errors) if errors else "Unknown error" return pd.DataFrame(), f"Error: Failed to load any valid CSV files.\n{error_summary}" # Combine all dataframes final_df = pd.concat(all_dfs, ignore_index=True) # Build status message status_parts = [f"Successfully loaded {len(final_df)} test runs from {len(loaded_files)} file(s):"] status_parts.extend([f" • {fname}" for fname in loaded_files]) if errors: status_parts.append(f"\n{len(errors)} file(s) skipped due to errors:") status_parts.extend([f" • {err}" for err in errors]) # Add debug info if enabled if DEBUG and not final_df.empty: sample = final_df.iloc[0] debug_info = f"\nDEBUG (Row 1): Relevance={sample.get('rmse_relevance', 'N/A')}, F1={sample.get('f1_score', 'N/A')}, AUCROC={sample.get('aucroc', 'N/A')}" status_parts.append(debug_info) return final_df, "\n".join(status_parts) def get_available_datasets(df: pd.DataFrame) -> List[str]: """ Extracts unique dataset names from the dataframe. Args: df: Dataframe containing dataset_name column Returns: List of unique dataset names """ if df.empty or 'dataset_name' not in df.columns: return [] return sorted(df['dataset_name'].unique().tolist())