# text_analyzer/lexical_sophistication.py
"""
Lexical sophistication analysis backend module.
Handles token processing, reference list matching, and score calculation.
"""
import spacy
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional, Union
from pathlib import Path
import csv
import logging
from collections import defaultdict
import re
from .base_analyzer import BaseAnalyzer
from .app_config import AppConfig
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LexicalSophisticationAnalyzer(BaseAnalyzer):
"""
Main class for lexical sophistication analysis.
Handles tokenization, n-gram generation, and score calculation.
"""
def __init__(self, language: Optional[str] = None, model_size: Optional[str] = None, gpu_device: Optional[int] = None):
"""
Initialize analyzer with specified language and model.
Args:
language (str): Language code ('en' for English, 'ja' for Japanese)
model_size (str): SpaCy model size ('md' or 'trf')
gpu_device (int, optional): GPU device ID to use (None for auto-detect, -1 for CPU only)
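Example (illustrative sketch; the argument values are assumptions,
not the only valid options):
    analyzer = LexicalSophisticationAnalyzer(
        language='en', model_size='md', gpu_device=-1  # -1 forces CPU
    )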
"""
super().__init__(language, model_size, gpu_device)
self.reference_lists = {}
def load_reference_lists(self, reference_files: Dict[str, Dict[str, Union[str, dict]]]):
"""
Load reference lists from files or dictionaries.
Args:
reference_files: Dict with structure {index_name: {file_type: file_path_or_data}}
where file_type is 'token', 'lemma', 'bigram', or 'trigram'
and the value can be a file path (str), a pre-loaded dictionary,
or a pandas DataFrame (n-gram DataFrames are converted to nested dicts)
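Example (illustrative sketch; the index name and file paths are
assumptions for demonstration only):
    analyzer.load_reference_lists({
        'COCA_spoken_frequency_token': {
            'token': 'data/coca_spoken_tokens.csv',    # comma-delimited
            'bigram': 'data/coca_spoken_bigrams.tsv'   # tab-delimited
        }
    })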
"""
self.reference_lists = {}
for index_name, files in reference_files.items():
self.reference_lists[index_name] = {}
for file_type, file_path_or_dict in files.items():
try:
# Check if it's already a dictionary (pre-loaded data)
if isinstance(file_path_or_dict, dict):
# Check if it's a custom configuration
if file_path_or_dict.get('is_custom_config'):
processed_data = self._parse_custom_config(file_path_or_dict)
self.reference_lists[index_name][file_type] = processed_data
logger.info(f"Loaded custom configured {file_type} reference list for {index_name}")
continue
else:
self.reference_lists[index_name][file_type] = file_path_or_dict
logger.info(f"Loaded pre-loaded {file_type} reference list for {index_name}")
continue
# Check if it's a DataFrame (for n-grams) - convert to nested dict
if isinstance(file_path_or_dict, pd.DataFrame):
if file_type in ['bigram', 'trigram']:
# Convert DataFrame to nested dictionary for better performance
nested_dict = self._convert_dataframe_to_nested_dict(
file_path_or_dict, index_name, file_type
)
self.reference_lists[index_name][file_type] = nested_dict
logger.info(f"Converted pre-loaded {file_type} DataFrame to nested dict for {index_name}")
else:
self.reference_lists[index_name][file_type] = file_path_or_dict
logger.info(f"Loaded pre-loaded {file_type} DataFrame for {index_name}")
continue
# Otherwise, treat as file path
file_path = file_path_or_dict
# Determine delimiter
with open(file_path, 'r', encoding='utf-8') as f:
sample = f.read(1024)
delimiter = ',' if sample.count(',') > sample.count('\t') else '\t'
# Load the file
df = pd.read_csv(file_path, delimiter=delimiter, header=0,
quoting=csv.QUOTE_MINIMAL, quotechar='"')
if file_type in ['token', 'lemma']:
# Check if this is a custom frequency list format with specific columns
if self._is_custom_frequency_format(df):
processed_data = self._parse_custom_frequency_format(df)
# Convert to nested dict format for consistency
nested_dict = {}
for word, freq in processed_data.items():
nested_dict[word] = {'frequency': freq}
self.reference_lists[index_name][file_type] = nested_dict
# For standard unigram files, convert to nested dict format
elif df.shape[1] >= 2:
# Convert all columns to nested dictionary
nested_dict = self._convert_dataframe_to_nested_dict(df, index_name, file_type)
self.reference_lists[index_name][file_type] = nested_dict
else:
# For n-gram files, convert DataFrame to nested dictionary for better performance
nested_dict = self._convert_dataframe_to_nested_dict(df, index_name, file_type)
self.reference_lists[index_name][file_type] = nested_dict
logger.info(f"Loaded {file_type} reference list for {index_name}")
except Exception as e:
logger.error(f"Error loading {file_type} list for {index_name}: {e}")
continue
def _is_custom_frequency_format(self, df: pd.DataFrame) -> bool:
"""
Check if the DataFrame matches the custom frequency list format.
Expected columns: Type, POS, Headword, Rank, Freq, Range, NormFreq, NormRange
"""
expected_columns = ['Type', 'POS', 'Headword', 'Rank', 'Freq', 'Range', 'NormFreq', 'NormRange']
# Check if all expected columns are present (case-insensitive)
df_columns_lower = [col.lower().strip() for col in df.columns]
expected_columns_lower = [col.lower() for col in expected_columns]
return all(col in df_columns_lower for col in expected_columns_lower)
def _convert_dataframe_to_nested_dict(self, df: pd.DataFrame, index_name: str, file_type: str) -> Dict[str, Dict[str, float]]:
"""
Convert DataFrame to nested dictionary structure for fast O(1) lookups.
Args:
df: Source DataFrame
index_name: Name of the reference index
file_type: Type of reference file ('token', 'lemma', 'bigram', 'trigram')
Returns:
{item_text: {measure_name: value, ...}}
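Example of the resulting shape (the words, measure names, and
values are illustrative only):
    {'the': {'frequency': 22038615.0, 'range': 0.98},
     'sophistication': {'frequency': 776.0, 'range': 0.41}}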
"""
nested_dict = {}
if df.empty or len(df.columns) < 2:
logger.warning(f"Empty or invalid DataFrame for {index_name} {file_type}")
return nested_dict
# First column is always the text (word/n-gram)
text_col = df.columns[0]
# Get column configuration from YAML if available
try:
from web_app.config_manager import ConfigManager
config = ConfigManager.load_reference_config()
language_key = "english" if self.language == 'en' else "japanese"
# Find the config entry for this index
config_entry = None
if file_type in ['token', 'lemma']:
section_key = 'unigrams'
else:
section_key = f"{file_type}s" # bigrams/trigrams
if section_key in config.get(language_key, {}):
if index_name in config[language_key][section_key]:
config_entry = config[language_key][section_key][index_name]
# Create measure mapping
measure_mapping = {}
if config_entry and 'columns' in config_entry:
columns_config = config_entry.get('columns', {})
for measure_name, col_idx in columns_config.items():
if isinstance(col_idx, int) and col_idx < len(df.columns):
measure_mapping[measure_name] = df.columns[col_idx]
else:
# Fallback: use column names directly as measure names (skip first column)
for col_name in df.columns[1:]:
measure_mapping[col_name] = col_name
except Exception as e:
logger.warning(f"Could not load YAML config for {index_name}, using fallback naming: {e}")
# Fallback: use column names directly as measure names (skip first column)
measure_mapping = {}
for col_name in df.columns[1:]:
measure_mapping[col_name] = col_name
# Clean and convert data
df_clean = df.copy()
# Clean text column
df_clean[text_col] = df_clean[text_col].astype(str).str.strip().str.lower()
df_clean = df_clean[df_clean[text_col] != '']
df_clean = df_clean[df_clean[text_col] != 'nan']
# Clean numeric columns
for col in df_clean.columns[1:]:
df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
# Remove rows with all NaN measures
df_clean = df_clean.dropna(subset=df_clean.columns[1:].tolist(), how='all')
# Convert to nested dictionary
for _, row in df_clean.iterrows():
text_key = row[text_col]
if pd.isna(text_key) or text_key == '':
continue
measures = {}
for measure_name, col_name in measure_mapping.items():
if col_name == text_col: # Skip the text column
continue
if col_name in row and not pd.isna(row[col_name]):
measures[measure_name] = float(row[col_name])
if measures: # Only add if we have at least one valid measure
nested_dict[text_key] = measures
logger.info(f"Converted {len(nested_dict)} entries from DataFrame to nested dict for {index_name} {file_type}")
return nested_dict
def _parse_custom_frequency_format(self, df: pd.DataFrame) -> Dict[str, float]:
"""
Parse custom frequency list format and return a dictionary mapping words to frequency scores.
Uses the 'Freq' column as the primary frequency score.
Handles empty columns and cleans data properly.
"""
# Create case-insensitive column mapping
column_mapping = {}
for col in df.columns:
column_mapping[col.lower().strip()] = col
# Get the actual column names - try both Type and Headword for word column
word_col = column_mapping.get('type') or column_mapping.get('headword')
freq_col = column_mapping.get('freq')
if not word_col or not freq_col:
logger.warning("Custom frequency format missing required columns")
return {}
# Clean the data
df_clean = df.copy()
# Clean word column - remove empty/whitespace entries
df_clean[word_col] = df_clean[word_col].astype(str).str.strip()
df_clean = df_clean[df_clean[word_col] != '']
df_clean = df_clean[df_clean[word_col] != 'nan']
# Clean and convert frequency values to numeric
df_clean[freq_col] = pd.to_numeric(df_clean[freq_col], errors='coerce')
# Remove rows with NaN frequencies or invalid headwords
df_clean = df_clean.dropna(subset=[freq_col])
df_clean = df_clean[df_clean[freq_col] > 0] # Only positive frequencies
# Create dictionary mapping words to frequencies
word_freq_dict = dict(zip(df_clean[word_col].str.lower(), df_clean[freq_col]))
logger.info(f"Parsed custom frequency format with {len(word_freq_dict)} entries (cleaned from {len(df)} rows)")
return word_freq_dict
def _parse_custom_config(self, config: dict) -> Dict[str, float]:
"""
Parse custom frequency list using user-selected columns.
Args:
config: Dictionary with 'content' or 'file_path', plus 'word_column', 'freq_column', and 'delimiter'
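Example config (illustrative; the column names and content are
assumptions for demonstration):
    {
        'is_custom_config': True,
        'content': 'Word,Count\nthe,100\nof,80\n',
        'word_column': 'Word',
        'freq_column': 'Count',
        'delimiter': ','
    }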
"""
word_column = config['word_column']
freq_column = config['freq_column']
delimiter = config['delimiter']
try:
from io import StringIO
# Check if we have content directly or need to read from file
if 'content' in config:
# Use content directly
content_io = StringIO(config['content'])
df = pd.read_csv(content_io, delimiter=delimiter, header=0,
quoting=csv.QUOTE_MINIMAL, quotechar='"')
elif 'file_path' in config:
# Fallback to file path for backward compatibility
df = pd.read_csv(config['file_path'], delimiter=delimiter, header=0,
quoting=csv.QUOTE_MINIMAL, quotechar='"')
else:
logger.error("No content or file_path found in config")
return {}
# Validate columns exist
if word_column not in df.columns:
logger.error(f"Word column '{word_column}' not found in file")
return {}
if freq_column not in df.columns:
logger.error(f"Frequency column '{freq_column}' not found in file")
return {}
# Clean the data
df_clean = df.copy()
# Clean word column - remove empty/whitespace entries
df_clean[word_column] = df_clean[word_column].astype(str).str.strip()
df_clean = df_clean[df_clean[word_column] != '']
df_clean = df_clean[df_clean[word_column] != 'nan']
# Clean and convert frequency values to numeric
df_clean[freq_column] = pd.to_numeric(df_clean[freq_column], errors='coerce')
# Remove rows with NaN frequencies or invalid words
df_clean = df_clean.dropna(subset=[freq_column])
df_clean = df_clean[df_clean[freq_column] > 0] # Only positive frequencies
# Create dictionary mapping words to frequencies
word_freq_dict = dict(zip(df_clean[word_column].str.lower(), df_clean[freq_column]))
logger.info(f"Parsed custom config with {len(word_freq_dict)} entries using columns '{word_column}' → '{freq_column}'")
return word_freq_dict
except Exception as e:
logger.error(f"Error parsing custom config: {e}")
return {}
def _generate_ngrams(self, tokens: List, n: int, sep: str = " ") -> List[str]:
"""
Generate n-grams from token list, respecting sentence boundaries.
Args:
tokens: List of SpaCy tokens
n: N-gram size (2 for bigrams, 3 for trigrams)
sep: Separator used to join tokens (default: a single space)
Returns:
List of n-grams as lowercased, separator-joined strings
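Example (illustrative): for the text "It rains. It pours." the
bigrams are ['it rains', 'it pours']; 'rains. it' is never produced
because sentence boundaries are respected and punctuation is dropped.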
"""
ngrams = []
sentence_starts = [0]
# Find sentence boundaries
for i, token in enumerate(tokens):
if token.is_sent_start and i > 0:
sentence_starts.append(i)
sentence_starts.append(len(tokens))
# Generate n-grams within sentences
for i in range(len(sentence_starts) - 1):
start = sentence_starts[i]
end = sentence_starts[i + 1]
sentence_tokens = tokens[start:end]
# Filter out punctuation
clean_tokens = [t for t in sentence_tokens if not t.is_punct and not t.is_space]
for j in range(len(clean_tokens) - n + 1):
ngram_tokens = clean_tokens[j:j + n]
ngram = sep.join([t.text.lower() for t in ngram_tokens])
ngrams.append(ngram)
return ngrams
def _lookup_score(self, word: str, index_name: str, file_type: str,
measure_col: Optional[str] = None) -> Optional[float]:
"""
Unified lookup for both words and N-grams using nested dictionaries for O(1) performance.
Args:
word: Word/N-gram to look up
index_name: Name of the reference index
file_type: Type of reference file ('token', 'lemma', 'bigram', 'trigram')
measure_col: Specific measure to retrieve (e.g., 'frequency', 'mi_score')
Returns:
Score if found, None otherwise
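Example (illustrative sketch; the index name and measure name are
assumptions):
    freq = self._lookup_score('sophistication',
                              'COCA_spoken_frequency_token',
                              'token', measure_col='frequency')
    # -> float score if the word is in the reference list, else None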
"""
if index_name not in self.reference_lists:
return None
ref_data = self.reference_lists[index_name].get(file_type)
if ref_data is None:
return None
# Handle Japanese corpus data (special case)
if isinstance(ref_data, dict) and ref_data.get('is_japanese_corpus', False):
# This should not be called directly for Japanese data
# Use _lookup_japanese_score instead
return None
# Handle legacy simple dictionaries (old unigram format: word -> single_score)
if isinstance(ref_data, dict) and not any(isinstance(v, dict) for v in ref_data.values()):
# Legacy simple dictionary format
return ref_data.get(word.lower())
# Handle nested dictionary format (new unified format: word -> {measure: value, ...})
if isinstance(ref_data, dict):
word_data = ref_data.get(word.lower())
if word_data is None or not isinstance(word_data, dict):
return None
# If measure specified, return that specific measure
if measure_col:
return word_data.get(measure_col)
else:
# Return first available measure for backward compatibility
return next(iter(word_data.values())) if word_data else None
# Fallback to DataFrame lookup (for compatibility during transition)
if isinstance(ref_data, pd.DataFrame):
# Find matching row
word_col = ref_data.columns[0]
matching_rows = ref_data[ref_data[word_col].str.lower() == word.lower()]
if matching_rows.empty:
return None
if measure_col and measure_col in ref_data.columns:
value = matching_rows[measure_col].iloc[0]
# Handle non-numeric values
try:
return float(value)
except (ValueError, TypeError):
return None
else:
# Default to second column if no specific measure specified
if len(ref_data.columns) > 1:
value = matching_rows.iloc[0, 1]
try:
return float(value)
except (ValueError, TypeError):
return None
return None
return None
def _lookup_with_unidic_fallback(self, token, index_name: str, file_type: str) -> Dict:
"""
Enhanced Japanese lookup with UniDic 3-level fallback using corpus-compatible keys.
Args:
token: SpaCy token object with UniDic extensions
index_name: Name of the reference index
file_type: Type of reference file ('token', 'lemma')
Returns:
Dictionary with score, method, key, and diagnostic information
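Example of a successful return value (all values are illustrative):
    {'score': 3.2,
     'match_method': 'unidic_corpus_level_2',
     'match_key': '食べる_タベル_動詞_一般',
     'diagnostic_info': {'attempted_keys': ['...'],
                         'alignment_confidence': 0.9, ...}}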
"""
# Initialize diagnostic tracking
attempted_keys = []
diagnostic_info = {
'attempted_keys': attempted_keys,
'unidic_features': {},
'alignment_confidence': getattr(token._, 'alignment_confidence', 0.0),
'spacy_fallback_used': False,
'no_match': False
}
# Get UniDic features from token extensions
unidic_features = {
'lemma': getattr(token._, 'unidic_lemma', '') or '',
'lForm': getattr(token._, 'unidic_lform', '') or '',
'pos1': getattr(token._, 'unidic_pos1', '') or '',
'pos2': getattr(token._, 'unidic_pos2', '') or '',
'pos3': getattr(token._, 'unidic_pos3', '') or '',
'goshu': getattr(token._, 'unidic_goshu', '') or ''
}
diagnostic_info['unidic_features'] = unidic_features
# Only proceed with UniDic matching if we have good alignment and features
if diagnostic_info['alignment_confidence'] > 0.5 and any(unidic_features.values()):
# Try corpus-compatible keys using the hierarchical lookup dictionaries
# Level 1: {lemma}_{lForm}_{pos1}_{pos2}_{pos3} (when pos3 exists)
if all([unidic_features['lemma'], unidic_features['lForm'],
unidic_features['pos1'], unidic_features['pos2'], unidic_features['pos3']]):
level1_key = f"{unidic_features['lemma']}_{unidic_features['lForm']}_{unidic_features['pos1']}_{unidic_features['pos2']}_{unidic_features['pos3']}"
attempted_keys.append(level1_key)
score = self._lookup_japanese_corpus_level(level1_key, index_name, file_type, 'level1_dict')
if score is not None:
return {
'score': score,
'match_method': 'unidic_corpus_level_1',
'match_key': level1_key,
'diagnostic_info': diagnostic_info
}
# Level 2: {lemma}_{lForm}_{pos1}_{pos2}
if all([unidic_features['lemma'], unidic_features['lForm'],
unidic_features['pos1'], unidic_features['pos2']]):
level2_key = f"{unidic_features['lemma']}_{unidic_features['lForm']}_{unidic_features['pos1']}_{unidic_features['pos2']}"
attempted_keys.append(level2_key)
score = self._lookup_japanese_corpus_level(level2_key, index_name, file_type, 'level2_dict')
if score is not None:
return {
'score': score,
'match_method': 'unidic_corpus_level_2',
'match_key': level2_key,
'diagnostic_info': diagnostic_info
}
# Level 3: {lemma}_{lForm}_{pos1}
if all([unidic_features['lemma'], unidic_features['lForm'], unidic_features['pos1']]):
level3_key = f"{unidic_features['lemma']}_{unidic_features['lForm']}_{unidic_features['pos1']}"
attempted_keys.append(level3_key)
score = self._lookup_japanese_corpus_level(level3_key, index_name, file_type, 'level3_dict')
if score is not None:
return {
'score': score,
'match_method': 'unidic_corpus_level_3',
'match_key': level3_key,
'diagnostic_info': diagnostic_info
}
# Fallback to legacy spaCy-based matching
diagnostic_info['spacy_fallback_used'] = True
legacy_key = f"{token.lemma_}_{token.tag_}"
attempted_keys.append(f"legacy: {legacy_key}")  # record the attempt even if it fails
legacy_score = self._lookup_japanese_score(token, index_name, file_type, fallback=True)
if legacy_score is not None:
return {
'score': legacy_score,
'match_method': 'legacy_spacy',
'match_key': legacy_key,
'diagnostic_info': diagnostic_info
}
# No match found
diagnostic_info['no_match'] = True
return {
'score': None,
'match_method': 'none',
'match_key': None,
'diagnostic_info': diagnostic_info
}
def _lookup_japanese_corpus_level(self, key: str, index_name: str, file_type: str, level_dict_name: str) -> Optional[float]:
"""
Look up score in a specific level dictionary of Japanese corpus data.
Args:
key: Composite key to look up
index_name: Name of the reference index
file_type: Type of reference file ('token', 'lemma')
level_dict_name: Name of the level dictionary ('level1_dict', 'level2_dict', 'level3_dict')
Returns:
Score if found, None otherwise
"""
if index_name not in self.reference_lists:
return None
ref_data = self.reference_lists[index_name].get(file_type)
if ref_data is None or not isinstance(ref_data, dict):
return None
if not ref_data.get('is_japanese_corpus', False):
return None
level_dict = ref_data.get(level_dict_name, {})
return level_dict.get(key)
def _lookup_japanese_score(self, token, index_name: str, file_type: str, fallback: bool = False) -> Optional[float]:
"""
Look up score for a Japanese word using composite key approach.
Args:
token: SpaCy token object
index_name: Name of the reference index
file_type: Type of reference file ('token', 'lemma')
fallback: Whether to use fallback search strategies
Returns:
Score if found, None otherwise
"""
if index_name not in self.reference_lists:
return None
ref_data = self.reference_lists[index_name].get(file_type)
if ref_data is None or not isinstance(ref_data, dict):
return None
if not ref_data.get('is_japanese_corpus', False):
return None
# Try composite key first (lemma_pos)
composite_key = f"{token.lemma_}_{token.tag_}"
score = ref_data.get('composite_dict', {}).get(composite_key)
if score is None and fallback:
# Fallback to lemma only
score = ref_data.get('lemma_dict', {}).get(token.lemma_.lower())
if score is None and fallback:
# Final fallback to surface form
score = ref_data.get('surface_dict', {}).get(token.text.lower())
return score
def _should_apply_log_transform(self, index_name: str, analysis_type: str,
measure_name: str, log_transforms: Optional[Dict[str, List[str]]],
apply_log_fallback: bool) -> bool:
"""
Determine if a specific measure should be log-transformed.
Args:
index_name: Name of the reference index
analysis_type: 'token' or 'lemma'
measure_name: Name of the measure (e.g., 'frequency', 'MI')
log_transforms: Dict mapping index names to lists of measures to log-transform
apply_log_fallback: Legacy fallback boolean
Returns:
True if the measure should be log-transformed, False otherwise
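Example (illustrative; the index name is an assumption):
    log_transforms = {'COCA_spoken_frequency_token': ['frequency']}
    # -> True for ('COCA_spoken_frequency_token', 'frequency'),
    #    False for every other (index, measure) pair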
"""
# If new log_transforms parameter is provided, use it
if log_transforms is not None:
index_transforms = log_transforms.get(index_name, [])
return measure_name in index_transforms
# Fallback to legacy apply_log behavior for backward compatibility
return apply_log_fallback
def _should_compute_measure(self, index_name: str, measure_name: str,
selected_measures: Optional[Dict[str, List[str]]]) -> bool:
"""
Determine if a specific measure should be computed.
Args:
index_name: Name of the reference index
measure_name: Name of the measure (e.g., 'frequency', 'MI')
selected_measures: Dict mapping index names to lists of measures to compute
Returns:
True if the measure should be computed, False otherwise
"""
# If selected_measures is provided, use it for filtering
if selected_measures is not None:
index_measures = selected_measures.get(index_name, [])
return measure_name in index_measures
# If not specified, compute all measures (backward compatibility)
return True
def analyze_text(self, text: str, selected_indices: List[str],
apply_log: bool = False, word_type_filter: Optional[str] = None,
log_transforms: Optional[Dict[str, List[str]]] = None,
selected_measures: Optional[Dict[str, List[str]]] = None,
separate_word_types: bool = False) -> Dict:
"""
Analyze text and return lexical sophistication scores.
Args:
text: Input text to analyze
selected_indices: List of reference indices to apply
apply_log: Whether to apply log10 transformation (legacy parameter, superseded by log_transforms)
word_type_filter: Filter by word type ('CW', 'FW', or None for all)
log_transforms: Dict mapping index names to list of measures that should be log-transformed
e.g., {'COCA_spoken_frequency_token': ['frequency', 'normalized_freq']}
If None, falls back to apply_log behavior for backward compatibility
selected_measures: Dict mapping index names to list of measures to compute
e.g., {'COCA_spoken_frequency_token': ['frequency', 'range']}
If None, computes all available measures for backward compatibility
separate_word_types: If True, process CW and FW separately in the same analysis call
Returns:
Dictionary containing analysis results
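Example (illustrative sketch; the index name is an assumption):
    results = analyzer.analyze_text(
        "The quick brown fox jumps over the lazy dog.",
        selected_indices=['COCA_spoken_frequency_token'],
        log_transforms={'COCA_spoken_frequency_token': ['frequency']}
    )
    mean_freq = results['summary']['COCA_spoken_frequency_token']['mean']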
"""
# Process text using base class
doc = self.process_document(text)
tokens = self.filter_tokens(doc, exclude_punct=True, exclude_space=True)
# Generate n-grams
bigrams = self._generate_ngrams(tokens, 2)
trigrams = self._generate_ngrams(tokens, 3)
# Prepare results structure
results = {
'summary': {},
'token_details': [],
'bigram_details': [],
'trigram_details': [],
'text_stats': {
'total_tokens': len(tokens),
'unique_tokens': len(set(t.text.lower() for t in tokens)),
'content_words': len([t for t in tokens if self._classify_pos(t) == 'CW']),
'function_words': len([t for t in tokens if self._classify_pos(t) == 'FW'])
},
'raw_scores': {}, # Raw scores for plotting
'tokens': tokens, # Raw spaCy tokens for advanced analysis
'doc': doc # Full spaCy doc for complex operations
}
# Initialize score collections
all_scores = defaultdict(list)
# Process each token
for i, token in enumerate(tokens):
word_type = self._classify_pos(token)
# Skip if filtering by word type
if word_type_filter and word_type != word_type_filter:
continue
# Work directly with spaCy token - include syntactic information
token_detail = {
'id': i + 1,
'token': token.text,
'lemma': token.lemma_,
'pos': token.pos_,
'tag': token.tag_,
'dep_': token.dep_, # Add dependency relation
'head_text': token.head.text, # Add head word
'head_pos': token.head.pos_, # Add head POS
'word_type': word_type
}
# Look up scores for each selected index
for index_name in selected_indices:
# Determine analysis type from the index-name suffix
if index_name.endswith('_token'):
analysis_type = 'token'
elif index_name.endswith('_lemma'):
analysis_type = 'lemma'
else:
# Fallback for entries without a clear suffix
analysis_type = 'token'  # Default to token
# Check if this is a Japanese corpus reference list
ref_data = self.reference_lists.get(index_name, {})
is_japanese_corpus = False
for file_type in ['token', 'lemma']:
data = ref_data.get(file_type, {})
if isinstance(data, dict) and data.get('is_japanese_corpus', False):
is_japanese_corpus = True
break
if is_japanese_corpus and self.language == 'ja':
# Use enhanced UniDic lookup with 3-level fallback and diagnostics
# (identical handling for token- and lemma-based indices)
result = self._lookup_with_unidic_fallback(token, index_name, analysis_type)
score = result['score']
# Store enhanced details with clean column name
token_detail[index_name] = score if score is not None else None
token_detail[f"{index_name}_match_method"] = result['match_method']
token_detail[f"{index_name}_match_key"] = result['match_key'] or None
# Store UniDic features for display (only once per token, token analysis only)
if analysis_type == 'token' and hasattr(token, '_') and hasattr(token._, 'unidic_lemma') and 'unidic_features' not in token_detail:
token_detail['unidic_features'] = {
'lemma': getattr(token._, 'unidic_lemma', ''),
'lForm': getattr(token._, 'unidic_lform', ''),
'pos1': getattr(token._, 'unidic_pos1', ''),
'pos2': getattr(token._, 'unidic_pos2', ''),
'goshu': getattr(token._, 'unidic_goshu', ''),
'alignment_confidence': getattr(token._, 'alignment_confidence', 0.0)
}
elif is_japanese_corpus:
# Fall back to legacy Japanese lookup if UniDic features are unavailable
score = self._lookup_japanese_score(token, index_name, analysis_type, fallback=True)
# Apply log transformation if needed before storing
if score is not None:
should_log_transform = self._should_apply_log_transform(
index_name, analysis_type, 'frequency', log_transforms, apply_log
)
final_score = np.log10(score) if should_log_transform and score > 0 else score
else:
final_score = None
token_detail[index_name] = final_score
token_detail[f"{index_name}_match_method"] = "legacy_spacy"
else:
# Standard lookup for non-Japanese data
if analysis_type == 'token':
score = self._lookup_score(token.text, index_name, 'token')
else: # lemma analysis
score = self._lookup_score(token.lemma_, index_name, 'lemma')
# Apply log transformation if needed before storing
if score is not None:
should_log_transform = self._should_apply_log_transform(
index_name, analysis_type, 'frequency', log_transforms, apply_log
)
final_score = np.log10(score) if should_log_transform and score > 0 else score
else:
final_score = None
# Store score with clean column name and transformed value
token_detail[index_name] = final_score
# Collect for summary statistics (score is already transformed if needed)
score = token_detail.get(index_name)
if score is not None:
# Handle different collection methods based on parameters
if separate_word_types or word_type_filter:
# Include word type in the key
all_scores[f"{index_name}_{word_type}"].append(score)
else:
# No word type suffix for unfiltered analysis
all_scores[index_name].append(score)
results['token_details'].append(token_detail)
# Calculate summary statistics
for score_key, scores in all_scores.items():
if scores:
results['summary'][score_key] = {
'mean': np.mean(scores),
'std': np.std(scores),
'count': len(scores),
'min': np.min(scores),
'max': np.max(scores)
}
# Store raw scores for plotting
results['raw_scores'][score_key] = scores
# Process n-grams if available
for ngram_type, ngrams in [('bigram', bigrams), ('trigram', trigrams)]:
if not ngrams:
continue
# Store n-gram details
ngram_details_key = f'{ngram_type}_details'
ngram_counter = {}
# Count occurrences of each n-gram
for ngram in ngrams:
ngram_counter[ngram] = ngram_counter.get(ngram, 0) + 1
# Process unique n-grams for details
for i, (ngram, count) in enumerate(ngram_counter.items()):
ngram_detail = {
'id': i + 1,
ngram_type: ngram,
'frequency': count
}
# Look up scores for each index
for index_name in selected_indices:
if index_name not in self.reference_lists:
continue
ref_data = self.reference_lists[index_name].get(ngram_type)
if ref_data is None:
continue
# Skip if using old DataFrame format (should be converted by now)
if isinstance(ref_data, pd.DataFrame):
logger.warning(f"Found unconverted DataFrame for {index_name} {ngram_type}, skipping")
continue
# Ensure we have the new nested dictionary format
if not isinstance(ref_data, dict):
continue
# Get available measures from any N-gram entry
sample_ngram_data = next(iter(ref_data.values())) if ref_data else {}
if not isinstance(sample_ngram_data, dict):
continue
available_measures = list(sample_ngram_data.keys())
# Process each available measure
for measure_name in available_measures:
# Check if this measure should be computed
if not self._should_compute_measure(index_name, measure_name, selected_measures):
continue
# Use the unified lookup method for O(1) performance
score = self._lookup_score(ngram, index_name, ngram_type, measure_name)
if score is not None:
# Check if this measure should be log-transformed
should_log_transform = self._should_apply_log_transform(
index_name, ngram_type, measure_name, log_transforms, apply_log
)
score_val = np.log10(score) if should_log_transform and score > 0 else score
ngram_detail[f"{index_name}_{measure_name}"] = score_val
else:
ngram_detail[f"{index_name}_{measure_name}"] = None
# Measure names come directly from the nested-dictionary keys above,
# so no YAML column-mapping lookup is needed at this point.
results[ngram_details_key].append(ngram_detail)
# Also process for summary statistics
for index_name in selected_indices:
if index_name not in self.reference_lists:
continue
ref_data = self.reference_lists[index_name].get(ngram_type)
if ref_data is None:
continue
# Skip if using old DataFrame format (should be converted by now)
if isinstance(ref_data, pd.DataFrame):
logger.warning(f"Found unconverted DataFrame for {index_name} {ngram_type} in summary, skipping")
continue
# Ensure we have the new nested dictionary format
if not isinstance(ref_data, dict):
continue
# Get available measures from any N-gram entry
sample_ngram_data = next(iter(ref_data.values())) if ref_data else {}
if not isinstance(sample_ngram_data, dict):
continue
available_measures = list(sample_ngram_data.keys())
# Process each available measure for summary statistics
for measure_name in available_measures:
# Check if this measure should be computed
if not self._should_compute_measure(index_name, measure_name, selected_measures):
continue
ngram_scores = []
for ngram in ngrams:
score = self._lookup_score(ngram, index_name, ngram_type, measure_name)
if score is not None:
# Check if this measure should be log-transformed
should_log_transform = self._should_apply_log_transform(
index_name, ngram_type, measure_name, log_transforms, apply_log
)
score_val = np.log10(score) if should_log_transform and score > 0 else score
ngram_scores.append(score_val)
if ngram_scores:
key = f"{index_name}_{ngram_type}_{measure_name}"
results['summary'][key] = {
'mean': np.mean(ngram_scores),
'std': np.std(ngram_scores),
'count': len(ngram_scores),
'min': np.min(ngram_scores),
'max': np.max(ngram_scores)
}
# Store raw scores for plotting
results['raw_scores'][key] = ngram_scores
# As above, measure names come directly from the nested-dictionary keys;
# no YAML column-mapping lookup is needed for the summary statistics.
return results
def analyze_batch_memory(self, file_contents: List[Tuple[str, str]], selected_indices: List[str],
apply_log: bool = False, word_type_filter: Optional[str] = None,
log_transforms: Optional[Dict[str, List[str]]] = None,
selected_measures: Optional[Dict[str, List[str]]] = None,
progress_callback=None) -> pd.DataFrame:
"""
Analyze multiple text files from memory and return aggregated results.
Optimized version that processes both CW and FW in a single pass.
Args:
file_contents: List of (filename, text_content) tuples
selected_indices: List of reference indices to apply
apply_log: Whether to apply log10 transformation (legacy parameter, superseded by log_transforms)
word_type_filter: Filter by word type ('CW', 'FW', or None for all)
log_transforms: Dict mapping index names to list of measures that should be log-transformed
selected_measures: Dict mapping index names to list of measures to compute
progress_callback: Optional callback for progress updates
Returns:
DataFrame with aggregated results
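Example (illustrative sketch; the file names and index name are
assumptions):
    df = analyzer.analyze_batch_memory(
        [('essay1.txt', 'First essay text.'),
         ('essay2.txt', 'Second essay text.')],
        selected_indices=['COCA_spoken_frequency_token'],
        progress_callback=lambda done, total: print(f"{done}/{total}")
    )
    # One row per file; summary means appear as columns, e.g.
    # 'COCA_spoken_frequency_token_CW' and 'COCA_spoken_frequency_token_FW'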
"""
batch_results = []
for i, (filename, text_content) in enumerate(file_contents):
try:
result_row = {'filename': filename}
if word_type_filter:
# Analyze only for specific word type
analysis = self.analyze_text(
text_content,
selected_indices,
apply_log=apply_log,
word_type_filter=word_type_filter,
log_transforms=log_transforms,
selected_measures=selected_measures
)
# Extract summary scores
for key, stats in analysis['summary'].items():
result_row[key] = stats['mean']
else:
# Single optimized analysis call that processes both CW and FW
analysis = self.analyze_text(
text_content,
selected_indices,
apply_log=apply_log,
word_type_filter=None,
log_transforms=log_transforms,
selected_measures=selected_measures,
separate_word_types=True # Process CW/FW separately in same pass
)
# Extract all summary scores including CW, FW, and n-grams
for key, stats in analysis['summary'].items():
result_row[key] = stats['mean']
batch_results.append(result_row)
if progress_callback:
progress_callback(i + 1, len(file_contents))
except Exception as e:
logger.error(f"Error processing file {filename}: {e}")
# Add error row
error_row = {'filename': filename, 'error': str(e)}
batch_results.append(error_row)
if progress_callback:
progress_callback(i + 1, len(file_contents))
return pd.DataFrame(batch_results)
def analyze_batch(self, file_paths: List[str], selected_indices: List[str],
apply_log: bool = False, progress_callback=None) -> pd.DataFrame:
"""
Legacy batch analysis method for backward compatibility.
Analyze multiple text files and return aggregated results.
Args:
file_paths: List of paths to text files
selected_indices: List of reference indices to apply
apply_log: Whether to apply log10 transformation
progress_callback: Optional callback for progress updates
Returns:
DataFrame with aggregated results
"""
batch_results = []
for i, file_path in enumerate(file_paths):
try:
# Read file
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
# Use optimized single-pass analysis
result_row = {'filename': Path(file_path).name}
# Single optimized analysis call that processes both CW and FW
analysis = self.analyze_text(
text,
selected_indices,
apply_log=apply_log,
word_type_filter=None,
separate_word_types=True # Process CW/FW separately in same pass
)
# Extract all summary scores
for key, stats in analysis['summary'].items():
result_row[key] = stats['mean']
batch_results.append(result_row)
if progress_callback:
progress_callback(i + 1, len(file_paths))
except Exception as e:
logger.error(f"Error processing file {file_path}: {e}")
# Add error row
error_row = {'filename': Path(file_path).name, 'error': str(e)}
batch_results.append(error_row)
if progress_callback:
progress_callback(i + 1, len(file_paths))
return pd.DataFrame(batch_results)