import warnings
from io import BytesIO
from typing import Any, Dict, List

import numpy as np
import pandas as pd
import streamlit as st

warnings.filterwarnings('ignore')


# All cached data processing functions

@st.cache_data
def load_csv_with_encoding(file_content: bytes, filename: str) -> pd.DataFrame:
    """Load CSV with automatic encoding detection - cached"""
    import chardet

    detected = chardet.detect(file_content)
    encoding = detected['encoding'] or 'utf-8'  # chardet may return None
    try:
        return pd.read_csv(BytesIO(file_content), encoding=encoding)
    except Exception:
        # Fall back to a list of common encodings
        for enc in ('utf-8', 'latin-1', 'cp1252', 'iso-8859-1'):
            try:
                return pd.read_csv(BytesIO(file_content), encoding=enc)
            except Exception:
                continue
        raise ValueError(f"Cannot read {filename} with any supported encoding")


@st.cache_data
def load_excel_file(file_content: bytes) -> pd.DataFrame:
    """Load Excel file - cached"""
    return pd.read_excel(BytesIO(file_content))


@st.cache_data
def calculate_basic_stats(df: pd.DataFrame) -> Dict[str, Any]:
    """Calculate basic statistics - cached"""
    dtype_counts = df.dtypes.value_counts()
    dtype_dict = {str(k): int(v) for k, v in dtype_counts.items()}
    return {
        'shape': df.shape,
        'memory_usage': float(df.memory_usage(deep=True).sum() / 1024**2),  # MB
        'missing_values': int(df.isnull().sum().sum()),
        'dtypes': dtype_dict,
        'duplicates': int(df.duplicated().sum()),
    }


@st.cache_data
def calculate_column_cardinality(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate column cardinality analysis - cached"""
    cardinality_data = []
    for col in df.columns:
        unique_count = df[col].nunique()
        unique_ratio = unique_count / len(df)

        # Determine column type based on cardinality
        if unique_count == 1:
            col_type = "Constant"
        elif unique_count == len(df):
            col_type = "Unique Identifier"
        elif unique_ratio < 0.05:
            col_type = "Low Cardinality"
        elif unique_ratio < 0.5:
            col_type = "Medium Cardinality"
        else:
            col_type = "High Cardinality"

        cardinality_data.append({
            'Column': col,
            'Unique Count': unique_count,
            'Unique Ratio': unique_ratio,
            'Type': col_type,
            'Data Type': str(df[col].dtype),
        })
    return pd.DataFrame(cardinality_data)


@st.cache_data
def calculate_memory_optimization(df: pd.DataFrame) -> Dict[str, Any]:
    """Calculate memory optimization suggestions - cached"""
    suggestions = []
    current_memory = df.memory_usage(deep=True).sum() / 1024**2
    potential_savings = 0

    for col in df.columns:
        if df[col].dtype == 'object':
            unique_ratio = df[col].nunique() / len(df)
            if unique_ratio < 0.5:  # Less than 50% unique values
                # Estimate category memory usage
                category_memory = df[col].astype('category').memory_usage(deep=True)
                object_memory = df[col].memory_usage(deep=True)
                savings = (object_memory - category_memory) / 1024**2
                if savings > 0.1:  # More than 0.1 MB savings
                    suggestions.append({
                        'column': col,
                        'current_type': 'object',
                        'suggested_type': 'category',
                        'savings_mb': savings,
                    })
                    potential_savings += savings

    return {
        'suggestions': suggestions,
        'current_memory_mb': current_memory,
        'potential_savings_mb': potential_savings,
        'potential_savings_pct': (potential_savings / current_memory) * 100 if current_memory > 0 else 0,
    }


@st.cache_data
def calculate_missing_data(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate missing data analysis - cached"""
    missing_data = df.isnull().sum()
    if missing_data.sum() > 0:
        missing_df = pd.DataFrame({
            'Column': missing_data.index,
            'Missing Count': missing_data.values,
            'Missing %': (missing_data.values / len(df)) * 100,
        })
        return missing_df[missing_df['Missing Count'] > 0].sort_values('Missing %', ascending=False)
    return pd.DataFrame()
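# Minimal usage sketch (everything except the functions in this module is
# hypothetical): wiring the cached loaders into a Streamlit page. Because the
# functions are decorated with @st.cache_data, reruns with the same upload
# skip the expensive parsing and recomputation.
#
#     uploaded = st.file_uploader("Upload a file", type=["csv", "xlsx"])
#     if uploaded is not None:
#         df = load_data(uploaded)  # defined near the end of this module
#         stats = calculate_basic_stats(df)
#         st.write(f"{stats['shape'][0]} rows, {stats['shape'][1]} columns, "
#                  f"{stats['memory_usage']:.1f} MB in memory")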
@st.cache_data
def calculate_correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate correlation matrix - cached"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    return df[numeric_cols].corr() if len(numeric_cols) > 1 else pd.DataFrame()


@st.cache_data
def get_column_types(df: pd.DataFrame) -> Dict[str, List[str]]:
    """Get column types - cached"""
    return {
        'numeric': df.select_dtypes(include=[np.number]).columns.tolist(),
        'categorical': df.select_dtypes(include=['object']).columns.tolist(),
        'datetime': df.select_dtypes(include=['datetime64']).columns.tolist(),
    }


@st.cache_data
def calculate_numeric_stats(df: pd.DataFrame, column: str) -> Dict[str, float]:
    """Calculate enhanced numeric statistics - cached"""
    series = df[column].dropna()
    return {
        'mean': series.mean(),
        'median': series.median(),
        'std': series.std(),
        'skewness': series.skew(),
        'kurtosis': series.kurtosis(),
        'min': series.min(),
        'max': series.max(),
        'q25': series.quantile(0.25),
        'q75': series.quantile(0.75),
    }


@st.cache_data
def calculate_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """Calculate outliers using IQR method - cached"""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return df[(df[column] < lower_bound) | (df[column] > upper_bound)]


@st.cache_data
def detect_mixed_types(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Detect columns with mixed data types - cached"""
    mixed_type_issues = []
    for col in df.select_dtypes(include=['object']).columns:
        # Try to convert to numeric; values that newly become null failed conversion
        numeric_conversion = pd.to_numeric(df[col], errors='coerce')
        new_nulls = numeric_conversion.isnull().sum() - df[col].isnull().sum()
        if new_nulls > 0:
            mixed_type_issues.append({
                'column': col,
                'problematic_values': new_nulls,
                'total_values': len(df[col]),
                'percentage': (new_nulls / len(df[col])) * 100,
            })
    return mixed_type_issues


@st.cache_data
def get_value_counts(df: pd.DataFrame, column: str, top_n: int = 10) -> pd.Series:
    """Get value counts for categorical column - cached"""
    return df[column].value_counts().head(top_n)


@st.cache_data
def calculate_crosstab(df: pd.DataFrame, col1: str, col2: str) -> pd.DataFrame:
    """Calculate crosstab between two categorical columns - cached"""
    return pd.crosstab(df[col1], df[col2])


@st.cache_data
def calculate_group_stats(df: pd.DataFrame, group_col: str, metric_col: str) -> pd.DataFrame:
    """Calculate group statistics - cached"""
    return df.groupby(group_col)[metric_col].agg(['mean', 'median', 'std', 'count'])


@st.cache_data
def calculate_data_quality_score(df: pd.DataFrame) -> Dict[str, Any]:
    """Calculate overall data quality score - cached"""
    score = 100
    issues = []

    # Missing values penalty
    missing_pct = (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100
    if missing_pct > 0:
        penalty = min(30, missing_pct * 2)  # Max 30 points penalty
        score -= penalty
        issues.append(f"Missing values: {missing_pct:.1f}%")

    # Duplicates penalty
    duplicate_pct = (df.duplicated().sum() / len(df)) * 100
    if duplicate_pct > 0:
        penalty = min(20, duplicate_pct * 4)  # Max 20 points penalty
        score -= penalty
        issues.append(f"Duplicate rows: {duplicate_pct:.1f}%")

    # Constant columns penalty
    constant_cols = [col for col in df.columns if df[col].nunique() == 1]
    if constant_cols:
        penalty = min(10, len(constant_cols) * 2)
        score -= penalty
        issues.append(f"Constant columns: {len(constant_cols)}")

    # Mixed types penalty
    mixed_types = detect_mixed_types(df)
    if mixed_types:
        penalty = min(10, len(mixed_types) * 3)
        score -= penalty
        issues.append(f"Mixed type columns: {len(mixed_types)}")

    return {
        'score': max(0, score),
        'issues': issues,
        'grade': 'A' if score >= 90 else 'B' if score >= 80 else 'C' if score >= 70 else 'D' if score >= 60 else 'F',
    }
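# Sketch of surfacing the quality score in the UI (assumes a df is already
# loaded; the widget choices are illustrative, not part of this module's API):
#
#     quality = calculate_data_quality_score(df)
#     st.metric("Data quality", f"{quality['score']:.0f}/100 (grade {quality['grade']})")
#     for issue in quality['issues']:
#         st.warning(issue)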
def load_data(uploaded_file):
    """Unified data loading function"""
    file_content = uploaded_file.read()
    uploaded_file.seek(0)  # Rewind so the buffer can be read again elsewhere

    if uploaded_file.name.endswith('.csv'):
        return load_csv_with_encoding(file_content, uploaded_file.name)
    return load_excel_file(file_content)


def apply_data_cleaning(df: pd.DataFrame, operations: List[Dict[str, Any]]) -> pd.DataFrame:
    """Apply data cleaning operations"""
    cleaned_df = df.copy()

    for operation in operations:
        if operation['type'] == 'fill_missing':
            col = operation['column']
            if operation['method'] == 'mean':
                cleaned_df[col] = cleaned_df[col].fillna(cleaned_df[col].mean())
            elif operation['method'] == 'median':
                cleaned_df[col] = cleaned_df[col].fillna(cleaned_df[col].median())
            elif operation['method'] == 'mode':
                mode = cleaned_df[col].mode()
                cleaned_df[col] = cleaned_df[col].fillna(mode.iloc[0] if not mode.empty else 0)
            elif operation['method'] == 'drop':
                cleaned_df = cleaned_df.dropna(subset=[col])

        elif operation['type'] == 'remove_duplicates':
            cleaned_df = cleaned_df.drop_duplicates()

        elif operation['type'] == 'remove_outliers':
            col = operation['column']
            Q1 = cleaned_df[col].quantile(0.25)
            Q3 = cleaned_df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            cleaned_df = cleaned_df[
                (cleaned_df[col] >= lower_bound) & (cleaned_df[col] <= upper_bound)
            ]

        elif operation['type'] == 'cap_outliers':
            col = operation['column']
            Q1 = cleaned_df[col].quantile(0.25)
            Q3 = cleaned_df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            cleaned_df[col] = cleaned_df[col].clip(lower_bound, upper_bound)

        elif operation['type'] == 'convert_type':
            if operation['target_type'] == 'category':
                cleaned_df[operation['column']] = cleaned_df[operation['column']].astype('category')

    return cleaned_df
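# Illustrative shape of the `operations` list consumed by apply_data_cleaning.
# Column names here are made up; each dict needs a 'type' plus whichever keys
# that branch reads ('column', 'method', 'target_type'):
#
#     ops = [
#         {'type': 'fill_missing', 'column': 'age', 'method': 'median'},
#         {'type': 'remove_duplicates'},
#         {'type': 'cap_outliers', 'column': 'income'},
#         {'type': 'convert_type', 'column': 'city', 'target_type': 'category'},
#     ]
#     cleaned = apply_data_cleaning(df, ops)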