Spaces:
Sleeping
Sleeping
File size: 5,988 Bytes
##________automated analysis________##
import pandas as pd
import numpy as np
from scipy import stats
class Analyzer:
    """Run a fixed set of exploratory analyses over a pandas DataFrame.

    ``schema`` maps a column kind (``'numeric'``, ``'datetime'``,
    ``'categorical'``) to the list of column names of that kind. Every
    analysis method picks the columns it works on from that mapping.
    """

    def __init__(self, df, schema):
        """Store the data frame and the schema describing its columns.

        Args:
            df: DataFrame to analyse. The methods of this class only read it.
            schema: Mapping of column kind to a list of column names.
        """
        self.df = df
        self.schema = schema
        # Not populated by any method of this class; kept for callers.
        self.insights = []

    def _columns(self, kind):
        """Return the column names registered under ``kind`` ([] if absent)."""
        try:
            return list(self.schema[kind])
        except KeyError:
            # A schema without this kind simply has no such columns.
            return []

    def run_full_analysis(self):
        """Run every analysis method and collect the results.

        Returns:
            dict with the keys ``'descriptive_stats'``, ``'correlations'``,
            ``'trends'``, ``'group_analysis'``, ``'outliers'`` and
            ``'distributions'``, each holding the matching method's result.
        """
        print("Running automated analysis....")
        return {
            'descriptive_stats': self.descriptive_statistics(),
            'correlations': self.correlation_analysis(),
            'trends': self.trend_detection(),
            'group_analysis': self.group_by_analysis(),
            'outliers': self.detect_outliers(),
            'distributions': self.get_distributions(),
        }

    def descriptive_statistics(self):
        """Compute basic statistics for each numeric column.

        Returns:
            dict mapping column name to a dict with ``'mean'``, ``'median'``,
            ``'std'``, ``'min'``, ``'max'``, ``'q1'`` and ``'q3'``.
        """
        # Named ``summary`` so the module-level ``stats`` import is not shadowed.
        summary = {}
        for col in self._columns('numeric'):
            series = self.df[col]
            summary[col] = {
                'mean': series.mean(),
                'median': series.median(),
                'std': series.std(),
                'min': series.min(),
                'max': series.max(),
                'q1': series.quantile(0.25),
                'q3': series.quantile(0.75),
            }
        return summary

    def correlation_analysis(self, threshold=0.5):
        """Find pairs of numeric columns whose correlation is strong.

        Args:
            threshold: A pair is reported when the absolute value of its
                correlation is strictly greater than this number.

        Returns:
            list of dicts with ``'col1'``, ``'col2'``, ``'correlation'`` and
            ``'strength'`` (``'positive'`` or ``'negative'``, i.e. the sign of
            the correlation). Empty when fewer than two numeric columns exist.
        """
        numeric_cols = self._columns('numeric')
        if len(numeric_cols) < 2:
            return []

        corr_matrix = self.df[numeric_cols].corr()
        names = list(corr_matrix.columns)
        strong_corrs = []
        for i, first in enumerate(names):
            # Only the upper triangle: each unordered pair is visited once.
            for j in range(i + 1, len(names)):
                corr_value = corr_matrix.iloc[i, j]
                # A NaN correlation fails this comparison and is skipped.
                if abs(corr_value) > threshold:
                    strong_corrs.append({
                        'col1': first,
                        'col2': names[j],
                        'correlation': corr_value,
                        'strength': 'positive' if corr_value > 0 else 'negative',
                    })
        return strong_corrs

    def trend_detection(self):
        """Compare the first and last monthly mean of each numeric column.

        For every datetime column the rows are bucketed by calendar month and
        each numeric column is averaged per month. Months without a usable
        value are ignored, and a trend is reported only when at least two
        months remain.

        Returns:
            list of dicts with ``'column'``, ``'time_column'``,
            ``'percent_change'``, ``'direction'`` (``'increasing'``,
            ``'decreasing'`` or ``'stable'``), ``'first_value'`` and
            ``'last_value'``.
        """
        trends = []
        numeric_cols = self._columns('numeric')
        for date_col in self._columns('datetime'):
            # Group on monthly periods instead of pd.Grouper(freq='M'): the
            # 'M' offset alias is deprecated in newer pandas while its
            # replacement 'ME' is unknown to older ones; the period alias 'M'
            # works on both.
            months = self.df[date_col].dt.to_period('M')
            for num_col in numeric_cols:
                # dropna: a month whose values are all missing has a NaN mean
                # and must not become the first or last point of the trend.
                monthly = self.df.groupby(months)[num_col].mean().dropna()
                if len(monthly) < 2:
                    continue

                first_val = monthly.iloc[0]
                last_val = monthly.iloc[-1]
                delta = last_val - first_val
                # Divide by the magnitude of the baseline so the sign of the
                # percentage follows the real direction for negative baselines.
                # A zero baseline has no defined percentage; 0 is reported.
                if first_val != 0:
                    percent_change = (delta / abs(first_val)) * 100
                else:
                    percent_change = 0

                # The direction comes from the raw difference, so it stays
                # correct when the baseline is zero or negative.
                if delta > 0:
                    direction = 'increasing'
                elif delta < 0:
                    direction = 'decreasing'
                else:
                    direction = 'stable'

                trends.append({
                    'column': num_col,
                    'time_column': date_col,
                    'percent_change': percent_change,
                    'direction': direction,
                    'first_value': first_val,
                    'last_value': last_val,
                })
        return trends

    def group_by_analysis(self):
        """Aggregate each numeric column within each categorical column.

        Returns:
            dict ``{categorical column: {numeric column: info}}`` where
            ``info`` holds ``'grouped_data'`` (mean/sum/count per category as
            a dict), ``'top_category'`` (category with the highest mean, or
            ``None`` when no category has a mean), ``'top_value'`` (that mean,
            or 0) and ``'total_categories'``.
        """
        group_analysis = {}
        numeric_cols = self._columns('numeric')
        for cat_col in self._columns('categorical'):
            per_column = {}
            for num_col in numeric_cols:
                grouped = self.df.groupby(cat_col)[num_col].agg(['mean', 'sum', 'count'])
                # Categories whose values are all missing have a NaN mean;
                # drop them so idxmax never runs on an all-NaN series.
                means = grouped['mean'].dropna()
                if means.empty:
                    top_category, top_value = None, 0
                else:
                    top_category, top_value = means.idxmax(), means.max()
                per_column[num_col] = {
                    'grouped_data': grouped.to_dict(),
                    'top_category': top_category,
                    'top_value': top_value,
                    'total_categories': len(grouped),
                }
            group_analysis[cat_col] = per_column
        return group_analysis

    def detect_outliers(self, iqr_multiplier=1.5):
        """Count outliers in each numeric column with the IQR rule.

        A value is an outlier when it lies below ``Q1 - k * IQR`` or above
        ``Q3 + k * IQR`` with ``k = iqr_multiplier``.

        Args:
            iqr_multiplier: Width of the fences in units of the IQR.

        Returns:
            dict mapping each column that has at least one outlier to a dict
            with ``'count'``, ``'percentage'`` (of all rows in the frame),
            ``'lower_bound'`` and ``'upper_bound'``.
        """
        outliers = {}
        total_rows = len(self.df)
        for col in self._columns('numeric'):
            series = self.df[col]
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - iqr_multiplier * iqr
            upper_bound = q3 + iqr_multiplier * iqr
            outside = (series < lower_bound) | (series > upper_bound)
            outlier_count = int(outside.sum())
            # total_rows cannot be 0 here: an empty frame has no outliers.
            if outlier_count > 0:
                outliers[col] = {
                    'count': outlier_count,
                    'percentage': (outlier_count / total_rows) * 100,
                    'lower_bound': lower_bound,
                    'upper_bound': upper_bound,
                }
        return outliers

    def get_distributions(self):
        """Describe the distribution shape of each numeric column.

        Returns:
            dict mapping column name to a dict with ``'skewness'``,
            ``'kurtosis'``, ``'unique_values'`` and ``'shape'``. The shape is
            ``'right-skewed'`` for skewness above 1, ``'left-skewed'`` for
            skewness below -1 and ``'approximately normal'`` otherwise.
        """
        distributions = {}
        for col in self._columns('numeric'):
            series = self.df[col]
            skew = series.skew()
            if skew > 1:
                shape = 'right-skewed'
            elif skew < -1:
                shape = 'left-skewed'
            else:
                # Also reached when skewness is NaN (too few values).
                shape = 'approximately normal'
            distributions[col] = {
                'skewness': skew,
                'kurtosis': series.kurtosis(),
                'unique_values': series.nunique(),
                'shape': shape,
            }
        return distributions
|