Spaces:
Sleeping
Sleeping
| import io | |
| import os | |
| import warnings | |
| import numpy as np | |
| import timm | |
| import torch | |
| import torch.nn.functional as F | |
| import cv2 | |
| import uvicorn | |
| from fastapi import FastAPI, File, Form, HTTPException, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from PIL import Image | |
| import torchvision.transforms as transforms | |
| warnings.filterwarnings("ignore") | |
| # ========================================== | |
| # ⚙️ CONFIGURATION (MODE LOKAL / PRODUCTION) | |
| # ========================================== | |
| APP_TITLE = "AI Forensic Detector API" | |
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
| FEEDBACK_DIR = "feedback" | |
| ALLOWED_EXT = {"png", "jpg", "jpeg", "webp"} | |
| # Jalur "." membaca berkas model .pth yang berada satu folder dengan app.py | |
| ROOT_DIR = "." | |
| MODEL_FILES = ["ckpt_best_v4_epoch8.pth", "ckpt_best_v4_epoch14.pth"] | |
| # Threshold murni untuk filter internal model Deep Learning | |
| MODEL_THRESHOLD = 0.615 | |
| # ========================================== | |
| # 🚀 APP INIT & CORS MIDDLEWARE | |
| # ========================================== | |
| app = FastAPI(title=APP_TITLE) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| os.makedirs(os.path.join(FEEDBACK_DIR, "real"), exist_ok=True) | |
| os.makedirs(os.path.join(FEEDBACK_DIR, "fake"), exist_ok=True) | |
| # ========================================== | |
| # 📦 GLOBAL MODELS & TRANSFORMS | |
| # ========================================== | |
| models_ensemble = [] | |
| val_tf = transforms.Compose([ | |
| transforms.Resize((224, 224)), | |
| transforms.ToTensor(), | |
| transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), | |
| ]) | |
| # ========================================== | |
| # 🔗 LOAD MODELS FUNCTION | |
| # ========================================== | |
| def load_ensemble_models(): | |
| global models_ensemble | |
| models_ensemble = [] | |
| print(f"🖥️ ENGINE COMPUTATION RUNNING ON: {DEVICE}") | |
| print(f"📂 MEMERIKSA JALUR PROYEK LOKAL: {os.path.abspath(ROOT_DIR)}") | |
| print("⏳ Memuat infrastruktur model deep learning...") | |
| for f_name in MODEL_FILES: | |
| path = os.path.join(ROOT_DIR, f_name) | |
| abs_path = os.path.abspath(path) | |
| if not os.path.exists(abs_path): | |
| raise FileNotFoundError(f"File model ensemble tidak ditemukan di folder proyek: {abs_path}") | |
| print(f"📦 Loading komponen model: {f_name}...") | |
| m = timm.create_model('efficientnet_b0', pretrained=False, num_classes=2) | |
| ckpt = torch.load(abs_path, map_location=DEVICE) | |
| state_dict = ckpt["state_dict"] if isinstance(ckpt, dict) and "state_dict" in ckpt else ckpt | |
| m.load_state_dict(state_dict) | |
| m.to(DEVICE).eval() | |
| models_ensemble.append(m) | |
| print(f"✅ Sistem Ensemble Siap! ({len(models_ensemble)} Model Terintegrasi)") | |
| def startup_event(): | |
| try: | |
| load_ensemble_models() | |
| except Exception as e: | |
| print(f"❌ Gagal memuat arsitektur ensemble: {e}") | |
| raise RuntimeError(f"Startup digagalkan server: {e}") | |
| # ========================================== | |
| # 🔬 LAB FORENSIK DIGITAL - WITH WHATSAPP BYPASS | |
| # ========================================== | |
| def run_full_forensic_pipeline(img_pil: Image.Image, img_cv, file_bytes, filename): | |
| img_gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) | |
| h, w, c = img_cv.shape | |
| # Inisialisasi Poin Penalti untuk Penggabungan Keputusan | |
| poin_penalti_fake = 0 | |
| total_aturan = 5 | |
| # Deteksi Otomatis Jalur Kompresi WhatsApp via Nama Berkas | |
| fname_lower = filename.lower() | |
| is_whatsapp = "whatsapp" in fname_lower or "img-" in fname_lower or "wa" in fname_lower | |
| # --- DETEKSI OTOMATIS GAMBAR MONOKROM --- | |
| b, g, r = cv2.split(img_cv) | |
| is_monochrome = bool(np.max(np.abs(b.astype(int) - g.astype(int))) < 5 and \ | |
| np.max(np.abs(g.astype(int) - r.astype(int))) < 5) | |
| # [Step 1/12] Metadata | |
| exif = img_pil._getexif() | |
| step1 = "Ada metadata EXIF (Kamera Asli)" if exif else "Metadata kosong (Khas Gambar AI/Screenshot)" | |
| if not exif and not is_whatsapp: | |
| # Penalti dinonaktifkan untuk WhatsApp karena kompresi sistem WA pasti menghapus EXIF | |
| poin_penalti_fake += 1 | |
| # [Step 2/12] Analisis Pixel (Komite Binary + TTA) | |
| img_rgb = img_pil.convert("RGB") | |
| aug_images = [val_tf(img_rgb), val_tf(img_rgb.transpose(Image.FLIP_LEFT_RIGHT))] | |
| batch_t = torch.stack(aug_images).to(DEVICE) | |
| with torch.no_grad(): | |
| preds8 = F.softmax(models_ensemble[0](batch_t), dim=1).cpu().numpy() | |
| probs_epoch8 = np.mean(preds8, axis=0) | |
| preds14 = F.softmax(models_ensemble[1](batch_t), dim=1).cpu().numpy() | |
| probs_epoch14 = np.mean(preds14, axis=0) | |
| prob_fake_raw = float((0.4 * probs_epoch8[1]) + (0.6 * probs_epoch14[1])) | |
| step2 = f"Score Indikasi AI: {prob_fake_raw * 100:.1f}%" | |
| # Aturan Khusus WhatsApp: Naikkan batas threshold model AI agar tidak sensitif pada noise rusak WA | |
| THRESHOLD_MODEL_ACTUAL = 0.85 if is_whatsapp else MODEL_THRESHOLD | |
| if prob_fake_raw >= THRESHOLD_MODEL_ACTUAL: | |
| poin_penalti_fake += 1 | |
| # [Step 3/12] Analisis Pola Sensor CFA | |
| diff_cfa = np.mean(np.abs(img_gray[1:, :] - img_gray[:-1, :])) | |
| step3 = "Pola interpolasi mulus alami" if diff_cfa < 15.0 else "Anomali interpolasi piksel buatan terdeteksi" | |
| if diff_cfa >= 15.0 and not is_whatsapp: | |
| # Penalti dinonaktifkan untuk WhatsApp karena kompresi WA merusak interpolasi alami piksel sensor | |
| poin_penalti_fake += 1 | |
| # [Step 4/12] Pencarian jejak Hex/Binary | |
| ai_signatures = [b"midjourney", b"stable diffusion", b"adobe firefly", b"dall-e", b"photoshop"] | |
| found_sigs = [sig.decode() for sig in ai_signatures if sig in file_bytes.lower()] | |
| step4 = f"Ditemukan string biner: {found_sigs}" if found_sigs else "Biner bersih dari signature generator AI" | |
| if found_sigs: | |
| poin_penalti_fake += 1 | |
| # [Step 5/12] Pemetaan Noise | |
| laplacian_var = float(cv2.Laplacian(img_gray, cv2.CV_64F).var()) | |
| step5 = f"Varians noise lokal: {laplacian_var:.2f}" | |
| # [Step 6/12] Analisis Geometri | |
| aspect_ratio = w / h | |
| step6 = f"Dimensi berkas: {w}x{h} (Rasio: {aspect_ratio:.2f})" | |
| # [Step 7/12] Pencarian Artifact Visual | |
| edges = cv2.Canny(img_gray, 100, 200) | |
| edge_density = float(np.sum(edges > 0) / (h * w)) | |
| step7 = f"Kepadatan tekstur tepi: {edge_density*100:.2f}%" | |
| # [Step 8/12] Verifikasi Tipe File via Magic Numbers | |
| header_bytes = file_bytes[:4] | |
| if header_bytes.startswith(b'\xff\xd8\xff'): real_type = "Murni JPEG" | |
| elif header_bytes.startswith(b'\x89PNG'): real_type = "Murni PNG" | |
| elif b'WEBP' in header_bytes: real_type = "Murni WEBP" | |
| else: real_type = "Format Termodifikasi" | |
| step8 = f"Tipe biner asli: {real_type}" | |
| # [Step 9/12] Analisis Konsistensi Pencahayaan | |
| skewness = float(np.mean(img_gray) - np.median(img_gray)) | |
| step9 = "Pencahayaan seimbang alami" if abs(skewness) < 10 else "Pencahayaan timpang (Khas editing/AI)" | |
| # [Step 10/12] Pemindaian Duplikasi Pixel | |
| downsample = cv2.resize(img_gray, (64, 64)) | |
| match_score = np.mean(np.abs(downsample - np.fliplr(downsample))) | |
| step10 = "Indikasi kloning matriks tinggi" if match_score < 5.0 else "Struktur piksel unik" | |
| # [Step 11/12] Analisis Pola Frekuensi GAN | |
| f_transform = np.fft.fft2(img_gray) | |
| f_shift = np.fft.fftshift(f_transform) | |
| magnitude_spectrum = 20 * np.log(np.abs(f_shift) + 1) | |
| mean_freq = float(np.mean(magnitude_spectrum)) | |
| step11 = f"Amplitudo rata-rata: {mean_freq:.2f} dB" | |
| # [Step 12/12] Inspeksi Tingkat Error (ELA) | |
| _, encoded_img = cv2.imencode('.jpg', img_cv, [cv2.IMWRITE_JPEG_QUALITY, 95]) | |
| ela_img = cv2.imdecode(encoded_img, cv2.IMREAD_COLOR) | |
| ela_score = float(np.mean(cv2.absdiff(img_cv, ela_img))) | |
| step12 = f"Rasio eror kompresi: {ela_score:.4f}" | |
| if ela_score > 1.5: | |
| poin_penalti_fake += 1 | |
| # ======================================================= | |
| # KETOK PALU KEPUTUSAN GABUNGAN MATEMATIKA MULTI-EVIDENCE | |
| # ======================================================= | |
| if poin_penalti_fake >= 3: | |
| prediction = "AI" # Menyesuaikan label tampilan sistem frontend web Anda ("AI" / "REAL") | |
| confidence = (poin_penalti_fake / total_aturan) * 100 | |
| else: | |
| prediction = "REAL" | |
| confidence = ((total_aturan - poin_penalti_fake) / total_aturan) * 100 | |
| forensic_logs = { | |
| "step_1": f"[Step 1/12] Metadata: {step1}", | |
| "step_2": f"[Step 2/12] Analisis Pixel (Komite Binary): {step2}", | |
| "step_3": f"[Step 3/12] Analisis Pola Sensor CFA: {step3}", | |
| "step_4": f"[Step 4/12] Pencarian jejak Hex/Binary: {step4}", | |
| "step_5": f"[Step 5/12] Pemetaan Noise: {step5}", | |
| "step_6": f"[Step 6/12] Analisis Geometri: {step6}", | |
| "step_7": f"[Step 7/12] Pencarian Artifact Visual: {step7}", | |
| "step_8": f"[Step 8/12] Verifikasi Tipe File: {step8}", | |
| "step_9": f"[Step 9/12] Analisis Konsistensi Pencahayaan: {step9}", | |
| "step_10": f"[Step 10/12] Pemindaian Duplikasi Pixel: {step10}", | |
| "step_11": f"[Step 11/12] Analisis Pola Frekuensi GAN: {step11}", | |
| "step_12": f"[Step 12/12] Inspeksi Tingkat Error (ELA): {step12}" | |
| } | |
| return prediction, round(confidence, 2), round(prob_fake_raw * 100, 2), forensic_logs, is_monochrome, poin_penalti_fake | |
| # ========================================== | |
| # 🎛️ ENDPOINTS ROUTING | |
| # ========================================== | |
| def root(): | |
| return { | |
| "status": "online", | |
| "engine": "Hybrid Integrated Forensic Engine (WhatsApp Bypass Fixed)", | |
| "models_loaded": len(models_ensemble) == 2, | |
| "hybrid_voting_rules_count": 5 | |
| } | |
| async def predict(file: UploadFile = File(...)): | |
| if len(models_ensemble) < 2: | |
| raise HTTPException(status_code=503, detail="Model server belum siap di folder lokal") | |
| filename = file.filename or "image.jpg" | |
| ext = filename.lower().split(".")[-1] if "." in filename else "" | |
| if ext not in ALLOWED_EXT: | |
| raise HTTPException(status_code=400, detail="Format file wajib png/jpg/jpeg/webp") | |
| try: | |
| file_bytes = await file.read() | |
| img_pil = Image.open(io.BytesIO(file_bytes)) | |
| nparr = np.frombuffer(file_bytes, np.uint8) | |
| img_cv = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
| if img_cv is None: | |
| raise ValueError("File korup atau struktur piksel tidak valid") | |
| # Eksekusi pipeline terintegrasi baru dengan parameter operan filename | |
| prediction, confidence, raw_score, forensic_steps, is_monochrome, active_penalties = run_full_forensic_pipeline( | |
| img_pil, img_cv, file_bytes, filename | |
| ) | |
| return { | |
| "filename": filename, | |
| "prediction": prediction, | |
| "confidence": f"{confidence}%", | |
| "raw_deep_learning_score": f"{raw_score}%", | |
| "active_fake_indicators": f"{active_penalties} dari 5", | |
| "is_monochrome_detected": is_monochrome, | |
| "forensic_analysis_logs": forensic_steps | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Gagal memproses analisis gabungan forensik: {str(e)}") | |
| async def save_feedback(file: UploadFile = File(...), correct_label: str = Form(...)): | |
| label = correct_label.strip().upper() | |
| if label not in {"REAL", "AI", "FAKE"}: | |
| raise HTTPException(status_code=400, detail="Label harus REAL / FAKE") | |
| folder = "real" if label == "REAL" else "fake" | |
| save_path = os.path.join(FEEDBACK_DIR, folder, file.filename) | |
| try: | |
| contents = await file.read() | |
| with open(save_path, "wb") as f: | |
| f.write(contents) | |
| return {"status": "saved", "path": save_path} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Gagal menyimpan berkas feedback: {str(e)}") | |
| if __name__ == "__main__": | |
| uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True) | |