Spaces:
Sleeping
Sleeping
| ##________automated analysis________## | |
| import pandas as pd | |
| import numpy as np | |
| from scipy import stats | |
class Analyzer:
    """Run a fixed set of exploratory analyses over a pandas DataFrame.

    ``schema`` is a mapping from a column kind to the column names of that
    kind. The kinds read by this class are ``'numeric'``, ``'datetime'`` and
    ``'categorical'``; a kind that is absent is treated as having no columns.
    """

    def __init__(self, df, schema):
        """Store the frame to analyse and the schema describing its columns."""
        self.df = df
        self.schema = schema
        self.insights = []

    def _columns(self, kind):
        """Return the columns listed under *kind*, or an empty list if absent."""
        return self.schema.get(kind, [])

    def run_full_analysis(self):
        """Run every analysis method and return the results keyed by name."""
        print("Running automated analysis....")
        return {
            'descriptive_stats': self.descriptive_statistics(),
            'correlations': self.correlation_analysis(),
            'trends': self.trend_detection(),
            'group_analysis': self.group_by_analysis(),
            'outliers': self.detect_outliers(),
            'distributions': self.get_distributions(),
        }

    def descriptive_statistics(self):
        """Return basic statistics for each numeric column.

        Returns:
            dict mapping column name to a dict with ``mean``, ``median``,
            ``std``, ``min``, ``max``, ``q1`` and ``q3``.
        """
        # Named ``summary`` rather than ``stats`` so the module-level
        # ``scipy.stats`` import is not shadowed inside this method.
        summary = {}
        for col in self._columns('numeric'):
            series = self.df[col]
            summary[col] = {
                'mean': series.mean(),
                'median': series.median(),
                'std': series.std(),
                'min': series.min(),
                'max': series.max(),
                'q1': series.quantile(0.25),
                'q3': series.quantile(0.75),
            }
        return summary

    def correlation_analysis(self, threshold=0.5):
        """Find strongly correlated pairs of numeric columns.

        Args:
            threshold: a pair is reported when the absolute value of its
                correlation coefficient is strictly greater than this.

        Returns:
            list of dicts with ``col1``, ``col2``, ``correlation`` and
            ``strength`` (``'positive'`` or ``'negative'``, i.e. the sign of
            the coefficient). Empty when fewer than two numeric columns exist.
        """
        numeric_cols = self._columns('numeric')
        if len(numeric_cols) < 2:
            return []

        corr_matrix = self.df[numeric_cols].corr()
        names = corr_matrix.columns
        strong_corrs = []
        # Walk the upper triangle only so each pair is reported once.
        for i, first in enumerate(names):
            for j in range(i + 1, len(names)):
                corr_value = corr_matrix.iloc[i, j]
                # A NaN coefficient (e.g. constant column) fails this test
                # and is therefore skipped.
                if abs(corr_value) > threshold:
                    strong_corrs.append({
                        'col1': first,
                        'col2': names[j],
                        'correlation': corr_value,
                        'strength': 'positive' if corr_value > 0 else 'negative',
                    })
        return strong_corrs

    def trend_detection(self):
        """Detect trends by comparing the first and last monthly means.

        For every (datetime column, numeric column) pair the numeric column is
        averaged per calendar month; months with no usable values are ignored.
        A trend is reported only when at least two months remain.

        Returns:
            list of dicts with ``column``, ``time_column``, ``percent_change``,
            ``direction``, ``first_value`` and ``last_value``. ``direction`` is
            ``'increasing'``, ``'decreasing'`` or ``'stable'`` (no change).
            ``percent_change`` is relative to the magnitude of the first value
            so its sign always agrees with ``direction``; it is ``0`` when the
            first value is zero.
        """
        trends = []
        for date_col in self._columns('datetime'):
            for num_col in self._columns('numeric'):
                # An explicit offset object avoids the 'M' / 'ME' alias
                # difference between pandas versions.
                grouper = pd.Grouper(key=date_col, freq=pd.offsets.MonthEnd())
                # Empty months in the range come back as NaN means; drop them
                # so they cannot become the first or last value.
                monthly = self.df.groupby(grouper)[num_col].mean().dropna()
                if len(monthly) < 2:
                    continue

                first_val = monthly.iloc[0]
                last_val = monthly.iloc[-1]
                delta = last_val - first_val
                # Dividing by abs() keeps the sign correct for negative bases.
                percent_change = (delta / abs(first_val)) * 100 if first_val != 0 else 0

                if delta > 0:
                    direction = 'increasing'
                elif delta < 0:
                    direction = 'decreasing'
                else:
                    direction = 'stable'

                trends.append({
                    'column': num_col,
                    'time_column': date_col,
                    'percent_change': percent_change,
                    'direction': direction,
                    'first_value': first_val,
                    'last_value': last_val,
                })
        return trends

    def group_by_analysis(self):
        """Aggregate each numeric column by each categorical column.

        Returns:
            nested dict ``{categorical: {numeric: info}}`` where ``info`` has
            ``grouped_data`` (mean/sum/count per group as a dict),
            ``top_category`` (group with the highest mean, or ``None``),
            ``top_value`` (that mean, or ``0``) and ``total_categories``.
        """
        group_analysis = {}
        for cat_col in self._columns('categorical'):
            per_metric = {}
            for num_col in self._columns('numeric'):
                grouped = self.df.groupby(cat_col)[num_col].agg(['mean', 'sum', 'count'])
                # Groups whose values are all missing have a NaN mean; they
                # cannot be the top performer and would break idxmax().
                means = grouped['mean'].dropna()
                if means.empty:
                    top_category, top_value = None, 0
                else:
                    top_category = means.idxmax()
                    top_value = means.max()
                per_metric[num_col] = {
                    'grouped_data': grouped.to_dict(),
                    'top_category': top_category,
                    'top_value': top_value,
                    'total_categories': len(grouped),
                }
            group_analysis[cat_col] = per_metric
        return group_analysis

    def detect_outliers(self, iqr_multiplier=1.5):
        """Detect outliers in numeric columns using the IQR method.

        Args:
            iqr_multiplier: how many IQRs beyond Q1/Q3 a value must lie to be
                counted as an outlier.

        Returns:
            dict mapping column name to ``count``, ``percentage`` (of all rows
            in the frame), ``lower_bound`` and ``upper_bound``. Columns with
            no outliers are omitted.
        """
        outliers = {}
        for col in self._columns('numeric'):
            series = self.df[col]
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - iqr_multiplier * iqr
            upper_bound = q3 + iqr_multiplier * iqr

            # Count on the boolean mask instead of building a filtered frame.
            outside = (series < lower_bound) | (series > upper_bound)
            outlier_count = int(outside.sum())
            if outlier_count > 0:
                outliers[col] = {
                    'count': outlier_count,
                    'percentage': (outlier_count / len(self.df)) * 100,
                    'lower_bound': lower_bound,
                    'upper_bound': upper_bound,
                }
        return outliers

    def get_distributions(self):
        """Return distribution information for each numeric column.

        Returns:
            dict mapping column name to ``skewness``, ``kurtosis``,
            ``unique_values`` and ``shape``. ``shape`` is ``'right-skewed'``
            for skewness above 1, ``'left-skewed'`` below -1,
            ``'unknown'`` when skewness cannot be computed (NaN), and
            ``'approximately normal'`` otherwise.
        """
        distributions = {}
        for col in self._columns('numeric'):
            series = self.df[col]
            skew = series.skew()

            if pd.isna(skew):
                # Too few values to estimate skewness; do not claim a shape.
                shape = 'unknown'
            elif skew > 1:
                shape = 'right-skewed'
            elif skew < -1:
                shape = 'left-skewed'
            else:
                shape = 'approximately normal'

            distributions[col] = {
                'skewness': skew,
                'kurtosis': series.kurtosis(),
                'unique_values': series.nunique(),
                'shape': shape,
            }
        return distributions