# SamadhiDBS's picture
# Upload 24 files
# d18f851 verified
##________automated analysis________##
import pandas as pd
import numpy as np
from scipy import stats
class Analyzer:
    """Run a set of automated exploratory analyses over a DataFrame.

    The analyzer is driven by ``schema``, a mapping from a column kind
    (``'numeric'``, ``'datetime'``, ``'categorical'``) to the column names
    of that kind in ``df``. A kind that is absent from the mapping (or
    mapped to ``None``) is treated as "no columns of that kind".
    """

    def __init__(self, df, schema):
        """Store the DataFrame and its column schema.

        Args:
            df: The pandas DataFrame to analyze.
            schema: Mapping of column kind to an iterable of column names.
        """
        self.df = df
        self.schema = schema
        self.insights = []

    def _columns(self, kind):
        """Return the column names registered under ``kind`` as a list."""
        names = self.schema.get(kind)
        return [] if names is None else list(names)

    def run_full_analysis(self):
        """Run every analysis method and collect the results.

        Returns:
            dict with the keys ``'descriptive_stats'``, ``'correlations'``,
            ``'trends'``, ``'group_analysis'``, ``'outliers'`` and
            ``'distributions'``, each holding the corresponding method's
            return value.
        """
        print("Running automated analysis....")
        analysis = {
            'descriptive_stats': self.descriptive_statistics(),
            'correlations': self.correlation_analysis(),
            'trends': self.trend_detection(),
            'group_analysis': self.group_by_analysis(),
            'outliers': self.detect_outliers(),
            'distributions': self.get_distributions(),
        }
        return analysis

    def descriptive_statistics(self):
        """Compute basic statistics for every numeric column.

        Returns:
            dict mapping column name to a dict with ``mean``, ``median``,
            ``std``, ``min``, ``max``, ``q1`` and ``q3``.
        """
        # Named ``summary`` rather than ``stats`` so the module-level
        # ``from scipy import stats`` import is not shadowed.
        summary = {}
        for col in self._columns('numeric'):
            series = self.df[col]
            summary[col] = {
                'mean': series.mean(),
                'median': series.median(),
                'std': series.std(),
                'min': series.min(),
                'max': series.max(),
                'q1': series.quantile(0.25),
                'q3': series.quantile(0.75),
            }
        return summary

    def correlation_analysis(self, threshold=0.5):
        """Find strong correlations between pairs of numeric columns.

        Args:
            threshold: A pair is reported when the absolute value of its
                correlation coefficient is strictly greater than this.

        Returns:
            list of dicts with ``col1``, ``col2``, ``correlation`` and
            ``strength`` (``'positive'`` or ``'negative'``). Empty when
            fewer than two numeric columns are configured.
        """
        numeric_cols = self._columns('numeric')
        if len(numeric_cols) < 2:
            return []

        corr_matrix = self.df[numeric_cols].corr()
        labels = corr_matrix.columns

        # Find strong correlations. Only the upper triangle is visited so
        # each pair is reported once and the diagonal is skipped.
        strong_corrs = []
        for i in range(len(labels)):
            for j in range(i + 1, len(labels)):
                corr_value = corr_matrix.iloc[i, j]
                # A NaN coefficient fails this comparison and is skipped.
                if abs(corr_value) > threshold:
                    strong_corrs.append({
                        'col1': labels[i],
                        'col2': labels[j],
                        'correlation': corr_value,
                        'strength': 'positive' if corr_value > 0 else 'negative',
                    })
        return strong_corrs

    def trend_detection(self):
        """Detect trends of numeric columns over each datetime column.

        Values are averaged per calendar month; the first and the last
        month that have a mean are compared.

        Returns:
            list of dicts with ``column``, ``time_column``,
            ``percent_change``, ``direction`` (``'increasing'``,
            ``'decreasing'`` or ``'stable'``), ``first_value`` and
            ``last_value``. A pair is skipped when fewer than two months
            have a mean.
        """
        trends = []
        numeric_cols = self._columns('numeric')
        for date_col in self._columns('datetime'):
            # Monthly buckets via periods: avoids the 'M' offset alias that
            # pandas deprecated in favour of 'ME'.
            months = self.df[date_col].dt.to_period('M')
            for num_col in numeric_cols:
                # Months whose mean is NaN carry no information for a
                # first-versus-last comparison, so they are dropped.
                monthly_mean = self.df[num_col].groupby(months).mean().dropna()
                if len(monthly_mean) < 2:
                    continue

                # Simple trend detection: compare first and last month.
                first_val = monthly_mean.iloc[0]
                last_val = monthly_mean.iloc[-1]
                delta = last_val - first_val

                # Dividing by the magnitude keeps the sign of the change
                # meaningful for a negative baseline; a zero baseline has
                # no defined percentage, so 0 is reported.
                if first_val != 0:
                    percent_change = (delta / abs(first_val)) * 100
                else:
                    percent_change = 0

                # Direction comes from the raw difference so that a zero
                # baseline still reports the movement correctly.
                if delta > 0:
                    direction = 'increasing'
                elif delta < 0:
                    direction = 'decreasing'
                else:
                    direction = 'stable'

                trends.append({
                    'column': num_col,
                    'time_column': date_col,
                    'percent_change': percent_change,
                    'direction': direction,
                    'first_value': first_val,
                    'last_value': last_val,
                })
        return trends

    def group_by_analysis(self):
        """Aggregate every numeric column by every categorical column.

        Returns:
            Nested dict ``{categorical: {numeric: info}}`` where ``info``
            holds ``grouped_data`` (mean/sum/count per category),
            ``top_category`` (category with the highest mean, or ``None``
            when no category has a mean), ``top_value`` (that mean, or 0)
            and ``total_categories``.
        """
        group_analysis = {}
        numeric_cols = self._columns('numeric')
        for cat_col in self._columns('categorical'):
            per_numeric = {}
            for num_col in numeric_cols:
                grouped = self.df.groupby(cat_col)[num_col].agg(
                    ['mean', 'sum', 'count']
                )

                # Find the top performer. NaN means are dropped first so
                # idxmax is never asked to rank an all-NaN series.
                means = grouped['mean'].dropna()
                if means.empty:
                    top_category = None
                    top_value = 0
                else:
                    top_category = means.idxmax()
                    top_value = means.max()

                per_numeric[num_col] = {
                    'grouped_data': grouped.to_dict(),
                    'top_category': top_category,
                    'top_value': top_value,
                    'total_categories': len(grouped),
                }
            group_analysis[cat_col] = per_numeric
        return group_analysis

    def detect_outliers(self, iqr_multiplier=1.5):
        """Detect outliers in numeric columns using the IQR method.

        Args:
            iqr_multiplier: How many interquartile ranges beyond Q1/Q3 a
                value must lie to count as an outlier.

        Returns:
            dict mapping column name to ``count``, ``percentage`` (of all
            rows in the DataFrame), ``lower_bound`` and ``upper_bound``.
            Columns without outliers are omitted.
        """
        outliers = {}
        total_rows = len(self.df)
        for col in self._columns('numeric'):
            series = self.df[col]
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - iqr_multiplier * iqr
            upper_bound = q3 + iqr_multiplier * iqr

            is_outlier = (series < lower_bound) | (series > upper_bound)
            outlier_count = int(is_outlier.sum())
            if outlier_count > 0:
                outliers[col] = {
                    'count': outlier_count,
                    'percentage': (outlier_count / total_rows) * 100,
                    'lower_bound': lower_bound,
                    'upper_bound': upper_bound,
                }
        return outliers

    def get_distributions(self):
        """Describe the distribution of every numeric column.

        Returns:
            dict mapping column name to ``skewness``, ``kurtosis``,
            ``unique_values`` and ``shape``. ``shape`` is
            ``'right-skewed'`` for skewness above 1, ``'left-skewed'``
            for skewness below -1 and ``'approximately normal'``
            otherwise.
        """
        distributions = {}
        for col in self._columns('numeric'):
            series = self.df[col]
            skew = series.skew()

            # Determine the distribution shape from the skewness alone.
            if skew > 1:
                shape = 'right-skewed'
            elif skew < -1:
                shape = 'left-skewed'
            else:
                shape = 'approximately normal'

            distributions[col] = {
                'skewness': skew,
                'kurtosis': series.kurtosis(),
                'unique_values': series.nunique(),
                'shape': shape,
            }
        return distributions