"""AI Forensic Detector API.

FastAPI service that classifies an uploaded image as "AI" or "REAL" by
combining a two-model EfficientNet-B0 ensemble with a set of heuristic
forensic checks (metadata, pixel statistics, byte signatures, ELA).
"""

import io
import os
import re
import warnings

import cv2
import numpy as np
import timm
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
import uvicorn
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image

warnings.filterwarnings("ignore")

# ==========================================
# CONFIGURATION (LOCAL / PRODUCTION MODE)
# ==========================================
APP_TITLE = "AI Forensic Detector API"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
FEEDBACK_DIR = "feedback"
ALLOWED_EXT = {"png", "jpg", "jpeg", "webp"}

# "." resolves the .pth checkpoints relative to the process working directory.
ROOT_DIR = "."
MODEL_FILES = ["ckpt_best_v4_epoch8.pth", "ckpt_best_v4_epoch14.pth"]

# Decision threshold applied to the ensemble's "fake" probability.
MODEL_THRESHOLD = 0.615
# Stricter threshold used when the upload looks like a WhatsApp-recompressed file.
WHATSAPP_MODEL_THRESHOLD = 0.85

# Number of rules that can each contribute one "fake" penalty point.
TOTAL_RULES = 5
# Minimum number of penalty points required to label an image as "AI".
MIN_PENALTIES_FOR_FAKE = 3

# Byte strings searched for (case-insensitively) in the raw upload.
AI_SIGNATURES = (
    b"midjourney",
    b"stable diffusion",
    b"adobe firefly",
    b"dall-e",
    b"photoshop",
)

# Matches the "WA0001"-style counter found in WhatsApp Android file names
# (e.g. "IMG-20240101-WA0007.jpg").
_WHATSAPP_COUNTER_RE = re.compile(r"wa\d{4}")

# ==========================================
# APP INIT & CORS MIDDLEWARE
# ==========================================
app = FastAPI(title=APP_TITLE)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

os.makedirs(os.path.join(FEEDBACK_DIR, "real"), exist_ok=True)
os.makedirs(os.path.join(FEEDBACK_DIR, "fake"), exist_ok=True)

# ==========================================
# GLOBAL MODELS & TRANSFORMS
# ==========================================
models_ensemble = []

val_tf = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


# ==========================================
# LOAD MODELS FUNCTION
# ==========================================
def load_ensemble_models():
    """Load every checkpoint in MODEL_FILES into the global ``models_ensemble``.

    The global list is cleared first and only repopulated once all
    checkpoints have loaded, so a failure never leaves a partial ensemble.

    Raises:
        FileNotFoundError: if a checkpoint file does not exist under ROOT_DIR.
    """
    global models_ensemble
    models_ensemble = []

    print(f"🖥️ ENGINE COMPUTATION RUNNING ON: {DEVICE}")
    print(f"📂 MEMERIKSA JALUR PROYEK LOKAL: {os.path.abspath(ROOT_DIR)}")
    print("⏳ Memuat infrastruktur model deep learning...")

    loaded = []
    for f_name in MODEL_FILES:
        abs_path = os.path.abspath(os.path.join(ROOT_DIR, f_name))
        if not os.path.exists(abs_path):
            raise FileNotFoundError(
                f"File model ensemble tidak ditemukan di folder proyek: {abs_path}"
            )

        print(f"📦 Loading komponen model: {f_name}...")
        m = timm.create_model('efficientnet_b0', pretrained=False, num_classes=2)

        # NOTE: torch.load unpickles the file; only load checkpoints you trust.
        ckpt = torch.load(abs_path, map_location=DEVICE)
        # Accept both a wrapped {"state_dict": ...} checkpoint and a bare state dict.
        if isinstance(ckpt, dict) and "state_dict" in ckpt:
            state_dict = ckpt["state_dict"]
        else:
            state_dict = ckpt

        m.load_state_dict(state_dict)
        m.to(DEVICE).eval()
        loaded.append(m)

    models_ensemble = loaded
    print(f"✅ Sistem Ensemble Siap! ({len(models_ensemble)} Model Terintegrasi)")


@app.on_event("startup")
def startup_event():
    """Load the ensemble at server start; abort startup if loading fails."""
    try:
        load_ensemble_models()
    except Exception as e:
        print(f"❌ Gagal memuat arsitektur ensemble: {e}")
        raise RuntimeError(f"Startup digagalkan server: {e}") from e


# ==========================================
# DIGITAL FORENSICS LAB - WITH WHATSAPP BYPASS
# ==========================================
def _looks_like_whatsapp(filename):
    """Return True when the file name follows a WhatsApp naming pattern."""
    name = filename.lower()
    # The original bare '"wa" in name' test matched unrelated names such as
    # "wallpaper.jpg"; require the "WA" + 4-digit counter instead.
    # NOTE(review): the file name is client-controlled, so this relaxation can
    # still be triggered by renaming a file.
    return (
        "whatsapp" in name
        or "img-" in name
        or _WHATSAPP_COUNTER_RE.search(name) is not None
    )


def _has_exif(img_pil):
    """Return True when the image carries any EXIF tags (best effort)."""
    try:
        # Public API available on every Image instance, unlike the private
        # format-specific _getexif().
        return bool(img_pil.getexif())
    except Exception:
        # Malformed EXIF must not abort the whole analysis; treat as absent.
        return False


def _detect_container_type(file_bytes):
    """Return the label for the real container type based on magic numbers."""
    if file_bytes.startswith(b'\xff\xd8\xff'):
        return "Murni JPEG"
    if file_bytes.startswith(b'\x89PNG'):
        return "Murni PNG"
    # WebP is a RIFF container: "RIFF" <4-byte size> "WEBP".
    if file_bytes[:4] == b'RIFF' and file_bytes[8:12] == b'WEBP':
        return "Murni WEBP"
    return "Format Termodifikasi"


def run_full_forensic_pipeline(img_pil: Image.Image, img_cv, file_bytes, filename):
    """Run the 12-step forensic analysis and vote on AI vs REAL.

    Args:
        img_pil: the upload opened with PIL.
        img_cv: the same upload decoded by OpenCV with IMREAD_COLOR (BGR).
        file_bytes: raw bytes of the upload.
        filename: client-supplied file name (used for the WhatsApp heuristic).

    Returns:
        Tuple ``(prediction, confidence, raw_score, forensic_logs,
        is_monochrome, penalty_points)`` where ``prediction`` is "AI" or
        "REAL", ``confidence`` and ``raw_score`` are percentages rounded to
        two decimals, and ``forensic_logs`` maps "step_1".."step_12" to text.
    """
    img_gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
    h, w = img_cv.shape[:2]
    # Signed copy so pixel differences do not wrap around modulo 256.
    gray_signed = img_gray.astype(np.int16)

    # Penalty points accumulated by the five voting rules.
    poin_penalti_fake = 0
    total_aturan = TOTAL_RULES

    is_whatsapp = _looks_like_whatsapp(filename)

    # --- Automatic monochrome detection: all channels (almost) identical ---
    b, g, r = cv2.split(img_cv)
    is_monochrome = bool(
        np.max(np.abs(b.astype(int) - g.astype(int))) < 5
        and np.max(np.abs(g.astype(int) - r.astype(int))) < 5
    )

    # [Step 1/12] Metadata
    exif_present = _has_exif(img_pil)
    step1 = (
        "Ada metadata EXIF (Kamera Asli)"
        if exif_present
        else "Metadata kosong (Khas Gambar AI/Screenshot)"
    )
    # Penalty disabled for WhatsApp: its recompression always strips EXIF.
    if not exif_present and not is_whatsapp:
        poin_penalti_fake += 1

    # [Step 2/12] Pixel analysis (binary ensemble + horizontal-flip TTA)
    img_rgb = img_pil.convert("RGB")
    aug_images = [val_tf(img_rgb), val_tf(img_rgb.transpose(Image.FLIP_LEFT_RIGHT))]
    batch_t = torch.stack(aug_images).to(DEVICE)

    with torch.no_grad():
        preds8 = F.softmax(models_ensemble[0](batch_t), dim=1).cpu().numpy()
        probs_epoch8 = np.mean(preds8, axis=0)

        preds14 = F.softmax(models_ensemble[1](batch_t), dim=1).cpu().numpy()
        probs_epoch14 = np.mean(preds14, axis=0)

    # Weighted vote over class index 1 of each model.
    prob_fake_raw = float((0.4 * probs_epoch8[1]) + (0.6 * probs_epoch14[1]))
    step2 = f"Score Indikasi AI: {prob_fake_raw * 100:.1f}%"

    # WhatsApp rule: raise the model threshold so compression noise is not
    # mistaken for generator artefacts.
    threshold_model_actual = WHATSAPP_MODEL_THRESHOLD if is_whatsapp else MODEL_THRESHOLD
    if prob_fake_raw >= threshold_model_actual:
        poin_penalti_fake += 1

    # [Step 3/12] Sensor CFA pattern: mean absolute vertical neighbour difference
    # NOTE(review): the 15.0 threshold predates the wraparound fix; re-validate.
    diff_cfa = float(np.mean(np.abs(np.diff(gray_signed, axis=0))))
    step3 = (
        "Pola interpolasi mulus alami"
        if diff_cfa < 15.0
        else "Anomali interpolasi piksel buatan terdeteksi"
    )
    # Penalty disabled for WhatsApp: its compression disturbs sensor interpolation.
    if diff_cfa >= 15.0 and not is_whatsapp:
        poin_penalti_fake += 1

    # [Step 4/12] Hex/Binary signature search (lower-case the payload once)
    lowered_bytes = file_bytes.lower()
    found_sigs = [sig.decode() for sig in AI_SIGNATURES if sig in lowered_bytes]
    step4 = (
        f"Ditemukan string biner: {found_sigs}"
        if found_sigs
        else "Biner bersih dari signature generator AI"
    )
    if found_sigs:
        poin_penalti_fake += 1

    # [Step 5/12] Noise mapping
    laplacian_var = float(cv2.Laplacian(img_gray, cv2.CV_64F).var())
    step5 = f"Varians noise lokal: {laplacian_var:.2f}"

    # [Step 6/12] Geometry analysis
    aspect_ratio = w / h
    step6 = f"Dimensi berkas: {w}x{h} (Rasio: {aspect_ratio:.2f})"

    # [Step 7/12] Visual artefact search
    edges = cv2.Canny(img_gray, 100, 200)
    edge_density = float(np.sum(edges > 0) / (h * w))
    step7 = f"Kepadatan tekstur tepi: {edge_density*100:.2f}%"

    # [Step 8/12] File type verification via magic numbers
    real_type = _detect_container_type(file_bytes)
    step8 = f"Tipe biner asli: {real_type}"

    # [Step 9/12] Lighting consistency (mean minus median of grey levels)
    skewness = float(np.mean(img_gray) - np.median(img_gray))
    step9 = (
        "Pencahayaan seimbang alami"
        if abs(skewness) < 10
        else "Pencahayaan timpang (Khas editing/AI)"
    )

    # [Step 10/12] Pixel duplication scan: compare thumbnail with its mirror
    downsample = cv2.resize(img_gray, (64, 64)).astype(np.int16)
    match_score = float(np.mean(np.abs(downsample - np.fliplr(downsample))))
    step10 = (
        "Indikasi kloning matriks tinggi"
        if match_score < 5.0
        else "Struktur piksel unik"
    )

    # [Step 11/12] GAN frequency pattern analysis
    f_transform = np.fft.fft2(img_gray)
    f_shift = np.fft.fftshift(f_transform)
    # NOTE(review): uses the natural log although the label says dB; kept
    # as-is so reported values stay comparable.
    magnitude_spectrum = 20 * np.log(np.abs(f_shift) + 1)
    mean_freq = float(np.mean(magnitude_spectrum))
    step11 = f"Amplitudo rata-rata: {mean_freq:.2f} dB"

    # [Step 12/12] Error Level Analysis: re-encode as JPEG and diff
    _, encoded_img = cv2.imencode('.jpg', img_cv, [cv2.IMWRITE_JPEG_QUALITY, 95])
    ela_img = cv2.imdecode(encoded_img, cv2.IMREAD_COLOR)
    ela_score = float(np.mean(cv2.absdiff(img_cv, ela_img)))
    step12 = f"Rasio eror kompresi: {ela_score:.4f}"
    if ela_score > 1.5:
        poin_penalti_fake += 1

    # =======================================================
    # FINAL MULTI-EVIDENCE VOTE
    # =======================================================
    if poin_penalti_fake >= MIN_PENALTIES_FOR_FAKE:
        prediction = "AI"  # label expected by the web frontend ("AI" / "REAL")
        confidence = (poin_penalti_fake / total_aturan) * 100
    else:
        prediction = "REAL"
        confidence = ((total_aturan - poin_penalti_fake) / total_aturan) * 100

    forensic_logs = {
        "step_1": f"[Step 1/12] Metadata: {step1}",
        "step_2": f"[Step 2/12] Analisis Pixel (Komite Binary): {step2}",
        "step_3": f"[Step 3/12] Analisis Pola Sensor CFA: {step3}",
        "step_4": f"[Step 4/12] Pencarian jejak Hex/Binary: {step4}",
        "step_5": f"[Step 5/12] Pemetaan Noise: {step5}",
        "step_6": f"[Step 6/12] Analisis Geometri: {step6}",
        "step_7": f"[Step 7/12] Pencarian Artifact Visual: {step7}",
        "step_8": f"[Step 8/12] Verifikasi Tipe File: {step8}",
        "step_9": f"[Step 9/12] Analisis Konsistensi Pencahayaan: {step9}",
        "step_10": f"[Step 10/12] Pemindaian Duplikasi Pixel: {step10}",
        "step_11": f"[Step 11/12] Analisis Pola Frekuensi GAN: {step11}",
        "step_12": f"[Step 12/12] Inspeksi Tingkat Error (ELA): {step12}",
    }

    return (
        prediction,
        round(confidence, 2),
        round(prob_fake_raw * 100, 2),
        forensic_logs,
        is_monochrome,
        poin_penalti_fake,
    )


# ==========================================
# ENDPOINTS ROUTING
# ==========================================
@app.get("/")
def root():
    """Health endpoint reporting engine name and whether both models are loaded."""
    return {
        "status": "online",
        "engine": "Hybrid Integrated Forensic Engine (WhatsApp Bypass Fixed)",
        "models_loaded": len(models_ensemble) == 2,
        "hybrid_voting_rules_count": TOTAL_RULES,
    }


@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    """Analyse an uploaded image and return the verdict plus forensic logs.

    Raises:
        HTTPException: 503 if the ensemble is not loaded, 400 for an
            unsupported extension or an undecodable image, 500 if the
            analysis itself fails.
    """
    if len(models_ensemble) < 2:
        raise HTTPException(status_code=503, detail="Model server belum siap di folder lokal")

    filename = file.filename or "image.jpg"
    ext = filename.lower().split(".")[-1] if "." in filename else ""
    if ext not in ALLOWED_EXT:
        raise HTTPException(status_code=400, detail="Format file wajib png/jpg/jpeg/webp")

    # Decoding failures are the client's fault (corrupt/unsupported payload),
    # so report them as 400 rather than as a server error.
    try:
        file_bytes = await file.read()
        img_pil = Image.open(io.BytesIO(file_bytes))
        nparr = np.frombuffer(file_bytes, np.uint8)
        img_cv = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img_cv is None:
            raise ValueError("File korup atau struktur piksel tidak valid")
    except (OSError, ValueError, cv2.error) as e:
        raise HTTPException(
            status_code=400,
            detail=f"Gagal memproses analisis gabungan forensik: {str(e)}",
        ) from e

    try:
        (
            prediction,
            confidence,
            raw_score,
            forensic_steps,
            is_monochrome,
            active_penalties,
        ) = run_full_forensic_pipeline(img_pil, img_cv, file_bytes, filename)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Gagal memproses analisis gabungan forensik: {str(e)}",
        ) from e

    return {
        "filename": filename,
        "prediction": prediction,
        "confidence": f"{confidence}%",
        "raw_deep_learning_score": f"{raw_score}%",
        "active_fake_indicators": f"{active_penalties} dari {TOTAL_RULES}",
        "is_monochrome_detected": is_monochrome,
        "forensic_analysis_logs": forensic_steps,
    }


@app.post("/save-feedback")
async def save_feedback(file: UploadFile = File(...), correct_label: str = Form(...)):
    """Store a user-corrected sample under FEEDBACK_DIR/real or FEEDBACK_DIR/fake.

    Raises:
        HTTPException: 400 for an unknown label or an unusable file name,
            500 if the file cannot be written.
    """
    label = correct_label.strip().upper()
    if label not in {"REAL", "AI", "FAKE"}:
        raise HTTPException(status_code=400, detail="Label harus REAL / FAKE")

    # The file name is client-controlled: keep only its final component so a
    # name like "../../app.py" cannot escape the feedback directory.
    raw_name = (file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(raw_name)
    if safe_name in {"", ".", ".."}:
        raise HTTPException(status_code=400, detail="Nama berkas tidak valid")

    folder = "real" if label == "REAL" else "fake"
    save_path = os.path.join(FEEDBACK_DIR, folder, safe_name)

    try:
        contents = await file.read()
        with open(save_path, "wb") as f:
            f.write(contents)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Gagal menyimpan berkas feedback: {str(e)}",
        ) from e

    return {"status": "saved", "path": save_path}


if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)