File size: 10,272 Bytes
3a1de6f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 | """
Data processing module for the Business Intelligence Dashboard.
Handles data loading, cleaning, validation, and filtering operations.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from utils import get_column_types, detect_datetime_columns, validate_dataframe
def load_data(file_path: str) -> Tuple[Optional[pd.DataFrame], str]:
"""
Load data from CSV or Excel file.
Args:
file_path: Path to the data file
Returns:
Tuple of (DataFrame, error_message). DataFrame is None if loading failed.
"""
try:
if file_path.endswith('.csv'):
df = pd.read_csv(file_path)
elif file_path.endswith(('.xlsx', '.xls')):
df = pd.read_excel(file_path)
else:
return None, "Unsupported file format. Please upload CSV or Excel files."
# Validate the loaded DataFrame
is_valid, error_msg = validate_dataframe(df)
if not is_valid:
return None, f"Invalid data: {error_msg}"
# Clean numeric columns with commas (e.g., "1,234" -> 1234)
df = clean_numeric_columns(df)
# Auto-detect and convert datetime columns
datetime_cols = detect_datetime_columns(df)
for col in datetime_cols:
try:
df[col] = pd.to_datetime(df[col], errors='coerce')
except Exception:
pass
return df, ""
except Exception as e:
return None, f"Error loading file: {str(e)}"
def clean_numeric_columns(df: pd.DataFrame) -> pd.DataFrame:
"""
Clean columns that contain numeric values with commas or other formatting.
Converts strings like '1,234', '50,000' to proper numeric types.
Args:
df: Input DataFrame
Returns:
DataFrame with cleaned numeric columns
"""
df_cleaned = df.copy()
for col in df_cleaned.columns:
# Skip if already numeric
if pd.api.types.is_numeric_dtype(df_cleaned[col]):
continue
# Check if column contains string values that look like numbers
if df_cleaned[col].dtype == 'object':
# Get a sample of non-null values
sample = df_cleaned[col].dropna().head(100)
if len(sample) > 0:
# Check if values contain commas and look numeric
sample_str = sample.astype(str)
# Count how many values look like comma-separated numbers
numeric_looking = sample_str.str.match(r'^-?[\d,]+\.?\d*$').sum()
# If more than 50% look like numbers with commas, try to convert
if numeric_looking > len(sample) * 0.5:
try:
# Remove commas and convert to numeric
df_cleaned[col] = df_cleaned[col].astype(str).str.replace(',', '').replace('--', '0')
df_cleaned[col] = pd.to_numeric(df_cleaned[col], errors='coerce')
except Exception:
# If conversion fails, keep original
pass
return df_cleaned
def get_data_summary(df: pd.DataFrame) -> Dict[str, Any]:
"""
Generate comprehensive summary statistics for the DataFrame.
Args:
df: Input DataFrame
Returns:
Dictionary containing summary statistics
"""
col_types = get_column_types(df)
summary = {
'shape': df.shape,
'columns': df.columns.tolist(),
'dtypes': df.dtypes.to_dict(),
'column_types': col_types,
'missing_values': df.isnull().sum().to_dict(),
'duplicate_rows': df.duplicated().sum(),
'memory_usage': df.memory_usage(deep=True).sum() / 1024 / 1024, # MB
}
# Numerical statistics
if col_types['numerical']:
numerical_stats = df[col_types['numerical']].describe().to_dict()
summary['numerical_stats'] = numerical_stats
# Categorical statistics
if col_types['categorical']:
categorical_stats = {}
for col in col_types['categorical']:
categorical_stats[col] = {
'unique_count': df[col].nunique(),
'top_values': df[col].value_counts().head(10).to_dict(),
'mode': df[col].mode().iloc[0] if len(df[col].mode()) > 0 else None
}
summary['categorical_stats'] = categorical_stats
return summary
def get_correlation_matrix(df: pd.DataFrame) -> Optional[pd.DataFrame]:
"""
Calculate correlation matrix for numerical columns.
Args:
df: Input DataFrame
Returns:
Correlation matrix DataFrame or None if no numerical columns
"""
col_types = get_column_types(df)
if not col_types['numerical']:
return None
return df[col_types['numerical']].corr()
def apply_filters(df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
"""
Apply filters to the DataFrame based on user selections.
Args:
df: Input DataFrame
filters: Dictionary of filter specifications
Format: {
'column_name': {
'type': 'numerical' | 'categorical' | 'datetime',
'min': value, # for numerical
'max': value, # for numerical
'values': [list], # for categorical
'start_date': value, # for datetime
'end_date': value # for datetime
}
}
Returns:
Filtered DataFrame
"""
filtered_df = df.copy()
for column, filter_spec in filters.items():
if column not in filtered_df.columns:
continue
filter_type = filter_spec.get('type')
if filter_type == 'numerical':
min_val = filter_spec.get('min')
max_val = filter_spec.get('max')
if min_val is not None:
filtered_df = filtered_df[filtered_df[column] >= min_val]
if max_val is not None:
filtered_df = filtered_df[filtered_df[column] <= max_val]
elif filter_type == 'categorical':
values = filter_spec.get('values', [])
if values:
filtered_df = filtered_df[filtered_df[column].isin(values)]
elif filter_type == 'datetime':
start_date = filter_spec.get('start_date')
end_date = filter_spec.get('end_date')
if start_date is not None:
filtered_df = filtered_df[filtered_df[column] >= pd.to_datetime(start_date)]
if end_date is not None:
filtered_df = filtered_df[filtered_df[column] <= pd.to_datetime(end_date)]
return filtered_df
def clean_data(df: pd.DataFrame,
drop_duplicates: bool = False,
fill_numerical: Optional[str] = None,
fill_categorical: Optional[str] = None) -> pd.DataFrame:
"""
Clean the DataFrame by handling missing values and duplicates.
Args:
df: Input DataFrame
drop_duplicates: Whether to drop duplicate rows
fill_numerical: Strategy for filling numerical NaNs ('mean', 'median', 'zero', or None)
fill_categorical: Strategy for filling categorical NaNs ('mode', 'unknown', or None)
Returns:
Cleaned DataFrame
"""
cleaned_df = df.copy()
col_types = get_column_types(cleaned_df)
# Handle duplicates
if drop_duplicates:
cleaned_df = cleaned_df.drop_duplicates()
# Handle missing values in numerical columns
if fill_numerical and col_types['numerical']:
for col in col_types['numerical']:
if fill_numerical == 'mean':
cleaned_df[col].fillna(cleaned_df[col].mean(), inplace=True)
elif fill_numerical == 'median':
cleaned_df[col].fillna(cleaned_df[col].median(), inplace=True)
elif fill_numerical == 'zero':
cleaned_df[col].fillna(0, inplace=True)
# Handle missing values in categorical columns
if fill_categorical and col_types['categorical']:
for col in col_types['categorical']:
if fill_categorical == 'mode':
mode_val = cleaned_df[col].mode()
if len(mode_val) > 0:
cleaned_df[col].fillna(mode_val.iloc[0], inplace=True)
elif fill_categorical == 'unknown':
cleaned_df[col].fillna('Unknown', inplace=True)
return cleaned_df
def aggregate_data(df: pd.DataFrame,
group_by: str,
value_column: str,
agg_method: str = 'sum') -> pd.DataFrame:
"""
Aggregate data by grouping and applying an aggregation method.
Args:
df: Input DataFrame
group_by: Column to group by
value_column: Column to aggregate
agg_method: Aggregation method ('sum', 'mean', 'count', 'median', 'min', 'max')
Returns:
Aggregated DataFrame
"""
if group_by not in df.columns or value_column not in df.columns:
return pd.DataFrame()
agg_methods = {
'sum': 'sum',
'mean': 'mean',
'count': 'count',
'median': 'median',
'min': 'min',
'max': 'max'
}
method = agg_methods.get(agg_method, 'sum')
try:
aggregated = df.groupby(group_by)[value_column].agg(method).reset_index()
aggregated.columns = [group_by, f'{value_column}_{method}']
return aggregated
except Exception:
return pd.DataFrame()
def get_data_preview(df: pd.DataFrame, n_rows: int = 10, head: bool = True) -> pd.DataFrame:
"""
Get a preview of the DataFrame.
Args:
df: Input DataFrame
n_rows: Number of rows to return
head: If True, return first n rows; if False, return last n rows
Returns:
Preview DataFrame
"""
if head:
return df.head(n_rows)
else:
return df.tail(n_rows)
|