Spaces:
Sleeping
Sleeping
| """ | |
| ๐ VerifAI - Unified Fake News Detection API | |
| ============================================= | |
| Combines Text and Image detection in a single API | |
| Uses: Pre-trained BERT, Gemini Vision, Groq LLM, Web Verification | |
| Enhanced with caching and smart API key management | |
| """ | |
| import re | |
| import os | |
| import io | |
| import base64 | |
| import torch | |
| import numpy as np | |
| import hashlib | |
| import time | |
| import requests | |
| from functools import lru_cache | |
| from datetime import datetime, timedelta | |
| from dataclasses import dataclass, field | |
| from typing import Dict, Optional, Tuple | |
| from PIL import Image | |
| from fastapi import FastAPI, HTTPException, File, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from duckduckgo_search import DDGS | |
| import google.generativeai as genai | |
| # OCR | |
| import easyocr | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification | |
| ) | |
# =====================================================
# CONFIG
# =====================================================
# Pre-trained fake news detection model from HuggingFace
FAKE_NEWS_MODEL = "jy46604790/Fake-News-BERT-Detect"
# =====================================================
# GROQ API CONFIGURATION (Fallback when Gemini fails)
# =====================================================
# SECURITY FIX: live API keys were previously hard-coded here. Secrets
# committed to source control must be treated as leaked — revoke/rotate
# them and supply replacements only through environment variables.
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"
GROQ_MODEL = "llama-3.1-70b-versatile"  # Free tier, fast inference
# =====================================================
# GEMINI API CONFIGURATION WITH SMART ROTATION
# =====================================================
# Keys come from GEMINI_API_KEY plus optional GEMINI_API_KEY_1..6 slots.
# Empty slots are dropped; a single empty placeholder is kept so the
# rotation machinery (which indexes into this list) never sees an empty
# list — an empty key simply fails configuration and triggers the Groq
# fallback path.
_gemini_env_keys = [os.getenv("GEMINI_API_KEY", "")] + [
    os.getenv(f"GEMINI_API_KEY_{i}", "") for i in range(1, 7)
]
GEMINI_API_KEYS = [k for k in _gemini_env_keys if k] or [""]
# =====================================================
# SMART API KEY MANAGEMENT
# =====================================================
@dataclass
class KeyStatus:
    """Track usage and health of each API key.

    BUG FIX: the original declared dataclass-style annotated fields and
    used ``field(default_factory=...)`` but omitted the ``@dataclass``
    decorator, so ``last_used`` was a raw ``dataclasses.field`` object
    (not a datetime) and all instances shared class-level attributes.
    """
    requests_count: int = 0                    # successful calls made with this key
    last_used: datetime = field(default_factory=datetime.now)
    is_rate_limited: bool = False              # currently cooling down after a 429
    cooldown_until: Optional[datetime] = None  # when the rate-limit cooldown expires
    errors_count: int = 0                      # recent-error counter (decays on success)
# Initialize key status tracking
# One KeyStatus per configured Gemini key, indexed by its position in GEMINI_API_KEYS.
key_statuses: Dict[int, KeyStatus] = {i: KeyStatus() for i in range(len(GEMINI_API_KEYS))}
request_counter = 0  # For round-robin distribution
def get_best_key_index() -> int:
    """
    Pick the next healthy Gemini key index.

    Walks the key list round-robin (driven by the global request
    counter), skipping keys that are still cooling down after a rate
    limit or have accumulated too many errors. Keys whose cooldown has
    elapsed are restored to a healthy state. If every key is unhealthy,
    all key state is cleared and index 0 is returned.
    """
    global request_counter
    total = len(GEMINI_API_KEYS)
    for _ in range(total):
        candidate = request_counter % total
        request_counter += 1
        state = key_statuses[candidate]
        now = datetime.now()
        if state.cooldown_until is not None:
            if now < state.cooldown_until:
                continue  # still cooling down — try the next key
            # Cooldown elapsed: restore the key to a healthy state.
            state.is_rate_limited = False
            state.cooldown_until = None
            state.errors_count = 0
        if state.errors_count >= 3:
            continue  # too many recent failures — skip
        return candidate
    # Every key was unhealthy: wipe all state and start over at key 0.
    for i in range(total):
        key_statuses[i].is_rate_limited = False
        key_statuses[i].cooldown_until = None
        key_statuses[i].errors_count = 0
    return 0
def mark_key_success(idx: int):
    """Record a successful call on key *idx* and decay its error count."""
    state = key_statuses[idx]
    state.requests_count += 1
    state.last_used = datetime.now()
    # A success earns back one error credit (never going below zero).
    state.errors_count = max(0, state.errors_count - 1)
def mark_key_failure(idx: int, is_rate_limit: bool = False):
    """Record a failed call on key *idx*; a rate limit adds a 60s cooldown."""
    state = key_statuses[idx]
    state.errors_count += 1
    if not is_rate_limit:
        return
    state.is_rate_limited = True
    # Rate-limited keys sit out for 60 seconds before being retried.
    state.cooldown_until = datetime.now() + timedelta(seconds=60)
    print(f"โธ๏ธ Key #{idx + 1} rate limited, cooldown until {state.cooldown_until}")
def get_gemini_model():
    """Configure genai with the healthiest key and return (model, key_index).

    Raises whatever the SDK raised if configuration fails, after marking
    the key as failed.
    """
    idx = get_best_key_index()
    api_key = GEMINI_API_KEYS[idx]
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel("gemini-2.0-flash")  # Upgraded to 2.0
    except Exception as e:
        print(f"โ Key #{idx + 1} config failed: {str(e)[:50]}")
        mark_key_failure(idx)
        raise
    # Log only a redacted preview of the key.
    key_preview = api_key[:15] + "..." + api_key[-4:]
    print(f"โ Using Gemini key #{idx + 1}: {key_preview}")
    return model, idx
# Legacy function for compatibility
def rotate_api_key():
    """Deprecated shim: rotation is now handled by get_best_key_index()."""
    global request_counter
    request_counter += 1
# =====================================================
# LLM RESPONSE CACHING
# =====================================================
# In-memory cache for LLM responses (reduces API calls by ~40-60%)
# Maps md5(normalized text) -> (score, unix timestamp at insertion time).
llm_cache: Dict[str, Tuple[float, float]] = {}  # hash -> (score, timestamp)
CACHE_TTL = 3600  # 1 hour cache validity
def get_text_hash(text: str) -> str:
    """Return an MD5 hex digest of the lowercased, stripped first 500 chars."""
    cache_key = text.lower().strip()[:500]
    return hashlib.md5(cache_key.encode()).hexdigest()
def get_cached_score(text_hash: str) -> Optional[float]:
    """Return the cached score for *text_hash*, or None if absent or expired.

    Expired entries are evicted as a side effect.
    """
    entry = llm_cache.get(text_hash)
    if entry is None:
        return None
    score, stored_at = entry
    if time.time() - stored_at >= CACHE_TTL:
        del llm_cache[text_hash]  # stale — drop it
        return None
    print(f"๐ฆ Cache hit for {text_hash[:8]}...")
    return score
def cache_score(text_hash: str, score: float):
    """Store *score* under *text_hash*; evict the oldest entry past 1000."""
    llm_cache[text_hash] = (score, time.time())
    if len(llm_cache) > 1000:
        # Evict the entry with the smallest timestamp (oldest insertion).
        oldest_key = min(llm_cache, key=lambda k: llm_cache[k][1])
        del llm_cache[oldest_key]
# =====================================================
# GROQ FALLBACK LLM
# =====================================================
def groq_llm_score(text: str) -> float:
    """
    Use Groq's free Llama 3.1 70B as fallback when all Gemini keys fail.
    Groq offers very fast inference with generous free tier.

    Returns a plausibility score clamped to [0, 1]; returns the neutral
    0.5 on any exception, non-200 response, or unparseable model output.
    """
    try:
        # Claim text is truncated to 1000 chars to keep the prompt small.
        prompt = f"""You are a fact-checking AI. Evaluate if this news claim is factually plausible.
Respond ONLY with a number between 0 and 1:
- 0.0-0.3: Clearly false, impossible, or conspiracy theory
- 0.4-0.6: Uncertain, needs verification
- 0.7-1.0: Plausible, sounds like real news
Claim: {text[:1000]}
Your response (just the number):"""
        response = requests.post(
            GROQ_API_URL,
            headers={
                "Authorization": f"Bearer {GROQ_API_KEY}",
                "Content-Type": "application/json"
            },
            json={
                "model": GROQ_MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0,  # deterministic output
                "max_tokens": 10   # only a short numeric answer is expected
            },
            timeout=30
        )
        if response.status_code == 200:
            result = response.json()
            text_response = result["choices"][0]["message"]["content"].strip()
            # Pull the first number out of the reply (model may add stray text).
            match = re.search(r"([0-9]+\.?[0-9]*)", text_response)
            if match:
                score = float(match.group(1))
                print(f"๐ฆ Groq Llama 3.1 score: {score}")
                return min(max(score, 0), 1)  # clamp to [0, 1]
        else:
            print(f"โ ๏ธ Groq API error: {response.status_code}")
    except Exception as e:
        print(f"[GROQ ERROR]: {e}")
    return 0.5  # Neutral fallback
# Initialize with the first working API key.
# FIX: the original used a bare `except:`, which silently swallowed
# everything (including SystemExit/KeyboardInterrupt) and discarded the
# actual error — narrow to Exception and log the reason.
try:
    gemini_model, _init_idx = get_gemini_model()
    print(f"โ Gemini API configured with {len(GEMINI_API_KEYS)} keys (smart rotation enabled)")
except Exception as e:
    gemini_model = None
    print(f"โ ๏ธ Gemini initialization failed: {str(e)[:80]}")
print(f"โ Groq API configured as fallback (model: {GROQ_MODEL})")
# Extended list of trusted news sources - GLOBAL COVERAGE
# NOTE(review): matching is a plain substring test against the full URL
# (see web_search_with_sources), so entries may include paths
# (e.g. "bbc.com/mundo", "reuters.com/business") — but short entries can
# also match inside unrelated hostnames; confirm this is acceptable.
TRUSTED_DOMAINS = [
    # === INTERNATIONAL / WIRE SERVICES ===
    "reuters.com", "apnews.com", "afp.com", "upi.com",
    # === UNITED STATES ===
    "nytimes.com", "washingtonpost.com", "wsj.com", "usatoday.com",
    "cnn.com", "nbcnews.com", "cbsnews.com", "abcnews.go.com", "foxnews.com",
    "npr.org", "pbs.org", "politico.com", "thehill.com", "axios.com",
    "bloomberg.com", "forbes.com", "businessinsider.com", "cnbc.com",
    # === UNITED KINGDOM ===
    "bbc.com", "bbc.co.uk", "theguardian.com", "telegraph.co.uk",
    "independent.co.uk", "thetimes.co.uk", "ft.com", "economist.com",
    "dailymail.co.uk", "mirror.co.uk", "metro.co.uk", "sky.com",
    # === EUROPE ===
    "dw.com", "spiegel.de", "zeit.de",  # Germany
    "france24.com", "lemonde.fr", "lefigaro.fr",  # France
    "elpais.com", "elmundo.es",  # Spain
    "corriere.it", "repubblica.it",  # Italy
    "nos.nl", "dutchnews.nl",  # Netherlands
    "euronews.com", "politico.eu",  # EU-wide
    # === MIDDLE EAST ===
    "aljazeera.com", "aljazeera.net",
    "arabnews.com", "gulfnews.com", "thenationalnews.com",
    "timesofisrael.com", "haaretz.com", "jpost.com",
    # === ASIA ===
    # India
    "ndtv.com", "thehindu.com", "hindustantimes.com", "indianexpress.com",
    "timesofindia.indiatimes.com", "indiatimes.com", "news18.com",
    "firstpost.com", "livemint.com", "theprint.in", "scroll.in",
    # China/Hong Kong
    "scmp.com", "globaltimes.cn", "chinadaily.com.cn",
    # Japan
    "japantimes.co.jp", "nhk.or.jp", "asahi.com",
    # South Korea
    "koreaherald.com", "koreatimes.co.kr",
    # Southeast Asia
    "straitstimes.com", "channelnewsasia.com",  # Singapore
    "bangkokpost.com",  # Thailand
    "thejakartapost.com",  # Indonesia
    "philstar.com", "gmanetwork.com",  # Philippines
    # === AFRICA ===
    "news24.com", "mg.co.za", "dailymaverick.co.za",  # South Africa
    "nation.africa", "theeastafrican.co.ke",  # East Africa
    "allafrica.com",  # Pan-African
    # === LATIN AMERICA ===
    "bbc.com/mundo", "elpais.com/america",
    "folha.uol.com.br", "g1.globo.com",  # Brazil
    "infobae.com", "clarin.com",  # Argentina
    "eluniversal.com.mx", "milenio.com",  # Mexico
    # === OCEANIA ===
    "abc.net.au", "sbs.com.au", "smh.com.au", "theaustralian.com.au",
    "nzherald.co.nz", "stuff.co.nz",  # New Zealand
    # === TECH NEWS ===
    "techcrunch.com", "theverge.com", "wired.com", "arstechnica.com",
    "cnet.com", "zdnet.com", "engadget.com", "gizmodo.com", "mashable.com",
    # === BUSINESS/FINANCE ===
    "reuters.com/business", "marketwatch.com", "yahoo.com/finance",
    "morningstar.com", "investopedia.com",
    # === SCIENCE/HEALTH ===
    "scientificamerican.com", "nature.com", "newscientist.com",
    "sciencemag.org", "medscape.com", "webmd.com", "who.int",
    # === GOVERNMENT SOURCES ===
    "gov.in", "gov.uk", "usa.gov", "europa.eu", "un.org",
    # === ENTERTAINMENT ===
    "variety.com", "hollywoodreporter.com", "deadline.com", "ew.com",
    # === SPORTS ===
    "espn.com", "sports.yahoo.com", "bleacherreport.com", "skysports.com",
    # === MICROSOFT/MSN ===
    "msn.com", "microsoft.com"
]
# FastAPI app
app = FastAPI(
    title="VerifAI - Fake News Detection API",
    description="Multi-model fake news detection for text and images using pre-trained transformers",
    version="3.0.0"
)
# Enable CORS
# NOTE(review): wildcard origins combined with allow_credentials=True is
# rejected by browsers for credentialed requests; tighten allow_origins
# before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request models
class TextRequest(BaseModel):
    """Request body for text analysis."""
    text: str  # the claim/article text to verify
class ImageRequest(BaseModel):
    """Request body for image analysis."""
    image: str  # Base64 encoded image
# =====================================================
# LOAD MODELS AT STARTUP
# =====================================================
print("๐ Loading EasyOCR...")
# English-only OCR; uses GPU when CUDA is available.
ocr_reader = easyocr.Reader(['en'], gpu=torch.cuda.is_available())
print(f"๐ง Loading Fake News BERT model: {FAKE_NEWS_MODEL}")
print(" (First run downloads ~440MB from HuggingFace)")
try:
    bert_tokenizer = AutoTokenizer.from_pretrained(FAKE_NEWS_MODEL)
    bert_model = AutoModelForSequenceClassification.from_pretrained(FAKE_NEWS_MODEL)
    bert_model.eval()  # inference mode (disables dropout etc.)
    print("โ Fake News BERT model loaded successfully!")
except Exception as e:
    print(f"โ Error loading model: {e}")
    # Scoring falls back to a neutral 0.5 when these stay None.
    bert_tokenizer = None
    bert_model = None
# =====================================================
# HELPERS
# =====================================================
def clean_text(text):
    """Lowercase *text* and collapse each run of non-[a-z/space] chars to one space."""
    return re.sub(r"[^a-z ]+", " ", text.lower())
def is_factual_claim(text):
    """
    Heuristic filter: does *text* look like a checkable news claim?

    Deliberately VERY lenient — a lowercase substring match against a
    broad marker list, plus a fallback that accepts any text with at
    least 20 characters and 4 words.
    """
    lowered = text.lower()
    # Broad marker vocabulary: verbs, news jargon, entities, dates, events.
    news_markers = [
        # Verbs - actions and states
        "is", "was", "are", "were", "has", "have", "had", "been", "being",
        "announced", "confirmed", "became", "says", "said", "told", "stated",
        "won", "lost", "launched", "approved", "reported", "according", "claims",
        "died", "killed", "arrested", "elected", "appointed", "resigned", "fired",
        "signed", "passed", "rejected", "voted", "declared", "ordered", "banned",
        "attacked", "invaded", "bombed", "hacked", "hijacked", "seized", "captured",
        "discovered", "revealed", "leaked", "exposed", "investigated", "charged",
        "inaugurated", "sworn", "impeached", "convicted", "acquitted", "sentenced",
        # News terms
        "breaking", "urgent", "exclusive", "official", "sources", "update",
        "developing", "just in", "alert", "live", "report", "news",
        # Quantities and stats
        "million", "billion", "trillion", "percent", "number", "rate", "record",
        # Entities - politics
        "president", "minister", "prime", "government", "congress", "parliament",
        "senate", "court", "supreme", "federal", "state", "national", "election",
        "trump", "biden", "obama", "putin", "modi", "xi", "zelensky", "netanyahu",
        # Entities - places
        "country", "nation", "city", "usa", "america", "china", "russia", "india",
        "ukraine", "israel", "iran", "gaza", "palestine", "europe", "asia",
        # Time references
        "today", "yesterday", "tomorrow", "january", "february", "march", "april",
        "may", "june", "july", "august", "september", "october", "november", "december",
        "2024", "2025", "2026",
        # Events
        "attack", "war", "protest", "strike", "earthquake", "hurricane", "flood",
        "crash", "explosion", "fire", "shooting", "pandemic", "outbreak"
    ]
    if any(marker in lowered for marker in news_markers):
        return True
    # Fallback: sufficiently long free text is still worth analyzing
    # (at least 20 characters and 4 words).
    return len(text) >= 20 and len(text.split()) >= 4
def decode_base64_image(base64_string):
    """Decode a base64 string (optionally a "data:...;base64," URI) to a PIL Image."""
    payload = base64_string.split(",")[1] if "," in base64_string else base64_string
    raw_bytes = base64.b64decode(payload)
    return Image.open(io.BytesIO(raw_bytes))
# =====================================================
# OCR - TEXT EXTRACTION
# =====================================================
def extract_text_from_image(image):
    """Run EasyOCR over *image* (PIL Image, array, or path) and join all text.

    Returns "" when OCR fails for any reason.
    """
    try:
        source = np.array(image) if isinstance(image, Image.Image) else image
        detections = ocr_reader.readtext(source)
        # readtext yields (bbox, text, confidence) triples; keep the text.
        return " ".join(det[1] for det in detections).strip()
    except Exception as e:
        print(f"[OCR ERROR]: {e}")
        return ""
# =====================================================
# SCORING FUNCTIONS
# =====================================================
def bert_fake_news_score(text):
    """
    Score *text* with the pre-trained Fake News BERT classifier.

    Returns the model's probability that the text is REAL news (0-1),
    or a neutral 0.5 when the model is unavailable or inference fails.
    """
    if not bert_tokenizer or not bert_model:
        return 0.5  # Fallback if model not loaded
    try:
        encoded = bert_tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=512,
            padding=True,
        )
        with torch.no_grad():
            logits = bert_model(**encoded).logits
        # Label order is [FAKE, REAL]; report P(REAL).
        probabilities = torch.softmax(logits, dim=1)
        return probabilities[0][1].item()
    except Exception as e:
        print(f"[BERT ERROR]: {e}")
        return 0.5
def llm_score(text):
    """
    Use Gemini to evaluate factual plausibility with caching and smart key rotation.
    - First checks cache for repeated queries
    - Then tries Gemini (up to 3 attempts, rotating keys on failure)
    - Finally falls back to Groq, caching that result too

    Returns a plausibility score in [0, 1].
    """
    # Check cache first
    text_hash = get_text_hash(text)
    cached = get_cached_score(text_hash)
    if cached is not None:
        return cached
    max_retries = 3
    current_key_idx = None
    for attempt in range(max_retries):
        try:
            prompt = f"""You are a fact-checking AI assistant. Evaluate whether this news claim is factually plausible.
Consider:
1. Does it describe real events, people, or places?
2. Is the claim physically/logically possible?
3. Does it sound like legitimate journalism?
News claims about protests, deaths, government actions, economic data etc. from reputable sources are usually TRUE.
Sensational claims with no specifics, miracle cures, conspiracy theories are usually FALSE.
Respond ONLY with a number between 0 and 1:
- 0.0-0.3: Clearly false, impossible, or conspiracy theory
- 0.4-0.6: Uncertain, needs verification
- 0.7-1.0: Plausible, sounds like real news
Claim to evaluate:
{text[:1500]}
Your response (just the number):"""
            # Get model with smart key selection
            model, current_key_idx = get_gemini_model()
            response = model.generate_content(
                prompt,
                generation_config={"temperature": 0}  # deterministic scoring
            )
            text_response = response.text.strip()
            # Pull the first number out of the reply.
            match = re.search(r"([0-9]+\.?[0-9]*)", text_response)
            if match:
                score = float(match.group(1))
                score = min(max(score, 0), 1)  # clamp to [0, 1]
                # Mark success and cache result
                mark_key_success(current_key_idx)
                cache_score(text_hash, score)
                print(f"โจ Gemini LLM score: {score}")
                return score
            # NOTE(review): an unparseable reply returns 0.6 without caching
            # or marking the key successful — presumably intentional mild
            # benefit-of-the-doubt; confirm.
            return 0.6
        except Exception as e:
            error_str = str(e).lower()
            print(f"[LLM ERROR on attempt {attempt + 1}]: {e}")
            # Mark failure with rate limit detection
            if current_key_idx is not None:
                is_rate_limit = "429" in str(e) or "quota" in error_str or "rate" in error_str
                mark_key_failure(current_key_idx, is_rate_limit)
            if attempt < max_retries - 1:
                continue
    # ========== GROQ FALLBACK ==========
    # Reached only when every Gemini attempt raised.
    print("๐ All Gemini keys exhausted, falling back to Groq Llama 3.1...")
    groq_result = groq_llm_score(text)
    cache_score(text_hash, groq_result)
    return groq_result
def llm_image_analysis(image):
    """
    Use Gemini Vision to analyze image for manipulation with smart key rotation.
    Returns (manipulation_score, concerns); falls back to (0.3, error text)
    when every retry attempt fails.
    """
    max_retries = 3
    current_key_idx = None
    for attempt in range(max_retries):
        try:
            prompt = """Analyze this image for signs of misinformation or manipulation.
Check for:
1. Sensationalist text overlays or headlines
2. Misleading statistics or false claims
3. Signs of digital manipulation (artifacts, inconsistent lighting, unnatural elements)
4. Fake news visual patterns (low quality, watermarks from unreliable sources)
5. Out-of-context imagery
Be FAIR in your analysis. News screenshots from legitimate sources should score LOW (genuine).
Only score HIGH if there are clear signs of manipulation or fake content.
Respond ONLY as:
MANIPULATION_SCORE: <0 to 1, where 0 is completely genuine and 1 is definitely fake/manipulated>
CONCERNS: <specific concerns found, or "None detected">"""
            # Get model with smart key selection
            model, current_key_idx = get_gemini_model()
            response = model.generate_content([prompt, image])
            # Parse the structured reply; default to a mild 0.3 if absent.
            match = re.search(r"MANIPULATION_SCORE:\s*([0-9.]+)", response.text)
            manipulation_score = float(match.group(1)) if match else 0.3
            concerns_match = re.search(r"CONCERNS:\s*(.+)", response.text, re.IGNORECASE | re.DOTALL)
            # Keep only the first line of the concerns text.
            concerns = concerns_match.group(1).strip().split('\n')[0] if concerns_match else "Unable to analyze"
            # Mark success
            mark_key_success(current_key_idx)
            print(f"๐ผ๏ธ Image analysis complete, manipulation score: {manipulation_score}")
            return manipulation_score, concerns
        except Exception as e:
            error_str = str(e).lower()
            print(f"[IMAGE ANALYSIS ERROR on attempt {attempt + 1}]: {e}")
            # Mark failure with rate limit detection
            if current_key_idx is not None:
                is_rate_limit = "429" in str(e) or "quota" in error_str or "rate" in error_str
                mark_key_failure(current_key_idx, is_rate_limit)
            if attempt < max_retries - 1:
                continue
            else:
                print("[IMAGE ANALYSIS ERROR]: All retry attempts failed")
    # Reached only when every attempt failed.
    return 0.3, "Analysis failed - please try again"
def web_search_with_sources(text):
    """
    Search web for news verification using MULTIPLE FREE SOURCES:
    1. Google News RSS (free, unlimited, returns major news outlets)
    2. DuckDuckGo News search (prioritizes news sites)
    3. Regular DuckDuckGo as fallback
    Returns: (score, list of source dictionaries)

    Score is 0.0-1.0 based on how many distinct TRUSTED_DOMAINS matched;
    sources is a deduplicated list (trusted first, max 8 entries).
    """
    from urllib.parse import urlparse, quote
    import xml.etree.ElementTree as ET
    def extract_domain(url):
        """Extract clean domain from URL for deduplication"""
        try:
            parsed = urlparse(url)
            domain = parsed.netloc.lower().replace('www.', '')
            # Collapse to the registrable 2-label suffix (e.g. "bbc.co.uk" -> "co.uk"
            # caveat noted; good enough for dedup purposes here).
            parts = domain.split('.')
            if len(parts) >= 2:
                return '.'.join(parts[-2:])
            return domain
        except:  # NOTE(review): bare except kept as-is; best-effort fallback
            return url.lower()
    # Extract key terms for search
    words = text.split()
    stop_words = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
                  'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could',
                  'should', 'may', 'might', 'can', 'to', 'of', 'in', 'for', 'on', 'with',
                  'at', 'by', 'from', 'as', 'into', 'that', 'this', 'these', 'those',
                  'and', 'or', 'but', 'if', 'then', 'else', 'when', 'up', 'down', 'out',
                  'so', 'just', 'also', 'only', 'very', 'too', 'now', 'here', 'there'}
    # Keep the first 10 non-stopword terms as the search query.
    keywords = [w for w in words if w.lower() not in stop_words][:10]
    query = " ".join(keywords)
    print(f"[WEB SEARCH] Query: {query}")
    all_results = []
    seen_domains = set()
    # ===== SOURCE 1: Google News RSS (FREE, UNLIMITED) =====
    try:
        google_news_url = f"https://news.google.com/rss/search?q={quote(query)}&hl=en-US&gl=US&ceid=US:en"
        response = requests.get(google_news_url, timeout=10, headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        if response.status_code == 200:
            root = ET.fromstring(response.content)
            for item in root.findall('.//item')[:15]:
                title = item.find('title')
                link = item.find('link')
                source = item.find('source')
                if title is not None and link is not None:
                    # Google News links often redirect, but source tag has real domain
                    real_url = link.text if link.text else ""
                    source_name = source.text if source is not None else ""
                    all_results.append({
                        "title": title.text[:120] if title.text else "News",
                        "href": real_url,
                        "body": f"Source: {source_name}",
                        "source": "google_news"
                    })
        print(f"[GOOGLE NEWS] Found {len(all_results)} articles")
    except Exception as e:
        print(f"[GOOGLE NEWS ERROR]: {e}")
    # ===== SOURCE 2: DuckDuckGo NEWS search (prioritizes news sites) =====
    try:
        with DDGS() as ddgs:
            news_results = list(ddgs.news(query, max_results=20))
            for r in news_results:
                all_results.append({
                    "title": r.get("title", "")[:120],
                    "href": r.get("url", ""),
                    "body": r.get("body", "")[:150],
                    "source": "ddg_news"
                })
            print(f"[DDG NEWS] Found {len(news_results)} articles")
    except Exception as e:
        print(f"[DDG NEWS ERROR]: {e}")
    # ===== SOURCE 3: DuckDuckGo regular search as fallback =====
    # Only used when the news-specific sources returned too little.
    if len(all_results) < 5:
        try:
            with DDGS() as ddgs:
                text_results = list(ddgs.text(query + " news", max_results=15))
                for r in text_results:
                    all_results.append({
                        "title": r.get("title", "")[:120],
                        "href": r.get("href", ""),
                        "body": r.get("body", "")[:150],
                        "source": "ddg_text"
                    })
                print(f"[DDG TEXT] Found {len(text_results)} results")
        except Exception as e:
            print(f"[DDG TEXT ERROR]: {e}")
    print(f"[WEB SEARCH TOTAL] {len(all_results)} results from all sources")
    # Process and categorize results
    trusted_hits = 0
    trusted_sources = []
    other_sources = []
    for r in all_results:
        url = r.get("href", "")
        title = r.get("title", "")
        body = r.get("body", "")
        if not url:
            continue
        domain = extract_domain(url)
        # Skip duplicates (first hit per registrable domain wins)
        if domain in seen_domains:
            continue
        # Substring match against the full URL — see TRUSTED_DOMAINS note.
        is_trusted = any(d in url.lower() for d in TRUSTED_DOMAINS)
        source_entry = {
            "title": title if title else "News Article",
            "url": url,
            "snippet": body if body else "",
            "trusted": is_trusted,
            "domain": domain
        }
        if is_trusted:
            trusted_sources.append(source_entry)
            seen_domains.add(domain)
            trusted_hits += 1
        else:
            other_sources.append(source_entry)
            seen_domains.add(domain)
    # Combine: trusted first, then others (cap at 8 total)
    sources = trusted_sources[:5]
    for src in other_sources:
        if len(sources) >= 8:
            break
        if not any(s.get('domain') == src.get('domain') for s in sources):
            sources.append(src)
    # Calculate score based on trusted hits
    if trusted_hits >= 3:
        score = 1.0
    elif trusted_hits >= 2:
        score = 0.8
    elif trusted_hits >= 1:
        score = 0.5
    elif len(sources) > 0:
        score = 0.2
    else:
        score = 0.0
    # Log results
    final_domains = [s.get('domain', 'unknown') for s in sources]
    print(f"[WEB SEARCH] Trusted: {trusted_hits}, Total sources: {len(sources)}, Score: {score}")
    print(f"[WEB SEARCH] Domains: {', '.join(final_domains[:5])}...")
    return score, sources[:8]
# =====================================================
# TEXT ANALYSIS
# =====================================================
def analyze_text(text):
    """Run the full text pipeline (BERT + LLM + web) and fuse into a verdict dict."""
    if not is_factual_claim(text):
        return {
            "credibility": 50,
            "verdict": "NOT A FACTUAL CLAIM",
            "bert_score": 0,
            "llm_score": 0,
            "web_score": 0,
            "sources": []
        }
    bert = bert_fake_news_score(text)
    llm = llm_score(text)
    web, sources = web_search_with_sources(text)
    trusted_count = sum(1 for s in sources if s.get('trusted'))
    # Weighted fusion: web 40% (trusted hits are strong evidence of
    # legitimacy), LLM 35% (contextual understanding), BERT 25%
    # (can be unreliable for some news types — secondary signal).
    final = (0.25 * bert) + (0.35 * llm) + (0.40 * web)
    # Evidence boosts on top of the fused score, capped at 1.0.
    if trusted_count >= 2:
        final = min(final + 0.25, 1.0)  # big boost: multiple trusted sources
    elif trusted_count >= 1:
        final = min(final + 0.15, 1.0)  # moderate boost: one trusted source
    elif web > 0:
        final = min(final + 0.05, 1.0)  # small boost: any web results
    credibility = round(final * 100, 1)
    # Verdict: give benefit of the doubt whenever trusted sources were found.
    if trusted_count >= 2:
        verdict = "VERIFIED"
        credibility = max(credibility, 75)
    elif trusted_count >= 1 and llm >= 0.5:
        verdict = "LIKELY REAL"
        credibility = max(credibility, 65)
    elif trusted_count >= 1:
        verdict = "LIKELY REAL"
        credibility = max(credibility, 55)
    elif bert < 0.2 and llm < 0.3 and web == 0:
        # Only flag as fake if ALL signals are negative.
        verdict = "LIKELY FAKE"
        credibility = min(credibility, 30)
    elif credibility >= 50:
        verdict = "UNCERTAIN - Verify Manually"
    else:
        verdict = "UNCERTAIN"
    return {
        "credibility": credibility,
        "verdict": verdict,
        "bert_score": round(bert, 3),
        "llm_score": round(llm, 3),
        "web_score": round(web, 3),
        "sources": sources
    }
# =====================================================
# IMAGE ANALYSIS
# =====================================================
def analyze_image(image):
    """Complete image analysis pipeline.

    OCRs the image, runs Gemini Vision manipulation analysis, and — when
    the extracted text is a factual claim — runs the full text pipeline,
    fusing text (80%) and image (20%) credibility into one verdict dict.
    """
    extracted_text = extract_text_from_image(image)
    manipulation_score, concerns = llm_image_analysis(image)
    # Case 1: no readable text — verdict rests on the image analysis alone.
    if not extracted_text:
        credibility = int((1 - manipulation_score) * 100)
        if manipulation_score > 0.7:
            verdict = "SUSPICIOUS"
            alert = "โ ๏ธ Image shows signs of manipulation"
        elif manipulation_score > 0.4:
            verdict = "UNVERIFIABLE"
            alert = "โ ๏ธ Unable to verify - no text detected"
        else:
            verdict = "NO TEXT DETECTED"
            alert = "โน๏ธ No readable text found in image"
        return {
            "credibility": credibility,
            "verdict": verdict,
            "alert": alert,
            "extracted_text": "",
            "bert_score": 0,
            "llm_score": 0,
            "web_score": 0,
            "image_manipulation_score": round(manipulation_score, 3),
            "concerns": concerns,
            "sources": []
        }
    # Case 2: text found but it is not a checkable claim.
    if not is_factual_claim(extracted_text):
        credibility = int((1 - manipulation_score) * 100)
        return {
            "credibility": credibility,
            "verdict": "NOT A FACTUAL CLAIM",
            "alert": "โน๏ธ Image does not contain verifiable claims",
            "extracted_text": extracted_text,
            "bert_score": 0,
            "llm_score": 0,
            "web_score": 0,
            "image_manipulation_score": round(manipulation_score, 3),
            "concerns": concerns,
            "sources": []
        }
    # Case 3: full analysis on the extracted claim text.
    bert = bert_fake_news_score(extracted_text)
    llm = llm_score(extracted_text)
    web, sources = web_search_with_sources(extracted_text)
    # Combined credibility (text + image)
    text_cred = (0.40 * bert) + (0.35 * llm) + (0.25 * web)
    image_cred = 1 - manipulation_score
    final_cred = (0.80 * text_cred) + (0.20 * image_cred)
    credibility = int(final_cred * 100)
    # Determine verdict (caps/floors override the fused score).
    if bert < 0.3 or manipulation_score > 0.7:
        verdict = "FAKE"
        alert = "๐จ FAKE NEWS ALERT - Content appears fabricated!"
        credibility = min(credibility, 20)
    elif manipulation_score > 0.5:
        verdict = "SUSPICIOUS"
        alert = "โ ๏ธ CAUTION - Image shows signs of manipulation"
        credibility = min(credibility, 45)
    elif web >= 0.5 and bert >= 0.6:
        verdict = "VERIFIED"
        alert = "โ VERIFIED - Content found in trusted sources"
        credibility = max(credibility, 80)
    elif bert >= 0.5:
        verdict = "LIKELY REAL"
        alert = "โ Content appears credible"
    else:
        verdict = "UNVERIFIABLE"
        alert = "โ ๏ธ Unable to verify - Exercise caution"
    return {
        "credibility": credibility,
        "verdict": verdict,
        "alert": alert,
        "extracted_text": extracted_text,
        "bert_score": round(bert, 3),
        "llm_score": round(llm, 3),
        "web_score": round(web, 3),
        "image_manipulation_score": round(manipulation_score, 3),
        "concerns": concerns,
        "sources": sources
    }
| # ===================================================== | |
| # API ENDPOINTS | |
| # ===================================================== | |
def health_check():
    """Health check endpoint: service info, key health, and cache stats."""
    # A Gemini key counts as healthy when it is neither rate-limited
    # nor accumulating errors (fewer than 3 recorded failures).
    healthy_keys = 0
    for status in key_statuses.values():
        if status.is_rate_limited or status.errors_count >= 3:
            continue
        healthy_keys += 1

    api_info = {
        "gemini_keys": len(GEMINI_API_KEYS),
        "healthy_keys": healthy_keys,
        "groq_fallback": "enabled",
        "groq_model": GROQ_MODEL
    }
    cache_info = {
        "cached_items": len(llm_cache),
        "cache_ttl_hours": CACHE_TTL / 3600
    }
    return {
        "status": "healthy",
        "service": "VerifAI Fake News Detection",
        "version": "4.0.0",  # Updated version
        "model": FAKE_NEWS_MODEL,
        "features": [
            "Text Detection", "Image Detection", "OCR", "BERT",
            "Gemini Vision", "Groq Fallback", "Response Caching", "Source Links"
        ],
        "api_status": api_info,
        "cache_stats": cache_info
    }
def api_status():
    """Detailed per-key Gemini status plus Groq and cache info for monitoring."""
    # One summary entry per configured Gemini key; the preview hides the
    # middle of the key so the response is safe to expose.
    keys_info = [
        {
            "key_number": idx + 1,
            "key_preview": f"{GEMINI_API_KEYS[idx][:10]}...{GEMINI_API_KEYS[idx][-4:]}",
            "requests_count": status.requests_count,
            "is_rate_limited": status.is_rate_limited,
            "errors_count": status.errors_count,
            "cooldown_until": str(status.cooldown_until) if status.cooldown_until else None
        }
        for idx, status in key_statuses.items()
    ]
    return {
        "gemini_keys": keys_info,
        "groq": {"enabled": True, "model": GROQ_MODEL, "status": "ready"},
        "cache": {"total_cached": len(llm_cache), "ttl_seconds": CACHE_TTL}
    }
def check_text(request: TextRequest):
    """Text-based fake news detection endpoint (rejects blank input)."""
    content = request.text
    # Whitespace-only payloads carry nothing to analyze.
    if not content.strip():
        raise HTTPException(status_code=400, detail="Text is required")
    return analyze_text(content)
async def check_image(request: ImageRequest):
    """Image-based fake news detection (base64 input).

    Decodes the base64 payload and runs the full image analysis pipeline.

    Raises:
        HTTPException(400): when the payload cannot be decoded into an
            image. Failures inside the analysis pipeline itself propagate
            instead of being mislabeled as an invalid image.
    """
    try:
        # Keep the try body minimal: only the decode step means "invalid image".
        image = decode_base64_image(request.image)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid image: {str(e)}") from e
    return analyze_image(image)
async def check_image_upload(image: UploadFile = File(...)):
    """Image-based fake news detection (multipart file upload).

    Reads the uploaded bytes and decodes them with PIL before analysis.

    Raises:
        HTTPException(400): when the upload cannot be read or is not a
            decodable image. Analysis-pipeline failures propagate instead
            of being mislabeled as an invalid image.
    """
    try:
        # Only reading/decoding the upload can mean "invalid image".
        contents = await image.read()
        pil_image = Image.open(io.BytesIO(contents))
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid image: {str(e)}") from e
    return analyze_image(pil_image)
async def extract_text(request: ImageRequest):
    """Extract text from image using OCR (base64 input)."""
    try:
        decoded = decode_base64_image(request.image)
        text = extract_text_from_image(decoded)
        if text:
            word_count = len(text.split())
            return {
                "success": True,
                "extracted_text": text,
                "message": f"Successfully extracted {word_count} words"
            }
        # OCR produced nothing usable.
        return {
            "success": False,
            "extracted_text": "",
            "message": "No text detected in the image"
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Error extracting text: {str(e)}")
| async def get_trending_news(): | |
| """ | |
| Fetch top trending news across multiple categories. | |
| Returns 10-15 news items from various topics. | |
| """ | |
| categories = [ | |
| {"name": "World", "query": "world news today", "icon": "๐"}, | |
| {"name": "Politics", "query": "political news today", "icon": "๐๏ธ"}, | |
| {"name": "Tech", "query": "technology news today", "icon": "๐ป"}, | |
| {"name": "Gaming", "query": "gaming news today", "icon": "๐ฎ"}, | |
| {"name": "Science", "query": "science discovery today", "icon": "๐ฌ"}, | |
| {"name": "Health", "query": "health medical news today", "icon": "๐ฅ"}, | |
| {"name": "Sports", "query": "sports news today", "icon": "โฝ"}, | |
| {"name": "Business", "query": "business economy news today", "icon": "๐"}, | |
| {"name": "Entertainment", "query": "entertainment celebrity news today", "icon": "๐ฌ"}, | |
| ] | |
| all_news = [] | |
| for category in categories: | |
| try: | |
| with DDGS() as ddgs: | |
| # Get more news for this category with today's date emphasis | |
| results = list(ddgs.news(category["query"], max_results=5)) | |
| for r in results: | |
| # Parse the date and format it nicely | |
| raw_date = r.get("date", "") | |
| formatted_date = raw_date | |
| # Try to format the date if it exists | |
| try: | |
| from datetime import datetime | |
| if raw_date: | |
| # DuckDuckGo returns date in various formats | |
| dt = datetime.fromisoformat(raw_date.replace('Z', '+00:00')) | |
| formatted_date = dt.strftime("%b %d, %Y โข %I:%M %p") | |
| except: | |
| formatted_date = raw_date if raw_date else "Recent" | |
| # Get image URL with fallback | |
| image_url = r.get("image", "") | |
| news_item = { | |
| "title": r.get("title", "")[:150], | |
| "url": r.get("url", ""), | |
| "source": r.get("source", "Unknown"), | |
| "date": formatted_date, | |
| "raw_date": raw_date, # Include raw date for sorting | |
| "body": r.get("body", "")[:200] if r.get("body") else "", | |
| "image": image_url, | |
| "category": category["name"], | |
| "icon": category["icon"] | |
| } | |
| all_news.append(news_item) | |
| except Exception as e: | |
| print(f"[TRENDING ERROR] {category['name']}: {e}") | |
| continue | |
| # Sort by date (most recent first) - don't shuffle to keep fresh news on top | |
| # Keep within each category for better organization | |
| from datetime import datetime | |
| import random | |
| # Add timestamp for cache-busting | |
| current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| return { | |
| "success": True, | |
| "news": all_news, # Return all news, let frontend handle pagination | |
| "total": len(all_news), | |
| "categories": [c["name"] for c in categories], | |
| "fetched_at": current_time | |
| } | |
| # ===================================================== | |
| # MAIN | |
| # ===================================================== | |
# Script entry point: serve the FastAPI app with uvicorn on port 7860
# (the port convention used by Hugging Face Spaces).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)