Spaces:
Building
Building
| """ | |
| Text processing utilities module. | |
| Contains reusable functions for file handling, encoding detection, and text cleaning. | |
| """ | |
| import os | |
| import tempfile | |
| import chardet | |
| from pathlib import Path | |
| from typing import Union, Tuple, List, Dict, Any, Optional | |
| import logging | |
| import re | |
| from .app_config import AppConfig | |
| logger = logging.getLogger(__name__) | |
class TextUtility:
    """Collection of text processing and file handling utilities.

    All methods are stateless; they are declared as ``@staticmethod`` so they
    can be called on the class (``TextUtility.clean_text_input(...)``) or on
    an instance interchangeably.
    """

    @staticmethod
    def detect_encoding(content: bytes) -> str:
        """Detect the text encoding of raw byte content.

        Strategy: ask chardet first and accept its answer only if it is in
        ``AppConfig.SUPPORTED_ENCODINGS``; otherwise try each supported
        encoding in order and return the first that decodes cleanly; finally
        fall back to UTF-8.

        Args:
            content: Byte content to analyze.

        Returns:
            Name of the detected (or fallback) encoding; always a usable
            codec name, never None.
        """
        try:
            result = chardet.detect(content)
            encoding = result.get('encoding', 'utf-8')
            # chardet may return None for 'encoding'; the truthiness guard
            # below routes that case to the trial-decode fallback.
            supported = {enc.lower() for enc in AppConfig.SUPPORTED_ENCODINGS}
            if encoding and encoding.lower() in supported:
                return encoding
            # Fall back to trying each supported encoding in order.
            for enc in AppConfig.SUPPORTED_ENCODINGS:
                try:
                    content.decode(enc)
                    return enc
                except UnicodeDecodeError:
                    continue
            # Final fallback when nothing decodes cleanly.
            return 'utf-8'
        except Exception as e:
            logger.warning(f"Error detecting encoding: {e}, defaulting to utf-8")
            return 'utf-8'

    @staticmethod
    def detect_delimiter(text: str) -> str:
        """Detect the most likely field delimiter in text content.

        Args:
            text: Text content to analyze.

        Returns:
            The supported delimiter with the highest occurrence count, or
            tab (``'\\t'``) when no supported delimiter occurs at all.
        """
        delimiter_counts = {
            delimiter: text.count(delimiter)
            for delimiter in AppConfig.SUPPORTED_DELIMITERS
        }
        if delimiter_counts:
            best = max(delimiter_counts, key=delimiter_counts.get)
            # Only return a delimiter that actually appears; previously the
            # max of all-zero counts returned an arbitrary absent delimiter.
            if delimiter_counts[best] > 0:
                return best
        return '\t'

    @staticmethod
    def clean_text_input(text: str) -> str:
        """Clean raw text input for downstream processing.

        Removes NUL bytes, normalizes whitespace, drops unencodable code
        points (lone surrogates), and strips surrounding whitespace.

        Args:
            text: Raw text input.

        Returns:
            Cleaned text; empty string for falsy input.
        """
        if not text:
            return ""
        # Remove NUL bytes BEFORE whitespace normalization so a stripped NUL
        # sitting between two spaces does not leave a double space behind.
        text = text.replace('\x00', '')
        text = TextUtility.normalize_whitespace(text)
        # Round-trip through UTF-8 to drop unencodable code points.
        text = text.encode('utf-8', errors='ignore').decode('utf-8')
        return text.strip()

    @staticmethod
    def normalize_whitespace(text: str) -> str:
        """Normalize whitespace while preserving line structure.

        Collapses runs of horizontal whitespace to a single space, strips
        each line, and trims empty lines from the beginning and end.

        Args:
            text: Text to normalize.

        Returns:
            Text with normalized whitespace; empty string for falsy input.
        """
        if not text:
            return ""
        # Collapse runs of whitespace EXCEPT newlines. The previous r'\s+'
        # pattern swallowed newlines too, which made all of the per-line
        # handling below dead code.
        text = re.sub(r'[^\S\n]+', ' ', text)
        # Strip leading/trailing whitespace from each line.
        lines = [line.strip() for line in text.split('\n')]
        # Drop empty lines at the beginning and end (interior blanks stay).
        while lines and not lines[0]:
            lines.pop(0)
        while lines and not lines[-1]:
            lines.pop()
        return '\n'.join(lines)

    @staticmethod
    def validate_text_length(text: str, max_length: Optional[int] = None) -> bool:
        """Validate text length against an optional limit.

        Args:
            text: Text to validate.
            max_length: Maximum allowed length; None means no upper bound.

        Returns:
            True if the text is non-empty and within the limit.
        """
        if not text:
            return False
        # 'is not None' so an explicit limit of 0 is honored; the old
        # truthiness check silently ignored max_length == 0.
        if max_length is not None and len(text) > max_length:
            return False
        return True

    @staticmethod
    def extract_text_from_file(file_path: str) -> str:
        """Extract cleaned text content from a file with encoding detection.

        Args:
            file_path: Path to the file.

        Returns:
            Extracted and cleaned text content.

        Raises:
            ValueError: If the file cannot be read or decoded; the original
                exception is chained as the cause.
        """
        try:
            # Read raw bytes first so the encoding can be detected.
            with open(file_path, 'rb') as f:
                content = f.read()
            encoding = TextUtility.detect_encoding(content)
            text = content.decode(encoding)
            return TextUtility.clean_text_input(text)
        except Exception as e:
            logger.error(f"Error extracting text from {file_path}: {e}")
            raise ValueError(f"Failed to extract text from file: {e}") from e

    @staticmethod
    def prepare_batch_files(file_paths: List[str]) -> List[Tuple[str, str]]:
        """Prepare batch files for processing by extracting text content.

        Failures are recorded inline rather than aborting the batch, so one
        unreadable file does not lose the rest.

        Args:
            file_paths: List of file paths.

        Returns:
            List of ``(file_path, text_content)`` tuples; on failure the
            content is the string ``"ERROR: <reason>"``.
        """
        prepared_files = []
        for file_path in file_paths:
            try:
                text_content = TextUtility.extract_text_from_file(file_path)
                prepared_files.append((file_path, text_content))
            except Exception as e:
                logger.error(f"Error preparing file {file_path}: {e}")
                # Record the failure for this entry instead of raising.
                prepared_files.append((file_path, f"ERROR: {e}"))
        return prepared_files

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Sanitize a filename by removing problematic characters.

        Replaces filesystem-reserved characters with underscores, strips
        control characters, and enforces the common 255-character limit
        while preserving the extension.

        Args:
            filename: Original filename.

        Returns:
            Sanitized filename; ``"unnamed_file"`` if nothing survives.
        """
        # Replace characters reserved on common filesystems.
        filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
        # Remove control characters (code points below 0x20).
        filename = ''.join(char for char in filename if ord(char) >= 32)
        # Enforce the 255-char limit, keeping the extension intact.
        if len(filename) > 255:
            name, ext = os.path.splitext(filename)
            filename = name[:255 - len(ext)] + ext
        return filename or "unnamed_file"

    @staticmethod
    def create_safe_temp_file(content: str, suffix: str = '.txt') -> str:
        """Create a temporary file with the given content safely.

        The file is created with ``delete=False``; the caller owns it and
        should remove it via :meth:`cleanup_temp_files` when done.

        Args:
            content: Content to write to the file (UTF-8 encoded).
            suffix: File suffix.

        Returns:
            Path to the created temporary file.

        Raises:
            ValueError: If the file cannot be created; the original
                exception is chained as the cause.
        """
        try:
            with tempfile.NamedTemporaryFile(
                mode='w', suffix=suffix, delete=False, encoding='utf-8'
            ) as f:
                f.write(content)
                return f.name
        except Exception as e:
            logger.error(f"Error creating temporary file: {e}")
            raise ValueError(f"Failed to create temporary file: {e}") from e

    @staticmethod
    def load_corpus_config(corpus_name: str) -> Dict[str, Any]:
        """Load a specific corpus configuration from reference_lists.yaml.

        Args:
            corpus_name: Name of the corpus.

        Returns:
            Corpus configuration dictionary (delegates to AppConfig).
        """
        return AppConfig.get_corpus_configuration(corpus_name)

    @staticmethod
    def get_column_mapping(config: Dict, corpus_type: str = 'columns') -> Dict[str, int]:
        """Extract column mappings from a corpus configuration.

        Args:
            config: Corpus configuration dictionary.
            corpus_type: Key of the mapping section to extract.

        Returns:
            Dictionary mapping column names to indices; empty dict when the
            section is absent.
        """
        return config.get(corpus_type, {})

    @staticmethod
    def cleanup_temp_files(file_paths: List[str]) -> None:
        """Clean up temporary files safely (best-effort).

        Individual failures are logged as warnings and do not interrupt
        cleanup of the remaining files.

        Args:
            file_paths: List of temporary file paths to remove.
        """
        for file_path in file_paths:
            try:
                if os.path.exists(file_path):
                    os.unlink(file_path)
            except Exception as e:
                logger.warning(f"Error cleaning up temporary file {file_path}: {e}")