deepfakefacedetection / facecomparison_multi_resume.py
Samsularif's picture
Update facecomparison_multi_resume.py
28d9e73 verified
# # #!/usr/bin/env python3
# # import os
# # import csv
# # import time
# # import base64
# # from pathlib import Path
# # from tqdm import tqdm
# # import logging
# # from PIL import Image
# # import io
# # from datetime import datetime
# # from openai import OpenAI
# # import numpy as np
# # # === RetinaFace Configuration ===
# # try:
# # from retinaface import RetinaFace
# # RETINAFACE_AVAILABLE = True
# # except ImportError:
# # RETINAFACE_AVAILABLE = False
# # print("❌ ERROR: RetinaFace library not found. Please run 'pip install retina-face'. Running without face cropping.")
# # # === LOGGING CONFIG ===
# # os.makedirs("logs", exist_ok=True)
# # logging.basicConfig(
# # filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log",
# # level=logging.INFO,
# # format="%(asctime)s - %(levelname)s - %(message)s",
# # )
# # logger = logging.getLogger(__name__)
# # class DeepfakeDetector:
# # """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) with RetinaFace + adaptive delay"""
# # def __init__(self, api_key, model_name="qwen", debug_mode=False, start_from=0, use_face_detector=True):
# # self.api_key = api_key
# # self.model_name = model_name.lower()
# # self.debug_mode = debug_mode
# # self.start_from = start_from
# # self.dataset_folder = "dataset"
# # self.results_folder = "result"
# # self.csv_filename = f"result_{self.model_name}_{start_from}.csv"
# # # === OpenRouter API client ===
# # self.client = OpenAI(
# # base_url="https://openrouter.ai/api/v1",
# # api_key=self.api_key,
# # )
# # self.extra_headers = {
# # "HTTP-Referer": "https://github.com/retinaface-comparison",
# # "X-Title": "Deepfake Detection Adaptive"
# # }
# # # === Model map (5 LLMs) ===
# # self.model_map = {
# # "qwen": "qwen/qwen3-vl-8b-instruct",
# # "gpt": "openai/chatgpt-4o-latest",
# # "gemini": "google/gemini-2.5-flash",
# # "llama": "meta-llama/llama-3.2-90b-vision-instruct",
# # "cohere": "cohere/command-r-plus-08-2024",
# # }
# # if self.model_name not in self.model_map:
# # raise ValueError("❌ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere")
# # logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})")
# # os.makedirs(self.results_folder, exist_ok=True)
# # self.use_face_detector = use_face_detector and RETINAFACE_AVAILABLE
# # self.target_size = 512
# # # Adaptive delay system
# # self.delay = 0.5
# # self.fail_count = 0
# # self.success_count = 0
# # print(f"\nModel: {self.model_name.upper()} | RetinaFace Cropping: {'ON' if self.use_face_detector else 'OFF'}")
# # # === Image handling (RetinaFace) ===
# # def preprocess_image_with_retinaface(self, image_path):
# # if not self.use_face_detector or not RETINAFACE_AVAILABLE:
# # return self.encode_image_simple(image_path)
# # try:
# # faces = RetinaFace.detect_faces(image_path)
# # if not faces:
# # logger.warning(f"No face detected in {image_path}")
# # return self.encode_image_simple(image_path)
# # # Ambil wajah utama
# # first_face = list(faces.values())[0]
# # facial_area = first_face.get("facial_area", None)
# # if not facial_area or len(facial_area) != 4:
# # return self.encode_image_simple(image_path)
# # x1, y1, x2, y2 = facial_area
# # img = Image.open(image_path).convert("RGB")
# # cropped_img = img.crop((x1, y1, x2, y2))
# # cropped_img = cropped_img.resize((self.target_size, self.target_size))
# # buf = io.BytesIO()
# # cropped_img.save(buf, format="JPEG", quality=90, optimize=True)
# # encoded = base64.b64encode(buf.getvalue()).decode("utf-8")
# # return f"data:image/jpeg;base64,{encoded}"
# # except Exception as e:
# # logger.error(f"RetinaFace error on {image_path}: {e}")
# # return self.encode_image_simple(image_path)
# # def encode_image_simple(self, image_path):
# # try:
# # with open(image_path, "rb") as f:
# # encoded = base64.b64encode(f.read()).decode("utf-8")
# # return f"data:image/jpeg;base64,{encoded}"
# # except Exception as e:
# # logger.error(f"Encode error: {e}")
# # return None
# # def validate_image(self, image_path):
# # try:
# # if not os.path.exists(image_path):
# # return False
# # with Image.open(image_path) as img:
# # img.verify()
# # return True
# # except Exception:
# # return False
# # def normalize_output(self, content):
# # if not content:
# # return "UNKNOWN"
# # text = content.strip().upper()
# # if any(w in text for w in ["REAL", "GENUINE", "HUMAN"]):
# # return "REAL"
# # if any(w in text for w in ["FAKE", "DEEPFAKE", "AI", "SYNTHETIC", "GENERATED"]):
# # return "FAKE"
# # if "NOT FAKE" in text or "LOOKS REAL" in text:
# # return "REAL"
# # if "PROBABLY FAKE" in text or "MAYBE FAKE" in text:
# # return "FAKE"
# # return "UNKNOWN"
# # def reverify_qwen(self, img_b64, prev_result):
# # prompt = (
# # "Re-analyze this face image for deepfake signs. "
# # "Focus on lighting, symmetry, and unnatural skin artifacts. "
# # "Respond with one word only: REAL or FAKE."
# # )
# # print(f"🔁 Re-verifying Qwen result (was {prev_result})...")
# # try:
# # resp = self.client.chat.completions.create(
# # extra_headers=self.extra_headers,
# # model=self.model_map["qwen"],
# # messages=[{
# # "role": "user",
# # "content": [
# # {"type": "image_url", "image_url": {"url": img_b64}},
# # {"type": "text", "text": prompt}
# # ]
# # }],
# # max_tokens=50,
# # temperature=0.1,
# # )
# # content = resp.choices[0].message.content.strip().upper()
# # if "FAKE" in content:
# # print("✅ Changed to FAKE after second check")
# # return "FAKE"
# # elif "REAL" in content:
# # print("✅ Confirmed REAL after second check")
# # return "REAL"
# # else:
# # print("⚠️ Still ambiguous after recheck")
# # return prev_result
# # except Exception as e:
# # print(f"⚠️ Qwen re-verification failed: {e}")
# # return prev_result
# # # === Deteksi utama ===
# # def detect_deepfake_llm(self, image_path):
# # prompt = (
# # "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. "
# # "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE."
# # )
# # if not self.validate_image(image_path):
# # return "ERROR", None, "invalid"
# # img_b64 = self.preprocess_image_with_retinaface(image_path)
# # if not img_b64:
# # return "ERROR", None, "invalid"
# # model_id = self.model_map[self.model_name]
# # method = "retinaface_crop" if self.use_face_detector else "original"
# # try:
# # resp = self.client.chat.completions.create(
# # extra_headers=self.extra_headers,
# # model=model_id,
# # messages=[{
# # "role": "user",
# # "content": [
# # {"type": "image_url", "image_url": {"url": img_b64}},
# # {"type": "text", "text": prompt}
# # ]
# # }],
# # max_tokens=50,
# # temperature=0.1,
# # )
# # content = resp.choices[0].message.content
# # result = self.normalize_output(content)
# # if self.model_name == "qwen" and result == "REAL":
# # result = self.reverify_qwen(img_b64, result)
# # return result, content, method
# # except Exception as e:
# # logger.error(f"Detection failed: {e}")
# # return "ERROR", None, method
# # # === Dataset & Resume ===
# # def get_images(self):
# # dataset_path = Path(self.dataset_folder)
# # real_path = dataset_path / "face_real"
# # fake_path = dataset_path / "face_fake"
# # if not real_path.exists() or not fake_path.exists():
# # print("❌ Dataset folders missing.")
# # return []
# # real_images = sorted(list(real_path.glob("*.jpg")))[:500]
# # fake_images = sorted(list(fake_path.glob("*.jpg")))[:500]
# # return [(str(p), "REAL") for p in real_images] + [(str(p), "FAKE") for p in fake_images]
# # def load_existing_results(self):
# # csv_path = os.path.join(self.results_folder, self.csv_filename)
# # if not os.path.exists(csv_path):
# # return []
# # results = []
# # with open(csv_path, "r", encoding="utf-8") as f:
# # reader = csv.reader(f)
# # next(reader)
# # for row in reader:
# # if len(row) >= 5:
# # results.append((row[0], row[1], row[2], row[3], row[4]))
# # logger.info(f"Loaded {len(results)} existing results")
# # return results
# # def save_results_to_csv(self, results):
# # csv_path = os.path.join(self.results_folder, self.csv_filename)
# # with open(csv_path, "w", newline="", encoding="utf-8") as f:
# # writer = csv.writer(f)
# # writer.writerow(["filename", "ground_truth", "llm_result", "model_name", "method"])
# # writer.writerows(results)
# # logger.info(f"Saved {len(results)} results to {csv_path}")
# # # === Adaptive delay logic ===
# # def adjust_delay(self):
# # if self.fail_count > 5:
# # self.delay = min(self.delay + 0.2, 2.0)
# # logger.warning(f"Increasing delay to {self.delay:.1f}s due to failures.")
# # self.fail_count = 0
# # elif self.success_count > 10:
# # self.delay = max(self.delay - 0.1, 0.3)
# # logger.info(f"Reducing delay to {self.delay:.1f}s (stable).")
# # self.success_count = 0
# # def run_detection(self, resume=True):
# # all_images = self.get_images()
# # if not all_images:
# # return
# # results = self.load_existing_results() if resume else []
# # processed = {r[0] for r in results}
# # remaining = [(p, gt) for p, gt in all_images if os.path.basename(p) not in processed]
# # print(f"\n=== STARTING {self.model_name.upper()} DETECTION ===")
# # print(f"Total: {len(all_images)} | Already done: {len(processed)} | Remaining: {len(remaining)}")
# # with tqdm(total=len(remaining), desc=f"{self.model_name.upper()}") as pbar:
# # for img_path, truth in remaining:
# # try:
# # result, response, method = self.detect_deepfake_llm(img_path)
# # results.append((os.path.basename(img_path), truth, result, self.model_name, method))
# # self.success_count += 1
# # except Exception as e:
# # logger.error(f"Fatal error: {e}")
# # results.append((os.path.basename(img_path), truth, "ERROR", self.model_name, "error"))
# # self.fail_count += 1
# # pbar.set_description(f"{os.path.basename(img_path)} -> {result}")
# # pbar.update(1)
# # self.save_results_to_csv(results)
# # self.adjust_delay()
# # time.sleep(self.delay)
# # print(f"\n✅ Detection completed for {self.model_name.upper()}")
# # print(f"Results saved to: {os.path.join(self.results_folder, self.csv_filename)}")
# #!/usr/bin/env python3
# #!/usr/bin/env python3
# import os
# import csv
# import time
# import base64
# from pathlib import Path
# from tqdm import tqdm
# import logging
# from PIL import Image
# import io
# from datetime import datetime
# from openai import OpenAI
# import numpy as np
# import math # Diperlukan untuk perhitungan akurasi
# # RetinaFace Configuration
# try:
# #retinaface
# from retinaface import RetinaFace
# RETINAFACE_AVAILABLE = True
# except ImportError:
# RETINAFACE_AVAILABLE = False
# print("❌ ERROR: RetinaFace library not found. Running without face cropping.")
# # === LOGGING CONFIG ===
# os.makedirs("logs", exist_ok=True)
# logging.basicConfig(
# filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log",
# level=logging.INFO,
# format="%(asctime)s - %(levelname)s - %(message)s",
# )
# logger = logging.getLogger(__name__)
# class DeepfakeDetector:
# """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) with RetinaFace + adaptive delay"""
# def __init__(self, api_key, model_name="qwen", debug_mode=False, start_from=0, use_face_detector=True):
# self.api_key = api_key
# self.model_name = model_name.lower()
# self.debug_mode = debug_mode
# self.start_from = start_from
# self.dataset_folder = "dataset"
# self.results_folder = "result"
# self.csv_filename = f"result_{self.model_name}_{start_from}.csv"
# # OpenRouter API client
# self.client = OpenAI(
# base_url="https://openrouter.ai/api/v1",
# api_key=self.api_key,
# )
# self.extra_headers = {
# "HTTP-Referer": "https://github.com/retinaface-comparison",
# "X-Title": "Deepfake Detection Adaptive"
# }
# # Model map (5 LLMs via OpenRouter)
# self.model_map = {
# "qwen": "qwen/qwen3-vl-8b-instruct",
# "gpt": "openai/chatgpt-4o-latest",
# "gemini": "google/gemini-2.5-flash",
# "llama": "meta-llama/llama-3.2-90b-vision-instruct",
# "cohere": "cohere/command-r-plus-08-2024",
# }
# if self.model_name not in self.model_map:
# raise ValueError("❌ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere")
# logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})")
# os.makedirs(self.results_folder, exist_ok=True)
# self.use_face_detector = use_face_detector and RETINAFACE_AVAILABLE
# self.target_size = 512
# # waktu delay
# self.delay = 0.3
# self.fail_count = 0
# self.success_count = 0
# print(f"\nModel: {self.model_name.upper()} | RetinaFace Cropping: {'ON' if self.use_face_detector else 'OFF'}")
# # Image handling (RetinaFace)
# def preprocess_image_with_retinaface(self, image_path):
# if not self.use_face_detector or not RETINAFACE_AVAILABLE:
# return self.encode_image_simple(image_path)
# try:
# # Perlu diubah ke string karena RetinaFace kadang tidak menerima objek Path
# faces = RetinaFace.detect_faces(str(image_path))
# if not faces:
# logger.warning(f"No face detected in {image_path}")
# return self.encode_image_simple(image_path)
# first_face = list(faces.values())[0]
# facial_area = first_face.get("facial_area", None)
# if not facial_area or len(facial_area) != 4:
# return self.encode_image_simple(image_path)
# x1, y1, x2, y2 = facial_area
# img = Image.open(image_path).convert("RGB")
# # Tambahkan margin (ekstraksi)
# margin_ratio = 0.2
# w, h = x2 - x1, y2 - y1
# margin_x = int(w * margin_ratio)
# margin_y = int(h * margin_ratio)
# x1 = max(0, x1 - margin_x)
# y1 = max(0, y1 - margin_y)
# x2 = min(img.width, x2 + margin_x)
# y2 = min(img.height, y2 + margin_y)
# cropped_img = img.crop((x1, y1, x2, y2))
# cropped_img = cropped_img.resize((self.target_size, self.target_size), Image.Resampling.LANCZOS)
# buf = io.BytesIO()
# cropped_img.save(buf, format="JPEG", quality=90, optimize=True)
# encoded = base64.b64encode(buf.getvalue()).decode("utf-8")
# return f"data:image/jpeg;base64,{encoded}"
# except Exception as e:
# logger.error(f"RetinaFace error on {image_path}: {e}")
# return self.encode_image_simple(image_path)
# def encode_image_simple(self, image_path):
# try:
# with open(image_path, "rb") as f:
# encoded = base64.b64encode(f.read()).decode("utf-8")
# return f"data:image/jpeg;base64,{encoded}"
# except Exception as e:
# logger.error(f"Encode error: {e}")
# return None
# def validate_image(self, image_path):
# try:
# if not os.path.exists(image_path):
# return False
# with Image.open(image_path) as img:
# img.verify()
# return True
# except Exception:
# return False
# def normalize_output(self, content):
# """
# Normalizes verbose LLM output to a single word: REAL, FAKE, or UNKNOWN.
# """
# if not content:
# return "UNKNOWN"
# text = content.strip().upper()
# # Mencari kata kunci FAKE
# if any(w in text for w in ["FAKE", "DEEPFAKE", "AI GENERATED", "SYNTHETIC"]):
# return "FAKE"
# # Mencari kata kunci REAL
# if any(w in text for w in ["REAL", "GENUINE", "HUMAN", "NOT FAKE"]):
# return "REAL"
# # Upaya kedua: Coba ambil kata pertama/kata kunci di tengah respons
# words = text.split()
# if words:
# for word in words[:3]: # Cek 3 kata pertama
# if "REAL" in word: return "REAL"
# if "FAKE" in word: return "FAKE"
# logger.warning(f"Output ambiguous/unpredictable: {content}")
# return "UNKNOWN"
# def reverify_qwen(self, img_b64, prev_result):
# # Logika re-verifikasi
# prompt = (
# "Re-analyze this face image for deepfake signs. "
# "Focus on lighting, symmetry, and unnatural skin artifacts. "
# "Respond with one word only: REAL or FAKE."
# )
# print(f"🔁 Re-verifying Qwen result (was {prev_result})...")
# try:
# resp = self.client.chat.completions.create(
# extra_headers=self.extra_headers,
# model=self.model_map["qwen"],
# messages=[{
# "role": "user",
# "content": [
# {"type": "image_url", "image_url": {"url": img_b64}},
# {"type": "text", "text": prompt}
# ]
# }],
# max_tokens=50,
# temperature=0.1,
# )
# content = resp.choices[0].message.content.strip().upper()
# if "FAKE" in content:
# print("Changed to FAKE after second check")
# return "FAKE"
# elif "REAL" in content:
# print("Confirmed REAL after second check")
# return "REAL"
# else:
# print("Still ambiguous after recheck")
# return prev_result
# except Exception as e:
# print(f"Qwen re-verification failed: {e}")
# return prev_result
# # Fungsi Fallback
# def retinaface_simple_fallback(self, image_path):
# """Applies a heuristic rule if LLM returns an error."""
# if not self.use_face_detector or not RETINAFACE_AVAILABLE:
# return 'UNKNOWN_FALLBACK'
# try:
# # Panggil deteksi wajah (menggunakan str(image_path))
# faces = RetinaFace.detect_faces(str(image_path))
# if not faces:
# return 'UNKNOWN_FALLBACK'
# best_score = max(f['score'] for f in faces.values())
# # Aturan Heuristik: Jika skor kepercayaan wajah sangat tinggi, REAL.
# if best_score > 0.995:
# return 'REAL'
# else:
# return 'UNKNOWN_FALLBACK'
# except Exception:
# return 'UNKNOWN_FALLBACK'
# # Deteksi utama
# def detect_deepfake_llm(self, image_path):
# prompt = (
# "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. "
# "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE."
# )
# if not self.validate_image(image_path):
# return "ERROR", None, "invalid"
# img_b64 = self.preprocess_image_with_retinaface(image_path)
# if not img_b64:
# return "ERROR", None, "invalid"
# model_id = self.model_map[self.model_name]
# method = "retinaface_crop" if self.use_face_detector else "original"
# try:
# resp = self.client.chat.completions.create(
# extra_headers=self.extra_headers,
# model=model_id,
# messages=[{
# "role": "user",
# "content": [
# {"type": "image_url", "image_url": {"url": img_b64}},
# {"type": "text", "text": prompt}
# ]
# }],
# max_tokens=50,
# temperature=0.1,
# )
# content = resp.choices[0].message.content
# result = self.normalize_output(content)
# if self.model_name == "qwen" and result == "REAL":
# result = self.reverify_qwen(img_b64, result)
# return result, content, method
# except Exception as e:
# logger.error(f"Detection failed: {e}")
# # Tambahkan logika Fallback RetinaFace
# fallback_result = self.retinaface_simple_fallback(image_path)
# logger.warning(f"LLM Failed. Applying Fallback Logic: {fallback_result}")
# if fallback_result == 'REAL':
# # Jika heuristik RetinaFace yakin gambar BERKUALITAS BAGUS, prediksi REAL
# return 'REAL', "RetinaFace Heuristic", "retinaface_crop_FALLBACK"
# else:
# return "FAKE", "RetinaFace Heuristic (Assumed FAKE)", "retinaface_crop"
# # === Dataset & Resume ===
# def get_images(self):
# dataset_path = Path(self.dataset_folder)
# real_path = dataset_path / "face_real"
# fake_path = dataset_path / "face_fake"
# if not real_path.exists() or not fake_path.exists():
# print("❌ Dataset folders missing.")
# return []
# # Batasi gambar menjadi 500 REAL dan 500 FAKE (total 1000)
# real_images = sorted(list(real_path.glob("*.jpg")))[:500]
# fake_images = sorted(list(fake_path.glob("*.jpg")))[:500]
# return [(str(p), "REAL") for p in real_images] + [(str(p), "FAKE") for p in fake_images]
# def load_existing_results(self):
# csv_path = os.path.join(self.results_folder, self.csv_filename)
# if not os.path.exists(csv_path):
# return []
# results = []
# with open(csv_path, "r", encoding="utf-8") as f:
# reader = csv.reader(f)
# next(reader)
# for row in reader:
# # baris memiliki 5 kolom
# if len(row) >= 5:
# results.append((row[0], row[1], row[2], row[3], row[4]))
# logger.info(f"Loaded {len(results)} existing results")
# return results
# def save_results_to_csv(self, results):
# csv_path = os.path.join(self.results_folder, self.csv_filename)
# with open(csv_path, "w", newline="", encoding="utf-8") as f:
# writer = csv.writer(f)
# writer.writerow(["filename", "ground_truth", "llm_result", "model_name", "method"])
# writer.writerows(results)
# logger.info(f"Saved {len(results)} results to {csv_path}")
# # logika delay
# def adjust_delay(self):
# # Logika adaptive delay
# if self.fail_count > 5:
# self.delay = min(self.delay + 0.2, 2.0)
# logger.warning(f"Increasing delay to {self.delay:.1f}s due to failures.")
# self.fail_count = 0
# elif self.success_count > 10:
# self.delay = max(self.delay - 0.1, 0.3)
# logger.info(f"Reducing delay to {self.delay:.1f}s (stable).")
# self.success_count = 0
# def run_detection(self, resume=True):
# all_images = self.get_images()
# if not all_images:
# return
# results = self.load_existing_results() if resume else []
# processed = {r[0] for r in results}
# remaining = [(p, gt) for p, gt in all_images if os.path.basename(p) not in processed]
# print(f"\n=== STARTING {self.model_name.upper()} DETECTION ===")
# print(f"Total: {len(all_images)} | Already done: {len(processed)} | Remaining: {len(remaining)}")
# with tqdm(total=len(remaining), desc=f"{self.model_name.upper()}") as pbar:
# for img_path, truth in remaining:
# try:
# result, response, method = self.detect_deepfake_llm(img_path)
# results.append((os.path.basename(img_path), truth, result, self.model_name, method))
# self.success_count += 1
# except Exception as e:
# logger.error(f"Fatal error: {e}")
# results.append((os.path.basename(img_path), truth, "ERROR", self.model_name, "error"))
# self.fail_count += 1
# pbar.set_description(f"{os.path.basename(img_path)} -> {result}")
# pbar.update(1)
# self.save_results_to_csv(results)
# self.adjust_delay()
# time.sleep(self.delay)
# print(f"\n✅ Deteksi selesai untuk {self.model_name.upper()}")
# print(f"Hasil simpan ke: {os.path.join(self.results_folder, self.csv_filename)}")
# #!/usr/bin/env python3
# import os
# import csv
# import time
# import base64
# from pathlib import Path
# from tqdm import tqdm
# import logging
# from PIL import Image
# import io
# from datetime import datetime
# from openai import OpenAI
# import numpy as np
# import math
# # === RetinaFace Configuration ===
# try:
# from retinaface import RetinaFace
# RETINAFACE_AVAILABLE = True
# except ImportError:
# RETINAFACE_AVAILABLE = False
# print("❌ ERROR: RetinaFace library not found. Please run 'pip install retina-face'. Running without face cropping.")
# # === LOGGING CONFIG ===
# os.makedirs("logs", exist_ok=True)
# logging.basicConfig(
# filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log",
# level=logging.INFO,
# format="%(asctime)s - %(levelname)s - %(message)s",
# )
# logger = logging.getLogger(__name__)
# class DeepfakeDetector:
# """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) with RetinaFace + adaptive delay"""
# def __init__(self, api_key, model_name="qwen", debug_mode=False, start_from=0, use_face_detector=True):
# self.api_key = api_key
# self.model_name = model_name.lower()
# self.debug_mode = debug_mode
# self.start_from = start_from
# self.dataset_folder = "dataset"
# self.results_folder = "result"
# self.csv_filename = f"result_{self.model_name}_{start_from}.csv"
# # === OpenRouter API client (Menggunakan OpenRouter untuk semua model) ===
# self.client = OpenAI(
# base_url="https://openrouter.ai/api/v1",
# api_key=self.api_key,
# )
# self.extra_headers = {
# "HTTP-Referer": "https://github.com/retinaface-comparison",
# "X-Title": "Deepfake Detection Adaptive"
# }
# # === Model map (5 LLMs via OpenRouter) ===
# self.model_map = {
# "qwen": "qwen/qwen3-vl-8b-instruct",
# "gpt": "openai/chatgpt-4o-latest",
# "gemini": "google/gemini-2.5-flash",
# "llama": "meta-llama/llama-3.2-90b-vision-instruct",
# "cohere": "cohere/command-r-plus-08-2024",
# }
# if self.model_name not in self.model_map:
# raise ValueError("❌ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere")
# logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})")
# os.makedirs(self.results_folder, exist_ok=True)
# self.use_face_detector = False and RETINAFACE_AVAILABLE
# self.target_size = 512
# # waktu delay
# self.delay = 0.3
# self.fail_count = 0
# self.success_count = 0
# print(f"\nModel: {self.model_name.upper()} | RetinaFace Cropping: {'ON' if self.use_face_detector else 'OFF'}")
# # === Image handling (RetinaFace) ===
# def preprocess_image_with_retinaface(self, image_path):
# if not self.use_face_detector or not RETINAFACE_AVAILABLE:
# return self.encode_image_simple(image_path)
# try:
# # Perlu diubah ke string karena RetinaFace kadang tidak menerima objek Path
# faces = RetinaFace.detect_faces(str(image_path))
# if not faces:
# logger.warning(f"No face detected in {image_path}")
# return self.encode_image_simple(image_path)
# first_face = list(faces.values())[0]
# facial_area = first_face.get("facial_area", None)
# if not facial_area or len(facial_area) != 4:
# return self.encode_image_simple(image_path)
# x1, y1, x2, y2 = facial_area
# img = Image.open(image_path).convert("RGB")
# # Tambahkan margin (ekstraksi)
# margin_ratio = 0.2
# w, h = x2 - x1, y2 - y1
# margin_x = int(w * margin_ratio)
# margin_y = int(h * margin_ratio)
# x1 = max(0, x1 - margin_x)
# y1 = max(0, y1 - margin_y)
# x2 = min(img.width, x2 + margin_x)
# y2 = min(img.height, y2 + margin_y)
# cropped_img = img.crop((x1, y1, x2, y2))
# cropped_img = cropped_img.resize((self.target_size, self.target_size), Image.Resampling.LANCZOS)
# buf = io.BytesIO()
# cropped_img.save(buf, format="JPEG", quality=90, optimize=True)
# encoded = base64.b64encode(buf.getvalue()).decode("utf-8")
# return f"data:image/jpeg;base64,{encoded}"
# except Exception as e:
# logger.error(f"RetinaFace error on {image_path}: {e}")
# return self.encode_image_simple(image_path)
# def encode_image_simple(self, image_path):
# try:
# with open(image_path, "rb") as f:
# encoded = base64.b64encode(f.read()).decode("utf-8")
# return f"data:image/jpeg;base64,{encoded}"
# except Exception as e:
# logger.error(f"Encode error: {e}")
# return None
# def validate_image(self, image_path):
# try:
# if not os.path.exists(image_path):
# return False
# with Image.open(image_path) as img:
# img.verify()
# return True
# except Exception:
# return False
# def normalize_output(self, content):
# """
# Normalizes verbose LLM output to a single word: REAL, FAKE, or UNKNOWN.
# """
# if not content:
# return "UNKNOWN"
# text = content.strip().upper()
# # Mencari kata kunci FAKE (atau sinonim)
# if any(w in text for w in ["FAKE", "DEEPFAKE", "AI GENERATED", "SYNTHETIC"]):
# return "FAKE"
# # Mencari kata kunci REAL (atau sinonim)
# if any(w in text for w in ["REAL", "GENUINE", "HUMAN", "NOT FAKE"]):
# return "REAL"
# # Upaya kedua: Coba ambil kata pertama/kata kunci di tengah respons
# words = text.split()
# if words:
# for word in words[:3]: # Cek 3 kata pertama
# if "REAL" in word: return "REAL"
# if "FAKE" in word: return "FAKE"
# logger.warning(f"Output ambiguous/unpredictable: {content}")
# return "UNKNOWN"
# def reverify_qwen(self, img_b64, prev_result):
# # Logika re-verifikasi
# prompt = (
# "Re-analyze this face image for deepfake signs. "
# "Focus on lighting, symmetry, and unnatural skin artifacts. "
# "Respond with one word only: REAL or FAKE."
# )
# print(f"🔁 Re-verifying Qwen result (was {prev_result})...")
# try:
# resp = self.client.chat.completions.create(
# extra_headers=self.extra_headers,
# model=self.model_map["qwen"],
# messages=[{
# "role": "user",
# "content": [
# {"type": "image_url", "image_url": {"url": img_b64}},
# {"type": "text", "text": prompt}
# ]
# }],
# max_tokens=50,
# temperature=0.1,
# )
# content = resp.choices[0].message.content.strip().upper()
# if "FAKE" in content:
# print("Changed to FAKE after second check")
# return "FAKE"
# elif "REAL" in content:
# print("Confirmed REAL after second check")
# return "REAL"
# else:
# print("Still ambiguous after recheck")
# return prev_result
# except Exception as e:
# print(f"Qwen re-verification failed: {e}")
# return prev_result
# # Fungsi Fallback Sederhana RetinaFace (Heuristik) DIHAPUS
# # === Deteksi utama ===
# def detect_deepfake_llm(self, image_path):
# prompt = (
# "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. "
# "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE."
# )
# if not self.validate_image(image_path):
# return "ERROR", None, "invalid"
# img_b64 = self.preprocess_image_with_retinaface(image_path)
# if not img_b64:
# return "ERROR", None, "invalid"
# model_id = self.model_map[self.model_name]
# method = "retinaface_crop" if self.use_face_detector else "original"
# try:
# resp = self.client.chat.completions.create(
# extra_headers=self.extra_headers,
# model=model_id,
# messages=[{
# "role": "user",
# "content": [
# {"type": "image_url", "image_url": {"url": img_b64}},
# {"type": "text", "text": prompt}
# ]
# }],
# max_tokens=50,
# temperature=0.1,
# )
# content = resp.choices[0].message.content
# result = self.normalize_output(content)
# if self.model_name == "qwen" and result == "REAL":
# result = self.reverify_qwen(img_b64, result)
# return result, content, method
# except Exception as e:
# logger.error(f"Detection failed: {e}")
# # --- LOGIKA KETIKA LLM GAGAL (TIDAK ADA TEBAKAN) ---
# # Jika LLM gagal, catat sebagai ERROR.
# return "ERROR", None, "API_FAILURE" # Mengganti 'error' dengan 'API_FAILURE' untuk kejelasan
# # === Dataset & Resume ===
# def get_images(self):
# dataset_path = Path(self.dataset_folder)
# real_path = dataset_path / "face_real"
# fake_path = dataset_path / "face_fake"
# if not real_path.exists() or not fake_path.exists():
# print("❌ Dataset folders missing.")
# return []
# # Batasi gambar menjadi 500 REAL dan 500 FAKE (total 1000)
# real_images = sorted(list(real_path.glob("*.jpg")))[:500]
# fake_images = sorted(list(fake_path.glob("*.jpg")))[:500]
# return [(str(p), "REAL") for p in real_images] + [(str(p), "FAKE") for p in fake_images]
# def load_existing_results(self):
# csv_path = os.path.join(self.results_folder, self.csv_filename)
# if not os.path.exists(csv_path):
# return []
# results = []
# with open(csv_path, "r", encoding="utf-8") as f:
# reader = csv.reader(f)
# next(reader)
# for row in reader:
# # Pastikan baris memiliki 5 kolom
# if len(row) >= 5:
# results.append((row[0], row[1], row[2], row[3], row[4]))
# logger.info(f"Loaded {len(results)} existing results")
# return results
# def save_results_to_csv(self, results):
# csv_path = os.path.join(self.results_folder, self.csv_filename)
# with open(csv_path, "w", newline="", encoding="utf-8") as f:
# writer = csv.writer(f)
# writer.writerow(["filename", "ground_truth", "llm_result", "model_name", "method"])
# writer.writerows(results)
# logger.info(f"Saved {len(results)} results to {csv_path}")
# # === Adaptive delay logic ===
# def adjust_delay(self):
# # Logika adaptive delay
# if self.fail_count > 5:
# self.delay = min(self.delay + 0.2, 2.0)
# logger.warning(f"Increasing delay to {self.delay:.1f}s due to failures.")
# self.fail_count = 0
# elif self.success_count > 10:
# self.delay = max(self.delay - 0.1, 0.3)
# logger.info(f"Reducing delay to {self.delay:.1f}s (stable).")
# self.success_count = 0
# def run_detection(self, resume=True):
# all_images = self.get_images()
# if not all_images:
# return
# results = self.load_existing_results() if resume else []
# processed = {r[0] for r in results}
# remaining = [(p, gt) for p, gt in all_images if os.path.basename(p) not in processed]
# print(f"\n=== STARTING {self.model_name.upper()} DETECTION ===")
# print(f"Total: {len(all_images)} | Already done: {len(processed)} | Remaining: {len(remaining)}")
# with tqdm(total=len(remaining), desc=f"{self.model_name.upper()}") as pbar:
# for img_path, truth in remaining:
# try:
# result, response, method = self.detect_deepfake_llm(img_path)
# results.append((os.path.basename(img_path), truth, result, self.model_name, method))
# self.success_count += 1
# except Exception as e:
# logger.error(f"Fatal error: {e}")
# results.append((os.path.basename(img_path), truth, "ERROR", self.model_name, "error"))
# self.fail_count += 1
# pbar.set_description(f"{os.path.basename(img_path)} -> {result}")
# pbar.update(1)
# self.save_results_to_csv(results)
# self.adjust_delay()
# time.sleep(self.delay)
# print(f"\n✅ Deteksi selesai untuk {self.model_name.upper()}")
# print(f"Hasil simpan ke: {os.path.join(self.results_folder, self.csv_filename)}")
import base64
import io
import logging
import mimetypes
import os
import time
from datetime import datetime
from pathlib import Path

import numpy as np
from openai import OpenAI
from PIL import Image
# === RetinaFace Configuration ===
# RetinaFace is an optional dependency: when it cannot be imported the
# detector still works, but images are sent to the model uncropped.
try:
    from retinaface import RetinaFace
    RETINAFACE_AVAILABLE = True
except ImportError:
    RETINAFACE_AVAILABLE = False

# === LOGGING CONFIG ===
# Every run logs to its own timestamped file under ./logs.
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

if not RETINAFACE_AVAILABLE:
    # Reported here rather than in the ``except`` branch because the logger
    # does not exist yet when the import is attempted.
    logger.warning(
        "RetinaFace library not found (pip install retina-face); "
        "running without face cropping."
    )
class DeepfakeDetector:
    """Single-image deepfake detector backed by vision LLMs served via OpenRouter.

    An image is optionally cropped to the first face RetinaFace reports,
    encoded as a base64 data URL and sent to the selected model. The model's
    free-text reply is then reduced to ``REAL``, ``FAKE`` or ``UNKNOWN``.

    Supported ``model_name`` aliases are the keys of ``self.model_map``
    (currently qwen, gpt, gemini and llama).
    """

    # Extra context kept around the detected face box, as a fraction of its size.
    _FACE_MARGIN_RATIO = 0.2

    # Phrases that negate a verdict. They are checked before the plain keywords
    # because e.g. "NOT FAKE" also contains the keyword "FAKE".
    _NEGATED_FAKE_PHRASES = (
        "NOT FAKE", "NOT A FAKE", "NOT DEEPFAKE", "NOT A DEEPFAKE",
        "NOT AI GENERATED", "NOT AI-GENERATED", "NOT SYNTHETIC",
    )
    _NEGATED_REAL_PHRASES = ("NOT REAL", "NOT A REAL", "NOT GENUINE", "NOT HUMAN")
    _FAKE_KEYWORDS = ("FAKE", "DEEPFAKE", "AI GENERATED", "AI-GENERATED", "SYNTHETIC")
    _REAL_KEYWORDS = ("REAL", "GENUINE", "HUMAN")

    def __init__(self, api_key, model_name="qwen", use_face_detector=True):
        """Create a detector bound to one OpenRouter model.

        Args:
            api_key: OpenRouter API key passed to the OpenAI-compatible client.
            model_name: Case-insensitive alias; must be a key of ``model_map``.
            use_face_detector: Crop to the detected face before querying the
                model. Ignored (treated as False) when RetinaFace is not
                importable.

        Raises:
            ValueError: If ``model_name`` is not a supported alias.
        """
        self.api_key = api_key
        self.model_name = model_name.lower()

        # === Model map (LLMs via OpenRouter) ===
        self.model_map = {
            "qwen": "qwen/qwen3-vl-8b-instruct",
            "gpt": "openai/chatgpt-4o-latest",
            "gemini": "google/gemini-2.5-flash",
            "llama": "meta-llama/llama-3.2-90b-vision-instruct",
            # "cohere": "cohere/command-r-plus-08-2024",
        }
        # Validate before building the client so a bad alias fails fast, and
        # list only the aliases that are actually available.
        if self.model_name not in self.model_map:
            raise ValueError(
                f"❌ Invalid model name. Choose from: {', '.join(self.model_map)}"
            )

        # === OpenRouter API client ===
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=self.api_key,
        )
        self.extra_headers = {
            # Replace the Referer so that it matches your Hugging Face Space.
            "HTTP-Referer": "https://huggingface.co/spaces/[your-username]/[your-space-name]",
            "X-Title": f"Deepfake Detection Gradio ({self.model_name.upper()})"
        }
        logger.info(
            "Model selected: %s (%s)",
            self.model_name.upper(),
            self.model_map[self.model_name],
        )

        # Honour the caller's choice only if RetinaFace could be imported.
        self.use_face_detector = use_face_detector and RETINAFACE_AVAILABLE
        # Side length, in pixels, of the square face crop sent to the model.
        self.target_size = 512

    # === Image handling (RetinaFace) ===
    def _encode_face_crop(self, image_path):
        """Return a JPEG data URL of the first detected face, or None.

        None is returned when no usable face box is found or when detection,
        cropping or encoding raises; the caller then falls back to the
        uncropped file.
        """
        try:
            faces = RetinaFace.detect_faces(str(image_path))
            if not faces or not isinstance(faces, dict):
                logger.warning("No face detected in %s", image_path)
                return None

            facial_area = next(iter(faces.values())).get("facial_area")
            if facial_area is None or len(facial_area) != 4:
                return None
            x1, y1, x2, y2 = (int(v) for v in facial_area)

            # The context manager closes the underlying file once the pixels
            # have been copied into the converted image.
            with Image.open(image_path) as source:
                img = source.convert("RGB")

            # Grow the box by a margin on every side, clamped to the image.
            margin_x = int((x2 - x1) * self._FACE_MARGIN_RATIO)
            margin_y = int((y2 - y1) * self._FACE_MARGIN_RATIO)
            x1 = max(0, x1 - margin_x)
            y1 = max(0, y1 - margin_y)
            x2 = min(img.width, x2 + margin_x)
            y2 = min(img.height, y2 + margin_y)
            if x2 <= x1 or y2 <= y1:
                # Degenerate box: nothing meaningful to crop.
                return None

            cropped_img = img.crop((x1, y1, x2, y2)).resize(
                (self.target_size, self.target_size), Image.Resampling.LANCZOS
            )
            buf = io.BytesIO()
            cropped_img.save(buf, format="JPEG", quality=90, optimize=True)
            encoded = base64.b64encode(buf.getvalue()).decode("utf-8")
            return f"data:image/jpeg;base64,{encoded}"
        except Exception as e:
            # Best effort: any failure here must not abort the detection.
            logger.error("RetinaFace error on %s: %s", image_path, e)
            return None

    def _prepare_image(self, image_path):
        """Return ``(data_url, method)`` for ``image_path``.

        ``method`` is ``"retinaface_crop"`` only when a face crop was really
        produced, otherwise ``"original"``. ``data_url`` is None when the file
        could not be encoded at all.
        """
        if self.use_face_detector and RETINAFACE_AVAILABLE:
            cropped = self._encode_face_crop(image_path)
            if cropped is not None:
                return cropped, "retinaface_crop"
        return self.encode_image_simple(image_path), "original"

    def preprocess_image_with_retinaface(self, image_path):
        """Return a data URL of the face crop, or of the whole file as fallback.

        Falls back to ``encode_image_simple`` when face detection is disabled,
        finds nothing usable, or fails. Returns None if even the fallback
        encoding fails.
        """
        return self._prepare_image(image_path)[0]

    def encode_image_simple(self, image_path):
        """Return the file at ``image_path`` as a base64 data URL, or None on error.

        The MIME type is guessed from the file extension so that non-JPEG
        files are not mislabelled; ``image/jpeg`` is used when the extension
        is missing or not an image type.
        """
        try:
            with open(image_path, "rb") as f:
                encoded = base64.b64encode(f.read()).decode("utf-8")
        except Exception as e:
            logger.error("Encode error: %s", e)
            return None
        mime, _ = mimetypes.guess_type(str(image_path))
        if not mime or not mime.startswith("image/"):
            mime = "image/jpeg"
        return f"data:{mime};base64,{encoded}"

    def validate_image(self, image_path):
        """Return True if ``image_path`` exists and Pillow's ``verify()`` accepts it."""
        try:
            if not os.path.exists(image_path):
                return False
            with Image.open(image_path) as img:
                img.verify()
            return True
        except Exception:
            return False

    def normalize_output(self, content):
        """Reduce a free-text model reply to ``REAL``, ``FAKE`` or ``UNKNOWN``.

        Matching is case-insensitive and whitespace-insensitive. Negated
        verdicts ("not fake", "not real", ...) are resolved first; after that
        any FAKE keyword wins over any REAL keyword. Empty or unrecognised
        replies yield ``UNKNOWN``.
        """
        if not content:
            return "UNKNOWN"
        # Collapse runs of whitespace so multi-word phrases match across
        # line breaks and double spaces.
        text = " ".join(content.upper().split())

        if any(phrase in text for phrase in self._NEGATED_FAKE_PHRASES):
            return "REAL"
        if any(phrase in text for phrase in self._NEGATED_REAL_PHRASES):
            return "FAKE"
        if any(word in text for word in self._FAKE_KEYWORDS):
            return "FAKE"
        if any(word in text for word in self._REAL_KEYWORDS):
            return "REAL"

        logger.warning("Output ambiguous/unpredictable: %s", content)
        return "UNKNOWN"

    def _query_model(self, model_id, img_b64, prompt):
        """Send one image plus ``prompt`` to ``model_id`` and return the reply text.

        Exceptions raised by the client propagate to the caller.
        """
        resp = self.client.chat.completions.create(
            extra_headers=self.extra_headers,
            model=model_id,
            messages=[{
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": img_b64}},
                    {"type": "text", "text": prompt}
                ]
            }],
            max_tokens=50,
            temperature=0.1,
        )
        return resp.choices[0].message.content

    def reverify_qwen(self, img_b64, prev_result):
        """Ask the Qwen model a second time and return the refined verdict.

        Returns ``REAL`` or ``FAKE`` when the second reply is conclusive;
        otherwise (ambiguous reply or API failure) returns ``prev_result``.
        """
        prompt = (
            "Re-analyze this face image for deepfake signs. "
            "Focus on lighting, symmetry, and unnatural skin artifacts. "
            "Respond with one word only: REAL or FAKE."
        )
        try:
            content = self._query_model(self.model_map["qwen"], img_b64, prompt)
            verdict = self.normalize_output(content)
        except Exception as e:
            # Keep the first verdict, but leave a trace of why the recheck failed.
            logger.warning("Qwen re-verification failed: %s", e)
            return prev_result
        return verdict if verdict in ("REAL", "FAKE") else prev_result

    # === Main detection ===
    def detect_deepfake_llm(self, image_path):
        """Classify one image and return ``(result, raw_reply, method)``.

        ``result`` is ``REAL``, ``FAKE``, ``UNKNOWN`` or ``ERROR``.
        ``raw_reply`` is the model's unprocessed text (None on error).
        ``method`` is ``"retinaface_crop"`` or ``"original"`` depending on what
        was actually sent, ``"invalid"`` when the image could not be read, or
        ``"API_FAILURE"`` when the request failed. This method does not raise.
        """
        prompt = (
            "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. "
            "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE."
        )
        if not self.validate_image(image_path):
            return "ERROR", None, "invalid"
        img_b64, method = self._prepare_image(image_path)
        if not img_b64:
            return "ERROR", None, "invalid"

        try:
            content = self._query_model(self.model_map[self.model_name], img_b64, prompt)
            result = self.normalize_output(content)
            # Qwen gets a second opinion whenever its first answer is REAL.
            if self.model_name == "qwen" and result == "REAL":
                result = self.reverify_qwen(img_b64, result)
            return result, content, method
        except Exception as e:
            logger.error("Detection failed: %s", e)
            return "ERROR", None, "API_FAILURE"
pass # Placeholder only: marks that the remainder of the original script was removed.