# Advanced Analytics Dashboard for NAVADA
"""
Advanced analytics system providing:
- Interactive data exploration with drill-down capabilities
- Predictive modeling for startup success probability
- Cohort analysis for portfolio companies
- A/B testing framework for business model variations
- Real-time collaboration on documents with multiple users
"""

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.io as pio
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.cluster import KMeans
from scipy import stats
import json
from typing import Dict, List, Optional, Any, Tuple
import warnings

warnings.filterwarnings('ignore')


class AdvancedAnalyticsDashboard:
    """Advanced analytics and predictive modeling for startups."""

    def __init__(self):
        self.models = {}
        self.scalers = {}
        self.feature_importance = {}
        self.cohort_data = {}
        self.ab_tests = {}

    def create_interactive_exploration_dashboard(self, df: pd.DataFrame) -> str:
        """Create comprehensive interactive dashboard with drill-down capabilities."""
        try:
            # Create subplot figure with multiple charts
            fig = make_subplots(
                rows=3, cols=2,
                subplot_titles=[
                    'Success Rate by Sector (Click to drill down)',
                    'Funding vs Success Correlation',
                    'Geographic Distribution',
                    'Temporal Trends',
                    'Risk Factor Analysis',
                    'Performance Metrics'
                ],
                specs=[
                    [{"type": "bar"}, {"type": "scatter"}],
                    [{"type": "choropleth"}, {"type": "scatter"}],
                    [{"type": "heatmap"}, {"type": "polar"}]  # Scatterpolar traces need a 'polar' subplot
                ]
            )

            # 1. Interactive Sector Analysis with Drill-down
            if 'Sector' in df.columns and 'Success' in df.columns:
                sector_success = df.groupby('Sector')['Success'].agg(['count', 'sum']).reset_index()
                sector_success['success_rate'] = sector_success['sum'] / sector_success['count']

                fig.add_trace(
                    go.Bar(
                        x=sector_success['Sector'],
                        y=sector_success['success_rate'],
                        text=[f"{rate:.1%}<br>({count} companies)"
                              for rate, count in zip(sector_success['success_rate'], sector_success['count'])],
                        textposition='auto',
                        name='Success Rate',
                        customdata=sector_success['Sector'],
                        hovertemplate='%{x}<br>Success Rate: %{y:.1%}<br>Companies: %{text}'
                    ),
                    row=1, col=1
                )

            # 2. Funding vs Success Correlation
            if 'Total Funding' in df.columns and 'Success' in df.columns:
                success_colors = ['red' if s == 0 else 'green' for s in df['Success']]

                fig.add_trace(
                    go.Scatter(
                        x=df['Total Funding'],
                        y=df.get('Valuation', df.get('Market Cap', np.random.randn(len(df)))),
                        mode='markers',
                        marker=dict(color=success_colors, size=8, opacity=0.7),
                        text=[f"Company: {i}<br>Sector: {df.loc[i, 'Sector'] if 'Sector' in df.columns else 'Unknown'}"
                              for i in df.index],
                        name='Companies',
                        hovertemplate='%{text}<br>Funding: $%{x:,.0f}<br>Valuation: $%{y:,.0f}'
                    ),
                    row=1, col=2
                )

            # 3. Geographic Distribution
            if 'Country' in df.columns:
                geo_data = df['Country'].value_counts().reset_index()
                geo_data.columns = ['Country', 'Count']

                fig.add_trace(
                    go.Choropleth(
                        locations=geo_data['Country'],
                        z=geo_data['Count'],
                        locationmode='country names',
                        colorscale='Viridis',
                        hovertemplate='%{location}<br>Startups: %{z}'
                    ),
                    row=2, col=1
                )

            # 4. Temporal Trends
            if 'Founded Year' in df.columns:
                yearly_data = df.groupby('Founded Year').size().reset_index()
                yearly_data.columns = ['Year', 'Count']

                fig.add_trace(
                    go.Scatter(
                        x=yearly_data['Year'],
                        y=yearly_data['Count'],
                        mode='lines+markers',
                        name='Startups Founded',
                        line=dict(width=3),
                        hovertemplate='Year %{x}<br>Startups Founded: %{y}'
                    ),
                    row=2, col=2
                )

            # 5. Risk Factor Heatmap
            risk_factors = ['Market Risk', 'Technology Risk', 'Financial Risk', 'Team Risk', 'Regulatory Risk']
            sectors = df['Sector'].unique()[:5] if 'Sector' in df.columns else ['Tech', 'FinTech', 'Healthcare', 'E-commerce', 'AI']

            # Generate risk matrix (in a real app, this would come from actual data)
            risk_matrix = np.random.rand(len(sectors), len(risk_factors)) * 100

            fig.add_trace(
                go.Heatmap(
                    z=risk_matrix,
                    x=risk_factors,
                    y=sectors,
                    colorscale='RdYlGn_r',
                    hovertemplate='%{y}<br>%{x}: %{z:.1f}%'
                ),
                row=3, col=1
            )

            # 6. Performance Radar Chart
            if 'Success' in df.columns:
                # Representative metrics for successful vs failed startups (illustrative values)
                success_metrics = {
                    'Revenue Growth': 85,
                    'Market Share': 65,
                    'Team Strength': 90,
                    'Product Quality': 88,
                    'Customer Satisfaction': 92
                }
                failed_metrics = {
                    'Revenue Growth': 45,
                    'Market Share': 25,
                    'Team Strength': 60,
                    'Product Quality': 55,
                    'Customer Satisfaction': 50
                }

                categories = list(success_metrics.keys())

                fig.add_trace(
                    go.Scatterpolar(
                        r=list(success_metrics.values()),
                        theta=categories,
                        fill='toself',
                        name='Successful Startups',
                        line_color='green'
                    ),
                    row=3, col=2
                )

                fig.add_trace(
                    go.Scatterpolar(
                        r=list(failed_metrics.values()),
                        theta=categories,
                        fill='toself',
                        name='Failed Startups',
                        line_color='red'
                    ),
                    row=3, col=2
                )

            # Update layout for interactivity
            fig.update_layout(
                height=1200,
                title_text="🔍 Advanced Analytics Dashboard - Interactive Exploration",
                title_x=0.5,
                showlegend=True,
                template='plotly_white'
            )

            # Custom JavaScript for drill-down functionality (script body left empty here)
            drill_down_js = """ """

            # Convert to HTML and inject the drill-down script before the closing body tag
            html_content = fig.to_html(include_plotlyjs=True)
            html_content = html_content.replace('</body>', f'{drill_down_js}</body>')

            return html_content

        except Exception as e:
            return f"Error creating dashboard: {str(e)}"
" def train_success_prediction_model(self, df: pd.DataFrame) -> Dict[str, Any]: """Train predictive models for startup success probability.""" try: if 'Success' not in df.columns: return {'error': 'Success column not found in dataset'} # Prepare features feature_columns = [] X_data = pd.DataFrame() # Numerical features numerical_features = ['Total Funding', 'Team Size', 'Founded Year', 'Funding Rounds'] for col in numerical_features: if col in df.columns: X_data[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) feature_columns.append(col) # Categorical features categorical_features = ['Sector', 'Country', 'Stage'] label_encoders = {} for col in categorical_features: if col in df.columns: le = LabelEncoder() X_data[f'{col}_encoded'] = le.fit_transform(df[col].astype(str)) label_encoders[col] = le feature_columns.append(f'{col}_encoded') # Derived features if 'Total Funding' in df.columns and 'Team Size' in df.columns: X_data['Funding_per_Employee'] = X_data['Total Funding'] / (X_data['Team Size'] + 1) feature_columns.append('Funding_per_Employee') if 'Founded Year' in df.columns: current_year = datetime.now().year X_data['Company_Age'] = current_year - X_data['Founded Year'] feature_columns.append('Company_Age') # Target variable y = df['Success'].values # Split data X_train, X_test, y_train, y_test = train_test_split( X_data[feature_columns], y, test_size=0.2, random_state=42, stratify=y ) # Scale features scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_test_scaled = scaler.transform(X_test) # Train multiple models models = { 'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42), 'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42) } model_results = {} best_model = None best_score = 0 for name, model in models.items(): # Train model if name == 'Random Forest': model.fit(X_train, y_train) predictions = model.predict(X_test) else: model.fit(X_train_scaled, y_train) predictions = model.predict(X_test_scaled) # Calculate metrics accuracy = accuracy_score(y_test, predictions) precision = precision_score(y_test, predictions, average='weighted') recall = recall_score(y_test, predictions, average='weighted') f1 = f1_score(y_test, predictions, average='weighted') model_results[name] = { 'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1_score': f1, 'model': model } if accuracy > best_score: best_score = accuracy best_model = model # Store best model and scaler self.models['success_prediction'] = best_model self.scalers['success_prediction'] = scaler # Feature importance (for Random Forest) if hasattr(best_model, 'feature_importances_'): feature_importance = dict(zip(feature_columns, best_model.feature_importances_)) self.feature_importance['success_prediction'] = sorted( feature_importance.items(), key=lambda x: x[1], reverse=True ) return { 'model_results': model_results, 'best_model': type(best_model).__name__, 'best_accuracy': best_score, 'feature_importance': self.feature_importance.get('success_prediction', []), 'feature_columns': feature_columns, 'training_samples': len(X_train), 'test_samples': len(X_test) } except Exception as e: return {'error': str(e)} def predict_startup_success(self, startup_data: Dict[str, Any]) -> Dict[str, Any]: """Predict success probability for a new startup.""" try: if 'success_prediction' not in self.models: return {'error': 'Model not trained yet'} model = self.models['success_prediction'] scaler = self.scalers['success_prediction'] # Prepare input data (this is 
            features = []
            feature_names = []

            # Add numerical features
            numerical_mapping = {
                'funding': 'Total Funding',
                'team_size': 'Team Size',
                'founded_year': 'Founded Year',
                'funding_rounds': 'Funding Rounds'
            }

            for input_key, feature_name in numerical_mapping.items():
                if input_key in startup_data:
                    features.append(float(startup_data[input_key]))
                    feature_names.append(feature_name)

            # For categorical features, you'd need to use the same label encoders from training
            # This is simplified for demonstration

            if len(features) >= 3:  # Minimum features needed
                # Make prediction
                feature_array = np.array(features).reshape(1, -1)

                if hasattr(model, 'predict_proba'):
                    probabilities = model.predict_proba(feature_array)[0]
                    success_probability = probabilities[1] if len(probabilities) > 1 else probabilities[0]
                else:
                    success_probability = model.predict(feature_array)[0]

                # Calculate confidence based on feature completeness
                confidence = min(0.95, len(features) / 10)  # More features = higher confidence

                # Generate insights
                insights = self._generate_prediction_insights(startup_data, success_probability)

                return {
                    'success_probability': float(success_probability),
                    'confidence': confidence,
                    'risk_level': 'low' if success_probability > 0.7 else 'medium' if success_probability > 0.4 else 'high',
                    'insights': insights,
                    'features_used': feature_names,
                    'prediction_date': datetime.now().isoformat()
                }
            else:
                return {'error': 'Insufficient data for prediction'}

        except Exception as e:
            return {'error': str(e)}

    def _generate_prediction_insights(self, startup_data: Dict, probability: float) -> List[str]:
        """Generate insights based on prediction results."""
        insights = []

        if probability > 0.8:
            insights.append("🟢 Strong indicators for success - well-positioned for growth")
        elif probability > 0.6:
            insights.append("🟡 Good potential but monitor key risk factors")
        elif probability > 0.4:
            insights.append("🟠 Mixed signals - focus on strengthening weak areas")
        else:
            insights.append("🔴 High risk profile - significant challenges identified")

        # Add specific insights based on data
        if startup_data.get('funding', 0) > 10000000:  # > $10M
            insights.append("High funding level provides strong resource foundation")
        elif startup_data.get('funding', 0) < 1000000:  # < $1M
            insights.append("Limited funding may constrain growth opportunities")

        if startup_data.get('team_size', 0) > 50:
            insights.append("Large team suggests scaling momentum")
        elif startup_data.get('team_size', 0) < 10:
            insights.append("Small team requires efficient execution and hiring")

        return insights

    def create_cohort_analysis(self, df: pd.DataFrame, cohort_by: str = 'Founded Year') -> str:
        """Create cohort analysis for tracking startup performance over time."""
        try:
            if cohort_by not in df.columns:
                return f"Error: Column '{cohort_by}' not found"
" # Create cohort data cohort_data = df.groupby([cohort_by, 'Success']).size().unstack(fill_value=0) # Calculate success rates cohort_data['total'] = cohort_data.sum(axis=1) cohort_data['success_rate'] = cohort_data.get(1, 0) / cohort_data['total'] # Create visualization fig = make_subplots( rows=2, cols=2, subplot_titles=[ 'Cohort Success Rates Over Time', 'Cohort Size Distribution', 'Success Rate Trends', 'Cumulative Performance' ] ) # 1. Success rates heatmap years = cohort_data.index.tolist() success_rates = cohort_data['success_rate'].tolist() fig.add_trace( go.Heatmap( z=[success_rates], x=years, y=['Success Rate'], colorscale='RdYlGn', text=[[f"{rate:.1%}" for rate in success_rates]], texttemplate="%{text}", textfont={"size": 10}, hovertemplate='%{x}
Success Rate: %{text}' ), row=1, col=1 ) # 2. Cohort sizes fig.add_trace( go.Bar( x=years, y=cohort_data['total'], name='Cohort Size', marker_color='steelblue', hovertemplate='%{x}
Companies: %{y}' ), row=1, col=2 ) # 3. Success rate trends fig.add_trace( go.Scatter( x=years, y=success_rates, mode='lines+markers', name='Success Rate Trend', line=dict(color='green', width=3), hovertemplate='%{x}
Success Rate: %{y:.1%}' ), row=2, col=1 ) # 4. Cumulative performance cumulative_success = cohort_data[1].cumsum() if 1 in cohort_data.columns else [0] * len(years) cumulative_total = cohort_data['total'].cumsum() fig.add_trace( go.Scatter( x=years, y=cumulative_success, mode='lines+markers', name='Cumulative Successes', line=dict(color='blue'), hovertemplate='%{x}
Total Successes: %{y}' ), row=2, col=2 ) fig.add_trace( go.Scatter( x=years, y=cumulative_total, mode='lines+markers', name='Cumulative Total', line=dict(color='gray', dash='dash'), hovertemplate='%{x}
Total Companies: %{y}' ), row=2, col=2 ) fig.update_layout( height=800, title_text="📊 Cohort Analysis Dashboard", title_x=0.5, template='plotly_white' ) # Store cohort data for future reference self.cohort_data[cohort_by] = cohort_data.to_dict() return fig.to_html(include_plotlyjs=True) except Exception as e: return f"

Error creating cohort analysis: {str(e)}

" def setup_ab_test(self, test_name: str, variants: List[str], success_metric: str, sample_size: int = 1000) -> Dict[str, Any]: """Setup A/B testing framework for business model variations.""" try: test_id = f"{test_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Initialize test configuration test_config = { 'test_id': test_id, 'test_name': test_name, 'variants': variants, 'success_metric': success_metric, 'sample_size': sample_size, 'start_date': datetime.now().isoformat(), 'status': 'active', 'participants': {variant: [] for variant in variants}, 'results': {variant: {'successes': 0, 'trials': 0} for variant in variants} } # Calculate required sample size for statistical significance # Using simplified formula for 80% power, 95% confidence baseline_rate = 0.1 # Assume 10% baseline conversion minimum_effect = 0.02 # 2% minimum detectable effect required_per_variant = int((16 * baseline_rate * (1 - baseline_rate)) / (minimum_effect ** 2)) test_config['statistical_requirements'] = { 'required_per_variant': required_per_variant, 'confidence_level': 0.95, 'statistical_power': 0.80, 'minimum_detectable_effect': minimum_effect } self.ab_tests[test_id] = test_config return { 'success': True, 'test_id': test_id, 'config': test_config, 'next_steps': [ f"Start assigning participants to variants: {', '.join(variants)}", f"Track {success_metric} for each participant", f"Collect at least {required_per_variant} samples per variant", "Analyze results when statistical significance is reached" ] } except Exception as e: return {'error': str(e)} def analyze_ab_test_results(self, test_id: str) -> Dict[str, Any]: """Analyze A/B test results and determine statistical significance.""" try: if test_id not in self.ab_tests: return {'error': 'Test ID not found'} test = self.ab_tests[test_id] results = test['results'] # Calculate conversion rates variant_stats = {} for variant, data in results.items(): trials = data['trials'] successes = data['successes'] conversion_rate = successes / trials if trials > 0 else 0 # Calculate confidence interval if trials > 0: std_error = np.sqrt((conversion_rate * (1 - conversion_rate)) / trials) margin_error = 1.96 * std_error # 95% confidence ci_lower = max(0, conversion_rate - margin_error) ci_upper = min(1, conversion_rate + margin_error) else: ci_lower = ci_upper = 0 variant_stats[variant] = { 'trials': trials, 'successes': successes, 'conversion_rate': conversion_rate, 'confidence_interval': [ci_lower, ci_upper], 'std_error': std_error if trials > 0 else 0 } # Perform statistical tests (comparing first two variants) variants = list(results.keys()) if len(variants) >= 2: control = variants[0] treatment = variants[1] control_stats = variant_stats[control] treatment_stats = variant_stats[treatment] # Two-proportion z-test if (control_stats['trials'] > 30 and treatment_stats['trials'] > 30 and control_stats['successes'] > 0 and treatment_stats['successes'] > 0): # Calculate z-statistic p1 = control_stats['conversion_rate'] p2 = treatment_stats['conversion_rate'] n1 = control_stats['trials'] n2 = treatment_stats['trials'] pooled_p = (control_stats['successes'] + treatment_stats['successes']) / (n1 + n2) se_diff = np.sqrt(pooled_p * (1 - pooled_p) * (1/n1 + 1/n2)) z_stat = (p2 - p1) / se_diff if se_diff > 0 else 0 p_value = 2 * (1 - stats.norm.cdf(abs(z_stat))) is_significant = p_value < 0.05 lift = ((p2 - p1) / p1 * 100) if p1 > 0 else 0 statistical_analysis = { 'z_statistic': z_stat, 'p_value': p_value, 'is_significant': is_significant, 'confidence_level': 95, 
                        'lift_percentage': lift,
                        'winner': treatment if p2 > p1 and is_significant else control if is_significant else 'inconclusive'
                    }
                else:
                    statistical_analysis = {
                        'message': 'Insufficient data for statistical analysis',
                        'recommendation': 'Continue test until minimum sample size is reached'
                    }

            # Generate recommendations
            recommendations = self._generate_ab_test_recommendations(variant_stats, statistical_analysis)

            # Create visualization
            visualization = self._create_ab_test_visualization(variant_stats, test['test_name'])

            return {
                'test_id': test_id,
                'test_name': test['test_name'],
                'variant_statistics': variant_stats,
                'statistical_analysis': statistical_analysis,
                'recommendations': recommendations,
                'visualization_html': visualization,
                'analysis_date': datetime.now().isoformat()
            }

        except Exception as e:
            return {'error': str(e)}

    def _generate_ab_test_recommendations(self, variant_stats: Dict, statistical_analysis: Dict) -> List[str]:
        """Generate recommendations based on A/B test results."""
        recommendations = []

        if 'winner' in statistical_analysis:
            winner = statistical_analysis.get('winner')
            lift = statistical_analysis.get('lift_percentage', 0)

            if winner != 'inconclusive':
                recommendations.append(f"🏆 Implement '{winner}' variant - showing {lift:.1f}% improvement")
            else:
                recommendations.append("⏱️ Continue testing - no statistically significant winner yet")

        # Check sample sizes
        min_trials = min(v['trials'] for v in variant_stats.values())
        if min_trials < 100:
            recommendations.append(f"📊 Increase sample size - current minimum: {min_trials} participants")

        # Check for practical significance
        max_rate = max(v['conversion_rate'] for v in variant_stats.values())
        min_rate = min(v['conversion_rate'] for v in variant_stats.values())
        practical_difference = (max_rate - min_rate) / min_rate * 100 if min_rate > 0 else 0

        if practical_difference < 5:
            recommendations.append("📈 Consider testing more dramatic variations for larger impact")

        return recommendations

    def _create_ab_test_visualization(self, variant_stats: Dict, test_name: str) -> str:
        """Create visualization for A/B test results."""
        try:
            variants = list(variant_stats.keys())
            conversion_rates = [v['conversion_rate'] for v in variant_stats.values()]
            trials = [v['trials'] for v in variant_stats.values()]

            fig = make_subplots(
                rows=1, cols=2,
                subplot_titles=['Conversion Rates', 'Sample Sizes']
            )

            # Conversion rates
            fig.add_trace(
                go.Bar(
                    x=variants,
                    y=[rate * 100 for rate in conversion_rates],
                    name='Conversion Rate (%)',
                    marker_color=['blue', 'orange', 'green', 'red'][:len(variants)],
                    text=[f"{rate:.1%}" for rate in conversion_rates],
                    textposition='auto'
                ),
                row=1, col=1
            )

            # Sample sizes
            fig.add_trace(
                go.Bar(
                    x=variants,
                    y=trials,
                    name='Sample Size',
                    marker_color='lightblue',
                    text=trials,
                    textposition='auto'
                ),
                row=1, col=2
            )

            fig.update_layout(
                title_text=f"A/B Test Results: {test_name}",
                title_x=0.5,
                template='plotly_white',
                height=400
            )

            return fig.to_html(include_plotlyjs=True)

        except Exception as e:
            return f"Error creating visualization: {str(e)}"
" def simulate_ab_test_data(self, test_id: str, days: int = 30) -> Dict[str, Any]: """Simulate A/B test data for demonstration purposes.""" try: if test_id not in self.ab_tests: return {'error': 'Test ID not found'} test = self.ab_tests[test_id] variants = test['variants'] # Simulate realistic conversion rates base_rate = 0.08 # 8% base conversion variant_effects = { variants[0]: 0.0, # Control variants[1]: 0.02 if len(variants) > 1 else 0.0, # +2% lift variants[2]: 0.01 if len(variants) > 2 else 0.0, # +1% lift } participants_per_day = test['sample_size'] // days // len(variants) for variant in variants: true_rate = base_rate + variant_effects.get(variant, 0) total_participants = participants_per_day * days successes = np.random.binomial(total_participants, true_rate) test['results'][variant] = { 'trials': total_participants, 'successes': successes } self.ab_tests[test_id] = test return { 'success': True, 'message': f"Simulated {days} days of data for {len(variants)} variants", 'total_participants': sum(data['trials'] for data in test['results'].values()) } except Exception as e: return {'error': str(e)} # Export the class __all__ = ['AdvancedAnalyticsDashboard']