# text_analyzer/frequency_analyzer.py
"""
Frequency Analysis Module for Word Frequency Visualization
This module provides functionality to analyze word frequency data from various file formats,
create histogram data, and sample representative words for each frequency bin.
Supports flexible column mapping for diverse frequency data formats.
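
Typical usage (a minimal sketch; the column names below are illustrative,
not required, since any names can be mapped via the column configuration):

    analyzer = FrequencyAnalyzer()
    df = analyzer.load_frequency_data(content, {
        'word_column': 'lemma',
        'frequency_column': 'frequency',
    })
    viz = analyzer.create_rank_based_visualization_flexible('frequency')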
"""
import pandas as pd
import numpy as np
from typing import Any, Dict, List, Optional, Union
import logging
import random
from io import StringIO
import csv
logger = logging.getLogger(__name__)
class FrequencyAnalyzer:
"""
Analyzes word frequency data and provides visualization-ready outputs.
Supports flexible column mapping for various frequency data formats.
Can handle both traditional 'Type'/'Freq' format and modern multi-column formats.
"""
# Default column names to try for auto-detection
DEFAULT_WORD_COLUMNS = ['lForm', 'lemma', 'word', 'Type', 'surface_form']
DEFAULT_FREQUENCY_COLUMNS = ['frequency', 'freq', 'Freq', 'pmw', 'NormFreq']
DEFAULT_POS_COLUMNS = ['pos', 'POS', 'tag']
def __init__(self, file_size_limit_mb: int = 300):
"""
Initialize the frequency analyzer.
Args:
file_size_limit_mb: Maximum file size limit in MB for uploads
"""
self.data = None
self.original_data = None
self.column_config = None
self.file_size_limit = file_size_limit_mb * 1024 * 1024
self.detected_columns = None
    def detect_file_format(self, content: Union[str, bytes]) -> Dict[str, Any]:
"""
Detect file format and separator.
Args:
content: File content as string or bytes
Returns:
Dict with format information
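        Example:
            A hedged sketch with hypothetical inline data::

                info = FrequencyAnalyzer().detect_file_format(
                    "lemma\tfrequency\nthe\t100\nof\t80")
                # Expected: info['separator'] == '\t', info['has_header'] is True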
"""
if isinstance(content, bytes):
content = content.decode('utf-8')
# Check file size
if len(content.encode('utf-8')) > self.file_size_limit:
raise ValueError(f"File too large. Maximum size is {self.file_size_limit // (1024*1024)}MB")
# Detect separator by checking first few lines
lines = content.strip().split('\n')[:5]
separators = ['\t', ',', ';', '|']
best_sep = '\t'
max_columns = 0
for sep in separators:
avg_cols = np.mean([len(line.split(sep)) for line in lines])
if avg_cols > max_columns:
max_columns = avg_cols
best_sep = sep
        # Detect whether the first row is a header.
        # Heuristic: a header row is non-numeric, while the data rows beneath it
        # contain numeric frequency values.
        first_line = lines[0].split(best_sep)
        second_line = lines[1].split(best_sep) if len(lines) > 1 else []
        has_header = True
        if second_line:
            first_numeric = sum(1 for x in first_line if self._is_numeric(x.strip()))
            second_numeric = sum(1 for x in second_line if self._is_numeric(x.strip()))
            # Header if the first row has no numeric cells but >30% of the second row is numeric
            has_header = first_numeric == 0 and second_numeric > len(second_line) * 0.3
return {
'separator': best_sep,
'has_header': has_header,
'estimated_columns': int(max_columns),
'sample_lines': lines[:3]
}
def _is_numeric(self, value: str) -> bool:
"""Check if a string value is numeric."""
try:
float(value)
return True
except (ValueError, TypeError):
return False
def detect_columns(self, df: pd.DataFrame) -> Dict[str, List[str]]:
"""
Detect and categorize columns by data type and content.
Args:
df: DataFrame to analyze
Returns:
Dict with categorized column lists
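        Example:
            A minimal sketch with a hypothetical DataFrame::

                df = pd.DataFrame({'lemma': ['the', 'of'], 'frequency': [100, 80]})
                cols = FrequencyAnalyzer().detect_columns(df)
                # 'lemma' lands in cols['word_columns'];
                # 'frequency' lands in cols['frequency_columns']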
"""
word_candidates = []
frequency_candidates = []
pos_candidates = []
other_columns = []
for col in df.columns:
col_str = str(col).lower()
# Check if column contains string data (potential word column)
if df[col].dtype == 'object':
# Check if it looks like words (not mostly numbers)
sample_values = df[col].dropna().head(100)
if len(sample_values) > 0:
non_numeric_ratio = sum(1 for x in sample_values if not self._is_numeric(str(x))) / len(sample_values)
if non_numeric_ratio > 0.8: # >80% non-numeric
if any(word in col_str for word in ['form', 'lemma', 'word', 'type']):
word_candidates.append(col)
elif any(pos in col_str for pos in ['pos', 'tag', 'part']):
pos_candidates.append(col)
else:
word_candidates.append(col) # Default string columns to word candidates
# Check if column contains numeric data (potential frequency column)
elif pd.api.types.is_numeric_dtype(df[col]):
# Skip rank columns (usually sequential integers starting from 1)
if col_str in ['rank', 'index'] or (df[col].equals(pd.Series(range(1, len(df) + 1)))):
other_columns.append(col)
else:
frequency_candidates.append(col)
else:
other_columns.append(col)
# Sort candidates by preference based on common naming patterns
word_candidates = self._sort_by_preference(word_candidates, self.DEFAULT_WORD_COLUMNS)
frequency_candidates = self._sort_by_preference(frequency_candidates, self.DEFAULT_FREQUENCY_COLUMNS)
pos_candidates = self._sort_by_preference(pos_candidates, self.DEFAULT_POS_COLUMNS)
return {
'word_columns': word_candidates,
'frequency_columns': frequency_candidates,
'pos_columns': pos_candidates,
'other_columns': other_columns
}
def _sort_by_preference(self, columns: List[str], preferred_order: List[str]) -> List[str]:
"""Sort columns by preference order."""
sorted_cols = []
remaining_cols = columns.copy()
# Add preferred columns first
for pref in preferred_order:
for col in columns:
if pref.lower() in str(col).lower() and col in remaining_cols:
sorted_cols.append(col)
remaining_cols.remove(col)
break
# Add remaining columns
sorted_cols.extend(remaining_cols)
return sorted_cols
def load_frequency_data(self, content: Union[str, bytes], column_config: Dict[str, str]) -> pd.DataFrame:
"""
Load and validate frequency data with flexible column mapping.
Args:
content: File content as string or bytes
column_config: Column mapping configuration
{
'word_column': 'lForm',
'frequency_column': 'frequency',
'pos_column': 'pos', # optional
'separator': '\t' # optional, will auto-detect if not provided
}
Returns:
pd.DataFrame: Loaded and validated frequency data
Raises:
ValueError: If data format is invalid or columns not found
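        Example:
            A minimal sketch; the inline data and column names are hypothetical::

                content = "lemma\tfrequency\nthe\t100\nof\t80"
                df = analyzer.load_frequency_data(content, {
                    'word_column': 'lemma',
                    'frequency_column': 'frequency',
                })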
"""
try:
# Handle both string and bytes input
if isinstance(content, bytes):
content = content.decode('utf-8')
# Auto-detect format if separator not provided
if 'separator' not in column_config:
format_info = self.detect_file_format(content)
separator = format_info['separator']
has_header = format_info['has_header']
else:
separator = column_config['separator']
has_header = column_config.get('has_header', True)
# Read data
df = pd.read_csv(StringIO(content), sep=separator, header=0 if has_header else None,
quoting=csv.QUOTE_MINIMAL, quotechar='"')
# Store column configuration
self.column_config = column_config.copy()
self.column_config['separator'] = separator
self.column_config['has_header'] = has_header
# Detect available columns
self.detected_columns = self.detect_columns(df)
# Validate column configuration
if not self.validate_column_config(df, column_config):
raise ValueError("Invalid column configuration")
# Clean and prepare data with flexible column mapping
df = self._clean_data_flexible(df, column_config)
# Store data
self.original_data = df.copy()
self.data = df
logger.info(f"Loaded {len(df)} frequency entries with columns: {list(df.columns)}")
return df
except Exception as e:
logger.error(f"Error loading frequency data: {str(e)}")
raise ValueError(f"Failed to load frequency data: {str(e)}")
def validate_column_config(self, df: pd.DataFrame, column_config: Dict[str, str]) -> bool:
"""
Validate that the specified columns exist and contain appropriate data.
Args:
df: DataFrame to validate
column_config: Column configuration
Returns:
bool: True if configuration is valid
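        Example:
            A quick sketch with a hypothetical two-column frame::

                df = pd.DataFrame({'w': ['a', 'b'], 'f': [2.0, 1.0]})
                ok = analyzer.validate_column_config(
                    df, {'word_column': 'w', 'frequency_column': 'f'})
                # ok is True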
"""
# Check required columns exist
word_col = column_config.get('word_column')
freq_col = column_config.get('frequency_column')
if not word_col or word_col not in df.columns:
logger.error(f"Word column '{word_col}' not found in data")
return False
if not freq_col or freq_col not in df.columns:
logger.error(f"Frequency column '{freq_col}' not found in data")
return False
# Check that word column contains string data
if df[word_col].dtype != 'object':
logger.error(f"Word column '{word_col}' must contain text data")
return False
# Check that frequency column contains numeric data
if not pd.api.types.is_numeric_dtype(df[freq_col]):
logger.error(f"Frequency column '{freq_col}' must contain numeric data")
return False
# Check optional POS column if specified
pos_col = column_config.get('pos_column')
if pos_col and pos_col not in df.columns:
logger.warning(f"POS column '{pos_col}' not found in data, skipping")
return True
def _clean_data_flexible(self, df: pd.DataFrame, column_config: Dict[str, str]) -> pd.DataFrame:
"""
Clean and prepare the frequency data with flexible column mapping.
Args:
df: Raw DataFrame
column_config: Column configuration
Returns:
pd.DataFrame: Cleaned DataFrame with standardized column names
"""
word_col = column_config['word_column']
freq_col = column_config['frequency_column']
pos_col = column_config.get('pos_column')
# Create a copy and rename columns to standard names for compatibility
df_clean = df.copy()
# Remove rows with missing word or frequency data
df_clean = df_clean.dropna(subset=[word_col, freq_col])
# Ensure frequency is numeric
df_clean[freq_col] = pd.to_numeric(df_clean[freq_col], errors='coerce')
df_clean = df_clean.dropna(subset=[freq_col])
# Remove zero or negative frequencies
df_clean = df_clean[df_clean[freq_col] > 0]
# Clean word column (remove extra whitespace)
df_clean[word_col] = df_clean[word_col].astype(str).str.strip()
# Add standardized column names for backward compatibility
df_clean['Type'] = df_clean[word_col]
df_clean['Freq'] = df_clean[freq_col]
# Add POS column if available
if pos_col and pos_col in df_clean.columns:
df_clean['POS'] = df_clean[pos_col]
# Sort by frequency (descending) for better analysis
df_clean = df_clean.sort_values(freq_col, ascending=False).reset_index(drop=True)
return df_clean
def get_available_frequency_columns(self) -> List[str]:
"""
Get list of available frequency columns for analysis.
Returns:
List[str]: Available frequency columns from the detected columns
"""
if self.detected_columns is None:
return []
return self.detected_columns.get('frequency_columns', [])
def get_available_word_columns(self) -> List[str]:
"""
Get list of available word columns.
Returns:
List[str]: Available word columns from the detected columns
"""
if self.detected_columns is None:
return []
return self.detected_columns.get('word_columns', [])
def create_multi_frequency_analysis(self, frequency_columns: List[str], bin_size: int = 500, log_transform: bool = False) -> Dict[str, Dict]:
"""
Create rank-based analysis for multiple frequency columns.
Args:
frequency_columns: List of frequency column names to analyze
bin_size: Number of words per rank group
log_transform: Whether to apply log10 transformation
Returns:
Dict mapping column names to their analysis results
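        Example:
            Assuming data with 'frequency' and 'pmw' columns was loaded
            (the column names are illustrative)::

                results = analyzer.create_multi_frequency_analysis(
                    ['frequency', 'pmw'], bin_size=500)
                # results['frequency']['avg_frequencies'] holds per-bin means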
"""
if self.original_data is None:
raise ValueError("No data loaded")
results = {}
for freq_col in frequency_columns:
if freq_col not in self.original_data.columns:
logger.warning(f"Frequency column '{freq_col}' not found, skipping")
continue
try:
# Create analysis for this frequency column
analysis = self.create_rank_based_visualization_flexible(
column=freq_col,
bin_size=bin_size,
log_transform=log_transform
)
results[freq_col] = analysis
except Exception as e:
logger.error(f"Error analyzing column '{freq_col}': {e}")
continue
return results
def create_rank_based_visualization_flexible(self, column: str, bin_size: int = 500, log_transform: bool = False, max_words_to_retain: Optional[int] = None) -> Dict:
"""
Create rank-based visualization with flexible column support.
Args:
column: Column name to analyze (can be any numeric column)
bin_size: Number of words per rank group
log_transform: Whether to apply log10 transformation
max_words_to_retain: Maximum number of top frequent words to retain for analysis
Returns:
Dict: Rank-based visualization data
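        Example:
            A minimal sketch (assumes data was loaded first)::

                viz = analyzer.create_rank_based_visualization_flexible(
                    'frequency', bin_size=500, log_transform=True)
                # viz['group_labels'][0] == '1-500'; viz['sample_words'][0] holds
                # up to five randomly sampled words from the first rank group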
"""
if self.original_data is None:
raise ValueError("No data loaded")
if column not in self.original_data.columns:
raise ValueError(f"Column '{column}' not found in data")
# Get word column from config or use default
word_col = self.column_config.get('word_column', 'Type') if self.column_config else 'Type'
if word_col not in self.original_data.columns:
word_col = 'Type' # Fallback to standardized column
# Sort by the specified frequency column (descending)
sorted_data = self.original_data.sort_values(column, ascending=False).reset_index(drop=True)
# Apply word limit if specified
if max_words_to_retain and max_words_to_retain < len(sorted_data):
sorted_data = sorted_data.head(max_words_to_retain)
logger.info(f"Limited analysis to top {max_words_to_retain} words")
# Create bins by slicing exactly bin_size words
group_labels = []
group_centers = []
avg_frequencies = []
sample_words = {}
group_stats_list = []
# Limit to top 20 bins for better UI performance
max_display_bins = 20
for i in range(0, len(sorted_data), bin_size):
if len(group_labels) >= max_display_bins:
break
end_idx = min(i + bin_size, len(sorted_data))
bin_data = sorted_data[i:end_idx]
# Calculate group boundaries
start_rank = i + 1
end_rank = end_idx
group_label = f"{start_rank}-{end_rank}"
group_labels.append(group_label)
group_centers.append((start_rank + end_rank) / 2)
# Calculate average frequency
avg_freq = bin_data[column].mean()
if log_transform:
avg_freq = np.log10(avg_freq + 1e-10)
avg_frequencies.append(avg_freq)
# Get sample words (5 randomly sampled from this bin)
n_samples = min(5, len(bin_data))
if n_samples > 0:
if n_samples == len(bin_data):
# If fewer than 5 words, take all
sample_word_list = bin_data[word_col].tolist()
else:
# Randomly sample 5 words
sample_indices = random.sample(range(len(bin_data)), n_samples)
sample_word_list = [bin_data.iloc[idx][word_col] for idx in sample_indices]
else:
sample_word_list = []
group_idx = len(group_labels) - 1
sample_words[group_idx] = [{'word': word, 'group': group_label} for word in sample_word_list]
# Store group statistics
group_stats_list.append({
'group_idx': group_idx,
f'{column}_mean': bin_data[column].mean(),
f'{column}_count': len(bin_data),
f'{column}_min': bin_data[column].min(),
f'{column}_max': bin_data[column].max(),
'start_rank': start_rank,
'end_rank': end_rank
})
# Create a DataFrame for group stats
group_stats = pd.DataFrame(group_stats_list)
# Create title suffix with word limit info
title_parts = [f"Bin Size: {bin_size}"]
if max_words_to_retain:
title_parts.append(f"Top {max_words_to_retain:,} words")
title_parts.append(f"{'Log₁₀ ' if log_transform else ''}{column}")
title_suffix = " (" + ", ".join(title_parts) + ")"
return {
'group_labels': group_labels,
'group_centers': group_centers,
'avg_frequencies': avg_frequencies,
'group_stats': group_stats,
'sample_words': sample_words,
'bin_size': bin_size,
'column': column,
'log_transform': log_transform,
'max_words_to_retain': max_words_to_retain,
'total_groups': len(group_labels),
'title_suffix': title_suffix,
'x_label': f"Rank Groups (bin size: {bin_size})",
'y_label': f"{'Log₁₀ ' if log_transform else ''}Average {column}"
}
# Legacy methods for backward compatibility
def validate_format(self, df: pd.DataFrame) -> bool:
"""Legacy method for backward compatibility."""
return 'Type' in df.columns and 'Freq' in df.columns
def get_available_columns(self) -> List[str]:
"""Legacy method for backward compatibility."""
if self.data is None:
return []
freq_columns = []
if 'Freq' in self.data.columns:
freq_columns.append('Freq')
if 'NormFreq' in self.data.columns:
freq_columns.append('NormFreq')
return freq_columns
def create_histogram_data(self, column: str = 'Freq', bins: int = 25, log_transform: bool = False) -> Dict:
"""Legacy histogram method for backward compatibility."""
if self.data is None:
raise ValueError("No data loaded")
if column not in self.data.columns:
raise ValueError(f"Column '{column}' not found in data")
# Get frequency values
freq_values = self.data[column].copy()
# Apply log transformation if requested
if log_transform:
freq_values = np.log10(freq_values + 1e-10)
title_suffix = f" (Log₁₀ {column})"
x_label = f"Log₁₀ {column}"
else:
title_suffix = f" ({column})"
x_label = column
# Create histogram
counts, bin_edges = np.histogram(freq_values, bins=bins)
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
bin_widths = bin_edges[1:] - bin_edges[:-1]
return {
'counts': counts,
'bin_edges': bin_edges,
'bin_centers': bin_centers,
'bin_widths': bin_widths,
'freq_values': freq_values,
'original_column': column,
'log_transform': log_transform,
'title_suffix': title_suffix,
'x_label': x_label,
'total_words': len(self.data)
}
def sample_words_per_bin(self, histogram_data: Dict, samples_per_bin: int = 5) -> Dict[int, List[Dict]]:
"""Legacy word sampling method for backward compatibility."""
if self.data is None:
raise ValueError("No data loaded")
bin_edges = histogram_data['bin_edges']
freq_values = histogram_data['freq_values']
original_column = histogram_data['original_column']
sampled_words = {}
for i in range(len(bin_edges) - 1):
bin_start = bin_edges[i]
bin_end = bin_edges[i + 1]
# Find words in this bin
if i == len(bin_edges) - 2: # Last bin, include right edge
mask = (freq_values >= bin_start) & (freq_values <= bin_end)
else:
mask = (freq_values >= bin_start) & (freq_values < bin_end)
bin_words = self.data[mask]
if len(bin_words) > 0:
# Sample words (up to samples_per_bin)
n_samples = min(samples_per_bin, len(bin_words))
sampled = bin_words.sample(n=n_samples, random_state=42)
# Create word info list
word_list = []
for _, word_row in sampled.iterrows():
word_info = {
'word': word_row['Type'],
'freq': word_row[original_column],
'rank': word_row.get('Rank', 'N/A'),
                        'original_freq': word_row['Freq']
}
word_list.append(word_info)
sampled_words[i] = word_list
else:
sampled_words[i] = []
return sampled_words
def create_rank_based_visualization(self, column: str = 'Freq', bin_size: int = 500, log_transform: bool = False) -> Dict:
"""Legacy rank-based visualization method for backward compatibility."""
return self.create_rank_based_visualization_flexible(column, bin_size, log_transform)
def calculate_statistics(self, column: str = 'Freq') -> Dict:
"""Calculate descriptive statistics for the frequency data."""
if self.data is None:
raise ValueError("No data loaded")
if column not in self.data.columns:
raise ValueError(f"Column '{column}' not found in data")
freq_values = self.data[column]
stats = {
'count': len(freq_values),
'mean': float(freq_values.mean()),
'median': float(freq_values.median()),
'std': float(freq_values.std()),
'min': float(freq_values.min()),
'max': float(freq_values.max()),
'q25': float(freq_values.quantile(0.25)),
'q75': float(freq_values.quantile(0.75)),
'skewness': float(freq_values.skew()),
'column_name': column
}
# Add some additional insights
stats['range'] = stats['max'] - stats['min']
stats['iqr'] = stats['q75'] - stats['q25']
stats['cv'] = stats['std'] / stats['mean'] if stats['mean'] != 0 else 0
return stats
def get_top_words(self, column: str = 'Freq', n: int = 10) -> List[Dict]:
"""Get the top N words by frequency."""
if self.data is None:
raise ValueError("No data loaded")
if column not in self.data.columns:
raise ValueError(f"Column '{column}' not found in data")
top_words = self.data.nlargest(n, column)
result = []
for _, row in top_words.iterrows():
word_info = {
'word': row['Type'],
'freq': row[column],
'rank': row.get('Rank', 'N/A'),
'original_freq': row['Freq']
}
result.append(word_info)
return result
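if __name__ == "__main__":
    # A minimal smoke-test sketch with hypothetical inline data; the column
    # names ('lemma', 'frequency') are illustrative, since any names can be
    # mapped through the column configuration.
    sample = (
        "lemma\tfrequency\n"
        "the\t61847\nof\t29391\nand\t26817\nto\t25584\na\t21626"
    )
    analyzer = FrequencyAnalyzer()
    loaded = analyzer.load_frequency_data(
        sample, {'word_column': 'lemma', 'frequency_column': 'frequency'}
    )
    print(f"Loaded {len(loaded)} rows")
    print(analyzer.calculate_statistics('frequency'))
    print(analyzer.get_top_words('frequency', n=3))
    viz = analyzer.create_rank_based_visualization_flexible('frequency', bin_size=2)
    print(viz['group_labels'], viz['avg_frequencies'])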