# Auto_ML/backend/core/data_profiler.py
# abhiraj12 - "Initial commit" (2c29579)
import logging

import numpy as np
import pandas as pd
_logger = logging.getLogger(__name__)

# Lower-cased column names that are accepted as an explicit target column.
_TARGET_NAMES = frozenset({
    'target', 'label', 'class', 'outcome', 'result', 'churn', 'price',
    'default', 'y', 'survived', 'fraud', 'output', 'pred', 'prediction',
    'target_class', 'target_label',
})

# Substrings that disqualify a column from the "last usable column" target
# fallback because they suggest an identifier or a time field.
_ID_HINTS = ('id', 'uuid', 'uid', 'index', 'idx', 'timestamp', 'date', 'time', 'row_id')

# Substrings that, combined with high uniqueness, mark a column as "ID/Index".
_ID_NAME_TAGS = ('id', 'uuid', 'index')

# Above this many rows, per-column statistics are computed on a random sample.
_MAX_STATS_ROWS = 100000


def _suggest_target(columns: list):
    """Return the column most likely to be the prediction target.

    Preference order: the first column whose lower-cased name is a known
    target name, then the last column whose name contains no ID/time hint,
    then simply the last column. Returns None only for an empty list.
    """
    for col in columns:
        if str(col).lower() in _TARGET_NAMES:
            return col
    for col in reversed(columns):
        name = str(col).lower()
        if not any(hint in name for hint in _ID_HINTS):
            return col
    return columns[-1] if columns else None


def _class_imbalance(target: pd.Series) -> str:
    """Return "High ⚠️" when the top class outnumbers the runner-up more than 3:1."""
    counts = target.value_counts()
    # value_counts() sorts descending, so the ratio below is always >= 1.
    # The zero check matters for categorical dtypes, which report unused
    # categories with a count of 0.
    if len(counts) >= 2 and counts.iloc[1] != 0:
        if counts.iloc[0] / counts.iloc[1] > 3:
            return "High ⚠️"
    return "Low"


def _semantic_type(col, series: pd.Series, unique_count: int,
                   unique_pct: float, is_numeric: bool) -> str:
    """Classify a column as Binary, ID/Index, Continuous, Discrete/Ordinal,
    DateTime or Nominal Category (checked in that order of precedence)."""
    if unique_count == 2:
        return "Binary"
    name = str(col).lower()
    if unique_pct > 0.8 and any(tag in name for tag in _ID_NAME_TAGS):
        return "ID/Index"
    if is_numeric:
        if pd.api.types.is_float_dtype(series) or unique_count >= 20:
            return "Continuous"
        return "Discrete/Ordinal"
    if pd.api.types.is_datetime64_any_dtype(series):
        return "DateTime"
    return "Nominal Category"


def _add_numeric_stats(stats: dict, series: pd.Series, is_sampled: bool) -> None:
    """Add mean/std/min/max/skew and IQR-based outlier counts to *stats* in place."""
    stats["mean"] = round(float(series.mean()), 4)
    stats["std"] = round(float(series.std()), 4)
    stats["min"] = round(float(series.min()), 4)
    stats["max"] = round(float(series.max()), 4)
    stats["skew"] = "N/A (Sampled)" if is_sampled else round(float(series.skew()), 1)

    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    # With a zero IQR the fences collapse onto q1/q3 and every other value
    # would count as an outlier, so the default count of 0 is kept instead.
    if iqr != 0:
        is_outlier = (series < q1 - 1.5 * iqr) | (series > q3 + 1.5 * iqr)
        stats["outliers"] = int(is_outlier.sum())
        stats["outlier_pct"] = round(stats["outliers"] / len(series) * 100, 2)


def _infer_task_type(target: pd.Series) -> str:
    """Return "regression" for float targets or integer targets with many
    distinct values; otherwise "classification"."""
    values = target.dropna()
    if values.empty or not pd.api.types.is_numeric_dtype(values):
        return "classification"
    if pd.api.types.is_float_dtype(values):
        return "regression"
    n_unique = values.nunique(dropna=True)
    looks_categorical = n_unique <= 20 and n_unique / max(len(values), 1) <= 0.2
    return "classification" if looks_categorical else "regression"


def profile_dataset(df: pd.DataFrame) -> dict:
    """Build a summary profile of *df* for an AutoML workflow.

    Args:
        df: Dataset to profile.

    Returns:
        ``{"error": "Empty dataset"}`` when *df* is None or empty. Otherwise a
        dict with the keys ``rows``, ``cols``, ``size``, ``missing_pct``,
        ``missing_values``, ``columns``, ``num_cols``, ``cat_cols``,
        ``imbalance``, ``suggested_target``, ``task_type``,
        ``suggested_model``, ``column_stats`` (per-column dict), ``is_sampled``,
        ``sample_size`` and ``health``.

    Notes:
        Missing-value counts and the task type use the full frame. Column
        statistics, feature types and the imbalance check use a fixed-seed
        random sample of 100000 rows when the frame is larger than that.
    """
    if df is None or df.empty:
        return {"error": "Empty dataset"}

    # df.empty is False here, so rows and cols are both >= 1 from now on.
    rows, cols = df.shape
    if rows < 1000:
        size = "Small"
    elif rows < 100000:
        size = "Medium"
    else:
        size = "Large"

    missing_by_col = df.isnull().sum()
    missing_cells = int(missing_by_col.sum())
    missing_pct = round(missing_cells / (rows * cols) * 100, 2)

    is_sampled = rows > _MAX_STATS_ROWS
    # Fixed seed keeps the reported statistics reproducible between runs.
    df_stats = df.sample(n=_MAX_STATS_ROWS, random_state=42) if is_sampled else df

    columns = df.columns.tolist()
    # NOTE: the target may be a falsy label such as 0 or "", so it must never
    # be tested by truthiness. Compare against None instead.
    suggested_target = _suggest_target(columns)

    num_cols = df_stats.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = df_stats.select_dtypes(include=['object', 'category']).columns.tolist()

    suggested_model = "Tree-based (Random Forest / XGBoost)"
    if rows > 10000 and len(num_cols) > len(cat_cols):
        suggested_model = "Gradient Boosting (XGBoost/LightGBM)"
    elif num_cols and not cat_cols and rows < 5000:
        suggested_model = "Linear Model / SVM"

    imbalance = "Low"
    if suggested_target is not None and suggested_target in df_stats.columns:
        imbalance = _class_imbalance(df_stats[suggested_target])

    numeric_set = set(num_cols)
    categorical_set = set(cat_cols)
    n_stats_rows = len(df_stats)

    column_stats = {}
    for col in columns:
        series = df_stats[col]
        unique_count = int(series.nunique())
        # Stored as a fraction in [0, 1], not as a percentage.
        unique_pct = unique_count / n_stats_rows
        n_missing = missing_by_col.get(col, 0)
        is_numeric = col in numeric_set

        stats = {
            "dtype": str(df[col].dtype),
            "missing": int(n_missing),
            "missing_pct": round(float(n_missing / rows * 100), 1),
            "unique": unique_count,
            "unique_pct": unique_pct,
            "outliers": 0,
        }
        stats["semantic_type"] = _semantic_type(
            col, df[col], unique_count, unique_pct, is_numeric
        )

        if is_numeric and not series.isnull().all():
            try:
                _add_numeric_stats(stats, series, is_sampled)
            except Exception:
                # Best effort: keep whatever was computed, but leave a trace
                # instead of hiding the failure completely.
                _logger.warning(
                    "Could not compute numeric statistics for column %r",
                    col, exc_info=True,
                )
        elif col in categorical_set:
            try:
                stats["top_values"] = series.value_counts().head(3).index.tolist()
            except Exception:
                # e.g. unhashable cell values; report no top values.
                stats["top_values"] = []

        column_stats[col] = stats

    task_type = "classification"
    if suggested_target is not None and suggested_target in df.columns:
        task_type = _infer_task_type(df[suggested_target])

    # The health score is optional: a missing module or a failure inside it
    # must not prevent the profile from being returned.
    try:
        from core.health_score import compute_health_score
        health_metadata = compute_health_score({
            "rows": rows,
            "cols": cols,
            "missing_pct": missing_pct,
            "imbalance": imbalance,
            "num_cols": num_cols,
            "cat_cols": cat_cols,
            "column_stats": column_stats,
        })
    except Exception as exc:
        _logger.warning("Health score unavailable: %s", exc)
        health_metadata = {"error": "health_score unavailable"}

    return {
        "rows": rows,
        "cols": cols,
        "size": size,
        "missing_pct": missing_pct,
        "missing_values": missing_cells,
        "columns": columns,
        "num_cols": num_cols,
        "cat_cols": cat_cols,
        "imbalance": imbalance,
        "suggested_target": suggested_target,
        "task_type": task_type,
        "suggested_model": suggested_model,
        "column_stats": column_stats,
        "is_sampled": is_sampled,
        "sample_size": n_stats_rows,
        "health": health_metadata,
    }