Spaces:
Sleeping
Sleeping
# misinfo_gradio_full.py
#
# Unified misinformation-detection Gradio app: takes article text / URL /
# screenshot, gathers evidence from news and fact-check APIs, and fuses the
# signals into a single credibility score.
import os
import re
import time
import json
import base64
import logging
from typing import List, Dict, Any, Optional

import requests
import trafilatura
import tldextract
import gradio as gr
from PIL import Image
import pytesseract

# ML resources are lazy-loaded on first use (see the get_* loaders below) so
# the app starts quickly even when the heavy model dependencies are missing.
ZS_PIPE = None        # transformers zero-shot classification pipeline
SENTE = None          # sentence-transformers embedding model
GEMINI_CLIENT = None  # google-genai client

# Load API keys from a local .env file; a missing key simply disables the
# corresponding integration (each caller checks for a falsy key first).
from dotenv import load_dotenv
load_dotenv()
NEWSAPI_KEY = os.getenv("NEWSAPI_KEY")
GNEWS_KEY = os.getenv("GNEWS_KEY")
SERPAPI_KEY = os.getenv("SERPAPI_KEY")
FACTCHECK_KEY = os.getenv("FACTCHECK_KEY")
SAFE_BROWSING_KEY = os.getenv("SAFE_BROWSING_KEY")
VIRUSTOTAL_KEY = os.getenv("VIRUSTOTAL_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Module-wide logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("misinfo")
# --- Helpers ---
def safe_headers():
    """Return the default HTTP headers used for all outbound requests."""
    headers = {}
    headers["User-Agent"] = "misinfo-gradio/1.0"
    return headers
def extract_domain(url: str) -> Optional[str]:
    """Return the lowercase registered domain of *url*, or None on any failure."""
    try:
        parts = tldextract.extract(url)
        domain = parts.registered_domain
        if domain:
            return domain.lower()
    except Exception:
        pass
    return None
# Registered domains treated as reputable outlets; used to filter external
# evidence and to nudge the fused credibility score upward.
TRUSTED_DOMAINS = {
    "bbc.co.uk","bbc.com","cnn.com","nytimes.com","reuters.com","apnews.com",
    "theguardian.com","npr.org","washingtonpost.com","wsj.com","usatoday.com",
    "bloomberg.com","aljazeera.com","msnbc.com","cnbc.com","foxnews.com",
    "scientificamerican.com","nature.com","sciencedaily.com","timesofindia.indiatimes.com","indiatimes.com"
}
# Domains that actively lower the credibility score.
BLACKLISTED_DOMAINS = {"example-bad-site.com"}  # keep small; replace with curated list in prod
# --- Model loaders ---
def get_zs_pipe():
    """Lazily build and cache the zero-shot classification pipeline.

    Returns None (and logs a warning) when transformers is unavailable.
    """
    global ZS_PIPE
    if ZS_PIPE is not None:
        return ZS_PIPE
    try:
        from transformers import pipeline
        ZS_PIPE = pipeline("zero-shot-classification", model="typeform/distilbert-base-uncased-mnli")
    except Exception as e:
        logger.warning("zero-shot pipeline load error: %s", e)
        ZS_PIPE = None
    return ZS_PIPE
def get_sente_model():
    """Lazily build and cache the sentence-embedding model.

    Returns None (and logs a warning) when sentence-transformers is unavailable.
    """
    global SENTE
    if SENTE is not None:
        return SENTE
    try:
        from sentence_transformers import SentenceTransformer
        SENTE = SentenceTransformer("all-MiniLM-L6-v2")
    except Exception as e:
        logger.warning("sentence-transformers load error: %s", e)
        SENTE = None
    return SENTE
def get_gemini_client():
    """Lazily build and cache the Gemini client; None without GEMINI_API_KEY."""
    global GEMINI_CLIENT
    # Nothing to do if already built, or if no key was configured.
    if GEMINI_CLIENT is not None or not GEMINI_API_KEY:
        return GEMINI_CLIENT
    try:
        from google import genai
        GEMINI_CLIENT = genai.Client(api_key=GEMINI_API_KEY)
    except Exception as e:
        logger.warning("gemini client init error: %s", e)
        GEMINI_CLIENT = None
    return GEMINI_CLIENT
# --- Extraction ---
def fetch_and_extract(url: str, max_chars: int = 4000) -> str:
    """Fetch *url* and return the main article text, whitespace-collapsed and
    truncated to *max_chars*.  Returns "" on any failure.

    BUG FIX: trafilatura.fetch_url() accepts no ``headers=`` / ``timeout=``
    keyword arguments, so the old call raised TypeError, the broad except
    swallowed it, and URL mode always produced empty text.  Fetch the page
    with requests (where we control headers and timeout) and hand the HTML
    to trafilatura.extract() instead.
    """
    if not url:
        return ""
    try:
        resp = requests.get(url, headers=safe_headers(), timeout=12)
        resp.raise_for_status()
        downloaded = resp.text
        if not downloaded:
            return ""
        text = trafilatura.extract(downloaded, include_comments=False, include_tables=False)
        if not text:
            return ""
        # Collapse runs of whitespace so snippets stay single-line friendly.
        text = re.sub(r'\s+', ' ', text).strip()
        return text[:max_chars]
    except Exception as e:
        logger.warning("fetch_and_extract error: %s", e)
        return ""
def ocr_image_to_text(img: Image.Image, max_chars=4000) -> str:
    """OCR *img* with pytesseract; return normalized text capped at max_chars.

    Returns "" (and logs a warning) on any OCR failure.
    """
    try:
        raw = pytesseract.image_to_string(img)
        collapsed = re.sub(r'\s+', ' ', raw).strip()
        return collapsed[:max_chars]
    except Exception as e:
        logger.warning("OCR error: %s", e)
        return ""
# --- News / evidence fetching ---
def fetch_newsapi(query: str, max_results: int = 6) -> List[Dict[str,str]]:
    """Search NewsAPI /v2/everything for *query*.

    Returns up to max_results dicts with title/url/source/snippet keys,
    or [] when the key is missing or the request fails.
    """
    if not NEWSAPI_KEY:
        return []
    try:
        params = {
            "q": query,
            "pageSize": max_results,
            "apiKey": NEWSAPI_KEY,
            "language": "en",
            "sortBy": "relevancy",
        }
        resp = requests.get("https://newsapi.org/v2/everything", params=params, headers=safe_headers(), timeout=8)
        resp.raise_for_status()
        payload = resp.json()
        return [
            {
                "title": art.get("title"),
                "url": art.get("url"),
                "source": art.get("source", {}).get("name"),
                "snippet": art.get("description") or art.get("content") or "",
            }
            for art in payload.get("articles", [])[:max_results]
        ]
    except Exception as e:
        logger.warning("NewsAPI error: %s", e)
        return []
def fetch_gnews(query: str, max_results: int = 6) -> List[Dict[str,str]]:
    """Search the GNews v4 API for *query*; [] when unconfigured or failing."""
    if not GNEWS_KEY:
        return []
    try:
        resp = requests.get(
            "https://gnews.io/api/v4/search",
            params={"q": query, "token": GNEWS_KEY, "max": max_results, "lang": "en"},
            headers=safe_headers(),
            timeout=8,
        )
        resp.raise_for_status()
        simplified = []
        for art in resp.json().get("articles", [])[:max_results]:
            simplified.append({
                "title": art.get("title"),
                "url": art.get("url"),
                "source": art.get("source", {}).get("name"),
                "snippet": art.get("description") or "",
            })
        return simplified
    except Exception as e:
        logger.warning("GNews error: %s", e)
        return []
def fetch_serpapi(query: str, max_results: int = 6) -> List[Dict[str,str]]:
    """Search Google via SerpApi; [] when unconfigured or failing."""
    if not SERPAPI_KEY:
        return []
    try:
        resp = requests.get(
            "https://serpapi.com/search.json",
            params={"q": query, "api_key": SERPAPI_KEY, "num": max_results, "engine": "google"},
            headers=safe_headers(),
            timeout=8,
        )
        resp.raise_for_status()
        organic = resp.json().get("organic_results", [])[:max_results]
        return [
            {
                "title": item.get("title"),
                "url": item.get("link"),
                "source": item.get("source") or item.get("displayed_link"),
                "snippet": item.get("snippet") or "",
            }
            for item in organic
        ]
    except Exception as e:
        logger.warning("SerpApi error: %s", e)
        return []
def gather_news_evidence(query: str, max_results=6) -> List[Dict[str,str]]:
    """Collect articles from every configured provider, deduplicated by URL.

    Each provider is asked for up to max_results items, then the merged list
    is trimmed back to max_results total (first occurrence of a URL wins).
    """
    combined = (
        fetch_newsapi(query, max_results)
        + fetch_gnews(query, max_results)
        + fetch_serpapi(query, max_results)
    )
    seen_urls = set()
    unique = []
    for entry in combined:
        link = entry.get("url")
        if link and link not in seen_urls:
            seen_urls.add(link)
            unique.append(entry)
    return unique[:max_results]
# --- Fact-check (Google Fact Check Tools) ---
def factcheck_claim(claim: str) -> Dict[str,Any]:
    """Search the Google Fact Check Tools API for reviews of *claim*.

    Returns {"outcome": <rating or status>, "source": [review dicts]};
    outcome is the first review's textual rating, "unverified" when none
    were found, "api_key_missing" or "error" otherwise.
    """
    if not FACTCHECK_KEY:
        return {"outcome": "api_key_missing", "source": []}
    try:
        resp = requests.get(
            "https://factchecktools.googleapis.com/v1alpha1/claims:search",
            params={"query": claim, "key": FACTCHECK_KEY, "languageCode": "en", "pageSize": 5},
            headers=safe_headers(),
            timeout=8,
        )
        resp.raise_for_status()
        results = []
        for c in resp.json().get("claims", []):
            claim_text = c.get("text")
            # One claim can carry several independent reviews; flatten them.
            for review in c.get("claimReview", []):
                results.append({
                    "claimant": c.get("claimant"),
                    "text": claim_text,
                    "publisher": review.get("publisher", {}).get("name"),
                    "title": review.get("title"),
                    "url": review.get("url"),
                    "rating": review.get("textualRating"),
                })
        outcome = results[0].get("rating", "unverified") if results else "unverified"
        return {"outcome": outcome, "source": results}
    except Exception as e:
        logger.warning("factcheck error: %s", e)
        return {"outcome": "error", "error": str(e), "source": []}
# --- Safe Browsing (Google) ---
def check_safe_browsing(url: str) -> Dict[str,Any]:
    """Look *url* up in Google Safe Browsing v4.

    Returns {"status": "ok", "matches": [...]}, where an empty match list
    means no known threat, or a status of "api_key_missing" / "error".
    """
    if not SAFE_BROWSING_KEY:
        return {"status": "api_key_missing"}
    body = {
        "client": {"clientId": "misinfo-gradio", "clientVersion": "1.0"},
        "threatInfo": {
            "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION"],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": url}],
        },
    }
    try:
        resp = requests.post(
            f"https://safebrowsing.googleapis.com/v4/threatMatches:find?key={SAFE_BROWSING_KEY}",
            json=body,
            headers=safe_headers(),
            timeout=8,
        )
        resp.raise_for_status()
        return {"status": "ok", "matches": resp.json().get("matches", [])}
    except Exception as e:
        logger.warning("safe browsing error: %s", e)
        return {"status": "error", "error": str(e)}
# --- VirusTotal check (best-effort) ---
def check_virustotal(url: str) -> Dict[str,Any]:
    """Best-effort VirusTotal v3 report lookup for *url*.

    Returns {"status": "ok", "info": <report json>} on success, or a dict
    whose "status" is "api_key_missing" / "error".
    """
    if not VIRUSTOTAL_KEY:
        return {"status": "api_key_missing"}
    headers = {"x-apikey": VIRUSTOTAL_KEY}
    try:
        # Submit the URL for (re)analysis so brand-new URLs get scanned.
        submit = requests.post("https://www.virustotal.com/api/v3/urls",
                               data={"url": url}, headers=headers, timeout=10)
        submit.raise_for_status()
        # BUG FIX: POST /urls returns an *analysis* id, which GET /urls/{id}
        # rejects.  The URL report endpoint expects the URL identifier: the
        # unpadded urlsafe-base64 encoding of the URL itself (per the VT v3
        # API docs) — which is why base64 is imported at the top of the file.
        url_id = base64.urlsafe_b64encode(url.encode()).decode().rstrip("=")
        r = requests.get(f"https://www.virustotal.com/api/v3/urls/{url_id}",
                         headers=headers, timeout=10)
        r.raise_for_status()
        return {"status": "ok", "info": r.json()}
    except Exception as e:
        logger.warning("virustotal error: %s", e)
        return {"status": "error", "error": str(e)}
# --- Semantic evidence selection ---
def select_relevant_sentences(claim: str, article_text: str, top_k: int = 5) -> List[str]:
    """Return the top_k sentences of *article_text* most similar to *claim*.

    Uses cosine similarity over sentence embeddings when the model is
    available; otherwise falls back to the article's leading sentences.
    """
    model = get_sente_model()
    parts = re.split(r'(?<=[.!?]) +', article_text)
    if not model:
        # No embedding model: just take the first top_k splits, skipping blanks.
        return [p.strip() for p in parts[:top_k] if p.strip()]
    sentences = [p.strip() for p in parts if p.strip()]
    if not sentences:
        return []
    try:
        claim_vec = model.encode(claim, convert_to_tensor=True)
        sent_vecs = model.encode(sentences, convert_to_tensor=True)
        import numpy as np
        from sentence_transformers import util
        scores = util.cos_sim(claim_vec, sent_vecs)[0].cpu().numpy()
        ranked = list(np.argsort(-scores)[:top_k])
        return [sentences[i] for i in ranked if i < len(sentences)]
    except Exception as e:
        logger.warning("semantic selection error: %s", e)
        # Any failure degrades to the positional fallback.
        return sentences[:top_k]
# --- Zero-shot classification (truth + content type) ---
def zero_shot_classify(text: str) -> Dict[str,Any]:
    """Zero-shot classify *text* on two axes: truthfulness and content type.

    Returns truth_label/truth_score and content_type/content_type_score;
    each axis independently degrades to "Unknown"/0.0 on failure or when
    the pipeline could not be loaded.
    """
    pipe = get_zs_pipe()
    if not pipe:
        return {"truth_label":"Unknown","truth_score":0.0,"content_type":"Unknown","content_type_score":0.0}
    verdicts: Dict[str, Any] = {}
    try:
        truth = pipe(text, ["True", "False", "Misleading", "Unverifiable"], multi_label=False, truncation=True)
        verdicts["truth_label"] = truth["labels"][0]
        verdicts["truth_score"] = float(truth["scores"][0])
    except Exception as e:
        logger.warning("zero-shot truth error: %s", e)
        verdicts["truth_label"] = "Unknown"
        verdicts["truth_score"] = 0.0
    try:
        kind = pipe(text, ["News","Opinion","Satire","Rumor"], multi_label=False, truncation=True)
        verdicts["content_type"] = kind["labels"][0]
        verdicts["content_type_score"] = float(kind["scores"][0])
    except Exception as e:
        logger.warning("zero-shot content type error: %s", e)
        verdicts["content_type"] = "Unknown"
        verdicts["content_type_score"] = 0.0
    return verdicts
# --- Gemini deep verification ---
def gemini_verify(claim: str, evidence: List[str], domain: Optional[str]) -> Dict[str,Any]:
    """Ask Gemini to fact-check *claim* against *evidence* lines.

    Returns a dict whose "outcome" is "ok" (parsed verdict under "result",
    raw model text under "raw"), or "api_missing" / "parse_error" /
    "no_json" / "error" when the client or JSON parsing fails.
    """
    client = get_gemini_client()
    if not client:
        return {"outcome": "api_missing", "explanation": "Gemini API key not set or client failed", "raw": None}
    # Structured prompt instructing the model to reply with JSON only.
    prompt = (
        "You are an expert fact-checker. Given the claim and evidence, output valid JSON with keys:\n"
        "outcome (one of: True, False, Misleading, Unverifiable),\n"
        "confidence (0-1),\n"
        "explanation (short),\n"
        "takeaways (list of 1-3 short tips),\n"
        "sources (list of cited sources if any).\n\n"
        f"Claim: {claim}\n\n"
        f"Domain: {domain}\n\n"
        "Evidence:\n" + ("\n".join(f"- {e}" for e in evidence)) + "\n\n"
        "Provide only JSON in the response."
    )
    try:
        resp = client.models.generate_content(model="gemini-2.5-flash", contents=prompt)
        text = resp.text
        # The model may wrap the JSON in prose; try a direct parse first.
        try:
            parsed = json.loads(text)
            return {"outcome":"ok", "result": parsed, "raw": text}
        except Exception:
            # Fall back to the first {...} substring (DOTALL so it may span
            # newlines; greedy so nested braces stay inside the match).
            m = re.search(r'(\{.*\})', text, flags=re.S)
            if m:
                try:
                    parsed = json.loads(m.group(1))
                    return {"outcome":"ok", "result": parsed, "raw": text}
                except Exception:
                    return {"outcome":"parse_error", "raw": text}
            return {"outcome":"no_json", "raw": text}
    except Exception as e:
        logger.warning("gemini error: %s", e)
        return {"outcome":"error", "error": str(e)}
# --- Fusion of signals into credibility score ---
def fuse_signals(truth_score: float, domain: Optional[str], evidence_count: int, gemini_outcome: Optional[Dict[str,Any]]) -> Dict[str,Any]:
    """Blend model, domain, evidence and Gemini signals into a credibility score.

    Returns {"score": 0-100 int, "color": "green"|"yellow"|"red", "raw": 0-1 float}.
    """
    base = truth_score
    # Domain reputation: trusted outlets nudge the score up, blacklisted down.
    domain_factor = 1.0
    if domain:
        if domain in TRUSTED_DOMAINS:
            domain_factor += 0.2
        elif domain in BLACKLISTED_DOMAINS:
            domain_factor -= 0.4
    # Corroborating evidence helps, saturating at 5 items.
    evidence_factor = min(evidence_count / 5.0, 1.0)
    # Gemini verdict scales the blended score multiplicatively.
    gemini_adj = 1.0
    if gemini_outcome and gemini_outcome.get("result"):
        res = gemini_outcome["result"]
        # BUG FIX: outcome may be explicitly None in the model's JSON;
        # .lower() on None crashed the whole pipeline.
        out = str(res.get("outcome") or "").lower()
        # BUG FIX: the old isinstance(..., (float,int,str)) guard still let
        # float("high") raise ValueError. Parse the confidence defensively and
        # fall back to a neutral 0.5.
        try:
            conf = float(res.get("confidence", 0.5))
        except (TypeError, ValueError):
            conf = 0.5
        if out in ("false","misleading"):
            gemini_adj -= 0.25 * conf
        elif out == "true":
            gemini_adj += 0.1 * conf
        elif out == "unverifiable":
            gemini_adj -= 0.05 * conf
    # Weighted blend: 50% model truth score, 30% evidence, 20% domain delta.
    score = base * 0.5 + evidence_factor * 0.3 + (domain_factor - 1.0) * 0.2
    score = score * gemini_adj
    score = max(0.0, min(1.0, score))
    pct = int(round(score * 100))
    color = "green" if pct >= 70 else "yellow" if pct >= 40 else "red"
    return {"score": pct, "color": color, "raw": score}
# --- Main pipeline: single mode (run everything) ---
def analyze_pipeline(article: Optional[str], url: Optional[str], image: Optional[Image.Image], claim_override: Optional[str], top_k_evidence: int = 5):
    """Run the full misinformation analysis on a single input.

    Exactly one text source is used, in priority order: pasted article text,
    then URL (fetched + extracted), then image (OCR).  Returns a result dict
    consumed by pretty_output(), or {"error": ...} when no input was given.
    """
    # 1) choose text source
    source = None
    text = ""
    domain = None
    if article and article.strip():
        source = "article"
        text = article.strip()
    elif url and url.strip():
        source = "url"
        domain = extract_domain(url)
        text = fetch_and_extract(url) or ""
    elif image is not None:
        source = "image"
        text = ocr_image_to_text(image) or ""
    else:
        return {"error": "No input provided. Paste article text, or a URL, or upload image."}
    # Cap the working text so downstream models get a bounded input.
    if len(text) > 4000:
        text = text[:4000]
    # Claim to check: explicit override wins; otherwise the first sentence /
    # headline of the extracted text.
    claim = claim_override.strip() if claim_override and claim_override.strip() else (re.split(r'(?<=[.!?]) +', text.strip())[0] if text else "")
    # 2) quick zero-shot classification
    zs = zero_shot_classify(text if len(claim) < 30 else claim)  # run on claim if short, else on text
    truth_label = zs.get("truth_label")
    truth_score = zs.get("truth_score", 0.0)
    content_type = zs.get("content_type")
    content_type_score = zs.get("content_type_score", 0.0)
    # 3) evidence: internal (from article) and external (news APIs)
    internal_evidence = select_relevant_sentences(claim or text, text, top_k=top_k_evidence) if text else []
    # external news queries: search using claim or a short summary prefix
    query = claim or (text[:200])
    external_articles = gather_news_evidence(query, max_results=6)
    # Keep only articles whose registered domain is on the curated trust list.
    ext_filtered = []
    for a in external_articles:
        dom = extract_domain(a.get("url") or "")
        a["domain"] = dom
        if dom and dom in TRUSTED_DOMAINS:
            ext_filtered.append(a)
    # 4) fact-check API
    fact = factcheck_claim(claim or text)
    # 5) safe browsing + virustotal only if URL input provided
    # NOTE(review): a whitespace-only URL is truthy here even though the
    # source-selection branch above rejected it — confirm callers never pass one.
    safe_browsing_res = check_safe_browsing(url) if url else {"status":"no_url"}
    virustotal_res = check_virustotal(url) if url else {"status":"no_url"}
    # 6) deep verify with Gemini (claim + internal + external evidence)
    evidence_for_gemini = internal_evidence[:top_k_evidence] + [ (a.get("title") or "") + " - " + (a.get("snippet") or "") for a in ext_filtered[:top_k_evidence] ]
    gemini_res = gemini_verify(claim or text, evidence_for_gemini, domain)
    # 7) fuse all signals into one credibility score
    credibility = fuse_signals(truth_score, domain, len(internal_evidence) + len(ext_filtered), gemini_res)
    # 8) build outputs & static media-literacy tips
    tips = (
        "- Check the source domain and author.\n"
        "- Cross-check the claim with multiple trusted outlets.\n"
        "- Look for official statements or peer-reviewed studies for scientific claims.\n"
        "- Be skeptical of sensational language and images without context."
    )
    out = {
        "source": source,
        "domain": domain,
        "claim": claim,
        "text_snippet": text[:800],
        "quick_classification": {"truth_label": truth_label, "truth_score": truth_score, "content_type": content_type, "content_type_score": content_type_score},
        "internal_evidence": internal_evidence,
        "external_evidence": ext_filtered,
        "factcheck": fact,
        "safe_browsing": safe_browsing_res,
        "virustotal": {"status": virustotal_res.get("status", "unknown"), "summary": (virustotal_res.get("info") or {}) if isinstance(virustotal_res, dict) else {}},
        "gemini_verification": gemini_res,
        "credibility": credibility,
        "tips": tips
    }
    return out
# --- Gradio UI ---
def pretty_output(result: Dict[str,Any]):
    """Format an analyze_pipeline() result into the five Gradio output fields.

    Returns (summary, quick classification JSON, evidence text,
    fact-check JSON, gemini JSON + tips).
    """
    if not isinstance(result, dict):
        return str(result), "", "", "", ""
    if result.get("error"):
        return result["error"], "", "", "", ""
    cred = result['credibility']
    header = f"Credibility score: {cred['score']} ({cred['color']})"
    quick = json.dumps(result.get("quick_classification", {}), indent=2)
    sections = []
    internal = result.get("internal_evidence")
    if internal:
        sections.append("Internal evidence (from article):\n" + "\n".join(f"- {s}" for s in internal) + "\n\n")
    external = result.get("external_evidence")
    if external:
        sections.append("External corroborating articles:\n" + "\n".join(f"- {a.get('title')} ({a.get('domain')}) — {a.get('url')}" for a in external) + "\n\n")
    evidence = "".join(sections)
    fact = json.dumps(result.get("factcheck", {}), indent=2)
    gemini = result.get("gemini_verification", {})
    gemini_text = json.dumps(gemini, indent=2) if gemini else ""
    tips = result.get("tips", "")
    return header, quick, evidence, fact, gemini_text + "\n\n" + tips
# Gradio UI: single page wiring the full pipeline to input/output widgets.
# NOTE(review): the original source's indentation was flattened, so the exact
# widget nesting (which inputs sit inside the Row) is reconstructed — confirm
# against the deployed layout.
with gr.Blocks() as demo:
    gr.Markdown("# 🛡️ Unified Misinformation Detector (single mode)")
    gr.Markdown("Provide either Article text (preferred), or a URL, or upload an image (screenshot). Optionally add a short claim to check.")
    with gr.Row():
        article_in = gr.Textbox(lines=6, label="Paste Article Text (preferred)")
        url_in = gr.Textbox(label="Article URL")
        image_in = gr.Image(type="pil", label="Upload Image (screenshot)")
    claim_in = gr.Textbox(lines=1, label="Optional short claim (override automatic claim extraction)")
    topk = gr.Slider(1, 8, value=5, step=1, label="Top-K evidence sentences")
    run_btn = gr.Button("Run Full Pipeline")
    out_header = gr.Textbox(label="Summary", interactive=False)
    out_quick = gr.Code(label="Quick classification (truth + content type)")
    out_evidence = gr.Textbox(label="Evidence & External articles", lines=12)
    out_factcheck = gr.Code(label="Fact-check API result")
    out_gemini = gr.Code(label="Gemini result + Tips")

    def run(article, url, image, claim_override, top_k):
        # Thin adapter: run the pipeline and fan the result out to the widgets.
        res = analyze_pipeline(article, url, image, claim_override, top_k_evidence=int(top_k))
        return pretty_output(res)

    run_btn.click(run, inputs=[article_in, url_in, image_in, claim_in, topk], outputs=[out_header, out_quick, out_evidence, out_factcheck, out_gemini])

if __name__ == "__main__":
    demo.launch()