"""Streamlit data-analysis workflow with optional AI-generated commentary.

Defines ``AIAssistant`` (a thin wrapper over the optional OpenAI / Gemini
clients) and ``DataAnalysisWorkflow`` (five Streamlit-rendered analysis stages
plus report and script generation).
"""

# NOTE: import order is kept as in the original file on purpose. The wildcard
# import from ``data_handler`` can shadow earlier names, so reordering these
# lines could change which object a name resolves to.
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from typing import Dict, List, Any, Optional
import os
from dotenv import load_dotenv
from data_handler import *
from io import BytesIO

# Load environment variables
load_dotenv()

# Optional AI Integration
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False

try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False


# Templates for the script produced by DataAnalysisWorkflow.generate_python_code.
# They are plain (non-f) strings: the braces belong to the generated code.
_SCRIPT_HEADER = """import pandas as pd
import numpy as np
import plotly.express as px
from typing import Dict, List, Any

# Load and prepare data
df = pd.read_csv('your_data.csv')  # Update with your data source

# Basic statistics
def calculate_basic_stats(df: pd.DataFrame) -> Dict[str, Any]:
    return {
        'shape': df.shape,
        'memory_usage': float(df.memory_usage(deep=True).sum() / 1024**2),
        'missing_values': int(df.isnull().sum().sum()),
        'dtypes': df.dtypes.value_counts().to_dict(),
        'duplicates': int(df.duplicated().sum())
    }

stats = calculate_basic_stats(df)
print("\\nBasic Statistics:")
print(f"- Shape: {stats['shape']}")
print(f"- Memory Usage: {stats['memory_usage']:.1f} MB")
print(f"- Missing Values: {stats['missing_values']}")
print(f"- Duplicates: {stats['duplicates']}")
"""

_SCRIPT_MISSING_VALUES = (
    "# Handle missing values\n"
    "df = df.ffill()  # Update with your chosen method\n"
)

_SCRIPT_DUPLICATES = (
    "# Remove duplicates\n"
    "df = df.drop_duplicates()\n"
)

_SCRIPT_OUTLIERS = """# Handle outliers
def remove_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame:
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    return df[~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))]

# Apply to numeric columns as needed
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    df = remove_outliers(df, col)
"""

_SCRIPT_VISUALIZATIONS = """
# Visualizations
def plot_missing_values(df: pd.DataFrame):
    missing = df.isnull().sum()
    if missing.sum() > 0:
        missing = missing[missing > 0]
        fig = px.bar(x=missing.index, y=missing.values,
                     title='Missing Values by Column')
        fig.show()

def plot_correlations(df: pd.DataFrame):
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 1:
        corr = df[numeric_cols].corr()
        fig = px.imshow(corr, title='Correlation Matrix')
        fig.show()

# Generate plots
plot_missing_values(df)
plot_correlations(df)
"""


class AIAssistant:
    """AI-powered analysis assistant.

    API keys are read from the ``OPENAI_API_KEY`` and ``GOOGLE_API_KEY``
    environment variables. A backend is only offered when both its key and its
    client package are available.
    """

    def __init__(self):
        self.openai_key = os.getenv('OPENAI_API_KEY')
        self.gemini_key = os.getenv('GOOGLE_API_KEY')

        # ``gemini_model`` is only set when Gemini is usable; readers of this
        # attribute must therefore tolerate its absence.
        if self.gemini_key and GEMINI_AVAILABLE:
            genai.configure(api_key=self.gemini_key)
            self.gemini_model = genai.GenerativeModel('gemini-1.5-flash')

    def get_available_models(self) -> List[str]:
        """Return the display names of the AI backends that can be used."""
        models = []
        if self.openai_key and OPENAI_AVAILABLE:
            models.append("OpenAI GPT")
        if self.gemini_key and GEMINI_AVAILABLE:
            models.append("Google Gemini")
        return models

    def analyze_insights(self, df: pd.DataFrame, insights: List[Dict],
                         model: str = "Google Gemini") -> str:
        """Ask the selected AI backend to interpret the collected insights.

        Args:
            df: Dataset being analysed; only its shape, column names and dtype
                counts are placed in the prompt.
            insights: Insight records; each must have an ``'insight'`` key.
            model: ``"Google Gemini"`` or ``"OpenAI GPT"``.

        Returns:
            The model's text answer, a "not available" message when the
            requested backend is not configured, or an ``"AI Analysis Error:"``
            message when the backend call raises.
        """
        summary_lines = [
            "Dataset Summary:",
            f"- Shape: {df.shape}",
            f"- Columns: {list(df.columns)}",
            f"- Data types: {df.dtypes.value_counts().to_dict()}",
            "",
            "Key Insights Found:",
        ]
        summary_lines.extend(f"- {item['insight']}" for item in insights)

        prompt = "\n".join([
            "As a senior data scientist, analyze this dataset and provide:",
            "1. Business implications of the findings",
            "2. Potential opportunities or risks",
            "3. Recommendations for decision-making",
            "4. Suggestions for further analysis",
            "",
            *summary_lines,
            "",
            "Provide actionable insights in a professional format.",
        ])

        gemini_model = getattr(self, 'gemini_model', None)
        try:
            if model == "Google Gemini" and gemini_model is not None:
                return gemini_model.generate_content(prompt).text

            # Also require the package: a key without the client installed
            # should yield the "not available" message, not a NameError.
            if model == "OpenAI GPT" and self.openai_key and OPENAI_AVAILABLE:
                client = openai.OpenAI(api_key=self.openai_key)
                response = client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}],
                )
                return response.choices[0].message.content

            return "AI analysis not available. Please configure API keys."
        except Exception as e:
            # UI boundary: surface backend failures as text instead of crashing.
            return f"AI Analysis Error: {str(e)}"


class DataAnalysisWorkflow:
    """Optimized data analysis workflow with caching and pagination.

    Each ``stage_*`` method renders one step of the analysis with Streamlit and
    records findings through ``add_insight``. Cleaning operations applied in
    stage 3 mutate ``self.df`` and are logged in ``self.cleaning_history``.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.stats = calculate_basic_stats(df)
        self.column_types = get_column_types(df)
        self.insights = []
        # Log of applied cleaning operations; read by generate_python_code.
        self.cleaning_history = []
        self.page_size = 1000  # For pagination

    def add_insight(self, insight: str, stage: int):
        """Record an insight for ``stage`` unless the same one already exists.

        Stage methods may run repeatedly on the same instance; skipping exact
        repeats keeps the summary and the reports free of duplicates.
        """
        already_recorded = any(
            entry['stage'] == stage and entry['insight'] == insight
            for entry in self.insights
        )
        if already_recorded:
            return

        self.insights.append({
            'stage': stage,
            'insight': insight,
            'timestamp': pd.Timestamp.now()
        })

    def get_paginated_data(self, page: int = 0) -> pd.DataFrame:
        """Return the rows of ``self.df`` on the zero-based ``page``."""
        start_idx = page * self.page_size
        end_idx = start_idx + self.page_size
        return self.df.iloc[start_idx:end_idx]

    def _refresh_metadata(self) -> None:
        """Recompute cached stats and column types after ``self.df`` changes."""
        self.stats = calculate_basic_stats(self.df)
        self.column_types = get_column_types(self.df)

    def _record_cleaning(self, message: str) -> None:
        """Log a cleaning operation and bring the cached metadata up to date."""
        self.cleaning_history.append(message)
        self._refresh_metadata()

    def stage_1_overview(self):
        """Stage 1: render the data overview and record quality insights."""
        st.subheader("📊 Data Overview")

        # Data Quality Score
        quality_metrics = calculate_data_quality_score(self.df)

        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Rows", f"{self.stats['shape'][0]:,}")
        with col2:
            st.metric("Columns", f"{self.stats['shape'][1]:,}")
        with col3:
            st.metric("Quality Score", f"{quality_metrics['score']:.1f}/100")
        with col4:
            st.metric("Grade", quality_metrics['grade'])

        if quality_metrics['issues']:
            st.warning("Quality Issues Found:")
            for issue in quality_metrics['issues']:
                st.write(f"• {issue}")

        # Memory Usage and Optimization
        st.subheader("Memory Analysis")
        memory_opt = calculate_memory_optimization(self.df)
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Current Memory",
                      f"{memory_opt['current_memory_mb']:.1f} MB")
        with col2:
            if memory_opt['potential_savings_mb'] > 0:
                st.metric("Potential Savings",
                          f"{memory_opt['potential_savings_mb']:.1f} MB",
                          f"{memory_opt['potential_savings_pct']:.1f}%")
                if st.button("Show Optimization Details"):
                    st.dataframe(pd.DataFrame(memory_opt['suggestions']))

        # Column Cardinality Analysis
        st.subheader("Column Cardinality Analysis")
        cardinality_df = calculate_column_cardinality(self.df)

        # Filter options
        col_types = cardinality_df['Type'].unique().tolist()
        selected_types = st.multiselect("Filter by Column Type", col_types,
                                        default=col_types)
        filtered_df = cardinality_df[cardinality_df['Type'].isin(selected_types)]
        st.dataframe(filtered_df, use_container_width=True)

        # Highlight important findings
        id_cols = filtered_df.loc[
            filtered_df['Type'] == 'Unique Identifier', 'Column'
        ].tolist()
        if id_cols:
            st.info(f"📌 Potential ID columns found: {', '.join(id_cols)}")

        const_cols = filtered_df.loc[
            filtered_df['Type'] == 'Constant', 'Column'
        ].tolist()
        if const_cols:
            st.warning(f"⚠️ Constant columns found: {', '.join(const_cols)}")

        # Data types visualization
        if self.stats['dtypes']:
            st.subheader("Data Types Distribution")
            # Dtype keys are stringified so they can be used as chart labels.
            fig = px.pie(values=list(self.stats['dtypes'].values()),
                         names=[str(name) for name in self.stats['dtypes']],
                         title="Data Types")
            st.plotly_chart(fig, use_container_width=True)

        # Sample data with pagination
        st.subheader("Sample Data")
        total_rows = len(self.df)
        total_pages = (total_rows - 1) // self.page_size + 1
        if total_pages > 1:
            page = st.slider("Page", 0, total_pages - 1, 0)
            sample_data = self.get_paginated_data(page)
            first_row = page * self.page_size + 1
            last_row = min((page + 1) * self.page_size, total_rows)
            st.write(f"Showing rows {first_row} to {last_row}")
        else:
            sample_data = self.df.head(10)
        st.dataframe(sample_data, use_container_width=True)

        # Missing values analysis
        missing_df = calculate_missing_data(self.df)
        if not missing_df.empty:
            st.subheader("Missing Values Analysis")
            st.dataframe(missing_df, use_container_width=True)

            # Pick the maximum explicitly instead of assuming the helper
            # returns its rows sorted by missing percentage.
            worst_row = missing_df.iloc[int(missing_df['Missing %'].argmax())]
            worst_column = worst_row['Column']
            worst_percentage = worst_row['Missing %']
            self.add_insight(
                f"Column '{worst_column}' has highest missing data: "
                f"{worst_percentage:.1f}%", 1)
        else:
            st.success("✅ No missing values found!")
            self.add_insight(
                "Dataset has no missing values - excellent data quality", 1)

        # Add insights about data quality and cardinality
        if quality_metrics['score'] < 80:
            self.add_insight(
                f"Data quality needs improvement "
                f"(Score: {quality_metrics['score']:.1f}/100)", 1)
        if memory_opt['potential_savings_pct'] > 20:
            self.add_insight(
                f"Potential memory optimization of "
                f"{memory_opt['potential_savings_pct']:.1f}% identified", 1)
        if id_cols:
            self.add_insight(f"Found {len(id_cols)} potential ID columns", 1)

    def stage_2_exploration(self):
        """Stage 2: render distributions, correlations and category counts."""
        st.subheader("🔍 Exploratory Data Analysis")

        numeric_cols = self.column_types['numeric']
        categorical_cols = self.column_types['categorical']

        # Numeric analysis
        if numeric_cols:
            st.subheader("Numeric Variables")
            selected_numeric = st.selectbox("Select numeric column:",
                                            numeric_cols)

            col1, col2 = st.columns(2)
            with col1:
                fig = px.histogram(self.df, x=selected_numeric,
                                   title=f"Distribution of {selected_numeric}")
                st.plotly_chart(fig, use_container_width=True)
            with col2:
                fig = px.box(self.df, y=selected_numeric,
                             title=f"Box Plot of {selected_numeric}")
                st.plotly_chart(fig, use_container_width=True)

            # Statistical summary
            st.subheader("Statistical Summary")
            summary_stats = self.df[numeric_cols].describe()
            st.dataframe(summary_stats, use_container_width=True)

            # Correlation analysis
            if len(numeric_cols) > 1:
                st.subheader("Correlation Analysis")
                corr_matrix = calculate_correlation_matrix(self.df)

                if not corr_matrix.empty:
                    fig = px.imshow(corr_matrix, text_auto=True, aspect="auto",
                                    title="Correlation Matrix")
                    st.plotly_chart(fig, use_container_width=True)

                    # Find highest correlation: strict upper triangle only, so
                    # the diagonal and mirrored pairs are excluded. NaN entries
                    # (e.g. constant columns) are dropped before taking the max.
                    abs_corr = corr_matrix.abs().to_numpy(dtype=float)
                    pair_values = abs_corr[np.triu_indices_from(abs_corr, k=1)]
                    pair_values = pair_values[~np.isnan(pair_values)]
                    if pair_values.size:
                        max_corr = float(pair_values.max())
                        self.add_insight(
                            f"Maximum correlation coefficient: {max_corr:.3f}",
                            2)

        # Categorical analysis
        if categorical_cols:
            st.subheader("Categorical Variables")
            selected_categorical = st.selectbox("Select categorical column:",
                                                categorical_cols)

            value_counts = get_value_counts(self.df, selected_categorical)
            fig = px.bar(x=value_counts.index, y=value_counts.values,
                         title=f"Top 10 {selected_categorical} Values")
            st.plotly_chart(fig, use_container_width=True)

            total_categories = self.df[selected_categorical].nunique()
            self.add_insight(
                f"Column '{selected_categorical}' has {total_categories} "
                f"unique categories", 2)

    def _apply_missing_value_treatment(self, column: str, fill_method: str,
                                       custom_value: Optional[float]) -> None:
        """Drop or fill missing values in ``column`` and log the operation.

        Raises:
            ValueError: if "Mode" is requested but the column has no mode
                (for example, every value is missing).
        """
        if fill_method == "Drop rows":
            self.df = self.df.dropna(subset=[column])
            self._record_cleaning(
                f"Dropped rows with missing values in {column}")
            return

        series = self.df[column]
        if fill_method == "Mean":
            fill_value = series.mean()
        elif fill_method == "Median":
            fill_value = series.median()
        elif fill_method == "Mode":
            modes = series.mode()
            if modes.empty:
                raise ValueError(f"Column '{column}' has no mode to fill with")
            fill_value = modes.iloc[0]
        else:  # Custom value
            fill_value = custom_value

        self.df[column] = series.fillna(fill_value)
        self._record_cleaning(
            f"Filled missing values in {column} with {fill_method}")

    def stage_3_cleaning(self):
        """Stage 3: assess data quality and apply user-selected cleaning.

        Applied operations mutate ``self.df``, are appended to
        ``self.cleaning_history`` and trigger a refresh of the cached stats.
        Issues that are still present at the end are listed as action items.
        """
        st.subheader("🧹 Data Quality Assessment")

        cleaning_actions = []

        # Missing values handling
        if self.stats['missing_values'] > 0:
            missing_df = calculate_missing_data(self.df)
            if not missing_df.empty:
                st.subheader("Missing Values Treatment")
                st.dataframe(missing_df, use_container_width=True)

                col1, col2 = st.columns(2)
                with col1:
                    selected_col = st.selectbox(
                        "Select column to handle missing values:",
                        missing_df['Column'].tolist())
                with col2:
                    fill_method = st.selectbox(
                        "Choose fill method:",
                        ["Drop rows", "Mean", "Median", "Mode", "Custom value"])

                # The input must be rendered before the button: a widget that
                # is first created inside the button branch can only ever
                # report its default value on that run.
                custom_value = None
                if fill_method == "Custom value":
                    custom_value = st.number_input("Enter custom value:",
                                                   value=0.0)

                if st.button("Apply Missing Value Treatment"):
                    try:
                        self._apply_missing_value_treatment(
                            selected_col, fill_method, custom_value)
                        st.success("✅ Missing values handled successfully!")
                    except Exception as e:
                        st.error(f"Error handling missing values: {str(e)}")

        # Duplicates handling
        if self.stats['duplicates'] > 0:
            st.subheader("Duplicate Rows")
            st.warning(f"Found {self.stats['duplicates']} duplicate rows")

            if st.button("Remove Duplicate Rows"):
                original_len = len(self.df)
                self.df = self.df.drop_duplicates()
                removed = original_len - len(self.df)
                self._record_cleaning(f"Removed {removed} duplicate rows")
                st.success(f"✅ Removed {removed} duplicate rows")
        else:
            st.success("✅ No duplicate rows found")

        # Mixed type detection and handling
        mixed_types = detect_mixed_types(self.df)
        unresolved_mixed = [item['column'] for item in mixed_types]
        if mixed_types:
            st.subheader("Mixed Data Types")
            mixed_df = pd.DataFrame(mixed_types)
            st.dataframe(mixed_df, use_container_width=True)

            selected_col = st.selectbox("Select column to fix data type:",
                                        unresolved_mixed)
            fix_method = st.selectbox("Choose fix method:",
                                      ["Convert to numeric",
                                       "Convert to string"])

            if st.button("Fix Data Type"):
                try:
                    if fix_method == "Convert to numeric":
                        self.df[selected_col] = pd.to_numeric(
                            self.df[selected_col], errors='coerce')
                    else:
                        self.df[selected_col] = self.df[selected_col].astype(str)
                    self._record_cleaning(
                        f"Fixed data type for {selected_col} to {fix_method}")
                    unresolved_mixed = [
                        name for name in unresolved_mixed
                        if name != selected_col
                    ]
                    st.success("✅ Data type fixed successfully!")
                except Exception as e:
                    st.error(f"Error fixing data type: {str(e)}")

        # Outlier detection and handling
        pending_outliers = None  # (column, count) when outliers remain untreated
        numeric_cols = self.column_types['numeric']
        if numeric_cols:
            st.subheader("Outlier Detection")
            selected_col = st.selectbox("Select column for outlier detection:",
                                        numeric_cols)

            outliers = calculate_outliers(self.df, selected_col)
            outlier_count = len(outliers)

            if outlier_count > 0:
                pending_outliers = (selected_col, outlier_count)
                st.warning(
                    f"Found {outlier_count} potential outliers in "
                    f"'{selected_col}'")
                st.dataframe(outliers[[selected_col]].head(100),
                             use_container_width=True)

                treatment_method = st.selectbox(
                    "Choose outlier treatment method:",
                    ["None", "Remove", "Cap at percentiles"])

                if treatment_method != "None" and st.button(
                        "Apply Outlier Treatment"):
                    try:
                        if treatment_method == "Remove":
                            keep_mask = ~self.df.index.isin(outliers.index)
                            self.df = self.df[keep_mask]
                            self._record_cleaning(
                                f"Removed {outlier_count} outliers from "
                                f"{selected_col}")
                        else:  # Cap at percentiles
                            # Bounds are the 1.5 * IQR fences around Q1 / Q3.
                            q1 = self.df[selected_col].quantile(0.25)
                            q3 = self.df[selected_col].quantile(0.75)
                            iqr = q3 - q1
                            lower_bound = q1 - 1.5 * iqr
                            upper_bound = q3 + 1.5 * iqr
                            self.df[selected_col] = self.df[selected_col].clip(
                                lower_bound, upper_bound)
                            self._record_cleaning(
                                f"Capped outliers in {selected_col} at "
                                f"percentiles")
                        pending_outliers = None
                        st.success("✅ Outliers handled successfully!")
                    except Exception as e:
                        st.error(f"Error handling outliers: {str(e)}")
            else:
                st.success(f"✅ No outliers detected in '{selected_col}'")

        # Cleaning History
        if self.cleaning_history:
            st.subheader("Cleaning Operations History")
            for i, operation in enumerate(self.cleaning_history, 1):
                st.write(f"{i}. {operation}")
            self.add_insight(
                f"Performed {len(self.cleaning_history)} data cleaning "
                f"operations", 3)

        # Collect the issues that are still present. ``self.stats`` was
        # refreshed by every operation applied above, so it is current here.
        if self.stats['missing_values'] > 0:
            cleaning_actions.append(
                f"Handle {self.stats['missing_values']:,} remaining missing "
                f"values")
        if self.stats['duplicates'] > 0:
            cleaning_actions.append(
                f"Remove {self.stats['duplicates']:,} duplicate rows")
        if unresolved_mixed:
            cleaning_actions.append(
                f"Fix mixed data types in: {', '.join(map(str, unresolved_mixed))}")
        if pending_outliers is not None:
            cleaning_actions.append(
                f"Review {pending_outliers[1]:,} potential outliers in "
                f"'{pending_outliers[0]}'")

        # Summary
        if cleaning_actions:
            st.subheader("Remaining Action Items")
            for i, action in enumerate(cleaning_actions, 1):
                st.write(f"{i}. {action}")
            self.add_insight(
                f"Identified {len(cleaning_actions)} data quality issues", 3)
        else:
            st.success("✅ Data quality is excellent!")
            self.add_insight("No major data quality issues found", 3)

    def stage_4_analysis(self):
        """Stage 4: render pairwise relationship and per-group analysis."""
        st.subheader("🔬 Advanced Analysis")

        numeric_cols = self.column_types['numeric']
        categorical_cols = self.column_types['categorical']

        # Relationship analysis
        if len(numeric_cols) >= 2:
            st.subheader("Variable Relationships")

            col1, col2 = st.columns(2)
            with col1:
                x_var = st.selectbox("X Variable:", numeric_cols)
            with col2:
                y_var = st.selectbox(
                    "Y Variable:",
                    [col for col in numeric_cols if col != x_var])

            # Sample data for performance if dataset is large
            sample_size = min(5000, len(self.df))
            if len(self.df) > sample_size:
                sample_df = self.df.sample(n=sample_size)
            else:
                sample_df = self.df

            fig = px.scatter(sample_df, x=x_var, y=y_var,
                             title=f"Relationship: {x_var} vs {y_var}")
            st.plotly_chart(fig, use_container_width=True)

            correlation = self.df[x_var].corr(self.df[y_var])
            if pd.isna(correlation):
                # Every comparison with NaN is False, which would otherwise be
                # reported as a "Weak negative" correlation.
                st.metric("Correlation", "N/A")
                st.write("**Result:** Correlation is undefined for these "
                         "variables")
            else:
                st.metric("Correlation", f"{correlation:.3f}")

                magnitude = abs(correlation)
                if magnitude > 0.7:
                    strength = "Strong"
                elif magnitude > 0.3:
                    strength = "Moderate"
                else:
                    strength = "Weak"

                direction = "positive" if correlation > 0 else "negative"
                st.write(f"**Result:** {strength} {direction} correlation")

                self.add_insight(
                    f"{strength} correlation ({correlation:.3f}) between "
                    f"{x_var} and {y_var}", 4)

        # Group analysis
        if categorical_cols and numeric_cols:
            st.subheader("Group Analysis")

            col1, col2 = st.columns(2)
            with col1:
                group_var = st.selectbox("Group by:", categorical_cols)
            with col2:
                metric_var = st.selectbox("Analyze:", numeric_cols)

            group_stats = calculate_group_stats(self.df, group_var, metric_var)
            st.dataframe(group_stats, use_container_width=True)

            # Sample for visualization if too many groups
            unique_groups = self.df[group_var].nunique()
            if unique_groups <= 20:
                fig = px.box(self.df, x=group_var, y=metric_var,
                             title=f"{metric_var} by {group_var}")
                st.plotly_chart(fig, use_container_width=True)
            else:
                st.info(f"Too many groups ({unique_groups}) for visualization. "
                        f"Showing statistics only.")

            # idxmax needs at least one non-missing mean to be meaningful.
            if not group_stats.empty and group_stats['mean'].notna().any():
                best_group = group_stats['mean'].idxmax()
                best_value = group_stats.loc[best_group, 'mean']
                self.add_insight(
                    f"'{best_group}' has highest average {metric_var}: "
                    f"{best_value:.2f}", 4)

    def stage_5_summary(self):
        """Stage 5: render the insight summary and the export controls."""
        st.subheader("📈 Analysis Summary")

        # Key metrics
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Insights", len(self.insights))
        with col2:
            quality = "High" if self.stats['missing_values'] == 0 else "Medium"
            st.metric("Data Quality", quality)
        with col3:
            st.metric("Analysis Complete", "✅")

        # Insights summary
        st.subheader("Key Insights")
        for i, insight in enumerate(self.insights, 1):
            st.write(f"{i}. **Stage {insight['stage']}:** {insight['insight']}")

        # Export options
        st.subheader("Export Results")

        export_format = st.selectbox(
            "Choose export format:",
            ["Text Report", "Markdown Report", "Python Code", "Cleaned Data"])

        if export_format == "Text Report":
            report = self.generate_text_report()
            st.download_button(
                label="Download Text Report",
                data=report,
                file_name="analysis_report.txt",
                mime="text/plain"
            )
        elif export_format == "Markdown Report":
            report = self.generate_markdown_report()
            st.download_button(
                label="Download Markdown Report",
                data=report,
                file_name="analysis_report.md",
                mime="text/markdown"
            )
        elif export_format == "Python Code":
            code = self.generate_python_code()
            st.code(code, language="python")
            st.download_button(
                label="Download Python Script",
                data=code,
                file_name="analysis_script.py",
                mime="text/plain"
            )
        else:  # Cleaned Data
            # Offer different export formats
            data_format = st.selectbox("Choose data format:",
                                       ["CSV", "Excel", "Parquet"])

            if st.button("Export Data"):
                try:
                    if data_format == "CSV":
                        csv = self.df.to_csv(index=False)
                        st.download_button(
                            label="Download CSV",
                            data=csv,
                            file_name="cleaned_data.csv",
                            mime="text/csv"
                        )
                    elif data_format == "Excel":
                        excel_buffer = BytesIO()
                        self.df.to_excel(excel_buffer, index=False)
                        st.download_button(
                            label="Download Excel",
                            data=excel_buffer.getvalue(),
                            file_name="cleaned_data.xlsx",
                            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                        )
                    else:  # Parquet
                        parquet_buffer = BytesIO()
                        self.df.to_parquet(parquet_buffer, index=False)
                        st.download_button(
                            label="Download Parquet",
                            data=parquet_buffer.getvalue(),
                            file_name="cleaned_data.parquet",
                            mime="application/octet-stream"
                        )
                except Exception as e:
                    st.error(f"Error exporting data: {str(e)}")

    def generate_text_report(self) -> str:
        """Return a plain-text report of the dataset stats and all insights."""
        lines = [
            "DATA ANALYSIS REPORT",
            "==================",
            "",
            "Dataset Overview:",
            f"- Rows: {self.stats['shape'][0]:,}",
            f"- Columns: {self.stats['shape'][1]:,}",
            f"- Missing Values: {self.stats['missing_values']:,}",
            f"- Memory Usage: {self.stats['memory_usage']:.1f} MB",
            "",
            "Key Insights:",
            "",
        ]
        lines.extend(
            f"- Stage {item['stage']}: {item['insight']}"
            for item in self.insights
        )
        generated_at = pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')
        lines.extend(["", f"Generated: {generated_at}"])
        return "\n".join(lines)

    def generate_markdown_report(self) -> str:
        """Return a Markdown report with insights grouped by stage (1-5)."""
        dtype_table = pd.DataFrame(
            [(str(dtype), count) for dtype, count in self.stats['dtypes'].items()],
            columns=['Type', 'Count'],
        )
        try:
            dtype_text = dtype_table.to_markdown()
        except ImportError:
            # ``to_markdown`` needs the optional ``tabulate`` package; fall
            # back to a plain text table instead of failing the whole report.
            dtype_text = dtype_table.to_string()

        parts = [
            "# Data Analysis Report",
            "",
            "## Dataset Overview",
            f"* **Rows:** {self.stats['shape'][0]:,}",
            f"* **Columns:** {self.stats['shape'][1]:,}",
            f"* **Missing Values:** {self.stats['missing_values']:,}",
            f"* **Memory Usage:** {self.stats['memory_usage']:.1f} MB",
            "",
            "## Data Types",
            "```",
            dtype_text,
            "```",
            "",
            "## Key Insights",
        ]

        # Group insights by stage
        for stage in range(1, 6):
            stage_insights = [
                item for item in self.insights if item['stage'] == stage
            ]
            if stage_insights:
                parts.append("")
                parts.append(f"### Stage {stage}")
                parts.extend(f"* {item['insight']}" for item in stage_insights)

        generated_at = pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')
        parts.extend(["", "", f"*Generated: {generated_at}*"])
        return "\n".join(parts)

    def generate_python_code(self) -> str:
        """Return a standalone script that reproduces the analysis.

        The script always contains the data loading, basic statistics and
        plotting sections. A "Data Cleaning" section is added when
        ``self.cleaning_history`` is non-empty, with one generic snippet per
        kind of operation found in the history.
        """
        code = _SCRIPT_HEADER

        # Add data cleaning operations if any were performed
        history = getattr(self, 'cleaning_history', [])
        if history:
            code += "\n# Data Cleaning\n"
            # The snippets are generic, so each kind is emitted only once no
            # matter how many logged operations match it.
            emitted = set()
            for operation in history:
                text = operation.lower()
                if "missing values" in text:
                    kind, snippet = "missing", _SCRIPT_MISSING_VALUES
                elif "duplicate" in text:
                    kind, snippet = "duplicate", _SCRIPT_DUPLICATES
                elif "outlier" in text:
                    kind, snippet = "outlier", _SCRIPT_OUTLIERS
                else:
                    continue

                if kind not in emitted:
                    emitted.add(kind)
                    code += snippet

        # Add visualization code
        code += _SCRIPT_VISUALIZATIONS
        return code