# DMj_backend/backend/text_preprocessor.py
"""
TEXT PREPROCESSING & CLEANING MODULE
1. Load NLP Pipeline (spaCy model)
2. Normalize Text (lowercase, remove special chars, URLs)
3. Tokenize & Analyze (break into words, POS tags)
4. Lemmatize & Clean (reduce to base forms, remove stopwords)
5. Store cleaned text & metadata in MongoDB
6. Feed to downstream AI models
flow: User Input → Normalize → Tokenize → Lemmatize → Store → AI Models
"""
import re
import string
from typing import Dict, List, Optional, Tuple
from datetime import datetime
import logging
# spaCy is optional: fall back to a lightweight regex pipeline when missing.
try:
    import spacy
    from spacy.language import Language
    SPACY_AVAILABLE = True
except ImportError:
    SPACY_AVAILABLE = False
    logging.info("spaCy not installed - using lightweight regex-based preprocessing")
    # Placeholder so the module-level annotation below still resolves.
    Language = None
from backend.connection import get_collection
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global cache for spaCy model (load once, reuse)
_nlp_model: Optional[Language] = None
def load_nlp_pipeline() -> Language:
    """Load and cache the spaCy NLP pipeline (en_core_web_sm).

    Downloads the model on first run; subsequent calls reuse the
    module-level cache for performance.

    Returns:
        The loaded spaCy Language model instance.

    Raises:
        RuntimeError: if spaCy is not installed.
        subprocess.CalledProcessError: if the model download fails.
    """
    global _nlp_model
    if _nlp_model is not None:
        return _nlp_model
    if not SPACY_AVAILABLE:
        raise RuntimeError("spaCy not installed.")
    try:
        # Try to load the model from disk
        _nlp_model = spacy.load("en_core_web_sm")
        logger.info("Loaded spaCy model: en_core_web_sm")
    except OSError:
        # Model not found: download with the *current* interpreter
        # (sys.executable) so the model lands in the active environment.
        logger.info("Downloading en_core_web_sm model...")
        import subprocess
        import sys
        subprocess.run(
            [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
            check=True,
        )
        _nlp_model = spacy.load("en_core_web_sm")
        logger.info("Downloaded and loaded en_core_web_sm")
    return _nlp_model
class TextPreprocessor:
    """Complete text preprocessing pipeline.

    Runs: normalize -> tokenize -> lemmatize/clean -> keyword extraction.
    Uses spaCy (en_core_web_sm) when it is installed and loadable; otherwise
    falls back to a lightweight regex-based mode with a basic stopword list.
    """

    def __init__(self):
        """Initialize with the spaCy pipeline if available, else lightweight mode."""
        if SPACY_AVAILABLE:
            try:
                self.nlp = load_nlp_pipeline()
                self.stop_words = self.nlp.Defaults.stop_words
                self.use_spacy = True
                logger.info("TextPreprocessor initialized with spaCy")
            except Exception as e:
                # spaCy import succeeded but model load/download failed:
                # degrade gracefully instead of crashing the caller.
                logger.warning(f"Failed to load spaCy: {e}. Using lightweight mode.")
                self.nlp = None
                self.stop_words = self._get_basic_stopwords()
                self.use_spacy = False
        else:
            self.nlp = None
            self.stop_words = self._get_basic_stopwords()
            self.use_spacy = False
            logger.info("TextPreprocessor initialized without spaCy (lightweight mode)")

    def _get_basic_stopwords(self) -> set:
        """Basic English stopwords for lightweight mode."""
        return {
            'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours',
            'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers',
            'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves',
            'what', 'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are',
            'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does',
            'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until',
            'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into',
            'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down',
            'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once'
        }

    def normalize_text(self, text: str) -> str:
        """Normalize raw user text.

        - Remove URLs, email addresses, @mentions and #hashtags
        - Convert to lowercase
        - Remove special characters except apostrophes
        - Collapse runs of whitespace

        Returns:
            The normalized string ("" for falsy input).
        """
        if not text:
            return ""
        # Remove URLs. (The original pattern also had a 'https\S+' alternative,
        # which was dead code: 'http\S+' already matches https URLs.)
        text = re.sub(r'http\S+|www\S+', '', text, flags=re.MULTILINE)
        # Remove email addresses
        text = re.sub(r'\S+@\S+', '', text)
        # Remove mentions (@user) and hashtags (#hashtag)
        text = re.sub(r'@\w+|#\w+', '', text)
        text = text.lower()
        # Keep word characters, whitespace and apostrophes only
        text = re.sub(r"[^\w\s']", '', text)
        # Collapse whitespace/tabs into single spaces
        return ' '.join(text.split())

    def tokenize_and_analyze(self, text: str) -> Tuple[List[str], List[Tuple[str, str]]]:
        """Tokenize text and attach POS tags.

        Returns:
            (tokens, [(token, pos_tag), ...]). In lightweight mode every token
            is tagged "NOUN" as a placeholder since no real tagger is available.
        """
        if not text:
            return [], []
        if not self.use_spacy:
            tokens = re.findall(r'\b\w+\b', text.lower())
            return tokens, [(token, "NOUN") for token in tokens]
        doc = self.nlp(text)
        tokens = [token.text for token in doc]
        pos_tags = [(token.text, token.pos_) for token in doc]
        return tokens, pos_tags

    def lemmatize_and_clean(self, text: str, remove_stopwords: bool = True,
                            remove_punctuation: bool = True) -> Tuple[str, Dict]:
        """Lemmatize tokens and drop stopwords/punctuation.

        Args:
            text: input text (normally already normalized).
            remove_stopwords: drop stopword tokens when True.
            remove_punctuation: drop punctuation tokens when True (spaCy mode).

        Returns:
            (cleaned_text, metadata) where metadata holds token counts,
            POS distribution (empty in lightweight mode) and compression ratio.
        """
        if not text:
            return "", {}
        if not self.use_spacy:
            # Lightweight mode: no real lemmatization, just stopword and
            # short-token (len < 2) filtering.
            tokens = re.findall(r'\b\w+\b', text.lower())
            lemmas = []
            removed_stopwords = 0
            for token in tokens:
                if remove_stopwords and token in self.stop_words:
                    removed_stopwords += 1
                    continue
                if len(token) >= 2:
                    lemmas.append(token)
            cleaned_text = ' '.join(lemmas)
            metadata = {
                "original_token_count": len(tokens),
                "cleaned_token_count": len(lemmas),
                "removed_stopwords": removed_stopwords,
                "pos_distribution": {},
                "compression_ratio": round(len(lemmas) / len(tokens), 2) if tokens else 0,
            }
            return cleaned_text, metadata
        doc = self.nlp(text)
        lemmas = []
        pos_distribution = {}
        removed_stopwords = 0
        original_count = 0
        for token in doc:
            original_count += 1
            # Tally POS tags for every token, including ones we later drop
            pos = token.pos_
            pos_distribution[pos] = pos_distribution.get(pos, 0) + 1
            if remove_stopwords and token.is_stop:
                removed_stopwords += 1
                continue
            if remove_punctuation and token.is_punct:
                continue
            # Reduce to base form
            lemma = token.lemma_.lower()
            # Skip single characters unless they are content words
            if len(lemma) < 2 and token.pos_ not in ["NOUN", "VERB", "ADJ", "ADV"]:
                continue
            lemmas.append(lemma)
        cleaned_text = ' '.join(lemmas)
        metadata = {
            "original_token_count": original_count,
            "cleaned_token_count": len(lemmas),
            "removed_stopwords": removed_stopwords,
            "pos_distribution": pos_distribution,
            "compression_ratio": round(len(lemmas) / original_count, 2) if original_count > 0 else 0,
        }
        return cleaned_text, metadata

    def extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
        """Extract up to top_n keywords ranked by frequency.

        spaCy mode combines noun chunks with content-word tokens
        (NOUN/VERB/ADJ/ADV, non-stopword, length > 2); lightweight mode ranks
        non-stopword tokens of length >= 3.
        """
        if not text:
            return []
        if not self.use_spacy:
            tokens = re.findall(r'\b\w{3,}\b', text.lower())
            keywords = [t for t in tokens if t not in self.stop_words]
            from collections import Counter
            return [kw for kw, _ in Counter(keywords).most_common(top_n)]
        doc = self.nlp(text)
        # Extract noun chunks
        noun_chunks = [chunk.text.lower() for chunk in doc.noun_chunks]
        # Extract high-value POS (nouns, verbs, adjectives, adverbs)
        important_tokens = [
            token.text.lower()
            for token in doc
            if token.pos_ in ["NOUN", "VERB", "ADJ", "ADV"]
            and not token.is_stop
            and len(token.text) > 2
        ]
        all_keywords = set(noun_chunks + important_tokens)
        # FIX: count whole-word occurrences only. The previous str.count()
        # inflated frequencies by matching inside longer words ("art" in "part").
        lowered = text.lower()
        keyword_freq = {
            kw: len(re.findall(r'\b' + re.escape(kw) + r'\b', lowered))
            for kw in all_keywords
        }
        ranked = sorted(keyword_freq.items(), key=lambda item: item[1], reverse=True)
        return [kw for kw, _ in ranked[:top_n]]

    def preprocess(self, text: str) -> Dict:
        """Run the full pipeline: normalize -> tokenize -> clean -> keywords.

        Returns:
            Dict with original, normalized, tokens, pos_tags, cleaned,
            keywords and metadata keys (all empty for falsy input).
        """
        if not text:
            return {
                "original": "",
                "normalized": "",
                "tokens": [],
                "pos_tags": [],
                "cleaned": "",
                "keywords": [],
                "metadata": {},
            }
        # Step 1: Normalize
        normalized = self.normalize_text(text)
        # Step 2: Tokenize
        tokens, pos_tags = self.tokenize_and_analyze(normalized)
        # Step 3: Lemmatize & Clean
        cleaned, metadata = self.lemmatize_and_clean(normalized)
        # Step 4: Extract keywords
        keywords = self.extract_keywords(normalized)
        return {
            "original": text,
            "normalized": normalized,
            "tokens": tokens,
            "pos_tags": pos_tags,
            "cleaned": cleaned,
            "keywords": keywords,
            "metadata": metadata,
        }
def store_preprocessing_results(memory_id: str, preprocessing_results: Dict) -> bool:
    """Store cleaned text & metadata on a memory document in MongoDB.

    Args:
        memory_id: hex string of the memory's ObjectId.
        preprocessing_results: output dict of TextPreprocessor.preprocess().

    Returns:
        True when the document was found and updated, False on any failure.
    """
    # Explicit import instead of the previous __import__("bson") hack;
    # bson ships with pymongo, which the Mongo connection already requires.
    from bson import ObjectId

    col = get_collection("memories")
    try:
        update_data = {
            "preprocessing": {
                "normalized": preprocessing_results.get("normalized"),
                "cleaned": preprocessing_results.get("cleaned"),
                "tokens": preprocessing_results.get("tokens"),
                "keywords": preprocessing_results.get("keywords"),
                "metadata": preprocessing_results.get("metadata"),
            },
            # NOTE(review): naive UTC timestamp kept as elsewhere in this file;
            # datetime.utcnow() is deprecated in Python 3.12+.
            "updated_at": datetime.utcnow(),
        }
        result = col.update_one(
            {"_id": ObjectId(memory_id)},
            {"$set": update_data}
        )
        # matched_count (not modified_count): an idempotent re-run that writes
        # identical data should still count as success, not a failure.
        return result.matched_count > 0
    except Exception as e:
        logger.error(f"Failed to store preprocessing results: {e}")
        return False
def preprocess_unprocessed_memories(batch_size: int = 50) -> Dict:
    """Preprocess memories that have no "preprocessing" field yet.

    Step 1 in the full NLP workflow; subsequent steps (emotion analysis,
    embeddings) consume the cleaned text.

    Args:
        batch_size: maximum number of memories to process in one call.

    Returns:
        Summary dict: total, processed, failed, skipped, errors.
        ("skipped" is new and additive; existing keys are unchanged.)
    """
    col = get_collection("memories")
    preprocessor = TextPreprocessor()
    # Find memories without preprocessing
    unprocessed = list(col.find(
        {"preprocessing": {"$exists": False}}
    ).limit(batch_size))
    processed_count = 0
    failed_count = 0
    skipped_count = 0
    errors = []
    for memory in unprocessed:
        try:
            memory_id = str(memory["_id"])
            content = memory.get("content", "")
            if not content:
                # NOTE(review): empty-content docs are skipped but never marked,
                # so they re-match this query on every batch forever — consider
                # stamping them with an empty "preprocessing" field.
                skipped_count += 1
                continue
            logger.info(f"Preprocessing memory {memory_id}...")
            results = preprocessor.preprocess(content)
            # Store results
            if store_preprocessing_results(memory_id, results):
                processed_count += 1
                logger.info(f"✓ Preprocessed {memory_id}")
            else:
                failed_count += 1
                errors.append(f"Failed to store preprocessing for {memory_id}")
        except Exception as e:
            failed_count += 1
            error_msg = f"Error preprocessing {memory.get('_id')}: {str(e)}"
            errors.append(error_msg)
            logger.error(error_msg)
    return {
        "total": len(unprocessed),
        "processed": processed_count,
        "failed": failed_count,
        "skipped": skipped_count,
        "errors": errors,
    }
# Manual smoke test: guarded so importing this module stays side-effect free.
if __name__ == "__main__":
    preprocessor = TextPreprocessor()
    sample_text = """
    Today was a mix of productivity and much-needed relaxation!
    I checked https://example.com for work, then took a 10-minute walk to clear my head.
    Feeling grateful and peaceful. Contact me at test@example.com if you need anything!
    """
    result = preprocessor.preprocess(sample_text)
    print("\n" + "=" * 60)
    print("TEXT PREPROCESSING PIPELINE OUTPUT")
    print("=" * 60)
    print(f"\nOriginal:\n{result['original']}")
    print(f"\nNormalized:\n{result['normalized']}")
    print(f"\nTokens: {result['tokens']}")
    print(f"\nPOS Tags: {result['pos_tags']}")
    print(f"\nCleaned:\n{result['cleaned']}")
    print(f"\nKeywords: {result['keywords']}")
    print(f"\nMetadata: {result['metadata']}")
    print("\n" + "=" * 60)