# Japanese language support (author: egumasa, commit dbc9105)
"""
Text processing utilities module.
Contains reusable functions for file handling, encoding detection, and text cleaning.
"""
import os
import tempfile
import chardet
from pathlib import Path
from typing import Union, Tuple, List, Dict, Any, Optional
import logging
import re
from .app_config import AppConfig
logger = logging.getLogger(__name__)
class TextUtility:
    """Collection of text processing and file handling utilities.

    All methods are stateless ``@staticmethod`` helpers covering encoding
    detection, delimiter sniffing, text cleaning, filename sanitization,
    and temporary-file lifecycle management.
    """

    @staticmethod
    def detect_encoding(content: bytes) -> str:
        """
        Detect encoding of byte content.

        Args:
            content: Byte content to analyze

        Returns:
            Detected encoding string; 'utf-8' when detection fails or no
            supported encoding can decode the content.
        """
        try:
            # chardet reports encoding=None when it cannot decide, so the
            # `if encoding` guard below also covers that case.
            result = chardet.detect(content)
            encoding = result.get('encoding')
            # Accept the automatic detection only when it is one of the
            # encodings the application supports (case-insensitive match).
            if encoding and encoding.lower() in (enc.lower() for enc in AppConfig.SUPPORTED_ENCODINGS):
                return encoding
            # Otherwise probe each supported encoding until one decodes cleanly.
            for enc in AppConfig.SUPPORTED_ENCODINGS:
                try:
                    content.decode(enc)
                    return enc
                except UnicodeDecodeError:
                    continue
            # Final fallback when nothing decoded cleanly.
            return 'utf-8'
        except Exception as e:
            logger.warning(f"Error detecting encoding: {e}, defaulting to utf-8")
            return 'utf-8'

    @staticmethod
    def detect_delimiter(text: str) -> str:
        """
        Detect delimiter in text content.

        Args:
            text: Text content to analyze

        Returns:
            The most frequent supported delimiter, or tab ('\\t') when no
            supported delimiter occurs in the text at all.
        """
        # Count occurrences of each supported delimiter.
        delimiter_counts = {
            delimiter: text.count(delimiter)
            for delimiter in AppConfig.SUPPORTED_DELIMITERS
        }
        if delimiter_counts:
            best = max(delimiter_counts, key=delimiter_counts.get)
            # Only trust the winner if it actually appears; otherwise max()
            # would return an arbitrary delimiter with a count of zero.
            if delimiter_counts[best] > 0:
                return best
        return '\t'

    @staticmethod
    def clean_text_input(text: str) -> str:
        """
        Clean text input by normalizing whitespace and removing problematic characters.

        Args:
            text: Raw text input

        Returns:
            Cleaned text (empty string for falsy input)
        """
        if not text:
            return ""
        # Normalize whitespace (preserves line structure).
        text = TextUtility.normalize_whitespace(text)
        # Remove null bytes, which break many downstream tools.
        text = text.replace('\x00', '')
        # Round-trip through UTF-8 to drop unpaired surrogates and other
        # characters that cannot be encoded ('ignore' silently removes them).
        text = text.encode('utf-8', errors='ignore').decode('utf-8')
        return text.strip()

    @staticmethod
    def normalize_whitespace(text: str) -> str:
        """
        Normalize whitespace in text while preserving line structure.

        Runs of intra-line whitespace (spaces, tabs, carriage returns) are
        collapsed to a single space, each line is stripped, and leading and
        trailing blank lines are removed.

        Args:
            text: Text to normalize

        Returns:
            Text with normalized whitespace
        """
        if not text:
            return ""
        # Collapse runs of whitespace EXCEPT newlines ([^\S\n] = whitespace
        # minus '\n'). The previous r'\s+' pattern also swallowed newlines,
        # which made the per-line processing below unreachable dead code.
        text = re.sub(r'[^\S\n]+', ' ', text)
        # Strip leading/trailing whitespace from each line.
        lines = [line.strip() for line in text.split('\n')]
        # Remove empty lines at beginning and end (interior blanks are kept).
        while lines and not lines[0]:
            lines.pop(0)
        while lines and not lines[-1]:
            lines.pop()
        return '\n'.join(lines)

    @staticmethod
    def validate_text_length(text: str, max_length: Optional[int] = None) -> bool:
        """
        Validate text length against limits.

        Args:
            text: Text to validate
            max_length: Maximum allowed length (None means no upper bound)

        Returns:
            True if text is non-empty and within the limit
        """
        if not text:
            return False
        # Compare against None explicitly so an explicit max_length of 0 is
        # enforced rather than silently ignored (0 is falsy).
        if max_length is not None and len(text) > max_length:
            return False
        return True

    @staticmethod
    def extract_text_from_file(file_path: str) -> str:
        """
        Extract text content from a file with encoding detection.

        Args:
            file_path: Path to the file

        Returns:
            Extracted and cleaned text content

        Raises:
            ValueError: If the file cannot be read or decoded.
        """
        try:
            # Read as bytes first so the encoding can be detected.
            with open(file_path, 'rb') as f:
                content = f.read()
            encoding = TextUtility.detect_encoding(content)
            text = content.decode(encoding)
            return TextUtility.clean_text_input(text)
        except Exception as e:
            logger.error(f"Error extracting text from {file_path}: {e}")
            # Chain the original exception so the root cause is preserved.
            raise ValueError(f"Failed to extract text from file: {e}") from e

    @staticmethod
    def prepare_batch_files(file_paths: List[str]) -> List[Tuple[str, str]]:
        """
        Prepare batch files for processing by extracting text content.

        Args:
            file_paths: List of file paths

        Returns:
            List of tuples (file_path, text_content). Files that fail to
            load are kept in the result with an "ERROR: ..." placeholder so
            the output stays aligned with the input list.
        """
        prepared_files = []
        for file_path in file_paths:
            try:
                text_content = TextUtility.extract_text_from_file(file_path)
                prepared_files.append((file_path, text_content))
            except Exception as e:
                logger.error(f"Error preparing file {file_path}: {e}")
                # Best-effort batch: record the failure instead of aborting.
                prepared_files.append((file_path, f"ERROR: {e}"))
        return prepared_files

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """
        Sanitize filename by removing problematic characters.

        Args:
            filename: Original filename

        Returns:
            Sanitized filename ("unnamed_file" if nothing remains)
        """
        # Replace characters that are invalid on common filesystems.
        filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
        # Remove ASCII control characters.
        filename = ''.join(char for char in filename if ord(char) >= 32)
        # Limit to 255 characters, preserving the extension when possible.
        if len(filename) > 255:
            name, ext = os.path.splitext(filename)
            # max(0, ...) guards against a negative slice when the extension
            # alone exceeds the limit.
            filename = name[:max(0, 255 - len(ext))] + ext
        return filename or "unnamed_file"

    @staticmethod
    def create_safe_temp_file(content: str, suffix: str = '.txt') -> str:
        """
        Create a temporary file with given content safely.

        Args:
            content: Content to write to file
            suffix: File suffix

        Returns:
            Path to created temporary file (caller is responsible for
            cleanup, e.g. via cleanup_temp_files)

        Raises:
            ValueError: If the file cannot be created or written.
        """
        try:
            # delete=False so the file survives the context manager; the
            # caller owns the returned path.
            with tempfile.NamedTemporaryFile(mode='w', suffix=suffix, delete=False, encoding='utf-8') as f:
                f.write(content)
                return f.name
        except Exception as e:
            logger.error(f"Error creating temporary file: {e}")
            raise ValueError(f"Failed to create temporary file: {e}") from e

    @staticmethod
    def load_corpus_config(corpus_name: str) -> Dict[str, Any]:
        """
        Load specific corpus configuration from reference_lists.yaml

        Args:
            corpus_name: Name of the corpus

        Returns:
            Corpus configuration dictionary
        """
        # Thin delegation; AppConfig owns the YAML parsing and caching.
        return AppConfig.get_corpus_configuration(corpus_name)

    @staticmethod
    def get_column_mapping(config: Dict, corpus_type: str = 'columns') -> Dict[str, int]:
        """
        Extract column mappings from corpus configuration

        Args:
            config: Corpus configuration dictionary
            corpus_type: Type of mapping to extract

        Returns:
            Dictionary mapping column names to indices (empty when absent)
        """
        return config.get(corpus_type, {})

    @staticmethod
    def cleanup_temp_files(file_paths: List[str]) -> None:
        """
        Clean up temporary files safely.

        Failures are logged as warnings rather than raised, so cleanup of
        the remaining files continues.

        Args:
            file_paths: List of temporary file paths to clean up
        """
        for file_path in file_paths:
            try:
                if os.path.exists(file_path):
                    os.unlink(file_path)
            except Exception as e:
                logger.warning(f"Error cleaning up temporary file {file_path}: {e}")