""" grok_sensor.py — X API v2 sensor with Community Notes and full mock fallback. Queries: 1. X API v2 search/recent: 7-day tweet volume for claim keywords 2. Community Notes API: active notes matching claim hash Returns: XSensorResult: {velocity, community_note, note_text, query_used} Design decisions: - All I/O is async (httpx.AsyncClient) - Full mock fallback when X_BEARER_TOKEN is absent (common in dev/CI) - Rate-limit aware: exponential backoff via tenacity - Keyword extraction: noun phrases + named entities via regex (no spaCy dependency) """ from __future__ import annotations import hashlib import os import re import time from dataclasses import dataclass from typing import Optional import httpx import structlog from tenacity import retry, stop_after_attempt, wait_exponential log = structlog.get_logger(__name__) X_SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent" X_COMMUNITY_NOTES_URL = "https://api.twitter.com/2/notes/search" MAX_QUERY_LEN = 512 @dataclass class XSensorResult: velocity: int # 7-day tweet volume for claim keywords community_note: bool # Active Community Note found note_text: str | None # Text of the most relevant note, if any query_used: str # The search query constructed from the claim source: str = "api" # "api" or "mock" def _extract_keywords(text: str, max_keywords: int = 6) -> str: """ Extract query keywords from claim text using patterns: - Quoted strings (already specific) - Named entities: capitalized sequences of 1-3 words - Numbers with context Strips stop words to improve query precision. """ stop_words = { "the", "a", "an", "is", "are", "was", "were", "be", "been", "have", "has", "had", "do", "did", "will", "would", "could", "should", "may", "might", "shall", "can", "this", "that", "these", "those", "it", "its", "their", "they", "we", "you", "i", "he", "she", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by", "from", "into", "about", "as", } # Quoted substrings get priority quoted = re.findall(r'"([^"]{3,40})"', text) if quoted: return " ".join(quoted[:2])[:MAX_QUERY_LEN] # Named entity sequences (capitalized consecutive words) named_entities = re.findall(r"(?:[A-Z][a-z]+)(?:\s+[A-Z][a-z]+){0,2}", text) # Numbers with unit context numeric = re.findall(r"\d+(?:\.\d+)?\s*(?:%|billion|million|thousand|people|cases|deaths|km|mph)", text) candidates: list[str] = named_entities + numeric # Fall back to all non-stop words if not candidates: candidates = [w for w in text.split() if w.lower() not in stop_words and len(w) > 3] keywords = list(dict.fromkeys(candidates))[:max_keywords] # deduplicate, preserve order query = " ".join(keywords)[:MAX_QUERY_LEN] return query or text[:100] @retry( stop=stop_after_attempt(2), wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0), reraise=False, ) async def _search_x_volume(bearer_token: str, query: str) -> int: """Query X API v2 recent search to estimate 7-day tweet volume.""" async with httpx.AsyncClient(timeout=8.0) as client: response = await client.get( X_SEARCH_URL, params={ "query": query, "max_results": 10, "tweet.fields": "created_at,public_metrics", "start_time": _iso_7d_ago(), }, headers={"Authorization": f"Bearer {bearer_token}"}, ) if response.status_code == 429: log.warning("x_api.rate_limited") return -1 # Signal to retry response.raise_for_status() data = response.json() meta = data.get("meta", {}) return meta.get("result_count", 0) @retry( stop=stop_after_attempt(2), wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0), reraise=False, ) async def _check_community_notes(bearer_token: str, query: str) -> tuple[bool, str | None]: """ Check X Community Notes API for active notes matching the query. Returns (has_note, note_text). """ async with httpx.AsyncClient(timeout=8.0) as client: try: response = await client.get( X_COMMUNITY_NOTES_URL, params={"query": query[:200], "max_results": 5}, headers={"Authorization": f"Bearer {bearer_token}"}, ) if response.status_code in (401, 403): log.warning("x_api.notes_unauthorized") return False, None if response.status_code == 429: return False, None response.raise_for_status() data = response.json() notes = data.get("data", []) if notes: top_note = notes[0] return True, top_note.get("text", "Community Note found.")[:400] return False, None except Exception as exc: log.debug("x_api.notes_error", error=str(exc)) return False, None async def query_x_sensor(claim_text: str, claim_hash: str) -> XSensorResult: """ Main entry point for the X sensor. Falls back to deterministic mock data when bearer token is absent. """ bearer_token = os.getenv("X_BEARER_TOKEN", "") query = _extract_keywords(claim_text) if not bearer_token: log.debug("x_sensor.mock_mode", reason="X_BEARER_TOKEN not set") return _mock_result(claim_hash, query) try: # Run both queries concurrently import asyncio velocity_task = asyncio.create_task(_search_x_volume(bearer_token, query)) notes_task = asyncio.create_task(_check_community_notes(bearer_token, query)) velocity, (has_note, note_text) = await asyncio.gather( velocity_task, notes_task ) return XSensorResult( velocity=max(0, velocity), community_note=has_note, note_text=note_text, query_used=query, source="api", ) except Exception as exc: log.warning("x_sensor.api_error", error=str(exc), fallback="mock") return _mock_result(claim_hash, query) def _mock_result(claim_hash: str, query: str) -> XSensorResult: """ Deterministic mock based on hash — same claim always gets same mock result. Simulates a realistic distribution of X sensor outcomes. """ # Use first 2 bytes of hash as seed for deterministic variation seed = int(claim_hash[:4], 16) if len(claim_hash) >= 4 else 0 velocity_options = [12, 87, 340, 1250, 5800, 23000, 91000] velocity = velocity_options[seed % len(velocity_options)] has_note = (seed % 7) == 0 # ~14% chance of community note note_text = ( "This claim contains misleading context. Several independent " "fact-checkers have rated it as partly false." if has_note else None ) return XSensorResult( velocity=velocity, community_note=has_note, note_text=note_text, query_used=query, source="mock", ) def _iso_7d_ago() -> str: """ISO 8601 timestamp for 7 days ago (X API format).""" seven_days_ago = time.time() - (7 * 24 * 3600) from datetime import datetime, timezone return datetime.fromtimestamp(seven_days_ago, tz=timezone.utc).strftime( "%Y-%m-%dT%H:%M:%SZ" )