| """ | |
| Lexical sophistication analysis backend module. | |
| Handles token processing, reference list matching, and score calculation. | |
| """ | |
| import spacy | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Tuple, Optional, Union | |
| from pathlib import Path | |
| import csv | |
| import logging | |
| from collections import defaultdict | |
| import re | |
| from .base_analyzer import BaseAnalyzer | |
| from .app_config import AppConfig | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class LexicalSophisticationAnalyzer(BaseAnalyzer): | |
| """ | |
| Main class for lexical sophistication analysis. | |
| Handles tokenization, n-gram generation, and score calculation. | |
| """ | |
| def __init__(self, language: str = None, model_size: str = None, gpu_device: Optional[int] = None): | |
| """ | |
| Initialize analyzer with specified language and model. | |
| Args: | |
| language (str): Language code ('en' for English, 'ja' for Japanese) | |
| model_size (str): SpaCy model size ('md' or 'trf') | |
| gpu_device (int, optional): GPU device ID to use (None for auto-detect, -1 for CPU only) | |
| """ | |
| super().__init__(language, model_size, gpu_device) | |
| self.reference_lists = {} | |
    def load_reference_lists(self, reference_files: Dict[str, Dict[str, Union[str, dict]]]):
        """
        Load reference lists from files or dictionaries.

        Args:
            reference_files: Dict with structure {index_name: {file_type: file_path_or_dict}},
                where file_type is 'token', 'lemma', 'bigram', or 'trigram'
                and the value can be a file path (str) or a pre-loaded dictionary.
        """
        self.reference_lists = {}
        for index_name, files in reference_files.items():
            self.reference_lists[index_name] = {}
            for file_type, file_path_or_dict in files.items():
                try:
                    # Check if it's already a dictionary (pre-loaded data)
                    if isinstance(file_path_or_dict, dict):
                        # Check if it's a custom configuration
                        if file_path_or_dict.get('is_custom_config'):
                            processed_data = self._parse_custom_config(file_path_or_dict)
                            self.reference_lists[index_name][file_type] = processed_data
                            logger.info(f"Loaded custom configured {file_type} reference list for {index_name}")
                            continue
                        else:
                            self.reference_lists[index_name][file_type] = file_path_or_dict
                            logger.info(f"Using pre-loaded {file_type} reference list for {index_name}")
                            continue

                    # Check if it's a DataFrame (for n-grams) - convert to nested dict
                    if isinstance(file_path_or_dict, pd.DataFrame):
                        if file_type in ['bigram', 'trigram']:
                            # Convert DataFrame to nested dictionary for better performance
                            nested_dict = self._convert_dataframe_to_nested_dict(
                                file_path_or_dict, index_name, file_type
                            )
                            self.reference_lists[index_name][file_type] = nested_dict
                            logger.info(f"Converted pre-loaded {file_type} DataFrame to nested dict for {index_name}")
                        else:
                            self.reference_lists[index_name][file_type] = file_path_or_dict
                            logger.info(f"Using pre-loaded {file_type} DataFrame for {index_name}")
                        continue

                    # Otherwise, treat it as a file path
                    file_path = file_path_or_dict

                    # Determine the delimiter from a small sample of the file
                    with open(file_path, 'r', encoding='utf-8') as f:
                        sample = f.read(1024)
                        delimiter = ',' if sample.count(',') > sample.count('\t') else '\t'

                    # Load the file
                    df = pd.read_csv(file_path, delimiter=delimiter, header=0,
                                     quoting=csv.QUOTE_MINIMAL, quotechar='"')

                    if file_type in ['token', 'lemma']:
                        # Check if this is a custom frequency list format with specific columns
                        if self._is_custom_frequency_format(df):
                            processed_data = self._parse_custom_frequency_format(df)
                            # Convert to nested dict format for consistency
                            nested_dict = {}
                            for word, freq in processed_data.items():
                                nested_dict[word] = {'frequency': freq}
                            self.reference_lists[index_name][file_type] = nested_dict
                        # For standard unigram files, convert to nested dict format
                        elif df.shape[1] >= 2:
                            # Convert all columns to nested dictionary
                            nested_dict = self._convert_dataframe_to_nested_dict(df, index_name, file_type)
                            self.reference_lists[index_name][file_type] = nested_dict
                    else:
                        # For n-gram files, convert DataFrame to nested dictionary for better performance
                        nested_dict = self._convert_dataframe_to_nested_dict(df, index_name, file_type)
                        self.reference_lists[index_name][file_type] = nested_dict

                    logger.info(f"Loaded {file_type} reference list for {index_name}")
                except Exception as e:
                    logger.error(f"Error loading {file_type} list for {index_name}: {e}")
                    continue
    def _is_custom_frequency_format(self, df) -> bool:
        """
        Check if the DataFrame matches the custom frequency list format.
        Expected columns: Type, POS, Headword, Rank, Freq, Range, NormFreq, NormRange
        """
        expected_columns = ['Type', 'POS', 'Headword', 'Rank', 'Freq', 'Range', 'NormFreq', 'NormRange']
        # Check if all expected columns are present (case-insensitive)
        df_columns_lower = [col.lower().strip() for col in df.columns]
        expected_columns_lower = [col.lower() for col in expected_columns]
        return all(col in df_columns_lower for col in expected_columns_lower)
    def _convert_dataframe_to_nested_dict(self, df: pd.DataFrame, index_name: str, file_type: str) -> Dict[str, Dict[str, float]]:
        """
        Convert a DataFrame to a nested dictionary structure for fast O(1) lookups.

        Args:
            df: Source DataFrame
            index_name: Name of the reference index
            file_type: Type of reference file ('token', 'lemma', 'bigram', 'trigram')

        Returns:
            {item_text: {measure_name: value, ...}}
        """
        nested_dict = {}
        if df.empty or len(df.columns) < 2:
            logger.warning(f"Empty or invalid DataFrame for {index_name} {file_type}")
            return nested_dict

        # First column is always the text (word/n-gram)
        text_col = df.columns[0]

        # Get column configuration from YAML if available
        try:
            from web_app.config_manager import ConfigManager
            config = ConfigManager.load_reference_config()
            language_key = "english" if self.language == 'en' else "japanese"

            # Find the config entry for this index
            config_entry = None
            if file_type in ['token', 'lemma']:
                section_key = 'unigrams'
            else:
                section_key = f"{file_type}s"  # bigrams/trigrams
            if section_key in config.get(language_key, {}):
                if index_name in config[language_key][section_key]:
                    config_entry = config[language_key][section_key][index_name]

            # Create measure mapping
            measure_mapping = {}
            if config_entry and 'columns' in config_entry:
                columns_config = config_entry.get('columns', {})
                for measure_name, col_idx in columns_config.items():
                    if isinstance(col_idx, int) and col_idx < len(df.columns):
                        measure_mapping[measure_name] = df.columns[col_idx]
            else:
                # Fallback: use column names directly as measure names (skip first column)
                for col_name in df.columns[1:]:
                    measure_mapping[col_name] = col_name
        except Exception as e:
            logger.warning(f"Could not load YAML config for {index_name}, using fallback naming: {e}")
            # Fallback: use column names directly as measure names (skip first column)
            measure_mapping = {}
            for col_name in df.columns[1:]:
                measure_mapping[col_name] = col_name

        # Clean and convert data
        df_clean = df.copy()

        # Clean text column
        df_clean[text_col] = df_clean[text_col].astype(str).str.strip().str.lower()
        df_clean = df_clean[df_clean[text_col] != '']
        df_clean = df_clean[df_clean[text_col] != 'nan']

        # Clean numeric columns
        for col in df_clean.columns[1:]:
            df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')

        # Remove rows with all NaN measures
        df_clean = df_clean.dropna(subset=df_clean.columns[1:].tolist(), how='all')

        # Convert to nested dictionary
        for _, row in df_clean.iterrows():
            text_key = row[text_col]
            if pd.isna(text_key) or text_key == '':
                continue
            measures = {}
            for measure_name, col_name in measure_mapping.items():
                if col_name == text_col:  # Skip the text column
                    continue
                if col_name in row and not pd.isna(row[col_name]):
                    measures[measure_name] = float(row[col_name])
            if measures:  # Only add if we have at least one valid measure
                nested_dict[text_key] = measures

        logger.info(f"Converted {len(nested_dict)} entries from DataFrame to nested dict for {index_name} {file_type}")
        return nested_dict
    def _parse_custom_frequency_format(self, df):
        """
        Parse custom frequency list format and return a dictionary mapping words to frequency scores.
        Uses the 'Freq' column as the primary frequency score.
        Handles empty columns and cleans data properly.
        """
        # Create case-insensitive column mapping
        column_mapping = {}
        for col in df.columns:
            column_mapping[col.lower().strip()] = col

        # Get the actual column names - try both Type and Headword for the word column
        word_col = column_mapping.get('type') or column_mapping.get('headword')
        freq_col = column_mapping.get('freq')
        if not word_col or not freq_col:
            logger.warning("Custom frequency format missing required columns")
            return {}

        # Clean the data
        df_clean = df.copy()

        # Clean word column - remove empty/whitespace entries
        df_clean[word_col] = df_clean[word_col].astype(str).str.strip()
        df_clean = df_clean[df_clean[word_col] != '']
        df_clean = df_clean[df_clean[word_col] != 'nan']

        # Clean and convert frequency values to numeric
        df_clean[freq_col] = pd.to_numeric(df_clean[freq_col], errors='coerce')

        # Remove rows with NaN frequencies or invalid headwords
        df_clean = df_clean.dropna(subset=[freq_col])
        df_clean = df_clean[df_clean[freq_col] > 0]  # Only keep positive frequencies

        # Create dictionary mapping words to frequencies
        word_freq_dict = dict(zip(df_clean[word_col].str.lower(), df_clean[freq_col]))
        logger.info(f"Parsed custom frequency format with {len(word_freq_dict)} entries (cleaned from {len(df)} rows)")
        return word_freq_dict
    def _parse_custom_config(self, config):
        """
        Parse a custom frequency list using user-selected columns.

        Args:
            config: Dictionary with file_name/content, word_column, freq_column, delimiter
        """
        word_column = config['word_column']
        freq_column = config['freq_column']
        delimiter = config['delimiter']

        try:
            from io import StringIO

            # Check if we have content directly or need to read from a file
            if 'content' in config:
                # Use content directly
                content_io = StringIO(config['content'])
                df = pd.read_csv(content_io, delimiter=delimiter, header=0,
                                 quoting=csv.QUOTE_MINIMAL, quotechar='"')
            elif 'file_path' in config:
                # Fall back to a file path for backward compatibility
                df = pd.read_csv(config['file_path'], delimiter=delimiter, header=0,
                                 quoting=csv.QUOTE_MINIMAL, quotechar='"')
            else:
                logger.error("No content or file_path found in config")
                return {}

            # Validate that the selected columns exist
            if word_column not in df.columns:
                logger.error(f"Word column '{word_column}' not found in file")
                return {}
            if freq_column not in df.columns:
                logger.error(f"Frequency column '{freq_column}' not found in file")
                return {}

            # Clean the data
            df_clean = df.copy()

            # Clean word column - remove empty/whitespace entries
            df_clean[word_column] = df_clean[word_column].astype(str).str.strip()
            df_clean = df_clean[df_clean[word_column] != '']
            df_clean = df_clean[df_clean[word_column] != 'nan']

            # Clean and convert frequency values to numeric
            df_clean[freq_column] = pd.to_numeric(df_clean[freq_column], errors='coerce')

            # Remove rows with NaN frequencies or invalid words
            df_clean = df_clean.dropna(subset=[freq_column])
            df_clean = df_clean[df_clean[freq_column] > 0]  # Only keep positive frequencies

            # Create dictionary mapping words to frequencies
            word_freq_dict = dict(zip(df_clean[word_column].str.lower(), df_clean[freq_column]))
            logger.info(f"Parsed custom config with {len(word_freq_dict)} entries using columns '{word_column}' → '{freq_column}'")
            return word_freq_dict
        except Exception as e:
            logger.error(f"Error parsing custom config: {e}")
            return {}
    def _generate_ngrams(self, tokens: List, n: int, sep: str = " ") -> List[str]:
        """
        Generate n-grams from a token list, respecting sentence boundaries.

        Args:
            tokens: List of SpaCy tokens
            n: N-gram size (2 for bigrams, 3 for trigrams)
            sep: Separator used to join the tokens of each n-gram (space by default)

        Returns:
            List of n-grams as lowercased strings joined by `sep`
        """
        ngrams = []
        sentence_starts = [0]

        # Find sentence boundaries
        for i, token in enumerate(tokens):
            if token.is_sent_start and i > 0:
                sentence_starts.append(i)
        sentence_starts.append(len(tokens))

        # Generate n-grams within sentences
        for i in range(len(sentence_starts) - 1):
            start = sentence_starts[i]
            end = sentence_starts[i + 1]
            sentence_tokens = tokens[start:end]

            # Filter out punctuation
            clean_tokens = [t for t in sentence_tokens if not t.is_punct and not t.is_space]
            for j in range(len(clean_tokens) - n + 1):
                ngram_tokens = clean_tokens[j:j + n]
                ngram = sep.join([t.text.lower() for t in ngram_tokens])
                ngrams.append(ngram)
        return ngrams
    def _lookup_score(self, word: str, index_name: str, file_type: str,
                      measure_col: Optional[str] = None) -> Optional[float]:
        """
        Unified lookup for both words and n-grams using nested dictionaries for O(1) performance.

        Args:
            word: Word/n-gram to look up
            index_name: Name of the reference index
            file_type: Type of reference file ('token', 'lemma', 'bigram', 'trigram')
            measure_col: Specific measure to retrieve (e.g., 'frequency', 'mi_score')

        Returns:
            Score if found, None otherwise
        """
        if index_name not in self.reference_lists:
            return None
        ref_data = self.reference_lists[index_name].get(file_type)
        if ref_data is None:
            return None

        # Handle Japanese corpus data (special case)
        if isinstance(ref_data, dict) and ref_data.get('is_japanese_corpus', False):
            # This should not be called directly for Japanese data;
            # use _lookup_japanese_score instead
            return None

        # Handle legacy simple dictionaries (old unigram format: word -> single_score)
        if isinstance(ref_data, dict) and not any(isinstance(v, dict) for v in ref_data.values()):
            # Legacy simple dictionary format
            return ref_data.get(word.lower())

        # Handle nested dictionary format (new unified format: word -> {measure: value, ...})
        if isinstance(ref_data, dict):
            word_data = ref_data.get(word.lower())
            if word_data is None or not isinstance(word_data, dict):
                return None
            # If a measure is specified, return that specific measure
            if measure_col:
                return word_data.get(measure_col)
            else:
                # Return the first available measure for backward compatibility
                return next(iter(word_data.values())) if word_data else None

        # Fallback to DataFrame lookup (for compatibility during transition)
        if isinstance(ref_data, pd.DataFrame):
            # Find matching row
            word_col = ref_data.columns[0]
            matching_rows = ref_data[ref_data[word_col].str.lower() == word.lower()]
            if matching_rows.empty:
                return None
            if measure_col and measure_col in ref_data.columns:
                value = matching_rows[measure_col].iloc[0]
                # Handle non-numeric values
                try:
                    return float(value)
                except (ValueError, TypeError):
                    return None
            else:
                # Default to the second column if no specific measure is specified
                if len(ref_data.columns) > 1:
                    value = matching_rows.iloc[0, 1]
                    try:
                        return float(value)
                    except (ValueError, TypeError):
                        return None
                return None
        return None
    def _lookup_with_unidic_fallback(self, token, index_name: str, file_type: str) -> Dict:
        """
        Enhanced Japanese lookup with a 3-level UniDic fallback using corpus-compatible keys.

        Args:
            token: SpaCy token object with UniDic extensions
            index_name: Name of the reference index
            file_type: Type of reference file ('token', 'lemma')

        Returns:
            Dictionary with score, method, key, and diagnostic information
        """
        # Initialize diagnostic tracking
        attempted_keys = []
        diagnostic_info = {
            'attempted_keys': attempted_keys,
            'unidic_features': {},
            'alignment_confidence': getattr(token._, 'alignment_confidence', 0.0),
            'spacy_fallback_used': False,
            'no_match': False
        }

        # Get UniDic features from token extensions
        unidic_features = {
            'lemma': getattr(token._, 'unidic_lemma', '') or '',
            'lForm': getattr(token._, 'unidic_lform', '') or '',
            'pos1': getattr(token._, 'unidic_pos1', '') or '',
            'pos2': getattr(token._, 'unidic_pos2', '') or '',
            'pos3': getattr(token._, 'unidic_pos3', '') or '',
            'goshu': getattr(token._, 'unidic_goshu', '') or ''
        }
        diagnostic_info['unidic_features'] = unidic_features

        # Only proceed with UniDic matching if we have good alignment and features
        if diagnostic_info['alignment_confidence'] > 0.5 and any(unidic_features.values()):
            # Try corpus-compatible keys using the hierarchical lookup dictionaries

            # Level 1: {lemma}_{lForm}_{pos1}_{pos2}_{pos3} (when pos3 exists)
            if all([unidic_features['lemma'], unidic_features['lForm'],
                    unidic_features['pos1'], unidic_features['pos2'], unidic_features['pos3']]):
                level1_key = f"{unidic_features['lemma']}_{unidic_features['lForm']}_{unidic_features['pos1']}_{unidic_features['pos2']}_{unidic_features['pos3']}"
                attempted_keys.append(level1_key)
                score = self._lookup_japanese_corpus_level(level1_key, index_name, file_type, 'level1_dict')
                if score is not None:
                    return {
                        'score': score,
                        'match_method': 'unidic_corpus_level_1',
                        'match_key': level1_key,
                        'diagnostic_info': diagnostic_info
                    }

            # Level 2: {lemma}_{lForm}_{pos1}_{pos2}
            if all([unidic_features['lemma'], unidic_features['lForm'],
                    unidic_features['pos1'], unidic_features['pos2']]):
                level2_key = f"{unidic_features['lemma']}_{unidic_features['lForm']}_{unidic_features['pos1']}_{unidic_features['pos2']}"
                attempted_keys.append(level2_key)
                score = self._lookup_japanese_corpus_level(level2_key, index_name, file_type, 'level2_dict')
                if score is not None:
                    return {
                        'score': score,
                        'match_method': 'unidic_corpus_level_2',
                        'match_key': level2_key,
                        'diagnostic_info': diagnostic_info
                    }

            # Level 3: {lemma}_{lForm}_{pos1}
            if all([unidic_features['lemma'], unidic_features['lForm'], unidic_features['pos1']]):
                level3_key = f"{unidic_features['lemma']}_{unidic_features['lForm']}_{unidic_features['pos1']}"
                attempted_keys.append(level3_key)
                score = self._lookup_japanese_corpus_level(level3_key, index_name, file_type, 'level3_dict')
                if score is not None:
                    return {
                        'score': score,
                        'match_method': 'unidic_corpus_level_3',
                        'match_key': level3_key,
                        'diagnostic_info': diagnostic_info
                    }

        # Fallback to legacy spaCy-based matching
        diagnostic_info['spacy_fallback_used'] = True
        legacy_score = self._lookup_japanese_score(token, index_name, file_type, fallback=True)
        if legacy_score is not None:
            legacy_key = f"{token.lemma_}_{token.tag_}"
            attempted_keys.append(f"legacy: {legacy_key}")
            return {
                'score': legacy_score,
                'match_method': 'legacy_spacy',
                'match_key': legacy_key,
                'diagnostic_info': diagnostic_info
            }

        # No match found
        diagnostic_info['no_match'] = True
        return {
            'score': None,
            'match_method': 'none',
            'match_key': None,
            'diagnostic_info': diagnostic_info
        }
    def _lookup_japanese_corpus_level(self, key: str, index_name: str, file_type: str, level_dict_name: str) -> Optional[float]:
        """
        Look up a score in a specific level dictionary of the Japanese corpus data.

        Args:
            key: Composite key to look up
            index_name: Name of the reference index
            file_type: Type of reference file ('token', 'lemma')
            level_dict_name: Name of the level dictionary ('level1_dict', 'level2_dict', 'level3_dict')

        Returns:
            Score if found, None otherwise
        """
        if index_name not in self.reference_lists:
            return None
        ref_data = self.reference_lists[index_name].get(file_type)
        if ref_data is None or not isinstance(ref_data, dict):
            return None
        if not ref_data.get('is_japanese_corpus', False):
            return None
        level_dict = ref_data.get(level_dict_name, {})
        return level_dict.get(key)
    def _lookup_japanese_score(self, token, index_name: str, file_type: str, fallback: bool = False) -> Optional[float]:
        """
        Look up the score for a Japanese word using the composite key approach.

        Args:
            token: SpaCy token object
            index_name: Name of the reference index
            file_type: Type of reference file ('token', 'lemma')
            fallback: Whether to use fallback search strategies

        Returns:
            Score if found, None otherwise
        """
        if index_name not in self.reference_lists:
            return None
        ref_data = self.reference_lists[index_name].get(file_type)
        if ref_data is None or not isinstance(ref_data, dict):
            return None
        if not ref_data.get('is_japanese_corpus', False):
            return None

        # Try the composite key first (lemma_pos)
        composite_key = f"{token.lemma_}_{token.tag_}"
        score = ref_data.get('composite_dict', {}).get(composite_key)
        if score is None and fallback:
            # Fall back to lemma only
            score = ref_data.get('lemma_dict', {}).get(token.lemma_.lower())
        if score is None and fallback:
            # Final fallback to the surface form
            score = ref_data.get('surface_dict', {}).get(token.text.lower())
        return score
    def _should_apply_log_transform(self, index_name: str, analysis_type: str,
                                    measure_name: str, log_transforms: Optional[Dict[str, List[str]]],
                                    apply_log_fallback: bool) -> bool:
        """
        Determine whether a specific measure should be log-transformed.

        Args:
            index_name: Name of the reference index
            analysis_type: 'token' or 'lemma'
            measure_name: Name of the measure (e.g., 'frequency', 'MI')
            log_transforms: Dict mapping index names to lists of measures to log-transform
            apply_log_fallback: Legacy fallback boolean

        Returns:
            True if the measure should be log-transformed, False otherwise
        """
        # If the new log_transforms parameter is provided, use it
        if log_transforms is not None:
            index_transforms = log_transforms.get(index_name, [])
            return measure_name in index_transforms
        # Fall back to legacy apply_log behavior for backward compatibility
        return apply_log_fallback

    def _should_compute_measure(self, index_name: str, measure_name: str,
                                selected_measures: Optional[Dict[str, List[str]]]) -> bool:
        """
        Determine whether a specific measure should be computed.

        Args:
            index_name: Name of the reference index
            measure_name: Name of the measure (e.g., 'frequency', 'MI')
            selected_measures: Dict mapping index names to lists of measures to compute

        Returns:
            True if the measure should be computed, False otherwise
        """
        # If selected_measures is provided, use it for filtering
        if selected_measures is not None:
            index_measures = selected_measures.get(index_name, [])
            return measure_name in index_measures
        # If not specified, compute all measures (backward compatibility)
        return True
    def analyze_text(self, text: str, selected_indices: List[str],
                     apply_log: bool = False, word_type_filter: Optional[str] = None,
                     log_transforms: Optional[Dict[str, List[str]]] = None,
                     selected_measures: Optional[Dict[str, List[str]]] = None,
                     separate_word_types: bool = False) -> Dict:
        """
        Analyze a text and return lexical sophistication scores.

        Args:
            text: Input text to analyze
            selected_indices: List of reference indices to apply
            apply_log: Whether to apply a log10 transformation (legacy parameter, superseded by log_transforms)
            word_type_filter: Filter by word type ('CW', 'FW', or None for all)
            log_transforms: Dict mapping index names to the list of measures that should be log-transformed,
                e.g. {'COCA_spoken_frequency_token': ['frequency', 'normalized_freq']}.
                If None, falls back to the apply_log behavior for backward compatibility.
            selected_measures: Dict mapping index names to the list of measures to compute,
                e.g. {'COCA_spoken_frequency_token': ['frequency', 'range']}.
                If None, computes all available measures for backward compatibility.
            separate_word_types: If True, process CW and FW separately in the same analysis call

        Returns:
            Dictionary containing the analysis results
        """
        # Process the text using the base class
        doc = self.process_document(text)
        tokens = self.filter_tokens(doc, exclude_punct=True, exclude_space=True)

        # Generate n-grams
        bigrams = self._generate_ngrams(tokens, 2)
        trigrams = self._generate_ngrams(tokens, 3)

        # Prepare the results structure
        results = {
            'summary': {},
            'token_details': [],
            'bigram_details': [],
            'trigram_details': [],
            'text_stats': {
                'total_tokens': len(tokens),
                'unique_tokens': len(set(t.text.lower() for t in tokens)),
                'content_words': len([t for t in tokens if self._classify_pos(t) == 'CW']),
                'function_words': len([t for t in tokens if self._classify_pos(t) == 'FW'])
            },
            'raw_scores': {},   # Raw scores for plotting
            'tokens': tokens,   # Raw spaCy tokens for advanced analysis
            'doc': doc          # Full spaCy doc for complex operations
        }

        # Initialize score collections
        all_scores = defaultdict(list)

        # Process each token
        for i, token in enumerate(tokens):
            word_type = self._classify_pos(token)

            # Skip if filtering by word type
            if word_type_filter and word_type != word_type_filter:
                continue

            # Work directly with the spaCy token - include syntactic information
            token_detail = {
                'id': i + 1,
                'token': token.text,
                'lemma': token.lemma_,
                'pos': token.pos_,
                'tag': token.tag_,
                'dep_': token.dep_,            # Dependency relation
                'head_text': token.head.text,  # Head word
                'head_pos': token.head.pos_,   # Head POS
                'word_type': word_type
            }

            # Look up scores for each selected index
            for index_name in selected_indices:
                # Extract the base name and determine the analysis type to avoid duplicate suffixes
                if index_name.endswith('_token'):
                    base_name = index_name[:-6]  # Remove '_token'
                    analysis_type = 'token'
                elif index_name.endswith('_lemma'):
                    base_name = index_name[:-6]  # Remove '_lemma'
                    analysis_type = 'lemma'
                else:
                    # Fallback for entries without a clear suffix
                    base_name = index_name
                    analysis_type = 'token'  # Default to token

                # Check if this is a Japanese corpus reference list
                ref_data = self.reference_lists.get(index_name, {})
                is_japanese_corpus = False
                for file_type in ['token', 'lemma']:
                    data = ref_data.get(file_type, {})
                    if isinstance(data, dict) and data.get('is_japanese_corpus', False):
                        is_japanese_corpus = True
                        break

                if is_japanese_corpus and self.language == 'ja':
                    # Use the enhanced UniDic lookup with 3-level fallback and diagnostics
                    if analysis_type == 'token':
                        result = self._lookup_with_unidic_fallback(token, index_name, 'token')
                        score = result['score']
                        # Store enhanced details with a clean column name
                        token_detail[index_name] = score if score is not None else None
                        token_detail[f"{index_name}_match_method"] = result['match_method']
                        token_detail[f"{index_name}_match_key"] = result['match_key'] or None
                        # Store UniDic features for display (only once per token)
                        if hasattr(token, '_') and hasattr(token._, 'unidic_lemma') and 'unidic_features' not in token_detail:
                            token_detail['unidic_features'] = {
                                'lemma': getattr(token._, 'unidic_lemma', ''),
                                'lForm': getattr(token._, 'unidic_lform', ''),
                                'pos1': getattr(token._, 'unidic_pos1', ''),
                                'pos2': getattr(token._, 'unidic_pos2', ''),
                                'goshu': getattr(token._, 'unidic_goshu', ''),
                                'alignment_confidence': getattr(token._, 'alignment_confidence', 0.0)
                            }
                    else:  # lemma analysis
                        result = self._lookup_with_unidic_fallback(token, index_name, 'lemma')
                        score = result['score']
                        # Store enhanced details with a clean column name
                        token_detail[index_name] = score if score is not None else None
                        token_detail[f"{index_name}_match_method"] = result['match_method']
                        token_detail[f"{index_name}_match_key"] = result['match_key'] or None
                elif is_japanese_corpus:
                    # Fall back to the legacy Japanese lookup if UniDic is not available
                    if analysis_type == 'token':
                        score = self._lookup_japanese_score(token, index_name, 'token', fallback=True)
                    else:  # lemma analysis
                        score = self._lookup_japanese_score(token, index_name, 'lemma', fallback=True)
                    # Apply the log transformation if needed before storing
                    if score is not None:
                        should_log_transform = self._should_apply_log_transform(
                            index_name, analysis_type, 'frequency', log_transforms, apply_log
                        )
                        final_score = np.log10(score) if should_log_transform and score > 0 else score
                    else:
                        final_score = None
                    token_detail[index_name] = final_score
                    token_detail[f"{index_name}_match_method"] = "legacy_spacy"
                else:
                    # Standard lookup for non-Japanese data
                    if analysis_type == 'token':
                        score = self._lookup_score(token.text, index_name, 'token')
                    else:  # lemma analysis
                        score = self._lookup_score(token.lemma_, index_name, 'lemma')
                    # Apply the log transformation if needed before storing
                    if score is not None:
                        should_log_transform = self._should_apply_log_transform(
                            index_name, analysis_type, 'frequency', log_transforms, apply_log
                        )
                        final_score = np.log10(score) if should_log_transform and score > 0 else score
                    else:
                        final_score = None
                    # Store the score with a clean column name and the transformed value
                    token_detail[index_name] = final_score

                # Collect for summary statistics (score is already transformed if needed)
                score = token_detail.get(index_name)
                if score is not None:
                    # Handle different collection methods based on parameters
                    if separate_word_types or word_type_filter:
                        # Include the word type in the key
                        all_scores[f"{index_name}_{word_type}"].append(score)
                    else:
                        # No word type suffix for unfiltered analysis
                        all_scores[index_name].append(score)

            results['token_details'].append(token_detail)
        # Calculate summary statistics
        for score_key, scores in all_scores.items():
            if scores:
                results['summary'][score_key] = {
                    'mean': np.mean(scores),
                    'std': np.std(scores),
                    'count': len(scores),
                    'min': np.min(scores),
                    'max': np.max(scores)
                }
                # Store raw scores for plotting
                results['raw_scores'][score_key] = scores

        # Process n-grams if available
        for ngram_type, ngrams in [('bigram', bigrams), ('trigram', trigrams)]:
            if not ngrams:
                continue

            # Store n-gram details
            ngram_details_key = f'{ngram_type}_details'
            ngram_counter = {}

            # Count occurrences of each n-gram
            for ngram in ngrams:
                ngram_counter[ngram] = ngram_counter.get(ngram, 0) + 1

            # Process unique n-grams for details
            for i, (ngram, count) in enumerate(ngram_counter.items()):
                ngram_detail = {
                    'id': i + 1,
                    ngram_type: ngram,
                    'frequency': count
                }

                # Look up scores for each index
                for index_name in selected_indices:
                    if index_name not in self.reference_lists:
                        continue
                    ref_data = self.reference_lists[index_name].get(ngram_type)
                    if ref_data is None:
                        continue
                    # Skip if using the old DataFrame format (should be converted by now)
                    if isinstance(ref_data, pd.DataFrame):
                        logger.warning(f"Found unconverted DataFrame for {index_name} {ngram_type}, skipping")
                        continue
                    # Ensure we have the new nested dictionary format
                    if not isinstance(ref_data, dict):
                        continue

                    # Get the available measures from any n-gram entry
                    sample_ngram_data = next(iter(ref_data.values())) if ref_data else {}
                    if not isinstance(sample_ngram_data, dict):
                        continue
                    available_measures = list(sample_ngram_data.keys())

                    # Process each available measure
                    for measure_name in available_measures:
                        # Check if this measure should be computed
                        if not self._should_compute_measure(index_name, measure_name, selected_measures):
                            continue
                        # Use the unified lookup method for O(1) performance
                        score = self._lookup_score(ngram, index_name, ngram_type, measure_name)
                        if score is not None:
                            # Check if this measure should be log-transformed
                            should_log_transform = self._should_apply_log_transform(
                                index_name, ngram_type, measure_name, log_transforms, apply_log
                            )
                            score_val = np.log10(score) if should_log_transform and score > 0 else score
                            ngram_detail[f"{index_name}_{measure_name}"] = score_val
                        else:
                            ngram_detail[f"{index_name}_{measure_name}"] = None

                    # Load the columns config from the YAML config for measure naming.
                    # Kept for compatibility: with the nested dictionary format all measures
                    # were already processed above, so config_entry is not used further.
                    from web_app.config_manager import ConfigManager
                    config = ConfigManager.load_reference_config()
                    language_key = "english" if self.language == 'en' else "japanese"
                    config_entry = None
                    for config_section in [f"{ngram_type}s"]:  # bigrams/trigrams sections
                        if config_section in config.get(language_key, {}):
                            if index_name in config[language_key][config_section]:
                                config_entry = config[language_key][config_section][index_name]
                                break

                results[ngram_details_key].append(ngram_detail)

            # Also process for summary statistics
            for index_name in selected_indices:
                if index_name not in self.reference_lists:
                    continue
                ref_data = self.reference_lists[index_name].get(ngram_type)
                if ref_data is None:
                    continue
                # Skip if using the old DataFrame format (should be converted by now)
                if isinstance(ref_data, pd.DataFrame):
                    logger.warning(f"Found unconverted DataFrame for {index_name} {ngram_type} in summary, skipping")
                    continue
                # Ensure we have the new nested dictionary format
                if not isinstance(ref_data, dict):
                    continue

                # Get the available measures from any n-gram entry
                sample_ngram_data = next(iter(ref_data.values())) if ref_data else {}
                if not isinstance(sample_ngram_data, dict):
                    continue
                available_measures = list(sample_ngram_data.keys())

                # Process each available measure for summary statistics
                for measure_name in available_measures:
                    # Check if this measure should be computed
                    if not self._should_compute_measure(index_name, measure_name, selected_measures):
                        continue
                    ngram_scores = []
                    for ngram in ngrams:
                        score = self._lookup_score(ngram, index_name, ngram_type, measure_name)
                        if score is not None:
                            # Check if this measure should be log-transformed
                            should_log_transform = self._should_apply_log_transform(
                                index_name, ngram_type, measure_name, log_transforms, apply_log
                            )
                            score_val = np.log10(score) if should_log_transform and score > 0 else score
                            ngram_scores.append(score_val)
                    if ngram_scores:
                        key = f"{index_name}_{ngram_type}_{measure_name}"
                        results['summary'][key] = {
                            'mean': np.mean(ngram_scores),
                            'std': np.std(ngram_scores),
                            'count': len(ngram_scores),
                            'min': np.min(ngram_scores),
                            'max': np.max(ngram_scores)
                        }
                        # Store raw scores for plotting
                        results['raw_scores'][key] = ngram_scores

                # Load the columns config from the YAML config for measure naming.
                # Kept for compatibility: with the nested dictionary format the summary
                # statistics were already processed above, so config_entry is not used further.
                from web_app.config_manager import ConfigManager
                config = ConfigManager.load_reference_config()
                language_key = "english" if self.language == 'en' else "japanese"
                config_entry = None
                for config_section in [f"{ngram_type}s"]:  # bigrams/trigrams sections
                    if config_section in config.get(language_key, {}):
                        if index_name in config[language_key][config_section]:
                            config_entry = config[language_key][config_section][index_name]
                            break

        return results
    def analyze_batch_memory(self, file_contents: List[Tuple[str, str]], selected_indices: List[str],
                             apply_log: bool = False, word_type_filter: Optional[str] = None,
                             log_transforms: Optional[Dict[str, List[str]]] = None,
                             selected_measures: Optional[Dict[str, List[str]]] = None,
                             progress_callback=None) -> pd.DataFrame:
        """
        Analyze multiple text files from memory and return aggregated results.
        Optimized version that processes both CW and FW in a single pass.

        Args:
            file_contents: List of (filename, text_content) tuples
            selected_indices: List of reference indices to apply
            apply_log: Whether to apply a log10 transformation (legacy parameter, superseded by log_transforms)
            word_type_filter: Filter by word type ('CW', 'FW', or None for all)
            log_transforms: Dict mapping index names to the list of measures that should be log-transformed
            selected_measures: Dict mapping index names to the list of measures to compute
            progress_callback: Optional callback for progress updates

        Returns:
            DataFrame with aggregated results
        """
        batch_results = []
        for i, (filename, text_content) in enumerate(file_contents):
            try:
                result_row = {'filename': filename}
                if word_type_filter:
                    # Analyze only the specified word type
                    analysis = self.analyze_text(
                        text_content,
                        selected_indices,
                        apply_log=apply_log,
                        word_type_filter=word_type_filter,
                        log_transforms=log_transforms,
                        selected_measures=selected_measures
                    )
                    # Extract summary scores
                    for key, stats in analysis['summary'].items():
                        result_row[key] = stats['mean']
                else:
                    # Single optimized analysis call that processes both CW and FW
                    analysis = self.analyze_text(
                        text_content,
                        selected_indices,
                        apply_log=apply_log,
                        word_type_filter=None,
                        log_transforms=log_transforms,
                        selected_measures=selected_measures,
                        separate_word_types=True  # Process CW/FW separately in the same pass
                    )
                    # Extract all summary scores, including CW, FW, and n-grams
                    for key, stats in analysis['summary'].items():
                        result_row[key] = stats['mean']
                batch_results.append(result_row)
                if progress_callback:
                    progress_callback(i + 1, len(file_contents))
            except Exception as e:
                logger.error(f"Error processing file {filename}: {e}")
                # Add an error row
                error_row = {'filename': filename, 'error': str(e)}
                batch_results.append(error_row)
                if progress_callback:
                    progress_callback(i + 1, len(file_contents))
        return pd.DataFrame(batch_results)
    def analyze_batch(self, file_paths: List[str], selected_indices: List[str],
                      apply_log: bool = False, progress_callback=None) -> pd.DataFrame:
        """
        Legacy batch analysis method for backward compatibility.
        Analyze multiple text files and return aggregated results.

        Args:
            file_paths: List of paths to text files
            selected_indices: List of reference indices to apply
            apply_log: Whether to apply a log10 transformation
            progress_callback: Optional callback for progress updates

        Returns:
            DataFrame with aggregated results
        """
        batch_results = []
        for i, file_path in enumerate(file_paths):
            try:
                # Read the file
                with open(file_path, 'r', encoding='utf-8') as f:
                    text = f.read()

                result_row = {'filename': Path(file_path).name}

                # Single optimized analysis call that processes both CW and FW
                analysis = self.analyze_text(
                    text,
                    selected_indices,
                    apply_log=apply_log,
                    word_type_filter=None,
                    separate_word_types=True  # Process CW/FW separately in the same pass
                )

                # Extract all summary scores
                for key, stats in analysis['summary'].items():
                    result_row[key] = stats['mean']
                batch_results.append(result_row)

                if progress_callback:
                    progress_callback(i + 1, len(file_paths))
            except Exception as e:
                logger.error(f"Error processing file {file_path}: {e}")
                # Add an error row
                error_row = {'filename': Path(file_path).name, 'error': str(e)}
                batch_results.append(error_row)
                if progress_callback:
                    progress_callback(i + 1, len(file_paths))
        return pd.DataFrame(batch_results)
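

if __name__ == "__main__":
    # Minimal usage sketch. The reference file path and the sample text below are
    # placeholders for illustration only; substitute real reference lists (and any
    # indices configured for your deployment) before running.
    analyzer = LexicalSophisticationAnalyzer(language='en', model_size='md')
    analyzer.load_reference_lists({
        'COCA_spoken_frequency_token': {
            'token': 'reference/coca_spoken_tokens.csv',  # hypothetical path
        }
    })
    results = analyzer.analyze_text(
        "The committee postponed the decision until further notice.",
        selected_indices=['COCA_spoken_frequency_token'],
        log_transforms={'COCA_spoken_frequency_token': ['frequency']},
    )
    for key, stats in results['summary'].items():
        print(f"{key}: mean={stats['mean']:.3f} (n={stats['count']})")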