Files changed (1)
  1. app.py +437 -309
app.py CHANGED
@@ -1,384 +1,512 @@
  import os
- import json
  import re
  from typing import List, Dict, Any, Optional
- from fastapi import FastAPI, HTTPException
- from fastapi.responses import HTMLResponse
- from pydantic import BaseModel
- from dotenv import load_dotenv
- import requests
- from bs4 import BeautifulSoup
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
- from fastapi.middleware.cors import CORSMiddleware

- # ---------------- Lazy-loaded AI Models ----------------
  ZS_PIPE = None
- SENTE_MODEL = None
  GEMINI_CLIENT = None

  def get_zs_pipe():
      global ZS_PIPE
      if ZS_PIPE is None:
          try:
              from transformers import pipeline
-             # much smaller model (~250MB vs 1.3GB)
              ZS_PIPE = pipeline("zero-shot-classification", model="typeform/distilbert-base-uncased-mnli")
-         except Exception:
              ZS_PIPE = None
      return ZS_PIPE

  def get_sente_model():
-     global SENTE_MODEL
-     if SENTE_MODEL is None:
          try:
              from sentence_transformers import SentenceTransformer
-             # smaller semantic similarity model (~80MB vs 400MB)
-             SENTE_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
-         except Exception:
-             SENTE_MODEL = None
-     return SENTE_MODEL
-

  def get_gemini_client():
      global GEMINI_CLIENT
-     if GEMINI_CLIENT is None:
          try:
              from google import genai
-             GEMINI_CLIENT = genai.Client()  # uses GEMINI_API_KEY from environment
-         except Exception:
              GEMINI_CLIENT = None
      return GEMINI_CLIENT

- # ---------------- Env Vars ----------------
- load_dotenv()
- GNEWS_API_KEY = os.getenv("GNEWS_KEY")
- NEWSORG_API_KEY = os.getenv("NEWSORG_KEY")
- GEMINI_API_KEY = os.getenv("AI_API_KEY")
-
- app = FastAPI(title="Hybrid Misinformation Detector")
- # Define allowed origins
- origins = ["*"]
-
- # Add CORS middleware
- app.add_middleware(
-     CORSMiddleware,
-     allow_origins=origins,        # List of allowed origins
-     allow_credentials=True,       # Allow cookies and credentials
-     allow_methods=["*"],          # Allow all HTTP methods (GET, POST, etc.)
-     allow_headers=["*"],          # Allow all headers
- )
- # ---------------- Models ----------------
- class VerifyRequest(BaseModel):
-     text: str
-     mode: Optional[str] = "fast"  # fast, deep, hybrid
-
- # ---------------- Utilities ----------------
- def safe_headers():
-     return {"User-Agent": "misinfo-tool/1.0 (+https://example.com)"}
-
- def domain_from_url(url: str) -> Optional[str]:
-     if not url: return None
      try:
-         m = re.search(r"https?://(?:www\.)?([^/]+)/?", url)
-         if m:
-             domain = m.group(1).lower()
-             parts = domain.split('.')
-             if len(parts) > 2:
-                 domain = ".".join(parts[-2:])
-             return domain
-     except Exception:
-         return None
-     return None
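# Worked example of the domain heuristic above, and why the rewrite below
# switches to tldextract: for a ccTLD URL, the "keep the last two labels" rule
# returns the public suffix instead of the site.
#
#     domain_from_url("https://www.bbc.co.uk/news")
#     # m.group(1) -> "bbc.co.uk"; parts -> ["bbc", "co", "uk"] (3 > 2)
#     # ".".join(parts[-2:]) -> "co.uk", so "bbc.co.uk" in TRUSTED_DOMAINS never matches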
-
- # ---------------- Trusted / Blacklist ----------------
- TRUSTED_DOMAINS = {
-     "bbc.co.uk","bbc.com","cnn.com","nytimes.com","reuters.com","apnews.com",
-     "theguardian.com","npr.org","washingtonpost.com","wsj.com","usatoday.com",
-     "bloomberg.com","aljazeera.com","msnbc.com","cnbc.com","foxnews.com",
-     "scientificamerican.com","nature.com","sciencedaily.com"
- }
-
- BLACKLISTED_DOMAINS = {
-     "imdb.com","youtube.com","wikipedia.org","fandom.com","comicbook.com",
-     "rottentomatoes.com","hulu.com","netflix.com","ign.com","forbes.com"
- }

- UNWANTED_KEYWORDS = [
-     "movie","film","episode","tv show","trailer","comic","manga","fan","fandom",
-     "review","fiction","novel","fantasy","screenplay","actor","actress"
- ]

- # ---------------- NLP classify ----------------
- def classify_text_type(text: str) -> Dict[str, Any]:
-     labels = ["news","rumor","fact","opinion","satire","unverifiable"]
-     pipe = get_zs_pipe()
-     if pipe:
-         try:
-             res = pipe(text, labels, multi_label=False, truncation=True)
-             label = res["labels"][0]
-             score = float(res["scores"][0])
-             return {"type": label, "score": round(score,3), "scores": dict(zip(res["labels"], res["scores"]))}
-         except Exception:
-             pass
-     t = text.lower()
-     if any(k in t for k in ["according to","reported","breaking","news","announced"]):
-         return {"type":"news","score":0.65,"scores":{}}
-     if any(k in t for k in ["i think","in my opinion","i believe","should"]):
-         return {"type":"opinion","score":0.7,"scores":{}}
-     if any(k in t for k in ["joke","satire","not real","parody"]):
-         return {"type":"satire","score":0.7,"scores":{}}
-     if any(k in t for k in ["study shows","research","published","peer-reviewed"]):
-         return {"type":"fact","score":0.6,"scores":{}}
-     return {"type":"rumor","score":0.45,"scores":{}}
-
- def summarize_text(text: str, max_len=300) -> str:
-     sentences = re.split(r'(?<=[.!?]) +', text.strip())
-     summary = sentences[0] if sentences else text
-     if len(summary) > max_len:
-         summary = summary[:max_len].rsplit(' ',1)[0] + "..."
-     return summary
-
- # ---------------- Search ----------------
- def fetch_gnews(query: str, max_results=6) -> List[Dict[str,str]]:
-     if not GNEWS_API_KEY:
          return []
      try:
-         url = "https://gnews.io/api/v4/search"
-         params = {"q": query, "token": GNEWS_API_KEY, "max": max_results, "lang":"en"}
-         r = requests.get(url, params=params, headers=safe_headers(), timeout=6)
          r.raise_for_status()
          js = r.json()
-         return [{"title": a.get("title"), "url": a.get("url"), "source": a.get("source",{}).get("name"), "snippet": a.get("description")} for a in js.get("articles", [])[:max_results]]
-     except Exception:
          return []

- def fetch_newsapi(query: str, max_results=6) -> List[Dict[str,str]]:
-     if not NEWSORG_API_KEY:
          return []
      try:
-         url = "https://newsapi.org/v2/everything"
-         params = {"q": query, "pageSize": max_results, "apiKey": NEWSORG_API_KEY, "language":"en"}
-         r = requests.get(url, params=params, headers=safe_headers(), timeout=6)
          r.raise_for_status()
          js = r.json()
-         return [{"title": a.get("title"), "url": a.get("url"), "source": a.get("source",{}).get("name"), "snippet": a.get("description")} for a in js.get("articles", [])[:max_results]]
-     except Exception:
          return []

- def duckduckgo_search(query: str, max_results=8) -> List[Dict[str,str]]:
      try:
-         url = "https://html.duckduckgo.com/html/"
-         r = requests.post(url, data={"q": query}, headers=safe_headers(), timeout=6)
          r.raise_for_status()
-         soup = BeautifulSoup(r.text, "html.parser")
          results = []
-         for res in soup.select(".result__a")[:max_results]:
-             title = res.get_text()
-             href = res.get("href")
-             snippet_node = res.find_parent().select_one(".result__snippet")
-             snippet = snippet_node.get_text() if snippet_node else ""
-             results.append({"title": title, "url": href, "source":None, "snippet": snippet})
          return results
-     except Exception:
          return []

- # ---------------- Optimized fetch all sources ----------------
- def fetch_all_sources(query: str) -> List[Dict[str,str]]:
-     with ThreadPoolExecutor(max_workers=3) as executor:
-         futures = [
-             executor.submit(fetch_gnews, query),
-             executor.submit(fetch_newsapi, query),
-             executor.submit(duckduckgo_search, query)
-         ]
-         results = []
-         for f in futures:
-             try:
-                 results.extend(f.result())
-             except:
-                 pass
-     return results
-
- # ---------------- Filtering ----------------
- def is_unwanted_snippet(snippet: str) -> bool:
-     if not snippet: return False
-     s = snippet.lower()
-     return any(k in s for k in UNWANTED_KEYWORDS)
-
- def filter_sources(sources: List[Dict[str,str]]) -> List[Dict[str,str]]:
-     kept, seen = [], set()
-     for s in sources:
-         url = s.get("url") or ""
-         if not url or url in seen: continue
          seen.add(url)
-         domain = domain_from_url(url)
-         s["domain"] = domain or ""
-         if not domain: continue
-         if domain in BLACKLISTED_DOMAINS: continue
-         if domain not in TRUSTED_DOMAINS: continue
-         if is_unwanted_snippet(s.get("snippet","")) or is_unwanted_snippet(s.get("title","")): continue
-         kept.append(s)
-     return kept
-
- # ---------------- Semantic filtering ----------------
- def compute_similarity(args):
-     claim_emb, snippet = args
-     model = get_sente_model()
-     if not model: return 0.0
-     snippet_emb = model.encode(snippet, convert_to_tensor=True)
-     from sentence_transformers import util
-     return util.cos_sim(claim_emb, snippet_emb).item()
-
- def semantic_filter_parallel(claim: str, sources: List[Dict[str,str]], threshold=0.3) -> List[Dict[str,str]]:
-     model = get_sente_model()
-     if not model or not sources:
-         return sources
-
-     claim_emb = model.encode(claim, convert_to_tensor=True)
-     args = [(claim_emb, s["snippet"]) for s in sources]
-
-     filtered = []
-     with ProcessPoolExecutor(max_workers=min(4, len(sources))) as executor:
-         sims = list(executor.map(compute_similarity, args))
-
-     for s, sim in zip(sources, sims):
-         if sim >= threshold:
-             filtered.append(s)
-     return filtered
-
- # ---------------- Evidence summary ----------------
- def summarize_evidence(sources: List[Dict[str,str]], max_chars=800) -> str:
-     if not sources:
-         return "No credible news sources found."
-     parts = []
-     for s in sources[:8]:
-         t = s.get("title") or ""
-         snip = s.get("snippet") or ""
-         domain = s.get("domain") or domain_from_url(s.get("url","")) or ""
-         parts.append(f"{t} ({domain}) — {snip}")
-     res = "\n".join(parts)
-     if len(res) > max_chars:
-         return res[:max_chars].rsplit(" ",1)[0] + "..."
-     return res

- # ---------------- Fusion ----------------
- def fuse_scores(fast_conf: float, deep_outcome: Optional[str], evidence_count: int) -> Dict[str,Any]:
-     base = fast_conf*0.5 + min(evidence_count/5.0,1.0)*0.5
-     if deep_outcome and deep_outcome.lower() in ["false","misleading"]:
-         base *= 0.7
-     score = int(round(max(0, min(1, base)) * 100))
-     color = "green" if score >= 70 else "yellow" if score >= 40 else "red"
-     return {"score":score, "color":color}
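# Worked example of fuse_scores (illustrative inputs):
#   fast_conf=0.6, evidence_count=3, deep_outcome="False"
#   base  = 0.6*0.5 + min(3/5, 1.0)*0.5 = 0.30 + 0.30 = 0.60
#   deep outcome is "false", so base *= 0.7 -> 0.42
#   score = 42 -> color "yellow" (>= 40 but < 70)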
-
- # ---------------- Fact Check API ----------------
  def factcheck_claim(claim: str) -> Dict[str,Any]:
-     api_key = "AIzaSyB0A-MIHs8qkjYTWE-TnoLw46KplX-Ihjs"
-     url = "https://factchecktools.googleapis.com/v1alpha1/claims:search"
-     params = {"query": claim, "key": api_key, "languageCode": "en", "pageSize": 5}
      try:
-         r = requests.get(url, params=params, headers=safe_headers(), timeout=6)
          r.raise_for_status()
          js = r.json()
          claims = js.get("claims", [])
          results = []
          for c in claims:
-             claimant = c.get("claimant", "Unknown")
-             text = c.get("text", "")
-             claimReview = c.get("claimReview", [])
-             for review in claimReview:
-                 publisher = review.get("publisher", {}).get("name")
-                 url = review.get("url")
-                 title = review.get("title")
-                 review_rating = review.get("textualRating")
                  results.append({
-                     "claimant": claimant,
                      "text": text,
-                     "publisher": publisher,
-                     "url": url,
-                     "title": title,
-                     "rating": review_rating
                  })
-         outcome = "Unverified" if not results else results[0].get("rating", "Unverified")
          return {"outcome": outcome, "source": results}
      except Exception as e:
-         return {"outcome": "Error", "source": [], "error": str(e)}
-
- # ---------------- API ----------------
- @app.post("/verify")
- async def verify(req: VerifyRequest):
-     claim = (req.text or "").strip()
-     mode = (req.mode or "fast").lower()
-     if not claim:
-         raise HTTPException(status_code=400, detail="Empty claim")
-
-     # Step 1 classify
-     text_type_res = classify_text_type(claim)
-     stored_type = text_type_res["type"]
-
-     # Step 2 summarize
-     user_summary = summarize_text(claim)
-
-     # Step 3 search
-     query = f"{user_summary} site:bbc.com OR site:cnn.com OR site:reuters.com OR site:apnews.com"
-     all_raw = fetch_all_sources(query)

-     # Step 4 filter
-     filtered = filter_sources(all_raw)

-     # Step 4b semantic filter
-     filtered = semantic_filter_parallel(claim, filtered)

-     evidence_summary = summarize_evidence(filtered)

-     # Step 5 fast classification
-     fast_label, fast_conf = "Unverifiable", 0.4
      pipe = get_zs_pipe()
      if pipe:
          try:
-             cls = pipe(claim, ["True","False","Misleading","Unverifiable"], multi_label=False, truncation=True)
-             fast_label = cls["labels"][0]
-             fast_conf = float(cls["scores"][0])
-         except:
-             pass
-
-     # Step 6 deep (Gemini AI)
-     deep_result = None
-     if mode in ["deep","hybrid"]:
-         client = get_gemini_client()
-         if client:
-             try:
-                 prompt = f'Verify claim: "{claim}". Output JSON: outcome, explanation, comparison, takeaways.'
-                 resp = client.models.generate_content(model="gemini-2.5-flash", contents=prompt)
-                 deep_result = json.loads(resp.text)
-             except:
-                 deep_result = {"outcome":"Unverifiable","explanation":"Gemini API error","takeaways":["Check credible sources"]}
-         else:
-             deep_result = {"outcome":"Unverifiable","explanation":"Demo mode: API missing","takeaways":["Check credible sources"]}
-
-     # Step 7 fact-check API
-     factcheck = factcheck_claim(claim)
-
-     # Step 8 fuse scores
-     deep_outcome = deep_result.get("outcome") if deep_result else None
-     fuse = fuse_scores(fast_conf, deep_outcome, len(filtered))

-     return {
          "claim": claim,
-         "text_type": stored_type,
-         "text_type_scores": text_type_res.get("scores", {}),
-         "user_summary": user_summary,
-         "fast": {"label": fast_label, "confidence": round(fast_conf,3)},
-         "evidence_count_raw": len(all_raw),
-         "evidence_count_filtered": len(filtered),
-         "evidence": filtered,
-         "evidence_summary": evidence_summary,
-         "deep": deep_result or {},
-         "factcheck": factcheck,
-         "credibility": fuse
      }
-
- # ---------------- Frontend ----------------

  if __name__ == "__main__":
-     import uvicorn
-     uvicorn.run("app:app", host="0.0.0.0", port=int(os.getenv("PORT","1748")))
 
+ # misinfo_gradio_full.py
  import os
  import re
+ import time
+ import json
+ import base64
+ import logging
  from typing import List, Dict, Any, Optional

+ import requests
+ import trafilatura
+ import tldextract
+ import gradio as gr
+ from PIL import Image
+ import pytesseract

+ # ML lazy-load
  ZS_PIPE = None
+ SENTE = None
  GEMINI_CLIENT = None

+ # Load env
+ from dotenv import load_dotenv
+ load_dotenv()
+
+ NEWSAPI_KEY = os.getenv("NEWSAPI_KEY")
+ GNEWS_KEY = os.getenv("GNEWS_KEY")
+ SERPAPI_KEY = os.getenv("SERPAPI_KEY")
+ FACTCHECK_KEY = os.getenv("FACTCHECK_KEY")
+ SAFE_BROWSING_KEY = os.getenv("SAFE_BROWSING_KEY")
+ VIRUSTOTAL_KEY = os.getenv("VIRUSTOTAL_KEY")
+ GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
+
+ # Logging
+ logging.basicConfig(level=logging.INFO)
+ logger = logging.getLogger("misinfo")
+
+ # --- Helpers ---
+ def safe_headers():
+     return {"User-Agent": "misinfo-gradio/1.0"}
+
+ def extract_domain(url: str) -> Optional[str]:
+     try:
+         ext = tldextract.extract(url)
+         if ext.registered_domain:
+             return ext.registered_domain.lower()
+     except Exception:
+         pass
+     return None
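# registered_domain consults the public-suffix list, fixing the ccTLD problem
# of the old regex helper, e.g.:
#     tldextract.extract("https://www.bbc.co.uk/news").registered_domain     # "bbc.co.uk"
#     tldextract.extract("https://edition.cnn.com/world").registered_domain  # "cnn.com"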
+
+ TRUSTED_DOMAINS = {
+     "bbc.co.uk","bbc.com","cnn.com","nytimes.com","reuters.com","apnews.com",
+     "theguardian.com","npr.org","washingtonpost.com","wsj.com","usatoday.com",
+     "bloomberg.com","aljazeera.com","msnbc.com","cnbc.com","foxnews.com",
+     "scientificamerican.com","nature.com","sciencedaily.com","timesofindia.indiatimes.com","indiatimes.com"
+ }
+ BLACKLISTED_DOMAINS = {"example-bad-site.com"}  # keep small; replace with curated list in prod
+
+ # --- Model loaders ---
  def get_zs_pipe():
      global ZS_PIPE
      if ZS_PIPE is None:
          try:
              from transformers import pipeline
              ZS_PIPE = pipeline("zero-shot-classification", model="typeform/distilbert-base-uncased-mnli")
+         except Exception as e:
+             logger.warning("zero-shot pipeline load error: %s", e)
              ZS_PIPE = None
      return ZS_PIPE

  def get_sente_model():
+     global SENTE
+     if SENTE is None:
          try:
              from sentence_transformers import SentenceTransformer
+             SENTE = SentenceTransformer("all-MiniLM-L6-v2")
+         except Exception as e:
+             logger.warning("sentence-transformers load error: %s", e)
+             SENTE = None
+     return SENTE

  def get_gemini_client():
      global GEMINI_CLIENT
+     if GEMINI_CLIENT is None and GEMINI_API_KEY:
          try:
              from google import genai
+             GEMINI_CLIENT = genai.Client(api_key=GEMINI_API_KEY)
+         except Exception as e:
+             logger.warning("gemini client init error: %s", e)
              GEMINI_CLIENT = None
      return GEMINI_CLIENT

+ # --- Extraction ---
+ def fetch_and_extract(url: str, max_chars: int = 4000) -> str:
+     """Use trafilatura to fetch & extract main article text."""
+     if not url:
+         return ""
      try:
+         downloaded = trafilatura.fetch_url(url, headers=safe_headers(), timeout=12)
+         if not downloaded:
+             return ""
+         text = trafilatura.extract(downloaded, include_comments=False, include_tables=False)
+         if not text:
+             return ""
+         text = re.sub(r'\s+', ' ', text).strip()
+         return text[:max_chars]
+     except Exception as e:
+         logger.warning("fetch_and_extract error: %s", e)
+         return ""
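# Caution: depending on the installed trafilatura version, fetch_url() may not
# accept headers/timeout keyword arguments (its timeout normally comes from
# trafilatura's config). A portable fallback, sketched here as an assumption,
# fetches with requests and passes the HTML to trafilatura.extract():
#
#     resp = requests.get(url, headers=safe_headers(), timeout=12)
#     resp.raise_for_status()
#     text = trafilatura.extract(resp.text, include_comments=False, include_tables=False)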

+ def ocr_image_to_text(img: Image.Image, max_chars=4000) -> str:
+     try:
+         text = pytesseract.image_to_string(img)
+         text = re.sub(r'\s+', ' ', text).strip()
+         return text[:max_chars]
+     except Exception as e:
+         logger.warning("OCR error: %s", e)
+         return ""
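# pytesseract only wraps the Tesseract binary, which must be installed on the
# host (e.g. `apt-get install tesseract-ocr`). If the binary is off PATH, point
# the wrapper at it explicitly (path below is illustrative):
#     pytesseract.pytesseract.tesseract_cmd = "/usr/bin/tesseract"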

+ # --- News / evidence fetching ---
+ def fetch_newsapi(query: str, max_results: int = 6) -> List[Dict[str,str]]:
+     if not NEWSAPI_KEY:
          return []
      try:
+         url = "https://newsapi.org/v2/everything"
+         params = {"q": query, "pageSize": max_results, "apiKey": NEWSAPI_KEY, "language": "en", "sortBy": "relevancy"}
+         r = requests.get(url, params=params, headers=safe_headers(), timeout=8)
          r.raise_for_status()
          js = r.json()
+         articles = []
+         for a in js.get("articles", [])[:max_results]:
+             articles.append({"title": a.get("title"), "url": a.get("url"), "source": a.get("source",{}).get("name"), "snippet": a.get("description") or a.get("content") or ""})
+         return articles
+     except Exception as e:
+         logger.warning("NewsAPI error: %s", e)
          return []

+ def fetch_gnews(query: str, max_results: int = 6) -> List[Dict[str,str]]:
+     if not GNEWS_KEY:
          return []
      try:
+         url = "https://gnews.io/api/v4/search"
+         params = {"q": query, "token": GNEWS_KEY, "max": max_results, "lang": "en"}
+         r = requests.get(url, params=params, headers=safe_headers(), timeout=8)
          r.raise_for_status()
          js = r.json()
+         return [{"title": a.get("title"), "url": a.get("url"), "source": a.get("source",{}).get("name"), "snippet": a.get("description") or ""} for a in js.get("articles", [])[:max_results]]
+     except Exception as e:
+         logger.warning("GNews error: %s", e)
          return []

+ def fetch_serpapi(query: str, max_results: int = 6) -> List[Dict[str,str]]:
+     if not SERPAPI_KEY:
+         return []
      try:
+         url = "https://serpapi.com/search.json"
+         params = {"q": query, "api_key": SERPAPI_KEY, "num": max_results, "engine": "google"}
+         r = requests.get(url, params=params, headers=safe_headers(), timeout=8)
          r.raise_for_status()
+         js = r.json()
          results = []
+         for item in js.get("organic_results", [])[:max_results]:
+             results.append({"title": item.get("title"), "url": item.get("link"), "source": item.get("source") or item.get("displayed_link"), "snippet": item.get("snippet") or ""})
          return results
+     except Exception as e:
+         logger.warning("SerpApi error: %s", e)
          return []

+ def gather_news_evidence(query: str, max_results=6) -> List[Dict[str,str]]:
+     items = []
+     items.extend(fetch_newsapi(query, max_results))
+     items.extend(fetch_gnews(query, max_results))
+     items.extend(fetch_serpapi(query, max_results))
+     # dedupe by url
+     seen = set()
+     dedup = []
+     for it in items:
+         url = it.get("url")
+         if not url or url in seen:
+             continue
          seen.add(url)
+         dedup.append(it)
+     return dedup[:max_results]
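# The dedupe above keys on the exact URL string, so the same article reached
# via different tracking parameters (?utm_source=...) is kept twice. One
# possible refinement (an assumption, not part of this change) is to compare
# URLs with query and fragment stripped:
#
#     from urllib.parse import urlsplit
#     def url_key(u: str) -> str:
#         parts = urlsplit(u)
#         return f"{parts.scheme}://{parts.netloc}{parts.path}".rstrip("/")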

+ # --- Fact-check (Google Fact Check Tools) ---
  def factcheck_claim(claim: str) -> Dict[str,Any]:
+     if not FACTCHECK_KEY:
+         return {"outcome": "api_key_missing", "source": []}
      try:
+         url = "https://factchecktools.googleapis.com/v1alpha1/claims:search"
+         params = {"query": claim, "key": FACTCHECK_KEY, "languageCode": "en", "pageSize": 5}
+         r = requests.get(url, params=params, headers=safe_headers(), timeout=8)
          r.raise_for_status()
          js = r.json()
          claims = js.get("claims", [])
          results = []
          for c in claims:
+             text = c.get("text")
+             for review in c.get("claimReview", []):
                  results.append({
+                     "claimant": c.get("claimant"),
                      "text": text,
+                     "publisher": review.get("publisher", {}).get("name"),
+                     "title": review.get("title"),
+                     "url": review.get("url"),
+                     "rating": review.get("textualRating")
                  })
+         outcome = "unverified" if not results else results[0].get("rating", "unverified")
          return {"outcome": outcome, "source": results}
      except Exception as e:
+         logger.warning("factcheck error: %s", e)
+         return {"outcome": "error", "error": str(e), "source": []}
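# textualRating is free text chosen by each fact-checking publisher ("False",
# "Pants on Fire!", "Mostly true", ...), so the raw outcome above is not
# directly comparable to the pipeline's True/False/Misleading labels. A
# minimal normalizer, offered as an assumption rather than part of the API:
#
#     def normalize_rating(rating: str) -> str:
#         r = (rating or "").lower()
#         if any(k in r for k in ("false", "pants on fire", "fake", "incorrect")):
#             return "False"
#         if any(k in r for k in ("misleading", "mixture", "partly", "half")):
#             return "Misleading"
#         if any(k in r for k in ("true", "correct", "accurate")):
#             return "True"
#         return "Unverifiable"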

+ # --- Safe Browsing (Google) ---
+ def check_safe_browsing(url: str) -> Dict[str,Any]:
+     if not SAFE_BROWSING_KEY:
+         return {"status": "api_key_missing"}
+     try:
+         endpoint = f"https://safebrowsing.googleapis.com/v4/threatMatches:find?key={SAFE_BROWSING_KEY}"
+         payload = {
+             "client": {"clientId": "misinfo-gradio", "clientVersion": "1.0"},
+             "threatInfo": {
+                 "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION"],
+                 "platformTypes": ["ANY_PLATFORM"],
+                 "threatEntryTypes": ["URL"],
+                 "threatEntries": [{"url": url}]
+             }
+         }
+         r = requests.post(endpoint, json=payload, headers=safe_headers(), timeout=8)
+         r.raise_for_status()
+         js = r.json()
+         return {"status": "ok", "matches": js.get("matches", [])}
+     except Exception as e:
+         logger.warning("safe browsing error: %s", e)
+         return {"status": "error", "error": str(e)}

+ # --- VirusTotal check (best-effort) ---
+ def check_virustotal(url: str) -> Dict[str,Any]:
+     if not VIRUSTOTAL_KEY:
+         return {"status": "api_key_missing"}
+     try:
+         # Submit URL to /urls to get id
+         submit = requests.post("https://www.virustotal.com/api/v3/urls", data={"url": url}, headers={"x-apikey": VIRUSTOTAL_KEY}, timeout=10)
+         submit.raise_for_status()
+         data = submit.json()
+         url_id = data.get("data", {}).get("id")
+         if not url_id:
+             return {"status": "error", "error": "no_id"}
+         # Get analysis/summary (v3 has endpoints /urls/{id})
+         r = requests.get(f"https://www.virustotal.com/api/v3/urls/{url_id}", headers={"x-apikey": VIRUSTOTAL_KEY}, timeout=10)
+         r.raise_for_status()
+         info = r.json()
+         return {"status": "ok", "info": info}
+     except Exception as e:
+         logger.warning("virustotal error: %s", e)
+         return {"status": "error", "error": str(e)}
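# VirusTotal v3 note: POST /urls returns an *analysis* id (poll it via
# /analyses/{id}), while GET /urls/{id} expects the unpadded base64url
# encoding of the URL itself; that is what the otherwise-unused
# `import base64` at the top of this file is for. Direct-lookup sketch:
#
#     url_id = base64.urlsafe_b64encode(url.encode()).decode().strip("=")
#     r = requests.get(f"https://www.virustotal.com/api/v3/urls/{url_id}",
#                      headers={"x-apikey": VIRUSTOTAL_KEY}, timeout=10)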

+ # --- Semantic evidence selection ---
+ def select_relevant_sentences(claim: str, article_text: str, top_k: int = 5) -> List[str]:
+     model = get_sente_model()
+     if not model:
+         # fallback: return first sentences
+         sents = re.split(r'(?<=[.!?]) +', article_text)
+         return [s.strip() for s in sents[:top_k] if s.strip()]
+     # split into sentences and compute similarity
+     sentences = [s.strip() for s in re.split(r'(?<=[.!?]) +', article_text) if s.strip()]
+     if not sentences:
+         return []
+     try:
+         claim_emb = model.encode(claim, convert_to_tensor=True)
+         sent_embs = model.encode(sentences, convert_to_tensor=True)
+         import numpy as np
+         from sentence_transformers import util
+         sims = util.cos_sim(claim_emb, sent_embs)[0].cpu().numpy()
+         idxs = list(np.argsort(-sims)[:top_k])
+         selected = [sentences[i] for i in idxs if i < len(sentences)]
+         return selected
+     except Exception as e:
+         logger.warning("semantic selection error: %s", e)
+         # fallback
+         return sentences[:top_k]
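# Shape check for the block above: util.cos_sim(claim_emb, sent_embs) is a
# (1, n_sentences) tensor, so [0] yields one score per sentence, and
# np.argsort(-sims)[:top_k] takes the top_k indices by descending similarity.
# Note the selected sentences come back ranked by similarity, not in their
# original article order.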

+ # --- Zero-shot classification (truth + content type) ---
+ def zero_shot_classify(text: str) -> Dict[str,Any]:
      pipe = get_zs_pipe()
+     res = {}
      if pipe:
          try:
+             truth_labels = ["True", "False", "Misleading", "Unverifiable"]
+             r1 = pipe(text, truth_labels, multi_label=False, truncation=True)
+             res["truth_label"] = r1["labels"][0]
+             res["truth_score"] = float(r1["scores"][0])
+         except Exception as e:
+             logger.warning("zero-shot truth error: %s", e)
+             res["truth_label"] = "Unknown"; res["truth_score"] = 0.0
+         try:
+             type_labels = ["News","Opinion","Satire","Rumor"]
+             r2 = pipe(text, type_labels, multi_label=False, truncation=True)
+             res["content_type"] = r2["labels"][0]
+             res["content_type_score"] = float(r2["scores"][0])
+         except Exception as e:
+             logger.warning("zero-shot content type error: %s", e)
+             res["content_type"] = "Unknown"; res["content_type_score"] = 0.0
+     else:
+         res = {"truth_label":"Unknown","truth_score":0.0,"content_type":"Unknown","content_type_score":0.0}
+     return res
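# Caveat: an MNLI zero-shot classifier scores how well a hypothesis like
# "This example is True." is entailed by the text, so bare True/False labels
# are a weak proxy for factual accuracy. The pipeline's hypothesis_template
# kwarg lets the intent be phrased explicitly (wording is an assumption):
#
#     pipe(text, truth_labels, multi_label=False, truncation=True,
#          hypothesis_template="The factual status of this statement is {}.")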

+ # --- Gemini deep verification ---
+ def gemini_verify(claim: str, evidence: List[str], domain: Optional[str]) -> Dict[str,Any]:
+     client = get_gemini_client()
+     if not client:
+         return {"outcome": "api_missing", "explanation": "Gemini API key not set or client failed", "raw": None}
+     # structured prompt asking for JSON
+     prompt = (
+         "You are an expert fact-checker. Given the claim and evidence, output valid JSON with keys:\n"
+         "outcome (one of: True, False, Misleading, Unverifiable),\n"
+         "confidence (0-1),\n"
+         "explanation (short),\n"
+         "takeaways (list of 1-3 short tips),\n"
+         "sources (list of cited sources if any).\n\n"
+         f"Claim: {claim}\n\n"
+         f"Domain: {domain}\n\n"
+         "Evidence:\n" + ("\n".join(f"- {e}" for e in evidence)) + "\n\n"
+         "Provide only JSON in the response."
+     )
+     try:
+         resp = client.models.generate_content(model="gemini-2.5-flash", contents=prompt)
+         text = resp.text
+         # attempt to parse JSON substring
+         try:
+             parsed = json.loads(text)
+             return {"outcome":"ok", "result": parsed, "raw": text}
+         except Exception:
+             # try to find first { ... } substring
+             m = re.search(r'(\{.*\})', text, flags=re.S)
+             if m:
+                 try:
+                     parsed = json.loads(m.group(1))
+                     return {"outcome":"ok", "result": parsed, "raw": text}
+                 except Exception:
+                     return {"outcome":"parse_error", "raw": text}
+             return {"outcome":"no_json", "raw": text}
+     except Exception as e:
+         logger.warning("gemini error: %s", e)
+         return {"outcome":"error", "error": str(e)}
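# The substring-regex fallback above guards against models wrapping JSON in
# prose or code fences. Assuming a current google-genai SDK, JSON output can
# also be requested directly, which makes that fallback rarely needed:
#
#     from google.genai import types
#     resp = client.models.generate_content(
#         model="gemini-2.5-flash", contents=prompt,
#         config=types.GenerateContentConfig(response_mime_type="application/json"))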
+
+ # --- Fusion of signals into credibility score ---
+ def fuse_signals(truth_score: float, domain: Optional[str], evidence_count: int, gemini_outcome: Optional[Dict[str,Any]]) -> Dict[str,Any]:
+     # base from truth_score (0-1)
+     base = truth_score
+     # domain trust
+     domain_factor = 1.0
+     if domain:
+         if domain in TRUSTED_DOMAINS:
+             domain_factor += 0.2
+         elif domain in BLACKLISTED_DOMAINS:
+             domain_factor -= 0.4
+         else:
+             domain_factor += 0.0
+     # evidence factor (cap to 1)
+     evidence_factor = min(evidence_count / 5.0, 1.0)
+     # gemini adjustment
+     gemini_adj = 1.0
+     if gemini_outcome and gemini_outcome.get("result"):
+         res = gemini_outcome["result"]
+         out = res.get("outcome", "").lower()
+         conf = float(res.get("confidence", 0.5)) if isinstance(res.get("confidence", 0.5), (float,int,str)) else 0.5
+         if out in ("false","misleading"):
+             gemini_adj -= 0.25 * conf
+         elif out == "true":
+             gemini_adj += 0.1 * conf
+         elif out == "unverifiable":
+             gemini_adj -= 0.05 * conf
+     # combine
+     score = base * 0.5 + evidence_factor * 0.3 + (domain_factor - 1.0) * 0.2
+     score = score * gemini_adj
+     score = max(0.0, min(1.0, score))
+     pct = int(round(score * 100))
+     color = "green" if pct >= 70 else "yellow" if pct >= 40 else "red"
+     return {"score": pct, "color": color, "raw": score}
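# Worked example of the fusion (illustrative inputs):
#   truth_score=0.8, domain in TRUSTED_DOMAINS, evidence_count=4,
#   Gemini result: outcome="True", confidence=0.9
#   score = 0.8*0.5 + min(4/5,1.0)*0.3 + (1.2-1.0)*0.2 = 0.40 + 0.24 + 0.04 = 0.68
#   gemini_adj = 1.0 + 0.1*0.9 = 1.09; score = 0.68*1.09 = 0.7412
#   pct = 74 -> "green"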
+
+ # --- Main pipeline: single mode (run everything) ---
+ def analyze_pipeline(article: Optional[str], url: Optional[str], image: Optional[Image.Image], claim_override: Optional[str], top_k_evidence: int = 5):
+     # 1) choose text source
+     source = None
+     text = ""
+     domain = None
+     if article and article.strip():
+         source = "article"
+         text = article.strip()
+     elif url and url.strip():
+         source = "url"
+         domain = extract_domain(url)
+         text = fetch_and_extract(url) or ""
+     elif image is not None:
+         source = "image"
+         text = ocr_image_to_text(image) or ""
+     else:
+         return {"error": "No input provided. Paste article text, enter a URL, or upload an image."}
+
+     # limit text
+     if len(text) > 4000:
+         text = text[:4000]
+
+     # claim to check: use explicit claim_override, else fall back to the first sentence/headline
+     claim = claim_override.strip() if claim_override and claim_override.strip() else (re.split(r'(?<=[.!?]) +', text.strip())[0] if text else "")
+
+     # 2) quick zero-shot classification
+     zs = zero_shot_classify(text if len(claim) < 30 else claim)  # classify the full text when the claim is very short, otherwise the claim
+     truth_label = zs.get("truth_label")
+     truth_score = zs.get("truth_score", 0.0)
+     content_type = zs.get("content_type")
+     content_type_score = zs.get("content_type_score", 0.0)
+
+     # 3) evidence: internal (from article) and external (news APIs)
+     internal_evidence = select_relevant_sentences(claim or text, text, top_k=top_k_evidence) if text else []
+     # external news queries: search using claim or summary
+     query = claim or (text[:200])
+     external_articles = gather_news_evidence(query, max_results=6)
+     # filter to credible domains
+     ext_filtered = []
+     for a in external_articles:
+         dom = extract_domain(a.get("url") or "")
+         a["domain"] = dom
+         if dom and dom in TRUSTED_DOMAINS:
+             ext_filtered.append(a)
+
+     # 4) fact-check API
+     fact = factcheck_claim(claim or text)
+
+     # 5) safe browsing + virustotal only if URL input provided
+     safe_browsing_res = check_safe_browsing(url) if url else {"status": "no_url"}
+     virustotal_res = check_virustotal(url) if url else {"status": "no_url"}
+
+     # 6) deep verify with Gemini (claim + internal + external evidence)
+     evidence_for_gemini = internal_evidence[:top_k_evidence] + [(a.get("title") or "") + " - " + (a.get("snippet") or "") for a in ext_filtered[:top_k_evidence]]
+     gemini_res = gemini_verify(claim or text, evidence_for_gemini, domain)
+
+     # 7) fuse signals
+     credibility = fuse_signals(truth_score, domain, len(internal_evidence) + len(ext_filtered), gemini_res)
+
+     # 8) build outputs & tips
+     tips = (
+         "- Check the source domain and author.\n"
+         "- Cross-check the claim with multiple trusted outlets.\n"
+         "- Look for official statements or peer-reviewed studies for scientific claims.\n"
+         "- Be skeptical of sensational language and images without context."
+     )
+
+     out = {
+         "source": source,
+         "domain": domain,
          "claim": claim,
+         "text_snippet": text[:800],
+         "quick_classification": {"truth_label": truth_label, "truth_score": truth_score, "content_type": content_type, "content_type_score": content_type_score},
+         "internal_evidence": internal_evidence,
+         "external_evidence": ext_filtered,
+         "factcheck": fact,
+         "safe_browsing": safe_browsing_res,
+         "virustotal": {"status": virustotal_res.get("status", "unknown"), "summary": (virustotal_res.get("info") or {}) if isinstance(virustotal_res, dict) else {}},
+         "gemini_verification": gemini_res,
+         "credibility": credibility,
+         "tips": tips
      }
+     return out
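# Minimal usage sketch (no UI; input values are illustrative):
#
#     result = analyze_pipeline(
#         article="NASA confirmed the detection of water ice at the lunar south pole. ...",
#         url=None, image=None, claim_override=None, top_k_evidence=5)
#     print(result["credibility"], result["quick_classification"]["truth_label"])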
+
+ # --- Gradio UI ---
+ def pretty_output(result: Dict[str,Any]):
+     if not isinstance(result, dict):
+         return str(result), "", "", "", ""
+     if result.get("error"):
+         return result["error"], "", "", "", ""
+     # format sections
+     header = f"Credibility score: {result['credibility']['score']} ({result['credibility']['color']})"
+     quick = json.dumps(result.get("quick_classification", {}), indent=2)
+     evidence = ""
+     if result.get("internal_evidence"):
+         evidence += "Internal evidence (from article):\n" + "\n".join(f"- {s}" for s in result["internal_evidence"]) + "\n\n"
+     if result.get("external_evidence"):
+         evidence += "External corroborating articles:\n" + "\n".join(f"- {a.get('title')} ({a.get('domain')}) — {a.get('url')}" for a in result["external_evidence"]) + "\n\n"
+     fact = json.dumps(result.get("factcheck", {}), indent=2)
+     gemini = result.get("gemini_verification", {})
+     gemini_text = json.dumps(gemini, indent=2) if gemini else ""
+     tips = result.get("tips", "")
+     return header, quick, evidence, fact, gemini_text + "\n\n" + tips
+
+ with gr.Blocks() as demo:
+     gr.Markdown("# 🛡️ Unified Misinformation Detector (single mode)")
+     gr.Markdown("Provide article text (preferred), a URL, or an uploaded screenshot image. Optionally add a short claim to check.")
+
+     with gr.Row():
+         article_in = gr.Textbox(lines=6, label="Paste Article Text (preferred)")
+         url_in = gr.Textbox(label="Article URL")
+         image_in = gr.Image(type="pil", label="Upload Image (screenshot)")
+
+     claim_in = gr.Textbox(lines=1, label="Optional short claim (overrides automatic claim extraction)")
+     topk = gr.Slider(1, 8, value=5, step=1, label="Top-K evidence sentences")
+
+     run_btn = gr.Button("Run Full Pipeline")
+     out_header = gr.Textbox(label="Summary", interactive=False)
+     out_quick = gr.Code(label="Quick classification (truth + content type)")
+     out_evidence = gr.Textbox(label="Evidence & External articles", lines=12)
+     out_factcheck = gr.Code(label="Fact-check API result")
+     out_gemini = gr.Code(label="Gemini result + Tips")
+
+     def run(article, url, image, claim_override, top_k):
+         res = analyze_pipeline(article, url, image, claim_override, top_k_evidence=int(top_k))
+         return pretty_output(res)
+
+     run_btn.click(run, inputs=[article_in, url_in, image_in, claim_in, topk], outputs=[out_header, out_quick, out_evidence, out_factcheck, out_gemini])

  if __name__ == "__main__":
+     demo.launch()
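# For hosted deployments that inject a port (as the removed uvicorn entry
# point did via PORT), Gradio's launch() accepts explicit bind settings:
#
#     demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))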