""" 🔍 VerifAI - Unified Fake News Detection API ============================================= Combines Text and Image detection in a single API Uses: Pre-trained BERT, Gemini Vision, Groq LLM, Web Verification Enhanced with caching and smart API key management """ import re import os import io import base64 import torch import numpy as np import hashlib import time import requests from functools import lru_cache from datetime import datetime, timedelta from dataclasses import dataclass, field from typing import Dict, Optional, Tuple from PIL import Image from fastapi import FastAPI, HTTPException, File, UploadFile from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from duckduckgo_search import DDGS import google.generativeai as genai # OCR import easyocr from transformers import ( AutoTokenizer, AutoModelForSequenceClassification ) # ===================================================== # CONFIG # ===================================================== # Pre-trained fake news detection model from HuggingFace FAKE_NEWS_MODEL = "jy46604790/Fake-News-BERT-Detect" # ===================================================== # GROQ API CONFIGURATION (Fallback when Gemini fails) # ===================================================== GROQ_API_KEY = os.getenv("GROQ_API_KEY", "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") GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions" GROQ_MODEL = "llama-3.1-70b-versatile" # Free tier, fast inference # ===================================================== # GEMINI API CONFIGURATION WITH SMART ROTATION # ===================================================== GEMINI_API_KEYS = [ "AIzaSyAk5mCVfXd8-kt1Sz5xlKsZUVW1RVHk-AA", # Key 1 os.getenv("GEMINI_API_KEY", "AIzaSyDOY4Dh0GfVz9o0JGy_kqYrZ8vDxE0qm8g"), # Key 2 "AIzaSyBP-2Or59qhzMqLE-Qcxz4TcYQdqBvWR7M", # Key 3 "AIzaSyAC2KZguXcshRw-M-W-6RlPiL8xQg6VGAg", # Key 4 "AIzaSyCsQ49bpqjxCw0TBprKYzONWytC9K87tJ8", # Key 5 "AIzaSyCd1-Jh3qUumOs2IPXT_-lxFLzzAaw9fTE", # Key 6 ] # ===================================================== # SMART API KEY MANAGEMENT # ===================================================== @dataclass class KeyStatus: """Track usage and health of each API key""" requests_count: int = 0 last_used: datetime = field(default_factory=datetime.now) is_rate_limited: bool = False cooldown_until: Optional[datetime] = None errors_count: int = 0 # Initialize key status tracking key_statuses: Dict[int, KeyStatus] = {i: KeyStatus() for i in range(len(GEMINI_API_KEYS))} request_counter = 0 # For round-robin distribution def get_best_key_index() -> int: """ Smart key selection using round-robin with health checks. Skips rate-limited keys and distributes load evenly. """ global request_counter for _ in range(len(GEMINI_API_KEYS)): idx = request_counter % len(GEMINI_API_KEYS) request_counter += 1 status = key_statuses[idx] # Skip if in cooldown if status.cooldown_until and datetime.now() < status.cooldown_until: continue # Reset cooldown if expired if status.cooldown_until and datetime.now() >= status.cooldown_until: status.is_rate_limited = False status.cooldown_until = None status.errors_count = 0 # Skip keys with too many errors if status.errors_count >= 3: continue return idx # If all keys are limited, reset and use first available for idx in range(len(GEMINI_API_KEYS)): key_statuses[idx].is_rate_limited = False key_statuses[idx].cooldown_until = None key_statuses[idx].errors_count = 0 return 0 def mark_key_success(idx: int): """Mark a key as successfully used""" status = key_statuses[idx] status.requests_count += 1 status.last_used = datetime.now() status.errors_count = max(0, status.errors_count - 1) # Reduce error count on success def mark_key_failure(idx: int, is_rate_limit: bool = False): """Mark a key as failed, optionally with rate limit cooldown""" status = key_statuses[idx] status.errors_count += 1 if is_rate_limit: status.is_rate_limited = True # 60 second cooldown for rate limits status.cooldown_until = datetime.now() + timedelta(seconds=60) print(f"â¸ī¸ Key #{idx + 1} rate limited, cooldown until {status.cooldown_until}") def get_gemini_model(): """Get a Gemini model with smart key selection""" idx = get_best_key_index() api_key = GEMINI_API_KEYS[idx] try: genai.configure(api_key=api_key) model = genai.GenerativeModel("gemini-2.0-flash") # Upgraded to 2.0 key_preview = api_key[:15] + "..." + api_key[-4:] print(f"✅ Using Gemini key #{idx + 1}: {key_preview}") return model, idx except Exception as e: print(f"❌ Key #{idx + 1} config failed: {str(e)[:50]}") mark_key_failure(idx) raise # Legacy function for compatibility def rotate_api_key(): """Legacy function - now handled by smart key management""" global request_counter request_counter += 1 # ===================================================== # LLM RESPONSE CACHING # ===================================================== # In-memory cache for LLM responses (reduces API calls by ~40-60%) llm_cache: Dict[str, Tuple[float, float]] = {} # hash -> (score, timestamp) CACHE_TTL = 3600 # 1 hour cache validity def get_text_hash(text: str) -> str: """Generate hash for text content""" normalized = text.lower().strip()[:500] # Normalize and limit length return hashlib.md5(normalized.encode()).hexdigest() def get_cached_score(text_hash: str) -> Optional[float]: """Get cached LLM score if valid""" if text_hash in llm_cache: score, timestamp = llm_cache[text_hash] if time.time() - timestamp < CACHE_TTL: print(f"đŸ“Ļ Cache hit for {text_hash[:8]}...") return score else: del llm_cache[text_hash] # Expired return None def cache_score(text_hash: str, score: float): """Cache LLM score with timestamp""" llm_cache[text_hash] = (score, time.time()) # Limit cache size if len(llm_cache) > 1000: oldest = min(llm_cache.items(), key=lambda x: x[1][1]) del llm_cache[oldest[0]] # ===================================================== # GROQ FALLBACK LLM # ===================================================== def groq_llm_score(text: str) -> float: """ Use Groq's free Llama 3.1 70B as fallback when all Gemini keys fail. Groq offers very fast inference with generous free tier. """ try: prompt = f"""You are a fact-checking AI. Evaluate if this news claim is factually plausible. Respond ONLY with a number between 0 and 1: - 0.0-0.3: Clearly false, impossible, or conspiracy theory - 0.4-0.6: Uncertain, needs verification - 0.7-1.0: Plausible, sounds like real news Claim: {text[:1000]} Your response (just the number):""" response = requests.post( GROQ_API_URL, headers={ "Authorization": f"Bearer {GROQ_API_KEY}", "Content-Type": "application/json" }, json={ "model": GROQ_MODEL, "messages": [{"role": "user", "content": prompt}], "temperature": 0, "max_tokens": 10 }, timeout=30 ) if response.status_code == 200: result = response.json() text_response = result["choices"][0]["message"]["content"].strip() match = re.search(r"([0-9]+\.?[0-9]*)", text_response) if match: score = float(match.group(1)) print(f"đŸĻ™ Groq Llama 3.1 score: {score}") return min(max(score, 0), 1) else: print(f"âš ī¸ Groq API error: {response.status_code}") except Exception as e: print(f"[GROQ ERROR]: {e}") return 0.5 # Neutral fallback # Initialize with the first working API key try: gemini_model, _init_idx = get_gemini_model() print(f"✅ Gemini API configured with {len(GEMINI_API_KEYS)} keys (smart rotation enabled)") except: gemini_model = None print("âš ī¸ Gemini initialization failed") print(f"✅ Groq API configured as fallback (model: {GROQ_MODEL})") # Extended list of trusted news sources - GLOBAL COVERAGE TRUSTED_DOMAINS = [ # === INTERNATIONAL / WIRE SERVICES === "reuters.com", "apnews.com", "afp.com", "upi.com", # === UNITED STATES === "nytimes.com", "washingtonpost.com", "wsj.com", "usatoday.com", "cnn.com", "nbcnews.com", "cbsnews.com", "abcnews.go.com", "foxnews.com", "npr.org", "pbs.org", "politico.com", "thehill.com", "axios.com", "bloomberg.com", "forbes.com", "businessinsider.com", "cnbc.com", # === UNITED KINGDOM === "bbc.com", "bbc.co.uk", "theguardian.com", "telegraph.co.uk", "independent.co.uk", "thetimes.co.uk", "ft.com", "economist.com", "dailymail.co.uk", "mirror.co.uk", "metro.co.uk", "sky.com", # === EUROPE === "dw.com", "spiegel.de", "zeit.de", # Germany "france24.com", "lemonde.fr", "lefigaro.fr", # France "elpais.com", "elmundo.es", # Spain "corriere.it", "repubblica.it", # Italy "nos.nl", "dutchnews.nl", # Netherlands "euronews.com", "politico.eu", # EU-wide # === MIDDLE EAST === "aljazeera.com", "aljazeera.net", "arabnews.com", "gulfnews.com", "thenationalnews.com", "timesofisrael.com", "haaretz.com", "jpost.com", # === ASIA === # India "ndtv.com", "thehindu.com", "hindustantimes.com", "indianexpress.com", "timesofindia.indiatimes.com", "indiatimes.com", "news18.com", "firstpost.com", "livemint.com", "theprint.in", "scroll.in", # China/Hong Kong "scmp.com", "globaltimes.cn", "chinadaily.com.cn", # Japan "japantimes.co.jp", "nhk.or.jp", "asahi.com", # South Korea "koreaherald.com", "koreatimes.co.kr", # Southeast Asia "straitstimes.com", "channelnewsasia.com", # Singapore "bangkokpost.com", # Thailand "thejakartapost.com", # Indonesia "philstar.com", "gmanetwork.com", # Philippines # === AFRICA === "news24.com", "mg.co.za", "dailymaverick.co.za", # South Africa "nation.africa", "theeastafrican.co.ke", # East Africa "allafrica.com", # Pan-African # === LATIN AMERICA === "bbc.com/mundo", "elpais.com/america", "folha.uol.com.br", "g1.globo.com", # Brazil "infobae.com", "clarin.com", # Argentina "eluniversal.com.mx", "milenio.com", # Mexico # === OCEANIA === "abc.net.au", "sbs.com.au", "smh.com.au", "theaustralian.com.au", "nzherald.co.nz", "stuff.co.nz", # New Zealand # === TECH NEWS === "techcrunch.com", "theverge.com", "wired.com", "arstechnica.com", "cnet.com", "zdnet.com", "engadget.com", "gizmodo.com", "mashable.com", # === BUSINESS/FINANCE === "reuters.com/business", "marketwatch.com", "yahoo.com/finance", "morningstar.com", "investopedia.com", # === SCIENCE/HEALTH === "scientificamerican.com", "nature.com", "newscientist.com", "sciencemag.org", "medscape.com", "webmd.com", "who.int", # === GOVERNMENT SOURCES === "gov.in", "gov.uk", "usa.gov", "europa.eu", "un.org", # === ENTERTAINMENT === "variety.com", "hollywoodreporter.com", "deadline.com", "ew.com", # === SPORTS === "espn.com", "sports.yahoo.com", "bleacherreport.com", "skysports.com", # === MICROSOFT/MSN === "msn.com", "microsoft.com" ] # FastAPI app app = FastAPI( title="VerifAI - Fake News Detection API", description="Multi-model fake news detection for text and images using pre-trained transformers", version="3.0.0" ) # Enable CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Request models class TextRequest(BaseModel): text: str class ImageRequest(BaseModel): image: str # Base64 encoded image # ===================================================== # LOAD MODELS AT STARTUP # ===================================================== print("🚀 Loading EasyOCR...") ocr_reader = easyocr.Reader(['en'], gpu=torch.cuda.is_available()) print(f"🧠 Loading Fake News BERT model: {FAKE_NEWS_MODEL}") print(" (First run downloads ~440MB from HuggingFace)") try: bert_tokenizer = AutoTokenizer.from_pretrained(FAKE_NEWS_MODEL) bert_model = AutoModelForSequenceClassification.from_pretrained(FAKE_NEWS_MODEL) bert_model.eval() print("✅ Fake News BERT model loaded successfully!") except Exception as e: print(f"❌ Error loading model: {e}") bert_tokenizer = None bert_model = None # ===================================================== # HELPERS # ===================================================== def clean_text(text): text = text.lower() return re.sub(r"[^a-z ]+", " ", text) def is_factual_claim(text): """ Determine if text is a factual claim worth analyzing. VERY LENIENT - we want to analyze most news-like text. """ text_lower = text.lower() # Comprehensive list of news-related keywords keywords = [ # Verbs - actions and states "is", "was", "are", "were", "has", "have", "had", "been", "being", "announced", "confirmed", "became", "says", "said", "told", "stated", "won", "lost", "launched", "approved", "reported", "according", "claims", "died", "killed", "arrested", "elected", "appointed", "resigned", "fired", "signed", "passed", "rejected", "voted", "declared", "ordered", "banned", "attacked", "invaded", "bombed", "hacked", "hijacked", "seized", "captured", "discovered", "revealed", "leaked", "exposed", "investigated", "charged", "inaugurated", "sworn", "impeached", "convicted", "acquitted", "sentenced", # News terms "breaking", "urgent", "exclusive", "official", "sources", "update", "developing", "just in", "alert", "live", "report", "news", # Quantities and stats "million", "billion", "trillion", "percent", "number", "rate", "record", # Entities - politics "president", "minister", "prime", "government", "congress", "parliament", "senate", "court", "supreme", "federal", "state", "national", "election", "trump", "biden", "obama", "putin", "modi", "xi", "zelensky", "netanyahu", # Entities - places "country", "nation", "city", "usa", "america", "china", "russia", "india", "ukraine", "israel", "iran", "gaza", "palestine", "europe", "asia", # Time references "today", "yesterday", "tomorrow", "january", "february", "march", "april", "may", "june", "july", "august", "september", "october", "november", "december", "2024", "2025", "2026", # Events "attack", "war", "protest", "strike", "earthquake", "hurricane", "flood", "crash", "explosion", "fire", "shooting", "pandemic", "outbreak" ] # Check for any keyword match if any(k in text_lower for k in keywords): return True # Also accept any text that's sufficiently long and looks like news # (at least 20 characters and 4 words) words = text.split() if len(text) >= 20 and len(words) >= 4: return True return False def decode_base64_image(base64_string): """Decode base64 string to PIL Image""" if "," in base64_string: base64_string = base64_string.split(",")[1] image_data = base64.b64decode(base64_string) return Image.open(io.BytesIO(image_data)) # ===================================================== # OCR - TEXT EXTRACTION # ===================================================== def extract_text_from_image(image): """Extract text from image using EasyOCR""" try: if isinstance(image, Image.Image): img_array = np.array(image) results = ocr_reader.readtext(img_array) else: results = ocr_reader.readtext(image) extracted_text = " ".join([result[1] for result in results]) return extracted_text.strip() except Exception as e: print(f"[OCR ERROR]: {e}") return "" # ===================================================== # SCORING FUNCTIONS # ===================================================== def bert_fake_news_score(text): """ Use pre-trained Fake News BERT model for detection. Returns probability that the text is REAL news (0-1). """ if not bert_tokenizer or not bert_model: return 0.5 # Fallback if model not loaded try: inputs = bert_tokenizer( text, return_tensors="pt", truncation=True, max_length=512, padding=True ) with torch.no_grad(): outputs = bert_model(**inputs) logits = outputs.logits probs = torch.softmax(logits, dim=1) # Model outputs: [FAKE, REAL] probabilities # Return REAL probability (index 1) real_prob = probs[0][1].item() return real_prob except Exception as e: print(f"[BERT ERROR]: {e}") return 0.5 def llm_score(text): """ Use Gemini to evaluate factual plausibility with caching and smart key rotation. - First checks cache for repeated queries - Then tries Gemini with smart key rotation """ # Check cache first text_hash = get_text_hash(text) cached = get_cached_score(text_hash) if cached is not None: return cached max_retries = 3 current_key_idx = None for attempt in range(max_retries): try: prompt = f"""You are a fact-checking AI assistant. Evaluate whether this news claim is factually plausible. Consider: 1. Does it describe real events, people, or places? 2. Is the claim physically/logically possible? 3. Does it sound like legitimate journalism? News claims about protests, deaths, government actions, economic data etc. from reputable sources are usually TRUE. Sensational claims with no specifics, miracle cures, conspiracy theories are usually FALSE. Respond ONLY with a number between 0 and 1: - 0.0-0.3: Clearly false, impossible, or conspiracy theory - 0.4-0.6: Uncertain, needs verification - 0.7-1.0: Plausible, sounds like real news Claim to evaluate: {text[:1500]} Your response (just the number):""" # Get model with smart key selection model, current_key_idx = get_gemini_model() response = model.generate_content( prompt, generation_config={"temperature": 0} ) text_response = response.text.strip() match = re.search(r"([0-9]+\.?[0-9]*)", text_response) if match: score = float(match.group(1)) score = min(max(score, 0), 1) # Mark success and cache result mark_key_success(current_key_idx) cache_score(text_hash, score) print(f"✨ Gemini LLM score: {score}") return score return 0.6 except Exception as e: error_str = str(e).lower() print(f"[LLM ERROR on attempt {attempt + 1}]: {e}") # Mark failure with rate limit detection if current_key_idx is not None: is_rate_limit = "429" in str(e) or "quota" in error_str or "rate" in error_str mark_key_failure(current_key_idx, is_rate_limit) if attempt < max_retries - 1: continue # ========== GROQ FALLBACK ========== print("🔄 All Gemini keys exhausted, falling back to Groq Llama 3.1...") groq_result = groq_llm_score(text) cache_score(text_hash, groq_result) return groq_result def llm_image_analysis(image): """ Use Gemini Vision to analyze image for manipulation with smart key rotation. Returns (manipulation_score, concerns) """ max_retries = 3 current_key_idx = None for attempt in range(max_retries): try: prompt = """Analyze this image for signs of misinformation or manipulation. Check for: 1. Sensationalist text overlays or headlines 2. Misleading statistics or false claims 3. Signs of digital manipulation (artifacts, inconsistent lighting, unnatural elements) 4. Fake news visual patterns (low quality, watermarks from unreliable sources) 5. Out-of-context imagery Be FAIR in your analysis. News screenshots from legitimate sources should score LOW (genuine). Only score HIGH if there are clear signs of manipulation or fake content. Respond ONLY as: MANIPULATION_SCORE: <0 to 1, where 0 is completely genuine and 1 is definitely fake/manipulated> CONCERNS: """ # Get model with smart key selection model, current_key_idx = get_gemini_model() response = model.generate_content([prompt, image]) match = re.search(r"MANIPULATION_SCORE:\s*([0-9.]+)", response.text) manipulation_score = float(match.group(1)) if match else 0.3 concerns_match = re.search(r"CONCERNS:\s*(.+)", response.text, re.IGNORECASE | re.DOTALL) concerns = concerns_match.group(1).strip().split('\n')[0] if concerns_match else "Unable to analyze" # Mark success mark_key_success(current_key_idx) print(f"đŸ–ŧī¸ Image analysis complete, manipulation score: {manipulation_score}") return manipulation_score, concerns except Exception as e: error_str = str(e).lower() print(f"[IMAGE ANALYSIS ERROR on attempt {attempt + 1}]: {e}") # Mark failure with rate limit detection if current_key_idx is not None: is_rate_limit = "429" in str(e) or "quota" in error_str or "rate" in error_str mark_key_failure(current_key_idx, is_rate_limit) if attempt < max_retries - 1: continue else: print("[IMAGE ANALYSIS ERROR]: All retry attempts failed") return 0.3, "Analysis failed - please try again" def web_search_with_sources(text): """ Search web for news verification using MULTIPLE FREE SOURCES: 1. Google News RSS (free, unlimited, returns major news outlets) 2. DuckDuckGo News search (prioritizes news sites) 3. Regular DuckDuckGo as fallback Returns: (score, list of source dictionaries) """ from urllib.parse import urlparse, quote import xml.etree.ElementTree as ET def extract_domain(url): """Extract clean domain from URL for deduplication""" try: parsed = urlparse(url) domain = parsed.netloc.lower().replace('www.', '') parts = domain.split('.') if len(parts) >= 2: return '.'.join(parts[-2:]) return domain except: return url.lower() # Extract key terms for search words = text.split() stop_words = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might', 'can', 'to', 'of', 'in', 'for', 'on', 'with', 'at', 'by', 'from', 'as', 'into', 'that', 'this', 'these', 'those', 'and', 'or', 'but', 'if', 'then', 'else', 'when', 'up', 'down', 'out', 'so', 'just', 'also', 'only', 'very', 'too', 'now', 'here', 'there'} keywords = [w for w in words if w.lower() not in stop_words][:10] query = " ".join(keywords) print(f"[WEB SEARCH] Query: {query}") all_results = [] seen_domains = set() # ===== SOURCE 1: Google News RSS (FREE, UNLIMITED) ===== try: google_news_url = f"https://news.google.com/rss/search?q={quote(query)}&hl=en-US&gl=US&ceid=US:en" response = requests.get(google_news_url, timeout=10, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) if response.status_code == 200: root = ET.fromstring(response.content) for item in root.findall('.//item')[:15]: title = item.find('title') link = item.find('link') source = item.find('source') if title is not None and link is not None: # Google News links often redirect, but source tag has real domain real_url = link.text if link.text else "" source_name = source.text if source is not None else "" all_results.append({ "title": title.text[:120] if title.text else "News", "href": real_url, "body": f"Source: {source_name}", "source": "google_news" }) print(f"[GOOGLE NEWS] Found {len(all_results)} articles") except Exception as e: print(f"[GOOGLE NEWS ERROR]: {e}") # ===== SOURCE 2: DuckDuckGo NEWS search (prioritizes news sites) ===== try: with DDGS() as ddgs: news_results = list(ddgs.news(query, max_results=20)) for r in news_results: all_results.append({ "title": r.get("title", "")[:120], "href": r.get("url", ""), "body": r.get("body", "")[:150], "source": "ddg_news" }) print(f"[DDG NEWS] Found {len(news_results)} articles") except Exception as e: print(f"[DDG NEWS ERROR]: {e}") # ===== SOURCE 3: DuckDuckGo regular search as fallback ===== if len(all_results) < 5: try: with DDGS() as ddgs: text_results = list(ddgs.text(query + " news", max_results=15)) for r in text_results: all_results.append({ "title": r.get("title", "")[:120], "href": r.get("href", ""), "body": r.get("body", "")[:150], "source": "ddg_text" }) print(f"[DDG TEXT] Found {len(text_results)} results") except Exception as e: print(f"[DDG TEXT ERROR]: {e}") print(f"[WEB SEARCH TOTAL] {len(all_results)} results from all sources") # Process and categorize results trusted_hits = 0 trusted_sources = [] other_sources = [] for r in all_results: url = r.get("href", "") title = r.get("title", "") body = r.get("body", "") if not url: continue domain = extract_domain(url) # Skip duplicates if domain in seen_domains: continue is_trusted = any(d in url.lower() for d in TRUSTED_DOMAINS) source_entry = { "title": title if title else "News Article", "url": url, "snippet": body if body else "", "trusted": is_trusted, "domain": domain } if is_trusted: trusted_sources.append(source_entry) seen_domains.add(domain) trusted_hits += 1 else: other_sources.append(source_entry) seen_domains.add(domain) # Combine: trusted first, then others sources = trusted_sources[:5] for src in other_sources: if len(sources) >= 8: break if not any(s.get('domain') == src.get('domain') for s in sources): sources.append(src) # Calculate score based on trusted hits if trusted_hits >= 3: score = 1.0 elif trusted_hits >= 2: score = 0.8 elif trusted_hits >= 1: score = 0.5 elif len(sources) > 0: score = 0.2 else: score = 0.0 # Log results final_domains = [s.get('domain', 'unknown') for s in sources] print(f"[WEB SEARCH] Trusted: {trusted_hits}, Total sources: {len(sources)}, Score: {score}") print(f"[WEB SEARCH] Domains: {', '.join(final_domains[:5])}...") return score, sources[:8] # ===================================================== # TEXT ANALYSIS # ===================================================== def analyze_text(text): """Analyze text for fake news with improved accuracy""" if not is_factual_claim(text): return { "credibility": 50, "verdict": "NOT A FACTUAL CLAIM", "bert_score": 0, "llm_score": 0, "web_score": 0, "sources": [] } bert = bert_fake_news_score(text) llm = llm_score(text) web, sources = web_search_with_sources(text) # IMPROVED Weighted fusion - more balanced approach # Finding trusted sources is strong evidence of legitimacy # BERT can be unreliable for certain news types # # New weights: # - Web: 40% (finding trusted sources is strong signal) # - LLM: 35% (contextual understanding) # - BERT: 25% (can be unreliable, use as secondary signal) final = (0.25 * bert) + (0.35 * llm) + (0.40 * web) # Significant boost if we found trusted sources if len([s for s in sources if s.get('trusted')]) >= 2: final = min(final + 0.25, 1.0) # Big boost for multiple trusted sources elif len([s for s in sources if s.get('trusted')]) >= 1: final = min(final + 0.15, 1.0) # Moderate boost for one trusted source elif web > 0: final = min(final + 0.05, 1.0) # Small boost for any web results credibility = round(final * 100, 1) # IMPROVED Verdict logic - give benefit of doubt when sources found trusted_count = len([s for s in sources if s.get('trusted')]) if trusted_count >= 2: # Multiple trusted sources = highly credible verdict = "VERIFIED" credibility = max(credibility, 75) elif trusted_count >= 1 and llm >= 0.5: verdict = "LIKELY REAL" credibility = max(credibility, 65) elif trusted_count >= 1: verdict = "LIKELY REAL" credibility = max(credibility, 55) elif bert < 0.2 and llm < 0.3 and web == 0: # Only flag as fake if ALL signals are negative verdict = "LIKELY FAKE" credibility = min(credibility, 30) elif credibility >= 50: verdict = "UNCERTAIN - Verify Manually" else: verdict = "UNCERTAIN" return { "credibility": credibility, "verdict": verdict, "bert_score": round(bert, 3), "llm_score": round(llm, 3), "web_score": round(web, 3), "sources": sources } # ===================================================== # IMAGE ANALYSIS # ===================================================== def analyze_image(image): """Complete image analysis pipeline""" extracted_text = extract_text_from_image(image) manipulation_score, concerns = llm_image_analysis(image) if not extracted_text: credibility = int((1 - manipulation_score) * 100) if manipulation_score > 0.7: verdict = "SUSPICIOUS" alert = "âš ī¸ Image shows signs of manipulation" elif manipulation_score > 0.4: verdict = "UNVERIFIABLE" alert = "âš ī¸ Unable to verify - no text detected" else: verdict = "NO TEXT DETECTED" alert = "â„šī¸ No readable text found in image" return { "credibility": credibility, "verdict": verdict, "alert": alert, "extracted_text": "", "bert_score": 0, "llm_score": 0, "web_score": 0, "image_manipulation_score": round(manipulation_score, 3), "concerns": concerns, "sources": [] } if not is_factual_claim(extracted_text): credibility = int((1 - manipulation_score) * 100) return { "credibility": credibility, "verdict": "NOT A FACTUAL CLAIM", "alert": "â„šī¸ Image does not contain verifiable claims", "extracted_text": extracted_text, "bert_score": 0, "llm_score": 0, "web_score": 0, "image_manipulation_score": round(manipulation_score, 3), "concerns": concerns, "sources": [] } # Full analysis bert = bert_fake_news_score(extracted_text) llm = llm_score(extracted_text) web, sources = web_search_with_sources(extracted_text) # Combined credibility (text + image) text_cred = (0.40 * bert) + (0.35 * llm) + (0.25 * web) image_cred = 1 - manipulation_score final_cred = (0.80 * text_cred) + (0.20 * image_cred) credibility = int(final_cred * 100) # Determine verdict if bert < 0.3 or manipulation_score > 0.7: verdict = "FAKE" alert = "🚨 FAKE NEWS ALERT - Content appears fabricated!" credibility = min(credibility, 20) elif manipulation_score > 0.5: verdict = "SUSPICIOUS" alert = "âš ī¸ CAUTION - Image shows signs of manipulation" credibility = min(credibility, 45) elif web >= 0.5 and bert >= 0.6: verdict = "VERIFIED" alert = "✅ VERIFIED - Content found in trusted sources" credibility = max(credibility, 80) elif bert >= 0.5: verdict = "LIKELY REAL" alert = "✅ Content appears credible" else: verdict = "UNVERIFIABLE" alert = "âš ī¸ Unable to verify - Exercise caution" return { "credibility": credibility, "verdict": verdict, "alert": alert, "extracted_text": extracted_text, "bert_score": round(bert, 3), "llm_score": round(llm, 3), "web_score": round(web, 3), "image_manipulation_score": round(manipulation_score, 3), "concerns": concerns, "sources": sources } # ===================================================== # API ENDPOINTS # ===================================================== @app.get("/") def health_check(): """Health check with API status""" # Calculate key health status healthy_keys = sum(1 for s in key_statuses.values() if not s.is_rate_limited and s.errors_count < 3) return { "status": "healthy", "service": "VerifAI Fake News Detection", "version": "4.0.0", # Updated version "model": FAKE_NEWS_MODEL, "features": [ "Text Detection", "Image Detection", "OCR", "BERT", "Gemini Vision", "Groq Fallback", "Response Caching", "Source Links" ], "api_status": { "gemini_keys": len(GEMINI_API_KEYS), "healthy_keys": healthy_keys, "groq_fallback": "enabled", "groq_model": GROQ_MODEL }, "cache_stats": { "cached_items": len(llm_cache), "cache_ttl_hours": CACHE_TTL / 3600 } } @app.get("/api-status") def api_status(): """Detailed API key status for monitoring""" keys_info = [] for idx, status in key_statuses.items(): key = GEMINI_API_KEYS[idx] keys_info.append({ "key_number": idx + 1, "key_preview": f"{key[:10]}...{key[-4:]}", "requests_count": status.requests_count, "is_rate_limited": status.is_rate_limited, "errors_count": status.errors_count, "cooldown_until": str(status.cooldown_until) if status.cooldown_until else None }) return { "gemini_keys": keys_info, "groq": { "enabled": True, "model": GROQ_MODEL, "status": "ready" }, "cache": { "total_cached": len(llm_cache), "ttl_seconds": CACHE_TTL } } @app.post("/check") def check_text(request: TextRequest): """Text-based fake news detection""" if not request.text.strip(): raise HTTPException(status_code=400, detail="Text is required") return analyze_text(request.text) @app.post("/check-image") async def check_image(request: ImageRequest): """Image-based fake news detection (base64 input)""" try: image = decode_base64_image(request.image) return analyze_image(image) except Exception as e: raise HTTPException(status_code=400, detail=f"Invalid image: {str(e)}") @app.post("/check-image-upload") async def check_image_upload(image: UploadFile = File(...)): """Image-based fake news detection (file upload)""" try: contents = await image.read() pil_image = Image.open(io.BytesIO(contents)) return analyze_image(pil_image) except Exception as e: raise HTTPException(status_code=400, detail=f"Invalid image: {str(e)}") @app.post("/extract-text") async def extract_text(request: ImageRequest): """Extract text from image using OCR (base64 input)""" try: image = decode_base64_image(request.image) extracted_text = extract_text_from_image(image) if not extracted_text: return { "success": False, "extracted_text": "", "message": "No text detected in the image" } return { "success": True, "extracted_text": extracted_text, "message": f"Successfully extracted {len(extracted_text.split())} words" } except Exception as e: raise HTTPException(status_code=400, detail=f"Error extracting text: {str(e)}") @app.get("/trending-news") async def get_trending_news(): """ Fetch top trending news across multiple categories. Returns 10-15 news items from various topics. """ categories = [ {"name": "World", "query": "world news today", "icon": "🌍"}, {"name": "Politics", "query": "political news today", "icon": "đŸ›ī¸"}, {"name": "Tech", "query": "technology news today", "icon": "đŸ’ģ"}, {"name": "Gaming", "query": "gaming news today", "icon": "🎮"}, {"name": "Science", "query": "science discovery today", "icon": "đŸ”Ŧ"}, {"name": "Health", "query": "health medical news today", "icon": "đŸĨ"}, {"name": "Sports", "query": "sports news today", "icon": "âšŊ"}, {"name": "Business", "query": "business economy news today", "icon": "📈"}, {"name": "Entertainment", "query": "entertainment celebrity news today", "icon": "đŸŽŦ"}, ] all_news = [] for category in categories: try: with DDGS() as ddgs: # Get more news for this category with today's date emphasis results = list(ddgs.news(category["query"], max_results=5)) for r in results: # Parse the date and format it nicely raw_date = r.get("date", "") formatted_date = raw_date # Try to format the date if it exists try: from datetime import datetime if raw_date: # DuckDuckGo returns date in various formats dt = datetime.fromisoformat(raw_date.replace('Z', '+00:00')) formatted_date = dt.strftime("%b %d, %Y â€ĸ %I:%M %p") except: formatted_date = raw_date if raw_date else "Recent" # Get image URL with fallback image_url = r.get("image", "") news_item = { "title": r.get("title", "")[:150], "url": r.get("url", ""), "source": r.get("source", "Unknown"), "date": formatted_date, "raw_date": raw_date, # Include raw date for sorting "body": r.get("body", "")[:200] if r.get("body") else "", "image": image_url, "category": category["name"], "icon": category["icon"] } all_news.append(news_item) except Exception as e: print(f"[TRENDING ERROR] {category['name']}: {e}") continue # Sort by date (most recent first) - don't shuffle to keep fresh news on top # Keep within each category for better organization from datetime import datetime import random # Add timestamp for cache-busting current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") return { "success": True, "news": all_news, # Return all news, let frontend handle pagination "total": len(all_news), "categories": [c["name"] for c in categories], "fetched_at": current_time } # ===================================================== # MAIN # ===================================================== if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)