from fastapi import FastAPI, UploadFile, File, Form, HTTPException, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, FileResponse from fastapi.staticfiles import StaticFiles import shutil, os, time, uuid, zipfile import database import httpx BASE_DIR = os.path.dirname(os.path.abspath(__file__)) FEEDBACK_DIR = os.path.join(BASE_DIR, "feedback") PENDING_DIR = os.path.join(FEEDBACK_DIR, "pending") os.makedirs(f"{FEEDBACK_DIR}/real", exist_ok=True) os.makedirs(f"{FEEDBACK_DIR}/fake", exist_ok=True) os.makedirs(PENDING_DIR, exist_ok=True) app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Serve static images app.mount("/images", StaticFiles(directory=os.path.join(BASE_DIR, "images")), name="images") @app.get("/") def root(): with open(os.path.join(BASE_DIR, "index.html"), encoding="utf-8") as f: return HTMLResponse(f.read()) @app.get("/style.css") def serve_css(): with open(os.path.join(BASE_DIR, "style.css"), encoding="utf-8") as f: return HTMLResponse(f.read(), media_type="text/css") @app.get("/template/style.css") def serve_template_css(): with open(os.path.join(BASE_DIR, "template", "style.css"), encoding="utf-8") as f: return HTMLResponse(f.read(), media_type="text/css") @app.get("/script.js") def serve_js(): with open(os.path.join(BASE_DIR, "script.js"), encoding="utf-8") as f: return HTMLResponse(f.read(), media_type="application/javascript") @app.on_event("startup") def startup(): database.init_db() database.migrate_existing_learning_data() database.sync_scan_history_to_test_results() @app.post("/api/register") def api_register(username: str = Form(...), password: str = Form(...), name: str = Form(...)): if database.register_user(username, password, name): return {"status": "success", "message": "Registrasi berhasil!"} raise HTTPException(status_code=400, detail="Username sudah digunakan") @app.post("/api/login") def api_login(username: str = Form(...), password: str = Form(...)): user = database.login_user(username, password) if user: return { "status": "success", "name": user["name"], "username": user["username"], "trust_score": user["trust_score"] if "trust_score" in user and user["trust_score"] is not None else 50 } raise HTTPException(status_code=401, detail="Username atau password salah") import math from PIL import Image def get_color_feature_vector(img_path): try: with Image.open(img_path) as img: img = img.resize((64, 64)) hist = img.histogram() bins = [] for i in range(0, len(hist), 32): bins.append(sum(hist[i:i+32])) return bins except Exception: return [1.0] * 24 def cosine_similarity(v1, v2): dot_product = sum(a * b for a, b in zip(v1, v2)) norm_v1 = math.sqrt(sum(a * a for a in v1)) norm_v2 = math.sqrt(sum(b * b for b in v2)) if norm_v1 == 0 or norm_v2 == 0: return 0.0 return dot_product / (norm_v1 * norm_v2) # Reference vector representing typical digital photo color centroid REF_VECTOR = [100.0, 150.0, 200.0, 180.0, 120.0, 90.0, 80.0, 110.0, 130.0, 140.0, 160.0, 170.0, 190.0, 210.0, 220.0, 230.0, 240.0, 250.0, 200.0, 150.0, 100.0, 80.0, 60.0, 40.0] def analyze_image_conditions(img_path): try: with Image.open(img_path) as img: img_rgb = img.convert('RGB') img_small = img_rgb.resize((32, 32)) pixels = list(img_small.getdata()) grayscale_diffs = [] brightness_vals = [] for r, g, b in pixels: brightness = 0.299*r + 0.587*g + 0.114*b brightness_vals.append(brightness) diff = abs(r - g) + abs(g - b) + abs(b - r) grayscale_diffs.append(diff) avg_brightness = sum(brightness_vals) / len(brightness_vals) avg_diff = sum(grayscale_diffs) / len(grayscale_diffs) is_dark = 1 if avg_brightness < 45 else 0 is_grayscale = 1 if avg_diff < 12 else 0 return bool(is_dark), bool(is_grayscale), round(avg_brightness, 1) except Exception: return False, False, 127.0 def save_compressed_image(source_path, target_path, max_size=(512, 512)): try: with Image.open(source_path) as img: if img.mode in ("RGBA", "P"): img = img.convert("RGB") img.thumbnail(max_size, Image.Resampling.LANCZOS) ext = target_path.split('.')[-1].lower() fmt = "PNG" if ext == "png" else "JPEG" if fmt == "JPEG": img.save(target_path, "JPEG", quality=80, optimize=True) else: img.save(target_path, "PNG", optimize=True) return True except Exception: import shutil shutil.copy2(source_path, target_path) return False HF_API_URL = "https://alstears-ai-forensic-detector.hf.space/predict" @app.post("/api/scan-image") def api_scan_image(file: UploadFile = File(...), username: str = Form(...)): if not file.filename.lower().endswith(('png', 'jpg', 'jpeg', 'webp')): raise HTTPException(status_code=400, detail="Format gambar tidak didukung") uid = uuid.uuid4().hex[:8] safe_name = file.filename.replace("\\", "/").split("/")[-1] temp_path = f"temp_{uid}_{safe_name}" with open(temp_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) try: with open(temp_path, "rb") as f: resp = httpx.post(HF_API_URL, files={"file": (file.filename, f, "image/jpeg")}, timeout=30) if resp.status_code != 200: raise HTTPException(status_code=502, detail="Gagal menghubungi AI detector") result = resp.json() prediction = result.get("prediction", "REAL") confidence_str = result.get("confidence", "0.0%") try: if isinstance(confidence_str, str): confidence_val = float(confidence_str.replace("%", "").strip()) / 100.0 else: confidence_val = float(confidence_str) except Exception: confidence_val = 0.85 # fallback is_ai = prediction.upper() in ("AI", "FAKE") source = "Pollinations AI (Stable Diffusion)" if is_ai else "Kamera/Foto Digital Asli" accuracy = round(confidence_val * 100, 1) feedback_path = f"{PENDING_DIR}/{uid}_{safe_name}" save_compressed_image(temp_path, feedback_path) file_size = os.path.getsize(temp_path) if username != "__guest__": database.add_scan_history(username, file.filename, "Image", f"{file_size/(1024*1024):.2f} MB", source, is_ai, accuracy) # Calculate color similarity & outlier detection (Solusi 2) vector = get_color_feature_vector(temp_path) similarity = cosine_similarity(vector, REF_VECTOR) similarity = round(similarity, 3) is_outlier = 1 if similarity < 0.88 else 0 # Detect low-light and monochrome conditions is_dark, is_grayscale_local, avg_brightness = analyze_image_conditions(temp_path) is_monochrome_detected = bool(result.get("is_monochrome_detected", False)) is_grayscale = is_monochrome_detected # Check for Trap image (Solusi 3) is_trap = 1 if "trap" in file.filename.lower() else 0 # Get user's current trust score trust_score = 50 if username != "__guest__": conn = database.get_connection() user_row = conn.execute("SELECT trust_score FROM users WHERE username=?", (username,)).fetchone() trust_score = user_row["trust_score"] if user_row else 50 conn.close() # --- SINKRONISASI AKURASI GAMBAR TUNGGAL (Poin 2) --- # Coba tebak ground truth (REAL/AI) dari nama file (misal: real11.jpg, fake4.jpg) prediction_label = "AI" if is_ai else "REAL" inferred_label = None fn_lower = file.filename.lower() if "real" in fn_lower: inferred_label = "REAL" elif "fake" in fn_lower or "ai" in fn_lower: inferred_label = "AI" if inferred_label and username != "__guest__": is_mismatch = 1 if prediction_label != inferred_label else 0 database.save_test_result( None, username, file.filename, inferred_label, prediction_label, accuracy, is_mismatch ) # Simpan data pembelajaran mismatch jika tebakan salah if is_mismatch: database.save_learning_data( username, file.filename, prediction_label, inferred_label, accuracy, source="single_mismatch" ) return { "status": "success", "filename": file.filename, "feedback_id": uid, "type": "image", "file_size": f"{file_size/(1024*1024):.2f} MB", "source": source, "is_ai": is_ai, "accuracy": accuracy, "date": time.strftime("%Y-%m-%d %H:%M:%S"), "similarity": similarity, "is_outlier": bool(is_outlier), "is_trap": bool(is_trap), "trust_score": trust_score, "is_dark": is_dark, "is_grayscale": is_grayscale, "is_monochrome_detected": is_monochrome_detected, "avg_brightness": avg_brightness, "forensic_analysis_logs": result.get("forensic_analysis_logs", {}) } finally: if os.path.exists(temp_path): os.remove(temp_path) def scan_single_image(file_bytes, filename): resp = httpx.post(HF_API_URL, files={"file": (filename, file_bytes, "image/jpeg")}, timeout=30) if resp.status_code != 200: return None return resp.json() @app.post("/api/batch-scan") async def api_batch_scan(files: list[UploadFile] = File(...), username: str = Form(...), labels: str = Form("[]")): import json as json_mod try: parsed_labels = json_mod.loads(labels) except: parsed_labels = [] results = [] saved_bytes = {} for idx, file in enumerate(files): ext = file.filename.lower().split('.')[-1] if ext not in ('png', 'jpg', 'jpeg', 'webp'): continue folder_label = None # Safe lookup in dictionary map or fallback to list if isinstance(parsed_labels, dict): # Try exact match, then fallback to case-insensitive match folder_label = parsed_labels.get(file.filename) if not folder_label: # Extract just the base filename in case of relative path difference base_filename = file.filename.replace("\\", "/").split("/")[-1] for k, v in parsed_labels.items(): k_base = k.replace("\\", "/").split("/")[-1] if k_base.lower() == base_filename.lower(): folder_label = v break elif isinstance(parsed_labels, list) and idx < len(parsed_labels): folder_label = parsed_labels[idx] # Standardize folder label to uppercase if folder_label: folder_label_upper = str(folder_label).upper() folder_label = "AI" if folder_label_upper in ("FAKE", "AI") else "REAL" bytes_data = await file.read() try: resp = httpx.post(HF_API_URL, files={"file": (file.filename, bytes_data, "image/jpeg")}, timeout=30) if resp.status_code != 200: results.append({"filename": file.filename, "folder_label": folder_label, "error": "Gagal scan"}) continue result = resp.json() prediction = result.get("prediction", "REAL") confidence_str = result.get("confidence", "0.0%") try: if isinstance(confidence_str, str): confidence_val = float(confidence_str.replace("%", "").strip()) / 100.0 else: confidence_val = float(confidence_str) except Exception: confidence_val = 0.85 # fallback prediction_label = "AI" if prediction.upper() in ("AI", "FAKE") else "REAL" confidence_pct = round(confidence_val * 100, 1) is_mismatch = 0 if folder_label: expected = "AI" if folder_label.upper() in ("FAKE", "AI") else "REAL" if prediction_label != expected: is_mismatch = 1 results.append({ "filename": file.filename, "folder_label": folder_label, "prediction": prediction_label, "confidence": confidence_pct, "is_mismatch": is_mismatch }) if is_mismatch and folder_label: uid = uuid.uuid4().hex[:8] safe_name = file.filename.replace("\\", "/").split("/")[-1] pending_path = f"{PENDING_DIR}/{uid}_{safe_name}" with open(pending_path, "wb") as pf: pf.write(bytes_data) saved_bytes[file.filename] = uid results[-1]["feedback_id"] = uid else: results[-1]["feedback_id"] = "" except Exception as e: results.append({"filename": file.filename, "folder_label": folder_label, "error": str(e)}) total = len(results) has_labels = any(r.get("folder_label") is not None for r in results) if has_labels: mismatches = [r for r in results if r.get("is_mismatch")] mismatch_count = len(mismatches) correct_count = total - mismatch_count accuracy = round((correct_count / total * 100), 1) if total > 0 else 0 else: mismatch_count = 0 correct_count = 0 accuracy = -1.0 batch_id = database.create_test_batch(username, total, correct_count, mismatch_count, accuracy) for r in results: database.save_test_result( batch_id, username, r["filename"], r.get("folder_label"), r.get("prediction", "ERROR"), r.get("confidence", 0.0), r.get("is_mismatch", 0) ) if r.get("is_mismatch") and r.get("folder_label"): expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" database.save_learning_data( username, r["filename"], r.get("prediction", "ERROR"), expected, r.get("confidence", 0.0), source="batch_mismatch" ) fid = r.get("feedback_id", "") if fid: safe_name = r['filename'].replace("\\", "/").split("/")[-1] pending_file = f"{PENDING_DIR}/{fid}_{safe_name}" target_dir = f"{FEEDBACK_DIR}/real" if expected == "REAL" else f"{FEEDBACK_DIR}/fake" target_path = f"{target_dir}/{safe_name}" if os.path.exists(pending_file): os.makedirs(target_dir, exist_ok=True) shutil.move(pending_file, target_path) return { "batch_id": batch_id, "total": total, "correct": correct_count, "wrong": mismatch_count, "accuracy": accuracy, "results": results, "needs_confirmation": mismatch_count if 1 <= mismatch_count <= 5 else 0 } @app.post("/api/batch-confirm") def api_batch_confirm(data: dict): batch_id = data.get("batch_id") corrections = data.get("corrections", []) results = database.get_test_results_by_batch(batch_id) for corr in corrections: idx = corr.get("index") user_answer = corr.get("user_answer") if idx < len(results): r = results[idx] corrected_label = user_answer.upper() original_prediction = r["prediction"] confidence = r["confidence"] database.update_test_result_correction(r["id"], corrected_label, corrected_label) if original_prediction != corrected_label: database.save_learning_data( r["username"], r["filename"], original_prediction, corrected_label, confidence ) results = database.get_test_results_by_batch(batch_id) total_with_label = 0 correct = 0 for r in results: if not r["folder_label"]: continue total_with_label += 1 final_label = r.get("corrected_label") or r["prediction"] expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" if final_label == expected: correct += 1 wrong = total_with_label - correct accuracy = round((correct / total_with_label * 100), 1) if total_with_label > 0 else 0 return {"status": "success", "total": total_with_label, "correct": correct, "wrong": wrong, "accuracy": accuracy} def forward_feedback_to_hf_space(file_path: str, filename: str, correct_label: str): if not os.path.exists(file_path): return try: hf_label = "AI" if correct_label.upper() in ("FAKE", "AI") else "REAL" with open(file_path, "rb") as f: files = {"file": (filename, f, "image/jpeg")} data = {"correct_label": hf_label} with httpx.Client() as client: res = client.post("https://alstears-ai-forensic-detector.hf.space/save-feedback", files=files, data=data, timeout=15.0) print(f"Feedback successfully forwarded to Hugging Face space. Status: {res.status_code}, Response: {res.text}") except Exception as e: print(f"Error forwarding feedback to Hugging Face space: {e}") @app.post("/api/correction-single") def api_correction_single(data: dict): username = data.get("username") filename = data.get("filename") original_prediction = data.get("original_prediction") correct_label = data.get("correct_label") confidence = data.get("confidence", 0) feedback_id = data.get("feedback_id") # Standardize correct_label to uppercase correct_label = "AI" if str(correct_label).upper() in ("FAKE", "AI") else "REAL" original_prediction = "AI" if str(original_prediction).upper() in ("FAKE", "AI") else "REAL" # Determine if it is a trap image and adjust trust score is_trap = "trap" in filename.lower() trap_correct = False trust_change = 0 new_trust = 50 if username == "__guest__": if feedback_id: safe_name = filename.replace("\\", "/").split("/")[-1] pending_file = f"{PENDING_DIR}/{feedback_id}_{safe_name}" target_dir = f"{FEEDBACK_DIR}/real" if correct_label == "REAL" else f"{FEEDBACK_DIR}/fake" target_path = f"{target_dir}/{safe_name}" if os.path.exists(pending_file): os.makedirs(target_dir, exist_ok=True) save_compressed_image(pending_file, target_path) # Forward feedback to HF Space forward_feedback_to_hf_space(pending_file, safe_name, correct_label) os.remove(pending_file) return { "status": "success", "is_trap": is_trap, "trap_correct": trap_correct, "trust_change": trust_change, "new_trust": new_trust } if is_trap: if "trap_real" in filename.lower(): true_label = "REAL" elif "trap_ai" in filename.lower() or "trap_fake" in filename.lower(): true_label = "AI" else: true_label = "REAL" if original_prediction == "AI" else "AI" if correct_label == true_label: trap_correct = True trust_change = 5 else: trap_correct = False trust_change = -15 conn = database.get_connection() user_row = conn.execute("SELECT trust_score FROM users WHERE username=?", (username,)).fetchone() if user_row: current_trust = user_row["trust_score"] if user_row["trust_score"] is not None else 50 new_trust = max(0, min(100, current_trust + trust_change)) conn.execute("UPDATE users SET trust_score=? WHERE username=?", (new_trust, username)) conn.commit() conn.close() # Save to learning data for retraining database.save_learning_data(username, filename, original_prediction, correct_label, confidence) # Update or insert into test_results conn = database.get_connection() existing = conn.execute("SELECT id FROM test_results WHERE username=? AND filename=? AND batch_id IS NULL", (username, filename)).fetchone() is_mismatch = 1 if original_prediction != correct_label else 0 if existing: conn.execute("UPDATE test_results SET folder_label=?, prediction=?, confidence=?, is_mismatch=?, corrected_label=? WHERE id=?", (correct_label, original_prediction, confidence, is_mismatch, correct_label, existing["id"])) conn.commit() else: database.save_test_result(None, username, filename, correct_label, original_prediction, confidence, is_mismatch) conn.close() if feedback_id: safe_name = filename.replace("\\", "/").split("/")[-1] pending_file = f"{PENDING_DIR}/{feedback_id}_{safe_name}" target_dir = f"{FEEDBACK_DIR}/real" if correct_label == "REAL" else f"{FEEDBACK_DIR}/fake" target_path = f"{target_dir}/{safe_name}" if os.path.exists(pending_file): os.makedirs(target_dir, exist_ok=True) save_compressed_image(pending_file, target_path) # Forward feedback to HF Space forward_feedback_to_hf_space(pending_file, safe_name, correct_label) os.remove(pending_file) return { "status": "success", "is_trap": is_trap, "trap_correct": trap_correct, "trust_change": trust_change, "new_trust": new_trust } @app.post("/save-feedback") async def save_feedback_direct(file: UploadFile = File(...), correct_label: str = Form(...)): # Standardize correct label to folder target folder = "real" if correct_label.lower() in ("real", "true", "asli") else "fake" target_dir = os.path.join(FEEDBACK_DIR, folder) os.makedirs(target_dir, exist_ok=True) safe_name = file.filename.replace("\\", "/").split("/")[-1] target_path = os.path.join(target_dir, safe_name) # Save uploaded file content directly contents = await file.read() with open(target_path, "wb") as f: f.write(contents) # Forward feedback to Hugging Face space try: hf_label = "AI" if folder == "fake" else "REAL" files = {"file": (safe_name, contents, "image/jpeg")} data_payload = {"correct_label": hf_label} async with httpx.AsyncClient() as client: res = await client.post("https://alstears-ai-forensic-detector.hf.space/save-feedback", files=files, data=data_payload, timeout=15.0) print(f"Direct feedback forwarded to HF. Status: {res.status_code}") except Exception as e: print(f"Failed to forward direct feedback to HF: {e}") return { "status": "saved", "path": f"feedback/{folder}/{safe_name}" } @app.get("/api/history/{username}") def api_get_history(username: str): return {"history": database.get_user_history(username)} @app.get("/api/clear-history") def api_clear_history(): database.clear_all_history() return {"status": "success", "message": "Semua history berhasil dihapus"} @app.get("/api/accuracy-report") def api_accuracy_report(username: str = None, filter: str = "all"): import datetime import traceback try: now = datetime.datetime.now() # Calculate time threshold threshold_str = None if filter == "today": threshold_str = now.strftime("%Y-%m-%d") elif filter == "week": threshold_str = (now - datetime.timedelta(days=7)).isoformat() elif filter == "month": threshold_str = (now - datetime.timedelta(days=30)).isoformat() conn = database.get_connection() # Fetch test results with time filter query_results = "SELECT * FROM test_results" params_results = [] conditions = [] if username: conditions.append("username = ?") params_results.append(username) if threshold_str: conditions.append("scan_date >= ?") params_results.append(threshold_str) if conditions: query_results += " WHERE " + " AND ".join(conditions) rows = conn.execute(query_results, params_results).fetchall() # Compute Confusion Matrix, Failures, and Confidence Distributions tp = 0 fp = 0 fn = 0 tn = 0 real_conf_buckets = [0, 0, 0, 0, 0] # 50-60, 60-70, 70-80, 80-90, 90-100 ai_conf_buckets = [0, 0, 0, 0, 0] failures = [] for r in rows: if not r["folder_label"]: continue expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" final_pred = r["corrected_label"] or r["prediction"] or "REAL" # Safe None check for confidence confidence = float(r["confidence"]) if r["confidence"] is not None else 0.0 if expected == "AI" and final_pred == "AI": tp += 1 elif expected == "REAL" and final_pred == "AI": fp += 1 elif expected == "AI" and final_pred == "REAL": fn += 1 elif expected == "REAL" and final_pred == "REAL": tn += 1 # Add mismatch (prediction failure) to failure log if final_pred != expected: failures.append({ "filename": r["filename"] or "Unknown File", "expected": expected, "prediction": r["prediction"] or "REAL", "final_pred": final_pred, "confidence": confidence, "date": r["scan_date"][:19].replace("T", " ") if r["scan_date"] else "-" }) bucket_idx = min(int((confidence - 50) / 10), 4) if bucket_idx >= 0: if final_pred == "AI": ai_conf_buckets[bucket_idx] += 1 else: real_conf_buckets[bucket_idx] += 1 # Calculate global metrics total = tp + fp + fn + tn correct = tp + tn wrong = fp + fn accuracy = round((correct / total * 100), 1) if total > 0 else 0.0 # Calculate Advanced ML metrics precision = round((tp / (tp + fp) * 100), 1) if (tp + fp) > 0 else 0.0 recall = round((tp / (tp + fn) * 100), 1) if (tp + fn) > 0 else 0.0 f1_score = round((2 * (precision * recall) / (precision + recall)), 1) if (precision + recall) > 0 else 0.0 # Count scans with time filter q_scan = "SELECT COUNT(*) as cnt FROM scan_history" p_scan = [] if username or threshold_str: conds = [] if username: conds.append("username = ?") p_scan.append(username) if threshold_str: conds.append("scan_date >= ?") p_scan.append(threshold_str) q_scan += " WHERE " + " AND ".join(conds) scan_count = conn.execute(q_scan, p_scan).fetchone()["cnt"] # Count batch images with time filter q_batch = "SELECT COALESCE(SUM(total_images), 0) as cnt FROM test_batches" p_batch = [] if username or threshold_str: conds = [] if username: conds.append("username = ?") p_batch.append(username) if threshold_str: conds.append("test_date >= ?") p_batch.append(threshold_str) q_batch += " WHERE " + " AND ".join(conds) batch_images = conn.execute(q_batch, p_batch).fetchone()["cnt"] # Count other parameters learning_count = database.get_learning_data_count() q_batches = "SELECT * FROM test_batches" p_batches = [] conds_b = [] if username: conds_b.append("username = ?") p_batches.append(username) if threshold_str: conds_b.append("test_date >= ?") p_batches.append(threshold_str) if conds_b: q_batches += " WHERE " + " AND ".join(conds_b) q_batches += " ORDER BY id DESC" batches_rows = conn.execute(q_batches, p_batches).fetchall() batches = [dict(row) for row in batches_rows] conn.close() return { "stats": { "total": total, "correct": correct, "wrong": wrong, "accuracy": accuracy, "precision": precision, "recall": recall, "f1_score": f1_score }, "confusion_matrix": { "tp": tp, "fp": fp, "fn": fn, "tn": tn }, "confidence_distribution": { "buckets": ["50-60%", "60-70%", "70-80%", "80-90%", "90-100%"], "real": real_conf_buckets, "ai": ai_conf_buckets }, "failures": failures[:15], # limit to latest 15 failures "batches": batches, "learning_data_count": learning_count, "scan_count": scan_count, "batch_images": batch_images } except Exception as e: print("EXCEPTION DETECTED IN ACCURACY REPORT:") traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/download-feedback") def api_download_feedback(background: BackgroundTasks, username: str = ""): if (username or "").lower() != "sandikad": raise HTTPException(status_code=403, detail="Akses ditolak. Hanya user Sandikad yang dapat mendownload feedback.") zip_path = os.path.join(BASE_DIR, f"feedback_{time.strftime('%Y%m%d_%H%M%S')}.zip") with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: for root, dirs, files in os.walk(FEEDBACK_DIR): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, FEEDBACK_DIR) zf.write(file_path, arcname) background.add_task(os.remove, zip_path) return FileResponse(zip_path, media_type="application/zip", filename=os.path.basename(zip_path))