Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from typing import Dict, List, Any, Optional | |
| import os | |
| from dotenv import load_dotenv | |
| from data_handler import * | |
| from io import BytesIO | |
# Load environment variables (API keys) from a local .env file.
load_dotenv()

# Optional AI integrations: the SDK imports are wrapped so the app still runs
# when the packages are not installed; these flags gate all AI features below.
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
class AIAssistant:
    """AI-powered analysis assistant.

    Wraps the optional OpenAI / Google Gemini SDKs; API keys are read from
    the environment (OPENAI_API_KEY / GOOGLE_API_KEY).
    """

    def __init__(self):
        # Keys may be absent; every method degrades gracefully without them.
        self.openai_key = os.getenv('OPENAI_API_KEY')
        self.gemini_key = os.getenv('GOOGLE_API_KEY')
        if self.gemini_key and GEMINI_AVAILABLE:
            genai.configure(api_key=self.gemini_key)
            self.gemini_model = genai.GenerativeModel('gemini-1.5-flash')

    def get_available_models(self) -> List[str]:
        """Get list of available AI models"""
        candidates = [
            ("OpenAI GPT", self.openai_key and OPENAI_AVAILABLE),
            ("Google Gemini", self.gemini_key and GEMINI_AVAILABLE),
        ]
        return [name for name, usable in candidates if usable]

    def analyze_insights(self, df: pd.DataFrame, insights: List[Dict], model: str = "Google Gemini") -> str:
        """Get AI analysis of insights"""
        # Compact data description plus every recorded insight.
        summary = f"""
Dataset Summary:
- Shape: {df.shape}
- Columns: {list(df.columns)}
- Data types: {df.dtypes.value_counts().to_dict()}
Key Insights Found:
"""
        summary += "".join(f"\n- {item['insight']}" for item in insights)
        prompt = f"""
As a senior data scientist, analyze this dataset and provide:
1. Business implications of the findings
2. Potential opportunities or risks
3. Recommendations for decision-making
4. Suggestions for further analysis
{summary}
Provide actionable insights in a professional format.
"""
        try:
            # Gemini path: only taken when __init__ built the model object.
            if model == "Google Gemini" and hasattr(self, 'gemini_model'):
                return self.gemini_model.generate_content(prompt).text
            if model == "OpenAI GPT" and self.openai_key:
                client = openai.OpenAI(api_key=self.openai_key)
                completion = client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}]
                )
                return completion.choices[0].message.content
            return "AI analysis not available. Please configure API keys."
        except Exception as exc:
            # Surface provider errors to the UI instead of crashing the app.
            return f"AI Analysis Error: {str(exc)}"
class DataAnalysisWorkflow:
    """Optimized data analysis workflow with caching and pagination"""

    def __init__(self, df: pd.DataFrame):
        """Precompute dataset-wide stats once so every stage can reuse them.

        ``calculate_basic_stats`` / ``get_column_types`` come from
        data_handler (star-imported at the top of the file).
        """
        self.df = df
        self.stats = calculate_basic_stats(df)   # keys used below: 'shape', 'missing_values', 'memory_usage', 'dtypes', 'duplicates'
        self.column_types = get_column_types(df)  # keys used below: 'numeric', 'categorical'
        self.insights = []       # accumulated via add_insight(); rendered in stage 5
        self.page_size = 1000    # rows per page for the sample-data viewer
| def add_insight(self, insight: str, stage: int): | |
| """Add insight to analysis report""" | |
| self.insights.append({ | |
| 'stage': stage, | |
| 'insight': insight, | |
| 'timestamp': pd.Timestamp.now() | |
| }) | |
| def get_paginated_data(self, page: int = 0) -> pd.DataFrame: | |
| """Get paginated data for display""" | |
| start_idx = page * self.page_size | |
| end_idx = start_idx + self.page_size | |
| return self.df.iloc[start_idx:end_idx] | |
    def stage_1_overview(self):
        """Stage 1: Data Overview with caching"""
        st.subheader("📊 Data Overview")

        # --- Data quality score (computed by data_handler) ---
        quality_metrics = calculate_data_quality_score(self.df)
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Rows", f"{self.stats['shape'][0]:,}")
        with col2:
            st.metric("Columns", f"{self.stats['shape'][1]:,}")
        with col3:
            st.metric("Quality Score", f"{quality_metrics['score']:.1f}/100")
        with col4:
            st.metric("Grade", quality_metrics['grade'])
        if quality_metrics['issues']:
            st.warning("Quality Issues Found:")
            for issue in quality_metrics['issues']:
                st.write(f"• {issue}")

        # --- Memory usage and dtype-downcast suggestions ---
        st.subheader("Memory Analysis")
        memory_opt = calculate_memory_optimization(self.df)
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Current Memory", f"{memory_opt['current_memory_mb']:.1f} MB")
        with col2:
            if memory_opt['potential_savings_mb'] > 0:
                st.metric("Potential Savings",
                          f"{memory_opt['potential_savings_mb']:.1f} MB",
                          f"{memory_opt['potential_savings_pct']:.1f}%")
                if st.button("Show Optimization Details"):
                    st.dataframe(pd.DataFrame(memory_opt['suggestions']))

        # --- Column cardinality, filterable by the classifier's 'Type' label ---
        st.subheader("Column Cardinality Analysis")
        cardinality_df = calculate_column_cardinality(self.df)
        col_types = cardinality_df['Type'].unique()
        selected_types = st.multiselect("Filter by Column Type",
                                        col_types,
                                        default=col_types)
        filtered_df = cardinality_df[cardinality_df['Type'].isin(selected_types)]
        st.dataframe(filtered_df, use_container_width=True)
        # Surface likely primary-key columns and zero-information columns.
        id_cols = filtered_df[filtered_df['Type'] == 'Unique Identifier']['Column'].tolist()
        if id_cols:
            st.info(f"📌 Potential ID columns found: {', '.join(id_cols)}")
        const_cols = filtered_df[filtered_df['Type'] == 'Constant']['Column'].tolist()
        if const_cols:
            st.warning(f"⚠️ Constant columns found: {', '.join(const_cols)}")

        # --- Dtype distribution pie chart ---
        if self.stats['dtypes']:
            st.subheader("Data Types Distribution")
            fig = px.pie(values=list(self.stats['dtypes'].values()),
                         names=list(self.stats['dtypes'].keys()),
                         title="Data Types")
            st.plotly_chart(fig, use_container_width=True)

        # --- Paged raw-data sample (slider only appears for >1 page) ---
        st.subheader("Sample Data")
        total_pages = (len(self.df) - 1) // self.page_size + 1
        if total_pages > 1:
            page = st.slider("Page", 0, total_pages - 1, 0)
            sample_data = self.get_paginated_data(page)
            st.write(f"Showing rows {page * self.page_size + 1} to {min((page + 1) * self.page_size, len(self.df))}")
        else:
            sample_data = self.df.head(10)
        st.dataframe(sample_data, use_container_width=True)

        # --- Missing-values table; the worst column drives an insight ---
        missing_df = calculate_missing_data(self.df)
        if not missing_df.empty:
            st.subheader("Missing Values Analysis")
            st.dataframe(missing_df, use_container_width=True)
            # assumes calculate_missing_data returns rows sorted worst-first — TODO confirm in data_handler
            worst_column = missing_df.iloc[0]['Column']
            worst_percentage = missing_df.iloc[0]['Missing %']
            self.add_insight(f"Column '{worst_column}' has highest missing data: {worst_percentage:.1f}%", 1)
        else:
            st.success("✅ No missing values found!")
            self.add_insight("Dataset has no missing values - excellent data quality", 1)

        # --- Threshold-based stage-1 insights ---
        if quality_metrics['score'] < 80:
            self.add_insight(f"Data quality needs improvement (Score: {quality_metrics['score']:.1f}/100)", 1)
        if memory_opt['potential_savings_pct'] > 20:
            self.add_insight(f"Potential memory optimization of {memory_opt['potential_savings_pct']:.1f}% identified", 1)
        if id_cols:
            self.add_insight(f"Found {len(id_cols)} potential ID columns", 1)
    def stage_2_exploration(self):
        """Stage 2: Exploratory Data Analysis with caching"""
        st.subheader("🔍 Exploratory Data Analysis")
        numeric_cols = self.column_types['numeric']
        categorical_cols = self.column_types['categorical']

        # --- Numeric columns: histogram + box plot + describe() table ---
        if numeric_cols:
            st.subheader("Numeric Variables")
            selected_numeric = st.selectbox("Select numeric column:", numeric_cols)
            col1, col2 = st.columns(2)
            with col1:
                fig = px.histogram(self.df, x=selected_numeric,
                                   title=f"Distribution of {selected_numeric}")
                st.plotly_chart(fig, use_container_width=True)
            with col2:
                fig = px.box(self.df, y=selected_numeric,
                             title=f"Box Plot of {selected_numeric}")
                st.plotly_chart(fig, use_container_width=True)
            st.subheader("Statistical Summary")
            summary_stats = self.df[numeric_cols].describe()
            st.dataframe(summary_stats, use_container_width=True)

            # --- Correlation heatmap (needs at least two numeric columns) ---
            if len(numeric_cols) > 1:
                st.subheader("Correlation Analysis")
                corr_matrix = calculate_correlation_matrix(self.df)
                if not corr_matrix.empty:
                    fig = px.imshow(corr_matrix, text_auto=True, aspect="auto",
                                    title="Correlation Matrix")
                    st.plotly_chart(fig, use_container_width=True)
                    # Scan the upper triangle (i < j) for the strongest
                    # absolute pairwise correlation.
                    corr_values = []
                    for i in range(len(corr_matrix.columns)):
                        for j in range(i+1, len(corr_matrix.columns)):
                            corr_values.append(abs(corr_matrix.iloc[i, j]))
                    if corr_values:
                        max_corr = max(corr_values)
                        self.add_insight(f"Maximum correlation coefficient: {max_corr:.3f}", 2)

        # --- Categorical columns: top-values bar chart + cardinality insight ---
        if categorical_cols:
            st.subheader("Categorical Variables")
            selected_categorical = st.selectbox("Select categorical column:", categorical_cols)
            # get_value_counts comes from data_handler; the chart title says
            # "Top 10", presumably the helper truncates — TODO confirm.
            value_counts = get_value_counts(self.df, selected_categorical)
            fig = px.bar(x=value_counts.index, y=value_counts.values,
                         title=f"Top 10 {selected_categorical} Values")
            st.plotly_chart(fig, use_container_width=True)
            total_categories = self.df[selected_categorical].nunique()
            self.add_insight(f"Column '{selected_categorical}' has {total_categories} unique categories", 2)
| def stage_3_cleaning(self): | |
| """Stage 3: Data Quality Assessment""" | |
| st.subheader("🧹 Data Quality Assessment") | |
| cleaning_actions = [] | |
| cleaning_history = [] | |
| # Missing values handling | |
| if self.stats['missing_values'] > 0: | |
| st.subheader("Missing Values Treatment") | |
| missing_df = calculate_missing_data(self.df) | |
| st.dataframe(missing_df, use_container_width=True) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| selected_col = st.selectbox("Select column to handle missing values:", | |
| missing_df['Column'].tolist()) | |
| with col2: | |
| fill_method = st.selectbox("Choose fill method:", | |
| ["Drop rows", "Mean", "Median", "Mode", "Custom value"]) | |
| if st.button("Apply Missing Value Treatment"): | |
| try: | |
| if fill_method == "Drop rows": | |
| self.df = self.df.dropna(subset=[selected_col]) | |
| cleaning_history.append(f"Dropped rows with missing values in {selected_col}") | |
| else: | |
| if fill_method == "Mean": | |
| fill_value = self.df[selected_col].mean() | |
| elif fill_method == "Median": | |
| fill_value = self.df[selected_col].median() | |
| elif fill_method == "Mode": | |
| fill_value = self.df[selected_col].mode()[0] | |
| else: # Custom value | |
| fill_value = st.number_input("Enter custom value:", value=0.0) | |
| self.df[selected_col] = self.df[selected_col].fillna(fill_value) | |
| cleaning_history.append(f"Filled missing values in {selected_col} with {fill_method}") | |
| st.success("✅ Missing values handled successfully!") | |
| except Exception as e: | |
| st.error(f"Error handling missing values: {str(e)}") | |
| # Duplicates handling | |
| if self.stats['duplicates'] > 0: | |
| st.subheader("Duplicate Rows") | |
| st.warning(f"Found {self.stats['duplicates']} duplicate rows") | |
| if st.button("Remove Duplicate Rows"): | |
| original_len = len(self.df) | |
| self.df = self.df.drop_duplicates() | |
| removed = original_len - len(self.df) | |
| cleaning_history.append(f"Removed {removed} duplicate rows") | |
| st.success(f"✅ Removed {removed} duplicate rows") | |
| else: | |
| st.success("✅ No duplicate rows found") | |
| # Mixed type detection and handling | |
| mixed_types = detect_mixed_types(self.df) | |
| if mixed_types: | |
| st.subheader("Mixed Data Types") | |
| mixed_df = pd.DataFrame(mixed_types) | |
| st.dataframe(mixed_df, use_container_width=True) | |
| selected_col = st.selectbox("Select column to fix data type:", | |
| [item['column'] for item in mixed_types]) | |
| fix_method = st.selectbox("Choose fix method:", | |
| ["Convert to numeric", "Convert to string"]) | |
| if st.button("Fix Data Type"): | |
| try: | |
| if fix_method == "Convert to numeric": | |
| self.df[selected_col] = pd.to_numeric(self.df[selected_col], errors='coerce') | |
| else: | |
| self.df[selected_col] = self.df[selected_col].astype(str) | |
| cleaning_history.append(f"Fixed data type for {selected_col} to {fix_method}") | |
| st.success("✅ Data type fixed successfully!") | |
| except Exception as e: | |
| st.error(f"Error fixing data type: {str(e)}") | |
| # Outlier detection and handling | |
| numeric_cols = self.column_types['numeric'] | |
| if numeric_cols: | |
| st.subheader("Outlier Detection") | |
| selected_col = st.selectbox("Select column for outlier detection:", numeric_cols) | |
| outliers = calculate_outliers(self.df, selected_col) | |
| outlier_count = len(outliers) | |
| if outlier_count > 0: | |
| st.warning(f"Found {outlier_count} potential outliers in '{selected_col}'") | |
| st.dataframe(outliers[[selected_col]].head(100), use_container_width=True) | |
| treatment_method = st.selectbox("Choose outlier treatment method:", | |
| ["None", "Remove", "Cap at percentiles"]) | |
| if treatment_method != "None" and st.button("Apply Outlier Treatment"): | |
| try: | |
| if treatment_method == "Remove": | |
| self.df = self.df[~self.df.index.isin(outliers.index)] | |
| cleaning_history.append(f"Removed {outlier_count} outliers from {selected_col}") | |
| else: # Cap at percentiles | |
| Q1 = self.df[selected_col].quantile(0.25) | |
| Q3 = self.df[selected_col].quantile(0.75) | |
| IQR = Q3 - Q1 | |
| lower_bound = Q1 - 1.5 * IQR | |
| upper_bound = Q3 + 1.5 * IQR | |
| self.df[selected_col] = self.df[selected_col].clip(lower_bound, upper_bound) | |
| cleaning_history.append(f"Capped outliers in {selected_col} at percentiles") | |
| st.success("✅ Outliers handled successfully!") | |
| except Exception as e: | |
| st.error(f"Error handling outliers: {str(e)}") | |
| else: | |
| st.success(f"✅ No outliers detected in '{selected_col}'") | |
| # Cleaning History | |
| if cleaning_history: | |
| st.subheader("Cleaning Operations History") | |
| for i, operation in enumerate(cleaning_history, 1): | |
| st.write(f"{i}. {operation}") | |
| self.add_insight(f"Performed {len(cleaning_history)} data cleaning operations", 3) | |
| # Summary | |
| if cleaning_actions: | |
| st.subheader("Remaining Action Items") | |
| for i, action in enumerate(cleaning_actions, 1): | |
| st.write(f"{i}. {action}") | |
| self.add_insight(f"Identified {len(cleaning_actions)} data quality issues", 3) | |
| else: | |
| st.success("✅ Data quality is excellent!") | |
| self.add_insight("No major data quality issues found", 3) | |
    def stage_4_analysis(self):
        """Stage 4: Advanced Analysis"""
        st.subheader("🔬 Advanced Analysis")
        numeric_cols = self.column_types['numeric']
        categorical_cols = self.column_types['categorical']

        # --- Scatter + Pearson correlation for two user-chosen numeric columns ---
        if len(numeric_cols) >= 2:
            st.subheader("Variable Relationships")
            col1, col2 = st.columns(2)
            with col1:
                x_var = st.selectbox("X Variable:", numeric_cols)
            with col2:
                # The X choice is excluded so X and Y are always distinct.
                y_var = st.selectbox("Y Variable:",
                                     [col for col in numeric_cols if col != x_var])
            # Downsample large frames so the scatter plot stays responsive.
            sample_size = min(5000, len(self.df))
            sample_df = self.df.sample(n=sample_size) if len(self.df) > sample_size else self.df
            fig = px.scatter(sample_df, x=x_var, y=y_var,
                             title=f"Relationship: {x_var} vs {y_var}")
            st.plotly_chart(fig, use_container_width=True)
            # Correlation is computed on the FULL frame, not the plotted sample.
            correlation = self.df[x_var].corr(self.df[y_var])
            st.metric("Correlation", f"{correlation:.3f}")
            # Conventional strength buckets: |r|>0.7 strong, |r|>0.3 moderate.
            if abs(correlation) > 0.7:
                strength = "Strong"
            elif abs(correlation) > 0.3:
                strength = "Moderate"
            else:
                strength = "Weak"
            direction = "positive" if correlation > 0 else "negative"
            st.write(f"**Result:** {strength} {direction} correlation")
            self.add_insight(f"{strength} correlation ({correlation:.3f}) between {x_var} and {y_var}", 4)

        # --- A numeric metric grouped by a categorical column ---
        if categorical_cols and numeric_cols:
            st.subheader("Group Analysis")
            col1, col2 = st.columns(2)
            with col1:
                group_var = st.selectbox("Group by:", categorical_cols)
            with col2:
                metric_var = st.selectbox("Analyze:", numeric_cols)
            group_stats = calculate_group_stats(self.df, group_var, metric_var)
            st.dataframe(group_stats, use_container_width=True)
            # Box plot only when the group count is small enough to be readable.
            unique_groups = self.df[group_var].nunique()
            if unique_groups <= 20:
                fig = px.box(self.df, x=group_var, y=metric_var,
                             title=f"{metric_var} by {group_var}")
                st.plotly_chart(fig, use_container_width=True)
            else:
                st.info(f"Too many groups ({unique_groups}) for visualization. Showing statistics only.")
            # assumes calculate_group_stats returns groups as the index with a
            # 'mean' column — TODO confirm against data_handler
            best_group = group_stats['mean'].idxmax()
            best_value = group_stats.loc[best_group, 'mean']
            self.add_insight(f"'{best_group}' has highest average {metric_var}: {best_value:.2f}", 4)
    def stage_5_summary(self):
        """Stage 5: Summary and Export"""
        st.subheader("📈 Analysis Summary")

        # --- Headline metrics ---
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Insights", len(self.insights))
        with col2:
            # Coarse quality label: only missing-value count is considered here.
            quality = "High" if self.stats['missing_values'] == 0 else "Medium"
            st.metric("Data Quality", quality)
        with col3:
            st.metric("Analysis Complete", "✅")

        # --- All insights collected across the stages ---
        st.subheader("Key Insights")
        for i, insight in enumerate(self.insights, 1):
            st.write(f"{i}. **Stage {insight['stage']}:** {insight['insight']}")

        # --- Export: reports, reproducible script, or the (cleaned) data ---
        st.subheader("Export Results")
        export_format = st.selectbox("Choose export format:",
                                     ["Text Report", "Markdown Report", "Python Code", "Cleaned Data"])
        if export_format == "Text Report":
            report = self.generate_text_report()
            st.download_button(
                label="Download Text Report",
                data=report,
                file_name="analysis_report.txt",
                mime="text/plain"
            )
        elif export_format == "Markdown Report":
            report = self.generate_markdown_report()
            st.download_button(
                label="Download Markdown Report",
                data=report,
                file_name="analysis_report.md",
                mime="text/markdown"
            )
        elif export_format == "Python Code":
            code = self.generate_python_code()
            st.code(code, language="python")
            st.download_button(
                label="Download Python Script",
                data=code,
                file_name="analysis_script.py",
                mime="text/plain"
            )
        else:  # Cleaned Data
            data_format = st.selectbox("Choose data format:",
                                       ["CSV", "Excel", "Parquet"])
            # NOTE(review): a download_button created inside an st.button branch
            # only exists for the single rerun after the click — a known
            # Streamlit pitfall; consider rendering the download buttons
            # unconditionally. Left as-is pending confirmation.
            if st.button("Export Data"):
                try:
                    if data_format == "CSV":
                        csv = self.df.to_csv(index=False)
                        st.download_button(
                            label="Download CSV",
                            data=csv,
                            file_name="cleaned_data.csv",
                            mime="text/csv"
                        )
                    elif data_format == "Excel":
                        # Serialize through an in-memory buffer; no temp files.
                        excel_buffer = BytesIO()
                        self.df.to_excel(excel_buffer, index=False)
                        excel_data = excel_buffer.getvalue()
                        st.download_button(
                            label="Download Excel",
                            data=excel_data,
                            file_name="cleaned_data.xlsx",
                            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                        )
                    else:  # Parquet
                        parquet_buffer = BytesIO()
                        self.df.to_parquet(parquet_buffer, index=False)
                        parquet_data = parquet_buffer.getvalue()
                        st.download_button(
                            label="Download Parquet",
                            data=parquet_data,
                            file_name="cleaned_data.parquet",
                            mime="application/octet-stream"
                        )
                except Exception as e:
                    st.error(f"Error exporting data: {str(e)}")
| def generate_text_report(self) -> str: | |
| """Generate text analysis report""" | |
| report = f"""DATA ANALYSIS REPORT | |
| ================== | |
| Dataset Overview: | |
| - Rows: {self.stats['shape'][0]:,} | |
| - Columns: {self.stats['shape'][1]:,} | |
| - Missing Values: {self.stats['missing_values']:,} | |
| - Memory Usage: {self.stats['memory_usage']:.1f} MB | |
| Key Insights: | |
| """ | |
| for insight in self.insights: | |
| report += f"\n- Stage {insight['stage']}: {insight['insight']}" | |
| report += f"\n\nGenerated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}" | |
| return report | |
    def generate_markdown_report(self) -> str:
        """Generate markdown analysis report"""
        # NOTE(review): DataFrame.to_markdown() requires the optional
        # 'tabulate' package — confirm it is in the deployment requirements.
        report = f"""# Data Analysis Report
## Dataset Overview
* **Rows:** {self.stats['shape'][0]:,}
* **Columns:** {self.stats['shape'][1]:,}
* **Missing Values:** {self.stats['missing_values']:,}
* **Memory Usage:** {self.stats['memory_usage']:.1f} MB
## Data Types
```
{pd.DataFrame(self.stats['dtypes'].items(), columns=['Type', 'Count']).to_markdown()}
```
## Key Insights
"""
        # Group insights by stage; stages are numbered 1..5.
        for stage in range(1, 6):
            stage_insights = [i for i in self.insights if i['stage'] == stage]
            if stage_insights:
                report += f"\n### Stage {stage}\n"
                for insight in stage_insights:
                    report += f"* {insight['insight']}\n"
        report += f"\n\n*Generated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}*"
        return report
    def generate_python_code(self) -> str:
        """Generate reproducible Python code"""
        # Base script: load the data and print the same basic statistics the
        # app computes. (The embedded text is a standalone script, not
        # executed here.)
        code = """import pandas as pd
import numpy as np
import plotly.express as px
from typing import Dict, List, Any
# Load and prepare data
df = pd.read_csv('your_data.csv') # Update with your data source
# Basic statistics
def calculate_basic_stats(df: pd.DataFrame) -> Dict[str, Any]:
    return {
        'shape': df.shape,
        'memory_usage': float(df.memory_usage(deep=True).sum() / 1024**2),
        'missing_values': int(df.isnull().sum().sum()),
        'dtypes': df.dtypes.value_counts().to_dict(),
        'duplicates': int(df.duplicated().sum())
    }
stats = calculate_basic_stats(df)
print("\\nBasic Statistics:")
print(f"- Shape: {stats['shape']}")
print(f"- Memory Usage: {stats['memory_usage']:.1f} MB")
print(f"- Missing Values: {stats['missing_values']}")
print(f"- Duplicates: {stats['duplicates']}")
"""
        # Mirror recorded cleaning operations into the script. NOTE(review):
        # as written, stage_3_cleaning keeps its history in a *local*
        # variable, so this attribute is only present if set elsewhere —
        # verify the attribute is actually populated.
        if hasattr(self, 'cleaning_history'):
            code += "\n# Data Cleaning\n"
            for operation in self.cleaning_history:
                # Keyword matching on the logged operation text decides which
                # cleaning snippet to emit.
                if "missing values" in operation.lower():
                    code += "# Handle missing values\n"
                    code += "df = df.fillna(method='ffill') # Update with your chosen method\n"
                elif "duplicate" in operation.lower():
                    code += "# Remove duplicates\n"
                    code += "df = df.drop_duplicates()\n"
                elif "outlier" in operation.lower():
                    code += """# Handle outliers
def remove_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame:
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    return df[~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))]
# Apply to numeric columns as needed
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    df = remove_outliers(df, col)
"""
        # Always append the visualization helpers.
        code += """
# Visualizations
def plot_missing_values(df: pd.DataFrame):
    missing = df.isnull().sum()
    if missing.sum() > 0:
        missing = missing[missing > 0]
        fig = px.bar(x=missing.index, y=missing.values,
                     title='Missing Values by Column')
        fig.show()
def plot_correlations(df: pd.DataFrame):
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 1:
        corr = df[numeric_cols].corr()
        fig = px.imshow(corr, title='Correlation Matrix')
        fig.show()
# Generate plots
plot_missing_values(df)
plot_correlations(df)
"""
        return code