##________automated analysis________##
import pandas as pd
import numpy as np
from scipy import stats


class Analyzer:
    """Run a fixed battery of exploratory analyses over a DataFrame.

    ``schema`` is read as a mapping with the keys ``'numeric'``,
    ``'datetime'`` and ``'categorical'``; each value is iterated as a
    collection of column names present in ``df``.
    """

    # Absolute Pearson correlation above which a pair is reported as strong.
    STRONG_CORRELATION_THRESHOLD = 0.5
    # Tukey fence multiplier used by the IQR outlier rule.
    IQR_MULTIPLIER = 1.5
    # Absolute skewness above which a column is labelled as skewed.
    SKEW_THRESHOLD = 1

    def __init__(self, df, schema):
        self.df = df
        self.schema = schema
        self.insights = []

    def run_full_analysis(self):
        """Run every analysis method and return the results keyed by name."""
        print("Running automated analysis....")
        return {
            'descriptive_stats': self.descriptive_statistics(),
            'correlations': self.correlation_analysis(),
            'trends': self.trend_detection(),
            'group_analysis': self.group_by_analysis(),
            'outliers': self.detect_outliers(),
            'distributions': self.get_distributions(),
        }

    def descriptive_statistics(self):
        """Return basic statistics for each numeric column.

        Returns:
            dict mapping column name to a dict with the keys ``mean``,
            ``median``, ``std``, ``min``, ``max``, ``q1`` and ``q3``.
        """
        # Not named ``stats``: that would shadow the imported scipy module.
        summary = {}
        for col in self.schema['numeric']:
            series = self.df[col]
            summary[col] = {
                'mean': series.mean(),
                'median': series.median(),
                'std': series.std(),
                'min': series.min(),
                'max': series.max(),
                'q1': series.quantile(0.25),
                'q3': series.quantile(0.75),
            }
        return summary

    def correlation_analysis(self):
        """Find strongly correlated pairs of numeric columns.

        Returns:
            list of dicts (``col1``, ``col2``, ``correlation``, ``strength``),
            one per unordered column pair whose absolute correlation exceeds
            ``STRONG_CORRELATION_THRESHOLD``. Empty when fewer than two
            numeric columns are configured.
        """
        numeric_cols = self.schema['numeric']
        if len(numeric_cols) < 2:
            return []

        corr_matrix = self.df[numeric_cols].corr()
        columns = list(corr_matrix.columns)

        strong_corrs = []
        # Walk the upper triangle only so each pair is reported once.
        for i, left in enumerate(columns):
            for j in range(i + 1, len(columns)):
                corr_value = corr_matrix.iloc[i, j]
                # NaN correlations (e.g. constant columns) fail this test
                # and are therefore skipped.
                if abs(corr_value) > self.STRONG_CORRELATION_THRESHOLD:
                    strong_corrs.append({
                        'col1': left,
                        'col2': columns[j],
                        'correlation': corr_value,
                        'strength': 'positive' if corr_value > 0 else 'negative',
                    })
        return strong_corrs

    def trend_detection(self):
        """Detect month-over-period trends for each datetime/numeric pair.

        Values are averaged per calendar month; the first and last months
        that actually have data are compared.

        Returns:
            list of dicts with ``column``, ``time_column``,
            ``percent_change``, ``direction`` (``'increasing'``,
            ``'decreasing'`` or ``'stable'``), ``first_value`` and
            ``last_value``. Pairs with fewer than two months of data are
            omitted.
        """
        trends = []
        for date_col in self.schema['datetime']:
            # A DateOffset object instead of the 'M' / 'ME' string alias keeps
            # this working across pandas versions that renamed the alias.
            by_month = self.df.groupby(
                pd.Grouper(key=date_col, freq=pd.offsets.MonthEnd())
            )
            for num_col in self.schema['numeric']:
                # Months without any valid value average to NaN; drop them so
                # the comparison uses real observations.
                monthly_mean = by_month[num_col].mean().dropna()
                if len(monthly_mean) < 2:
                    continue

                first_val = monthly_mean.iloc[0]
                last_val = monthly_mean.iloc[-1]
                delta = last_val - first_val

                # Dividing by |first| keeps the sign of the percentage aligned
                # with the real direction when the baseline is negative. A
                # zero baseline has no defined percentage and is reported as 0.
                if first_val != 0:
                    percent_change = (delta / abs(first_val)) * 100
                else:
                    percent_change = 0

                # Direction comes from the raw difference, not the percentage,
                # so zero and negative baselines are classified correctly.
                if delta > 0:
                    direction = 'increasing'
                elif delta < 0:
                    direction = 'decreasing'
                else:
                    direction = 'stable'

                trends.append({
                    'column': num_col,
                    'time_column': date_col,
                    'percent_change': percent_change,
                    'direction': direction,
                    'first_value': first_val,
                    'last_value': last_val,
                })
        return trends

    def group_by_analysis(self):
        """Aggregate each numeric column by each categorical column.

        Returns:
            nested dict ``{categorical: {numeric: info}}`` where ``info``
            holds ``grouped_data`` (mean/sum/count per category),
            ``top_category`` (category with the highest mean, or ``None``),
            ``top_value`` (that mean, or ``0``) and ``total_categories``.
        """
        group_analysis = {}
        for cat_col in self.schema['categorical']:
            per_column = {}
            for num_col in self.schema['numeric']:
                grouped = self.df.groupby(cat_col)[num_col].agg(
                    ['mean', 'sum', 'count']
                )
                means = grouped['mean']

                # idxmax() is not usable when there is no non-NaN mean (no
                # groups, or every group all-NaN); report no top performer.
                if means.notna().any():
                    top_category = means.idxmax()
                    top_value = means.max()
                else:
                    top_category = None
                    top_value = 0

                per_column[num_col] = {
                    'grouped_data': grouped.to_dict(),
                    'top_category': top_category,
                    'top_value': top_value,
                    'total_categories': len(grouped),
                }
            group_analysis[cat_col] = per_column
        return group_analysis

    def detect_outliers(self):
        """Detect outliers in numeric columns using the IQR rule.

        A value is an outlier when it lies more than ``IQR_MULTIPLIER``
        interquartile ranges below Q1 or above Q3.

        Returns:
            dict mapping column name to ``count``, ``percentage`` (of all
            rows in the frame), ``lower_bound`` and ``upper_bound``. Columns
            without outliers are omitted.
        """
        outliers = {}
        total_rows = len(self.df)
        for col in self.schema['numeric']:
            series = self.df[col]
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            spread = self.IQR_MULTIPLIER * (q3 - q1)
            lower_bound = q1 - spread
            upper_bound = q3 + spread

            # Count via the boolean mask; no need to build a filtered frame.
            is_outlier = (series < lower_bound) | (series > upper_bound)
            outlier_count = int(is_outlier.sum())
            if outlier_count > 0:
                outliers[col] = {
                    'count': outlier_count,
                    'percentage': (outlier_count / total_rows) * 100,
                    'lower_bound': lower_bound,
                    'upper_bound': upper_bound,
                }
        return outliers

    def get_distributions(self):
        """Describe the distribution of each numeric column.

        Returns:
            dict mapping column name to ``skewness``, ``kurtosis``,
            ``unique_values`` and ``shape``. ``shape`` is ``'right-skewed'``
            or ``'left-skewed'`` when the skewness is beyond
            ``SKEW_THRESHOLD``, ``'unknown'`` when the skewness is NaN, and
            ``'approximately normal'`` otherwise.
        """
        distributions = {}
        for col in self.schema['numeric']:
            series = self.df[col]
            skew = series.skew()

            if pd.isna(skew):
                # Too few valid values to estimate skewness; do not claim a
                # shape the data cannot support.
                shape = 'unknown'
            elif skew > self.SKEW_THRESHOLD:
                shape = 'right-skewed'
            elif skew < -self.SKEW_THRESHOLD:
                shape = 'left-skewed'
            else:
                shape = 'approximately normal'

            distributions[col] = {
                'skewness': skew,
                'kurtosis': series.kurtosis(),
                'unique_values': series.nunique(),
                'shape': shape,
            }
        return distributions