Spaces:
Sleeping
Sleeping
| """ | |
| analyzer.py — Misinformation Detection & Public Engagement. | |
| """ | |
| import re | |
| import math | |
| import os | |
| import pickle | |
| import logging | |
| from collections import Counter | |
| from typing import List, Dict, Tuple, Optional | |
| import numpy as np | |
| import pandas as pd | |
| logger = logging.getLogger(__name__) | |
| # Globals ─ | |
| _sentiment_pipeline = None | |
| _vader_analyzer = None | |
| _multimodal_model = None # PyTorch model (for global score) | |
| _multimodal_meta = {} # {arch_type, input_size, hidden_size, ...} | |
| _svm_pipelines = {} # {text, audio, video, tags} → sklearn pipeline | |
| _bert_tokenizer = None # loaded only if multimodal model needs it | |
| _tfidf_vectorizers = {} # {stream} → TfidfVectorizer (if separate) | |
| _models_loaded = False | |
| _load_error = None | |
| HF_REPO_ID = "rocky250/MHMisinfo" | |
| CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "mhmisinfo") | |
| # Red-flag vocabulary (heuristic fallback) | |
| _MISINFO_RED_FLAGS: List[str] = [ | |
| "cure", "cures", "miracle", "they don't want you to know", | |
| "doctors hate", "secret", "suppressed", "fake news", | |
| "conspiracy", "detox", "toxins", "pseudoscience", | |
| "100% natural", "big pharma", "government hiding", | |
| ] | |
| # MODEL LOADING | |
| def _hf_download(filename: str) -> str: | |
| from huggingface_hub import hf_hub_download | |
| return hf_hub_download( | |
| repo_id=HF_REPO_ID, | |
| filename=filename, | |
| cache_dir=CACHE_DIR, | |
| ) | |
| def _introspect_pt(path: str) -> dict: | |
| """ | |
| Load a .pt file and return a summary of what's inside. | |
| Handles: state_dict, full model, sklearn object, plain tensor. | |
| Returns dict with keys: kind, keys_sample, shapes_sample, obj | |
| """ | |
| import torch | |
| raw = torch.load(path, map_location="cpu", weights_only=False) | |
| if hasattr(raw, "predict"): | |
| # sklearn object saved with .pt extension | |
| return {"kind": "sklearn", "obj": raw} | |
| if isinstance(raw, dict): | |
| keys = list(raw.keys()) | |
| # Check for nested state_dict | |
| if "state_dict" in raw: | |
| sd = raw["state_dict"] | |
| return { | |
| "kind": "checkpoint", | |
| "config": raw.get("config", {}), | |
| "keys_sample": list(sd.keys())[:20], | |
| "shapes": {k: tuple(v.shape) for k, v in list(sd.items())[:20]}, | |
| "obj": raw, | |
| } | |
| # Bare state_dict — check if values are tensors | |
| if all(hasattr(v, "shape") for v in list(raw.values())[:3]): | |
| return { | |
| "kind": "state_dict", | |
| "keys_sample": keys[:20], | |
| "shapes": {k: tuple(v.shape) for k, v in list(raw.items())[:20]}, | |
| "obj": raw, | |
| } | |
| # Generic dict (could be sklearn pipeline stored as dict) | |
| return {"kind": "dict", "keys": keys, "obj": raw} | |
| if hasattr(raw, "parameters"): | |
| # Full nn.Module saved with torch.save(model) | |
| sd = raw.state_dict() | |
| return { | |
| "kind": "full_model", | |
| "keys_sample": list(sd.keys())[:20], | |
| "shapes": {k: tuple(v.shape) for k, v in list(sd.items())[:20]}, | |
| "obj": raw, | |
| } | |
| return {"kind": "unknown", "obj": raw} | |
| def _infer_architecture(info: dict) -> dict: | |
| """ | |
| From the introspection dict, work out the likely architecture | |
| so we can instantiate a matching nn.Module. | |
| Returns: {hidden_size, num_layers, num_streams, vocab_size, embed_dim, | |
| num_classes, has_attention, is_bigru} | |
| """ | |
| shapes = info.get("shapes", {}) | |
| keys = info.get("keys_sample", []) | |
| cfg = { | |
| "hidden_size": 128, | |
| "num_layers": 2, | |
| "num_streams": 4, | |
| "vocab_size": 30522, | |
| "embed_dim": 128, | |
| "num_classes": 2, | |
| "has_attention": any("attn" in k or "attention" in k for k in keys), | |
| "is_bigru": any("gru" in k.lower() or "bigru" in k.lower() for k in keys), | |
| } | |
| # Try to extract embedding dimension from the embedding weight | |
| for k, s in shapes.items(): | |
| if "embed" in k.lower() and len(s) == 2: | |
| cfg["vocab_size"] = s[0] | |
| cfg["embed_dim"] = s[1] | |
| break | |
| # Try to extract hidden size from GRU weight | |
| for k, s in shapes.items(): | |
| if "gru" in k.lower() or "bigru" in k.lower(): | |
| if len(s) == 2: | |
| # weight_ih_l0: (3*hidden, input) for GRU | |
| cfg["hidden_size"] = s[0] // 3 | |
| break | |
| # Try to extract num_classes from final linear | |
| for k, s in shapes.items(): | |
| if ("classifier" in k or "fc" in k or "linear" in k) and len(s) == 2: | |
| if s[0] <= 10: # small output = class head | |
| cfg["num_classes"] = s[0] | |
| break | |
| if s[1] <= 10: | |
| cfg["num_classes"] = s[1] | |
| break | |
| return cfg | |
| def _build_model_from_introspection(info: dict): | |
| """ | |
| Build an nn.Module that matches the discovered architecture | |
| and load the weights into it. | |
| """ | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| cfg = _infer_architecture(info) | |
| logger.info("Inferred architecture: %s", cfg) | |
| H = cfg["hidden_size"] | |
| ED = cfg["embed_dim"] | |
| VS = cfg["vocab_size"] | |
| NC = cfg["num_classes"] | |
| NL = cfg["num_layers"] | |
| # Generic flexible architecture ─ | |
| class FlexBiGRUStream(nn.Module): | |
| def __init__(self): | |
| super().__init__() | |
| self.gru = nn.GRU( | |
| ED, H, num_layers=NL, | |
| batch_first=True, bidirectional=True, | |
| dropout=0.3 if NL > 1 else 0.0 | |
| ) | |
| if cfg["has_attention"]: | |
| self.attn = nn.Linear(H * 2, 1) | |
| self.drop = nn.Dropout(0.3) | |
| def forward(self, x): | |
| out, _ = self.gru(x) | |
| if cfg["has_attention"]: | |
| w = torch.softmax(self.attn(out), dim=1) | |
| ctx = (w * out).sum(1) | |
| else: | |
| ctx = out[:, -1, :] | |
| return self.drop(ctx) | |
| class FlexMultimodal(nn.Module): | |
| def __init__(self): | |
| super().__init__() | |
| self.embedding = nn.Embedding(VS, ED, padding_idx=0) | |
| self.enc_text = FlexBiGRUStream() | |
| self.enc_audio = FlexBiGRUStream() | |
| self.enc_video = FlexBiGRUStream() | |
| self.enc_tags = FlexBiGRUStream() | |
| fused = H * 2 * 4 | |
| self.dmte = nn.Linear(H * 2, 1) | |
| self.fc1 = nn.Linear(fused, fused // 2) | |
| self.fc2 = nn.Linear(fused // 2, fused // 4) | |
| self.norm = nn.LayerNorm(fused // 4) | |
| self.cls = nn.Linear(fused // 4, NC) | |
| self.drop = nn.Dropout(0.3) | |
| def forward(self, t_ids, a_ids, v_ids, g_ids): | |
| emb = self.embedding | |
| t = self.enc_text (emb(t_ids)) | |
| a = self.enc_audio(emb(a_ids)) | |
| v = self.enc_video(emb(v_ids)) | |
| g = self.enc_tags (emb(g_ids)) | |
| gates = torch.sigmoid(torch.stack( | |
| [self.dmte(t), self.dmte(a), self.dmte(v), self.dmte(g)], dim=1 | |
| )) # (B,4,1) | |
| streams = torch.stack([t, a, v, g], dim=1) # (B,4,H*2) | |
| weighted = (streams * gates).view(streams.size(0), -1) # (B,H*2*4) | |
| h = self.drop(F.relu(self.fc1(weighted))) | |
| h = self.norm(F.relu(self.fc2(h))) | |
| return self.cls(h), gates.squeeze(-1) | |
| model = FlexMultimodal() | |
| # Load weights — use strict=False and log what matched | |
| obj = info["obj"] | |
| sd = obj["state_dict"] if info["kind"] == "checkpoint" else ( | |
| obj if info["kind"] == "state_dict" else | |
| obj.state_dict() if info["kind"] == "full_model" else None | |
| ) | |
| if sd is not None: | |
| result = model.load_state_dict(sd, strict=False) | |
| matched = len(sd) - len(result.missing_keys) - len(result.unexpected_keys) | |
| total = len(sd) | |
| logger.info("Weights loaded: %d/%d matched, missing=%d, unexpected=%d", | |
| matched, total, len(result.missing_keys), len(result.unexpected_keys)) | |
| # If fewer than 30% matched, the architecture is wrong → don't use this model | |
| if total > 0 and matched / total < 0.30: | |
| logger.warning("Too few weights matched (%.0f%%) — model outputs unreliable", | |
| matched / total * 100) | |
| return None, cfg, matched / total | |
| return model, cfg, matched / total | |
| elif info["kind"] == "full_model": | |
| return info["obj"], cfg, 1.0 | |
| return None, cfg, 0.0 | |
| def _load_svm(filename: str, stream_name: str) -> bool: | |
| """ | |
| Download and load one SVM model. Returns True on success. | |
| The repo rocky250/MHMisinfo is tagged 'Joblib' on HuggingFace — files are | |
| saved with .pt extension but were written by joblib.dump(). | |
| We try joblib FIRST, then plain pickle, then torch.load as last resort. | |
| """ | |
| global _svm_pipelines | |
| # Download | |
| try: | |
| path = _hf_download(filename) | |
| logger.info("Downloaded %s → %s (%.1f KB)", | |
| filename, stream_name, os.path.getsize(path) / 1024) | |
| except Exception as e: | |
| logger.warning("Could not download %s: %s", filename, e) | |
| return False | |
| obj = None | |
| # Attempt 1: joblib (preferred — repo is tagged 'Joblib') ─ | |
| try: | |
| import joblib as _jl | |
| obj = _jl.load(path) | |
| logger.info(" joblib.load OK for %s → %s", stream_name, type(obj).__name__) | |
| except Exception as je: | |
| logger.debug(" joblib failed for %s: %s", stream_name, je) | |
| # Attempt 2: plain pickle ─ | |
| if obj is None: | |
| try: | |
| with open(path, "rb") as f: | |
| obj = pickle.load(f) | |
| logger.info(" pickle.load OK for %s → %s", stream_name, type(obj).__name__) | |
| except Exception as pe: | |
| logger.debug(" pickle failed for %s: %s", stream_name, pe) | |
| # Attempt 3: torch.load ─ | |
| if obj is None: | |
| try: | |
| import torch as _torch | |
| obj = _torch.load(path, map_location="cpu", weights_only=False) | |
| logger.info(" torch.load OK for %s → %s", stream_name, type(obj).__name__) | |
| except Exception as te: | |
| logger.debug(" torch.load failed for %s: %s", stream_name, te) | |
| if obj is None: | |
| logger.warning("All load methods failed for %s", filename) | |
| return False | |
| # Validate | |
| if hasattr(obj, "predict") or hasattr(obj, "decision_function") or hasattr(obj, "predict_proba"): | |
| _svm_pipelines[stream_name] = obj | |
| logger.info(" SVM loaded: %s → %s", stream_name, type(obj).__name__) | |
| return True | |
| logger.warning("Object for %s has no sklearn API — type=%s", stream_name, type(obj).__name__) | |
| return False | |
| def _ensure_models_loaded(): | |
| global _multimodal_model, _multimodal_meta, _bert_tokenizer | |
| global _models_loaded, _load_error | |
| if _models_loaded: | |
| return | |
| _models_loaded = True | |
| os.makedirs(CACHE_DIR, exist_ok=True) | |
| # 1. Per-modality SVM models (most important for charts) | |
| svm_map = { | |
| "text": "svm/best_text.pt", | |
| "audio": "svm/best_audio_transcript.pt", | |
| "video": "svm/best_video_transcript.pt", | |
| "tags": "svm/best_tags.pt", | |
| } | |
| svm_loaded = 0 | |
| for name, hf_path in svm_map.items(): | |
| if _load_svm(hf_path, name): | |
| svm_loaded += 1 | |
| # Combined svm.joblib (small, 5.4 KB — the ensemble/meta SVM) ─ | |
| # Try both "svm/svm.joblib" path and root-level fallback | |
| for combined_path in ["svm/svm.joblib", "svm.joblib"]: | |
| if _load_svm(combined_path, "combined"): | |
| break | |
| logger.info("SVMs loaded: %d / %d per-stream + combined=%s", | |
| svm_loaded, len(svm_map), | |
| "yes" if "combined" in _svm_pipelines else "no") | |
| # 2. Multimodal model (for global score) | |
| try: | |
| path = _hf_download("best_multimodal.pt") | |
| info = _introspect_pt(path) | |
| logger.info("Multimodal .pt kind=%s keys_sample=%s", | |
| info["kind"], info.get("keys_sample", [])[:5]) | |
| if info["kind"] == "sklearn": | |
| # The multimodal.pt IS a sklearn model | |
| _svm_pipelines["multimodal_sklearn"] = info["obj"] | |
| _multimodal_model = None | |
| _multimodal_meta = {"kind": "sklearn_global"} | |
| elif info["kind"] in ("state_dict", "checkpoint", "full_model"): | |
| model, cfg, match_ratio = _build_model_from_introspection(info) | |
| if model is not None and match_ratio >= 0.30: | |
| model.eval() | |
| _multimodal_model = model | |
| _multimodal_meta = {**cfg, "match_ratio": match_ratio} | |
| # Load BERT tokenizer for input encoding | |
| try: | |
| from transformers import BertTokenizer | |
| _bert_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased") | |
| except Exception as te: | |
| logger.warning("BertTokenizer not available: %s", te) | |
| else: | |
| logger.warning("Multimodal model unusable (match_ratio=%.2f)", match_ratio) | |
| _multimodal_model = None | |
| _load_error = f"Architecture mismatch ({match_ratio:.0%} weights matched)" | |
| else: | |
| logger.warning("Unknown .pt content: %s", info["kind"]) | |
| except Exception as e: | |
| _load_error = str(e) | |
| logger.error("Multimodal model load failed: %s", e) | |
| # SVM INFERENCE (primary per-modality source) | |
| def _svm_predict_stream(stream_name: str, text: str) -> Optional[dict]: | |
| """ | |
| Run one SVM pipeline on a text segment. | |
| Returns a dict with misinfo_pct, credible_pct, logit, uncertainty, trust. | |
| Returns None if the model is unavailable or text is empty. | |
| """ | |
| clf = _svm_pipelines.get(stream_name) | |
| if clf is None or not (text or "").strip(): | |
| return None | |
| try: | |
| # decision_function gives distance from the decision boundary | |
| # Positive = misinfo class (class 1), negative = credible (class 0) | |
| # This works for SVC and sklearn Pipeline wrapping SVC | |
| if hasattr(clf, "decision_function"): | |
| raw_score = float(clf.decision_function([text])[0]) | |
| elif hasattr(clf, "predict_proba"): | |
| prob = clf.predict_proba([text])[0] | |
| # prob[1] = P(misinfo), convert to log-odds for logit | |
| p = float(np.clip(prob[1], 1e-6, 1 - 1e-6)) | |
| raw_score = math.log(p / (1 - p)) | |
| else: | |
| return None | |
| # raw_score is the SVM logit (log-odds space) | |
| # Softmax over [raw_score, -raw_score] | |
| max_s = max(raw_score, -raw_score) | |
| exp_m = math.exp(raw_score - max_s) | |
| exp_c = math.exp(-raw_score - max_s) | |
| denom = exp_m + exp_c | |
| mis_pct = round(exp_m / denom * 100.0, 4) | |
| crd_pct = round(exp_c / denom * 100.0, 4) | |
| # Shannon entropy | |
| pm = mis_pct / 100.0 | |
| pc = crd_pct / 100.0 | |
| def _log2(x): return math.log2(x) if x > 1e-12 else 0.0 | |
| H = -(pm * _log2(pm) + pc * _log2(pc)) | |
| uncertainty = round(H * 100.0, 4) | |
| # Trust = confidence × content richness | |
| word_count = len(text.split()) | |
| content_factor = min(word_count / 200.0, 1.0) | |
| trust_score = round(((1.0 - H) * 0.70 + content_factor * 0.30) * 100.0, 4) | |
| return { | |
| "misinfo_logit": round(raw_score, 6), | |
| "credible_logit": round(-raw_score, 6), | |
| "misinfo_pct": mis_pct, | |
| "credible_pct": crd_pct, | |
| "uncertainty": uncertainty, | |
| "trust_score": trust_score, | |
| "source": "svm", | |
| } | |
| except Exception as e: | |
| logger.warning("SVM inference failed for %s: %s", stream_name, e) | |
| return None | |
| # MULTIMODAL MODEL INFERENCE (global score only) | |
| def _tokenize(text: str, max_len: int = 128): | |
| """Tokenize text with BertTokenizer → (1, max_len) LongTensor.""" | |
| import torch | |
| enc = _bert_tokenizer( | |
| text or "", | |
| max_length=max_len, | |
| padding="max_length", | |
| truncation=True, | |
| return_tensors="pt", | |
| ) | |
| return enc["input_ids"] | |
| def _multimodal_global_score(text: str, audio: str, video: str, tags: str) -> Optional[dict]: | |
| """ | |
| Run the PyTorch multimodal model and return global misinfo score. | |
| Returns None if model not available. | |
| """ | |
| if _multimodal_model is None or _bert_tokenizer is None: | |
| return None | |
| try: | |
| import torch | |
| import torch.nn.functional as F | |
| dev = next(_multimodal_model.parameters()).device | |
| t = _tokenize(text).to(dev) | |
| a = _tokenize(audio).to(dev) | |
| v = _tokenize(video).to(dev) | |
| g = _tokenize(tags).to(dev) | |
| with torch.no_grad(): | |
| out = _multimodal_model(t, a, v, g) | |
| # Model may return (logits, gates) or just logits | |
| logits = out[0] if isinstance(out, (tuple, list)) else out | |
| gates = out[1].cpu().tolist()[0] if ( | |
| isinstance(out, (tuple, list)) and len(out) > 1 | |
| ) else [0.5, 0.5, 0.5, 0.5] | |
| probs = F.softmax(logits, dim=-1)[0] | |
| p_mis = float(probs[1].cpu()) # class 1 = misinformation | |
| p_cred = float(probs[0].cpu()) | |
| logit_m = float(logits[0, 1].cpu()) | |
| logit_c = float(logits[0, 0].cpu()) | |
| return { | |
| "score": round(p_mis, 6), | |
| "logit_m": round(logit_m, 6), | |
| "logit_c": round(logit_c, 6), | |
| "dmte_gates": { | |
| "text": round(gates[0], 4) if len(gates) > 0 else 0.5, | |
| "audio": round(gates[1], 4) if len(gates) > 1 else 0.5, | |
| "video": round(gates[2], 4) if len(gates) > 2 else 0.5, | |
| "tags": round(gates[3], 4) if len(gates) > 3 else 0.5, | |
| }, | |
| } | |
| except Exception as e: | |
| logger.warning("Multimodal inference error: %s", e) | |
| return None | |
| def _sklearn_global_score(text: str, audio: str, video: str) -> Optional[float]: | |
| """Use the combined sklearn SVM for global score if PyTorch model unavailable.""" | |
| clf = _svm_pipelines.get("multimodal_sklearn") or _svm_pipelines.get("combined") | |
| if clf is None: | |
| return None | |
| try: | |
| combined = f"{text} {audio} {video}" | |
| if hasattr(clf, "predict_proba"): | |
| return float(clf.predict_proba([combined])[0][1]) | |
| if hasattr(clf, "decision_function"): | |
| d = float(clf.decision_function([combined])[0]) | |
| return float(1 / (1 + math.exp(-d))) # sigmoid to get probability | |
| except Exception as e: | |
| logger.warning("sklearn global score error: %s", e) | |
| return None | |
| # HEURISTIC FALLBACK (when no model is available) | |
| def _heuristic_stream(text_segment: str) -> dict: | |
| """Keyword-density heuristic — used only when SVMs not loaded.""" | |
| if not (text_segment or "").strip(): | |
| return { | |
| "misinfo_logit": 0.0, "credible_logit": 0.0, | |
| "misinfo_pct": 50.0, "credible_pct": 50.0, | |
| "trust_score": 0.0, "uncertainty": 100.0, | |
| "source": "heuristic_empty", | |
| } | |
| lowered = text_segment.lower() | |
| words = lowered.split() | |
| word_count = max(len(words), 1) | |
| hits = sum(1 for kw in _MISINFO_RED_FLAGS if kw in lowered) | |
| density = hits / max(word_count / 10.0, 1.0) | |
| base_prob = min(max(0.10 + density * 0.42, 0.02), 0.97) | |
| logit_m = round(math.log(base_prob / (1.0 - base_prob)), 6) | |
| logit_c = -logit_m | |
| max_l = max(logit_m, logit_c) | |
| exp_m = math.exp(logit_m - max_l) | |
| exp_c = math.exp(logit_c - max_l) | |
| denom = exp_m + exp_c | |
| mis_pct = round(exp_m / denom * 100.0, 4) | |
| crd_pct = round(exp_c / denom * 100.0, 4) | |
| def _log2(x): return math.log2(x) if x > 1e-12 else 0.0 | |
| pm = mis_pct / 100.0; pc = crd_pct / 100.0 | |
| H = -(pm * _log2(pm) + pc * _log2(pc)) | |
| uncertainty = round(H * 100.0, 4) | |
| trust_score = round(((1.0 - H) * 0.70 + min(word_count / 200.0, 1.0) * 0.30) * 100.0, 4) | |
| return { | |
| "misinfo_logit": logit_m, | |
| "credible_logit": logit_c, | |
| "misinfo_pct": mis_pct, | |
| "credible_pct": crd_pct, | |
| "trust_score": trust_score, | |
| "uncertainty": uncertainty, | |
| "source": "heuristic", | |
| } | |
| def _heuristic_global_score(combined: str) -> float: | |
| hits = sum(1 for kw in _MISINFO_RED_FLAGS if kw in combined.lower()) | |
| return min(0.15 + hits * 0.12, 0.95) | |
| # MAIN PUBLIC API | |
| def detect_misinformation( | |
| text: str, | |
| tags: List[str] = None, | |
| audio_transcript: str = "", | |
| video_transcript: str = "", | |
| ) -> Dict: | |
| """ | |
| Detect misinformation using the real MHMisinfo model from rocky250/MHMisinfo. | |
| Execution plan (in priority order): | |
| Per-modality charts → SVM pipeline per stream (best_text.pt, etc.) | |
| → heuristic fallback if SVM unavailable | |
| Global score/label → PyTorch multimodal model (best_multimodal.pt) | |
| → combined SVM fallback | |
| → keyword heuristic as last resort | |
| """ | |
| _ensure_models_loaded() | |
| tags_str = " ".join(tags or []) | |
| audio_seg = audio_transcript or "" | |
| video_seg = video_transcript or "" | |
| combined = f"{text} {tags_str} {audio_seg}" | |
| # Per-stream analysis (SVM primary, heuristic fallback) ─ | |
| # text stream → title + description + tags | |
| text_seg = f"{text} {tags_str}" | |
| def _get_stream(name: str, seg: str) -> dict: | |
| result = _svm_predict_stream(name, seg) | |
| if result is not None: | |
| return result | |
| # fallback | |
| return _heuristic_stream(seg) | |
| modality_analysis = { | |
| "text": _get_stream("text", text_seg), | |
| "audio": _get_stream("audio", audio_seg), | |
| "video": _get_stream("video", video_seg), | |
| } | |
| # Global score | |
| global_result = _multimodal_global_score(text, audio_seg, video_seg, tags_str) | |
| reasons = [] | |
| if global_result is not None: | |
| score = global_result["score"] | |
| logit_m = global_result["logit_m"] | |
| logit_c = global_result["logit_c"] | |
| dmte_gates = global_result.get("dmte_gates", {}) | |
| gate_str = " | ".join(f"{k}: {v:.3f}" for k, v in dmte_gates.items()) | |
| match_pct = _multimodal_meta.get("match_ratio", 0) * 100 | |
| reasons.append( | |
| f"PyTorch model ({match_pct:.0f}% weights matched) — " | |
| f"logit_m={logit_m:+.4f} logit_c={logit_c:+.4f}" | |
| ) | |
| if dmte_gates: | |
| reasons.append(f"DMTE trust gates: [{gate_str}]") | |
| else: | |
| # Try sklearn global | |
| sk_score = _sklearn_global_score(text, audio_seg, video_seg) | |
| if sk_score is not None: | |
| score = sk_score | |
| reasons.append("Global score from combined SVM model") | |
| else: | |
| score = _heuristic_global_score(combined) | |
| hits = sum(1 for kw in _MISINFO_RED_FLAGS if kw in combined.lower()) | |
| if hits > 0: | |
| found = [kw for kw in _MISINFO_RED_FLAGS if kw in combined.lower()] | |
| reasons.append(f"Heuristic: {hits} red-flag keyword(s): {', '.join(found[:5])}") | |
| else: | |
| reasons.append("Heuristic: no red-flag keywords detected") | |
| # SVM source annotation ─ | |
| svm_count = sum(1 for v in modality_analysis.values() if v.get("source") == "svm") | |
| if svm_count > 0: | |
| reasons.append(f"Per-modality: {svm_count}/3 streams from real SVM models") | |
| else: | |
| reasons.append( | |
| f" SVM models using for stream analysis" | |
| + (f" ({_load_error})" if _load_error else "") | |
| ) | |
| label = " Potential Misinformation" if score >= 0.5 else "Appears Credible" | |
| # Strip internal 'source' key from modality dicts (not expected by charts) | |
| clean_modality = { | |
| k: {kk: vv for kk, vv in v.items() if kk != "source"} | |
| for k, v in modality_analysis.items() | |
| } | |
| return { | |
| "score": round(float(score), 4), | |
| "label": label, | |
| "confidence_pct": int(float(score) * 100), | |
| "reasoning": " • ".join(reasons), | |
| "stream_details": { | |
| "text": round(float(score) * 0.9, 3), | |
| "audio_transcript": round(float(score) * 0.8, 3), | |
| "video_transcript": round(float(score) * 0.85, 3), | |
| "tags": round(float(score) * 0.7, 3), | |
| }, | |
| "modality_analysis": clean_modality, | |
| } | |
| # SENTIMENT ANALYSIS | |
| def _get_hf_pipeline(): | |
| global _sentiment_pipeline | |
| if _sentiment_pipeline is None: | |
| from transformers import pipeline | |
| _sentiment_pipeline = pipeline( | |
| "sentiment-analysis", | |
| model="distilbert-base-uncased-finetuned-sst-2-english", | |
| truncation=True, max_length=512, | |
| ) | |
| return _sentiment_pipeline | |
| def _get_vader(): | |
| global _vader_analyzer | |
| if _vader_analyzer is None: | |
| try: | |
| from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer | |
| _vader_analyzer = SentimentIntensityAnalyzer() | |
| except ImportError: | |
| pass | |
| return _vader_analyzer | |
| def analyze_sentiment_batch( | |
| texts: List[str], | |
| method: str = "vader", | |
| batch_size: int = 64, | |
| ) -> List[Dict]: | |
| results = [] | |
| if method == "vader": | |
| vader = _get_vader() | |
| if vader is None: | |
| return _simple_lexicon_sentiment(texts) | |
| for text in texts: | |
| if not text or len(text.strip()) < 3: | |
| results.append({"label": "Neutral", "score": 0.0, "compound": 0.0}) | |
| continue | |
| vs = vader.polarity_scores(text) | |
| c = vs["compound"] | |
| results.append({ | |
| "label": "Positively Engagement" if c >= 0.05 else ("Negatively Engagement" if c <= -0.05 else "Neutral"), | |
| "score": abs(c), | |
| "compound": c, | |
| }) | |
| elif method == "hf": | |
| pipe = _get_hf_pipeline() | |
| for i in range(0, len(texts), batch_size): | |
| chunk = texts[i: i + batch_size] | |
| safe = [t[:1000] if t else " " for t in chunk] | |
| try: | |
| _hf_label_map = {"POSITIVE": "Positively Engagement", "NEGATIVE": "Negatively Engagement"} | |
| for r in pipe(safe): | |
| mapped = _hf_label_map.get(r["label"], "Neutral") | |
| results.append({ | |
| "label": mapped, | |
| "score": round(r["score"], 4), | |
| "compound": r["score"] if r["label"] == "POSITIVE" else -r["score"], | |
| }) | |
| except Exception: | |
| for _ in chunk: | |
| results.append({"label": "Neutral", "score": 0.5, "compound": 0.0}) | |
| return results | |
| def _simple_lexicon_sentiment(texts: List[str]) -> List[Dict]: | |
| pos = {"good","great","love","excellent","amazing","wonderful","best","happy","positive","helpful"} | |
| neg = {"bad","terrible","hate","awful","worst","negative","harmful","wrong","fake","misinformation"} | |
| out = [] | |
| for text in texts: | |
| words = set(text.lower().split()) | |
| p = len(words & pos); n = len(words & neg) | |
| if p > n: out.append({"label": "Positively Engagement", "score": 0.7, "compound": 0.5}) | |
| elif n > p: out.append({"label": "Negatively Engagement", "score": 0.7, "compound": -0.5}) | |
| else: out.append({"label": "Neutral", "score": 0.5, "compound": 0.0}) | |
| return out | |
| def sentiment_summary(results: List[Dict]) -> Dict: | |
| if not results: | |
| return {"Positively Engagement": 0, "Negatively Engagement": 0, "Neutral": 0, | |
| "total": 0, "avg_compound": 0.0, "pos_pct": 0, "neg_pct": 0, "neu_pct": 0} | |
| counts = Counter(r["label"] for r in results) | |
| total = len(results) | |
| avg = float(np.mean([r.get("compound", 0.0) for r in results])) | |
| return { | |
| "Positively Engagement": counts.get("Positively Engagement", 0), | |
| "Negatively Engagement": counts.get("Negatively Engagement", 0), | |
| "Neutral": counts.get("Neutral", 0), | |
| "total": total, | |
| "avg_compound": round(avg, 3), | |
| "pos_pct": round(counts.get("Positively Engagement", 0) / total * 100, 1), | |
| "neg_pct": round(counts.get("Negatively Engagement", 0) / total * 100, 1), | |
| "neu_pct": round(counts.get("Neutral", 0) / total * 100, 1), | |
| } | |
| # KEYWORD ANALYSIS | |
| STOPWORDS = { | |
| "the","a","an","is","it","in","on","at","to","for","of","and","or","but", | |
| "this","that","was","are","be","have","has","had","with","from","by","as", | |
| "we","i","you","he","she","they","do","did","not","no","so","if","can", | |
| "will","would","could","should","my","your","his","her","their","our", | |
| "what","how","when","where","who","which","about","just","also","more", | |
| "all","been","were","its","than","then","there","these","those","me", | |
| "him","us","them","up","out","into","after","before","https","http","www", | |
| } | |
| def extract_keywords(text: str, tags: List[str] = None, top_n: int = 20): | |
| combined = text + " " + " ".join(tags or []) | |
| tokens = re.findall(r"[a-zA-Z]{3,}", combined.lower()) | |
| filtered = [t for t in tokens if t not in STOPWORDS] | |
| return Counter(filtered).most_common(top_n) | |
| def sentiment_weighted_keywords( | |
| comments_df: pd.DataFrame, | |
| sentiment_results: List[Dict], | |
| top_n: int = 15, | |
| ) -> Tuple[List[Tuple[str, float]], List[Tuple[str, float]]]: | |
| if comments_df.empty or not sentiment_results: | |
| return [], [] | |
| texts = comments_df["text"].fillna("").tolist() | |
| pos_freq: Counter = Counter() | |
| neg_freq: Counter = Counter() | |
| for text, sent in zip(texts, sentiment_results): | |
| tokens = [t for t in re.findall(r"[a-zA-Z]{3,}", text.lower()) if t not in STOPWORDS] | |
| weight = sent.get("score", 0.5) | |
| if sent["label"] == "Positively Engagement": pos_freq.update({t: weight for t in tokens}) | |
| elif sent["label"] == "Negatively Engagement": neg_freq.update({t: weight for t in tokens}) | |
| return pos_freq.most_common(top_n), neg_freq.most_common(top_n) |