# lumen / app.py
# NOTE: the lines below are residue from the Hugging Face file-viewer page
# (author "ANISA09", commit 4a40214 "Update app.py", 21.3 kB) and are kept
# here as comments so the module remains valid Python.
# misinfo_gradio_full.py
import os
import re
import time
import json
import base64
import logging
from typing import List, Dict, Any, Optional
import requests
import trafilatura
import tldextract
import gradio as gr
from PIL import Image
import pytesseract
# ML lazy-load: model/client handles are cached at module level and populated
# on first use by the get_* loader functions below (keeps app startup fast).
ZS_PIPE = None        # transformers zero-shot classification pipeline
SENTE = None          # sentence-transformers embedding model
GEMINI_CLIENT = None  # google-genai client

# Load env: read a local .env so API keys can be configured without exporting.
from dotenv import load_dotenv
load_dotenv()
# Provider API keys; each may be None, in which case the corresponding feature
# degrades gracefully (empty results / "api_key_missing" status).
NEWSAPI_KEY = os.getenv("NEWSAPI_KEY")
GNEWS_KEY = os.getenv("GNEWS_KEY")
SERPAPI_KEY = os.getenv("SERPAPI_KEY")
FACTCHECK_KEY = os.getenv("FACTCHECK_KEY")
SAFE_BROWSING_KEY = os.getenv("SAFE_BROWSING_KEY")
VIRUSTOTAL_KEY = os.getenv("VIRUSTOTAL_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Logging: module-level logger used throughout for best-effort warnings.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("misinfo")
# --- Helpers ---
def safe_headers():
    """Return the default HTTP headers sent with every outbound request."""
    user_agent = "misinfo-gradio/1.0"
    return {"User-Agent": user_agent}
def extract_domain(url: str) -> Optional[str]:
    """Return the lowercased registered domain of *url*, or None on failure."""
    try:
        parts = tldextract.extract(url)
    except Exception:
        return None
    registered = parts.registered_domain
    return registered.lower() if registered else None
# Allow-list of well-known news/science outlets; external evidence is only
# counted when it comes from one of these registered domains.
TRUSTED_DOMAINS = {
    "bbc.co.uk","bbc.com","cnn.com","nytimes.com","reuters.com","apnews.com",
    "theguardian.com","npr.org","washingtonpost.com","wsj.com","usatoday.com",
    "bloomberg.com","aljazeera.com","msnbc.com","cnbc.com","foxnews.com",
    "scientificamerican.com","nature.com","sciencedaily.com","timesofindia.indiatimes.com","indiatimes.com"
}
# Known-bad domains penalised in fuse_signals().
BLACKLISTED_DOMAINS = {"example-bad-site.com"}  # keep small; replace with curated list in prod
# --- Model loaders ---
def get_zs_pipe():
    """Lazily build and cache the zero-shot classification pipeline.

    Returns None when transformers is unavailable or the model fails to load;
    a later call will retry the load.
    """
    global ZS_PIPE
    if ZS_PIPE is not None:
        return ZS_PIPE
    try:
        from transformers import pipeline
        ZS_PIPE = pipeline("zero-shot-classification", model="typeform/distilbert-base-uncased-mnli")
    except Exception as exc:
        logger.warning("zero-shot pipeline load error: %s", exc)
        ZS_PIPE = None
    return ZS_PIPE
def get_sente_model():
    """Lazily load and cache the sentence-embedding model (None on failure)."""
    global SENTE
    if SENTE is not None:
        return SENTE
    try:
        from sentence_transformers import SentenceTransformer
        SENTE = SentenceTransformer("all-MiniLM-L6-v2")
    except Exception as exc:
        logger.warning("sentence-transformers load error: %s", exc)
        SENTE = None
    return SENTE
def get_gemini_client():
    """Lazily create and cache the Gemini client.

    Returns None when GEMINI_API_KEY is unset or client creation fails.
    """
    global GEMINI_CLIENT
    if GEMINI_CLIENT is not None:
        return GEMINI_CLIENT
    if not GEMINI_API_KEY:
        return GEMINI_CLIENT
    try:
        from google import genai
        GEMINI_CLIENT = genai.Client(api_key=GEMINI_API_KEY)
    except Exception as exc:
        logger.warning("gemini client init error: %s", exc)
        GEMINI_CLIENT = None
    return GEMINI_CLIENT
# --- Extraction ---
def fetch_and_extract(url: str, max_chars: int = 4000) -> str:
    """Fetch *url* and extract its main article text, truncated to *max_chars*.

    Downloads with requests so we control headers and timeout:
    trafilatura.fetch_url() does not accept `headers`/`timeout` keyword
    arguments, so the previous call raised TypeError on every invocation and
    this function always returned "".

    Returns "" on empty input or any fetch/extract failure (best-effort).
    """
    if not url:
        return ""
    try:
        resp = requests.get(url, headers=safe_headers(), timeout=12)
        resp.raise_for_status()
        downloaded = resp.text
        if not downloaded:
            return ""
        text = trafilatura.extract(downloaded, include_comments=False, include_tables=False)
        if not text:
            return ""
        # Collapse all runs of whitespace so downstream sentence splitting works.
        text = re.sub(r'\s+', ' ', text).strip()
        return text[:max_chars]
    except Exception as e:
        logger.warning("fetch_and_extract error: %s", e)
        return ""
def ocr_image_to_text(img: Image.Image, max_chars=4000) -> str:
    """Run Tesseract OCR on *img*; return whitespace-normalised text ("" on error)."""
    try:
        raw = pytesseract.image_to_string(img)
        collapsed = re.sub(r'\s+', ' ', raw).strip()
        return collapsed[:max_chars]
    except Exception as exc:
        logger.warning("OCR error: %s", exc)
        return ""
# --- News / evidence fetching ---
def fetch_newsapi(query: str, max_results: int = 6) -> List[Dict[str,str]]:
    """Search the NewsAPI 'everything' endpoint; [] if no key or on error."""
    if not NEWSAPI_KEY:
        return []
    try:
        resp = requests.get(
            "https://newsapi.org/v2/everything",
            params={"q": query, "pageSize": max_results, "apiKey": NEWSAPI_KEY, "language": "en", "sortBy": "relevancy"},
            headers=safe_headers(),
            timeout=8,
        )
        resp.raise_for_status()
        payload = resp.json()
        return [
            {
                "title": art.get("title"),
                "url": art.get("url"),
                "source": art.get("source", {}).get("name"),
                "snippet": art.get("description") or art.get("content") or "",
            }
            for art in payload.get("articles", [])[:max_results]
        ]
    except Exception as exc:
        logger.warning("NewsAPI error: %s", exc)
        return []
def fetch_gnews(query: str, max_results: int = 6) -> List[Dict[str,str]]:
    """Search the GNews v4 API; [] if no key or on error."""
    if not GNEWS_KEY:
        return []
    try:
        resp = requests.get(
            "https://gnews.io/api/v4/search",
            params={"q": query, "token": GNEWS_KEY, "max": max_results, "lang": "en"},
            headers=safe_headers(),
            timeout=8,
        )
        resp.raise_for_status()
        hits = resp.json().get("articles", [])[:max_results]
        return [
            {
                "title": art.get("title"),
                "url": art.get("url"),
                "source": art.get("source", {}).get("name"),
                "snippet": art.get("description") or "",
            }
            for art in hits
        ]
    except Exception as exc:
        logger.warning("GNews error: %s", exc)
        return []
def fetch_serpapi(query: str, max_results: int = 6) -> List[Dict[str,str]]:
    """Run a Google search via SerpApi; [] if no key or on error."""
    if not SERPAPI_KEY:
        return []
    try:
        resp = requests.get(
            "https://serpapi.com/search.json",
            params={"q": query, "api_key": SERPAPI_KEY, "num": max_results, "engine": "google"},
            headers=safe_headers(),
            timeout=8,
        )
        resp.raise_for_status()
        organic = resp.json().get("organic_results", [])[:max_results]
        return [
            {
                "title": hit.get("title"),
                "url": hit.get("link"),
                "source": hit.get("source") or hit.get("displayed_link"),
                "snippet": hit.get("snippet") or "",
            }
            for hit in organic
        ]
    except Exception as exc:
        logger.warning("SerpApi error: %s", exc)
        return []
def gather_news_evidence(query: str, max_results=6) -> List[Dict[str,str]]:
    """Collect articles from every configured provider, deduplicated by URL.

    Provider order (NewsAPI, GNews, SerpApi) determines which duplicate wins;
    the result is capped at *max_results* entries.
    """
    combined = (
        fetch_newsapi(query, max_results)
        + fetch_gnews(query, max_results)
        + fetch_serpapi(query, max_results)
    )
    unique: List[Dict[str, str]] = []
    seen_urls = set()
    for entry in combined:
        link = entry.get("url")
        if link and link not in seen_urls:
            seen_urls.add(link)
            unique.append(entry)
    return unique[:max_results]
# --- Fact-check (Google Fact Check Tools) ---
def factcheck_claim(claim: str) -> Dict[str,Any]:
    """Look up *claim* in the Google Fact Check Tools API.

    Returns {"outcome": <first review's textual rating or "unverified">,
    "source": [review dicts]}; error/missing-key cases set outcome accordingly.
    """
    if not FACTCHECK_KEY:
        return {"outcome": "api_key_missing", "source": []}
    try:
        resp = requests.get(
            "https://factchecktools.googleapis.com/v1alpha1/claims:search",
            params={"query": claim, "key": FACTCHECK_KEY, "languageCode": "en", "pageSize": 5},
            headers=safe_headers(),
            timeout=8,
        )
        resp.raise_for_status()
        reviews = []
        for c in resp.json().get("claims", []):
            claim_text = c.get("text")
            for review in c.get("claimReview", []):
                reviews.append({
                    "claimant": c.get("claimant"),
                    "text": claim_text,
                    "publisher": review.get("publisher", {}).get("name"),
                    "title": review.get("title"),
                    "url": review.get("url"),
                    "rating": review.get("textualRating"),
                })
        outcome = reviews[0].get("rating", "unverified") if reviews else "unverified"
        return {"outcome": outcome, "source": reviews}
    except Exception as exc:
        logger.warning("factcheck error: %s", exc)
        return {"outcome": "error", "error": str(exc), "source": []}
# --- Safe Browsing (Google) ---
def check_safe_browsing(url: str) -> Dict[str,Any]:
    """Query Google Safe Browsing v4 for threat matches on *url*.

    Returns {"status": "ok", "matches": [...]} (empty matches means clean),
    or an api_key_missing / error status dict.
    """
    if not SAFE_BROWSING_KEY:
        return {"status": "api_key_missing"}
    body = {
        "client": {"clientId": "misinfo-gradio", "clientVersion": "1.0"},
        "threatInfo": {
            "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION"],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": url}],
        },
    }
    try:
        resp = requests.post(
            f"https://safebrowsing.googleapis.com/v4/threatMatches:find?key={SAFE_BROWSING_KEY}",
            json=body,
            headers=safe_headers(),
            timeout=8,
        )
        resp.raise_for_status()
        return {"status": "ok", "matches": resp.json().get("matches", [])}
    except Exception as exc:
        logger.warning("safe browsing error: %s", exc)
        return {"status": "error", "error": str(exc)}
# --- VirusTotal check (best-effort) ---
def check_virustotal(url: str) -> Dict[str,Any]:
    """Look up *url* in VirusTotal (API v3), best-effort.

    Submits the URL so VT (re)scans it, then fetches the URL report via the
    canonical v3 URL identifier — the unpadded url-safe base64 of the URL.
    The previous code GET'ed /urls/{id} using the id from the POST /urls
    response, but that id identifies the *analysis* object, not the URL, so
    the lookup always failed.

    Returns {"status": "ok", "info": <report json>} or an
    api_key_missing / error status dict.
    """
    if not VIRUSTOTAL_KEY:
        return {"status": "api_key_missing"}
    try:
        vt_headers = {"x-apikey": VIRUSTOTAL_KEY}
        # Submit URL for (re)analysis; response contains an analysis id only.
        submit = requests.post("https://www.virustotal.com/api/v3/urls", data={"url": url}, headers=vt_headers, timeout=10)
        submit.raise_for_status()
        # Canonical URL identifier for GET /urls/{id}: base64url(url), no padding.
        url_id = base64.urlsafe_b64encode(url.encode()).decode().strip("=")
        r = requests.get(f"https://www.virustotal.com/api/v3/urls/{url_id}", headers=vt_headers, timeout=10)
        r.raise_for_status()
        return {"status": "ok", "info": r.json()}
    except Exception as e:
        logger.warning("virustotal error: %s", e)
        return {"status": "error", "error": str(e)}
# --- Semantic evidence selection ---
def select_relevant_sentences(claim: str, article_text: str, top_k: int = 5) -> List[str]:
    """Pick up to *top_k* sentences of *article_text* most similar to *claim*.

    Falls back to the leading sentences when the embedding model is
    unavailable or scoring fails.
    """
    model = get_sente_model()
    raw_split = re.split(r'(?<=[.!?]) +', article_text)
    if not model:
        # No embedder: just take the first few non-empty sentences.
        return [s.strip() for s in raw_split[:top_k] if s.strip()]
    sentences = [s.strip() for s in raw_split if s.strip()]
    if not sentences:
        return []
    try:
        import numpy as np
        from sentence_transformers import util
        claim_vec = model.encode(claim, convert_to_tensor=True)
        sent_vecs = model.encode(sentences, convert_to_tensor=True)
        scores = util.cos_sim(claim_vec, sent_vecs)[0].cpu().numpy()
        # Highest-similarity first.
        ranked = list(np.argsort(-scores)[:top_k])
        return [sentences[i] for i in ranked if i < len(sentences)]
    except Exception as exc:
        logger.warning("semantic selection error: %s", exc)
        return sentences[:top_k]
# --- Zero-shot classification (truth + content type) ---
def zero_shot_classify(text: str) -> Dict[str,Any]:
    """Classify *text* with zero-shot NLI for truthfulness and content type.

    Returns truth_label/truth_score plus content_type/content_type_score;
    "Unknown"/0.0 for any part that could not be computed.
    """
    pipe = get_zs_pipe()
    if not pipe:
        return {"truth_label": "Unknown", "truth_score": 0.0, "content_type": "Unknown", "content_type_score": 0.0}
    res = {}
    try:
        verdict = pipe(text, ["True", "False", "Misleading", "Unverifiable"], multi_label=False, truncation=True)
        res["truth_label"] = verdict["labels"][0]
        res["truth_score"] = float(verdict["scores"][0])
    except Exception as exc:
        logger.warning("zero-shot truth error: %s", exc)
        res["truth_label"], res["truth_score"] = "Unknown", 0.0
    try:
        kind = pipe(text, ["News", "Opinion", "Satire", "Rumor"], multi_label=False, truncation=True)
        res["content_type"] = kind["labels"][0]
        res["content_type_score"] = float(kind["scores"][0])
    except Exception as exc:
        logger.warning("zero-shot content type error: %s", exc)
        res["content_type"], res["content_type_score"] = "Unknown", 0.0
    return res
# --- Gemini deep verification ---
def gemini_verify(claim: str, evidence: List[str], domain: Optional[str]) -> Dict[str,Any]:
    """Ask Gemini to fact-check *claim* against *evidence* and parse its JSON.

    Returns {"outcome": "ok", "result": <parsed dict>, "raw": <reply>} on
    success; otherwise api_missing / error / no_json / parse_error outcomes.
    """
    client = get_gemini_client()
    if not client:
        return {"outcome": "api_missing", "explanation": "Gemini API key not set or client failed", "raw": None}
    evidence_lines = "\n".join(f"- {e}" for e in evidence)
    # Structured prompt instructing the model to reply with JSON only.
    prompt = (
        "You are an expert fact-checker. Given the claim and evidence, output valid JSON with keys:\n"
        "outcome (one of: True, False, Misleading, Unverifiable),\n"
        "confidence (0-1),\n"
        "explanation (short),\n"
        "takeaways (list of 1-3 short tips),\n"
        "sources (list of cited sources if any).\n\n"
        f"Claim: {claim}\n\n"
        f"Domain: {domain}\n\n"
        "Evidence:\n" + evidence_lines + "\n\n"
        "Provide only JSON in the response."
    )
    try:
        resp = client.models.generate_content(model="gemini-2.5-flash", contents=prompt)
        text = resp.text
    except Exception as exc:
        logger.warning("gemini error: %s", exc)
        return {"outcome": "error", "error": str(exc)}
    # First attempt: the whole reply is JSON.
    try:
        return {"outcome": "ok", "result": json.loads(text), "raw": text}
    except Exception:
        pass
    # Second attempt: pull the first {...} span out of the reply.
    found = re.search(r'(\{.*\})', text, flags=re.S)
    if not found:
        return {"outcome": "no_json", "raw": text}
    try:
        return {"outcome": "ok", "result": json.loads(found.group(1)), "raw": text}
    except Exception:
        return {"outcome": "parse_error", "raw": text}
# --- Fusion of signals into credibility score ---
def fuse_signals(truth_score: float, domain: Optional[str], evidence_count: int, gemini_outcome: Optional[Dict[str,Any]]) -> Dict[str,Any]:
    """Fuse the individual signals into a single 0-100 credibility score.

    Args:
        truth_score: zero-shot "truth" confidence in [0, 1].
        domain: registered domain of the source URL, or None.
        evidence_count: number of internal + external evidence items found.
        gemini_outcome: dict returned by gemini_verify(), or None.

    Returns:
        {"score": int 0-100, "color": "green"|"yellow"|"red", "raw": float 0-1}
    """
    base = truth_score
    # Domain trust: small boost for allow-listed outlets, penalty for blacklisted.
    domain_factor = 1.0
    if domain:
        if domain in TRUSTED_DOMAINS:
            domain_factor += 0.2
        elif domain in BLACKLISTED_DOMAINS:
            domain_factor -= 0.4
    # Evidence factor saturates once 5 corroborating items are found.
    evidence_factor = min(evidence_count / 5.0, 1.0)
    # Gemini verdict nudges the score up or down, weighted by its confidence.
    gemini_adj = 1.0
    if gemini_outcome and gemini_outcome.get("result"):
        res = gemini_outcome["result"]
        # str() guards against a null/non-string outcome in the LLM's JSON,
        # which previously raised AttributeError on .lower().
        out = str(res.get("outcome", "")).lower()
        # Confidence may come back as a number or a string; a non-numeric
        # value (e.g. "high") previously raised ValueError and crashed the
        # pipeline — fall back to neutral 0.5 instead.
        try:
            conf = float(res.get("confidence", 0.5))
        except (TypeError, ValueError):
            conf = 0.5
        if out in ("false", "misleading"):
            gemini_adj -= 0.25 * conf
        elif out == "true":
            gemini_adj += 0.1 * conf
        elif out == "unverifiable":
            gemini_adj -= 0.05 * conf
    # Weighted blend of the signals, scaled by the Gemini adjustment and
    # clamped to [0, 1].
    score = base * 0.5 + evidence_factor * 0.3 + (domain_factor - 1.0) * 0.2
    score = max(0.0, min(1.0, score * gemini_adj))
    pct = int(round(score * 100))
    color = "green" if pct >= 70 else "yellow" if pct >= 40 else "red"
    return {"score": pct, "color": color, "raw": score}
# --- Main pipeline: single mode (run everything) ---
def analyze_pipeline(article: Optional[str], url: Optional[str], image: Optional[Image.Image], claim_override: Optional[str], top_k_evidence: int = 5):
    """Run the full misinformation-check pipeline over one input.

    Exactly one input source is used, chosen in priority order:
    pasted article text, then URL (fetched + extracted), then image (OCR'd).

    Args:
        article: pasted article text (preferred source).
        url: article URL; also triggers Safe Browsing / VirusTotal checks.
        image: screenshot to OCR.
        claim_override: optional explicit claim; otherwise the first sentence
            of the text is used as the claim.
        top_k_evidence: number of supporting sentences to select.

    Returns:
        A result dict (see `out` at the bottom) suitable for pretty_output(),
        or {"error": ...} when no input was provided.
    """
    # 1) choose text source
    source = None
    text = ""
    domain = None
    if article and article.strip():
        source = "article"
        text = article.strip()
    elif url and url.strip():
        source = "url"
        domain = extract_domain(url)
        text = fetch_and_extract(url) or ""
    elif image is not None:
        source = "image"
        text = ocr_image_to_text(image) or ""
    else:
        return {"error": "No input provided. Paste article text, or a URL, or upload image."}
    # limit text to keep downstream model inputs bounded
    if len(text) > 4000:
        text = text[:4000]
    # claim to check: use explicit claim_override or try to use first sentence/headline
    claim = claim_override.strip() if claim_override and claim_override.strip() else (re.split(r'(?<=[.!?]) +', text.strip())[0] if text else "")
    # 2) quick zero-shot classification
    # Classify the full text when the extracted claim is very short (< 30
    # chars, likely uninformative); otherwise classify the claim itself.
    zs = zero_shot_classify(text if len(claim) < 30 else claim)
    truth_label = zs.get("truth_label")
    truth_score = zs.get("truth_score", 0.0)
    content_type = zs.get("content_type")
    content_type_score = zs.get("content_type_score", 0.0)
    # 3) evidence: internal (from article) and external (news APIs)
    internal_evidence = select_relevant_sentences(claim or text, text, top_k=top_k_evidence) if text else []
    # external news queries: search using claim or summary
    query = claim or (text[:200])
    external_articles = gather_news_evidence(query, max_results=6)
    # filter external hits down to allow-listed (credible) domains only
    ext_filtered = []
    for a in external_articles:
        dom = extract_domain(a.get("url") or "")
        a["domain"] = dom
        if dom and dom in TRUSTED_DOMAINS:
            ext_filtered.append(a)
    # 4) fact-check API
    fact = factcheck_claim(claim or text)
    # 5) safe browsing + virustotal only if URL input provided
    safe_browsing_res = check_safe_browsing(url) if url else {"status":"no_url"}
    virustotal_res = check_virustotal(url) if url else {"status":"no_url"}
    # 6) deep verify with Gemini (claim + internal+external evidence)
    evidence_for_gemini = internal_evidence[:top_k_evidence] + [ (a.get("title") or "") + " - " + (a.get("snippet") or "") for a in ext_filtered[:top_k_evidence] ]
    gemini_res = gemini_verify(claim or text, evidence_for_gemini, domain)
    # 7) fuse signals into the final 0-100 credibility score
    credibility = fuse_signals(truth_score, domain, len(internal_evidence) + len(ext_filtered), gemini_res)
    # 8) build outputs & tips (static media-literacy advice shown to the user)
    tips = (
        "- Check the source domain and author.\n"
        "- Cross-check the claim with multiple trusted outlets.\n"
        "- Look for official statements or peer-reviewed studies for scientific claims.\n"
        "- Be skeptical of sensational language and images without context."
    )
    out = {
        "source": source,
        "domain": domain,
        "claim": claim,
        "text_snippet": text[:800],
        "quick_classification": {"truth_label": truth_label, "truth_score": truth_score, "content_type": content_type, "content_type_score": content_type_score},
        "internal_evidence": internal_evidence,
        "external_evidence": ext_filtered,
        "factcheck": fact,
        "safe_browsing": safe_browsing_res,
        "virustotal": {"status": virustotal_res.get("status", "unknown"), "summary": (virustotal_res.get("info") or {}) if isinstance(virustotal_res, dict) else {}},
        "gemini_verification": gemini_res,
        "credibility": credibility,
        "tips": tips
    }
    return out
# --- Gradio UI ---
def pretty_output(result: Dict[str,Any]):
    """Convert an analyze_pipeline() result into the five UI output strings.

    Returns (header, quick_classification_json, evidence_text,
    factcheck_json, gemini_json_plus_tips).
    """
    if not isinstance(result, dict):
        return str(result), "", "", "", ""
    if result.get("error"):
        return result["error"], "", "", "", ""
    cred = result['credibility']
    header = f"Credibility score: {cred['score']} ({cred['color']})"
    quick = json.dumps(result.get("quick_classification", {}), indent=2)
    # Assemble the evidence section from internal sentences and external hits.
    sections = []
    internal = result.get("internal_evidence")
    if internal:
        bullet_lines = "\n".join(f"- {s}" for s in internal)
        sections.append("Internal evidence (from article):\n" + bullet_lines + "\n\n")
    external = result.get("external_evidence")
    if external:
        bullet_lines = "\n".join(f"- {a.get('title')} ({a.get('domain')}) — {a.get('url')}" for a in external)
        sections.append("External corroborating articles:\n" + bullet_lines + "\n\n")
    evidence = "".join(sections)
    fact = json.dumps(result.get("factcheck", {}), indent=2)
    gemini = result.get("gemini_verification", {})
    gemini_text = json.dumps(gemini, indent=2) if gemini else ""
    tips = result.get("tips", "")
    return header, quick, evidence, fact, gemini_text + "\n\n" + tips
# Gradio UI: single page wiring all inputs through analyze_pipeline().
with gr.Blocks() as demo:
    gr.Markdown("# 🛡️ Unified Misinformation Detector (single mode)")
    gr.Markdown("Provide either Article text (preferred), or a URL, or upload an image (screenshot). Optionally add a short claim to check.")
    # Input widgets: the three mutually-exclusive sources side by side.
    with gr.Row():
        article_in = gr.Textbox(lines=6, label="Paste Article Text (preferred)")
        url_in = gr.Textbox(label="Article URL")
        image_in = gr.Image(type="pil", label="Upload Image (screenshot)")
    claim_in = gr.Textbox(lines=1, label="Optional short claim (override automatic claim extraction)")
    topk = gr.Slider(1, 8, value=5, step=1, label="Top-K evidence sentences")
    run_btn = gr.Button("Run Full Pipeline")
    # Output widgets, one per element of pretty_output()'s 5-tuple.
    out_header = gr.Textbox(label="Summary", interactive=False)
    out_quick = gr.Code(label="Quick classification (truth + content type)")
    out_evidence = gr.Textbox(label="Evidence & External articles", lines=12)
    out_factcheck = gr.Code(label="Fact-check API result")
    out_gemini = gr.Code(label="Gemini result + Tips")

    def run(article, url, image, claim_override, top_k):
        # Thin adapter between the Gradio widgets and the analysis pipeline.
        res = analyze_pipeline(article, url, image, claim_override, top_k_evidence=int(top_k))
        return pretty_output(res)

    run_btn.click(run, inputs=[article_in, url_in, image_in, claim_in, topk], outputs=[out_header, out_quick, out_evidence, out_factcheck, out_gemini])

if __name__ == "__main__":
    demo.launch()