Business_Intelligence_Dashboard / data_processor.py
"""
Data processing module for the Business Intelligence Dashboard.
Handles data loading, cleaning, validation, filtering, and statistical analysis.
Works with ANY dataset - no hardcoded column names.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from utils import (
    get_numeric_columns, get_categorical_columns, get_datetime_columns
)
def load_data(file_path: str) -> Tuple[Optional[pd.DataFrame], str]:
"""
Load data from CSV or Excel file.
Args:
file_path: Path to the uploaded file
Returns:
Tuple of (DataFrame or None, status message)
"""
if file_path is None:
return None, "No file uploaded. Please upload a CSV or Excel file."
try:
# Determine file type and load accordingly
if file_path.endswith('.csv'):
# Try different encodings
for encoding in ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']:
try:
df = pd.read_csv(file_path, encoding=encoding)
break
except UnicodeDecodeError:
continue
else:
                df = pd.read_csv(file_path, encoding='utf-8', encoding_errors='ignore')
elif file_path.endswith(('.xlsx', '.xls')):
df = pd.read_excel(file_path)
else:
return None, "Unsupported file format. Please upload CSV or Excel files."
if df.empty:
return None, "The uploaded file is empty."
return df, f"βœ… Successfully loaded {len(df):,} rows and {len(df.columns)} columns."
except pd.errors.EmptyDataError:
return None, "The uploaded file is empty or has no valid data."
except pd.errors.ParserError as e:
return None, f"Error parsing file: {str(e)}"
except Exception as e:
return None, f"Error loading file: {str(e)}"
def clean_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]:
"""
Clean the DataFrame: handle missing values, duplicates, data types.
Args:
df: Raw DataFrame
Returns:
Tuple of (cleaned DataFrame, cleaning report)
"""
if df is None or df.empty:
return df, "No data to clean."
    report = []
    original_rows = len(df)
    original_cols = len(df.columns)
    # Work on a copy so the caller's DataFrame is not modified in place
    df = df.copy()
    # 1. Strip whitespace from column names (skip non-string labels)
    df.columns = [c.strip() if isinstance(c, str) else c for c in df.columns]
    report.append("✅ Stripped whitespace from column names")
# 2. Remove completely empty rows
empty_rows = df.isna().all(axis=1).sum()
if empty_rows > 0:
df = df.dropna(how='all')
report.append(f"βœ… Removed {empty_rows} completely empty rows")
# 3. Remove completely empty columns
empty_cols = df.isna().all(axis=0).sum()
if empty_cols > 0:
df = df.dropna(axis=1, how='all')
report.append(f"βœ… Removed {empty_cols} completely empty columns")
# 4. Remove duplicate rows
duplicates = df.duplicated().sum()
if duplicates > 0:
df = df.drop_duplicates()
report.append(f"βœ… Removed {duplicates} duplicate rows")
# 5. Strip whitespace from string columns
string_cols = df.select_dtypes(include=['object']).columns
for col in string_cols:
df[col] = df[col].astype(str).str.strip()
# Replace 'nan' strings with actual NaN
df[col] = df[col].replace('nan', np.nan)
if len(string_cols) > 0:
report.append(f"βœ… Cleaned whitespace in {len(string_cols)} text columns")
# 6. Try to convert date columns
date_converted = 0
for col in df.columns:
if df[col].dtype == 'object':
# Check if column name suggests it's a date
if any(date_word in col.lower() for date_word in ['date', 'time', 'created', 'updated', 'timestamp']):
                try:
                    converted = pd.to_datetime(df[col], errors='coerce')
                    # Keep the conversion only if most values actually parse as dates
                    if converted.notna().sum() / len(df) > 0.8:
                        df[col] = converted
                        date_converted += 1
                except Exception:
                    pass
if date_converted > 0:
report.append(f"βœ… Converted {date_converted} columns to datetime")
# 7. Try to convert numeric columns that are stored as strings
numeric_converted = 0
for col in df.select_dtypes(include=['object']).columns:
try:
# Remove currency symbols and commas
            cleaned = df[col].astype(str).str.replace(r'[$£€,]', '', regex=True)
numeric_values = pd.to_numeric(cleaned, errors='coerce')
# If more than 80% converted successfully, keep the conversion
if numeric_values.notna().sum() / len(df) > 0.8:
df[col] = numeric_values
numeric_converted += 1
except Exception:
pass
if numeric_converted > 0:
report.append(f"βœ… Converted {numeric_converted} columns to numeric")
# Summary
final_rows = len(df)
final_cols = len(df.columns)
summary = f"""
## 🧹 Data Cleaning Complete!
### Summary
- **Original:** {original_rows:,} rows Γ— {original_cols} columns
- **Cleaned:** {final_rows:,} rows Γ— {final_cols} columns
- **Rows removed:** {original_rows - final_rows:,}
- **Columns removed:** {original_cols - final_cols}
### Actions Performed
""" + "\n".join(f"- {r}" for r in report)
return df, summary
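# Illustrative pipeline sketch (continuing the hypothetical load_data example):
#     df, _ = load_data("sales.csv")
#     df, report = clean_data(df)   # report is a markdown summary of the cleaning steps
#     print(report)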
def get_data_info(df: pd.DataFrame) -> Dict[str, Any]:
"""
Get comprehensive information about the DataFrame.
Args:
df: pandas DataFrame
Returns:
Dictionary containing data information
"""
if df is None or df.empty:
return {}
info = {
'rows': len(df),
'columns': len(df.columns),
'column_names': df.columns.tolist(),
'dtypes': df.dtypes.astype(str).to_dict(),
'memory_usage': df.memory_usage(deep=True).sum(),
'numeric_columns': get_numeric_columns(df),
'categorical_columns': get_categorical_columns(df),
'datetime_columns': get_datetime_columns(df),
'missing_values': df.isnull().sum().to_dict(),
'total_missing': df.isnull().sum().sum()
}
return info
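# Illustrative sketch: inspect detected column types before building filters or charts.
#     info = get_data_info(df)
#     print(info['numeric_columns'], info['categorical_columns'], info['datetime_columns'])
#     print(f"{info['total_missing']} missing cells across {info['rows']} rows")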
def get_data_preview(df: pd.DataFrame, n_rows: int = 10, position: str = 'head') -> pd.DataFrame:
"""
Get a preview of the DataFrame.
Args:
df: pandas DataFrame
n_rows: Number of rows to show
position: 'head' for first rows, 'tail' for last rows
Returns:
Preview DataFrame
"""
if df is None or df.empty:
return pd.DataFrame()
if position == 'tail':
return df.tail(n_rows)
return df.head(n_rows)
def get_summary_statistics(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Generate summary statistics for numerical and categorical columns.
Args:
df: pandas DataFrame
Returns:
Tuple of (numerical_stats DataFrame, categorical_stats DataFrame)
"""
if df is None or df.empty:
return pd.DataFrame(), pd.DataFrame()
# Numerical statistics
numeric_cols = get_numeric_columns(df)
if numeric_cols:
num_stats = df[numeric_cols].describe().T
num_stats['missing'] = df[numeric_cols].isnull().sum()
num_stats['missing_%'] = (num_stats['missing'] / len(df) * 100).round(2)
num_stats = num_stats.round(2)
else:
num_stats = pd.DataFrame()
# Categorical statistics
cat_cols = get_categorical_columns(df)
if cat_cols:
cat_stats_list = []
for col in cat_cols:
stats = {
'column': col,
'unique_values': df[col].nunique(),
'most_common': df[col].mode().iloc[0] if not df[col].mode().empty else 'N/A',
'most_common_freq': df[col].value_counts().iloc[0] if not df[col].value_counts().empty else 0,
'missing': df[col].isnull().sum(),
'missing_%': round((df[col].isnull().sum() / len(df)) * 100, 2)
}
cat_stats_list.append(stats)
cat_stats = pd.DataFrame(cat_stats_list)
else:
cat_stats = pd.DataFrame()
return num_stats, cat_stats
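# Illustrative sketch:
#     num_stats, cat_stats = get_summary_statistics(df)
#     # num_stats: one row per numeric column (count, mean, std, ..., missing, missing_%)
#     # cat_stats: one row per categorical column (unique_values, most_common, ...)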
def get_missing_value_report(df: pd.DataFrame) -> pd.DataFrame:
"""
Generate a detailed missing value report.
Args:
df: pandas DataFrame
Returns:
DataFrame with missing value information
"""
if df is None or df.empty:
return pd.DataFrame()
missing_data = []
for col in df.columns:
missing_count = df[col].isnull().sum()
missing_data.append({
'Column': col,
'Data Type': str(df[col].dtype),
'Missing Count': missing_count,
'Missing %': round((missing_count / len(df)) * 100, 2),
'Non-Missing Count': len(df) - missing_count
})
report = pd.DataFrame(missing_data)
report = report.sort_values('Missing Count', ascending=False)
return report
def get_correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
"""
Calculate correlation matrix for numerical columns.
Args:
df: pandas DataFrame
Returns:
Correlation matrix DataFrame
"""
if df is None or df.empty:
return pd.DataFrame()
numeric_cols = get_numeric_columns(df)
if len(numeric_cols) < 2:
return pd.DataFrame()
corr_matrix = df[numeric_cols].corr().round(3)
return corr_matrix
def filter_data_dynamic(
df: pd.DataFrame,
filters: Dict[str, Any]
) -> Tuple[pd.DataFrame, str]:
"""
Apply dynamic filters to the DataFrame based on provided filter dictionary.
Args:
df: pandas DataFrame to filter
filters: Dictionary with column names as keys and filter values
Format: {
'column_name': {
'type': 'range' | 'categorical' | 'date_range' | 'search',
'min': value, 'max': value, # for range
'values': [list], # for categorical
'start': date, 'end': date, # for date_range
'term': string # for search
}
}
Returns:
Tuple of (filtered DataFrame, filter summary message)
"""
if df is None or df.empty:
return pd.DataFrame(), "No data to filter."
filtered_df = df.copy()
filters_applied = []
for col, filter_config in filters.items():
if col not in filtered_df.columns:
continue
filter_type = filter_config.get('type', '')
if filter_type == 'range':
min_val = filter_config.get('min')
max_val = filter_config.get('max')
if min_val is not None:
filtered_df = filtered_df[filtered_df[col] >= min_val]
filters_applied.append(f"{col} >= {min_val}")
if max_val is not None:
filtered_df = filtered_df[filtered_df[col] <= max_val]
filters_applied.append(f"{col} <= {max_val}")
elif filter_type == 'categorical':
values = filter_config.get('values', [])
if values and len(values) > 0:
filtered_df = filtered_df[filtered_df[col].isin(values)]
filters_applied.append(f"{col} in {values}")
elif filter_type == 'date_range':
start = filter_config.get('start')
end = filter_config.get('end')
if start:
try:
start_date = pd.to_datetime(start)
filtered_df = filtered_df[filtered_df[col] >= start_date]
filters_applied.append(f"{col} >= {start}")
except Exception:
pass
if end:
try:
end_date = pd.to_datetime(end)
filtered_df = filtered_df[filtered_df[col] <= end_date]
filters_applied.append(f"{col} <= {end}")
except Exception:
pass
elif filter_type == 'search':
term = filter_config.get('term', '')
if term:
filtered_df = filtered_df[
filtered_df[col].fillna('').astype(str).str.lower().str.contains(term.lower(), regex=False)
]
filters_applied.append(f"{col} contains '{term}'")
# Create summary message
if filters_applied:
summary = f"βœ… Filters applied: {', '.join(filters_applied)}\n"
summary += f"πŸ“Š Results: {len(filtered_df):,} rows (from {len(df):,} original rows)"
else:
summary = f"ℹ️ No filters applied. Showing all {len(filtered_df):,} rows."
return filtered_df, summary
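# Illustrative filter dictionary (column names here are hypothetical):
#     filters = {
#         'revenue': {'type': 'range', 'min': 1000, 'max': 50000},
#         'region': {'type': 'categorical', 'values': ['North', 'South']},
#         'order_date': {'type': 'date_range', 'start': '2023-01-01', 'end': '2023-12-31'},
#         'product': {'type': 'search', 'term': 'widget'},
#     }
#     filtered_df, summary = filter_data_dynamic(df, filters)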
def get_column_unique_values(df: pd.DataFrame, column: str, max_values: int = 100) -> List[str]:
"""
Get unique values from a column for filter dropdowns.
Args:
df: pandas DataFrame
column: Column name
max_values: Maximum number of values to return
Returns:
List of unique values
"""
if df is None or column not in df.columns:
return []
unique_vals = df[column].dropna().unique()
    # Sort natively when possible (preserves numeric order), else fall back to string sort
    try:
        sorted_vals = [str(v) for v in sorted(unique_vals)]
    except TypeError:
        sorted_vals = sorted(str(v) for v in unique_vals)
return sorted_vals[:max_values]
def aggregate_data(
df: pd.DataFrame,
group_by: str,
value_column: str,
agg_method: str = 'sum'
) -> pd.DataFrame:
"""
Aggregate data by a grouping column.
Args:
df: pandas DataFrame
group_by: Column to group by
value_column: Column to aggregate
agg_method: Aggregation method ('sum', 'mean', 'count', 'median', 'min', 'max')
Returns:
Aggregated DataFrame
"""
if df is None or df.empty:
return pd.DataFrame()
if group_by not in df.columns or value_column not in df.columns:
return pd.DataFrame()
try:
if agg_method == 'count':
result = df.groupby(group_by)[value_column].count().reset_index()
else:
result = df.groupby(group_by)[value_column].agg(agg_method).reset_index()
result.columns = [group_by, f'{value_column}_{agg_method}']
result = result.sort_values(result.columns[1], ascending=False)
return result
except Exception:
return pd.DataFrame()
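# Illustrative sketch ('region' and 'revenue' are hypothetical columns):
#     agg = aggregate_data(df, group_by='region', value_column='revenue', agg_method='mean')
#     # -> columns ['region', 'revenue_mean'], sorted descending by the aggregated value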
def export_to_csv(df: pd.DataFrame, filename: str = "exported_data.csv") -> Optional[str]:
"""
Export DataFrame to CSV and return the file path.
Args:
df: pandas DataFrame to export
filename: Output filename
Returns:
Path to the exported file
"""
if df is None or df.empty:
return None
try:
df.to_csv(filename, index=False)
return filename
    except Exception:
        return None
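# Minimal smoke-test sketch: run the module directly against a local file.
# "sample.csv" is a hypothetical path; load_data returns an error message rather
# than raising if the file is missing or unreadable.
if __name__ == "__main__":
    df, status = load_data("sample.csv")
    print(status)
    if df is not None:
        df, report = clean_data(df)
        print(report)
        num_stats, cat_stats = get_summary_statistics(df)
        print(num_stats)
        print(get_missing_value_report(df))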