| # # #!/usr/bin/env python3 | |
| # # import os | |
| # # import csv | |
| # # import time | |
| # # import base64 | |
| # # from pathlib import Path | |
| # # from tqdm import tqdm | |
| # # import logging | |
| # # from PIL import Image | |
| # # import io | |
| # # from datetime import datetime | |
| # # from openai import OpenAI | |
| # # import numpy as np | |
| # # # === RetinaFace Configuration === | |
| # # try: | |
| # # from retinaface import RetinaFace | |
| # # RETINAFACE_AVAILABLE = True | |
| # # except ImportError: | |
| # # RETINAFACE_AVAILABLE = False | |
| # # print("❌ ERROR: RetinaFace library not found. Please run 'pip install retina-face'. Running without face cropping.") | |
| # # # === LOGGING CONFIG === | |
| # # os.makedirs("logs", exist_ok=True) | |
| # # logging.basicConfig( | |
| # # filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", | |
| # # level=logging.INFO, | |
| # # format="%(asctime)s - %(levelname)s - %(message)s", | |
| # # ) | |
| # # logger = logging.getLogger(__name__) | |
| # # class DeepfakeDetector: | |
| # # """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) with RetinaFace + adaptive delay""" | |
| # # def __init__(self, api_key, model_name="qwen", debug_mode=False, start_from=0, use_face_detector=True): | |
| # # self.api_key = api_key | |
| # # self.model_name = model_name.lower() | |
| # # self.debug_mode = debug_mode | |
| # # self.start_from = start_from | |
| # # self.dataset_folder = "dataset" | |
| # # self.results_folder = "result" | |
| # # self.csv_filename = f"result_{self.model_name}_{start_from}.csv" | |
| # # # === OpenRouter API client === | |
| # # self.client = OpenAI( | |
| # # base_url="https://openrouter.ai/api/v1", | |
| # # api_key=self.api_key, | |
| # # ) | |
| # # self.extra_headers = { | |
| # # "HTTP-Referer": "https://github.com/retinaface-comparison", | |
| # # "X-Title": "Deepfake Detection Adaptive" | |
| # # } | |
| # # # === Model map (5 LLMs) === | |
| # # self.model_map = { | |
| # # "qwen": "qwen/qwen3-vl-8b-instruct", | |
| # # "gpt": "openai/chatgpt-4o-latest", | |
| # # "gemini": "google/gemini-2.5-flash", | |
| # # "llama": "meta-llama/llama-3.2-90b-vision-instruct", | |
| # # "cohere": "cohere/command-r-plus-08-2024", | |
| # # } | |
| # # if self.model_name not in self.model_map: | |
| # # raise ValueError("❌ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere") | |
| # # logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})") | |
| # # os.makedirs(self.results_folder, exist_ok=True) | |
| # # self.use_face_detector = use_face_detector and RETINAFACE_AVAILABLE | |
| # # self.target_size = 512 | |
| # # # Adaptive delay system | |
| # # self.delay = 0.5 | |
| # # self.fail_count = 0 | |
| # # self.success_count = 0 | |
| # # print(f"\nModel: {self.model_name.upper()} | RetinaFace Cropping: {'ON' if self.use_face_detector else 'OFF'}") | |
| # # # === Image handling (RetinaFace) === | |
| # # def preprocess_image_with_retinaface(self, image_path): | |
| # # if not self.use_face_detector or not RETINAFACE_AVAILABLE: | |
| # # return self.encode_image_simple(image_path) | |
| # # try: | |
| # # faces = RetinaFace.detect_faces(image_path) | |
| # # if not faces: | |
| # # logger.warning(f"No face detected in {image_path}") | |
| # # return self.encode_image_simple(image_path) | |
| # # # Ambil wajah utama | |
| # # first_face = list(faces.values())[0] | |
| # # facial_area = first_face.get("facial_area", None) | |
| # # if not facial_area or len(facial_area) != 4: | |
| # # return self.encode_image_simple(image_path) | |
| # # x1, y1, x2, y2 = facial_area | |
| # # img = Image.open(image_path).convert("RGB") | |
| # # cropped_img = img.crop((x1, y1, x2, y2)) | |
| # # cropped_img = cropped_img.resize((self.target_size, self.target_size)) | |
| # # buf = io.BytesIO() | |
| # # cropped_img.save(buf, format="JPEG", quality=90, optimize=True) | |
| # # encoded = base64.b64encode(buf.getvalue()).decode("utf-8") | |
| # # return f"data:image/jpeg;base64,{encoded}" | |
| # # except Exception as e: | |
| # # logger.error(f"RetinaFace error on {image_path}: {e}") | |
| # # return self.encode_image_simple(image_path) | |
| # # def encode_image_simple(self, image_path): | |
| # # try: | |
| # # with open(image_path, "rb") as f: | |
| # # encoded = base64.b64encode(f.read()).decode("utf-8") | |
| # # return f"data:image/jpeg;base64,{encoded}" | |
| # # except Exception as e: | |
| # # logger.error(f"Encode error: {e}") | |
| # # return None | |
| # # def validate_image(self, image_path): | |
| # # try: | |
| # # if not os.path.exists(image_path): | |
| # # return False | |
| # # with Image.open(image_path) as img: | |
| # # img.verify() | |
| # # return True | |
| # # except Exception: | |
| # # return False | |
| # # def normalize_output(self, content): | |
| # # if not content: | |
| # # return "UNKNOWN" | |
| # # text = content.strip().upper() | |
| # # if any(w in text for w in ["REAL", "GENUINE", "HUMAN"]): | |
| # # return "REAL" | |
| # # if any(w in text for w in ["FAKE", "DEEPFAKE", "AI", "SYNTHETIC", "GENERATED"]): | |
| # # return "FAKE" | |
| # # if "NOT FAKE" in text or "LOOKS REAL" in text: | |
| # # return "REAL" | |
| # # if "PROBABLY FAKE" in text or "MAYBE FAKE" in text: | |
| # # return "FAKE" | |
| # # return "UNKNOWN" | |
| # # def reverify_qwen(self, img_b64, prev_result): | |
| # # prompt = ( | |
| # # "Re-analyze this face image for deepfake signs. " | |
| # # "Focus on lighting, symmetry, and unnatural skin artifacts. " | |
| # # "Respond with one word only: REAL or FAKE." | |
| # # ) | |
| # # print(f"🔁 Re-verifying Qwen result (was {prev_result})...") | |
| # # try: | |
| # # resp = self.client.chat.completions.create( | |
| # # extra_headers=self.extra_headers, | |
| # # model=self.model_map["qwen"], | |
| # # messages=[{ | |
| # # "role": "user", | |
| # # "content": [ | |
| # # {"type": "image_url", "image_url": {"url": img_b64}}, | |
| # # {"type": "text", "text": prompt} | |
| # # ] | |
| # # }], | |
| # # max_tokens=50, | |
| # # temperature=0.1, | |
| # # ) | |
| # # content = resp.choices[0].message.content.strip().upper() | |
| # # if "FAKE" in content: | |
| # # print("✅ Changed to FAKE after second check") | |
| # # return "FAKE" | |
| # # elif "REAL" in content: | |
| # # print("✅ Confirmed REAL after second check") | |
| # # return "REAL" | |
| # # else: | |
| # # print("⚠️ Still ambiguous after recheck") | |
| # # return prev_result | |
| # # except Exception as e: | |
| # # print(f"⚠️ Qwen re-verification failed: {e}") | |
| # # return prev_result | |
| # # # === Deteksi utama === | |
| # # def detect_deepfake_llm(self, image_path): | |
| # # prompt = ( | |
| # # "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. " | |
| # # "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE." | |
| # # ) | |
| # # if not self.validate_image(image_path): | |
| # # return "ERROR", None, "invalid" | |
| # # img_b64 = self.preprocess_image_with_retinaface(image_path) | |
| # # if not img_b64: | |
| # # return "ERROR", None, "invalid" | |
| # # model_id = self.model_map[self.model_name] | |
| # # method = "retinaface_crop" if self.use_face_detector else "original" | |
| # # try: | |
| # # resp = self.client.chat.completions.create( | |
| # # extra_headers=self.extra_headers, | |
| # # model=model_id, | |
| # # messages=[{ | |
| # # "role": "user", | |
| # # "content": [ | |
| # # {"type": "image_url", "image_url": {"url": img_b64}}, | |
| # # {"type": "text", "text": prompt} | |
| # # ] | |
| # # }], | |
| # # max_tokens=50, | |
| # # temperature=0.1, | |
| # # ) | |
| # # content = resp.choices[0].message.content | |
| # # result = self.normalize_output(content) | |
| # # if self.model_name == "qwen" and result == "REAL": | |
| # # result = self.reverify_qwen(img_b64, result) | |
| # # return result, content, method | |
| # # except Exception as e: | |
| # # logger.error(f"Detection failed: {e}") | |
| # # return "ERROR", None, method | |
| # # # === Dataset & Resume === | |
| # # def get_images(self): | |
| # # dataset_path = Path(self.dataset_folder) | |
| # # real_path = dataset_path / "face_real" | |
| # # fake_path = dataset_path / "face_fake" | |
| # # if not real_path.exists() or not fake_path.exists(): | |
| # # print("❌ Dataset folders missing.") | |
| # # return [] | |
| # # real_images = sorted(list(real_path.glob("*.jpg")))[:500] | |
| # # fake_images = sorted(list(fake_path.glob("*.jpg")))[:500] | |
| # # return [(str(p), "REAL") for p in real_images] + [(str(p), "FAKE") for p in fake_images] | |
| # # def load_existing_results(self): | |
| # # csv_path = os.path.join(self.results_folder, self.csv_filename) | |
| # # if not os.path.exists(csv_path): | |
| # # return [] | |
| # # results = [] | |
| # # with open(csv_path, "r", encoding="utf-8") as f: | |
| # # reader = csv.reader(f) | |
| # # next(reader) | |
| # # for row in reader: | |
| # # if len(row) >= 5: | |
| # # results.append((row[0], row[1], row[2], row[3], row[4])) | |
| # # logger.info(f"Loaded {len(results)} existing results") | |
| # # return results | |
| # # def save_results_to_csv(self, results): | |
| # # csv_path = os.path.join(self.results_folder, self.csv_filename) | |
| # # with open(csv_path, "w", newline="", encoding="utf-8") as f: | |
| # # writer = csv.writer(f) | |
| # # writer.writerow(["filename", "ground_truth", "llm_result", "model_name", "method"]) | |
| # # writer.writerows(results) | |
| # # logger.info(f"Saved {len(results)} results to {csv_path}") | |
| # # # === Adaptive delay logic === | |
| # # def adjust_delay(self): | |
| # # if self.fail_count > 5: | |
| # # self.delay = min(self.delay + 0.2, 2.0) | |
| # # logger.warning(f"Increasing delay to {self.delay:.1f}s due to failures.") | |
| # # self.fail_count = 0 | |
| # # elif self.success_count > 10: | |
| # # self.delay = max(self.delay - 0.1, 0.3) | |
| # # logger.info(f"Reducing delay to {self.delay:.1f}s (stable).") | |
| # # self.success_count = 0 | |
| # # def run_detection(self, resume=True): | |
| # # all_images = self.get_images() | |
| # # if not all_images: | |
| # # return | |
| # # results = self.load_existing_results() if resume else [] | |
| # # processed = {r[0] for r in results} | |
| # # remaining = [(p, gt) for p, gt in all_images if os.path.basename(p) not in processed] | |
| # # print(f"\n=== STARTING {self.model_name.upper()} DETECTION ===") | |
| # # print(f"Total: {len(all_images)} | Already done: {len(processed)} | Remaining: {len(remaining)}") | |
| # # with tqdm(total=len(remaining), desc=f"{self.model_name.upper()}") as pbar: | |
| # # for img_path, truth in remaining: | |
| # # try: | |
| # # result, response, method = self.detect_deepfake_llm(img_path) | |
| # # results.append((os.path.basename(img_path), truth, result, self.model_name, method)) | |
| # # self.success_count += 1 | |
| # # except Exception as e: | |
| # # logger.error(f"Fatal error: {e}") | |
| # # results.append((os.path.basename(img_path), truth, "ERROR", self.model_name, "error")) | |
| # # self.fail_count += 1 | |
| # # pbar.set_description(f"{os.path.basename(img_path)} -> {result}") | |
| # # pbar.update(1) | |
| # # self.save_results_to_csv(results) | |
| # # self.adjust_delay() | |
| # # time.sleep(self.delay) | |
| # # print(f"\n✅ Detection completed for {self.model_name.upper()}") | |
| # # print(f"Results saved to: {os.path.join(self.results_folder, self.csv_filename)}") | |
| # #!/usr/bin/env python3 | |
| # #!/usr/bin/env python3 | |
| # import os | |
| # import csv | |
| # import time | |
| # import base64 | |
| # from pathlib import Path | |
| # from tqdm import tqdm | |
| # import logging | |
| # from PIL import Image | |
| # import io | |
| # from datetime import datetime | |
| # from openai import OpenAI | |
| # import numpy as np | |
| # import math # Diperlukan untuk perhitungan akurasi | |
| # # RetinaFace Configuration | |
| # try: | |
| # #retinaface | |
| # from retinaface import RetinaFace | |
| # RETINAFACE_AVAILABLE = True | |
| # except ImportError: | |
| # RETINAFACE_AVAILABLE = False | |
| # print("❌ ERROR: RetinaFace library not found. Running without face cropping.") | |
| # # === LOGGING CONFIG === | |
| # os.makedirs("logs", exist_ok=True) | |
| # logging.basicConfig( | |
| # filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", | |
| # level=logging.INFO, | |
| # format="%(asctime)s - %(levelname)s - %(message)s", | |
| # ) | |
| # logger = logging.getLogger(__name__) | |
| # class DeepfakeDetector: | |
| # """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) with RetinaFace + adaptive delay""" | |
| # def __init__(self, api_key, model_name="qwen", debug_mode=False, start_from=0, use_face_detector=True): | |
| # self.api_key = api_key | |
| # self.model_name = model_name.lower() | |
| # self.debug_mode = debug_mode | |
| # self.start_from = start_from | |
| # self.dataset_folder = "dataset" | |
| # self.results_folder = "result" | |
| # self.csv_filename = f"result_{self.model_name}_{start_from}.csv" | |
| # # OpenRouter API client | |
| # self.client = OpenAI( | |
| # base_url="https://openrouter.ai/api/v1", | |
| # api_key=self.api_key, | |
| # ) | |
| # self.extra_headers = { | |
| # "HTTP-Referer": "https://github.com/retinaface-comparison", | |
| # "X-Title": "Deepfake Detection Adaptive" | |
| # } | |
| # # Model map (5 LLMs via OpenRouter) | |
| # self.model_map = { | |
| # "qwen": "qwen/qwen3-vl-8b-instruct", | |
| # "gpt": "openai/chatgpt-4o-latest", | |
| # "gemini": "google/gemini-2.5-flash", | |
| # "llama": "meta-llama/llama-3.2-90b-vision-instruct", | |
| # "cohere": "cohere/command-r-plus-08-2024", | |
| # } | |
| # if self.model_name not in self.model_map: | |
| # raise ValueError("❌ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere") | |
| # logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})") | |
| # os.makedirs(self.results_folder, exist_ok=True) | |
| # self.use_face_detector = use_face_detector and RETINAFACE_AVAILABLE | |
| # self.target_size = 512 | |
| # # waktu delay | |
| # self.delay = 0.3 | |
| # self.fail_count = 0 | |
| # self.success_count = 0 | |
| # print(f"\nModel: {self.model_name.upper()} | RetinaFace Cropping: {'ON' if self.use_face_detector else 'OFF'}") | |
| # # Image handling (RetinaFace) | |
| # def preprocess_image_with_retinaface(self, image_path): | |
| # if not self.use_face_detector or not RETINAFACE_AVAILABLE: | |
| # return self.encode_image_simple(image_path) | |
| # try: | |
| # # Perlu diubah ke string karena RetinaFace kadang tidak menerima objek Path | |
| # faces = RetinaFace.detect_faces(str(image_path)) | |
| # if not faces: | |
| # logger.warning(f"No face detected in {image_path}") | |
| # return self.encode_image_simple(image_path) | |
| # first_face = list(faces.values())[0] | |
| # facial_area = first_face.get("facial_area", None) | |
| # if not facial_area or len(facial_area) != 4: | |
| # return self.encode_image_simple(image_path) | |
| # x1, y1, x2, y2 = facial_area | |
| # img = Image.open(image_path).convert("RGB") | |
| # # Tambahkan margin (ekstraksi) | |
| # margin_ratio = 0.2 | |
| # w, h = x2 - x1, y2 - y1 | |
| # margin_x = int(w * margin_ratio) | |
| # margin_y = int(h * margin_ratio) | |
| # x1 = max(0, x1 - margin_x) | |
| # y1 = max(0, y1 - margin_y) | |
| # x2 = min(img.width, x2 + margin_x) | |
| # y2 = min(img.height, y2 + margin_y) | |
| # cropped_img = img.crop((x1, y1, x2, y2)) | |
| # cropped_img = cropped_img.resize((self.target_size, self.target_size), Image.Resampling.LANCZOS) | |
| # buf = io.BytesIO() | |
| # cropped_img.save(buf, format="JPEG", quality=90, optimize=True) | |
| # encoded = base64.b64encode(buf.getvalue()).decode("utf-8") | |
| # return f"data:image/jpeg;base64,{encoded}" | |
| # except Exception as e: | |
| # logger.error(f"RetinaFace error on {image_path}: {e}") | |
| # return self.encode_image_simple(image_path) | |
| # def encode_image_simple(self, image_path): | |
| # try: | |
| # with open(image_path, "rb") as f: | |
| # encoded = base64.b64encode(f.read()).decode("utf-8") | |
| # return f"data:image/jpeg;base64,{encoded}" | |
| # except Exception as e: | |
| # logger.error(f"Encode error: {e}") | |
| # return None | |
| # def validate_image(self, image_path): | |
| # try: | |
| # if not os.path.exists(image_path): | |
| # return False | |
| # with Image.open(image_path) as img: | |
| # img.verify() | |
| # return True | |
| # except Exception: | |
| # return False | |
| # def normalize_output(self, content): | |
| # """ | |
| # Normalizes verbose LLM output to a single word: REAL, FAKE, or UNKNOWN. | |
| # """ | |
| # if not content: | |
| # return "UNKNOWN" | |
| # text = content.strip().upper() | |
| # # Mencari kata kunci FAKE | |
| # if any(w in text for w in ["FAKE", "DEEPFAKE", "AI GENERATED", "SYNTHETIC"]): | |
| # return "FAKE" | |
| # # Mencari kata kunci REAL | |
| # if any(w in text for w in ["REAL", "GENUINE", "HUMAN", "NOT FAKE"]): | |
| # return "REAL" | |
| # # Upaya kedua: Coba ambil kata pertama/kata kunci di tengah respons | |
| # words = text.split() | |
| # if words: | |
| # for word in words[:3]: # Cek 3 kata pertama | |
| # if "REAL" in word: return "REAL" | |
| # if "FAKE" in word: return "FAKE" | |
| # logger.warning(f"Output ambiguous/unpredictable: {content}") | |
| # return "UNKNOWN" | |
| # def reverify_qwen(self, img_b64, prev_result): | |
| # # Logika re-verifikasi | |
| # prompt = ( | |
| # "Re-analyze this face image for deepfake signs. " | |
| # "Focus on lighting, symmetry, and unnatural skin artifacts. " | |
| # "Respond with one word only: REAL or FAKE." | |
| # ) | |
| # print(f"🔁 Re-verifying Qwen result (was {prev_result})...") | |
| # try: | |
| # resp = self.client.chat.completions.create( | |
| # extra_headers=self.extra_headers, | |
| # model=self.model_map["qwen"], | |
| # messages=[{ | |
| # "role": "user", | |
| # "content": [ | |
| # {"type": "image_url", "image_url": {"url": img_b64}}, | |
| # {"type": "text", "text": prompt} | |
| # ] | |
| # }], | |
| # max_tokens=50, | |
| # temperature=0.1, | |
| # ) | |
| # content = resp.choices[0].message.content.strip().upper() | |
| # if "FAKE" in content: | |
| # print("Changed to FAKE after second check") | |
| # return "FAKE" | |
| # elif "REAL" in content: | |
| # print("Confirmed REAL after second check") | |
| # return "REAL" | |
| # else: | |
| # print("Still ambiguous after recheck") | |
| # return prev_result | |
| # except Exception as e: | |
| # print(f"Qwen re-verification failed: {e}") | |
| # return prev_result | |
| # # Fungsi Fallback | |
| # def retinaface_simple_fallback(self, image_path): | |
| # """Applies a heuristic rule if LLM returns an error.""" | |
| # if not self.use_face_detector or not RETINAFACE_AVAILABLE: | |
| # return 'UNKNOWN_FALLBACK' | |
| # try: | |
| # # Panggil deteksi wajah (menggunakan str(image_path)) | |
| # faces = RetinaFace.detect_faces(str(image_path)) | |
| # if not faces: | |
| # return 'UNKNOWN_FALLBACK' | |
| # best_score = max(f['score'] for f in faces.values()) | |
| # # Aturan Heuristik: Jika skor kepercayaan wajah sangat tinggi, REAL. | |
| # if best_score > 0.995: | |
| # return 'REAL' | |
| # else: | |
| # return 'UNKNOWN_FALLBACK' | |
| # except Exception: | |
| # return 'UNKNOWN_FALLBACK' | |
| # # Deteksi utama | |
| # def detect_deepfake_llm(self, image_path): | |
| # prompt = ( | |
| # "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. " | |
| # "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE." | |
| # ) | |
| # if not self.validate_image(image_path): | |
| # return "ERROR", None, "invalid" | |
| # img_b64 = self.preprocess_image_with_retinaface(image_path) | |
| # if not img_b64: | |
| # return "ERROR", None, "invalid" | |
| # model_id = self.model_map[self.model_name] | |
| # method = "retinaface_crop" if self.use_face_detector else "original" | |
| # try: | |
| # resp = self.client.chat.completions.create( | |
| # extra_headers=self.extra_headers, | |
| # model=model_id, | |
| # messages=[{ | |
| # "role": "user", | |
| # "content": [ | |
| # {"type": "image_url", "image_url": {"url": img_b64}}, | |
| # {"type": "text", "text": prompt} | |
| # ] | |
| # }], | |
| # max_tokens=50, | |
| # temperature=0.1, | |
| # ) | |
| # content = resp.choices[0].message.content | |
| # result = self.normalize_output(content) | |
| # if self.model_name == "qwen" and result == "REAL": | |
| # result = self.reverify_qwen(img_b64, result) | |
| # return result, content, method | |
| # except Exception as e: | |
| # logger.error(f"Detection failed: {e}") | |
| # # Tambahkan logika Fallback RetinaFace | |
| # fallback_result = self.retinaface_simple_fallback(image_path) | |
| # logger.warning(f"LLM Failed. Applying Fallback Logic: {fallback_result}") | |
| # if fallback_result == 'REAL': | |
| # # Jika heuristik RetinaFace yakin gambar BERKUALITAS BAGUS, prediksi REAL | |
| # return 'REAL', "RetinaFace Heuristic", "retinaface_crop_FALLBACK" | |
| # else: | |
| # return "FAKE", "RetinaFace Heuristic (Assumed FAKE)", "retinaface_crop" | |
| # # === Dataset & Resume === | |
| # def get_images(self): | |
| # dataset_path = Path(self.dataset_folder) | |
| # real_path = dataset_path / "face_real" | |
| # fake_path = dataset_path / "face_fake" | |
| # if not real_path.exists() or not fake_path.exists(): | |
| # print("❌ Dataset folders missing.") | |
| # return [] | |
| # # Batasi gambar menjadi 500 REAL dan 500 FAKE (total 1000) | |
| # real_images = sorted(list(real_path.glob("*.jpg")))[:500] | |
| # fake_images = sorted(list(fake_path.glob("*.jpg")))[:500] | |
| # return [(str(p), "REAL") for p in real_images] + [(str(p), "FAKE") for p in fake_images] | |
| # def load_existing_results(self): | |
| # csv_path = os.path.join(self.results_folder, self.csv_filename) | |
| # if not os.path.exists(csv_path): | |
| # return [] | |
| # results = [] | |
| # with open(csv_path, "r", encoding="utf-8") as f: | |
| # reader = csv.reader(f) | |
| # next(reader) | |
| # for row in reader: | |
| # # baris memiliki 5 kolom | |
| # if len(row) >= 5: | |
| # results.append((row[0], row[1], row[2], row[3], row[4])) | |
| # logger.info(f"Loaded {len(results)} existing results") | |
| # return results | |
| # def save_results_to_csv(self, results): | |
| # csv_path = os.path.join(self.results_folder, self.csv_filename) | |
| # with open(csv_path, "w", newline="", encoding="utf-8") as f: | |
| # writer = csv.writer(f) | |
| # writer.writerow(["filename", "ground_truth", "llm_result", "model_name", "method"]) | |
| # writer.writerows(results) | |
| # logger.info(f"Saved {len(results)} results to {csv_path}") | |
| # # logika delay | |
| # def adjust_delay(self): | |
| # # Logika adaptive delay | |
| # if self.fail_count > 5: | |
| # self.delay = min(self.delay + 0.2, 2.0) | |
| # logger.warning(f"Increasing delay to {self.delay:.1f}s due to failures.") | |
| # self.fail_count = 0 | |
| # elif self.success_count > 10: | |
| # self.delay = max(self.delay - 0.1, 0.3) | |
| # logger.info(f"Reducing delay to {self.delay:.1f}s (stable).") | |
| # self.success_count = 0 | |
| # def run_detection(self, resume=True): | |
| # all_images = self.get_images() | |
| # if not all_images: | |
| # return | |
| # results = self.load_existing_results() if resume else [] | |
| # processed = {r[0] for r in results} | |
| # remaining = [(p, gt) for p, gt in all_images if os.path.basename(p) not in processed] | |
| # print(f"\n=== STARTING {self.model_name.upper()} DETECTION ===") | |
| # print(f"Total: {len(all_images)} | Already done: {len(processed)} | Remaining: {len(remaining)}") | |
| # with tqdm(total=len(remaining), desc=f"{self.model_name.upper()}") as pbar: | |
| # for img_path, truth in remaining: | |
| # try: | |
| # result, response, method = self.detect_deepfake_llm(img_path) | |
| # results.append((os.path.basename(img_path), truth, result, self.model_name, method)) | |
| # self.success_count += 1 | |
| # except Exception as e: | |
| # logger.error(f"Fatal error: {e}") | |
| # results.append((os.path.basename(img_path), truth, "ERROR", self.model_name, "error")) | |
| # self.fail_count += 1 | |
| # pbar.set_description(f"{os.path.basename(img_path)} -> {result}") | |
| # pbar.update(1) | |
| # self.save_results_to_csv(results) | |
| # self.adjust_delay() | |
| # time.sleep(self.delay) | |
| # print(f"\n✅ Deteksi selesai untuk {self.model_name.upper()}") | |
| # print(f"Hasil simpan ke: {os.path.join(self.results_folder, self.csv_filename)}") | |
| # #!/usr/bin/env python3 | |
| # import os | |
| # import csv | |
| # import time | |
| # import base64 | |
| # from pathlib import Path | |
| # from tqdm import tqdm | |
| # import logging | |
| # from PIL import Image | |
| # import io | |
| # from datetime import datetime | |
| # from openai import OpenAI | |
| # import numpy as np | |
| # import math | |
| # # === RetinaFace Configuration === | |
| # try: | |
| # from retinaface import RetinaFace | |
| # RETINAFACE_AVAILABLE = True | |
| # except ImportError: | |
| # RETINAFACE_AVAILABLE = False | |
| # print("❌ ERROR: RetinaFace library not found. Please run 'pip install retina-face'. Running without face cropping.") | |
| # # === LOGGING CONFIG === | |
| # os.makedirs("logs", exist_ok=True) | |
| # logging.basicConfig( | |
| # filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", | |
| # level=logging.INFO, | |
| # format="%(asctime)s - %(levelname)s - %(message)s", | |
| # ) | |
| # logger = logging.getLogger(__name__) | |
| # class DeepfakeDetector: | |
| # """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) with RetinaFace + adaptive delay""" | |
| # def __init__(self, api_key, model_name="qwen", debug_mode=False, start_from=0, use_face_detector=True): | |
| # self.api_key = api_key | |
| # self.model_name = model_name.lower() | |
| # self.debug_mode = debug_mode | |
| # self.start_from = start_from | |
| # self.dataset_folder = "dataset" | |
| # self.results_folder = "result" | |
| # self.csv_filename = f"result_{self.model_name}_{start_from}.csv" | |
| # # === OpenRouter API client (Menggunakan OpenRouter untuk semua model) === | |
| # self.client = OpenAI( | |
| # base_url="https://openrouter.ai/api/v1", | |
| # api_key=self.api_key, | |
| # ) | |
| # self.extra_headers = { | |
| # "HTTP-Referer": "https://github.com/retinaface-comparison", | |
| # "X-Title": "Deepfake Detection Adaptive" | |
| # } | |
| # # === Model map (5 LLMs via OpenRouter) === | |
| # self.model_map = { | |
| # "qwen": "qwen/qwen3-vl-8b-instruct", | |
| # "gpt": "openai/chatgpt-4o-latest", | |
| # "gemini": "google/gemini-2.5-flash", | |
| # "llama": "meta-llama/llama-3.2-90b-vision-instruct", | |
| # "cohere": "cohere/command-r-plus-08-2024", | |
| # } | |
| # if self.model_name not in self.model_map: | |
| # raise ValueError("❌ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere") | |
| # logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})") | |
| # os.makedirs(self.results_folder, exist_ok=True) | |
| # self.use_face_detector = False and RETINAFACE_AVAILABLE | |
| # self.target_size = 512 | |
| # # waktu delay | |
| # self.delay = 0.3 | |
| # self.fail_count = 0 | |
| # self.success_count = 0 | |
| # print(f"\nModel: {self.model_name.upper()} | RetinaFace Cropping: {'ON' if self.use_face_detector else 'OFF'}") | |
| # # === Image handling (RetinaFace) === | |
| # def preprocess_image_with_retinaface(self, image_path): | |
| # if not self.use_face_detector or not RETINAFACE_AVAILABLE: | |
| # return self.encode_image_simple(image_path) | |
| # try: | |
| # # Perlu diubah ke string karena RetinaFace kadang tidak menerima objek Path | |
| # faces = RetinaFace.detect_faces(str(image_path)) | |
| # if not faces: | |
| # logger.warning(f"No face detected in {image_path}") | |
| # return self.encode_image_simple(image_path) | |
| # first_face = list(faces.values())[0] | |
| # facial_area = first_face.get("facial_area", None) | |
| # if not facial_area or len(facial_area) != 4: | |
| # return self.encode_image_simple(image_path) | |
| # x1, y1, x2, y2 = facial_area | |
| # img = Image.open(image_path).convert("RGB") | |
| # # Tambahkan margin (ekstraksi) | |
| # margin_ratio = 0.2 | |
| # w, h = x2 - x1, y2 - y1 | |
| # margin_x = int(w * margin_ratio) | |
| # margin_y = int(h * margin_ratio) | |
| # x1 = max(0, x1 - margin_x) | |
| # y1 = max(0, y1 - margin_y) | |
| # x2 = min(img.width, x2 + margin_x) | |
| # y2 = min(img.height, y2 + margin_y) | |
| # cropped_img = img.crop((x1, y1, x2, y2)) | |
| # cropped_img = cropped_img.resize((self.target_size, self.target_size), Image.Resampling.LANCZOS) | |
| # buf = io.BytesIO() | |
| # cropped_img.save(buf, format="JPEG", quality=90, optimize=True) | |
| # encoded = base64.b64encode(buf.getvalue()).decode("utf-8") | |
| # return f"data:image/jpeg;base64,{encoded}" | |
| # except Exception as e: | |
| # logger.error(f"RetinaFace error on {image_path}: {e}") | |
| # return self.encode_image_simple(image_path) | |
| # def encode_image_simple(self, image_path): | |
| # try: | |
| # with open(image_path, "rb") as f: | |
| # encoded = base64.b64encode(f.read()).decode("utf-8") | |
| # return f"data:image/jpeg;base64,{encoded}" | |
| # except Exception as e: | |
| # logger.error(f"Encode error: {e}") | |
| # return None | |
| # def validate_image(self, image_path): | |
| # try: | |
| # if not os.path.exists(image_path): | |
| # return False | |
| # with Image.open(image_path) as img: | |
| # img.verify() | |
| # return True | |
| # except Exception: | |
| # return False | |
| # def normalize_output(self, content): | |
| # """ | |
| # Normalizes verbose LLM output to a single word: REAL, FAKE, or UNKNOWN. | |
| # """ | |
| # if not content: | |
| # return "UNKNOWN" | |
| # text = content.strip().upper() | |
| # # Mencari kata kunci FAKE (atau sinonim) | |
| # if any(w in text for w in ["FAKE", "DEEPFAKE", "AI GENERATED", "SYNTHETIC"]): | |
| # return "FAKE" | |
| # # Mencari kata kunci REAL (atau sinonim) | |
| # if any(w in text for w in ["REAL", "GENUINE", "HUMAN", "NOT FAKE"]): | |
| # return "REAL" | |
| # # Upaya kedua: Coba ambil kata pertama/kata kunci di tengah respons | |
| # words = text.split() | |
| # if words: | |
| # for word in words[:3]: # Cek 3 kata pertama | |
| # if "REAL" in word: return "REAL" | |
| # if "FAKE" in word: return "FAKE" | |
| # logger.warning(f"Output ambiguous/unpredictable: {content}") | |
| # return "UNKNOWN" | |
| # def reverify_qwen(self, img_b64, prev_result): | |
| # # Logika re-verifikasi | |
| # prompt = ( | |
| # "Re-analyze this face image for deepfake signs. " | |
| # "Focus on lighting, symmetry, and unnatural skin artifacts. " | |
| # "Respond with one word only: REAL or FAKE." | |
| # ) | |
| # print(f"🔁 Re-verifying Qwen result (was {prev_result})...") | |
| # try: | |
| # resp = self.client.chat.completions.create( | |
| # extra_headers=self.extra_headers, | |
| # model=self.model_map["qwen"], | |
| # messages=[{ | |
| # "role": "user", | |
| # "content": [ | |
| # {"type": "image_url", "image_url": {"url": img_b64}}, | |
| # {"type": "text", "text": prompt} | |
| # ] | |
| # }], | |
| # max_tokens=50, | |
| # temperature=0.1, | |
| # ) | |
| # content = resp.choices[0].message.content.strip().upper() | |
| # if "FAKE" in content: | |
| # print("Changed to FAKE after second check") | |
| # return "FAKE" | |
| # elif "REAL" in content: | |
| # print("Confirmed REAL after second check") | |
| # return "REAL" | |
| # else: | |
| # print("Still ambiguous after recheck") | |
| # return prev_result | |
| # except Exception as e: | |
| # print(f"Qwen re-verification failed: {e}") | |
| # return prev_result | |
| # # Fungsi Fallback Sederhana RetinaFace (Heuristik) DIHAPUS | |
| # # === Deteksi utama === | |
| # def detect_deepfake_llm(self, image_path): | |
| # prompt = ( | |
| # "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. " | |
| # "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE." | |
| # ) | |
| # if not self.validate_image(image_path): | |
| # return "ERROR", None, "invalid" | |
| # img_b64 = self.preprocess_image_with_retinaface(image_path) | |
| # if not img_b64: | |
| # return "ERROR", None, "invalid" | |
| # model_id = self.model_map[self.model_name] | |
| # method = "retinaface_crop" if self.use_face_detector else "original" | |
| # try: | |
| # resp = self.client.chat.completions.create( | |
| # extra_headers=self.extra_headers, | |
| # model=model_id, | |
| # messages=[{ | |
| # "role": "user", | |
| # "content": [ | |
| # {"type": "image_url", "image_url": {"url": img_b64}}, | |
| # {"type": "text", "text": prompt} | |
| # ] | |
| # }], | |
| # max_tokens=50, | |
| # temperature=0.1, | |
| # ) | |
| # content = resp.choices[0].message.content | |
| # result = self.normalize_output(content) | |
| # if self.model_name == "qwen" and result == "REAL": | |
| # result = self.reverify_qwen(img_b64, result) | |
| # return result, content, method | |
| # except Exception as e: | |
| # logger.error(f"Detection failed: {e}") | |
| # # --- LOGIKA KETIKA LLM GAGAL (TIDAK ADA TEBAKAN) --- | |
| # # Jika LLM gagal, catat sebagai ERROR. | |
| # return "ERROR", None, "API_FAILURE" # Mengganti 'error' dengan 'API_FAILURE' untuk kejelasan | |
| # # === Dataset & Resume === | |
| # def get_images(self): | |
| # dataset_path = Path(self.dataset_folder) | |
| # real_path = dataset_path / "face_real" | |
| # fake_path = dataset_path / "face_fake" | |
| # if not real_path.exists() or not fake_path.exists(): | |
| # print("❌ Dataset folders missing.") | |
| # return [] | |
| # # Batasi gambar menjadi 500 REAL dan 500 FAKE (total 1000) | |
| # real_images = sorted(list(real_path.glob("*.jpg")))[:500] | |
| # fake_images = sorted(list(fake_path.glob("*.jpg")))[:500] | |
| # return [(str(p), "REAL") for p in real_images] + [(str(p), "FAKE") for p in fake_images] | |
| # def load_existing_results(self): | |
| # csv_path = os.path.join(self.results_folder, self.csv_filename) | |
| # if not os.path.exists(csv_path): | |
| # return [] | |
| # results = [] | |
| # with open(csv_path, "r", encoding="utf-8") as f: | |
| # reader = csv.reader(f) | |
| # next(reader) | |
| # for row in reader: | |
| # # Pastikan baris memiliki 5 kolom | |
| # if len(row) >= 5: | |
| # results.append((row[0], row[1], row[2], row[3], row[4])) | |
| # logger.info(f"Loaded {len(results)} existing results") | |
| # return results | |
| # def save_results_to_csv(self, results): | |
| # csv_path = os.path.join(self.results_folder, self.csv_filename) | |
| # with open(csv_path, "w", newline="", encoding="utf-8") as f: | |
| # writer = csv.writer(f) | |
| # writer.writerow(["filename", "ground_truth", "llm_result", "model_name", "method"]) | |
| # writer.writerows(results) | |
| # logger.info(f"Saved {len(results)} results to {csv_path}") | |
| # # === Adaptive delay logic === | |
| # def adjust_delay(self): | |
| # # Logika adaptive delay | |
| # if self.fail_count > 5: | |
| # self.delay = min(self.delay + 0.2, 2.0) | |
| # logger.warning(f"Increasing delay to {self.delay:.1f}s due to failures.") | |
| # self.fail_count = 0 | |
| # elif self.success_count > 10: | |
| # self.delay = max(self.delay - 0.1, 0.3) | |
| # logger.info(f"Reducing delay to {self.delay:.1f}s (stable).") | |
| # self.success_count = 0 | |
| # def run_detection(self, resume=True): | |
| # all_images = self.get_images() | |
| # if not all_images: | |
| # return | |
| # results = self.load_existing_results() if resume else [] | |
| # processed = {r[0] for r in results} | |
| # remaining = [(p, gt) for p, gt in all_images if os.path.basename(p) not in processed] | |
| # print(f"\n=== STARTING {self.model_name.upper()} DETECTION ===") | |
| # print(f"Total: {len(all_images)} | Already done: {len(processed)} | Remaining: {len(remaining)}") | |
| # with tqdm(total=len(remaining), desc=f"{self.model_name.upper()}") as pbar: | |
| # for img_path, truth in remaining: | |
| # try: | |
| # result, response, method = self.detect_deepfake_llm(img_path) | |
| # results.append((os.path.basename(img_path), truth, result, self.model_name, method)) | |
| # self.success_count += 1 | |
| # except Exception as e: | |
| # logger.error(f"Fatal error: {e}") | |
| # results.append((os.path.basename(img_path), truth, "ERROR", self.model_name, "error")) | |
| # self.fail_count += 1 | |
| # pbar.set_description(f"{os.path.basename(img_path)} -> {result}") | |
| # pbar.update(1) | |
| # self.save_results_to_csv(results) | |
| # self.adjust_delay() | |
| # time.sleep(self.delay) | |
| # print(f"\n✅ Deteksi selesai untuk {self.model_name.upper()}") | |
| # print(f"Hasil simpan ke: {os.path.join(self.results_folder, self.csv_filename)}") | |
| import os | |
| import time | |
| import base64 | |
| from pathlib import Path | |
| import logging | |
| from PIL import Image | |
| import io | |
| from datetime import datetime | |
| from openai import OpenAI | |
| import numpy as np | |
| # === RetinaFace Configuration === | |
| try: | |
| from retinaface import RetinaFace | |
| RETINAFACE_AVAILABLE = True | |
| except ImportError: | |
| RETINAFACE_AVAILABLE = False | |
| # === LOGGING CONFIG === | |
| os.makedirs("logs", exist_ok=True) | |
| logging.basicConfig( | |
| filename=f"logs/run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", | |
| level=logging.INFO, | |
| format="%(asctime)s - %(levelname)s - %(message)s", | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class DeepfakeDetector: | |
| """Deepfake Detection System (Qwen / GPT / Gemini / Llama / Cohere) for Single Image Inference""" | |
| def __init__(self, api_key, model_name="qwen", use_face_detector=True): | |
| self.api_key = api_key | |
| self.model_name = model_name.lower() | |
| # === OpenRouter API client === | |
| self.client = OpenAI( | |
| base_url="https://openrouter.ai/api/v1", | |
| api_key=self.api_key, | |
| ) | |
| self.extra_headers = { | |
| # Ganti Referer agar sesuai dengan Hugging Face | |
| "HTTP-Referer": "https://huggingface.co/spaces/[your-username]/[your-space-name]", | |
| "X-Title": f"Deepfake Detection Gradio ({self.model_name.upper()})" | |
| } | |
| # === Model map (5 LLMs via OpenRouter) === | |
| self.model_map = { | |
| "qwen": "qwen/qwen3-vl-8b-instruct", | |
| "gpt": "openai/chatgpt-4o-latest", | |
| "gemini": "google/gemini-2.5-flash", | |
| "llama": "meta-llama/llama-3.2-90b-vision-instruct", | |
| # "cohere": "cohere/command-r-plus-08-2024", | |
| } | |
| if self.model_name not in self.model_map: | |
| raise ValueError("❌ Invalid model name. Choose from: qwen, gpt, gemini, llama, cohere") | |
| logger.info(f"Model selected: {self.model_name.upper()} ({self.model_map[self.model_name]})") | |
| # diinisialisasi berdasarkan input | |
| self.use_face_detector = use_face_detector and RETINAFACE_AVAILABLE | |
| self.target_size = 512 | |
| # === Image handling (RetinaFace) === | |
| # FUNGSI INI TETAP TIDAK BERUBAH | |
| def preprocess_image_with_retinaface(self, image_path): | |
| if not self.use_face_detector or not RETINAFACE_AVAILABLE: | |
| return self.encode_image_simple(image_path) | |
| try: | |
| faces = RetinaFace.detect_faces(str(image_path)) | |
| if not faces: | |
| logger.warning(f"No face detected in {image_path}") | |
| return self.encode_image_simple(image_path) | |
| first_face = list(faces.values())[0] | |
| facial_area = first_face.get("facial_area", None) | |
| if not facial_area or len(facial_area) != 4: | |
| return self.encode_image_simple(image_path) | |
| x1, y1, x2, y2 = facial_area | |
| img = Image.open(image_path).convert("RGB") | |
| # Tambahkan margin (ekstraksi) | |
| margin_ratio = 0.2 | |
| w, h = x2 - x1, y2 - y1 | |
| margin_x = int(w * margin_ratio) | |
| margin_y = int(h * margin_ratio) | |
| x1 = max(0, x1 - margin_x) | |
| y1 = max(0, y1 - margin_y) | |
| x2 = min(img.width, x2 + margin_x) | |
| y2 = min(img.height, y2 + margin_y) | |
| cropped_img = img.crop((x1, y1, x2, y2)) | |
| cropped_img = cropped_img.resize((self.target_size, self.target_size), Image.Resampling.LANCZOS) | |
| buf = io.BytesIO() | |
| cropped_img.save(buf, format="JPEG", quality=90, optimize=True) | |
| encoded = base64.b64encode(buf.getvalue()).decode("utf-8") | |
| return f"data:image/jpeg;base64,{encoded}" | |
| except Exception as e: | |
| logger.error(f"RetinaFace error on {image_path}: {e}") | |
| return self.encode_image_simple(image_path) | |
| # FUNGSI encode | |
| def encode_image_simple(self, image_path): | |
| try: | |
| with open(image_path, "rb") as f: | |
| encoded = base64.b64encode(f.read()).decode("utf-8") | |
| return f"data:image/jpeg;base64,{encoded}" | |
| except Exception as e: | |
| logger.error(f"Encode error: {e}") | |
| return None | |
| # FUNGSI validasi | |
| def validate_image(self, image_path): | |
| try: | |
| if not os.path.exists(image_path): | |
| return False | |
| with Image.open(image_path) as img: | |
| img.verify() | |
| return True | |
| except Exception: | |
| return False | |
| # FUNGSI normalize | |
| def normalize_output(self, content): | |
| """ | |
| Normalizes verbose LLM output to a single word: REAL, FAKE, or UNKNOWN. | |
| """ | |
| if not content: | |
| return "UNKNOWN" | |
| text = content.strip().upper() | |
| if any(w in text for w in ["FAKE", "DEEPFAKE", "AI GENERATED", "SYNTHETIC"]): | |
| return "FAKE" | |
| if any(w in text for w in ["REAL", "GENUINE", "HUMAN", "NOT FAKE"]): | |
| return "REAL" | |
| words = text.split() | |
| if words: | |
| for word in words[:3]: | |
| if "REAL" in word: return "REAL" | |
| if "FAKE" in word: return "FAKE" | |
| logger.warning(f"Output ambiguous/unpredictable: {content}") | |
| return "UNKNOWN" | |
| # FUNGSI analisis qwen | |
| def reverify_qwen(self, img_b64, prev_result): | |
| prompt = ( | |
| "Re-analyze this face image for deepfake signs. " | |
| "Focus on lighting, symmetry, and unnatural skin artifacts. " | |
| "Respond with one word only: REAL or FAKE." | |
| ) | |
| try: | |
| resp = self.client.chat.completions.create( | |
| extra_headers=self.extra_headers, | |
| model=self.model_map["qwen"], | |
| messages=[{ | |
| "role": "user", | |
| "content": [ | |
| {"type": "image_url", "image_url": {"url": img_b64}}, | |
| {"type": "text", "text": prompt} | |
| ] | |
| }], | |
| max_tokens=50, | |
| temperature=0.1, | |
| ) | |
| content = resp.choices[0].message.content.strip().upper() | |
| if "FAKE" in content: | |
| # print("Changed to FAKE after second check") | |
| return "FAKE" | |
| elif "REAL" in content: | |
| # print("Confirmed REAL after second check") | |
| return "REAL" | |
| else: | |
| # print("Still ambiguous after recheck") | |
| return prev_result | |
| except Exception as e: | |
| # print(f"Qwen re-verification failed: {e}") | |
| return prev_result | |
| # === Deteksi utama === | |
| def detect_deepfake_llm(self, image_path): | |
| prompt = ( | |
| "You are a forensic image analyst. Analyze this face image for any deepfake or AI manipulation. " | |
| "Consider lighting, eyes, skin, and blending. Respond with only one word: REAL or FAKE." | |
| ) | |
| if not self.validate_image(image_path): | |
| return "ERROR", None, "invalid" | |
| img_b64 = self.preprocess_image_with_retinaface(image_path) | |
| if not img_b64: | |
| return "ERROR", None, "invalid" | |
| model_id = self.model_map[self.model_name] | |
| method = "retinaface_crop" if self.use_face_detector else "original" | |
| try: | |
| resp = self.client.chat.completions.create( | |
| extra_headers=self.extra_headers, | |
| model=model_id, | |
| messages=[{ | |
| "role": "user", | |
| "content": [ | |
| {"type": "image_url", "image_url": {"url": img_b64}}, | |
| {"type": "text", "text": prompt} | |
| ] | |
| }], | |
| max_tokens=50, | |
| temperature=0.1, | |
| ) | |
| content = resp.choices[0].message.content | |
| result = self.normalize_output(content) | |
| if self.model_name == "qwen" and result == "REAL": | |
| result = self.reverify_qwen(img_b64, result) | |
| return result, content, method | |
| except Exception as e: | |
| logger.error(f"Detection failed: {e}") | |
| return "ERROR", None, "API_FAILURE" | |
| pass # Pass diletakkan di sini hanya sebagai penanda bahwa sisanya telah dihapus. |