|
|
""" |
|
|
Data processing module for the Business Intelligence Dashboard. |
|
|
Handles data loading, cleaning, validation, filtering, and statistical analysis. |
|
|
Works with ANY dataset - no hardcoded column names. |
|
|
""" |
|
|
|
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from typing import Dict, List, Any, Optional, Tuple |
|
|
from utils import ( |
|
|
get_numeric_columns, get_categorical_columns, get_datetime_columns, |
|
|
DATE_FORMATS |
|
|
) |
|
|
|
|
|
|
|
|
def load_data(file_path: str) -> Tuple[Optional[pd.DataFrame], str]:
    """
    Load data from a CSV or Excel file.

    CSV files are tried with several common encodings; if none decodes
    cleanly, undecodable bytes are dropped as a last resort.

    Args:
        file_path: Path to the uploaded file

    Returns:
        Tuple of (DataFrame or None, status message)
    """
    if file_path is None:
        return None, "No file uploaded. Please upload a CSV or Excel file."

    try:
        lower_path = file_path.lower()  # accept .CSV / .XLSX etc.
        if lower_path.endswith('.csv'):
            for encoding in ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252']:
                try:
                    df = pd.read_csv(file_path, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
            else:
                # Last resort: drop undecodable bytes.
                # BUG FIX: pd.read_csv has no `errors` keyword; the correct
                # parameter is `encoding_errors` (pandas >= 1.3). The old
                # call raised TypeError instead of loading the file.
                df = pd.read_csv(file_path, encoding='utf-8',
                                 encoding_errors='ignore')
        elif lower_path.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file_path)
        else:
            return None, "Unsupported file format. Please upload CSV or Excel files."

        if df.empty:
            return None, "The uploaded file is empty."

        return df, f"✅ Successfully loaded {len(df):,} rows and {len(df.columns)} columns."

    except pd.errors.EmptyDataError:
        return None, "The uploaded file is empty or has no valid data."
    except pd.errors.ParserError as e:
        return None, f"Error parsing file: {str(e)}"
    except Exception as e:
        return None, f"Error loading file: {str(e)}"
|
|
|
|
|
|
|
|
def _convert_date_columns(df: pd.DataFrame) -> int:
    """Convert object columns whose names suggest dates to datetime, in place.

    Returns the number of columns converted.
    """
    date_words = ('date', 'time', 'created', 'updated', 'timestamp')
    converted = 0
    for col in df.columns:
        if df[col].dtype != 'object':
            continue
        if not any(word in col.lower() for word in date_words):
            continue
        try:
            df[col] = pd.to_datetime(df[col], errors='coerce')
            converted += 1
        except Exception:
            # Best-effort: leave the column unchanged on failure.
            pass
    return converted


def _convert_numeric_columns(df: pd.DataFrame) -> int:
    """Convert object columns that are mostly numeric to numeric dtype, in place.

    Currency symbols ($, £, €) and thousands separators are stripped before
    parsing. A column is converted only when more than 80% of rows parse.

    Returns the number of columns converted.
    """
    converted = 0
    n_rows = len(df)
    if n_rows == 0:
        # Guard: avoid division by zero when all rows were empty.
        return 0
    for col in df.select_dtypes(include=['object']).columns:
        try:
            cleaned = df[col].astype(str).str.replace(r'[$£€,]', '', regex=True)
            numeric_values = pd.to_numeric(cleaned, errors='coerce')
            if numeric_values.notna().sum() / n_rows > 0.8:
                df[col] = numeric_values
                converted += 1
        except Exception:
            pass
    return converted


def clean_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]:
    """
    Clean the DataFrame: handle missing values, duplicates, data types.

    Args:
        df: Raw DataFrame

    Returns:
        Tuple of (cleaned DataFrame, cleaning report)
    """
    if df is None or df.empty:
        return df, "No data to clean."

    # BUG FIX: work on a copy so the caller's DataFrame is never mutated.
    df = df.copy()

    report = []
    original_rows = len(df)
    original_cols = len(df.columns)

    df.columns = df.columns.str.strip()
    report.append("✅ Stripped whitespace from column names")

    empty_rows = df.isna().all(axis=1).sum()
    if empty_rows > 0:
        df = df.dropna(how='all')
        report.append(f"✅ Removed {empty_rows} completely empty rows")

    empty_cols = df.isna().all(axis=0).sum()
    if empty_cols > 0:
        df = df.dropna(axis=1, how='all')
        report.append(f"✅ Removed {empty_cols} completely empty columns")

    duplicates = df.duplicated().sum()
    if duplicates > 0:
        df = df.drop_duplicates()
        report.append(f"✅ Removed {duplicates} duplicate rows")

    string_cols = df.select_dtypes(include=['object']).columns
    for col in string_cols:
        df[col] = df[col].astype(str).str.strip()
        # BUG FIX: astype(str) stringifies missing values as 'nan' AND
        # 'None'; restore both to real NaN (only 'nan' was handled before).
        df[col] = df[col].replace(['nan', 'None'], np.nan)
    if len(string_cols) > 0:
        report.append(f"✅ Cleaned whitespace in {len(string_cols)} text columns")

    date_converted = _convert_date_columns(df)
    if date_converted > 0:
        report.append(f"✅ Converted {date_converted} columns to datetime")

    numeric_converted = _convert_numeric_columns(df)
    if numeric_converted > 0:
        report.append(f"✅ Converted {numeric_converted} columns to numeric")

    final_rows = len(df)
    final_cols = len(df.columns)

    summary = f"""
## 🧹 Data Cleaning Complete!

### Summary
- **Original:** {original_rows:,} rows × {original_cols} columns
- **Cleaned:** {final_rows:,} rows × {final_cols} columns
- **Rows removed:** {original_rows - final_rows:,}
- **Columns removed:** {original_cols - final_cols}

### Actions Performed
""" + "\n".join(f"- {r}" for r in report)

    return df, summary
|
|
|
|
|
|
|
|
def get_data_info(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Collect a structural summary of the DataFrame.

    Args:
        df: pandas DataFrame

    Returns:
        Dictionary with shape, dtypes, memory usage, column groupings, and
        missing-value counts; empty dict when there is no data.
    """
    if df is None or df.empty:
        return {}

    # Compute the per-column missing counts once and reuse for the total.
    missing_per_column = df.isnull().sum()

    return {
        'rows': len(df),
        'columns': len(df.columns),
        'column_names': df.columns.tolist(),
        'dtypes': df.dtypes.astype(str).to_dict(),
        'memory_usage': df.memory_usage(deep=True).sum(),
        'numeric_columns': get_numeric_columns(df),
        'categorical_columns': get_categorical_columns(df),
        'datetime_columns': get_datetime_columns(df),
        'missing_values': missing_per_column.to_dict(),
        'total_missing': missing_per_column.sum(),
    }
|
|
|
|
|
|
|
|
def get_data_preview(df: pd.DataFrame, n_rows: int = 10, position: str = 'head') -> pd.DataFrame:
    """
    Return the first or last rows of the DataFrame.

    Args:
        df: pandas DataFrame
        n_rows: Number of rows to show
        position: 'head' for first rows, 'tail' for last rows

    Returns:
        Preview DataFrame (empty when there is no data)
    """
    if df is None or df.empty:
        return pd.DataFrame()

    # Anything other than 'tail' falls back to the head view.
    selector = df.tail if position == 'tail' else df.head
    return selector(n_rows)
|
|
|
|
|
|
|
|
def get_summary_statistics(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Generate summary statistics for numerical and categorical columns.

    Args:
        df: pandas DataFrame

    Returns:
        Tuple of (numerical_stats DataFrame, categorical_stats DataFrame);
        either element is empty when no columns of that kind exist.
    """
    if df is None or df.empty:
        return pd.DataFrame(), pd.DataFrame()

    total_rows = len(df)

    # Numeric side: describe() plus missing-value columns, rounded.
    num_stats = pd.DataFrame()
    numeric_cols = get_numeric_columns(df)
    if numeric_cols:
        num_stats = df[numeric_cols].describe().T
        num_stats['missing'] = df[numeric_cols].isnull().sum()
        num_stats['missing_%'] = (num_stats['missing'] / total_rows * 100).round(2)
        num_stats = num_stats.round(2)

    # Categorical side: one summary row per column.
    cat_stats = pd.DataFrame()
    cat_cols = get_categorical_columns(df)
    if cat_cols:
        rows = []
        for col in cat_cols:
            series = df[col]
            modes = series.mode()
            counts = series.value_counts()
            n_missing = series.isnull().sum()
            rows.append({
                'column': col,
                'unique_values': series.nunique(),
                'most_common': modes.iloc[0] if not modes.empty else 'N/A',
                'most_common_freq': counts.iloc[0] if not counts.empty else 0,
                'missing': n_missing,
                'missing_%': round((n_missing / total_rows) * 100, 2),
            })
        cat_stats = pd.DataFrame(rows)

    return num_stats, cat_stats
|
|
|
|
|
|
|
|
def get_missing_value_report(df: pd.DataFrame) -> pd.DataFrame:
    """
    Generate a detailed missing value report.

    Args:
        df: pandas DataFrame

    Returns:
        DataFrame with one row per column (name, dtype, missing count,
        missing percentage, non-missing count), sorted by missing count
        in descending order.
    """
    if df is None or df.empty:
        return pd.DataFrame()

    total = len(df)
    rows = [
        {
            'Column': col,
            'Data Type': str(dtype),
            'Missing Count': n_missing,
            'Missing %': round((n_missing / total) * 100, 2),
            'Non-Missing Count': total - n_missing,
        }
        for col, dtype, n_missing in zip(df.columns, df.dtypes, df.isnull().sum())
    ]
    return pd.DataFrame(rows).sort_values('Missing Count', ascending=False)
|
|
|
|
|
|
|
|
def get_correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate the pairwise correlation matrix for numerical columns.

    Args:
        df: pandas DataFrame

    Returns:
        Correlation matrix rounded to 3 decimals; empty DataFrame when
        there is no data or fewer than two numeric columns.
    """
    if df is None or df.empty:
        return pd.DataFrame()

    cols = get_numeric_columns(df)
    # Correlation needs at least two numeric columns to be meaningful.
    return df[cols].corr().round(3) if len(cols) >= 2 else pd.DataFrame()
|
|
|
|
|
|
|
|
def filter_data_dynamic(
    df: pd.DataFrame,
    filters: Dict[str, Any]
) -> Tuple[pd.DataFrame, str]:
    """
    Apply dynamic filters to the DataFrame based on a filter dictionary.

    Args:
        df: pandas DataFrame to filter
        filters: Mapping of column name -> filter config. Each config has a
            'type' key ('range' | 'categorical' | 'date_range' | 'search')
            plus type-specific keys:
            'min'/'max' for range, 'values' (list) for categorical,
            'start'/'end' for date_range, 'term' (string) for search.

    Returns:
        Tuple of (filtered DataFrame, filter summary message)
    """
    if df is None or df.empty:
        return pd.DataFrame(), "No data to filter."

    result = df.copy()
    applied = []

    for col, config in filters.items():
        # Silently skip filters on columns that do not exist.
        if col not in result.columns:
            continue

        kind = config.get('type', '')

        if kind == 'range':
            low = config.get('min')
            high = config.get('max')
            if low is not None:
                result = result[result[col] >= low]
                applied.append(f"{col} >= {low}")
            if high is not None:
                result = result[result[col] <= high]
                applied.append(f"{col} <= {high}")

        elif kind == 'categorical':
            selected = config.get('values', [])
            if selected:
                result = result[result[col].isin(selected)]
                applied.append(f"{col} in {selected}")

        elif kind == 'date_range':
            # Handle both bounds with the same parse-then-filter pattern;
            # unparseable dates are ignored (best-effort).
            start = config.get('start')
            end = config.get('end')
            if start:
                try:
                    result = result[result[col] >= pd.to_datetime(start)]
                    applied.append(f"{col} >= {start}")
                except Exception:
                    pass
            if end:
                try:
                    result = result[result[col] <= pd.to_datetime(end)]
                    applied.append(f"{col} <= {end}")
                except Exception:
                    pass

        elif kind == 'search':
            term = config.get('term', '')
            if term:
                # Case-insensitive plain-text containment (no regex).
                haystack = result[col].fillna('').astype(str).str.lower()
                result = result[haystack.str.contains(term.lower(), regex=False)]
                applied.append(f"{col} contains '{term}'")

    if applied:
        summary = f"✅ Filters applied: {', '.join(applied)}\n"
        summary += f"📊 Results: {len(result):,} rows (from {len(df):,} original rows)"
    else:
        summary = f"ℹ️ No filters applied. Showing all {len(result):,} rows."

    return result, summary
|
|
|
|
|
|
|
|
def get_column_unique_values(df: pd.DataFrame, column: str, max_values: int = 100) -> List[str]:
    """
    Get unique values from a column for filter dropdowns.

    Args:
        df: pandas DataFrame
        column: Column name
        max_values: Maximum number of values to return

    Returns:
        Up to max_values unique values, stringified and sorted.
    """
    if df is None or column not in df.columns:
        return []

    # Stringify first so mixed-type columns can still be sorted.
    as_strings = [str(value) for value in df[column].dropna().unique()]
    try:
        as_strings = sorted(as_strings)
    except TypeError:
        pass
    return as_strings[:max_values]
|
|
|
|
|
|
|
|
def aggregate_data(
    df: pd.DataFrame,
    group_by: str,
    value_column: str,
    agg_method: str = 'sum'
) -> pd.DataFrame:
    """
    Aggregate data by a grouping column.

    Args:
        df: pandas DataFrame
        group_by: Column to group by
        value_column: Column to aggregate
        agg_method: Aggregation method ('sum', 'mean', 'count', 'median', 'min', 'max')

    Returns:
        Two-column DataFrame sorted by the aggregated value (descending);
        empty DataFrame on missing columns or aggregation failure.
    """
    if df is None or df.empty:
        return pd.DataFrame()
    if group_by not in df.columns or value_column not in df.columns:
        return pd.DataFrame()

    try:
        grouped = df.groupby(group_by)[value_column]
        series = grouped.count() if agg_method == 'count' else grouped.agg(agg_method)

        result = series.reset_index()
        result.columns = [group_by, f'{value_column}_{agg_method}']
        # Largest aggregated values first.
        return result.sort_values(result.columns[1], ascending=False)
    except Exception:
        # Best-effort API: any aggregation failure yields an empty result.
        return pd.DataFrame()
|
|
|
|
|
|
|
|
def export_to_csv(df: pd.DataFrame, filename: str = "exported_data.csv") -> Optional[str]:
    """
    Export DataFrame to CSV and return the file path.

    Args:
        df: pandas DataFrame to export
        filename: Output filename

    Returns:
        Path to the exported file, or None when there is no data or the
        write fails.
    """
    # BUG FIX: return type was annotated `str` although every failure path
    # returns None; annotate Optional[str] so callers know to check.
    if df is None or df.empty:
        return None

    try:
        df.to_csv(filename, index=False)
        return filename
    except Exception:
        # Best-effort export: signal failure via None rather than raising.
        # (Dropped the unused `as e` binding.)
        return None