Spaces:
Running
Running
| """ | |
| TEXT PREPROCESSING & CLEANING MODULE | |
| 1. Load NLP Pipeline (spaCy model) | |
| 2. Normalize Text (lowercase, remove special chars, URLs) | |
| 3. Tokenize & Analyze (break into words, POS tags) | |
| 4. Lemmatize & Clean (reduce to base forms, remove stopwords) | |
| 5. Store cleaned text & metadata in MongoDB | |
| 6. Feed to downstream AI models | |
| Flow: User Input → Normalize → Tokenize → Lemmatize → Store → AI Models | |
| """ | |
| import re | |
| import string | |
| from typing import Dict, List, Optional, Tuple | |
| from datetime import datetime | |
| import logging | |
| try: | |
| import spacy | |
| from spacy.language import Language | |
| SPACY_AVAILABLE = True | |
| except ImportError: | |
| SPACY_AVAILABLE = False | |
| logging.info("spaCy not installed - using lightweight regex-based preprocessing") | |
| Language = None | |
| from backend.connection import get_collection | |
# Configure module-level logging for this preprocessing module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global cache for the spaCy model: loaded once by load_nlp_pipeline(),
# then reused by every TextPreprocessor instance.
_nlp_model: Optional[Language] = None
def load_nlp_pipeline() -> Language:
    """
    Load and cache the spaCy English pipeline (en_core_web_sm).

    The model is loaded once and cached in the module-level ``_nlp_model``;
    subsequent calls return the cached instance. If the model package is not
    installed, it is downloaded via ``python -m spacy download`` and then
    loaded.

    Returns:
        The cached spaCy ``Language`` instance.

    Raises:
        RuntimeError: If spaCy itself is not installed.
        subprocess.CalledProcessError: If the model download fails.
    """
    global _nlp_model
    if _nlp_model is not None:
        return _nlp_model
    if not SPACY_AVAILABLE:
        raise RuntimeError("spaCy not installed.")
    try:
        _nlp_model = spacy.load("en_core_web_sm")
        logger.info("Loaded spaCy model: en_core_web_sm")
        return _nlp_model
    except OSError:
        # Model package not found locally -- download it, then load.
        logger.info("Downloading en_core_web_sm model...")
        import sys
        import subprocess
        # sys.executable ensures the download targets the interpreter
        # (and virtualenv) that is actually running this code.
        subprocess.run(
            [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
            check=True,
        )
        _nlp_model = spacy.load("en_core_web_sm")
        logger.info("Downloaded and loaded en_core_web_sm")
        return _nlp_model
class TextPreprocessor:
    """
    Complete text preprocessing pipeline.

    Runs normalize -> tokenize -> lemmatize/clean -> keyword extraction.
    Uses spaCy when available; otherwise falls back to a lightweight
    regex-based mode with a basic English stopword list.
    """

    def __init__(self):
        """Initialize with the spaCy pipeline if available, else lightweight mode."""
        if SPACY_AVAILABLE:
            try:
                self.nlp = load_nlp_pipeline()
                self.stop_words = self.nlp.Defaults.stop_words
                self.use_spacy = True
                logger.info("TextPreprocessor initialized with spaCy")
            except Exception as e:
                # Any spaCy load/download failure degrades gracefully.
                logger.warning(f"Failed to load spaCy: {e}. Using lightweight mode.")
                self.nlp = None
                self.stop_words = self._get_basic_stopwords()
                self.use_spacy = False
        else:
            self.nlp = None
            self.stop_words = self._get_basic_stopwords()
            self.use_spacy = False
            logger.info("TextPreprocessor initialized without spaCy (lightweight mode)")

    def _get_basic_stopwords(self) -> set:
        """Return a basic English stopword set for lightweight mode."""
        return {
            'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours',
            'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers',
            'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves',
            'what', 'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are',
            'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does',
            'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until',
            'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into',
            'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down',
            'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once'
        }

    def normalize_text(self, text: str) -> str:
        """
        Normalize raw text for downstream processing.

        - Remove URLs (http/https/www), email addresses, @mentions, #hashtags
        - Lowercase
        - Strip special characters except apostrophes
        - Collapse all whitespace runs to single spaces

        Args:
            text: Raw input text.

        Returns:
            Normalized text, or "" for empty input.
        """
        if not text:
            return ""
        # Remove URLs. 'http\S+' already matches https URLs, so the former
        # redundant 'https\S+' alternative has been dropped (same behavior).
        text = re.sub(r'http\S+|www\S+', '', text, flags=re.MULTILINE)
        # Remove email addresses
        text = re.sub(r'\S+@\S+', '', text)
        # Remove mentions (@user) and hashtags (#hashtag)
        text = re.sub(r'@\w+|#\w+', '', text)
        text = text.lower()
        # Remove special characters but keep word chars, whitespace, apostrophes
        text = re.sub(r"[^\w\s']", '', text)
        # Collapse whitespace (spaces, tabs, newlines) to single spaces
        return ' '.join(text.split())

    def tokenize_and_analyze(self, text: str) -> Tuple[List[str], List[Tuple[str, str]]]:
        """
        Tokenize text and attach coarse part-of-speech tags.

        Args:
            text: (Usually normalized) input text.

        Returns:
            (tokens, pos_tags) where pos_tags is a list of (token, POS) pairs.
            In lightweight mode every token is tagged "NOUN" (simplified).
        """
        if not text:
            return [], []
        if not self.use_spacy:
            # Lightweight tokenization without spaCy
            tokens = re.findall(r'\b\w+\b', text.lower())
            pos_tags = [(token, "NOUN") for token in tokens]  # Simplified POS
            return tokens, pos_tags
        doc = self.nlp(text)
        tokens = [token.text for token in doc]
        pos_tags = [(token.text, token.pos_) for token in doc]
        return tokens, pos_tags

    def lemmatize_and_clean(self, text: str, remove_stopwords: bool = True,
                            remove_punctuation: bool = True) -> Tuple[str, Dict]:
        """
        Reduce tokens to base forms and strip stopwords/punctuation.

        Args:
            text: Input text.
            remove_stopwords: Drop stopword tokens when True.
            remove_punctuation: Drop punctuation tokens when True (spaCy mode).

        Returns:
            (cleaned_text, metadata) where metadata reports token counts,
            removed stopwords, POS distribution (spaCy mode only) and the
            cleaned/original compression ratio.
        """
        if not text:
            return "", {}
        if not self.use_spacy:
            # Lightweight mode: no real lemmatization, just stopword and
            # single-character filtering over regex tokens.
            tokens = re.findall(r'\b\w+\b', text.lower())
            lemmas = []
            removed_stopwords = 0
            for token in tokens:
                if remove_stopwords and token in self.stop_words:
                    removed_stopwords += 1
                    continue
                if len(token) >= 2:
                    lemmas.append(token)
            cleaned_text = ' '.join(lemmas)
            metadata = {
                "original_token_count": len(tokens),
                "cleaned_token_count": len(lemmas),
                "removed_stopwords": removed_stopwords,
                "pos_distribution": {},
                "compression_ratio": round(len(lemmas) / len(tokens), 2) if tokens else 0,
            }
            return cleaned_text, metadata
        doc = self.nlp(text)
        lemmas = []
        pos_distribution = {}
        removed_stopwords = 0
        original_count = 0
        for token in doc:
            original_count += 1
            # Count POS tags before any filtering so the distribution
            # reflects the full input.
            pos = token.pos_
            pos_distribution[pos] = pos_distribution.get(pos, 0) + 1
            if remove_stopwords and token.is_stop:
                removed_stopwords += 1
                continue
            if remove_punctuation and token.is_punct:
                continue
            lemma = token.lemma_.lower()
            # Skip single characters unless they carry content POS
            if len(lemma) < 2 and token.pos_ not in ["NOUN", "VERB", "ADJ", "ADV"]:
                continue
            lemmas.append(lemma)
        cleaned_text = ' '.join(lemmas)
        metadata = {
            "original_token_count": original_count,
            "cleaned_token_count": len(lemmas),
            "removed_stopwords": removed_stopwords,
            "pos_distribution": pos_distribution,
            "compression_ratio": round(len(lemmas) / original_count, 2) if original_count > 0 else 0,
        }
        return cleaned_text, metadata

    def extract_keywords(self, text: str, top_n: int = 10) -> List[str]:
        """
        Extract the top-N keywords from text.

        spaCy mode: noun chunks + content-POS tokens (NOUN/VERB/ADJ/ADV),
        ranked by frequency. Lightweight mode: non-stopword tokens of 3+
        characters ranked by frequency.

        Args:
            text: Input text.
            top_n: Maximum number of keywords to return.

        Returns:
            Keywords sorted by descending frequency.
        """
        if not text:
            return []
        if not self.use_spacy:
            # Lightweight keyword extraction without spaCy
            tokens = re.findall(r'\b\w{3,}\b', text.lower())
            keywords = [t for t in tokens if t not in self.stop_words]
            from collections import Counter
            keyword_freq = Counter(keywords)
            return [kw for kw, _ in keyword_freq.most_common(top_n)]
        doc = self.nlp(text)
        noun_chunks = [chunk.text.lower() for chunk in doc.noun_chunks]
        important_tokens = [
            token.text.lower()
            for token in doc
            if token.pos_ in ["NOUN", "VERB", "ADJ", "ADV"]
            and not token.is_stop
            and len(token.text) > 2
        ]
        all_keywords = list(set(noun_chunks + important_tokens))
        # NOTE(review): str.count() counts substrings, so e.g. "cat" is also
        # counted inside "catalog"; kept as-is to preserve existing ranking.
        keyword_freq = {}
        for keyword in all_keywords:
            keyword_freq[keyword] = text.lower().count(keyword)
        sorted_keywords = sorted(
            keyword_freq.items(),
            key=lambda x: x[1],
            reverse=True
        )
        return [kw for kw, _ in sorted_keywords[:top_n]]

    def preprocess(self, text: str) -> Dict:
        """
        Run the full pipeline: normalize -> tokenize -> clean -> keywords.

        Args:
            text: Raw input text.

        Returns:
            Dict with keys 'original', 'normalized', 'tokens', 'pos_tags',
            'cleaned', 'keywords', 'metadata'. Empty input yields the same
            shape with empty values.
        """
        if not text:
            return {
                "original": "",
                "normalized": "",
                "tokens": [],
                "pos_tags": [],
                "cleaned": "",
                "keywords": [],
                "metadata": {},
            }
        normalized = self.normalize_text(text)
        tokens, pos_tags = self.tokenize_and_analyze(normalized)
        cleaned, metadata = self.lemmatize_and_clean(normalized)
        keywords = self.extract_keywords(normalized)
        return {
            "original": text,
            "normalized": normalized,
            "tokens": tokens,
            "pos_tags": pos_tags,
            "cleaned": cleaned,
            "keywords": keywords,
            "metadata": metadata,
        }
def store_preprocessing_results(memory_id: str, preprocessing_results: Dict) -> bool:
    """
    Store cleaned text & preprocessing metadata on a memory document in MongoDB.

    Args:
        memory_id: String form of the memory document's ObjectId.
        preprocessing_results: Output of TextPreprocessor.preprocess().

    Returns:
        True if the memory document was found (and updated), False on any
        failure (invalid id, connection error, document not found).
    """
    col = get_collection("memories")
    try:
        # Explicit import instead of the previous __import__("bson") hack;
        # bson ships with pymongo, which the connection module already needs.
        from bson import ObjectId
        update_data = {
            "preprocessing": {
                "normalized": preprocessing_results.get("normalized"),
                "cleaned": preprocessing_results.get("cleaned"),
                "tokens": preprocessing_results.get("tokens"),
                "keywords": preprocessing_results.get("keywords"),
                "metadata": preprocessing_results.get("metadata"),
            },
            "updated_at": datetime.utcnow(),
        }
        result = col.update_one(
            {"_id": ObjectId(memory_id)},
            {"$set": update_data},
        )
        # matched_count, not modified_count: re-storing identical results
        # leaves modified_count at 0 and would falsely report failure.
        return result.matched_count > 0
    except Exception as e:
        logger.error(f"Failed to store preprocessing results: {e}")
        return False
def preprocess_unprocessed_memories(batch_size: int = 50) -> Dict:
    """
    Preprocess memories that have no 'preprocessing' field yet (batch job).

    Step 1 in the full NLP workflow; subsequent steps (emotion analysis,
    embeddings) consume the cleaned text stored here.

    Args:
        batch_size: Maximum number of unprocessed memories fetched per run.

    Returns:
        Dict with 'total' (fetched), 'processed', 'skipped' (empty content),
        'failed', and 'errors' (list of error messages).
    """
    col = get_collection("memories")
    preprocessor = TextPreprocessor()
    # Find memories that have never been preprocessed
    unprocessed = list(
        col.find({"preprocessing": {"$exists": False}}).limit(batch_size)
    )
    processed_count = 0
    skipped_count = 0
    failed_count = 0
    errors = []
    for memory in unprocessed:
        try:
            memory_id = str(memory["_id"])
            content = memory.get("content", "")
            if not content:
                # NOTE(review): empty-content memories never get a
                # 'preprocessing' field, so they re-enter every batch and
                # consume batch slots forever -- consider storing an empty
                # stub to retire them. Counted as 'skipped' for visibility.
                skipped_count += 1
                continue
            logger.info(f"Preprocessing memory {memory_id}...")
            results = preprocessor.preprocess(content)
            if store_preprocessing_results(memory_id, results):
                processed_count += 1
                logger.info(f"✓ Preprocessed {memory_id}")
            else:
                failed_count += 1
                errors.append(f"Failed to store preprocessing for {memory_id}")
        except Exception as e:
            failed_count += 1
            error_msg = f"Error preprocessing {memory.get('_id')}: {str(e)}"
            errors.append(error_msg)
            logger.error(error_msg)
    return {
        "total": len(unprocessed),
        "processed": processed_count,
        "skipped": skipped_count,
        "failed": failed_count,
        "errors": errors,
    }
| # # Test the preprocessor | |
| # if __name__ == "__main__": | |
| # preprocessor = TextPreprocessor() | |
| # sample_text = """ | |
| # Today was a mix of productivity and much-needed relaxation! | |
| # I checked https://example.com for work, then took a 10-minute walk to clear my head. ## 3 434 | |
| # Feeling grateful and peaceful. Contact me at test@example.com if you need anything! | |
| # """ | |
| # result = preprocessor.preprocess(sample_text) | |
| # print("\n" + "="*60) | |
| # print("TEXT PREPROCESSING PIPELINE OUTPUT") | |
| # print("="*60) | |
| # print(f"\nOriginal:\n{result['original']}") | |
| # print(f"\nNormalized:\n{result['normalized']}") | |
| # print(f"\nTokens: {result['tokens']}") | |
| # print(f"\nPOS Tags: {result['pos_tags']}") | |
| # print(f"\nCleaned:\n{result['cleaned']}") | |
| # print(f"\nKeywords: {result['keywords']}") | |
| # print(f"\nMetadata: {result['metadata']}") | |
| # print("\n" + "="*60) | |