Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import requests | |
| from typing import Dict, List, Any, Optional | |
| from fastapi import FastAPI, HTTPException, Body | |
| from pydantic import BaseModel | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| from newspaper import Article | |
| from bs4 import BeautifulSoup | |
| import easyocr | |
| from PIL import Image | |
| import google.generativeai as genai | |
| from datetime import datetime | |
| import logging | |
| from fastapi.middleware.cors import CORSMiddleware | |
# Logging: timestamped INFO-and-above records, emitted through a named logger.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("TruthLens")

# The ASGI application object that the handlers in this module belong to.
app = FastAPI(title="TruthLens Backend")

# Cross-origin access is left fully open: any origin, method and header.
# NOTE(review): wildcard origins are combined with allow_credentials=True;
# confirm this is intentional before the service handles cookies or auth.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Hugging Face checkpoint used for classification; loaded on first use so
# importing this module stays cheap.
MODEL_NAME = "Pulk17/Fake-News-Detection"
tokenizer = None
model = None


def load_model():
    """Return the ``(tokenizer, model)`` pair, loading both on first call.

    The pair is cached in the module-level ``tokenizer`` / ``model`` globals.
    If either one is still missing, both are (re)loaded from ``MODEL_NAME``
    and the model is switched to evaluation mode.
    """
    global tokenizer, model

    # Fast path: everything is already in memory.
    if tokenizer is not None and model is not None:
        return tokenizer, model

    print("Loading Hugging Face model...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
    model.eval()
    return tokenizer, model
# API credentials are read from the process environment (or a .env file loaded
# before start-up). A missing variable becomes an empty string, which the
# helpers below treat as "feature disabled".
GOOGLE_FACT_CHECK_API_KEY = os.getenv("GOOGLE_FACT_CHECK_API_KEY", "")
HIVE_API_KEY = os.getenv("HIVE_API_KEY", "")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")

# Gemini client: stays None unless a key is configured.
gemini_model = None
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel("gemini-pro")
# Request models
#
# Body of a text-detection request: the raw text to classify.
class TextRequest(BaseModel):
    text: str
# Body of a URL-detection request: address of the article to fetch and classify.
class UrlRequest(BaseModel):
    url: str
# Body of an image-detection request: address of the image to download.
class ImageRequest(BaseModel):
    image_url: str
# Source credibility database: bare domain (no scheme, no "www.") mapped to
# the outlet's display name.
CREDIBLE_SOURCES = {
    # Wire services
    "apnews.com": "Associated Press",
    "reuters.com": "Reuters",
    # Broadcasters and newspapers
    "bbc.com": "BBC News",
    "bbc.co.uk": "BBC News",
    "nytimes.com": "The New York Times",
    "npr.org": "NPR",
    "pbs.org": "PBS NewsHour",
    "wsj.com": "The Wall Street Journal",
    "bloomberg.com": "Bloomberg",
    "theguardian.com": "The Guardian",
    "washingtonpost.com": "The Washington Post",
    "propublica.org": "ProPublica",
    "aljazeera.com": "Al Jazeera",
    "economist.com": "The Economist",
    "forbes.com": "Forbes",
}
# Known satire outlets: domain (optionally followed by a path, as for the
# Borowitz Report) mapped to the outlet's display name.
SATIRE_SOURCES = {
    "theonion.com": "The Onion",
    "babylonbee.com": "The Babylon Bee",
    "clickhole.com": "ClickHole",
    "newyorker.com/humor/borowitz-report": "The Borowitz Report",
    "thebeaverton.com": "The Beaverton",
    "cracked.com": "Cracked",
    "dailymash.co.uk": "The Daily Mash",
    "waterfordwhispersnews.com": "Waterford Whispers News",
}
# Helper functions
def get_fact_checks(text: str) -> List[Dict[str, str]]:
    """Look up published fact checks via the Google Fact Check Tools API.

    Only the first 100 characters of ``text`` are sent as the search query,
    and at most the first three returned claims are kept.

    Args:
        text: Claim (or article text) to search for.

    Returns:
        A list of dicts with the keys ``claim``, ``claimant``, ``rating`` and
        ``url``. The list is empty when no API key is configured, when the
        API answers with a non-200 status, or when the request fails.
    """
    if not GOOGLE_FACT_CHECK_API_KEY:
        return []

    try:
        response = requests.get(
            "https://factchecktools.googleapis.com/v1alpha1/claims:search",
            params={
                "query": text[:100],
                "key": GOOGLE_FACT_CHECK_API_KEY,
                "languageCode": "en",
            },
            timeout=10,
        )
        if response.status_code != 200:
            logger.warning("Fact check API error: %s", response.status_code)
            return []

        fact_checks = []
        for claim in (response.json().get("claims") or [])[:3]:
            # A claim may carry a missing, null or empty review list; fall
            # back to an empty review instead of failing the whole lookup.
            reviews = claim.get("claimReview") or [{}]
            review = reviews[0] or {}
            fact_checks.append(
                {
                    "claim": claim.get("text") or "",
                    "claimant": claim.get("claimant") or "",
                    # Always a string so callers can safely call .lower() on it.
                    "rating": review.get("textualRating") or "Unknown",
                    "url": review.get("url") or "",
                }
            )
        return fact_checks
    except Exception as e:
        # Best-effort enrichment: a failed lookup must not fail the request.
        logger.error("Error getting fact checks: %s", e)
        return []
def extract_claims_with_gemini(text: str) -> List[str]:
    """Ask Gemini for the single most search-worthy factual claim in ``text``.

    Only the first 1000 characters of ``text`` are included in the prompt.

    Returns:
        A one-element list holding the extracted claim. When Gemini is not
        configured, returns nothing usable, or raises, the element is the
        first 100 characters of ``text`` instead.
    """
    fallback = [text[:100]]
    if not gemini_model:
        return fallback

    try:
        prompt = (
            "\n"
            "Extract the single most important factual claim from the following text that can be used to search in a fact-check database.\n"
            "Output ONLY the extracted claim string, nothing else.\n"
            f"Text: {text[:1000]}\n"
        )
        response = gemini_model.generate_content(prompt)
        if not response or not hasattr(response, "text"):
            return fallback

        claim = response.text.strip()
        if claim:
            return [claim]
        return fallback
    except Exception as e:
        print(f"Gemini claim extraction error: {e}")
        return fallback
def generate_explanation_with_gemini(text: str, label: str, confidence: float, fact_checks: List[Dict]) -> str:
    """Produce a short, human-readable rationale for a detection result.

    Args:
        text: Analysed content; only its first 1000 characters are prompted.
        label: Classification assigned to the content.
        confidence: Probability of ``label``, rendered as a percentage.
        fact_checks: Fact-check entries; each one's ``claim`` and ``rating``
            are listed in the prompt when any are present.

    Returns:
        Gemini's stripped answer, or a canned sentence when Gemini is not
        configured, returns no text, or raises.
    """
    if not gemini_model:
        return f"The news has been classified as {label} with {confidence:.2%} confidence."

    try:
        # Optional evidence section: one bullet per fact check.
        if fact_checks:
            bullets = []
            for fc in fact_checks:
                bullets.append(f"- {fc['claim']} (Rating: {fc['rating']})")
            fact_check_context = "Relevant fact checks found:\n" + "\n".join(bullets)
        else:
            fact_check_context = ""

        # The empty first and last entries give the prompt its leading and
        # trailing newline.
        prompt = "\n".join(
            [
                "",
                "Act as a professional fact-checker for an app called TruthLens.",
                "Analyze the following news text and the AI detection result.",
                f"News Text: {text[:1000]}",
                f"AI Classification: {label}",
                f"Confidence: {confidence:.2%}",
                fact_check_context,
                f"Provide a concise, human-readable explanation (2-3 sentences) explaining why this news is likely {label}.",
                "Focus on style, source (if present), or specific fact-check evidence.",
                "",
            ]
        )

        response = gemini_model.generate_content(prompt)
        if not response or not hasattr(response, "text"):
            return f"The model identified this content as {label} with {confidence:.2%} confidence."
        return response.text.strip()
    except Exception as e:
        print(f"Gemini explanation error: {e}")
        return f"Analysis complete: The model identified this content as {label}."
# Hive classes that express the overall verdict rather than a generator name.
_HIVE_VERDICT_CLASSES = frozenset({"ai_generated", "not_ai_generated"})


def _parse_hive_ai_output(data: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce a Hive sync-task response to ``{"probability", "generator"}``."""
    statuses = data.get("status") or [{}]
    outputs = ((statuses[0] or {}).get("response") or {}).get("output") or [{}]
    first_output = outputs[0] or {}

    classes = first_output.get("classes")
    if not classes:
        # Flat shape: score/class sit directly on the output entry.
        return {
            "probability": first_output.get("score", 0.0),
            "generator": first_output.get("class", None),
        }

    # NOTE(review): assumes Hive's classification shape, a list of
    # {"class": ..., "score": ...} entries - confirm against the API docs.
    scores = {entry.get("class"): entry.get("score", 0.0) for entry in classes}
    generator_scores = {
        name: score
        for name, score in scores.items()
        if name is not None and name not in _HIVE_VERDICT_CLASSES
    }
    generator = None
    if generator_scores:
        best = max(generator_scores, key=generator_scores.get)
        # "none" is taken to mean "no generator identified".
        generator = None if best == "none" else best
    return {"probability": scores.get("ai_generated", 0.0), "generator": generator}


def detect_ai_image(image_bytes: bytes) -> Dict[str, Any]:
    """Estimate whether an image is AI-generated using the Hive Moderation API.

    Args:
        image_bytes: Raw bytes of the image; sent base64-encoded.

    Returns:
        ``{"probability": float, "generator": str | None}``. A neutral result
        (``0.0`` / ``None``) is returned when no API key is configured, the
        API answers with a non-200 status, or the call fails.
    """
    neutral = {"probability": 0.0, "generator": None}
    if not HIVE_API_KEY:
        return neutral

    try:
        import base64

        response = requests.post(
            "https://api.hivemoderation.com/v2/task/sync",
            headers={
                "Authorization": f"Token {HIVE_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "image": base64.b64encode(image_bytes).decode("utf-8"),
                "models": ["ai_generated"],
            },
            timeout=30,
        )
        if response.status_code != 200:
            logger.warning("Hive API error: %s", response.status_code)
            return neutral
        return _parse_hive_ai_output(response.json())
    except Exception as e:
        # Best-effort signal: never fail the request because Hive failed.
        logger.error("Error detecting AI image: %s", e)
        return neutral
def calculate_risk_level(
    label: str,
    confidence: float,
    fact_checks: List[Dict],
    image_ai_result: Optional[Dict] = None
) -> str:
    """Combine classifier, fact-check and image signals into one risk level.

    Args:
        label: Classifier verdict, ``"FAKE"`` or ``"REAL"``.
        confidence: Probability attached to ``label``.
        fact_checks: Fact-check entries; each must provide a ``rating`` string.
        image_ai_result: Optional dict with a ``probability`` of the image
            being AI-generated.

    Returns:
        ``"low"``, ``"medium"`` or ``"high"``.
    """
    # 1. Classifier verdict: only a confident call leaves the middle ground.
    if label == "FAKE" and confidence > 0.8:
        risk = "high"
    elif label == "REAL" and confidence > 0.8:
        risk = "low"
    else:
        risk = "medium"

    # 2. Fact checks: two or more damning ratings force the highest level.
    damning_markers = ("false", "fake", "satire")
    damning = 0
    for fc in fact_checks or ():
        rating = fc["rating"].lower()
        if any(marker in rating for marker in damning_markers):
            damning += 1
    if damning >= 2:
        risk = "high"

    # 3. A likely AI-generated image bumps the level up by one step.
    if image_ai_result and image_ai_result["probability"] > 0.7:
        risk = {"low": "medium", "medium": "high"}.get(risk, risk)

    return risk
def _host_in_domain(host: str, domain: str) -> bool:
    """Return True when ``host`` is ``domain`` itself or one of its subdomains."""
    return host == domain or host.endswith("." + domain)


def check_source_credibility(url: str) -> Dict[str, Any]:
    """Classify a URL's publisher as satire, credible or unknown.

    Matching is done on the parsed hostname, so a known domain appearing in
    the path or query string, or as part of a longer unrelated hostname, does
    not count. Subdomains of a listed domain (``www.``, ``edition.`` ...) do.
    A ``SATIRE_SOURCES`` entry containing a path only matches URLs under
    that path. Satire is checked before credibility.

    Args:
        url: Address of the article; a missing scheme is tolerated.

    Returns:
        A dict with the keys ``status`` (``"satire"``, ``"credible"`` or
        ``"unknown"``), ``name``, ``label`` and ``confidence``.
    """
    from urllib.parse import urlparse

    unknown = {"status": "unknown", "name": None, "label": None, "confidence": 0.0}

    try:
        candidate = url.strip()
        parsed = urlparse(candidate)
        if not parsed.netloc:
            # Scheme-less input such as "bbc.com/news": reparse as a
            # network-path reference so the host is recognised.
            parsed = urlparse("//" + candidate)
        host = (parsed.hostname or "").lower()
        path = parsed.path.lower()
    except (AttributeError, TypeError, ValueError) as e:
        logger.warning("Error checking credibility: %s", e)
        return unknown

    if not host:
        return unknown

    # Check satire first.
    for entry, name in SATIRE_SOURCES.items():
        satire_host, _, satire_path = entry.partition("/")
        if not _host_in_domain(host, satire_host):
            continue
        if satire_path:
            prefix = "/" + satire_path
            if path != prefix and not path.startswith(prefix + "/"):
                continue
        return {"status": "satire", "name": name, "label": "FAKE", "confidence": 1.0}

    # Then credible outlets.
    for domain, name in CREDIBLE_SOURCES.items():
        if _host_in_domain(host, domain):
            return {"status": "credible", "name": name, "label": "REAL", "confidence": 0.95}

    return unknown
def extract_article_text(url: str) -> str:
    """Download ``url`` and return the article's plain text.

    newspaper3k is tried first. If it raises or yields no text, the page is
    fetched directly and reduced to visible text with BeautifulSoup.

    Args:
        url: Address of the article to fetch.

    Returns:
        The extracted text, or ``""`` when both strategies fail (including
        when the fallback request gets an HTTP error status).
    """
    # NOTE(review): `url` is fetched as given; if it comes from untrusted
    # clients, consider restricting schemes/hosts before fetching.
    try:
        article = Article(url)
        article.download()
        article.parse()
        if article.text and article.text.strip():
            return article.text
        logger.warning("newspaper3k returned no text for %s; trying BeautifulSoup", url)
    except Exception as e:
        logger.warning("Error extracting article with newspaper3k: %s", e)

    # Fallback to BeautifulSoup
    try:
        response = requests.get(url, timeout=30)
        # Do not treat an error page's body as the article.
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        # Remove script and style elements
        for element in soup(["script", "style"]):
            element.decompose()

        # Collapse the page text into single-space-separated chunks.
        lines = (line.strip() for line in soup.get_text().splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        return ' '.join(chunk for chunk in chunks if chunk)
    except Exception as e2:
        logger.error("Error with BeautifulSoup fallback: %s", e2)
        return ""
async def root():
    """Health check: report that the service is up."""
    # NOTE(review): no route decorator is visible on this handler here -
    # confirm it is registered on `app`.
    return dict(status="healthy", service="TruthLens API")
# Endpoints
# NOTE(review): no route decorators are visible on the handlers here -
# confirm they are registered on `app`.
async def detect_text(request: TextRequest, skip_extras: bool = False):
    """Classify a piece of text as REAL or FAKE.

    Args:
        request: Body carrying the text to analyse.
        skip_extras: When true, return only the label and confidence and
            skip claim extraction, fact checks, explanation and risk level.

    Returns:
        A dict with ``input_type``, ``label``, ``confidence`` and
        ``timestamp``; unless ``skip_extras`` is set it also has ``text``,
        ``explanation``, ``fact_checks`` and ``risk_level``.

    Raises:
        HTTPException: 400 when the text is empty, 500 on any other failure.
    """
    try:
        text = request.text
        if not text:
            raise HTTPException(status_code=400, detail="Text is required")

        # Load model
        tok, mdl = load_model()

        # Tokenize and predict
        inputs = tok(text, return_tensors="pt", truncation=True, max_length=512, padding=True)
        with torch.no_grad():
            logits = mdl(**inputs).logits
        probabilities = torch.softmax(logits, dim=1)
        prediction = torch.argmax(probabilities, dim=1).item()
        confidence = probabilities[0][prediction].item()
        # NOTE(review): treats class index 1 as REAL - confirm against the
        # checkpoint's label mapping.
        label = "REAL" if prediction == 1 else "FAKE"

        if skip_extras:
            return {
                "input_type": "text",
                "label": label,
                "confidence": confidence,
                "timestamp": datetime.now().isoformat()
            }

        # Enhanced fact checking with Gemini
        extracted_claims = extract_claims_with_gemini(text)
        fact_checks = get_fact_checks(extracted_claims[0])

        # Gemini explanation
        explanation = generate_explanation_with_gemini(text, label, confidence, fact_checks)
        risk_level = calculate_risk_level(label, confidence, fact_checks)

        return {
            "input_type": "text",
            "text": text,
            "label": label,
            "confidence": confidence,
            "explanation": explanation,
            "fact_checks": fact_checks,
            "risk_level": risk_level,
            "timestamp": datetime.now().isoformat()
        }
    except HTTPException:
        # Deliberate client errors (e.g. the 400 above) must keep their
        # status instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        logger.exception("Error in detect_text: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
async def detect_url(request: UrlRequest, skip_extras: bool = False):
    """Fetch an article by URL and classify it as REAL or FAKE.

    The publisher is looked up first. A known satire source overrides the
    classifier (label ``FAKE``, confidence ``1.0``). A credible source does
    not change the label; it is only reported in ``source_metadata``.

    Args:
        request: Body carrying the article URL.
        skip_extras: When true, return only the label, confidence and source
            metadata and skip fact checks, explanation and risk level.

    Returns:
        A dict with ``input_type``, ``label``, ``confidence``,
        ``source_metadata`` and ``timestamp``; unless ``skip_extras`` is set
        it also has ``url``, ``text`` (first 500 characters),
        ``explanation``, ``fact_checks`` and ``risk_level``.

    Raises:
        HTTPException: 400 when the URL is empty or no article text could be
            extracted, 500 on any other failure.
    """
    try:
        url = request.url
        if not url:
            raise HTTPException(status_code=400, detail="URL is required")

        # 1. Check source credibility first
        source_info = check_source_credibility(url)

        article_text = extract_article_text(url)
        if not article_text:
            raise HTTPException(status_code=400, detail="Failed to extract article text from URL")

        tok, mdl = load_model()
        inputs = tok(article_text, return_tensors="pt", truncation=True, max_length=512, padding=True)
        with torch.no_grad():
            logits = mdl(**inputs).logits
        probabilities = torch.softmax(logits, dim=1)
        prediction = torch.argmax(probabilities, dim=1).item()
        confidence = probabilities[0][prediction].item()
        # NOTE(review): treats class index 1 as REAL - confirm against the
        # checkpoint's label mapping.
        label = "REAL" if prediction == 1 else "FAKE"

        # Override with source credibility only when it is definitive. A
        # credible source flagged FAKE keeps the classifier's label.
        if source_info["status"] == "satire":
            label = "FAKE"
            confidence = 1.0

        if skip_extras:
            return {
                "input_type": "url",
                "label": label,
                "confidence": confidence,
                "source_metadata": source_info,
                "timestamp": datetime.now().isoformat()
            }

        # Enhanced fact checking with Gemini
        extracted_claims = extract_claims_with_gemini(article_text)
        fact_checks = get_fact_checks(extracted_claims[0])

        # Gemini explanation, with the publisher prepended as context
        source_name = source_info['name'] if source_info['name'] else 'Unknown'
        explanation = generate_explanation_with_gemini(
            f"Source: {source_name}. Content: {article_text}",
            label,
            confidence,
            fact_checks
        )
        risk_level = calculate_risk_level(label, confidence, fact_checks)

        return {
            "input_type": "url",
            "url": url,
            "source_metadata": source_info,
            "text": article_text[:500],
            "label": label,
            "confidence": confidence,
            "explanation": explanation,
            "fact_checks": fact_checks,
            "risk_level": risk_level,
            "timestamp": datetime.now().isoformat()
        }
    except HTTPException:
        # Deliberate client errors (the 400s above) must keep their status
        # instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        logger.exception("Error in detect_url: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
# Shared EasyOCR reader, created on first use: building a Reader loads its
# models, so it should not be repeated for every request.
_ocr_reader = None


def _get_ocr_reader():
    """Return the shared English EasyOCR reader, creating it on first call."""
    global _ocr_reader
    if _ocr_reader is None:
        _ocr_reader = easyocr.Reader(['en'])
    return _ocr_reader


async def detect_image(request: ImageRequest, skip_extras: bool = False):
    """Download an image, OCR it, and classify it as REAL or FAKE.

    When the image contains text, the text is classified. Otherwise the
    label comes from the AI-image probability (``FAKE`` above 0.7).

    Args:
        request: Body carrying the image URL.
        skip_extras: When true, return only the label, confidence and
            AI-image result and skip fact checks, explanation and risk level.

    Returns:
        A dict with ``input_type``, ``label``, ``confidence``,
        ``image_ai_result`` and ``timestamp``; unless ``skip_extras`` is set
        it also has ``image_url``, ``text`` (first 500 characters, or
        ``None``), ``explanation``, ``fact_checks`` and ``risk_level``.

    Raises:
        HTTPException: 400 when the image URL is empty, 500 on any other
            failure (including a failed download).
    """
    try:
        image_url = request.image_url
        logger.info("Processing image: %s", image_url)
        if not image_url:
            raise HTTPException(status_code=400, detail="Image URL is required")

        # NOTE(review): `image_url` is fetched as given; if it comes from
        # untrusted clients, consider restricting schemes/hosts.
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()
        image_bytes = response.content

        # EasyOCR can read from bytes directly
        ocr_results = _get_ocr_reader().readtext(image_bytes)
        extracted_text = ' '.join([result[1] for result in ocr_results])
        has_text = bool(extracted_text.strip())

        image_ai_result = detect_ai_image(image_bytes)

        if has_text:
            tok, mdl = load_model()
            inputs = tok(extracted_text, return_tensors="pt", truncation=True, max_length=512, padding=True)
            with torch.no_grad():
                logits = mdl(**inputs).logits
            probabilities = torch.softmax(logits, dim=1)
            prediction = torch.argmax(probabilities, dim=1).item()
            confidence = probabilities[0][prediction].item()
            # NOTE(review): treats class index 1 as REAL - confirm against
            # the checkpoint's label mapping.
            label = "REAL" if prediction == 1 else "FAKE"
        else:
            # No text to classify: rely on the AI-image signal alone.
            ai_probability = image_ai_result['probability']
            label = "FAKE" if ai_probability > 0.7 else "REAL"
            confidence = ai_probability if label == "FAKE" else (1 - ai_probability)

        if skip_extras:
            return {
                "input_type": "image",
                "label": label,
                "confidence": confidence,
                "image_ai_result": image_ai_result,
                "timestamp": datetime.now().isoformat()
            }

        # Enhanced fact checking with Gemini (only when there is text)
        fact_checks = []
        if has_text:
            extracted_claims = extract_claims_with_gemini(extracted_text)
            fact_checks = get_fact_checks(extracted_claims[0])

        # Gemini explanation
        explanation = generate_explanation_with_gemini(
            extracted_text if extracted_text else "No text found in image",
            label,
            confidence,
            fact_checks
        )
        risk_level = calculate_risk_level(label, confidence, fact_checks, image_ai_result)

        return {
            "input_type": "image",
            "image_url": image_url,
            "text": extracted_text[:500] if extracted_text else None,
            "label": label,
            "confidence": confidence,
            "explanation": explanation,
            "fact_checks": fact_checks,
            "image_ai_result": image_ai_result,
            "risk_level": risk_level,
            "timestamp": datetime.now().isoformat()
        }
    except HTTPException:
        # Deliberate client errors (the 400 above) must keep their status
        # instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        logger.exception("Error in detect_image: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)