# backend/grok_sensor.py
"""
grok_sensor.py — Async X API v2 + Community Notes integration.
Queries two signals for any claim:
1. 7-day tweet velocity: how fast is this claim spreading?
High velocity + no corroboration = yellow flag
2. Community Notes: has the crowd-sourced fact-check system flagged it?
Active note = strong red signal (-0.4 in trust scoring)
Full mock fallback when X_BEARER_TOKEN is absent — the system runs
end-to-end in demo mode without any external API credentials.
"""
import hashlib
import random
from datetime import datetime, timedelta, timezone

import httpx
import structlog
from tenacity import (
    RetryError,
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from core.config import Settings, get_settings
from core.models import GrokSensorResult
log = structlog.get_logger(__name__)
X_API_BASE = "https://api.twitter.com/2"
COMMUNITY_NOTES_BASE = "https://twitter.com/i/birdwatch/n" # Unofficial — use search API workaround
# ---------------------------------------------------------------------------
# X API v2 search
# ---------------------------------------------------------------------------
@retry(
    retry=retry_if_exception_type(httpx.HTTPStatusError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=0.5, min=0.1, max=2.0),
    # Without reraise=True, tenacity wraps the final failure in RetryError,
    # which callers handling httpx.HTTPError would never catch — the mock
    # fallback in query_grok_sensor() would silently stop working.
    reraise=True,
)
async def _search_x_api(query: str, bearer_token: str) -> int:
    """
    Search X API v2 for the tweet count matching *query* over the past 7 days.

    Args:
        query: Keyword query string (operators are appended here).
        bearer_token: OAuth2 bearer token for the X API.

    Returns:
        Total tweet count over the window — used as a velocity signal.

    Raises:
        httpx.HTTPStatusError: after 3 attempts with exponential backoff
            (intended for HTTP 429 rate-limit responses, but any non-2xx
            status is retried).
    """
    params = {
        # Exclude retweets so velocity reflects original posts only.
        "query": f"{query} -is:retweet lang:en",
        "start_time": (datetime.now(timezone.utc) - timedelta(days=7)).isoformat(),
        "granularity": "day",
    }
    headers = {"Authorization": f"Bearer {bearer_token}"}
    async with httpx.AsyncClient(timeout=5.0) as client:
        resp = await client.get(
            f"{X_API_BASE}/tweets/counts/recent",
            params=params,
            headers=headers,
        )
        resp.raise_for_status()
        data = resp.json()
    # meta.total_tweet_count may be absent on an empty result window.
    return data.get("meta", {}).get("total_tweet_count", 0)
async def _check_community_notes(query_keywords: list[str], bearer_token: str) -> tuple[bool, str | None]:
    """
    Look for an active Community Note matching the claim keywords.

    Community Notes are surfaced as tweets from the @CommunityNotes account,
    so a v2 recent-search scoped to that author acts as a note lookup.
    Best-effort: any non-200 response is treated as "no note found".

    Returns:
        (has_note, note_text) — note_text is None when no note matched.
    """
    # Top-5 keywords keep the query targeted without over-constraining it.
    search_terms = " ".join(query_keywords[:5])
    window_start = datetime.now(timezone.utc) - timedelta(days=30)
    request_params = {
        "query": f"(from:CommunityNotes) ({search_terms})",
        "max_results": 5,
        "tweet.fields": "text,created_at",
        "start_time": window_start.isoformat(),
    }
    auth_header = {"Authorization": f"Bearer {bearer_token}"}
    async with httpx.AsyncClient(timeout=5.0) as client:
        response = await client.get(
            f"{X_API_BASE}/tweets/search/recent",
            params=request_params,
            headers=auth_header,
        )
    if response.status_code != 200:
        return False, None
    matches = response.json().get("data", [])
    if not matches:
        return False, None
    # Most recent matching note wins.
    return True, matches[0]["text"]
def _extract_keywords(text: str) -> list[str]:
"""
Extract the most meaningful content words for query construction.
Strips stopwords; keeps nouns, numbers, proper nouns (heuristic: capitalized).
"""
stopwords = {
"the", "a", "an", "is", "are", "was", "were", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will", "would", "could",
"should", "may", "might", "shall", "can", "this", "that", "these",
"those", "i", "we", "you", "he", "she", "it", "they", "and", "or",
"but", "in", "on", "at", "to", "for", "of", "with", "by", "from",
"up", "as", "into", "through", "about", "after", "before",
}
words = [w.strip(".,!?;:\"'()[]") for w in text.split()]
return [w for w in words if w.lower() not in stopwords and len(w) > 3][:10]
def _mock_sensor_result(claim_hash: str) -> GrokSensorResult:
    """
    Deterministic mock result derived from the claim hash.

    The same hash always produces the same result — stable across calls
    AND across processes, which the demo mode and tests rely on.

    Args:
        claim_hash: Hex digest (or arbitrary string) identifying the claim.

    Returns:
        A GrokSensorResult with is_mock=True.
    """
    prefix = claim_hash[:8]
    if prefix and all(c in "0123456789abcdef" for c in prefix):
        seed = int(prefix, 16)
    else:
        # Builtin hash() is salted per process (PYTHONHASHSEED), which would
        # break the determinism guarantee above; derive the seed from a
        # stable sha256 digest instead. Also covers empty claim_hash, where
        # int("", 16) would raise ValueError.
        seed = int(hashlib.sha256(claim_hash.encode("utf-8")).hexdigest()[:8], 16)
    rng = random.Random(seed)
    velocity = rng.randint(0, 50_000)
    has_note = rng.random() < 0.12  # ~12% chance of a community note (realistic)
    note_text = (
        "Community Note: This claim lacks context. The full data shows..."
        if has_note
        else None
    )
    return GrokSensorResult(
        velocity=velocity,
        community_note=has_note,
        note_text=note_text,
        is_mock=True,
    )
# ---------------------------------------------------------------------------
# Public interface
# ---------------------------------------------------------------------------
async def query_grok_sensor(
    claim_text: str,
    claim_hash: str,
    settings: Settings | None = None,
) -> GrokSensorResult:
    """
    Main entry point: query X API for claim velocity and Community Notes.

    Falls back to deterministic mock data when X_BEARER_TOKEN is absent,
    when demo mode is on, or when the live API fails after retries.
    The mock is seeded by claim_hash so results are consistent across calls.

    Args:
        claim_text: Free-text claim to check.
        claim_hash: Stable identifier used to seed the mock fallback.
        settings: Optional override; defaults to get_settings().

    Returns:
        GrokSensorResult with velocity and Community Notes signals.
    """
    cfg = settings or get_settings()
    if cfg.demo_mode or not cfg.has_x_api:
        result = _mock_sensor_result(claim_hash)
        log.debug("grok_sensor.mock", velocity=result.velocity, has_note=result.community_note)
        return result
    keywords = _extract_keywords(claim_text)
    query = " ".join(keywords[:5])
    try:
        velocity = await _search_x_api(query, cfg.x_bearer_token)
        has_note: bool = False
        note_text: str | None = None
        # Only check Community Notes when the claim is actually circulating
        # (> 100 tweets in the window); dormant claims rarely carry notes
        # and the extra API call would be wasted.
        if velocity > 100:
            has_note, note_text = await _check_community_notes(keywords, cfg.x_bearer_token)
        result = GrokSensorResult(
            velocity=velocity,
            community_note=has_note,
            note_text=note_text,
            is_mock=False,
        )
        log.info("grok_sensor.live", velocity=velocity, has_note=has_note)
        return result
    except (httpx.HTTPError, RetryError) as exc:
        # RetryError: tenacity wraps the final failure from _search_x_api
        # after exhausting attempts (unless reraise is set on the decorator);
        # catch both so a flaky API degrades to mock data instead of
        # propagating an unexpected exception to the caller.
        log.warning("grok_sensor.api_error", error=str(exc), fallback="mock")
        return _mock_sensor_result(claim_hash)