ThesisProject / checker /external /pattern_deviation.py
fborj's picture
updated pattern dev
80d0772 verified
Raw
History Blame Contribute Delete
10.4 kB
"""
Pattern Deviation Analyzer
===========================
Implements the student journalist mental model for bias detection:
1. Build a "reputable pattern" from external news titles/snippets
(Google News + local DB results from reliable sources).
2. Compare every article sentence against that pattern.
3. Sentences whose concepts are NOT present in ANY reputable source
are surfaced as "extra claims" — possible bias signals.
No new dependencies — uses the MiniLM model already loaded by the project
(sentence_transformers) for semantic similarity.
"""
import re
from typing import List, Dict, Any, Tuple
# Reliable source identifiers (same list as check_article.py).
# Entries are lower-case name fragments; _is_reliable() tests each one as a
# substring of the lower-cased source name.
# NOTE(review): because matching is by substring, short or generic entries
# such as "ap", "cebu" and "visayas" also match unrelated source names that
# merely contain those letters, and "cebu pacific" looks like an airline
# rather than a news outlet -- confirm these entries are intended.
RELIABLE_SOURCES = {
"inquirer", "philstar", "manila bulletin", "abs-cbn",
"cnn philippines", "gma", "rappler", "sunstar",
"businessmirror", "pna", "philippine news agency",
"bbc", "reuters", "ap", "associated press",
"new york times", "the guardian",
# Cebuano / Visayas regional outlets
"superbalita", "cebu daily news", "sunstar cebu",
"the freeman", "cebu pacific", "mb cebu", "cebu",
"sugbo", "visayas", "mindanao daily", "sunstar davao",
}
# Thresholds
# Per-title comparison: a sentence is corroborated if its similarity to ANY
# individual source title meets this threshold. Lower than the concatenated
# threshold because a direct title match is a precise corroboration signal.
PER_TITLE_SIMILARITY_THRESHOLD = 0.60
# Concatenated pattern fallback (used when per-title comparison isn't possible)
# NOTE(review): no function visible in this file reads this constant --
# confirm it is still needed.
SENTENCE_SIMILARITY_THRESHOLD = 0.70
# Verdict cut-offs applied to deviation_score (fraction of sentences that were
# not corroborated); the higher threshold is checked first.
DEVIATION_MINOR_THRESHOLD = 0.30 # deviation_score >= this -> MINOR DEVIATION
DEVIATION_SIGNIFICANT_THRESHOLD = 0.65 # deviation_score >= this -> SIGNIFICANT DEVIATION
# Cap how many sentences we analyze (performance guard for long articles)
MAX_SENTENCES = 20
def _is_reliable(source: str) -> bool:
src = source.lower()
return any(r in src for r in RELIABLE_SOURCES)
def _split_sentences(text: str) -> List[str]:
"""Naively split text into sentences on . ! ? boundaries."""
raw = re.split(r"(?<=[.!?])\s+", text.strip())
# Filter very short fragments (less than 5 words)
return [s.strip() for s in raw if len(s.split()) >= 5]
def build_external_pattern(web_results: List[Dict], db_results: List[Dict]) -> str:
    """Join external source titles into a single 'pattern string'.

    Titles from sources accepted by ``_is_reliable`` are used, web results
    first and DB results second. If no reliable source contributed a title,
    the titles of ALL results (reliable or not) are used instead.

    Args:
        web_results: Result dicts with 'title' and 'source' keys. ``None`` is
            treated as an empty list.
        db_results: Result dicts with 'title' and 'source' keys. ``None`` is
            treated as an empty list.

    Returns:
        str: Titles joined with " | ", or an empty string when no result has
        a usable title. Results whose title is missing, blank or not a string
        are skipped rather than raising.
    """
    reliable_titles: List[str] = []
    every_title: List[str] = []

    for result in list(web_results or []) + list(db_results or []):
        raw_title = result.get("title")
        if not isinstance(raw_title, str):
            # Key present with a null / non-text value: nothing to use.
            continue
        title = raw_title.strip()
        if not title:
            continue
        every_title.append(title)

        raw_source = result.get("source")
        source = raw_source if isinstance(raw_source, str) else ""
        if _is_reliable(source):
            reliable_titles.append(title)

    # Fall back to ALL sources (reliable or not) if no reliable title was found.
    return " | ".join(reliable_titles or every_title)
def _get_minilm():
    """Return the model provided by ``checker.internal.core.get_minilm_model``.

    The import is performed at call time rather than at module load to
    avoid circular imports.
    """
    from checker.internal.core import get_minilm_model

    model = get_minilm_model()
    return model
def _cosine(a, b) -> float:
"""Compute cosine similarity between two numpy vectors."""
import numpy as np
norm_a = float(np.linalg.norm(a))
norm_b = float(np.linalg.norm(b))
if norm_a == 0 or norm_b == 0:
return 0.0
return float(np.dot(a, b) / (norm_a * norm_b))
def _normalise_results(results) -> List[Dict[str, str]]:
    """Reduce result dicts to string-valued 'title' (stripped) and 'source' entries."""
    cleaned: List[Dict[str, str]] = []
    for result in results or []:
        title = result.get("title")
        source = result.get("source")
        cleaned.append({
            "title": title.strip() if isinstance(title, str) else "",
            "source": source if isinstance(source, str) else "",
        })
    return cleaned


def compare_to_external_pattern(
    article_text: str,
    web_results: List[Dict],
    db_results: List[Dict] = None,
) -> Dict[str, Any]:
    """
    Compare the article's sentences against individual external source titles.

    For each article sentence, compute the maximum cosine similarity against
    EACH individual source title embedding (not a single concatenated blob).
    A sentence is corroborated if it matches ANY single title at
    >= PER_TITLE_SIMILARITY_THRESHOLD; otherwise it is an "extra claim".
    Only the first MAX_SENTENCES sentences are analyzed.

    Args:
        article_text (str): The full article or claim to analyze. ``None`` is
            treated as empty text.
        web_results (list): Result dicts with 'title' and 'source' keys.
            ``None`` is treated as an empty list; null or non-string
            title/source values are treated as empty strings.
        db_results (list): Local DB result dicts with 'title' and 'source'
            keys. Same null handling as ``web_results``.

    Returns:
        dict with keys:
        - external_pattern (str): Titles joined by build_external_pattern().
        - extra_claims (list[str]): Sentences not matched by any title.
        - corroborated_claims (list[str]): Sentences matched by a title.
        - deviation_score (float): 0-1 fraction of sentences that deviate,
          rounded to 3 decimals.
        - verdict (str): "FOLLOWS PATTERN" | "MINOR DEVIATION" |
          "SIGNIFICANT DEVIATION" | "NO EXTERNAL SOURCES".
        - reliable_source_count (int): Number of results whose source passes
          _is_reliable().
        - all_source_titles (list[tuple[str, str]]): (title, source) pairs for
          every result that has a non-empty title.

    If the embedding model cannot be loaded or run, the failure is logged and
    a neutral "FOLLOWS PATTERN" result (all sentences corroborated) is returned.
    """
    import logging

    # Normalise once so a null title/source in any result cannot crash the
    # helpers called below.
    web = _normalise_results(web_results)
    db = _normalise_results(db_results)
    combined = web + db

    # --- Build external pattern -------------------------------------------
    external_pattern = build_external_pattern(web, db)

    # (title, source) pairs for display, in result order.
    all_titles = [(r["title"], r["source"]) for r in combined if r["title"]]
    reliable_count = sum(1 for r in combined if _is_reliable(r["source"]))

    # If no external sources at all, can't compare.
    if not external_pattern.strip():
        return {
            "external_pattern": "",
            "extra_claims": [],
            "corroborated_claims": [],
            "deviation_score": 0.0,
            "verdict": "NO EXTERNAL SOURCES",
            "reliable_source_count": 0,
            "all_source_titles": [],
        }

    def make_result(extra, corroborated, score, verdict):
        # All remaining exits share the same pattern / source bookkeeping.
        return {
            "external_pattern": external_pattern,
            "extra_claims": extra,
            "corroborated_claims": corroborated,
            "deviation_score": score,
            "verdict": verdict,
            "reliable_source_count": reliable_count,
            "all_source_titles": all_titles,
        }

    # --- Split article into sentences -------------------------------------
    sentences = _split_sentences(article_text or "")[:MAX_SENTENCES]
    if not sentences:
        return make_result([], [], 0.0, "FOLLOWS PATTERN")

    # --- Deduplicate titles while preserving order ------------------------
    # Titles are considered duplicates when their first 60 lower-cased
    # characters are equal.
    # NOTE(review): titles from ALL sources are compared here, whereas
    # external_pattern prefers reliable sources only -- confirm that
    # corroboration by a non-reliable source is intended.
    seen_keys = set()
    unique_titles: List[str] = []
    for title, _source in all_titles:
        key = title.lower()[:60]
        if key not in seen_keys:
            seen_keys.add(key)
            unique_titles.append(title)

    # --- Embed sentences + individual titles in one batched call ----------
    try:
        minilm = _get_minilm()
        embeddings = minilm.encode(
            sentences + unique_titles, batch_size=64, show_progress_bar=False
        )
        sentence_embeddings = embeddings[:len(sentences)]
        title_embeddings = embeddings[len(sentences):]
    except Exception:
        # Graceful degradation: keep the neutral result, but record why the
        # comparison was skipped so the failure is not invisible.
        logging.getLogger(__name__).exception(
            "Pattern deviation analysis skipped: sentence embedding failed"
        )
        return make_result([], list(sentences), 0.0, "FOLLOWS PATTERN")

    # --- Classify each sentence against individual titles -----------------
    extra_claims: List[str] = []
    corroborated_claims: List[str] = []
    for sentence, sentence_emb in zip(sentences, sentence_embeddings):
        # Best match over all titles; 0.0 (never corroborated) if there are none.
        best_similarity = max(
            (_cosine(sentence_emb, title_emb) for title_emb in title_embeddings),
            default=0.0,
        )
        if best_similarity >= PER_TITLE_SIMILARITY_THRESHOLD:
            corroborated_claims.append(sentence)
        else:
            extra_claims.append(sentence)

    # --- Compute deviation score (sentences is non-empty here) ------------
    deviation_score = round(len(extra_claims) / len(sentences), 3)

    # --- Determine verdict -------------------------------------------------
    if deviation_score >= DEVIATION_SIGNIFICANT_THRESHOLD:
        verdict = "SIGNIFICANT DEVIATION"
    elif deviation_score >= DEVIATION_MINOR_THRESHOLD:
        verdict = "MINOR DEVIATION"
    else:
        verdict = "FOLLOWS PATTERN"

    return make_result(extra_claims, corroborated_claims, deviation_score, verdict)