| """ |
| Event processing pipeline that orchestrates NLP analysis, claim extraction, |
| virality scoring, and satellite validation for misinformation detection. |
| """ |
|
|
import logging
import re
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
|
|
| |
| from config import config |
| from models import ( |
| ProcessedEvent, Claim, SatelliteResult, EventSource, LanguageCode, |
| ClaimCategory, INDIAN_STATES, normalize_state_name |
| ) |
| from nlp_analyzer import nlp_analyzer, TextAnalysisResult |
| from database import database |
|
|
| |
| logger = logging.getLogger(__name__) |
|
|
|
|
| @dataclass |
| class RawEvent: |
| """Raw event data before processing""" |
| source: EventSource |
| original_text: str |
| timestamp: datetime |
| metadata: Dict[str, Any] |
| url: Optional[str] = None |
| author: Optional[str] = None |
| engagement_metrics: Optional[Dict[str, int]] = None |
|
|
|
|
| @dataclass |
| class ClaimExtractionResult: |
| """Result of claim extraction from text""" |
| claims: List[Claim] |
| primary_claim: Optional[Claim] |
| confidence_score: float |
| extraction_method: str |
| processing_metadata: Dict[str, Any] |
|
|
|
|
| class ClaimExtractor: |
| """ |
| Extracts and analyzes claims from text content. |
| Uses pattern matching, NLP analysis, and heuristics to identify |
| potential misinformation claims. |
| """ |
| |
| def __init__(self): |
| self.misinformation_patterns = self._load_misinformation_patterns() |
| self.claim_indicators = self._load_claim_indicators() |
| |
| def _load_misinformation_patterns(self) -> Dict[str, List[str]]: |
| """Load patterns commonly associated with misinformation""" |
| return { |
| "health": [ |
| r"vaccine.*(?:cause|dangerous|kill|harm|side effect)", |
| r"covid.*(?:hoax|fake|conspiracy|planned)", |
| r"medicine.*(?:hidden|secret|banned|suppressed)", |
| r"doctor.*(?:don't want|hiding|secret)", |
| r"cure.*(?:they|government|pharma).*(?:don't want|hiding)" |
| ], |
| "politics": [ |
| r"election.*(?:rigged|fraud|stolen|fake)", |
| r"government.*(?:hiding|secret|conspiracy|cover.?up)", |
| r"politician.*(?:corrupt|bribe|scandal|secret)", |
| r"vote.*(?:fraud|illegal|fake|manipulation)", |
| r"media.*(?:lying|fake|propaganda|biased)" |
| ], |
| "disaster": [ |
| r"earthquake.*(?:predicted|warning|coming|artificial)", |
| r"flood.*(?:artificial|man.?made|planned|conspiracy)", |
| r"cyclone.*(?:artificial|weather manipulation|planned)", |
| r"disaster.*(?:planned|artificial|government|conspiracy)" |
| ], |
| "technology": [ |
| r"5g.*(?:dangerous|radiation|cancer|kill|harm)", |
| r"phone.*(?:radiation|cancer|dangerous|spy)", |
| r"internet.*(?:control|surveillance|spy|track)", |
| r"ai.*(?:dangerous|control|replace|eliminate)" |
| ], |
| "social": [ |
| r"community.*(?:under attack|threat|danger|conspiracy)", |
| r"religion.*(?:under threat|attack|conspiracy|persecution)", |
| r"culture.*(?:destroyed|attack|threat|conspiracy)", |
| r"tradition.*(?:under attack|threat|destroyed)" |
| ] |
| } |
| |
| def _load_claim_indicators(self) -> List[str]: |
| """Load linguistic indicators of claims""" |
        return [
            # Direct assertion phrases
            "it is true that", "the fact is", "evidence shows", "studies prove",
            "research confirms", "scientists say", "experts claim", "data reveals",

            # Certainty markers
            "definitely", "certainly", "absolutely", "without doubt", "clearly",
            "obviously", "undoubtedly", "proven", "confirmed", "established",

            # Urgency and sensationalism markers
            "urgent", "breaking", "alert", "warning", "immediate", "emergency",
            "critical", "important", "must know", "shocking", "exposed",

            # Attribution to vague or unverifiable sources
            "according to", "sources say", "reports indicate", "leaked documents",
            "insider information", "confidential", "classified", "secret documents"
        ]
| |
| def extract_claims(self, text_analysis: TextAnalysisResult) -> ClaimExtractionResult: |
| """ |
| Extract claims from analyzed text using multiple approaches. |
| """ |
| try: |
| claims = [] |
| extraction_methods = [] |
| |
| text = text_analysis.cleaned_text |
| language = text_analysis.language_detection.language |
| |
            # Method 1: predefined misinformation pattern matching
            pattern_claims = self._extract_claims_by_patterns(text, language)
| claims.extend(pattern_claims) |
| if pattern_claims: |
| extraction_methods.append("pattern_matching") |
| |
            # Method 2: sentence-level claim scoring
            sentence_claims = self._extract_claims_by_sentences(text, text_analysis)
| claims.extend(sentence_claims) |
| if sentence_claims: |
| extraction_methods.append("sentence_analysis") |
| |
            # Method 3: entity-centred extraction
            entity_claims = self._extract_claims_by_entities(text, text_analysis)
| claims.extend(entity_claims) |
| if entity_claims: |
| extraction_methods.append("entity_analysis") |
| |
            # Deduplicate, then rank by confidence
            unique_claims = self._deduplicate_claims(claims)
            ranked_claims = sorted(unique_claims, key=lambda c: c.confidence, reverse=True)
| |
            # The highest-confidence claim is treated as primary
            primary_claim = ranked_claims[0] if ranked_claims else None
| |
            # Overall confidence = mean confidence of the retained claims
            overall_confidence = (
| sum(claim.confidence for claim in ranked_claims) / len(ranked_claims) |
| if ranked_claims else 0.0 |
| ) |
| |
| processing_metadata = { |
| "extraction_methods": extraction_methods, |
| "total_claims_found": len(claims), |
| "unique_claims": len(unique_claims), |
| "text_length": len(text), |
| "language": language.value, |
| "sentiment_score": text_analysis.sentiment_score |
| } |
| |
| return ClaimExtractionResult( |
                claims=ranked_claims[:5],  # cap at the five highest-confidence claims
| primary_claim=primary_claim, |
| confidence_score=overall_confidence, |
| extraction_method=", ".join(extraction_methods), |
| processing_metadata=processing_metadata |
| ) |
| |
| except Exception as e: |
| logger.error(f"Claim extraction failed: {e}") |
| return ClaimExtractionResult( |
| claims=[], |
| primary_claim=None, |
| confidence_score=0.0, |
| extraction_method="error", |
| processing_metadata={"error": str(e)} |
| ) |
| |
| def _extract_claims_by_patterns(self, text: str, language: LanguageCode) -> List[Claim]: |
| """Extract claims using predefined misinformation patterns""" |
| claims = [] |
| text_lower = text.lower() |
| |
| for category, patterns in self.misinformation_patterns.items(): |
| for pattern in patterns: |
                matches = re.finditer(pattern, text_lower)

                for match in matches:
                    # Capture ~50 characters of context on each side of the match
                    start = max(0, match.start() - 50)
                    end = min(len(text), match.end() + 50)
                    claim_text = text[start:end].strip()

                    # Heuristic: longer (more specific) patterns earn a small
                    # confidence boost, capped at 0.95
                    confidence = 0.7 + (len(pattern) / 100)
                    confidence = min(0.95, confidence)
| |
| claim = Claim( |
| text=claim_text, |
| category=ClaimCategory(category), |
| confidence=confidence, |
| keywords=[match.group()], |
| entities=[] |
| ) |
| claims.append(claim) |
| |
| return claims |
| |
| def _extract_claims_by_sentences(self, text: str, text_analysis: TextAnalysisResult) -> List[Claim]: |
| """Extract claims by analyzing individual sentences""" |
| claims = [] |
| |
| |
        sentences = self._split_into_sentences(text)

        for sentence in sentences:
            # Very short fragments rarely carry a complete claim
            if len(sentence.strip()) < 20:
                continue

            # Score the sentence for claim-like language
            claim_score = self._calculate_sentence_claim_score(sentence)

            if claim_score > 0.5:
                category = self._categorize_claim(sentence)

                # Attach recognised entities that appear in this sentence
                sentence_entities = []
                for entity in text_analysis.entities.entities:
                    if entity.lower() in sentence.lower():
                        sentence_entities.append(entity)
| |
| claim = Claim( |
| text=sentence.strip(), |
| category=category, |
| confidence=claim_score, |
| entities=sentence_entities, |
| geographic_entities=[ |
| e for e in text_analysis.entities.geographic_entities |
| if e.lower() in sentence.lower() |
| ] |
| ) |
| claims.append(claim) |
| |
| return claims |
| |
| def _extract_claims_by_entities(self, text: str, text_analysis: TextAnalysisResult) -> List[Claim]: |
| """Extract claims based on important entities and their context""" |
| claims = [] |
| |
        # Geographic claims are high-signal for misinformation mapping
        for geo_entity in text_analysis.entities.geographic_entities:
            sentences = self._find_sentences_with_entity(text, geo_entity)

            for sentence in sentences:
                # Boost confidence when the entity is a recognised Indian state
                if self._contains_geographic_claim(sentence, geo_entity):
                    confidence = 0.6 + (0.2 if geo_entity.lower() in INDIAN_STATES else 0.1)
| |
| claim = Claim( |
| text=sentence.strip(), |
| category=self._categorize_claim(sentence), |
| confidence=min(0.9, confidence), |
| entities=[geo_entity], |
| geographic_entities=[geo_entity] |
| ) |
| claims.append(claim) |
| |
| return claims |
| |
| def _split_into_sentences(self, text: str) -> List[str]: |
| """Split text into sentences using simple heuristics""" |
        # Naive split on sentence-ending punctuation; adequate for scoring
        sentences = re.split(r'[.!?]+', text)

        # Drop fragments too short to carry meaning
        cleaned_sentences = []
| for sentence in sentences: |
| sentence = sentence.strip() |
| if len(sentence) > 10: |
| cleaned_sentences.append(sentence) |
| |
| return cleaned_sentences |
| |
| def _calculate_sentence_claim_score(self, sentence: str) -> float: |
| """Calculate how likely a sentence is to contain a claim""" |
| score = 0.0 |
| sentence_lower = sentence.lower() |
| |
        # Each matching claim-indicator phrase adds 0.2
        for indicator in self.claim_indicators:
            if indicator in sentence_lower:
                score += 0.2

        # Declarative/assertive constructions suggest a claim is being made
        assertion_patterns = [
            r'\b(is|are|was|were)\s+\w+',
            r'\b(will|would|can|could)\s+\w+',
            r'\b(always|never|all|every|no)\s+\w+',
            r'\b(proven|confirmed|established|fact)\b'
        ]

        for pattern in assertion_patterns:
            if re.search(pattern, sentence_lower):
                score += 0.15

        # Controversial topics add 0.1 per keyword
        controversial_keywords = [
            'vaccine', 'government', 'conspiracy', 'secret', 'hidden',
            'dangerous', 'fake', 'hoax', 'fraud', 'scam'
        ]

        for keyword in controversial_keywords:
            if keyword in sentence_lower:
                score += 0.1
| |
| return min(1.0, score) |
| |
| def _categorize_claim(self, text: str) -> ClaimCategory: |
| """Categorize a claim based on its content""" |
        text_lower = text.lower()

        # Keyword lists checked in priority order; the first match wins
        category_keywords = [
            (ClaimCategory.HEALTH, ['vaccine', 'medicine', 'doctor', 'health', 'disease', 'cure', 'treatment']),
            (ClaimCategory.POLITICS, ['government', 'election', 'politician', 'vote', 'policy', 'minister']),
            (ClaimCategory.DISASTER, ['earthquake', 'flood', 'cyclone', 'disaster', 'emergency', 'crisis']),
            (ClaimCategory.TECHNOLOGY, ['5g', 'phone', 'internet', 'ai', 'technology', 'digital']),
            (ClaimCategory.ENVIRONMENT, ['climate', 'environment', 'pollution', 'global warming', 'weather']),
            (ClaimCategory.SOCIAL, ['community', 'religion', 'culture', 'tradition', 'society']),
        ]

        for category, keywords in category_keywords:
            if any(word in text_lower for word in keywords):
                return category

        return ClaimCategory.OTHER
| |
| def _find_sentences_with_entity(self, text: str, entity: str) -> List[str]: |
| """Find sentences containing a specific entity""" |
| sentences = self._split_into_sentences(text) |
| matching_sentences = [] |
| |
| for sentence in sentences: |
| if entity.lower() in sentence.lower(): |
| matching_sentences.append(sentence) |
| |
| return matching_sentences |
| |
| def _contains_geographic_claim(self, sentence: str, geo_entity: str) -> bool: |
| """Check if sentence makes claims about a geographic entity""" |
        sentence_lower = sentence.lower()
        entity = re.escape(geo_entity.lower())  # escape regex metacharacters in entity names

        # Patterns pairing the entity with alarmist or cover-up language
        claim_patterns = [
            f"{entity}.*(?:dangerous|unsafe|attack|threat|crisis)",
            f"(?:in|at).*{entity}.*(?:happening|occurred|reported|confirmed)",
            f"{entity}.*(?:government|officials|authorities).*(?:hiding|covering|denying)"
        ]

        for pattern in claim_patterns:
            if re.search(pattern, sentence_lower):
                return True
| |
| return False |
| |
| def _deduplicate_claims(self, claims: List[Claim]) -> List[Claim]: |
| """Remove duplicate claims based on text similarity""" |
| if not claims: |
| return [] |
| |
| unique_claims = [] |
| |
| for claim in claims: |
| is_duplicate = False |
| |
            for existing_claim in unique_claims:
                similarity = self._calculate_text_similarity(claim.text, existing_claim.text)

                # >70% word overlap counts as a duplicate; keep the more
                # confident of the two versions
                if similarity > 0.7:
                    is_duplicate = True

                    if claim.confidence > existing_claim.confidence:
                        unique_claims.remove(existing_claim)
                        unique_claims.append(claim)
                    break
| |
| if not is_duplicate: |
| unique_claims.append(claim) |
| |
| return unique_claims |
| |
| def _calculate_text_similarity(self, text1: str, text2: str) -> float: |
| """Calculate simple text similarity based on word overlap""" |
| words1 = set(text1.lower().split()) |
| words2 = set(text2.lower().split()) |
| |
| if not words1 or not words2: |
| return 0.0 |
| |
| intersection = words1.intersection(words2) |
| union = words1.union(words2) |
| |
| return len(intersection) / len(union) |
|
|
|
|
| class ViralityScorer: |
| """ |
| Calculates virality scores for events based on source credibility, |
| engagement metrics, and content characteristics. |
| """ |
| |
| def __init__(self): |
| self.source_credibility = self._load_source_credibility_scores() |
| |
| def _load_source_credibility_scores(self) -> Dict[EventSource, float]: |
| """Load credibility scores for different event sources""" |
| return { |
| EventSource.NEWS: 0.7, |
| EventSource.TWITTER: 0.4, |
| EventSource.FACEBOOK: 0.3, |
| EventSource.RSS: 0.6, |
| EventSource.MANUAL: 0.5 |
| } |
| |
| def calculate_virality_score(self, raw_event: RawEvent, text_analysis: TextAnalysisResult, |
| claims: List[Claim]) -> float: |
| """ |
| Calculate virality score based on multiple factors. |
| Higher score indicates higher potential for viral spread. |
| """ |
| try: |
            # Less credible sources contribute a higher base virality risk
            base_score = 1.0 - self.source_credibility.get(raw_event.source, 0.5)

            content_score = self._calculate_content_virality(text_analysis, claims)
            engagement_score = self._calculate_engagement_virality(raw_event.engagement_metrics)
            timing_score = self._calculate_timing_virality(raw_event.timestamp)

            # Weighted blend: content dominates, then source, engagement, timing
            virality_score = (
                base_score * 0.3 +
                content_score * 0.4 +
                engagement_score * 0.2 +
                timing_score * 0.1
            )
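            # Worked example (illustrative): a Twitter post (credibility 0.4 ->
            # base 0.6) with content 0.8, engagement 0.5, timing 1.0 scores
            # 0.6*0.3 + 0.8*0.4 + 0.5*0.2 + 1.0*0.1 = 0.70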
| |
| return min(1.0, max(0.0, virality_score)) |
| |
| except Exception as e: |
| logger.warning(f"Virality score calculation failed: {e}") |
| return 0.5 |
| |
| def _calculate_content_virality(self, text_analysis: TextAnalysisResult, claims: List[Claim]) -> float: |
| """Calculate virality based on content characteristics""" |
| score = 0.0 |
| |
        # Strong sentiment (positive or negative) travels further
        sentiment_magnitude = abs(text_analysis.sentiment_score)
        score += sentiment_magnitude * 0.3

        # Confident claims raise the stakes
        if claims:
            avg_claim_confidence = sum(claim.confidence for claim in claims) / len(claims)
            score += avg_claim_confidence * 0.4

        # Clickbait-style vocabulary adds up to 0.3
        viral_keywords = [
            'breaking', 'urgent', 'shocking', 'exposed', 'secret', 'hidden',
            'conspiracy', 'scandal', 'leaked', 'exclusive', 'warning', 'alert'
        ]

        keyword_count = sum(1 for keyword in viral_keywords if keyword in text_analysis.original_text.lower())
        score += min(0.3, keyword_count * 0.1)
| |
| return min(1.0, score) |
| |
| def _calculate_engagement_virality(self, engagement_metrics: Optional[Dict[str, int]]) -> float: |
| """Calculate virality based on engagement metrics""" |
        if not engagement_metrics:
            return 0.5  # neutral default when metrics are unavailable

        likes = engagement_metrics.get('likes', 0)
        shares = engagement_metrics.get('shares', 0)
        comments = engagement_metrics.get('comments', 0)

        # Normalise each metric against a rough "going viral" threshold;
        # shares spread content directly, so they carry the largest weight below
        share_score = min(1.0, shares / 100)
        like_score = min(1.0, likes / 1000)
        comment_score = min(1.0, comments / 50)
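        # Worked example (illustrative): 50 shares, 500 likes, 25 comments ->
        # 0.5*0.5 + 0.5*0.3 + 0.5*0.2 = 0.50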
| |
| return (share_score * 0.5 + like_score * 0.3 + comment_score * 0.2) |
| |
| def _calculate_timing_virality(self, timestamp: datetime) -> float: |
| """Calculate virality based on timing factors""" |
        # Recency decays in tiers; assumes naive UTC timestamps throughout
        now = datetime.utcnow()
        age_hours = (now - timestamp).total_seconds() / 3600
| |
| if age_hours < 1: |
| return 1.0 |
| elif age_hours < 6: |
| return 0.8 |
| elif age_hours < 24: |
| return 0.6 |
| elif age_hours < 72: |
| return 0.4 |
| else: |
| return 0.2 |
|
|
|
|
| class EventProcessor: |
| """ |
| Main event processor that orchestrates the entire pipeline: |
| NLP analysis -> Claim extraction -> Virality scoring -> Satellite validation |
| """ |
| |
| def __init__(self): |
| self.claim_extractor = ClaimExtractor() |
| self.virality_scorer = ViralityScorer() |
| self.initialized = False |
| |
| async def initialize(self) -> bool: |
| """Initialize the event processor and all its components""" |
| try: |
            # NLP models must be ready before any text can be analyzed
            nlp_success = await nlp_analyzer.initialize()
            if not nlp_success:
                logger.error("Failed to initialize NLP analyzer")
                return False

            # Database connection for persisting processed events
            db_success = await database.initialize()
| if not db_success: |
| logger.error("Failed to initialize database") |
| return False |
| |
| self.initialized = True |
| logger.info("Event processor initialized successfully") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Failed to initialize event processor: {e}") |
| return False |
| |
| async def process_event(self, raw_event: RawEvent) -> Optional[ProcessedEvent]: |
| """ |
| Process a raw event through the complete pipeline. |
| Returns a ProcessedEvent ready for storage and visualization. |
| """ |
| if not self.initialized: |
| logger.error("Event processor not initialized") |
| return None |
| |
| try: |
| logger.debug(f"Processing event from {raw_event.source.value}") |
| |
            # Step 1: NLP analysis (language detection, entities, sentiment)
            text_analysis = await nlp_analyzer.analyze_text(raw_event.original_text)

            # Step 2: claim extraction
            claim_result = self.claim_extractor.extract_claims(text_analysis)

            # Step 3: geographic resolution
            region_hint, lat, lon = self._extract_geographic_info(text_analysis, raw_event.metadata)

            # Step 4: virality scoring
            virality_score = self.virality_scorer.calculate_virality_score(
                raw_event, text_analysis, claim_result.claims
            )

            # Step 5: assemble the processed event
            processed_event = ProcessedEvent(
| source=raw_event.source, |
| original_text=raw_event.original_text, |
| timestamp=raw_event.timestamp, |
| lang=text_analysis.language_detection.language, |
| region_hint=region_hint, |
| lat=lat, |
| lon=lon, |
| entities=text_analysis.entities.entities, |
| virality_score=virality_score, |
                satellite=None,  # satellite validation result is attached later
| claims=claim_result.claims, |
| processing_metadata={ |
| "nlp_analysis": text_analysis.metadata, |
| "claim_extraction": claim_result.processing_metadata, |
| "virality_factors": { |
| "source_credibility": self.virality_scorer.source_credibility.get(raw_event.source, 0.5), |
| "content_score": virality_score |
| }, |
| "processing_time_ms": text_analysis.processing_time_ms |
| } |
| ) |
| |
| logger.debug(f"Event processed successfully: {processed_event.event_id}") |
| return processed_event |
| |
| except Exception as e: |
| logger.error(f"Event processing failed: {e}") |
| return None |
| |
| def _extract_geographic_info(self, text_analysis: TextAnalysisResult, |
| metadata: Dict[str, Any]) -> Tuple[str, float, float]: |
| """ |
| Extract geographic information from text analysis and metadata. |
| Returns (region_hint, latitude, longitude) |
| """ |
| region_hint = "" |
| lat, lon = 0.0, 0.0 |
| |
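        # Example (illustrative): metadata {'location': {'lat': 9.93, 'lon': 76.26,
        # 'region': 'Kerala'}} takes precedence over anything found in the text.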
        # Prefer explicit location metadata when the source provides it
        if metadata.get('location'):
            location_data = metadata['location']
            if isinstance(location_data, dict):
                lat = location_data.get('lat', 0.0)
                lon = location_data.get('lon', 0.0)
                region_hint = location_data.get('region', '')

        # Fall back to the first Indian state mentioned in the text
        if not region_hint and text_analysis.entities.indian_states:
            region_hint = normalize_state_name(text_analysis.entities.indian_states[0])

            # Only fill in coordinates the metadata did not already provide
            if lat == 0.0 and lon == 0.0:
                lat, lon = self._get_state_coordinates(region_hint)

        # Otherwise try any geographic entity that normalizes to a known state
        elif not region_hint and text_analysis.entities.geographic_entities:
            for geo_entity in text_analysis.entities.geographic_entities:
                normalized_entity = normalize_state_name(geo_entity)
                if normalized_entity.lower() in INDIAN_STATES:
                    region_hint = normalized_entity
                    if lat == 0.0 and lon == 0.0:
                        lat, lon = self._get_state_coordinates(region_hint)
                    break
| |
| return region_hint, lat, lon |
| |
| def _get_state_coordinates(self, state_name: str) -> Tuple[float, float]: |
| """Get approximate coordinates for an Indian state""" |
        # Approximate centroids for Indian states and major union territories
        state_coordinates = {
| "andhra pradesh": (15.9129, 79.7400), |
| "arunachal pradesh": (28.2180, 94.7278), |
| "assam": (26.2006, 92.9376), |
| "bihar": (25.0961, 85.3131), |
| "chhattisgarh": (21.2787, 81.8661), |
| "goa": (15.2993, 74.1240), |
| "gujarat": (23.0225, 72.5714), |
| "haryana": (29.0588, 76.0856), |
| "himachal pradesh": (31.1048, 77.1734), |
| "jharkhand": (23.6102, 85.2799), |
| "karnataka": (15.3173, 75.7139), |
| "kerala": (10.8505, 76.2711), |
| "madhya pradesh": (22.9734, 78.6569), |
| "maharashtra": (19.7515, 75.7139), |
| "manipur": (24.6637, 93.9063), |
| "meghalaya": (25.4670, 91.3662), |
| "mizoram": (23.1645, 92.9376), |
| "nagaland": (26.1584, 94.5624), |
| "odisha": (20.9517, 85.0985), |
| "punjab": (31.1471, 75.3412), |
| "rajasthan": (27.0238, 74.2179), |
| "sikkim": (27.5330, 88.5122), |
| "tamil nadu": (11.1271, 78.6569), |
| "telangana": (18.1124, 79.0193), |
| "tripura": (23.9408, 91.9882), |
| "uttar pradesh": (26.8467, 80.9462), |
| "uttarakhand": (30.0668, 79.0193), |
| "west bengal": (22.9868, 87.8550), |
| "delhi": (28.7041, 77.1025), |
| "jammu and kashmir": (34.0837, 74.7973), |
| "ladakh": (34.1526, 77.5771) |
| } |
| |
| return state_coordinates.get(state_name.lower(), (0.0, 0.0)) |
|
|
|
|
| |
# Shared module-level singleton used across the application
event_processor = EventProcessor()
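

# Minimal usage sketch (illustrative only; not invoked by the application).
# Assumes the nlp_analyzer and database services are reachable; the sample
# RawEvent fields and engagement numbers below are hypothetical.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        if not await event_processor.initialize():
            raise SystemExit("event processor initialization failed")

        event = RawEvent(
            source=EventSource.TWITTER,
            original_text="Breaking: leaked documents prove the Kerala flood was man-made!",
            timestamp=datetime.utcnow(),
            metadata={},
            engagement_metrics={"likes": 500, "shares": 50, "comments": 25},
        )
        processed = await event_processor.process_event(event)
        if processed:
            print(processed.region_hint, processed.virality_score)

    asyncio.run(_demo())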