Spaces:
Sleeping
Sleeping
| ## data ingestion & preprocessing & schema detection | |
| import pandas as pd | |
| import numpy as np | |
| from pathlib import Path | |
| import json | |
class DataProcessor:
    """Load a tabular dataset, clean it, and classify its columns by type.

    Typical flow: ``load_data`` / ``load_from_upload`` -> ``preprocess`` ->
    ``detect_schema`` -> ``get_summary``.  Every step after loading works on
    ``self.df``, so a frame must have been loaded (or assigned) first.
    """

    # Strings that are treated as "missing" by ``preprocess``.
    PLACEHOLDER_VALUES = [
        '?', 'None', 'null', 'NULL', 'NaN', 'nan', '', ' ', 'Unknown', 'unknown',
    ]

    def __init__(self):
        self.df = None      # current DataFrame; None until something is loaded
        self.schema = {}    # filled in by detect_schema()

    @staticmethod
    def _is_text_like(series):
        """Return True for columns stored as ``object`` or pandas string dtype."""
        return (
            pd.api.types.is_object_dtype(series)
            or isinstance(series.dtype, pd.StringDtype)
        )

    def _text_like_columns(self):
        """Return the labels of all text-like columns of ``self.df``."""
        return [col for col in self.df.columns if self._is_text_like(self.df[col])]

    def load_data(self, file_path):
        """Load a CSV or JSON file from disk into ``self.df``.

        The format is chosen from the (case-insensitive) file suffix.

        Raises:
            ValueError: if the suffix is neither ``.csv`` nor ``.json``.
        """
        file_ext = Path(file_path).suffix.lower()
        if file_ext == '.csv':
            self.df = pd.read_csv(file_path)
        elif file_ext == '.json':
            self.df = pd.read_json(file_path)
        else:
            raise ValueError("Unsupported file type. Use CSV or JSON file")
        return self.df

    def load_from_upload(self, uploaded_file):
        """Load an uploaded file-like object (e.g. a Streamlit upload).

        ``uploaded_file`` must expose a ``name`` attribute and be readable by
        ``pd.read_csv`` / ``pd.read_json``.

        Raises:
            ValueError: if the name ends in neither ``.csv`` nor ``.json``.
        """
        # Compare case-insensitively so "DATA.CSV" is treated like "data.csv",
        # matching the behaviour of load_data().
        upload_name = uploaded_file.name.lower()
        if upload_name.endswith('.csv'):
            self.df = pd.read_csv(uploaded_file)
        elif upload_name.endswith('.json'):
            self.df = pd.read_json(uploaded_file)
        else:
            raise ValueError("Unsupported file type")
        return self.df

    def preprocess(self):
        """Clean ``self.df`` in place and return it.

        Steps, in order: replace placeholder strings with missing values,
        drop duplicate rows, fill numeric gaps with the column median, fill
        text gaps with the column mode (or ``"Unknown"`` when the column has
        no non-missing value), then try to convert text columns to datetime.
        """
        # NOTE(review): the leading glyph in this message looks like a
        # mis-encoded emoji -- confirm the intended character.
        print("๐ Preprocessing data...")

        # Placeholders must become real missing values *before* the
        # duplicate/missing-value logic below, otherwise they count as data.
        self.df = self.df.replace(self.PLACEHOLDER_VALUES, pd.NA)

        initial_rows = len(self.df)
        self.df = self.df.drop_duplicates()
        print(f" Removed {initial_rows - len(self.df)} duplicates")

        missing_before = self.df.isnull().sum().sum()

        # Numeric columns: fill with the median (a no-op if the median is NaN,
        # i.e. the column is entirely missing).
        for col in self.df.select_dtypes(include=[np.number]).columns:
            self.df[col] = self.df[col].fillna(self.df[col].median())

        # Text columns: fill with the most frequent value; mode() is empty
        # only when every value is missing, in which case use "Unknown".
        for col in self._text_like_columns():
            modes = self.df[col].mode()
            fill_value = modes.iloc[0] if not modes.empty else "Unknown"
            self.df[col] = self.df[col].fillna(fill_value)

        missing_after = self.df.isnull().sum().sum()
        print(f" Filled {missing_before - missing_after} missing values")

        self._convert_types()
        return self.df

    def _convert_types(self):
        """Convert text columns to datetime where every value parses as one.

        Columns that fail to parse are left untouched (best effort).
        """
        for col in self._text_like_columns():
            try:
                converted = pd.to_datetime(self.df[col])
            except (ValueError, TypeError, OverflowError):
                # Not a date column; only parse failures are ignored so that
                # unrelated errors (and KeyboardInterrupt) still propagate.
                continue
            self.df[col] = converted
            print(f" Converted {col} to datetime")

    def detect_schema(self, categorical_threshold=0.5):
        """Classify every column of ``self.df`` and store/return the schema.

        Args:
            categorical_threshold: a text column counts as categorical when
                ``unique values / rows`` is strictly below this ratio;
                otherwise it is free text.  Defaults to 0.5.

        Returns:
            dict with the keys ``'numeric'``, ``'categorical'``,
            ``'datetime'`` and ``'text'``, each mapping to a list of columns.
        """
        self.schema = {
            'numeric': [],
            'categorical': [],
            'datetime': [],
            'text': [],
        }
        row_count = len(self.df)

        for col in self.df.columns:
            series = self.df[col]
            if pd.api.types.is_datetime64_any_dtype(series):
                self.schema['datetime'].append(col)
            elif pd.api.types.is_numeric_dtype(series):
                self.schema['numeric'].append(col)
            elif isinstance(series.dtype, pd.CategoricalDtype):
                self.schema['categorical'].append(col)
            elif self._is_text_like(series):
                # With zero rows there are no unique values; use a ratio of
                # 0.0 instead of dividing by zero.
                unique_ratio = series.nunique() / row_count if row_count else 0.0
                if unique_ratio < categorical_threshold:
                    self.schema['categorical'].append(col)
                else:
                    self.schema['text'].append(col)

        # NOTE(review): leading glyph looks like a mis-encoded emoji -- confirm.
        print("\n๐ Schema Detected:")
        print(f" Numeric columns: {self.schema['numeric']}")
        print(f" Categorical columns: {self.schema['categorical']}")
        print(f" Date columns: {self.schema['datetime']}")
        return self.schema

    def get_summary(self):
        """Return basic statistics about ``self.df``.

        Returns:
            dict with row/column counts, column names, per-column missing
            counts, and the deep memory usage in MiB (bytes / 1024**2).
        """
        return {
            'rows': len(self.df),
            'columns': len(self.df.columns),
            'column_names': list(self.df.columns),
            'missing_values': self.df.isnull().sum().to_dict(),
            'memory_usage': self.df.memory_usage(deep=True).sum() / 1024**2,  # MB
        }