# rag12-analytics / data_loader.py
# (imported from Hugging Face Space, commit a657e9e "added files" by npuliga)
"""
Data loading and processing module for RAG Analytics
"""
import os
from pathlib import Path
from typing import List, Optional, Tuple

import pandas as pd

from config import (
    COLUMN_MAP,
    DATA_FOLDER,
    DEBUG,
    METRIC_COLUMNS,
    NUMERIC_CONFIG_COLUMNS,
    REQUIRED_COLUMNS,
)
def normalize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Standardize a raw CSV dataframe for downstream analytics.

    1. Renames columns by squashing them to lowercase alphanumerics
       (drops spaces, '=', '-', '_', etc.) and looking the result up
       in COLUMN_MAP, e.g. "RMSE=trace relevance" -> "rmsetracerelevance".
    2. Coerces every metric column to float64, mapping unparseable
       values (e.g. "Empty") to 0.0 so plotting never crashes.
    3. Coerces numeric configuration columns to float64 but keeps
       unparseable entries as NaN (no fill), so "N/A" stays missing.
    4. Performs no schema validation — no rows are dropped here.

    Args:
        df: Raw dataframe loaded from CSV

    Returns:
        Normalized dataframe with standardized column names and types
    """

    def _squash(name: object) -> str:
        # Aggressive clean: keep only lowercase letters and digits.
        return "".join(ch for ch in str(name).lower() if ch.isalnum())

    renames = {}
    for original in df.columns:
        key = _squash(original)
        if key in COLUMN_MAP:
            renames[original] = COLUMN_MAP[key]
    df = df.rename(columns=renames)

    # Metric columns: float64, with invalid/empty values becoming 0.0
    # so graphs always have a numeric series to draw.
    for metric in METRIC_COLUMNS:
        if metric not in df.columns:
            continue
        df[metric] = (
            pd.to_numeric(df[metric], errors='coerce')
            .fillna(0.0)
            .astype(float)
        )

    # Numeric config columns: float64 so values like "256" don't render
    # as integers in graphs; genuine N/A values are preserved as NaN.
    for config_col in NUMERIC_CONFIG_COLUMNS:
        if config_col not in df.columns:
            continue
        df[config_col] = pd.to_numeric(df[config_col], errors='coerce').astype(float)

    return df
def validate_dataframe(df: pd.DataFrame) -> Tuple[bool, str]:
    """
    Check that a normalized dataframe is usable: it must contain every
    column in REQUIRED_COLUMNS and hold at least one row.

    Args:
        df: Dataframe to validate

    Returns:
        Tuple of (is_valid, error_message); message is "Valid" on success.
    """
    # Missing-columns check runs first so an empty-but-wrong-schema file
    # reports the more actionable error.
    missing_cols = REQUIRED_COLUMNS - set(df.columns)
    if missing_cols:
        joined = ', '.join(missing_cols)
        return False, f"Missing required columns: {joined}"
    if df.empty:
        return False, "Dataframe is empty"
    return True, "Valid"
def load_csv_from_folder(folder_path: Optional[str] = None) -> Tuple[pd.DataFrame, str]:
    """
    Loads all CSV files from the specified folder and combines them.

    Each file is read with utf-8-sig (strips a BOM if present), normalized
    via normalize_dataframe, and validated via validate_dataframe; files
    that fail to parse or validate are skipped and reported, not fatal.

    Args:
        folder_path: Path to folder containing CSV files. If None, uses
            DATA_FOLDER from config.

    Returns:
        Tuple of (combined_dataframe, status_message). On failure the
        dataframe is empty and the message starts with "Error:".
    """
    if folder_path is None:
        folder_path = DATA_FOLDER

    folder = Path(folder_path)
    if not folder.exists():
        return pd.DataFrame(), f"Error: Data folder '{folder_path}' does not exist."
    if not folder.is_dir():
        return pd.DataFrame(), f"Error: '{folder_path}' is not a directory."

    # Sort so row order and the status message are deterministic;
    # glob() order is filesystem-dependent.
    csv_files = sorted(folder.glob("*.csv"))
    if not csv_files:
        return pd.DataFrame(), f"Error: No CSV files found in '{folder_path}'."

    all_dfs = []
    loaded_files = []
    errors = []
    for csv_file in csv_files:
        try:
            # utf-8-sig transparently strips the BOM Excel prepends.
            df_raw = pd.read_csv(csv_file, encoding='utf-8-sig')
            df_clean = normalize_dataframe(df_raw)
            is_valid, error_msg = validate_dataframe(df_clean)
            if not is_valid:
                errors.append(f"{csv_file.name}: {error_msg}")
                continue
            all_dfs.append(df_clean)
            loaded_files.append(csv_file.name)
        except Exception as e:
            # Best-effort loading: one malformed file must not abort the
            # whole batch; the problem is surfaced in the status message.
            errors.append(f"{csv_file.name}: {str(e)}")

    if not all_dfs:
        error_summary = "\n".join(errors) if errors else "Unknown error"
        return pd.DataFrame(), f"Error: Failed to load any valid CSV files.\n{error_summary}"

    # Combine all dataframes; ignore_index gives a clean 0..N-1 index.
    final_df = pd.concat(all_dfs, ignore_index=True)

    # Build the human-readable status message.
    status_parts = [f"Successfully loaded {len(final_df)} test runs from {len(loaded_files)} file(s):"]
    status_parts.extend([f"  • {fname}" for fname in loaded_files])
    if errors:
        status_parts.append(f"\n{len(errors)} file(s) skipped due to errors:")
        status_parts.extend([f"  • {err}" for err in errors])

    # Add debug info if enabled
    if DEBUG and not final_df.empty:
        sample = final_df.iloc[0]
        debug_info = f"\nDEBUG (Row 1): Relevance={sample.get('rmse_relevance', 'N/A')}, F1={sample.get('f1_score', 'N/A')}, AUCROC={sample.get('aucroc', 'N/A')}"
        status_parts.append(debug_info)

    return final_df, "\n".join(status_parts)
def get_available_datasets(df: pd.DataFrame) -> List[str]:
    """
    Extracts unique dataset names from the dataframe, sorted alphabetically.

    NaN/None entries are dropped before sorting: sorted() raises TypeError
    when float('nan') is compared against strings, so a single missing
    value in the column would otherwise crash this function.

    Args:
        df: Dataframe containing dataset_name column

    Returns:
        List of unique, non-null dataset names (empty if the column is absent)
    """
    if df.empty or 'dataset_name' not in df.columns:
        return []
    # dropna() before unique() keeps the result purely string-typed.
    return sorted(df['dataset_name'].dropna().unique().tolist())