# EDA_Explorer / agents / analysis_agent.py
# Commit 9eecab5 (ProfessionalMario): "Fresh deployment with LFS tracking"
import os

import numpy as np
from sklearn.ensemble import RandomForestClassifier

from utils.logger import logger
class AnalysisAgent:
    """Build a plain-text exploratory report for a dataset held in a registry.

    The registry is supplied by the caller; this class calls
    ``registry.list_datasets()`` and ``registry.load_dataframe(name)`` on it.
    The loaded object is used through the pandas DataFrame API
    (``columns``, ``nunique``, ``select_dtypes``, ``corr``, ...).
    """

    # Columns whose unique/row ratio exceeds this are treated as ID-like.
    ID_UNIQUE_RATIO = 0.9
    # A target column may not exceed this unique/row ratio ...
    TARGET_MAX_UNIQUE_RATIO = 0.5
    # ... nor this many distinct values.
    TARGET_MAX_CLASSES = 20
    # Number of top correlations / features shown.
    TOP_N = 5
    # Maximum number of warnings printed in the report.
    MAX_WARNINGS = 5
    # Directory that receives exported reports.
    REPORT_DIR = "output"
    # Characters stripped from the edges of a word when matching dataset names.
    _WORD_PUNCTUATION = ".,;:!?\"'()[]{}"

    def __init__(self, registry):
        self.registry = registry

    # ---------------------------
    # Proper dataset extraction
    # ---------------------------
    def _extract_dataset(self, text):
        """Return the registered dataset name mentioned in *text*, or ``None``.

        Matching is case-insensitive and word-based. A word is tried verbatim
        first and then with surrounding punctuation removed, so that
        ``"analyze people, please"`` still finds ``people``.
        """
        print("🔍 Running dataset analysis...")

        # Lower-cased name -> registry spelling; setdefault keeps the first
        # dataset when two names differ only by case.
        by_lower = {}
        for name in self.registry.list_datasets():
            by_lower.setdefault(str(name).lower(), name)

        for word in str(text).lower().split():
            if word in by_lower:
                return by_lower[word]
            bare = word.strip(self._WORD_PUNCTUATION)
            if bare and bare in by_lower:
                return by_lower[bare]
        return None

    # ---------------------------
    # Remove ID-like columns
    # ---------------------------
    def _remove_id_like_columns(self, df):
        """Drop columns that are almost entirely unique.

        Returns:
            ``(cleaned_frame, dropped_column_names)``. An empty frame is
            returned as a copy with nothing dropped (no ratio can be computed).
        """
        n_rows = len(df)
        if n_rows == 0:
            return df.copy(), []

        cols_to_drop = [
            col
            for col in df.columns
            if df[col].nunique() / n_rows > self.ID_UNIQUE_RATIO
        ]
        return df.drop(columns=cols_to_drop), cols_to_drop

    # ---------------------------
    # Select target column
    # ---------------------------
    def _select_target(self, df):
        """Pick the most plausible classification target, or ``None``.

        A candidate must have between 2 and ``TARGET_MAX_CLASSES`` distinct
        values, a unique/row ratio of at most ``TARGET_MAX_UNIQUE_RATIO``, and
        a name that does not contain "id", "name", "email" or "phone".
        The candidate with the fewest distinct values wins; ties go to the
        leftmost column.

        NOTE: the name filter is a plain substring test, so names such as
        "width" or "paid" are skipped as well.
        """
        n_rows = len(df)
        if n_rows == 0:
            return None

        banned_fragments = ("id", "name", "email", "phone")
        candidates = []
        for col in df.columns:
            label = str(col).lower()  # column labels are not always strings
            if any(fragment in label for fragment in banned_fragments):
                continue

            unique_count = df[col].nunique()
            if unique_count / n_rows > self.TARGET_MAX_UNIQUE_RATIO:
                continue

            # A constant (or all-missing) column carries no class information.
            if 2 <= unique_count <= self.TARGET_MAX_CLASSES:
                candidates.append((col, unique_count))

        if not candidates:
            return None
        # min() returns the first minimal element, matching a stable sort.
        return min(candidates, key=lambda pair: pair[1])[0]

    # ---------------------------
    # Feature importance
    # ---------------------------
    def _compute_feature_importance(self, df):
        """Rank features by random-forest importance against an inferred target.

        Returns:
            ``(ranking, dropped_cols, error)`` where ``ranking`` is a list of
            up to ``TOP_N`` ``(column, importance)`` pairs (or ``None`` on
            failure), ``dropped_cols`` lists the ID-like columns that were
            ignored, and ``error`` is a human-readable message or ``None``.
        """
        df_clean, dropped_cols = self._remove_id_like_columns(df)

        if len(df.columns) <= 2:
            return None, dropped_cols, "Dataset too small for feature importance."

        target = self._select_target(df_clean)
        # Compare against None: a column labelled 0 is a valid target.
        if target is None:
            return None, dropped_cols, "No suitable target column found."

        y = df_clean[target]
        if y.isnull().any():
            return (
                None,
                dropped_cols,
                "Target contains missing values. Cannot compute feature importance.",
            )

        # Defensive: _select_target already enforces this bound.
        if y.nunique() > 0.5 * len(df_clean):
            return None, dropped_cols, "Target not suitable for classification."

        X = df_clean.drop(columns=[target])
        if X.shape[1] == 0:
            return (
                None,
                dropped_cols,
                "No feature columns left after removing ID-like columns.",
            )

        # Encode every column as integer category codes (missing values -> -1).
        X = X.apply(lambda col: col.astype("category").cat.codes)

        try:
            # Fixed seed so repeated reports on the same data agree.
            model = RandomForestClassifier(n_estimators=50, random_state=0)
            model.fit(X, y)
            ranked = sorted(
                zip(X.columns, model.feature_importances_),
                key=lambda pair: pair[1],
                reverse=True,
            )
        except Exception as e:
            logger.error(f"Feature importance failed | {e}")
            return None, dropped_cols, str(e) or "Feature importance computation failed."

        return ranked[: self.TOP_N], dropped_cols, None

    # ---------------------------
    # Optional explanation layer
    # ---------------------------
    def _explain_feature(self, col):
        """Return a canned one-sentence explanation for feature *col*."""
        return f"{col} shows strong predictive signal based on dataset patterns."

    # ----------------------------
    # Outlier Detection
    # ----------------------------
    def _detect_outliers(self, df):
        """Count values outside the 1.5 * IQR fences for each numeric column.

        Returns:
            ``(summary, error)``: ``summary`` maps column name to outlier count
            (columns without outliers are omitted); on failure ``summary`` is
            ``None`` and ``error`` holds the exception text.
        """
        try:
            numeric_df = df.select_dtypes(include="number")
            outlier_summary = {}
            for col in numeric_df.columns:
                series = numeric_df[col]
                q1 = series.quantile(0.25)
                q3 = series.quantile(0.75)
                iqr = q3 - q1
                lower = q1 - 1.5 * iqr
                upper = q3 + 1.5 * iqr
                count = int(((series < lower) | (series > upper)).sum())
                if count > 0:
                    outlier_summary[col] = count
            return outlier_summary, None
        except Exception as e:
            logger.error(f"Outlier detection failed | {e}")
            return None, str(e)

    # ---------------------------
    # Correlation analysis
    # ---------------------------
    def _compute_correlation(self, df):
        """Return the strongest absolute pairwise correlations.

        Returns:
            ``(pairs, error)``: ``pairs`` maps a 2-tuple of column names to the
            absolute correlation, strongest first, at most ``TOP_N`` entries.
            Pairs whose correlation is undefined (NaN) are omitted, so the
            mapping may be empty.
        """
        try:
            numeric_df = df.select_dtypes(include="number")
            if numeric_df.shape[1] < 2:
                return None, "Not enough numeric columns for correlation."

            corr_matrix = numeric_df.corr().abs()
            # Keep only the strict upper triangle: drops self-correlations
            # and the mirrored duplicate of every pair.
            upper = corr_matrix.where(
                np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
            )
            top_pairs = (
                upper.unstack()
                .dropna()
                .sort_values(ascending=False)
                .head(self.TOP_N)
            )
            return top_pairs.to_dict(), None
        except Exception as e:
            logger.error(f"Correlation failed | {e}")
            return None, str(e)

    # -------------------------
    # Saving report
    # -------------------------
    def _export_report(self, dataset, content):
        """Write *content* to ``output/report_<dataset>.txt``.

        The report directory is created when missing. Returns the path on
        success, or ``None`` when the write fails (the failure is logged).
        """
        try:
            os.makedirs(self.REPORT_DIR, exist_ok=True)
            path = f"{self.REPORT_DIR}/report_{dataset}.txt"
            with open(path, "w", encoding="utf-8") as f:
                f.write(content)
            logger.info(f"Report exported: {path}")
            return path
        except Exception as e:
            logger.error(f"Report export failed | {e}")
            return None

    # ---------------------------
    # Data-quality warnings
    # ---------------------------
    def _collect_warnings(self, df):
        """Return warning strings for ID-like, mostly-missing and constant columns."""
        n_rows = len(df)
        if n_rows == 0:
            return []

        found = []
        for col in df.columns:
            unique_count = df[col].nunique()

            if unique_count / n_rows > 0.95 and "id" in str(col).lower():
                found.append(f"{col} looks like an ID column")

            missing_ratio = df[col].isnull().sum() / n_rows
            if missing_ratio > 0.5:
                found.append(f"{col} has {missing_ratio:.2%} missing values")

            if unique_count == 1:
                found.append(f"{col} is constant (no variance)")
        return found

    # ---------------------------
    # MAIN HANDLER
    # ---------------------------
    def handle(self, dataset=None):
        """Analyse *dataset* and return the report as a single string.

        *dataset* may be a registered dataset name or free text that mentions
        one (e.g. ``"analyze people"``). Errors never propagate: a short
        message string is returned instead. On success the report is also
        exported through ``_export_report``.
        """
        try:
            # ---- HANDLE "analyze people" CASE ----
            if isinstance(dataset, str):
                extracted = self._extract_dataset(dataset)
                if extracted:
                    dataset = extracted

            # ---- STRICT DATASET CHECK ----
            if not dataset:
                return "Please specify a dataset (e.g., 'analyze people')"

            df = self.registry.load_dataframe(dataset)
        except Exception as e:
            logger.error(f"Failed loading dataset | {e}")
            return f"Failed to load dataset: {dataset}"

        try:
            rows, cols = df.shape

            # ---------- DATA QUALITY ----------
            print("🧹 Checking duplicates...")
            missing_by_column = df.isnull().sum()
            total_missing = missing_by_column.sum()
            missing_by_column = missing_by_column[missing_by_column > 0]
            duplicates = df.duplicated().sum()

            # ---------- COLUMN TYPES ----------
            # str() because column labels are not guaranteed to be strings.
            numeric_cols = [
                str(c) for c in df.select_dtypes(include="number").columns
            ]
            categorical_cols = [
                str(c) for c in df.select_dtypes(exclude="number").columns
            ]

            # ---------- WARNINGS ----------
            print("⚠️ Generating warnings...")
            data_warnings = self._collect_warnings(df)

            # ---------- FEATURE IMPORTANCE ----------
            print("📈 Looking for potential feature importance...")
            fi, dropped_cols, error = self._compute_feature_importance(df)

            # ---------- CORRELATION ANALYSIS ----------
            print("📊 Computing correlation...")
            corr_pairs, corr_error = self._compute_correlation(df)

            # ---------- OUTPUT ----------
            output = [
                f"\nDataset Analysis: {dataset}",
                "=" * 40,
                f"Rows: {rows}",
                f"Columns: {cols}",
                "\nData Quality",
                "-" * 20,
                f"Total Missing Values : {total_missing}",
                f"Duplicate Rows : {duplicates}",
                "\nTop Correlations",
                "-" * 20,
            ]

            if corr_error:
                output.append(corr_error)
            elif corr_pairs:
                for (col1, col2), val in corr_pairs.items():
                    output.append(f"{col1} ↔ {col2}: {val:.3f}")
            else:
                # Covers both None and an empty mapping (all pairs undefined).
                output.append("No correlation data available.")

            if not missing_by_column.empty:
                output.append("\nMissing by Column")
                output.append("-" * 20)
                for col, val in missing_by_column.items():
                    output.append(f"{col}: {val}")

            output.append("\nColumn Types")
            output.append("-" * 20)
            output.append(
                f"Numeric : {', '.join(numeric_cols) if numeric_cols else 'None'}"
            )
            output.append(
                f"Categorical : {', '.join(categorical_cols) if categorical_cols else 'None'}"
            )

            if data_warnings:
                output.append("\n⚠️ Data Warnings")
                output.append("-" * 20)
                for w in data_warnings[: self.MAX_WARNINGS]:
                    output.append(f"- {w}")

            # ---------- FEATURE IMPORTANCE OUTPUT ----------
            output.append("\nPotential Feature Importance")
            output.append("-" * 20)
            if error:
                output.append(error)
            elif not fi:
                # No ranking and no message: still say something useful.
                output.append("No feature importance available.")
            else:
                for col, score in fi:
                    explanation = self._explain_feature(col)
                    output.append(f"{col}: {score:.4f} → {explanation}")

            # ---------- DROPPED COLUMNS ----------
            if dropped_cols:
                output.append("\n⚠️ Ignored high-cardinality columns:")
                for col in dropped_cols:
                    output.append(f"- {col}")

            # ---------- EXPORT (ONLY ONCE) ----------
            # The saved file does not contain the "Report saved" line itself.
            report_path = self._export_report(dataset, "\n".join(output))
            if report_path:
                output.append(f"\n📁 Report saved to: {report_path}")

            return "\n".join(output)
        except Exception as e:
            logger.error(f"Analysis failed | {e}")
            return "Analysis agent error."