import pandas as pd import numpy as np from prophet import Prophet from datetime import datetime import redis import json from sklearn.cluster import KMeans, DBSCAN from sklearn.preprocessing import StandardScaler, MinMaxScaler from sklearn.decomposition import PCA from sklearn.ensemble import IsolationForest from .json_utils import CustomJSONEncoder from scipy import stats from scipy.stats import pearsonr from statsmodels.tsa.seasonal import seasonal_decompose from statsmodels.tsa.stattools import adfuller import networkx as nx from sklearn.metrics import silhouette_score from sklearn.feature_extraction.text import TfidfVectorizer from .supermarket_metrics import supermarket_insights from app.utils.detect_industry import is_supermarket # next snippet class AnalyticsService: def __init__(self): self.redis_client = redis.Redis(host='localhost', port=6379, db=0) self.industry_metrics = { 'retail': self._retail_metrics, 'wholesale': self._wholesale_metrics, 'supermarket': self._supermarket_metrics, 'manufacturing': self._manufacturing_metrics, 'healthcare': self._healthcare_metrics } self.cross_industry_analyzers = { 'market_dynamics': self._analyze_market_dynamics, 'supply_chain': self._analyze_supply_chain, 'customer_insights': self._analyze_customer_insights, 'operational_efficiency': self._analyze_operational_efficiency, 'risk_assessment': self._analyze_risk_patterns, 'sustainability': self._analyze_sustainability_metrics } def perform_eda(self, data, industry=None): """ Perform enhanced Exploratory Data Analysis with cross-industry insights """ if not data: raise ValueError("Empty dataset provided") df = pd.DataFrame(data) if df.empty: raise ValueError("Empty dataset provided") # Validate numeric columns numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) == 0: raise ValueError("Non-numeric values found in dataset") # Convert date columns to datetime date_columns = [] for col in df.columns: if df[col].dtype == 'object': try: df[col] = pd.to_datetime(df[col]) date_columns.append(col) except (ValueError, TypeError): continue # Get numeric columns excluding dates numeric_cols = df.select_dtypes(include=[np.number]).columns # Advanced statistics and AI-ready features analysis_results = { 'basic_stats': df[numeric_cols].describe().to_dict() if len(numeric_cols) > 0 else {}, 'missing_values': df.isnull().sum().to_dict(), 'columns': list(df.columns), 'row_count': len(df), 'correlation_matrix': df[numeric_cols].corr().to_dict() if len(numeric_cols) > 0 else {}, 'skewness': df[numeric_cols].skew().to_dict() if len(numeric_cols) > 0 else {}, 'kurtosis': df[numeric_cols].kurtosis().to_dict() if len(numeric_cols) > 0 else {}, 'outliers': self._detect_outliers(df), 'distribution_tests': self._perform_distribution_tests(df), 'dimensionality_reduction': self._perform_dimensionality_reduction(df), 'temporal_patterns': self._analyze_temporal_patterns(df), 'anomaly_detection': self._detect_anomalies(df), 'feature_importance': self._calculate_feature_importance(df) } # --- supermarket auto-detection --- if is_supermarket(df): industry = 'supermarket' results['supermarket_kpis'] = supermarket_insights(df) # Add industry-specific metrics if industry and industry.lower() in self.industry_metrics: analysis_results['industry_metrics'] = self.industry_metrics[industry.lower()](df) # Add cross-industry insights analysis_results['cross_industry_insights'] = {} for analyzer_name, analyzer_func in self.cross_industry_analyzers.items(): analysis_results['cross_industry_insights'][analyzer_name] = analyzer_func(df) return analysis_results def _detect_outliers(self, df): """ Detect outliers using IQR method for numerical columns """ outliers = {} for column in df.select_dtypes(include=[np.number]).columns: Q1 = df[column].quantile(0.25) Q3 = df[column].quantile(0.75) IQR = Q3 - Q1 outliers[column] = { 'count': len(df[(df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))]), 'percentage': len(df[(df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))]) / len(df) * 100 } return outliers def _perform_distribution_tests(self, df): """ Perform distribution tests for numerical columns """ tests = {} for column in df.select_dtypes(include=[np.number]).columns: shapiro_test = stats.shapiro(df[column].dropna()) tests[column] = { 'shapiro_test': { 'statistic': float(shapiro_test.statistic), 'p_value': float(shapiro_test.pvalue) } } return tests def _perform_dimensionality_reduction(self, df): """ Perform PCA for dimensional insights """ numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) < 2: return {} scaler = StandardScaler() scaled_data = scaler.fit_transform(df[numeric_cols]) pca = PCA() pca_result = pca.fit_transform(scaled_data) return { 'explained_variance_ratio': pca.explained_variance_ratio_.tolist(), 'cumulative_variance_ratio': np.cumsum(pca.explained_variance_ratio_).tolist(), 'n_components_95_variance': np.argmax(np.cumsum(pca.explained_variance_ratio_) >= 0.95) + 1 } def _analyze_temporal_patterns(self, df): """ Analyze temporal patterns and seasonality """ date_cols = df.select_dtypes(include=['datetime64']).columns if len(date_cols) == 0: return None patterns = {} for date_col in date_cols: df['year'] = df[date_col].dt.year df['month'] = df[date_col].dt.month df['day_of_week'] = df[date_col].dt.dayofweek numeric_cols = df.select_dtypes(include=[np.number]).columns for metric in numeric_cols: if metric not in ['year', 'month', 'day_of_week']: patterns[f"{metric}_by_month"] = df.groupby('month')[metric].mean().to_dict() patterns[f"{metric}_by_day_of_week"] = df.groupby('day_of_week')[metric].mean().to_dict() return patterns def _detect_anomalies(self, df): """ Detect anomalies using multiple methods """ numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) == 0: return None scaler = StandardScaler() scaled_data = scaler.fit_transform(df[numeric_cols]) isolation_forest = IsolationForest(random_state=42, contamination=0.1) anomalies = isolation_forest.fit_predict(scaled_data) return { 'anomaly_percentage': float((anomalies == -1).mean() * 100), 'anomaly_indices': np.where(anomalies == -1)[0].tolist() } def _calculate_feature_importance(self, df): """ Calculate feature importance and relationships """ numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) < 2: return None importance = {} for col in numeric_cols: correlations = [] for other_col in numeric_cols: if col != other_col: # Check if either column is constant if df[col].nunique() <= 1 or df[other_col].nunique() <= 1: continue try: corr, _ = pearsonr(df[col].fillna(0), df[other_col].fillna(0)) if not np.isnan(corr): # Only add if correlation is valid correlations.append((other_col, abs(corr))) except ValueError: continue # Skip if correlation can't be calculated # Handle empty correlations case correlation_values = [abs(c[1]) for c in correlations] importance[col] = { 'top_correlations': sorted(correlations, key=lambda x: abs(x[1]), reverse=True)[:3], 'correlation_strength': float(np.mean(correlation_values)) if correlation_values else 0.0 } return importance def _retail_metrics(self, df): """Calculate retail-specific metrics""" if not all(col in df.columns for col in ['sales', 'inventory', 'customer_satisfaction']): # Return default structure if required columns are missing return { 'sales_performance': {}, 'customer_behavior': {}, 'inventory': {} } metrics = { 'sales_performance': { 'total_sales': float(df['sales'].sum()) if 'sales' in df.columns else 0.0, 'average_daily_sales': float(df['sales'].mean()) if 'sales' in df.columns else 0.0, 'sales_growth': float((df['sales'].iloc[-1] / df['sales'].iloc[0] - 1) * 100) if 'sales' in df.columns else 0.0 }, 'inventory_turnover': { 'rate': float(df['sales'].sum() / df['inventory'].mean()) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0, 'days_of_inventory': float(df['inventory'].mean() / (df['sales'].mean() / 30)) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0 }, 'customer_metrics': { 'satisfaction_score': float(df['customer_satisfaction'].mean()) if 'customer_satisfaction' in df.columns else 0.0, 'satisfaction_trend': df['customer_satisfaction'].rolling(window=7).mean().to_dict() if 'customer_satisfaction' in df.columns else {} } } return metrics def _wholesale_metrics(self, df): """ Calculate wholesale-specific metrics """ metrics = { 'order_analytics': {}, 'supplier_performance': {}, 'distribution': {} } if 'order_value' in df.columns: metrics['order_analytics']['average_order_value'] = float(df['order_value'].mean()) metrics['order_analytics']['order_value_distribution'] = df['order_value'].quantile([0.25, 0.5, 0.75]).to_dict() if 'supplier_id' in df.columns and 'delivery_time' in df.columns: supplier_performance = df.groupby('supplier_id')['delivery_time'].agg(['mean', 'std']).to_dict() metrics['supplier_performance'] = supplier_performance return metrics def _supermarket_metrics(self, df): """ Calculate supermarket-specific metrics """ metrics = { 'category_performance': {}, 'basket_analysis': {}, 'promotion_impact': {} } if 'category' in df.columns and 'sales_amount' in df.columns: category_sales = df.groupby('category')['sales_amount'].sum() metrics['category_performance']['top_categories'] = category_sales.nlargest(5).to_dict() if 'transaction_id' in df.columns and 'product_id' in df.columns: # Simple basket analysis transactions = df.groupby('transaction_id')['product_id'].count() metrics['basket_analysis']['average_items_per_transaction'] = float(transactions.mean()) if 'promotion_flag' in df.columns and 'sales_amount' in df.columns: promo_impact = df.groupby('promotion_flag')['sales_amount'].mean() metrics['promotion_impact']['sales_lift'] = float( (promo_impact.get(1, 0) - promo_impact.get(0, 0)) / promo_impact.get(0, 1) * 100 ) return metrics def _manufacturing_metrics(self, df): """Calculate manufacturing-specific metrics""" production_col = 'production_volume' if 'production_volume' in df.columns else 'units_produced' metrics = { 'production_efficiency': { 'volume': float(df[production_col].mean()), 'trend': df[production_col].rolling(window=7).mean().to_dict() }, 'quality_metrics': { 'defect_rate': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0, 'quality_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {} }, 'quality_control': { 'defects_per_unit': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0, 'defect_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {} }, 'equipment_utilization': { 'rate': float((df[production_col] / df[production_col].max()).mean() * 100), 'trend': df[production_col].rolling(window=7).mean().to_dict() } } return metrics def _healthcare_metrics(self, df): """Calculate healthcare-specific metrics""" metrics = { 'patient_outcomes': { 'satisfaction': float(df['patient_satisfaction'].mean()), 'treatment_success': float(df['treatment_success_rate'].mean()) }, 'operational_efficiency': { 'avg_wait_time': float(df['order_fulfillment_time'].mean()), 'utilization_rate': float(df['production_volume'].mean() / df['production_volume'].max()) }, 'quality_of_care': { 'satisfaction_trend': df['patient_satisfaction'].rolling(window=7).mean().to_dict(), 'success_rate_trend': df['treatment_success_rate'].rolling(window=7).mean().to_dict() } } return metrics def forecast_timeseries(self, data, date_column, value_column): """ Forecast time series data with support for edge cases """ if not data: raise ValueError("Empty dataset provided") df = pd.DataFrame(data) if date_column not in df.columns: raise KeyError(f"Required column '{date_column}' not found") if value_column not in df.columns: raise KeyError(f"Required column '{value_column}' not found") # Convert to datetime try: df[date_column] = pd.to_datetime(df[date_column]) except ValueError as exc: raise ValueError("Invalid date format") from exc # Handle missing values has_missing = df[value_column].isnull().any() if has_missing: df[value_column] = df[value_column].interpolate(method='linear') # Detect and handle outliers Q1 = df[value_column].quantile(0.25) Q3 = df[value_column].quantile(0.75) IQR = Q3 - Q1 outlier_mask = (df[value_column] < (Q1 - 1.5 * IQR)) | (df[value_column] > (Q3 + 1.5 * IQR)) has_outliers = outlier_mask.any() # Prepare data for Prophet prophet_df = df.rename(columns={date_column: 'ds', value_column: 'y'}) model = Prophet(yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=True) model.fit(prophet_df) # Make future dataframe for forecasting future = model.make_future_dataframe(periods=30) forecast = model.predict(future) result = { 'forecast': forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].to_dict('records'), 'components': { 'trend': forecast['trend'].to_dict(), 'yearly': forecast['yearly'].to_dict() if 'yearly' in forecast else {}, 'weekly': forecast['weekly'].to_dict() if 'weekly' in forecast else {}, 'daily': forecast['daily'].to_dict() if 'daily' in forecast else {} } } if has_missing: result['handling_missing_values'] = {'filled_indices': df[value_column].isnull().sum()} if has_outliers: result['outlier_impact'] = { 'outlier_indices': outlier_mask[outlier_mask].index.tolist(), 'outlier_values': df.loc[outlier_mask, value_column].tolist() } # Detect seasonality decomposition = seasonal_decompose(df[value_column], period=7, extrapolate_trend='freq') result['seasonality_components'] = { 'trend': decomposition.trend.to_dict(), 'seasonal': decomposition.seasonal.to_dict(), 'residual': decomposition.resid.to_dict() } # Cache the forecast with timestamp to ensure freshness timestamp = datetime.now().strftime('%Y%m%d%H') cache_key = f"forecast_{date_column}_{value_column}_{timestamp}" self.redis_client.set(cache_key, json.dumps(result, cls=CustomJSONEncoder)) return result def get_cached_forecast(self, date_column, value_column): """ Retrieve cached forecast results """ timestamp = datetime.now().strftime('%Y%m%d%H') cache_key = f"forecast_{date_column}_{value_column}_{timestamp}" cached = self.redis_client.get(cache_key) if cached: return json.loads(cached) return None def _analyze_market_dynamics(self, df): """ Analyze market dynamics across industries """ metrics = { 'market_trends': {}, 'competitive_analysis': {}, 'growth_patterns': {} } if 'revenue' in df.columns and 'date' in df.columns: # Trend Analysis df['month'] = pd.to_datetime(df['date']).dt.to_period('M') monthly_revenue = df.groupby('month')['revenue'].sum() # Calculate growth rates metrics['growth_patterns']['monthly_growth'] = float( ((monthly_revenue.iloc[-1] / monthly_revenue.iloc[0]) ** (1/len(monthly_revenue)) - 1) * 100 ) # Market volatility mean_revenue = monthly_revenue.mean() if mean_revenue > 0: # Avoid division by zero metrics['market_trends']['volatility'] = float(monthly_revenue.std() / mean_revenue) else: metrics['market_trends']['volatility'] = 0.0 if 'competitor_price' in df.columns and 'price' in df.columns: comp_price_mean = df['competitor_price'].mean() if comp_price_mean > 0: # Avoid division by zero metrics['competitive_analysis']['price_position'] = float( (df['price'].mean() / comp_price_mean - 1) * 100 ) else: metrics['competitive_analysis']['price_position'] = 0.0 return metrics def _analyze_supply_chain(self, df): """ Analyze supply chain metrics across industries """ metrics = { 'efficiency': {}, 'reliability': {}, 'cost_analysis': {} } # Supply Chain Network Analysis if 'supplier_id' in df.columns and 'delivery_time' in df.columns: supplier_performance = df.groupby('supplier_id').agg({ 'delivery_time': ['mean', 'std'], 'order_value': ['sum', 'mean'] }).round(2) metrics['reliability']['supplier_consistency'] = float( 1 - (supplier_performance['delivery_time']['std'] / supplier_performance['delivery_time']['mean']).mean() ) # Cost and Efficiency Analysis if 'transportation_cost' in df.columns and 'order_value' in df.columns: metrics['cost_analysis']['logistics_cost_ratio'] = float( (df['transportation_cost'].sum() / df['order_value'].sum()) * 100 ) return metrics def _analyze_customer_insights(self, df): """ Cross-industry customer behavior analysis """ insights = { 'customer_segments': {}, 'behavior_patterns': {}, 'lifetime_value': {} } if 'customer_id' in df.columns and 'transaction_amount' in df.columns: # Customer Segmentation using DBSCAN for more natural clustering customer_features = df.groupby('customer_id').agg({ 'transaction_amount': ['sum', 'mean', 'count'] }).values scaler = MinMaxScaler() scaled_features = scaler.fit_transform(customer_features) # Find optimal eps parameter for DBSCAN dbscan = DBSCAN(eps=0.3, min_samples=5) clusters = dbscan.fit_predict(scaled_features) insights['customer_segments']['natural_segments'] = { 'n_segments': len(np.unique(clusters[clusters >= 0])), 'segment_sizes': pd.Series(clusters).value_counts().to_dict() } return insights def _analyze_operational_efficiency(self, df): """ Cross-industry operational efficiency analysis """ metrics = { 'process_efficiency': {}, 'resource_utilization': {}, 'bottleneck_analysis': {} } if 'process_time' in df.columns and 'output_quantity' in df.columns: # Process Efficiency Analysis metrics['process_efficiency']['throughput_rate'] = float( df['output_quantity'].sum() / df['process_time'].sum() ) # Calculate process stability process_stability = 1 - (df['process_time'].std() / df['process_time'].mean()) metrics['process_efficiency']['stability_score'] = float(process_stability) return metrics def _analyze_risk_patterns(self, df): """ Cross-industry risk pattern analysis """ risk_metrics = { 'operational_risk': {}, 'market_risk': {}, 'compliance_risk': {} } numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) > 0: # Use Isolation Forest for risk pattern detection iso_forest = IsolationForest(contamination=0.1, random_state=42) risk_scores = iso_forest.fit_predict(df[numeric_cols]) risk_metrics['operational_risk']['anomaly_percentage'] = float( (risk_scores == -1).mean() * 100 ) return risk_metrics def _analyze_sustainability_metrics(self, df): """ Analyze sustainability metrics including environmental impact, resource utilization, and waste management """ if not all(col in df.columns for col in ['energy_consumption', 'water_consumption', 'waste_generated']): return {} results = { 'environmental_impact': { 'carbon_footprint_trend': df['carbon_footprint'].rolling(window=7).mean().to_dict() if 'carbon_footprint' in df.columns else {}, 'total_emissions': float(df['energy_consumption'].sum() * 0.5) }, 'resource_utilization': { 'energy_efficiency': float(df['energy_consumption'].mean()), 'water_efficiency': float(df['water_consumption'].mean()) }, 'waste_management': { 'recycling_performance': float(df['recycling_rate'].mean()) if 'recycling_rate' in df.columns else 0.0, 'waste_reduction_trend': df['waste_generated'].rolling(window=7).mean().to_dict() } } return results def prepare_ai_query_interface(self, df): """ Prepare data for natural language analytics queries with enhanced semantic understanding """ query_interface = { 'semantic_mappings': {}, 'entity_relationships': {}, 'available_metrics': {}, 'temporal_context': {}, 'metric_relationships': {}, 'data_patterns': {}, 'suggested_queries': [] } try: # Create semantic mappings for textual columns text_columns = df.select_dtypes(include=['object']).columns vectorizer = TfidfVectorizer(max_features=1000) for col in text_columns: if df[col].str.len().mean() > 5: # Only process meaningful text fields text_features = vectorizer.fit_transform(df[col].fillna('').astype(str)) query_interface['semantic_mappings'][col] = { 'vocabulary': vectorizer.vocabulary_, 'idf_values': vectorizer.idf_.tolist(), 'top_terms': dict(zip( vectorizer.get_feature_names_out(), np.asarray(text_features.sum(axis=0)).ravel() )) } # Map entity relationships and hierarchies entity_columns = [col for col in df.columns if any(entity in col.lower() for entity in ['id', 'category', 'type', 'name', 'class', 'group'])] for col in entity_columns: if df[col].dtype == 'object': value_counts = df[col].value_counts() unique_values = df[col].unique().tolist() # Find potential hierarchical relationships hierarchy = {} if '_' in col or col.lower().endswith('_id'): related_cols = [c for c in df.columns if col.split('_')[0] in c and c != col] for rel_col in related_cols: hierarchy[rel_col] = df.groupby(col)[rel_col].agg(list).to_dict() query_interface['entity_relationships'][col] = { 'unique_values': unique_values, 'value_counts': value_counts.to_dict(), 'hierarchy': hierarchy, 'cardinality': len(unique_values) } # Document available metrics and their relationships numeric_cols = df.select_dtypes(include=[np.number]).columns for col in numeric_cols: stats = df[col].describe() query_interface['available_metrics'][col] = { 'min': float(stats['min']), 'max': float(stats['max']), 'mean': float(stats['mean']), 'std': float(stats['std']), 'quartiles': { '25%': float(stats['25%']), '50%': float(stats['50%']), '75%': float(stats['75%']) } } # Analyze metric relationships correlations = {} for other_col in numeric_cols: if col != other_col: corr = df[col].corr(df[other_col]) if abs(corr) > 0.3: # Only store meaningful correlations correlations[other_col] = float(corr) query_interface['metric_relationships'][col] = { 'correlations': correlations, 'trends': self._analyze_metric_trends(df, col) } # Add temporal context if available date_cols = df.select_dtypes(include=['datetime64']).columns if len(date_cols) == 0: # Try to convert string columns that might contain dates for col in df.columns: if df[col].dtype == 'object': try: pd.to_datetime(df[col]) date_cols = date_cols.append(col) except: continue for date_col in date_cols: df[date_col] = pd.to_datetime(df[date_col]) temporal_stats = { 'min_date': df[date_col].min().isoformat(), 'max_date': df[date_col].max().isoformat(), 'frequency': pd.infer_freq(df[date_col]), 'temporal_patterns': {} } # Analyze temporal patterns temporal_stats['temporal_patterns'] = { 'daily_pattern': df.groupby(df[date_col].dt.dayofweek).size().to_dict(), 'monthly_pattern': df.groupby(df[date_col].dt.month).size().to_dict(), 'yearly_pattern': df.groupby(df[date_col].dt.year).size().to_dict() } query_interface['temporal_context'][date_col] = temporal_stats # Identify data patterns and anomalies query_interface['data_patterns'] = { 'missing_patterns': df.isnull().sum().to_dict(), 'unique_value_counts': df.nunique().to_dict(), 'distribution_types': self._analyze_distributions(df) } # Generate suggested queries based on data characteristics query_interface['suggested_queries'] = self._generate_suggested_queries(df) # Add metadata about the dataset query_interface['metadata'] = { 'row_count': len(df), 'column_count': len(df.columns), 'memory_usage': df.memory_usage(deep=True).sum(), 'data_types': df.dtypes.astype(str).to_dict() } except Exception as e: query_interface['error'] = str(e) return query_interface def _analyze_metric_trends(self, df, column): """Helper method to analyze trends in numeric columns""" trends = {} if 'date' in df.columns: df['date'] = pd.to_datetime(df['date']) time_series = df.groupby('date')[column].mean() if len(time_series) > 2: # Calculate trend x = np.arange(len(time_series)) y = time_series.values slope, intercept = np.polyfit(x, y, 1) trends['slope'] = float(slope) trends['trend_direction'] = 'increasing' if slope > 0 else 'decreasing' trends['trend_strength'] = float(abs(slope) / time_series.mean()) return trends def _analyze_distributions(self, df): """Helper method to analyze value distributions""" distributions = {} numeric_cols = df.select_dtypes(include=[np.number]).columns for col in numeric_cols: if df[col].nunique() > 5: # Skip columns with too few unique values # Test for normality _, p_value = stats.normaltest(df[col].dropna()) skewness = float(df[col].skew()) kurtosis = float(df[col].kurtosis()) distributions[col] = { 'distribution_type': 'normal' if p_value > 0.05 else 'non_normal', 'skewness': skewness, 'kurtosis': kurtosis } return distributions def _generate_suggested_queries(self, df): """Helper method to generate relevant query suggestions""" suggestions = [] # Add time-based queries if temporal data exists if 'date' in df.columns: suggestions.extend([ "Show the trend over time", "Compare year-over-year growth", "Find seasonal patterns" ]) # Add metric-based queries numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) > 0: suggestions.extend([ f"Analyze the distribution of {col}" for col in numeric_cols[:3] ]) # Add categorical analysis queries categorical_cols = df.select_dtypes(include=['object']).columns if len(categorical_cols) > 0: suggestions.extend([ f"Break down metrics by {col}" for col in categorical_cols[:3] ]) return suggestions def enhance_cross_industry_correlations(self, df): """ Enhanced analysis of correlations across different industries """ correlations = { 'metric_correlations': {}, 'industry_patterns': {}, 'shared_trends': {} } if 'industry' in df.columns: industries = df['industry'].unique() numeric_cols = df.select_dtypes(include=[np.number]).columns # Calculate cross-industry metric correlations for ind1 in industries: for ind2 in industries: if ind1 < ind2: # Avoid duplicate comparisons ind1_data = df[df['industry'] == ind1][numeric_cols] ind2_data = df[df['industry'] == ind2][numeric_cols] if not ind1_data.empty and not ind2_data.empty: common_metrics = set(ind1_data.columns) & set(ind2_data.columns) for metric in common_metrics: corr, p_value = pearsonr( ind1_data[metric].fillna(0), ind2_data[metric].fillna(0) ) correlations['metric_correlations'][f"{ind1}_{ind2}_{metric}"] = { 'correlation': float(corr), 'p_value': float(p_value) } # Identify shared trends if 'date' in df.columns: for metric in numeric_cols: industry_trends = {} for industry in industries: industry_data = df[df['industry'] == industry] if not industry_data.empty: trend = industry_data.groupby('date')[metric].mean() if len(trend) > 0: industry_trends[industry] = trend.to_dict() correlations['shared_trends'][metric] = industry_trends return correlations def perform_market_basket_analysis(self, df: pd.DataFrame, min_support: float = 0.01, min_confidence: float = 0.3, min_lift: float = 1.0) -> dict: """ Perform advanced market basket analysis with support for multiple analytics dimensions. Args: df (pd.DataFrame): Input transaction data with required columns min_support (float): Minimum support threshold for frequent itemsets (default: 0.01) min_confidence (float): Minimum confidence threshold for rules (default: 0.3) min_lift (float): Minimum lift threshold for rules (default: 1.0) Returns: dict: Dictionary containing: - product_associations: Support, confidence, and lift metrics for product pairs - temporal_baskets: Time-based purchase patterns - product_clusters: Product groupings based on purchase behavior - customer_segments: Customer segments based on purchase patterns - performance_metrics: Key performance indicators Raises: ValueError: If required columns are missing or data validation fails """ try: # Validate input data required_columns = ['transaction_id', 'product_id'] if not all(col in df.columns for col in required_columns): raise ValueError(f"Missing required columns: {set(required_columns) - set(df.columns)}") if df.empty: raise ValueError("Empty dataframe provided") # Work with a copy of the dataframe df = df.copy() # Convert to basket format with optimization for large datasets baskets = (df.groupby('transaction_id')['product_id'] .agg(lambda x: frozenset(x.values)) # Using frozenset for better performance .reset_index()) total_transactions = len(baskets) # Calculate product frequencies using vectorized operations product_freq = df.groupby('product_id').size().to_dict() # Generate product pairs efficiently pairs_data = [] for products in baskets['product_id']: products_list = list(products) # Convert frozenset to list once pairs_data.extend( tuple(sorted([p1, p2])) for i, p1 in enumerate(products_list) for p2 in products_list[i+1:] ) pair_freq = pd.Series(pairs_data).value_counts().to_dict() # Calculate association metrics with validation product_associations = { 'support': {}, 'confidence': {}, 'lift': {}, 'metrics_distribution': { 'support': {'min': float('inf'), 'max': 0, 'mean': 0}, 'confidence': {'min': float('inf'), 'max': 0, 'mean': 0}, 'lift': {'min': float('inf'), 'max': 0, 'mean': 0} } } valid_rules = [] for pair, freq in pair_freq.items(): prod1, prod2 = pair support = freq / total_transactions if support >= min_support: confidence_1_2 = freq / product_freq[prod1] confidence_2_1 = freq / product_freq[prod2] max_confidence = max(confidence_1_2, confidence_2_1) if max_confidence >= min_confidence: lift = (freq * total_transactions) / (product_freq[prod1] * product_freq[prod2]) if lift >= min_lift: valid_rules.append({ 'pair': pair, 'support': support, 'confidence': max_confidence, 'lift': lift }) # Store metrics with string keys for JSON serialization pair_key = f"({prod1}, {prod2})" product_associations['support'][pair_key] = float(support) product_associations['confidence'][pair_key] = float(max_confidence) product_associations['lift'][pair_key] = float(lift) # Update metrics distribution for metric_type, value in [('support', support), ('confidence', max_confidence), ('lift', lift)]: dist = product_associations['metrics_distribution'][metric_type] dist['min'] = min(dist['min'], value) dist['max'] = max(dist['max'], value) # Calculate means for distributions for metric_type in ['support', 'confidence', 'lift']: values = [rule[metric_type] for rule in valid_rules] if values: product_associations['metrics_distribution'][metric_type]['mean'] = float(sum(values) / len(values)) else: product_associations['metrics_distribution'][metric_type] = {'min': 0, 'max': 0, 'mean': 0} # Enhanced temporal analysis temporal_patterns = self._analyze_temporal_patterns(df) if 'timestamp' in df.columns else {} # Enhanced product clustering product_clusters = self._perform_product_clustering(df) if 'quantity' in df.columns else {} # Customer segmentation customer_segments = self._analyze_customer_segments(df) if 'customer_id' in df.columns else {} # Performance metrics performance_metrics = { 'total_transactions': total_transactions, 'unique_products': len(product_freq), 'avg_basket_size': float(df.groupby('transaction_id')['product_id'].count().mean()), 'total_rules_found': len(valid_rules), 'rules_distribution': { 'strong_associations': len([r for r in valid_rules if r['lift'] > 2]), 'moderate_associations': len([r for r in valid_rules if 1 < r['lift'] <= 2]), 'weak_associations': len([r for r in valid_rules if r['lift'] <= 1]) } } return { 'product_associations': product_associations, 'temporal_baskets': temporal_patterns, 'product_clusters': product_clusters, 'customer_segments': customer_segments, 'performance_metrics': performance_metrics } except Exception as e: print(f"Error in market basket analysis: {str(e)}") raise ValueError(f"Market basket analysis failed: {str(e)}") from e def _analyze_temporal_patterns(self, df: pd.DataFrame) -> dict: """Analyze temporal patterns in purchase behavior""" patterns = { 'daily_patterns': {}, 'weekly_patterns': {}, 'monthly_patterns': {}, 'hourly_patterns': {} } try: timestamps = pd.to_datetime(df['timestamp']) for period, grouper in [ ('hourly_patterns', timestamps.dt.hour), ('daily_patterns', timestamps.dt.day), ('weekly_patterns', timestamps.dt.dayofweek), ('monthly_patterns', timestamps.dt.month) ]: pattern_data = df.groupby(grouper).agg({ 'product_id': ['count', 'nunique'], 'transaction_id': 'nunique', 'quantity': ['sum', 'mean'] if 'quantity' in df.columns else ['count'] }).round(2) patterns[period] = { 'transaction_count': pattern_data['transaction_id']['nunique'].to_dict(), 'product_count': pattern_data['product_id']['count'].to_dict(), 'unique_products': pattern_data['product_id']['nunique'].to_dict(), 'total_quantity': pattern_data['quantity']['sum'].to_dict() if 'quantity' in df.columns else {}, 'avg_quantity': pattern_data['quantity']['mean'].to_dict() if 'quantity' in df.columns else {} } except (ValueError, KeyError) as e: print(f"Error in temporal pattern analysis: {str(e)}") return patterns return patterns def _perform_product_clustering(self, df: pd.DataFrame) -> dict: """Perform advanced product clustering analysis""" try: # Create rich product features product_features = df.groupby('product_id').agg({ 'quantity': ['mean', 'std', 'sum', 'count'], 'transaction_id': 'nunique' }).fillna(0) # Feature engineering product_features['quantity_per_transaction'] = ( product_features['quantity']['sum'] / product_features['transaction_id']['nunique'] ) # Prepare features for clustering features_for_clustering = product_features.copy() features_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col for col in features_for_clustering.columns] if len(features_for_clustering) > 1: # Scale features scaler = StandardScaler() scaled_features = scaler.fit_transform(features_for_clustering) # Determine optimal number of clusters max_clusters = min(5, len(features_for_clustering) - 1) scores = [] for k in range(2, max_clusters + 1): kmeans = KMeans(n_clusters=k, random_state=42) clusters = kmeans.fit_predict(scaled_features) score = silhouette_score(scaled_features, clusters) scores.append((k, score)) # Use optimal number of clusters optimal_k = max(scores, key=lambda x: x[1])[0] kmeans = KMeans(n_clusters=optimal_k, random_state=42) clusters = kmeans.fit_predict(scaled_features) # Prepare cluster insights cluster_data = { 'cluster_assignments': { prod: int(cluster) for prod, cluster in zip(product_features.index, clusters) }, 'cluster_profiles': {}, 'evaluation_metrics': { 'silhouette_score': float(max(scores, key=lambda x: x[1])[1]), 'num_clusters': optimal_k } } # Generate cluster profiles for cluster_id in range(optimal_k): cluster_mask = clusters == cluster_id cluster_data['cluster_profiles'][str(cluster_id)] = { 'size': int(sum(cluster_mask)), 'avg_quantity': float(product_features['quantity']['mean'][cluster_mask].mean()), 'avg_transactions': float(product_features['transaction_id']['nunique'][cluster_mask].mean()), 'total_quantity': float(product_features['quantity']['sum'][cluster_mask].sum()), 'purchase_frequency': float( (product_features['quantity']['count'][cluster_mask].sum() / product_features['transaction_id']['nunique'][cluster_mask].sum()) ) } return cluster_data except np.linalg.LinAlgError as e: print(f"Error in clustering computation: {str(e)}") return {} except (ValueError, KeyError) as e: print(f"Error in product clustering: {str(e)}") return {} return {} def _analyze_customer_segments(self, df: pd.DataFrame) -> dict: """Analyze customer segments based on purchase behavior""" try: if 'customer_id' not in df.columns: return {} customer_stats = df.groupby('customer_id').agg({ 'transaction_id': 'nunique', 'product_id': ['nunique', 'count'], 'quantity': ['sum', 'mean'] if 'quantity' in df.columns else ['count', 'mean'] }) # Calculate RFM scores if 'timestamp' in df.columns: current_date = pd.to_datetime(df['timestamp']).max() customer_stats['recency'] = df.groupby('customer_id')['timestamp'].max().apply( lambda x: (current_date - pd.to_datetime(x)).days ) # Segment customers stats_for_clustering = customer_stats.copy() stats_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col for col in stats_for_clustering.columns] if len(stats_for_clustering) > 1: scaler = StandardScaler() scaled_features = scaler.fit_transform(stats_for_clustering) # Use DBSCAN for flexible cluster numbers dbscan = DBSCAN(eps=0.5, min_samples=3) clusters = dbscan.fit_predict(scaled_features) return { 'customer_segments': { str(cust): int(cluster) for cust, cluster in zip(customer_stats.index, clusters) }, 'segment_profiles': { str(segment): { 'size': int(sum(clusters == segment)), 'avg_transactions': float(customer_stats['transaction_id']['nunique'][clusters == segment].mean()), 'avg_products': float(customer_stats['product_id']['nunique'][clusters == segment].mean()) } for segment in set(clusters) if segment != -1 }, 'segment_statistics': { 'num_segments': len(set(clusters) - {-1}), 'noise_points': int(sum(clusters == -1)) } } except Exception as e: print(f"Error in customer segmentation: {str(e)}") return {} def _calculate_correlations(self, df: pd.DataFrame) -> dict: """Calculate correlations between numeric columns with detailed statistics""" correlations = {} try: numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) < 2: return correlations # Calculate correlation matrix corr_matrix = df[numeric_cols].corr() # Convert correlations to dictionary with additional metadata for col1 in numeric_cols: correlations[col1] = {} for col2 in numeric_cols: if col1 != col2: correlation = corr_matrix.loc[col1, col2] if not np.isnan(correlation): # Calculate p-value using pearsonr coef, p_value = pearsonr(df[col1].fillna(0), df[col2].fillna(0)) correlations[col1][col2] = { 'coefficient': float(correlation), 'p_value': float(p_value), 'strength': 'strong' if abs(correlation) > 0.7 else 'moderate' if abs(correlation) > 0.3 else 'weak', 'direction': 'positive' if correlation > 0 else 'negative', 'sample_size': len(df) } except Exception as e: print(f"Error calculating correlations: {str(e)}") return {} return correlations