"""
NLP Processing Module using spaCy and NLTK
Pure logic, config-driven preprocessing
"""

import re
from collections import Counter
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
import logging

# spaCy and NLTK imports (both optional; the HAS_* flags gate every use)
try:
    import spacy
    from spacy.language import Language
    HAS_SPACY = True
except ImportError:
    HAS_SPACY = False
    logging.warning("spaCy not installed. Install with: pip install spacy")

try:
    import nltk
    from nltk.tokenize import sent_tokenize, word_tokenize
    from nltk.corpus import stopwords
    from nltk.stem import PorterStemmer, SnowballStemmer, LancasterStemmer
    HAS_NLTK = True
except ImportError:
    HAS_NLTK = False
    logging.warning("NLTK not installed. Install with: pip install nltk")

from utils import Config


def _split_on_periods(text: str) -> List[str]:
    """Naive sentence splitter used when no NLP backend can do the job."""
    return [s.strip() for s in text.split('.') if s.strip()]


# ============================================================================
# NLTK DOWNLOADER
# ============================================================================

class NLTKDownloader:
    """Automatically download required NLTK data"""

    @staticmethod
    def download_required_data(config: Config):
        """Download the NLTK resources named in the config, if missing.

        Reads ``nlp.nltk.tokenizer`` (a tokenizer package name) and
        ``nlp.nltk.stopwords`` (any truthy value requests the stopwords
        corpus). Does nothing when NLTK is not installed.

        Args:
            config: Project configuration object.
        """
        if not HAS_NLTK:
            return

        # (lookup path inside the NLTK data dir, downloadable package id).
        # Each resource must be probed under its own category: tokenizers
        # live in ``tokenizers/`` but the stopwords corpus lives in
        # ``corpora/``; probing the wrong directory always fails and would
        # trigger a download attempt on every start-up.
        required_data: List[Tuple[str, str]] = []

        # Tokenizer
        tokenizer = config.get('nlp.nltk.tokenizer')
        if tokenizer:
            required_data.append((f'tokenizers/{tokenizer}', tokenizer))

        # Stopwords
        stopwords_lang = config.get('nlp.nltk.stopwords')
        if stopwords_lang:
            required_data.append(('corpora/stopwords', 'stopwords'))

        # Download each missing resource
        for resource_path, data_name in required_data:
            try:
                nltk.data.find(resource_path)
            except LookupError:
                logging.info(f"Downloading NLTK data: {data_name}")
                nltk.download(data_name, quiet=True)


# ============================================================================
# TEXT CLEANER
# ============================================================================

class TextCleaner:
    """Clean and normalize text based on config"""

    def __init__(self, config: Config):
        """Store the config and cache its ``nlp.preprocessing`` section."""
        self.config = config
        self.preprocessing_config = config.get('nlp.preprocessing', {})

    def clean(self, text: str) -> str:
        """Apply all cleaning steps from config.

        Unconditional steps: collapse runs of blank lines and spaces, strip
        control characters, drop unencodable code points, and blank out
        lines that consist only of digits. Optional steps (``lowercase``,
        ``remove_punctuation``, ``remove_numbers``) are read from the
        ``nlp.preprocessing`` config section and default to off.

        Args:
            text: Raw input text.

        Returns:
            The cleaned text with surrounding whitespace stripped, or ``""``
            for empty input.
        """
        if not text:
            return ""

        # Remove excessive whitespace
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r' {2,}', ' ', text)

        # Remove control characters (tab, LF and CR are kept)
        text = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', text)

        # Drop code points that cannot be encoded as UTF-8 (e.g. lone
        # surrogates); everything else round-trips unchanged.
        text = text.encode('utf-8', 'ignore').decode('utf-8')

        # Blank out lines that contain only digits (typically page numbers)
        text = re.sub(r'^\d+\s*$', '', text, flags=re.MULTILINE)

        # Optional: lowercase
        if self.preprocessing_config.get('lowercase', False):
            text = text.lower()

        # Optional: remove punctuation (``\w`` keeps underscores)
        if self.preprocessing_config.get('remove_punctuation', False):
            text = re.sub(r'[^\w\s]', '', text)

        # Optional: remove numbers
        if self.preprocessing_config.get('remove_numbers', False):
            text = re.sub(r'\d+', '', text)

        return text.strip()

    def clean_sentence(self, sentence: str) -> str:
        """Drop words outside the configured length bounds.

        Args:
            sentence: A single sentence.

        Returns:
            The sentence rebuilt from the whitespace-separated words whose
            length lies within ``min_word_length``..``max_word_length``
            (defaults 2 and 50), joined by single spaces.
        """
        min_length = self.preprocessing_config.get('min_word_length', 2)
        max_length = self.preprocessing_config.get('max_word_length', 50)

        kept = [
            word for word in sentence.split()
            if min_length <= len(word) <= max_length
        ]
        return ' '.join(kept)


# ============================================================================
# SPACY PROCESSOR
# ============================================================================

class SpacyProcessor:
    """Process text using spaCy"""

    def __init__(self, config: Config, logger: logging.Logger):
        """Load the configured spaCy model when spaCy is installed.

        ``self.nlp`` stays ``None`` if spaCy or the model is unavailable;
        every public method checks for that.
        """
        self.config = config
        self.logger = logger
        self.nlp = None

        if HAS_SPACY:
            self._load_spacy_model()

    def _load_spacy_model(self):
        """Load the spaCy model named by ``nlp.spacy.model`` and configure it."""
        model_name = self.config.get('nlp.spacy.model', 'en_core_web_sm')

        try:
            self.nlp = spacy.load(model_name)
        except OSError:
            self.logger.warning(
                f"spaCy model '{model_name}' not found. "
                f"Download with: python -m spacy download {model_name}"
            )
            self.nlp = None
            return

        self.logger.info(f"Loaded spaCy model: {model_name}")

        # Configure pipeline from config
        self._configure_pipeline()

        # A malformed max_length must not prevent the model from being
        # used, but it should not be ignored silently either.
        raw_max_length = self.config.get('nlp.spacy.max_length', 1000000)
        try:
            self.nlp.max_length = int(raw_max_length)
        except (TypeError, ValueError):
            self.logger.warning(
                f"Invalid nlp.spacy.max_length {raw_max_length!r}; "
                f"keeping spaCy default"
            )

    def _configure_pipeline(self):
        """Disable the pipeline components listed in ``nlp.spacy.disable``."""
        if not self.nlp:
            return

        # ``or []`` tolerates an explicit null in the config file.
        disable = self.config.get('nlp.spacy.disable', []) or []
        for component in disable:
            if component in self.nlp.pipe_names:
                self.nlp.disable_pipe(component)
                self.logger.debug(f"Disabled spaCy component: {component}")

    # The return annotation is a string on purpose: annotations are
    # evaluated when the function is defined, and ``spacy`` is undefined
    # when the optional import above failed.
    def process(self, text: str) -> "spacy.tokens.Doc":
        """Run the spaCy pipeline over ``text``.

        Raises:
            RuntimeError: If no spaCy model is loaded.
        """
        if not self.nlp:
            raise RuntimeError("spaCy model not loaded")

        return self.nlp(text)

    def extract_sentences(self, text: str) -> List[str]:
        """Split ``text`` into sentences using spaCy.

        Falls back to a naive split on ``.`` when no model is loaded.
        """
        if not self.nlp:
            return _split_on_periods(text)

        doc = self.process(text)
        return [sent.text.strip() for sent in doc.sents]

    def lemmatize(self, text: str) -> str:
        """Return the space-joined lemmas of ``text``.

        The input is returned unchanged when no model is loaded.
        """
        if not self.nlp:
            return text

        doc = self.process(text)
        return ' '.join(token.lemma_ for token in doc)

    def extract_entities(self, text: str) -> List[Dict[str, Any]]:
        """Extract named entities.

        Returns:
            One dict per entity with keys ``text``, ``label`` (strings) and
            ``start``, ``end`` (character offsets). Empty when no model is
            loaded or the ``ner`` component is not active.
        """
        if not self.nlp or 'ner' not in self.nlp.pipe_names:
            return []

        doc = self.process(text)
        return [
            {
                'text': ent.text,
                'label': ent.label_,
                'start': ent.start_char,
                'end': ent.end_char
            }
            for ent in doc.ents
        ]


# ============================================================================
# NLTK PROCESSOR
# ============================================================================

class NLTKProcessor:
    """Process text using NLTK"""

    def __init__(self, config: Config, logger: logging.Logger):
        """Download required NLTK data and build stopword set and stemmer."""
        self.config = config
        self.logger = logger

        # Safe defaults so every method works even when NLTK is missing and
        # _initialize_components() is never run.
        self.stopwords = set()
        self.stemmer = None

        if HAS_NLTK:
            NLTKDownloader.download_required_data(config)
            self._initialize_components()

    def _initialize_components(self):
        """Initialize NLTK components from config"""
        # Stopwords
        stopwords_lang = self.config.get('nlp.nltk.stopwords', 'english')
        try:
            self.stopwords = set(stopwords.words(stopwords_lang))
        except Exception as e:
            # Best effort: a missing corpus or unknown language disables
            # stopword removal instead of failing construction.
            self.logger.warning(f"Could not load stopwords: {e}")
            self.stopwords = set()

        # Stemmer
        stemmer_type = self.config.get('nlp.nltk.stemmer', 'porter')
        stemmer_factories = {
            'porter': PorterStemmer,
            'snowball': lambda: SnowballStemmer('english'),
            'lancaster': LancasterStemmer
        }

        factory = stemmer_factories.get(stemmer_type)
        if factory is None:
            self.logger.warning(
                f"Unknown stemmer '{stemmer_type}'; falling back to porter"
            )
            factory = PorterStemmer
        self.stemmer = factory()

    def tokenize_sentences(self, text: str) -> List[str]:
        """Tokenize text into sentences.

        Uses a naive split on ``.`` when NLTK is unavailable or its
        tokenizer fails (for example because the tokenizer data is missing).
        """
        if not HAS_NLTK:
            return _split_on_periods(text)

        try:
            return sent_tokenize(text)
        except Exception as e:
            self.logger.warning(f"NLTK sentence tokenization failed: {e}")
            return _split_on_periods(text)

    def tokenize_words(self, text: str) -> List[str]:
        """Tokenize text into words.

        Uses ``str.split`` when NLTK is unavailable or its tokenizer fails.
        """
        if not HAS_NLTK:
            return text.split()

        try:
            return word_tokenize(text)
        except Exception as e:
            self.logger.warning(f"NLTK word tokenization failed: {e}")
            return text.split()

    def remove_stopwords(self, text: str) -> str:
        """Remove stopwords from text.

        Matching is case-insensitive. The input is returned unchanged when
        no stopword list is loaded; otherwise the remaining tokens are
        re-joined with single spaces.
        """
        if not self.stopwords:
            return text

        words = self.tokenize_words(text)
        filtered_words = [w for w in words if w.lower() not in self.stopwords]
        return ' '.join(filtered_words)

    def stem_text(self, text: str) -> str:
        """Stem every token of ``text``.

        The input is returned unchanged when no stemmer is available.
        """
        stemmer = getattr(self, 'stemmer', None)
        if stemmer is None:
            return text

        words = self.tokenize_words(text)
        return ' '.join(stemmer.stem(w) for w in words)


# ============================================================================
# UNIFIED NLP PROCESSOR
# ============================================================================

class NLPProcessor:
    """
    Unified NLP processor combining spaCy and NLTK
    All processing driven by config.yaml
    """

    def __init__(self, config: Config, logger: logging.Logger):
        """Build the cleaner and whichever NLP backends are installed."""
        self.config = config
        self.logger = logger

        # Initialize components
        self.cleaner = TextCleaner(config)
        self.spacy_processor = SpacyProcessor(config, logger) if HAS_SPACY else None
        self.nltk_processor = NLTKProcessor(config, logger) if HAS_NLTK else None

        self.preprocessing_config = config.get('nlp.preprocessing', {})

        self.logger.info("NLP Processor initialized")

    def preprocess_text(self, text: str) -> str:
        """
        Complete preprocessing pipeline

        Order: Clean -> Lemmatize -> Remove Stopwords

        Lemmatization and stopword removal only run when enabled in the
        ``nlp.preprocessing`` config section and the matching backend is
        available. Stemming is not part of this pipeline.

        Args:
            text: Raw input text.

        Returns:
            The preprocessed text, or ``""`` for empty input.
        """
        if not text:
            return ""

        # Step 1: Clean text
        text = self.cleaner.clean(text)

        # Step 2: Lemmatize (if enabled and spaCy available)
        if self.preprocessing_config.get('lemmatize', False) and self.spacy_processor:
            try:
                max_chars = int(self.config.get('nlp.spacy.max_lemmatize_chars', 300000))
                if len(text) <= max_chars:
                    text = self.spacy_processor.lemmatize(text)
                else:
                    # Very long inputs are left un-lemmatized; say so
                    # instead of skipping silently.
                    self.logger.warning(
                        f"Skipping lemmatization: text length {len(text)} "
                        f"exceeds nlp.spacy.max_lemmatize_chars ({max_chars})"
                    )
            except Exception as e:
                self.logger.warning(f"Lemmatization failed: {e}")

        # Step 3: Remove stopwords (if enabled)
        if self.preprocessing_config.get('remove_stopwords', False) and self.nltk_processor:
            text = self.nltk_processor.remove_stopwords(text)

        return text

    def extract_sentences(self, text: str, method: str = 'auto') -> List[str]:
        """
        Extract sentences using specified method

        Args:
            text: Input text
            method: 'spacy', 'nltk', or 'auto' (tries spacy first)

        Returns:
            List of sentences. When the requested backend is unavailable a
            naive split on ``.`` is used.
        """
        if method == 'spacy' and self.spacy_processor:
            return self.spacy_processor.extract_sentences(text)

        # In auto mode only pick spaCy when its model actually loaded;
        # otherwise NLTK's tokenizer is a better choice than the naive split
        # spaCy would fall back to.
        if method == 'auto' and self.spacy_processor and self.spacy_processor.nlp:
            return self.spacy_processor.extract_sentences(text)

        if method in ('nltk', 'auto') and self.nltk_processor:
            return self.nltk_processor.tokenize_sentences(text)

        # Fallback
        return _split_on_periods(text)

    @staticmethod
    def _overlap_tail(sentences: List[str], overlap: int) -> Tuple[List[str], int]:
        """Pick the trailing sentences of a finished chunk to carry over.

        Returns the longest run of whole trailing sentences whose combined
        length (separators not counted) fits in ``overlap``, together with
        that combined length.
        """
        # A chunk no longer than the overlap would be carried over in full,
        # i.e. duplicated, so start the next chunk empty instead.
        if len(' '.join(sentences)) <= overlap:
            return [], 0

        tail: List[str] = []
        tail_length = 0
        for sentence in reversed(sentences):
            if tail_length + len(sentence) > overlap:
                break
            tail.append(sentence)
            tail_length += len(sentence)

        tail.reverse()  # restore document order
        return tail, tail_length

    def chunk_text(self, text: str, chunk_size: Optional[int] = None,
                   overlap: Optional[int] = None) -> List[str]:
        """
        Chunk text intelligently using sentence boundaries

        Sentences are never split, so a single sentence longer than
        ``chunk_size`` becomes its own oversized chunk. Lengths are summed
        per sentence; the joining spaces are not counted.

        Args:
            text: Input text
            chunk_size: Characters per chunk (from config if None)
            overlap: Overlap between chunks (from config if None)

        Returns:
            List of chunks, each a space-joined run of sentences.
        """
        # Get parameters from config
        if chunk_size is None:
            chunk_size = self.config.get('embeddings.chunk.size', 500)
        if overlap is None:
            overlap = self.config.get('embeddings.chunk.overlap', 50)

        # Config values may arrive as strings; compare as integers.
        chunk_size = int(chunk_size)
        overlap = int(overlap)

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in self.extract_sentences(text):
            sentence_length = len(sentence)

            # If adding this sentence exceeds chunk_size, close the chunk
            # and seed the next one with the overlapping tail.
            if current_chunk and current_length + sentence_length > chunk_size:
                chunks.append(' '.join(current_chunk))
                current_chunk, current_length = self._overlap_tail(current_chunk, overlap)

            current_chunk.append(sentence)
            current_length += sentence_length

        # Add final chunk
        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    def get_text_statistics(self, text: str) -> Dict[str, Any]:
        """Get comprehensive text statistics.

        Returns:
            Dict with ``char_count``, ``word_count`` and ``sentence_count``;
            ``token_count`` and ``unique_lemmas`` are added when a spaCy
            model is loaded and processing succeeds.
        """
        stats = {
            'char_count': len(text),
            'word_count': len(text.split()),
            'sentence_count': len(self.extract_sentences(text))
        }

        # Add spaCy stats if available
        if self.spacy_processor and self.spacy_processor.nlp:
            try:
                doc = self.spacy_processor.process(text)
                stats['token_count'] = len(doc)
                stats['unique_lemmas'] = len({token.lemma_ for token in doc})
            except Exception as e:
                # The spaCy stats are optional extras; report and move on.
                self.logger.debug(f"spaCy statistics unavailable: {e}")

        return stats

    def extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
        """Extract keywords using simple frequency analysis.

        Args:
            text: Input text (preprocessed internally).
            top_n: Maximum number of keywords to return.

        Returns:
            The most frequent tokens, most common first.
        """
        # Preprocess
        processed = self.preprocess_text(text)

        # Tokenize
        if self.nltk_processor:
            words = self.nltk_processor.tokenize_words(processed)
        else:
            words = processed.split()

        # Tokens made purely of punctuation (",", ".", "``" ...) are
        # produced by the tokenizer but are never meaningful keywords.
        word_freq = Counter(
            word for word in words
            if any(ch.isalnum() for ch in word)
        )

        # Get top N
        return [word for word, _ in word_freq.most_common(top_n)]

    def process_document(self, text: str, metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Process entire document with full NLP pipeline

        Sentences, statistics and keywords are computed from the original
        text; chunks are built from the preprocessed text.

        Returns:
            Dict with processed text, chunks, stats, and metadata
        """
        result = {
            'original_text': text,
            'metadata': metadata or {}
        }

        # Preprocess
        self.logger.debug("Preprocessing document...")
        result['processed_text'] = self.preprocess_text(text)

        # Extract sentences
        self.logger.debug("Extracting sentences...")
        result['sentences'] = self.extract_sentences(text)

        # Chunk text
        self.logger.debug("Chunking text...")
        result['chunks'] = self.chunk_text(result['processed_text'])

        # Statistics
        result['statistics'] = self.get_text_statistics(text)

        # Keywords
        result['keywords'] = self.extract_keywords(text)

        return result


# ============================================================================
# FACTORY FUNCTION
# ============================================================================

def create_nlp_processor(config: Config, logger: logging.Logger) -> NLPProcessor:
    """Factory function to create NLP processor"""
    return NLPProcessor(config, logger)