""" Base analyzer module providing shared SpaCy infrastructure. Eliminates code duplication and provides common functionality for all SpaCy-based analyzers. """ import spacy from typing import Dict, List, Any, Optional, Iterator, Tuple, TYPE_CHECKING import logging import tempfile from pathlib import Path import os from .app_config import AppConfig from .text_utility import TextUtility # Import UniDic extensions and enricher try: from . import unidic_extensions # This registers the token extensions from .unidic_enricher import UniDicEnricher UNIDIC_AVAILABLE = True except ImportError as e: logger.warning(f"UniDic integration not available: {e}") UNIDIC_AVAILABLE = False UniDicEnricher = None if TYPE_CHECKING: import spacy logger = logging.getLogger(__name__) class BaseAnalyzer: """ Base class for all SpaCy-based text analyzers. Provides shared model loading, document processing, and utility functions. """ def __init__(self, language: str = None, model_size: str = None, gpu_device: Optional[int] = None): """ Initialize the base analyzer. Args: language: Language code ('en' or 'ja') model_size: Model size ('md' or 'trf') gpu_device: GPU device ID to use (None for auto-detect, -1 for CPU only) """ self.language = language or AppConfig.DEFAULT_LANGUAGE self.model_size = model_size or AppConfig.DEFAULT_MODEL_SIZE self.gpu_device = gpu_device self.nlp = None self._model_info = {} self.unidic_enricher = None self._using_gpu = False self._load_spacy_model() # Initialize UniDic enricher for Japanese if self.language == 'ja' and UNIDIC_AVAILABLE: try: self.unidic_enricher = UniDicEnricher() logger.info("UniDic enricher initialized for Japanese analysis") except Exception as e: logger.warning(f"Failed to initialize UniDic enricher: {e}") self.unidic_enricher = None def _detect_gpu_availability(self) -> Tuple[bool, Optional[str], Optional[int]]: """ Detect if GPU/CUDA is available for spaCy processing. Returns: Tuple of (is_available, device_name, device_id) """ try: import torch if torch.cuda.is_available(): device_count = torch.cuda.device_count() if device_count > 0: # Use specified device or default to 0 if self.gpu_device is not None and self.gpu_device >= 0: device_id = min(self.gpu_device, device_count - 1) else: device_id = 0 device_name = torch.cuda.get_device_name(device_id) return True, device_name, device_id return False, None, None except ImportError: logger.debug("PyTorch not available - GPU support disabled") return False, None, None except Exception as e: logger.warning(f"Error detecting GPU: {e}") return False, None, None def _configure_gpu_for_spacy(self) -> bool: """ Configure spaCy to use GPU if available with strong enforcement. Returns: True if GPU was successfully configured, False otherwise """ # Check if GPU should be disabled explicitly if self.gpu_device == -1: logger.info("GPU explicitly disabled by user") return False # Check if GPU is disabled via environment variable if os.environ.get('SPACY_USE_GPU', '').lower() == 'false': logger.info("GPU disabled via SPACY_USE_GPU environment variable") return False gpu_available, device_name, device_id = self._detect_gpu_availability() if not gpu_available: # For transformer models, this is a critical issue if self.model_size == 'trf': logger.warning("No GPU/CUDA device available for transformer model - performance will be degraded") else: logger.info("No GPU/CUDA device available - using CPU") return False try: # Import torch to set device explicitly import torch # Set CUDA device globally for all operations torch.cuda.set_device(device_id) os.environ['CUDA_VISIBLE_DEVICES'] = str(device_id) # Force spaCy to use GPU - use require_gpu for stronger enforcement try: spacy.require_gpu(gpu_id=device_id) logger.info(f"Successfully enforced GPU usage with spacy.require_gpu()") except Exception as e: # Fallback to prefer_gpu if require_gpu fails logger.warning(f"spacy.require_gpu() failed: {e}, trying prefer_gpu()") gpu_id = spacy.prefer_gpu(gpu_id=device_id) if gpu_id is False: raise RuntimeError("spacy.prefer_gpu() returned False despite GPU being available") logger.info(f"GPU strongly configured for spaCy - using {device_name} (device {device_id})") # Set environment variable to ensure GPU usage os.environ['SPACY_PREFER_GPU'] = '1' return True except Exception as e: logger.error(f"Failed to enable GPU for spaCy: {e}") # For transformer models, this is critical if self.model_size == 'trf': logger.error("GPU initialization failed for transformer model - processing will be slow") return False def _configure_batch_sizes(self) -> None: """Configure optimal batch sizes for GPU processing.""" if self.model_size == 'trf': # Transformer models need smaller batch sizes due to memory constraints # But GPU can handle larger batches than CPU if hasattr(self.nlp, 'pipe'): for pipe_name in self.nlp.pipe_names: pipe = self.nlp.get_pipe(pipe_name) if hasattr(pipe, 'cfg'): # Set batch size based on available GPU memory # These are conservative defaults that work on most GPUs if pipe_name == 'transformer': pipe.cfg['batch_size'] = 128 # Transformer batch size else: pipe.cfg['batch_size'] = 256 # Other components else: # Non-transformer models can use larger batches if hasattr(self.nlp, 'pipe'): for pipe_name in self.nlp.pipe_names: pipe = self.nlp.get_pipe(pipe_name) if hasattr(pipe, 'cfg'): pipe.cfg['batch_size'] = 1024 def _force_model_to_gpu(self) -> bool: """ Force all model components to GPU after loading. Returns: True if successful, False otherwise """ if not self._using_gpu or not self.nlp: return False try: import torch # Force each pipeline component to GPU for pipe_name, pipe in self.nlp.pipeline: if hasattr(pipe, 'model'): # Move the model to GPU if hasattr(pipe.model, 'to'): pipe.model.to('cuda:0') logger.debug(f"Moved '{pipe_name}' component to GPU") # Special handling for transformer components if pipe_name == 'transformer' and hasattr(pipe, 'model'): # Ensure transformer model is on GPU if hasattr(pipe.model, 'transformer'): pipe.model.transformer.to('cuda:0') logger.info(f"Transformer component forcefully moved to GPU") return True except Exception as e: logger.error(f"Failed to force model components to GPU: {e}") return False def _verify_gpu_usage(self) -> bool: """ Verify that model components are actually using GPU. Returns: True if GPU is being used, False otherwise """ if not self._using_gpu or not self.nlp: return False try: import torch gpu_components = [] cpu_components = [] for pipe_name, pipe in self.nlp.pipeline: if hasattr(pipe, 'model'): # Check device of model parameters is_on_gpu = False if hasattr(pipe.model, 'parameters'): # Check if any parameters are on GPU for param in pipe.model.parameters(): if param.is_cuda: is_on_gpu = True break elif hasattr(pipe.model, 'device'): # Check device attribute device = str(pipe.model.device) is_on_gpu = 'cuda' in device if is_on_gpu: gpu_components.append(pipe_name) else: cpu_components.append(pipe_name) if gpu_components: logger.info(f"Components on GPU: {', '.join(gpu_components)}") if cpu_components: logger.warning(f"Components still on CPU: {', '.join(cpu_components)}") # For transformer models, ensure the transformer component is on GPU if self.model_size == 'trf' and 'transformer' not in gpu_components: logger.error("Transformer component is not on GPU!") return False return len(gpu_components) > 0 except Exception as e: logger.error(f"Failed to verify GPU usage: {e}") return False def _load_spacy_model(self) -> None: """Load appropriate SpaCy model based on language and size with strong GPU enforcement.""" # Validate combination if not AppConfig.validate_language_model_combination(self.language, self.model_size): raise ValueError(f"Unsupported language/model combination: {self.language}/{self.model_size}") model_name = AppConfig.get_spacy_model_name(self.language, self.model_size) if not model_name: raise ValueError(f"No model found for language '{self.language}' and size '{self.model_size}'") # Configure GPU BEFORE loading model - this is critical self._using_gpu = self._configure_gpu_for_spacy() try: # Load model with optimizations for GPU if available if self._using_gpu and self.model_size == 'trf': # Enable mixed precision for transformer models on GPU self.nlp = spacy.load(model_name, config={"components": {"transformer": {"model": {"mixed_precision": True}}}}) else: self.nlp = spacy.load(model_name) # Force model components to GPU after loading if self._using_gpu: gpu_forced = self._force_model_to_gpu() if not gpu_forced: logger.warning("Failed to force model components to GPU") # Verify GPU usage gpu_verified = self._verify_gpu_usage() if not gpu_verified and self.model_size == 'trf': logger.error("GPU verification failed for transformer model") # Get GPU info for model info gpu_info = "CPU" if self._using_gpu: gpu_available, device_name, device_id = self._detect_gpu_availability() if gpu_available: gpu_info = f"GPU ({device_name}, device {device_id})" # Add verification status if self._verify_gpu_usage(): gpu_info += " [VERIFIED]" else: gpu_info += " [NOT VERIFIED]" self._model_info = { 'name': model_name, 'language': self.language, 'model_size': self.model_size, 'version': spacy.__version__, 'device': gpu_info, 'gpu_enabled': self._using_gpu } logger.info(f"Loaded SpaCy model: {model_name} on {gpu_info}") # Configure batch sizes for optimal GPU performance if self._using_gpu and hasattr(self.nlp, 'pipe'): # Increase batch size for GPU processing self._configure_batch_sizes() except OSError as e: error_msg = f"SpaCy model {model_name} not found. Please install it first." logger.error(error_msg) raise OSError(error_msg) from e except Exception as e: logger.error(f"Error loading SpaCy model: {e}") # Try fallback to CPU if GPU loading failed if self._using_gpu: logger.warning("Falling back to CPU after GPU loading failed") self._using_gpu = False try: self.nlp = spacy.load(model_name) self._model_info['device'] = 'CPU (fallback)' self._model_info['gpu_enabled'] = False logger.info(f"Successfully loaded {model_name} on CPU after GPU failure") except Exception as cpu_error: raise ValueError(f"Failed to load model on both GPU and CPU: {cpu_error}") from cpu_error else: raise def get_model_info(self) -> Dict[str, str]: """ Get information about the loaded model. Returns: Dictionary with model information """ return self._model_info.copy() def process_document(self, text: str) -> "spacy.Doc": """ Process text into a SpaCy document. Args: text: Input text to process Returns: Processed SpaCy document Raises: ValueError: If model not loaded or text processing fails """ if not self.nlp: raise ValueError("SpaCy model not loaded") if not text or not text.strip(): raise ValueError("Empty text provided") try: # Clean text before processing cleaned_text = TextUtility.clean_text_input(text) # Process with SpaCy doc = self.nlp(cleaned_text) # Add UniDic enrichment for Japanese if self.unidic_enricher and self.language == 'ja': try: self.unidic_enricher.enrich_spacy_doc(doc, cleaned_text) logger.debug("UniDic enrichment completed") except Exception as e: logger.warning(f"UniDic enrichment failed: {e}") return doc except Exception as e: self.handle_processing_error(e, f"processing text of length {len(text)}") raise def handle_processing_error(self, error: Exception, context: str) -> None: """ Handle processing errors with appropriate logging. Args: error: The exception that occurred context: Context description for the error """ error_msg = f"Error {context}: {error}" logger.error(error_msg) def filter_tokens(self, doc: "spacy.Doc", exclude_punct: bool = True, exclude_space: bool = True, word_type_filter: Optional[str] = None) -> List["spacy.Token"]: """ Filter tokens based on various criteria. Args: doc: SpaCy document exclude_punct: Whether to exclude punctuation exclude_space: Whether to exclude spaces word_type_filter: Filter by word type ('CW', 'FW', or None) Returns: List of filtered tokens """ filtered_tokens = [] for token in doc: # Basic filtering if exclude_space and token.is_space: continue if exclude_punct and token.is_punct: continue # Word type filtering if word_type_filter: word_type = self._classify_pos(token) if word_type != word_type_filter: continue filtered_tokens.append(token) return filtered_tokens def _classify_pos(self, token: "spacy.Token") -> str: """ Classify token as content word (CW) or function word (FW). Args: token: SpaCy token object Returns: 'CW' for content words, 'FW' for function words """ content_pos = {'NOUN', 'VERB', 'ADJ', 'ADV'} function_pos = {'DET', 'PRON', 'ADP', 'CONJ', 'CCONJ', 'SCONJ'} if token.pos_ in content_pos: return 'CW' elif token.pos_ in function_pos: return 'FW' else: # Default classification for ambiguous cases return 'CW' if token.pos_ not in {'PUNCT', 'SPACE', 'X'} else 'FW' def format_token_for_display(self, token: "spacy.Token", include_syntax: bool = True) -> Dict[str, Any]: """ Format token for UI display - only call when needed for output. Args: token: SpaCy token include_syntax: Whether to include syntactic information (dep_, head, etc.) Returns: Formatted token data dictionary for display """ result = { 'token': token.text, 'lemma': token.lemma_, 'pos': token.pos_, 'tag': token.tag_, 'word_type': self._classify_pos(token) } if include_syntax: result.update({ 'dep_': token.dep_, 'head_text': token.head.text, 'head_pos': token.head.pos_, }) return result def get_syntactic_context(self, token: "spacy.Token") -> Dict[str, Any]: """ Get comprehensive syntactic relationships for a token. Args: token: SpaCy token Returns: Dictionary with syntactic context information """ return { 'dep_': token.dep_, 'head': token.head, 'children': list(token.children), 'ancestors': list(token.ancestors), 'subtree_span': token.subtree, 'left_edge': token.left_edge, 'right_edge': token.right_edge } def process_sentences(self, doc: "spacy.Doc", max_tokens: Optional[int] = None) -> List["spacy.Span"]: """ Process sentences with optional token limits. Args: doc: SpaCy document max_tokens: Maximum tokens per sentence (uses config default if None) Returns: List of sentence spans """ max_tokens = max_tokens or AppConfig.MAX_TOKENS_FOR_VISUALIZATION processed_sentences = [] for sent in doc.sents: # Filter tokens (exclude spaces for counting) sent_tokens = [token for token in sent if not token.is_space] if len(sent_tokens) > max_tokens: # Truncate sentence truncated_tokens = sent_tokens[:max_tokens] # Create new span with truncated tokens start_idx = truncated_tokens[0].i end_idx = truncated_tokens[-1].i + 1 truncated_span = doc[start_idx:end_idx] processed_sentences.append(truncated_span) else: processed_sentences.append(sent) return processed_sentences def setup_batch_processing(self, file_paths: List[str]) -> Iterator[Tuple[str, str]]: """ Set up batch processing for multiple files. Args: file_paths: List of file paths to process Yields: Tuples of (file_path, text_content) """ for file_path in file_paths: try: text_content = TextUtility.extract_text_from_file(file_path) yield file_path, text_content except Exception as e: logger.error(f"Error processing file {file_path}: {e}") yield file_path, f"ERROR: {e}" def cleanup_batch_processing(self, temp_files: List[str]) -> None: """ Clean up temporary files from batch processing. Args: temp_files: List of temporary file paths """ TextUtility.cleanup_temp_files(temp_files)