Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import warnings | |
| from typing import Dict, List, Any, Tuple | |
| from scipy import stats | |
| warnings.filterwarnings('ignore') | |
| # All cached data processing functions | |
def load_csv_with_encoding(file_content: bytes, filename: str) -> pd.DataFrame:
    """Load a CSV from raw bytes, auto-detecting the text encoding.

    Tries chardet's detected encoding first (when chardet is installed and
    returns a usable guess), then falls back through a list of common
    encodings. The original version used bare ``except:`` clauses and
    crashed outright if chardet was not installed; both are fixed here.

    Args:
        file_content: Raw bytes of the uploaded CSV file.
        filename: Original file name (kept for interface compatibility and
            used in the failure message).

    Returns:
        The parsed DataFrame.

    Raises:
        ValueError: If the bytes cannot be parsed with any candidate
            encoding (ValueError subclasses Exception, so existing callers
            catching Exception keep working).
    """
    from io import BytesIO

    candidates = []
    try:
        import chardet  # optional dependency; skip detection if absent
        detected = chardet.detect(file_content).get('encoding')
        if detected:  # chardet returns None for undetectable input
            candidates.append(detected)
    except ImportError:
        pass
    candidates.extend(['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'])

    # dict.fromkeys dedupes while preserving the priority order.
    for enc in dict.fromkeys(candidates):
        try:
            return pd.read_csv(BytesIO(file_content), encoding=enc)
        except (ValueError, LookupError):
            # ValueError covers decode and parse failures (ParserError,
            # UnicodeDecodeError); LookupError covers unknown codec names.
            continue
    raise ValueError(f"Cannot read {filename!r} with any supported encoding")
def load_excel_file(file_content: bytes) -> pd.DataFrame:
    """Parse raw Excel-file bytes into a DataFrame."""
    from io import BytesIO
    buffer = BytesIO(file_content)
    return pd.read_excel(buffer)
def calculate_basic_stats(df: pd.DataFrame) -> Dict[str, Any]:
    """Summarize a DataFrame: shape, deep memory footprint in MB, total
    missing cells, dtype distribution, and duplicate-row count."""
    return {
        'shape': df.shape,
        'memory_usage': float(df.memory_usage(deep=True).sum() / 1024**2),
        'missing_values': int(df.isnull().sum().sum()),
        'dtypes': {str(dtype): int(n) for dtype, n in df.dtypes.value_counts().items()},
        'duplicates': int(df.duplicated().sum()),
    }
def calculate_column_cardinality(df: pd.DataFrame) -> pd.DataFrame:
    """Classify every column by how many distinct values it holds.

    Labels: Constant (one value), Unique Identifier (all distinct),
    Low (<5% distinct), Medium (<50% distinct), otherwise High Cardinality.
    Returns one row per column with counts, ratio, label, and dtype.
    """
    n_rows = len(df)
    records = []
    for name in df.columns:
        distinct = df[name].nunique()
        ratio = distinct / n_rows
        # Checked in priority order: the special cases win over the ratio bands.
        if distinct == 1:
            label = "Constant"
        elif distinct == n_rows:
            label = "Unique Identifier"
        elif ratio < 0.05:
            label = "Low Cardinality"
        elif ratio < 0.5:
            label = "Medium Cardinality"
        else:
            label = "High Cardinality"
        records.append({
            'Column': name,
            'Unique Count': distinct,
            'Unique Ratio': ratio,
            'Type': label,
            'Data Type': str(df[name].dtype),
        })
    return pd.DataFrame(records)
def calculate_memory_optimization(df: pd.DataFrame) -> Dict[str, Any]:
    """Suggest object→category dtype conversions that would save memory.

    A column qualifies when fewer than half of its values are distinct and
    the estimated saving exceeds 0.1 MB. Returns the suggestion list plus
    current memory and potential savings (MB and percent).
    """
    MB = 1024**2
    current_memory = df.memory_usage(deep=True).sum() / MB
    suggestions = []
    potential_savings = 0
    for name in df.select_dtypes(include=['object']).columns:
        if df[name].nunique() / len(df) >= 0.5:
            continue  # too many distinct values for category to pay off
        as_category = df[name].astype('category').memory_usage(deep=True)
        as_object = df[name].memory_usage(deep=True)
        gain = (as_object - as_category) / MB
        if gain > 0.1:  # only worth reporting above 0.1 MB
            suggestions.append({
                'column': name,
                'current_type': 'object',
                'suggested_type': 'category',
                'savings_mb': gain,
            })
            potential_savings += gain
    pct = (potential_savings / current_memory) * 100 if current_memory > 0 else 0
    return {
        'suggestions': suggestions,
        'current_memory_mb': current_memory,
        'potential_savings_mb': potential_savings,
        'potential_savings_pct': pct,
    }
def calculate_missing_data(df: pd.DataFrame) -> pd.DataFrame:
    """Per-column missing-value counts and percentages, highest-percentage
    first; returns an empty DataFrame when nothing is missing."""
    counts = df.isnull().sum()
    if counts.sum() == 0:
        return pd.DataFrame()
    table = pd.DataFrame({
        'Column': counts.index,
        'Missing Count': counts.values,
        'Missing %': (counts.values / len(df)) * 100,
    })
    with_missing = table[table['Missing Count'] > 0]
    return with_missing.sort_values('Missing %', ascending=False)
def calculate_correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Pairwise correlation of the numeric columns; empty frame when there
    are fewer than two numeric columns to correlate."""
    numeric = df.select_dtypes(include=[np.number])
    if numeric.shape[1] < 2:
        return pd.DataFrame()
    return numeric.corr()
def get_column_types(df: pd.DataFrame) -> Dict[str, List[str]]:
    """Bucket column names into numeric / categorical (object) / datetime lists."""
    selectors = {
        'numeric': [np.number],
        'categorical': ['object'],
        'datetime': ['datetime64'],
    }
    return {
        bucket: df.select_dtypes(include=include).columns.tolist()
        for bucket, include in selectors.items()
    }
def calculate_numeric_stats(df: pd.DataFrame, column: str) -> Dict[str, float]:
    """Descriptive statistics for one numeric column, NaNs excluded:
    central tendency, spread, shape (skew/kurtosis), extrema, quartiles."""
    clean = df[column].dropna()
    q25, q75 = clean.quantile(0.25), clean.quantile(0.75)
    return {
        'mean': clean.mean(),
        'median': clean.median(),
        'std': clean.std(),
        'skewness': clean.skew(),
        'kurtosis': clean.kurtosis(),
        'min': clean.min(),
        'max': clean.max(),
        'q25': q25,
        'q75': q75,
    }
def calculate_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """Return the rows whose value in *column* falls outside the Tukey
    1.5*IQR fences."""
    q1 = df[column].quantile(0.25)
    q3 = df[column].quantile(0.75)
    spread = q3 - q1
    low = q1 - 1.5 * spread
    high = q3 + 1.5 * spread
    outside = (df[column] < low) | (df[column] > high)
    return df[outside]
def detect_mixed_types(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Find object columns holding values that fail numeric conversion.

    For each object column, coerce to numeric and count the *new* NaNs the
    coercion introduced; any positive count means the column mixes numeric
    and non-numeric values. Returns one issue dict per affected column.
    """
    issues = []
    for name in df.select_dtypes(include=['object']).columns:
        coerced = pd.to_numeric(df[name], errors='coerce')
        bad = coerced.isnull().sum() - df[name].isnull().sum()
        if bad > 0:
            total = len(df[name])
            issues.append({
                'column': name,
                'problematic_values': bad,
                'total_values': total,
                'percentage': (bad / total) * 100,
            })
    return issues
def get_value_counts(df: pd.DataFrame, column: str, top_n: int = 10) -> pd.Series:
    """Most frequent values of *column*, limited to the top *top_n*."""
    frequencies = df[column].value_counts()
    return frequencies.head(top_n)
def calculate_crosstab(df: pd.DataFrame, col1: str, col2: str) -> pd.DataFrame:
    """Contingency table of *col1* (rows) against *col2* (columns)."""
    rows, cols = df[col1], df[col2]
    return pd.crosstab(rows, cols)
def calculate_group_stats(df: pd.DataFrame, group_col: str, metric_col: str) -> pd.DataFrame:
    """Mean, median, std, and count of *metric_col* per level of *group_col*."""
    grouped = df.groupby(group_col)[metric_col]
    return grouped.agg(['mean', 'median', 'std', 'count'])
def calculate_data_quality_score(df: pd.DataFrame) -> Dict[str, Any]:
    """Score overall data quality on a 0-100 scale with a letter grade.

    Penalties (each capped so no single issue dominates):
      missing cells up to 30 pts, duplicate rows up to 20 pts,
      constant columns up to 10 pts, mixed-type columns up to 10 pts.
    Returns the clamped score, the list of issue descriptions, and a grade.
    """
    score = 100
    issues: List[str] = []

    missing_pct = (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100
    if missing_pct > 0:
        score -= min(30, missing_pct * 2)  # capped at 30 points
        issues.append(f"Missing values: {missing_pct:.1f}%")

    dup_pct = (df.duplicated().sum() / len(df)) * 100
    if dup_pct > 0:
        score -= min(20, dup_pct * 4)  # capped at 20 points
        issues.append(f"Duplicate rows: {dup_pct:.1f}%")

    constants = [c for c in df.columns if df[c].nunique() == 1]
    if constants:
        score -= min(10, len(constants) * 2)
        issues.append(f"Constant columns: {len(constants)}")

    mixed = detect_mixed_types(df)
    if mixed:
        score -= min(10, len(mixed) * 3)
        issues.append(f"Mixed type columns: {len(mixed)}")

    # Grade uses the raw (pre-clamp) score, matching the original mapping.
    if score >= 90:
        grade = 'A'
    elif score >= 80:
        grade = 'B'
    elif score >= 70:
        grade = 'C'
    elif score >= 60:
        grade = 'D'
    else:
        grade = 'F'

    return {'score': max(0, score), 'issues': issues, 'grade': grade}
def load_data(uploaded_file):
    """Read an uploaded file into a DataFrame, routing '.csv' names through
    the encoding-detecting CSV loader and everything else through the Excel
    loader. The stream is rewound after reading so later reads still work."""
    raw = uploaded_file.read()
    uploaded_file.seek(0)
    if uploaded_file.name.endswith('.csv'):
        return load_csv_with_encoding(raw, uploaded_file.name)
    return load_excel_file(raw)
def apply_data_cleaning(df: pd.DataFrame, operations: List[Dict[str, Any]]) -> pd.DataFrame:
    """Apply a sequence of cleaning operations to a copy of *df*.

    Each operation is a dict with a 'type' key:
      - 'fill_missing'      : needs 'column' and 'method' in
                              {'mean', 'median', 'mode', 'drop'}
      - 'remove_duplicates' : drops duplicate rows
      - 'remove_outliers'   : drops rows outside the 1.5*IQR fences of 'column'
      - 'cap_outliers'      : clips 'column' to the 1.5*IQR fences
      - 'convert_type'      : needs 'column' and 'target_type'
                              (only 'category' is supported)

    Unknown operation types are silently ignored, matching the original
    behavior. The input frame is never mutated.

    Args:
        df: Source DataFrame.
        operations: Ordered list of operation dicts, applied in sequence.

    Returns:
        A cleaned copy of the DataFrame.
    """
    cleaned_df = df.copy()
    for operation in operations:
        op_type = operation['type']
        if op_type == 'fill_missing':
            cleaned_df = _fill_missing(cleaned_df, operation['column'], operation['method'])
        elif op_type == 'remove_duplicates':
            cleaned_df = cleaned_df.drop_duplicates()
        elif op_type == 'remove_outliers':
            col = operation['column']
            lower, upper = _iqr_bounds(cleaned_df[col])
            cleaned_df = cleaned_df[
                (cleaned_df[col] >= lower) & (cleaned_df[col] <= upper)
            ]
        elif op_type == 'cap_outliers':
            col = operation['column']
            lower, upper = _iqr_bounds(cleaned_df[col])
            cleaned_df[col] = cleaned_df[col].clip(lower, upper)
        elif op_type == 'convert_type':
            if operation['target_type'] == 'category':
                cleaned_df[operation['column']] = cleaned_df[operation['column']].astype('category')
    return cleaned_df


def _fill_missing(df: pd.DataFrame, column: str, method: str) -> pd.DataFrame:
    """Fill (or drop) NaNs in one column using the named strategy."""
    if method == 'mean':
        df[column] = df[column].fillna(df[column].mean())
    elif method == 'median':
        df[column] = df[column].fillna(df[column].median())
    elif method == 'mode':
        modes = df[column].mode()
        # Mode is empty when the column is all-NaN; fall back to 0 as before.
        df[column] = df[column].fillna(modes.iloc[0] if not modes.empty else 0)
    elif method == 'drop':
        df = df.dropna(subset=[column])
    return df


def _iqr_bounds(series: pd.Series) -> Tuple[float, float]:
    """Tukey fences (Q1 - 1.5*IQR, Q3 + 1.5*IQR) for a numeric series."""
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    return q1 - 1.5 * iqr, q3 + 1.5 * iqr