import pandas as pd
import numpy as np
from prophet import Prophet
from datetime import datetime
import redis
import json
from sklearn.cluster import KMeans, DBSCAN
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest
from .json_utils import CustomJSONEncoder
from scipy import stats
from scipy.stats import pearsonr
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller
import networkx as nx
from sklearn.metrics import silhouette_score
from sklearn.feature_extraction.text import TfidfVectorizer
from .supermarket_metrics import supermarket_insights
from app.utils.detect_industry import is_supermarket  # next snippet
class AnalyticsService:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.industry_metrics = {
            'retail': self._retail_metrics,
            'wholesale': self._wholesale_metrics,
            'supermarket': self._supermarket_metrics,
            'manufacturing': self._manufacturing_metrics,
            'healthcare': self._healthcare_metrics
        }
        self.cross_industry_analyzers = {
            'market_dynamics': self._analyze_market_dynamics,
            'supply_chain': self._analyze_supply_chain,
            'customer_insights': self._analyze_customer_insights,
            'operational_efficiency': self._analyze_operational_efficiency,
            'risk_assessment': self._analyze_risk_patterns,
            'sustainability': self._analyze_sustainability_metrics
        }
    def perform_eda(self, data, industry=None):
        """
        Perform enhanced Exploratory Data Analysis with cross-industry insights.
        """
        if not data:
            raise ValueError("Empty dataset provided")
        df = pd.DataFrame(data)
        if df.empty:
            raise ValueError("Empty dataset provided")
        # Validate that at least one numeric column exists
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) == 0:
            raise ValueError("Non-numeric values found in dataset")
        # Convert date-like object columns to datetime
        date_columns = []
        for col in df.columns:
            if df[col].dtype == 'object':
                try:
                    df[col] = pd.to_datetime(df[col])
                    date_columns.append(col)
                except (ValueError, TypeError):
                    continue
        # Re-select numeric columns now that date columns are excluded
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        # Advanced statistics and AI-ready features
        analysis_results = {
            'basic_stats': df[numeric_cols].describe().to_dict() if len(numeric_cols) > 0 else {},
            'missing_values': df.isnull().sum().to_dict(),
            'columns': list(df.columns),
            'row_count': len(df),
            'correlation_matrix': df[numeric_cols].corr().to_dict() if len(numeric_cols) > 0 else {},
            'skewness': df[numeric_cols].skew().to_dict() if len(numeric_cols) > 0 else {},
            'kurtosis': df[numeric_cols].kurtosis().to_dict() if len(numeric_cols) > 0 else {},
            'outliers': self._detect_outliers(df),
            'distribution_tests': self._perform_distribution_tests(df),
            'dimensionality_reduction': self._perform_dimensionality_reduction(df),
            'temporal_patterns': self._analyze_temporal_patterns(df),
            'anomaly_detection': self._detect_anomalies(df),
            'feature_importance': self._calculate_feature_importance(df)
        }
        # --- supermarket auto-detection ---
        if is_supermarket(df):
            industry = 'supermarket'
            analysis_results['supermarket_kpis'] = supermarket_insights(df)
        # Add industry-specific metrics
        if industry and industry.lower() in self.industry_metrics:
            analysis_results['industry_metrics'] = self.industry_metrics[industry.lower()](df)
        # Add cross-industry insights
        analysis_results['cross_industry_insights'] = {}
        for analyzer_name, analyzer_func in self.cross_industry_analyzers.items():
            analysis_results['cross_industry_insights'][analyzer_name] = analyzer_func(df)
        return analysis_results
    def _detect_outliers(self, df):
        """
        Detect outliers using the IQR method for numerical columns.
        """
        outliers = {}
        for column in df.select_dtypes(include=[np.number]).columns:
            Q1 = df[column].quantile(0.25)
            Q3 = df[column].quantile(0.75)
            IQR = Q3 - Q1
            outlier_mask = (df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR))
            outliers[column] = {
                'count': int(outlier_mask.sum()),
                'percentage': float(outlier_mask.sum()) / len(df) * 100
            }
        return outliers
    def _perform_distribution_tests(self, df):
        """
        Perform distribution tests for numerical columns.
        """
        tests = {}
        for column in df.select_dtypes(include=[np.number]).columns:
            values = df[column].dropna()
            # Shapiro-Wilk requires at least 3 observations
            if len(values) < 3:
                continue
            shapiro_test = stats.shapiro(values)
            tests[column] = {
                'shapiro_test': {
                    'statistic': float(shapiro_test.statistic),
                    'p_value': float(shapiro_test.pvalue)
                }
            }
        return tests
    def _perform_dimensionality_reduction(self, df):
        """
        Perform PCA for dimensional insights.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) < 2:
            return {}
        scaler = StandardScaler()
        # Fill missing values before scaling; PCA cannot handle NaNs
        scaled_data = scaler.fit_transform(df[numeric_cols].fillna(0))
        pca = PCA()
        pca.fit_transform(scaled_data)
        return {
            'explained_variance_ratio': pca.explained_variance_ratio_.tolist(),
            'cumulative_variance_ratio': np.cumsum(pca.explained_variance_ratio_).tolist(),
            'n_components_95_variance': int(np.argmax(np.cumsum(pca.explained_variance_ratio_) >= 0.95) + 1)
        }
    def _analyze_temporal_patterns(self, df):
        """
        Analyze temporal patterns and seasonality for datetime columns.
        """
        date_cols = df.select_dtypes(include=['datetime64']).columns
        if len(date_cols) == 0:
            return None
        # Work on a copy so the helper columns do not leak into the caller's frame
        df = df.copy()
        patterns = {}
        for date_col in date_cols:
            df['year'] = df[date_col].dt.year
            df['month'] = df[date_col].dt.month
            df['day_of_week'] = df[date_col].dt.dayofweek
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            for metric in numeric_cols:
                if metric not in ['year', 'month', 'day_of_week']:
                    patterns[f"{metric}_by_month"] = df.groupby('month')[metric].mean().to_dict()
                    patterns[f"{metric}_by_day_of_week"] = df.groupby('day_of_week')[metric].mean().to_dict()
        return patterns
    def _detect_anomalies(self, df):
        """
        Detect anomalies using Isolation Forest.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) == 0:
            return None
        scaler = StandardScaler()
        # Fill missing values before scaling; IsolationForest cannot handle NaNs
        scaled_data = scaler.fit_transform(df[numeric_cols].fillna(0))
        # contamination=0.1 assumes roughly 10% of rows are anomalous
        isolation_forest = IsolationForest(random_state=42, contamination=0.1)
        anomalies = isolation_forest.fit_predict(scaled_data)
        return {
            'anomaly_percentage': float((anomalies == -1).mean() * 100),
            'anomaly_indices': np.where(anomalies == -1)[0].tolist()
        }
    def _calculate_feature_importance(self, df):
        """
        Calculate feature importance and relationships.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) < 2:
            return None
        importance = {}
        for col in numeric_cols:
            correlations = []
            for other_col in numeric_cols:
                if col != other_col:
                    # Skip constant columns, for which correlation is undefined
                    if df[col].nunique() <= 1 or df[other_col].nunique() <= 1:
                        continue
                    try:
                        corr, _ = pearsonr(df[col].fillna(0), df[other_col].fillna(0))
                        if not np.isnan(corr):  # Only keep valid correlations
                            correlations.append((other_col, abs(corr)))
                    except ValueError:
                        continue  # Skip if correlation can't be calculated
            # Handle the empty-correlations case
            correlation_values = [c[1] for c in correlations]
            importance[col] = {
                'top_correlations': sorted(correlations, key=lambda x: x[1], reverse=True)[:3],
                'correlation_strength': float(np.mean(correlation_values)) if correlation_values else 0.0
            }
        return importance
    def _retail_metrics(self, df):
        """Calculate retail-specific metrics"""
        if not all(col in df.columns for col in ['sales', 'inventory', 'customer_satisfaction']):
            # Return default structure if required columns are missing
            return {
                'sales_performance': {},
                'customer_behavior': {},
                'inventory': {}
            }
        metrics = {
            'sales_performance': {
                'total_sales': float(df['sales'].sum()) if 'sales' in df.columns else 0.0,
                'average_daily_sales': float(df['sales'].mean()) if 'sales' in df.columns else 0.0,
                'sales_growth': float((df['sales'].iloc[-1] / df['sales'].iloc[0] - 1) * 100) if 'sales' in df.columns else 0.0
            },
            'inventory_turnover': {
                'rate': float(df['sales'].sum() / df['inventory'].mean()) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0,
                'days_of_inventory': float(df['inventory'].mean() / (df['sales'].mean() / 30)) if all(col in df.columns for col in ['sales', 'inventory']) else 0.0
            },
            'customer_metrics': {
                'satisfaction_score': float(df['customer_satisfaction'].mean()) if 'customer_satisfaction' in df.columns else 0.0,
                'satisfaction_trend': df['customer_satisfaction'].rolling(window=7).mean().to_dict() if 'customer_satisfaction' in df.columns else {}
            }
        }
        return metrics
    def _wholesale_metrics(self, df):
        """
        Calculate wholesale-specific metrics.
        """
        metrics = {
            'order_analytics': {},
            'supplier_performance': {},
            'distribution': {}
        }
        if 'order_value' in df.columns:
            metrics['order_analytics']['average_order_value'] = float(df['order_value'].mean())
            metrics['order_analytics']['order_value_distribution'] = df['order_value'].quantile([0.25, 0.5, 0.75]).to_dict()
        if 'supplier_id' in df.columns and 'delivery_time' in df.columns:
            supplier_performance = df.groupby('supplier_id')['delivery_time'].agg(['mean', 'std']).to_dict()
            metrics['supplier_performance'] = supplier_performance
        return metrics
    def _supermarket_metrics(self, df):
        """
        Calculate supermarket-specific metrics.
        """
        metrics = {
            'category_performance': {},
            'basket_analysis': {},
            'promotion_impact': {}
        }
        if 'category' in df.columns and 'sales_amount' in df.columns:
            category_sales = df.groupby('category')['sales_amount'].sum()
            metrics['category_performance']['top_categories'] = category_sales.nlargest(5).to_dict()
        if 'transaction_id' in df.columns and 'product_id' in df.columns:
            # Simple basket analysis
            transactions = df.groupby('transaction_id')['product_id'].count()
            metrics['basket_analysis']['average_items_per_transaction'] = float(transactions.mean())
        if 'promotion_flag' in df.columns and 'sales_amount' in df.columns:
            promo_impact = df.groupby('promotion_flag')['sales_amount'].mean()
            metrics['promotion_impact']['sales_lift'] = float(
                (promo_impact.get(1, 0) - promo_impact.get(0, 0)) / promo_impact.get(0, 1) * 100
            )
        return metrics
    def _manufacturing_metrics(self, df):
        """Calculate manufacturing-specific metrics"""
        production_col = 'production_volume' if 'production_volume' in df.columns else 'units_produced'
        if production_col not in df.columns:
            # Neither production column is present; return an empty structure
            return {
                'production_efficiency': {},
                'quality_metrics': {},
                'quality_control': {},
                'equipment_utilization': {}
            }
        metrics = {
            'production_efficiency': {
                'volume': float(df[production_col].mean()),
                'trend': df[production_col].rolling(window=7).mean().to_dict()
            },
            'quality_metrics': {
                'defect_rate': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0,
                'quality_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {}
            },
            'quality_control': {
                'defects_per_unit': float(df['defect_rate'].mean()) if 'defect_rate' in df.columns else 0.0,
                'defect_trend': df['defect_rate'].rolling(window=7).mean().to_dict() if 'defect_rate' in df.columns else {}
            },
            'equipment_utilization': {
                'rate': float((df[production_col] / df[production_col].max()).mean() * 100),
                'trend': df[production_col].rolling(window=7).mean().to_dict()
            }
        }
        return metrics
    def _healthcare_metrics(self, df):
        """Calculate healthcare-specific metrics"""
        required_cols = ['patient_satisfaction', 'treatment_success_rate',
                         'order_fulfillment_time', 'production_volume']
        if not all(col in df.columns for col in required_cols):
            # Return an empty structure if the expected columns are missing
            return {
                'patient_outcomes': {},
                'operational_efficiency': {},
                'quality_of_care': {}
            }
        metrics = {
            'patient_outcomes': {
                'satisfaction': float(df['patient_satisfaction'].mean()),
                'treatment_success': float(df['treatment_success_rate'].mean())
            },
            'operational_efficiency': {
                'avg_wait_time': float(df['order_fulfillment_time'].mean()),
                'utilization_rate': float(df['production_volume'].mean() / df['production_volume'].max())
            },
            'quality_of_care': {
                'satisfaction_trend': df['patient_satisfaction'].rolling(window=7).mean().to_dict(),
                'success_rate_trend': df['treatment_success_rate'].rolling(window=7).mean().to_dict()
            }
        }
        return metrics
    def forecast_timeseries(self, data, date_column, value_column):
        """
        Forecast time series data with support for edge cases.
        """
        if not data:
            raise ValueError("Empty dataset provided")
        df = pd.DataFrame(data)
        if date_column not in df.columns:
            raise KeyError(f"Required column '{date_column}' not found")
        if value_column not in df.columns:
            raise KeyError(f"Required column '{value_column}' not found")
        # Convert to datetime
        try:
            df[date_column] = pd.to_datetime(df[date_column])
        except ValueError as exc:
            raise ValueError("Invalid date format") from exc
        # Handle missing values (record the affected rows before interpolation fills them)
        missing_indices = df[df[value_column].isnull()].index.tolist()
        has_missing = len(missing_indices) > 0
        if has_missing:
            df[value_column] = df[value_column].interpolate(method='linear')
        # Detect and handle outliers
        Q1 = df[value_column].quantile(0.25)
        Q3 = df[value_column].quantile(0.75)
        IQR = Q3 - Q1
        outlier_mask = (df[value_column] < (Q1 - 1.5 * IQR)) | (df[value_column] > (Q3 + 1.5 * IQR))
        has_outliers = outlier_mask.any()
        # Prepare data for Prophet
        prophet_df = df.rename(columns={date_column: 'ds', value_column: 'y'})
        model = Prophet(yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=True)
        model.fit(prophet_df)
        # Make future dataframe for forecasting
        future = model.make_future_dataframe(periods=30)
        forecast = model.predict(future)
        result = {
            'forecast': forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].to_dict('records'),
            'components': {
                'trend': forecast['trend'].to_dict(),
                'yearly': forecast['yearly'].to_dict() if 'yearly' in forecast else {},
                'weekly': forecast['weekly'].to_dict() if 'weekly' in forecast else {},
                'daily': forecast['daily'].to_dict() if 'daily' in forecast else {}
            }
        }
        if has_missing:
            result['handling_missing_values'] = {'filled_indices': missing_indices}
        if has_outliers:
            result['outlier_impact'] = {
                'outlier_indices': outlier_mask[outlier_mask].index.tolist(),
                'outlier_values': df.loc[outlier_mask, value_column].tolist()
            }
        # Detect seasonality (decomposition needs at least two full weekly periods)
        if len(df) >= 14:
            decomposition = seasonal_decompose(df[value_column], period=7, extrapolate_trend='freq')
            result['seasonality_components'] = {
                'trend': decomposition.trend.to_dict(),
                'seasonal': decomposition.seasonal.to_dict(),
                'residual': decomposition.resid.to_dict()
            }
        else:
            result['seasonality_components'] = {}
        # Cache the forecast with a timestamp to ensure freshness
        timestamp = datetime.now().strftime('%Y%m%d%H')
        cache_key = f"forecast_{date_column}_{value_column}_{timestamp}"
        try:
            self.redis_client.set(cache_key, json.dumps(result, cls=CustomJSONEncoder))
        except redis.RedisError:
            # Caching is best-effort; skip silently if Redis is unavailable
            pass
        return result
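    # Illustrative note (assumed, not from the original code): forecast_timeseries expects
    # `data` as a list of records, e.g. [{"date": "2024-01-01", "revenue": 120.0}, ...],
    # and is called as service.forecast_timeseries(data, date_column="date",
    # value_column="revenue"). The column names here are hypothetical; any date/value
    # pair present in the records can be used.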
    def get_cached_forecast(self, date_column, value_column):
        """
        Retrieve cached forecast results.
        """
        timestamp = datetime.now().strftime('%Y%m%d%H')
        cache_key = f"forecast_{date_column}_{value_column}_{timestamp}"
        try:
            cached = self.redis_client.get(cache_key)
        except redis.RedisError:
            return None
        if cached:
            return json.loads(cached)
        return None
    def _analyze_market_dynamics(self, df):
        """
        Analyze market dynamics across industries.
        """
        metrics = {
            'market_trends': {},
            'competitive_analysis': {},
            'growth_patterns': {}
        }
        if 'revenue' in df.columns and 'date' in df.columns:
            # Trend analysis on monthly revenue (use a local series to avoid mutating df)
            months = pd.to_datetime(df['date']).dt.to_period('M')
            monthly_revenue = df.groupby(months)['revenue'].sum()
            # Average monthly growth rate (geometric), as a percentage
            if monthly_revenue.iloc[0] > 0:
                metrics['growth_patterns']['monthly_growth'] = float(
                    ((monthly_revenue.iloc[-1] / monthly_revenue.iloc[0]) ** (1 / len(monthly_revenue)) - 1) * 100
                )
            # Market volatility (coefficient of variation)
            mean_revenue = monthly_revenue.mean()
            if mean_revenue > 0:  # Avoid division by zero
                metrics['market_trends']['volatility'] = float(monthly_revenue.std() / mean_revenue)
            else:
                metrics['market_trends']['volatility'] = 0.0
        if 'competitor_price' in df.columns and 'price' in df.columns:
            comp_price_mean = df['competitor_price'].mean()
            if comp_price_mean > 0:  # Avoid division by zero
                metrics['competitive_analysis']['price_position'] = float(
                    (df['price'].mean() / comp_price_mean - 1) * 100
                )
            else:
                metrics['competitive_analysis']['price_position'] = 0.0
        return metrics
    def _analyze_supply_chain(self, df):
        """
        Analyze supply chain metrics across industries.
        """
        metrics = {
            'efficiency': {},
            'reliability': {},
            'cost_analysis': {}
        }
        # Supply chain network analysis
        if 'supplier_id' in df.columns and 'delivery_time' in df.columns:
            agg_spec = {'delivery_time': ['mean', 'std']}
            if 'order_value' in df.columns:
                agg_spec['order_value'] = ['sum', 'mean']
            supplier_performance = df.groupby('supplier_id').agg(agg_spec).round(2)
            metrics['reliability']['supplier_consistency'] = float(
                1 - (supplier_performance['delivery_time']['std'] / supplier_performance['delivery_time']['mean']).mean()
            )
        # Cost and efficiency analysis
        if 'transportation_cost' in df.columns and 'order_value' in df.columns:
            metrics['cost_analysis']['logistics_cost_ratio'] = float(
                (df['transportation_cost'].sum() / df['order_value'].sum()) * 100
            )
        return metrics
    def _analyze_customer_insights(self, df):
        """
        Cross-industry customer behavior analysis.
        """
        insights = {
            'customer_segments': {},
            'behavior_patterns': {},
            'lifetime_value': {}
        }
        if 'customer_id' in df.columns and 'transaction_amount' in df.columns:
            # Customer segmentation using DBSCAN for more natural clustering
            customer_features = df.groupby('customer_id').agg({
                'transaction_amount': ['sum', 'mean', 'count']
            }).values
            scaler = MinMaxScaler()
            scaled_features = scaler.fit_transform(customer_features)
            # DBSCAN with fixed parameters; points labelled -1 are treated as noise
            dbscan = DBSCAN(eps=0.3, min_samples=5)
            clusters = dbscan.fit_predict(scaled_features)
            insights['customer_segments']['natural_segments'] = {
                'n_segments': len(np.unique(clusters[clusters >= 0])),
                'segment_sizes': pd.Series(clusters).value_counts().to_dict()
            }
        return insights
    def _analyze_operational_efficiency(self, df):
        """
        Cross-industry operational efficiency analysis.
        """
        metrics = {
            'process_efficiency': {},
            'resource_utilization': {},
            'bottleneck_analysis': {}
        }
        if 'process_time' in df.columns and 'output_quantity' in df.columns:
            # Process efficiency analysis
            total_process_time = df['process_time'].sum()
            if total_process_time > 0:
                metrics['process_efficiency']['throughput_rate'] = float(
                    df['output_quantity'].sum() / total_process_time
                )
            # Process stability: 1 minus the coefficient of variation of process time
            mean_process_time = df['process_time'].mean()
            if mean_process_time > 0:
                process_stability = 1 - (df['process_time'].std() / mean_process_time)
                metrics['process_efficiency']['stability_score'] = float(process_stability)
        return metrics
    def _analyze_risk_patterns(self, df):
        """
        Cross-industry risk pattern analysis.
        """
        risk_metrics = {
            'operational_risk': {},
            'market_risk': {},
            'compliance_risk': {}
        }
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            # Use Isolation Forest for risk pattern detection (fill NaNs first)
            iso_forest = IsolationForest(contamination=0.1, random_state=42)
            risk_scores = iso_forest.fit_predict(df[numeric_cols].fillna(0))
            risk_metrics['operational_risk']['anomaly_percentage'] = float(
                (risk_scores == -1).mean() * 100
            )
        return risk_metrics
    def _analyze_sustainability_metrics(self, df):
        """
        Analyze sustainability metrics: environmental impact, resource utilization, and waste management.
        """
        if not all(col in df.columns for col in ['energy_consumption', 'water_consumption', 'waste_generated']):
            return {}
        results = {
            'environmental_impact': {
                'carbon_footprint_trend': df['carbon_footprint'].rolling(window=7).mean().to_dict() if 'carbon_footprint' in df.columns else {},
                # 0.5 is a placeholder emission factor (emissions per unit of energy consumed)
                'total_emissions': float(df['energy_consumption'].sum() * 0.5)
            },
            'resource_utilization': {
                'energy_efficiency': float(df['energy_consumption'].mean()),
                'water_efficiency': float(df['water_consumption'].mean())
            },
            'waste_management': {
                'recycling_performance': float(df['recycling_rate'].mean()) if 'recycling_rate' in df.columns else 0.0,
                'waste_reduction_trend': df['waste_generated'].rolling(window=7).mean().to_dict()
            }
        }
        return results
    def prepare_ai_query_interface(self, df):
        """
        Prepare data for natural language analytics queries with enhanced semantic understanding.
        """
        query_interface = {
            'semantic_mappings': {},
            'entity_relationships': {},
            'available_metrics': {},
            'temporal_context': {},
            'metric_relationships': {},
            'data_patterns': {},
            'suggested_queries': []
        }
        try:
            # Create semantic mappings for textual columns
            text_columns = df.select_dtypes(include=['object']).columns
            vectorizer = TfidfVectorizer(max_features=1000)
            for col in text_columns:
                if df[col].astype(str).str.len().mean() > 5:  # Only process meaningful text fields
                    text_features = vectorizer.fit_transform(df[col].fillna('').astype(str))
                    query_interface['semantic_mappings'][col] = {
                        'vocabulary': vectorizer.vocabulary_,
                        'idf_values': vectorizer.idf_.tolist(),
                        'top_terms': dict(zip(
                            vectorizer.get_feature_names_out(),
                            np.asarray(text_features.sum(axis=0)).ravel()
                        ))
                    }
            # Map entity relationships and hierarchies
            entity_columns = [col for col in df.columns if any(entity in col.lower()
                              for entity in ['id', 'category', 'type', 'name', 'class', 'group'])]
            for col in entity_columns:
                if df[col].dtype == 'object':
                    value_counts = df[col].value_counts()
                    unique_values = df[col].unique().tolist()
                    # Find potential hierarchical relationships
                    hierarchy = {}
                    if '_' in col or col.lower().endswith('_id'):
                        related_cols = [c for c in df.columns if col.split('_')[0] in c and c != col]
                        for rel_col in related_cols:
                            hierarchy[rel_col] = df.groupby(col)[rel_col].agg(list).to_dict()
                    query_interface['entity_relationships'][col] = {
                        'unique_values': unique_values,
                        'value_counts': value_counts.to_dict(),
                        'hierarchy': hierarchy,
                        'cardinality': len(unique_values)
                    }
            # Document available metrics and their relationships
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            for col in numeric_cols:
                col_stats = df[col].describe()  # local name avoids shadowing scipy.stats
                query_interface['available_metrics'][col] = {
                    'min': float(col_stats['min']),
                    'max': float(col_stats['max']),
                    'mean': float(col_stats['mean']),
                    'std': float(col_stats['std']),
                    'quartiles': {
                        '25%': float(col_stats['25%']),
                        '50%': float(col_stats['50%']),
                        '75%': float(col_stats['75%'])
                    }
                }
                # Analyze metric relationships
                correlations = {}
                for other_col in numeric_cols:
                    if col != other_col:
                        corr = df[col].corr(df[other_col])
                        if not np.isnan(corr) and abs(corr) > 0.3:  # Only store meaningful correlations
                            correlations[other_col] = float(corr)
                query_interface['metric_relationships'][col] = {
                    'correlations': correlations,
                    'trends': self._analyze_metric_trends(df, col)
                }
            # Add temporal context if available
            date_cols = df.select_dtypes(include=['datetime64']).columns
            if len(date_cols) == 0:
                # Try to convert string columns that might contain dates
                candidate_date_cols = []
                for col in df.columns:
                    if df[col].dtype == 'object':
                        try:
                            pd.to_datetime(df[col])
                            candidate_date_cols.append(col)
                        except (ValueError, TypeError):
                            continue
                date_cols = pd.Index(candidate_date_cols)
            for date_col in date_cols:
                df[date_col] = pd.to_datetime(df[date_col])
                temporal_stats = {
                    'min_date': df[date_col].min().isoformat(),
                    'max_date': df[date_col].max().isoformat(),
                    # infer_freq needs at least three timestamps
                    'frequency': pd.infer_freq(df[date_col]) if df[date_col].nunique() >= 3 else None,
                    'temporal_patterns': {}
                }
                # Analyze temporal patterns
                temporal_stats['temporal_patterns'] = {
                    'daily_pattern': df.groupby(df[date_col].dt.dayofweek).size().to_dict(),
                    'monthly_pattern': df.groupby(df[date_col].dt.month).size().to_dict(),
                    'yearly_pattern': df.groupby(df[date_col].dt.year).size().to_dict()
                }
                query_interface['temporal_context'][date_col] = temporal_stats
            # Identify data patterns and anomalies
            query_interface['data_patterns'] = {
                'missing_patterns': df.isnull().sum().to_dict(),
                'unique_value_counts': df.nunique().to_dict(),
                'distribution_types': self._analyze_distributions(df)
            }
            # Generate suggested queries based on data characteristics
            query_interface['suggested_queries'] = self._generate_suggested_queries(df)
            # Add metadata about the dataset
            query_interface['metadata'] = {
                'row_count': len(df),
                'column_count': len(df.columns),
                'memory_usage': int(df.memory_usage(deep=True).sum()),
                'data_types': df.dtypes.astype(str).to_dict()
            }
        except Exception as e:
            query_interface['error'] = str(e)
        return query_interface
    def _analyze_metric_trends(self, df, column):
        """Helper method to analyze trends in numeric columns"""
        trends = {}
        if 'date' in df.columns:
            dates = pd.to_datetime(df['date'])
            time_series = df.groupby(dates)[column].mean()
            if len(time_series) > 2:
                # Fit a linear trend to the series
                x = np.arange(len(time_series))
                y = time_series.values
                slope, intercept = np.polyfit(x, y, 1)
                trends['slope'] = float(slope)
                trends['trend_direction'] = 'increasing' if slope > 0 else 'decreasing'
                series_mean = time_series.mean()
                trends['trend_strength'] = float(abs(slope) / series_mean) if series_mean != 0 else 0.0
        return trends
    def _analyze_distributions(self, df):
        """Helper method to analyze value distributions"""
        distributions = {}
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            values = df[col].dropna()
            # Skip near-constant columns; normaltest also needs at least 8 samples
            if values.nunique() > 5 and len(values) >= 8:
                _, p_value = stats.normaltest(values)
                distributions[col] = {
                    'distribution_type': 'normal' if p_value > 0.05 else 'non_normal',
                    'skewness': float(df[col].skew()),
                    'kurtosis': float(df[col].kurtosis())
                }
        return distributions
    def _generate_suggested_queries(self, df):
        """Helper method to generate relevant query suggestions"""
        suggestions = []
        # Add time-based queries if temporal data exists
        if 'date' in df.columns:
            suggestions.extend([
                "Show the trend over time",
                "Compare year-over-year growth",
                "Find seasonal patterns"
            ])
        # Add metric-based queries
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            suggestions.extend([
                f"Analyze the distribution of {col}" for col in numeric_cols[:3]
            ])
        # Add categorical analysis queries
        categorical_cols = df.select_dtypes(include=['object']).columns
        if len(categorical_cols) > 0:
            suggestions.extend([
                f"Break down metrics by {col}" for col in categorical_cols[:3]
            ])
        return suggestions
    def enhance_cross_industry_correlations(self, df):
        """
        Enhanced analysis of correlations across different industries.
        """
        correlations = {
            'metric_correlations': {},
            'industry_patterns': {},
            'shared_trends': {}
        }
        if 'industry' in df.columns:
            industries = df['industry'].unique()
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            # Calculate cross-industry metric correlations
            for ind1 in industries:
                for ind2 in industries:
                    if ind1 < ind2:  # Avoid duplicate comparisons
                        ind1_data = df[df['industry'] == ind1][numeric_cols]
                        ind2_data = df[df['industry'] == ind2][numeric_cols]
                        if not ind1_data.empty and not ind2_data.empty:
                            common_metrics = set(ind1_data.columns) & set(ind2_data.columns)
                            for metric in common_metrics:
                                # pearsonr requires equal-length arrays, so align on the shorter series
                                n = min(len(ind1_data), len(ind2_data))
                                if n < 2:
                                    continue
                                corr, p_value = pearsonr(
                                    ind1_data[metric].fillna(0).iloc[:n],
                                    ind2_data[metric].fillna(0).iloc[:n]
                                )
                                if np.isnan(corr):
                                    continue
                                correlations['metric_correlations'][f"{ind1}_{ind2}_{metric}"] = {
                                    'correlation': float(corr),
                                    'p_value': float(p_value)
                                }
            # Identify shared trends
            if 'date' in df.columns:
                for metric in numeric_cols:
                    industry_trends = {}
                    for industry in industries:
                        industry_data = df[df['industry'] == industry]
                        if not industry_data.empty:
                            trend = industry_data.groupby('date')[metric].mean()
                            if len(trend) > 0:
                                industry_trends[industry] = trend.to_dict()
                    correlations['shared_trends'][metric] = industry_trends
        return correlations
    def perform_market_basket_analysis(self, df: pd.DataFrame, min_support: float = 0.01,
                                       min_confidence: float = 0.3, min_lift: float = 1.0) -> dict:
        """
        Perform advanced market basket analysis with support for multiple analytics dimensions.

        Args:
            df (pd.DataFrame): Input transaction data with required columns
            min_support (float): Minimum support threshold for frequent itemsets (default: 0.01)
            min_confidence (float): Minimum confidence threshold for rules (default: 0.3)
            min_lift (float): Minimum lift threshold for rules (default: 1.0)

        Returns:
            dict: Dictionary containing:
                - product_associations: Support, confidence, and lift metrics for product pairs
                - temporal_baskets: Time-based purchase patterns
                - product_clusters: Product groupings based on purchase behavior
                - customer_segments: Customer segments based on purchase patterns
                - performance_metrics: Key performance indicators

        Raises:
            ValueError: If required columns are missing or data validation fails
        """
        try:
            # Validate input data
            required_columns = ['transaction_id', 'product_id']
            if not all(col in df.columns for col in required_columns):
                raise ValueError(f"Missing required columns: {set(required_columns) - set(df.columns)}")
            if df.empty:
                raise ValueError("Empty dataframe provided")
            # Work with a copy of the dataframe
            df = df.copy()
            # Convert to basket format with optimization for large datasets
            baskets = (df.groupby('transaction_id')['product_id']
                       .agg(lambda x: frozenset(x.values))  # frozenset for cheap membership tests
                       .reset_index())
            total_transactions = len(baskets)
            # Calculate product frequencies using vectorized operations
            product_freq = df.groupby('product_id').size().to_dict()
            # Generate product pairs efficiently
            pairs_data = []
            for products in baskets['product_id']:
                products_list = list(products)  # Convert frozenset to list once
                pairs_data.extend(
                    tuple(sorted([p1, p2]))
                    for i, p1 in enumerate(products_list)
                    for p2 in products_list[i + 1:]
                )
            pair_freq = pd.Series(pairs_data).value_counts().to_dict()
            # Calculate association metrics with validation
            product_associations = {
                'support': {},
                'confidence': {},
                'lift': {},
                'metrics_distribution': {
                    'support': {'min': float('inf'), 'max': 0, 'mean': 0},
                    'confidence': {'min': float('inf'), 'max': 0, 'mean': 0},
                    'lift': {'min': float('inf'), 'max': 0, 'mean': 0}
                }
            }
            valid_rules = []
            for pair, freq in pair_freq.items():
                prod1, prod2 = pair
                support = freq / total_transactions
                if support >= min_support:
                    confidence_1_2 = freq / product_freq[prod1]
                    confidence_2_1 = freq / product_freq[prod2]
                    max_confidence = max(confidence_1_2, confidence_2_1)
                    if max_confidence >= min_confidence:
                        lift = (freq * total_transactions) / (product_freq[prod1] * product_freq[prod2])
                        if lift >= min_lift:
                            valid_rules.append({
                                'pair': pair,
                                'support': support,
                                'confidence': max_confidence,
                                'lift': lift
                            })
                            # Store metrics with string keys for JSON serialization
                            pair_key = f"({prod1}, {prod2})"
                            product_associations['support'][pair_key] = float(support)
                            product_associations['confidence'][pair_key] = float(max_confidence)
                            product_associations['lift'][pair_key] = float(lift)
                            # Update metrics distribution
                            for metric_type, value in [('support', support),
                                                       ('confidence', max_confidence),
                                                       ('lift', lift)]:
                                dist = product_associations['metrics_distribution'][metric_type]
                                dist['min'] = min(dist['min'], value)
                                dist['max'] = max(dist['max'], value)
            # Calculate means for distributions
            for metric_type in ['support', 'confidence', 'lift']:
                values = [rule[metric_type] for rule in valid_rules]
                if values:
                    product_associations['metrics_distribution'][metric_type]['mean'] = float(sum(values) / len(values))
                else:
                    product_associations['metrics_distribution'][metric_type] = {'min': 0, 'max': 0, 'mean': 0}
            # Enhanced temporal analysis
            temporal_patterns = self._analyze_transaction_time_patterns(df) if 'timestamp' in df.columns else {}
            # Enhanced product clustering
            product_clusters = self._perform_product_clustering(df) if 'quantity' in df.columns else {}
            # Customer segmentation
            customer_segments = self._analyze_customer_segments(df) if 'customer_id' in df.columns else {}
            # Performance metrics
            performance_metrics = {
                'total_transactions': total_transactions,
                'unique_products': len(product_freq),
                'avg_basket_size': float(df.groupby('transaction_id')['product_id'].count().mean()),
                'total_rules_found': len(valid_rules),
                'rules_distribution': {
                    'strong_associations': len([r for r in valid_rules if r['lift'] > 2]),
                    'moderate_associations': len([r for r in valid_rules if 1 < r['lift'] <= 2]),
                    'weak_associations': len([r for r in valid_rules if r['lift'] <= 1])
                }
            }
            return {
                'product_associations': product_associations,
                'temporal_baskets': temporal_patterns,
                'product_clusters': product_clusters,
                'customer_segments': customer_segments,
                'performance_metrics': performance_metrics
            }
        except Exception as e:
            print(f"Error in market basket analysis: {str(e)}")
            raise ValueError(f"Market basket analysis failed: {str(e)}") from e
    def _analyze_transaction_time_patterns(self, df: pd.DataFrame) -> dict:
        """Analyze temporal patterns in purchase behavior from transaction timestamps"""
        patterns = {
            'daily_patterns': {},
            'weekly_patterns': {},
            'monthly_patterns': {},
            'hourly_patterns': {}
        }
        try:
            timestamps = pd.to_datetime(df['timestamp'])
            has_quantity = 'quantity' in df.columns
            for period, grouper in [
                ('hourly_patterns', timestamps.dt.hour),
                ('daily_patterns', timestamps.dt.day),
                ('weekly_patterns', timestamps.dt.dayofweek),
                ('monthly_patterns', timestamps.dt.month)
            ]:
                # Only aggregate quantity when the column actually exists
                agg_spec = {
                    'product_id': ['count', 'nunique'],
                    'transaction_id': 'nunique'
                }
                if has_quantity:
                    agg_spec['quantity'] = ['sum', 'mean']
                pattern_data = df.groupby(grouper).agg(agg_spec).round(2)
                patterns[period] = {
                    'transaction_count': pattern_data['transaction_id']['nunique'].to_dict(),
                    'product_count': pattern_data['product_id']['count'].to_dict(),
                    'unique_products': pattern_data['product_id']['nunique'].to_dict(),
                    'total_quantity': pattern_data['quantity']['sum'].to_dict() if has_quantity else {},
                    'avg_quantity': pattern_data['quantity']['mean'].to_dict() if has_quantity else {}
                }
        except (ValueError, KeyError) as e:
            print(f"Error in temporal pattern analysis: {str(e)}")
        return patterns
    def _perform_product_clustering(self, df: pd.DataFrame) -> dict:
        """Perform advanced product clustering analysis"""
        try:
            # Create rich product features
            product_features = df.groupby('product_id').agg({
                'quantity': ['mean', 'std', 'sum', 'count'],
                'transaction_id': 'nunique'
            }).fillna(0)
            # Feature engineering
            product_features['quantity_per_transaction'] = (
                product_features['quantity']['sum'] /
                product_features['transaction_id']['nunique']
            )
            # Prepare features for clustering
            features_for_clustering = product_features.copy()
            features_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col
                                               for col in features_for_clustering.columns]
            # Need at least 3 products to evaluate 2 or more clusters
            if len(features_for_clustering) > 2:
                # Scale features
                scaler = StandardScaler()
                scaled_features = scaler.fit_transform(features_for_clustering)
                # Determine the number of clusters via silhouette score
                max_clusters = min(5, len(features_for_clustering) - 1)
                scores = []
                for k in range(2, max_clusters + 1):
                    kmeans = KMeans(n_clusters=k, random_state=42)
                    clusters = kmeans.fit_predict(scaled_features)
                    score = silhouette_score(scaled_features, clusters)
                    scores.append((k, score))
                # Use the best-scoring number of clusters
                optimal_k = max(scores, key=lambda x: x[1])[0]
                kmeans = KMeans(n_clusters=optimal_k, random_state=42)
                clusters = kmeans.fit_predict(scaled_features)
                # Prepare cluster insights
                cluster_data = {
                    'cluster_assignments': {
                        prod: int(cluster) for prod, cluster in zip(product_features.index, clusters)
                    },
                    'cluster_profiles': {},
                    'evaluation_metrics': {
                        'silhouette_score': float(max(scores, key=lambda x: x[1])[1]),
                        'num_clusters': optimal_k
                    }
                }
                # Generate cluster profiles
                for cluster_id in range(optimal_k):
                    cluster_mask = clusters == cluster_id
                    cluster_data['cluster_profiles'][str(cluster_id)] = {
                        'size': int(sum(cluster_mask)),
                        'avg_quantity': float(product_features['quantity']['mean'][cluster_mask].mean()),
                        'avg_transactions': float(product_features['transaction_id']['nunique'][cluster_mask].mean()),
                        'total_quantity': float(product_features['quantity']['sum'][cluster_mask].sum()),
                        'purchase_frequency': float(
                            product_features['quantity']['count'][cluster_mask].sum() /
                            product_features['transaction_id']['nunique'][cluster_mask].sum()
                        )
                    }
                return cluster_data
            return {}
        except np.linalg.LinAlgError as e:
            print(f"Error in clustering computation: {str(e)}")
            return {}
        except (ValueError, KeyError) as e:
            print(f"Error in product clustering: {str(e)}")
            return {}
    def _analyze_customer_segments(self, df: pd.DataFrame) -> dict:
        """Analyze customer segments based on purchase behavior"""
        try:
            if 'customer_id' not in df.columns:
                return {}
            # Only aggregate quantity when the column actually exists
            agg_spec = {
                'transaction_id': 'nunique',
                'product_id': ['nunique', 'count']
            }
            if 'quantity' in df.columns:
                agg_spec['quantity'] = ['sum', 'mean']
            customer_stats = df.groupby('customer_id').agg(agg_spec)
            # Calculate recency for RFM-style scoring
            if 'timestamp' in df.columns:
                current_date = pd.to_datetime(df['timestamp']).max()
                customer_stats['recency'] = df.groupby('customer_id')['timestamp'].max().apply(
                    lambda x: (current_date - pd.to_datetime(x)).days
                )
            # Segment customers
            stats_for_clustering = customer_stats.copy()
            stats_for_clustering.columns = [f"{col[0]}_{col[1]}" if isinstance(col, tuple) else col
                                            for col in stats_for_clustering.columns]
            if len(stats_for_clustering) > 1:
                scaler = StandardScaler()
                scaled_features = scaler.fit_transform(stats_for_clustering)
                # Use DBSCAN so the number of segments is not fixed in advance
                dbscan = DBSCAN(eps=0.5, min_samples=3)
                clusters = dbscan.fit_predict(scaled_features)
                return {
                    'customer_segments': {
                        str(cust): int(cluster) for cust, cluster in zip(customer_stats.index, clusters)
                    },
                    'segment_profiles': {
                        str(segment): {
                            'size': int(sum(clusters == segment)),
                            'avg_transactions': float(customer_stats['transaction_id']['nunique'][clusters == segment].mean()),
                            'avg_products': float(customer_stats['product_id']['nunique'][clusters == segment].mean())
                        }
                        for segment in set(clusters) if segment != -1
                    },
                    'segment_statistics': {
                        'num_segments': len(set(clusters) - {-1}),
                        'noise_points': int(sum(clusters == -1))
                    }
                }
            return {}
        except Exception as e:
            print(f"Error in customer segmentation: {str(e)}")
            return {}
    def _calculate_correlations(self, df: pd.DataFrame) -> dict:
        """Calculate correlations between numeric columns with detailed statistics"""
        correlations = {}
        try:
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) < 2:
                return correlations
            # Calculate correlation matrix
            corr_matrix = df[numeric_cols].corr()
            # Convert correlations to dictionary with additional metadata
            for col1 in numeric_cols:
                correlations[col1] = {}
                for col2 in numeric_cols:
                    if col1 != col2:
                        correlation = corr_matrix.loc[col1, col2]
                        if not np.isnan(correlation):
                            # Calculate p-value using pearsonr
                            coef, p_value = pearsonr(df[col1].fillna(0), df[col2].fillna(0))
                            correlations[col1][col2] = {
                                'coefficient': float(correlation),
                                'p_value': float(p_value),
                                'strength': 'strong' if abs(correlation) > 0.7
                                            else 'moderate' if abs(correlation) > 0.3
                                            else 'weak',
                                'direction': 'positive' if correlation > 0 else 'negative',
                                'sample_size': len(df)
                            }
        except Exception as e:
            print(f"Error calculating correlations: {str(e)}")
            return {}
        return correlations
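

# Illustrative usage sketch (an assumption, not part of the service itself): it presumes
# this module is imported as part of its package, a local Redis instance on
# localhost:6379, and made-up column names.
#
#     from app.services.analytics_service import AnalyticsService  # hypothetical import path
#
#     service = AnalyticsService()
#     records = [
#         {"date": "2024-01-01", "sales": 120.0, "inventory": 40, "customer_satisfaction": 4.2},
#         {"date": "2024-01-02", "sales": 135.5, "inventory": 38, "customer_satisfaction": 4.4},
#     ]
#     eda = service.perform_eda(records, industry="retail")
#     forecast = service.forecast_timeseries(records, date_column="date", value_column="sales")
#
#     basket_df = pd.DataFrame({
#         "transaction_id": [1, 1, 2, 2, 3],
#         "product_id": ["milk", "bread", "milk", "eggs", "bread"],
#     })
#     rules = service.perform_market_basket_analysis(basket_df, min_support=0.1)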