"""
gatekeeper.py — Groq-powered fact vs. noise edge router.
Design goal: sub-120ms p95 latency to filter social noise before the
expensive RAG + multi-agent pipeline fires.
The gatekeeper uses llama3-8b-8192 on Groq because:
- 800+ tokens/sec throughput → deterministic latency budget
- Structured JSON output schema enforced via Pydantic v2
- Free tier handles high throughput for the gatekeeper role
- Groq's hardware is optimized for small-model, low-latency inference
"""
from __future__ import annotations
import os
import re
from enum import Enum
from functools import lru_cache
import structlog
from groq import AsyncGroq
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_exponential
log = structlog.get_logger(__name__)
# ── Models ────────────────────────────────────────────────────────────────────
class ClaimLabel(str, Enum):
    """Binary verdict for a piece of text; str-mixin so members serialize as their plain string values."""
    FACT = "fact"    # specific, falsifiable assertion worth fact-checking
    NOISE = "noise"  # opinion/meme/social filler — skip the expensive pipeline
class GatekeeperResult(BaseModel):
    """Structured gatekeeper verdict; schema must match the JSON contract in GATEKEEPER_SYSTEM."""
    # fact vs. noise decision
    label: ClaimLabel
    reason: str = Field(description="One-sentence explanation for the classification")
    # Bounded to [0, 1] by pydantic; 0 = pure opinion, 1 = objectively verifiable.
    falsifiability_score: float = Field(
        ge=0.0, le=1.0,
        description="0=pure opinion/meme, 1=objectively verifiable claim"
    )
# ── Groq client ───────────────────────────────────────────────────────────────
@lru_cache(maxsize=1)
def get_groq_client() -> AsyncGroq:
    """Build the process-wide AsyncGroq client exactly once.

    If GROQ_API_KEY is unset, a warning is logged and a placeholder key is
    used so construction still succeeds; callers are expected to detect the
    missing key themselves and avoid real API calls.
    """
    key = os.getenv("GROQ_API_KEY", "")
    if not key:
        log.warning("gatekeeper.no_api_key", msg="GROQ_API_KEY not set, using mock fallback")
        key = "gsk_placeholder"
    return AsyncGroq(api_key=key)
# System prompt sent verbatim to the model. The schema line below must stay
# in sync with GatekeeperResult so model_validate_json can parse the reply.
GATEKEEPER_SYSTEM = """You are a claim falsifiability classifier. Your ONLY job is to decide whether a given piece of text contains a falsifiable factual claim that can be fact-checked.
OUTPUT RULES:
- Respond ONLY with a valid JSON object. No markdown, no preamble.
- Schema: {"label": "fact"|"noise", "reason": "<one sentence>", "falsifiability_score": <0.0-1.0>}
FACT = a specific, falsifiable assertion about the world (statistics, historical events, named person claims, scientific assertions, quotes, causal claims).
NOISE = opinions, memes, questions, sarcasm, social filler, greetings, jokes, abstract feelings.
Examples of FACT:
- "The unemployment rate hit 4.2% in September 2024"
- "Elon Musk acquired Twitter for $44 billion"
- "COVID vaccines contain microchips" (falsifiable even if wrong)
Examples of NOISE:
- "lol this is wild"
- "do you think AI will take our jobs?"
- "i love this song"
- "we need to do better as a society"
"""
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=0.1, min=0.05, max=0.5),
    reraise=True,  # surface the last real error so the caller can log it
)
async def _groq_classify(text: str) -> GatekeeperResult:
    """One Groq round-trip; raises on API or schema failure so tenacity can retry.

    Truncates input to 800 chars to keep the request inside the latency budget.
    """
    client = get_groq_client()
    response = await client.chat.completions.create(
        model="llama3-8b-8192",
        messages=[
            {"role": "system", "content": GATEKEEPER_SYSTEM},
            {"role": "user", "content": f"Classify this text:\n\n{text[:800]}"},
        ],
        temperature=0.0,  # deterministic
        max_tokens=120,
        response_format={"type": "json_object"},
    )
    raw_json = response.choices[0].message.content or "{}"
    # model_validate_json raises ValidationError on a bad schema, which
    # counts as a failed attempt and also triggers a retry.
    return GatekeeperResult.model_validate_json(raw_json)


async def classify_claim(text: str) -> GatekeeperResult:
    """
    Classify a text as a falsifiable factual claim ('fact') or noise.
    Falls back gracefully if Groq is unavailable.
    """
    # Pre-filter: very short text is almost always noise
    if len(text.split()) < 5:
        return GatekeeperResult(
            label=ClaimLabel.NOISE,
            reason="Text too short to contain a verifiable claim.",
            falsifiability_score=0.0,
        )
    api_key = os.getenv("GROQ_API_KEY", "")
    if not api_key:
        return _heuristic_classify(text)
    try:
        result = await _groq_classify(text)
    except Exception as exc:
        # BUG FIX: the retry decorator previously sat on classify_claim itself,
        # but this broad except swallowed every error before tenacity could see
        # it, so transient Groq failures were never retried. The retry now
        # lives on _groq_classify (which raises); this handler is the final
        # fallback after retries are exhausted.
        log.warning("gatekeeper.groq_error", error=str(exc), fallback="heuristic")
        return _heuristic_classify(text)
    log.debug(
        "gatekeeper.classified",
        label=result.label,
        score=result.falsifiability_score,
        text_preview=text[:60],
    )
    return result
def _heuristic_classify(text: str) -> GatekeeperResult:
    """
    CPU-only heuristic fallback when the Groq API is unavailable.

    Two-stage bag-of-patterns: an early exit on obvious social-noise
    patterns, then a score built from factual-claim signals. Deliberately
    conservative (leans toward 'fact') so real claims are not dropped.
    """
    lowered = text.lower()

    # Stage 1: obvious social noise → bail out immediately.
    noise_patterns = (
        r"^\s*(lol|lmao|omg|wtf|bruh|smh|imo|tbh|idk|ngl|gg|rip)\b",
        r"^\s*[?!]{1,3}\s*$",
        r"(how are you|what do you think|do you believe|feel like)",
        r"^\s*(yes|no|maybe|sure|ok|okay|yep|nope)\s*[.!?]?\s*$",
        r"(i love|i hate|i feel|i think)\s+(?:this|that|it)",
    )
    if any(re.search(pattern, lowered) for pattern in noise_patterns):
        return GatekeeperResult(
            label=ClaimLabel.NOISE,
            reason="Pattern-matched as social noise by heuristic classifier.",
            falsifiability_score=0.1,
        )

    # Stage 2: accumulate 0.15 per factual-claim signal, capped at 0.85.
    fact_signals = (
        r"\d+\s*(%|percent|billion|million|thousand)",
        r"\b(according to|reported|confirmed|announced|study|research|data shows)\b",
        r"\b(president|ceo|minister|government|official)\b.{0,50}\b(said|stated|announced)\b",
        r"\b(died|killed|arrested|charged|convicted|sentenced)\b",
        r"https?://",
    )
    score = min(
        sum(0.15 for pattern in fact_signals if re.search(pattern, lowered)),
        0.85,
    )
    is_fact = score >= 0.15
    return GatekeeperResult(
        label=ClaimLabel.FACT if is_fact else ClaimLabel.NOISE,
        reason="Heuristic classification (Groq unavailable).",
        falsifiability_score=score if is_fact else 0.15,
    )