""" Micro Topic Extraction Service v2.2 Optimizations: - Batch processing with spaCy nlp.pipe() (5-10x faster) - Progress logging for large datasets Fixes: - Fix 1: Multi-word NER subsumption (drop unigram components) - Fix 2: Text sanity for NER (clean artifacts like "#emirates #") - Fix 3: Weak noun blacklist (drop generic nouns unless in NER) - Fix 4: Noun sanity (clean "city.#history" -> "city") Pipeline: - Only processes events with engagement="active" and type="watch" - Extracts hashtags -> stored in `hashtags` field - English: NER (en_core_web_md) -> `ner`, Nouns (POS) -> `nouns` - Hinglish: Remove Devanagari -> `text_v1`, then apply English pipeline - Hindi: Stanza NER + Noun extraction - Final: Aggregate, filter, and deduplicate -> `micro_topics` """ import re import unicodedata from typing import List, Dict, Set, Optional, Tuple from collections import Counter # ========================================================= # STOPWORDS # ========================================================= ENGLISH_STOPWORDS = { "a", "an", "the", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "do", "does", "did", "will", "would", "could", "should", "may", "might", "must", "shall", "can", "need", "dare", "ought", "used", "to", "of", "in", "for", "on", "with", "at", "by", "from", "as", "into", "through", "during", "before", "after", "above", "below", "between", "under", "again", "further", "then", "once", "here", "there", "when", "where", "why", "how", "all", "each", "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very", "just", "also", "now", "and", "but", "if", "or", "because", "until", "while", "this", "that", "these", "those", "what", "which", "who", "whom", "whose", "i", "you", "he", "she", "it", "we", "they", "me", "him", "her", "us", "them", "my", "your", "his", "its", "our", "their", "myself", "yourself", "himself", "herself", "itself", "ourselves", "themselves", "am", "about", "get", "got", "go", "going", "went", "come", "came", "make", "made", "take", "took", "see", "saw", "know", "knew", "think", "thought", "want", "like", "look", "use", "find", "give", "tell", "say", "said", "video", "watch", "watched", "new", "first", "last", "best", "top", "full", "part", "episode", "ep", "vs", "ft", "feat", "official", "exclusive", "shorts", "short", "movie", "clip", "scene", "trailer", "teaser" } HINDI_STOPWORDS = { "का", "के", "की", "है", "हैं", "था", "थे", "थी", "में", "से", "को", "पर", "ने", "और", "या", "एक", "यह", "वह", "इस", "उस", "जो", "तो", "भी", "कर", "हो", "ही", "अब", "जब", "तक", "बहुत", "कुछ", "सब", "कोई", "किसी", "अपने", "उनके", "इनके", "वाले", "वाली", "वाला" } WEAK_NOUNS = { "man", "men", "woman", "women", "person", "people", "guy", "guys", "leader", "leaders", "member", "members", "player", "players", "team", "teams", "group", "groups", "family", "families", "death", "life", "time", "day", "days", "night", "nights", "year", "years", "month", "months", "week", "weeks", "thing", "things", "stuff", "way", "ways", "world", "place", "places", "area", "areas", "country", "countries", "city", "cities", "town", "towns", "home", "house", "end", "start", "beginning", "part", "parts", "side", "sides", "point", "points", "case", "cases", "fact", "facts", "news", "update", "updates", "story", "stories", "channel", "channels", "subscriber", "subscribers", "view", "views", "like", "likes", "comment", "comments", "reaction", "reactions", "highlight", "highlights", "moment", "moments", "episode", "episodes" } # ========================================================= # LAZY MODEL LOADING # ========================================================= _nlp_en_md = None _stanza_hi = None def get_spacy_english_md(): """Lazy load spaCy English medium model.""" global _nlp_en_md if _nlp_en_md is None: try: import spacy _nlp_en_md = spacy.load("en_core_web_md") print("[OK] Loaded en_core_web_md") except OSError: print("[ERROR] en_core_web_md not found. Run: python -m spacy download en_core_web_md") _nlp_en_md = None return _nlp_en_md def get_stanza_hindi(): """Lazy load Stanza Hindi pipeline.""" global _stanza_hi if _stanza_hi is None: try: import stanza stanza.download('hi', verbose=False) _stanza_hi = stanza.Pipeline('hi', processors='tokenize,pos,ner', verbose=False) print("[OK] Loaded Stanza Hindi pipeline") except Exception as e: print(f"[ERROR] Stanza Hindi failed: {e}") _stanza_hi = None return _stanza_hi # ========================================================= # TEXT CLEANING UTILITIES # ========================================================= def normalize_unicode(text: str) -> str: """Unicode normalization (NFC) and zero-width character removal.""" if not text: return "" text = unicodedata.normalize('NFC', text) text = re.sub(r'[\u200b-\u200f\u202a-\u202e\ufeff]', '', text) text = re.sub(r'\s+', ' ', text).strip() return text def remove_devanagari(text: str) -> str: """Remove Devanagari (Hindi) characters from text.""" return re.sub(r'[\u0900-\u097F]+', ' ', text) def remove_stopwords(text: str, stopwords: Set[str]) -> str: """Remove stopwords from text.""" words = text.split() filtered = [w for w in words if w.lower() not in stopwords] return ' '.join(filtered) def clean_text_v1(text: str) -> str: """Clean text for Hinglish processing.""" text = remove_devanagari(text) text = normalize_unicode(text) text = remove_stopwords(text, ENGLISH_STOPWORDS) text = re.sub(r'[|]+', ' ', text) text = re.sub(r'\s+', ' ', text).strip() return text def sanitize_topic(text: str) -> str: """ FIX 2 & 4: Sanitize any topic text (NER, noun, etc). Handles cases like: - "#emirates #" -> "emirates" - "city.#history" -> "city" - "world—kuwait" -> "world kuwait" """ if not text: return "" # Remove hashtag patterns (including attached ones like ".#history") text = re.sub(r'\.?#\w*', '', text) # Replace em-dash and other dashes with space text = re.sub(r'[—–\-]+', ' ', text) # Remove leading/trailing punctuation and special chars text = text.strip(' #@|[](){}.,!?:;"\'-_') # Collapse spaces text = re.sub(r'\s+', ' ', text).strip() return text # ========================================================= # HASHTAG EXTRACTION # ========================================================= def extract_hashtags(text: str) -> List[str]: """Extract hashtags from text. Returns list of hashtag values (without #).""" if not text: return [] hashtags = re.findall(r'#(\w+)', text, re.IGNORECASE) seen = set() result = [] for tag in hashtags: tag_lower = tag.lower() if tag_lower not in seen and len(tag_lower) >= 2: result.append(tag_lower) seen.add(tag_lower) return result # ========================================================= # BATCH PROCESSING FOR SPACY # ========================================================= def process_texts_batch_english(texts: List[str], batch_size: int = 100) -> List[Tuple[List[str], List[str]]]: """ Process multiple texts through spaCy using nlp.pipe() for efficiency. Returns list of (ner_list, noun_list) tuples, one per input text. 5-10x faster than processing individually. """ nlp = get_spacy_english_md() if nlp is None: return [([], []) for _ in texts] # Filter out None/empty texts and track indices valid_indices = [] valid_texts = [] for i, text in enumerate(texts): if text and isinstance(text, str) and text.strip(): valid_indices.append(i) valid_texts.append(text) # Initialize results for all texts (including empty ones) results = [([], []) for _ in texts] if not valid_texts: return results target_labels = {"PERSON", "ORG", "GPE", "LOC", "EVENT", "NORP", "FAC", "PRODUCT", "WORK_OF_ART"} # Use nlp.pipe for batch processing on valid texts only for idx, doc in enumerate(nlp.pipe(valid_texts, batch_size=batch_size)): original_idx = valid_indices[idx] # Extract NER ner = [] ner_seen = set() for ent in doc.ents: if ent.label_ in target_labels: ent_text = sanitize_topic(ent.text).lower() if ent_text and ent_text not in ner_seen and len(ent_text) >= 2: ner.append(ent_text) ner_seen.add(ent_text) # Create protected set from NER protected = set() for entity in ner: for word in entity.split(): protected.add(word.lower()) # Extract nouns nouns = [] noun_seen = set() for token in doc: if token.pos_ in ("NOUN", "PROPN"): # FIX 4: Sanitize noun text noun_text = sanitize_topic(token.text).lower() if not noun_text or len(noun_text) < 2: continue if noun_text in noun_seen: continue if noun_text in ENGLISH_STOPWORDS: continue if noun_text.isdigit(): continue if noun_text in WEAK_NOUNS and noun_text not in protected: continue nouns.append(noun_text) noun_seen.add(noun_text) results[original_idx] = (ner, nouns) return results # ========================================================= # HINDI PIPELINE (Stanza) - Not batched (used less frequently) # ========================================================= def process_hindi_text(text: str) -> Tuple[List[str], List[str]]: """Process Hindi text through Stanza. Returns (ner_list, noun_list).""" if not text: return [], [] stanza_pipeline = get_stanza_hindi() if stanza_pipeline is None: return [], [] text = normalize_unicode(text) try: doc = stanza_pipeline(text) # NER ner = [] ner_seen = set() for sentence in doc.sentences: for ent in sentence.ents: ent_text = sanitize_topic(ent.text) if ent_text and ent_text not in ner_seen and len(ent_text) >= 2: ner.append(ent_text) ner_seen.add(ent_text) # Nouns nouns = [] noun_seen = set() for sentence in doc.sentences: for word in sentence.words: if word.upos in ("NOUN", "PROPN"): noun_text = sanitize_topic(word.text) if (noun_text and noun_text not in noun_seen and len(noun_text) >= 2 and noun_text not in HINDI_STOPWORDS): nouns.append(noun_text) noun_seen.add(noun_text) return ner, nouns except Exception as e: print(f"[ERROR] Hindi processing failed: {e}") return [], [] # ========================================================= # NER SUBSUMPTION # ========================================================= def apply_ner_subsumption(all_topics: List[str], ner_entities: List[str]) -> List[str]: """ FIX 1: If a multi-word NER exists, drop its unigram components. Example: If "greg biffle" in topics -> drop "greg" and "biffle" """ subsumed = set() for entity in ner_entities: words = entity.split() if len(words) > 1: for word in words: subsumed.add(word.lower()) filtered = [] for topic in all_topics: topic_lower = topic.lower() if ' ' in topic_lower or topic_lower not in subsumed: filtered.append(topic) return filtered # ========================================================= # MAIN BATCH EXTRACTION FUNCTION # ========================================================= def process_events_batch(events: List[Dict], batch_size: int = 100) -> List[Dict]: """ Process a batch of events with optimized batch NLP processing. Uses spaCy's nlp.pipe() for 5-10x faster processing. """ # Filter qualifying events qualifying_indices = [] texts_to_process = [] language_types = [] for i, event in enumerate(events): if event.get("type") == "watch" and event.get("engagement") == "active": text_clean = event.get("text_clean", "") lang = event.get("language_type", "").lower() if lang == "hinglish": # For hinglish, clean the text first text_v1 = clean_text_v1(text_clean) event["text_v1"] = text_v1 texts_to_process.append(text_v1 if text_v1 else "") elif lang == "hindi": texts_to_process.append("") # Will process separately else: texts_to_process.append(text_clean) qualifying_indices.append(i) language_types.append(lang) total = len(qualifying_indices) print(f"[TOPIC] Processing {total} qualifying events...") if total == 0: return events # Batch process English/Hinglish texts through spaCy english_indices = [] english_texts = [] for idx, (i, lang) in enumerate(zip(qualifying_indices, language_types)): if lang in ("english", "hinglish", "unknown", ""): english_indices.append(idx) english_texts.append(texts_to_process[idx]) print(f"[TOPIC] Batch processing {len(english_texts)} English/Hinglish texts...") english_results = process_texts_batch_english(english_texts, batch_size) # Map results back english_result_map = {} for idx, result in zip(english_indices, english_results): english_result_map[idx] = result # Process each event and add results processed_count = 0 for idx, i in enumerate(qualifying_indices): event = events[i] text_clean = event.get("text_clean", "") lang = language_types[idx] # Extract hashtags hashtags = extract_hashtags(text_clean) # Get NER and nouns based on language if lang == "hindi": ner, nouns = process_hindi_text(text_clean) else: # Get from batch results ner, nouns = english_result_map.get(idx, ([], [])) # Aggregate topics all_topics = [] seen = set() for h in hashtags: h_lower = h.lower() if h_lower not in seen: all_topics.append(h_lower) seen.add(h_lower) for e in ner: e_lower = e.lower() if e_lower not in seen: all_topics.append(e_lower) seen.add(e_lower) for n in nouns: n_lower = n.lower() if n_lower not in seen: all_topics.append(n_lower) seen.add(n_lower) # Apply NER subsumption all_topics = apply_ner_subsumption(all_topics, ner) # Store results event["hashtags"] = hashtags event["ner"] = ner event["nouns"] = nouns event["micro_topics"] = all_topics processed_count += 1 if processed_count % 1000 == 0: print(f"[TOPIC] Processed {processed_count}/{total} events...") print(f"[TOPIC] Completed processing {total} events.") return events # ========================================================= # LEGACY SINGLE EVENT FUNCTION (for testing) # ========================================================= def extract_micro_topics_v2(event: Dict) -> Dict: """Extract micro topics from a single event. Use process_events_batch for bulk.""" if event.get("type") != "watch" or event.get("engagement") != "active": return event text_clean = event.get("text_clean", "") language_type = event.get("language_type", "").lower() hashtags = extract_hashtags(text_clean) if language_type == "english": results = process_texts_batch_english([text_clean]) ner, nouns = results[0] if results else ([], []) elif language_type == "hinglish": text_v1 = clean_text_v1(text_clean) event["text_v1"] = text_v1 results = process_texts_batch_english([text_v1]) ner, nouns = results[0] if results else ([], []) elif language_type == "hindi": ner, nouns = process_hindi_text(text_clean) else: results = process_texts_batch_english([text_clean]) ner, nouns = results[0] if results else ([], []) all_topics = [] seen = set() for h in hashtags: h_lower = h.lower() if h_lower not in seen: all_topics.append(h_lower) seen.add(h_lower) for e in ner: e_lower = e.lower() if e_lower not in seen: all_topics.append(e_lower) seen.add(e_lower) for n in nouns: n_lower = n.lower() if n_lower not in seen: all_topics.append(n_lower) seen.add(n_lower) all_topics = apply_ner_subsumption(all_topics, ner) event["hashtags"] = hashtags event["ner"] = ner event["nouns"] = nouns event["micro_topics"] = all_topics return event def get_aggregated_topics(events: List[Dict], top_n: int = 50) -> Dict: """Aggregate micro topics across all events.""" hashtag_counter = Counter() ner_counter = Counter() noun_counter = Counter() topic_counter = Counter() for event in events: hashtag_counter.update(event.get("hashtags", [])) ner_counter.update(event.get("ner", [])) noun_counter.update(event.get("nouns", [])) topic_counter.update(event.get("micro_topics", [])) return { "top_hashtags": [{"topic": t, "count": c} for t, c in hashtag_counter.most_common(top_n)], "top_ner": [{"topic": t, "count": c} for t, c in ner_counter.most_common(top_n)], "top_nouns": [{"topic": t, "count": c} for t, c in noun_counter.most_common(top_n)], "top_micro_topics": [{"topic": t, "count": c} for t, c in topic_counter.most_common(top_n)], "stats": { "total_unique_hashtags": len(hashtag_counter), "total_unique_ner": len(ner_counter), "total_unique_nouns": len(noun_counter), "total_unique_topics": len(topic_counter) } }