""" Pattern Deviation Analyzer =========================== Implements the student journalist mental model for bias detection: 1. Build a "reputable pattern" from external news titles/snippets (Google News + local DB results from reliable sources). 2. Compare every article sentence against that pattern. 3. Sentences whose concepts are NOT present in ANY reputable source are surfaced as "extra claims" — possible bias signals. No new dependencies — uses the MiniLM model already loaded by the project (sentence_transformers) for semantic similarity. """ import re from typing import List, Dict, Any, Tuple # Reliable source identifiers (same list as check_article.py) RELIABLE_SOURCES = { "inquirer", "philstar", "manila bulletin", "abs-cbn", "cnn philippines", "gma", "rappler", "sunstar", "businessmirror", "pna", "philippine news agency", "bbc", "reuters", "ap", "associated press", "new york times", "the guardian", # Cebuano / Visayas regional outlets "superbalita", "cebu daily news", "sunstar cebu", "the freeman", "cebu pacific", "mb cebu", "cebu", "sugbo", "visayas", "mindanao daily", "sunstar davao", } # Thresholds # Per-title comparison: a sentence is corroborated if its similarity to ANY # individual source title meets this threshold. Lower than the concatenated # threshold because a direct title match is a precise corroboration signal. PER_TITLE_SIMILARITY_THRESHOLD = 0.60 # Concatenated pattern fallback (used when per-title comparison isn't possible) SENTENCE_SIMILARITY_THRESHOLD = 0.70 DEVIATION_MINOR_THRESHOLD = 0.30 # deviation_score >= this → MINOR DEVIATION DEVIATION_SIGNIFICANT_THRESHOLD = 0.65 # deviation_score >= this → SIGNIFICANT DEVIATION # Cap how many sentences we analyze (performance guard for long articles) MAX_SENTENCES = 20 def _is_reliable(source: str) -> bool: src = source.lower() return any(r in src for r in RELIABLE_SOURCES) def _split_sentences(text: str) -> List[str]: """Naively split text into sentences on . ! ? boundaries.""" raw = re.split(r"(?<=[.!?])\s+", text.strip()) # Filter very short fragments (less than 5 words) return [s.strip() for s in raw if len(s.split()) >= 5] def build_external_pattern(web_results: List[Dict], db_results: List[Dict]) -> str: """ Concatenate titles from reliable web and DB sources into a single 'pattern string' representing what reputable journalism says on this topic. Returns: str: Combined pattern text, or empty string if no reliable sources found. """ fragments = [] for r in web_results: source = r.get("source", "") title = r.get("title", "").strip() if title and _is_reliable(source): fragments.append(title) for r in db_results: source = r.get("source", "") title = r.get("title", "").strip() if title and _is_reliable(source): fragments.append(title) # Fall back to ALL sources (reliable or not) if none found if not fragments: for r in web_results + db_results: title = r.get("title", "").strip() if title: fragments.append(title) return " | ".join(fragments) def _get_minilm(): """Lazy import of MiniLM to avoid circular imports.""" from checker.internal.core import get_minilm_model return get_minilm_model() def _cosine(a, b) -> float: """Compute cosine similarity between two numpy vectors.""" import numpy as np norm_a = float(np.linalg.norm(a)) norm_b = float(np.linalg.norm(b)) if norm_a == 0 or norm_b == 0: return 0.0 return float(np.dot(a, b) / (norm_a * norm_b)) def compare_to_external_pattern( article_text: str, web_results: List[Dict], db_results: List[Dict] = None, ) -> Dict[str, Any]: """ Compare the article's sentences against individual external source titles. For each article sentence, compute the maximum cosine similarity against EACH individual source title embedding (not a single concatenated blob). A sentence is corroborated if it matches ANY single title at >= PER_TITLE_SIMILARITY_THRESHOLD. This fixes the false-positive 'extra claim' that occurred when a headline matched an external source title but got diluted by the concatenated embedding of all other titles. Args: article_text (str): The full article or claim to analyze. web_results (list): Google News results (dicts with 'title', 'source'). db_results (list): Local DB results (dicts with 'title', 'source'). Returns: dict with keys: - external_pattern (str): Summary of what reputable sources say. - extra_claims (list[str]): Sentences not found in external pattern. - corroborated_claims (list[str]): Sentences aligned with pattern. - deviation_score (float): 0-1 fraction of sentences that deviate. - verdict (str): "FOLLOWS PATTERN" | "MINOR DEVIATION" | "SIGNIFICANT DEVIATION" - reliable_source_count (int): Number of reliable sources used. - all_source_titles (list[str]): All titles from external sources. """ if db_results is None: db_results = [] # ── Build external pattern ────────────────────────────────────────────── external_pattern = build_external_pattern(web_results, db_results) # Collect all titles for display all_titles = [] for r in web_results + db_results: t = r.get("title", "").strip() if t: all_titles.append((t, r.get("source", ""))) reliable_count = sum(1 for r in web_results + db_results if _is_reliable(r.get("source", ""))) # If no external sources at all, can't compare if not external_pattern.strip(): return { "external_pattern": "", "extra_claims": [], "corroborated_claims": [], "deviation_score": 0.0, "verdict": "NO EXTERNAL SOURCES", "reliable_source_count": 0, "all_source_titles": [], } # ── Split article into sentences ──────────────────────────────────────── sentences = _split_sentences(article_text)[:MAX_SENTENCES] if not sentences: return { "external_pattern": external_pattern, "extra_claims": [], "corroborated_claims": [], "deviation_score": 0.0, "verdict": "FOLLOWS PATTERN", "reliable_source_count": reliable_count, "all_source_titles": all_titles, } # Collect unique source titles for per-title comparison source_titles_list = [] for r in web_results + db_results: t = r.get("title", "").strip() if t: source_titles_list.append(t) # Deduplicate while preserving order seen_keys: set = set() unique_titles: List[str] = [] for t in source_titles_list: key = t.lower()[:60] if key not in seen_keys: seen_keys.add(key) unique_titles.append(t) # ── Embed sentences + individual source titles in one batched call ────── # Comparing each sentence to EACH title individually (not a concatenated # blob) prevents identical headlines from being diluted and falsely flagged # as "extra claims" — the original bug reported in user testing. try: minilm = _get_minilm() all_texts = sentences + unique_titles embeddings = minilm.encode(all_texts, batch_size=64, show_progress_bar=False) sentence_embeddings = embeddings[:len(sentences)] title_embeddings = embeddings[len(sentences):] except Exception: # Graceful degradation — can't run MiniLM, return neutral result return { "external_pattern": external_pattern, "extra_claims": [], "corroborated_claims": list(sentences), "deviation_score": 0.0, "verdict": "FOLLOWS PATTERN", "reliable_source_count": reliable_count, "all_source_titles": all_titles, } # ── Classify each sentence against individual titles ──────────────────── # A sentence is corroborated if it matches ANY single source title at # >= PER_TITLE_SIMILARITY_THRESHOLD. extra_claims: List[str] = [] corroborated_claims: List[str] = [] for sent, s_emb in zip(sentences, sentence_embeddings): if len(title_embeddings) > 0: # Compute similarity to each individual title; take the maximum max_sim = max((_cosine(s_emb, t_emb) for t_emb in title_embeddings), default=0.0) corroborated = max_sim >= PER_TITLE_SIMILARITY_THRESHOLD else: corroborated = False if corroborated: corroborated_claims.append(sent) else: extra_claims.append(sent) # ── Compute deviation score ───────────────────────────────────────────── total = len(sentences) deviation_score = round(len(extra_claims) / total, 3) if total > 0 else 0.0 # ── Determine verdict ─────────────────────────────────────────────────── if deviation_score >= DEVIATION_SIGNIFICANT_THRESHOLD: verdict = "SIGNIFICANT DEVIATION" elif deviation_score >= DEVIATION_MINOR_THRESHOLD: verdict = "MINOR DEVIATION" else: verdict = "FOLLOWS PATTERN" return { "external_pattern": external_pattern, "extra_claims": extra_claims, "corroborated_claims": corroborated_claims, "deviation_score": deviation_score, "verdict": verdict, "reliable_source_count": reliable_count, "all_source_titles": all_titles, }