""" Stage 4 — Inference Engine (5-Signal Weighted Scoring) ===================================================== Evaluates articles across five independent signals: 1. Source Credibility (30%) 2. Claim Verification (30%) 3. Linguistic Analysis (20%) 4. Freshness (10%) 5. Ensemble Model Vote (10%) Then applies adversarial overrides and maps to a final verdict. """ import os import re import sys import yaml import logging import pickle import pandas as pd import numpy as np import torch from datetime import datetime, timezone _PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if str(_PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(_PROJECT_ROOT)) from src.utils.text_utils import clean_text, build_full_text, word_count as wc_func, text_length_bucket from src.stage2_preprocessing import KerasStyleTokenizer import sys setattr(sys.modules['__main__'], 'KerasStyleTokenizer', KerasStyleTokenizer) logger = logging.getLogger("stage4_inference") # ═════════════════════════════════════════════════════════════════════════════ # CONSTANTS # ═════════════════════════════════════════════════════════════════════════════ CREDIBLE_OUTLETS = { "reuters.com", "apnews.com", "bbc.com", "bbc.co.uk", "nytimes.com", "washingtonpost.com", "theguardian.com", "cnn.com", "cbsnews.com", "nbcnews.com", "abcnews.go.com", "npr.org", "pbs.org", "bloomberg.com", "wsj.com", "ft.com", "economist.com", "usatoday.com", "time.com", "politico.com", "thehill.com", "axios.com", "propublica.org", "snopes.com", "factcheck.org", "politifact.com", "fullfact.org", "aljazeera.com", "dw.com", "france24.com", "scmp.com", "theatlantic.com", "newyorker.com", "wired.com", "nature.com", "sciencemag.org", "thelancet.com", "bmj.com", "who.int", "un.org", "whitehouse.gov", "gov.uk", "europa.eu", "hindustantimes.com", "ndtv.com", "thehindu.com", "indianexpress.com", "timesofindia.indiatimes.com", "livemint.com", "abc.net.au", "cbc.ca", "globalnews.ca", "stuff.co.nz", "forbes.com", "businessinsider.com", "cnbc.com", "techcrunch.com", "arstechnica.com", "theverge.com", "engadget.com", "espn.com", "bbc.com/sport", "skysports.com", } CORROBORATION_OUTLETS_RE = re.compile( r"(?i)\b(Reuters|Associated Press|\bAP\b|CBS|BBC|NBC|CNN|" r"New York Times|NYT|Washington Post|The Guardian|NPR|PBS|" r"Bloomberg|Wall Street Journal|Forbes)\b" ) AUTHOR_PATTERNS = re.compile( r"(?i)\b(by|written by|reporter|staff writer|correspondent|" r"contributing writer|author|edited by|reported by)\b\s*[A-Z]" ) BYLINE_NAME_RE = re.compile(r"^[A-Z][a-z]+ [A-Z][a-z]+", re.MULTILINE) SUPERLATIVE_RE = re.compile( r"(?i)\b(shocking|massive|unprecedented|bombshell|explosive|" r"stunning|jaw-dropping|mind-blowing|unbelievable|outrageous)\b" ) SENSATIONAL_RE = re.compile( r"(?i)(you won't believe|what happened next|this is why|" r"one weird trick|exposed|destroyed|slammed)" ) NO_ATTRIB_RE = re.compile( r"(?i)(sources say|it is believed|reportedly|some people say|" r"many believe|rumor has it|anonymous source|unconfirmed reports)" ) PASSIVE_VOICE_RE = re.compile( r"(?i)(it is being said|it was reported|it has been claimed|" r"it is alleged|it was alleged|it is rumored)" ) QUOTE_RE = re.compile(r'"([^"]{10,})"') QUOTE_ATTRIB_RE = re.compile( r"(?i)(said|stated|according to|told|announced|confirmed|wrote|called|described|noted|added|explained|argued|claimed)" ) STAT_RE = re.compile(r"\d+\s*%|\d+\s*(million|billion|trillion)", re.IGNORECASE) CITATION_RE = re.compile( r"(?i)(according to|source:|study by|data from|published by|research by|" r"report by|survey 
by|analysis by|statistics from)" ) INSTITUTION_RE = re.compile( r"(?i)(university|department of|ministry|commission|institute|agency|" r"foundation|world health|WHO|FDA|CDC|NASA|UNICEF|IMF|World Bank)" ) TEMPORAL_RE = re.compile( r"(?i)(this week|this month|recently|new report|just released|" r"annual forecast|latest data|new study|breaking|today|yesterday)" ) class ModelNotTrainedError(Exception): def __init__(self, message="Run python run_pipeline.py --stage 3 first"): super().__init__(message) # ═════════════════════════════════════════════════════════════════════════════ # MODEL LOADING (unchanged from original) # ═════════════════════════════════════════════════════════════════════════════ _MODEL_CACHE = {} def load_config(): cfg_path = os.path.join(_PROJECT_ROOT, "config", "config.yaml") with open(cfg_path, "r", encoding="utf-8") as f: return yaml.safe_load(f) def _get_model(model_name, cfg): """Lazy load models.""" if model_name in _MODEL_CACHE: return _MODEL_CACHE[model_name] models_dir = os.path.join(_PROJECT_ROOT, cfg.get("paths", {}).get("models_dir", "models/saved")) if model_name == "logistic": import joblib fpath = os.path.join(models_dir, "logistic_model", "logistic_model.pkl") if not os.path.exists(fpath): raise ModelNotTrainedError() _MODEL_CACHE[model_name] = joblib.load(fpath) elif model_name == "lstm": from src.models.lstm_model import BiLSTMClassifier, load_glove_embeddings, pad_sequences tok_path = os.path.join(models_dir, "tokenizer.pkl") if not os.path.exists(tok_path) or not os.path.exists(os.path.join(models_dir, "lstm_model", "model.pt")): raise ModelNotTrainedError() with open(tok_path, "rb") as f: tok = pickle.load(f) glove_path = os.path.join(_PROJECT_ROOT, cfg["paths"]["glove_path"]) emb_matrix, vocab_size = load_glove_embeddings(glove_path, tok.word_index) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = BiLSTMClassifier(vocab_size, emb_matrix).to(device) model.load_state_dict(torch.load(os.path.join(models_dir, "lstm_model", "model.pt"), map_location=device)) model.eval() _MODEL_CACHE[model_name] = (model, tok, device) elif model_name in ("distilbert", "roberta"): try: from transformers import AutoTokenizer, AutoModelForSequenceClassification except ImportError: raise ModelNotTrainedError() d_path = os.path.join(models_dir, f"{model_name}_model") if not os.path.exists(os.path.join(d_path, "config.json")): raise ModelNotTrainedError() device = torch.device("cuda" if torch.cuda.is_available() else "cpu") tok = AutoTokenizer.from_pretrained(d_path) model = AutoModelForSequenceClassification.from_pretrained(d_path).to(device) model.eval() _MODEL_CACHE[model_name] = (model, tok, device) elif model_name == "meta": import joblib fpath = os.path.join(models_dir, "meta_classifier", "meta_classifier.pkl") if not os.path.exists(fpath): raise ModelNotTrainedError() _MODEL_CACHE[model_name] = joblib.load(fpath) return _MODEL_CACHE[model_name] # ═════════════════════════════════════════════════════════════════════════════ # FEATURE EXTRACTION # ═════════════════════════════════════════════════════════════════════════════ def extract_features(title, text, source_domain, published_date, cfg): """Build standardized structural mapping for raw strings.""" full = build_full_text(title, text) clean = clean_text(full) wc = wc_func(clean) bucket = text_length_bucket(wc) has_date = pd.notna(published_date) and published_date != "" if has_date and isinstance(published_date, str): try: published_date = pd.to_datetime(published_date, utc=True) except 
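# Illustrative output (a sketch — the exact clean_text, word_count, and bucket
# depend on the helpers in src.utils.text_utils):
#
#   extract_features("Fed raises rates", "The Federal Reserve said...",
#                    "reuters.com", "2024-03-20", cfg)
#   → {"clean_text": "fed raises rates the federal reserve said ...",
#      "word_count": 7, "text_length_bucket": "short", "has_date": True,
#      "published_date": Timestamp("2024-03-20 00:00:00+0000"),
#      "source_domain": "reuters.com", ...}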
# ═════════════════════════════════════════════════════════════════════════════
# STEP 1 — SOURCE CREDIBILITY (weight: 30%)
# ═════════════════════════════════════════════════════════════════════════════

def _levenshtein(s1, s2):
    """Minimal Levenshtein distance for the typosquatting check."""
    if len(s1) < len(s2):
        return _levenshtein(s2, s1)
    if len(s2) == 0:
        return len(s1)
    prev_row = range(len(s2) + 1)
    for i, c1 in enumerate(s1):
        curr_row = [i + 1]
        for j, c2 in enumerate(s2):
            curr_row.append(min(curr_row[j] + 1,            # deletion
                                prev_row[j + 1] + 1,        # insertion
                                prev_row[j] + (c1 != c2)))  # substitution
        prev_row = curr_row
    return prev_row[-1]


def score_source_credibility(source_domain, title, text):
    """
    Step 1: Evaluate source trustworthiness.

    Returns: (score, author_found, typosquatting_detected)
    """
    # ── Early return: no source at all ──
    if not source_domain or source_domain.strip() == "" or source_domain == "unknown":
        # Still check for an author in the text body
        author_found = bool(AUTHOR_PATTERNS.search(text[:500])) or bool(BYLINE_NAME_RE.search(text[:200]))
        return 0.3, author_found, False

    domain = source_domain.strip().lower()

    # ── Typosquatting check ──
    for outlet in CREDIBLE_OUTLETS:
        dist = _levenshtein(domain, outlet)
        if 0 < dist <= 2:  # close to a known outlet, but not an exact match
            return 0.0, False, True

    # ── Component scoring ──
    score = 0.0

    # Base: any valid domain
    score += 0.20

    # Known outlet
    if domain in CREDIBLE_OUTLETS:
        score += 0.40

    # Author verifiability
    search_area = text[:500]
    author_found = bool(AUTHOR_PATTERNS.search(search_area)) or bool(BYLINE_NAME_RE.search(text[:200]))
    if author_found:
        score += 0.20

    # Corroboration: text mentions other major outlets
    if CORROBORATION_OUTLETS_RE.search(text):
        score += 0.20

    return min(1.0, score), author_found, False
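# Illustrative behaviour (a sketch; floats may carry the usual FP noise):
#
#   _levenshtein("reuters.com", "reutters.com")         # → 1
#   score_source_credibility("reutters.com", "t", "x")  # → (0.0, False, True): typosquat
#   score_source_credibility("reuters.com", "t", "x")   # → (~0.6, False, False):
#                                                       #   0.20 base + 0.40 known outlet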
# ═════════════════════════════════════════════════════════════════════════════
# STEP 2 — CLAIM VERIFICATION (weight: 30%)
# ═════════════════════════════════════════════════════════════════════════════

_SPACY_NLP = None


def _get_spacy():
    global _SPACY_NLP
    if _SPACY_NLP is None:
        import spacy
        try:
            _SPACY_NLP = spacy.load("en_core_web_sm")
        except OSError:
            import subprocess
            subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm"], check=True)
            _SPACY_NLP = spacy.load("en_core_web_sm")
    return _SPACY_NLP


def score_claim_verification(meta_proba, clean_text_str, title):
    """
    Step 2: Entity-level claim verification.

    Returns: (claim_score, entities_found, n_verifiable, quotes_attributed, quotes_total)
    """
    nlp = _get_spacy()
    # Process a capped slice to avoid memory issues on long articles
    doc = nlp(clean_text_str[:5000])

    # Sub-step A: Named Entity Extraction
    verifiable_types = {"PERSON", "ORG", "GPE"}
    numeric_types = {"MONEY", "PERCENT", "CARDINAL"}
    verifiable_ents = [ent.text for ent in doc.ents if ent.label_ in verifiable_types]
    numeric_ents = [ent for ent in doc.ents if ent.label_ in numeric_types]
    n_verifiable = len(set(verifiable_ents))

    # Count unverifiable numeric claims (no citation within ±100 chars)
    n_unverifiable = 0
    for ent in numeric_ents:
        start = max(0, ent.start_char - 100)
        end = min(len(clean_text_str), ent.end_char + 100)
        context = clean_text_str[start:end]
        if not CITATION_RE.search(context):
            n_unverifiable += 1

    # Sub-step B: Quote Attribution
    quotes = QUOTE_RE.findall(clean_text_str[:5000])
    quotes_total = len(quotes)
    quotes_attributed = 0
    for q in quotes:
        q_pos = clean_text_str.find(q)
        if q_pos == -1:
            continue
        context_start = max(0, q_pos - 50)
        context_end = min(len(clean_text_str), q_pos + len(q) + 50)
        context = clean_text_str[context_start:context_end]
        if QUOTE_ATTRIB_RE.search(context):
            quotes_attributed += 1
    attributed_ratio = (quotes_attributed / quotes_total) if quotes_total > 0 else 1.0

    # Sub-step C: Combine
    entity_score = min(1.0, n_verifiable / 3)  # 3+ distinct verifiable entities = full marks
    unverifiable_penalty = min(0.15, n_unverifiable * 0.05)
    claim_score = (meta_proba * 0.60) + (entity_score * 0.25) + (attributed_ratio * 0.15)
    claim_score = max(0.0, min(1.0, claim_score - unverifiable_penalty))

    entities_found = list(set(verifiable_ents))[:10]  # cap for JSON output
    return claim_score, entities_found, n_verifiable, quotes_attributed, quotes_total
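# Worked example of the Sub-step C blend (hypothetical numbers):
#   meta_proba = 0.80, 3 distinct entities → entity_score = 1.0,
#   2 of 2 quotes attributed → attributed_ratio = 1.0,
#   1 uncited statistic → unverifiable_penalty = 0.05
#   claim_score ≈ 0.80*0.60 + 1.0*0.25 + 1.0*0.15 - 0.05
#               = 0.48 + 0.25 + 0.15 - 0.05 = 0.83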
# ═════════════════════════════════════════════════════════════════════════════
# STEP 3 — LINGUISTIC ANALYSIS (weight: 20%)
# ═════════════════════════════════════════════════════════════════════════════

def score_linguistic_quality(title, text, clean_text_str, author_found, cfg=None):
    """
    Step 3: Rule-based linguistic quality scoring.
    Reuses DistilBERT for the headline contradiction check.

    Returns: (linguistic_score, deductions_applied, headline_contradicts)
    """
    score = 1.0
    deductions = []
    headline_contradicts = False
    title_str = str(title) if title else ""

    # ── 1. Sensationalist headline (-0.20) ──
    sensational = False
    if title_str:
        caps_words = re.findall(r"\b[A-Z]{4,}\b", title_str)
        if len(caps_words) >= 1:
            sensational = True
        if "!" in title_str:
            sensational = True
        if SENSATIONAL_RE.search(title_str):
            sensational = True
    if sensational:
        score -= 0.20
        deductions.append("Sensationalist headline detected")

    # ── 2. Excessive superlatives (-0.15, needs ≥2 matches) ──
    superlative_matches = SUPERLATIVE_RE.findall(clean_text_str)
    if len(superlative_matches) >= 2:
        score -= 0.15
        deductions.append(f"Excessive superlatives ({len(superlative_matches)} found)")

    # ── 3. No attribution (-0.15) ──
    if NO_ATTRIB_RE.search(clean_text_str):
        score -= 0.15
        deductions.append("Anonymous/vague attribution patterns found")

    # ── 4. Headline contradicts body (-0.10) ──
    # Guard: only run if the title looks like a real headline, not an
    # auto-extracted body sentence
    is_real_headline = (
        title_str
        and len(title_str) > 10
        and len(title_str.split()) <= 15
        and not title_str.lower().startswith(("it has", "it was", "it is", "there was", "there is"))
        and title_str.lower() not in str(text).lower()[:100]
    )
    if is_real_headline:
        body_only = str(text)[:512]  # raw body text, NOT clean_text_str (which has the title prepended)
        try:
            if "distilbert" in _MODEL_CACHE:
                model, tok, device = _MODEL_CACHE["distilbert"]
                with torch.no_grad():
                    t_enc = tok(title_str, return_tensors="pt", truncation=True, max_length=64, padding=True).to(device)
                    b_enc = tok(body_only, return_tensors="pt", truncation=True, max_length=512, padding=True).to(device)
                    t_hidden = model.distilbert(**t_enc).last_hidden_state[:, 0, :]  # [CLS] token
                    b_hidden = model.distilbert(**b_enc).last_hidden_state[:, 0, :]
                    cos_sim = float(torch.nn.functional.cosine_similarity(t_hidden, b_hidden).item())
                if cos_sim < 0.30:
                    headline_contradicts = True
                    score -= 0.10
                    deductions.append(f"Headline may contradict body (similarity={cos_sim:.2f})")
        except Exception as e:
            logger.debug(f"DistilBERT similarity check failed, using word overlap: {e}")
            # Fallback: simple word overlap against the body only
            title_words = set(title_str.lower().split())
            body_words = set(body_only.lower().split())
            overlap = len(title_words & body_words) / max(len(title_words), 1)
            if overlap < 0.15 and len(title_words) > 3:
                headline_contradicts = True
                score -= 0.10
                deductions.append("Headline has very low word overlap with body")

    # ── 5. Internal contradictions (-0.10) ──
    # Heuristic: a negation word appearing near a repeated content word
    sentences = re.split(r'[.!?]+', clean_text_str[:3000])
    negation_re = re.compile(r"\b(not|no|never|false|deny|denied|incorrect|wrong)\b", re.IGNORECASE)
    noun_counts = {}
    contradiction_found = False
    for sent in sentences:
        words = sent.lower().split()
        # Count recurring content words (crude noun proxy: any word > 3 chars)
        for w in words:
            if len(w) > 3:
                noun_counts[w] = noun_counts.get(w, 0) + 1
        # Check whether a repeated content word appears near a negation
        if negation_re.search(sent):
            for w in words:
                if noun_counts.get(w, 0) >= 2 and len(w) > 4:
                    contradiction_found = True
                    break
        if contradiction_found:
            break
    if contradiction_found:
        score -= 0.10
        deductions.append("Possible internal contradiction detected")

    # ── 6. Passive voice obscuring agency (-0.10) ──
    if PASSIVE_VOICE_RE.search(clean_text_str):
        score -= 0.10
        deductions.append("Passive voice used to obscure agency")

    # ── 7. Missing byline (-0.05) ──
    if not author_found:
        score -= 0.05
        deductions.append("No byline or author attribution found")

    score = max(0.0, score)
    return score, deductions, headline_contradicts
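# Illustrative deduction arithmetic (hypothetical article):
#   start                            1.00
#   sensationalist headline   -0.20 → 0.80
#   4 superlatives in body    -0.15 → 0.65
#   "sources say", no names   -0.15 → 0.50
#   no byline                 -0.05 → 0.45
# Deductions are independent and cumulative; the floor is 0.0.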
# ═════════════════════════════════════════════════════════════════════════════
# STEP 4 — FRESHNESS (weight: 10%)
# ═════════════════════════════════════════════════════════════════════════════

def score_freshness_v2(published_date, has_date, title, text):
    """
    Step 4: Temporal freshness scoring.
    Case A: Date found → bracket-based scoring:
        < 30 days → 1.0, ≤ 180 → 0.75, ≤ 730 → 0.5, older → 0.2
    Case B: No date → contextual signal scanning (0.40–0.80 by signal count).

    Returns: (score, case, signals_found)
    """
    if has_date and published_date is not None:
        # ── Case A ──
        now = datetime.now(timezone.utc)
        try:
            if getattr(published_date, 'tzinfo', None) is None:
                published_date = published_date.replace(tzinfo=timezone.utc)
            days_old = (now - published_date).days
        except Exception:
            # Fall back to Case B if the date math fails
            return _freshness_case_b(title, text)
        if days_old < 0:
            days_old = 0
        if days_old < 30:
            return 1.0, "A", []
        elif days_old <= 180:
            return 0.75, "A", []
        elif days_old <= 730:  # 2 years
            return 0.5, "A", []
        else:
            return 0.2, "A", []
    else:
        return _freshness_case_b(title, text)


def _freshness_case_b(title, text):
    """Case B: No date found — scan for contextual freshness signals."""
    combined = str(title) + " " + str(text)
    signals = []
    now = datetime.now()

    # Signal 1: Current or previous year mentioned (dynamic)
    year_re = re.compile(r"\b(" + str(now.year) + r"|" + str(now.year - 1) + r")\b")
    if year_re.search(combined):
        signals.append(f"Current/recent year mentioned ({now.year} or {now.year - 1})")

    # Signal 2: Temporal phrases
    if TEMPORAL_RE.search(combined):
        signals.append("Temporal freshness phrase detected")

    # Signal 3: Named institution
    if INSTITUTION_RE.search(combined):
        signals.append("Named institutional publisher found")

    # Signal 4: Major outlet corroboration
    if CORROBORATION_OUTLETS_RE.search(combined):
        signals.append("Major outlet corroboration cited")

    score_map = {4: 0.80, 3: 0.70, 2: 0.60, 1: 0.50, 0: 0.40}
    n = min(len(signals), 4)
    return score_map[n], "B", signals


# ═════════════════════════════════════════════════════════════════════════════
# STEP 5 — MODEL VOTE (weight: 10%)
# ═════════════════════════════════════════════════════════════════════════════

def score_model_vote(votes):
    """Step 5: Proportion of TRUE votes from the ensemble."""
    if not votes:
        return 0.5
    return sum(votes.values()) / len(votes)


# ═════════════════════════════════════════════════════════════════════════════
# ADVERSARIAL OVERRIDE
# ═════════════════════════════════════════════════════════════════════════════

def check_adversarial_flags(has_date, author_found, n_verifiable, headline_contradicts,
                            typosquatting_detected, text):
    """
    Post-scoring adversarial check. Any flag → cap final_score at 0.25.

    Returns: list of triggered flag names.
    """
    flags = []

    # Flag 1: Triple anonymity
    if not has_date and not author_found and n_verifiable == 0:
        flags.append("Triple anonymity (no date, no author, no named sources)")

    # Flag 2: Headline contradicts body
    if headline_contradicts:
        flags.append("Headline contradicts article body")

    # Flag 3: Typosquatting
    if typosquatting_detected:
        flags.append("Domain mimics a known outlet (typosquatting)")

    # Flag 4: Statistics without a traceable source
    stats_found = STAT_RE.search(text)
    if stats_found and not CITATION_RE.search(text):
        flags.append("Statistics cited with no traceable primary source")

    return flags
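# Example vote tally: {"logistic": 1, "lstm": 1, "distilbert": 0, "roberta": 1}
# → score_model_vote(...) = 3/4 = 0.75; an empty dict returns the neutral 0.5.
# A single adversarial hit is decisive downstream: a body containing
# "crime fell 40%" with no CITATION_RE phrase trips Flag 4 and caps the
# final score at 0.25 regardless of the other signal scores.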
# ═════════════════════════════════════════════════════════════════════════════
# REASON BUILDER
# ═════════════════════════════════════════════════════════════════════════════

def build_reasons_and_missing(scores, n_verifiable, author_found, has_date,
                              deductions, adversarial_flags):
    """
    Programmatically generate top_reasons and missing_signals from the scores.

    Returns: (reasons[:3], missing_signals)
    """
    reasons = []
    missing = []

    # ── Negative signals ──
    if scores["source"] < 0.4:
        reasons.append("Source is unknown or not editorially accountable")
    if scores["claim"] < 0.5:
        reasons.append("Core claims could not be fully verified")
    if scores["linguistic"] < 0.7:
        reasons.append("Writing style shows signs of sensationalism or manipulation")
    if scores["freshness"] < 0.5:
        reasons.append("Article age or missing date reduces temporal reliability")
    if scores["model_vote"] < 0.5:
        reasons.append("AI models flagged patterns inconsistent with credible journalism")

    # ── Positive signals ──
    if scores["source"] >= 0.8:
        reasons.append("Article is from a known, credible outlet")
    if scores["claim"] >= 0.8:
        reasons.append("Core claims are well-attributed with verifiable entities")
    if scores["linguistic"] >= 0.9:
        reasons.append("Writing style is neutral and well-attributed")
    if scores["model_vote"] >= 0.75:
        reasons.append("AI models strongly agree this content is credible")

    # ── Adversarial flags ──
    for flag in adversarial_flags:
        reasons.append(f"Adversarial flag: {flag}")

    # ── Missing signals ──
    if not author_found:
        missing.append("Author identity could not be verified")
    if not has_date:
        missing.append("Publication date not found")
    if scores["source"] <= 0.3:
        missing.append("Source domain not recognized")
    if n_verifiable == 0:
        missing.append("No verifiable named entities found in text")

    return reasons[:3], missing


# ═════════════════════════════════════════════════════════════════════════════
# MAIN INFERENCE INTERFACE
# ═════════════════════════════════════════════════════════════════════════════

def predict_article(title, text, source_domain, published_date, mode="full", trigger_rag=True):
    """
    5-Signal weighted scoring inference.

    Execution order:
        1. extract_features()
        2. Run base models (LR/LSTM/DistilBERT/RoBERTa) → probas, votes
        3. Run meta-classifier → meta_proba
        4. Step 1: score_source_credibility()
        5. Step 2: score_claim_verification()
        6. Step 3: score_linguistic_quality()  [needs author_found from Step 1]
        7. Step 4: score_freshness_v2()
        8. Step 5: score_model_vote()
        9. Weighted final score + adversarial override + verdict
    """
    cfg = load_config()
    feat = extract_features(title, text, source_domain, published_date, cfg)

    probas = {
        "lr_proba": np.nan,
        "lstm_proba": np.nan,
        "distilbert_proba": np.nan,
        "roberta_proba": np.nan,
    }
    votes = {}

    # ── Base Model Inference ──────────────────────────────────────────────

    # 1. Logistic Regression
    if mode in ("fast", "balanced", "full"):
        lr_pipe = _get_model("logistic", cfg)
        df_lr = pd.DataFrame([{
            "clean_text": feat["clean_text"],
            "word_count": feat["word_count"],
            "text_length_bucket": feat["text_length_bucket"],
            "has_date": 1 if feat["has_date"] else 0,
            "freshness_score": 0.5,  # neutral for model input
            "source_domain": feat["source_domain"],
        }])
        try:
            p = float(lr_pipe.predict_proba(df_lr)[:, 1][0])
            probas["lr_proba"] = p
            votes["logistic"] = int(p >= 0.5)
        except Exception as e:
            logger.warning(f"LR inference failed: {e}")
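    # Tier summary: "fast" stops after the LR model above; "balanced" adds the
    # Bi-LSTM below; "full" also runs both transformers. The meta-classifier
    # runs in every mode, with un-run models left as NaN probabilities.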
    # 2. Bi-LSTM
    if mode in ("balanced", "full"):
        lstm_model, tok, device = _get_model("lstm", cfg)
        maxlen = cfg.get("preprocessing", {}).get("lstm_max_len", 512)
        from src.models.lstm_model import pad_sequences
        seq = tok.texts_to_sequences([feat["clean_text"]])
        pad = pad_sequences(seq, maxlen=maxlen, padding='post')
        t_pad = torch.from_numpy(pad).long().to(device)
        with torch.no_grad():
            logits = lstm_model(t_pad)
            p = float(torch.sigmoid(logits).cpu().numpy()[0])
        probas["lstm_proba"] = p
        votes["lstm"] = int(p >= 0.5)

    # 3. Transformers (DistilBERT + RoBERTa)
    if mode == "full":
        for t_name in ("distilbert", "roberta"):
            model, tok, device = _get_model(t_name, cfg)
            inputs = tok(feat["clean_text"], padding=True, truncation=True,
                         max_length=512, return_tensors="pt").to(device)
            with torch.no_grad():
                out = model(**inputs)
                p = float(torch.softmax(out.logits, dim=-1)[0, 1].item())
            if t_name == "roberta":
                p = p * 0.92  # RoBERTa TRUE-bias dampening
            probas[t_name + "_proba"] = p
            votes[t_name] = int(p >= 0.5)

    # 4. Meta-Classifier
    meta_bundle = _get_model("meta", cfg)
    meta_preprocessor = meta_bundle["preprocessor"]
    meta_model = meta_bundle["model"]
    df_meta = pd.DataFrame([{
        "lr_proba": probas["lr_proba"],
        "lstm_proba": probas["lstm_proba"],
        "distilbert_proba": probas["distilbert_proba"],
        "roberta_proba": probas["roberta_proba"],
        "word_count": feat["word_count"],
        "has_date": 1 if feat["has_date"] else 0,
        "freshness_score": 0.5,  # neutral — freshness is scored separately in Step 4
    }])
    df_cats = pd.DataFrame([{
        "text_length_bucket": feat["text_length_bucket"],
        "source_domain": feat["source_domain"],
    }])
    cat_feats = meta_preprocessor.transform(df_cats)
    X_meta = np.hstack((df_meta.values, cat_feats))
    meta_proba = float(meta_model.predict_proba(X_meta)[:, 1][0])

    # Short-text dampening (under 50 words): pull the probability toward 0.5
    short_text = feat["word_count"] < 50
    if short_text:
        meta_proba = 0.5 + (meta_proba - 0.5) * 0.6

    # ── 5-Signal Scoring ──────────────────────────────────────────────────

    # Step 1: Source Credibility
    source_score, author_found, typosquat = score_source_credibility(
        feat["source_domain"], title, text
    )

    # Step 2: Claim Verification
    claim_score, entities_found, n_verifiable, q_attr, q_total = score_claim_verification(
        meta_proba, feat["clean_text"], title
    )

    # Step 3: Linguistic Analysis (depends on author_found from Step 1)
    ling_score, deductions, headline_contradicts = score_linguistic_quality(
        title, text, feat["clean_text"], author_found, cfg
    )

    # Step 4: Freshness
    fresh_score, fresh_case, fresh_signals = score_freshness_v2(
        feat.get("published_date"), feat["has_date"], title, text
    )

    # Step 5: Model Vote
    vote_score = score_model_vote(votes)

    # ── Final Weighted Score ──────────────────────────────────────────────
    scores = {
        "source": round(source_score, 4),
        "claim": round(claim_score, 4),
        "linguistic": round(ling_score, 4),
        "freshness": round(fresh_score, 4),
        "model_vote": round(vote_score, 4),
    }
    final_score = (
        source_score * 0.30
        + claim_score * 0.30
        + ling_score * 0.20
        + fresh_score * 0.10
        + vote_score * 0.10
    )

    # ── Adversarial Override ──────────────────────────────────────────────
    adv_flags = check_adversarial_flags(
        feat["has_date"], author_found, n_verifiable,
        headline_contradicts, typosquat, feat["clean_text"]
    )
    if adv_flags:
        final_score = min(final_score, 0.25)
    final_score = round(final_score, 4)

    # ── Verdict ───────────────────────────────────────────────────────────
    if final_score >= 0.75:
        verdict = "TRUE"
    elif final_score >= 0.55:
        verdict = "UNCERTAIN"
    elif final_score >= 0.35:
        verdict = "LIKELY FALSE"
    else:
        verdict = "FALSE"
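    # Worked example (hypothetical signal values):
    #   source=0.9, claim=0.8, linguistic=1.0, freshness=1.0, vote=1.0
    #   → 0.9*0.30 + 0.8*0.30 + 1.0*0.20 + 1.0*0.10 + 1.0*0.10
    #   = 0.27 + 0.24 + 0.20 + 0.10 + 0.10 = 0.91 → "TRUE"
    # One adversarial flag would cap the same article at 0.25 → "FALSE".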
verdict = "FALSE" # ── Reasons & Missing Signals ───────────────────────────────────────── top_reasons, missing_signals = build_reasons_and_missing( scores, n_verifiable, author_found, feat["has_date"], deductions, adv_flags ) # ── Confidence ──────────────────────────────────────────────────────── missing_count = len(missing_signals) if adv_flags or missing_count >= 3: confidence = "LOW" elif verdict == "UNCERTAIN" or missing_count in (1, 2): confidence = "MEDIUM" elif final_score >= 0.75 or final_score < 0.35: confidence = "HIGH" else: confidence = "MEDIUM" # ── Recommended Action + LOW Guard ──────────────────────────────────── action_map = { "TRUE": "Publish", "UNCERTAIN": "Flag for review", "LIKELY FALSE": "Suppress", "FALSE": "Escalate", } recommended_action = action_map[verdict] # Hard rule: LOW confidence → never "Publish" if confidence == "LOW" and recommended_action == "Publish": recommended_action = "Flag for review" # ── Return Full JSON ────────────────────────────────────────────────── return { "verdict": verdict, "final_score": final_score, "scores": scores, "freshness_case": fresh_case, "freshness_signals_found": fresh_signals, "adversarial_flags": adv_flags, "top_reasons": top_reasons, "missing_signals": missing_signals, "confidence": confidence, "recommended_action": recommended_action, "base_model_votes": votes, "base_model_probas": probas, "word_count": feat["word_count"], "short_text_warning": short_text, "deductions_applied": deductions, "entities_found": entities_found, "quotes_attributed": q_attr, "quotes_total": q_total, } if __name__ == "__main__": import json try: res = predict_article( "Breaking: AI solves P=NP", "The algorithm has shocked absolutely everyone across the earth entirely " "resolving everything overnight. Sources say it is unprecedented.", "techcrunch.com", datetime.now().isoformat(), mode="fast" ) print("Verdict Dict:") print(json.dumps(res, indent=2, default=str)) except ModelNotTrainedError as e: print("ERROR:", str(e))