File size: 19,544 Bytes
63d7edb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
import os
import io
import json
import requests
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from newspaper import Article
from bs4 import BeautifulSoup
import easyocr
from PIL import Image
import google.generativeai as genai
from datetime import datetime
import logging
from fastapi.middleware.cors import CORSMiddleware

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("TruthLens")

# Initialize FastAPI app
app = FastAPI(title="TruthLens Backend")

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Load Hugging Face model (RoBERTa-based fake news detector)
MODEL_NAME = "Pulk17/Fake-News-Detection"
tokenizer = None
model = None

def load_model():
    """Lazy load the Hugging Face model"""
    global tokenizer, model
    if tokenizer is None or model is None:
        print("Loading Hugging Face model...")
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
        model.eval()
    return tokenizer, model

# Environment variables
# Ensure you set these in your environment or .env file
GOOGLE_FACT_CHECK_API_KEY = os.environ.get('GOOGLE_FACT_CHECK_API_KEY', '')
HIVE_API_KEY = os.environ.get('HIVE_API_KEY', '')
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY', '')

# Initialize Gemini
if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel('gemini-pro')
else:
    gemini_model = None

# Request Models
class TextRequest(BaseModel):
    text: str

class UrlRequest(BaseModel):
    url: str

class ImageRequest(BaseModel):
    image_url: str

# Source Credibility Database
CREDIBLE_SOURCES = {
    "apnews.com": "Associated Press",
    "reuters.com": "Reuters",
    "bbc.com": "BBC News",
    "bbc.co.uk": "BBC News",
    "nytimes.com": "The New York Times",
    "npr.org": "NPR",
    "pbs.org": "PBS NewsHour",
    "wsj.com": "The Wall Street Journal",
    "bloomberg.com": "Bloomberg",
    "theguardian.com": "The Guardian",
    "washingtonpost.com": "The Washington Post",
    "propublica.org": "ProPublica",
    "aljazeera.com": "Al Jazeera",
    "economist.com": "The Economist",
    "forbes.com": "Forbes"
}

SATIRE_SOURCES = {
    "theonion.com": "The Onion",
    "babylonbee.com": "The Babylon Bee",
    "clickhole.com": "ClickHole",
    "newyorker.com/humor/borowitz-report": "The Borowitz Report",
    "thebeaverton.com": "The Beaverton",
    "cracked.com": "Cracked",
    "dailymash.co.uk": "The Daily Mash",
    "waterfordwhispersnews.com": "Waterford Whispers News"
}

# Helper functions
def get_fact_checks(text: str) -> List[Dict[str, str]]:
    """Get fact checks from Google Fact Check Tools API"""
    if not GOOGLE_FACT_CHECK_API_KEY:
        return []
    
    try:
        # Extract key claims (first 100 chars as query)
        query = text[:100]
        
        url = "https://factchecktools.googleapis.com/v1alpha1/claims:search"
        params = {
            "query": query,
            "key": GOOGLE_FACT_CHECK_API_KEY,
            "languageCode": "en"
        }
        
        response = requests.get(url, params=params, timeout=10)
        
        if response.status_code == 200:
            data = response.json()
            claims = data.get('claims', [])
            
            fact_checks = []
            for claim in claims[:3]:  # Top 3 fact checks
                fact_check = {
                    "claim": claim.get('text', ''),
                    "claimant": claim.get('claimant', ''),
                    "rating": claim.get('claimReview', [{}])[0].get('textualRating', 'Unknown'),
                    "url": claim.get('claimReview', [{}])[0].get('url', '')
                }
                fact_checks.append(fact_check)
            
            return fact_checks
        else:
            print(f"Fact check API error: {response.status_code}")
            return []
            
    except Exception as e:
        print(f"Error getting fact checks: {e}")
        return []

def extract_claims_with_gemini(text: str) -> List[str]:
    """Use Gemini to extract key factual claims for building a search query"""
    if not gemini_model:
        return [text[:100]]
    
    try:
        prompt = f"""
        Extract the single most important factual claim from the following text that can be used to search in a fact-check database. 
        Output ONLY the extracted claim string, nothing else.
        
        Text: {text[:1000]}
        """
        response = gemini_model.generate_content(prompt)
        if response and hasattr(response, 'text'):
            claim = response.text.strip()
            return [claim] if claim else [text[:100]]
        return [text[:100]]
    except Exception as e:
        print(f"Gemini claim extraction error: {e}")
        return [text[:100]]

def generate_explanation_with_gemini(text: str, label: str, confidence: float, fact_checks: List[Dict]) -> str:
    """Use Gemini to explain the reasoning behind the detection result"""
    if not gemini_model:
        return f"The news has been classified as {label} with {confidence:.2%} confidence."
    
    try:
        fact_check_context = ""
        if fact_checks:
            fact_check_context = "Relevant fact checks found:\n" + "\n".join([f"- {fc['claim']} (Rating: {fc['rating']})" for fc in fact_checks])
        
        prompt = f"""
        Act as a professional fact-checker for an app called TruthLens. 
        Analyze the following news text and the AI detection result.
        
        News Text: {text[:1000]}
        AI Classification: {label}
        Confidence: {confidence:.2%}
        {fact_check_context}
        
        Provide a concise, human-readable explanation (2-3 sentences) explaining why this news is likely {label}. 
        Focus on style, source (if present), or specific fact-check evidence.
        """
        response = gemini_model.generate_content(prompt)
        if response and hasattr(response, 'text'):
            return response.text.strip()
        return f"The model identified this content as {label} with {confidence:.2%} confidence."
    except Exception as e:
        print(f"Gemini explanation error: {e}")
        return f"Analysis complete: The model identified this content as {label}."

def detect_ai_image(image_bytes: bytes) -> Dict[str, Any]:
    """Detect AI-generated content using Hive Moderation API"""
    if not HIVE_API_KEY:
        return {"probability": 0.0, "generator": None}
    
    try:
        url = "https://api.hivemoderation.com/v2/task/sync"
        
        headers = {
            "Authorization": f"Token {HIVE_API_KEY}",
            "Content-Type": "application/json"
        }
        
        # Convert image to base64
        import base64
        image_b64 = base64.b64encode(image_bytes).decode('utf-8')
        
        payload = {
            "image": image_b64,
            "models": ["ai_generated"]
        }
        
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        
        if response.status_code == 200:
            data = response.json()
            ai_generated = data.get('status', [{}])[0].get('response', {}).get('output', [{}])[0]
            
            return {
                "probability": ai_generated.get('score', 0.0),
                "generator": ai_generated.get('class', None)
            }
        else:
            print(f"Hive API error: {response.status_code}")
            return {"probability": 0.0, "generator": None}
            
    except Exception as e:
        print(f"Error detecting AI image: {e}")
        return {"probability": 0.0, "generator": None}

def calculate_risk_level(
    label: str,
    confidence: float,
    fact_checks: List[Dict],
    image_ai_result: Optional[Dict] = None
) -> str:
    """Calculate overall risk level"""
    
    # Base risk on label and confidence
    if label == "FAKE" and confidence > 0.8:
        base_risk = "high"
    elif label == "FAKE" and confidence > 0.5:
        base_risk = "medium"
    elif label == "REAL" and confidence > 0.8:
        base_risk = "low"
    else:
        base_risk = "medium"
    
    # Adjust based on fact checks
    if fact_checks:
        fake_ratings = sum(1 for fc in fact_checks if 'false' in fc['rating'].lower() or 'fake' in fc['rating'].lower() or 'satire' in fc['rating'].lower())
        if fake_ratings >= 2:
            base_risk = "high"
    
    # Adjust based on AI image detection
    if image_ai_result and image_ai_result['probability'] > 0.7:
        if base_risk == "low":
            base_risk = "medium"
        elif base_risk == "medium":
            base_risk = "high"
    
    return base_risk

def check_source_credibility(url: str) -> Dict[str, Any]:
    """Check if the URL belongs to a known credible or satire source"""
    from urllib.parse import urlparse
    
    try:
        domain = urlparse(url).netloc.lower()
        if domain.startswith("www."):
            domain = domain[4:]
            
        # Check Satire first
        for satire_domain, name in SATIRE_SOURCES.items():
            if satire_domain in url.lower():
                return {"status": "satire", "name": name, "label": "FAKE", "confidence": 1.0}
        
        # Check Credible
        if domain in CREDIBLE_SOURCES:
            return {"status": "credible", "name": CREDIBLE_SOURCES[domain], "label": "REAL", "confidence": 0.95}
            
        return {"status": "unknown", "name": None, "label": None, "confidence": 0.0}
    except Exception as e:
        print(f"Error checking credibility: {e}")
        return {"status": "unknown", "name": None, "label": None, "confidence": 0.0}

def extract_article_text(url: str) -> str:
    """Extract article text from URL using newspaper3k"""
    try:
        article = Article(url)
        article.download()
        article.parse()
        return article.text
    except Exception as e:
        print(f"Error extracting article with newspaper3k: {e}")
        # Fallback to BeautifulSoup
        try:
            response = requests.get(url, timeout=30)
            soup = BeautifulSoup(response.content, 'html.parser')
            
            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()
            
            # Get text
            text = soup.get_text()
            
            # Clean up whitespace
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = ' '.join(chunk for chunk in chunks if chunk)
            
            return text
        except Exception as e2:
            logger.error(f"Error with BeautifulSoup fallback: {e2}")
            return ""

@app.get("/")
async def root():
    return {"status": "healthy", "service": "TruthLens API"}

# Endpoints

@app.post("/detect-text")
async def detect_text(request: TextRequest, skip_extras: bool = False):
    """Detect fake news in text with optional fact-check and explanation"""
    try:
        text = request.text
        if not text:
            raise HTTPException(status_code=400, detail="Text is required")
        
        # Load model
        tok, mdl = load_model()
        
        # Tokenize and predict
        inputs = tok(text, return_tensors="pt", truncation=True, max_length=512, padding=True)
        
        with torch.no_grad():
            outputs = mdl(**inputs)
            logits = outputs.logits
            probabilities = torch.softmax(logits, dim=1)
            prediction = torch.argmax(probabilities, dim=1).item()
            confidence = probabilities[0][prediction].item()
        
        label = "REAL" if prediction == 1 else "FAKE"
        
        if skip_extras:
            return {
                "input_type": "text",
                "label": label,
                "confidence": confidence,
                "timestamp": datetime.now().isoformat()
            }
        
        # Enhanced Fact Checking with Gemini
        extracted_claims = extract_claims_with_gemini(text)
        fact_checks = get_fact_checks(extracted_claims[0])
        
        # Gemini Explanation
        explanation = generate_explanation_with_gemini(text, label, confidence, fact_checks)
        
        risk_level = calculate_risk_level(label, confidence, fact_checks)
        
        return {
            "input_type": "text",
            "text": text,
            "label": label,
            "confidence": confidence,
            "explanation": explanation,
            "fact_checks": fact_checks,
            "risk_level": risk_level,
            "timestamp": datetime.now().isoformat()
        }
        
    except Exception as e:
        print(f"Error in detect_text: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/detect-url")
async def detect_url(request: UrlRequest, skip_extras: bool = False):
    """Detect fake news in URL with optional fact-check and explanation"""
    try:
        url = request.url
        if not url:
            raise HTTPException(status_code=400, detail="URL is required")
        
        # 1. Check Source Credibility First
        source_info = check_source_credibility(url)
        
        article_text = extract_article_text(url)
        if not article_text:
            raise HTTPException(status_code=400, detail="Failed to extract article text from URL")
        
        tok, mdl = load_model()
        
        inputs = tok(article_text, return_tensors="pt", truncation=True, max_length=512, padding=True)
        
        with torch.no_grad():
            outputs = mdl(**inputs)
            logits = outputs.logits
            probabilities = torch.softmax(logits, dim=1)
            prediction = torch.argmax(probabilities, dim=1).item()
            confidence = probabilities[0][prediction].item()
        
        label = "REAL" if prediction == 1 else "FAKE"
        
        # Override with source credibility if it's definitive
        if source_info["status"] == "satire":
            label = "FAKE"
            confidence = 1.0
        elif source_info["status"] == "credible" and label == "FAKE":
            # If a credible source is flagged as fake, we lower risk but keep label
            # or we could trust the source more. Let's provide it in metadata.
            pass
        
        if skip_extras:
            return {
                "input_type": "url",
                "label": label,
                "confidence": confidence,
                "source_metadata": source_info,
                "timestamp": datetime.now().isoformat()
            }
        
        # Enhanced Fact Checking with Gemini
        extracted_claims = extract_claims_with_gemini(article_text)
        fact_checks = get_fact_checks(extracted_claims[0])
        
        # Gemini Explanation
        explanation = generate_explanation_with_gemini(
            f"Source: {source_info['name'] if source_info['name'] else 'Unknown'}. Content: {article_text}", 
            label, 
            confidence, 
            fact_checks
        )
        
        risk_level = calculate_risk_level(label, confidence, fact_checks)
        
        return {
            "input_type": "url",
            "url": url,
            "source_metadata": source_info,
            "text": article_text[:500],
            "label": label,
            "confidence": confidence,
            "explanation": explanation,
            "fact_checks": fact_checks,
            "risk_level": risk_level,
            "timestamp": datetime.now().isoformat()
        }
        
    except Exception as e:
        print(f"Error in detect_url: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/detect-image")
async def detect_image(request: ImageRequest, skip_extras: bool = False):
    try:
        image_url = request.image_url
        logger.info(f"Processing image: {image_url}")
        if not image_url:
            raise HTTPException(status_code=400, detail="Image URL is required")
        
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()
        image_bytes = response.content
        
        reader = easyocr.Reader(['en'])
        # EasyOCR can read from bytes directly
        ocr_results = reader.readtext(image_bytes)
        extracted_text = ' '.join([result[1] for result in ocr_results])
        
        image_ai_result = detect_ai_image(image_bytes)
        
        if extracted_text.strip():
            tok, mdl = load_model()
            inputs = tok(extracted_text, return_tensors="pt", truncation=True, max_length=512, padding=True)
            
            with torch.no_grad():
                outputs = mdl(**inputs)
                logits = outputs.logits
                probabilities = torch.softmax(logits, dim=1)
                prediction = torch.argmax(probabilities, dim=1).item()
                confidence = probabilities[0][prediction].item()
            
            label = "REAL" if prediction == 1 else "FAKE"
            
            if skip_extras:
                return {
                    "input_type": "image",
                    "label": label,
                    "confidence": confidence,
                    "image_ai_result": image_ai_result,
                    "timestamp": datetime.now().isoformat()
                }
                
            # Enhanced Fact Checking with Gemini
            extracted_claims = extract_claims_with_gemini(extracted_text)
            fact_checks = get_fact_checks(extracted_claims[0])
        else:
            label = "FAKE" if image_ai_result['probability'] > 0.7 else "REAL"
            confidence = image_ai_result['probability'] if label == "FAKE" else (1 - image_ai_result['probability'])
            fact_checks = []
            
            if skip_extras:
                return {
                    "input_type": "image",
                    "label": label,
                    "confidence": confidence,
                    "image_ai_result": image_ai_result,
                    "timestamp": datetime.now().isoformat()
                }
        
        # Gemini Explanation
        explanation = generate_explanation_with_gemini(extracted_text if extracted_text else "No text found in image", label, confidence, fact_checks)
        
        risk_level = calculate_risk_level(label, confidence, fact_checks, image_ai_result)
        
        return {
            "input_type": "image",
            "image_url": image_url,
            "text": extracted_text[:500] if extracted_text else None,
            "label": label,
            "confidence": confidence,
            "explanation": explanation,
            "fact_checks": fact_checks,
            "image_ai_result": image_ai_result,
            "risk_level": risk_level,
            "timestamp": datetime.now().isoformat()
        }
        
    except Exception as e:
        print(f"Error in detect_image: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)