# smart-analytics-copilot/app/data_processor.py
# (repository page residue) SamadhiDBS's picture
# Commit message: Update app/data_processor.py
# Commit a897569 (verified)
# Data ingestion, preprocessing, and schema detection.
import pandas as pd
import numpy as np
from pathlib import Path
import json
class DataProcessor:
    """Load a CSV/JSON dataset, clean it, and classify its columns.

    Intended call order: ``load_data`` or ``load_from_upload``, then
    ``preprocess``, then ``detect_schema``. ``get_summary`` can be called any
    time after data has been loaded. Progress messages are written with
    ``print``.
    """

    def __init__(self):
        # Currently loaded DataFrame; None until one of the loaders succeeds.
        self.df = None
        # Result of the last detect_schema() call: bucket name -> column names.
        self.schema = {}

    def _require_data(self) -> pd.DataFrame:
        """Return the loaded DataFrame, or raise ValueError if none is loaded."""
        if self.df is None:
            raise ValueError(
                "No data loaded. Call load_data() or load_from_upload() first."
            )
        return self.df

    @staticmethod
    def _is_textual(series) -> bool:
        """Return True when *series* has object dtype or a pandas string dtype."""
        dtype = series.dtype
        return bool(
            pd.api.types.is_object_dtype(dtype)
            or pd.api.types.is_string_dtype(dtype)
        )

    def load_data(self, file_path) -> pd.DataFrame:
        """Load a CSV or JSON file from disk into ``self.df``.

        Args:
            file_path: Path of the file to read. The extension is compared
                case-insensitively.

        Returns:
            The loaded DataFrame (also stored on ``self.df``).

        Raises:
            ValueError: If the extension is neither ``.csv`` nor ``.json``.
        """
        file_ext = Path(file_path).suffix.lower()
        if file_ext == '.csv':
            self.df = pd.read_csv(file_path)
        elif file_ext == '.json':
            self.df = pd.read_json(file_path)
        else:
            raise ValueError("Unsupported file type. Use CSV or JSON file")
        return self.df

    def load_from_upload(self, uploaded_file) -> pd.DataFrame:
        """Load an uploaded file-like object into ``self.df``.

        Args:
            uploaded_file: File-like object exposing a ``name`` attribute
                (the original comment describes it as a Streamlit upload).

        Returns:
            The loaded DataFrame (also stored on ``self.df``).

        Raises:
            ValueError: If the name ends with neither ``.csv`` nor ``.json``.
        """
        # Lower-case first so "DATA.CSV" is accepted, matching load_data().
        upload_name = uploaded_file.name.lower()
        if upload_name.endswith('.csv'):
            self.df = pd.read_csv(uploaded_file)
        elif upload_name.endswith('.json'):
            self.df = pd.read_json(uploaded_file)
        else:
            raise ValueError("Unsupported file type")
        return self.df

    def preprocess(self) -> pd.DataFrame:
        """Clean ``self.df`` in place and return it.

        Steps, in order: replace placeholder strings with missing values,
        drop duplicate rows, fill numeric gaps with the column median, fill
        text gaps with the column mode (or ``"Unknown"`` when the column is
        entirely missing), then attempt datetime conversion of text columns.

        Returns:
            The cleaned DataFrame (also stored on ``self.df``).

        Raises:
            ValueError: If no data has been loaded yet.
        """
        frame = self._require_data()
        # The emoji was stored as mis-decoded bytes; restored to the real character.
        print("🔄 Preprocessing data...")

        # Placeholders must become NA first so they count as missing below.
        placeholder_values = ['?', 'None', 'null', 'NULL', 'NaN', 'nan', '', ' ', 'Unknown', 'unknown']
        frame = frame.replace(placeholder_values, pd.NA)

        initial_rows = len(frame)
        frame = frame.drop_duplicates()
        print(f" Removed {initial_rows - len(frame)} duplicates")

        missing_before = frame.isnull().sum().sum()

        # Numeric columns: median imputation.
        numeric_cols = frame.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            frame[col] = frame[col].fillna(frame[col].median())

        # Text columns: most frequent value, or "Unknown" if nothing is present.
        text_cols = [col for col in frame.columns if self._is_textual(frame[col])]
        for col in text_cols:
            if frame[col].isnull().all():
                frame[col] = frame[col].fillna("Unknown")
                continue
            mode_val = frame[col].mode()
            if len(mode_val) > 0:
                frame[col] = frame[col].fillna(mode_val.iloc[0])

        missing_after = frame.isnull().sum().sum()
        print(f" Filled {missing_before - missing_after} missing values")

        # _convert_types() works on self.df, so publish the frame first.
        self.df = frame
        self._convert_types()
        return self.df

    def _convert_types(self):
        """Convert text columns of ``self.df`` to datetime where every value parses.

        Conversion is best-effort: a column that pandas cannot parse is left
        unchanged.
        """
        for col in self.df.columns:
            if not self._is_textual(self.df[col]):
                continue
            try:
                converted = pd.to_datetime(self.df[col])
            except (ValueError, TypeError, OverflowError):
                # Not a date column. Only parse failures are ignored, so
                # KeyboardInterrupt/SystemExit still propagate.
                continue
            self.df[col] = converted
            print(f" Converted {col} to datetime")

    def detect_schema(self, categorical_threshold: float = 0.5) -> dict:
        """Classify every column as numeric, categorical, datetime, or text.

        Args:
            categorical_threshold: A text column is categorical when its
                ratio of unique values to rows is strictly below this value;
                otherwise it is text. Defaults to 0.5.

        Returns:
            Dict with keys ``'numeric'``, ``'categorical'``, ``'datetime'``
            and ``'text'``, each mapping to a list of column names. The same
            dict is stored on ``self.schema``.

        Raises:
            ValueError: If no data has been loaded yet.
        """
        frame = self._require_data()
        schema = {
            'numeric': [],
            'categorical': [],
            'datetime': [],
            'text': [],
        }
        self.schema = schema
        row_count = len(frame)

        for col in frame.columns:
            series = frame[col]
            if pd.api.types.is_datetime64_any_dtype(series):
                schema['datetime'].append(col)
            elif pd.api.types.is_numeric_dtype(series):
                schema['numeric'].append(col)
            elif isinstance(series.dtype, pd.CategoricalDtype):
                # An explicit category dtype is categorical by definition.
                schema['categorical'].append(col)
            elif self._is_textual(series):
                # An empty frame has no rows to divide by; treat ratio as 0.
                unique_ratio = series.nunique() / row_count if row_count else 0.0
                if unique_ratio < categorical_threshold:
                    schema['categorical'].append(col)
                else:
                    schema['text'].append(col)

        # The emoji was stored as mis-decoded bytes; restored to the real character.
        print("\n📊 Schema Detected:")
        print(f" Numeric columns: {schema['numeric']}")
        print(f" Categorical columns: {schema['categorical']}")
        print(f" Date columns: {schema['datetime']}")
        return self.schema

    def get_summary(self) -> dict:
        """Return basic facts about the loaded data.

        Returns:
            Dict with ``'rows'``, ``'columns'``, ``'column_names'``,
            ``'missing_values'`` (per-column null counts) and
            ``'memory_usage'`` (deep memory usage in MB).

        Raises:
            ValueError: If no data has been loaded yet.
        """
        frame = self._require_data()
        return {
            'rows': len(frame),
            'columns': len(frame.columns),
            'column_names': list(frame.columns),
            'missing_values': frame.isnull().sum().to_dict(),
            'memory_usage': frame.memory_usage(deep=True).sum() / 1024**2,  # MB
        }