Spaces:
Building
Building
| """ | |
| Corpus Data Visualizer Module | |
| This module provides functionality for merging, filtering, and visualizing corpus data. | |
| Supports merging metadata with results files, applying filters, and generating | |
| visualizations such as box plots and scatter plots. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from typing import Dict, List, Tuple, Optional, Union, Any | |
| import logging | |
| import re | |
| from io import StringIO | |
| import natsort | |
| import csv | |
| from scipy import stats | |
| from scipy.stats import f_oneway | |
| import warnings | |
| logger = logging.getLogger(__name__) | |
| class CorpusVisualizer: | |
| """ | |
| A class for merging, filtering, and visualizing corpus data. | |
| Supports: | |
| - Merging two dataframes (metadata and results) | |
| - Detecting potential join columns | |
| - Filtering merged data | |
| - Creating visualizations (box plots, scatter plots) | |
| - Validating merge quality | |
| - Exporting merged data | |
| """ | |
def __init__(self, file_size_limit_mb: int = 300):
    """
    Set up an empty visualizer with nothing loaded or merged yet.

    Args:
        file_size_limit_mb: Upload size ceiling in megabytes; it is stored
            on the instance as a byte count.
    """
    # Upload limit, kept in bytes so it can be compared to encoded content.
    self.file_size_limit = file_size_limit_mb * 1024 * 1024

    # Data slots are filled by load_dataframe() and merge_dataframes().
    self.metadata_df = self.results_df = self.merged_df = None
    self.merge_stats = None

    self.filters = []
    self.category_orders = {}  # column name -> custom category order
def detect_file_format(self, content: Union[str, bytes]) -> Dict[str, Any]:
    """
    Guess the column separator of delimited text from its first lines.

    Args:
        content: File content as string or bytes (bytes are decoded as UTF-8)

    Returns:
        Dict with keys 'separator', 'has_header' (always True),
        'estimated_columns' and 'sample_lines' (up to three lines).

    Raises:
        ValueError: If the content is larger than ``self.file_size_limit`` bytes.
    """
    if isinstance(content, bytes):
        # Measure the raw upload directly instead of decoding and re-encoding.
        byte_size = len(content)
        # utf-8-sig drops a leading byte-order mark so it cannot leak into
        # the first sample line.
        content = content.decode('utf-8-sig')
    else:
        byte_size = len(content.encode('utf-8'))
        content = content.lstrip('\ufeff')

    if byte_size > self.file_size_limit:
        raise ValueError(f"File too large. Maximum size is {self.file_size_limit // (1024*1024)}MB")

    # Only the first five lines are inspected; trailing '\r' from CRLF files
    # is removed so the samples are clean.
    lines = [line.rstrip('\r') for line in content.strip().split('\n')[:5]]

    best_sep = '\t'
    max_columns = 0
    for sep in ('\t', ',', ';', '|'):
        # Naive split: quoted fields containing the separator are not
        # recognised here, this is only a heuristic.
        avg_cols = sum(len(line.split(sep)) for line in lines) / len(lines)
        # Strict '>' keeps the earliest candidate on ties (tab first).
        if avg_cols > max_columns:
            max_columns = avg_cols
            best_sep = sep

    return {
        'separator': best_sep,
        'has_header': True,
        'estimated_columns': int(max_columns),
        'sample_lines': lines[:3]
    }
def load_dataframe(self, content: Union[str, bytes], file_type: str) -> pd.DataFrame:
    """
    Parse delimited text into a DataFrame and keep it on the instance.

    Args:
        content: File content as string or bytes (bytes are decoded as UTF-8)
        file_type: 'metadata' stores the frame as ``metadata_df``; any other
            value stores it as ``results_df``

    Returns:
        pd.DataFrame: The parsed dataframe
    """
    text = content.decode('utf-8') if isinstance(content, bytes) else content

    # The separator comes from the format sniffing step.
    delimiter = self.detect_file_format(text)['separator']

    frame = pd.read_csv(
        StringIO(text),
        sep=delimiter,
        quoting=csv.QUOTE_MINIMAL,
        quotechar='"',
    )

    # Anything that is not explicitly 'metadata' is treated as results.
    target_attr = 'metadata_df' if file_type == 'metadata' else 'results_df'
    setattr(self, target_attr, frame)
    return frame
def detect_column_types(self, df: pd.DataFrame) -> Dict[str, List[str]]:
    """
    Sort the columns of a dataframe into ID, numeric, categorical and text.

    A column is an ID column when its lower-cased name contains 'id' or
    'file'; this is checked before the dtype. Remaining numeric columns are
    numeric. Every other column is categorical when fewer than 20% of its
    rows hold distinct values, otherwise text.

    Args:
        df: DataFrame to analyze

    Returns:
        Dict with keys 'numeric_columns', 'categorical_columns',
        'id_columns' and 'text_columns', each a list of column labels.
    """
    numeric_cols = []
    categorical_cols = []
    id_cols = []
    text_cols = []

    row_count = len(df)

    for col in df.columns:
        col_str = str(col).lower()
        # NOTE(review): plain substring match, so names such as "width" or
        # "confidence" also count as ID columns - confirm this is intended.
        if any(id_pattern in col_str for id_pattern in ('id', 'file')):
            id_cols.append(col)
        elif pd.api.types.is_numeric_dtype(df[col]):
            numeric_cols.append(col)
        else:
            # Covers object columns as well as category / string / datetime
            # dtypes, which would otherwise end up in no bucket at all.
            # An empty frame has ratio 0.0 instead of dividing by zero.
            unique_ratio = df[col].nunique() / row_count if row_count else 0.0
            if unique_ratio < 0.2:
                categorical_cols.append(col)
            else:
                text_cols.append(col)

    return {
        'numeric_columns': numeric_cols,
        'categorical_columns': categorical_cols,
        'id_columns': id_cols,
        'text_columns': text_cols
    }
def detect_join_columns(self) -> List[Dict[str, Any]]:
    """
    Propose join column pairs between the metadata and results dataframes.

    Every ID column of the metadata frame is paired with every ID column of
    the results frame. For each pair, ``match_percent`` is the share of
    distinct metadata keys that also occur among the results keys. Keys are
    compared as stripped strings with a trailing ".ext" removed, the same
    cleaning merge_dataframes() applies when ``handle_extensions`` is True.

    Returns:
        List of dicts with 'metadata_column', 'results_column',
        'match_percent' (0-100) and 'recommendation' (True when at least
        one key overlaps).

    Raises:
        ValueError: If either dataframe has not been loaded.
    """
    if self.metadata_df is None or self.results_df is None:
        raise ValueError("Both metadata and results dataframes must be loaded")

    metadata_ids = self.detect_column_types(self.metadata_df)['id_columns']
    results_ids = self.detect_column_types(self.results_df)['id_columns']

    def key_set(series):
        """Distinct, cleaned join keys of a column (missing values skipped)."""
        cleaned = series.dropna().astype(str).str.strip()
        return set(cleaned.str.replace(r'\.[^.]*$', '', regex=True))

    # Clean each results column once, not once per metadata column.
    results_keys = {col: key_set(self.results_df[col]) for col in results_ids}

    potential_joins = []
    for meta_col in metadata_ids:
        meta_keys = key_set(self.metadata_df[meta_col])
        for results_col in results_ids:
            if meta_keys:
                shared = len(meta_keys & results_keys[results_col])
                match_percent = shared / len(meta_keys) * 100
            else:
                match_percent = 0.0
            potential_joins.append({
                'metadata_column': meta_col,
                'results_column': results_col,
                'match_percent': match_percent,
                'recommendation': match_percent > 0
            })

    return potential_joins
def merge_dataframes(self, metadata_column: str, results_column: str,
                     handle_extensions: bool = True) -> pd.DataFrame:
    """
    Left-join the results dataframe onto the metadata dataframe.

    The merged frame is stored in ``self.merged_df`` and match statistics in
    ``self.merge_stats``. ``matched_rows`` counts metadata rows that found
    at least one results row, so duplicate result keys never push
    ``match_percent`` above 100.

    Args:
        metadata_column: Join column in metadata dataframe
        results_column: Join column in results dataframe
        handle_extensions: When True, join on cleaned keys (surrounding
            whitespace and a trailing ".ext" removed) instead of raw values

    Returns:
        pd.DataFrame: Merged dataframe; columns present in both inputs get
        the suffixes '_meta' and '_results'.

    Raises:
        ValueError: If either dataframe has not been loaded.
    """
    if self.metadata_df is None or self.results_df is None:
        raise ValueError("Both dataframes must be loaded before merging")

    # Work on copies so the loaded dataframes stay untouched.
    metadata = self.metadata_df.copy()
    results = self.results_df.copy()

    # Helper columns, removed again before returning: a per-row marker to
    # count distinct matched metadata rows, and the merge indicator.
    row_marker = '__cv_meta_row__'
    status_marker = '__cv_merge_status__'
    helper_columns = [row_marker, status_marker]
    metadata[row_marker] = np.arange(len(metadata))

    if handle_extensions:
        def clean_join_key(filename):
            """Clean filename for joining: strip whitespace and remove file extensions."""
            clean = str(filename).strip()
            return re.sub(r'\.[^.]*$', '', clean)

        metadata['_join_key'] = metadata[metadata_column].apply(clean_join_key)
        results['_join_key'] = results[results_column].apply(clean_join_key)
        left_key = right_key = '_join_key'
        helper_columns.append('_join_key')
    else:
        left_key, right_key = metadata_column, results_column

    merged = pd.merge(
        metadata,
        results,
        left_on=left_key,
        right_on=right_key,
        how='left',
        suffixes=('_meta', '_results'),
        indicator=status_marker
    )

    # 'both' marks rows whose key was found in the results frame. This does
    # not depend on suffixed column names or on NaNs in the data itself.
    found = merged[status_marker] == 'both'
    matched_rows = int(merged.loc[found, row_marker].nunique())
    total_rows = len(self.metadata_df)

    merged = merged.drop(columns=helper_columns)

    self.merge_stats = {
        'total_rows': total_rows,
        'matched_rows': matched_rows,
        'match_percent': (matched_rows / total_rows * 100) if total_rows > 0 else 0,
        'metadata_column': metadata_column,
        'results_column': results_column,
        'handle_extensions': handle_extensions
    }
    self.merged_df = merged
    return merged
def validate_merge(self) -> Dict[str, Any]:
    """
    Summarise the quality of the most recent merge.

    Returns:
        Dict: a copy of the merge statistics plus 'quality_score' (the match
        percentage) and 'quality_assessment' ("Good" above 75, else "Fair").

    Raises:
        ValueError: If no merge has been performed yet.
    """
    recorded = self.merge_stats
    if recorded is None:
        raise ValueError("Must perform merge before validation")

    report = recorded.copy()
    match_percent = report['match_percent']
    report.update(
        quality_score=match_percent,
        quality_assessment="Good" if match_percent > 75 else "Fair",
    )
    return report
def filter_dataframe(self, filters: List[Dict[str, Any]]) -> pd.DataFrame:
    """
    Apply a sequence of filters to a copy of the merged dataframe.

    Each filter is a dict with 'column', 'operator' and 'value'. Supported
    operators are '=', '!=', 'in' and 'not in'. For 'in' / 'not in' a scalar
    value is treated as a one-element list. Filters with a missing field, a
    value of None, an unknown column or an unknown operator are skipped.

    Args:
        filters: List of filter dictionaries (None is treated as no filters)

    Returns:
        pd.DataFrame: Filtered dataframe; ``self.merged_df`` is not modified.

    Raises:
        ValueError: If no merge has been performed yet.
    """
    if self.merged_df is None:
        raise ValueError("Must perform merge before filtering")

    filtered_df = self.merged_df.copy()

    for filter_item in filters or []:
        column = filter_item.get('column')
        operator = filter_item.get('operator')
        value = filter_item.get('value')

        if not (column and operator) or value is None:
            continue
        if column not in filtered_df.columns:
            continue

        series = filtered_df[column]
        if operator == '=':
            mask = series == value
        elif operator == '!=':
            mask = series != value
        elif operator in ('in', 'not in'):
            # Series.isin rejects scalars (including plain strings), so
            # wrap them instead of failing with a TypeError.
            candidates = value if pd.api.types.is_list_like(value) else [value]
            mask = series.isin(candidates)
            if operator == 'not in':
                mask = ~mask
        else:
            # Unknown operators are ignored rather than treated as errors.
            continue

        filtered_df = filtered_df[mask]

    return filtered_df
def get_smart_category_order(self, column: str, values: List[str]) -> List[str]:
    """
    Put category labels into a sensible display order.

    Order of preference:
      1. Natural sort when every label looks like "<word chars><digits>"
         (year1, level10, week52, ...), so year2 sorts before year11.
      2. Numeric sort when every label parses as a number; the original
         label text is returned, not a re-formatted float.
      3. Case-insensitive alphabetical sort.

    Args:
        column: Column name (not used for the ordering itself)
        values: List of unique values in the column

    Returns:
        List of labels (as strings) in display order. If no usable label
        remains after dropping missing/blank values, ``values`` is returned
        unchanged.
    """
    # Convert to strings and remove None/NaN/blank values
    clean_values = [str(v) for v in values if pd.notna(v) and str(v).strip()]
    if not clean_values:
        return values

    # One generic prefix check is enough: any label matching a specific
    # form such as year<digits> or grade<digits> also matches this pattern.
    if all(re.match(r'\w+\d+', val.lower()) for val in clean_values):
        try:
            return natsort.natsorted(clean_values, key=str.lower)
        except (TypeError, ValueError):
            # Fall through to the numeric / alphabetical strategies.
            pass

    # Numeric labels: sort by value but keep the labels as they appear in
    # the data, so they still match the category values being plotted.
    try:
        numeric_keys = {val: float(val) for val in clean_values}
    except (ValueError, TypeError):
        pass
    else:
        return sorted(numeric_keys, key=numeric_keys.get)

    # Default to alphabetical sorting
    return sorted(clean_values, key=str.lower)
def set_category_order(self, column: str, order: List[str]) -> None:
    """
    Register a custom display order for one categorical column.

    Args:
        column: Column name
        order: Category values in the order they should appear
    """
    registry = self.category_orders
    registry[column] = order
def get_category_order(self, column: str, df: Optional[pd.DataFrame] = None) -> List[str]:
    """
    Return the display order of a column's categories.

    A custom order registered for the column wins; otherwise the order is
    derived from the column's distinct non-missing values.

    Args:
        column: Column name
        df: DataFrame to read values from (defaults to merged_df)

    Returns:
        List of category values in order

    Raises:
        ValueError: If no dataframe was given and nothing has been merged.
    """
    frame = self.merged_df if df is None else df
    if frame is None:
        raise ValueError("No dataframe available")

    # Sentinel lookup, so a stored falsy order is still honoured.
    not_set = object()
    custom_order = self.category_orders.get(column, not_set)
    if custom_order is not not_set:
        return custom_order

    observed = frame[column].dropna().unique().tolist()
    return self.get_smart_category_order(column, observed)
def reset_category_order(self, column: str) -> None:
    """
    Forget the custom category order of a column, if one was registered.

    After this the column falls back to the automatically derived order.

    Args:
        column: Column name
    """
    # pop() with a default is a no-op for columns without a custom order.
    self.category_orders.pop(column, None)
def create_boxplot(self, x_column: str, y_column: str, color_column: Optional[str] = None,
                   title: Optional[str] = None, height: int = 600,
                   category_orders: Optional[Dict[str, List[str]]] = None) -> Tuple[go.Figure, Optional[Dict[str, Any]]]:
    """
    Draw a box plot of the merged data and run the matching ANOVA.

    Without a color column a one-way ANOVA is run on ``x_column``; with a
    color column a two-way ANOVA is run on ``x_column`` and ``color_column``.

    Args:
        x_column: Categorical column for x-axis
        y_column: Numeric column for y-axis
        color_column: Optional column for color grouping
        title: Plot title (a default title is generated when omitted)
        height: Plot height
        category_orders: Optional per-column category orders; columns not
            listed here use get_category_order()

    Returns:
        Tuple of (Plotly figure object, Statistical results dict)

    Raises:
        ValueError: If no merge has been performed yet.
    """
    plot_df = self.merged_df
    if plot_df is None:
        raise ValueError("Must perform merge before creating visualizations")

    overrides = category_orders or {}

    def resolve_order(col):
        """Caller-supplied order if present, otherwise the stored/derived one."""
        if col in overrides:
            return overrides[col]
        return self.get_category_order(col, plot_df)

    # The x axis is always ordered; the color column only when it is used.
    ordered_columns = [x_column] + ([color_column] if color_column else [])
    plot_category_orders = {col: resolve_order(col) for col in ordered_columns}

    box_args = {
        'x': x_column,
        'y': y_column,
        'title': title or f"Box Plot: {y_column} by {x_column}",
        'height': height,
        'category_orders': plot_category_orders,
    }
    if color_column:
        box_args['color'] = color_column
    fig = px.box(plot_df, **box_args)
    fig.update_layout(template="plotly_white")

    # Statistical analysis: a failure is reported in the result, not raised.
    try:
        stats_results = (
            self.perform_two_way_anova(plot_df, x_column, y_column, color_column)
            if color_column
            else self.perform_one_way_anova(plot_df, x_column, y_column)
        )
    except Exception as e:
        stats_results = {"error": f"Statistical analysis failed: {str(e)}"}

    return fig, stats_results
def create_scatterplot(self, x_column: str, y_column: str, color_column: Optional[str] = None,
                       title: Optional[str] = None, height: int = 600,
                       category_orders: Optional[Dict[str, List[str]]] = None,
                       add_trendline: bool = True, add_confidence_interval: bool = True) -> Tuple[go.Figure, Optional[Dict[str, Any]]]:
    """
    Draw a scatter plot of the merged data with a simple linear regression.

    Args:
        x_column: Numeric column for x-axis
        y_column: Numeric column for y-axis
        color_column: Optional column for color coding points
        title: Plot title (a default title is generated when omitted)
        height: Plot height
        category_orders: Optional custom category orders (only applied to a
            color column that is detected as categorical)
        add_trendline: Whether to draw the fitted regression line
        add_confidence_interval: Whether to draw the 95% confidence band
            around the trendline (only drawn together with the trendline)

    Returns:
        Tuple of (Plotly figure object, Statistical results dict)

    Raises:
        ValueError: If no merge has been performed yet.
    """
    plot_df = self.merged_df
    if plot_df is None:
        raise ValueError("Must perform merge before creating visualizations")

    def confidence_band(x_obs, y_obs, slope, intercept, grid, fitted):
        """Upper and lower 95% confidence limits of the fitted line on ``grid``."""
        n = len(x_obs)
        mean_x = np.mean(x_obs)
        ss_x = np.sum((x_obs - mean_x) ** 2)
        mse = np.sum((y_obs - (slope * x_obs + intercept)) ** 2) / (n - 2)
        # Standard error of the fitted mean at each grid point
        se_y = np.sqrt(mse * (1/n + (grid - mean_x)**2 / ss_x))
        # Two-sided 95% quantile of the t distribution with n - 2 df
        t_val = stats.t.ppf(0.975, n - 2)
        return fitted + t_val * se_y, fitted - t_val * se_y

    # Category ordering only makes sense for a categorical color column.
    color_orders = {}
    if color_column:
        categorical = self.detect_column_types(plot_df).get('categorical_columns', [])
        if color_column in categorical:
            if category_orders and color_column in category_orders:
                color_orders[color_column] = category_orders[color_column]
            else:
                color_orders[color_column] = self.get_category_order(color_column, plot_df)

    scatter_args = {
        'x': x_column,
        'y': y_column,
        'title': title or f"Scatter Plot: {y_column} vs {x_column}",
        'height': height,
    }
    if color_column:
        scatter_args['color'] = color_column
        scatter_args['category_orders'] = color_orders or None
    fig = px.scatter(plot_df, **scatter_args)

    # Regression and overlays: a failure is reported in the result, not raised.
    try:
        stats_results = self.perform_simple_regression(plot_df, x_column, y_column)

        if add_trendline and 'error' not in stats_results:
            observed = plot_df[[x_column, y_column]].dropna()
            x_vals = observed[x_column].values
            y_vals = observed[y_column].values

            fit = stats_results['regression']
            slope, intercept = fit['slope'], fit['intercept']

            # 100 evenly spaced points give a smooth line and band.
            x_range = np.linspace(x_vals.min(), x_vals.max(), 100)
            y_range = slope * x_range + intercept

            if add_confidence_interval:
                y_upper, y_lower = confidence_band(
                    x_vals, y_vals, slope, intercept, x_range, y_range
                )
                # Closed polygon: upper limit left-to-right, lower limit back.
                fig.add_trace(go.Scatter(
                    x=np.concatenate([x_range, x_range[::-1]]),
                    y=np.concatenate([y_upper, y_lower[::-1]]),
                    fill='toself',
                    fillcolor='rgba(255, 0, 0, 0.2)',
                    line=dict(color='rgba(255,255,255,0)'),
                    hoverinfo="skip",
                    showlegend=True,
                    name='95% Confidence Interval'
                ))

            fig.add_trace(go.Scatter(
                x=x_range,
                y=y_range,
                mode='lines',
                name=f'Trendline (R² = {fit["r_squared"]:.3f})',
                line=dict(color='red', dash='dash', width=2)
            ))
    except Exception as e:
        stats_results = {"error": f"Statistical analysis failed: {str(e)}"}

    fig.update_layout(template="plotly_white")
    return fig, stats_results
def export_merged_data(self) -> pd.DataFrame:
    """
    Hand out the merged dataframe for export.

    Returns:
        pd.DataFrame: The stored merged dataframe itself (not a copy)

    Raises:
        ValueError: If no merge has been performed yet.
    """
    merged = self.merged_df
    if merged is not None:
        return merged
    raise ValueError("Must perform merge before exporting")
| # Statistical Analysis Methods | |
def calculate_eta_squared(self, ss_between: float, ss_total: float) -> float:
    """
    Eta-squared effect size: between-groups share of the total variation.

    Args:
        ss_between: Sum of squares between groups
        ss_total: Total sum of squares

    Returns:
        float: ss_between / ss_total, or 0.0 when ss_total is zero
    """
    return ss_between / ss_total if ss_total != 0 else 0.0
def calculate_partial_eta_squared(self, ss_effect: float, ss_error: float) -> float:
    """
    Partial eta-squared effect size for one effect of a factorial ANOVA.

    Args:
        ss_effect: Sum of squares for the effect
        ss_error: Sum of squares for error

    Returns:
        float: ss_effect / (ss_effect + ss_error), or 0.0 when that
        denominator is zero
    """
    denominator = ss_effect + ss_error
    return 0.0 if denominator == 0 else ss_effect / denominator
def calculate_cohens_d(self, group1: np.ndarray, group2: np.ndarray) -> float:
    """
    Cohen's d: mean difference of two groups in pooled-standard-deviation units.

    Args:
        group1: Data for first group
        group2: Data for second group

    Returns:
        float: (mean(group1) - mean(group2)) / pooled SD; 0.0 when either
        group has fewer than two values or the pooled SD is zero
    """
    sizes = (len(group1), len(group2))
    if min(sizes) < 2:
        return 0.0
    n1, n2 = sizes

    # Pooled SD from the two sample variances (ddof=1), weighted by their
    # degrees of freedom.
    weighted_variance = ((n1 - 1) * np.var(group1, ddof=1) +
                         (n2 - 1) * np.var(group2, ddof=1))
    pooled_std = np.sqrt(weighted_variance / (n1 + n2 - 2))
    if pooled_std == 0:
        return 0.0

    mean_gap = np.mean(group1) - np.mean(group2)
    return mean_gap / pooled_std
def calculate_cohens_f_squared(self, r_squared: float) -> float:
    """
    Cohen's f² effect size derived from a regression R².

    Args:
        r_squared: R-squared value from regression

    Returns:
        float: r_squared / (1 - r_squared); 0.0 when r_squared is negative
        or at least 1.0
    """
    out_of_range = r_squared < 0 or r_squared >= 1.0
    return 0.0 if out_of_range else r_squared / (1 - r_squared)
def interpret_effect_size(self, value: float, metric_type: str) -> str:
    """
    Translate an effect size into a verbal label.

    Args:
        value: Effect size value (for 'cohens_d' the absolute value is used)
        metric_type: One of 'eta_squared', 'partial_eta_squared',
            'cohens_d', 'r_squared', 'cohens_f'

    Returns:
        str: "Small", "Medium" or "Large"; "Unknown" for any other metric type
    """
    # metric -> (upper bound of "Small", upper bound of "Medium"); both
    # bounds are exclusive, everything at or above the second is "Large".
    cutoffs = {
        'eta_squared': (0.06, 0.14),
        'partial_eta_squared': (0.06, 0.14),
        'cohens_d': (0.5, 0.8),
        'r_squared': (0.09, 0.25),
        'cohens_f': (0.15, 0.35),
    }
    if metric_type not in cutoffs:
        return "Unknown"

    magnitude = abs(value) if metric_type == 'cohens_d' else value
    small_below, medium_below = cutoffs[metric_type]
    if magnitude < small_below:
        return "Small"
    if magnitude < medium_below:
        return "Medium"
    return "Large"
def perform_one_way_anova(self, df: pd.DataFrame, x_column: str, y_column: str) -> Dict[str, Any]:
    """
    Perform one-way ANOVA analysis.

    When the overall test is significant (p < 0.05) and there are more than
    two groups, pairwise post hoc comparisons are added. Each comparison
    reports the raw t-test p-value and a Bonferroni-adjusted one
    ('p_value_bonferroni'), because the raw values ignore the number of
    comparisons made.

    Args:
        df: DataFrame containing the data
        x_column: Categorical column (groups)
        y_column: Numeric column (dependent variable)

    Returns:
        Dict containing ANOVA results and effect sizes, or a dict with a
        single 'error' key when the analysis cannot be run. This method
        does not raise; failures are reported through the 'error' key.
    """
    try:
        # Remove rows with a missing group or value
        clean_df = df[[x_column, y_column]].dropna()

        if len(clean_df) < 3:
            return {"error": "Insufficient data for ANOVA (need at least 3 observations)"}

        # Single pass over the groups; empty groups are skipped.
        group_names = []
        group_data = []
        for name, group in clean_df.groupby(x_column):
            if len(group) > 0:
                group_names.append(name)
                group_data.append(group[y_column].values)

        if len(group_data) < 2:
            return {"error": "Need at least 2 groups for ANOVA"}

        f_stat, p_value = f_oneway(*group_data)

        # Sums of squares for eta-squared
        grand_mean = clean_df[y_column].mean()
        ss_total = np.sum((clean_df[y_column] - grand_mean) ** 2)
        ss_between = sum(
            len(values) * (np.mean(values) - grand_mean) ** 2
            for values in group_data
        )
        eta_squared = self.calculate_eta_squared(ss_between, ss_total)

        group_count = len(group_data)
        results = {
            "test_type": "One-way ANOVA",
            "f_statistic": f_stat,
            "p_value": p_value,
            "df_between": group_count - 1,
            "df_within": len(clean_df) - group_count,
            "eta_squared": eta_squared,
            "eta_squared_interpretation": self.interpret_effect_size(eta_squared, "eta_squared"),
            "sample_size": len(clean_df),
            "groups": group_names,
            "group_means": [np.mean(values) for values in group_data],
            "group_sizes": [len(values) for values in group_data]
        }

        # Post hoc analysis if significant and more than 2 groups
        if p_value < 0.05 and group_count > 2:
            try:
                n_comparisons = group_count * (group_count - 1) // 2
                posthoc_results = []
                for i in range(group_count):
                    for j in range(i + 1, group_count):
                        first, second = group_data[i], group_data[j]
                        cohens_d = self.calculate_cohens_d(first, second)
                        t_stat, t_p = stats.ttest_ind(first, second)
                        posthoc_results.append({
                            "group1": group_names[i],
                            "group2": group_names[j],
                            "cohens_d": cohens_d,
                            "cohens_d_interpretation": self.interpret_effect_size(cohens_d, "cohens_d"),
                            "p_value": t_p,
                            # np.minimum keeps NaN as NaN, unlike min().
                            "p_value_bonferroni": float(np.minimum(1.0, t_p * n_comparisons)),
                            "mean_diff": np.mean(first) - np.mean(second)
                        })
                results["posthoc"] = posthoc_results
            except Exception as e:
                results["posthoc_error"] = f"Error in post hoc analysis: {str(e)}"

        return results
    except Exception as e:
        return {"error": f"Error performing ANOVA: {str(e)}"}
def perform_two_way_anova(self, df: pd.DataFrame, x_column: str, y_column: str, color_column: str) -> Dict[str, Any]:
    """
    Perform two-way ANOVA analysis.

    Sums of squares are computed by hand from marginal and cell means.
    NOTE(review): these formulas presumably assume a balanced design (equal
    cell sizes); for unbalanced data the effects overlap and the error term
    is only approximate - confirm before relying on such results.

    Args:
        df: DataFrame containing the data
        x_column: First factor (categorical)
        y_column: Dependent variable (numeric)
        color_column: Second factor (categorical)

    Returns:
        Dict containing two-way ANOVA results and effect sizes, or a dict
        with a single 'error' key when the analysis cannot be run. This
        method does not raise; failures are reported through 'error'.
    """
    try:
        # Remove rows with any missing value
        clean_df = df[[x_column, y_column, color_column]].dropna()

        if len(clean_df) < 6:  # Need minimum samples for 2-way ANOVA
            return {"error": "Insufficient data for two-way ANOVA (need at least 6 observations)"}

        factor1_levels = clean_df[x_column].unique()
        factor2_levels = clean_df[color_column].unique()

        if len(factor1_levels) < 2 or len(factor2_levels) < 2:
            return {"error": "Need at least 2 levels per factor for two-way ANOVA"}

        # Degrees of freedom
        n_total = len(clean_df)
        df_a = len(factor1_levels) - 1
        df_b = len(factor2_levels) - 1
        df_ab = df_a * df_b
        df_error = n_total - len(factor1_levels) * len(factor2_levels)

        # Without residual degrees of freedom there is no error variance to
        # test against, so no F statistic can be formed.
        if df_error <= 0:
            return {"error": "Insufficient data for two-way ANOVA (no residual degrees of freedom)"}

        response = clean_df[y_column]
        grand_mean = response.mean()
        ss_total = np.sum((response - grand_mean) ** 2)

        # Row masks and marginal means per level, computed once.
        a_masks = {level: clean_df[x_column] == level for level in factor1_levels}
        b_masks = {level: clean_df[color_column] == level for level in factor2_levels}
        a_means = {level: np.mean(response[mask]) for level, mask in a_masks.items()}
        b_means = {level: np.mean(response[mask]) for level, mask in b_masks.items()}

        # Main effects
        ss_a = sum(
            int(mask.sum()) * (a_means[level] - grand_mean) ** 2
            for level, mask in a_masks.items()
        )
        ss_b = sum(
            int(mask.sum()) * (b_means[level] - grand_mean) ** 2
            for level, mask in b_masks.items()
        )

        # Interaction effect: cell mean minus what the two margins predict
        ss_ab = 0
        for a_level, a_mask in a_masks.items():
            for b_level, b_mask in b_masks.items():
                cell_data = response[a_mask & b_mask]
                if len(cell_data) > 0:
                    deviation = (np.mean(cell_data) - a_means[a_level]
                                 - b_means[b_level] + grand_mean)
                    ss_ab += len(cell_data) * deviation ** 2

        # Error sum of squares
        ss_error = ss_total - ss_a - ss_b - ss_ab

        # Mean squares
        ms_a = ss_a / df_a if df_a > 0 else 0
        ms_b = ss_b / df_b if df_b > 0 else 0
        ms_ab = ss_ab / df_ab if df_ab > 0 else 0
        ms_error = ss_error / df_error

        # F statistics
        f_a = ms_a / ms_error if ms_error > 0 else 0
        f_b = ms_b / ms_error if ms_error > 0 else 0
        f_ab = ms_ab / ms_error if ms_error > 0 else 0

        # P values via the survival function, which stays accurate for very
        # small p where 1 - cdf would round to 0.
        p_a = stats.f.sf(f_a, df_a, df_error) if f_a > 0 else 1
        p_b = stats.f.sf(f_b, df_b, df_error) if f_b > 0 else 1
        p_ab = stats.f.sf(f_ab, df_ab, df_error) if f_ab > 0 else 1

        # Effect sizes (partial eta squared)
        eta_squared_a = self.calculate_partial_eta_squared(ss_a, ss_error)
        eta_squared_b = self.calculate_partial_eta_squared(ss_b, ss_error)
        eta_squared_ab = self.calculate_partial_eta_squared(ss_ab, ss_error)

        results = {
            "test_type": "Two-way ANOVA",
            "factor_a": {
                "name": x_column,
                "f_statistic": f_a,
                "p_value": p_a,
                "df": df_a,
                "partial_eta_squared": eta_squared_a,
                "interpretation": self.interpret_effect_size(eta_squared_a, "partial_eta_squared")
            },
            "factor_b": {
                "name": color_column,
                "f_statistic": f_b,
                "p_value": p_b,
                "df": df_b,
                "partial_eta_squared": eta_squared_b,
                "interpretation": self.interpret_effect_size(eta_squared_b, "partial_eta_squared")
            },
            "interaction": {
                "name": f"{x_column} × {color_column}",
                "f_statistic": f_ab,
                "p_value": p_ab,
                "df": df_ab,
                "partial_eta_squared": eta_squared_ab,
                "interpretation": self.interpret_effect_size(eta_squared_ab, "partial_eta_squared")
            },
            "df_error": df_error,
            "sample_size": n_total,
            "factor_a_levels": list(factor1_levels),
            "factor_b_levels": list(factor2_levels)
        }
        return results
    except Exception as e:
        return {"error": f"Error performing two-way ANOVA: {str(e)}"}
def perform_simple_regression(self, df: pd.DataFrame, x_column: str, y_column: str) -> Dict[str, Any]:
    """
    Perform simple linear regression analysis.

    Args:
        df: DataFrame containing the data
        x_column: Independent variable (numeric)
        y_column: Dependent variable (numeric)

    Returns:
        Dict containing correlation and regression results with effect
        sizes, or a dict with a single 'error' key when the analysis cannot
        be run. This method does not raise; failures are reported through
        the 'error' key.
    """
    try:
        # Remove rows with a missing x or y
        clean_df = df[[x_column, y_column]].dropna()
        n = len(clean_df)

        if n < 3:
            return {"error": "Insufficient data for regression (need at least 3 observations)"}

        x = clean_df[x_column].values
        y = clean_df[y_column].values

        correlation, corr_p = stats.pearsonr(x, y)
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)

        r_squared = r_value ** 2
        adjusted_r_squared = 1 - (1 - r_squared) * (n - 1) / (n - 2)

        cohens_f_squared = self.calculate_cohens_f_squared(r_squared)

        # Standard error of the slope from the residual mean square
        mean_x = np.mean(x)
        ss_x = np.sum((x - mean_x) ** 2)
        mse = np.sum((y - (slope * x + intercept)) ** 2) / (n - 2)
        se_slope = np.sqrt(mse / ss_x)

        # t-statistic for slope (0 when the standard error vanishes)
        t_stat = slope / se_slope if se_slope > 0 else 0

        results = {
            "test_type": "Simple Linear Regression",
            "correlation": {
                "pearson_r": correlation,
                "p_value": corr_p,
                # The 'r_squared' thresholds apply to r², so the squared
                # correlation is passed rather than |r|.
                "interpretation": self.interpret_effect_size(correlation ** 2, "r_squared")
            },
            "regression": {
                "slope": slope,
                "intercept": intercept,
                "r_squared": r_squared,
                "adjusted_r_squared": adjusted_r_squared,
                "p_value": p_value,
                "standard_error": std_err,
                "t_statistic": t_stat,
                "cohens_f_squared": cohens_f_squared,
                "f_squared_interpretation": self.interpret_effect_size(cohens_f_squared, "cohens_f")
            },
            "sample_size": n,
            "variance_explained": f"{r_squared * 100:.1f}%"
        }
        return results
    except Exception as e:
        return {"error": f"Error performing regression: {str(e)}"}