Spaces:
Building
Building
"""
Base analyzer module providing shared SpaCy infrastructure.
Eliminates code duplication and provides common functionality for all SpaCy-based analyzers.
"""
| import spacy | |
| from typing import Dict, List, Any, Optional, Iterator, Tuple, TYPE_CHECKING | |
| import logging | |
| import tempfile | |
| from pathlib import Path | |
| import os | |
| from .app_config import AppConfig | |
| from .text_utility import TextUtility | |
# Module logger. NOTE: this must be defined BEFORE the optional-import block
# below — the except branch logs a warning, and in the original code `logger`
# was defined only after the try/except, so a failed UniDic import raised
# NameError instead of degrading gracefully.
logger = logging.getLogger(__name__)

# Import UniDic extensions and enricher (optional dependency — Japanese only).
try:
    from . import unidic_extensions  # side effect: registers the token extensions
    from .unidic_enricher import UniDicEnricher
    UNIDIC_AVAILABLE = True
except ImportError as e:
    logger.warning(f"UniDic integration not available: {e}")
    UNIDIC_AVAILABLE = False
    UniDicEnricher = None

if TYPE_CHECKING:
    import spacy
class BaseAnalyzer:
    """
    Base class for all SpaCy-based text analyzers.

    Provides shared model loading (with optional GPU enforcement), document
    processing, and token/sentence utility functions.
    """

    def __init__(self, language: Optional[str] = None, model_size: Optional[str] = None,
                 gpu_device: Optional[int] = None):
        """
        Initialize the base analyzer.

        Args:
            language: Language code ('en' or 'ja'). Defaults to AppConfig.DEFAULT_LANGUAGE.
            model_size: Model size ('md' or 'trf'). Defaults to AppConfig.DEFAULT_MODEL_SIZE.
            gpu_device: GPU device ID to use (None for auto-detect, -1 for CPU only).
        """
        self.language = language or AppConfig.DEFAULT_LANGUAGE
        self.model_size = model_size or AppConfig.DEFAULT_MODEL_SIZE
        self.gpu_device = gpu_device
        self.nlp = None  # spaCy Language pipeline, set by _load_spacy_model()
        self._model_info: Dict[str, Any] = {}
        self.unidic_enricher = None
        self._using_gpu = False
        self._load_spacy_model()
        # UniDic enrichment is Japanese-only and best-effort: a failure here
        # must not prevent the analyzer from working without enrichment.
        if self.language == 'ja' and UNIDIC_AVAILABLE:
            try:
                self.unidic_enricher = UniDicEnricher()
                logger.info("UniDic enricher initialized for Japanese analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize UniDic enricher: {e}")
                self.unidic_enricher = None

    def _detect_gpu_availability(self) -> Tuple[bool, Optional[str], Optional[int]]:
        """
        Detect if GPU/CUDA is available for spaCy processing.

        Returns:
            Tuple of (is_available, device_name, device_id). Device name/id are
            None when no GPU is available or detection fails.
        """
        try:
            import torch
            if torch.cuda.is_available():
                device_count = torch.cuda.device_count()
                if device_count > 0:
                    # Use the requested device, clamped to what actually exists;
                    # otherwise default to device 0.
                    if self.gpu_device is not None and self.gpu_device >= 0:
                        device_id = min(self.gpu_device, device_count - 1)
                    else:
                        device_id = 0
                    device_name = torch.cuda.get_device_name(device_id)
                    return True, device_name, device_id
            return False, None, None
        except ImportError:
            logger.debug("PyTorch not available - GPU support disabled")
            return False, None, None
        except Exception as e:
            # Detection must never crash the analyzer; fall back to CPU.
            logger.warning(f"Error detecting GPU: {e}")
            return False, None, None

    def _configure_gpu_for_spacy(self) -> bool:
        """
        Configure spaCy to use GPU if available with strong enforcement.

        Honors two opt-outs: gpu_device == -1 and SPACY_USE_GPU=false.

        Returns:
            True if GPU was successfully configured, False otherwise.
        """
        # Explicit opt-out via constructor argument.
        if self.gpu_device == -1:
            logger.info("GPU explicitly disabled by user")
            return False
        # Explicit opt-out via environment variable.
        if os.environ.get('SPACY_USE_GPU', '').lower() == 'false':
            logger.info("GPU disabled via SPACY_USE_GPU environment variable")
            return False
        gpu_available, device_name, device_id = self._detect_gpu_availability()
        if not gpu_available:
            # Transformer models are effectively unusable on CPU, so warn loudly.
            if self.model_size == 'trf':
                logger.warning("No GPU/CUDA device available for transformer model - performance will be degraded")
            else:
                logger.info("No GPU/CUDA device available - using CPU")
            return False
        try:
            import torch
            # Set the CUDA device globally for all torch operations.
            torch.cuda.set_device(device_id)
            # NOTE(review): CUDA_VISIBLE_DEVICES is set after torch has already
            # initialized CUDA in this process; it mainly affects child
            # processes/libraries that read it later — confirm this is intended.
            os.environ['CUDA_VISIBLE_DEVICES'] = str(device_id)
            # require_gpu raises on failure (strong enforcement); fall back to
            # prefer_gpu, which returns False instead of raising.
            try:
                spacy.require_gpu(gpu_id=device_id)
                logger.info("Successfully enforced GPU usage with spacy.require_gpu()")
            except Exception as e:
                logger.warning(f"spacy.require_gpu() failed: {e}, trying prefer_gpu()")
                gpu_ok = spacy.prefer_gpu(gpu_id=device_id)
                if gpu_ok is False:
                    raise RuntimeError("spacy.prefer_gpu() returned False despite GPU being available")
            logger.info(f"GPU strongly configured for spaCy - using {device_name} (device {device_id})")
            os.environ['SPACY_PREFER_GPU'] = '1'
            return True
        except Exception as e:
            logger.error(f"Failed to enable GPU for spaCy: {e}")
            if self.model_size == 'trf':
                logger.error("GPU initialization failed for transformer model - processing will be slow")
            return False

    def _configure_batch_sizes(self) -> None:
        """Configure optimal batch sizes for GPU processing.

        Mutates each pipeline component's cfg in place. Batch sizes are
        conservative defaults intended to work on most GPUs.
        """
        if not hasattr(self.nlp, 'pipe'):
            return
        for pipe_name in self.nlp.pipe_names:
            pipe = self.nlp.get_pipe(pipe_name)
            if not hasattr(pipe, 'cfg'):
                continue
            if self.model_size == 'trf':
                # Transformer models need smaller batches due to GPU memory
                # constraints; non-transformer components can take more.
                pipe.cfg['batch_size'] = 128 if pipe_name == 'transformer' else 256
            else:
                # Non-transformer models can use larger batches.
                pipe.cfg['batch_size'] = 1024

    def _force_model_to_gpu(self) -> bool:
        """
        Force all model components to GPU after loading.

        Returns:
            True if successful, False otherwise (including when GPU is not in use).
        """
        if not self._using_gpu or not self.nlp:
            return False
        try:
            import torch
            # Move every pipeline component that exposes a torch-style model.
            # NOTE(review): 'cuda:0' is hard-coded; after CUDA_VISIBLE_DEVICES
            # is restricted to the chosen device this should map to it, but
            # confirm for multi-GPU setups.
            for pipe_name, pipe in self.nlp.pipeline:
                if hasattr(pipe, 'model'):
                    if hasattr(pipe.model, 'to'):
                        pipe.model.to('cuda:0')
                        logger.debug(f"Moved '{pipe_name}' component to GPU")
                # The transformer component nests its torch module one level down.
                if pipe_name == 'transformer' and hasattr(pipe, 'model'):
                    if hasattr(pipe.model, 'transformer'):
                        pipe.model.transformer.to('cuda:0')
                        logger.info("Transformer component forcefully moved to GPU")
            return True
        except Exception as e:
            logger.error(f"Failed to force model components to GPU: {e}")
            return False

    def _verify_gpu_usage(self) -> bool:
        """
        Verify that model components are actually using GPU.

        Inspects each component's torch parameters (or device attribute) and
        logs which components live on GPU vs CPU.

        Returns:
            True if at least one component is on GPU (and, for 'trf' models,
            the transformer component itself is on GPU), False otherwise.
        """
        if not self._using_gpu or not self.nlp:
            return False
        try:
            import torch
            gpu_components = []
            cpu_components = []
            for pipe_name, pipe in self.nlp.pipeline:
                if not hasattr(pipe, 'model'):
                    continue
                is_on_gpu = False
                if hasattr(pipe.model, 'parameters'):
                    # A single CUDA parameter is enough to count as on-GPU.
                    for param in pipe.model.parameters():
                        if param.is_cuda:
                            is_on_gpu = True
                            break
                elif hasattr(pipe.model, 'device'):
                    is_on_gpu = 'cuda' in str(pipe.model.device)
                if is_on_gpu:
                    gpu_components.append(pipe_name)
                else:
                    cpu_components.append(pipe_name)
            if gpu_components:
                logger.info(f"Components on GPU: {', '.join(gpu_components)}")
            if cpu_components:
                logger.warning(f"Components still on CPU: {', '.join(cpu_components)}")
            # For transformer models the transformer component MUST be on GPU.
            if self.model_size == 'trf' and 'transformer' not in gpu_components:
                logger.error("Transformer component is not on GPU!")
                return False
            return len(gpu_components) > 0
        except Exception as e:
            logger.error(f"Failed to verify GPU usage: {e}")
            return False

    def _load_spacy_model(self) -> None:
        """Load the appropriate SpaCy model based on language and size, with
        strong GPU enforcement and a CPU fallback on GPU failure.

        Raises:
            ValueError: On unsupported language/model combination, unknown
                model name, or if both GPU and CPU loading fail.
            OSError: If the model package is not installed.
        """
        if not AppConfig.validate_language_model_combination(self.language, self.model_size):
            raise ValueError(f"Unsupported language/model combination: {self.language}/{self.model_size}")
        model_name = AppConfig.get_spacy_model_name(self.language, self.model_size)
        if not model_name:
            raise ValueError(f"No model found for language '{self.language}' and size '{self.model_size}'")
        # GPU must be configured BEFORE spacy.load so weights land on the GPU.
        self._using_gpu = self._configure_gpu_for_spacy()
        try:
            if self._using_gpu and self.model_size == 'trf':
                # Mixed precision halves transformer memory use on GPU.
                self.nlp = spacy.load(model_name, config={"components": {"transformer": {"model": {"mixed_precision": True}}}})
            else:
                self.nlp = spacy.load(model_name)
            gpu_info = "CPU"
            if self._using_gpu:
                if not self._force_model_to_gpu():
                    logger.warning("Failed to force model components to GPU")
                # Verify once and reuse the result (the original code verified
                # twice, duplicating log output).
                gpu_verified = self._verify_gpu_usage()
                if not gpu_verified and self.model_size == 'trf':
                    logger.error("GPU verification failed for transformer model")
                gpu_available, device_name, device_id = self._detect_gpu_availability()
                if gpu_available:
                    gpu_info = f"GPU ({device_name}, device {device_id})"
                    gpu_info += " [VERIFIED]" if gpu_verified else " [NOT VERIFIED]"
            self._model_info = {
                'name': model_name,
                'language': self.language,
                'model_size': self.model_size,
                'version': spacy.__version__,
                'device': gpu_info,
                'gpu_enabled': self._using_gpu
            }
            logger.info(f"Loaded SpaCy model: {model_name} on {gpu_info}")
            if self._using_gpu and hasattr(self.nlp, 'pipe'):
                self._configure_batch_sizes()
        except OSError as e:
            error_msg = f"SpaCy model {model_name} not found. Please install it first."
            logger.error(error_msg)
            raise OSError(error_msg) from e
        except Exception as e:
            logger.error(f"Error loading SpaCy model: {e}")
            if self._using_gpu:
                # GPU path failed mid-load; retry once on CPU.
                logger.warning("Falling back to CPU after GPU loading failed")
                self._using_gpu = False
                try:
                    self.nlp = spacy.load(model_name)
                    # Rebuild the FULL info dict: the original code only set two
                    # keys on what could still be an empty dict, leaving
                    # name/language/model_size/version missing after fallback.
                    self._model_info = {
                        'name': model_name,
                        'language': self.language,
                        'model_size': self.model_size,
                        'version': spacy.__version__,
                        'device': 'CPU (fallback)',
                        'gpu_enabled': False
                    }
                    logger.info(f"Successfully loaded {model_name} on CPU after GPU failure")
                except Exception as cpu_error:
                    raise ValueError(f"Failed to load model on both GPU and CPU: {cpu_error}") from cpu_error
            else:
                raise

    def get_model_info(self) -> Dict[str, Any]:
        """
        Get information about the loaded model.

        Returns:
            Copy of the model-info dictionary (name, language, model_size,
            version, device, gpu_enabled). Annotated Dict[str, Any] because
            'gpu_enabled' is a bool (the original Dict[str, str] was wrong).
        """
        return self._model_info.copy()

    def process_document(self, text: str) -> "spacy.Doc":
        """
        Process text into a SpaCy document.

        Args:
            text: Input text to process.

        Returns:
            Processed SpaCy document (UniDic-enriched for Japanese when available).

        Raises:
            ValueError: If model not loaded or the text is empty.
        """
        if not self.nlp:
            raise ValueError("SpaCy model not loaded")
        if not text or not text.strip():
            raise ValueError("Empty text provided")
        try:
            # Normalize/clean input before handing it to the pipeline.
            cleaned_text = TextUtility.clean_text_input(text)
            doc = self.nlp(cleaned_text)
            # UniDic enrichment is best-effort: failures degrade to plain spaCy.
            if self.unidic_enricher and self.language == 'ja':
                try:
                    self.unidic_enricher.enrich_spacy_doc(doc, cleaned_text)
                    logger.debug("UniDic enrichment completed")
                except Exception as e:
                    logger.warning(f"UniDic enrichment failed: {e}")
            return doc
        except Exception as e:
            self.handle_processing_error(e, f"processing text of length {len(text)}")
            raise

    def handle_processing_error(self, error: Exception, context: str) -> None:
        """
        Handle processing errors with appropriate logging.

        Args:
            error: The exception that occurred.
            context: Context description for the error message.
        """
        error_msg = f"Error {context}: {error}"
        logger.error(error_msg)

    def filter_tokens(self,
                      doc: "spacy.Doc",
                      exclude_punct: bool = True,
                      exclude_space: bool = True,
                      word_type_filter: Optional[str] = None) -> List["spacy.Token"]:
        """
        Filter tokens based on various criteria.

        Args:
            doc: SpaCy document.
            exclude_punct: Whether to exclude punctuation tokens.
            exclude_space: Whether to exclude whitespace tokens.
            word_type_filter: Keep only this word type ('CW', 'FW', or None for all).

        Returns:
            List of tokens passing all filters, in document order.
        """
        filtered_tokens = []
        for token in doc:
            if exclude_space and token.is_space:
                continue
            if exclude_punct and token.is_punct:
                continue
            # Content-word / function-word filtering.
            if word_type_filter and self._classify_pos(token) != word_type_filter:
                continue
            filtered_tokens.append(token)
        return filtered_tokens

    def _classify_pos(self, token: "spacy.Token") -> str:
        """
        Classify token as content word (CW) or function word (FW).

        Args:
            token: SpaCy token object (only token.pos_ is consulted).

        Returns:
            'CW' for content words, 'FW' for function words. POS tags in
            neither set default to 'CW' unless they are PUNCT/SPACE/X.
        """
        content_pos = {'NOUN', 'VERB', 'ADJ', 'ADV'}
        function_pos = {'DET', 'PRON', 'ADP', 'CONJ', 'CCONJ', 'SCONJ'}
        if token.pos_ in content_pos:
            return 'CW'
        elif token.pos_ in function_pos:
            return 'FW'
        else:
            # Ambiguous tags (e.g. PROPN, NUM, INTJ) lean content-word.
            return 'CW' if token.pos_ not in {'PUNCT', 'SPACE', 'X'} else 'FW'

    def format_token_for_display(self, token: "spacy.Token", include_syntax: bool = True) -> Dict[str, Any]:
        """
        Format token for UI display - only call when needed for output.

        Args:
            token: SpaCy token.
            include_syntax: Whether to include syntactic information (dep_, head, etc.).

        Returns:
            Formatted token data dictionary for display.
        """
        result = {
            'token': token.text,
            'lemma': token.lemma_,
            'pos': token.pos_,
            'tag': token.tag_,
            'word_type': self._classify_pos(token)
        }
        if include_syntax:
            result.update({
                'dep_': token.dep_,
                'head_text': token.head.text,
                'head_pos': token.head.pos_,
            })
        return result

    def get_syntactic_context(self, token: "spacy.Token") -> Dict[str, Any]:
        """
        Get comprehensive syntactic relationships for a token.

        Args:
            token: SpaCy token.

        Returns:
            Dictionary with syntactic context. Note 'subtree_span' is the raw
            token.subtree iterator (single-use), unlike the materialized
            'children'/'ancestors' lists.
        """
        return {
            'dep_': token.dep_,
            'head': token.head,
            'children': list(token.children),
            'ancestors': list(token.ancestors),
            'subtree_span': token.subtree,
            'left_edge': token.left_edge,
            'right_edge': token.right_edge
        }

    def process_sentences(self,
                          doc: "spacy.Doc",
                          max_tokens: Optional[int] = None) -> List["spacy.Span"]:
        """
        Process sentences with optional token limits.

        Args:
            doc: SpaCy document.
            max_tokens: Maximum non-space tokens per sentence
                (AppConfig.MAX_TOKENS_FOR_VISUALIZATION when None).

        Returns:
            List of sentence spans; over-long sentences are truncated to a
            span covering the first max_tokens non-space tokens.
        """
        max_tokens = max_tokens or AppConfig.MAX_TOKENS_FOR_VISUALIZATION
        processed_sentences = []
        for sent in doc.sents:
            # Count only non-space tokens against the limit.
            sent_tokens = [token for token in sent if not token.is_space]
            if len(sent_tokens) > max_tokens:
                truncated_tokens = sent_tokens[:max_tokens]
                # Rebuild a contiguous doc span from the first to last kept
                # token (it may still contain interleaved space tokens).
                start_idx = truncated_tokens[0].i
                end_idx = truncated_tokens[-1].i + 1
                processed_sentences.append(doc[start_idx:end_idx])
            else:
                processed_sentences.append(sent)
        return processed_sentences

    def setup_batch_processing(self, file_paths: List[str]) -> Iterator[Tuple[str, str]]:
        """
        Set up batch processing for multiple files.

        Args:
            file_paths: List of file paths to process.

        Yields:
            Tuples of (file_path, text_content); on extraction failure the
            content is the sentinel string "ERROR: <message>".
        """
        for file_path in file_paths:
            try:
                text_content = TextUtility.extract_text_from_file(file_path)
                yield file_path, text_content
            except Exception as e:
                logger.error(f"Error processing file {file_path}: {e}")
                yield file_path, f"ERROR: {e}"

    def cleanup_batch_processing(self, temp_files: List[str]) -> None:
        """
        Clean up temporary files from batch processing.

        Args:
            temp_files: List of temporary file paths.
        """
        TextUtility.cleanup_temp_files(temp_files)