| """ |
| grok_sensor.py — Async X API v2 + Community Notes integration. |
| |
| Queries two signals for any claim: |
| 1. 7-day tweet velocity: how fast is this claim spreading? |
| High velocity + no corroboration = yellow flag |
| 2. Community Notes: has the crowd-sourced fact-check system flagged it? |
| Active note = strong red signal (-0.4 in trust scoring) |
| |
| Full mock fallback when X_BEARER_TOKEN is absent — the system runs |
| end-to-end in demo mode without any external API credentials. |
| """ |
|
|
import hashlib
import random
from datetime import datetime, timedelta, timezone


import httpx
import structlog
from tenacity import (
    RetryError,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)


from core.config import Settings, get_settings
from core.models import GrokSensorResult
|
|
# Module-level structured logger, bound to this module's import path.
log = structlog.get_logger(__name__)


# Root of the X (Twitter) API v2, used by both query helpers below.
X_API_BASE = "https://api.twitter.com/2"
# Base URL for Community Note permalinks. Not referenced in this chunk —
# presumably consumed elsewhere in the file/project for link building.
# NOTE(review): confirm it is actually used; otherwise it is dead.
COMMUNITY_NOTES_BASE = "https://twitter.com/i/birdwatch/n"
|
|
| |
| |
| |
|
|
@retry(
    retry=retry_if_exception_type(httpx.HTTPStatusError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.5, min=0.1, max=2.0),
    # Re-raise the underlying HTTPStatusError once attempts are exhausted.
    # Without reraise=True tenacity raises RetryError instead, which slips
    # past callers' `except httpx.HTTPError` handlers and defeats their
    # fallback-to-mock behavior.
    reraise=True,
)
async def _search_x_api(query: str, bearer_token: str) -> int:
    """
    Search X API v2 for the tweet count matching *query* in the past 7 days.

    Args:
        query: Keyword query; retweets and non-English tweets are excluded.
        bearer_token: OAuth2 bearer token for the X API.

    Returns:
        Total tweet count over the window (0 if the API omits the field),
        used as a spread-velocity signal.

    Raises:
        httpx.HTTPStatusError: after 3 attempts with exponential backoff.
            Intended for HTTP 429 rate limits, though any 4xx/5xx response
            is retried by this predicate.
    """
    params = {
        "query": f"{query} -is:retweet lang:en",
        "start_time": (datetime.now(timezone.utc) - timedelta(days=7)).isoformat(),
        "granularity": "day",
    }
    headers = {"Authorization": f"Bearer {bearer_token}"}

    async with httpx.AsyncClient(timeout=5.0) as client:
        resp = await client.get(
            f"{X_API_BASE}/tweets/counts/recent",
            params=params,
            headers=headers,
        )
        resp.raise_for_status()
        data = resp.json()
    return data.get("meta", {}).get("total_tweet_count", 0)
|
|
|
|
async def _check_community_notes(query_keywords: list[str], bearer_token: str) -> tuple[bool, str | None]:
    """
    Look for an active Community Note via the X API v2 search endpoint.

    Community Notes surface as tweets authored by @CommunityNotes, so we
    search that account for the claim's keywords over a 30-day window.

    Returns:
        (has_note, note_text) — note_text is the most recent match, or
        None when there is no note or the request was not a clean 200.
    """
    topic = " ".join(query_keywords[:5])
    window_start = datetime.now(timezone.utc) - timedelta(days=30)
    request_params = {
        "query": f"(from:CommunityNotes) ({topic})",
        "max_results": 5,
        "tweet.fields": "text,created_at",
        "start_time": window_start.isoformat(),
    }
    auth = {"Authorization": f"Bearer {bearer_token}"}

    async with httpx.AsyncClient(timeout=5.0) as client:
        resp = await client.get(
            f"{X_API_BASE}/tweets/search/recent",
            params=request_params,
            headers=auth,
        )

    # Best-effort: any non-200 response is treated as "no note" rather
    # than an error, matching the caller's soft-signal expectations.
    if resp.status_code != 200:
        return False, None
    matches = resp.json().get("data", [])
    if not matches:
        return False, None
    return True, matches[0]["text"]
|
|
|
|
| def _extract_keywords(text: str) -> list[str]: |
| """ |
| Extract the most meaningful content words for query construction. |
| Strips stopwords; keeps nouns, numbers, proper nouns (heuristic: capitalized). |
| """ |
| stopwords = { |
| "the", "a", "an", "is", "are", "was", "were", "be", "been", "being", |
| "have", "has", "had", "do", "does", "did", "will", "would", "could", |
| "should", "may", "might", "shall", "can", "this", "that", "these", |
| "those", "i", "we", "you", "he", "she", "it", "they", "and", "or", |
| "but", "in", "on", "at", "to", "for", "of", "with", "by", "from", |
| "up", "as", "into", "through", "about", "after", "before", |
| } |
| words = [w.strip(".,!?;:\"'()[]") for w in text.split()] |
| return [w for w in words if w.lower() not in stopwords and len(w) > 3][:10] |
|
|
|
|
def _mock_sensor_result(claim_hash: str) -> GrokSensorResult:
    """
    Build a deterministic mock result derived from the claim hash.

    The same claim_hash always produces the same result — across calls
    AND across processes — so demo-mode behavior is stable for testing.

    Args:
        claim_hash: Hex digest (preferred) or any identifying string.

    Returns:
        GrokSensorResult with is_mock=True.
    """
    prefix = claim_hash[:8]
    if prefix and all(c in "0123456789abcdef" for c in prefix):
        # Hex prefix seeds the RNG directly (same path as before).
        seed = int(prefix, 16)
    else:
        # Previously this fallback used built-in hash(), which is
        # randomized per process (PYTHONHASHSEED) and broke the
        # determinism promise above. A real digest is stable. The
        # `prefix` truthiness guard also avoids int("", 16) -> ValueError
        # for an empty claim_hash.
        seed = int.from_bytes(hashlib.sha256(claim_hash.encode()).digest()[:4], "big")
    rng = random.Random(seed)

    # Velocity uniform in [0, 50k]; ~12% of mock claims carry a note.
    velocity = rng.randint(0, 50_000)
    has_note = rng.random() < 0.12
    note_text = (
        "Community Note: This claim lacks context. The full data shows..."
        if has_note
        else None
    )

    return GrokSensorResult(
        velocity=velocity,
        community_note=has_note,
        note_text=note_text,
        is_mock=True,
    )
|
|
|
|
| |
| |
| |
|
|
async def query_grok_sensor(
    claim_text: str,
    claim_hash: str,
    settings: Settings | None = None,
) -> GrokSensorResult:
    """
    Main entry point: query X API for claim velocity and Community Notes.

    Falls back to deterministic mock data when demo mode is on, when
    X_BEARER_TOKEN is absent, or when the live API calls fail. The mock
    is seeded by claim_hash so results are consistent across calls.

    Args:
        claim_text: The claim to check; mined for query keywords.
        claim_hash: Stable identifier used to seed the mock fallback.
        settings: Optional settings override; defaults to get_settings().

    Returns:
        GrokSensorResult (is_mock=True on any fallback path).
    """
    cfg = settings or get_settings()

    if cfg.demo_mode or not cfg.has_x_api:
        result = _mock_sensor_result(claim_hash)
        log.debug("grok_sensor.mock", velocity=result.velocity, has_note=result.community_note)
        return result

    keywords = _extract_keywords(claim_text)
    query = " ".join(keywords[:5])

    try:
        velocity = await _search_x_api(query, cfg.x_bearer_token)

        # Only spend the second API call on claims that are actually
        # spreading; low-velocity claims are unlikely to carry a note.
        has_note: bool = False
        note_text: str | None = None
        if velocity > 100:
            has_note, note_text = await _check_community_notes(keywords, cfg.x_bearer_token)

        result = GrokSensorResult(
            velocity=velocity,
            community_note=has_note,
            note_text=note_text,
            is_mock=False,
        )
        log.info("grok_sensor.live", velocity=velocity, has_note=has_note)
        return result

    except (httpx.HTTPError, RetryError) as exc:
        # RetryError covers the case where the tenacity-decorated search
        # exhausts its attempts without `reraise=True`: tenacity then
        # raises RetryError, not the underlying httpx.HTTPStatusError,
        # and the mock fallback would otherwise never trigger.
        log.warning("grok_sensor.api_error", error=str(exc), fallback="mock")
        return _mock_sensor_result(claim_hash)
|
|