# VivekanandaAI / nlp_processor.py
# (uploaded by jyotirmoy05 — commit e889148)
"""
NLP Processing Module using spaCy and NLTK
Pure logic, config-driven preprocessing
"""
import logging
import re
import unicodedata
from pathlib import Path
from typing import Any, Dict, List, Optional
# spaCy and NLTK imports
try:
import spacy
from spacy.language import Language
HAS_SPACY = True
except ImportError:
HAS_SPACY = False
logging.warning("spaCy not installed. Install with: pip install spacy")
try:
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, SnowballStemmer, LancasterStemmer
HAS_NLTK = True
except ImportError:
HAS_NLTK = False
logging.warning("NLTK not installed. Install with: pip install nltk")
from utils import Config
# ============================================================================
# NLTK DOWNLOADER
# ============================================================================
class NLTKDownloader:
    """Download the NLTK data packages required by the config, on demand."""

    @staticmethod
    def download_required_data(config: "Config"):
        """Check for, and if missing download, NLTK data named in the config.

        Reads ``nlp.nltk.tokenizer`` (e.g. 'punkt') and ``nlp.nltk.stopwords``;
        downloads quietly so startup is not noisy. No-op when NLTK is absent.
        """
        if not HAS_NLTK:
            return
        required_data = []
        tokenizer = config.get('nlp.nltk.tokenizer')
        if tokenizer:
            required_data.append(tokenizer)
        if config.get('nlp.nltk.stopwords'):
            required_data.append('stopwords')
        for data_name in required_data:
            # 'stopwords' is a corpus, not a tokenizer: it lives under
            # corpora/.  The previous code probed tokenizers/<name> for
            # everything, so the stopwords check always failed and the
            # package was re-downloaded on every run.
            category = 'corpora' if data_name == 'stopwords' else 'tokenizers'
            try:
                nltk.data.find(f'{category}/{data_name}')
            except LookupError:
                logging.info(f"Downloading NLTK data: {data_name}")
                nltk.download(data_name, quiet=True)
# ============================================================================
# TEXT CLEANER
# ============================================================================
class TextCleaner:
    """Clean and normalize text according to the ``nlp.preprocessing`` config."""

    def __init__(self, config: "Config"):
        """
        Args:
            config: Project config object; only ``nlp.preprocessing`` is read.
        """
        self.config = config
        self.preprocessing_config = config.get('nlp.preprocessing', {})

    def clean(self, text: str) -> str:
        """Apply whitespace/control-character cleanup plus optional config steps.

        Always: collapse blank-line runs and repeated spaces, strip ASCII
        control characters (keeping tab/newline/CR), normalize unicode, and
        remove lines that contain only a page number.  Optionally (per
        config): lowercase, strip punctuation, strip digits.
        """
        if not text:
            return ""
        # Collapse 3+ consecutive newlines to a single blank line.
        text = re.sub(r'\n{3,}', '\n\n', text)
        # Collapse runs of spaces.
        text = re.sub(r' {2,}', ' ', text)
        # Strip ASCII control characters (ranges skip \t \n \r on purpose).
        text = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', text)
        # Canonical unicode normalization.  The previous
        # encode('utf-8','ignore').decode('utf-8') round-trip was a no-op
        # on str objects and normalized nothing.
        text = unicodedata.normalize('NFC', text)
        # Drop standalone page-number lines (digits only).
        text = re.sub(r'^\d+\s*$', '', text, flags=re.MULTILINE)
        if self.preprocessing_config.get('lowercase', False):
            text = text.lower()
        if self.preprocessing_config.get('remove_punctuation', False):
            text = re.sub(r'[^\w\s]', '', text)
        if self.preprocessing_config.get('remove_numbers', False):
            text = re.sub(r'\d+', '', text)
        return text.strip()

    def clean_sentence(self, sentence: str) -> str:
        """Drop words outside the configured length bounds.

        Note: despite operating on a sentence, the filter is per-word —
        words shorter than ``min_word_length`` or longer than
        ``max_word_length`` are removed; the sentence itself is kept.
        """
        sentence = sentence.strip()
        min_length = self.preprocessing_config.get('min_word_length', 2)
        max_length = self.preprocessing_config.get('max_word_length', 50)
        words = sentence.split()
        words = [w for w in words if min_length <= len(w) <= max_length]
        return ' '.join(words)
# ============================================================================
# SPACY PROCESSOR
# ============================================================================
class SpacyProcessor:
    """Thin wrapper around a spaCy pipeline, configured from config.yaml."""

    def __init__(self, config: Config, logger: logging.Logger):
        """Store config/logger and attempt to load the configured model."""
        self.config = config
        self.logger = logger
        self.nlp = None
        if HAS_SPACY:
            self._load_spacy_model()

    def _load_spacy_model(self):
        """Load the model named in ``nlp.spacy.model``; leave nlp=None on failure."""
        model_name = self.config.get('nlp.spacy.model', 'en_core_web_sm')
        try:
            self.nlp = spacy.load(model_name)
            self.logger.info(f"Loaded spaCy model: {model_name}")
            self._configure_pipeline()
            # Raise the document-length cap when the config asks for it;
            # a malformed value is silently ignored (best-effort).
            try:
                self.nlp.max_length = int(
                    self.config.get('nlp.spacy.max_length', 1000000)
                )
            except Exception:
                pass
        except OSError:
            # Model package not installed — degrade to the fallback paths.
            self.logger.warning(
                f"spaCy model '{model_name}' not found. "
                f"Download with: python -m spacy download {model_name}"
            )
            self.nlp = None

    def _configure_pipeline(self):
        """Disable the pipeline components listed under ``nlp.spacy.disable``."""
        if not self.nlp:
            return
        for component in self.config.get('nlp.spacy.disable', []):
            if component not in self.nlp.pipe_names:
                continue
            self.nlp.disable_pipe(component)
            self.logger.debug(f"Disabled spaCy component: {component}")

    def process(self, text: str) -> spacy.tokens.Doc:
        """Run the full pipeline; raises RuntimeError when no model loaded."""
        if not self.nlp:
            raise RuntimeError("spaCy model not loaded")
        return self.nlp(text)

    def extract_sentences(self, text: str) -> List[str]:
        """Sentence-split via spaCy, or a naive '.'-split when unavailable."""
        if not self.nlp:
            return [segment.strip() for segment in text.split('.') if segment.strip()]
        return [sent.text.strip() for sent in self.process(text).sents]

    def lemmatize(self, text: str) -> str:
        """Return the text with every token replaced by its lemma."""
        if not self.nlp:
            return text
        return ' '.join(token.lemma_ for token in self.process(text))

    def extract_entities(self, text: str) -> List[Dict[str, str]]:
        """Return NER spans as dicts; empty when the 'ner' component is absent."""
        if not self.nlp or 'ner' not in self.nlp.pipe_names:
            return []
        entities = []
        for ent in self.process(text).ents:
            entities.append({
                'text': ent.text,
                'label': ent.label_,
                'start': ent.start_char,
                'end': ent.end_char,
            })
        return entities
# ============================================================================
# NLTK PROCESSOR
# ============================================================================
class NLTKProcessor:
    """Tokenization, stopword removal and stemming via NLTK, config-driven."""

    def __init__(self, config: "Config", logger: logging.Logger):
        """Store config/logger; download data and build components if NLTK exists."""
        self.config = config
        self.logger = logger
        # Safe defaults so every method works even without NLTK.
        # Previously remove_stopwords raised AttributeError in that case
        # because self.stopwords was only assigned inside the HAS_NLTK path.
        self.stopwords = set()
        self.stemmer = None
        if HAS_NLTK:
            NLTKDownloader.download_required_data(config)
            self._initialize_components()

    def _initialize_components(self):
        """Build the stopword set and stemmer named in the config."""
        stopwords_lang = self.config.get('nlp.nltk.stopwords', 'english')
        try:
            self.stopwords = set(stopwords.words(stopwords_lang))
        except Exception as e:
            self.logger.warning(f"Could not load stopwords: {e}")
            self.stopwords = set()
        stemmer_type = self.config.get('nlp.nltk.stemmer', 'porter')
        # Every value is a zero-arg factory; the old
        # "if callable(...)" fallback branch was dead code.
        stemmer_factories = {
            'porter': PorterStemmer,
            'snowball': lambda: SnowballStemmer('english'),
            'lancaster': LancasterStemmer,
        }
        factory = stemmer_factories.get(stemmer_type, PorterStemmer)
        self.stemmer = factory()

    def tokenize_sentences(self, text: str) -> List[str]:
        """Sentence-split via NLTK, falling back to a naive '.'-split."""
        if not HAS_NLTK:
            return [s.strip() for s in text.split('.') if s.strip()]
        try:
            return sent_tokenize(text)
        except Exception as e:
            self.logger.warning(f"NLTK sentence tokenization failed: {e}")
            return [s.strip() for s in text.split('.') if s.strip()]

    def tokenize_words(self, text: str) -> List[str]:
        """Word-tokenize via NLTK, falling back to whitespace split."""
        if not HAS_NLTK:
            return text.split()
        try:
            return word_tokenize(text)
        except Exception as e:
            self.logger.warning(f"NLTK word tokenization failed: {e}")
            return text.split()

    def remove_stopwords(self, text: str) -> str:
        """Drop words whose lowercase form is in the stopword set."""
        if not self.stopwords:
            return text
        words = self.tokenize_words(text)
        filtered_words = [w for w in words if w.lower() not in self.stopwords]
        return ' '.join(filtered_words)

    def stem_text(self, text: str) -> str:
        """Stem every word; no-op when no stemmer is configured."""
        if self.stemmer is None:
            return text
        return ' '.join(self.stemmer.stem(w) for w in self.tokenize_words(text))
# ============================================================================
# UNIFIED NLP PROCESSOR
# ============================================================================
class NLPProcessor:
    """
    Unified NLP processor combining spaCy and NLTK.

    All behavior is driven by config.yaml (``nlp.preprocessing``,
    ``nlp.spacy``, ``nlp.nltk``, ``embeddings.chunk``).  Either backend
    may be absent; every method degrades to a simple pure-Python fallback.
    """

    def __init__(self, config: "Config", logger: logging.Logger):
        """Build the cleaner and whichever backends are installed."""
        self.config = config
        self.logger = logger
        self.cleaner = TextCleaner(config)
        # Backends are optional; None means "use the fallback path".
        self.spacy_processor = SpacyProcessor(config, logger) if HAS_SPACY else None
        self.nltk_processor = NLTKProcessor(config, logger) if HAS_NLTK else None
        self.preprocessing_config = config.get('nlp.preprocessing', {})
        self.logger.info("NLP Processor initialized")

    def preprocess_text(self, text: str) -> str:
        """
        Complete preprocessing pipeline.

        Order: Clean -> Lemmatize -> Remove Stopwords -> Stem.
        Each optional step runs only when enabled in ``nlp.preprocessing``
        AND its backend is available.
        """
        if not text:
            return ""
        # Step 1: always clean.
        text = self.cleaner.clean(text)
        # Step 2: lemmatize (spaCy), skipped for overly long documents.
        if self.preprocessing_config.get('lemmatize', False) and self.spacy_processor:
            try:
                max_chars = int(self.config.get('nlp.spacy.max_lemmatize_chars', 300000))
                if len(text) <= max_chars:
                    text = self.spacy_processor.lemmatize(text)
                else:
                    # Was a silent `pass`; make the skip observable.
                    self.logger.debug("Skipping lemmatization: text too long")
            except Exception as e:
                self.logger.warning(f"Lemmatization failed: {e}")
        # Step 3: stopword removal (NLTK).
        if self.preprocessing_config.get('remove_stopwords', False) and self.nltk_processor:
            text = self.nltk_processor.remove_stopwords(text)
        # Step 4: stemming.  The docstring always promised this step but it
        # was never executed; now honoured when 'stem' is enabled in config
        # (default off, so existing behavior is unchanged).
        if self.preprocessing_config.get('stem', False) and self.nltk_processor:
            text = self.nltk_processor.stem_text(text)
        return text

    def extract_sentences(self, text: str, method: str = 'auto') -> List[str]:
        """
        Extract sentences using the requested backend.

        Args:
            text: Input text.
            method: 'spacy', 'nltk', or 'auto' (spaCy preferred, then NLTK).

        Falls through to a naive '.'-split when no backend can serve the
        request (including method='spacy' with spaCy missing).
        """
        if method == 'spacy' or (method == 'auto' and self.spacy_processor):
            if self.spacy_processor:
                return self.spacy_processor.extract_sentences(text)
        if method == 'nltk' or method == 'auto':
            if self.nltk_processor:
                return self.nltk_processor.tokenize_sentences(text)
        return [s.strip() for s in text.split('.') if s.strip()]

    def chunk_text(self, text: str, chunk_size: Optional[int] = None,
                   overlap: Optional[int] = None) -> List[str]:
        """
        Chunk text on sentence boundaries.

        Args:
            text: Input text.
            chunk_size: Target characters per chunk
                (``embeddings.chunk.size`` from config when None).
            overlap: Character budget of trailing sentences carried into the
                next chunk (``embeddings.chunk.overlap`` when None).

        Returns:
            List of chunks.  A single sentence longer than chunk_size still
            becomes its own oversized chunk; joining spaces are not counted
            against the budget.
        """
        if chunk_size is None:
            chunk_size = self.config.get('embeddings.chunk.size', 500)
        if overlap is None:
            overlap = self.config.get('embeddings.chunk.overlap', 50)
        sentences = self.extract_sentences(text)
        chunks = []
        current_chunk = []
        current_length = 0
        for sentence in sentences:
            sentence_length = len(sentence)
            # Flush once adding this sentence would exceed the budget.
            if current_length + sentence_length > chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                # Seed the next chunk with the trailing sentences that fit
                # inside the overlap budget.
                overlap_text = ' '.join(current_chunk)
                if len(overlap_text) > overlap:
                    overlap_sentences = []
                    overlap_length = 0
                    for s in reversed(current_chunk):
                        if overlap_length + len(s) <= overlap:
                            overlap_sentences.insert(0, s)
                            overlap_length += len(s)
                        else:
                            break
                    current_chunk = overlap_sentences
                    current_length = overlap_length
                else:
                    # NOTE(review): when the whole flushed chunk fits inside
                    # the overlap budget, NO overlap is carried over —
                    # confirm that is intended.
                    current_chunk = []
                    current_length = 0
            current_chunk.append(sentence)
            current_length += sentence_length
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks

    def get_text_statistics(self, text: str) -> Dict[str, Any]:
        """Return char/word/sentence counts, plus spaCy token stats if loaded."""
        stats = {
            'char_count': len(text),
            'word_count': len(text.split()),
            'sentence_count': len(self.extract_sentences(text))
        }
        if self.spacy_processor:
            # Best-effort enrichment; a spaCy failure must not sink stats.
            try:
                doc = self.spacy_processor.process(text)
                stats['token_count'] = len(doc)
                stats['unique_lemmas'] = len({token.lemma_ for token in doc})
            except Exception:
                pass
        return stats

    def extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
        """Return the top_n most frequent tokens of the preprocessed text.

        Simple frequency ranking — stopword/punctuation filtering happens
        only if the corresponding preprocessing options are enabled.
        """
        processed = self.preprocess_text(text)
        if self.nltk_processor:
            words = self.nltk_processor.tokenize_words(processed)
        else:
            words = processed.split()
        from collections import Counter
        word_freq = Counter(words)
        return [word for word, _ in word_freq.most_common(top_n)]

    def process_document(self, text: str, metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Run the full pipeline over one document.

        Args:
            text: Raw document text.
            metadata: Optional metadata carried through untouched.

        Returns:
            Dict with original_text, metadata, processed_text, sentences
            (from the ORIGINAL text), chunks (from the PROCESSED text),
            statistics and keywords.
        """
        result = {
            'original_text': text,
            'metadata': metadata or {}
        }
        self.logger.debug("Preprocessing document...")
        result['processed_text'] = self.preprocess_text(text)
        self.logger.debug("Extracting sentences...")
        result['sentences'] = self.extract_sentences(text)
        self.logger.debug("Chunking text...")
        result['chunks'] = self.chunk_text(result['processed_text'])
        result['statistics'] = self.get_text_statistics(text)
        result['keywords'] = self.extract_keywords(text)
        return result
# ============================================================================
# FACTORY FUNCTION
# ============================================================================
def create_nlp_processor(config: Config, logger: logging.Logger) -> NLPProcessor:
    """Build and return a fully configured NLP processor (factory helper)."""
    processor = NLPProcessor(config, logger)
    return processor