Spaces:
Sleeping
Sleeping
| """ | |
| Data Loader Module - InsightGenAI | |
| ================================ | |
| Handles CSV upload, data validation, missing value analysis, | |
| and automatic column type detection. | |
| Author: InsightGenAI Team | |
| Version: 1.0.0 | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, Tuple, Optional, List | |
| import streamlit as st | |
class DataLoader:
    """Handle all data loading and validation operations.

    Attributes:
        df (pd.DataFrame): The loaded dataset (None until load_csv succeeds)
        file_name (str): Name of the uploaded file
        column_types (Dict): Dictionary mapping columns to their detected
            types: 'numeric', 'categorical', 'text', 'datetime', 'boolean',
            or 'other'
        missing_summary (Dict): Summary statistics about missing values
    """

    def __init__(self):
        """Initialize the DataLoader with empty attributes."""
        self.df: Optional[pd.DataFrame] = None
        self.file_name: str = ""
        self.column_types: Dict[str, str] = {}
        self.missing_summary: Dict = {}

    def load_csv(self, uploaded_file) -> Tuple[bool, str]:
        """
        Load and validate a CSV file.

        Args:
            uploaded_file: File-like object with a ``name`` attribute
                (e.g. a Streamlit uploaded file object)

        Returns:
            Tuple[bool, str]: (Success status, Message)
        """
        try:
            self.file_name = uploaded_file.name
            self.df = pd.read_csv(uploaded_file)

            # Basic validation before running any analysis.
            if self.df.empty:
                return False, "The uploaded file is empty."
            if len(self.df.columns) < 2:
                return False, "Dataset must have at least 2 columns (features + target)."

            # Detect column types, then summarize missing values.
            self._detect_column_types()
            self._generate_missing_summary()

            return True, f"Successfully loaded {self.file_name} with {len(self.df)} rows and {len(self.df.columns)} columns."
        except pd.errors.EmptyDataError:
            return False, "The uploaded file is empty."
        except pd.errors.ParserError:
            return False, "Error parsing CSV file. Please check the file format."
        except Exception as e:
            return False, f"Error loading file: {str(e)}"

    def _detect_column_types(self) -> None:
        """
        Automatically detect the type of each column.

        Detected types:
            - numeric: Integer or float columns
            - categorical: Object/category columns with low cardinality
            - text: Object columns with high cardinality (potential text data)
            - datetime: Columns that can be parsed as dates
            - boolean: Columns with binary values
        """
        if self.df is None:
            return

        for col in self.df.columns:
            series = self.df[col]

            # Datetime check: only attempt parsing on object columns that
            # hold at least one non-null value. An all-NaN column would
            # otherwise parse "successfully" (all NaT) and be misclassified
            # as datetime. Catch only parse failures, not every exception.
            if series.dtype == 'object':
                non_null = series.dropna()
                if not non_null.empty:
                    try:
                        pd.to_datetime(non_null, errors='raise')
                        self.column_types[col] = 'datetime'
                        continue
                    except (ValueError, TypeError):
                        # Not parseable as dates; fall through to other checks.
                        pass

            if pd.api.types.is_numeric_dtype(series):
                # Numeric columns holding only 0/1 (or True/False) are boolean.
                unique_vals = series.dropna().unique()
                if len(unique_vals) <= 2 and set(unique_vals).issubset({0, 1, True, False}):
                    self.column_types[col] = 'boolean'
                else:
                    self.column_types[col] = 'numeric'
            elif series.dtype == 'object':
                unique_count = series.nunique()
                total_count = len(series)
                # Low cardinality (<10% unique and fewer than 50 distinct
                # values) indicates categorical data.
                if unique_count / total_count < 0.1 and unique_count < 50:
                    self.column_types[col] = 'categorical'
                else:
                    # Long average string length suggests free-form text.
                    avg_length = series.dropna().astype(str).str.len().mean()
                    if avg_length > 20:
                        self.column_types[col] = 'text'
                    else:
                        self.column_types[col] = 'categorical'
            elif series.dtype == 'bool':
                self.column_types[col] = 'boolean'
            else:
                self.column_types[col] = 'other'

    def _generate_missing_summary(self) -> None:
        """Generate a summary of missing values in the dataset."""
        if self.df is None:
            return

        missing_counts = self.df.isnull().sum()
        missing_percent = (missing_counts / len(self.df)) * 100

        self.missing_summary = {
            'total_rows': len(self.df),
            'total_columns': len(self.df.columns),
            'columns_with_missing': missing_counts[missing_counts > 0].to_dict(),
            'missing_percentages': missing_percent[missing_percent > 0].to_dict(),
            # Cast numpy scalar to a plain int for clean serialization.
            'total_missing': int(missing_counts.sum()),
            'complete_rows': len(self.df.dropna())
        }

    def get_dataframe(self) -> Optional[pd.DataFrame]:
        """Return the loaded dataframe."""
        return self.df

    def get_column_types(self) -> Dict[str, str]:
        """Return the detected column types."""
        return self.column_types

    def get_missing_summary(self) -> Dict:
        """Return the missing value summary."""
        return self.missing_summary

    def get_numeric_columns(self) -> List[str]:
        """Return list of numeric column names."""
        return [col for col, type_ in self.column_types.items() if type_ == 'numeric']

    def get_categorical_columns(self) -> List[str]:
        """Return list of categorical column names."""
        return [col for col, type_ in self.column_types.items() if type_ == 'categorical']

    def get_text_columns(self) -> List[str]:
        """Return list of text column names."""
        return [col for col, type_ in self.column_types.items() if type_ == 'text']

    def get_datetime_columns(self) -> List[str]:
        """Return list of datetime column names."""
        return [col for col, type_ in self.column_types.items() if type_ == 'datetime']

    def get_basic_stats(self) -> Dict:
        """
        Return basic statistics about the dataset.

        Returns:
            Dict containing dataset statistics (empty dict if no data loaded)
        """
        if self.df is None:
            return {}

        return {
            'shape': self.df.shape,
            'memory_usage': self.df.memory_usage(deep=True).sum() / (1024 * 1024),  # MB
            'duplicates': int(self.df.duplicated().sum()),
            'column_types_count': pd.Series(self.column_types).value_counts().to_dict()
        }

    def suggest_target_column(self) -> Optional[str]:
        """
        Suggest a potential target column based on heuristics.

        Single-character patterns (e.g. 'y') must match the column name
        exactly; longer patterns match as substrings. This avoids false
        positives such as 'y' matching 'salary' or 'year'.

        Returns:
            str: Suggested target column name or None if no data is loaded
        """
        if self.df is None:
            return None

        # Common target column names.
        target_patterns = ['target', 'label', 'class', 'y', 'output', 'result',
                           'prediction', 'category', 'type', 'grade', 'score']

        # First, look for columns matching common target patterns.
        for col in self.df.columns:
            col_lower = col.lower()
            for pattern in target_patterns:
                matched = (pattern == col_lower) if len(pattern) == 1 else (pattern in col_lower)
                if matched:
                    return col

        # If no pattern match, suggest the last column (common convention).
        return self.df.columns[-1]

    def clean_data(self, handle_missing: str = 'drop',
                   outlier_method: Optional[str] = None) -> pd.DataFrame:
        """
        Clean the dataset based on specified parameters.

        Args:
            handle_missing: How to handle missing values
                ('drop', 'mean', 'median', 'mode'). Note that 'mean' and
                'median' fill only numeric columns; other columns keep
                their NaNs.
            outlier_method: Method for outlier detection ('iqr', 'zscore', None)

        Returns:
            pd.DataFrame: Cleaned dataframe (the loaded data is not mutated)

        Raises:
            ValueError: If no data has been loaded yet
        """
        if self.df is None:
            raise ValueError("No data loaded. Please load data first.")

        df_clean = self.df.copy()

        # Handle missing values.
        if handle_missing == 'drop':
            df_clean = df_clean.dropna()
        elif handle_missing == 'mean':
            numeric_cols = self.get_numeric_columns()
            df_clean[numeric_cols] = df_clean[numeric_cols].fillna(df_clean[numeric_cols].mean())
        elif handle_missing == 'median':
            numeric_cols = self.get_numeric_columns()
            df_clean[numeric_cols] = df_clean[numeric_cols].fillna(df_clean[numeric_cols].median())
        elif handle_missing == 'mode':
            df_clean = df_clean.fillna(df_clean.mode().iloc[0])

        # Handle outliers.
        if outlier_method == 'iqr':
            for col in self.get_numeric_columns():
                q1 = df_clean[col].quantile(0.25)
                q3 = df_clean[col].quantile(0.75)
                iqr = q3 - q1
                lower_bound = q1 - 1.5 * iqr
                upper_bound = q3 + 1.5 * iqr
                df_clean = df_clean[(df_clean[col] >= lower_bound) & (df_clean[col] <= upper_bound)]
        elif outlier_method == 'zscore':
            from scipy import stats
            numeric_cols = self.get_numeric_columns()
            # Guard: zscore over an empty column selection would fail.
            if numeric_cols:
                z_scores = np.abs(stats.zscore(df_clean[numeric_cols]))
                df_clean = df_clean[(z_scores < 3).all(axis=1)]

        return df_clean
| # Utility functions for Streamlit integration | |
def display_data_summary(data_loader: DataLoader):
    """
    Display a summary of the loaded data in Streamlit.

    Shows headline metrics (rows, columns, duplicates, memory), the detected
    column types, and a missing-value table when any values are missing.

    Args:
        data_loader: Instance of DataLoader with loaded data
    """
    if data_loader.get_dataframe() is None:
        st.warning("No data loaded yet.")
        return

    # Basic info
    stats = data_loader.get_basic_stats()
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Rows", stats['shape'][0])
    with col2:
        st.metric("Columns", stats['shape'][1])
    with col3:
        st.metric("Duplicates", stats['duplicates'])
    with col4:
        st.metric("Memory (MB)", f"{stats['memory_usage']:.2f}")

    # Column types
    st.subheader("Column Types")
    type_df = pd.DataFrame(list(data_loader.get_column_types().items()),
                           columns=['Column', 'Type'])
    st.dataframe(type_df, use_container_width=True)

    # Missing values. Use .get() so a partially initialized loader (e.g.
    # load_csv failed before the summary was generated) cannot raise
    # KeyError here. Rows are keyed off one dict so counts and percentages
    # always refer to the same column.
    missing = data_loader.get_missing_summary()
    columns_with_missing = missing.get('columns_with_missing') or {}
    if columns_with_missing:
        st.subheader("Missing Values")
        percentages = missing.get('missing_percentages', {})
        missing_df = pd.DataFrame({
            'Column': list(columns_with_missing.keys()),
            'Missing Count': list(columns_with_missing.values()),
            'Missing %': [f"{percentages.get(c, 0.0):.2f}%" for c in columns_with_missing]
        })
        st.dataframe(missing_df, use_container_width=True)
    else:
        st.success("No missing values found! 🎉")