# simple-text-analyzer / web_app / config_manager.py
# Source: Hugging Face Space (author: emuTAALES, commit e7279e4)
"""
Configuration management module for reference lists and file processing.
Handles loading, validation, and management of frequency list configurations.
"""
import streamlit as st
import pandas as pd
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
import yaml
import csv
from web_app.session_manager import SessionManager
from web_app.utils import MemoryFileHandler
class ConfigManager:
    """Manages configuration for reference lists and file processing."""

    @staticmethod
    @st.cache_data
    def load_reference_config() -> Dict[str, Any]:
        """Load the reference-list configuration from ``config/reference_lists.yaml``.

        Returns:
            The parsed YAML mapping when the file exists and parses, otherwise
            an empty default structure (unigrams/bigrams/trigrams buckets for
            both English and Japanese). Parse errors are surfaced via
            ``st.error`` before the default is returned.
        """
        config_path = Path("config/reference_lists.yaml")
        if config_path.exists():
            try:
                with config_path.open('r', encoding='utf-8') as fh:
                    return yaml.safe_load(fh)
            except Exception as exc:
                st.error(f"Error loading reference configuration: {exc}")
        # Fallback: same empty per-language structure for missing or broken config.
        return {
            lang: {gram: {} for gram in ("unigrams", "bigrams", "trigrams")}
            for lang in ("english", "japanese")
        }
@staticmethod
def get_numeric_columns(df: pd.DataFrame) -> List[str]:
"""Get list of columns that contain numeric data."""
numeric_cols = []
for col in df.columns:
try:
sample = df[col].dropna().head(10)
if len(sample) > 0:
pd.to_numeric(sample, errors='raise')
numeric_cols.append(col)
except (ValueError, TypeError):
continue
return numeric_cols
@staticmethod
def process_uploaded_file(uploaded_file) -> Optional[Dict[str, Any]]:
"""Process a single uploaded file and return its configuration."""
try:
from io import StringIO
# Use memory-based approach for HF Spaces compatibility
content = MemoryFileHandler.process_uploaded_file(uploaded_file, as_text=False)
if not content:
st.error(f"Failed to read file {uploaded_file.name}")
return None
# Decode content if it's bytes
if isinstance(content, bytes):
text_content = content.decode('utf-8')
else:
text_content = content
# Determine delimiter from first 1024 chars
sample = text_content[:1024]
delimiter = ',' if sample.count(',') > sample.count('\t') else '\t'
# Create StringIO for pandas to read
content_io = StringIO(text_content)
# Load preview
df_preview = pd.read_csv(content_io, delimiter=delimiter, header=0, nrows=5,
quoting=csv.QUOTE_MINIMAL, quotechar='"')
# Store content in session state instead of file path
if 'uploaded_files_content' not in st.session_state:
st.session_state.uploaded_files_content = {}
# Use filename as key
st.session_state.uploaded_files_content[uploaded_file.name] = text_content
return {
'file_name': uploaded_file.name,
'columns': list(df_preview.columns),
'delimiter': delimiter,
'preview': df_preview,
'base_name': Path(uploaded_file.name).stem,
'configurations': [],
'content': text_content # Include content for immediate use
}
except Exception as e:
st.error(f"Error processing file {uploaded_file.name}: {e}")
return None
@staticmethod
def create_custom_config(file_name: str, content: str, delimiter: str, word_col: str, score_col: str) -> Dict[str, Any]:
"""Create custom configuration object for backend."""
return {
'file_name': file_name,
'content': content,
'word_column': word_col,
'freq_column': score_col,
'delimiter': delimiter,
'is_custom_config': True
}
@staticmethod
def validate_index_config(word_col: str, score_col: str, index_name: str) -> Tuple[bool, str]:
"""Validate an index configuration."""
if not word_col or not score_col or not index_name:
return False, "Missing required fields"
if word_col == score_col:
return False, "Word and score columns cannot be the same"
if not index_name.strip():
return False, "Index name cannot be empty"
return True, ""
@staticmethod
def apply_configurations(all_configs: Dict[str, Any]) -> Tuple[int, List[str]]:
"""Apply multiple index configurations to session state."""
success_count = 0
errors = []
for file_key, file_config in all_configs.items():
# Validate that we have content
if 'content' not in file_config or not file_config['content']:
errors.append(f"No content found for file: {file_key}")
continue
for index_config in file_config['indices']:
word_col = index_config['word_column']
score_col = index_config['score_column']
index_name = index_config['index_name']
# Validate configuration
is_valid, error_msg = ConfigManager.validate_index_config(word_col, score_col, index_name)
if not is_valid:
errors.append(f"{file_key}: {error_msg}")
continue
# Remove existing entry if it exists
if index_name in SessionManager.get_reference_lists():
SessionManager.remove_reference_list(index_name)
# Create and store configuration
custom_data = ConfigManager.create_custom_config(
file_config['file_name'],
file_config['content'],
file_config['delimiter'],
word_col,
score_col
)
SessionManager.add_reference_list(index_name, {'token': custom_data})
success_count += 1
return success_count, errors
    @staticmethod
    def load_reference_list_data(list_config: Dict[str, Any]) -> Dict[str, Any]:
        """Load actual data for a reference list based on its configuration.
        Supports both old schema (files.token/files.lemma) and new schema (single file).

        Returns a dict keyed by analysis type ('token'/'lemma'/'bigram'/'trigram').
        Values are word->score dicts for standard unigrams, the multi-level
        lookup structure for Japanese corpora, or whole DataFrames for n-gram
        files. Missing or unreadable files are skipped (errors via st.error).
        """
        from web_app.schema_validator import SchemaValidator
        data: Dict[str, Any] = {}
        # Detect schema version for this specific entry
        is_new_schema = any(field in list_config for field in SchemaValidator.NEW_SCHEMA_FIELDS)
        # Check if this is a Japanese corpus
        is_japanese_corpus = list_config.get('japanese_corpus', False)
        # Check if this is a bigram or trigram configuration
        columns = list_config.get('columns', {})
        is_bigram = 'bigram' in columns
        is_trigram = 'trigram' in columns
        # Handle different schema formats
        if is_new_schema:
            # New schema: single file with analysis_type
            file_path = list_config.get('file')
            analysis_type = list_config.get('analysis_type', 'token')
            if file_path:
                files_to_process = {analysis_type: file_path}
            else:
                files_to_process = {}
        else:
            # Old schema: files.token/files.lemma
            files_to_process = list_config.get('files', {})
        for file_type, file_path in files_to_process.items():
            if file_path is None:
                continue
            file_path = Path(file_path)
            if not file_path.exists():
                # Silently skip configured files that are absent on disk.
                continue
            try:
                # Determine delimiter ('tsv' format -> tab, anything else -> comma)
                delimiter = '\t' if list_config.get('format', 'csv') == 'tsv' else ','
                # Load file
                if list_config.get('has_header', False):
                    df = pd.read_csv(file_path, delimiter=delimiter, header=0,
                                     quoting=csv.QUOTE_MINIMAL, quotechar='"')
                else:
                    df = pd.read_csv(file_path, delimiter=delimiter, header=None,
                                     quoting=csv.QUOTE_MINIMAL, quotechar='"')
                # Get column mapping (re-read so edits below can't leak between files)
                columns = list_config.get('columns', {})
                if is_japanese_corpus and file_type in ['token', 'lemma']:
                    # Handle Japanese corpus format with composite keys
                    processed_data = ConfigManager._parse_japanese_corpus_data(df, columns)
                    data[file_type] = processed_data
                elif file_type in ['token', 'lemma'] and not is_bigram and not is_trigram:
                    # For standard unigrams: positional word/frequency columns
                    word_col = columns.get('word', 0)
                    score_col = columns.get('frequency', 1)
                    if isinstance(word_col, int) and isinstance(score_col, int):
                        if len(df.columns) > max(word_col, score_col):
                            # Clean and convert scores to numeric
                            df.iloc[:, score_col] = pd.to_numeric(df.iloc[:, score_col], errors='coerce')
                            # Remove rows with NaN scores
                            df = df.dropna(subset=[df.columns[score_col]])
                            # Build lower-cased word -> numeric score mapping
                            data[file_type] = dict(zip(
                                df.iloc[:, word_col].astype(str).str.lower(),
                                df.iloc[:, score_col]
                            ))
                else:
                    # For n-gram files: keep the DataFrame, coercing all
                    # non-first columns to numeric
                    for col in df.columns[1:]:
                        df[col] = pd.to_numeric(df[col], errors='coerce')
                    # Determine the correct file type for backend
                    if is_bigram:
                        data['bigram'] = df
                    elif is_trigram:
                        data['trigram'] = df
                    else:
                        # For standard unigram files that aren't bigrams or trigrams
                        data[file_type] = df
            except Exception as e:
                st.error(f"Error loading {file_type} file {file_path}: {e}")
                continue
        return data
@staticmethod
def _parse_japanese_corpus_data(df: pd.DataFrame, columns: Dict[str, int]) -> Dict[str, Any]:
"""Parse Japanese corpus data and create multiple lookup dictionaries with hierarchical POS splitting."""
try:
# Get column indices
surface_col_idx = columns.get('surface_form', 1)
lemma_col_idx = columns.get('lemma', 2)
pos_col_idx = columns.get('pos', 3)
freq_col_idx = columns.get('frequency', 6)
# Get actual column names
df_columns = list(df.columns)
surface_col = df_columns[surface_col_idx] if surface_col_idx < len(df_columns) else None
lemma_col = df_columns[lemma_col_idx] if lemma_col_idx < len(df_columns) else None
pos_col = df_columns[pos_col_idx] if pos_col_idx < len(df_columns) else None
freq_col = df_columns[freq_col_idx] if freq_col_idx < len(df_columns) else None
if not all([surface_col, lemma_col, pos_col, freq_col]):
raise ValueError("Missing required columns for Japanese corpus")
# Clean the data
df_clean = df.copy()
# Clean text columns
for col in [surface_col, lemma_col, pos_col]:
df_clean[col] = df_clean[col].astype(str).str.strip()
df_clean = df_clean[df_clean[col] != '']
df_clean = df_clean[df_clean[col] != 'nan']
# Clean and convert frequency column
df_clean[freq_col] = pd.to_numeric(df_clean[freq_col], errors='coerce')
df_clean = df_clean.dropna(subset=[freq_col])
df_clean = df_clean[df_clean[freq_col] > 0] # Only positive frequencies
# Split POS column by hyphen to extract pos1, pos2, pos3
def split_pos(pos_str):
parts = str(pos_str).split('-')
return {
'pos1': parts[0] if len(parts) > 0 else '',
'pos2': parts[1] if len(parts) > 1 else '',
'pos3': parts[2] if len(parts) > 2 else ''
}
pos_split = df_clean[pos_col].apply(split_pos)
df_clean['pos1'] = [p['pos1'] for p in pos_split]
df_clean['pos2'] = [p['pos2'] for p in pos_split]
df_clean['pos3'] = [p['pos3'] for p in pos_split]
# Create multiple levels of composite keys to match UniDic lookup hierarchy
# Level 1: lemma_lForm_pos1_pos2_pos3 (when pos3 exists)
df_clean['level1_key'] = df_clean.apply(
lambda row: f"{row[lemma_col]}_{row[surface_col]}_{row['pos1']}_{row['pos2']}_{row['pos3']}"
if row['pos3'] else None, axis=1
)
# Level 2: lemma_lForm_pos1_pos2
df_clean['level2_key'] = df_clean.apply(
lambda row: f"{row[lemma_col]}_{row[surface_col]}_{row['pos1']}_{row['pos2']}"
if row['pos2'] else None, axis=1
)
# Level 3: lemma_lForm_pos1
df_clean['level3_key'] = df_clean.apply(
lambda row: f"{row[lemma_col]}_{row[surface_col]}_{row['pos1']}"
if row['pos1'] else None, axis=1
)
# Legacy composite key for backward compatibility
df_clean['legacy_key'] = df_clean[lemma_col] + '_' + df_clean[pos_col]
# Create lookup dictionaries for each level
level1_dict = {}
level2_dict = {}
level3_dict = {}
for _, row in df_clean.iterrows():
freq = row[freq_col]
if row['level1_key']:
level1_dict[row['level1_key']] = freq
if row['level2_key']:
level2_dict[row['level2_key']] = freq
if row['level3_key']:
level3_dict[row['level3_key']] = freq
# Return enhanced Japanese corpus data structure
return {
'level1_dict': level1_dict, # Most specific UniDic-compatible keys
'level2_dict': level2_dict,
'level3_dict': level3_dict,
'composite_dict': dict(zip(df_clean['legacy_key'], df_clean[freq_col])), # Legacy format
'lemma_dict': dict(zip(df_clean[lemma_col].str.lower(), df_clean[freq_col])),
'surface_dict': dict(zip(df_clean[surface_col].str.lower(), df_clean[freq_col])),
'is_japanese_corpus': True
}
except Exception as e:
st.error(f"Error parsing Japanese corpus data: {e}")
return {}
@staticmethod
def clean_default_reference_lists():
"""Clean up default reference lists that are no longer selected."""
# This would be called by the UI when managing default reference lists
# Implementation depends on how default lists are managed
pass