Spaces:
Building
Building
| """ | |
| Configuration management module for reference lists and file processing. | |
| Handles loading, validation, and management of frequency list configurations. | |
| """ | |
| import streamlit as st | |
| import pandas as pd | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional, Tuple | |
| import yaml | |
| import csv | |
| from web_app.session_manager import SessionManager | |
| from web_app.utils import MemoryFileHandler | |
| class ConfigManager: | |
| """Manages configuration for reference lists and file processing.""" | |
| def load_reference_config() -> Dict[str, Any]: | |
| """Load reference lists configuration from YAML file.""" | |
| config_path = Path("config/reference_lists.yaml") | |
| if config_path.exists(): | |
| try: | |
| with open(config_path, 'r', encoding='utf-8') as f: | |
| return yaml.safe_load(f) | |
| except Exception as e: | |
| st.error(f"Error loading reference configuration: {e}") | |
| return {"english": {"unigrams": {}, "bigrams": {}, "trigrams": {}}, | |
| "japanese": {"unigrams": {}, "bigrams": {}, "trigrams": {}}} | |
| def get_numeric_columns(df: pd.DataFrame) -> List[str]: | |
| """Get list of columns that contain numeric data.""" | |
| numeric_cols = [] | |
| for col in df.columns: | |
| try: | |
| sample = df[col].dropna().head(10) | |
| if len(sample) > 0: | |
| pd.to_numeric(sample, errors='raise') | |
| numeric_cols.append(col) | |
| except (ValueError, TypeError): | |
| continue | |
| return numeric_cols | |
| def process_uploaded_file(uploaded_file) -> Optional[Dict[str, Any]]: | |
| """Process a single uploaded file and return its configuration.""" | |
| try: | |
| from io import StringIO | |
| # Use memory-based approach for HF Spaces compatibility | |
| content = MemoryFileHandler.process_uploaded_file(uploaded_file, as_text=False) | |
| if not content: | |
| st.error(f"Failed to read file {uploaded_file.name}") | |
| return None | |
| # Decode content if it's bytes | |
| if isinstance(content, bytes): | |
| text_content = content.decode('utf-8') | |
| else: | |
| text_content = content | |
| # Determine delimiter from first 1024 chars | |
| sample = text_content[:1024] | |
| delimiter = ',' if sample.count(',') > sample.count('\t') else '\t' | |
| # Create StringIO for pandas to read | |
| content_io = StringIO(text_content) | |
| # Load preview | |
| df_preview = pd.read_csv(content_io, delimiter=delimiter, header=0, nrows=5, | |
| quoting=csv.QUOTE_MINIMAL, quotechar='"') | |
| # Store content in session state instead of file path | |
| if 'uploaded_files_content' not in st.session_state: | |
| st.session_state.uploaded_files_content = {} | |
| # Use filename as key | |
| st.session_state.uploaded_files_content[uploaded_file.name] = text_content | |
| return { | |
| 'file_name': uploaded_file.name, | |
| 'columns': list(df_preview.columns), | |
| 'delimiter': delimiter, | |
| 'preview': df_preview, | |
| 'base_name': Path(uploaded_file.name).stem, | |
| 'configurations': [], | |
| 'content': text_content # Include content for immediate use | |
| } | |
| except Exception as e: | |
| st.error(f"Error processing file {uploaded_file.name}: {e}") | |
| return None | |
| def create_custom_config(file_name: str, content: str, delimiter: str, word_col: str, score_col: str) -> Dict[str, Any]: | |
| """Create custom configuration object for backend.""" | |
| return { | |
| 'file_name': file_name, | |
| 'content': content, | |
| 'word_column': word_col, | |
| 'freq_column': score_col, | |
| 'delimiter': delimiter, | |
| 'is_custom_config': True | |
| } | |
| def validate_index_config(word_col: str, score_col: str, index_name: str) -> Tuple[bool, str]: | |
| """Validate an index configuration.""" | |
| if not word_col or not score_col or not index_name: | |
| return False, "Missing required fields" | |
| if word_col == score_col: | |
| return False, "Word and score columns cannot be the same" | |
| if not index_name.strip(): | |
| return False, "Index name cannot be empty" | |
| return True, "" | |
| def apply_configurations(all_configs: Dict[str, Any]) -> Tuple[int, List[str]]: | |
| """Apply multiple index configurations to session state.""" | |
| success_count = 0 | |
| errors = [] | |
| for file_key, file_config in all_configs.items(): | |
| # Validate that we have content | |
| if 'content' not in file_config or not file_config['content']: | |
| errors.append(f"No content found for file: {file_key}") | |
| continue | |
| for index_config in file_config['indices']: | |
| word_col = index_config['word_column'] | |
| score_col = index_config['score_column'] | |
| index_name = index_config['index_name'] | |
| # Validate configuration | |
| is_valid, error_msg = ConfigManager.validate_index_config(word_col, score_col, index_name) | |
| if not is_valid: | |
| errors.append(f"{file_key}: {error_msg}") | |
| continue | |
| # Remove existing entry if it exists | |
| if index_name in SessionManager.get_reference_lists(): | |
| SessionManager.remove_reference_list(index_name) | |
| # Create and store configuration | |
| custom_data = ConfigManager.create_custom_config( | |
| file_config['file_name'], | |
| file_config['content'], | |
| file_config['delimiter'], | |
| word_col, | |
| score_col | |
| ) | |
| SessionManager.add_reference_list(index_name, {'token': custom_data}) | |
| success_count += 1 | |
| return success_count, errors | |
| def load_reference_list_data(list_config: Dict[str, Any]) -> Dict[str, Any]: | |
| """Load actual data for a reference list based on its configuration. | |
| Supports both old schema (files.token/files.lemma) and new schema (single file). | |
| """ | |
| from web_app.schema_validator import SchemaValidator | |
| data = {} | |
| # Detect schema version for this specific entry | |
| is_new_schema = any(field in list_config for field in SchemaValidator.NEW_SCHEMA_FIELDS) | |
| # Check if this is a Japanese corpus | |
| is_japanese_corpus = list_config.get('japanese_corpus', False) | |
| # Check if this is a bigram or trigram configuration | |
| columns = list_config.get('columns', {}) | |
| is_bigram = 'bigram' in columns | |
| is_trigram = 'trigram' in columns | |
| # Handle different schema formats | |
| if is_new_schema: | |
| # New schema: single file with analysis_type | |
| file_path = list_config.get('file') | |
| analysis_type = list_config.get('analysis_type', 'token') | |
| if file_path: | |
| files_to_process = {analysis_type: file_path} | |
| else: | |
| files_to_process = {} | |
| else: | |
| # Old schema: files.token/files.lemma | |
| files_to_process = list_config.get('files', {}) | |
| for file_type, file_path in files_to_process.items(): | |
| if file_path is None: | |
| continue | |
| file_path = Path(file_path) | |
| if not file_path.exists(): | |
| continue | |
| try: | |
| # Determine delimiter | |
| delimiter = '\t' if list_config.get('format', 'csv') == 'tsv' else ',' | |
| # Load file | |
| if list_config.get('has_header', False): | |
| df = pd.read_csv(file_path, delimiter=delimiter, header=0, | |
| quoting=csv.QUOTE_MINIMAL, quotechar='"') | |
| else: | |
| df = pd.read_csv(file_path, delimiter=delimiter, header=None, | |
| quoting=csv.QUOTE_MINIMAL, quotechar='"') | |
| # Get column mapping | |
| columns = list_config.get('columns', {}) | |
| if is_japanese_corpus and file_type in ['token', 'lemma']: | |
| # Handle Japanese corpus format with composite keys | |
| processed_data = ConfigManager._parse_japanese_corpus_data(df, columns) | |
| data[file_type] = processed_data | |
| elif file_type in ['token', 'lemma'] and not is_bigram and not is_trigram: | |
| # For standard unigrams | |
| word_col = columns.get('word', 0) | |
| score_col = columns.get('frequency', 1) | |
| if isinstance(word_col, int) and isinstance(score_col, int): | |
| if len(df.columns) > max(word_col, score_col): | |
| # Clean and convert scores to numeric | |
| df.iloc[:, score_col] = pd.to_numeric(df.iloc[:, score_col], errors='coerce') | |
| # Remove rows with NaN scores | |
| df = df.dropna(subset=[df.columns[score_col]]) | |
| data[file_type] = dict(zip( | |
| df.iloc[:, word_col].astype(str).str.lower(), | |
| df.iloc[:, score_col] | |
| )) | |
| else: | |
| # For n-gram files | |
| for col in df.columns[1:]: | |
| df[col] = pd.to_numeric(df[col], errors='coerce') | |
| # Determine the correct file type for backend | |
| if is_bigram: | |
| data['bigram'] = df | |
| elif is_trigram: | |
| data['trigram'] = df | |
| else: | |
| # For standard unigram files that aren't bigrams or trigrams | |
| data[file_type] = df | |
| except Exception as e: | |
| st.error(f"Error loading {file_type} file {file_path}: {e}") | |
| continue | |
| return data | |
| def _parse_japanese_corpus_data(df: pd.DataFrame, columns: Dict[str, int]) -> Dict[str, Any]: | |
| """Parse Japanese corpus data and create multiple lookup dictionaries with hierarchical POS splitting.""" | |
| try: | |
| # Get column indices | |
| surface_col_idx = columns.get('surface_form', 1) | |
| lemma_col_idx = columns.get('lemma', 2) | |
| pos_col_idx = columns.get('pos', 3) | |
| freq_col_idx = columns.get('frequency', 6) | |
| # Get actual column names | |
| df_columns = list(df.columns) | |
| surface_col = df_columns[surface_col_idx] if surface_col_idx < len(df_columns) else None | |
| lemma_col = df_columns[lemma_col_idx] if lemma_col_idx < len(df_columns) else None | |
| pos_col = df_columns[pos_col_idx] if pos_col_idx < len(df_columns) else None | |
| freq_col = df_columns[freq_col_idx] if freq_col_idx < len(df_columns) else None | |
| if not all([surface_col, lemma_col, pos_col, freq_col]): | |
| raise ValueError("Missing required columns for Japanese corpus") | |
| # Clean the data | |
| df_clean = df.copy() | |
| # Clean text columns | |
| for col in [surface_col, lemma_col, pos_col]: | |
| df_clean[col] = df_clean[col].astype(str).str.strip() | |
| df_clean = df_clean[df_clean[col] != ''] | |
| df_clean = df_clean[df_clean[col] != 'nan'] | |
| # Clean and convert frequency column | |
| df_clean[freq_col] = pd.to_numeric(df_clean[freq_col], errors='coerce') | |
| df_clean = df_clean.dropna(subset=[freq_col]) | |
| df_clean = df_clean[df_clean[freq_col] > 0] # Only positive frequencies | |
| # Split POS column by hyphen to extract pos1, pos2, pos3 | |
| def split_pos(pos_str): | |
| parts = str(pos_str).split('-') | |
| return { | |
| 'pos1': parts[0] if len(parts) > 0 else '', | |
| 'pos2': parts[1] if len(parts) > 1 else '', | |
| 'pos3': parts[2] if len(parts) > 2 else '' | |
| } | |
| pos_split = df_clean[pos_col].apply(split_pos) | |
| df_clean['pos1'] = [p['pos1'] for p in pos_split] | |
| df_clean['pos2'] = [p['pos2'] for p in pos_split] | |
| df_clean['pos3'] = [p['pos3'] for p in pos_split] | |
| # Create multiple levels of composite keys to match UniDic lookup hierarchy | |
| # Level 1: lemma_lForm_pos1_pos2_pos3 (when pos3 exists) | |
| df_clean['level1_key'] = df_clean.apply( | |
| lambda row: f"{row[lemma_col]}_{row[surface_col]}_{row['pos1']}_{row['pos2']}_{row['pos3']}" | |
| if row['pos3'] else None, axis=1 | |
| ) | |
| # Level 2: lemma_lForm_pos1_pos2 | |
| df_clean['level2_key'] = df_clean.apply( | |
| lambda row: f"{row[lemma_col]}_{row[surface_col]}_{row['pos1']}_{row['pos2']}" | |
| if row['pos2'] else None, axis=1 | |
| ) | |
| # Level 3: lemma_lForm_pos1 | |
| df_clean['level3_key'] = df_clean.apply( | |
| lambda row: f"{row[lemma_col]}_{row[surface_col]}_{row['pos1']}" | |
| if row['pos1'] else None, axis=1 | |
| ) | |
| # Legacy composite key for backward compatibility | |
| df_clean['legacy_key'] = df_clean[lemma_col] + '_' + df_clean[pos_col] | |
| # Create lookup dictionaries for each level | |
| level1_dict = {} | |
| level2_dict = {} | |
| level3_dict = {} | |
| for _, row in df_clean.iterrows(): | |
| freq = row[freq_col] | |
| if row['level1_key']: | |
| level1_dict[row['level1_key']] = freq | |
| if row['level2_key']: | |
| level2_dict[row['level2_key']] = freq | |
| if row['level3_key']: | |
| level3_dict[row['level3_key']] = freq | |
| # Return enhanced Japanese corpus data structure | |
| return { | |
| 'level1_dict': level1_dict, # Most specific UniDic-compatible keys | |
| 'level2_dict': level2_dict, | |
| 'level3_dict': level3_dict, | |
| 'composite_dict': dict(zip(df_clean['legacy_key'], df_clean[freq_col])), # Legacy format | |
| 'lemma_dict': dict(zip(df_clean[lemma_col].str.lower(), df_clean[freq_col])), | |
| 'surface_dict': dict(zip(df_clean[surface_col].str.lower(), df_clean[freq_col])), | |
| 'is_japanese_corpus': True | |
| } | |
| except Exception as e: | |
| st.error(f"Error parsing Japanese corpus data: {e}") | |
| return {} | |
| def clean_default_reference_lists(): | |
| """Clean up default reference lists that are no longer selected.""" | |
| # This would be called by the UI when managing default reference lists | |
| # Implementation depends on how default lists are managed | |
| pass | |