Spaces:
Running
Running
| import sqlite3 | |
| import json | |
| import os | |
| import hashlib | |
| from datetime import datetime | |
| DATASET_CONFIG = { | |
| "dataset_dir": "./dataset_ai_vs_real", | |
| "real_dir": "./dataset_ai_vs_real/real", | |
| "ai_dir": "./dataset_ai_vs_real/fake" | |
| } | |
| DB_NAME = "app_database.db" | |
| def get_connection(): | |
| conn = sqlite3.connect(DB_NAME) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def init_db(): | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute('''CREATE TABLE IF NOT EXISTS users ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| username TEXT UNIQUE NOT NULL, | |
| password TEXT NOT NULL, | |
| name TEXT NOT NULL, | |
| join_date TEXT, | |
| trust_score INTEGER DEFAULT 50)''') | |
| try: | |
| cursor.execute("ALTER TABLE users ADD COLUMN trust_score INTEGER DEFAULT 50") | |
| except sqlite3.OperationalError: | |
| pass | |
| cursor.execute('''CREATE TABLE IF NOT EXISTS scan_history ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| username TEXT NOT NULL, | |
| filename TEXT, | |
| file_type TEXT, | |
| file_size TEXT, | |
| source TEXT, | |
| is_ai INTEGER, | |
| accuracy REAL, | |
| scan_date TEXT)''') | |
| cursor.execute('''CREATE TABLE IF NOT EXISTS test_batches ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| username TEXT NOT NULL, | |
| total_images INTEGER, | |
| correct_count INTEGER, | |
| wrong_count INTEGER, | |
| accuracy REAL, | |
| test_date TEXT)''') | |
| cursor.execute('''CREATE TABLE IF NOT EXISTS test_results ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| batch_id INTEGER, | |
| username TEXT NOT NULL, | |
| filename TEXT, | |
| folder_label TEXT, | |
| prediction TEXT, | |
| confidence REAL, | |
| is_mismatch INTEGER, | |
| user_correction TEXT, | |
| corrected_label TEXT, | |
| scan_date TEXT)''') | |
| cursor.execute('''CREATE TABLE IF NOT EXISTS learning_data ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| username TEXT NOT NULL, | |
| filename TEXT, | |
| original_prediction TEXT, | |
| correct_label TEXT, | |
| confidence REAL, | |
| source TEXT, | |
| scan_date TEXT)''') | |
| conn.commit() | |
| conn.close() | |
| for path in DATASET_CONFIG.values(): | |
| if not os.path.exists(path): | |
| os.makedirs(path, exist_ok=True) | |
| def hash_password(password): | |
| return hashlib.sha256(password.encode()).hexdigest() | |
| def register_user(username, password, name): | |
| conn = get_connection() | |
| try: | |
| conn.execute("INSERT INTO users (username, password, name, join_date) VALUES (?, ?, ?, ?)", | |
| (username, hash_password(password), name, datetime.now().isoformat())) | |
| conn.commit() | |
| return True | |
| except sqlite3.IntegrityError: | |
| return False | |
| finally: | |
| conn.close() | |
| def login_user(username, password): | |
| conn = get_connection() | |
| user = conn.execute("SELECT * FROM users WHERE username=? AND password=?", | |
| (username, hash_password(password))).fetchone() | |
| conn.close() | |
| return dict(user) if user else None | |
| def add_scan_history(username, filename, file_type, file_size, source, is_ai, accuracy): | |
| conn = get_connection() | |
| conn.execute('''INSERT INTO scan_history (username, filename, file_type, file_size, source, is_ai, accuracy, scan_date) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', | |
| (username, filename, file_type, file_size, source, is_ai, accuracy, datetime.now().isoformat())) | |
| conn.commit() | |
| conn.close() | |
| def get_user_history(username): | |
| conn = get_connection() | |
| scan_rows = conn.execute("SELECT * FROM scan_history WHERE username=? ORDER BY id DESC LIMIT 20", (username,)).fetchall() | |
| batch_rows = conn.execute('''SELECT tb.id, tb.total_images, tb.correct_count, tb.wrong_count, tb.accuracy, tb.test_date | |
| FROM test_batches tb WHERE tb.username=? ORDER BY tb.id DESC LIMIT 20''', (username,)).fetchall() | |
| conn.close() | |
| history = [] | |
| for row in scan_rows: | |
| r = dict(row) | |
| r["_type"] = "scan" | |
| history.append(r) | |
| for row in batch_rows: | |
| r = dict(row) | |
| r["_type"] = "batch" | |
| r["filename"] = f"Batch #{r['id']} ({r['total_images']} gambar)" | |
| r["file_type"] = "Batch" | |
| r["file_size"] = "-" | |
| r["source"] = f"{r['correct_count']} benar / {r['wrong_count']} salah" | |
| r["is_ai"] = 0 | |
| r["accuracy"] = r["accuracy"] | |
| r["scan_date"] = r["test_date"] | |
| history.append(r) | |
| history.sort(key=lambda x: x.get("scan_date") or "", reverse=True) | |
| return history[:20] | |
| def clear_all_history(): | |
| conn = get_connection() | |
| conn.execute("DELETE FROM scan_history") | |
| conn.execute("DELETE FROM test_batches") | |
| conn.execute("DELETE FROM test_results") | |
| conn.execute("DELETE FROM learning_data") | |
| conn.commit() | |
| conn.close() | |
| def create_test_batch(username, total_images, correct_count, wrong_count, accuracy): | |
| conn = get_connection() | |
| cur = conn.execute('''INSERT INTO test_batches (username, total_images, correct_count, wrong_count, accuracy, test_date) | |
| VALUES (?, ?, ?, ?, ?, ?)''', | |
| (username, total_images, correct_count, wrong_count, accuracy, datetime.now().isoformat())) | |
| conn.commit() | |
| batch_id = cur.lastrowid | |
| conn.close() | |
| return batch_id | |
| def save_test_result(batch_id, username, filename, folder_label, prediction, confidence, is_mismatch): | |
| conn = get_connection() | |
| conn.execute('''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, scan_date) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', | |
| (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, datetime.now().isoformat())) | |
| conn.commit() | |
| conn.close() | |
| def update_test_result_correction(result_id, user_correction, corrected_label): | |
| conn = get_connection() | |
| conn.execute("UPDATE test_results SET user_correction=?, corrected_label=? WHERE id=?", | |
| (user_correction, corrected_label, result_id)) | |
| conn.commit() | |
| conn.close() | |
| def save_learning_data(username, filename, original_prediction, correct_label, confidence, source="user_correction"): | |
| conn = get_connection() | |
| conn.execute('''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date) | |
| VALUES (?, ?, ?, ?, ?, ?, ?)''', | |
| (username, filename, original_prediction, correct_label, confidence, source, datetime.now().isoformat())) | |
| conn.commit() | |
| conn.close() | |
| def get_all_test_batches(username=None): | |
| conn = get_connection() | |
| if username: | |
| rows = conn.execute("SELECT * FROM test_batches WHERE username=? ORDER BY id DESC", (username,)).fetchall() | |
| else: | |
| rows = conn.execute("SELECT * FROM test_batches ORDER BY id DESC").fetchall() | |
| conn.close() | |
| return [dict(row) for row in rows] | |
| def get_test_results_by_batch(batch_id): | |
| conn = get_connection() | |
| rows = conn.execute("SELECT * FROM test_results WHERE batch_id=?", (batch_id,)).fetchall() | |
| conn.close() | |
| return [dict(row) for row in rows] | |
| def get_overall_accuracy(username=None): | |
| conn = get_connection() | |
| total = 0 | |
| correct = 0 | |
| if username: | |
| rows = conn.execute("SELECT * FROM test_results WHERE username=?", (username,)).fetchall() | |
| ld_rows = conn.execute("SELECT * FROM learning_data WHERE username=? AND source='user_correction'", (username,)).fetchall() | |
| else: | |
| rows = conn.execute("SELECT * FROM test_results").fetchall() | |
| ld_rows = conn.execute("SELECT * FROM learning_data WHERE source='user_correction'").fetchall() | |
| conn.close() | |
| for r in rows: | |
| if not r["folder_label"]: | |
| continue | |
| total += 1 | |
| final_label = r["corrected_label"] or r["prediction"] | |
| expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" | |
| if final_label == expected: | |
| correct += 1 | |
| for r in ld_rows: | |
| total += 1 | |
| if r["original_prediction"] == r["correct_label"]: | |
| correct += 1 | |
| wrong = total - correct | |
| acc = round((correct / total * 100), 1) if total > 0 else 0 | |
| return {"total": total, "correct": correct, "wrong": wrong, "accuracy": acc} | |
| def get_learning_data_count(): | |
| conn = get_connection() | |
| row = conn.execute("SELECT COUNT(*) as cnt FROM learning_data").fetchone() | |
| conn.close() | |
| return row["cnt"] or 0 | |
| def migrate_existing_learning_data(): | |
| conn = get_connection() | |
| rows = conn.execute('''SELECT tr.* FROM test_results tr | |
| LEFT JOIN learning_data ld ON tr.filename = ld.filename AND tr.username = ld.username | |
| WHERE tr.is_mismatch = 1 AND tr.folder_label IS NOT NULL AND ld.id IS NULL''').fetchall() | |
| for r in rows: | |
| expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL" | |
| conn.execute('''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date) | |
| VALUES (?, ?, ?, ?, ?, ?, ?)''', | |
| (r["username"], r["filename"], r["prediction"], expected, r["confidence"], "batch_mismatch", r["scan_date"])) | |
| conn.commit() | |
| conn.close() | |
| def sync_scan_history_to_test_results(): | |
| conn = get_connection() | |
| # Cari seluruh scan di scan_history yang belum ada di test_results (batch_id IS NULL) | |
| scans = conn.execute(''' | |
| SELECT sh.username, sh.filename, sh.is_ai, sh.accuracy, sh.scan_date | |
| FROM scan_history sh | |
| LEFT JOIN test_results tr ON sh.username = tr.username AND sh.filename = tr.filename AND tr.batch_id IS NULL | |
| WHERE tr.id IS NULL | |
| ''').fetchall() | |
| for s in scans: | |
| prediction_label = "AI" if s["is_ai"] == 1 else "REAL" | |
| # Inferred label | |
| inferred_label = None | |
| fn_lower = s["filename"].lower() | |
| if "real" in fn_lower: | |
| inferred_label = "REAL" | |
| elif "fake" in fn_lower or "ai" in fn_lower: | |
| inferred_label = "AI" | |
| else: | |
| inferred_label = prediction_label # Default correct | |
| is_mismatch = 1 if prediction_label != inferred_label else 0 | |
| # Cek jika ada user_correction di learning_data | |
| ld = conn.execute("SELECT correct_label FROM learning_data WHERE username=? AND filename=? AND source='user_correction' ORDER BY id DESC LIMIT 1", | |
| (s["username"], s["filename"])).fetchone() | |
| corrected_label = None | |
| if ld: | |
| corrected_label = "AI" if ld["correct_label"].upper() in ("FAKE", "AI") else "REAL" | |
| is_mismatch = 1 if prediction_label != corrected_label else 0 | |
| inferred_label = corrected_label | |
| conn.execute('''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, corrected_label, scan_date) | |
| VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)''', | |
| (s["username"], s["filename"], inferred_label, prediction_label, s["accuracy"], is_mismatch, corrected_label, s["scan_date"])) | |
| conn.commit() | |
| conn.close() | |