# # #!/usr/bin/env python3 # # import os # # import csv # # import time # # import base64 # # from pathlib import Path # # from tqdm import tqdm # # import logging # # from PIL import Image # # import io # # from datetime import datetime # # from openai import OpenAI # # import numpy as np # # # === RetinaFace Configuration === # # try: # # from retinaface import RetinaFace # # RETINAFACE_AVAILABLE = True # # except ImportError: # # RETINAFACE_AVAILABLE = False # # print("āŒ ERROR: RetinaFace library not found. Please run 'pip install retina-face'. Running without face cropping.") # # # === LOGGING CONFIG === # # os.makedirs("logs", exist_ok=True) # # logging.basicConfig( # # filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", # # level=logging.INFO, # # format="%(asctime)s - %(levelname)s - %(message)s", # # ) # # logger = logging.getLogger(__name__) # # class DeepfakeDetector: # # """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) with RetinaFace + adaptive delay""" # # def __init__(self, api_key, model_name="qwen", debug_mode=False, start_from=0, use_face_detector=True): # # self.api_key = api_key # # self.model_name = model_name.lower() # # self.debug_mode = debug_mode # # self.start_from = start_from # # self.dataset_folder = "dataset" # # self.results_folder = "result" # # self.csv_filename = f"result_{self.model_name}_{start_from}.csv" # # # === OpenRouter API client === # # self.client = OpenAI( # # base_url="https://openrouter.ai/api/v1", # # api_key=self.api_key, # # ) # # self.extra_headers = { # # "HTTP-Referer": "https://github.com/retinaface-comparison", # # "X-Title": "Deepfake Detection Adaptive" # # } # # # === Model map (5 LLMs) === # # self.model_map = { # # "qwen": "qwen/qwen3-vl-8b-instruct", # # "gpt": "openai/chatgpt-4o-latest", # # "gemini": "google/gemini-2.5-flash", # # "llama": "meta-llama/llama-3.2-90b-vision-instruct", # # "cohere": "cohere/command-r-plus-08-2024", # # } # # if self.model_name not in self.model_map: # # raise ValueError("āŒ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere") # # logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})") # # os.makedirs(self.results_folder, exist_ok=True) # # self.use_face_detector = use_face_detector and RETINAFACE_AVAILABLE # # self.target_size = 512 # # # Adaptive delay system # # self.delay = 0.5 # # self.fail_count = 0 # # self.success_count = 0 # # print(f"\nModel: {self.model_name.upper()} | RetinaFace Cropping: {'ON' if self.use_face_detector else 'OFF'}") # # # === Image handling (RetinaFace) === # # def preprocess_image_with_retinaface(self, image_path): # # if not self.use_face_detector or not RETINAFACE_AVAILABLE: # # return self.encode_image_simple(image_path) # # try: # # faces = RetinaFace.detect_faces(image_path) # # if not faces: # # logger.warning(f"No face detected in {image_path}") # # return self.encode_image_simple(image_path) # # # Ambil wajah utama # # first_face = list(faces.values())[0] # # facial_area = first_face.get("facial_area", None) # # if not facial_area or len(facial_area) != 4: # # return self.encode_image_simple(image_path) # # x1, y1, x2, y2 = facial_area # # img = Image.open(image_path).convert("RGB") # # cropped_img = img.crop((x1, y1, x2, y2)) # # cropped_img = cropped_img.resize((self.target_size, self.target_size)) # # buf = io.BytesIO() # # cropped_img.save(buf, format="JPEG", quality=90, optimize=True) # # encoded = base64.b64encode(buf.getvalue()).decode("utf-8") # # return f"data:image/jpeg;base64,{encoded}" # # except Exception as e: # # logger.error(f"RetinaFace error on {image_path}: {e}") # # return self.encode_image_simple(image_path) # # def encode_image_simple(self, image_path): # # try: # # with open(image_path, "rb") as f: # # encoded = base64.b64encode(f.read()).decode("utf-8") # # return f"data:image/jpeg;base64,{encoded}" # # except Exception as e: # # logger.error(f"Encode error: {e}") # # return None # # def validate_image(self, image_path): # # try: # # if not os.path.exists(image_path): # # return False # # with Image.open(image_path) as img: # # img.verify() # # return True # # except Exception: # # return False # # def normalize_output(self, content): # # if not content: # # return "UNKNOWN" # # text = content.strip().upper() # # if any(w in text for w in ["REAL", "GENUINE", "HUMAN"]): # # return "REAL" # # if any(w in text for w in ["FAKE", "DEEPFAKE", "AI", "SYNTHETIC", "GENERATED"]): # # return "FAKE" # # if "NOT FAKE" in text or "LOOKS REAL" in text: # # return "REAL" # # if "PROBABLY FAKE" in text or "MAYBE FAKE" in text: # # return "FAKE" # # return "UNKNOWN" # # def reverify_qwen(self, img_b64, prev_result): # # prompt = ( # # "Re-analyze this face image for deepfake signs. " # # "Focus on lighting, symmetry, and unnatural skin artifacts. " # # "Respond with one word only: REAL or FAKE." # # ) # # print(f"šŸ” Re-verifying Qwen result (was {prev_result})...") # # try: # # resp = self.client.chat.completions.create( # # extra_headers=self.extra_headers, # # model=self.model_map["qwen"], # # messages=[{ # # "role": "user", # # "content": [ # # {"type": "image_url", "image_url": {"url": img_b64}}, # # {"type": "text", "text": prompt} # # ] # # }], # # max_tokens=50, # # temperature=0.1, # # ) # # content = resp.choices[0].message.content.strip().upper() # # if "FAKE" in content: # # print("āœ… Changed to FAKE after second check") # # return "FAKE" # # elif "REAL" in content: # # print("āœ… Confirmed REAL after second check") # # return "REAL" # # else: # # print("āš ļø Still ambiguous after recheck") # # return prev_result # # except Exception as e: # # print(f"āš ļø Qwen re-verification failed: {e}") # # return prev_result # # # === Deteksi utama === # # def detect_deepfake_llm(self, image_path): # # prompt = ( # # "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. " # # "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE." # # ) # # if not self.validate_image(image_path): # # return "ERROR", None, "invalid" # # img_b64 = self.preprocess_image_with_retinaface(image_path) # # if not img_b64: # # return "ERROR", None, "invalid" # # model_id = self.model_map[self.model_name] # # method = "retinaface_crop" if self.use_face_detector else "original" # # try: # # resp = self.client.chat.completions.create( # # extra_headers=self.extra_headers, # # model=model_id, # # messages=[{ # # "role": "user", # # "content": [ # # {"type": "image_url", "image_url": {"url": img_b64}}, # # {"type": "text", "text": prompt} # # ] # # }], # # max_tokens=50, # # temperature=0.1, # # ) # # content = resp.choices[0].message.content # # result = self.normalize_output(content) # # if self.model_name == "qwen" and result == "REAL": # # result = self.reverify_qwen(img_b64, result) # # return result, content, method # # except Exception as e: # # logger.error(f"Detection failed: {e}") # # return "ERROR", None, method # # # === Dataset & Resume === # # def get_images(self): # # dataset_path = Path(self.dataset_folder) # # real_path = dataset_path / "face_real" # # fake_path = dataset_path / "face_fake" # # if not real_path.exists() or not fake_path.exists(): # # print("āŒ Dataset folders missing.") # # return [] # # real_images = sorted(list(real_path.glob("*.jpg")))[:500] # # fake_images = sorted(list(fake_path.glob("*.jpg")))[:500] # # return [(str(p), "REAL") for p in real_images] + [(str(p), "FAKE") for p in fake_images] # # def load_existing_results(self): # # csv_path = os.path.join(self.results_folder, self.csv_filename) # # if not os.path.exists(csv_path): # # return [] # # results = [] # # with open(csv_path, "r", encoding="utf-8") as f: # # reader = csv.reader(f) # # next(reader) # # for row in reader: # # if len(row) >= 5: # # results.append((row[0], row[1], row[2], row[3], row[4])) # # logger.info(f"Loaded {len(results)} existing results") # # return results # # def save_results_to_csv(self, results): # # csv_path = os.path.join(self.results_folder, self.csv_filename) # # with open(csv_path, "w", newline="", encoding="utf-8") as f: # # writer = csv.writer(f) # # writer.writerow(["filename", "ground_truth", "llm_result", "model_name", "method"]) # # writer.writerows(results) # # logger.info(f"Saved {len(results)} results to {csv_path}") # # # === Adaptive delay logic === # # def adjust_delay(self): # # if self.fail_count > 5: # # self.delay = min(self.delay + 0.2, 2.0) # # logger.warning(f"Increasing delay to {self.delay:.1f}s due to failures.") # # self.fail_count = 0 # # elif self.success_count > 10: # # self.delay = max(self.delay - 0.1, 0.3) # # logger.info(f"Reducing delay to {self.delay:.1f}s (stable).") # # self.success_count = 0 # # def run_detection(self, resume=True): # # all_images = self.get_images() # # if not all_images: # # return # # results = self.load_existing_results() if resume else [] # # processed = {r[0] for r in results} # # remaining = [(p, gt) for p, gt in all_images if os.path.basename(p) not in processed] # # print(f"\n=== STARTING {self.model_name.upper()} DETECTION ===") # # print(f"Total: {len(all_images)} | Already done: {len(processed)} | Remaining: {len(remaining)}") # # with tqdm(total=len(remaining), desc=f"{self.model_name.upper()}") as pbar: # # for img_path, truth in remaining: # # try: # # result, response, method = self.detect_deepfake_llm(img_path) # # results.append((os.path.basename(img_path), truth, result, self.model_name, method)) # # self.success_count += 1 # # except Exception as e: # # logger.error(f"Fatal error: {e}") # # results.append((os.path.basename(img_path), truth, "ERROR", self.model_name, "error")) # # self.fail_count += 1 # # pbar.set_description(f"{os.path.basename(img_path)} -> {result}") # # pbar.update(1) # # self.save_results_to_csv(results) # # self.adjust_delay() # # time.sleep(self.delay) # # print(f"\nāœ… Detection completed for {self.model_name.upper()}") # # print(f"Results saved to: {os.path.join(self.results_folder, self.csv_filename)}") # #!/usr/bin/env python3 # #!/usr/bin/env python3 # import os # import csv # import time # import base64 # from pathlib import Path # from tqdm import tqdm # import logging # from PIL import Image # import io # from datetime import datetime # from openai import OpenAI # import numpy as np # import math # Diperlukan untuk perhitungan akurasi # # RetinaFace Configuration # try: # #retinaface # from retinaface import RetinaFace # RETINAFACE_AVAILABLE = True # except ImportError: # RETINAFACE_AVAILABLE = False # print("āŒ ERROR: RetinaFace library not found. Running without face cropping.") # # === LOGGING CONFIG === # os.makedirs("logs", exist_ok=True) # logging.basicConfig( # filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", # level=logging.INFO, # format="%(asctime)s - %(levelname)s - %(message)s", # ) # logger = logging.getLogger(__name__) # class DeepfakeDetector: # """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) with RetinaFace + adaptive delay""" # def __init__(self, api_key, model_name="qwen", debug_mode=False, start_from=0, use_face_detector=True): # self.api_key = api_key # self.model_name = model_name.lower() # self.debug_mode = debug_mode # self.start_from = start_from # self.dataset_folder = "dataset" # self.results_folder = "result" # self.csv_filename = f"result_{self.model_name}_{start_from}.csv" # # OpenRouter API client # self.client = OpenAI( # base_url="https://openrouter.ai/api/v1", # api_key=self.api_key, # ) # self.extra_headers = { # "HTTP-Referer": "https://github.com/retinaface-comparison", # "X-Title": "Deepfake Detection Adaptive" # } # # Model map (5 LLMs via OpenRouter) # self.model_map = { # "qwen": "qwen/qwen3-vl-8b-instruct", # "gpt": "openai/chatgpt-4o-latest", # "gemini": "google/gemini-2.5-flash", # "llama": "meta-llama/llama-3.2-90b-vision-instruct", # "cohere": "cohere/command-r-plus-08-2024", # } # if self.model_name not in self.model_map: # raise ValueError("āŒ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere") # logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})") # os.makedirs(self.results_folder, exist_ok=True) # self.use_face_detector = use_face_detector and RETINAFACE_AVAILABLE # self.target_size = 512 # # waktu delay # self.delay = 0.3 # self.fail_count = 0 # self.success_count = 0 # print(f"\nModel: {self.model_name.upper()} | RetinaFace Cropping: {'ON' if self.use_face_detector else 'OFF'}") # # Image handling (RetinaFace) # def preprocess_image_with_retinaface(self, image_path): # if not self.use_face_detector or not RETINAFACE_AVAILABLE: # return self.encode_image_simple(image_path) # try: # # Perlu diubah ke string karena RetinaFace kadang tidak menerima objek Path # faces = RetinaFace.detect_faces(str(image_path)) # if not faces: # logger.warning(f"No face detected in {image_path}") # return self.encode_image_simple(image_path) # first_face = list(faces.values())[0] # facial_area = first_face.get("facial_area", None) # if not facial_area or len(facial_area) != 4: # return self.encode_image_simple(image_path) # x1, y1, x2, y2 = facial_area # img = Image.open(image_path).convert("RGB") # # Tambahkan margin (ekstraksi) # margin_ratio = 0.2 # w, h = x2 - x1, y2 - y1 # margin_x = int(w * margin_ratio) # margin_y = int(h * margin_ratio) # x1 = max(0, x1 - margin_x) # y1 = max(0, y1 - margin_y) # x2 = min(img.width, x2 + margin_x) # y2 = min(img.height, y2 + margin_y) # cropped_img = img.crop((x1, y1, x2, y2)) # cropped_img = cropped_img.resize((self.target_size, self.target_size), Image.Resampling.LANCZOS) # buf = io.BytesIO() # cropped_img.save(buf, format="JPEG", quality=90, optimize=True) # encoded = base64.b64encode(buf.getvalue()).decode("utf-8") # return f"data:image/jpeg;base64,{encoded}" # except Exception as e: # logger.error(f"RetinaFace error on {image_path}: {e}") # return self.encode_image_simple(image_path) # def encode_image_simple(self, image_path): # try: # with open(image_path, "rb") as f: # encoded = base64.b64encode(f.read()).decode("utf-8") # return f"data:image/jpeg;base64,{encoded}" # except Exception as e: # logger.error(f"Encode error: {e}") # return None # def validate_image(self, image_path): # try: # if not os.path.exists(image_path): # return False # with Image.open(image_path) as img: # img.verify() # return True # except Exception: # return False # def normalize_output(self, content): # """ # Normalizes verbose LLM output to a single word: REAL, FAKE, or UNKNOWN. # """ # if not content: # return "UNKNOWN" # text = content.strip().upper() # # Mencari kata kunci FAKE # if any(w in text for w in ["FAKE", "DEEPFAKE", "AI GENERATED", "SYNTHETIC"]): # return "FAKE" # # Mencari kata kunci REAL # if any(w in text for w in ["REAL", "GENUINE", "HUMAN", "NOT FAKE"]): # return "REAL" # # Upaya kedua: Coba ambil kata pertama/kata kunci di tengah respons # words = text.split() # if words: # for word in words[:3]: # Cek 3 kata pertama # if "REAL" in word: return "REAL" # if "FAKE" in word: return "FAKE" # logger.warning(f"Output ambiguous/unpredictable: {content}") # return "UNKNOWN" # def reverify_qwen(self, img_b64, prev_result): # # Logika re-verifikasi # prompt = ( # "Re-analyze this face image for deepfake signs. " # "Focus on lighting, symmetry, and unnatural skin artifacts. " # "Respond with one word only: REAL or FAKE." # ) # print(f"šŸ” Re-verifying Qwen result (was {prev_result})...") # try: # resp = self.client.chat.completions.create( # extra_headers=self.extra_headers, # model=self.model_map["qwen"], # messages=[{ # "role": "user", # "content": [ # {"type": "image_url", "image_url": {"url": img_b64}}, # {"type": "text", "text": prompt} # ] # }], # max_tokens=50, # temperature=0.1, # ) # content = resp.choices[0].message.content.strip().upper() # if "FAKE" in content: # print("Changed to FAKE after second check") # return "FAKE" # elif "REAL" in content: # print("Confirmed REAL after second check") # return "REAL" # else: # print("Still ambiguous after recheck") # return prev_result # except Exception as e: # print(f"Qwen re-verification failed: {e}") # return prev_result # # Fungsi Fallback # def retinaface_simple_fallback(self, image_path): # """Applies a heuristic rule if LLM returns an error.""" # if not self.use_face_detector or not RETINAFACE_AVAILABLE: # return 'UNKNOWN_FALLBACK' # try: # # Panggil deteksi wajah (menggunakan str(image_path)) # faces = RetinaFace.detect_faces(str(image_path)) # if not faces: # return 'UNKNOWN_FALLBACK' # best_score = max(f['score'] for f in faces.values()) # # Aturan Heuristik: Jika skor kepercayaan wajah sangat tinggi, REAL. # if best_score > 0.995: # return 'REAL' # else: # return 'UNKNOWN_FALLBACK' # except Exception: # return 'UNKNOWN_FALLBACK' # # Deteksi utama # def detect_deepfake_llm(self, image_path): # prompt = ( # "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. " # "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE." # ) # if not self.validate_image(image_path): # return "ERROR", None, "invalid" # img_b64 = self.preprocess_image_with_retinaface(image_path) # if not img_b64: # return "ERROR", None, "invalid" # model_id = self.model_map[self.model_name] # method = "retinaface_crop" if self.use_face_detector else "original" # try: # resp = self.client.chat.completions.create( # extra_headers=self.extra_headers, # model=model_id, # messages=[{ # "role": "user", # "content": [ # {"type": "image_url", "image_url": {"url": img_b64}}, # {"type": "text", "text": prompt} # ] # }], # max_tokens=50, # temperature=0.1, # ) # content = resp.choices[0].message.content # result = self.normalize_output(content) # if self.model_name == "qwen" and result == "REAL": # result = self.reverify_qwen(img_b64, result) # return result, content, method # except Exception as e: # logger.error(f"Detection failed: {e}") # # Tambahkan logika Fallback RetinaFace # fallback_result = self.retinaface_simple_fallback(image_path) # logger.warning(f"LLM Failed. Applying Fallback Logic: {fallback_result}") # if fallback_result == 'REAL': # # Jika heuristik RetinaFace yakin gambar BERKUALITAS BAGUS, prediksi REAL # return 'REAL', "RetinaFace Heuristic", "retinaface_crop_FALLBACK" # else: # return "FAKE", "RetinaFace Heuristic (Assumed FAKE)", "retinaface_crop" # # === Dataset & Resume === # def get_images(self): # dataset_path = Path(self.dataset_folder) # real_path = dataset_path / "face_real" # fake_path = dataset_path / "face_fake" # if not real_path.exists() or not fake_path.exists(): # print("āŒ Dataset folders missing.") # return [] # # Batasi gambar menjadi 500 REAL dan 500 FAKE (total 1000) # real_images = sorted(list(real_path.glob("*.jpg")))[:500] # fake_images = sorted(list(fake_path.glob("*.jpg")))[:500] # return [(str(p), "REAL") for p in real_images] + [(str(p), "FAKE") for p in fake_images] # def load_existing_results(self): # csv_path = os.path.join(self.results_folder, self.csv_filename) # if not os.path.exists(csv_path): # return [] # results = [] # with open(csv_path, "r", encoding="utf-8") as f: # reader = csv.reader(f) # next(reader) # for row in reader: # # baris memiliki 5 kolom # if len(row) >= 5: # results.append((row[0], row[1], row[2], row[3], row[4])) # logger.info(f"Loaded {len(results)} existing results") # return results # def save_results_to_csv(self, results): # csv_path = os.path.join(self.results_folder, self.csv_filename) # with open(csv_path, "w", newline="", encoding="utf-8") as f: # writer = csv.writer(f) # writer.writerow(["filename", "ground_truth", "llm_result", "model_name", "method"]) # writer.writerows(results) # logger.info(f"Saved {len(results)} results to {csv_path}") # # logika delay # def adjust_delay(self): # # Logika adaptive delay # if self.fail_count > 5: # self.delay = min(self.delay + 0.2, 2.0) # logger.warning(f"Increasing delay to {self.delay:.1f}s due to failures.") # self.fail_count = 0 # elif self.success_count > 10: # self.delay = max(self.delay - 0.1, 0.3) # logger.info(f"Reducing delay to {self.delay:.1f}s (stable).") # self.success_count = 0 # def run_detection(self, resume=True): # all_images = self.get_images() # if not all_images: # return # results = self.load_existing_results() if resume else [] # processed = {r[0] for r in results} # remaining = [(p, gt) for p, gt in all_images if os.path.basename(p) not in processed] # print(f"\n=== STARTING {self.model_name.upper()} DETECTION ===") # print(f"Total: {len(all_images)} | Already done: {len(processed)} | Remaining: {len(remaining)}") # with tqdm(total=len(remaining), desc=f"{self.model_name.upper()}") as pbar: # for img_path, truth in remaining: # try: # result, response, method = self.detect_deepfake_llm(img_path) # results.append((os.path.basename(img_path), truth, result, self.model_name, method)) # self.success_count += 1 # except Exception as e: # logger.error(f"Fatal error: {e}") # results.append((os.path.basename(img_path), truth, "ERROR", self.model_name, "error")) # self.fail_count += 1 # pbar.set_description(f"{os.path.basename(img_path)} -> {result}") # pbar.update(1) # self.save_results_to_csv(results) # self.adjust_delay() # time.sleep(self.delay) # print(f"\nāœ… Deteksi selesai untuk {self.model_name.upper()}") # print(f"Hasil simpan ke: {os.path.join(self.results_folder, self.csv_filename)}") # #!/usr/bin/env python3 # import os # import csv # import time # import base64 # from pathlib import Path # from tqdm import tqdm # import logging # from PIL import Image # import io # from datetime import datetime # from openai import OpenAI # import numpy as np # import math # # === RetinaFace Configuration === # try: # from retinaface import RetinaFace # RETINAFACE_AVAILABLE = True # except ImportError: # RETINAFACE_AVAILABLE = False # print("āŒ ERROR: RetinaFace library not found. Please run 'pip install retina-face'. Running without face cropping.") # # === LOGGING CONFIG === # os.makedirs("logs", exist_ok=True) # logging.basicConfig( # filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", # level=logging.INFO, # format="%(asctime)s - %(levelname)s - %(message)s", # ) # logger = logging.getLogger(__name__) # class DeepfakeDetector: # """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) with RetinaFace + adaptive delay""" # def __init__(self, api_key, model_name="qwen", debug_mode=False, start_from=0, use_face_detector=True): # self.api_key = api_key # self.model_name = model_name.lower() # self.debug_mode = debug_mode # self.start_from = start_from # self.dataset_folder = "dataset" # self.results_folder = "result" # self.csv_filename = f"result_{self.model_name}_{start_from}.csv" # # === OpenRouter API client (Menggunakan OpenRouter untuk semua model) === # self.client = OpenAI( # base_url="https://openrouter.ai/api/v1", # api_key=self.api_key, # ) # self.extra_headers = { # "HTTP-Referer": "https://github.com/retinaface-comparison", # "X-Title": "Deepfake Detection Adaptive" # } # # === Model map (5 LLMs via OpenRouter) === # self.model_map = { # "qwen": "qwen/qwen3-vl-8b-instruct", # "gpt": "openai/chatgpt-4o-latest", # "gemini": "google/gemini-2.5-flash", # "llama": "meta-llama/llama-3.2-90b-vision-instruct", # "cohere": "cohere/command-r-plus-08-2024", # } # if self.model_name not in self.model_map: # raise ValueError("āŒ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere") # logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})") # os.makedirs(self.results_folder, exist_ok=True) # self.use_face_detector = False and RETINAFACE_AVAILABLE # self.target_size = 512 # # waktu delay # self.delay = 0.3 # self.fail_count = 0 # self.success_count = 0 # print(f"\nModel: {self.model_name.upper()} | RetinaFace Cropping: {'ON' if self.use_face_detector else 'OFF'}") # # === Image handling (RetinaFace) === # def preprocess_image_with_retinaface(self, image_path): # if not self.use_face_detector or not RETINAFACE_AVAILABLE: # return self.encode_image_simple(image_path) # try: # # Perlu diubah ke string karena RetinaFace kadang tidak menerima objek Path # faces = RetinaFace.detect_faces(str(image_path)) # if not faces: # logger.warning(f"No face detected in {image_path}") # return self.encode_image_simple(image_path) # first_face = list(faces.values())[0] # facial_area = first_face.get("facial_area", None) # if not facial_area or len(facial_area) != 4: # return self.encode_image_simple(image_path) # x1, y1, x2, y2 = facial_area # img = Image.open(image_path).convert("RGB") # # Tambahkan margin (ekstraksi) # margin_ratio = 0.2 # w, h = x2 - x1, y2 - y1 # margin_x = int(w * margin_ratio) # margin_y = int(h * margin_ratio) # x1 = max(0, x1 - margin_x) # y1 = max(0, y1 - margin_y) # x2 = min(img.width, x2 + margin_x) # y2 = min(img.height, y2 + margin_y) # cropped_img = img.crop((x1, y1, x2, y2)) # cropped_img = cropped_img.resize((self.target_size, self.target_size), Image.Resampling.LANCZOS) # buf = io.BytesIO() # cropped_img.save(buf, format="JPEG", quality=90, optimize=True) # encoded = base64.b64encode(buf.getvalue()).decode("utf-8") # return f"data:image/jpeg;base64,{encoded}" # except Exception as e: # logger.error(f"RetinaFace error on {image_path}: {e}") # return self.encode_image_simple(image_path) # def encode_image_simple(self, image_path): # try: # with open(image_path, "rb") as f: # encoded = base64.b64encode(f.read()).decode("utf-8") # return f"data:image/jpeg;base64,{encoded}" # except Exception as e: # logger.error(f"Encode error: {e}") # return None # def validate_image(self, image_path): # try: # if not os.path.exists(image_path): # return False # with Image.open(image_path) as img: # img.verify() # return True # except Exception: # return False # def normalize_output(self, content): # """ # Normalizes verbose LLM output to a single word: REAL, FAKE, or UNKNOWN. # """ # if not content: # return "UNKNOWN" # text = content.strip().upper() # # Mencari kata kunci FAKE (atau sinonim) # if any(w in text for w in ["FAKE", "DEEPFAKE", "AI GENERATED", "SYNTHETIC"]): # return "FAKE" # # Mencari kata kunci REAL (atau sinonim) # if any(w in text for w in ["REAL", "GENUINE", "HUMAN", "NOT FAKE"]): # return "REAL" # # Upaya kedua: Coba ambil kata pertama/kata kunci di tengah respons # words = text.split() # if words: # for word in words[:3]: # Cek 3 kata pertama # if "REAL" in word: return "REAL" # if "FAKE" in word: return "FAKE" # logger.warning(f"Output ambiguous/unpredictable: {content}") # return "UNKNOWN" # def reverify_qwen(self, img_b64, prev_result): # # Logika re-verifikasi # prompt = ( # "Re-analyze this face image for deepfake signs. " # "Focus on lighting, symmetry, and unnatural skin artifacts. " # "Respond with one word only: REAL or FAKE." # ) # print(f"šŸ” Re-verifying Qwen result (was {prev_result})...") # try: # resp = self.client.chat.completions.create( # extra_headers=self.extra_headers, # model=self.model_map["qwen"], # messages=[{ # "role": "user", # "content": [ # {"type": "image_url", "image_url": {"url": img_b64}}, # {"type": "text", "text": prompt} # ] # }], # max_tokens=50, # temperature=0.1, # ) # content = resp.choices[0].message.content.strip().upper() # if "FAKE" in content: # print("Changed to FAKE after second check") # return "FAKE" # elif "REAL" in content: # print("Confirmed REAL after second check") # return "REAL" # else: # print("Still ambiguous after recheck") # return prev_result # except Exception as e: # print(f"Qwen re-verification failed: {e}") # return prev_result # # Fungsi Fallback Sederhana RetinaFace (Heuristik) DIHAPUS # # === Deteksi utama === # def detect_deepfake_llm(self, image_path): # prompt = ( # "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. " # "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE." # ) # if not self.validate_image(image_path): # return "ERROR", None, "invalid" # img_b64 = self.preprocess_image_with_retinaface(image_path) # if not img_b64: # return "ERROR", None, "invalid" # model_id = self.model_map[self.model_name] # method = "retinaface_crop" if self.use_face_detector else "original" # try: # resp = self.client.chat.completions.create( # extra_headers=self.extra_headers, # model=model_id, # messages=[{ # "role": "user", # "content": [ # {"type": "image_url", "image_url": {"url": img_b64}}, # {"type": "text", "text": prompt} # ] # }], # max_tokens=50, # temperature=0.1, # ) # content = resp.choices[0].message.content # result = self.normalize_output(content) # if self.model_name == "qwen" and result == "REAL": # result = self.reverify_qwen(img_b64, result) # return result, content, method # except Exception as e: # logger.error(f"Detection failed: {e}") # # --- LOGIKA KETIKA LLM GAGAL (TIDAK ADA TEBAKAN) --- # # Jika LLM gagal, catat sebagai ERROR. # return "ERROR", None, "API_FAILURE" # Mengganti 'error' dengan 'API_FAILURE' untuk kejelasan # # === Dataset & Resume === # def get_images(self): # dataset_path = Path(self.dataset_folder) # real_path = dataset_path / "face_real" # fake_path = dataset_path / "face_fake" # if not real_path.exists() or not fake_path.exists(): # print("āŒ Dataset folders missing.") # return [] # # Batasi gambar menjadi 500 REAL dan 500 FAKE (total 1000) # real_images = sorted(list(real_path.glob("*.jpg")))[:500] # fake_images = sorted(list(fake_path.glob("*.jpg")))[:500] # return [(str(p), "REAL") for p in real_images] + [(str(p), "FAKE") for p in fake_images] # def load_existing_results(self): # csv_path = os.path.join(self.results_folder, self.csv_filename) # if not os.path.exists(csv_path): # return [] # results = [] # with open(csv_path, "r", encoding="utf-8") as f: # reader = csv.reader(f) # next(reader) # for row in reader: # # Pastikan baris memiliki 5 kolom # if len(row) >= 5: # results.append((row[0], row[1], row[2], row[3], row[4])) # logger.info(f"Loaded {len(results)} existing results") # return results # def save_results_to_csv(self, results): # csv_path = os.path.join(self.results_folder, self.csv_filename) # with open(csv_path, "w", newline="", encoding="utf-8") as f: # writer = csv.writer(f) # writer.writerow(["filename", "ground_truth", "llm_result", "model_name", "method"]) # writer.writerows(results) # logger.info(f"Saved {len(results)} results to {csv_path}") # # === Adaptive delay logic === # def adjust_delay(self): # # Logika adaptive delay # if self.fail_count > 5: # self.delay = min(self.delay + 0.2, 2.0) # logger.warning(f"Increasing delay to {self.delay:.1f}s due to failures.") # self.fail_count = 0 # elif self.success_count > 10: # self.delay = max(self.delay - 0.1, 0.3) # logger.info(f"Reducing delay to {self.delay:.1f}s (stable).") # self.success_count = 0 # def run_detection(self, resume=True): # all_images = self.get_images() # if not all_images: # return # results = self.load_existing_results() if resume else [] # processed = {r[0] for r in results} # remaining = [(p, gt) for p, gt in all_images if os.path.basename(p) not in processed] # print(f"\n=== STARTING {self.model_name.upper()} DETECTION ===") # print(f"Total: {len(all_images)} | Already done: {len(processed)} | Remaining: {len(remaining)}") # with tqdm(total=len(remaining), desc=f"{self.model_name.upper()}") as pbar: # for img_path, truth in remaining: # try: # result, response, method = self.detect_deepfake_llm(img_path) # results.append((os.path.basename(img_path), truth, result, self.model_name, method)) # self.success_count += 1 # except Exception as e: # logger.error(f"Fatal error: {e}") # results.append((os.path.basename(img_path), truth, "ERROR", self.model_name, "error")) # self.fail_count += 1 # pbar.set_description(f"{os.path.basename(img_path)} -> {result}") # pbar.update(1) # self.save_results_to_csv(results) # self.adjust_delay() # time.sleep(self.delay) # print(f"\nāœ… Deteksi selesai untuk {self.model_name.upper()}") # print(f"Hasil simpan ke: {os.path.join(self.results_folder, self.csv_filename)}") import os import time import base64 from pathlib import Path import logging from PIL import Image import io from datetime import datetime from openai import OpenAI import numpy as np # === RetinaFace Configuration === try: from retinaface import RetinaFace RETINAFACE_AVAILABLE = True except ImportError: RETINAFACE_AVAILABLE = False # === LOGGING CONFIG === os.makedirs("logs", exist_ok=True) logging.basicConfig( filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) class DeepfakeDetector: """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) for Single Image Inference""" def __init__(self, api_key, model_name="qwen", use_face_detector=True): self.api_key = api_key self.model_name = model_name.lower() # === OpenRouter API client === self.client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key=self.api_key, ) self.extra_headers = { # Ganti Referer agar sesuai dengan Hugging Face "HTTP-Referer": "https://huggingface.co/spaces/[your-username]/[your-space-name]", "X-Title": f"Deepfake Detection Gradio ({self.model_name.upper()})" } # === Model map (5 LLMs via OpenRouter) === self.model_map = { "qwen": "qwen/qwen3-vl-8b-instruct", "gpt": "openai/chatgpt-4o-latest", "gemini": "google/gemini-2.5-flash", "llama": "meta-llama/llama-3.2-90b-vision-instruct", # "cohere": "cohere/command-r-plus-08-2024", } if self.model_name not in self.model_map: raise ValueError("āŒ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere") logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})") # diinisialisasi berdasarkan input self.use_face_detector = use_face_detector and RETINAFACE_AVAILABLE self.target_size = 512 # === Image handling (RetinaFace) === # FUNGSI INI TETAP TIDAK BERUBAH def preprocess_image_with_retinaface(self, image_path): if not self.use_face_detector or not RETINAFACE_AVAILABLE: return self.encode_image_simple(image_path) try: faces = RetinaFace.detect_faces(str(image_path)) if not faces: logger.warning(f"No face detected in {image_path}") return self.encode_image_simple(image_path) first_face = list(faces.values())[0] facial_area = first_face.get("facial_area", None) if not facial_area or len(facial_area) != 4: return self.encode_image_simple(image_path) x1, y1, x2, y2 = facial_area img = Image.open(image_path).convert("RGB") # Tambahkan margin (ekstraksi) margin_ratio = 0.2 w, h = x2 - x1, y2 - y1 margin_x = int(w * margin_ratio) margin_y = int(h * margin_ratio) x1 = max(0, x1 - margin_x) y1 = max(0, y1 - margin_y) x2 = min(img.width, x2 + margin_x) y2 = min(img.height, y2 + margin_y) cropped_img = img.crop((x1, y1, x2, y2)) cropped_img = cropped_img.resize((self.target_size, self.target_size), Image.Resampling.LANCZOS) buf = io.BytesIO() cropped_img.save(buf, format="JPEG", quality=90, optimize=True) encoded = base64.b64encode(buf.getvalue()).decode("utf-8") return f"data:image/jpeg;base64,{encoded}" except Exception as e: logger.error(f"RetinaFace error on {image_path}: {e}") return self.encode_image_simple(image_path) # FUNGSI encode def encode_image_simple(self, image_path): try: with open(image_path, "rb") as f: encoded = base64.b64encode(f.read()).decode("utf-8") return f"data:image/jpeg;base64,{encoded}" except Exception as e: logger.error(f"Encode error: {e}") return None # FUNGSI validasi def validate_image(self, image_path): try: if not os.path.exists(image_path): return False with Image.open(image_path) as img: img.verify() return True except Exception: return False # FUNGSI normalize def normalize_output(self, content): """ Normalizes verbose LLM output to a single word: REAL, FAKE, or UNKNOWN. """ if not content: return "UNKNOWN" text = content.strip().upper() if any(w in text for w in ["FAKE", "DEEPFAKE", "AI GENERATED", "SYNTHETIC"]): return "FAKE" if any(w in text for w in ["REAL", "GENUINE", "HUMAN", "NOT FAKE"]): return "REAL" words = text.split() if words: for word in words[:3]: if "REAL" in word: return "REAL" if "FAKE" in word: return "FAKE" logger.warning(f"Output ambiguous/unpredictable: {content}") return "UNKNOWN" # FUNGSI analisis qwen def reverify_qwen(self, img_b64, prev_result): prompt = ( "Re-analyze this face image for deepfake signs. " "Focus on lighting, symmetry, and unnatural skin artifacts. " "Respond with one word only: REAL or FAKE." ) try: resp = self.client.chat.completions.create( extra_headers=self.extra_headers, model=self.model_map["qwen"], messages=[{ "role": "user", "content": [ {"type": "image_url", "image_url": {"url": img_b64}}, {"type": "text", "text": prompt} ] }], max_tokens=50, temperature=0.1, ) content = resp.choices[0].message.content.strip().upper() if "FAKE" in content: # print("Changed to FAKE after second check") return "FAKE" elif "REAL" in content: # print("Confirmed REAL after second check") return "REAL" else: # print("Still ambiguous after recheck") return prev_result except Exception as e: # print(f"Qwen re-verification failed: {e}") return prev_result # === Deteksi utama === def detect_deepfake_llm(self, image_path): prompt = ( "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. " "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE." ) if not self.validate_image(image_path): return "ERROR", None, "invalid" img_b64 = self.preprocess_image_with_retinaface(image_path) if not img_b64: return "ERROR", None, "invalid" model_id = self.model_map[self.model_name] method = "retinaface_crop" if self.use_face_detector else "original" try: resp = self.client.chat.completions.create( extra_headers=self.extra_headers, model=model_id, messages=[{ "role": "user", "content": [ {"type": "image_url", "image_url": {"url": img_b64}}, {"type": "text", "text": prompt} ] }], max_tokens=50, temperature=0.1, ) content = resp.choices[0].message.content result = self.normalize_output(content) if self.model_name == "qwen" and result == "REAL": result = self.reverify_qwen(img_b64, result) return result, content, method except Exception as e: logger.error(f"Detection failed: {e}") return "ERROR", None, "API_FAILURE" pass # Pass diletakkan di sini hanya sebagai penanda bahwa sisanya telah dihapus.