""" Data processing module for the Business Intelligence Dashboard. Handles data loading, cleaning, validation, filtering, and statistical analysis. Works with ANY dataset - no hardcoded column names. """ import pandas as pd import numpy as np from typing import Dict, List, Any, Optional, Tuple from utils import ( get_numeric_columns, get_categorical_columns, get_datetime_columns, DATE_FORMATS ) def load_data(file_path: str) -> Tuple[Optional[pd.DataFrame], str]: """ Load data from CSV or Excel file. Args: file_path: Path to the uploaded file Returns: Tuple of (DataFrame or None, status message) """ if file_path is None: return None, "No file uploaded. Please upload a CSV or Excel file." try: # Determine file type and load accordingly if file_path.endswith('.csv'): # Try different encodings for encoding in ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']: try: df = pd.read_csv(file_path, encoding=encoding) break except UnicodeDecodeError: continue else: df = pd.read_csv(file_path, encoding='utf-8', errors='ignore') elif file_path.endswith(('.xlsx', '.xls')): df = pd.read_excel(file_path) else: return None, "Unsupported file format. Please upload CSV or Excel files." if df.empty: return None, "The uploaded file is empty." return df, f"✅ Successfully loaded {len(df):,} rows and {len(df.columns)} columns." except pd.errors.EmptyDataError: return None, "The uploaded file is empty or has no valid data." except pd.errors.ParserError as e: return None, f"Error parsing file: {str(e)}" except Exception as e: return None, f"Error loading file: {str(e)}" def clean_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]: """ Clean the DataFrame: handle missing values, duplicates, data types. Args: df: Raw DataFrame Returns: Tuple of (cleaned DataFrame, cleaning report) """ if df is None or df.empty: return df, "No data to clean." report = [] original_rows = len(df) original_cols = len(df.columns) # 1. Strip whitespace from column names df.columns = df.columns.str.strip() report.append("✅ Stripped whitespace from column names") # 2. Remove completely empty rows empty_rows = df.isna().all(axis=1).sum() if empty_rows > 0: df = df.dropna(how='all') report.append(f"✅ Removed {empty_rows} completely empty rows") # 3. Remove completely empty columns empty_cols = df.isna().all(axis=0).sum() if empty_cols > 0: df = df.dropna(axis=1, how='all') report.append(f"✅ Removed {empty_cols} completely empty columns") # 4. Remove duplicate rows duplicates = df.duplicated().sum() if duplicates > 0: df = df.drop_duplicates() report.append(f"✅ Removed {duplicates} duplicate rows") # 5. Strip whitespace from string columns string_cols = df.select_dtypes(include=['object']).columns for col in string_cols: df[col] = df[col].astype(str).str.strip() # Replace 'nan' strings with actual NaN df[col] = df[col].replace('nan', np.nan) if len(string_cols) > 0: report.append(f"✅ Cleaned whitespace in {len(string_cols)} text columns") # 6. Try to convert date columns date_converted = 0 for col in df.columns: if df[col].dtype == 'object': # Check if column name suggests it's a date if any(date_word in col.lower() for date_word in ['date', 'time', 'created', 'updated', 'timestamp']): try: df[col] = pd.to_datetime(df[col], errors='coerce') date_converted += 1 except Exception: pass if date_converted > 0: report.append(f"✅ Converted {date_converted} columns to datetime") # 7. Try to convert numeric columns that are stored as strings numeric_converted = 0 for col in df.select_dtypes(include=['object']).columns: try: # Remove currency symbols and commas cleaned = df[col].astype(str).str.replace(r'[$£€,]', '', regex=True) numeric_values = pd.to_numeric(cleaned, errors='coerce') # If more than 80% converted successfully, keep the conversion if numeric_values.notna().sum() / len(df) > 0.8: df[col] = numeric_values numeric_converted += 1 except Exception: pass if numeric_converted > 0: report.append(f"✅ Converted {numeric_converted} columns to numeric") # Summary final_rows = len(df) final_cols = len(df.columns) summary = f""" ## 🧹 Data Cleaning Complete! ### Summary - **Original:** {original_rows:,} rows × {original_cols} columns - **Cleaned:** {final_rows:,} rows × {final_cols} columns - **Rows removed:** {original_rows - final_rows:,} - **Columns removed:** {original_cols - final_cols} ### Actions Performed """ + "\n".join(f"- {r}" for r in report) return df, summary def get_data_info(df: pd.DataFrame) -> Dict[str, Any]: """ Get comprehensive information about the DataFrame. Args: df: pandas DataFrame Returns: Dictionary containing data information """ if df is None or df.empty: return {} info = { 'rows': len(df), 'columns': len(df.columns), 'column_names': df.columns.tolist(), 'dtypes': df.dtypes.astype(str).to_dict(), 'memory_usage': df.memory_usage(deep=True).sum(), 'numeric_columns': get_numeric_columns(df), 'categorical_columns': get_categorical_columns(df), 'datetime_columns': get_datetime_columns(df), 'missing_values': df.isnull().sum().to_dict(), 'total_missing': df.isnull().sum().sum() } return info def get_data_preview(df: pd.DataFrame, n_rows: int = 10, position: str = 'head') -> pd.DataFrame: """ Get a preview of the DataFrame. Args: df: pandas DataFrame n_rows: Number of rows to show position: 'head' for first rows, 'tail' for last rows Returns: Preview DataFrame """ if df is None or df.empty: return pd.DataFrame() if position == 'tail': return df.tail(n_rows) return df.head(n_rows) def get_summary_statistics(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Generate summary statistics for numerical and categorical columns. Args: df: pandas DataFrame Returns: Tuple of (numerical_stats DataFrame, categorical_stats DataFrame) """ if df is None or df.empty: return pd.DataFrame(), pd.DataFrame() # Numerical statistics numeric_cols = get_numeric_columns(df) if numeric_cols: num_stats = df[numeric_cols].describe().T num_stats['missing'] = df[numeric_cols].isnull().sum() num_stats['missing_%'] = (num_stats['missing'] / len(df) * 100).round(2) num_stats = num_stats.round(2) else: num_stats = pd.DataFrame() # Categorical statistics cat_cols = get_categorical_columns(df) if cat_cols: cat_stats_list = [] for col in cat_cols: stats = { 'column': col, 'unique_values': df[col].nunique(), 'most_common': df[col].mode().iloc[0] if not df[col].mode().empty else 'N/A', 'most_common_freq': df[col].value_counts().iloc[0] if not df[col].value_counts().empty else 0, 'missing': df[col].isnull().sum(), 'missing_%': round((df[col].isnull().sum() / len(df)) * 100, 2) } cat_stats_list.append(stats) cat_stats = pd.DataFrame(cat_stats_list) else: cat_stats = pd.DataFrame() return num_stats, cat_stats def get_missing_value_report(df: pd.DataFrame) -> pd.DataFrame: """ Generate a detailed missing value report. Args: df: pandas DataFrame Returns: DataFrame with missing value information """ if df is None or df.empty: return pd.DataFrame() missing_data = [] for col in df.columns: missing_count = df[col].isnull().sum() missing_data.append({ 'Column': col, 'Data Type': str(df[col].dtype), 'Missing Count': missing_count, 'Missing %': round((missing_count / len(df)) * 100, 2), 'Non-Missing Count': len(df) - missing_count }) report = pd.DataFrame(missing_data) report = report.sort_values('Missing Count', ascending=False) return report def get_correlation_matrix(df: pd.DataFrame) -> pd.DataFrame: """ Calculate correlation matrix for numerical columns. Args: df: pandas DataFrame Returns: Correlation matrix DataFrame """ if df is None or df.empty: return pd.DataFrame() numeric_cols = get_numeric_columns(df) if len(numeric_cols) < 2: return pd.DataFrame() corr_matrix = df[numeric_cols].corr().round(3) return corr_matrix def filter_data_dynamic( df: pd.DataFrame, filters: Dict[str, Any] ) -> Tuple[pd.DataFrame, str]: """ Apply dynamic filters to the DataFrame based on provided filter dictionary. Args: df: pandas DataFrame to filter filters: Dictionary with column names as keys and filter values Format: { 'column_name': { 'type': 'range' | 'categorical' | 'date_range' | 'search', 'min': value, 'max': value, # for range 'values': [list], # for categorical 'start': date, 'end': date, # for date_range 'term': string # for search } } Returns: Tuple of (filtered DataFrame, filter summary message) """ if df is None or df.empty: return pd.DataFrame(), "No data to filter." filtered_df = df.copy() filters_applied = [] for col, filter_config in filters.items(): if col not in filtered_df.columns: continue filter_type = filter_config.get('type', '') if filter_type == 'range': min_val = filter_config.get('min') max_val = filter_config.get('max') if min_val is not None: filtered_df = filtered_df[filtered_df[col] >= min_val] filters_applied.append(f"{col} >= {min_val}") if max_val is not None: filtered_df = filtered_df[filtered_df[col] <= max_val] filters_applied.append(f"{col} <= {max_val}") elif filter_type == 'categorical': values = filter_config.get('values', []) if values and len(values) > 0: filtered_df = filtered_df[filtered_df[col].isin(values)] filters_applied.append(f"{col} in {values}") elif filter_type == 'date_range': start = filter_config.get('start') end = filter_config.get('end') if start: try: start_date = pd.to_datetime(start) filtered_df = filtered_df[filtered_df[col] >= start_date] filters_applied.append(f"{col} >= {start}") except Exception: pass if end: try: end_date = pd.to_datetime(end) filtered_df = filtered_df[filtered_df[col] <= end_date] filters_applied.append(f"{col} <= {end}") except Exception: pass elif filter_type == 'search': term = filter_config.get('term', '') if term: filtered_df = filtered_df[ filtered_df[col].fillna('').astype(str).str.lower().str.contains(term.lower(), regex=False) ] filters_applied.append(f"{col} contains '{term}'") # Create summary message if filters_applied: summary = f"✅ Filters applied: {', '.join(filters_applied)}\n" summary += f"📊 Results: {len(filtered_df):,} rows (from {len(df):,} original rows)" else: summary = f"ℹ️ No filters applied. Showing all {len(filtered_df):,} rows." return filtered_df, summary def get_column_unique_values(df: pd.DataFrame, column: str, max_values: int = 100) -> List[str]: """ Get unique values from a column for filter dropdowns. Args: df: pandas DataFrame column: Column name max_values: Maximum number of values to return Returns: List of unique values """ if df is None or column not in df.columns: return [] unique_vals = df[column].dropna().unique() # Sort and limit try: sorted_vals = sorted([str(v) for v in unique_vals]) except TypeError: sorted_vals = [str(v) for v in unique_vals] return sorted_vals[:max_values] def aggregate_data( df: pd.DataFrame, group_by: str, value_column: str, agg_method: str = 'sum' ) -> pd.DataFrame: """ Aggregate data by a grouping column. Args: df: pandas DataFrame group_by: Column to group by value_column: Column to aggregate agg_method: Aggregation method ('sum', 'mean', 'count', 'median', 'min', 'max') Returns: Aggregated DataFrame """ if df is None or df.empty: return pd.DataFrame() if group_by not in df.columns or value_column not in df.columns: return pd.DataFrame() try: if agg_method == 'count': result = df.groupby(group_by)[value_column].count().reset_index() else: result = df.groupby(group_by)[value_column].agg(agg_method).reset_index() result.columns = [group_by, f'{value_column}_{agg_method}'] result = result.sort_values(result.columns[1], ascending=False) return result except Exception: return pd.DataFrame() def export_to_csv(df: pd.DataFrame, filename: str = "exported_data.csv") -> str: """ Export DataFrame to CSV and return the file path. Args: df: pandas DataFrame to export filename: Output filename Returns: Path to the exported file """ if df is None or df.empty: return None try: df.to_csv(filename, index=False) return filename except Exception as e: return None