Spaces:
Building
Building
| """ | |
| Frequency Analysis Module for Word Frequency Visualization | |
| This module provides functionality to analyze word frequency data from various file formats, | |
| create histogram data, and sample representative words for each frequency bin. | |
| Supports flexible column mapping for diverse frequency data formats. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Tuple, Optional, Union | |
| import logging | |
| import random | |
| from io import StringIO | |
| import csv | |
| logger = logging.getLogger(__name__) | |
class FrequencyAnalyzer:
    """
    Analyzes word frequency data and provides visualization-ready outputs.
    Supports flexible column mapping for various frequency data formats.
    Can handle both traditional 'Type'/'Freq' format and modern multi-column formats.
    """
    # Default column names to try for auto-detection, in preference order
    # (used by detect_columns via _sort_by_preference).
    DEFAULT_WORD_COLUMNS = ['lForm', 'lemma', 'word', 'Type', 'surface_form']
    DEFAULT_FREQUENCY_COLUMNS = ['frequency', 'freq', 'Freq', 'pmw', 'NormFreq']
    DEFAULT_POS_COLUMNS = ['pos', 'POS', 'tag']

    def __init__(self, file_size_limit_mb: int = 300):
        """
        Initialize the frequency analyzer.

        Args:
            file_size_limit_mb: Maximum file size limit in MB for uploads
        """
        # Cleaned working DataFrame (set by load_frequency_data).
        self.data: Optional[pd.DataFrame] = None
        # Copy of the cleaned data kept for rank-based analyses.
        self.original_data: Optional[pd.DataFrame] = None
        # Column mapping last used to load data (word/frequency/pos, separator).
        self.column_config: Optional[Dict[str, str]] = None
        # Upload size limit, converted from MB to bytes.
        self.file_size_limit: int = file_size_limit_mb * 1024 * 1024
        # Result of detect_columns() for the most recently loaded data.
        self.detected_columns: Optional[Dict[str, List[str]]] = None
| def detect_file_format(self, content: Union[str, bytes]) -> Dict[str, any]: | |
| """ | |
| Detect file format and separator. | |
| Args: | |
| content: File content as string or bytes | |
| Returns: | |
| Dict with format information | |
| """ | |
| if isinstance(content, bytes): | |
| content = content.decode('utf-8') | |
| # Check file size | |
| if len(content.encode('utf-8')) > self.file_size_limit: | |
| raise ValueError(f"File too large. Maximum size is {self.file_size_limit // (1024*1024)}MB") | |
| # Detect separator by checking first few lines | |
| lines = content.strip().split('\n')[:5] | |
| separators = ['\t', ',', ';', '|'] | |
| best_sep = '\t' | |
| max_columns = 0 | |
| for sep in separators: | |
| avg_cols = np.mean([len(line.split(sep)) for line in lines]) | |
| if avg_cols > max_columns: | |
| max_columns = avg_cols | |
| best_sep = sep | |
| # Detect if first row is header | |
| first_line = lines[0].split(best_sep) | |
| second_line = lines[1].split(best_sep) if len(lines) > 1 else [] | |
| # Simple heuristic: if first row contains mostly strings and second row has numbers | |
| has_header = True | |
| if len(second_line) > 0: | |
| try: | |
| # Try to convert second row elements to numbers | |
| numeric_count = sum(1 for x in second_line if self._is_numeric(x.strip())) | |
| if numeric_count > len(second_line) * 0.3: # If >30% are numeric | |
| has_header = True | |
| except: | |
| has_header = False | |
| return { | |
| 'separator': best_sep, | |
| 'has_header': has_header, | |
| 'estimated_columns': int(max_columns), | |
| 'sample_lines': lines[:3] | |
| } | |
| def _is_numeric(self, value: str) -> bool: | |
| """Check if a string value is numeric.""" | |
| try: | |
| float(value) | |
| return True | |
| except (ValueError, TypeError): | |
| return False | |
| def detect_columns(self, df: pd.DataFrame) -> Dict[str, List[str]]: | |
| """ | |
| Detect and categorize columns by data type and content. | |
| Args: | |
| df: DataFrame to analyze | |
| Returns: | |
| Dict with categorized column lists | |
| """ | |
| word_candidates = [] | |
| frequency_candidates = [] | |
| pos_candidates = [] | |
| other_columns = [] | |
| for col in df.columns: | |
| col_str = str(col).lower() | |
| # Check if column contains string data (potential word column) | |
| if df[col].dtype == 'object': | |
| # Check if it looks like words (not mostly numbers) | |
| sample_values = df[col].dropna().head(100) | |
| if len(sample_values) > 0: | |
| non_numeric_ratio = sum(1 for x in sample_values if not self._is_numeric(str(x))) / len(sample_values) | |
| if non_numeric_ratio > 0.8: # >80% non-numeric | |
| if any(word in col_str for word in ['form', 'lemma', 'word', 'type']): | |
| word_candidates.append(col) | |
| elif any(pos in col_str for pos in ['pos', 'tag', 'part']): | |
| pos_candidates.append(col) | |
| else: | |
| word_candidates.append(col) # Default string columns to word candidates | |
| # Check if column contains numeric data (potential frequency column) | |
| elif pd.api.types.is_numeric_dtype(df[col]): | |
| # Skip rank columns (usually sequential integers starting from 1) | |
| if col_str in ['rank', 'index'] or (df[col].equals(pd.Series(range(1, len(df) + 1)))): | |
| other_columns.append(col) | |
| else: | |
| frequency_candidates.append(col) | |
| else: | |
| other_columns.append(col) | |
| # Sort candidates by preference based on common naming patterns | |
| word_candidates = self._sort_by_preference(word_candidates, self.DEFAULT_WORD_COLUMNS) | |
| frequency_candidates = self._sort_by_preference(frequency_candidates, self.DEFAULT_FREQUENCY_COLUMNS) | |
| pos_candidates = self._sort_by_preference(pos_candidates, self.DEFAULT_POS_COLUMNS) | |
| return { | |
| 'word_columns': word_candidates, | |
| 'frequency_columns': frequency_candidates, | |
| 'pos_columns': pos_candidates, | |
| 'other_columns': other_columns | |
| } | |
| def _sort_by_preference(self, columns: List[str], preferred_order: List[str]) -> List[str]: | |
| """Sort columns by preference order.""" | |
| sorted_cols = [] | |
| remaining_cols = columns.copy() | |
| # Add preferred columns first | |
| for pref in preferred_order: | |
| for col in columns: | |
| if pref.lower() in str(col).lower() and col in remaining_cols: | |
| sorted_cols.append(col) | |
| remaining_cols.remove(col) | |
| break | |
| # Add remaining columns | |
| sorted_cols.extend(remaining_cols) | |
| return sorted_cols | |
    def load_frequency_data(self, content: Union[str, bytes], column_config: Dict[str, str]) -> pd.DataFrame:
        """
        Load and validate frequency data with flexible column mapping.

        Args:
            content: File content as string or bytes (bytes decoded as UTF-8)
            column_config: Column mapping configuration
                {
                    'word_column': 'lForm',
                    'frequency_column': 'frequency',
                    'pos_column': 'pos',   # optional
                    'separator': '\t'      # optional, will auto-detect if not provided
                }

        Returns:
            pd.DataFrame: Loaded and validated frequency data (also stored on
            self.data / self.original_data; self.column_config and
            self.detected_columns are updated as side effects)

        Raises:
            ValueError: If data format is invalid or columns not found.
                Note that any exception raised during loading is re-raised
                wrapped as ValueError.
        """
        try:
            # Handle both string and bytes input
            if isinstance(content, bytes):
                content = content.decode('utf-8')
            # Auto-detect separator/header if the caller did not fix them.
            if 'separator' not in column_config:
                format_info = self.detect_file_format(content)
                separator = format_info['separator']
                has_header = format_info['has_header']
            else:
                separator = column_config['separator']
                has_header = column_config.get('has_header', True)
            # Parse the delimited text; header row only when detected/declared.
            df = pd.read_csv(StringIO(content), sep=separator, header=0 if has_header else None,
                             quoting=csv.QUOTE_MINIMAL, quotechar='"')
            # Remember the effective configuration (copy so the caller's dict
            # is not mutated).
            self.column_config = column_config.copy()
            self.column_config['separator'] = separator
            self.column_config['has_header'] = has_header
            # Categorize columns for later column-picker UIs.
            self.detected_columns = self.detect_columns(df)
            # Validate column configuration before any cleaning.
            if not self.validate_column_config(df, column_config):
                raise ValueError("Invalid column configuration")
            # Clean and prepare data with flexible column mapping.
            df = self._clean_data_flexible(df, column_config)
            # Store data: original_data is an independent copy of the cleaned
            # frame used by the rank-based analyses.
            self.original_data = df.copy()
            self.data = df
            logger.info(f"Loaded {len(df)} frequency entries with columns: {list(df.columns)}")
            return df
        except Exception as e:
            # Normalize every failure into ValueError for callers.
            logger.error(f"Error loading frequency data: {str(e)}")
            raise ValueError(f"Failed to load frequency data: {str(e)}")
| def validate_column_config(self, df: pd.DataFrame, column_config: Dict[str, str]) -> bool: | |
| """ | |
| Validate that the specified columns exist and contain appropriate data. | |
| Args: | |
| df: DataFrame to validate | |
| column_config: Column configuration | |
| Returns: | |
| bool: True if configuration is valid | |
| """ | |
| # Check required columns exist | |
| word_col = column_config.get('word_column') | |
| freq_col = column_config.get('frequency_column') | |
| if not word_col or word_col not in df.columns: | |
| logger.error(f"Word column '{word_col}' not found in data") | |
| return False | |
| if not freq_col or freq_col not in df.columns: | |
| logger.error(f"Frequency column '{freq_col}' not found in data") | |
| return False | |
| # Check that word column contains string data | |
| if df[word_col].dtype != 'object': | |
| logger.error(f"Word column '{word_col}' must contain text data") | |
| return False | |
| # Check that frequency column contains numeric data | |
| if not pd.api.types.is_numeric_dtype(df[freq_col]): | |
| logger.error(f"Frequency column '{freq_col}' must contain numeric data") | |
| return False | |
| # Check optional POS column if specified | |
| pos_col = column_config.get('pos_column') | |
| if pos_col and pos_col not in df.columns: | |
| logger.warning(f"POS column '{pos_col}' not found in data, skipping") | |
| return True | |
| def _clean_data_flexible(self, df: pd.DataFrame, column_config: Dict[str, str]) -> pd.DataFrame: | |
| """ | |
| Clean and prepare the frequency data with flexible column mapping. | |
| Args: | |
| df: Raw DataFrame | |
| column_config: Column configuration | |
| Returns: | |
| pd.DataFrame: Cleaned DataFrame with standardized column names | |
| """ | |
| word_col = column_config['word_column'] | |
| freq_col = column_config['frequency_column'] | |
| pos_col = column_config.get('pos_column') | |
| # Create a copy and rename columns to standard names for compatibility | |
| df_clean = df.copy() | |
| # Remove rows with missing word or frequency data | |
| df_clean = df_clean.dropna(subset=[word_col, freq_col]) | |
| # Ensure frequency is numeric | |
| df_clean[freq_col] = pd.to_numeric(df_clean[freq_col], errors='coerce') | |
| df_clean = df_clean.dropna(subset=[freq_col]) | |
| # Remove zero or negative frequencies | |
| df_clean = df_clean[df_clean[freq_col] > 0] | |
| # Clean word column (remove extra whitespace) | |
| df_clean[word_col] = df_clean[word_col].astype(str).str.strip() | |
| # Add standardized column names for backward compatibility | |
| df_clean['Type'] = df_clean[word_col] | |
| df_clean['Freq'] = df_clean[freq_col] | |
| # Add POS column if available | |
| if pos_col and pos_col in df_clean.columns: | |
| df_clean['POS'] = df_clean[pos_col] | |
| # Sort by frequency (descending) for better analysis | |
| df_clean = df_clean.sort_values(freq_col, ascending=False).reset_index(drop=True) | |
| return df_clean | |
| def get_available_frequency_columns(self) -> List[str]: | |
| """ | |
| Get list of available frequency columns for analysis. | |
| Returns: | |
| List[str]: Available frequency columns from the detected columns | |
| """ | |
| if self.detected_columns is None: | |
| return [] | |
| return self.detected_columns.get('frequency_columns', []) | |
| def get_available_word_columns(self) -> List[str]: | |
| """ | |
| Get list of available word columns. | |
| Returns: | |
| List[str]: Available word columns from the detected columns | |
| """ | |
| if self.detected_columns is None: | |
| return [] | |
| return self.detected_columns.get('word_columns', []) | |
| def create_multi_frequency_analysis(self, frequency_columns: List[str], bin_size: int = 500, log_transform: bool = False) -> Dict[str, Dict]: | |
| """ | |
| Create rank-based analysis for multiple frequency columns. | |
| Args: | |
| frequency_columns: List of frequency column names to analyze | |
| bin_size: Number of words per rank group | |
| log_transform: Whether to apply log10 transformation | |
| Returns: | |
| Dict mapping column names to their analysis results | |
| """ | |
| if self.original_data is None: | |
| raise ValueError("No data loaded") | |
| results = {} | |
| for freq_col in frequency_columns: | |
| if freq_col not in self.original_data.columns: | |
| logger.warning(f"Frequency column '{freq_col}' not found, skipping") | |
| continue | |
| try: | |
| # Create analysis for this frequency column | |
| analysis = self.create_rank_based_visualization_flexible( | |
| column=freq_col, | |
| bin_size=bin_size, | |
| log_transform=log_transform | |
| ) | |
| results[freq_col] = analysis | |
| except Exception as e: | |
| logger.error(f"Error analyzing column '{freq_col}': {e}") | |
| continue | |
| return results | |
    def create_rank_based_visualization_flexible(self, column: str, bin_size: int = 500, log_transform: bool = False, max_words_to_retain: Optional[int] = None) -> Dict:
        """
        Create rank-based visualization with flexible column support.

        Words are sorted by *column* descending and sliced into consecutive
        rank groups of *bin_size* words; up to 20 groups are emitted.

        Args:
            column: Column name to analyze (can be any numeric column)
            bin_size: Number of words per rank group
            log_transform: Whether to apply log10 transformation to the
                per-group average (note: log of the mean, not mean of logs)
            max_words_to_retain: Maximum number of top frequent words to retain for analysis

        Returns:
            Dict: Rank-based visualization data (group labels/centers,
            per-group average frequencies, sampled words, stats DataFrame,
            and plot labels)

        Raises:
            ValueError: If no data is loaded or *column* is missing.
        """
        if self.original_data is None:
            raise ValueError("No data loaded")
        if column not in self.original_data.columns:
            raise ValueError(f"Column '{column}' not found in data")
        # Get word column from config or use default
        word_col = self.column_config.get('word_column', 'Type') if self.column_config else 'Type'
        if word_col not in self.original_data.columns:
            word_col = 'Type'  # Fallback to standardized column
        # Sort by the specified frequency column (descending)
        sorted_data = self.original_data.sort_values(column, ascending=False).reset_index(drop=True)
        # Apply word limit if specified
        if max_words_to_retain and max_words_to_retain < len(sorted_data):
            sorted_data = sorted_data.head(max_words_to_retain)
            logger.info(f"Limited analysis to top {max_words_to_retain} words")
        # Create bins by slicing exactly bin_size words
        group_labels = []
        group_centers = []
        avg_frequencies = []
        sample_words = {}
        group_stats_list = []
        # Limit to top 20 bins for better UI performance
        max_display_bins = 20
        for i in range(0, len(sorted_data), bin_size):
            if len(group_labels) >= max_display_bins:
                break
            end_idx = min(i + bin_size, len(sorted_data))
            bin_data = sorted_data[i:end_idx]
            # Calculate group boundaries (ranks are 1-based, inclusive)
            start_rank = i + 1
            end_rank = end_idx
            group_label = f"{start_rank}-{end_rank}"
            group_labels.append(group_label)
            group_centers.append((start_rank + end_rank) / 2)
            # Calculate average frequency; epsilon keeps log10 defined at 0
            avg_freq = bin_data[column].mean()
            if log_transform:
                avg_freq = np.log10(avg_freq + 1e-10)
            avg_frequencies.append(avg_freq)
            # Get sample words (5 randomly sampled from this bin)
            # NOTE(review): random.sample is unseeded, so the sampled words
            # differ between calls — unlike sample_words_per_bin, which uses
            # a fixed random_state. Confirm whether this is intentional.
            n_samples = min(5, len(bin_data))
            if n_samples > 0:
                if n_samples == len(bin_data):
                    # If fewer than 5 words, take all
                    sample_word_list = bin_data[word_col].tolist()
                else:
                    # Randomly sample 5 words
                    sample_indices = random.sample(range(len(bin_data)), n_samples)
                    sample_word_list = [bin_data.iloc[idx][word_col] for idx in sample_indices]
            else:
                sample_word_list = []
            group_idx = len(group_labels) - 1
            sample_words[group_idx] = [{'word': word, 'group': group_label} for word in sample_word_list]
            # Store group statistics (keys are prefixed with the column name)
            group_stats_list.append({
                'group_idx': group_idx,
                f'{column}_mean': bin_data[column].mean(),
                f'{column}_count': len(bin_data),
                f'{column}_min': bin_data[column].min(),
                f'{column}_max': bin_data[column].max(),
                'start_rank': start_rank,
                'end_rank': end_rank
            })
        # Create a DataFrame for group stats
        group_stats = pd.DataFrame(group_stats_list)
        # Create title suffix with word limit info
        title_parts = [f"Bin Size: {bin_size}"]
        if max_words_to_retain:
            title_parts.append(f"Top {max_words_to_retain:,} words")
        title_parts.append(f"{'Log₁₀ ' if log_transform else ''}{column}")
        title_suffix = " (" + ", ".join(title_parts) + ")"
        return {
            'group_labels': group_labels,
            'group_centers': group_centers,
            'avg_frequencies': avg_frequencies,
            'group_stats': group_stats,
            'sample_words': sample_words,
            'bin_size': bin_size,
            'column': column,
            'log_transform': log_transform,
            'max_words_to_retain': max_words_to_retain,
            'total_groups': len(group_labels),
            'title_suffix': title_suffix,
            'x_label': f"Rank Groups (bin size: {bin_size})",
            'y_label': f"{'Log₁₀ ' if log_transform else ''}Average {column}"
        }
| # Legacy methods for backward compatibility | |
| def validate_format(self, df: pd.DataFrame) -> bool: | |
| """Legacy method for backward compatibility.""" | |
| return 'Type' in df.columns and 'Freq' in df.columns | |
| def get_available_columns(self) -> List[str]: | |
| """Legacy method for backward compatibility.""" | |
| if self.data is None: | |
| return [] | |
| freq_columns = [] | |
| if 'Freq' in self.data.columns: | |
| freq_columns.append('Freq') | |
| if 'NormFreq' in self.data.columns: | |
| freq_columns.append('NormFreq') | |
| return freq_columns | |
| def create_histogram_data(self, column: str = 'Freq', bins: int = 25, log_transform: bool = False) -> Dict: | |
| """Legacy histogram method for backward compatibility.""" | |
| if self.data is None: | |
| raise ValueError("No data loaded") | |
| if column not in self.data.columns: | |
| raise ValueError(f"Column '{column}' not found in data") | |
| # Get frequency values | |
| freq_values = self.data[column].copy() | |
| # Apply log transformation if requested | |
| if log_transform: | |
| freq_values = np.log10(freq_values + 1e-10) | |
| title_suffix = f" (Log₁₀ {column})" | |
| x_label = f"Log₁₀ {column}" | |
| else: | |
| title_suffix = f" ({column})" | |
| x_label = column | |
| # Create histogram | |
| counts, bin_edges = np.histogram(freq_values, bins=bins) | |
| bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 | |
| bin_widths = bin_edges[1:] - bin_edges[:-1] | |
| return { | |
| 'counts': counts, | |
| 'bin_edges': bin_edges, | |
| 'bin_centers': bin_centers, | |
| 'bin_widths': bin_widths, | |
| 'freq_values': freq_values, | |
| 'original_column': column, | |
| 'log_transform': log_transform, | |
| 'title_suffix': title_suffix, | |
| 'x_label': x_label, | |
| 'total_words': len(self.data) | |
| } | |
| def sample_words_per_bin(self, histogram_data: Dict, samples_per_bin: int = 5) -> Dict[int, List[Dict]]: | |
| """Legacy word sampling method for backward compatibility.""" | |
| if self.data is None: | |
| raise ValueError("No data loaded") | |
| bin_edges = histogram_data['bin_edges'] | |
| freq_values = histogram_data['freq_values'] | |
| original_column = histogram_data['original_column'] | |
| sampled_words = {} | |
| for i in range(len(bin_edges) - 1): | |
| bin_start = bin_edges[i] | |
| bin_end = bin_edges[i + 1] | |
| # Find words in this bin | |
| if i == len(bin_edges) - 2: # Last bin, include right edge | |
| mask = (freq_values >= bin_start) & (freq_values <= bin_end) | |
| else: | |
| mask = (freq_values >= bin_start) & (freq_values < bin_end) | |
| bin_words = self.data[mask] | |
| if len(bin_words) > 0: | |
| # Sample words (up to samples_per_bin) | |
| n_samples = min(samples_per_bin, len(bin_words)) | |
| sampled = bin_words.sample(n=n_samples, random_state=42) | |
| # Create word info list | |
| word_list = [] | |
| for _, word_row in sampled.iterrows(): | |
| word_info = { | |
| 'word': word_row['Type'], | |
| 'freq': word_row[original_column], | |
| 'rank': word_row.get('Rank', 'N/A'), | |
| 'original_freq': word_row['Freq'] if original_column != 'Freq' else word_row['Freq'] | |
| } | |
| word_list.append(word_info) | |
| sampled_words[i] = word_list | |
| else: | |
| sampled_words[i] = [] | |
| return sampled_words | |
| def create_rank_based_visualization(self, column: str = 'Freq', bin_size: int = 500, log_transform: bool = False) -> Dict: | |
| """Legacy rank-based visualization method for backward compatibility.""" | |
| return self.create_rank_based_visualization_flexible(column, bin_size, log_transform) | |
| def calculate_statistics(self, column: str = 'Freq') -> Dict: | |
| """Calculate descriptive statistics for the frequency data.""" | |
| if self.data is None: | |
| raise ValueError("No data loaded") | |
| if column not in self.data.columns: | |
| raise ValueError(f"Column '{column}' not found in data") | |
| freq_values = self.data[column] | |
| stats = { | |
| 'count': len(freq_values), | |
| 'mean': float(freq_values.mean()), | |
| 'median': float(freq_values.median()), | |
| 'std': float(freq_values.std()), | |
| 'min': float(freq_values.min()), | |
| 'max': float(freq_values.max()), | |
| 'q25': float(freq_values.quantile(0.25)), | |
| 'q75': float(freq_values.quantile(0.75)), | |
| 'skewness': float(freq_values.skew()), | |
| 'column_name': column | |
| } | |
| # Add some additional insights | |
| stats['range'] = stats['max'] - stats['min'] | |
| stats['iqr'] = stats['q75'] - stats['q25'] | |
| stats['cv'] = stats['std'] / stats['mean'] if stats['mean'] != 0 else 0 | |
| return stats | |
| def get_top_words(self, column: str = 'Freq', n: int = 10) -> List[Dict]: | |
| """Get the top N words by frequency.""" | |
| if self.data is None: | |
| raise ValueError("No data loaded") | |
| if column not in self.data.columns: | |
| raise ValueError(f"Column '{column}' not found in data") | |
| top_words = self.data.nlargest(n, column) | |
| result = [] | |
| for _, row in top_words.iterrows(): | |
| word_info = { | |
| 'word': row['Type'], | |
| 'freq': row[column], | |
| 'rank': row.get('Rank', 'N/A'), | |
| 'original_freq': row['Freq'] | |
| } | |
| result.append(word_info) | |
| return result | |