"""
Event processing pipeline that orchestrates NLP analysis, claim extraction,
virality scoring, and satellite validation for misinformation detection.
"""
import logging
import asyncio
import re
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
# Local imports
from config import config
from models import (
ProcessedEvent, Claim, SatelliteResult, EventSource, LanguageCode,
ClaimCategory, INDIAN_STATES, normalize_state_name
)
from nlp_analyzer import nlp_analyzer, TextAnalysisResult
from database import database
# Configure logging
logger = logging.getLogger(__name__)
@dataclass
class RawEvent:
"""Raw event data before processing"""
source: EventSource
original_text: str
timestamp: datetime
metadata: Dict[str, Any]
url: Optional[str] = None
author: Optional[str] = None
engagement_metrics: Optional[Dict[str, int]] = None
@dataclass
class ClaimExtractionResult:
"""Result of claim extraction from text"""
claims: List[Claim]
primary_claim: Optional[Claim]
confidence_score: float
extraction_method: str
processing_metadata: Dict[str, Any]
class ClaimExtractor:
"""
Extracts and analyzes claims from text content.
Uses pattern matching, NLP analysis, and heuristics to identify
potential misinformation claims.
"""
def __init__(self):
self.misinformation_patterns = self._load_misinformation_patterns()
self.claim_indicators = self._load_claim_indicators()
def _load_misinformation_patterns(self) -> Dict[str, List[str]]:
"""Load patterns commonly associated with misinformation"""
return {
"health": [
r"vaccine.*(?:cause|dangerous|kill|harm|side effect)",
r"covid.*(?:hoax|fake|conspiracy|planned)",
r"medicine.*(?:hidden|secret|banned|suppressed)",
r"doctor.*(?:don't want|hiding|secret)",
r"cure.*(?:they|government|pharma).*(?:don't want|hiding)"
],
"politics": [
r"election.*(?:rigged|fraud|stolen|fake)",
r"government.*(?:hiding|secret|conspiracy|cover.?up)",
r"politician.*(?:corrupt|bribe|scandal|secret)",
r"vote.*(?:fraud|illegal|fake|manipulation)",
r"media.*(?:lying|fake|propaganda|biased)"
],
"disaster": [
r"earthquake.*(?:predicted|warning|coming|artificial)",
r"flood.*(?:artificial|man.?made|planned|conspiracy)",
r"cyclone.*(?:artificial|weather manipulation|planned)",
r"disaster.*(?:planned|artificial|government|conspiracy)"
],
"technology": [
r"5g.*(?:dangerous|radiation|cancer|kill|harm)",
r"phone.*(?:radiation|cancer|dangerous|spy)",
r"internet.*(?:control|surveillance|spy|track)",
r"ai.*(?:dangerous|control|replace|eliminate)"
],
"social": [
r"community.*(?:under attack|threat|danger|conspiracy)",
r"religion.*(?:under threat|attack|conspiracy|persecution)",
r"culture.*(?:destroyed|attack|threat|conspiracy)",
r"tradition.*(?:under attack|threat|destroyed)"
]
}
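    # Illustrative check (not part of the pipeline) of how one health pattern
    # above behaves; the sample sentence is hypothetical:
    #   >>> import re
    #   >>> bool(re.search(r"vaccine.*(?:cause|dangerous|kill|harm|side effect)",
    #   ...                "this vaccine can cause severe harm"))
    #   True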
def _load_claim_indicators(self) -> List[str]:
"""Load linguistic indicators of claims"""
return [
# Assertion indicators
"it is true that", "the fact is", "evidence shows", "studies prove",
"research confirms", "scientists say", "experts claim", "data reveals",
# Certainty indicators
"definitely", "certainly", "absolutely", "without doubt", "clearly",
"obviously", "undoubtedly", "proven", "confirmed", "established",
# Urgency indicators
"urgent", "breaking", "alert", "warning", "immediate", "emergency",
"critical", "important", "must know", "shocking", "exposed",
# Authority indicators
"according to", "sources say", "reports indicate", "leaked documents",
"insider information", "confidential", "classified", "secret documents"
]
def extract_claims(self, text_analysis: TextAnalysisResult) -> ClaimExtractionResult:
"""
Extract claims from analyzed text using multiple approaches.
"""
try:
claims = []
extraction_methods = []
text = text_analysis.cleaned_text
language = text_analysis.language_detection.language
# Method 1: Pattern-based claim extraction
pattern_claims = self._extract_claims_by_patterns(text, language)
claims.extend(pattern_claims)
if pattern_claims:
extraction_methods.append("pattern_matching")
# Method 2: Sentence-based claim extraction
sentence_claims = self._extract_claims_by_sentences(text, text_analysis)
claims.extend(sentence_claims)
if sentence_claims:
extraction_methods.append("sentence_analysis")
# Method 3: Entity-based claim extraction
entity_claims = self._extract_claims_by_entities(text, text_analysis)
claims.extend(entity_claims)
if entity_claims:
extraction_methods.append("entity_analysis")
# Remove duplicates and rank by confidence
unique_claims = self._deduplicate_claims(claims)
ranked_claims = sorted(unique_claims, key=lambda c: c.confidence, reverse=True)
# Select primary claim (highest confidence)
primary_claim = ranked_claims[0] if ranked_claims else None
# Calculate overall confidence
overall_confidence = (
sum(claim.confidence for claim in ranked_claims) / len(ranked_claims)
if ranked_claims else 0.0
)
processing_metadata = {
"extraction_methods": extraction_methods,
"total_claims_found": len(claims),
"unique_claims": len(unique_claims),
"text_length": len(text),
"language": language.value,
"sentiment_score": text_analysis.sentiment_score
}
return ClaimExtractionResult(
claims=ranked_claims[:5], # Limit to top 5 claims
primary_claim=primary_claim,
confidence_score=overall_confidence,
extraction_method=", ".join(extraction_methods),
processing_metadata=processing_metadata
)
except Exception as e:
logger.error(f"Claim extraction failed: {e}")
return ClaimExtractionResult(
claims=[],
primary_claim=None,
confidence_score=0.0,
extraction_method="error",
processing_metadata={"error": str(e)}
)
def _extract_claims_by_patterns(self, text: str, language: LanguageCode) -> List[Claim]:
"""Extract claims using predefined misinformation patterns"""
claims = []
text_lower = text.lower()
for category, patterns in self.misinformation_patterns.items():
for pattern in patterns:
matches = re.finditer(pattern, text_lower, re.IGNORECASE)
for match in matches:
# Extract surrounding context
start = max(0, match.start() - 50)
end = min(len(text), match.end() + 50)
claim_text = text[start:end].strip()
# Calculate confidence based on pattern specificity
confidence = 0.7 + (len(pattern) / 100) # More specific patterns get higher confidence
confidence = min(0.95, confidence)
claim = Claim(
text=claim_text,
category=ClaimCategory(category),
confidence=confidence,
keywords=[match.group()],
entities=[]
)
claims.append(claim)
return claims
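    # Worked example of the confidence heuristic above: the health pattern
    # r"covid.*(?:hoax|fake|conspiracy|planned)" is 39 characters long, so
    # confidence = 0.7 + 39/100 = 1.09, which min() caps at 0.95.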
def _extract_claims_by_sentences(self, text: str, text_analysis: TextAnalysisResult) -> List[Claim]:
"""Extract claims by analyzing individual sentences"""
claims = []
# Split text into sentences
sentences = self._split_into_sentences(text)
for sentence in sentences:
if len(sentence.strip()) < 20: # Skip very short sentences
continue
# Check for claim indicators
claim_score = self._calculate_sentence_claim_score(sentence)
if claim_score > 0.5: # Threshold for considering as a claim
# Determine category based on content
category = self._categorize_claim(sentence)
# Extract entities from this sentence
sentence_entities = []
for entity in text_analysis.entities.entities:
if entity.lower() in sentence.lower():
sentence_entities.append(entity)
claim = Claim(
text=sentence.strip(),
category=category,
confidence=claim_score,
entities=sentence_entities,
geographic_entities=[
e for e in text_analysis.entities.geographic_entities
if e.lower() in sentence.lower()
]
)
claims.append(claim)
return claims
def _extract_claims_by_entities(self, text: str, text_analysis: TextAnalysisResult) -> List[Claim]:
"""Extract claims based on important entities and their context"""
claims = []
# Focus on geographic entities (Indian states/cities)
for geo_entity in text_analysis.entities.geographic_entities:
# Find sentences containing this entity
sentences = self._find_sentences_with_entity(text, geo_entity)
for sentence in sentences:
# Check if this sentence makes claims about the geographic entity
if self._contains_geographic_claim(sentence, geo_entity):
confidence = 0.6 + (0.2 if geo_entity.lower() in INDIAN_STATES else 0.1)
claim = Claim(
text=sentence.strip(),
category=self._categorize_claim(sentence),
confidence=min(0.9, confidence),
entities=[geo_entity],
geographic_entities=[geo_entity]
)
claims.append(claim)
return claims
def _split_into_sentences(self, text: str) -> List[str]:
"""Split text into sentences using simple heuristics"""
# Simple sentence splitting
sentences = re.split(r'[.!?]+', text)
# Clean and filter sentences
cleaned_sentences = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) > 10: # Minimum sentence length
cleaned_sentences.append(sentence)
return cleaned_sentences
def _calculate_sentence_claim_score(self, sentence: str) -> float:
"""Calculate how likely a sentence is to contain a claim"""
score = 0.0
sentence_lower = sentence.lower()
# Check for claim indicators
for indicator in self.claim_indicators:
if indicator in sentence_lower:
score += 0.2
# Check for assertion patterns
assertion_patterns = [
r'\b(is|are|was|were)\s+\w+', # "is dangerous", "are fake"
r'\b(will|would|can|could)\s+\w+', # "will cause", "can kill"
r'\b(always|never|all|every|no)\s+\w+', # Absolute statements
r'\b(proven|confirmed|established|fact)\b' # Certainty claims
]
for pattern in assertion_patterns:
if re.search(pattern, sentence_lower):
score += 0.15
# Boost score for controversial topics
controversial_keywords = [
'vaccine', 'government', 'conspiracy', 'secret', 'hidden',
'dangerous', 'fake', 'hoax', 'fraud', 'scam'
]
for keyword in controversial_keywords:
if keyword in sentence_lower:
score += 0.1
return min(1.0, score)
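    # Worked example for the scorer above (sentence hypothetical):
    # "Experts claim the vaccine is dangerous"
    #   +0.20  claim indicator "experts claim"
    #   +0.15  assertion pattern r'\b(is|are|was|were)\s+\w+' ("is dangerous")
    #   +0.10  controversial keyword "vaccine"
    #   +0.10  controversial keyword "dangerous"
    #   = 0.55, clearing the 0.5 claim threshold.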
def _categorize_claim(self, text: str) -> ClaimCategory:
"""Categorize a claim based on its content"""
text_lower = text.lower()
# Health-related keywords
if any(word in text_lower for word in ['vaccine', 'medicine', 'doctor', 'health', 'disease', 'cure', 'treatment']):
return ClaimCategory.HEALTH
# Politics-related keywords
if any(word in text_lower for word in ['government', 'election', 'politician', 'vote', 'policy', 'minister']):
return ClaimCategory.POLITICS
# Disaster-related keywords
if any(word in text_lower for word in ['earthquake', 'flood', 'cyclone', 'disaster', 'emergency', 'crisis']):
return ClaimCategory.DISASTER
# Technology-related keywords
if any(word in text_lower for word in ['5g', 'phone', 'internet', 'ai', 'technology', 'digital']):
return ClaimCategory.TECHNOLOGY
# Environment-related keywords
if any(word in text_lower for word in ['climate', 'environment', 'pollution', 'global warming', 'weather']):
return ClaimCategory.ENVIRONMENT
# Social-related keywords
if any(word in text_lower for word in ['community', 'religion', 'culture', 'tradition', 'society']):
return ClaimCategory.SOCIAL
return ClaimCategory.OTHER
def _find_sentences_with_entity(self, text: str, entity: str) -> List[str]:
"""Find sentences containing a specific entity"""
sentences = self._split_into_sentences(text)
matching_sentences = []
for sentence in sentences:
if entity.lower() in sentence.lower():
matching_sentences.append(sentence)
return matching_sentences
def _contains_geographic_claim(self, sentence: str, geo_entity: str) -> bool:
"""Check if sentence makes claims about a geographic entity"""
        sentence_lower = sentence.lower()
        # Escape the entity so names containing regex metacharacters cannot break the patterns
        escaped_entity = re.escape(geo_entity.lower())
        # Look for claim patterns involving the geographic entity
        claim_patterns = [
            f"{escaped_entity}.*(?:dangerous|unsafe|attack|threat|crisis)",
            f"(?:in|at).*{escaped_entity}.*(?:happening|occurred|reported|confirmed)",
            f"{escaped_entity}.*(?:government|officials|authorities).*(?:hiding|covering|denying)"
        ]
for pattern in claim_patterns:
if re.search(pattern, sentence_lower):
return True
return False
def _deduplicate_claims(self, claims: List[Claim]) -> List[Claim]:
"""Remove duplicate claims based on text similarity"""
if not claims:
return []
unique_claims = []
for claim in claims:
is_duplicate = False
for existing_claim in unique_claims:
# Simple similarity check based on text overlap
similarity = self._calculate_text_similarity(claim.text, existing_claim.text)
if similarity > 0.7: # Threshold for considering as duplicate
is_duplicate = True
# Keep the claim with higher confidence
if claim.confidence > existing_claim.confidence:
unique_claims.remove(existing_claim)
unique_claims.append(claim)
break
if not is_duplicate:
unique_claims.append(claim)
return unique_claims
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
"""Calculate simple text similarity based on word overlap"""
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union)
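    # Worked Jaccard example for the similarity check above (texts hypothetical):
    #   words1 = {"the", "vaccine", "is", "dangerous"}
    #   words2 = {"vaccine", "is", "dangerous", "here"}
    #   |intersection| = 3, |union| = 5 -> similarity = 0.6, below the 0.7
    #   duplicate threshold used in _deduplicate_claims.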
class ViralityScorer:
"""
Calculates virality scores for events based on source credibility,
engagement metrics, and content characteristics.
"""
def __init__(self):
self.source_credibility = self._load_source_credibility_scores()
def _load_source_credibility_scores(self) -> Dict[EventSource, float]:
"""Load credibility scores for different event sources"""
return {
EventSource.NEWS: 0.7, # Traditional news sources
EventSource.TWITTER: 0.4, # Social media - lower credibility
EventSource.FACEBOOK: 0.3, # Social media - lower credibility
EventSource.RSS: 0.6, # RSS feeds - moderate credibility
EventSource.MANUAL: 0.5 # Manual input - neutral
}
def calculate_virality_score(self, raw_event: RawEvent, text_analysis: TextAnalysisResult,
claims: List[Claim]) -> float:
"""
Calculate virality score based on multiple factors.
Higher score indicates higher potential for viral spread.
"""
try:
# Base score from source credibility (inverted - less credible sources spread faster)
base_score = 1.0 - self.source_credibility.get(raw_event.source, 0.5)
# Content factors
content_score = self._calculate_content_virality(text_analysis, claims)
# Engagement factors (if available)
engagement_score = self._calculate_engagement_virality(raw_event.engagement_metrics)
# Timing factors
timing_score = self._calculate_timing_virality(raw_event.timestamp)
# Combine scores with weights
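            # The weights sum to 1.0, so with each component in [0, 1] the
            # combined score already lies in [0, 1] before the final clamp.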
virality_score = (
base_score * 0.3 +
content_score * 0.4 +
engagement_score * 0.2 +
timing_score * 0.1
)
return min(1.0, max(0.0, virality_score))
except Exception as e:
logger.warning(f"Virality score calculation failed: {e}")
return 0.5 # Default neutral score
def _calculate_content_virality(self, text_analysis: TextAnalysisResult, claims: List[Claim]) -> float:
"""Calculate virality based on content characteristics"""
score = 0.0
# Emotional content spreads faster
sentiment_magnitude = abs(text_analysis.sentiment_score)
score += sentiment_magnitude * 0.3
# Controversial claims spread faster
if claims:
avg_claim_confidence = sum(claim.confidence for claim in claims) / len(claims)
score += avg_claim_confidence * 0.4
# Certain keywords increase virality
viral_keywords = [
'breaking', 'urgent', 'shocking', 'exposed', 'secret', 'hidden',
'conspiracy', 'scandal', 'leaked', 'exclusive', 'warning', 'alert'
]
keyword_count = sum(1 for keyword in viral_keywords if keyword in text_analysis.original_text.lower())
score += min(0.3, keyword_count * 0.1)
return min(1.0, score)
def _calculate_engagement_virality(self, engagement_metrics: Optional[Dict[str, int]]) -> float:
"""Calculate virality based on engagement metrics"""
if not engagement_metrics:
return 0.5 # Neutral score if no metrics available
# Normalize engagement metrics
likes = engagement_metrics.get('likes', 0)
shares = engagement_metrics.get('shares', 0)
comments = engagement_metrics.get('comments', 0)
# Shares are most important for virality
share_score = min(1.0, shares / 100) # Normalize assuming 100 shares = max score
like_score = min(1.0, likes / 1000) # Normalize assuming 1000 likes = max score
comment_score = min(1.0, comments / 50) # Normalize assuming 50 comments = max score
return (share_score * 0.5 + like_score * 0.3 + comment_score * 0.2)
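    # Worked example (numbers hypothetical): likes=500, shares=50, comments=25
    # gives share_score=0.5, like_score=0.5, comment_score=0.5, so the result
    # is 0.5*0.5 + 0.5*0.3 + 0.5*0.2 = 0.5.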
def _calculate_timing_virality(self, timestamp: datetime) -> float:
"""Calculate virality based on timing factors"""
# Recent content has higher virality potential
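        # Note: datetime.utcnow() is deprecated in Python 3.12+; it is kept here
        # on the assumption that event timestamps are naive UTC datetimes.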
now = datetime.utcnow()
age_hours = (now - timestamp).total_seconds() / 3600
if age_hours < 1:
return 1.0 # Very recent
elif age_hours < 6:
return 0.8 # Recent
elif age_hours < 24:
return 0.6 # Same day
elif age_hours < 72:
return 0.4 # Within 3 days
else:
return 0.2 # Older content
class EventProcessor:
"""
Main event processor that orchestrates the entire pipeline:
NLP analysis -> Claim extraction -> Virality scoring -> Satellite validation
"""
def __init__(self):
self.claim_extractor = ClaimExtractor()
self.virality_scorer = ViralityScorer()
self.initialized = False
async def initialize(self) -> bool:
"""Initialize the event processor and all its components"""
try:
# Initialize NLP analyzer
nlp_success = await nlp_analyzer.initialize()
if not nlp_success:
logger.error("Failed to initialize NLP analyzer")
return False
# Initialize database
db_success = await database.initialize()
if not db_success:
logger.error("Failed to initialize database")
return False
self.initialized = True
logger.info("Event processor initialized successfully")
return True
except Exception as e:
logger.error(f"Failed to initialize event processor: {e}")
return False
async def process_event(self, raw_event: RawEvent) -> Optional[ProcessedEvent]:
"""
Process a raw event through the complete pipeline.
Returns a ProcessedEvent ready for storage and visualization.
"""
if not self.initialized:
logger.error("Event processor not initialized")
return None
try:
logger.debug(f"Processing event from {raw_event.source.value}")
# Step 1: NLP Analysis
text_analysis = await nlp_analyzer.analyze_text(raw_event.original_text)
# Step 2: Claim Extraction
claim_result = self.claim_extractor.extract_claims(text_analysis)
# Step 3: Geographic Processing
region_hint, lat, lon = self._extract_geographic_info(text_analysis, raw_event.metadata)
# Step 4: Virality Scoring
virality_score = self.virality_scorer.calculate_virality_score(
raw_event, text_analysis, claim_result.claims
)
# Step 5: Create ProcessedEvent
processed_event = ProcessedEvent(
source=raw_event.source,
original_text=raw_event.original_text,
timestamp=raw_event.timestamp,
lang=text_analysis.language_detection.language,
region_hint=region_hint,
lat=lat,
lon=lon,
entities=text_analysis.entities.entities,
virality_score=virality_score,
satellite=None, # Will be populated by satellite validation
claims=claim_result.claims,
processing_metadata={
"nlp_analysis": text_analysis.metadata,
"claim_extraction": claim_result.processing_metadata,
"virality_factors": {
"source_credibility": self.virality_scorer.source_credibility.get(raw_event.source, 0.5),
"content_score": virality_score
},
"processing_time_ms": text_analysis.processing_time_ms
}
)
logger.debug(f"Event processed successfully: {processed_event.event_id}")
return processed_event
except Exception as e:
logger.error(f"Event processing failed: {e}")
return None
def _extract_geographic_info(self, text_analysis: TextAnalysisResult,
metadata: Dict[str, Any]) -> Tuple[str, float, float]:
"""
Extract geographic information from text analysis and metadata.
Returns (region_hint, latitude, longitude)
"""
region_hint = ""
lat, lon = 0.0, 0.0
# Try to get location from metadata first
if metadata.get('location'):
location_data = metadata['location']
if isinstance(location_data, dict):
lat = location_data.get('lat', 0.0)
lon = location_data.get('lon', 0.0)
region_hint = location_data.get('region', '')
# If no metadata location, try to infer from text analysis
if not region_hint and text_analysis.entities.indian_states:
# Use the first Indian state found
region_hint = normalize_state_name(text_analysis.entities.indian_states[0])
# Get approximate coordinates for the state
lat, lon = self._get_state_coordinates(region_hint)
elif not region_hint and text_analysis.entities.geographic_entities:
# Try to match geographic entities to Indian states
for geo_entity in text_analysis.entities.geographic_entities:
normalized_entity = normalize_state_name(geo_entity)
if normalized_entity.lower() in INDIAN_STATES:
region_hint = normalized_entity
lat, lon = self._get_state_coordinates(region_hint)
break
return region_hint, lat, lon
def _get_state_coordinates(self, state_name: str) -> Tuple[float, float]:
"""Get approximate coordinates for an Indian state"""
        # Approximate centroid coordinates for Indian states and union territories
state_coordinates = {
"andhra pradesh": (15.9129, 79.7400),
"arunachal pradesh": (28.2180, 94.7278),
"assam": (26.2006, 92.9376),
"bihar": (25.0961, 85.3131),
"chhattisgarh": (21.2787, 81.8661),
"goa": (15.2993, 74.1240),
"gujarat": (23.0225, 72.5714),
"haryana": (29.0588, 76.0856),
"himachal pradesh": (31.1048, 77.1734),
"jharkhand": (23.6102, 85.2799),
"karnataka": (15.3173, 75.7139),
"kerala": (10.8505, 76.2711),
"madhya pradesh": (22.9734, 78.6569),
"maharashtra": (19.7515, 75.7139),
"manipur": (24.6637, 93.9063),
"meghalaya": (25.4670, 91.3662),
"mizoram": (23.1645, 92.9376),
"nagaland": (26.1584, 94.5624),
"odisha": (20.9517, 85.0985),
"punjab": (31.1471, 75.3412),
"rajasthan": (27.0238, 74.2179),
"sikkim": (27.5330, 88.5122),
"tamil nadu": (11.1271, 78.6569),
"telangana": (18.1124, 79.0193),
"tripura": (23.9408, 91.9882),
"uttar pradesh": (26.8467, 80.9462),
"uttarakhand": (30.0668, 79.0193),
"west bengal": (22.9868, 87.8550),
"delhi": (28.7041, 77.1025),
"jammu and kashmir": (34.0837, 74.7973),
"ladakh": (34.1526, 77.5771)
}
return state_coordinates.get(state_name.lower(), (0.0, 0.0))
# Global processor instance
event_processor = EventProcessor()
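# Minimal usage sketch, assuming the configured NLP and database backends are
# reachable; the sample event below is hypothetical.
if __name__ == "__main__":
    async def _demo() -> None:
        if not await event_processor.initialize():
            return
        raw = RawEvent(
            source=EventSource.MANUAL,
            original_text="Breaking: officials in Kerala are hiding a flood warning!",
            timestamp=datetime.utcnow(),
            metadata={},
        )
        processed = await event_processor.process_event(raw)
        if processed is not None:
            print(processed.region_hint, processed.virality_score)
    asyncio.run(_demo())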