| """ |
| NLP Processing Module using spaCy and NLTK |
| Pure logic, config-driven preprocessing |
| """ |
|
|
| import re |
| from typing import List, Dict, Any, Optional |
| from pathlib import Path |
| import logging |
|
|
| |
| try: |
| import spacy |
| from spacy.language import Language |
| HAS_SPACY = True |
| except ImportError: |
| HAS_SPACY = False |
| logging.warning("spaCy not installed. Install with: pip install spacy") |
|
|
| try: |
| import nltk |
| from nltk.tokenize import sent_tokenize, word_tokenize |
| from nltk.corpus import stopwords |
| from nltk.stem import PorterStemmer, SnowballStemmer, LancasterStemmer |
| HAS_NLTK = True |
| except ImportError: |
| HAS_NLTK = False |
| logging.warning("NLTK not installed. Install with: pip install nltk") |
|
|
| from utils import Config |
|
|
| |
| |
| |
|
|
class NLTKDownloader:
    """Automatically download required NLTK data packages."""

    @staticmethod
    def download_required_data(config: Config):
        """Download NLTK data based on config.

        Reads 'nlp.nltk.tokenizer' and 'nlp.nltk.stopwords' from the
        config and fetches the corresponding NLTK data packages if they
        are not already present locally. No-op when NLTK is missing.

        Args:
            config: Project Config providing the nlp.nltk.* settings.
        """
        if not HAS_NLTK:
            return

        # Pairs of (download name, nltk.data search path). The search path
        # must match the package's category inside the NLTK data tree.
        required_data = []

        tokenizer = config.get('nlp.nltk.tokenizer')
        if tokenizer:
            required_data.append((tokenizer, f'tokenizers/{tokenizer}'))

        stopwords_lang = config.get('nlp.nltk.stopwords')
        if stopwords_lang:
            # BUG FIX: the stopwords corpus lives under 'corpora/', not
            # 'tokenizers/'. The old lookup always failed, re-downloading
            # the package on every run.
            required_data.append(('stopwords', 'corpora/stopwords'))

        for data_name, search_path in required_data:
            try:
                nltk.data.find(search_path)
            except LookupError:
                logging.info(f"Downloading NLTK data: {data_name}")
                nltk.download(data_name, quiet=True)
|
|
| |
| |
| |
|
|
class TextCleaner:
    """Normalize raw text according to the 'nlp.preprocessing' config section."""

    # Pre-compiled patterns; each corresponds to one cleaning step in clean().
    _RE_BLANK_RUNS = re.compile(r'\n{3,}')
    _RE_SPACE_RUNS = re.compile(r' {2,}')
    _RE_CONTROL = re.compile(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]')
    _RE_NUMBER_LINE = re.compile(r'^\d+\s*$', re.MULTILINE)
    _RE_PUNCT = re.compile(r'[^\w\s]')
    _RE_DIGITS = re.compile(r'\d+')

    def __init__(self, config: Config):
        self.config = config
        self.preprocessing_config = config.get('nlp.preprocessing', {})

    def clean(self, text: str) -> str:
        """Apply every configured cleaning step and return the cleaned text."""
        if not text:
            return ""

        # Collapse runs of blank lines and runs of spaces.
        text = self._RE_BLANK_RUNS.sub('\n\n', text)
        text = self._RE_SPACE_RUNS.sub(' ', text)

        # Strip non-printable control characters.
        text = self._RE_CONTROL.sub('', text)

        # Drop any characters that do not survive a UTF-8 round trip.
        text = text.encode('utf-8', 'ignore').decode('utf-8')

        # Remove lines containing nothing but a number (e.g. page numbers).
        text = self._RE_NUMBER_LINE.sub('', text)

        # Optional, config-gated normalization steps.
        if self.preprocessing_config.get('lowercase', False):
            text = text.lower()

        if self.preprocessing_config.get('remove_punctuation', False):
            text = self._RE_PUNCT.sub('', text)

        if self.preprocessing_config.get('remove_numbers', False):
            text = self._RE_DIGITS.sub('', text)

        return text.strip()

    def clean_sentence(self, sentence: str) -> str:
        """Drop words outside the configured length bounds from one sentence."""
        stripped = sentence.strip()

        shortest = self.preprocessing_config.get('min_word_length', 2)
        longest = self.preprocessing_config.get('max_word_length', 50)

        kept = [
            word for word in stripped.split()
            if shortest <= len(word) <= longest
        ]
        return ' '.join(kept)
|
|
| |
| |
| |
|
|
class SpacyProcessor:
    """Thin wrapper around a spaCy pipeline configured from the project Config."""

    def __init__(self, config: Config, logger: logging.Logger):
        self.config = config
        self.logger = logger
        self.nlp = None  # populated by _load_spacy_model() when spaCy is present

        if HAS_SPACY:
            self._load_spacy_model()

    def _load_spacy_model(self):
        """Load and configure the spaCy model named under 'nlp.spacy.model'."""
        model_name = self.config.get('nlp.spacy.model', 'en_core_web_sm')

        try:
            self.nlp = spacy.load(model_name)
            self.logger.info(f"Loaded spaCy model: {model_name}")

            self._configure_pipeline()

            # Raise the document-length cap when the config provides one;
            # a bad or missing value leaves spaCy's current limit in place.
            try:
                self.nlp.max_length = int(
                    self.config.get('nlp.spacy.max_length', 1000000))
            except Exception:
                pass

        except OSError:
            # Model package not installed locally; degrade gracefully.
            self.logger.warning(
                f"spaCy model '{model_name}' not found. "
                f"Download with: python -m spacy download {model_name}"
            )
            self.nlp = None

    def _configure_pipeline(self):
        """Disable every pipeline component listed under 'nlp.spacy.disable'."""
        if not self.nlp:
            return

        for component_name in self.config.get('nlp.spacy.disable', []):
            if component_name in self.nlp.pipe_names:
                self.nlp.disable_pipe(component_name)
                self.logger.debug(f"Disabled spaCy component: {component_name}")

    def process(self, text: str) -> spacy.tokens.Doc:
        """Run the loaded pipeline over *text*; raises RuntimeError if no model."""
        if not self.nlp:
            raise RuntimeError("spaCy model not loaded")

        return self.nlp(text)

    def extract_sentences(self, text: str) -> List[str]:
        """Split *text* into sentences (naive '.'-split when no model is loaded)."""
        if not self.nlp:
            return [piece.strip() for piece in text.split('.') if piece.strip()]

        return [sent.text.strip() for sent in self.process(text).sents]

    def lemmatize(self, text: str) -> str:
        """Return *text* with every token replaced by its lemma (space-joined)."""
        if not self.nlp:
            return text

        return ' '.join(token.lemma_ for token in self.process(text))

    def extract_entities(self, text: str) -> List[Dict[str, str]]:
        """Return named entities as dicts of text, label, and character offsets.

        Empty list when no model is loaded or the pipeline lacks an NER stage.
        """
        if not self.nlp or 'ner' not in self.nlp.pipe_names:
            return []

        entities = []
        for ent in self.process(text).ents:
            entities.append({
                'text': ent.text,
                'label': ent.label_,
                'start': ent.start_char,
                'end': ent.end_char,
            })
        return entities
|
|
| |
| |
| |
|
|
class NLTKProcessor:
    """Stopword removal, stemming, and tokenization backed by NLTK."""

    def __init__(self, config: Config, logger: logging.Logger):
        self.config = config
        self.logger = logger
        # BUG FIX: always give these safe defaults. Previously they were set
        # only inside _initialize_components(), so remove_stopwords() raised
        # AttributeError whenever NLTK was not installed.
        self.stopwords = set()
        self.stemmer = None

        if HAS_NLTK:
            NLTKDownloader.download_required_data(config)
            self._initialize_components()

    def _initialize_components(self):
        """Load stopwords and build the stemmer named in the config."""
        stopwords_lang = self.config.get('nlp.nltk.stopwords', 'english')
        try:
            self.stopwords = set(stopwords.words(stopwords_lang))
        except Exception as e:
            self.logger.warning(f"Could not load stopwords: {e}")
            self.stopwords = set()

        # Map config value to a zero-arg stemmer factory. The old code
        # special-cased non-callable map entries, but classes and lambdas are
        # both callable, so that branch was dead and has been removed.
        stemmer_type = self.config.get('nlp.nltk.stemmer', 'porter')
        stemmer_factories = {
            'porter': PorterStemmer,
            'snowball': lambda: SnowballStemmer('english'),
            'lancaster': LancasterStemmer,
        }
        self.stemmer = stemmer_factories.get(stemmer_type, PorterStemmer)()

    def tokenize_sentences(self, text: str) -> List[str]:
        """Tokenize text into sentences, falling back to '.'-split on failure."""
        if not HAS_NLTK:
            return [s.strip() for s in text.split('.') if s.strip()]

        try:
            return sent_tokenize(text)
        except Exception as e:
            self.logger.warning(f"NLTK sentence tokenization failed: {e}")
            return [s.strip() for s in text.split('.') if s.strip()]

    def tokenize_words(self, text: str) -> List[str]:
        """Tokenize text into words, falling back to whitespace split on failure."""
        if not HAS_NLTK:
            return text.split()

        try:
            return word_tokenize(text)
        except Exception as e:
            self.logger.warning(f"NLTK word tokenization failed: {e}")
            return text.split()

    def remove_stopwords(self, text: str) -> str:
        """Remove stopwords (case-insensitive match) from text."""
        if not self.stopwords:
            return text

        words = self.tokenize_words(text)
        filtered_words = [w for w in words if w.lower() not in self.stopwords]
        return ' '.join(filtered_words)

    def stem_text(self, text: str) -> str:
        """Stem every word in text; no-op when no stemmer was initialized."""
        if self.stemmer is None:
            return text

        words = self.tokenize_words(text)
        return ' '.join(self.stemmer.stem(w) for w in words)
|
|
| |
| |
| |
|
|
class NLPProcessor:
    """
    Unified NLP processor combining spaCy and NLTK.
    All processing driven by config.yaml; each optional step is gated by
    the 'nlp.preprocessing' section, so missing keys mean "skip".
    """

    def __init__(self, config: Config, logger: logging.Logger):
        self.config = config
        self.logger = logger

        # Sub-processors; spaCy/NLTK ones are None when the library is missing.
        self.cleaner = TextCleaner(config)
        self.spacy_processor = SpacyProcessor(config, logger) if HAS_SPACY else None
        self.nltk_processor = NLTKProcessor(config, logger) if HAS_NLTK else None

        self.preprocessing_config = config.get('nlp.preprocessing', {})

        self.logger.info("NLP Processor initialized")

    def preprocess_text(self, text: str) -> str:
        """
        Complete preprocessing pipeline.
        Order: Clean -> Lemmatize -> Remove Stopwords -> Stem
        All steps after cleaning are optional and config-gated.
        """
        if not text:
            return ""

        # 1) Always clean/normalize first.
        text = self.cleaner.clean(text)

        # 2) Optional lemmatization (spaCy); skipped for very long texts to
        # avoid spaCy's document-length limit and excessive memory use.
        if self.preprocessing_config.get('lemmatize', False) and self.spacy_processor:
            try:
                max_chars = int(self.config.get('nlp.spacy.max_lemmatize_chars', 300000))
                if len(text) <= max_chars:
                    text = self.spacy_processor.lemmatize(text)
                else:
                    # BUG FIX: this skip was a silent 'pass'; make it visible.
                    self.logger.debug(
                        f"Skipping lemmatization: text length {len(text)} "
                        f"exceeds {max_chars} chars"
                    )
            except Exception as e:
                self.logger.warning(f"Lemmatization failed: {e}")

        # 3) Optional stopword removal (NLTK).
        if self.preprocessing_config.get('remove_stopwords', False) and self.nltk_processor:
            text = self.nltk_processor.remove_stopwords(text)

        # 4) Optional stemming (NLTK). BUG FIX: the docstring promised this
        # step but it was never applied. Gated by 'nlp.preprocessing.stem'
        # (default False), so existing configs see unchanged behavior.
        if self.preprocessing_config.get('stem', False) and self.nltk_processor:
            text = self.nltk_processor.stem_text(text)

        return text

    def extract_sentences(self, text: str, method: str = 'auto') -> List[str]:
        """
        Extract sentences using specified method.

        Args:
            text: Input text
            method: 'spacy', 'nltk', or 'auto' (tries spacy first)

        Returns:
            List of sentence strings; falls back to a naive '.'-split when
            no requested backend is available.
        """
        if method == 'spacy' or (method == 'auto' and self.spacy_processor):
            if self.spacy_processor:
                return self.spacy_processor.extract_sentences(text)

        if method == 'nltk' or method == 'auto':
            if self.nltk_processor:
                return self.nltk_processor.tokenize_sentences(text)

        # Last-resort fallback: split on periods.
        return [s.strip() for s in text.split('.') if s.strip()]

    def chunk_text(self, text: str, chunk_size: Optional[int] = None,
                   overlap: Optional[int] = None) -> List[str]:
        """
        Chunk text intelligently using sentence boundaries.

        Args:
            text: Input text
            chunk_size: Characters per chunk (from config if None)
            overlap: Overlap between chunks (from config if None)

        Returns:
            List of chunks; consecutive chunks share trailing sentences up
            to roughly *overlap* characters.
        """
        if chunk_size is None:
            chunk_size = self.config.get('embeddings.chunk.size', 500)
        if overlap is None:
            overlap = self.config.get('embeddings.chunk.overlap', 50)

        sentences = self.extract_sentences(text)

        chunks = []
        current_chunk = []
        # NOTE: current_length sums raw sentence lengths (joining spaces are
        # not counted), so chunks may slightly exceed chunk_size when joined.
        current_length = 0

        for sentence in sentences:
            sentence_length = len(sentence)

            # Flush the current chunk once adding this sentence would overflow.
            if current_length + sentence_length > chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))

                # Carry over trailing sentences whose combined length fits
                # within the overlap budget, so adjacent chunks share context.
                overlap_text = ' '.join(current_chunk)
                if len(overlap_text) > overlap:
                    overlap_sentences = []
                    overlap_length = 0
                    for s in reversed(current_chunk):
                        if overlap_length + len(s) <= overlap:
                            overlap_sentences.insert(0, s)
                            overlap_length += len(s)
                        else:
                            break
                    current_chunk = overlap_sentences
                    current_length = overlap_length
                else:
                    current_chunk = []
                    current_length = 0

            current_chunk.append(sentence)
            current_length += sentence_length

        # Flush the final partial chunk.
        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    def get_text_statistics(self, text: str) -> Dict[str, Any]:
        """Return char/word/sentence counts, plus spaCy token stats when available."""
        stats = {
            'char_count': len(text),
            'word_count': len(text.split()),
            'sentence_count': len(self.extract_sentences(text))
        }

        # Token-level stats require a loaded spaCy model; best-effort only.
        if self.spacy_processor:
            try:
                doc = self.spacy_processor.process(text)
                stats['token_count'] = len(doc)
                stats['unique_lemmas'] = len(set(token.lemma_ for token in doc))
            except Exception:
                pass

        return stats

    def extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
        """Extract the *top_n* most frequent words after full preprocessing."""
        processed = self.preprocess_text(text)

        if self.nltk_processor:
            words = self.nltk_processor.tokenize_words(processed)
        else:
            words = processed.split()

        from collections import Counter
        word_freq = Counter(words)

        return [word for word, _ in word_freq.most_common(top_n)]

    def process_document(self, text: str, metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Process entire document with full NLP pipeline.

        Args:
            text: Raw document text.
            metadata: Optional metadata dict passed through unchanged.

        Returns:
            Dict with original/processed text, sentences, chunks (built from
            the processed text), statistics, keywords, and metadata.
        """
        result = {
            'original_text': text,
            'metadata': metadata or {}
        }

        self.logger.debug("Preprocessing document...")
        result['processed_text'] = self.preprocess_text(text)

        # Sentences are extracted from the ORIGINAL text (pre-cleaning),
        # while chunks are built from the processed text.
        self.logger.debug("Extracting sentences...")
        result['sentences'] = self.extract_sentences(text)

        self.logger.debug("Chunking text...")
        result['chunks'] = self.chunk_text(result['processed_text'])

        result['statistics'] = self.get_text_statistics(text)

        result['keywords'] = self.extract_keywords(text)

        return result
|
|
| |
| |
| |
|
|
def create_nlp_processor(config: Config, logger: logging.Logger) -> NLPProcessor:
    """Factory function to create NLP processor.

    Args:
        config: Project Config driving all NLP behavior.
        logger: Logger shared by the processor and its sub-components.

    Returns:
        A fully initialized NLPProcessor.
    """
    return NLPProcessor(config, logger)