| """ | |
| Micro Topic Extraction Service v2.2 | |
| Optimizations: | |
| - Batch processing with spaCy nlp.pipe() (5-10x faster) | |
| - Progress logging for large datasets | |
| Fixes: | |
| - Fix 1: Multi-word NER subsumption (drop unigram components) | |
| - Fix 2: Text sanity for NER (clean artifacts like "#emirates #") | |
| - Fix 3: Weak noun blacklist (drop generic nouns unless in NER) | |
| - Fix 4: Noun sanity (clean "city.#history" -> "city") | |
| Pipeline: | |
| - Only processes events with engagement="active" and type="watch" | |
| - Extracts hashtags -> stored in `hashtags` field | |
| - English: NER (en_core_web_md) -> `ner`, Nouns (POS) -> `nouns` | |
| - Hinglish: Remove Devanagari -> `text_v1`, then apply English pipeline | |
| - Hindi: Stanza NER + Noun extraction | |
| - Final: Aggregate, filter, and deduplicate -> `micro_topics` | |
| """ | |
import re
import unicodedata
from typing import List, Dict, Set, Optional, Tuple
from collections import Counter

# =========================================================
# STOPWORDS
# =========================================================
ENGLISH_STOPWORDS = {
    "a", "an", "the", "is", "are", "was", "were", "be", "been", "being",
    "have", "has", "had", "do", "does", "did", "will", "would", "could",
    "should", "may", "might", "must", "shall", "can", "need", "dare",
    "ought", "used", "to", "of", "in", "for", "on", "with", "at", "by",
    "from", "as", "into", "through", "during", "before", "after", "above",
    "below", "between", "under", "again", "further", "then", "once", "here",
    "there", "when", "where", "why", "how", "all", "each", "few", "more",
    "most", "other", "some", "such", "no", "nor", "not", "only", "own",
    "same", "so", "than", "too", "very", "just", "also", "now", "and",
    "but", "if", "or", "because", "until", "while", "this", "that", "these",
    "those", "what", "which", "who", "whom", "whose", "i", "you", "he",
    "she", "it", "we", "they", "me", "him", "her", "us", "them", "my",
    "your", "his", "its", "our", "their", "myself", "yourself", "himself",
    "herself", "itself", "ourselves", "themselves", "am", "about", "get",
    "got", "go", "going", "went", "come", "came", "make", "made", "take",
    "took", "see", "saw", "know", "knew", "think", "thought", "want",
    "like", "look", "use", "find", "give", "tell", "say", "said", "video",
    "watch", "watched", "new", "first", "last", "best", "top", "full",
    "part", "episode", "ep", "vs", "ft", "feat", "official", "exclusive",
    "shorts", "short", "movie", "clip", "scene", "trailer", "teaser"
}

HINDI_STOPWORDS = {
    "का", "के", "की", "है", "हैं", "था", "थे", "थी", "में", "से", "को",
    "पर", "ने", "और", "या", "एक", "यह", "वह", "इस", "उस", "जो", "तो",
    "भी", "कर", "हो", "ही", "अब", "जब", "तक", "बहुत", "कुछ", "सब",
    "कोई", "किसी", "अपने", "उनके", "इनके", "वाले", "वाली", "वाला"
}

WEAK_NOUNS = {
    "man", "men", "woman", "women", "person", "people", "guy", "guys",
    "leader", "leaders", "member", "members", "player", "players",
    "team", "teams", "group", "groups", "family", "families",
    "death", "life", "time", "day", "days", "night", "nights",
    "year", "years", "month", "months", "week", "weeks",
    "thing", "things", "stuff", "way", "ways",
    "world", "place", "places", "area", "areas", "country", "countries",
    "city", "cities", "town", "towns", "home", "house",
    "end", "start", "beginning", "part", "parts", "side", "sides",
    "point", "points", "case", "cases", "fact", "facts",
    "news", "update", "updates", "story", "stories",
    "channel", "channels", "subscriber", "subscribers",
    "view", "views", "like", "likes", "comment", "comments",
    "reaction", "reactions", "highlight", "highlights",
    "moment", "moments", "episode", "episodes"
}

# =========================================================
# LAZY MODEL LOADING
# =========================================================
_nlp_en_md = None
_stanza_hi = None


def get_spacy_english_md():
    """Lazy load spaCy English medium model."""
    global _nlp_en_md
    if _nlp_en_md is None:
        try:
            import spacy
            _nlp_en_md = spacy.load("en_core_web_md")
            print("[OK] Loaded en_core_web_md")
        except OSError:
            print("[ERROR] en_core_web_md not found. Run: python -m spacy download en_core_web_md")
            _nlp_en_md = None
    return _nlp_en_md


def get_stanza_hindi():
    """Lazy load Stanza Hindi pipeline."""
    global _stanza_hi
    if _stanza_hi is None:
        try:
            import stanza
            stanza.download('hi', verbose=False)
            _stanza_hi = stanza.Pipeline('hi', processors='tokenize,pos,ner', verbose=False)
            print("[OK] Loaded Stanza Hindi pipeline")
        except Exception as e:
            print(f"[ERROR] Stanza Hindi failed: {e}")
            _stanza_hi = None
    return _stanza_hi

# =========================================================
# TEXT CLEANING UTILITIES
# =========================================================
def normalize_unicode(text: str) -> str:
    """Unicode normalization (NFC) and zero-width character removal."""
    if not text:
        return ""
    text = unicodedata.normalize('NFC', text)
    text = re.sub(r'[\u200b-\u200f\u202a-\u202e\ufeff]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text


def remove_devanagari(text: str) -> str:
    """Remove Devanagari (Hindi) characters from text."""
    return re.sub(r'[\u0900-\u097F]+', ' ', text)


def remove_stopwords(text: str, stopwords: Set[str]) -> str:
    """Remove stopwords from text."""
    words = text.split()
    filtered = [w for w in words if w.lower() not in stopwords]
    return ' '.join(filtered)


def clean_text_v1(text: str) -> str:
    """Clean text for Hinglish processing."""
    text = remove_devanagari(text)
    text = normalize_unicode(text)
    text = remove_stopwords(text, ENGLISH_STOPWORDS)
    text = re.sub(r'[|]+', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def sanitize_topic(text: str) -> str:
    """
    FIX 2 & 4: Sanitize any topic text (NER, noun, etc).
    Handles cases like:
    - "#emirates #" -> "emirates"
    - "city.#history" -> "city"
    - "world—kuwait" -> "world kuwait"
    """
    if not text:
        return ""
    # Drop hashtags glued onto another token by a dot (e.g. "city.#history" -> "city")
    text = re.sub(r'\.#\w*', '', text)
    # Turn remaining '#' markers into spaces so the tag word survives ("#emirates #" -> "emirates")
    text = text.replace('#', ' ')
    # Replace em-dash and other dashes with space
    text = re.sub(r'[—–\-]+', ' ', text)
    # Remove leading/trailing punctuation and special chars
    text = text.strip(' #@|[](){}.,!?:;"\'-_')
    # Collapse spaces
    text = re.sub(r'\s+', ' ', text).strip()
    return text

# =========================================================
# HASHTAG EXTRACTION
# =========================================================
def extract_hashtags(text: str) -> List[str]:
    """Extract hashtags from text. Returns list of hashtag values (without #)."""
    if not text:
        return []
    hashtags = re.findall(r'#(\w+)', text, re.IGNORECASE)
    seen = set()
    result = []
    for tag in hashtags:
        tag_lower = tag.lower()
        if tag_lower not in seen and len(tag_lower) >= 2:
            result.append(tag_lower)
            seen.add(tag_lower)
    return result
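# Example (hypothetical input): extract_hashtags("#Emirates flight #A380 review")
# returns ["emirates", "a380"] — tags are lowercased, deduplicated in order of
# first appearance, and must be at least 2 characters long.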

# =========================================================
# BATCH PROCESSING FOR SPACY
# =========================================================
def process_texts_batch_english(texts: List[str], batch_size: int = 100) -> List[Tuple[List[str], List[str]]]:
    """
    Process multiple texts through spaCy using nlp.pipe() for efficiency.
    Returns list of (ner_list, noun_list) tuples, one per input text.
    5-10x faster than processing individually.
    """
    nlp = get_spacy_english_md()
    if nlp is None:
        return [([], []) for _ in texts]

    # Filter out None/empty texts and track indices
    valid_indices = []
    valid_texts = []
    for i, text in enumerate(texts):
        if text and isinstance(text, str) and text.strip():
            valid_indices.append(i)
            valid_texts.append(text)

    # Initialize results for all texts (including empty ones)
    results = [([], []) for _ in texts]
    if not valid_texts:
        return results

    target_labels = {"PERSON", "ORG", "GPE", "LOC", "EVENT", "NORP", "FAC", "PRODUCT", "WORK_OF_ART"}

    # Use nlp.pipe for batch processing on valid texts only
    for idx, doc in enumerate(nlp.pipe(valid_texts, batch_size=batch_size)):
        original_idx = valid_indices[idx]

        # Extract NER
        ner = []
        ner_seen = set()
        for ent in doc.ents:
            if ent.label_ in target_labels:
                ent_text = sanitize_topic(ent.text).lower()
                if ent_text and ent_text not in ner_seen and len(ent_text) >= 2:
                    ner.append(ent_text)
                    ner_seen.add(ent_text)

        # Create protected set from NER
        protected = set()
        for entity in ner:
            for word in entity.split():
                protected.add(word.lower())

        # Extract nouns
        nouns = []
        noun_seen = set()
        for token in doc:
            if token.pos_ in ("NOUN", "PROPN"):
                # FIX 4: Sanitize noun text
                noun_text = sanitize_topic(token.text).lower()
                if not noun_text or len(noun_text) < 2:
                    continue
                if noun_text in noun_seen:
                    continue
                if noun_text in ENGLISH_STOPWORDS:
                    continue
                if noun_text.isdigit():
                    continue
                if noun_text in WEAK_NOUNS and noun_text not in protected:
                    continue
                nouns.append(noun_text)
                noun_seen.add(noun_text)

        results[original_idx] = (ner, nouns)

    return results

# =========================================================
# HINDI PIPELINE (Stanza) - Not batched (used less frequently)
# =========================================================
def process_hindi_text(text: str) -> Tuple[List[str], List[str]]:
    """Process Hindi text through Stanza. Returns (ner_list, noun_list)."""
    if not text:
        return [], []
    stanza_pipeline = get_stanza_hindi()
    if stanza_pipeline is None:
        return [], []

    text = normalize_unicode(text)
    try:
        doc = stanza_pipeline(text)

        # NER
        ner = []
        ner_seen = set()
        for sentence in doc.sentences:
            for ent in sentence.ents:
                ent_text = sanitize_topic(ent.text)
                if ent_text and ent_text not in ner_seen and len(ent_text) >= 2:
                    ner.append(ent_text)
                    ner_seen.add(ent_text)

        # Nouns
        nouns = []
        noun_seen = set()
        for sentence in doc.sentences:
            for word in sentence.words:
                if word.upos in ("NOUN", "PROPN"):
                    noun_text = sanitize_topic(word.text)
                    if (noun_text and
                            noun_text not in noun_seen and
                            len(noun_text) >= 2 and
                            noun_text not in HINDI_STOPWORDS):
                        nouns.append(noun_text)
                        noun_seen.add(noun_text)

        return ner, nouns
    except Exception as e:
        print(f"[ERROR] Hindi processing failed: {e}")
        return [], []

# =========================================================
# NER SUBSUMPTION
# =========================================================
def apply_ner_subsumption(all_topics: List[str], ner_entities: List[str]) -> List[str]:
    """
    FIX 1: If a multi-word NER exists, drop its unigram components.
    Example: If "greg biffle" in topics -> drop "greg" and "biffle"
    """
    subsumed = set()
    for entity in ner_entities:
        words = entity.split()
        if len(words) > 1:
            for word in words:
                subsumed.add(word.lower())

    filtered = []
    for topic in all_topics:
        topic_lower = topic.lower()
        if ' ' in topic_lower or topic_lower not in subsumed:
            filtered.append(topic)
    return filtered
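# Example (hypothetical topics): apply_ner_subsumption(
#     ["nascar", "greg", "biffle", "greg biffle"], ["greg biffle"])
# returns ["nascar", "greg biffle"] — the unigram components are dropped,
# the multi-word entity itself is kept.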

# =========================================================
# MAIN BATCH EXTRACTION FUNCTION
# =========================================================
def process_events_batch(events: List[Dict], batch_size: int = 100) -> List[Dict]:
    """
    Process a batch of events with optimized batch NLP processing.
    Uses spaCy's nlp.pipe() for 5-10x faster processing.
    """
    # Filter qualifying events
    qualifying_indices = []
    texts_to_process = []
    language_types = []
    for i, event in enumerate(events):
        if event.get("type") == "watch" and event.get("engagement") == "active":
            text_clean = event.get("text_clean", "")
            lang = event.get("language_type", "").lower()
            if lang == "hinglish":
                # For hinglish, clean the text first
                text_v1 = clean_text_v1(text_clean)
                event["text_v1"] = text_v1
                texts_to_process.append(text_v1 if text_v1 else "")
            elif lang == "hindi":
                texts_to_process.append("")  # Will process separately
            else:
                texts_to_process.append(text_clean)
            qualifying_indices.append(i)
            language_types.append(lang)

    total = len(qualifying_indices)
    print(f"[TOPIC] Processing {total} qualifying events...")
    if total == 0:
        return events

    # Batch process English/Hinglish texts through spaCy
    english_indices = []
    english_texts = []
    for idx, lang in enumerate(language_types):
        if lang in ("english", "hinglish", "unknown", ""):
            english_indices.append(idx)
            english_texts.append(texts_to_process[idx])

    print(f"[TOPIC] Batch processing {len(english_texts)} English/Hinglish texts...")
    english_results = process_texts_batch_english(english_texts, batch_size)

    # Map results back
    english_result_map = {}
    for idx, result in zip(english_indices, english_results):
        english_result_map[idx] = result

    # Process each event and add results
    processed_count = 0
    for idx, i in enumerate(qualifying_indices):
        event = events[i]
        text_clean = event.get("text_clean", "")
        lang = language_types[idx]

        # Extract hashtags
        hashtags = extract_hashtags(text_clean)

        # Get NER and nouns based on language
        if lang == "hindi":
            ner, nouns = process_hindi_text(text_clean)
        else:
            # Get from batch results
            ner, nouns = english_result_map.get(idx, ([], []))

        # Aggregate topics
        all_topics = []
        seen = set()
        for h in hashtags:
            h_lower = h.lower()
            if h_lower not in seen:
                all_topics.append(h_lower)
                seen.add(h_lower)
        for e in ner:
            e_lower = e.lower()
            if e_lower not in seen:
                all_topics.append(e_lower)
                seen.add(e_lower)
        for n in nouns:
            n_lower = n.lower()
            if n_lower not in seen:
                all_topics.append(n_lower)
                seen.add(n_lower)

        # Apply NER subsumption
        all_topics = apply_ner_subsumption(all_topics, ner)

        # Store results
        event["hashtags"] = hashtags
        event["ner"] = ner
        event["nouns"] = nouns
        event["micro_topics"] = all_topics

        processed_count += 1
        if processed_count % 1000 == 0:
            print(f"[TOPIC] Processed {processed_count}/{total} events...")

    print(f"[TOPIC] Completed processing {total} events.")
    return events

# =========================================================
# LEGACY SINGLE EVENT FUNCTION (for testing)
# =========================================================
def extract_micro_topics_v2(event: Dict) -> Dict:
    """Extract micro topics from a single event. Use process_events_batch for bulk."""
    if event.get("type") != "watch" or event.get("engagement") != "active":
        return event

    text_clean = event.get("text_clean", "")
    language_type = event.get("language_type", "").lower()
    hashtags = extract_hashtags(text_clean)

    if language_type == "english":
        results = process_texts_batch_english([text_clean])
        ner, nouns = results[0] if results else ([], [])
    elif language_type == "hinglish":
        text_v1 = clean_text_v1(text_clean)
        event["text_v1"] = text_v1
        results = process_texts_batch_english([text_v1])
        ner, nouns = results[0] if results else ([], [])
    elif language_type == "hindi":
        ner, nouns = process_hindi_text(text_clean)
    else:
        results = process_texts_batch_english([text_clean])
        ner, nouns = results[0] if results else ([], [])

    all_topics = []
    seen = set()
    for h in hashtags:
        h_lower = h.lower()
        if h_lower not in seen:
            all_topics.append(h_lower)
            seen.add(h_lower)
    for e in ner:
        e_lower = e.lower()
        if e_lower not in seen:
            all_topics.append(e_lower)
            seen.add(e_lower)
    for n in nouns:
        n_lower = n.lower()
        if n_lower not in seen:
            all_topics.append(n_lower)
            seen.add(n_lower)

    all_topics = apply_ner_subsumption(all_topics, ner)

    event["hashtags"] = hashtags
    event["ner"] = ner
    event["nouns"] = nouns
    event["micro_topics"] = all_topics
    return event

def get_aggregated_topics(events: List[Dict], top_n: int = 50) -> Dict:
    """Aggregate micro topics across all events."""
    hashtag_counter = Counter()
    ner_counter = Counter()
    noun_counter = Counter()
    topic_counter = Counter()

    for event in events:
        hashtag_counter.update(event.get("hashtags", []))
        ner_counter.update(event.get("ner", []))
        noun_counter.update(event.get("nouns", []))
        topic_counter.update(event.get("micro_topics", []))

    return {
        "top_hashtags": [{"topic": t, "count": c} for t, c in hashtag_counter.most_common(top_n)],
        "top_ner": [{"topic": t, "count": c} for t, c in ner_counter.most_common(top_n)],
        "top_nouns": [{"topic": t, "count": c} for t, c in noun_counter.most_common(top_n)],
        "top_micro_topics": [{"topic": t, "count": c} for t, c in topic_counter.most_common(top_n)],
        "stats": {
            "total_unique_hashtags": len(hashtag_counter),
            "total_unique_ner": len(ner_counter),
            "total_unique_nouns": len(noun_counter),
            "total_unique_topics": len(topic_counter)
        }
    }
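
# =========================================================
# EXAMPLE USAGE (illustrative sketch)
# =========================================================
# A minimal demo of the batch API, assuming the event schema used above
# (type / engagement / language_type / text_clean). The sample events and
# their field values are hypothetical; a real run needs en_core_web_md
# installed (and the Stanza Hindi models for language_type == "hindi").
if __name__ == "__main__":
    sample_events = [
        {
            "type": "watch",
            "engagement": "active",
            "language_type": "english",
            "text_clean": "Greg Biffle wins at Daytona #nascar #racing highlights",
        },
        {
            "type": "watch",
            "engagement": "active",
            "language_type": "hinglish",
            "text_clean": "Virat Kohli का शानदार शतक #cricket India vs Australia",
        },
    ]
    processed = process_events_batch(sample_events, batch_size=16)
    for ev in processed:
        print(ev.get("micro_topics", []))
    print(get_aggregated_topics(processed, top_n=10))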