File size: 7,491 Bytes
f589dab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
"""
grok_sensor.py — X API v2 sensor with Community Notes and full mock fallback.

Queries:
  1. X API v2 search/recent: 7-day tweet volume for claim keywords
  2. Community Notes API: active notes matching claim hash

Returns:
  XSensorResult: {velocity, community_note, note_text, query_used}

Design decisions:
  - All I/O is async (httpx.AsyncClient)
  - Full mock fallback when X_BEARER_TOKEN is absent (common in dev/CI)
  - Rate-limit aware: exponential backoff via tenacity
  - Keyword extraction: noun phrases + named entities via regex (no spaCy dependency)
"""

from __future__ import annotations

import asyncio
import hashlib
import os
import re
import time
from dataclasses import dataclass
from typing import Optional

import httpx
import structlog
from tenacity import retry, stop_after_attempt, wait_exponential

log = structlog.get_logger(__name__)

# X API v2 endpoints (v2 is still served under the legacy api.twitter.com host).
X_SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"
X_COMMUNITY_NOTES_URL = "https://api.twitter.com/2/notes/search"

# Upper bound applied to constructed search queries in _extract_keywords
# (presumably matches the X API query-length limit — TODO confirm per tier).
MAX_QUERY_LEN = 512


@dataclass
class XSensorResult:
    """Aggregated X (Twitter) signal for one claim.

    Produced by query_x_sensor(); `source` records whether the values came
    from the live API ("api") or the deterministic mock fallback ("mock").
    """

    velocity: int               # 7-day tweet volume for claim keywords
    community_note: bool        # Active Community Note found
    note_text: str | None       # Text of the most relevant note, if any
    query_used: str             # The search query constructed from the claim
    source: str = "api"         # "api" or "mock"


def _extract_keywords(text: str, max_keywords: int = 6) -> str:
    """
    Extract query keywords from claim text using patterns:
    - Quoted strings (already specific)
    - Named entities: capitalized sequences of 1-3 words
    - Numbers with context
    Strips stop words to improve query precision.

    Args:
        text: Free-text claim to derive a search query from.
        max_keywords: Maximum number of distinct keyword phrases kept.

    Returns:
        A non-empty query string, capped at MAX_QUERY_LEN characters
        (falls back to the first 100 chars of `text` if nothing matched).
    """
    stop_words = {
        "the", "a", "an", "is", "are", "was", "were", "be", "been",
        "have", "has", "had", "do", "did", "will", "would", "could",
        "should", "may", "might", "shall", "can", "this", "that",
        "these", "those", "it", "its", "their", "they", "we", "you",
        "i", "he", "she", "and", "or", "but", "in", "on", "at", "to",
        "for", "of", "with", "by", "from", "into", "about", "as",
    }

    # Quoted substrings get priority
    quoted = re.findall(r'"([^"]{3,40})"', text)
    if quoted:
        return " ".join(quoted[:2])[:MAX_QUERY_LEN]

    # Named entity sequences (capitalized consecutive words)
    named_entities = re.findall(r"(?:[A-Z][a-z]+)(?:\s+[A-Z][a-z]+){0,2}", text)
    # Numbers with unit context
    numeric = re.findall(r"\d+(?:\.\d+)?\s*(?:%|billion|million|thousand|people|cases|deaths|km|mph)", text)

    # BUG FIX: sentence-initial capitalized stop words ("The", "This", ...)
    # match the entity regex and previously leaked into the query, despite
    # the docstring's promise to strip stop words. Filter them here.
    candidates: list[str] = [
        c for c in named_entities + numeric if c.lower() not in stop_words
    ]
    # Fall back to all non-stop words
    if not candidates:
        candidates = [w for w in text.split() if w.lower() not in stop_words and len(w) > 3]

    keywords = list(dict.fromkeys(candidates))[:max_keywords]  # deduplicate, preserve order
    query = " ".join(keywords)[:MAX_QUERY_LEN]
    return query or text[:100]


@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0),
    reraise=False,
)
async def _search_x_volume(bearer_token: str, query: str) -> int:
    """
    Query X API v2 recent search to estimate 7-day tweet volume.

    Returns `meta.result_count` from a single 10-result page. HTTP errors —
    including 429 rate limits — are raised so tenacity retries; once attempts
    are exhausted, tenacity's RetryError propagates to the caller, which
    falls back to mock data.
    """
    async with httpx.AsyncClient(timeout=8.0) as client:
        response = await client.get(
            X_SEARCH_URL,
            params={
                "query": query,
                "max_results": 10,
                "tweet.fields": "created_at,public_metrics",
                "start_time": _iso_7d_ago(),
            },
            headers={"Authorization": f"Bearer {bearer_token}"},
        )
        if response.status_code == 429:
            log.warning("x_api.rate_limited")
        # BUG FIX: the old code did `return -1  # Signal to retry` on 429,
        # but tenacity only retries on exceptions, so no retry ever happened.
        # raise_for_status() raises for 429 as well, which triggers the retry.
        response.raise_for_status()
        data = response.json()
        meta = data.get("meta", {})
        return meta.get("result_count", 0)


@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=0.5, min=0.5, max=4.0),
    reraise=True,
)
async def _fetch_community_notes(bearer_token: str, query: str) -> tuple[bool, str | None]:
    """Single Community Notes request; lets transport/5xx errors raise so tenacity retries."""
    async with httpx.AsyncClient(timeout=8.0) as client:
        response = await client.get(
            X_COMMUNITY_NOTES_URL,
            params={"query": query[:200], "max_results": 5},
            headers={"Authorization": f"Bearer {bearer_token}"},
        )
        # Auth failures and rate limits are terminal for this lookup — no note.
        if response.status_code in (401, 403):
            log.warning("x_api.notes_unauthorized")
            return False, None
        if response.status_code == 429:
            return False, None
        response.raise_for_status()
        data = response.json()
        notes = data.get("data", [])
        if notes:
            top_note = notes[0]
            return True, top_note.get("text", "Community Note found.")[:400]
        return False, None


async def _check_community_notes(bearer_token: str, query: str) -> tuple[bool, str | None]:
    """
    Check X Community Notes API for active notes matching the query.
    Returns (has_note, note_text). Best-effort: never raises.

    BUG FIX: the previous version wrapped its whole body in
    `try/except Exception` *inside* the retried coroutine, so the @retry
    decorator could never see an exception and was dead code. The request
    now lives in a retried helper; this wrapper preserves the original
    never-raises contract after retries are exhausted.
    """
    try:
        return await _fetch_community_notes(bearer_token, query)
    except Exception as exc:
        log.debug("x_api.notes_error", error=str(exc))
        return False, None


async def query_x_sensor(claim_text: str, claim_hash: str) -> XSensorResult:
    """
    Main entry point for the X sensor.

    Builds a keyword query from the claim text, then runs the volume search
    and the Community Notes lookup concurrently. Falls back to deterministic
    mock data when X_BEARER_TOKEN is absent (dev/CI) or when any API call
    ultimately fails.

    Args:
        claim_text: Free-text claim to investigate.
        claim_hash: Stable hash of the claim; seeds the deterministic mock.

    Returns:
        XSensorResult with source="api" on success, source="mock" otherwise.
    """
    bearer_token = os.getenv("X_BEARER_TOKEN", "")
    query = _extract_keywords(claim_text)

    if not bearer_token:
        log.debug("x_sensor.mock_mode", reason="X_BEARER_TOKEN not set")
        return _mock_result(claim_hash, query)

    try:
        # gather() accepts coroutines directly — the previous create_task
        # wrapping (and the function-local asyncio import) was redundant.
        velocity, (has_note, note_text) = await asyncio.gather(
            _search_x_volume(bearer_token, query),
            _check_community_notes(bearer_token, query),
        )

        return XSensorResult(
            velocity=max(0, velocity),  # clamp legacy -1 rate-limit sentinel
            community_note=has_note,
            note_text=note_text,
            query_used=query,
            source="api",
        )
    except Exception as exc:
        log.warning("x_sensor.api_error", error=str(exc), fallback="mock")
        return _mock_result(claim_hash, query)


def _mock_result(claim_hash: str, query: str) -> XSensorResult:
    """
    Build a deterministic mock result keyed on the claim hash, so repeated
    calls for the same claim always agree. The value spread loosely mimics
    a realistic distribution of X sensor outcomes.
    """
    volume_buckets = (12, 87, 340, 1250, 5800, 23000, 91000)

    # The first two hash bytes drive all variation; hashes shorter than
    # four hex chars fall back to seed 0.
    seed = 0
    if len(claim_hash) >= 4:
        seed = int(claim_hash[:4], 16)

    flagged = seed % 7 == 0  # roughly one claim in seven carries a note
    note: str | None = None
    if flagged:
        note = (
            "This claim contains misleading context. Several independent "
            "fact-checkers have rated it as partly false."
        )

    return XSensorResult(
        velocity=volume_buckets[seed % len(volume_buckets)],
        community_note=flagged,
        note_text=note,
        query_used=query,
        source="mock",
    )


def _iso_7d_ago() -> str:
    """ISO 8601 timestamp for 7 days ago (X API format)."""
    seven_days_ago = time.time() - (7 * 24 * 3600)
    from datetime import datetime, timezone
    return datetime.fromtimestamp(seven_days_ago, tz=timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )