# Source: ai-detector-backend / backend.py
# Uploader: Alstears
# Commit: "Upload 142 files" (cca874a, verified)
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
import shutil, os, time, uuid, zipfile
import database
import httpx
# Filesystem layout: everything is resolved relative to this module's folder.
# Feedback images are split into "real", "fake" and a "pending" staging area.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FEEDBACK_DIR = os.path.join(BASE_DIR, "feedback")
PENDING_DIR = os.path.join(FEEDBACK_DIR, "pending")

# Create every feedback directory up front so handlers can write into them.
for _feedback_subdir in ("real", "fake", "pending"):
    os.makedirs(os.path.join(FEEDBACK_DIR, _feedback_subdir), exist_ok=True)
del _feedback_subdir
# Application object that every route below is registered on.
app = FastAPI()

# CORS: any origin, method and header is accepted, with credentials allowed.
# NOTE(review): wildcard origins combined with credentials is very permissive —
# confirm this is intended outside of development.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)

# Serve static images
_images_dir = os.path.join(BASE_DIR, "images")
app.mount("/images", StaticFiles(directory=_images_dir), name="images")
@app.get("/")
def root():
    """Return ``index.html`` (stored next to this module) as an HTML response."""
    index_path = os.path.join(BASE_DIR, "index.html")
    with open(index_path, encoding="utf-8") as page:
        markup = page.read()
    return HTMLResponse(markup)
@app.get("/style.css")
def serve_css():
    """Return the top-level ``style.css`` with a ``text/css`` media type."""
    css_path = os.path.join(BASE_DIR, "style.css")
    with open(css_path, encoding="utf-8") as sheet:
        stylesheet = sheet.read()
    return HTMLResponse(stylesheet, media_type="text/css")
@app.get("/template/style.css")
def serve_template_css():
    """Return ``template/style.css`` with a ``text/css`` media type."""
    css_path = os.path.join(BASE_DIR, "template", "style.css")
    with open(css_path, encoding="utf-8") as sheet:
        stylesheet = sheet.read()
    return HTMLResponse(stylesheet, media_type="text/css")
@app.get("/script.js")
def serve_js():
    """Return ``script.js`` with an ``application/javascript`` media type."""
    script_path = os.path.join(BASE_DIR, "script.js")
    with open(script_path, encoding="utf-8") as script:
        source = script.read()
    return HTMLResponse(source, media_type="application/javascript")
@app.on_event("startup")
def startup():
    """Run the database bootstrap steps, in order, when the application starts."""
    bootstrap_steps = (
        database.init_db,
        database.migrate_existing_learning_data,
        database.sync_scan_history_to_test_results,
    )
    for step in bootstrap_steps:
        step()
@app.post("/api/register")
def api_register(username: str = Form(...), password: str = Form(...), name: str = Form(...)):
    """Register a new account from form fields.

    Returns a success payload when ``database.register_user`` reports success.

    Raises:
        HTTPException: 400 when registration is refused (reported to the client
            as "username already in use").
    """
    created = database.register_user(username, password, name)
    if not created:
        raise HTTPException(status_code=400, detail="Username sudah digunakan")
    return {"status": "success", "message": "Registrasi berhasil!"}
@app.post("/api/login")
def api_login(username: str = Form(...), password: str = Form(...)):
    """Authenticate a user from form fields and return their profile.

    Returns:
        A dict with ``status``, ``name``, ``username`` and ``trust_score``
        (50 when the stored score is missing or NULL).

    Raises:
        HTTPException: 401 when ``database.login_user`` returns a falsy value.
    """
    user = database.login_user(username, password)
    if not user:
        raise HTTPException(status_code=401, detail="Username atau password salah")

    # Look the column up by key instead of using ``"trust_score" in user``:
    # on a sqlite3.Row the ``in`` operator iterates over the row's *values*, so
    # the membership test would be False even when the column exists. A keyed
    # lookup behaves the same for dicts (KeyError) and Rows (IndexError).
    try:
        trust_score = user["trust_score"]
    except (KeyError, IndexError):
        trust_score = None
    if trust_score is None:
        trust_score = 50

    return {
        "status": "success",
        "name": user["name"],
        "username": user["username"],
        "trust_score": trust_score,
    }
import math
from PIL import Image
def get_color_feature_vector(img_path):
    """Return a coarse 24-bin colour histogram for the image at ``img_path``.

    The image is converted to RGB and shrunk to 64x64; its 768-entry histogram
    (256 levels per channel) is then summed in groups of 32, giving 8 bins per
    channel. Converting to RGB first guarantees the vector always has 24
    entries — without it grayscale/palette images yield 8 bins and RGBA images
    32, which no longer line up with a 24-entry reference vector.

    Args:
        img_path: Path of the image file to analyse.

    Returns:
        A list of 24 bin counts, or ``[1.0] * 24`` when the image cannot be
        read (best-effort fallback, any error is swallowed on purpose).
    """
    try:
        with Image.open(img_path) as img:
            hist = img.convert("RGB").resize((64, 64)).histogram()
        return [sum(hist[start:start + 32]) for start in range(0, len(hist), 32)]
    except Exception:
        return [1.0] * 24
def cosine_similarity(v1, v2):
    """Return the cosine of the angle between two numeric vectors.

    Elements are paired with ``zip``, so the dot product only covers the
    shorter vector's length. Returns 0.0 when either vector has zero norm.
    """
    magnitude_1 = math.sqrt(sum(x * x for x in v1))
    magnitude_2 = math.sqrt(sum(y * y for y in v2))
    inner = sum(x * y for x, y in zip(v1, v2))
    if not (magnitude_1 != 0 and magnitude_2 != 0):
        return 0.0
    return inner / (magnitude_1 * magnitude_2)
# Reference vector representing typical digital photo color centroid.
# It has 24 entries, the same length as the fallback vector returned by
# get_color_feature_vector.
REF_VECTOR = [
    100.0, 150.0, 200.0, 180.0, 120.0, 90.0, 80.0, 110.0,
    130.0, 140.0, 160.0, 170.0, 190.0, 210.0, 220.0, 230.0,
    240.0, 250.0, 200.0, 150.0, 100.0, 80.0, 60.0, 40.0,
]
def analyze_image_conditions(img_path):
    """Classify an image as dark and/or grayscale from a 32x32 RGB thumbnail.

    Returns:
        ``(is_dark, is_grayscale, avg_brightness)`` where ``is_dark`` means the
        mean luma is below 45, ``is_grayscale`` means the mean channel spread
        ``|r-g| + |g-b| + |b-r|`` is below 12, and ``avg_brightness`` is the
        mean luma rounded to one decimal. Falls back to
        ``(False, False, 127.0)`` when the image cannot be processed.
    """
    try:
        with Image.open(img_path) as source:
            thumbnail = source.convert('RGB').resize((32, 32))
            rgb_pixels = list(thumbnail.getdata())
        # Weighted luma per pixel and a simple "how far from gray" measure.
        luma_values = [0.299*r + 0.587*g + 0.114*b for r, g, b in rgb_pixels]
        channel_spread = [abs(r - g) + abs(g - b) + abs(b - r) for r, g, b in rgb_pixels]
        mean_luma = sum(luma_values) / len(luma_values)
        mean_spread = sum(channel_spread) / len(channel_spread)
        return mean_luma < 45, mean_spread < 12, round(mean_luma, 1)
    except Exception:
        return False, False, 127.0
def save_compressed_image(source_path, target_path, max_size=(512, 512)):
    """Write a downscaled, compressed copy of ``source_path`` to ``target_path``.

    The image is shrunk to fit inside ``max_size`` (aspect ratio preserved) and
    saved as PNG when the target extension is ``png``, otherwise as JPEG
    (quality 80). If anything goes wrong the source file is copied verbatim.

    Args:
        source_path: Existing image file to read.
        target_path: Destination path; its extension selects the output format.
        max_size: Maximum ``(width, height)`` of the saved image.

    Returns:
        True when the compressed image was written, False when the function
        fell back to a plain file copy.

    Raises:
        OSError: If the fallback copy itself fails (e.g. the source is missing).
    """
    # Modes the JPEG encoder accepts as-is; anything else must become RGB first.
    jpeg_native_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    try:
        with Image.open(source_path) as img:
            ext = target_path.split('.')[-1].lower()
            fmt = "PNG" if ext == "png" else "JPEG"
            # RGBA and palette images are always flattened to RGB (as before);
            # for JPEG output every other unsupported mode (LA, I;16, F, ...)
            # is converted too instead of silently falling back to a raw copy.
            needs_rgb = img.mode in ("RGBA", "P") or (
                fmt == "JPEG" and img.mode not in jpeg_native_modes
            )
            if needs_rgb:
                img = img.convert("RGB")
            img.thumbnail(max_size, Image.Resampling.LANCZOS)
            if fmt == "JPEG":
                img.save(target_path, "JPEG", quality=80, optimize=True)
            else:
                img.save(target_path, "PNG", optimize=True)
        return True
    except Exception:
        # Best effort: keep the original bytes rather than losing the upload.
        shutil.copy2(source_path, target_path)
        return False
# Prediction endpoint of the remote detector service; the scan handlers below
# POST each uploaded image to this URL as a multipart "file" field.
HF_API_URL = "https://alstears-ai-forensic-detector.hf.space/predict"
@app.post("/api/scan-image")
def api_scan_image(file: UploadFile = File(...), username: str = Form(...)):
    """Scan one uploaded image with the remote detector and return the verdict.

    The upload is spooled to a temporary file, posted to ``HF_API_URL``, and a
    compressed copy is kept in ``PENDING_DIR`` under the returned
    ``feedback_id`` so a later correction request can label it. For non-guest
    users the scan is added to the scan history; when the filename hints at a
    ground-truth label the outcome is also stored as a test result (plus
    learning data when the prediction disagrees with the hint).

    Args:
        file: Uploaded image; its name must end in .png, .jpg, .jpeg or .webp.
        username: Account name, or ``"__guest__"`` to skip per-user persistence.

    Returns:
        A dict with the prediction, confidence (``accuracy``, in percent),
        colour similarity / outlier flags, lighting flags, the user's trust
        score and the detector's forensic logs.

    Raises:
        HTTPException: 400 for a missing or unsupported file name, 502 when the
            detector does not answer with HTTP 200.
    """
    import re  # only needed for the filename label heuristic below

    filename = file.filename or ""
    # Match real extensions (with the dot) so names like "notapng" are rejected.
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
        raise HTTPException(status_code=400, detail="Format gambar tidak didukung")

    uid = uuid.uuid4().hex[:8]
    safe_name = filename.replace("\\", "/").split("/")[-1]
    temp_path = f"temp_{uid}_{safe_name}"
    try:
        # Spooling happens inside the try so a failed write still gets cleaned up.
        with open(temp_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        with open(temp_path, "rb") as f:
            resp = httpx.post(HF_API_URL, files={"file": (filename, f, "image/jpeg")}, timeout=30)
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail="Gagal menghubungi AI detector")
        result = resp.json()

        prediction = result.get("prediction", "REAL")
        confidence_str = result.get("confidence", "0.0%")
        # The detector may send "87.5%" (string) or a bare fraction (number).
        try:
            if isinstance(confidence_str, str):
                confidence_val = float(confidence_str.replace("%", "").strip()) / 100.0
            else:
                confidence_val = float(confidence_str)
        except Exception:
            confidence_val = 0.85  # fallback

        is_ai = prediction.upper() in ("AI", "FAKE")
        source = "Pollinations AI (Stable Diffusion)" if is_ai else "Kamera/Foto Digital Asli"
        accuracy = round(confidence_val * 100, 1)

        # Keep a compressed copy so the user can later confirm/correct the label.
        feedback_path = f"{PENDING_DIR}/{uid}_{safe_name}"
        save_compressed_image(temp_path, feedback_path)

        file_size = os.path.getsize(temp_path)
        size_label = f"{file_size/(1024*1024):.2f} MB"
        if username != "__guest__":
            database.add_scan_history(username, filename, "Image", size_label, source, is_ai, accuracy)

        # Calculate color similarity & outlier detection (Solution 2)
        vector = get_color_feature_vector(temp_path)
        similarity = round(cosine_similarity(vector, REF_VECTOR), 3)
        is_outlier = similarity < 0.88

        # Detect low-light conditions locally; the monochrome flag comes from
        # the detector's response, so the local grayscale verdict is ignored.
        is_dark, _, avg_brightness = analyze_image_conditions(temp_path)
        is_monochrome_detected = bool(result.get("is_monochrome_detected", False))
        is_grayscale = is_monochrome_detected

        # Check for Trap image (Solution 3)
        is_trap = "trap" in filename.lower()

        # Get user's current trust score (50 for guests, unknown users or NULL).
        trust_score = 50
        if username != "__guest__":
            conn = database.get_connection()
            try:
                user_row = conn.execute(
                    "SELECT trust_score FROM users WHERE username=?", (username,)
                ).fetchone()
            finally:
                conn.close()
            if user_row and user_row["trust_score"] is not None:
                trust_score = user_row["trust_score"]

        # --- Single-image accuracy sync (Point 2) ---
        # Try to guess the ground truth (REAL/AI) from the file name
        # (e.g. real11.jpg, fake4.jpg).
        prediction_label = "AI" if is_ai else "REAL"
        inferred_label = None
        fn_lower = filename.lower()
        if "real" in fn_lower:
            inferred_label = "REAL"
        elif "fake" in fn_lower or re.search(r"(?<![a-z])ai(?![a-z])", fn_lower):
            # "ai" must stand alone (not inside a word): a plain substring test
            # would label "rain.jpg" or "painting.png" as AI and pollute the
            # learning data with false mismatches.
            inferred_label = "AI"

        if inferred_label and username != "__guest__":
            is_mismatch = 1 if prediction_label != inferred_label else 0
            database.save_test_result(
                None, username, filename, inferred_label,
                prediction_label, accuracy, is_mismatch
            )
            # Store mismatch learning data when the prediction was wrong.
            if is_mismatch:
                database.save_learning_data(
                    username, filename, prediction_label, inferred_label, accuracy,
                    source="single_mismatch"
                )

        return {
            "status": "success",
            "filename": filename,
            "feedback_id": uid,
            "type": "image",
            "file_size": size_label,
            "source": source,
            "is_ai": is_ai,
            "accuracy": accuracy,
            "date": time.strftime("%Y-%m-%d %H:%M:%S"),
            "similarity": similarity,
            "is_outlier": bool(is_outlier),
            "is_trap": bool(is_trap),
            "trust_score": trust_score,
            "is_dark": is_dark,
            "is_grayscale": is_grayscale,
            "is_monochrome_detected": is_monochrome_detected,
            "avg_brightness": avg_brightness,
            "forensic_analysis_logs": result.get("forensic_analysis_logs", {})
        }
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)
def scan_single_image(file_bytes, filename):
    """POST one image to ``HF_API_URL`` and return the decoded JSON reply.

    Returns ``None`` when the service answers with anything other than HTTP 200.
    """
    upload = {"file": (filename, file_bytes, "image/jpeg")}
    resp = httpx.post(HF_API_URL, files=upload, timeout=30)
    return resp.json() if resp.status_code == 200 else None
@app.post("/api/batch-scan")
async def api_batch_scan(files: list[UploadFile] = File(...), username: str = Form(...), labels: str = Form("[]")):
    """Scan a batch of images and compare predictions with optional labels.

    Args:
        files: Uploaded images; files without a png/jpg/jpeg/webp extension
            are skipped.
        username: Account the batch and its results are stored under.
        labels: JSON text, either a ``{filename: label}`` object or a list of
            labels aligned with ``files``. Labels "FAKE"/"AI" (any case) mean
            AI, every other non-empty label means REAL.

    Returns:
        A dict with the batch id, counts, accuracy (``-1.0`` when no labels
        were supplied), per-file results and ``needs_confirmation`` (the
        mismatch count when it is between 1 and 5, else 0).
    """
    import json as json_mod

    try:
        parsed_labels = json_mod.loads(labels)
    except (TypeError, ValueError):
        # Malformed label payloads are treated as "no labels supplied".
        parsed_labels = []

    results = []
    # Use an async client: a blocking httpx.post inside this coroutine would
    # stall the event loop (and every other request) for up to 30 s per image.
    async with httpx.AsyncClient() as client:
        for idx, file in enumerate(files):
            ext = file.filename.lower().split('.')[-1]
            if ext not in ('png', 'jpg', 'jpeg', 'webp'):
                continue

            folder_label = None
            # Safe lookup in dictionary map or fallback to list
            if isinstance(parsed_labels, dict):
                # Try exact match, then fall back to a case-insensitive match
                folder_label = parsed_labels.get(file.filename)
                if not folder_label:
                    # Compare base names only, in case of relative path differences
                    base_filename = file.filename.replace("\\", "/").split("/")[-1]
                    for k, v in parsed_labels.items():
                        k_base = k.replace("\\", "/").split("/")[-1]
                        if k_base.lower() == base_filename.lower():
                            folder_label = v
                            break
            elif isinstance(parsed_labels, list) and idx < len(parsed_labels):
                folder_label = parsed_labels[idx]

            # Standardize the folder label to "AI" / "REAL"
            if folder_label:
                folder_label_upper = str(folder_label).upper()
                folder_label = "AI" if folder_label_upper in ("FAKE", "AI") else "REAL"

            bytes_data = await file.read()
            try:
                resp = await client.post(
                    HF_API_URL,
                    files={"file": (file.filename, bytes_data, "image/jpeg")},
                    timeout=30,
                )
                if resp.status_code != 200:
                    results.append({"filename": file.filename, "folder_label": folder_label, "error": "Gagal scan"})
                    continue
                result = resp.json()
                prediction = result.get("prediction", "REAL")
                confidence_str = result.get("confidence", "0.0%")
                # The detector may send "87.5%" (string) or a bare fraction.
                try:
                    if isinstance(confidence_str, str):
                        confidence_val = float(confidence_str.replace("%", "").strip()) / 100.0
                    else:
                        confidence_val = float(confidence_str)
                except Exception:
                    confidence_val = 0.85  # fallback
                prediction_label = "AI" if prediction.upper() in ("AI", "FAKE") else "REAL"
                confidence_pct = round(confidence_val * 100, 1)

                # folder_label is already normalised, so it is the expected label.
                is_mismatch = 1 if folder_label and prediction_label != folder_label else 0
                entry = {
                    "filename": file.filename,
                    "folder_label": folder_label,
                    "prediction": prediction_label,
                    "confidence": confidence_pct,
                    "is_mismatch": is_mismatch
                }
                results.append(entry)

                if is_mismatch and folder_label:
                    # Stage the misclassified image so it can be filed as feedback below.
                    uid = uuid.uuid4().hex[:8]
                    safe_name = file.filename.replace("\\", "/").split("/")[-1]
                    pending_path = f"{PENDING_DIR}/{uid}_{safe_name}"
                    with open(pending_path, "wb") as pf:
                        pf.write(bytes_data)
                    entry["feedback_id"] = uid
                else:
                    entry["feedback_id"] = ""
            except Exception as e:
                results.append({"filename": file.filename, "folder_label": folder_label, "error": str(e)})

    total = len(results)
    has_labels = any(r.get("folder_label") is not None for r in results)
    if has_labels:
        # NOTE(review): every non-mismatch row counts as correct here, which
        # includes failed scans and unlabelled files — confirm that is intended.
        mismatch_count = sum(1 for r in results if r.get("is_mismatch"))
        correct_count = total - mismatch_count
        accuracy = round((correct_count / total * 100), 1) if total > 0 else 0
    else:
        mismatch_count = 0
        correct_count = 0
        accuracy = -1.0

    batch_id = database.create_test_batch(username, total, correct_count, mismatch_count, accuracy)
    for r in results:
        database.save_test_result(
            batch_id, username, r["filename"], r.get("folder_label"),
            r.get("prediction", "ERROR"), r.get("confidence", 0.0), r.get("is_mismatch", 0)
        )
        if r.get("is_mismatch") and r.get("folder_label"):
            expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL"
            database.save_learning_data(
                username, r["filename"], r.get("prediction", "ERROR"), expected, r.get("confidence", 0.0),
                source="batch_mismatch"
            )
            fid = r.get("feedback_id", "")
            if fid:
                # Move the staged image into the folder of its true label.
                safe_name = r['filename'].replace("\\", "/").split("/")[-1]
                pending_file = f"{PENDING_DIR}/{fid}_{safe_name}"
                target_dir = f"{FEEDBACK_DIR}/real" if expected == "REAL" else f"{FEEDBACK_DIR}/fake"
                target_path = f"{target_dir}/{safe_name}"
                if os.path.exists(pending_file):
                    os.makedirs(target_dir, exist_ok=True)
                    shutil.move(pending_file, target_path)

    return {
        "batch_id": batch_id,
        "total": total,
        "correct": correct_count,
        "wrong": mismatch_count,
        "accuracy": accuracy,
        "results": results,
        "needs_confirmation": mismatch_count if 1 <= mismatch_count <= 5 else 0
    }
@app.post("/api/batch-confirm")
def api_batch_confirm(data: dict):
    """Apply user corrections to a stored batch and return its updated accuracy.

    Args:
        data: JSON body with ``batch_id`` and ``corrections``, a list of
            ``{"index": int, "user_answer": str}`` items where ``index`` refers
            to the position in the batch's stored results.

    Returns:
        A dict with ``status`` plus ``total``/``correct``/``wrong``/``accuracy``
        computed over the results that carry a folder label.
    """
    batch_id = data.get("batch_id")
    corrections = data.get("corrections") or []
    results = database.get_test_results_by_batch(batch_id)

    for corr in corrections:
        if not isinstance(corr, dict):
            continue
        idx = corr.get("index")
        user_answer = corr.get("user_answer")
        # Skip malformed entries instead of crashing; a negative index must not
        # silently select a row counted from the end of the list.
        if isinstance(idx, bool) or not isinstance(idx, int):
            continue
        if not 0 <= idx < len(results) or not isinstance(user_answer, str):
            continue

        r = results[idx]
        # Normalise like the other endpoints do, so "fake" is stored as "AI"
        # and compares equal to the expected label below.
        corrected_label = "AI" if user_answer.upper() in ("FAKE", "AI") else "REAL"
        original_prediction = r["prediction"]
        confidence = r["confidence"]
        database.update_test_result_correction(r["id"], corrected_label, corrected_label)
        if original_prediction != corrected_label:
            database.save_learning_data(
                r["username"], r["filename"],
                original_prediction, corrected_label, confidence
            )

    # Re-read the batch so the statistics include the corrections just stored.
    results = database.get_test_results_by_batch(batch_id)
    total_with_label = 0
    correct = 0
    for r in results:
        if not r["folder_label"]:
            continue
        total_with_label += 1
        final_label = r.get("corrected_label") or r["prediction"]
        expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL"
        if final_label == expected:
            correct += 1

    wrong = total_with_label - correct
    accuracy = round((correct / total_with_label * 100), 1) if total_with_label > 0 else 0
    return {"status": "success", "total": total_with_label, "correct": correct, "wrong": wrong, "accuracy": accuracy}
def forward_feedback_to_hf_space(file_path: str, filename: str, correct_label: str):
    """Upload a labelled feedback image to the remote ``/save-feedback`` endpoint.

    Does nothing when ``file_path`` does not exist. "FAKE"/"AI" (any case) is
    sent as "AI", every other label as "REAL". Failures are printed and
    swallowed so forwarding never breaks the calling request.
    """
    if os.path.exists(file_path):
        try:
            normalized = "REAL" if correct_label.upper() not in ("FAKE", "AI") else "AI"
            with open(file_path, "rb") as image_file, httpx.Client() as client:
                res = client.post(
                    "https://alstears-ai-forensic-detector.hf.space/save-feedback",
                    files={"file": (filename, image_file, "image/jpeg")},
                    data={"correct_label": normalized},
                    timeout=15.0,
                )
            print(f"Feedback successfully forwarded to Hugging Face space. Status: {res.status_code}, Response: {res.text}")
        except Exception as e:
            print(f"Error forwarding feedback to Hugging Face space: {e}")
def _finalize_pending_feedback(feedback_id, filename, correct_label):
    """File a pending upload under its confirmed label and forward it upstream."""
    # The id is interpolated into a path that is later deleted, so accept only
    # plain alphanumeric ids (what the scan endpoints generate) and refuse
    # anything that could contain path separators or "..".
    if not isinstance(feedback_id, str) or not feedback_id.isalnum():
        return
    safe_name = filename.replace("\\", "/").split("/")[-1]
    pending_file = f"{PENDING_DIR}/{feedback_id}_{safe_name}"
    if not os.path.exists(pending_file):
        return
    target_dir = f"{FEEDBACK_DIR}/real" if correct_label == "REAL" else f"{FEEDBACK_DIR}/fake"
    os.makedirs(target_dir, exist_ok=True)
    save_compressed_image(pending_file, f"{target_dir}/{safe_name}")
    # Forward feedback to HF Space
    forward_feedback_to_hf_space(pending_file, safe_name, correct_label)
    os.remove(pending_file)


@app.post("/api/correction-single")
def api_correction_single(data: dict):
    """Record a user's correction for a single scanned image.

    Args:
        data: JSON body with ``username``, ``filename``, ``original_prediction``,
            ``correct_label``, optional ``confidence`` (default 0) and optional
            ``feedback_id`` (as returned by the scan endpoint).

    Returns:
        A dict with ``status``, ``is_trap``, ``trap_correct``, ``trust_change``
        and ``new_trust`` (the user's trust score after the adjustment; 50 for
        guests and unknown users).

    Raises:
        HTTPException: 400 when ``filename`` is missing or not a string.
    """
    username = data.get("username")
    filename = data.get("filename")
    original_prediction = data.get("original_prediction")
    correct_label = data.get("correct_label")
    confidence = data.get("confidence", 0)
    feedback_id = data.get("feedback_id")

    if not isinstance(filename, str):
        # Previously this crashed with AttributeError on filename.lower().
        raise HTTPException(status_code=400, detail="Nama file wajib diisi")

    # Standardize both labels to "AI" / "REAL"
    correct_label = "AI" if str(correct_label).upper() in ("FAKE", "AI") else "REAL"
    original_prediction = "AI" if str(original_prediction).upper() in ("FAKE", "AI") else "REAL"

    # Determine if it is a trap image and adjust trust score
    lowered_name = filename.lower()
    is_trap = "trap" in lowered_name
    trap_correct = False
    trust_change = 0
    new_trust = 50

    if username == "__guest__":
        # Guests have no account: only file the feedback image.
        if feedback_id:
            _finalize_pending_feedback(feedback_id, filename, correct_label)
        return {
            "status": "success",
            "is_trap": is_trap,
            "trap_correct": trap_correct,
            "trust_change": trust_change,
            "new_trust": new_trust
        }

    if is_trap:
        # The true label of a trap image is encoded in its name; without an
        # explicit marker the opposite of the model's prediction is assumed.
        if "trap_real" in lowered_name:
            true_label = "REAL"
        elif "trap_ai" in lowered_name or "trap_fake" in lowered_name:
            true_label = "AI"
        else:
            true_label = "REAL" if original_prediction == "AI" else "AI"
        trap_correct = correct_label == true_label
        trust_change = 5 if trap_correct else -15

    conn = database.get_connection()
    try:
        user_row = conn.execute("SELECT trust_score FROM users WHERE username=?", (username,)).fetchone()
        if user_row:
            current_trust = user_row["trust_score"] if user_row["trust_score"] is not None else 50
            new_trust = max(0, min(100, current_trust + trust_change))
            if trust_change:
                conn.execute("UPDATE users SET trust_score=? WHERE username=?", (new_trust, username))
                conn.commit()
    finally:
        conn.close()

    # Save to learning data for retraining
    database.save_learning_data(username, filename, original_prediction, correct_label, confidence)

    # Update or insert into test_results
    is_mismatch = 1 if original_prediction != correct_label else 0
    conn = database.get_connection()
    try:
        existing = conn.execute("SELECT id FROM test_results WHERE username=? AND filename=? AND batch_id IS NULL",
                                (username, filename)).fetchone()
        if existing:
            conn.execute("UPDATE test_results SET folder_label=?, prediction=?, confidence=?, is_mismatch=?, corrected_label=? WHERE id=?",
                         (correct_label, original_prediction, confidence, is_mismatch, correct_label, existing["id"]))
            conn.commit()
        else:
            database.save_test_result(None, username, filename, correct_label, original_prediction, confidence, is_mismatch)
    finally:
        conn.close()

    if feedback_id:
        _finalize_pending_feedback(feedback_id, filename, correct_label)

    return {
        "status": "success",
        "is_trap": is_trap,
        "trap_correct": trap_correct,
        "trust_change": trust_change,
        "new_trust": new_trust
    }
@app.post("/save-feedback")
async def save_feedback_direct(file: UploadFile = File(...), correct_label: str = Form(...)):
    """Store an uploaded image under its label folder and forward it upstream.

    Labels "real", "true" and "asli" (any case) go to ``feedback/real``; every
    other label goes to ``feedback/fake``. Forwarding to the remote
    ``/save-feedback`` endpoint is best effort: failures are printed only.

    Returns:
        ``{"status": "saved", "path": "feedback/<folder>/<name>"}``.
    """
    # Standardize correct label to folder target
    is_real = correct_label.lower() in ("real", "true", "asli")
    folder = "real" if is_real else "fake"
    target_dir = os.path.join(FEEDBACK_DIR, folder)
    os.makedirs(target_dir, exist_ok=True)

    # Strip any client-supplied directory components from the name.
    safe_name = file.filename.replace("\\", "/").split("/")[-1]

    # Save uploaded file content directly
    contents = await file.read()
    with open(os.path.join(target_dir, safe_name), "wb") as out:
        out.write(contents)

    # Forward feedback to Hugging Face space
    try:
        async with httpx.AsyncClient() as client:
            res = await client.post(
                "https://alstears-ai-forensic-detector.hf.space/save-feedback",
                files={"file": (safe_name, contents, "image/jpeg")},
                data={"correct_label": "AI" if folder == "fake" else "REAL"},
                timeout=15.0,
            )
        print(f"Direct feedback forwarded to HF. Status: {res.status_code}")
    except Exception as e:
        print(f"Failed to forward direct feedback to HF: {e}")

    return {
        "status": "saved",
        "path": f"feedback/{folder}/{safe_name}"
    }
@app.get("/api/history/{username}")
def api_get_history(username: str):
    """Return the history stored for ``username`` under the ``history`` key."""
    history = database.get_user_history(username)
    return {"history": history}
@app.get("/api/clear-history")
def api_clear_history():
    """Delete all history via ``database.clear_all_history`` and report success.

    NOTE(review): this is a destructive action exposed as an unauthenticated
    GET — confirm that is intended.
    """
    database.clear_all_history()
    response = {"status": "success", "message": "Semua history berhasil dihapus"}
    return response
def _report_where_clause(username, threshold_str, date_column):
    """Build the optional WHERE suffix and parameters for a report query."""
    # date_column is always a literal chosen by the caller below, never user
    # input, so interpolating it into the SQL text is safe.
    conditions = []
    params = []
    if username:
        conditions.append("username = ?")
        params.append(username)
    if threshold_str:
        conditions.append(f"{date_column} >= ?")
        params.append(threshold_str)
    clause = (" WHERE " + " AND ".join(conditions)) if conditions else ""
    return clause, params


@app.get("/api/accuracy-report")
def api_accuracy_report(username: str = None, filter: str = "all"):
    """Build the accuracy dashboard: metrics, confusion matrix and batch list.

    Args:
        username: Restrict the report to one user when given.
        filter: Time window — "today", "week" (7 days), "month" (30 days);
            any other value means no time restriction.

    Returns:
        A dict with ``stats`` (accuracy/precision/recall/F1 in percent),
        ``confusion_matrix`` (AI is the positive class),
        ``confidence_distribution``, up to 15 ``failures``, the matching
        ``batches`` and several counters.

    Raises:
        HTTPException: 500 with the error text when anything fails.
    """
    import datetime
    import traceback

    conn = None
    try:
        now = datetime.datetime.now()
        # Calculate time threshold (compared as text against the stored dates)
        threshold_str = None
        if filter == "today":
            threshold_str = now.strftime("%Y-%m-%d")
        elif filter == "week":
            threshold_str = (now - datetime.timedelta(days=7)).isoformat()
        elif filter == "month":
            threshold_str = (now - datetime.timedelta(days=30)).isoformat()

        conn = database.get_connection()

        # Fetch test results with time filter
        where_results, params_results = _report_where_clause(username, threshold_str, "scan_date")
        rows = conn.execute("SELECT * FROM test_results" + where_results, params_results).fetchall()

        # Compute Confusion Matrix, Failures, and Confidence Distributions
        tp = 0
        fp = 0
        fn = 0
        tn = 0
        real_conf_buckets = [0, 0, 0, 0, 0]  # 50-60, 60-70, 70-80, 80-90, 90-100
        ai_conf_buckets = [0, 0, 0, 0, 0]
        failures = []
        for r in rows:
            if not r["folder_label"]:
                continue
            expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL"
            final_pred = r["corrected_label"] or r["prediction"] or "REAL"
            # Safe None check for confidence
            confidence = float(r["confidence"]) if r["confidence"] is not None else 0.0

            if expected == "AI" and final_pred == "AI":
                tp += 1
            elif expected == "REAL" and final_pred == "AI":
                fp += 1
            elif expected == "AI" and final_pred == "REAL":
                fn += 1
            elif expected == "REAL" and final_pred == "REAL":
                tn += 1

            # Add mismatch (prediction failure) to failure log
            if final_pred != expected:
                failures.append({
                    "filename": r["filename"] or "Unknown File",
                    "expected": expected,
                    "prediction": r["prediction"] or "REAL",
                    "final_pred": final_pred,
                    "confidence": confidence,
                    "date": r["scan_date"][:19].replace("T", " ") if r["scan_date"] else "-"
                })

            # Only confidences of 50% and above have a bucket. The explicit
            # check matters: int() truncates toward zero, so e.g. 45% would
            # otherwise compute index 0 and be counted as "50-60%".
            if confidence >= 50:
                bucket_idx = min(int((confidence - 50) / 10), 4)
                if final_pred == "AI":
                    ai_conf_buckets[bucket_idx] += 1
                else:
                    real_conf_buckets[bucket_idx] += 1

        # Calculate global metrics
        total = tp + fp + fn + tn
        correct = tp + tn
        wrong = fp + fn
        accuracy = round((correct / total * 100), 1) if total > 0 else 0.0

        # Calculate Advanced ML metrics (precision/recall are already percentages)
        precision = round((tp / (tp + fp) * 100), 1) if (tp + fp) > 0 else 0.0
        recall = round((tp / (tp + fn) * 100), 1) if (tp + fn) > 0 else 0.0
        f1_score = round((2 * (precision * recall) / (precision + recall)), 1) if (precision + recall) > 0 else 0.0

        # Count scans with time filter
        where_scan, p_scan = _report_where_clause(username, threshold_str, "scan_date")
        scan_count = conn.execute(
            "SELECT COUNT(*) as cnt FROM scan_history" + where_scan, p_scan
        ).fetchone()["cnt"]

        # Count batch images with time filter
        where_batch, p_batch = _report_where_clause(username, threshold_str, "test_date")
        batch_images = conn.execute(
            "SELECT COALESCE(SUM(total_images), 0) as cnt FROM test_batches" + where_batch, p_batch
        ).fetchone()["cnt"]

        # Count other parameters
        learning_count = database.get_learning_data_count()

        batches_rows = conn.execute(
            "SELECT * FROM test_batches" + where_batch + " ORDER BY id DESC", p_batch
        ).fetchall()
        batches = [dict(row) for row in batches_rows]

        return {
            "stats": {
                "total": total,
                "correct": correct,
                "wrong": wrong,
                "accuracy": accuracy,
                "precision": precision,
                "recall": recall,
                "f1_score": f1_score
            },
            "confusion_matrix": {
                "tp": tp,
                "fp": fp,
                "fn": fn,
                "tn": tn
            },
            "confidence_distribution": {
                "buckets": ["50-60%", "60-70%", "70-80%", "80-90%", "90-100%"],
                "real": real_conf_buckets,
                "ai": ai_conf_buckets
            },
            "failures": failures[:15],  # first 15 failures in query order
            "batches": batches,
            "learning_data_count": learning_count,
            "scan_count": scan_count,
            "batch_images": batch_images
        }
    except Exception as e:
        print("EXCEPTION DETECTED IN ACCURACY REPORT:")
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Close the connection on every path, including failures.
        if conn is not None:
            conn.close()
@app.get("/api/download-feedback")
def api_download_feedback(background: BackgroundTasks, username: str = ""):
    """Zip the whole feedback directory and send it as a download.

    Only the user name "sandikad" (case-insensitive) is allowed. The archive is
    written next to this module with a timestamped name and is scheduled for
    deletion as a background task.

    Raises:
        HTTPException: 403 for any other user name.
    """
    requester = (username or "").lower()
    if requester != "sandikad":
        raise HTTPException(status_code=403, detail="Akses ditolak. Hanya user Sandikad yang dapat mendownload feedback.")

    stamp = time.strftime('%Y%m%d_%H%M%S')
    zip_path = os.path.join(BASE_DIR, f"feedback_{stamp}.zip")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for folder, _subdirs, names in os.walk(FEEDBACK_DIR):
            for name in names:
                absolute = os.path.join(folder, name)
                # Store entries relative to the feedback root (real/..., fake/...).
                archive.write(absolute, os.path.relpath(absolute, FEEDBACK_DIR))

    background.add_task(os.remove, zip_path)
    download_name = os.path.basename(zip_path)
    return FileResponse(zip_path, media_type="application/zip", filename=download_name)