"""
grok_sensor.py — X API v2 sensor with Community Notes and full mock fallback.
Queries:
1. X API v2 search/recent: 7-day tweet volume for claim keywords
2. Community Notes API: active notes matching claim hash
Returns:
XSensorResult: {velocity, community_note, note_text, query_used}
Design decisions:
- All I/O is async (httpx.AsyncClient)
- Full mock fallback when X_BEARER_TOKEN is absent (common in dev/CI)
- Rate-limit aware: exponential backoff via tenacity
- Keyword extraction: noun phrases + named entities via regex (no spaCy dependency)
"""
from __future__ import annotations

import asyncio
import hashlib
import os
import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential
log = structlog.get_logger(__name__)
X_SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"
X_COMMUNITY_NOTES_URL = "https://api.twitter.com/2/notes/search"
MAX_QUERY_LEN = 512
@dataclass
class XSensorResult:
    """Outcome of one X sensor pass for a single claim.

    Attributes:
        velocity: 7-day tweet volume observed for the claim keywords.
        community_note: True when an active Community Note was found.
        note_text: Text of the most relevant note, or None.
        query_used: The search query constructed from the claim.
        source: "api" when real data was fetched, "mock" for the fallback.
    """

    velocity: int
    community_note: bool
    note_text: str | None
    query_used: str
    source: str = "api"
def _extract_keywords(text: str, max_keywords: int = 6) -> str:
    """
    Extract query keywords from claim text using patterns:
    - Quoted strings (already specific) — used verbatim when present
    - Named entities: capitalized sequences of 1-3 words
    - Numbers with unit context (%, million, deaths, ...)
    Strips stop words to improve query precision.

    Args:
        text: Raw claim text.
        max_keywords: Cap on the number of distinct keywords kept.

    Returns:
        A non-empty search query truncated to MAX_QUERY_LEN; falls back
        to the first 100 chars of the claim when nothing was extracted.
    """
    stop_words = {
        "the", "a", "an", "is", "are", "was", "were", "be", "been",
        "have", "has", "had", "do", "did", "will", "would", "could",
        "should", "may", "might", "shall", "can", "this", "that",
        "these", "those", "it", "its", "their", "they", "we", "you",
        "i", "he", "she", "and", "or", "but", "in", "on", "at", "to",
        "for", "of", "with", "by", "from", "into", "about", "as",
    }
    # Quoted substrings get priority — the author already scoped the claim.
    quoted = re.findall(r'"([^"]{3,40})"', text)
    if quoted:
        return " ".join(quoted[:2])[:MAX_QUERY_LEN]
    # Named entity sequences (capitalized consecutive words)
    named_entities = re.findall(r"(?:[A-Z][a-z]+)(?:\s+[A-Z][a-z]+){0,2}", text)
    # BUG FIX: sentence-initial capitalized stop words ("The", "This", ...)
    # used to slip through as single-word "entities" and dilute the query.
    named_entities = [e for e in named_entities if e.lower() not in stop_words]
    # Numbers with unit context
    numeric = re.findall(
        r"\d+(?:\.\d+)?\s*(?:%|billion|million|thousand|people|cases|deaths|km|mph)",
        text,
    )
    candidates: list[str] = named_entities + numeric
    # Fall back to all non-stop words when no entity/number was found
    if not candidates:
        candidates = [w for w in text.split() if w.lower() not in stop_words and len(w) > 3]
    # Deduplicate while preserving first-seen order, then cap.
    keywords = list(dict.fromkeys(candidates))[:max_keywords]
    query = " ".join(keywords)[:MAX_QUERY_LEN]
    return query or text[:100]
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0),
    reraise=False,
)
async def _search_x_volume(bearer_token: str, query: str) -> int:
    """Query X API v2 recent search to estimate 7-day tweet volume.

    Args:
        bearer_token: OAuth2 bearer token for the X API.
        query: Search query string (already keyword-extracted).

    Returns:
        The `meta.result_count` from the search response (0 when absent).

    Raises:
        httpx.HTTPStatusError: on any non-2xx response, including 429.
            Raising (rather than returning a sentinel) is what lets the
            tenacity decorator actually retry with backoff; once attempts
            are exhausted a RetryError propagates to the caller, which
            falls back to mock data.
    """
    async with httpx.AsyncClient(timeout=8.0) as client:
        response = await client.get(
            X_SEARCH_URL,
            params={
                "query": query,
                "max_results": 10,
                "tweet.fields": "created_at,public_metrics",
                "start_time": _iso_7d_ago(),
            },
            headers={"Authorization": f"Bearer {bearer_token}"},
        )
        if response.status_code == 429:
            # BUG FIX: the old code returned -1 here with a "signal to
            # retry" comment, but tenacity only retries on *exceptions*,
            # so the sentinel leaked straight to the caller. Log, then let
            # raise_for_status() below raise so the retry policy applies.
            log.warning("x_api.rate_limited")
        response.raise_for_status()
        data = response.json()
        # meta.result_count is the number of matching tweets in the window.
        return data.get("meta", {}).get("result_count", 0)
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0),
    reraise=False,
)
async def _check_community_notes(bearer_token: str, query: str) -> tuple[bool, str | None]:
    """
    Check X Community Notes API for active notes matching the query.

    Best-effort by design: every failure path (auth errors, rate limiting,
    network errors, bad JSON) degrades to (False, None) rather than
    raising, so a notes outage never blocks the velocity lookup that runs
    alongside it.

    NOTE(review): the broad `except Exception` below also swallows the
    HTTPStatusError raised by raise_for_status(), so the tenacity @retry
    decorator effectively never fires for this function — confirm whether
    retries are actually intended here.

    Args:
        bearer_token: OAuth2 bearer token for the X API.
        query: Search query; truncated to 200 chars for this endpoint.

    Returns:
        (has_note, note_text): note_text is the top note's text capped at
        400 chars, or None when no note was found.
    """
    async with httpx.AsyncClient(timeout=8.0) as client:
        try:
            response = await client.get(
                X_COMMUNITY_NOTES_URL,
                params={"query": query[:200], "max_results": 5},
                headers={"Authorization": f"Bearer {bearer_token}"},
            )
            # Token lacks entitlement for the notes endpoint — treat as "no note".
            if response.status_code in (401, 403):
                log.warning("x_api.notes_unauthorized")
                return False, None
            # Rate limited: silently degrade rather than back off.
            if response.status_code == 429:
                return False, None
            response.raise_for_status()
            data = response.json()
            notes = data.get("data", [])
            if notes:
                top_note = notes[0]
                # First result is treated as most relevant; cap text at 400 chars.
                return True, top_note.get("text", "Community Note found.")[:400]
            return False, None
        except Exception as exc:
            # Deliberate catch-all: this sensor is best-effort (see docstring).
            log.debug("x_api.notes_error", error=str(exc))
            return False, None
async def query_x_sensor(claim_text: str, claim_hash: str) -> XSensorResult:
    """
    Main entry point for the X sensor.

    Runs the tweet-volume search and the Community Notes lookup
    concurrently. Falls back to deterministic mock data when the bearer
    token is absent (dev/CI) or when the API calls fail.

    Args:
        claim_text: Natural-language claim to build the search query from.
        claim_hash: Stable hash of the claim; seeds the mock fallback.

    Returns:
        XSensorResult with source="api" on success, source="mock" otherwise.
    """
    bearer_token = os.getenv("X_BEARER_TOKEN", "")
    query = _extract_keywords(claim_text)
    if not bearer_token:
        log.debug("x_sensor.mock_mode", reason="X_BEARER_TOKEN not set")
        return _mock_result(claim_hash, query)
    try:
        # gather() accepts coroutines directly — wrapping them in
        # create_task() first (as the old code did) was redundant and
        # risked leaving an un-awaited task if the sibling raised first.
        velocity, (has_note, note_text) = await asyncio.gather(
            _search_x_volume(bearer_token, query),
            _check_community_notes(bearer_token, query),
        )
        return XSensorResult(
            velocity=max(0, velocity),  # clamp any negative sentinel to 0
            community_note=has_note,
            note_text=note_text,
            query_used=query,
            source="api",
        )
    except Exception as exc:
        # Any API failure (incl. tenacity RetryError) degrades to the mock.
        log.warning("x_sensor.api_error", error=str(exc), fallback="mock")
        return _mock_result(claim_hash, query)
def _mock_result(claim_hash: str, query: str) -> XSensorResult:
    """
    Build a deterministic mock XSensorResult from the claim hash.

    The same claim always maps to the same pseudo-random outcome, so
    repeated runs (and tests) are stable while still covering a spread
    of realistic velocities and an occasional Community Note.
    """
    # Derive a small deterministic seed from the first 2 bytes of the hash;
    # hashes shorter than 4 hex chars fall back to seed 0.
    seed = 0 if len(claim_hash) < 4 else int(claim_hash[:4], 16)
    buckets = [12, 87, 340, 1250, 5800, 23000, 91000]
    flagged = seed % 7 == 0  # roughly 1-in-7 (~14%) chance of a note
    note: str | None = None
    if flagged:
        note = (
            "This claim contains misleading context. Several independent "
            "fact-checkers have rated it as partly false."
        )
    return XSensorResult(
        velocity=buckets[seed % len(buckets)],
        community_note=flagged,
        note_text=note,
        query_used=query,
        source="mock",
    )
def _iso_7d_ago() -> str:
"""ISO 8601 timestamp for 7 days ago (X API format)."""
seven_days_ago = time.time() - (7 * 24 * 3600)
from datetime import datetime, timezone
return datetime.fromtimestamp(seven_days_ago, tz=timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)