Alstears's picture
Update app.py
a4bfda2 verified
import io
import os
import warnings
import numpy as np
import timm
import torch
import torch.nn.functional as F
import cv2
import uvicorn
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image
import torchvision.transforms as transforms
warnings.filterwarnings("ignore")
# ==========================================
# ⚙️ CONFIGURATION (MODE LOKAL / PRODUCTION)
# ==========================================
APP_TITLE = "AI Forensic Detector API"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
FEEDBACK_DIR = "feedback"
ALLOWED_EXT = {"png", "jpg", "jpeg", "webp"}
# Jalur "." membaca berkas model .pth yang berada satu folder dengan app.py
ROOT_DIR = "."
MODEL_FILES = ["ckpt_best_v4_epoch8.pth", "ckpt_best_v4_epoch14.pth"]
# Threshold murni untuk filter internal model Deep Learning
MODEL_THRESHOLD = 0.615
# ==========================================
# 🚀 APP INIT & CORS MIDDLEWARE
# ==========================================
app = FastAPI(title=APP_TITLE)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
os.makedirs(os.path.join(FEEDBACK_DIR, "real"), exist_ok=True)
os.makedirs(os.path.join(FEEDBACK_DIR, "fake"), exist_ok=True)
# ==========================================
# 📦 GLOBAL MODELS & TRANSFORMS
# ==========================================
models_ensemble = []
val_tf = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# ==========================================
# 🔗 LOAD MODELS FUNCTION
# ==========================================
def load_ensemble_models():
global models_ensemble
models_ensemble = []
print(f"🖥️ ENGINE COMPUTATION RUNNING ON: {DEVICE}")
print(f"📂 MEMERIKSA JALUR PROYEK LOKAL: {os.path.abspath(ROOT_DIR)}")
print("⏳ Memuat infrastruktur model deep learning...")
for f_name in MODEL_FILES:
path = os.path.join(ROOT_DIR, f_name)
abs_path = os.path.abspath(path)
if not os.path.exists(abs_path):
raise FileNotFoundError(f"File model ensemble tidak ditemukan di folder proyek: {abs_path}")
print(f"📦 Loading komponen model: {f_name}...")
m = timm.create_model('efficientnet_b0', pretrained=False, num_classes=2)
ckpt = torch.load(abs_path, map_location=DEVICE)
state_dict = ckpt["state_dict"] if isinstance(ckpt, dict) and "state_dict" in ckpt else ckpt
m.load_state_dict(state_dict)
m.to(DEVICE).eval()
models_ensemble.append(m)
print(f"✅ Sistem Ensemble Siap! ({len(models_ensemble)} Model Terintegrasi)")
@app.on_event("startup")
def startup_event():
try:
load_ensemble_models()
except Exception as e:
print(f"❌ Gagal memuat arsitektur ensemble: {e}")
raise RuntimeError(f"Startup digagalkan server: {e}")
# ==========================================
# 🔬 LAB FORENSIK DIGITAL - WITH WHATSAPP BYPASS
# ==========================================
def run_full_forensic_pipeline(img_pil: Image.Image, img_cv, file_bytes, filename):
img_gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
h, w, c = img_cv.shape
# Inisialisasi Poin Penalti untuk Penggabungan Keputusan
poin_penalti_fake = 0
total_aturan = 5
# Deteksi Otomatis Jalur Kompresi WhatsApp via Nama Berkas
fname_lower = filename.lower()
is_whatsapp = "whatsapp" in fname_lower or "img-" in fname_lower or "wa" in fname_lower
# --- DETEKSI OTOMATIS GAMBAR MONOKROM ---
b, g, r = cv2.split(img_cv)
is_monochrome = bool(np.max(np.abs(b.astype(int) - g.astype(int))) < 5 and \
np.max(np.abs(g.astype(int) - r.astype(int))) < 5)
# [Step 1/12] Metadata
exif = img_pil._getexif()
step1 = "Ada metadata EXIF (Kamera Asli)" if exif else "Metadata kosong (Khas Gambar AI/Screenshot)"
if not exif and not is_whatsapp:
# Penalti dinonaktifkan untuk WhatsApp karena kompresi sistem WA pasti menghapus EXIF
poin_penalti_fake += 1
# [Step 2/12] Analisis Pixel (Komite Binary + TTA)
img_rgb = img_pil.convert("RGB")
aug_images = [val_tf(img_rgb), val_tf(img_rgb.transpose(Image.FLIP_LEFT_RIGHT))]
batch_t = torch.stack(aug_images).to(DEVICE)
with torch.no_grad():
preds8 = F.softmax(models_ensemble[0](batch_t), dim=1).cpu().numpy()
probs_epoch8 = np.mean(preds8, axis=0)
preds14 = F.softmax(models_ensemble[1](batch_t), dim=1).cpu().numpy()
probs_epoch14 = np.mean(preds14, axis=0)
prob_fake_raw = float((0.4 * probs_epoch8[1]) + (0.6 * probs_epoch14[1]))
step2 = f"Score Indikasi AI: {prob_fake_raw * 100:.1f}%"
# Aturan Khusus WhatsApp: Naikkan batas threshold model AI agar tidak sensitif pada noise rusak WA
THRESHOLD_MODEL_ACTUAL = 0.85 if is_whatsapp else MODEL_THRESHOLD
if prob_fake_raw >= THRESHOLD_MODEL_ACTUAL:
poin_penalti_fake += 1
# [Step 3/12] Analisis Pola Sensor CFA
diff_cfa = np.mean(np.abs(img_gray[1:, :] - img_gray[:-1, :]))
step3 = "Pola interpolasi mulus alami" if diff_cfa < 15.0 else "Anomali interpolasi piksel buatan terdeteksi"
if diff_cfa >= 15.0 and not is_whatsapp:
# Penalti dinonaktifkan untuk WhatsApp karena kompresi WA merusak interpolasi alami piksel sensor
poin_penalti_fake += 1
# [Step 4/12] Pencarian jejak Hex/Binary
ai_signatures = [b"midjourney", b"stable diffusion", b"adobe firefly", b"dall-e", b"photoshop"]
found_sigs = [sig.decode() for sig in ai_signatures if sig in file_bytes.lower()]
step4 = f"Ditemukan string biner: {found_sigs}" if found_sigs else "Biner bersih dari signature generator AI"
if found_sigs:
poin_penalti_fake += 1
# [Step 5/12] Pemetaan Noise
laplacian_var = float(cv2.Laplacian(img_gray, cv2.CV_64F).var())
step5 = f"Varians noise lokal: {laplacian_var:.2f}"
# [Step 6/12] Analisis Geometri
aspect_ratio = w / h
step6 = f"Dimensi berkas: {w}x{h} (Rasio: {aspect_ratio:.2f})"
# [Step 7/12] Pencarian Artifact Visual
edges = cv2.Canny(img_gray, 100, 200)
edge_density = float(np.sum(edges > 0) / (h * w))
step7 = f"Kepadatan tekstur tepi: {edge_density*100:.2f}%"
# [Step 8/12] Verifikasi Tipe File via Magic Numbers
header_bytes = file_bytes[:4]
if header_bytes.startswith(b'\xff\xd8\xff'): real_type = "Murni JPEG"
elif header_bytes.startswith(b'\x89PNG'): real_type = "Murni PNG"
elif b'WEBP' in header_bytes: real_type = "Murni WEBP"
else: real_type = "Format Termodifikasi"
step8 = f"Tipe biner asli: {real_type}"
# [Step 9/12] Analisis Konsistensi Pencahayaan
skewness = float(np.mean(img_gray) - np.median(img_gray))
step9 = "Pencahayaan seimbang alami" if abs(skewness) < 10 else "Pencahayaan timpang (Khas editing/AI)"
# [Step 10/12] Pemindaian Duplikasi Pixel
downsample = cv2.resize(img_gray, (64, 64))
match_score = np.mean(np.abs(downsample - np.fliplr(downsample)))
step10 = "Indikasi kloning matriks tinggi" if match_score < 5.0 else "Struktur piksel unik"
# [Step 11/12] Analisis Pola Frekuensi GAN
f_transform = np.fft.fft2(img_gray)
f_shift = np.fft.fftshift(f_transform)
magnitude_spectrum = 20 * np.log(np.abs(f_shift) + 1)
mean_freq = float(np.mean(magnitude_spectrum))
step11 = f"Amplitudo rata-rata: {mean_freq:.2f} dB"
# [Step 12/12] Inspeksi Tingkat Error (ELA)
_, encoded_img = cv2.imencode('.jpg', img_cv, [cv2.IMWRITE_JPEG_QUALITY, 95])
ela_img = cv2.imdecode(encoded_img, cv2.IMREAD_COLOR)
ela_score = float(np.mean(cv2.absdiff(img_cv, ela_img)))
step12 = f"Rasio eror kompresi: {ela_score:.4f}"
if ela_score > 1.5:
poin_penalti_fake += 1
# =======================================================
# KETOK PALU KEPUTUSAN GABUNGAN MATEMATIKA MULTI-EVIDENCE
# =======================================================
if poin_penalti_fake >= 3:
prediction = "AI" # Menyesuaikan label tampilan sistem frontend web Anda ("AI" / "REAL")
confidence = (poin_penalti_fake / total_aturan) * 100
else:
prediction = "REAL"
confidence = ((total_aturan - poin_penalti_fake) / total_aturan) * 100
forensic_logs = {
"step_1": f"[Step 1/12] Metadata: {step1}",
"step_2": f"[Step 2/12] Analisis Pixel (Komite Binary): {step2}",
"step_3": f"[Step 3/12] Analisis Pola Sensor CFA: {step3}",
"step_4": f"[Step 4/12] Pencarian jejak Hex/Binary: {step4}",
"step_5": f"[Step 5/12] Pemetaan Noise: {step5}",
"step_6": f"[Step 6/12] Analisis Geometri: {step6}",
"step_7": f"[Step 7/12] Pencarian Artifact Visual: {step7}",
"step_8": f"[Step 8/12] Verifikasi Tipe File: {step8}",
"step_9": f"[Step 9/12] Analisis Konsistensi Pencahayaan: {step9}",
"step_10": f"[Step 10/12] Pemindaian Duplikasi Pixel: {step10}",
"step_11": f"[Step 11/12] Analisis Pola Frekuensi GAN: {step11}",
"step_12": f"[Step 12/12] Inspeksi Tingkat Error (ELA): {step12}"
}
return prediction, round(confidence, 2), round(prob_fake_raw * 100, 2), forensic_logs, is_monochrome, poin_penalti_fake
# ==========================================
# 🎛️ ENDPOINTS ROUTING
# ==========================================
@app.get("/")
def root():
return {
"status": "online",
"engine": "Hybrid Integrated Forensic Engine (WhatsApp Bypass Fixed)",
"models_loaded": len(models_ensemble) == 2,
"hybrid_voting_rules_count": 5
}
@app.post("/predict")
async def predict(file: UploadFile = File(...)):
if len(models_ensemble) < 2:
raise HTTPException(status_code=503, detail="Model server belum siap di folder lokal")
filename = file.filename or "image.jpg"
ext = filename.lower().split(".")[-1] if "." in filename else ""
if ext not in ALLOWED_EXT:
raise HTTPException(status_code=400, detail="Format file wajib png/jpg/jpeg/webp")
try:
file_bytes = await file.read()
img_pil = Image.open(io.BytesIO(file_bytes))
nparr = np.frombuffer(file_bytes, np.uint8)
img_cv = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img_cv is None:
raise ValueError("File korup atau struktur piksel tidak valid")
# Eksekusi pipeline terintegrasi baru dengan parameter operan filename
prediction, confidence, raw_score, forensic_steps, is_monochrome, active_penalties = run_full_forensic_pipeline(
img_pil, img_cv, file_bytes, filename
)
return {
"filename": filename,
"prediction": prediction,
"confidence": f"{confidence}%",
"raw_deep_learning_score": f"{raw_score}%",
"active_fake_indicators": f"{active_penalties} dari 5",
"is_monochrome_detected": is_monochrome,
"forensic_analysis_logs": forensic_steps
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Gagal memproses analisis gabungan forensik: {str(e)}")
@app.post("/save-feedback")
async def save_feedback(file: UploadFile = File(...), correct_label: str = Form(...)):
label = correct_label.strip().upper()
if label not in {"REAL", "AI", "FAKE"}:
raise HTTPException(status_code=400, detail="Label harus REAL / FAKE")
folder = "real" if label == "REAL" else "fake"
save_path = os.path.join(FEEDBACK_DIR, folder, file.filename)
try:
contents = await file.read()
with open(save_path, "wb") as f:
f.write(contents)
return {"status": "saved", "path": save_path}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Gagal menyimpan berkas feedback: {str(e)}")
if __name__ == "__main__":
uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)