## Data ingestion, preprocessing & schema detection
import json
import warnings
from pathlib import Path

import numpy as np
import pandas as pd


class DataProcessor:
    """Load a CSV/JSON dataset, clean it, and classify its columns.

    Attributes:
        df: The currently loaded ``pandas.DataFrame``, or ``None`` until one
            of the ``load_*`` methods has been called (or ``df`` is assigned).
        schema: Mapping filled by :meth:`detect_schema` with the keys
            ``'numeric'``, ``'categorical'``, ``'datetime'`` and ``'text'``,
            each holding a list of column names.
    """

    # Cell values that preprocess() treats as "missing".
    PLACEHOLDER_VALUES = (
        '?', 'None', 'null', 'NULL', 'NaN', 'nan', '', ' ', 'Unknown', 'unknown',
    )

    def __init__(self):
        self.df = None
        self.schema = {}

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _require_data(self):
        """Return ``self.df``; raise ``ValueError`` if nothing is loaded yet."""
        if self.df is None:
            raise ValueError(
                "No data loaded. Call load_data() or load_from_upload() first."
            )
        return self.df

    @staticmethod
    def _is_text_like(series):
        """Return True for object-dtype and pandas string-dtype columns."""
        return pd.api.types.is_object_dtype(series) or isinstance(
            series.dtype, pd.StringDtype
        )

    def _coerce_numeric_text(self):
        """Turn text columns whose non-missing values are all numeric strings
        into real numeric columns.

        A numeric CSV column that contains a placeholder such as ``'?'`` is
        read as text; once the placeholder has been replaced by a missing
        value the column can be numeric again, so that it is median-filled
        (not mode-filled) and is never handed to the datetime parser.
        """
        for col in list(self.df.columns):
            series = self.df[col]
            if not self._is_text_like(series):
                continue
            present = series.notna().to_numpy()
            values = series.to_numpy(dtype=object)[present]
            # Only pure-string columns are candidates; leave everything else alone.
            if len(values) == 0 or not all(isinstance(v, str) for v in values):
                continue
            numbers = pd.to_numeric(pd.Series(values, dtype=object), errors="coerce")
            if numbers.isna().any():
                continue  # at least one value is not a number -> genuine text
            if present.all():
                self.df[col] = numbers.to_numpy()
            else:
                # Positional assignment keeps this safe for any index.
                filled = np.full(len(series), np.nan, dtype="float64")
                filled[present] = numbers.to_numpy(dtype="float64")
                self.df[col] = filled

    # ------------------------------------------------------------------ #
    # Loading
    # ------------------------------------------------------------------ #
    def load_data(self, file_path):
        """Load a CSV or JSON file from disk into ``self.df``.

        Args:
            file_path: Path to the file; the (case-insensitive) extension
                selects the reader.

        Returns:
            The loaded DataFrame.

        Raises:
            ValueError: If the extension is neither ``.csv`` nor ``.json``.
        """
        file_ext = Path(file_path).suffix.lower()
        if file_ext == '.csv':
            self.df = pd.read_csv(file_path)
        elif file_ext == '.json':
            self.df = pd.read_json(file_path)
        else:
            raise ValueError("Unsupported file type. Use CSV or JSON file")
        return self.df

    def load_from_upload(self, uploaded_file):
        """Load a CSV or JSON dataset from an uploaded file-like object.

        Args:
            uploaded_file: File-like object with a ``name`` attribute (for
                example a Streamlit upload). The extension check is
                case-insensitive, matching :meth:`load_data`.

        Returns:
            The loaded DataFrame.

        Raises:
            ValueError: If the name ends with neither ``.csv`` nor ``.json``.
        """
        file_name = str(uploaded_file.name).lower()
        if file_name.endswith('.csv'):
            self.df = pd.read_csv(uploaded_file)
        elif file_name.endswith('.json'):
            self.df = pd.read_json(uploaded_file)
        else:
            raise ValueError("Unsupported file type")
        return self.df

    # ------------------------------------------------------------------ #
    # Cleaning
    # ------------------------------------------------------------------ #
    def preprocess(self):
        """Step 2: Clean the data.

        In order: replace placeholder strings with missing values, restore
        numeric columns that were read as text, drop duplicate rows, fill
        missing numeric values with the column median, fill missing text
        values with the column mode (or ``"Unknown"`` when the column has no
        values at all), then try to parse text columns as datetimes.

        Returns:
            The cleaned DataFrame (also stored in ``self.df``).

        Raises:
            ValueError: If no data has been loaded.
        """
        self._require_data()
        # Escape sequence keeps the source ASCII-only (the literal emoji had
        # been mis-encoded in this file).
        print("\U0001F504 Preprocessing data...")

        # FIRST: replace '?' and other placeholders with a missing value.
        self.df = self.df.replace(list(self.PLACEHOLDER_VALUES), pd.NA)
        self._coerce_numeric_text()

        # Remove duplicate rows.
        initial_rows = len(self.df)
        self.df = self.df.drop_duplicates()
        print(f" Removed {initial_rows - len(self.df)} duplicates")

        # Handle missing values.
        missing_before = self.df.isnull().sum().sum()

        # Numeric columns: fill with the median.
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            self.df[col] = self.df[col].fillna(self.df[col].median())

        # Text columns: fill with the mode, or 'Unknown' when there is none
        # (mode() is empty exactly when the column has no non-missing value).
        for col in list(self.df.columns):
            series = self.df[col]
            if not self._is_text_like(series) or not series.isnull().any():
                continue
            modes = series.mode()
            fill_value = modes.iloc[0] if len(modes) > 0 else "Unknown"
            self.df[col] = series.fillna(fill_value)

        missing_after = self.df.isnull().sum().sum()
        print(f" Filled {missing_before - missing_after} missing values")

        # Convert data types.
        self._convert_types()
        return self.df

    def _convert_types(self):
        """Convert text columns that parse cleanly as dates to datetime.

        A column is converted only if every value parses; columns that fail
        to parse, and columns with no values, are left untouched.
        """
        self._require_data()
        for col in list(self.df.columns):
            series = self.df[col]
            if not self._is_text_like(series) or not series.notna().any():
                continue
            try:
                with warnings.catch_warnings():
                    # pandas warns when it cannot infer one date format;
                    # that is expected for ordinary text columns.
                    warnings.simplefilter("ignore", UserWarning)
                    converted = pd.to_datetime(series)
            except (ValueError, TypeError, OverflowError):
                continue  # not a date column - keep it as text
            self.df[col] = converted
            print(f" Converted {col} to datetime")

    # ------------------------------------------------------------------ #
    # Inspection
    # ------------------------------------------------------------------ #
    def detect_schema(self, categorical_threshold=0.5):
        """Step 3: Detect schema - identify column types.

        Args:
            categorical_threshold: A non-numeric, non-datetime column whose
                ``unique values / rows`` ratio is below this value is
                classified as categorical, otherwise as text. Defaults to
                0.5.

        Returns:
            The schema dict (also stored in ``self.schema``).

        Raises:
            ValueError: If no data has been loaded.
        """
        self._require_data()
        self.schema = {
            'numeric': [],
            'categorical': [],
            'datetime': [],
            'text': [],
        }
        n_rows = len(self.df)

        for col in self.df.columns:
            series = self.df[col]
            if pd.api.types.is_datetime64_any_dtype(series):
                kind = 'datetime'
            elif pd.api.types.is_numeric_dtype(series):
                kind = 'numeric'
            elif isinstance(series.dtype, pd.CategoricalDtype):
                kind = 'categorical'
            else:
                # Few distinct values relative to the row count -> categorical.
                # An empty frame has ratio 0.0 instead of dividing by zero.
                unique_ratio = series.nunique() / n_rows if n_rows else 0.0
                kind = 'categorical' if unique_ratio < categorical_threshold else 'text'
            self.schema[kind].append(col)

        print("\n\U0001F4CA Schema Detected:")
        print(f" Numeric columns: {self.schema['numeric']}")
        print(f" Categorical columns: {self.schema['categorical']}")
        print(f" Date columns: {self.schema['datetime']}")
        print(f" Text columns: {self.schema['text']}")

        return self.schema

    def get_summary(self):
        """Return a basic summary of the loaded data.

        Returns:
            Dict with ``rows``, ``columns``, ``column_names``,
            ``missing_values`` (per-column count) and ``memory_usage`` (MB).

        Raises:
            ValueError: If no data has been loaded.
        """
        self._require_data()
        return {
            'rows': len(self.df),
            'columns': len(self.df.columns),
            'column_names': list(self.df.columns),
            'missing_values': self.df.isnull().sum().to_dict(),
            'memory_usage': float(self.df.memory_usage(deep=True).sum() / 1024**2),  # MB
        }