Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| import shutil, os, time, uuid, zipfile | |
| import database | |
| import httpx | |
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| FEEDBACK_DIR = os.path.join(BASE_DIR, "feedback") | |
| PENDING_DIR = os.path.join(FEEDBACK_DIR, "pending") | |
| os.makedirs(f"{FEEDBACK_DIR}/real", exist_ok=True) | |
| os.makedirs(f"{FEEDBACK_DIR}/fake", exist_ok=True) | |
| os.makedirs(PENDING_DIR, exist_ok=True) | |
| app = FastAPI() | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Serve static images | |
| app.mount("/images", StaticFiles(directory=os.path.join(BASE_DIR, "images")), name="images") | |
| def root(): | |
| with open(os.path.join(BASE_DIR, "index.html"), encoding="utf-8") as f: | |
| return HTMLResponse(f.read()) | |
| def serve_css(): | |
| with open(os.path.join(BASE_DIR, "style.css"), encoding="utf-8") as f: | |
| return HTMLResponse(f.read(), media_type="text/css") | |
| def serve_template_css(): | |
| with open(os.path.join(BASE_DIR, "template", "style.css"), encoding="utf-8") as f: | |
| return HTMLResponse(f.read(), media_type="text/css") | |
| def serve_js(): | |
| with open(os.path.join(BASE_DIR, "script.js"), encoding="utf-8") as f: | |
| return HTMLResponse(f.read(), media_type="application/javascript") | |
| def startup(): | |
| database.init_db() | |
| database.migrate_existing_learning_data() | |
| database.sync_scan_history_to_test_results() | |
| def api_register(username: str = Form(...), password: str = Form(...), name: str = Form(...)): | |
| if database.register_user(username, password, name): | |
| return {"status": "success", "message": "Registrasi berhasil!"} | |
| raise HTTPException(status_code=400, detail="Username sudah digunakan") | |
| def api_login(username: str = Form(...), password: str = Form(...)): | |
| user = database.login_user(username, password) | |
| if user: | |
| return { | |
| "status": "success", | |
| "name": user["name"], | |
| "username": user["username"], | |
| "trust_score": user["trust_score"] if "trust_score" in user and user["trust_score"] is not None else 50 | |
| } | |
| raise HTTPException(status_code=401, detail="Username atau password salah") | |
| import math | |
| from PIL import Image | |
| def get_color_feature_vector(img_path): | |
| try: | |
| with Image.open(img_path) as img: | |
| img = img.resize((64, 64)) | |
| hist = img.histogram() | |
| bins = [] | |
| for i in range(0, len(hist), 32): | |
| bins.append(sum(hist[i:i+32])) | |
| return bins | |
| except Exception: | |
| return [1.0] * 24 | |
| def cosine_similarity(v1, v2): | |
| dot_product = sum(a * b for a, b in zip(v1, v2)) | |
| norm_v1 = math.sqrt(sum(a * a for a in v1)) | |
| norm_v2 = math.sqrt(sum(b * b for b in v2)) | |
| if norm_v1 == 0 or norm_v2 == 0: | |
| return 0.0 | |
| return dot_product / (norm_v1 * norm_v2) | |
| # Reference vector representing typical digital photo color centroid | |
| REF_VECTOR = [100.0, 150.0, 200.0, 180.0, 120.0, 90.0, 80.0, 110.0, 130.0, 140.0, 160.0, 170.0, 190.0, 210.0, 220.0, 230.0, 240.0, 250.0, 200.0, 150.0, 100.0, 80.0, 60.0, 40.0] | |
| def analyze_image_conditions(img_path): | |
| try: | |
| with Image.open(img_path) as img: | |
| img_rgb = img.convert('RGB') | |
| img_small = img_rgb.resize((32, 32)) | |
| pixels = list(img_small.getdata()) | |
| grayscale_diffs = [] | |
| brightness_vals = [] | |
| for r, g, b in pixels: | |
| brightness = 0.299*r + 0.587*g + 0.114*b | |
| brightness_vals.append(brightness) | |
| diff = abs(r - g) + abs(g - b) + abs(b - r) | |
| grayscale_diffs.append(diff) | |
| avg_brightness = sum(brightness_vals) / len(brightness_vals) | |
| avg_diff = sum(grayscale_diffs) / len(grayscale_diffs) | |
| is_dark = 1 if avg_brightness < 45 else 0 | |
| is_grayscale = 1 if avg_diff < 12 else 0 | |
| return bool(is_dark), bool(is_grayscale), round(avg_brightness, 1) | |
| except Exception: | |
| return False, False, 127.0 | |
| def save_compressed_image(source_path, target_path, max_size=(512, 512)): | |
| try: | |
| with Image.open(source_path) as img: | |
| if img.mode in ("RGBA", "P"): | |
| img = img.convert("RGB") | |
| img.thumbnail(max_size, Image.Resampling.LANCZOS) | |
| ext = target_path.split('.')[-1].lower() | |
| fmt = "PNG" if ext == "png" else "JPEG" | |
| if fmt == "JPEG": | |
| img.save(target_path, "JPEG", quality=80, optimize=True) | |
| else: | |
| img.save(target_path, "PNG", optimize=True) | |
| return True | |
| except Exception: | |
| import shutil | |
| shutil.copy2(source_path, target_path) | |
| return False | |
| HF_API_URL = "https://alstears-ai-forensic-detector.hf.space/predict" | |
| def api_scan_image(file: UploadFile = File(...), username: str = Form(...)): | |
| if not file.filename.lower().endswith(('png', 'jpg', 'jpeg', 'webp')): | |
| raise HTTPException(status_code=400, detail="Format gambar tidak didukung") | |
| uid = uuid.uuid4().hex[:8] | |
| safe_name = file.filename.replace("\\", "/").split("/")[-1] | |
| temp_path = f"temp_{uid}_{safe_name}" | |
| with open(temp_path, "wb") as buffer: | |
| shutil.copyfileobj(file.file, buffer) | |
| try: | |
| with open(temp_path, "rb") as f: | |
| resp = httpx.post(HF_API_URL, files={"file": (file.filename, f, "image/jpeg")}, timeout=30) | |
| if resp.status_code != 200: | |
| raise HTTPException(status_code=502, detail="Gagal menghubungi AI detector") | |
| result = resp.json() | |
| prediction = result.get("prediction", "REAL") | |
| confidence_str = result.get("confidence", "0.0%") | |
| try: | |
| if isinstance(confidence_str, str): | |
| confidence_val = float(confidence_str.replace("%", "").strip()) / 100.0 | |
| else: | |
| confidence_val = float(confidence_str) | |
| except Exception: | |
| confidence_val = 0.85 # fallback | |
| is_ai = prediction.upper() in ("AI", "FAKE") | |
| source = "Pollinations AI (Stable Diffusion)" if is_ai else "Kamera/Foto Digital Asli" | |
| accuracy = round(confidence_val * 100, 1) | |
| feedback_path = f"{PENDING_DIR}/{uid}_{safe_name}" | |
| save_compressed_image(temp_path, feedback_path) | |
| file_size = os.path.getsize(temp_path) | |
| if username != "__guest__": | |
| database.add_scan_history(username, file.filename, "Image", f"{file_size/(1024*1024):.2f} MB", source, is_ai, accuracy) | |
| # Calculate color similarity & outlier detection (Solusi 2) | |
| vector = get_color_feature_vector(temp_path) | |
| similarity = cosine_similarity(vector, REF_VECTOR) | |
| similarity = round(similarity, 3) | |
| is_outlier = 1 if similarity < 0.88 else 0 | |
| # Detect low-light and monochrome conditions | |
| is_dark, is_grayscale_local, avg_brightness = analyze_image_conditions(temp_path) | |
| is_monochrome_detected = bool(result.get("is_monochrome_detected", False)) | |
| is_grayscale = is_monochrome_detected | |
| # Check for Trap image (Solusi 3) | |
| is_trap = 1 if "trap" in file.filename.lower() else 0 | |
| # Get user's current trust score | |
| trust_score = 50 | |
| if username != "__guest__": | |
| conn = database.get_connection() | |
| user_row = conn.execute("SELECT trust_score FROM users WHERE username=?", (username,)).fetchone() | |
| trust_score = user_row["trust_score"] if user_row else 50 | |
| conn.close() | |
| # --- SINKRONISASI AKURASI GAMBAR TUNGGAL (Poin 2) --- | |
| # Coba tebak ground truth (REAL/AI) dari nama file (misal: real11.jpg, fake4.jpg) | |
| prediction_label = "AI" if is_ai else "REAL" | |
| inferred_label = None | |
| fn_lower = file.filename.lower() | |
| if "real" in fn_lower: | |
| inferred_label = "REAL" | |
| elif "fake" in fn_lower or "ai" in fn_lower: | |
| inferred_label = "AI" | |
| if inferred_label and username != "__guest__": | |
| is_mismatch = 1 if prediction_label != inferred_label else 0 | |
| database.save_test_result( | |
| None, username, file.filename, inferred_label, | |
| prediction_label, accuracy, is_mismatch | |
| ) | |
| # Simpan data pembelajaran mismatch jika tebakan salah | |
| if is_mismatch: | |
| database.save_learning_data( | |
| username, file.filename, prediction_label, inferred_label, accuracy, | |
| source="single_mismatch" | |
| ) | |
| return { | |
| "status": "success", | |
| "filename": file.filename, | |
| "feedback_id": uid, | |
| "type": "image", | |
| "file_size": f"{file_size/(1024*1024):.2f} MB", | |
| "source": source, | |
| "is_ai": is_ai, | |
| "accuracy": accuracy, | |
| "date": time.strftime("%Y-%m-%d %H:%M:%S"), | |
| "similarity": similarity, | |
| "is_outlier": bool(is_outlier), | |
| "is_trap": bool(is_trap), | |
| "trust_score": trust_score, | |
| "is_dark": is_dark, | |
| "is_grayscale": is_grayscale, | |
| "is_monochrome_detected": is_monochrome_detected, | |
| "avg_brightness": avg_brightness, | |
| "forensic_analysis_logs": result.get("forensic_analysis_logs", {}) | |
| } | |
| finally: | |
| if os.path.exists(temp_path): os.remove(temp_path) | |
| def scan_single_image(file_bytes, filename): | |
| resp = httpx.post(HF_API_URL, files={"file": (filename, file_bytes, "image/jpeg")}, timeout=30) | |
| if resp.status_code != 200: | |
| return None | |
| return resp.json() | |
| async def api_batch_scan(files: list[UploadFile] = File(...), username: str = Form(...), labels: str = Form("[]")): | |
| import json as json_mod | |
| try: | |
| parsed_labels = json_mod.loads(labels) | |
| except: | |
| parsed_labels = [] | |
| results = [] | |
| saved_bytes = {} | |
| for idx, file in enumerate(files): | |
| ext = file.filename.lower().split('.')[-1] | |
| if ext not in ('png', 'jpg', 'jpeg', 'webp'): | |
| continue | |
| folder_label = None | |
| # Safe lookup in dictionary map or fallback to list | |
| if isinstance(parsed_labels, dict): | |
| # Try exact match, then fallback to case-insensitive match | |
| folder_label = parsed_labels.get(file.filename) | |
| if not folder_label: | |
| # Extract just the base filename in case of relative path difference | |
| base_filename = file.filename.replace("\\", "/").split("/")[-1] | |
| for k, v in parsed_labels.items(): | |
| k_base = k.replace("\\", "/").split("/")[-1] | |
| if k_base.lower() == base_filename.lower(): | |
| folder_label = v | |
| break | |
| elif isinstance(parsed_labels, list) and idx < len(parsed_labels): | |
| folder_label = parsed_labels[idx] | |
| # Standardize folder label to uppercase | |
| if folder_label: | |
| folder_label_upper = str(folder_label).upper() | |
| folder_label = "AI" if folder_label_upper in ("FAKE", "AI") else "REAL" | |
| bytes_data = await file.read() | |
| try: | |
| resp = httpx.post(HF_API_URL, files={"file": (file.filename, bytes_data, "image/jpeg")}, timeout=30) | |
| if resp.status_code != 200: | |
| results.append({"filename": file.filename, "folder_label": folder_label, "error": "Gagal scan"}) | |
| continue | |
| result = resp.json() | |
| prediction = result.get("prediction", "REAL") | |
| confidence_str = result.get("confidence", "0.0%") | |
| try: | |
| if isinstance(confidence_str, str): | |
| confidence_val = float(confidence_str.replace("%", "").strip()) / 100.0 | |
| else: | |
| confidence_val = float(confidence_str) | |
| except Exception: | |
| confidence_val = 0.85 # fallback | |
| prediction_label = "AI" if prediction.upper() in ("AI", "FAKE") else "REAL" | |
| confidence_pct = round(confidence_val * 100, 1) | |
| is_mismatch = 0 | |
| if folder_label: | |
| expected = "AI" if folder_label.upper() in ("FAKE", "AI") else "REAL" | |
| if prediction_label != expected: | |
| is_mismatch = 1 | |
| results.append({ | |
| "filename": file.filename, | |
| "folder_label": folder_label, | |
| "prediction": prediction_label, | |
| "confidence": confidence_pct, | |
| "is_mismatch": is_mismatch | |
| }) | |
| if is_mismatch and folder_label: | |
| uid = uuid.uuid4().hex[:8] | |
| safe_name = file.filename.replace("\\", "/").split("/")[-1] | |
| pending_path = f"{PENDING_DIR}/{uid}_{safe_name}" | |
| with open(pending_path, "wb") as pf: | |
| pf.write(bytes_data) | |
| saved_bytes[file.filename] = uid | |
| results[-1]["feedback_id"] = uid | |
| else: | |
| results[-1]["feedback_id"] = "" | |
| except Exception as e: | |
| results.append({"filename": file.filename, "folder_label": folder_label, "error": str(e)}) | |
| total = len(results) | |
| has_labels = any(r.get("folder_label") is not None for r in results) | |
| if has_labels: | |
| mismatches = [r for r in results if r.get("is_mismatch")] | |
| mismatch_count = len(mismatches) | |
| correct_count = total - mismatch_count | |
| accuracy = round((correct_count / total * 100), 1) if total > 0 else 0 | |
| else: | |
| mismatch_count = 0 | |
| correct_count = 0 | |
| accuracy = -1.0 | |
| batch_id = database.create_test_batch(username, total, correct_count, mismatch_count, accuracy) | |
| for r in results: | |
| database.save_test_result( | |
| batch_id, username, r["filename"], r.get("folder_label"), | |
| r.get("prediction", "ERROR"), r.get("confidence", 0.0), r.get("is_mismatch", 0) | |
| ) | |
| if r.get("is_mismatch") and r.get("folder_label"): | |
| expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" | |
| database.save_learning_data( | |
| username, r["filename"], r.get("prediction", "ERROR"), expected, r.get("confidence", 0.0), | |
| source="batch_mismatch" | |
| ) | |
| fid = r.get("feedback_id", "") | |
| if fid: | |
| safe_name = r['filename'].replace("\\", "/").split("/")[-1] | |
| pending_file = f"{PENDING_DIR}/{fid}_{safe_name}" | |
| target_dir = f"{FEEDBACK_DIR}/real" if expected == "REAL" else f"{FEEDBACK_DIR}/fake" | |
| target_path = f"{target_dir}/{safe_name}" | |
| if os.path.exists(pending_file): | |
| os.makedirs(target_dir, exist_ok=True) | |
| shutil.move(pending_file, target_path) | |
| return { | |
| "batch_id": batch_id, | |
| "total": total, | |
| "correct": correct_count, | |
| "wrong": mismatch_count, | |
| "accuracy": accuracy, | |
| "results": results, | |
| "needs_confirmation": mismatch_count if 1 <= mismatch_count <= 5 else 0 | |
| } | |
| def api_batch_confirm(data: dict): | |
| batch_id = data.get("batch_id") | |
| corrections = data.get("corrections", []) | |
| results = database.get_test_results_by_batch(batch_id) | |
| for corr in corrections: | |
| idx = corr.get("index") | |
| user_answer = corr.get("user_answer") | |
| if idx < len(results): | |
| r = results[idx] | |
| corrected_label = user_answer.upper() | |
| original_prediction = r["prediction"] | |
| confidence = r["confidence"] | |
| database.update_test_result_correction(r["id"], corrected_label, corrected_label) | |
| if original_prediction != corrected_label: | |
| database.save_learning_data( | |
| r["username"], r["filename"], | |
| original_prediction, corrected_label, confidence | |
| ) | |
| results = database.get_test_results_by_batch(batch_id) | |
| total_with_label = 0 | |
| correct = 0 | |
| for r in results: | |
| if not r["folder_label"]: | |
| continue | |
| total_with_label += 1 | |
| final_label = r.get("corrected_label") or r["prediction"] | |
| expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" | |
| if final_label == expected: | |
| correct += 1 | |
| wrong = total_with_label - correct | |
| accuracy = round((correct / total_with_label * 100), 1) if total_with_label > 0 else 0 | |
| return {"status": "success", "total": total_with_label, "correct": correct, "wrong": wrong, "accuracy": accuracy} | |
| def forward_feedback_to_hf_space(file_path: str, filename: str, correct_label: str): | |
| if not os.path.exists(file_path): | |
| return | |
| try: | |
| hf_label = "AI" if correct_label.upper() in ("FAKE", "AI") else "REAL" | |
| with open(file_path, "rb") as f: | |
| files = {"file": (filename, f, "image/jpeg")} | |
| data = {"correct_label": hf_label} | |
| with httpx.Client() as client: | |
| res = client.post("https://alstears-ai-forensic-detector.hf.space/save-feedback", files=files, data=data, timeout=15.0) | |
| print(f"Feedback successfully forwarded to Hugging Face space. Status: {res.status_code}, Response: {res.text}") | |
| except Exception as e: | |
| print(f"Error forwarding feedback to Hugging Face space: {e}") | |
| def api_correction_single(data: dict): | |
| username = data.get("username") | |
| filename = data.get("filename") | |
| original_prediction = data.get("original_prediction") | |
| correct_label = data.get("correct_label") | |
| confidence = data.get("confidence", 0) | |
| feedback_id = data.get("feedback_id") | |
| # Standardize correct_label to uppercase | |
| correct_label = "AI" if str(correct_label).upper() in ("FAKE", "AI") else "REAL" | |
| original_prediction = "AI" if str(original_prediction).upper() in ("FAKE", "AI") else "REAL" | |
| # Determine if it is a trap image and adjust trust score | |
| is_trap = "trap" in filename.lower() | |
| trap_correct = False | |
| trust_change = 0 | |
| new_trust = 50 | |
| if username == "__guest__": | |
| if feedback_id: | |
| safe_name = filename.replace("\\", "/").split("/")[-1] | |
| pending_file = f"{PENDING_DIR}/{feedback_id}_{safe_name}" | |
| target_dir = f"{FEEDBACK_DIR}/real" if correct_label == "REAL" else f"{FEEDBACK_DIR}/fake" | |
| target_path = f"{target_dir}/{safe_name}" | |
| if os.path.exists(pending_file): | |
| os.makedirs(target_dir, exist_ok=True) | |
| save_compressed_image(pending_file, target_path) | |
| # Forward feedback to HF Space | |
| forward_feedback_to_hf_space(pending_file, safe_name, correct_label) | |
| os.remove(pending_file) | |
| return { | |
| "status": "success", | |
| "is_trap": is_trap, | |
| "trap_correct": trap_correct, | |
| "trust_change": trust_change, | |
| "new_trust": new_trust | |
| } | |
| if is_trap: | |
| if "trap_real" in filename.lower(): | |
| true_label = "REAL" | |
| elif "trap_ai" in filename.lower() or "trap_fake" in filename.lower(): | |
| true_label = "AI" | |
| else: | |
| true_label = "REAL" if original_prediction == "AI" else "AI" | |
| if correct_label == true_label: | |
| trap_correct = True | |
| trust_change = 5 | |
| else: | |
| trap_correct = False | |
| trust_change = -15 | |
| conn = database.get_connection() | |
| user_row = conn.execute("SELECT trust_score FROM users WHERE username=?", (username,)).fetchone() | |
| if user_row: | |
| current_trust = user_row["trust_score"] if user_row["trust_score"] is not None else 50 | |
| new_trust = max(0, min(100, current_trust + trust_change)) | |
| conn.execute("UPDATE users SET trust_score=? WHERE username=?", (new_trust, username)) | |
| conn.commit() | |
| conn.close() | |
| # Save to learning data for retraining | |
| database.save_learning_data(username, filename, original_prediction, correct_label, confidence) | |
| # Update or insert into test_results | |
| conn = database.get_connection() | |
| existing = conn.execute("SELECT id FROM test_results WHERE username=? AND filename=? AND batch_id IS NULL", | |
| (username, filename)).fetchone() | |
| is_mismatch = 1 if original_prediction != correct_label else 0 | |
| if existing: | |
| conn.execute("UPDATE test_results SET folder_label=?, prediction=?, confidence=?, is_mismatch=?, corrected_label=? WHERE id=?", | |
| (correct_label, original_prediction, confidence, is_mismatch, correct_label, existing["id"])) | |
| conn.commit() | |
| else: | |
| database.save_test_result(None, username, filename, correct_label, original_prediction, confidence, is_mismatch) | |
| conn.close() | |
| if feedback_id: | |
| safe_name = filename.replace("\\", "/").split("/")[-1] | |
| pending_file = f"{PENDING_DIR}/{feedback_id}_{safe_name}" | |
| target_dir = f"{FEEDBACK_DIR}/real" if correct_label == "REAL" else f"{FEEDBACK_DIR}/fake" | |
| target_path = f"{target_dir}/{safe_name}" | |
| if os.path.exists(pending_file): | |
| os.makedirs(target_dir, exist_ok=True) | |
| save_compressed_image(pending_file, target_path) | |
| # Forward feedback to HF Space | |
| forward_feedback_to_hf_space(pending_file, safe_name, correct_label) | |
| os.remove(pending_file) | |
| return { | |
| "status": "success", | |
| "is_trap": is_trap, | |
| "trap_correct": trap_correct, | |
| "trust_change": trust_change, | |
| "new_trust": new_trust | |
| } | |
| async def save_feedback_direct(file: UploadFile = File(...), correct_label: str = Form(...)): | |
| # Standardize correct label to folder target | |
| folder = "real" if correct_label.lower() in ("real", "true", "asli") else "fake" | |
| target_dir = os.path.join(FEEDBACK_DIR, folder) | |
| os.makedirs(target_dir, exist_ok=True) | |
| safe_name = file.filename.replace("\\", "/").split("/")[-1] | |
| target_path = os.path.join(target_dir, safe_name) | |
| # Save uploaded file content directly | |
| contents = await file.read() | |
| with open(target_path, "wb") as f: | |
| f.write(contents) | |
| # Forward feedback to Hugging Face space | |
| try: | |
| hf_label = "AI" if folder == "fake" else "REAL" | |
| files = {"file": (safe_name, contents, "image/jpeg")} | |
| data_payload = {"correct_label": hf_label} | |
| async with httpx.AsyncClient() as client: | |
| res = await client.post("https://alstears-ai-forensic-detector.hf.space/save-feedback", files=files, data=data_payload, timeout=15.0) | |
| print(f"Direct feedback forwarded to HF. Status: {res.status_code}") | |
| except Exception as e: | |
| print(f"Failed to forward direct feedback to HF: {e}") | |
| return { | |
| "status": "saved", | |
| "path": f"feedback/{folder}/{safe_name}" | |
| } | |
| def api_get_history(username: str): | |
| return {"history": database.get_user_history(username)} | |
| def api_clear_history(): | |
| database.clear_all_history() | |
| return {"status": "success", "message": "Semua history berhasil dihapus"} | |
| def api_accuracy_report(username: str = None, filter: str = "all"): | |
| import datetime | |
| import traceback | |
| try: | |
| now = datetime.datetime.now() | |
| # Calculate time threshold | |
| threshold_str = None | |
| if filter == "today": | |
| threshold_str = now.strftime("%Y-%m-%d") | |
| elif filter == "week": | |
| threshold_str = (now - datetime.timedelta(days=7)).isoformat() | |
| elif filter == "month": | |
| threshold_str = (now - datetime.timedelta(days=30)).isoformat() | |
| conn = database.get_connection() | |
| # Fetch test results with time filter | |
| query_results = "SELECT * FROM test_results" | |
| params_results = [] | |
| conditions = [] | |
| if username: | |
| conditions.append("username = ?") | |
| params_results.append(username) | |
| if threshold_str: | |
| conditions.append("scan_date >= ?") | |
| params_results.append(threshold_str) | |
| if conditions: | |
| query_results += " WHERE " + " AND ".join(conditions) | |
| rows = conn.execute(query_results, params_results).fetchall() | |
| # Compute Confusion Matrix, Failures, and Confidence Distributions | |
| tp = 0 | |
| fp = 0 | |
| fn = 0 | |
| tn = 0 | |
| real_conf_buckets = [0, 0, 0, 0, 0] # 50-60, 60-70, 70-80, 80-90, 90-100 | |
| ai_conf_buckets = [0, 0, 0, 0, 0] | |
| failures = [] | |
| for r in rows: | |
| if not r["folder_label"]: | |
| continue | |
| expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" | |
| final_pred = r["corrected_label"] or r["prediction"] or "REAL" | |
| # Safe None check for confidence | |
| confidence = float(r["confidence"]) if r["confidence"] is not None else 0.0 | |
| if expected == "AI" and final_pred == "AI": | |
| tp += 1 | |
| elif expected == "REAL" and final_pred == "AI": | |
| fp += 1 | |
| elif expected == "AI" and final_pred == "REAL": | |
| fn += 1 | |
| elif expected == "REAL" and final_pred == "REAL": | |
| tn += 1 | |
| # Add mismatch (prediction failure) to failure log | |
| if final_pred != expected: | |
| failures.append({ | |
| "filename": r["filename"] or "Unknown File", | |
| "expected": expected, | |
| "prediction": r["prediction"] or "REAL", | |
| "final_pred": final_pred, | |
| "confidence": confidence, | |
| "date": r["scan_date"][:19].replace("T", " ") if r["scan_date"] else "-" | |
| }) | |
| bucket_idx = min(int((confidence - 50) / 10), 4) | |
| if bucket_idx >= 0: | |
| if final_pred == "AI": | |
| ai_conf_buckets[bucket_idx] += 1 | |
| else: | |
| real_conf_buckets[bucket_idx] += 1 | |
| # Calculate global metrics | |
| total = tp + fp + fn + tn | |
| correct = tp + tn | |
| wrong = fp + fn | |
| accuracy = round((correct / total * 100), 1) if total > 0 else 0.0 | |
| # Calculate Advanced ML metrics | |
| precision = round((tp / (tp + fp) * 100), 1) if (tp + fp) > 0 else 0.0 | |
| recall = round((tp / (tp + fn) * 100), 1) if (tp + fn) > 0 else 0.0 | |
| f1_score = round((2 * (precision * recall) / (precision + recall)), 1) if (precision + recall) > 0 else 0.0 | |
| # Count scans with time filter | |
| q_scan = "SELECT COUNT(*) as cnt FROM scan_history" | |
| p_scan = [] | |
| if username or threshold_str: | |
| conds = [] | |
| if username: | |
| conds.append("username = ?") | |
| p_scan.append(username) | |
| if threshold_str: | |
| conds.append("scan_date >= ?") | |
| p_scan.append(threshold_str) | |
| q_scan += " WHERE " + " AND ".join(conds) | |
| scan_count = conn.execute(q_scan, p_scan).fetchone()["cnt"] | |
| # Count batch images with time filter | |
| q_batch = "SELECT COALESCE(SUM(total_images), 0) as cnt FROM test_batches" | |
| p_batch = [] | |
| if username or threshold_str: | |
| conds = [] | |
| if username: | |
| conds.append("username = ?") | |
| p_batch.append(username) | |
| if threshold_str: | |
| conds.append("test_date >= ?") | |
| p_batch.append(threshold_str) | |
| q_batch += " WHERE " + " AND ".join(conds) | |
| batch_images = conn.execute(q_batch, p_batch).fetchone()["cnt"] | |
| # Count other parameters | |
| learning_count = database.get_learning_data_count() | |
| q_batches = "SELECT * FROM test_batches" | |
| p_batches = [] | |
| conds_b = [] | |
| if username: | |
| conds_b.append("username = ?") | |
| p_batches.append(username) | |
| if threshold_str: | |
| conds_b.append("test_date >= ?") | |
| p_batches.append(threshold_str) | |
| if conds_b: | |
| q_batches += " WHERE " + " AND ".join(conds_b) | |
| q_batches += " ORDER BY id DESC" | |
| batches_rows = conn.execute(q_batches, p_batches).fetchall() | |
| batches = [dict(row) for row in batches_rows] | |
| conn.close() | |
| return { | |
| "stats": { | |
| "total": total, | |
| "correct": correct, | |
| "wrong": wrong, | |
| "accuracy": accuracy, | |
| "precision": precision, | |
| "recall": recall, | |
| "f1_score": f1_score | |
| }, | |
| "confusion_matrix": { | |
| "tp": tp, | |
| "fp": fp, | |
| "fn": fn, | |
| "tn": tn | |
| }, | |
| "confidence_distribution": { | |
| "buckets": ["50-60%", "60-70%", "70-80%", "80-90%", "90-100%"], | |
| "real": real_conf_buckets, | |
| "ai": ai_conf_buckets | |
| }, | |
| "failures": failures[:15], # limit to latest 15 failures | |
| "batches": batches, | |
| "learning_data_count": learning_count, | |
| "scan_count": scan_count, | |
| "batch_images": batch_images | |
| } | |
| except Exception as e: | |
| print("EXCEPTION DETECTED IN ACCURACY REPORT:") | |
| traceback.print_exc() | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def api_download_feedback(background: BackgroundTasks, username: str = ""): | |
| if (username or "").lower() != "sandikad": | |
| raise HTTPException(status_code=403, detail="Akses ditolak. Hanya user Sandikad yang dapat mendownload feedback.") | |
| zip_path = os.path.join(BASE_DIR, f"feedback_{time.strftime('%Y%m%d_%H%M%S')}.zip") | |
| with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: | |
| for root, dirs, files in os.walk(FEEDBACK_DIR): | |
| for file in files: | |
| file_path = os.path.join(root, file) | |
| arcname = os.path.relpath(file_path, FEEDBACK_DIR) | |
| zf.write(file_path, arcname) | |
| background.add_task(os.remove, zip_path) | |
| return FileResponse(zip_path, media_type="application/zip", | |
| filename=os.path.basename(zip_path)) | |