# Commit bb65e54 (egumasa): Enhance GPU support with stronger enforcement
"""
Base analyzer module providing shared SpaCy infrastructure.
Eliminates code duplication and provides common functionality for all SpaCy-based analyzers.
"""
import spacy
from typing import Dict, List, Any, Optional, Iterator, Tuple, TYPE_CHECKING
import logging
import tempfile
from pathlib import Path
import os
from .app_config import AppConfig
from .text_utility import TextUtility

# The logger must be created BEFORE the optional-import block below:
# its except branch logs a warning, which previously raised NameError
# because ``logger`` was only defined after the try/except.
logger = logging.getLogger(__name__)

# Import UniDic extensions and enricher (optional dependency).
try:
    from . import unidic_extensions  # This registers the token extensions
    from .unidic_enricher import UniDicEnricher
    UNIDIC_AVAILABLE = True
except ImportError as e:
    logger.warning(f"UniDic integration not available: {e}")
    UNIDIC_AVAILABLE = False
    UniDicEnricher = None

if TYPE_CHECKING:
    import spacy
class BaseAnalyzer:
    """
    Base class for all SpaCy-based text analyzers.

    Provides shared model loading (with optional GPU enforcement), document
    processing, and common token/sentence utility functions.
    """

    def __init__(self,
                 language: Optional[str] = None,
                 model_size: Optional[str] = None,
                 gpu_device: Optional[int] = None):
        """
        Initialize the base analyzer.

        Args:
            language: Language code ('en' or 'ja'); defaults to AppConfig.DEFAULT_LANGUAGE
            model_size: Model size ('md' or 'trf'); defaults to AppConfig.DEFAULT_MODEL_SIZE
            gpu_device: GPU device ID to use (None for auto-detect, -1 for CPU only)
        """
        self.language = language or AppConfig.DEFAULT_LANGUAGE
        self.model_size = model_size or AppConfig.DEFAULT_MODEL_SIZE
        self.gpu_device = gpu_device
        self.nlp = None
        self._model_info = {}
        self.unidic_enricher = None
        self._using_gpu = False
        # Actual CUDA device id selected by _configure_gpu_for_spacy();
        # None until GPU is configured.  Fixes the previous hard-coded
        # 'cuda:0' in _force_model_to_gpu() on multi-GPU machines.
        self._gpu_device_id = None
        self._load_spacy_model()
        # Initialize UniDic enricher for Japanese
        if self.language == 'ja' and UNIDIC_AVAILABLE:
            try:
                self.unidic_enricher = UniDicEnricher()
                logger.info("UniDic enricher initialized for Japanese analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize UniDic enricher: {e}")
                self.unidic_enricher = None

    def _detect_gpu_availability(self) -> Tuple[bool, Optional[str], Optional[int]]:
        """
        Detect if GPU/CUDA is available for spaCy processing.

        Returns:
            Tuple of (is_available, device_name, device_id)
        """
        try:
            import torch
            if torch.cuda.is_available():
                device_count = torch.cuda.device_count()
                if device_count > 0:
                    # Use the requested device, clamped to the valid range;
                    # otherwise default to device 0.
                    if self.gpu_device is not None and self.gpu_device >= 0:
                        device_id = min(self.gpu_device, device_count - 1)
                    else:
                        device_id = 0
                    device_name = torch.cuda.get_device_name(device_id)
                    return True, device_name, device_id
            return False, None, None
        except ImportError:
            logger.debug("PyTorch not available - GPU support disabled")
            return False, None, None
        except Exception as e:
            logger.warning(f"Error detecting GPU: {e}")
            return False, None, None

    def _configure_gpu_for_spacy(self) -> bool:
        """
        Configure spaCy to use GPU if available with strong enforcement.

        Returns:
            True if GPU was successfully configured, False otherwise
        """
        # Check if GPU should be disabled explicitly
        if self.gpu_device == -1:
            logger.info("GPU explicitly disabled by user")
            return False
        # Check if GPU is disabled via environment variable
        if os.environ.get('SPACY_USE_GPU', '').lower() == 'false':
            logger.info("GPU disabled via SPACY_USE_GPU environment variable")
            return False
        gpu_available, device_name, device_id = self._detect_gpu_availability()
        if not gpu_available:
            # For transformer models, running on CPU is a critical issue
            if self.model_size == 'trf':
                logger.warning("No GPU/CUDA device available for transformer model - performance will be degraded")
            else:
                logger.info("No GPU/CUDA device available - using CPU")
            return False
        try:
            # Import torch to set device explicitly
            import torch
            # Set CUDA device globally for all operations
            torch.cuda.set_device(device_id)
            # NOTE(review): CUDA_VISIBLE_DEVICES is set AFTER torch has
            # initialized CUDA, so it only affects child processes / later
            # library initializations — confirm this is the intended scope.
            os.environ['CUDA_VISIBLE_DEVICES'] = str(device_id)
            # Remember which device we configured so later GPU moves
            # (_force_model_to_gpu) target the correct card.
            self._gpu_device_id = device_id
            # Force spaCy to use GPU - use require_gpu for stronger enforcement
            try:
                spacy.require_gpu(gpu_id=device_id)
                logger.info("Successfully enforced GPU usage with spacy.require_gpu()")
            except Exception as e:
                # Fallback to prefer_gpu if require_gpu fails
                logger.warning(f"spacy.require_gpu() failed: {e}, trying prefer_gpu()")
                gpu_id = spacy.prefer_gpu(gpu_id=device_id)
                if gpu_id is False:
                    raise RuntimeError("spacy.prefer_gpu() returned False despite GPU being available")
            logger.info(f"GPU strongly configured for spaCy - using {device_name} (device {device_id})")
            # Set environment variable to ensure GPU usage
            os.environ['SPACY_PREFER_GPU'] = '1'
            return True
        except Exception as e:
            logger.error(f"Failed to enable GPU for spaCy: {e}")
            # For transformer models, this is critical
            if self.model_size == 'trf':
                logger.error("GPU initialization failed for transformer model - processing will be slow")
            return False

    def _configure_batch_sizes(self) -> None:
        """Configure optimal batch sizes for GPU processing.

        Transformer components get smaller batches due to GPU memory
        constraints; everything else gets larger batches.  The values are
        conservative defaults that work on most GPUs.
        """
        if not hasattr(self.nlp, 'pipe'):
            return
        if self.model_size == 'trf':
            per_pipe = {'transformer': 128}  # transformer batch size
            default_size = 256               # other components
        else:
            per_pipe = {}
            default_size = 1024              # non-transformer models can use larger batches
        for pipe_name in self.nlp.pipe_names:
            pipe = self.nlp.get_pipe(pipe_name)
            if hasattr(pipe, 'cfg'):
                pipe.cfg['batch_size'] = per_pipe.get(pipe_name, default_size)

    def _force_model_to_gpu(self) -> bool:
        """
        Force all model components to GPU after loading.

        Returns:
            True if successful, False otherwise
        """
        if not self._using_gpu or not self.nlp:
            return False
        try:
            import torch
            # Target the device configured in _configure_gpu_for_spacy();
            # bare 'cuda' resolves to the current device set via
            # torch.cuda.set_device() (previously this hard-coded 'cuda:0',
            # which was wrong when a different device id was selected).
            device_id = getattr(self, '_gpu_device_id', None)
            target = 'cuda' if device_id is None else f'cuda:{device_id}'
            # Force each pipeline component to GPU
            for pipe_name, pipe in self.nlp.pipeline:
                if hasattr(pipe, 'model'):
                    # Move the model to GPU
                    if hasattr(pipe.model, 'to'):
                        pipe.model.to(target)
                        logger.debug(f"Moved '{pipe_name}' component to GPU")
                    # Special handling for transformer components: ensure the
                    # inner HF transformer is on GPU as well.
                    if pipe_name == 'transformer' and hasattr(pipe.model, 'transformer'):
                        pipe.model.transformer.to(target)
                        logger.info("Transformer component forcefully moved to GPU")
            return True
        except Exception as e:
            logger.error(f"Failed to force model components to GPU: {e}")
            return False

    def _verify_gpu_usage(self) -> bool:
        """
        Verify that model components are actually using GPU.

        Returns:
            True if GPU is being used, False otherwise
        """
        if not self._using_gpu or not self.nlp:
            return False
        try:
            import torch
            gpu_components = []
            cpu_components = []
            for pipe_name, pipe in self.nlp.pipeline:
                if hasattr(pipe, 'model'):
                    # Check device of model parameters
                    is_on_gpu = False
                    if hasattr(pipe.model, 'parameters'):
                        # Check if any parameters are on GPU
                        for param in pipe.model.parameters():
                            if param.is_cuda:
                                is_on_gpu = True
                                break
                    elif hasattr(pipe.model, 'device'):
                        # Fall back to the model's device attribute
                        device = str(pipe.model.device)
                        is_on_gpu = 'cuda' in device
                    if is_on_gpu:
                        gpu_components.append(pipe_name)
                    else:
                        cpu_components.append(pipe_name)
            if gpu_components:
                logger.info(f"Components on GPU: {', '.join(gpu_components)}")
            if cpu_components:
                logger.warning(f"Components still on CPU: {', '.join(cpu_components)}")
            # For transformer models, the transformer component MUST be on GPU
            if self.model_size == 'trf' and 'transformer' not in gpu_components:
                logger.error("Transformer component is not on GPU!")
                return False
            return len(gpu_components) > 0
        except Exception as e:
            logger.error(f"Failed to verify GPU usage: {e}")
            return False

    def _load_spacy_model(self) -> None:
        """Load appropriate SpaCy model based on language and size with strong GPU enforcement.

        Raises:
            ValueError: If the language/model combination is invalid or loading
                fails on both GPU and CPU.
            OSError: If the model package is not installed.
        """
        # Validate combination
        if not AppConfig.validate_language_model_combination(self.language, self.model_size):
            raise ValueError(f"Unsupported language/model combination: {self.language}/{self.model_size}")
        model_name = AppConfig.get_spacy_model_name(self.language, self.model_size)
        if not model_name:
            raise ValueError(f"No model found for language '{self.language}' and size '{self.model_size}'")
        # Configure GPU BEFORE loading model - this is critical
        self._using_gpu = self._configure_gpu_for_spacy()
        try:
            # Load model with optimizations for GPU if available
            if self._using_gpu and self.model_size == 'trf':
                # Enable mixed precision for transformer models on GPU
                self.nlp = spacy.load(model_name, config={"components": {"transformer": {"model": {"mixed_precision": True}}}})
            else:
                self.nlp = spacy.load(model_name)
            gpu_verified = False
            if self._using_gpu:
                # Force model components to GPU after loading
                if not self._force_model_to_gpu():
                    logger.warning("Failed to force model components to GPU")
                # Verify GPU usage once and reuse the result below
                # (previously verified twice, duplicating the log output).
                gpu_verified = self._verify_gpu_usage()
                if not gpu_verified and self.model_size == 'trf':
                    logger.error("GPU verification failed for transformer model")
            # Build human-readable device info for _model_info
            gpu_info = "CPU"
            if self._using_gpu:
                gpu_available, device_name, device_id = self._detect_gpu_availability()
                if gpu_available:
                    gpu_info = f"GPU ({device_name}, device {device_id})"
                    gpu_info += " [VERIFIED]" if gpu_verified else " [NOT VERIFIED]"
            self._model_info = {
                'name': model_name,
                'language': self.language,
                'model_size': self.model_size,
                'version': spacy.__version__,
                'device': gpu_info,
                'gpu_enabled': self._using_gpu
            }
            logger.info(f"Loaded SpaCy model: {model_name} on {gpu_info}")
            # Configure batch sizes for optimal GPU performance
            if self._using_gpu and hasattr(self.nlp, 'pipe'):
                # Increase batch size for GPU processing
                self._configure_batch_sizes()
        except OSError as e:
            error_msg = f"SpaCy model {model_name} not found. Please install it first."
            logger.error(error_msg)
            raise OSError(error_msg) from e
        except Exception as e:
            logger.error(f"Error loading SpaCy model: {e}")
            # Try fallback to CPU if GPU loading failed
            if self._using_gpu:
                logger.warning("Falling back to CPU after GPU loading failed")
                self._using_gpu = False
                try:
                    self.nlp = spacy.load(model_name)
                    # Build the full info dict: the failure may have occurred
                    # before _model_info was ever populated, so updating single
                    # keys would leave an incomplete record.
                    self._model_info = {
                        'name': model_name,
                        'language': self.language,
                        'model_size': self.model_size,
                        'version': spacy.__version__,
                        'device': 'CPU (fallback)',
                        'gpu_enabled': False
                    }
                    logger.info(f"Successfully loaded {model_name} on CPU after GPU failure")
                except Exception as cpu_error:
                    raise ValueError(f"Failed to load model on both GPU and CPU: {cpu_error}") from cpu_error
            else:
                raise

    def get_model_info(self) -> Dict[str, str]:
        """
        Get information about the loaded model.

        Returns:
            Copy of the model-information dictionary (safe for callers to mutate)
        """
        return self._model_info.copy()

    def process_document(self, text: str) -> "spacy.Doc":
        """
        Process text into a SpaCy document.

        Args:
            text: Input text to process

        Returns:
            Processed SpaCy document

        Raises:
            ValueError: If model not loaded or text processing fails
        """
        if not self.nlp:
            raise ValueError("SpaCy model not loaded")
        if not text or not text.strip():
            raise ValueError("Empty text provided")
        try:
            # Clean text before processing
            cleaned_text = TextUtility.clean_text_input(text)
            # Process with SpaCy
            doc = self.nlp(cleaned_text)
            # Add UniDic enrichment for Japanese (best-effort: failures are
            # logged but do not abort processing)
            if self.unidic_enricher and self.language == 'ja':
                try:
                    self.unidic_enricher.enrich_spacy_doc(doc, cleaned_text)
                    logger.debug("UniDic enrichment completed")
                except Exception as e:
                    logger.warning(f"UniDic enrichment failed: {e}")
            return doc
        except Exception as e:
            self.handle_processing_error(e, f"processing text of length {len(text)}")
            raise

    def handle_processing_error(self, error: Exception, context: str) -> None:
        """
        Handle processing errors with appropriate logging.

        Args:
            error: The exception that occurred
            context: Context description for the error
        """
        error_msg = f"Error {context}: {error}"
        logger.error(error_msg)

    def filter_tokens(self,
                      doc: "spacy.Doc",
                      exclude_punct: bool = True,
                      exclude_space: bool = True,
                      word_type_filter: Optional[str] = None) -> List["spacy.Token"]:
        """
        Filter tokens based on various criteria.

        Args:
            doc: SpaCy document
            exclude_punct: Whether to exclude punctuation
            exclude_space: Whether to exclude spaces
            word_type_filter: Filter by word type ('CW', 'FW', or None)

        Returns:
            List of filtered tokens
        """
        filtered_tokens = []
        for token in doc:
            # Basic filtering
            if exclude_space and token.is_space:
                continue
            if exclude_punct and token.is_punct:
                continue
            # Word type filtering
            if word_type_filter:
                word_type = self._classify_pos(token)
                if word_type != word_type_filter:
                    continue
            filtered_tokens.append(token)
        return filtered_tokens

    def _classify_pos(self, token: "spacy.Token") -> str:
        """
        Classify token as content word (CW) or function word (FW).

        Args:
            token: SpaCy token object

        Returns:
            'CW' for content words, 'FW' for function words
        """
        content_pos = {'NOUN', 'VERB', 'ADJ', 'ADV'}
        function_pos = {'DET', 'PRON', 'ADP', 'CONJ', 'CCONJ', 'SCONJ'}
        if token.pos_ in content_pos:
            return 'CW'
        elif token.pos_ in function_pos:
            return 'FW'
        else:
            # Default classification for ambiguous cases: everything except
            # punctuation/whitespace/unknown counts as a content word
            return 'CW' if token.pos_ not in {'PUNCT', 'SPACE', 'X'} else 'FW'

    def format_token_for_display(self, token: "spacy.Token", include_syntax: bool = True) -> Dict[str, Any]:
        """
        Format token for UI display - only call when needed for output.

        Args:
            token: SpaCy token
            include_syntax: Whether to include syntactic information (dep_, head, etc.)

        Returns:
            Formatted token data dictionary for display
        """
        result = {
            'token': token.text,
            'lemma': token.lemma_,
            'pos': token.pos_,
            'tag': token.tag_,
            'word_type': self._classify_pos(token)
        }
        if include_syntax:
            result.update({
                'dep_': token.dep_,
                'head_text': token.head.text,
                'head_pos': token.head.pos_,
            })
        return result

    def get_syntactic_context(self, token: "spacy.Token") -> Dict[str, Any]:
        """
        Get comprehensive syntactic relationships for a token.

        Args:
            token: SpaCy token

        Returns:
            Dictionary with syntactic context information
        """
        return {
            'dep_': token.dep_,
            'head': token.head,
            'children': list(token.children),
            'ancestors': list(token.ancestors),
            # Materialized to a list: token.subtree is a one-shot generator,
            # which would be exhausted after a single iteration by the caller.
            'subtree_span': list(token.subtree),
            'left_edge': token.left_edge,
            'right_edge': token.right_edge
        }

    def process_sentences(self,
                          doc: "spacy.Doc",
                          max_tokens: Optional[int] = None) -> List["spacy.Span"]:
        """
        Process sentences with optional token limits.

        Args:
            doc: SpaCy document
            max_tokens: Maximum tokens per sentence (uses config default if None)

        Returns:
            List of sentence spans (over-long sentences are truncated)
        """
        max_tokens = max_tokens or AppConfig.MAX_TOKENS_FOR_VISUALIZATION
        processed_sentences = []
        for sent in doc.sents:
            # Filter tokens (exclude spaces for counting)
            sent_tokens = [token for token in sent if not token.is_space]
            if len(sent_tokens) > max_tokens:
                # Truncate sentence: build a new span over the first
                # max_tokens non-space tokens
                truncated_tokens = sent_tokens[:max_tokens]
                start_idx = truncated_tokens[0].i
                end_idx = truncated_tokens[-1].i + 1
                truncated_span = doc[start_idx:end_idx]
                processed_sentences.append(truncated_span)
            else:
                processed_sentences.append(sent)
        return processed_sentences

    def setup_batch_processing(self, file_paths: List[str]) -> Iterator[Tuple[str, str]]:
        """
        Set up batch processing for multiple files.

        Args:
            file_paths: List of file paths to process

        Yields:
            Tuples of (file_path, text_content); on failure the content is an
            "ERROR: ..." string rather than raising, so the batch continues
        """
        for file_path in file_paths:
            try:
                text_content = TextUtility.extract_text_from_file(file_path)
                yield file_path, text_content
            except Exception as e:
                logger.error(f"Error processing file {file_path}: {e}")
                yield file_path, f"ERROR: {e}"

    def cleanup_batch_processing(self, temp_files: List[str]) -> None:
        """
        Clean up temporary files from batch processing.

        Args:
            temp_files: List of temporary file paths
        """
        TextUtility.cleanup_temp_files(temp_files)