Spaces:
Sleeping
Sleeping
| """ | |
| Data loading and processing module for RAG Analytics | |
| """ | |
import os
from pathlib import Path
from typing import List, Optional, Tuple

import pandas as pd

from config import DATA_FOLDER, COLUMN_MAP, METRIC_COLUMNS, NUMERIC_CONFIG_COLUMNS, REQUIRED_COLUMNS, DEBUG
def normalize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Standardize a raw CSV dataframe for downstream analytics.

    1. Renames columns by lower-casing and stripping every non-alphanumeric
       character (spaces, underscores, hyphens, '=' signs), then mapping the
       cleaned key through COLUMN_MAP.
    2. Coerces metric columns to float64; unparseable cells become 0.0.
    3. Coerces numeric configuration columns to float64; unparseable cells
       stay NaN (deliberately not filled).

    Args:
        df: Raw dataframe loaded from CSV

    Returns:
        Normalized dataframe with standardized column names and types
    """
    renames = {}
    for original in df.columns:
        # Aggressive clean: "RMSE=trace relevance" -> "rmsetracerelevance"
        key = "".join(filter(str.isalnum, str(original).lower()))
        if key in COLUMN_MAP:
            renames[original] = COLUMN_MAP[key]
    result = df.rename(columns=renames)

    # Metrics must end up as float64 so "Empty" strings or other invalid
    # values never crash the graphing layer; bad cells become 0.0.
    for name in METRIC_COLUMNS:
        if name in result.columns:
            coerced = pd.to_numeric(result[name], errors='coerce')
            result[name] = coerced.fillna(0.0).astype(float)

    # Numeric config columns become floats too (so "256" displays as a
    # float in graphs), but N/A is preserved as NaN rather than filled.
    for name in NUMERIC_CONFIG_COLUMNS:
        if name in result.columns:
            result[name] = pd.to_numeric(result[name], errors='coerce').astype(float)

    return result
def validate_dataframe(df: pd.DataFrame) -> Tuple[bool, str]:
    """
    Check that a dataframe carries every required column and has rows.

    Args:
        df: Dataframe to validate

    Returns:
        Tuple of (is_valid, error_message); error_message is "Valid"
        when the dataframe passes both checks.
    """
    missing_cols = REQUIRED_COLUMNS - set(df.columns)
    if missing_cols:
        return False, f"Missing required columns: {', '.join(missing_cols)}"
    if df.empty:
        return False, "Dataframe is empty"
    return True, "Valid"
def load_csv_from_folder(folder_path: Optional[str] = None) -> Tuple[pd.DataFrame, str]:
    """
    Load every CSV file in a folder, normalize, validate, and combine them.

    Files that fail to parse or fail validation are skipped and reported in
    the status message instead of aborting the whole load.

    Args:
        folder_path: Path to folder containing CSV files. If None, uses
            DATA_FOLDER from config.

    Returns:
        Tuple of (combined_dataframe, status_message). On total failure the
        dataframe is empty and the message starts with "Error:".
    """
    if folder_path is None:
        folder_path = DATA_FOLDER
    folder = Path(folder_path)
    if not folder.exists():
        return pd.DataFrame(), f"Error: Data folder '{folder_path}' does not exist."
    if not folder.is_dir():
        return pd.DataFrame(), f"Error: '{folder_path}' is not a directory."

    # Sort for a deterministic load (and final row) order — glob() order is
    # filesystem-dependent and would make reloads non-reproducible.
    csv_files = sorted(folder.glob("*.csv"))
    if not csv_files:
        return pd.DataFrame(), f"Error: No CSV files found in '{folder_path}'."

    all_dfs = []
    loaded_files = []
    errors = []
    for csv_file in csv_files:
        try:
            # utf-8-sig transparently strips the BOM that Excel exports write.
            df_raw = pd.read_csv(csv_file, encoding='utf-8-sig')
            # Normalize column names and types
            df_clean = normalize_dataframe(df_raw)
            # Validate required schema; skip (and report) invalid files
            is_valid, error_msg = validate_dataframe(df_clean)
            if not is_valid:
                errors.append(f"{csv_file.name}: {error_msg}")
                continue
            all_dfs.append(df_clean)
            loaded_files.append(csv_file.name)
        except Exception as e:
            # Best-effort loading: one corrupt file must not abort the rest.
            errors.append(f"{csv_file.name}: {str(e)}")

    if not all_dfs:
        error_summary = "\n".join(errors) if errors else "Unknown error"
        return pd.DataFrame(), f"Error: Failed to load any valid CSV files.\n{error_summary}"

    # Combine all dataframes into one flat run table
    final_df = pd.concat(all_dfs, ignore_index=True)

    # Build a human-readable status message for the UI
    status_parts = [f"Successfully loaded {len(final_df)} test runs from {len(loaded_files)} file(s):"]
    status_parts.extend([f" • {fname}" for fname in loaded_files])
    if errors:
        status_parts.append(f"\n{len(errors)} file(s) skipped due to errors:")
        status_parts.extend([f" • {err}" for err in errors])

    # Surface a sample row when debugging is enabled in config
    if DEBUG and not final_df.empty:
        sample = final_df.iloc[0]
        debug_info = f"\nDEBUG (Row 1): Relevance={sample.get('rmse_relevance', 'N/A')}, F1={sample.get('f1_score', 'N/A')}, AUCROC={sample.get('aucroc', 'N/A')}"
        status_parts.append(debug_info)

    return final_df, "\n".join(status_parts)
def get_available_datasets(df: pd.DataFrame) -> List[str]:
    """
    Extract the sorted unique dataset names from the dataframe.

    Args:
        df: Dataframe expected to contain a 'dataset_name' column.

    Returns:
        Sorted list of unique dataset names; empty list when the dataframe
        is empty or the column is absent.
    """
    if df.empty or 'dataset_name' not in df.columns:
        return []
    # dropna(): a NaN entry mixed with strings makes sorted() raise
    # TypeError, and a missing name is not a selectable dataset anyway.
    return sorted(df['dataset_name'].dropna().unique().tolist())