| | """ |
| | Data processing module for the Business Intelligence Dashboard. |
| | Handles data loading, cleaning, validation, and filtering operations. |
| | """ |
| |
|
| | import pandas as pd |
| | import numpy as np |
| | from typing import Dict, List, Tuple, Optional, Any |
| | from utils import get_column_types, detect_datetime_columns, validate_dataframe |
| |
|
| |
|
| | def load_data(file_path: str) -> Tuple[Optional[pd.DataFrame], str]: |
| | """ |
| | Load data from CSV or Excel file. |
| | |
| | Args: |
| | file_path: Path to the data file |
| | |
| | Returns: |
| | Tuple of (DataFrame, error_message). DataFrame is None if loading failed. |
| | """ |
| | try: |
| | if file_path.endswith('.csv'): |
| | df = pd.read_csv(file_path) |
| | elif file_path.endswith(('.xlsx', '.xls')): |
| | df = pd.read_excel(file_path) |
| | else: |
| | return None, "Unsupported file format. Please upload CSV or Excel files." |
| | |
| | |
| | is_valid, error_msg = validate_dataframe(df) |
| | if not is_valid: |
| | return None, f"Invalid data: {error_msg}" |
| | |
| | |
| | df = clean_numeric_columns(df) |
| | |
| | |
| | datetime_cols = detect_datetime_columns(df) |
| | for col in datetime_cols: |
| | try: |
| | df[col] = pd.to_datetime(df[col], errors='coerce') |
| | except Exception: |
| | pass |
| | |
| | return df, "" |
| | |
| | except Exception as e: |
| | return None, f"Error loading file: {str(e)}" |
| |
|
| |
|
| | def clean_numeric_columns(df: pd.DataFrame) -> pd.DataFrame: |
| | """ |
| | Clean columns that contain numeric values with commas or other formatting. |
| | Converts strings like '1,234', '50,000' to proper numeric types. |
| | |
| | Args: |
| | df: Input DataFrame |
| | |
| | Returns: |
| | DataFrame with cleaned numeric columns |
| | """ |
| | df_cleaned = df.copy() |
| | |
| | for col in df_cleaned.columns: |
| | |
| | if pd.api.types.is_numeric_dtype(df_cleaned[col]): |
| | continue |
| | |
| | |
| | if df_cleaned[col].dtype == 'object': |
| | |
| | sample = df_cleaned[col].dropna().head(100) |
| | |
| | if len(sample) > 0: |
| | |
| | sample_str = sample.astype(str) |
| | |
| | |
| | numeric_looking = sample_str.str.match(r'^-?[\d,]+\.?\d*$').sum() |
| | |
| | |
| | if numeric_looking > len(sample) * 0.5: |
| | try: |
| | |
| | df_cleaned[col] = df_cleaned[col].astype(str).str.replace(',', '').replace('--', '0') |
| | df_cleaned[col] = pd.to_numeric(df_cleaned[col], errors='coerce') |
| | except Exception: |
| | |
| | pass |
| | |
| | return df_cleaned |
| |
|
| |
|
| |
|
| | def get_data_summary(df: pd.DataFrame) -> Dict[str, Any]: |
| | """ |
| | Generate comprehensive summary statistics for the DataFrame. |
| | |
| | Args: |
| | df: Input DataFrame |
| | |
| | Returns: |
| | Dictionary containing summary statistics |
| | """ |
| | col_types = get_column_types(df) |
| | |
| | summary = { |
| | 'shape': df.shape, |
| | 'columns': df.columns.tolist(), |
| | 'dtypes': df.dtypes.to_dict(), |
| | 'column_types': col_types, |
| | 'missing_values': df.isnull().sum().to_dict(), |
| | 'duplicate_rows': df.duplicated().sum(), |
| | 'memory_usage': df.memory_usage(deep=True).sum() / 1024 / 1024, |
| | } |
| | |
| | |
| | if col_types['numerical']: |
| | numerical_stats = df[col_types['numerical']].describe().to_dict() |
| | summary['numerical_stats'] = numerical_stats |
| | |
| | |
| | if col_types['categorical']: |
| | categorical_stats = {} |
| | for col in col_types['categorical']: |
| | categorical_stats[col] = { |
| | 'unique_count': df[col].nunique(), |
| | 'top_values': df[col].value_counts().head(10).to_dict(), |
| | 'mode': df[col].mode().iloc[0] if len(df[col].mode()) > 0 else None |
| | } |
| | summary['categorical_stats'] = categorical_stats |
| | |
| | return summary |
| |
|
| |
|
| | def get_correlation_matrix(df: pd.DataFrame) -> Optional[pd.DataFrame]: |
| | """ |
| | Calculate correlation matrix for numerical columns. |
| | |
| | Args: |
| | df: Input DataFrame |
| | |
| | Returns: |
| | Correlation matrix DataFrame or None if no numerical columns |
| | """ |
| | col_types = get_column_types(df) |
| | |
| | if not col_types['numerical']: |
| | return None |
| | |
| | return df[col_types['numerical']].corr() |
| |
|
| |
|
| | def apply_filters(df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame: |
| | """ |
| | Apply filters to the DataFrame based on user selections. |
| | |
| | Args: |
| | df: Input DataFrame |
| | filters: Dictionary of filter specifications |
| | Format: { |
| | 'column_name': { |
| | 'type': 'numerical' | 'categorical' | 'datetime', |
| | 'min': value, # for numerical |
| | 'max': value, # for numerical |
| | 'values': [list], # for categorical |
| | 'start_date': value, # for datetime |
| | 'end_date': value # for datetime |
| | } |
| | } |
| | |
| | Returns: |
| | Filtered DataFrame |
| | """ |
| | filtered_df = df.copy() |
| | |
| | for column, filter_spec in filters.items(): |
| | if column not in filtered_df.columns: |
| | continue |
| | |
| | filter_type = filter_spec.get('type') |
| | |
| | if filter_type == 'numerical': |
| | min_val = filter_spec.get('min') |
| | max_val = filter_spec.get('max') |
| | |
| | if min_val is not None: |
| | filtered_df = filtered_df[filtered_df[column] >= min_val] |
| | if max_val is not None: |
| | filtered_df = filtered_df[filtered_df[column] <= max_val] |
| | |
| | elif filter_type == 'categorical': |
| | values = filter_spec.get('values', []) |
| | if values: |
| | filtered_df = filtered_df[filtered_df[column].isin(values)] |
| | |
| | elif filter_type == 'datetime': |
| | start_date = filter_spec.get('start_date') |
| | end_date = filter_spec.get('end_date') |
| | |
| | if start_date is not None: |
| | filtered_df = filtered_df[filtered_df[column] >= pd.to_datetime(start_date)] |
| | if end_date is not None: |
| | filtered_df = filtered_df[filtered_df[column] <= pd.to_datetime(end_date)] |
| | |
| | return filtered_df |
| |
|
| |
|
| | def clean_data(df: pd.DataFrame, |
| | drop_duplicates: bool = False, |
| | fill_numerical: Optional[str] = None, |
| | fill_categorical: Optional[str] = None) -> pd.DataFrame: |
| | """ |
| | Clean the DataFrame by handling missing values and duplicates. |
| | |
| | Args: |
| | df: Input DataFrame |
| | drop_duplicates: Whether to drop duplicate rows |
| | fill_numerical: Strategy for filling numerical NaNs ('mean', 'median', 'zero', or None) |
| | fill_categorical: Strategy for filling categorical NaNs ('mode', 'unknown', or None) |
| | |
| | Returns: |
| | Cleaned DataFrame |
| | """ |
| | cleaned_df = df.copy() |
| | col_types = get_column_types(cleaned_df) |
| | |
| | |
| | if drop_duplicates: |
| | cleaned_df = cleaned_df.drop_duplicates() |
| | |
| | |
| | if fill_numerical and col_types['numerical']: |
| | for col in col_types['numerical']: |
| | if fill_numerical == 'mean': |
| | cleaned_df[col].fillna(cleaned_df[col].mean(), inplace=True) |
| | elif fill_numerical == 'median': |
| | cleaned_df[col].fillna(cleaned_df[col].median(), inplace=True) |
| | elif fill_numerical == 'zero': |
| | cleaned_df[col].fillna(0, inplace=True) |
| | |
| | |
| | if fill_categorical and col_types['categorical']: |
| | for col in col_types['categorical']: |
| | if fill_categorical == 'mode': |
| | mode_val = cleaned_df[col].mode() |
| | if len(mode_val) > 0: |
| | cleaned_df[col].fillna(mode_val.iloc[0], inplace=True) |
| | elif fill_categorical == 'unknown': |
| | cleaned_df[col].fillna('Unknown', inplace=True) |
| | |
| | return cleaned_df |
| |
|
| |
|
| | def aggregate_data(df: pd.DataFrame, |
| | group_by: str, |
| | value_column: str, |
| | agg_method: str = 'sum') -> pd.DataFrame: |
| | """ |
| | Aggregate data by grouping and applying an aggregation method. |
| | |
| | Args: |
| | df: Input DataFrame |
| | group_by: Column to group by |
| | value_column: Column to aggregate |
| | agg_method: Aggregation method ('sum', 'mean', 'count', 'median', 'min', 'max') |
| | |
| | Returns: |
| | Aggregated DataFrame |
| | """ |
| | if group_by not in df.columns or value_column not in df.columns: |
| | return pd.DataFrame() |
| | |
| | agg_methods = { |
| | 'sum': 'sum', |
| | 'mean': 'mean', |
| | 'count': 'count', |
| | 'median': 'median', |
| | 'min': 'min', |
| | 'max': 'max' |
| | } |
| | |
| | method = agg_methods.get(agg_method, 'sum') |
| | |
| | try: |
| | aggregated = df.groupby(group_by)[value_column].agg(method).reset_index() |
| | aggregated.columns = [group_by, f'{value_column}_{method}'] |
| | return aggregated |
| | except Exception: |
| | return pd.DataFrame() |
| |
|
| |
|
| | def get_data_preview(df: pd.DataFrame, n_rows: int = 10, head: bool = True) -> pd.DataFrame: |
| | """ |
| | Get a preview of the DataFrame. |
| | |
| | Args: |
| | df: Input DataFrame |
| | n_rows: Number of rows to return |
| | head: If True, return first n rows; if False, return last n rows |
| | |
| | Returns: |
| | Preview DataFrame |
| | """ |
| | if head: |
| | return df.head(n_rows) |
| | else: |
| | return df.tail(n_rows) |
| |
|