verifai-api / app.py
Alex-Knight's picture
Upload 4 files
f996a72 verified
"""
๐Ÿ” VerifAI - Unified Fake News Detection API
=============================================
Combines Text and Image detection in a single API
Uses: Pre-trained BERT, Gemini Vision, Groq LLM, Web Verification
Enhanced with caching and smart API key management
"""
import re
import os
import io
import base64
import torch
import numpy as np
import hashlib
import time
import requests
from functools import lru_cache
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple
from PIL import Image
from fastapi import FastAPI, HTTPException, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from duckduckgo_search import DDGS
import google.generativeai as genai
# OCR
import easyocr
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification
)
# =====================================================
# CONFIG
# =====================================================
# Pre-trained fake news detection model from HuggingFace
FAKE_NEWS_MODEL = "jy46604790/Fake-News-BERT-Detect"
# =====================================================
# GROQ API CONFIGURATION (Fallback when Gemini fails)
# =====================================================
# SECURITY: the key is read from the environment only. A secret must never be
# committed as a default value; any key that was previously checked in should
# be treated as leaked and revoked. With no key configured the Groq request
# is rejected and groq_llm_score() returns its neutral fallback.
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"
GROQ_MODEL = "llama-3.1-70b-versatile"  # Free tier, fast inference
# =====================================================
# GEMINI API CONFIGURATION WITH SMART ROTATION
# =====================================================
def load_gemini_api_keys(env=None):
    """Collect Gemini API keys from environment variables.

    SECURITY: keys are never hard-coded in the source. They are read from
    ``GEMINI_API_KEYS`` (comma-separated list) and ``GEMINI_API_KEY`` (single
    key). Any key that was previously committed to the repository should be
    treated as leaked and revoked.

    Args:
        env: Mapping to read from; defaults to ``os.environ``.

    Returns:
        List of distinct, non-empty keys in the order they were found.
    """
    env = os.environ if env is None else env
    candidates = env.get("GEMINI_API_KEYS", "").split(",")
    candidates.append(env.get("GEMINI_API_KEY", ""))
    keys = []
    for raw in candidates:
        key = raw.strip()
        # Drop blanks and duplicates so round-robin rotation stays even
        if key and key not in keys:
            keys.append(key)
    return keys


GEMINI_API_KEYS = load_gemini_api_keys()
# =====================================================
# SMART API KEY MANAGEMENT
# =====================================================
@dataclass
class KeyStatus:
    """Track usage and health of each API key"""
    # Count of successful calls; incremented by mark_key_success()
    requests_count: int = 0
    # Time of the last successful call (initialised to creation time)
    last_used: datetime = field(default_factory=datetime.now)
    # Set by mark_key_failure() when the failure looked like a rate limit
    is_rate_limited: bool = False
    # While this is in the future, get_best_key_index() skips the key
    cooldown_until: Optional[datetime] = None
    # Raised on each failure, lowered on each success; keys at 3 or more
    # are skipped by get_best_key_index()
    errors_count: int = 0
# One health record per configured key, addressed by its list position
key_statuses: Dict[int, KeyStatus] = {
    position: KeyStatus() for position, _ in enumerate(GEMINI_API_KEYS)
}
# Running counter that drives the round-robin key choice
request_counter = 0
def get_best_key_index() -> int:
    """
    Smart key selection using round-robin with health checks.
    Skips rate-limited keys and distributes load evenly.

    Returns:
        Index into GEMINI_API_KEYS of the key to use next.

    Raises:
        RuntimeError: if no Gemini API keys are configured at all (instead of
            returning an index that would fail later with an IndexError).
    """
    global request_counter
    total_keys = len(GEMINI_API_KEYS)
    if total_keys == 0:
        raise RuntimeError("No Gemini API keys configured")

    for _ in range(total_keys):
        idx = request_counter % total_keys
        request_counter += 1
        status = key_statuses[idx]
        if status.cooldown_until is not None:
            # Skip if still in cooldown
            if datetime.now() < status.cooldown_until:
                continue
            # Cooldown expired: give the key a clean slate
            status.is_rate_limited = False
            status.cooldown_until = None
            status.errors_count = 0
        # Skip keys with too many errors
        if status.errors_count >= 3:
            continue
        return idx

    # Every key is cooling down or over the error limit: reset them all and
    # start again from the first key rather than refusing to serve.
    for idx in range(total_keys):
        key_statuses[idx].is_rate_limited = False
        key_statuses[idx].cooldown_until = None
        key_statuses[idx].errors_count = 0
    return 0
def mark_key_success(idx: int):
    """Record one successful request for the key at position ``idx``."""
    record = key_statuses[idx]
    record.last_used = datetime.now()
    record.requests_count = record.requests_count + 1
    # A success forgives one earlier error, never dropping below zero
    record.errors_count = record.errors_count - 1 if record.errors_count > 0 else 0
def mark_key_failure(idx: int, is_rate_limit: bool = False):
    """Record a failed request for key ``idx``; rate limits also start a cooldown."""
    record = key_statuses[idx]
    record.errors_count += 1
    if not is_rate_limit:
        return
    # Rate-limited keys sit out for 60 seconds
    record.is_rate_limited = True
    record.cooldown_until = datetime.now() + timedelta(seconds=60)
    print(f"โธ๏ธ Key #{idx + 1} rate limited, cooldown until {record.cooldown_until}")
def get_gemini_model():
    """Get a Gemini model with smart key selection.

    Returns:
        Tuple ``(model, idx)`` where ``idx`` is the position in
        GEMINI_API_KEYS of the key the client was configured with.

    Raises:
        Exception: whatever the Gemini client raised during configuration;
            the key is marked as failed before the error is re-raised.
    """
    idx = get_best_key_index()
    api_key = GEMINI_API_KEYS[idx]
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel("gemini-2.0-flash")  # Upgraded to 2.0
    except Exception as e:
        print(f"โŒ Key #{idx + 1} config failed: {str(e)[:50]}")
        mark_key_failure(idx)
        raise
    # SECURITY: log only the key's position, never any part of the secret
    print(f"โœ… Using Gemini key #{idx + 1}")
    return model, idx
# Kept only so older callers keep working
def rotate_api_key():
    """Legacy shim: advance the round-robin counter by one position."""
    global request_counter
    request_counter = request_counter + 1
# =====================================================
# LLM RESPONSE CACHING
# =====================================================
# In-memory cache for LLM responses (reduces API calls by ~40-60%)
# NOTE(review): the 40-60% figure is the original author's estimate; nothing
# in this file measures it.
# Keys are hashes from get_text_hash(); values are (score, time.time() stamp).
llm_cache: Dict[str, Tuple[float, float]] = {} # hash -> (score, timestamp)
# Entries older than this many seconds are discarded by get_cached_score()
CACHE_TTL = 3600 # 1 hour cache validity
def get_text_hash(text: str) -> str:
    """Generate the cache key for a piece of text.

    The text is lower-cased and stripped so trivially different spellings of
    the same claim share a key. The whole text is hashed: hashing only a
    short prefix made two different claims that share an opening (for
    example a common headline or boilerplate) collide and reuse each other's
    cached score, even though the LLM is shown far more than that prefix.

    Args:
        text: Raw claim text.

    Returns:
        32-character hexadecimal MD5 digest (cache key only, not security).
    """
    normalized = text.lower().strip()
    return hashlib.md5(normalized.encode()).hexdigest()
def get_cached_score(text_hash: str) -> Optional[float]:
    """Return the cached score for ``text_hash``, or None if absent or expired."""
    if text_hash not in llm_cache:
        return None
    cached_value, stored_at = llm_cache[text_hash]
    if time.time() - stored_at >= CACHE_TTL:
        # Too old: evict so the caller recomputes
        del llm_cache[text_hash]
        return None
    print(f"๐Ÿ“ฆ Cache hit for {text_hash[:8]}...")
    return cached_value
def cache_score(text_hash: str, score: float):
    """Store ``score`` under ``text_hash`` stamped with the current time."""
    llm_cache[text_hash] = (score, time.time())
    if len(llm_cache) <= 1000:
        return
    # Over the size limit: drop the single entry with the oldest timestamp
    stalest_key = min(llm_cache, key=lambda k: llm_cache[k][1])
    del llm_cache[stalest_key]
# =====================================================
# GROQ FALLBACK LLM
# =====================================================
def groq_llm_score(text: str) -> float:
    """
    Ask the Groq-hosted Llama model how plausible a claim is.

    Used as the fallback when the Gemini attempts have failed. Returns a
    score clamped to the range 0..1, or the neutral 0.5 when the request
    fails, the status is not 200, or no number can be read from the reply.
    """
    neutral_score = 0.5
    try:
        prompt = f"""You are a fact-checking AI. Evaluate if this news claim is factually plausible.
Respond ONLY with a number between 0 and 1:
- 0.0-0.3: Clearly false, impossible, or conspiracy theory
- 0.4-0.6: Uncertain, needs verification
- 0.7-1.0: Plausible, sounds like real news
Claim: {text[:1000]}
Your response (just the number):"""
        request_headers = {
            "Authorization": f"Bearer {GROQ_API_KEY}",
            "Content-Type": "application/json"
        }
        request_body = {
            "model": GROQ_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0,
            "max_tokens": 10
        }
        response = requests.post(
            GROQ_API_URL,
            headers=request_headers,
            json=request_body,
            timeout=30
        )
        if response.status_code != 200:
            print(f"โš ๏ธ Groq API error: {response.status_code}")
            return neutral_score
        payload = response.json()
        reply = payload["choices"][0]["message"]["content"].strip()
        # Take the first number that appears in the reply
        found = re.search(r"([0-9]+\.?[0-9]*)", reply)
        if found is None:
            return neutral_score
        score = float(found.group(1))
        print(f"๐Ÿฆ™ Groq Llama 3.1 score: {score}")
        return min(max(score, 0), 1)
    except Exception as e:
        print(f"[GROQ ERROR]: {e}")
    return neutral_score  # Neutral fallback
# Initialize with the first working API key.
# Catch Exception rather than using a bare ``except`` so Ctrl-C / SystemExit
# during startup still propagate, and report why initialization failed.
try:
    gemini_model, _init_idx = get_gemini_model()
    print(f"โœ… Gemini API configured with {len(GEMINI_API_KEYS)} keys (smart rotation enabled)")
except Exception as init_error:
    gemini_model = None
    print(f"โš ๏ธ Gemini initialization failed: {str(init_error)[:100]}")
print(f"โœ… Groq API configured as fallback (model: {GROQ_MODEL})")
# Extended list of trusted news sources - GLOBAL COVERAGE
# Consulted by web_search_with_sources() to decide whether a search result
# counts as coming from a trusted outlet. Most entries are bare domains; a few
# carry a path component (e.g. "bbc.com/mundo").
TRUSTED_DOMAINS = [
    # === INTERNATIONAL / WIRE SERVICES ===
    "reuters.com", "apnews.com", "afp.com", "upi.com",
    # === UNITED STATES ===
    "nytimes.com", "washingtonpost.com", "wsj.com", "usatoday.com",
    "cnn.com", "nbcnews.com", "cbsnews.com", "abcnews.go.com", "foxnews.com",
    "npr.org", "pbs.org", "politico.com", "thehill.com", "axios.com",
    "bloomberg.com", "forbes.com", "businessinsider.com", "cnbc.com",
    # === UNITED KINGDOM ===
    "bbc.com", "bbc.co.uk", "theguardian.com", "telegraph.co.uk",
    "independent.co.uk", "thetimes.co.uk", "ft.com", "economist.com",
    "dailymail.co.uk", "mirror.co.uk", "metro.co.uk", "sky.com",
    # === EUROPE ===
    "dw.com", "spiegel.de", "zeit.de", # Germany
    "france24.com", "lemonde.fr", "lefigaro.fr", # France
    "elpais.com", "elmundo.es", # Spain
    "corriere.it", "repubblica.it", # Italy
    "nos.nl", "dutchnews.nl", # Netherlands
    "euronews.com", "politico.eu", # EU-wide
    # === MIDDLE EAST ===
    "aljazeera.com", "aljazeera.net",
    "arabnews.com", "gulfnews.com", "thenationalnews.com",
    "timesofisrael.com", "haaretz.com", "jpost.com",
    # === ASIA ===
    # India
    "ndtv.com", "thehindu.com", "hindustantimes.com", "indianexpress.com",
    "timesofindia.indiatimes.com", "indiatimes.com", "news18.com",
    "firstpost.com", "livemint.com", "theprint.in", "scroll.in",
    # China/Hong Kong
    "scmp.com", "globaltimes.cn", "chinadaily.com.cn",
    # Japan
    "japantimes.co.jp", "nhk.or.jp", "asahi.com",
    # South Korea
    "koreaherald.com", "koreatimes.co.kr",
    # Southeast Asia
    "straitstimes.com", "channelnewsasia.com", # Singapore
    "bangkokpost.com", # Thailand
    "thejakartapost.com", # Indonesia
    "philstar.com", "gmanetwork.com", # Philippines
    # === AFRICA ===
    "news24.com", "mg.co.za", "dailymaverick.co.za", # South Africa
    "nation.africa", "theeastafrican.co.ke", # East Africa
    "allafrica.com", # Pan-African
    # === LATIN AMERICA ===
    "bbc.com/mundo", "elpais.com/america",
    "folha.uol.com.br", "g1.globo.com", # Brazil
    "infobae.com", "clarin.com", # Argentina
    "eluniversal.com.mx", "milenio.com", # Mexico
    # === OCEANIA ===
    "abc.net.au", "sbs.com.au", "smh.com.au", "theaustralian.com.au",
    "nzherald.co.nz", "stuff.co.nz", # New Zealand
    # === TECH NEWS ===
    "techcrunch.com", "theverge.com", "wired.com", "arstechnica.com",
    "cnet.com", "zdnet.com", "engadget.com", "gizmodo.com", "mashable.com",
    # === BUSINESS/FINANCE ===
    "reuters.com/business", "marketwatch.com", "yahoo.com/finance",
    "morningstar.com", "investopedia.com",
    # === SCIENCE/HEALTH ===
    "scientificamerican.com", "nature.com", "newscientist.com",
    "sciencemag.org", "medscape.com", "webmd.com", "who.int",
    # === GOVERNMENT SOURCES ===
    "gov.in", "gov.uk", "usa.gov", "europa.eu", "un.org",
    # === ENTERTAINMENT ===
    "variety.com", "hollywoodreporter.com", "deadline.com", "ew.com",
    # === SPORTS ===
    "espn.com", "sports.yahoo.com", "bleacherreport.com", "skysports.com",
    # === MICROSOFT/MSN ===
    "msn.com", "microsoft.com"
]
# FastAPI app
# The version matches the one reported by the "/" health endpoint so the
# OpenAPI docs and the health check do not disagree.
app = FastAPI(
    title="VerifAI - Fake News Detection API",
    description="Multi-model fake news detection for text and images using pre-trained transformers",
    version="4.0.0"
)
# Enable CORS
# NOTE(review): every origin is allowed together with credentials. That is
# convenient for a public demo, but restrict allow_origins to the real
# frontend(s) if cookies or other credentials are ever introduced.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request models
# Body of POST /check
class TextRequest(BaseModel):
    # Claim or article text to analyse; check_text() rejects blank text
    text: str
# Body of POST /check-image and POST /extract-text
class ImageRequest(BaseModel):
    # decode_base64_image() also accepts a "data:...;base64," style prefix
    image: str # Base64 encoded image
# =====================================================
# LOAD MODELS AT STARTUP
# =====================================================
# Both models are loaded once at import time and shared by every request.
print("๐Ÿš€ Loading EasyOCR...")
# English-only OCR reader; runs on the GPU only when CUDA is available
ocr_reader = easyocr.Reader(['en'], gpu=torch.cuda.is_available())
print(f"๐Ÿง  Loading Fake News BERT model: {FAKE_NEWS_MODEL}")
print(" (First run downloads ~440MB from HuggingFace)")
try:
    bert_tokenizer = AutoTokenizer.from_pretrained(FAKE_NEWS_MODEL)
    bert_model = AutoModelForSequenceClassification.from_pretrained(FAKE_NEWS_MODEL)
    # Inference only: put the model in evaluation mode
    bert_model.eval()
    print("โœ… Fake News BERT model loaded successfully!")
except Exception as e:
    # Keep the service running without BERT; bert_fake_news_score() then
    # returns its neutral 0.5 fallback.
    print(f"โŒ Error loading model: {e}")
    bert_tokenizer = None
    bert_model = None
# =====================================================
# HELPERS
# =====================================================
def clean_text(text):
    """Lower-case ``text`` and turn each run of non a-z/space characters into one space."""
    non_letters = re.compile(r"[^a-z ]+")
    lowered = text.lower()
    return non_letters.sub(" ", lowered)
def is_factual_claim(text):
    """
    Decide whether ``text`` looks like a claim worth fact-checking.

    Deliberately permissive: the text qualifies when any news-related keyword
    occurs anywhere in it (plain substring match, case-insensitive), or when
    it is at least 20 characters and 4 whitespace-separated words long.
    """
    news_keywords = (
        # Verbs - actions and states
        "is", "was", "are", "were", "has", "have", "had", "been", "being",
        "announced", "confirmed", "became", "says", "said", "told", "stated",
        "won", "lost", "launched", "approved", "reported", "according", "claims",
        "died", "killed", "arrested", "elected", "appointed", "resigned", "fired",
        "signed", "passed", "rejected", "voted", "declared", "ordered", "banned",
        "attacked", "invaded", "bombed", "hacked", "hijacked", "seized", "captured",
        "discovered", "revealed", "leaked", "exposed", "investigated", "charged",
        "inaugurated", "sworn", "impeached", "convicted", "acquitted", "sentenced",
        # News terms
        "breaking", "urgent", "exclusive", "official", "sources", "update",
        "developing", "just in", "alert", "live", "report", "news",
        # Quantities and stats
        "million", "billion", "trillion", "percent", "number", "rate", "record",
        # Entities - politics
        "president", "minister", "prime", "government", "congress", "parliament",
        "senate", "court", "supreme", "federal", "state", "national", "election",
        "trump", "biden", "obama", "putin", "modi", "xi", "zelensky", "netanyahu",
        # Entities - places
        "country", "nation", "city", "usa", "america", "china", "russia", "india",
        "ukraine", "israel", "iran", "gaza", "palestine", "europe", "asia",
        # Time references
        "today", "yesterday", "tomorrow", "january", "february", "march", "april",
        "may", "june", "july", "august", "september", "october", "november", "december",
        "2024", "2025", "2026",
        # Events
        "attack", "war", "protest", "strike", "earthquake", "hurricane", "flood",
        "crash", "explosion", "fire", "shooting", "pandemic", "outbreak",
    )
    lowered = text.lower()
    # First rule: any keyword appearing as a substring is enough
    for keyword in news_keywords:
        if keyword in lowered:
            return True
    # Second rule: long enough to plausibly be a news sentence
    word_count = len(text.split())
    return len(text) >= 20 and word_count >= 4
def decode_base64_image(base64_string):
    """Turn a base64 string (optionally with a comma-terminated prefix) into a PIL Image."""
    # When a comma is present, the payload is the segment right after the first one
    payload = base64_string.split(",")[1] if "," in base64_string else base64_string
    raw_bytes = base64.b64decode(payload)
    buffer = io.BytesIO(raw_bytes)
    return Image.open(buffer)
# =====================================================
# OCR - TEXT EXTRACTION
# =====================================================
def extract_text_from_image(image):
    """Run EasyOCR on ``image`` and return the recognised text joined by spaces.

    PIL images are converted to a numpy array first; any other input is handed
    to the reader unchanged. Returns "" when OCR raises.
    """
    try:
        ocr_input = np.array(image) if isinstance(image, Image.Image) else image
        detections = ocr_reader.readtext(ocr_input)
        # Each detection carries the recognised string at index 1
        fragments = [detection[1] for detection in detections]
        return " ".join(fragments).strip()
    except Exception as e:
        print(f"[OCR ERROR]: {e}")
        return ""
# =====================================================
# SCORING FUNCTIONS
# =====================================================
def bert_fake_news_score(text):
    """
    Score ``text`` with the pre-trained fake-news classifier.

    Returns the softmax probability at class index 1, which this file treats
    as the REAL class. Returns the neutral 0.5 when the model is not loaded or
    inference raises.
    """
    neutral_score = 0.5
    if not (bert_tokenizer and bert_model):
        return neutral_score  # Fallback if model not loaded
    try:
        encoded = bert_tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=512,
            padding=True
        )
        with torch.no_grad():
            logits = bert_model(**encoded).logits
            class_probs = torch.softmax(logits, dim=1)
        # Index 0 is treated as FAKE and index 1 as REAL
        return class_probs[0][1].item()
    except Exception as e:
        print(f"[BERT ERROR]: {e}")
        return neutral_score
def llm_score(text):
    """
    Use Gemini to evaluate factual plausibility with caching and smart key rotation.
    - First checks cache for repeated queries
    - Then tries Gemini (up to 3 attempts) with smart key rotation
    - Finally falls back to Groq, caching whatever Groq returns

    Returns:
        Plausibility score in the range 0..1. When Gemini answers without any
        number in its reply, 0.6 is returned and nothing is cached.
    """
    # Check cache first
    text_hash = get_text_hash(text)
    cached = get_cached_score(text_hash)
    if cached is not None:
        return cached

    # The prompt is identical for every attempt, so build it once
    prompt = f"""You are a fact-checking AI assistant. Evaluate whether this news claim is factually plausible.
Consider:
1. Does it describe real events, people, or places?
2. Is the claim physically/logically possible?
3. Does it sound like legitimate journalism?
News claims about protests, deaths, government actions, economic data etc. from reputable sources are usually TRUE.
Sensational claims with no specifics, miracle cures, conspiracy theories are usually FALSE.
Respond ONLY with a number between 0 and 1:
- 0.0-0.3: Clearly false, impossible, or conspiracy theory
- 0.4-0.6: Uncertain, needs verification
- 0.7-1.0: Plausible, sounds like real news
Claim to evaluate:
{text[:1500]}
Your response (just the number):"""

    max_retries = 3
    for attempt in range(max_retries):
        # Reset per attempt: if obtaining a model fails, the key used by the
        # PREVIOUS attempt must not be penalised a second time.
        current_key_idx = None
        try:
            # Get model with smart key selection
            model, current_key_idx = get_gemini_model()
            response = model.generate_content(
                prompt,
                generation_config={"temperature": 0}
            )
            text_response = response.text.strip()
            match = re.search(r"([0-9]+\.?[0-9]*)", text_response)
            if match:
                score = float(match.group(1))
                score = min(max(score, 0), 1)
                # Mark success and cache result
                mark_key_success(current_key_idx)
                cache_score(text_hash, score)
                print(f"โœจ Gemini LLM score: {score}")
                return score
            return 0.6
        except Exception as e:
            error_str = str(e).lower()
            print(f"[LLM ERROR on attempt {attempt + 1}]: {e}")
            # Mark failure with rate limit detection
            if current_key_idx is not None:
                is_rate_limit = "429" in str(e) or "quota" in error_str or "rate" in error_str
                mark_key_failure(current_key_idx, is_rate_limit)

    # ========== GROQ FALLBACK ==========
    print("๐Ÿ”„ All Gemini keys exhausted, falling back to Groq Llama 3.1...")
    groq_result = groq_llm_score(text)
    cache_score(text_hash, groq_result)
    return groq_result
def llm_image_analysis(image):
    """
    Use Gemini Vision to analyze image for manipulation with smart key rotation.

    Returns:
        Tuple ``(manipulation_score, concerns)``. The score is clamped to
        0..1 (0 = genuine, 1 = manipulated) and defaults to 0.3 when the reply
        contains no score. After 3 failed attempts the result is
        ``(0.3, "Analysis failed - please try again")``.
    """
    prompt = """Analyze this image for signs of misinformation or manipulation.
Check for:
1. Sensationalist text overlays or headlines
2. Misleading statistics or false claims
3. Signs of digital manipulation (artifacts, inconsistent lighting, unnatural elements)
4. Fake news visual patterns (low quality, watermarks from unreliable sources)
5. Out-of-context imagery
Be FAIR in your analysis. News screenshots from legitimate sources should score LOW (genuine).
Only score HIGH if there are clear signs of manipulation or fake content.
Respond ONLY as:
MANIPULATION_SCORE: <0 to 1, where 0 is completely genuine and 1 is definitely fake/manipulated>
CONCERNS: <specific concerns found, or "None detected">"""

    max_retries = 3
    for attempt in range(max_retries):
        # Reset per attempt so a failure to obtain a model never penalises
        # the key that was used by the previous attempt.
        current_key_idx = None
        try:
            # Get model with smart key selection
            model, current_key_idx = get_gemini_model()
            response = model.generate_content([prompt, image])
            reply = response.text
            # Match a well-formed number only: a reply such as "0.3." must not
            # raise in float() and get a healthy key marked as failed.
            match = re.search(r"MANIPULATION_SCORE:\s*([0-9]*\.?[0-9]+)", reply)
            manipulation_score = float(match.group(1)) if match else 0.3
            # Clamp like llm_score() does; callers compute 1 - score
            manipulation_score = min(max(manipulation_score, 0.0), 1.0)
            concerns_match = re.search(r"CONCERNS:\s*(.+)", reply, re.IGNORECASE | re.DOTALL)
            # Keep only the first line of the concerns section
            concerns = concerns_match.group(1).strip().split('\n')[0] if concerns_match else "Unable to analyze"
            # Mark success
            mark_key_success(current_key_idx)
            print(f"๐Ÿ–ผ๏ธ Image analysis complete, manipulation score: {manipulation_score}")
            return manipulation_score, concerns
        except Exception as e:
            error_str = str(e).lower()
            print(f"[IMAGE ANALYSIS ERROR on attempt {attempt + 1}]: {e}")
            # Mark failure with rate limit detection
            if current_key_idx is not None:
                is_rate_limit = "429" in str(e) or "quota" in error_str or "rate" in error_str
                mark_key_failure(current_key_idx, is_rate_limit)

    print("[IMAGE ANALYSIS ERROR]: All retry attempts failed")
    return 0.3, "Analysis failed - please try again"
def web_search_with_sources(text):
    """
    Search web for news verification using MULTIPLE FREE SOURCES:
    1. Google News RSS
    2. DuckDuckGo News search
    3. Regular DuckDuckGo text search, only when fewer than 5 results so far

    Results are de-duplicated per publisher domain and checked against
    TRUSTED_DOMAINS by host name (exact host or sub-domain), not by substring,
    so "ew.com" no longer matches "fakenew.com". Google News items are
    attributed to the publisher URL carried by the RSS <source> element,
    because their links all point at news.google.com.

    Returns: (score, list of source dictionaries)
        score is 1.0 / 0.8 / 0.5 for >=3 / 2 / 1 trusted publishers, 0.2 when
        only untrusted results were found and 0.0 when nothing was found.
        Each source dict has the keys title, url, snippet, trusted, domain;
        at most 8 are returned, trusted ones first.
    """
    from urllib.parse import urlparse, quote
    import xml.etree.ElementTree as ET

    # Labels that act as a second-level suffix under country-code TLDs
    # (bbc.co.uk, uol.com.br, abc.net.au, nhk.or.jp, ...)
    second_level_labels = {"co", "com", "org", "net", "gov", "ac", "edu", "or", "ne", "go"}

    def split_url(url):
        """Return (host, path) of url in lower case, ('', '') if unparseable."""
        try:
            parsed = urlparse(url if "://" in url else "//" + url)
            host = (parsed.hostname or "").lower()
            path = (parsed.path or "").lower()
        except ValueError:
            return "", ""
        if host.startswith("www."):
            host = host[4:]
        return host, path

    def extract_domain(url):
        """Extract the publisher-level domain from URL for deduplication"""
        host, _ = split_url(url)
        if not host:
            return url.lower()
        parts = host.split('.')
        # Keep three labels for hosts such as telegraph.co.uk, otherwise
        # every *.co.uk site would collapse into the single "domain" co.uk
        if len(parts) >= 3 and len(parts[-1]) == 2 and parts[-2] in second_level_labels:
            return '.'.join(parts[-3:])
        if len(parts) >= 2:
            return '.'.join(parts[-2:])
        return host

    def is_trusted_url(url):
        """True when url's host is a trusted domain or one of its sub-domains."""
        host, path = split_url(url)
        if not host:
            return False
        for entry in TRUSTED_DOMAINS:
            entry_host, _, entry_path = entry.lower().partition("/")
            if host != entry_host and not host.endswith("." + entry_host):
                continue
            # Entries with a path (e.g. "yahoo.com/finance") only cover it
            if not entry_path or path.startswith("/" + entry_path):
                return True
        return False

    # Extract key terms for search
    words = text.split()
    stop_words = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
                  'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could',
                  'should', 'may', 'might', 'can', 'to', 'of', 'in', 'for', 'on', 'with',
                  'at', 'by', 'from', 'as', 'into', 'that', 'this', 'these', 'those',
                  'and', 'or', 'but', 'if', 'then', 'else', 'when', 'up', 'down', 'out',
                  'so', 'just', 'also', 'only', 'very', 'too', 'now', 'here', 'there'}
    keywords = [w for w in words if w.lower() not in stop_words][:10]
    query = " ".join(keywords)
    print(f"[WEB SEARCH] Query: {query}")
    if not query:
        # Nothing searchable (empty text or stop words only): do not score
        # the claim on whatever an empty query happens to return.
        return 0.0, []

    all_results = []
    seen_domains = set()

    # ===== SOURCE 1: Google News RSS =====
    try:
        google_news_url = f"https://news.google.com/rss/search?q={quote(query)}&hl=en-US&gl=US&ceid=US:en"
        response = requests.get(google_news_url, timeout=10, headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        if response.status_code == 200:
            root = ET.fromstring(response.content)
            for item in root.findall('.//item')[:15]:
                title = item.find('title')
                link = item.find('link')
                source = item.find('source')
                if title is not None and link is not None:
                    # Google News links redirect through news.google.com; the
                    # <source url="..."> attribute names the real publisher
                    real_url = link.text if link.text else ""
                    source_name = source.text if source is not None else ""
                    publisher_url = source.get('url', '') if source is not None else ""
                    all_results.append({
                        "title": title.text[:120] if title.text else "News",
                        "href": real_url,
                        "publisher_url": publisher_url,
                        "body": f"Source: {source_name}",
                        "source": "google_news"
                    })
            print(f"[GOOGLE NEWS] Found {len(all_results)} articles")
    except Exception as e:
        print(f"[GOOGLE NEWS ERROR]: {e}")

    # ===== SOURCE 2: DuckDuckGo NEWS search (prioritizes news sites) =====
    try:
        with DDGS() as ddgs:
            news_results = list(ddgs.news(query, max_results=20))
        for r in news_results:
            all_results.append({
                "title": r.get("title", "")[:120],
                "href": r.get("url", ""),
                "body": r.get("body", "")[:150],
                "source": "ddg_news"
            })
        print(f"[DDG NEWS] Found {len(news_results)} articles")
    except Exception as e:
        print(f"[DDG NEWS ERROR]: {e}")

    # ===== SOURCE 3: DuckDuckGo regular search as fallback =====
    if len(all_results) < 5:
        try:
            with DDGS() as ddgs:
                text_results = list(ddgs.text(query + " news", max_results=15))
            for r in text_results:
                all_results.append({
                    "title": r.get("title", "")[:120],
                    "href": r.get("href", ""),
                    "body": r.get("body", "")[:150],
                    "source": "ddg_text"
                })
            print(f"[DDG TEXT] Found {len(text_results)} results")
        except Exception as e:
            print(f"[DDG TEXT ERROR]: {e}")

    print(f"[WEB SEARCH TOTAL] {len(all_results)} results from all sources")

    # Process and categorize results
    trusted_sources = []
    other_sources = []
    for r in all_results:
        url = r.get("href", "")
        title = r.get("title", "")
        body = r.get("body", "")
        if not url:
            continue
        # Identify the publisher by its own URL when the feed provides one
        identity_url = r.get("publisher_url") or url
        domain = extract_domain(identity_url)
        # Skip duplicates: one entry per publisher
        if domain in seen_domains:
            continue
        seen_domains.add(domain)
        is_trusted = is_trusted_url(identity_url)
        source_entry = {
            "title": title if title else "News Article",
            "url": url,
            "snippet": body if body else "",
            "trusted": is_trusted,
            "domain": domain
        }
        if is_trusted:
            trusted_sources.append(source_entry)
        else:
            other_sources.append(source_entry)
    trusted_hits = len(trusted_sources)

    # Combine: up to 5 trusted first, then others up to 8 entries in total
    sources = trusted_sources[:5]
    sources.extend(other_sources[:8 - len(sources)])

    # Calculate score based on trusted hits
    if trusted_hits >= 3:
        score = 1.0
    elif trusted_hits >= 2:
        score = 0.8
    elif trusted_hits >= 1:
        score = 0.5
    elif len(sources) > 0:
        score = 0.2
    else:
        score = 0.0

    # Log results
    final_domains = [s.get('domain', 'unknown') for s in sources]
    print(f"[WEB SEARCH] Trusted: {trusted_hits}, Total sources: {len(sources)}, Score: {score}")
    print(f"[WEB SEARCH] Domains: {', '.join(final_domains[:5])}...")
    return score, sources[:8]
# =====================================================
# TEXT ANALYSIS
# =====================================================
def analyze_text(text):
    """Run the text pipeline (BERT + LLM + web search) and return a verdict dict.

    The result has the keys credibility (0-100), verdict, bert_score,
    llm_score, web_score and sources. Text that is_factual_claim() rejects
    gets a fixed credibility of 50 without running any model.
    """
    if not is_factual_claim(text):
        return {
            "credibility": 50,
            "verdict": "NOT A FACTUAL CLAIM",
            "bert_score": 0,
            "llm_score": 0,
            "web_score": 0,
            "sources": []
        }

    bert = bert_fake_news_score(text)
    llm = llm_score(text)
    web, sources = web_search_with_sources(text)
    trusted_count = sum(1 for entry in sources if entry.get('trusted'))

    # Weighted fusion: web 40%, LLM 35%, BERT 25%
    final = (0.25 * bert) + (0.35 * llm) + (0.40 * web)

    # Bonus for corroboration: large for two or more trusted sources,
    # moderate for one, small for any web result at all
    if trusted_count >= 2:
        bonus = 0.25
    elif trusted_count >= 1:
        bonus = 0.15
    elif web > 0:
        bonus = 0.05
    else:
        bonus = None
    if bonus is not None:
        final = min(final + bonus, 1.0)
    credibility = round(final * 100, 1)

    # Verdict: trusted sources give the benefit of the doubt; "fake" is only
    # declared when every signal is negative
    if trusted_count >= 2:
        verdict = "VERIFIED"
        credibility = max(credibility, 75)
    elif trusted_count >= 1:
        verdict = "LIKELY REAL"
        floor = 65 if llm >= 0.5 else 55
        credibility = max(credibility, floor)
    elif bert < 0.2 and llm < 0.3 and web == 0:
        verdict = "LIKELY FAKE"
        credibility = min(credibility, 30)
    elif credibility >= 50:
        verdict = "UNCERTAIN - Verify Manually"
    else:
        verdict = "UNCERTAIN"

    return {
        "credibility": credibility,
        "verdict": verdict,
        "bert_score": round(bert, 3),
        "llm_score": round(llm, 3),
        "web_score": round(web, 3),
        "sources": sources
    }
# =====================================================
# IMAGE ANALYSIS
# =====================================================
def analyze_image(image):
    """Run OCR plus Gemini image analysis and, when text is found, the text pipeline.

    Returns a dict with the keys credibility, verdict, alert, extracted_text,
    bert_score, llm_score, web_score, image_manipulation_score, concerns and
    sources.
    """
    extracted_text = extract_text_from_image(image)
    manipulation_score, concerns = llm_image_analysis(image)

    def build_report(credibility, verdict, alert, text, bert, llm, web, sources):
        # Single place that fixes the response layout for every outcome
        return {
            "credibility": credibility,
            "verdict": verdict,
            "alert": alert,
            "extracted_text": text,
            "bert_score": bert,
            "llm_score": llm,
            "web_score": web,
            "image_manipulation_score": round(manipulation_score, 3),
            "concerns": concerns,
            "sources": sources
        }

    # No readable text: judge on the image analysis alone
    if not extracted_text:
        credibility = int((1 - manipulation_score) * 100)
        if manipulation_score > 0.7:
            verdict, alert = "SUSPICIOUS", "โš ๏ธ Image shows signs of manipulation"
        elif manipulation_score > 0.4:
            verdict, alert = "UNVERIFIABLE", "โš ๏ธ Unable to verify - no text detected"
        else:
            verdict, alert = "NO TEXT DETECTED", "โ„น๏ธ No readable text found in image"
        return build_report(credibility, verdict, alert, "", 0, 0, 0, [])

    # Text present but nothing in it worth fact-checking
    if not is_factual_claim(extracted_text):
        credibility = int((1 - manipulation_score) * 100)
        return build_report(
            credibility,
            "NOT A FACTUAL CLAIM",
            "โ„น๏ธ Image does not contain verifiable claims",
            extracted_text, 0, 0, 0, []
        )

    # Full analysis
    bert = bert_fake_news_score(extracted_text)
    llm = llm_score(extracted_text)
    web, sources = web_search_with_sources(extracted_text)

    # Combined credibility: 80% text signals, 20% image genuineness
    text_cred = (0.40 * bert) + (0.35 * llm) + (0.25 * web)
    image_cred = 1 - manipulation_score
    final_cred = (0.80 * text_cred) + (0.20 * image_cred)
    credibility = int(final_cred * 100)

    # Determine verdict
    if bert < 0.3 or manipulation_score > 0.7:
        verdict = "FAKE"
        alert = "๐Ÿšจ FAKE NEWS ALERT - Content appears fabricated!"
        credibility = min(credibility, 20)
    elif manipulation_score > 0.5:
        verdict = "SUSPICIOUS"
        alert = "โš ๏ธ CAUTION - Image shows signs of manipulation"
        credibility = min(credibility, 45)
    elif web >= 0.5 and bert >= 0.6:
        verdict = "VERIFIED"
        alert = "โœ… VERIFIED - Content found in trusted sources"
        credibility = max(credibility, 80)
    elif bert >= 0.5:
        verdict = "LIKELY REAL"
        alert = "โœ… Content appears credible"
    else:
        verdict = "UNVERIFIABLE"
        alert = "โš ๏ธ Unable to verify - Exercise caution"

    return build_report(
        credibility, verdict, alert, extracted_text,
        round(bert, 3), round(llm, 3), round(web, 3), sources
    )
# =====================================================
# API ENDPOINTS
# =====================================================
@app.get("/")
def health_check():
    """Report service metadata, Gemini key health and cache statistics."""
    # A key is healthy when it is neither rate limited nor at the error limit
    healthy_keys = 0
    for key_state in key_statuses.values():
        if key_state.is_rate_limited or key_state.errors_count >= 3:
            continue
        healthy_keys += 1

    feature_list = [
        "Text Detection", "Image Detection", "OCR", "BERT",
        "Gemini Vision", "Groq Fallback", "Response Caching", "Source Links"
    ]
    api_summary = {
        "gemini_keys": len(GEMINI_API_KEYS),
        "healthy_keys": healthy_keys,
        "groq_fallback": "enabled",
        "groq_model": GROQ_MODEL
    }
    cache_summary = {
        "cached_items": len(llm_cache),
        "cache_ttl_hours": CACHE_TTL / 3600
    }
    return {
        "status": "healthy",
        "service": "VerifAI Fake News Detection",
        "version": "4.0.0",
        "model": FAKE_NEWS_MODEL,
        "features": feature_list,
        "api_status": api_summary,
        "cache_stats": cache_summary
    }
@app.get("/api-status")
def api_status():
    """Detailed API key status for monitoring.

    SECURITY: this endpoint is unauthenticated, so ``key_preview`` exposes
    only the last four characters of each key - enough to tell keys apart,
    without publishing a large part of the secret.
    """
    keys_info = []
    for idx, status in key_statuses.items():
        key = GEMINI_API_KEYS[idx]
        keys_info.append({
            "key_number": idx + 1,
            "key_preview": f"...{key[-4:]}",
            "requests_count": status.requests_count,
            "is_rate_limited": status.is_rate_limited,
            "errors_count": status.errors_count,
            "cooldown_until": str(status.cooldown_until) if status.cooldown_until else None
        })
    return {
        "gemini_keys": keys_info,
        "groq": {
            "enabled": True,
            "model": GROQ_MODEL,
            "status": "ready"
        },
        "cache": {
            "total_cached": len(llm_cache),
            "ttl_seconds": CACHE_TTL
        }
    }
@app.post("/check")
def check_text(request: TextRequest):
    """Analyse submitted text; blank or whitespace-only text yields HTTP 400."""
    submitted = request.text
    if submitted.strip():
        return analyze_text(submitted)
    raise HTTPException(status_code=400, detail="Text is required")
@app.post("/check-image")
async def check_image(request: ImageRequest):
    """Analyse a base64-encoded image; any failure is reported as HTTP 400."""
    try:
        decoded = decode_base64_image(request.image)
        report = analyze_image(decoded)
    except Exception as e:
        raise HTTPException(status_code=400, detail="Invalid image: " + str(e))
    return report
@app.post("/check-image-upload")
async def check_image_upload(image: UploadFile = File(...)):
    """Analyse an uploaded image file; any failure is reported as HTTP 400."""
    try:
        raw_bytes = await image.read()
        opened = Image.open(io.BytesIO(raw_bytes))
        report = analyze_image(opened)
    except Exception as e:
        raise HTTPException(status_code=400, detail="Invalid image: " + str(e))
    return report
@app.post("/extract-text")
async def extract_text(request: ImageRequest):
    """OCR-only endpoint: return the text found in a base64-encoded image."""
    try:
        decoded = decode_base64_image(request.image)
        found_text = extract_text_from_image(decoded)
        if found_text:
            word_total = len(found_text.split())
            return {
                "success": True,
                "extracted_text": found_text,
                "message": f"Successfully extracted {word_total} words"
            }
        return {
            "success": False,
            "extracted_text": "",
            "message": "No text detected in the image"
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail="Error extracting text: " + str(e))
@app.get("/trending-news")
async def get_trending_news():
    """
    Fetch top trending news across multiple categories.

    Queries DuckDuckGo News for up to 5 items in each of the 9 categories
    (so at most 45 items) and returns them grouped in category order.
    Categories whose search fails are skipped.

    Returns:
        Dict with the keys success, news, total, categories and fetched_at.
    """
    categories = [
        {"name": "World", "query": "world news today", "icon": "๐ŸŒ"},
        {"name": "Politics", "query": "political news today", "icon": "๐Ÿ›๏ธ"},
        {"name": "Tech", "query": "technology news today", "icon": "๐Ÿ’ป"},
        {"name": "Gaming", "query": "gaming news today", "icon": "๐ŸŽฎ"},
        {"name": "Science", "query": "science discovery today", "icon": "๐Ÿ”ฌ"},
        {"name": "Health", "query": "health medical news today", "icon": "๐Ÿฅ"},
        {"name": "Sports", "query": "sports news today", "icon": "โšฝ"},
        {"name": "Business", "query": "business economy news today", "icon": "๐Ÿ“ˆ"},
        {"name": "Entertainment", "query": "entertainment celebrity news today", "icon": "๐ŸŽฌ"},
    ]
    all_news = []
    for category in categories:
        try:
            with DDGS() as ddgs:
                results = list(ddgs.news(category["query"], max_results=5))
            for r in results:
                raw_date = r.get("date", "")
                # Missing dates are shown as "Recent"; dates that cannot be
                # parsed are passed through unchanged
                formatted_date = raw_date if raw_date else "Recent"
                if raw_date:
                    try:
                        # fromisoformat() does not accept a trailing "Z" on
                        # older Python versions, so spell out the UTC offset
                        dt = datetime.fromisoformat(raw_date.replace('Z', '+00:00'))
                        formatted_date = dt.strftime("%b %d, %Y โ€ข %I:%M %p")
                    except Exception:
                        formatted_date = raw_date
                news_item = {
                    "title": r.get("title", "")[:150],
                    "url": r.get("url", ""),
                    "source": r.get("source", "Unknown"),
                    "date": formatted_date,
                    "raw_date": raw_date,  # Include raw date for sorting
                    "body": r.get("body", "")[:200] if r.get("body") else "",
                    "image": r.get("image", ""),
                    "category": category["name"],
                    "icon": category["icon"]
                }
                all_news.append(news_item)
        except Exception as e:
            print(f"[TRENDING ERROR] {category['name']}: {e}")
            continue

    # Items stay in the order the search returned them, grouped by category;
    # no sorting or shuffling is applied here.
    # Add timestamp for cache-busting
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return {
        "success": True,
        "news": all_news,  # Return all news, let frontend handle pagination
        "total": len(all_news),
        "categories": [c["name"] for c in categories],
        "fetched_at": current_time
    }
# =====================================================
# MAIN
# =====================================================
# Start a uvicorn server only when this file is run directly as a script
if __name__ == "__main__":
    # Imported here so merely importing this module does not require uvicorn
    import uvicorn
    # Listen on all interfaces, port 7860
    uvicorn.run(app, host="0.0.0.0", port=7860)