import sqlite3 import json import os import hashlib from datetime import datetime DATASET_CONFIG = { "dataset_dir": "./dataset_ai_vs_real", "real_dir": "./dataset_ai_vs_real/real", "ai_dir": "./dataset_ai_vs_real/fake" } DB_NAME = "app_database.db" def get_connection(): conn = sqlite3.connect(DB_NAME) conn.row_factory = sqlite3.Row return conn def init_db(): conn = get_connection() cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password TEXT NOT NULL, name TEXT NOT NULL, join_date TEXT, trust_score INTEGER DEFAULT 50)''') try: cursor.execute("ALTER TABLE users ADD COLUMN trust_score INTEGER DEFAULT 50") except sqlite3.OperationalError: pass cursor.execute('''CREATE TABLE IF NOT EXISTS scan_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, filename TEXT, file_type TEXT, file_size TEXT, source TEXT, is_ai INTEGER, accuracy REAL, scan_date TEXT)''') cursor.execute('''CREATE TABLE IF NOT EXISTS test_batches ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, total_images INTEGER, correct_count INTEGER, wrong_count INTEGER, accuracy REAL, test_date TEXT)''') cursor.execute('''CREATE TABLE IF NOT EXISTS test_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, batch_id INTEGER, username TEXT NOT NULL, filename TEXT, folder_label TEXT, prediction TEXT, confidence REAL, is_mismatch INTEGER, user_correction TEXT, corrected_label TEXT, scan_date TEXT)''') cursor.execute('''CREATE TABLE IF NOT EXISTS learning_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL, filename TEXT, original_prediction TEXT, correct_label TEXT, confidence REAL, source TEXT, scan_date TEXT)''') conn.commit() conn.close() for path in DATASET_CONFIG.values(): if not os.path.exists(path): os.makedirs(path, exist_ok=True) def hash_password(password): return hashlib.sha256(password.encode()).hexdigest() def register_user(username, password, name): conn = get_connection() try: conn.execute("INSERT INTO users (username, password, name, join_date) VALUES (?, ?, ?, ?)", (username, hash_password(password), name, datetime.now().isoformat())) conn.commit() return True except sqlite3.IntegrityError: return False finally: conn.close() def login_user(username, password): conn = get_connection() user = conn.execute("SELECT * FROM users WHERE username=? AND password=?", (username, hash_password(password))).fetchone() conn.close() return dict(user) if user else None def add_scan_history(username, filename, file_type, file_size, source, is_ai, accuracy): conn = get_connection() conn.execute('''INSERT INTO scan_history (username, filename, file_type, file_size, source, is_ai, accuracy, scan_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', (username, filename, file_type, file_size, source, is_ai, accuracy, datetime.now().isoformat())) conn.commit() conn.close() def get_user_history(username): conn = get_connection() scan_rows = conn.execute("SELECT * FROM scan_history WHERE username=? ORDER BY id DESC LIMIT 20", (username,)).fetchall() batch_rows = conn.execute('''SELECT tb.id, tb.total_images, tb.correct_count, tb.wrong_count, tb.accuracy, tb.test_date FROM test_batches tb WHERE tb.username=? ORDER BY tb.id DESC LIMIT 20''', (username,)).fetchall() conn.close() history = [] for row in scan_rows: r = dict(row) r["_type"] = "scan" history.append(r) for row in batch_rows: r = dict(row) r["_type"] = "batch" r["filename"] = f"Batch #{r['id']} ({r['total_images']} gambar)" r["file_type"] = "Batch" r["file_size"] = "-" r["source"] = f"{r['correct_count']} benar / {r['wrong_count']} salah" r["is_ai"] = 0 r["accuracy"] = r["accuracy"] r["scan_date"] = r["test_date"] history.append(r) history.sort(key=lambda x: x.get("scan_date") or "", reverse=True) return history[:20] def clear_all_history(): conn = get_connection() conn.execute("DELETE FROM scan_history") conn.execute("DELETE FROM test_batches") conn.execute("DELETE FROM test_results") conn.execute("DELETE FROM learning_data") conn.commit() conn.close() def create_test_batch(username, total_images, correct_count, wrong_count, accuracy): conn = get_connection() cur = conn.execute('''INSERT INTO test_batches (username, total_images, correct_count, wrong_count, accuracy, test_date) VALUES (?, ?, ?, ?, ?, ?)''', (username, total_images, correct_count, wrong_count, accuracy, datetime.now().isoformat())) conn.commit() batch_id = cur.lastrowid conn.close() return batch_id def save_test_result(batch_id, username, filename, folder_label, prediction, confidence, is_mismatch): conn = get_connection() conn.execute('''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, scan_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, datetime.now().isoformat())) conn.commit() conn.close() def update_test_result_correction(result_id, user_correction, corrected_label): conn = get_connection() conn.execute("UPDATE test_results SET user_correction=?, corrected_label=? WHERE id=?", (user_correction, corrected_label, result_id)) conn.commit() conn.close() def save_learning_data(username, filename, original_prediction, correct_label, confidence, source="user_correction"): conn = get_connection() conn.execute('''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date) VALUES (?, ?, ?, ?, ?, ?, ?)''', (username, filename, original_prediction, correct_label, confidence, source, datetime.now().isoformat())) conn.commit() conn.close() def get_all_test_batches(username=None): conn = get_connection() if username: rows = conn.execute("SELECT * FROM test_batches WHERE username=? ORDER BY id DESC", (username,)).fetchall() else: rows = conn.execute("SELECT * FROM test_batches ORDER BY id DESC").fetchall() conn.close() return [dict(row) for row in rows] def get_test_results_by_batch(batch_id): conn = get_connection() rows = conn.execute("SELECT * FROM test_results WHERE batch_id=?", (batch_id,)).fetchall() conn.close() return [dict(row) for row in rows] def get_overall_accuracy(username=None): conn = get_connection() total = 0 correct = 0 if username: rows = conn.execute("SELECT * FROM test_results WHERE username=?", (username,)).fetchall() ld_rows = conn.execute("SELECT * FROM learning_data WHERE username=? AND source='user_correction'", (username,)).fetchall() else: rows = conn.execute("SELECT * FROM test_results").fetchall() ld_rows = conn.execute("SELECT * FROM learning_data WHERE source='user_correction'").fetchall() conn.close() for r in rows: if not r["folder_label"]: continue total += 1 final_label = r["corrected_label"] or r["prediction"] expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" if final_label == expected: correct += 1 for r in ld_rows: total += 1 if r["original_prediction"] == r["correct_label"]: correct += 1 wrong = total - correct acc = round((correct / total * 100), 1) if total > 0 else 0 return {"total": total, "correct": correct, "wrong": wrong, "accuracy": acc} def get_learning_data_count(): conn = get_connection() row = conn.execute("SELECT COUNT(*) as cnt FROM learning_data").fetchone() conn.close() return row["cnt"] or 0 def migrate_existing_learning_data(): conn = get_connection() rows = conn.execute('''SELECT tr.* FROM test_results tr LEFT JOIN learning_data ld ON tr.filename = ld.filename AND tr.username = ld.username WHERE tr.is_mismatch = 1 AND tr.folder_label IS NOT NULL AND ld.id IS NULL''').fetchall() for r in rows: expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" conn.execute('''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date) VALUES (?, ?, ?, ?, ?, ?, ?)''', (r["username"], r["filename"], r["prediction"], expected, r["confidence"], "batch_mismatch", r["scan_date"])) conn.commit() conn.close() def sync_scan_history_to_test_results(): conn = get_connection() # Cari seluruh scan di scan_history yang belum ada di test_results (batch_id IS NULL) scans = conn.execute(''' SELECT sh.username, sh.filename, sh.is_ai, sh.accuracy, sh.scan_date FROM scan_history sh LEFT JOIN test_results tr ON sh.username = tr.username AND sh.filename = tr.filename AND tr.batch_id IS NULL WHERE tr.id IS NULL ''').fetchall() for s in scans: prediction_label = "AI" if s["is_ai"] == 1 else "REAL" # Inferred label inferred_label = None fn_lower = s["filename"].lower() if "real" in fn_lower: inferred_label = "REAL" elif "fake" in fn_lower or "ai" in fn_lower: inferred_label = "AI" else: inferred_label = prediction_label # Default correct is_mismatch = 1 if prediction_label != inferred_label else 0 # Cek jika ada user_correction di learning_data ld = conn.execute("SELECT correct_label FROM learning_data WHERE username=? AND filename=? AND source='user_correction' ORDER BY id DESC LIMIT 1", (s["username"], s["filename"])).fetchone() corrected_label = None if ld: corrected_label = "AI" if ld["correct_label"].upper() in ("FAKE", "AI") else "REAL" is_mismatch = 1 if prediction_label != corrected_label else 0 inferred_label = corrected_label conn.execute('''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, corrected_label, scan_date) VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)''', (s["username"], s["filename"], inferred_label, prediction_label, s["accuracy"], is_mismatch, corrected_label, s["scan_date"])) conn.commit() conn.close()