| | from typing import Optional, Tuple |
| | from fastapi import FastAPI, UploadFile, File, Form |
| | from fastapi.responses import FileResponse |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from PIL import Image, ExifTags |
| | import io |
| | import hashlib |
| | import httpx |
| | import os |
| | import base64 |
| | import json |
| | import asyncio |
| | import cv2 |
| | import tempfile |
| | import fitz |
| | import pypdf |
| |
|
| | |
# Credentials come from the environment. Either may be None, in which case
# the helper that needs it skips its remote call.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
HF_API_KEY = os.environ.get("HF_API_KEY")

# Groq chat-completions endpoint used for both the vision and the text model.
GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"

# Hugging Face router endpoints for the two RoBERTa text classifiers.
ROBERTA_FAKE_NEWS_URL = "https://router.huggingface.co/hf-inference/models/hamzab/roberta-fake-news-classification"
ROBERTA_AI_TEXT_URL = "https://router.huggingface.co/hf-inference/models/openai-community/roberta-base-openai-detector"
| |
|
| | |
# Application instance; routes below are registered on it via decorators.
app = FastAPI()

# CORS is left fully open: any origin, any method, any request header.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
| |
|
@app.get("/")
def read_root():
    """Serve the front-end page for the site root."""
    # Relative path: resolved against the process working directory.
    landing_page = "index.html"
    return FileResponse(landing_page)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def calculate_sha256(contents: bytes):
    """Return the SHA-256 digest of ``contents`` as a lowercase hex string."""
    hasher = hashlib.sha256()
    hasher.update(contents)
    return hasher.hexdigest()
| |
|
| |
|
def calculate_metadata_risk(image: "Image.Image"):
    """Score how suspicious an image's EXIF metadata looks.

    Args:
        image: A Pillow image (any object exposing ``getexif()``).

    Returns:
        A risk value capped at 1.0:
        * 0.1 when the image carries no EXIF data, or the EXIF block cannot
          be read at all;
        * 0.2 when EXIF is present and contains a ``Software`` tag;
        * 0.0 when EXIF is present without a ``Software`` tag.
    """
    # Numeric ID of the EXIF/TIFF "Software" tag.
    software_tag = 0x0131

    try:
        # Public accessor: unlike the private ``_getexif()`` it exists on
        # every Pillow image, not just on some format-specific classes.
        exif = image.getexif()
    except Exception:
        # Best effort: unreadable metadata is scored like missing metadata.
        return 0.1

    risk = 0.0
    if not exif:
        risk += 0.1
    elif software_tag in exif:
        risk += 0.2
    return min(risk, 1.0)
| |
|
| |
|
def fusion_score(model_score: float, metadata_risk: float):
    """Blend the model score and the metadata risk into percentage scores.

    The model score carries 90% of the weight and the metadata risk 10%.

    Returns:
        ``(authenticity, fake)`` as percentages; the two always sum to 100
        up to floating-point rounding.
    """
    model_weight, metadata_weight = 0.9, 0.1
    blended = model_weight * model_score + metadata_weight * metadata_risk
    return (1 - blended) * 100, blended * 100
| |
|
| |
|
def normalize_output(label_prob_dict: dict) -> float:
    """Collapse a classifier's ``{label: probability}`` output into one fake score.

    Each label is sorted into one of three buckets by keyword:
    * fake-like labels add their full probability to the score;
    * real-like labels add nothing;
    * unrecognised labels add 40% of their probability.

    Args:
        label_prob_dict: Mapping of label name to probability.

    Returns:
        The fake score, capped at 1.0.
    """
    # Long keywords are distinctive enough to match as substrings.
    fake_substrings = ("fake", "generated", "manipulated", "deepfake", "artificial", "synthetic", "machine")
    real_substrings = ("real", "authentic", "genuine", "human", "original")
    # Short keywords must match a whole word. As a substring, "ai" also hits
    # unrelated labels such as "uncertain" or "plain", and "true" hits "untrue".
    fake_words = {"ai"}
    # Covers FAKE/TRUE label pairs - presumably what the fake-news classifier
    # emits; verify against the model card.
    real_words = {"true"}

    fake_score = 0.0
    uncertain_score = 0.0

    for label, prob in label_prob_dict.items():
        label_lower = label.lower()
        # Split on anything non-alphanumeric, so "AI-generated" -> {"ai", "generated"}.
        words = set("".join(ch if ch.isalnum() else " " for ch in label_lower).split())

        if words & fake_words or any(k in label_lower for k in fake_substrings):
            fake_score += prob
        elif words & real_words or any(k in label_lower for k in real_substrings):
            continue
        else:
            uncertain_score += prob

    fake_score += 0.4 * uncertain_score
    return min(fake_score, 1.0)
| |
|
| |
|
def make_confidence(authenticity, fake):
    """Classify how decisive a verdict is from the gap between the two scores.

    Returns:
        ``"low"`` for a gap under 20 points, ``"medium"`` for a gap under 40,
        otherwise ``"high"``.
    """
    gap = abs(authenticity - fake)
    if gap < 20:
        return "low"
    if gap < 40:
        return "medium"
    return "high"
| |
|
| |
|
| | |
| | |
| | |
| |
|
async def call_groq_vision(contents: bytes) -> Tuple[Optional[float], str]:
    """Ask the Groq vision model how likely an image is AI-generated or manipulated.

    The image is sent inline as a base64 data URL together with a fixed
    forensic-analysis prompt; the model is asked to answer with a small JSON
    object.

    Args:
        contents: Raw encoded image bytes.

    Returns:
        ``(fake_probability, reasoning)`` with the probability clamped to
        ``[0.0, 1.0]``, or ``(None, "")`` when no API key is configured or
        the request / response parsing fails for any reason.
    """
    if not GROQ_API_KEY:
        print("No GROQ_API_KEY set")
        return None, ""
    try:
        # Label the data URL with the format the bytes actually are; callers
        # also pass PNG uploads and images extracted from PDFs. Unrecognised
        # signatures keep the previous "image/jpeg" label.
        if contents.startswith(b"\x89PNG\r\n\x1a\n"):
            mime_type = "image/png"
        elif contents[:6] in (b"GIF87a", b"GIF89a"):
            mime_type = "image/gif"
        elif contents[:4] == b"RIFF" and contents[8:12] == b"WEBP":
            mime_type = "image/webp"
        else:
            mime_type = "image/jpeg"

        base64_image = base64.b64encode(contents).decode('utf-8')
        prompt = """You are a forensic image analyst expert. Analyze this image for signs of AI generation or manipulation.

Look for:
- Unnatural skin texture or too-perfect features
- Inconsistent lighting or shadows
- Background anomalies or blurring
- Artifacts typical of diffusion models (Midjourney, DALL-E, Stable Diffusion)
- Overly smooth or painterly textures
- Unnatural hair or eye details
- Signs of face swapping or deepfake manipulation
- EXIF/compression patterns typical of AI tools

Respond ONLY in this exact JSON format, nothing else:
{"fake_probability": 0.0, "reasoning": "brief reason"}

fake_probability must be between 0.0 (definitely real) and 1.0 (definitely AI/fake)."""
        payload = {
            "model": "meta-llama/llama-4-scout-17b-16e-instruct",
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": prompt
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{base64_image}"
                            }
                        }
                    ]
                }
            ],
            "max_tokens": 200,
            "temperature": 0.1
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                GROQ_API_URL,
                headers={
                    "Authorization": f"Bearer {GROQ_API_KEY}",
                    "Content-Type": "application/json"
                },
                json=payload
            )
            response.raise_for_status()
            data = response.json()
            text = data["choices"][0]["message"]["content"]
            print(f"Groq vision response: {text}")

            clean = text.strip().replace("```json", "").replace("```", "")
            # The model sometimes adds prose around the object despite the
            # instructions; keep only the outermost {...} span.
            start, end = clean.find("{"), clean.rfind("}")
            if start != -1 and end > start:
                clean = clean[start:end + 1]
            result = json.loads(clean)

            # Clamp so an out-of-range answer (e.g. a percentage) cannot push
            # downstream scores outside 0-100.
            probability = min(max(float(result["fake_probability"]), 0.0), 1.0)
            return probability, result.get("reasoning", "")

    except Exception as e:
        # Deliberate best effort: any failure is reported as "no score".
        print(f"Groq vision failed: {e}")
        return None, ""
| |
|
| |
|
| | |
| | |
| | |
| |
|
async def call_groq_text(text: str) -> Tuple[Optional[float], str]:
    """Ask the Groq text model how likely a text is AI-generated, forged or fake news.

    Only the first 4000 characters of ``text`` are embedded in the prompt.

    Args:
        text: The text to analyse.

    Returns:
        ``(fake_probability, reasoning)`` with the probability clamped to
        ``[0.0, 1.0]``, or ``(None, "")`` when no API key is configured or
        the request / response parsing fails for any reason.
    """
    if not GROQ_API_KEY:
        return None, ""
    try:
        prompt = f"""You are a forensic text analyst. Analyze the following text and determine if it is AI-generated or written by a human. Also check if it could be a forged government document or fake news.

Look for:
- Overly formal or repetitive sentence structure typical of LLMs
- Lack of personal voice or human inconsistencies
- Suspiciously perfect grammar with no natural errors
- Generic phrasing commonly used by AI models
- For government documents: inconsistent terminology, wrong formats, suspicious clauses
- For news: sensational language, lack of credible sources, misleading framing

Text to analyze:
\"\"\"
{text[:4000]}
\"\"\"

Respond ONLY in this exact JSON format, nothing else:
{{"fake_probability": 0.0, "reasoning": "brief reason"}}

fake_probability must be between 0.0 (definitely human/authentic) and 1.0 (definitely AI-generated/forged)."""
        payload = {
            "model": "llama-3.3-70b-versatile",
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "max_tokens": 200,
            "temperature": 0.1
        }

        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                GROQ_API_URL,
                headers={
                    "Authorization": f"Bearer {GROQ_API_KEY}",
                    "Content-Type": "application/json"
                },
                json=payload
            )
            response.raise_for_status()
            data = response.json()
            text_response = data["choices"][0]["message"]["content"]
            print(f"Groq text response: {text_response}")

            clean = text_response.strip().replace("```json", "").replace("```", "")
            # The model sometimes adds prose around the object despite the
            # instructions; keep only the outermost {...} span.
            start, end = clean.find("{"), clean.rfind("}")
            if start != -1 and end > start:
                clean = clean[start:end + 1]
            result = json.loads(clean)

            # Clamp so an out-of-range answer (e.g. a percentage) cannot push
            # downstream scores outside 0-100.
            probability = min(max(float(result["fake_probability"]), 0.0), 1.0)
            return probability, result.get("reasoning", "")

    except Exception as e:
        # Deliberate best effort: any failure is reported as "no score".
        print(f"Groq text failed: {e}")
        return None, ""
| |
|
| |
|
| | |
| | |
| | |
| |
|
async def call_roberta(url: str, text: str, name: str) -> Optional[float]:
    """Query a Hugging Face text-classification endpoint and return a fake score.

    Args:
        url: Inference endpoint of the classifier.
        text: Text to classify; only the first 512 characters are sent.
        name: Human-readable model name, used in log lines only.

    Returns:
        The score computed by ``normalize_output`` from the returned labels,
        or ``None`` when no API key is configured, the request fails, or the
        response contains no usable label/score pairs.
    """
    if not HF_API_KEY:
        print(f"No HF_API_KEY, skipping {name}")
        return None
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                url,
                headers={"Authorization": f"Bearer {HF_API_KEY}"},
                json={"inputs": text[:512]}
            )
            response.raise_for_status()
            data = response.json()
            print(f"{name} response: {data}")

            # Accept both a nested list (one result list per input) and a
            # flat list of {"label", "score"} objects.
            candidates = data[0] if data and isinstance(data[0], list) else data
            label_prob_dict = {item["label"]: item["score"] for item in candidates}
            if not label_prob_dict:
                # An empty result would otherwise normalise to 0.0, i.e. a
                # confident "real" verdict backed by no evidence.
                raise ValueError("classifier returned no labels")
            return normalize_output(label_prob_dict)
    except Exception as e:
        # Deliberate best effort: any failure is reported as "no score".
        print(f"{name} failed: {e}")
        return None
| |
|
| |
|
| | |
| | |
| | |
| |
|
async def analyze_image(contents: bytes, content_type: str = "image/jpeg"):
    """Analyse an uploaded image and build the response payload.

    Combines the Groq vision score (0.5 when unavailable) with the EXIF
    metadata risk via ``fusion_score``.

    Args:
        contents: Raw encoded image bytes.
        content_type: MIME type reported by the client. Currently unused;
            kept so existing callers keep working.

    Returns:
        A dict with ``type``, ``authenticity``, ``fake``,
        ``confidence_level``, ``models_used`` and ``details``.

    Raises:
        Whatever Pillow raises when the bytes cannot be opened or decoded as
        an image.
    """
    # Inspect metadata on the image exactly as opened. A converted copy is a
    # plain Image without the format-specific EXIF accessor, which made the
    # metadata risk collapse to its error fallback for every upload.
    with Image.open(io.BytesIO(contents)) as image:
        metadata_risk = calculate_metadata_risk(image)
        # Force a full decode so undecodable data is still rejected here.
        image.load()

    # The image is sent inline as base64, which grows it by a third. Groq
    # documents a 4 MB cap on requests carrying base64 images (the 20 MB
    # figure applies to image URLs) - NOTE(review): confirm against current
    # Groq limits.
    groq_request_limit = 4 * 1024 * 1024
    prompt_headroom = 16 * 1024  # JSON envelope plus prompt text
    encoded_size = 4 * ((len(contents) + 2) // 3)

    if encoded_size + prompt_headroom > groq_request_limit:
        print("Image too large for Groq")
        score, reasoning = None, "Image too large for analysis"
    else:
        score, reasoning = await call_groq_vision(contents)

    # Without a model verdict, fall back to a neutral 0.5.
    combined_model_score = score if score is not None else 0.5
    models_used = ["Groq_Llama4"] if score is not None else []

    authenticity, fake = fusion_score(combined_model_score, metadata_risk)

    return {
        "type": "image",
        "authenticity": round(authenticity, 2),
        "fake": round(fake, 2),
        "confidence_level": make_confidence(authenticity, fake),
        "models_used": models_used,
        "details": {
            "groq_score": round(score, 4) if score is not None else "unavailable",
            "groq_reasoning": reasoning,
            "metadata_risk": round(metadata_risk, 4),
        }
    }
| |
|
| |
|
def _video_unavailable_result(reasoning, duration=0):
    """Build the neutral 50/50 video response used when no verdict is available."""
    return {
        "type": "video",
        "authenticity": 50.0,
        "fake": 50.0,
        "confidence_level": "low",
        "models_used": [],
        "details": {
            "groq_score": "unavailable",
            "groq_reasoning": reasoning,
            "metadata_risk": 0.0,
            "frames_analyzed": 0,
            "video_duration": duration
        }
    }


def _sample_video_frames(path, max_frames=5):
    """Decode up to ``max_frames`` evenly spaced frames from the video at ``path``.

    Returns:
        ``(frames, duration)``: a list of JPEG-encoded frame bytes, and the
        duration in seconds derived from frame count and FPS (0 when the FPS
        is unknown).
    """
    cap = cv2.VideoCapture(path)
    try:
        # Clamp at 0 so a negative "unknown" count cannot produce negative
        # indices or a negative duration.
        frame_count = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = round(frame_count / fps, 1) if fps > 0 else 0

        # A set removes repeated indices, so short clips (or an unknown frame
        # count) do not send the same frame several times.
        sample_indices = sorted({int(frame_count * i / max_frames) for i in range(max_frames)})

        frames = []
        for idx in sample_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if ret:
                pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                buf = io.BytesIO()
                pil_img.save(buf, format="JPEG", quality=85)
                frames.append(buf.getvalue())
        return frames, duration
    finally:
        # Release the capture even when decoding raises.
        cap.release()


async def analyze_video(contents: bytes):
    """Analyse an uploaded video by scoring a handful of sampled frames.

    The bytes are written to a temporary ``.mp4`` file, up to five evenly
    spaced frames are extracted, and each frame is scored with
    ``call_groq_vision``. Calls are spaced two seconds apart. The final score
    is the mean of the frames that produced one.

    Args:
        contents: Raw video file bytes.

    Returns:
        A dict with ``type``, ``authenticity``, ``fake``,
        ``confidence_level``, ``models_used`` and ``details``. Any failure
        yields a neutral 50/50 result with ``groq_score`` set to
        ``"unavailable"``; this function does not raise for analysis errors.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as f:
        f.write(contents)
        tmp_path = f.name

    try:
        try:
            # Decoding is blocking work; run it in the default executor so
            # the event loop keeps serving other requests.
            loop = asyncio.get_running_loop()
            frames, duration = await loop.run_in_executor(None, _sample_video_frames, tmp_path)
        finally:
            # The file is only needed for decoding; remove it on every path.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

        if not frames:
            return _video_unavailable_result("Could not extract frames from video.", duration)

        scores = []
        reasonings = []
        for i, frame_bytes in enumerate(frames):
            print(f"Analyzing frame {i+1}/{len(frames)}")
            score, reasoning = await call_groq_vision(frame_bytes)
            if score is not None:
                scores.append(score)
                reasonings.append(f"Frame {i+1}: {reasoning}")
            if i < len(frames) - 1:
                # Pause between requests, not after the last one.
                await asyncio.sleep(2)

        if not scores:
            # Report "unavailable" rather than presenting the neutral 0.5
            # placeholder as if it were a real model score.
            return _video_unavailable_result("All frame analyses failed.", duration)

        combined_model_score = sum(scores) / len(scores)
        authenticity = round((1 - combined_model_score) * 100, 2)
        fake = round(combined_model_score * 100, 2)

        return {
            "type": "video",
            "authenticity": authenticity,
            "fake": fake,
            "confidence_level": make_confidence(authenticity, fake),
            "models_used": ["Groq_Llama4"],
            "details": {
                "groq_score": round(combined_model_score, 4),
                "groq_reasoning": " | ".join(reasonings),
                "metadata_risk": 0.0,
                "frames_analyzed": len(scores),
                "video_duration": duration
            }
        }

    except Exception as e:
        # Deliberate best effort: report the failure in the payload.
        print(f"Video analysis failed: {e}")
        return _video_unavailable_result(f"Analysis failed: {str(e)}")
| |
|
| |
|
async def analyze_text(text: str):
    """Analyse a text with both RoBERTa classifiers and the Groq text model.

    The three models are queried concurrently. The combined score is the mean
    of the models that returned a score, or 0.5 when none did.

    Returns:
        A dict with ``type``, ``authenticity``, ``fake``,
        ``confidence_level``, ``models_used`` and per-model ``details``.
    """
    fake_news_score, ai_detector_score, groq_result = await asyncio.gather(
        call_roberta(ROBERTA_FAKE_NEWS_URL, text, "RoBERTa_FakeNews"),
        call_roberta(ROBERTA_AI_TEXT_URL, text, "RoBERTa_AIDetector"),
        call_groq_text(text)
    )
    groq_score, reasoning = groq_result

    # Insertion order fixes both the averaging order and the reported order.
    by_model = {
        "RoBERTa_FakeNews": fake_news_score,
        "RoBERTa_AIDetector": ai_detector_score,
        "Groq_Llama3": groq_score,
    }
    available = {model: value for model, value in by_model.items() if value is not None}

    if available:
        combined = sum(available.values()) / len(available)
    else:
        combined = 0.5
    models_used = list(available)

    authenticity = round((1 - combined) * 100, 2)
    fake = round(combined * 100, 2)

    def _reported(value):
        # Missing scores are surfaced as the literal string "unavailable".
        if value is None:
            return "unavailable"
        return round(value, 4)

    return {
        "type": "text",
        "authenticity": authenticity,
        "fake": fake,
        "confidence_level": make_confidence(authenticity, fake),
        "models_used": models_used,
        "details": {
            "groq_score": _reported(groq_score),
            "roberta_fakenews_score": _reported(fake_news_score),
            "roberta_aidetector_score": _reported(ai_detector_score),
            "groq_reasoning": reasoning,
            "metadata_risk": 0.0,
        }
    }
| |
|
| |
|
async def analyze_pdf(contents: bytes):
    """Analyse a PDF by scoring its extracted text and up to three embedded images.

    Text is extracted with pypdf and scored by both RoBERTa classifiers and
    the Groq text model. Images are extracted with PyMuPDF and scored by the
    Groq vision model, with a two-second pause before each vision call. The
    combined score is the mean of every score obtained, or 0.5 when there is
    none.

    The text and image phases fail independently: an error in one is logged
    and does not prevent the other from contributing.

    Args:
        contents: Raw PDF bytes.

    Returns:
        A dict with ``type``, ``authenticity``, ``fake``,
        ``confidence_level``, ``models_used`` (only models that actually
        produced a score) and ``details``.
    """
    max_images = 3
    scores = []
    reasonings = []
    models_used = []

    # --- Text phase ---
    try:
        reader = pypdf.PdfReader(io.BytesIO(contents))
        full_text = "".join(page.extract_text() or "" for page in reader.pages)

        if full_text.strip():
            print(f"Extracted {len(full_text)} chars from PDF")
            s1, s2, (s3, text_reasoning) = await asyncio.gather(
                call_roberta(ROBERTA_FAKE_NEWS_URL, full_text, "RoBERTa_FakeNews"),
                call_roberta(ROBERTA_AI_TEXT_URL, full_text, "RoBERTa_AIDetector"),
                call_groq_text(full_text)
            )

            if s1 is not None:
                scores.append(s1)
                models_used.append("RoBERTa_FakeNews")
                reasonings.append(f"RoBERTa FakeNews: {round(s1*100)}% fake")
            if s2 is not None:
                scores.append(s2)
                models_used.append("RoBERTa_AIDetector")
                reasonings.append(f"RoBERTa AI Detector: {round(s2*100)}% AI-generated")
            if s3 is not None:
                scores.append(s3)
                models_used.append("Groq_Llama3")
                reasonings.append(f"Groq text: {text_reasoning}")
    except Exception as e:
        print(f"PDF analysis error: {e}")

    # --- Image phase ---
    try:
        doc = fitz.open(stream=contents, filetype="pdf")
        try:
            # Collect the first few distinct image references. An image
            # reused on several pages shares one xref and is analysed once.
            xrefs = []
            for page in doc:
                for img in page.get_images():
                    if img[0] not in xrefs:
                        xrefs.append(img[0])
                    if len(xrefs) >= max_images:
                        break
                if len(xrefs) >= max_images:
                    break  # stop scanning further pages as well

            for image_number, xref in enumerate(xrefs, start=1):
                img_bytes = doc.extract_image(xref)["image"]
                await asyncio.sleep(2)
                img_score, img_reasoning = await call_groq_vision(img_bytes)
                if img_score is not None:
                    scores.append(img_score)
                    reasonings.append(f"Image {image_number}: {img_reasoning}")
                    if "Groq_Llama4" not in models_used:
                        models_used.append("Groq_Llama4")
        finally:
            # Close the document even when extraction or scoring raises.
            doc.close()
    except Exception as e:
        print(f"PDF analysis error: {e}")

    combined = sum(scores) / len(scores) if scores else 0.5

    authenticity = round((1 - combined) * 100, 2)
    fake = round(combined * 100, 2)

    return {
        "type": "pdf",
        "authenticity": authenticity,
        "fake": fake,
        "confidence_level": make_confidence(authenticity, fake),
        "models_used": models_used,
        "details": {
            "groq_score": "see breakdown",
            "groq_reasoning": " | ".join(reasonings) if reasonings else "No content extracted",
            "metadata_risk": 0.0,
        }
    }
| |
|
| |
|
| | |
| | |
| | |
| |
|
@app.post("/analyze")
async def analyze(
    file: Optional[UploadFile] = File(None),
    text: Optional[str] = Form(None)
):
    """Analyse an uploaded file or a submitted text for signs of forgery.

    Text is analysed only when no file is sent; when both are present the
    file wins. Files are dispatched by their declared content type to the
    image, video or PDF analyser.

    Returns:
        The analyser's result dict with an added ``sha256`` of the analysed
        bytes, or ``{"error": ...}`` when nothing was provided or the file
        type is unsupported.
    """
    if text and not file:
        result = await analyze_text(text)
        result["sha256"] = calculate_sha256(text.encode("utf-8"))
        return result

    if not file:
        return {"error": "No file or text provided"}

    contents = await file.read()
    sha256 = calculate_sha256(contents)

    # The client may omit the part's Content-Type, leaving this None; treat
    # that as an unsupported type instead of failing on ``.startswith``.
    content_type = file.content_type or ""

    if content_type.startswith("image/"):
        result = await analyze_image(contents, content_type)
    elif content_type.startswith("video/"):
        result = await analyze_video(contents)
    elif content_type == "application/pdf":
        result = await analyze_pdf(contents)
    else:
        return {"error": "Unsupported file type"}

    result["sha256"] = sha256
    return result
| |
|
| |
|
| |
|