# Business_Intelligence_Dashboard / data_processor.py
# Saumith devarsetty
# Initial deployment
# 3a1de6f
"""
Data processing module for the Business Intelligence Dashboard.
Handles data loading, cleaning, validation, and filtering operations.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from utils import get_column_types, detect_datetime_columns, validate_dataframe
def load_data(file_path: str) -> Tuple[Optional[pd.DataFrame], str]:
    """
    Load data from a CSV or Excel file and prepare it for analysis.

    The reader is chosen from the file extension, which is compared
    case-insensitively so that names such as ``REPORT.CSV`` or ``Sales.XLSX``
    are accepted. After loading, the frame is checked with
    ``validate_dataframe``, comma-formatted numeric columns are converted by
    ``clean_numeric_columns``, and every column reported by
    ``detect_datetime_columns`` is parsed with ``pd.to_datetime``.

    Args:
        file_path: Path to the data file (.csv, .xlsx or .xls)

    Returns:
        Tuple of (DataFrame, error_message). DataFrame is None if loading
        failed; error_message is an empty string on success.
    """
    try:
        # Compare against a lowercased copy so the extension check does not
        # depend on how the file name happens to be capitalised.
        normalized_path = file_path.lower()
        if normalized_path.endswith('.csv'):
            df = pd.read_csv(file_path)
        elif normalized_path.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file_path)
        else:
            return None, "Unsupported file format. Please upload CSV or Excel files."

        # Validate the loaded DataFrame
        is_valid, error_msg = validate_dataframe(df)
        if not is_valid:
            return None, f"Invalid data: {error_msg}"

        # Clean numeric columns with commas (e.g., "1,234" -> 1234)
        df = clean_numeric_columns(df)

        # Auto-detect and convert datetime columns
        datetime_cols = detect_datetime_columns(df)
        for col in datetime_cols:
            try:
                # errors='coerce' turns unparseable entries into NaT
                df[col] = pd.to_datetime(df[col], errors='coerce')
            except Exception:
                # Deliberate best effort: a column that cannot be converted
                # keeps its original values instead of failing the whole load.
                continue

        return df, ""
    except Exception as e:
        # Any failure (missing file, parser error, bad argument type, ...)
        # is reported through the message rather than raised to the caller.
        return None, f"Error loading file: {str(e)}"
def clean_numeric_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean columns that contain numeric values with commas or other formatting.

    Converts strings like '1,234', '50,000' to proper numeric types. A text
    column (object or pandas string dtype) is converted when more than half
    of its first 100 non-null values look like optionally negative,
    comma-grouped numbers. During conversion commas are removed, a value that
    is exactly '--' is treated as 0, and anything that still cannot be parsed
    becomes NaN.

    Args:
        df: Input DataFrame (not modified)

    Returns:
        DataFrame with cleaned numeric columns
    """
    df_cleaned = df.copy()
    numeric_pattern = r'^-?[\d,]+\.?\d*$'

    for col in df_cleaned.columns:
        column = df_cleaned[col]

        # Skip if already numeric
        if pd.api.types.is_numeric_dtype(column):
            continue

        # Only text-like columns are candidates. Besides object dtype this
        # also accepts pandas' dedicated string dtypes, which a plain
        # ``dtype == 'object'`` comparison would skip.
        is_text = (
            pd.api.types.is_object_dtype(column)
            or pd.api.types.is_string_dtype(column)
        )
        if not is_text:
            continue

        # Get a sample of non-null values
        sample = column.dropna().head(100)
        if len(sample) == 0:
            continue

        # Count how many sampled values look like comma-separated numbers
        numeric_looking = sample.astype(str).str.match(numeric_pattern).sum()

        # Convert only if more than 50% of the sample looks numeric
        if numeric_looking <= len(sample) * 0.5:
            continue

        try:
            # Build the converted values separately and assign once, so a
            # failure part-way through leaves the original column untouched.
            stripped = (
                column.astype(str)
                .str.replace(',', '', regex=False)
                .replace('--', '0')
            )
            df_cleaned[col] = pd.to_numeric(stripped, errors='coerce')
        except Exception:
            # If conversion fails, keep original
            pass

    return df_cleaned
def get_data_summary(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Build a dictionary of summary statistics describing the DataFrame.

    The result always contains the shape, column names, dtypes, the column
    type mapping from ``get_column_types``, per-column missing-value counts,
    the number of duplicated rows and the deep memory usage in megabytes.
    ``numerical_stats`` and ``categorical_stats`` are added only when the
    corresponding column lists are non-empty.

    Args:
        df: Input DataFrame

    Returns:
        Dictionary containing summary statistics
    """
    col_types = get_column_types(df)

    def describe_category(values: pd.Series) -> Dict[str, Any]:
        # Distinct count, ten most frequent values, and the first mode
        # (None when the column has no mode, e.g. it is entirely null).
        distinct = values.nunique()
        most_common = values.value_counts().head(10).to_dict()
        modes = values.mode()
        first_mode = modes.iloc[0] if len(modes) > 0 else None
        return {
            'unique_count': distinct,
            'top_values': most_common,
            'mode': first_mode,
        }

    summary: Dict[str, Any] = {}
    summary['shape'] = df.shape
    summary['columns'] = df.columns.tolist()
    summary['dtypes'] = df.dtypes.to_dict()
    summary['column_types'] = col_types
    summary['missing_values'] = df.isnull().sum().to_dict()
    summary['duplicate_rows'] = df.duplicated().sum()
    # Bytes -> MB
    summary['memory_usage'] = df.memory_usage(deep=True).sum() / 1024 / 1024

    # Numerical statistics
    numeric_cols = col_types['numerical']
    if numeric_cols:
        summary['numerical_stats'] = df[numeric_cols].describe().to_dict()

    # Categorical statistics
    category_cols = col_types['categorical']
    if category_cols:
        summary['categorical_stats'] = {
            name: describe_category(df[name]) for name in category_cols
        }

    return summary
def get_correlation_matrix(df: pd.DataFrame) -> Optional[pd.DataFrame]:
    """
    Compute the pairwise correlation of the numerical columns.

    The numerical columns are those listed under ``'numerical'`` by
    ``get_column_types``; the matrix comes from ``DataFrame.corr`` with its
    default settings.

    Args:
        df: Input DataFrame

    Returns:
        Correlation matrix DataFrame or None if no numerical columns
    """
    numeric_cols = get_column_types(df)['numerical']
    if numeric_cols:
        return df[numeric_cols].corr()
    return None
def apply_filters(df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
    """
    Return a filtered copy of the DataFrame according to user selections.

    Filters are applied one after another, so the result keeps only rows
    that satisfy all of them. Entries naming a column that is not present,
    or carrying an unrecognised 'type', are ignored. Range bounds are
    inclusive, and a bound of None leaves that side open.

    Args:
        df: Input DataFrame (not modified)
        filters: Dictionary of filter specifications
            Format: {
                'column_name': {
                    'type': 'numerical' | 'categorical' | 'datetime',
                    'min': value,         # for numerical
                    'max': value,         # for numerical
                    'values': [list],     # for categorical
                    'start_date': value,  # for datetime
                    'end_date': value     # for datetime
                }
            }

    Returns:
        Filtered DataFrame
    """
    def keep_in_range(frame, name, lower, upper, convert=None):
        # Inclusive range filter; ``convert`` (if given) is applied to each
        # bound that is actually used before comparing.
        if lower is not None:
            bound = convert(lower) if convert is not None else lower
            frame = frame[frame[name] >= bound]
        if upper is not None:
            bound = convert(upper) if convert is not None else upper
            frame = frame[frame[name] <= bound]
        return frame

    result = df.copy()

    for column, spec in filters.items():
        if column not in result.columns:
            continue

        kind = spec.get('type')

        if kind == 'numerical':
            result = keep_in_range(
                result, column, spec.get('min'), spec.get('max')
            )
        elif kind == 'categorical':
            allowed = spec.get('values', [])
            # An empty selection means "no restriction", not "no rows"
            if allowed:
                result = result[result[column].isin(allowed)]
        elif kind == 'datetime':
            result = keep_in_range(
                result,
                column,
                spec.get('start_date'),
                spec.get('end_date'),
                convert=pd.to_datetime,
            )

    return result
def clean_data(df: pd.DataFrame,
               drop_duplicates: bool = False,
               fill_numerical: Optional[str] = None,
               fill_categorical: Optional[str] = None) -> pd.DataFrame:
    """
    Clean the DataFrame by handling missing values and duplicates.

    Works on a copy; the input frame is left unchanged. Column roles are
    taken from ``get_column_types``. Strategy names that are not recognised
    leave the affected columns as they are.

    Args:
        df: Input DataFrame
        drop_duplicates: Whether to drop duplicate rows
        fill_numerical: Strategy for filling numerical NaNs ('mean', 'median', 'zero', or None)
        fill_categorical: Strategy for filling categorical NaNs ('mode', 'unknown', or None)

    Returns:
        Cleaned DataFrame
    """
    cleaned_df = df.copy()
    col_types = get_column_types(cleaned_df)

    # Handle duplicates
    if drop_duplicates:
        cleaned_df = cleaned_df.drop_duplicates()

    # Handle missing values in numerical columns.
    # The filled column is assigned back explicitly: calling
    # ``fillna(..., inplace=True)`` on ``cleaned_df[col]`` operates on a
    # temporary object and does not reliably update the frame (it is a
    # no-op under pandas Copy-on-Write).
    if fill_numerical and col_types['numerical']:
        for col in col_types['numerical']:
            if fill_numerical == 'mean':
                fill_value = cleaned_df[col].mean()
            elif fill_numerical == 'median':
                fill_value = cleaned_df[col].median()
            elif fill_numerical == 'zero':
                fill_value = 0
            else:
                # Unknown strategy: leave the column untouched
                continue
            cleaned_df[col] = cleaned_df[col].fillna(fill_value)

    # Handle missing values in categorical columns
    if fill_categorical and col_types['categorical']:
        for col in col_types['categorical']:
            if fill_categorical == 'mode':
                mode_val = cleaned_df[col].mode()
                # An all-null column has no mode; nothing to fill with
                if len(mode_val) > 0:
                    cleaned_df[col] = cleaned_df[col].fillna(mode_val.iloc[0])
            elif fill_categorical == 'unknown':
                cleaned_df[col] = cleaned_df[col].fillna('Unknown')

    return cleaned_df
def aggregate_data(df: pd.DataFrame,
                   group_by: str,
                   value_column: str,
                   agg_method: str = 'sum') -> pd.DataFrame:
    """
    Aggregate data by grouping and applying an aggregation method.

    The result has two columns: the group key (named ``group_by``) and the
    aggregated values (named ``'<value_column>_<method>'``). Grouping and
    aggregating the same column is supported, e.g. counting rows per
    category with ``group_by == value_column`` and ``agg_method='count'``.

    Args:
        df: Input DataFrame
        group_by: Column to group by
        value_column: Column to aggregate
        agg_method: Aggregation method ('sum', 'mean', 'count', 'median', 'min', 'max');
            any other value falls back to 'sum'

    Returns:
        Aggregated DataFrame, or an empty DataFrame if either column is
        missing or the aggregation fails
    """
    if group_by not in df.columns or value_column not in df.columns:
        return pd.DataFrame()

    supported_methods = ('sum', 'mean', 'count', 'median', 'min', 'max')
    method = agg_method if agg_method in supported_methods else 'sum'

    try:
        grouped = df.groupby(group_by)[value_column].agg(method)
        # Give the values and the group index neutral, distinct labels
        # before reset_index so that it cannot fail with a name clash when
        # the value column and the group column share a name; the final
        # column names are assigned afterwards.
        aggregated = (
            grouped.to_frame(name='value')
            .rename_axis('group')
            .reset_index()
        )
        aggregated.columns = [group_by, f'{value_column}_{method}']
        return aggregated
    except Exception:
        # Best effort: e.g. a numeric aggregation on non-numeric data
        # yields an empty frame instead of an exception.
        return pd.DataFrame()
def get_data_preview(df: pd.DataFrame, n_rows: int = 10, head: bool = True) -> pd.DataFrame:
    """
    Return the first or last rows of the DataFrame for display.

    Args:
        df: Input DataFrame
        n_rows: Number of rows to return
        head: If True, return first n rows; if False, return last n rows

    Returns:
        Preview DataFrame
    """
    # Pick the slicing method once, then apply it
    take_rows = df.head if head else df.tail
    return take_rows(n_rows)