| import pandas as pd |
| import numpy as np |
| from prophet import Prophet |
| from datetime import datetime |
| import redis |
| import json |
| from sklearn.cluster import KMeans, DBSCAN |
| from sklearn.preprocessing import StandardScaler, MinMaxScaler |
| from sklearn.decomposition import PCA |
| from sklearn.ensemble import IsolationForest |
| from .json_utils import CustomJSONEncoder |
| from scipy import stats |
| from scipy.stats import pearsonr |
| from statsmodels.tsa.seasonal import seasonal_decompose |
| from statsmodels.tsa.stattools import adfuller |
| import networkx as nx |
| from sklearn.metrics import silhouette_score |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from .supermarket_metrics import supermarket_insights |
| from app.utils.detect_industry import is_supermarket |
|
|
| class AnalyticsService: |
| def __init__(self): |
| self.redis_client = redis.Redis(host='localhost', port=6379, db=0) |
| self.industry_metrics = { |
| 'retail': self._retail_metrics, |
| 'wholesale': self._wholesale_metrics, |
| 'supermarket': self._supermarket_metrics, |
| 'manufacturing': self._manufacturing_metrics, |
| 'healthcare': self._healthcare_metrics |
| } |
| self.cross_industry_analyzers = { |
| 'market_dynamics': self._analyze_market_dynamics, |
| 'supply_chain': self._analyze_supply_chain, |
| 'customer_insights': self._analyze_customer_insights, |
| 'operational_efficiency': self._analyze_operational_efficiency, |
| 'risk_assessment': self._analyze_risk_patterns, |
| 'sustainability': self._analyze_sustainability_metrics |
| } |
| |
| def perform_eda(self, data, industry=None): |
| """ |
| Perform enhanced Exploratory Data Analysis with cross-industry insights |
| """ |
| if not data: |
| raise ValueError("Empty dataset provided") |
| |
| df = pd.DataFrame(data) |
|
|
| if df.empty: |
| raise ValueError("Empty dataset provided") |
| |
| |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| if len(numeric_cols) == 0: |
| raise ValueError("Non-numeric values found in dataset") |
| |
| |
| date_columns = [] |
| for col in df.columns: |
| if df[col].dtype == 'object': |
| try: |
| df[col] = pd.to_datetime(df[col]) |
| date_columns.append(col) |
| except (ValueError, TypeError): |
| continue |
| |
| |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| |
| |
| analysis_results = { |
| 'basic_stats': df[numeric_cols].describe().to_dict() if len(numeric_cols) > 0 else {}, |
| 'missing_values': df.isnull().sum().to_dict(), |
| 'columns': list(df.columns), |
| 'row_count': len(df), |
| 'correlation_matrix': df[numeric_cols].corr().to_dict() if len(numeric_cols) > 0 else {}, |
| 'skewness': df[numeric_cols].skew().to_dict() if len(numeric_cols) > 0 else {}, |
| 'kurtosis': df[numeric_cols].kurtosis().to_dict() if len(numeric_cols) > 0 else {}, |
| 'outliers': self._detect_outliers(df), |
| 'distribution_tests': self._perform_distribution_tests(df), |
| 'dimensionality_reduction': self._perform_dimensionality_reduction(df), |
| 'temporal_patterns': self._analyze_temporal_patterns(df), |
| 'anomaly_detection': self._detect_anomalies(df), |
| 'feature_importance': self._calculate_feature_importance(df) |
| } |
| |
| if is_supermarket(df): |
| industry = 'supermarket' |
| results['supermarket_kpis'] = supermarket_insights(df) |
| |
| if industry and industry.lower() in self.industry_metrics: |
| analysis_results['industry_metrics'] = self.industry_metrics[industry.lower()](df) |
| |
| |
| analysis_results['cross_industry_insights'] = {} |
| for analyzer_name, analyzer_func in self.cross_industry_analyzers.items(): |
| analysis_results['cross_industry_insights'][analyzer_name] = analyzer_func(df) |
| |
| return analysis_results |
|
|
| def _detect_outliers(self, df): |
| """ |
| Detect outliers using IQR method for numerical columns |
| """ |
| outliers = {} |
| for column in df.select_dtypes(include=[np.number]).columns: |
| Q1 = df[column].quantile(0.25) |
| Q3 = df[column].quantile(0.75) |
| IQR = Q3 - Q1 |
| outliers[column] = { |
| 'count': len(df[(df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))]), |
| 'percentage': len(df[(df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))]) / len(df) * 100 |
| } |
| return outliers |
| |
| def _perform_distribution_tests(self, df): |
| """ |
| Perform distribution tests for numerical columns |
| """ |
| tests = {} |
| for column in df.select_dtypes(include=[np.number]).columns: |
| shapiro_test = stats.shapiro(df[column].dropna()) |
| tests[column] = { |
| 'shapiro_test': { |
| 'statistic': float(shapiro_test.statistic), |
| 'p_value': float(shapiro_test.pvalue) |
| } |
| } |
| return tests |
|
|
| def _perform_dimensionality_reduction(self, df): |
| """ |
| Perform PCA for dimensional insights |
| """ |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| if len(numeric_cols) < 2: |
| return {} |
| |
| scaler = StandardScaler() |
| scaled_data = scaler.fit_transform(df[numeric_cols]) |
| pca = PCA() |
| pca_result = pca.fit_transform(scaled_data) |
| |
| return { |
| 'explained_variance_ratio': pca.explained_variance_ratio_.tolist(), |
| 'cumulative_variance_ratio': np.cumsum(pca.explained_variance_ratio_).tolist(), |
| 'n_components_95_variance': np.argmax(np.cumsum(pca.explained_variance_ratio_) >= 0.95) + 1 |
| } |
|
|
| def _analyze_temporal_patterns(self, df): |
| """ |
| Analyze temporal patterns and seasonality |
| """ |
| date_cols = df.select_dtypes(include=['datetime64']).columns |
| if len(date_cols) == 0: |
| return None |
| |
| patterns = {} |
| for date_col in date_cols: |
| df['year'] = df[date_col].dt.year |
| df['month'] = df[date_col].dt.month |
| df['day_of_week'] = df[date_col].dt.dayofweek |
| |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| for metric in numeric_cols: |
| if metric not in ['year', 'month', 'day_of_week']: |
| patterns[f"{metric}_by_month"] = df.groupby('month')[metric].mean().to_dict() |
| patterns[f"{metric}_by_day_of_week"] = df.groupby('day_of_week')[metric].mean().to_dict() |
| |
| return patterns |
|
|
| def _detect_anomalies(self, df): |
| """ |
| Detect anomalies using multiple methods |
| """ |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| if len(numeric_cols) == 0: |
| return None |
| |
| scaler = StandardScaler() |
| scaled_data = scaler.fit_transform(df[numeric_cols]) |
| |
| isolation_forest = IsolationForest(random_state=42, contamination=0.1) |
| anomalies = isolation_forest.fit_predict(scaled_data) |
| |
| return { |
| 'anomaly_percentage': float((anomalies == -1).mean() * 100), |
| 'anomaly_indices': np.where(anomalies == -1)[0].tolist() |
| } |
|
|
| def _calculate_feature_importance(self, df): |
| """ |
| Calculate feature importance and relationships |
| """ |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| if len(numeric_cols) < 2: |
| return None |
| |
| importance = {} |
| for col in numeric_cols: |
| correlations = [] |
| for other_col in numeric_cols: |
| if col != other_col: |
| |
| if df[col].nunique() <= 1 or df[other_col].nunique() <= 1: |
| continue |
| try: |
| corr, _ = pearsonr(df[col].fillna(0), df[other_col].fillna(0)) |
| if not np.isnan(corr): |
| correlations.append((other_col, abs(corr))) |
| except ValueError: |
| continue |
| |
| |
| correlation_values = [abs(c[1]) for c in correlations] |
| importance[col] = { |
| 'top_correlations': sorted(correlations, key=lambda x: abs(x[1]), reverse=True)[:3], |
| 'correlation_strength': float(np.mean(correlation_values)) if correlation_values else 0.0 |
| } |
| |
| return importance |
|
|
| def _retail_metrics(self, df): |
|
|
| """Calculate retail-specific metrics""" |
| if not all(col in df.columns for col in ['sales', 'inventory', 'customer_satisfaction']): |
| |
| return { |
| 'sales_performance': {}, |
| 'customer_behavior': {}, |
| 'inventory': {} |
| } |
|
|
| metrics = { |
| 'sales_performance': { |
| 'total_sales': float(df['sales'].sum()) if 'sales' in df.columns else 0.0, |
| 'average_daily_sales': float(df['sales'].mean()) if 'sales' in df.columns else 0.0, |
| 'sales_growth': float((df['sales'].iloc[-1] / df['sales'].iloc[0] - 1) * 100) if 'sales' in df.columns else 0.0 |
| }, |
| 'inventory_turnover': { |
| 'rate': float(df['sales'].sum() / df['inventory'].mean()) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0, |
| 'days_of_inventory': float(df['inventory'].mean() / (df['sales'].mean() / 30)) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0 |
| }, |
| 'customer_metrics': { |
| 'satisfaction_score': float(df['customer_satisfaction'].mean()) if 'customer_satisfaction' in df.columns else 0.0, |
| 'satisfaction_trend': df['customer_satisfaction'].rolling(window=7).mean().to_dict() if 'customer_satisfaction' in df.columns else {} |
| } |
| } |
| return metrics |
|
|
| def _wholesale_metrics(self, df): |
| """ |
| Calculate wholesale-specific metrics |
| """ |
| metrics = { |
| 'order_analytics': {}, |
| 'supplier_performance': {}, |
| 'distribution': {} |
| } |
| |
| if 'order_value' in df.columns: |
| metrics['order_analytics']['average_order_value'] = float(df['order_value'].mean()) |
| metrics['order_analytics']['order_value_distribution'] = df['order_value'].quantile([0.25, 0.5, 0.75]).to_dict() |
| |
| if 'supplier_id' in df.columns and 'delivery_time' in df.columns: |
| supplier_performance = df.groupby('supplier_id')['delivery_time'].agg(['mean', 'std']).to_dict() |
| metrics['supplier_performance'] = supplier_performance |
| |
| return metrics |
| |
| def _supermarket_metrics(self, df): |
| """ |
| Calculate supermarket-specific metrics |
| """ |
| metrics = { |
| 'category_performance': {}, |
| 'basket_analysis': {}, |
| 'promotion_impact': {} |
| } |
| |
| if 'category' in df.columns and 'sales_amount' in df.columns: |
| category_sales = df.groupby('category')['sales_amount'].sum() |
| metrics['category_performance']['top_categories'] = category_sales.nlargest(5).to_dict() |
| |
| if 'transaction_id' in df.columns and 'product_id' in df.columns: |
| |
| transactions = df.groupby('transaction_id')['product_id'].count() |
| metrics['basket_analysis']['average_items_per_transaction'] = float(transactions.mean()) |
| |
| if 'promotion_flag' in df.columns and 'sales_amount' in df.columns: |
| promo_impact = df.groupby('promotion_flag')['sales_amount'].mean() |
| metrics['promotion_impact']['sales_lift'] = float( |
| (promo_impact.get(1, 0) - promo_impact.get(0, 0)) / promo_impact.get(0, 1) * 100 |
| ) |
| |
| return metrics |
| |
| def _manufacturing_metrics(self, df): |
|
|
|
|
| """Calculate manufacturing-specific metrics""" |
| production_col = 'production_volume' if 'production_volume' in df.columns else 'units_produced' |
| metrics = { |
| 'production_efficiency': { |
| 'volume': float(df[production_col].mean()), |
| 'trend': df[production_col].rolling(window=7).mean().to_dict() |
| }, |
| 'quality_metrics': { |
| 'defect_rate': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0, |
| 'quality_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {} |
| }, |
| 'quality_control': { |
| 'defects_per_unit': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0, |
| 'defect_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {} |
| }, |
| 'equipment_utilization': { |
| 'rate': float((df[production_col] / df[production_col].max()).mean() * 100), |
| 'trend': df[production_col].rolling(window=7).mean().to_dict() |
| } |
| } |
| return metrics |
| |
| def _healthcare_metrics(self, df): |
|
|
| """Calculate healthcare-specific metrics""" |
| metrics = { |
| 'patient_outcomes': { |
| 'satisfaction': float(df['patient_satisfaction'].mean()), |
| 'treatment_success': float(df['treatment_success_rate'].mean()) |
| }, |
| 'operational_efficiency': { |
| 'avg_wait_time': float(df['order_fulfillment_time'].mean()), |
| 'utilization_rate': float(df['production_volume'].mean() / df['production_volume'].max()) |
| }, |
| 'quality_of_care': { |
| 'satisfaction_trend': df['patient_satisfaction'].rolling(window=7).mean().to_dict(), |
| 'success_rate_trend': df['treatment_success_rate'].rolling(window=7).mean().to_dict() |
| } |
| } |
| return metrics |
| |
| def forecast_timeseries(self, data, date_column, value_column): |
| """ |
| Forecast time series data with support for edge cases |
| """ |
| if not data: |
| raise ValueError("Empty dataset provided") |
| |
| df = pd.DataFrame(data) |
| if date_column not in df.columns: |
| raise KeyError(f"Required column '{date_column}' not found") |
| if value_column not in df.columns: |
| raise KeyError(f"Required column '{value_column}' not found") |
| |
| |
| try: |
| df[date_column] = pd.to_datetime(df[date_column]) |
| except ValueError as exc: |
| raise ValueError("Invalid date format") from exc |
| |
| |
| has_missing = df[value_column].isnull().any() |
| if has_missing: |
| df[value_column] = df[value_column].interpolate(method='linear') |
| |
| |
| Q1 = df[value_column].quantile(0.25) |
| Q3 = df[value_column].quantile(0.75) |
| IQR = Q3 - Q1 |
| outlier_mask = (df[value_column] < (Q1 - 1.5 * IQR)) | (df[value_column] > (Q3 + 1.5 * IQR)) |
| has_outliers = outlier_mask.any() |
| |
| |
| prophet_df = df.rename(columns={date_column: 'ds', value_column: 'y'}) |
| model = Prophet(yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=True) |
| model.fit(prophet_df) |
| |
| |
| future = model.make_future_dataframe(periods=30) |
| forecast = model.predict(future) |
| |
| result = { |
| 'forecast': forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].to_dict('records'), |
| 'components': { |
| 'trend': forecast['trend'].to_dict(), |
| 'yearly': forecast['yearly'].to_dict() if 'yearly' in forecast else {}, |
| 'weekly': forecast['weekly'].to_dict() if 'weekly' in forecast else {}, |
| 'daily': forecast['daily'].to_dict() if 'daily' in forecast else {} |
| } |
| } |
| |
| if has_missing: |
| result['handling_missing_values'] = {'filled_indices': df[value_column].isnull().sum()} |
| |
| if has_outliers: |
| result['outlier_impact'] = { |
| 'outlier_indices': outlier_mask[outlier_mask].index.tolist(), |
| 'outlier_values': df.loc[outlier_mask, value_column].tolist() |
| } |
| |
| |
| decomposition = seasonal_decompose(df[value_column], period=7, extrapolate_trend='freq') |
| result['seasonality_components'] = { |
| 'trend': decomposition.trend.to_dict(), |
| 'seasonal': decomposition.seasonal.to_dict(), |
| 'residual': decomposition.resid.to_dict() |
| } |
| |
|
|
|
|
|
|
| |
| timestamp = datetime.now().strftime('%Y%m%d%H') |
| cache_key = f"forecast_{date_column}_{value_column}_{timestamp}" |
| self.redis_client.set(cache_key, json.dumps(result, cls=CustomJSONEncoder)) |
| |
| return result |
| |
| def get_cached_forecast(self, date_column, value_column): |
| """ |
| Retrieve cached forecast results |
| """ |
| timestamp = datetime.now().strftime('%Y%m%d%H') |
| cache_key = f"forecast_{date_column}_{value_column}_{timestamp}" |
| cached = self.redis_client.get(cache_key) |
| |
| if cached: |
| return json.loads(cached) |
| return None |
|
|
| def _analyze_market_dynamics(self, df): |
| """ |
| Analyze market dynamics across industries |
| """ |
| metrics = { |
| 'market_trends': {}, |
| 'competitive_analysis': {}, |
| 'growth_patterns': {} |
| } |
| |
| if 'revenue' in df.columns and 'date' in df.columns: |
| |
| df['month'] = pd.to_datetime(df['date']).dt.to_period('M') |
| monthly_revenue = df.groupby('month')['revenue'].sum() |
| |
| |
| metrics['growth_patterns']['monthly_growth'] = float( |
| ((monthly_revenue.iloc[-1] / monthly_revenue.iloc[0]) ** (1/len(monthly_revenue)) - 1) * 100 |
| ) |
| |
| |
| mean_revenue = monthly_revenue.mean() |
| if mean_revenue > 0: |
| metrics['market_trends']['volatility'] = float(monthly_revenue.std() / mean_revenue) |
| else: |
| metrics['market_trends']['volatility'] = 0.0 |
| |
| if 'competitor_price' in df.columns and 'price' in df.columns: |
|
|
| comp_price_mean = df['competitor_price'].mean() |
| if comp_price_mean > 0: |
| metrics['competitive_analysis']['price_position'] = float( |
| (df['price'].mean() / comp_price_mean - 1) * 100 |
| ) |
| else: |
| metrics['competitive_analysis']['price_position'] = 0.0 |
| |
| return metrics |
|
|
| def _analyze_supply_chain(self, df): |
| """ |
| Analyze supply chain metrics across industries |
| """ |
| metrics = { |
| 'efficiency': {}, |
| 'reliability': {}, |
| 'cost_analysis': {} |
| } |
| |
| |
| if 'supplier_id' in df.columns and 'delivery_time' in df.columns: |
| supplier_performance = df.groupby('supplier_id').agg({ |
| 'delivery_time': ['mean', 'std'], |
| 'order_value': ['sum', 'mean'] |
| }).round(2) |
| |
| metrics['reliability']['supplier_consistency'] = float( |
| 1 - (supplier_performance['delivery_time']['std'] / supplier_performance['delivery_time']['mean']).mean() |
| ) |
| |
| |
| if 'transportation_cost' in df.columns and 'order_value' in df.columns: |
| metrics['cost_analysis']['logistics_cost_ratio'] = float( |
| (df['transportation_cost'].sum() / df['order_value'].sum()) * 100 |
| ) |
| |
| return metrics |
|
|
| def _analyze_customer_insights(self, df): |
| """ |
| Cross-industry customer behavior analysis |
| """ |
| insights = { |
| 'customer_segments': {}, |
| 'behavior_patterns': {}, |
| 'lifetime_value': {} |
| } |
| |
| if 'customer_id' in df.columns and 'transaction_amount' in df.columns: |
| |
| customer_features = df.groupby('customer_id').agg({ |
| 'transaction_amount': ['sum', 'mean', 'count'] |
| }).values |
| |
| scaler = MinMaxScaler() |
| scaled_features = scaler.fit_transform(customer_features) |
| |
| |
| dbscan = DBSCAN(eps=0.3, min_samples=5) |
| clusters = dbscan.fit_predict(scaled_features) |
| |
| insights['customer_segments']['natural_segments'] = { |
| 'n_segments': len(np.unique(clusters[clusters >= 0])), |
| 'segment_sizes': pd.Series(clusters).value_counts().to_dict() |
| } |
| |
| return insights |
|
|
| def _analyze_operational_efficiency(self, df): |
| """ |
| Cross-industry operational efficiency analysis |
| """ |
| metrics = { |
| 'process_efficiency': {}, |
| 'resource_utilization': {}, |
| 'bottleneck_analysis': {} |
| } |
| |
| if 'process_time' in df.columns and 'output_quantity' in df.columns: |
| |
| metrics['process_efficiency']['throughput_rate'] = float( |
| df['output_quantity'].sum() / df['process_time'].sum() |
| ) |
| |
| |
| process_stability = 1 - (df['process_time'].std() / df['process_time'].mean()) |
| metrics['process_efficiency']['stability_score'] = float(process_stability) |
| |
| return metrics |
|
|
| def _analyze_risk_patterns(self, df): |
| """ |
| Cross-industry risk pattern analysis |
| """ |
| risk_metrics = { |
| 'operational_risk': {}, |
| 'market_risk': {}, |
| 'compliance_risk': {} |
| } |
| |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| if len(numeric_cols) > 0: |
| |
| iso_forest = IsolationForest(contamination=0.1, random_state=42) |
| risk_scores = iso_forest.fit_predict(df[numeric_cols]) |
| |
| risk_metrics['operational_risk']['anomaly_percentage'] = float( |
| (risk_scores == -1).mean() * 100 |
| ) |
| |
| return risk_metrics |
|
|
| def _analyze_sustainability_metrics(self, df): |
| """ |
| |
| Analyze sustainability metrics including environmental impact, resource utilization, and waste management |
| """ |
| if not all(col in df.columns for col in ['energy_consumption', 'water_consumption', 'waste_generated']): |
| return {} |
| |
| results = { |
| 'environmental_impact': { |
| 'carbon_footprint_trend': df['carbon_footprint'].rolling(window=7).mean().to_dict() if 'carbon_footprint' in df.columns else {}, |
| 'total_emissions': float(df['energy_consumption'].sum() * 0.5) |
| }, |
| 'resource_utilization': { |
| 'energy_efficiency': float(df['energy_consumption'].mean()), |
| 'water_efficiency': float(df['water_consumption'].mean()) |
| }, |
| 'waste_management': { |
| 'recycling_performance': float(df['recycling_rate'].mean()) if 'recycling_rate' in df.columns else 0.0, |
| 'waste_reduction_trend': df['waste_generated'].rolling(window=7).mean().to_dict() |
| } |
| } |
| return results |
|
|
| def prepare_ai_query_interface(self, df): |
| """ |
| Prepare data for natural language analytics queries with enhanced semantic understanding |
| """ |
| query_interface = { |
| 'semantic_mappings': {}, |
| 'entity_relationships': {}, |
| 'available_metrics': {}, |
| 'temporal_context': {}, |
| 'metric_relationships': {}, |
| 'data_patterns': {}, |
| 'suggested_queries': [] |
| } |
| |
| try: |
| |
| text_columns = df.select_dtypes(include=['object']).columns |
| vectorizer = TfidfVectorizer(max_features=1000) |
| |
| for col in text_columns: |
| if df[col].str.len().mean() > 5: |
| text_features = vectorizer.fit_transform(df[col].fillna('').astype(str)) |
| query_interface['semantic_mappings'][col] = { |
| 'vocabulary': vectorizer.vocabulary_, |
| 'idf_values': vectorizer.idf_.tolist(), |
| 'top_terms': dict(zip( |
| vectorizer.get_feature_names_out(), |
| np.asarray(text_features.sum(axis=0)).ravel() |
| )) |
| } |
| |
| |
| entity_columns = [col for col in df.columns if any(entity in col.lower() |
| for entity in ['id', 'category', 'type', 'name', 'class', 'group'])] |
| |
| for col in entity_columns: |
| if df[col].dtype == 'object': |
| value_counts = df[col].value_counts() |
| unique_values = df[col].unique().tolist() |
| |
| |
| hierarchy = {} |
| if '_' in col or col.lower().endswith('_id'): |
| related_cols = [c for c in df.columns if col.split('_')[0] in c and c != col] |
| for rel_col in related_cols: |
| hierarchy[rel_col] = df.groupby(col)[rel_col].agg(list).to_dict() |
| |
| query_interface['entity_relationships'][col] = { |
| 'unique_values': unique_values, |
| 'value_counts': value_counts.to_dict(), |
| 'hierarchy': hierarchy, |
| 'cardinality': len(unique_values) |
| } |
| |
| |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| for col in numeric_cols: |
| stats = df[col].describe() |
| query_interface['available_metrics'][col] = { |
| 'min': float(stats['min']), |
| 'max': float(stats['max']), |
| 'mean': float(stats['mean']), |
| 'std': float(stats['std']), |
| 'quartiles': { |
| '25%': float(stats['25%']), |
| '50%': float(stats['50%']), |
| '75%': float(stats['75%']) |
| } |
| } |
| |
| |
| correlations = {} |
| for other_col in numeric_cols: |
| if col != other_col: |
| corr = df[col].corr(df[other_col]) |
| if abs(corr) > 0.3: |
| correlations[other_col] = float(corr) |
| |
| query_interface['metric_relationships'][col] = { |
| 'correlations': correlations, |
| 'trends': self._analyze_metric_trends(df, col) |
| } |
| |
| |
| date_cols = df.select_dtypes(include=['datetime64']).columns |
| if len(date_cols) == 0: |
| |
| for col in df.columns: |
| if df[col].dtype == 'object': |
| try: |
| pd.to_datetime(df[col]) |
| date_cols = date_cols.append(col) |
| except: |
| continue |
| |
| for date_col in date_cols: |
| df[date_col] = pd.to_datetime(df[date_col]) |
| temporal_stats = { |
| 'min_date': df[date_col].min().isoformat(), |
| 'max_date': df[date_col].max().isoformat(), |
| 'frequency': pd.infer_freq(df[date_col]), |
| 'temporal_patterns': {} |
| } |
| |
| |
| temporal_stats['temporal_patterns'] = { |
| 'daily_pattern': df.groupby(df[date_col].dt.dayofweek).size().to_dict(), |
| 'monthly_pattern': df.groupby(df[date_col].dt.month).size().to_dict(), |
| 'yearly_pattern': df.groupby(df[date_col].dt.year).size().to_dict() |
| } |
| |
| query_interface['temporal_context'][date_col] = temporal_stats |
| |
| |
| query_interface['data_patterns'] = { |
| 'missing_patterns': df.isnull().sum().to_dict(), |
| 'unique_value_counts': df.nunique().to_dict(), |
| 'distribution_types': self._analyze_distributions(df) |
| } |
| |
| |
| query_interface['suggested_queries'] = self._generate_suggested_queries(df) |
| |
| |
| query_interface['metadata'] = { |
| 'row_count': len(df), |
| 'column_count': len(df.columns), |
| 'memory_usage': df.memory_usage(deep=True).sum(), |
| 'data_types': df.dtypes.astype(str).to_dict() |
| } |
| |
| except Exception as e: |
| query_interface['error'] = str(e) |
| |
| return query_interface |
| |
| def _analyze_metric_trends(self, df, column): |
| """Helper method to analyze trends in numeric columns""" |
| trends = {} |
| if 'date' in df.columns: |
| df['date'] = pd.to_datetime(df['date']) |
| time_series = df.groupby('date')[column].mean() |
| if len(time_series) > 2: |
| |
| x = np.arange(len(time_series)) |
| y = time_series.values |
| slope, intercept = np.polyfit(x, y, 1) |
| trends['slope'] = float(slope) |
| trends['trend_direction'] = 'increasing' if slope > 0 else 'decreasing' |
| trends['trend_strength'] = float(abs(slope) / time_series.mean()) |
| return trends |
| |
| def _analyze_distributions(self, df): |
| """Helper method to analyze value distributions""" |
| distributions = {} |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| |
| for col in numeric_cols: |
| if df[col].nunique() > 5: |
| |
| _, p_value = stats.normaltest(df[col].dropna()) |
| skewness = float(df[col].skew()) |
| kurtosis = float(df[col].kurtosis()) |
| |
| distributions[col] = { |
| 'distribution_type': 'normal' if p_value > 0.05 else 'non_normal', |
| 'skewness': skewness, |
| 'kurtosis': kurtosis |
| } |
| return distributions |
| |
| def _generate_suggested_queries(self, df): |
| """Helper method to generate relevant query suggestions""" |
| suggestions = [] |
| |
| |
| if 'date' in df.columns: |
| suggestions.extend([ |
| "Show the trend over time", |
| "Compare year-over-year growth", |
| "Find seasonal patterns" |
| ]) |
| |
| |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| if len(numeric_cols) > 0: |
| suggestions.extend([ |
| f"Analyze the distribution of {col}" for col in numeric_cols[:3] |
| ]) |
| |
| |
| categorical_cols = df.select_dtypes(include=['object']).columns |
| if len(categorical_cols) > 0: |
| suggestions.extend([ |
| f"Break down metrics by {col}" for col in categorical_cols[:3] |
| ]) |
| |
| return suggestions |
|
|
| def enhance_cross_industry_correlations(self, df): |
| """ |
| Enhanced analysis of correlations across different industries |
| """ |
| correlations = { |
| 'metric_correlations': {}, |
| 'industry_patterns': {}, |
| 'shared_trends': {} |
| } |
| |
| if 'industry' in df.columns: |
| industries = df['industry'].unique() |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| |
| |
| for ind1 in industries: |
| for ind2 in industries: |
| if ind1 < ind2: |
| ind1_data = df[df['industry'] == ind1][numeric_cols] |
| ind2_data = df[df['industry'] == ind2][numeric_cols] |
| |
| if not ind1_data.empty and not ind2_data.empty: |
| common_metrics = set(ind1_data.columns) & set(ind2_data.columns) |
| for metric in common_metrics: |
| corr, p_value = pearsonr( |
| ind1_data[metric].fillna(0), |
| ind2_data[metric].fillna(0) |
| ) |
| correlations['metric_correlations'][f"{ind1}_{ind2}_{metric}"] = { |
| 'correlation': float(corr), |
| 'p_value': float(p_value) |
| } |
| |
| |
| if 'date' in df.columns: |
| for metric in numeric_cols: |
| industry_trends = {} |
| for industry in industries: |
| industry_data = df[df['industry'] == industry] |
| if not industry_data.empty: |
| trend = industry_data.groupby('date')[metric].mean() |
| if len(trend) > 0: |
| industry_trends[industry] = trend.to_dict() |
| |
| correlations['shared_trends'][metric] = industry_trends |
| |
| return correlations |
|
|
| def perform_market_basket_analysis(self, df: pd.DataFrame, min_support: float = 0.01, |
| min_confidence: float = 0.3, min_lift: float = 1.0) -> dict: |
| """ |
| Perform advanced market basket analysis with support for multiple analytics dimensions. |
| |
| Args: |
| df (pd.DataFrame): Input transaction data with required columns |
| min_support (float): Minimum support threshold for frequent itemsets (default: 0.01) |
| min_confidence (float): Minimum confidence threshold for rules (default: 0.3) |
| min_lift (float): Minimum lift threshold for rules (default: 1.0) |
| |
| Returns: |
| dict: Dictionary containing: |
| - product_associations: Support, confidence, and lift metrics for product pairs |
| - temporal_baskets: Time-based purchase patterns |
| - product_clusters: Product groupings based on purchase behavior |
| - customer_segments: Customer segments based on purchase patterns |
| - performance_metrics: Key performance indicators |
| |
| Raises: |
| ValueError: If required columns are missing or data validation fails |
| """ |
| try: |
| |
| required_columns = ['transaction_id', 'product_id'] |
| if not all(col in df.columns for col in required_columns): |
| raise ValueError(f"Missing required columns: {set(required_columns) - set(df.columns)}") |
| |
| if df.empty: |
| raise ValueError("Empty dataframe provided") |
| |
| |
| df = df.copy() |
| |
| |
| baskets = (df.groupby('transaction_id')['product_id'] |
| .agg(lambda x: frozenset(x.values)) |
| .reset_index()) |
| |
| total_transactions = len(baskets) |
| |
| |
| product_freq = df.groupby('product_id').size().to_dict() |
| |
| |
| pairs_data = [] |
| for products in baskets['product_id']: |
| products_list = list(products) |
| pairs_data.extend( |
| tuple(sorted([p1, p2])) |
| for i, p1 in enumerate(products_list) |
| for p2 in products_list[i+1:] |
| ) |
| |
| pair_freq = pd.Series(pairs_data).value_counts().to_dict() |
| |
| |
| product_associations = { |
| 'support': {}, |
| 'confidence': {}, |
| 'lift': {}, |
| 'metrics_distribution': { |
| 'support': {'min': float('inf'), 'max': 0, 'mean': 0}, |
| 'confidence': {'min': float('inf'), 'max': 0, 'mean': 0}, |
| 'lift': {'min': float('inf'), 'max': 0, 'mean': 0} |
| } |
| } |
| |
| valid_rules = [] |
| for pair, freq in pair_freq.items(): |
| prod1, prod2 = pair |
| support = freq / total_transactions |
| |
| if support >= min_support: |
| confidence_1_2 = freq / product_freq[prod1] |
| confidence_2_1 = freq / product_freq[prod2] |
| max_confidence = max(confidence_1_2, confidence_2_1) |
| |
| if max_confidence >= min_confidence: |
| lift = (freq * total_transactions) / (product_freq[prod1] * product_freq[prod2]) |
| |
| if lift >= min_lift: |
| valid_rules.append({ |
| 'pair': pair, |
| 'support': support, |
| 'confidence': max_confidence, |
| 'lift': lift |
| }) |
| |
| |
| pair_key = f"({prod1}, {prod2})" |
| product_associations['support'][pair_key] = float(support) |
| product_associations['confidence'][pair_key] = float(max_confidence) |
| product_associations['lift'][pair_key] = float(lift) |
| |
| |
| for metric_type, value in [('support', support), |
| ('confidence', max_confidence), |
| ('lift', lift)]: |
| dist = product_associations['metrics_distribution'][metric_type] |
| dist['min'] = min(dist['min'], value) |
| dist['max'] = max(dist['max'], value) |
| |
| |
| for metric_type in ['support', 'confidence', 'lift']: |
| values = [rule[metric_type] for rule in valid_rules] |
| if values: |
| product_associations['metrics_distribution'][metric_type]['mean'] = float(sum(values) / len(values)) |
| else: |
| product_associations['metrics_distribution'][metric_type] = {'min': 0, 'max': 0, 'mean': 0} |
| |
| |
| temporal_patterns = self._analyze_temporal_patterns(df) if 'timestamp' in df.columns else {} |
| |
| |
| product_clusters = self._perform_product_clustering(df) if 'quantity' in df.columns else {} |
| |
| |
| customer_segments = self._analyze_customer_segments(df) if 'customer_id' in df.columns else {} |
| |
| |
| performance_metrics = { |
| 'total_transactions': total_transactions, |
| 'unique_products': len(product_freq), |
| 'avg_basket_size': float(df.groupby('transaction_id')['product_id'].count().mean()), |
| 'total_rules_found': len(valid_rules), |
| 'rules_distribution': { |
| 'strong_associations': len([r for r in valid_rules if r['lift'] > 2]), |
| 'moderate_associations': len([r for r in valid_rules if 1 < r['lift'] <= 2]), |
| 'weak_associations': len([r for r in valid_rules if r['lift'] <= 1]) |
| } |
| } |
| |
| return { |
| 'product_associations': product_associations, |
| 'temporal_baskets': temporal_patterns, |
| 'product_clusters': product_clusters, |
| 'customer_segments': customer_segments, |
| 'performance_metrics': performance_metrics |
| } |
| |
| except Exception as e: |
| print(f"Error in market basket analysis: {str(e)}") |
| raise ValueError(f"Market basket analysis failed: {str(e)}") from e |
| |
| def _analyze_temporal_patterns(self, df: pd.DataFrame) -> dict: |
| """Analyze temporal patterns in purchase behavior""" |
| patterns = { |
| 'daily_patterns': {}, |
| 'weekly_patterns': {}, |
| 'monthly_patterns': {}, |
| 'hourly_patterns': {} |
| } |
| |
| try: |
| timestamps = pd.to_datetime(df['timestamp']) |
| |
| for period, grouper in [ |
| ('hourly_patterns', timestamps.dt.hour), |
| ('daily_patterns', timestamps.dt.day), |
| ('weekly_patterns', timestamps.dt.dayofweek), |
| ('monthly_patterns', timestamps.dt.month) |
| ]: |
| pattern_data = df.groupby(grouper).agg({ |
| 'product_id': ['count', 'nunique'], |
| 'transaction_id': 'nunique', |
| 'quantity': ['sum', 'mean'] if 'quantity' in df.columns else ['count'] |
| }).round(2) |
| |
| patterns[period] = { |
| 'transaction_count': pattern_data['transaction_id']['nunique'].to_dict(), |
| 'product_count': pattern_data['product_id']['count'].to_dict(), |
| 'unique_products': pattern_data['product_id']['nunique'].to_dict(), |
| 'total_quantity': pattern_data['quantity']['sum'].to_dict() if 'quantity' in df.columns else {}, |
| 'avg_quantity': pattern_data['quantity']['mean'].to_dict() if 'quantity' in df.columns else {} |
| } |
| |
| except (ValueError, KeyError) as e: |
| print(f"Error in temporal pattern analysis: {str(e)}") |
| return patterns |
| |
| return patterns |
| |
| def _perform_product_clustering(self, df: pd.DataFrame) -> dict: |
| """Perform advanced product clustering analysis""" |
| try: |
| |
| product_features = df.groupby('product_id').agg({ |
| 'quantity': ['mean', 'std', 'sum', 'count'], |
| 'transaction_id': 'nunique' |
| }).fillna(0) |
| |
| |
| product_features['quantity_per_transaction'] = ( |
| product_features['quantity']['sum'] / |
| product_features['transaction_id']['nunique'] |
| ) |
| |
| |
| features_for_clustering = product_features.copy() |
| features_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col |
| for col in features_for_clustering.columns] |
| |
| if len(features_for_clustering) > 1: |
| |
| scaler = StandardScaler() |
| scaled_features = scaler.fit_transform(features_for_clustering) |
| |
| |
| max_clusters = min(5, len(features_for_clustering) - 1) |
| scores = [] |
| for k in range(2, max_clusters + 1): |
| kmeans = KMeans(n_clusters=k, random_state=42) |
| clusters = kmeans.fit_predict(scaled_features) |
| score = silhouette_score(scaled_features, clusters) |
| scores.append((k, score)) |
| |
| |
| optimal_k = max(scores, key=lambda x: x[1])[0] |
| kmeans = KMeans(n_clusters=optimal_k, random_state=42) |
| clusters = kmeans.fit_predict(scaled_features) |
| |
| |
| cluster_data = { |
| 'cluster_assignments': { |
| prod: int(cluster) for prod, cluster in zip(product_features.index, clusters) |
| }, |
| 'cluster_profiles': {}, |
| 'evaluation_metrics': { |
| 'silhouette_score': float(max(scores, key=lambda x: x[1])[1]), |
| 'num_clusters': optimal_k |
| } |
| } |
| |
| |
| for cluster_id in range(optimal_k): |
| cluster_mask = clusters == cluster_id |
| cluster_data['cluster_profiles'][str(cluster_id)] = { |
| 'size': int(sum(cluster_mask)), |
| 'avg_quantity': float(product_features['quantity']['mean'][cluster_mask].mean()), |
| 'avg_transactions': float(product_features['transaction_id']['nunique'][cluster_mask].mean()), |
| 'total_quantity': float(product_features['quantity']['sum'][cluster_mask].sum()), |
| 'purchase_frequency': float( |
| (product_features['quantity']['count'][cluster_mask].sum() / |
| product_features['transaction_id']['nunique'][cluster_mask].sum()) |
| ) |
| } |
| |
| return cluster_data |
| |
| except np.linalg.LinAlgError as e: |
| print(f"Error in clustering computation: {str(e)}") |
| return {} |
| except (ValueError, KeyError) as e: |
| print(f"Error in product clustering: {str(e)}") |
| return {} |
| |
| return {} |
| |
| def _analyze_customer_segments(self, df: pd.DataFrame) -> dict: |
| """Analyze customer segments based on purchase behavior""" |
| try: |
| if 'customer_id' not in df.columns: |
| return {} |
| |
| customer_stats = df.groupby('customer_id').agg({ |
| 'transaction_id': 'nunique', |
| 'product_id': ['nunique', 'count'], |
| 'quantity': ['sum', 'mean'] if 'quantity' in df.columns else ['count', 'mean'] |
| }) |
| |
| |
| if 'timestamp' in df.columns: |
| current_date = pd.to_datetime(df['timestamp']).max() |
| customer_stats['recency'] = df.groupby('customer_id')['timestamp'].max().apply( |
| lambda x: (current_date - pd.to_datetime(x)).days |
| ) |
| |
| |
| stats_for_clustering = customer_stats.copy() |
| stats_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col |
| for col in stats_for_clustering.columns] |
| |
| if len(stats_for_clustering) > 1: |
| scaler = StandardScaler() |
| scaled_features = scaler.fit_transform(stats_for_clustering) |
| |
| |
| dbscan = DBSCAN(eps=0.5, min_samples=3) |
| clusters = dbscan.fit_predict(scaled_features) |
| |
| return { |
| 'customer_segments': { |
| str(cust): int(cluster) for cust, cluster in zip(customer_stats.index, clusters) |
| }, |
| 'segment_profiles': { |
| str(segment): { |
| 'size': int(sum(clusters == segment)), |
| 'avg_transactions': float(customer_stats['transaction_id']['nunique'][clusters == segment].mean()), |
| 'avg_products': float(customer_stats['product_id']['nunique'][clusters == segment].mean()) |
| } |
| for segment in set(clusters) if segment != -1 |
| }, |
| 'segment_statistics': { |
| 'num_segments': len(set(clusters) - {-1}), |
| 'noise_points': int(sum(clusters == -1)) |
| } |
| } |
| |
| except Exception as e: |
| print(f"Error in customer segmentation: {str(e)}") |
| return {} |
| |
| def _calculate_correlations(self, df: pd.DataFrame) -> dict: |
| """Calculate correlations between numeric columns with detailed statistics""" |
| correlations = {} |
| |
| try: |
| numeric_cols = df.select_dtypes(include=[np.number]).columns |
| if len(numeric_cols) < 2: |
| return correlations |
| |
| |
| corr_matrix = df[numeric_cols].corr() |
| |
| |
| for col1 in numeric_cols: |
| correlations[col1] = {} |
| for col2 in numeric_cols: |
| if col1 != col2: |
| correlation = corr_matrix.loc[col1, col2] |
| if not np.isnan(correlation): |
| |
| coef, p_value = pearsonr(df[col1].fillna(0), df[col2].fillna(0)) |
| correlations[col1][col2] = { |
| 'coefficient': float(correlation), |
| 'p_value': float(p_value), |
| 'strength': 'strong' if abs(correlation) > 0.7 |
| else 'moderate' if abs(correlation) > 0.3 |
| else 'weak', |
| 'direction': 'positive' if correlation > 0 else 'negative', |
| 'sample_size': len(df) |
| } |
| |
| except Exception as e: |
| print(f"Error calculating correlations: {str(e)}") |
| return {} |
| |
| return correlations |
|
|