ai-detector-backend / database.py
Alstears's picture
Upload 9 files
6f6024d verified
import sqlite3
import json
import os
import hashlib
from datetime import datetime
# Directory paths for the AI-vs-real dataset (root, "real" samples, AI/"fake"
# samples). init_db() creates any of these directories that do not exist yet.
DATASET_CONFIG = {
"dataset_dir": "./dataset_ai_vs_real",
"real_dir": "./dataset_ai_vs_real/real",
"ai_dir": "./dataset_ai_vs_real/fake"
}
# SQLite database file opened by get_connection(); the path is relative, so it
# is resolved against the process's current working directory.
DB_NAME = "app_database.db"
def get_connection():
    """Open a new SQLite connection to ``DB_NAME``.

    Returns:
        A ``sqlite3.Connection`` whose ``row_factory`` is ``sqlite3.Row``, so
        result rows support access by column name and conversion via
        ``dict(row)``. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_NAME)
    # Name-based row access is relied on by the query helpers in this module.
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the application tables and dataset directories if missing.

    Creates the ``users``, ``scan_history``, ``test_batches``,
    ``test_results`` and ``learning_data`` tables (each with
    ``CREATE TABLE IF NOT EXISTS``), adds the ``trust_score`` column to a
    pre-existing ``users`` table that lacks it, and then makes sure every
    directory listed in ``DATASET_CONFIG`` exists.

    Raises:
        sqlite3.Error: If any schema statement fails. The connection is
            closed before the error propagates.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL,
            name TEXT NOT NULL,
            join_date TEXT,
            trust_score INTEGER DEFAULT 50)''')

        # A users table created before trust_score was introduced needs the
        # column added. Check for it explicitly instead of swallowing every
        # OperationalError, which would also hide unrelated failures such as
        # a locked or read-only database.
        # PRAGMA table_info yields (cid, name, type, ...); index 1 is the name.
        user_columns = {
            info[1] for info in cursor.execute("PRAGMA table_info(users)").fetchall()
        }
        if "trust_score" not in user_columns:
            cursor.execute("ALTER TABLE users ADD COLUMN trust_score INTEGER DEFAULT 50")

        cursor.execute('''CREATE TABLE IF NOT EXISTS scan_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            filename TEXT,
            file_type TEXT,
            file_size TEXT,
            source TEXT,
            is_ai INTEGER,
            accuracy REAL,
            scan_date TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS test_batches (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            total_images INTEGER,
            correct_count INTEGER,
            wrong_count INTEGER,
            accuracy REAL,
            test_date TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS test_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            batch_id INTEGER,
            username TEXT NOT NULL,
            filename TEXT,
            folder_label TEXT,
            prediction TEXT,
            confidence REAL,
            is_mismatch INTEGER,
            user_correction TEXT,
            corrected_label TEXT,
            scan_date TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS learning_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            filename TEXT,
            original_prediction TEXT,
            correct_label TEXT,
            confidence REAL,
            source TEXT,
            scan_date TEXT)''')
        conn.commit()
    finally:
        # Release the connection even when a schema statement fails.
        conn.close()

    for path in DATASET_CONFIG.values():
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=True)
def hash_password(password):
    """Return the hexadecimal SHA-256 digest of ``password``.

    Args:
        password: Plain-text password as a ``str``; it is encoded with the
            default ``str.encode()`` encoding (UTF-8) before hashing.

    Returns:
        A 64-character lowercase hex string.
    """
    # NOTE(review): this is an unsalted, single-round SHA-256. The format is
    # kept because stored hashes are compared by equality elsewhere in this
    # module; migrating to a salted KDF would need a data migration.
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def register_user(username, password, name):
    """Insert a new row into ``users``.

    The password is stored as the value returned by ``hash_password`` and
    ``join_date`` is set to the current local time in ISO-8601 format.

    Args:
        username: Login name; the ``users.username`` column is UNIQUE.
        password: Plain-text password.
        name: Display name.

    Returns:
        ``True`` when the row was inserted and committed, ``False`` when the
        insert raised ``sqlite3.IntegrityError`` (for example a duplicate
        username).
    """
    connection = get_connection()
    try:
        new_user = (username, hash_password(password), name, datetime.now().isoformat())
        connection.execute(
            "INSERT INTO users (username, password, name, join_date) VALUES (?, ?, ?, ?)",
            new_user,
        )
        connection.commit()
    except sqlite3.IntegrityError:
        return False
    else:
        return True
    finally:
        connection.close()
def login_user(username, password):
    """Look up a user by username and password.

    Args:
        username: Login name to match.
        password: Plain-text password; it is hashed with ``hash_password``
            and compared against the stored value in SQL.

    Returns:
        The matching ``users`` row as a ``dict`` (all columns, including the
        stored password hash), or ``None`` when no row matches.
    """
    conn = get_connection()
    try:
        user = conn.execute(
            "SELECT * FROM users WHERE username=? AND password=?",
            (username, hash_password(password)),
        ).fetchone()
    finally:
        # Close the connection even if hashing or the query raises.
        conn.close()
    return dict(user) if user else None
def add_scan_history(username, filename, file_type, file_size, source, is_ai, accuracy):
    """Record one scan in ``scan_history``.

    ``scan_date`` is set to the current local time in ISO-8601 format.

    Args:
        username: Owner of the scan.
        filename: Name of the scanned file.
        file_type: Stored as-is in the ``file_type`` TEXT column.
        file_size: Stored as-is in the ``file_size`` TEXT column.
        source: Stored as-is in the ``source`` TEXT column.
        is_ai: Stored in the ``is_ai`` INTEGER column (other code in this
            module treats ``1`` as "AI").
        accuracy: Stored in the ``accuracy`` REAL column.
    """
    conn = get_connection()
    try:
        conn.execute(
            '''INSERT INTO scan_history (username, filename, file_type, file_size, source, is_ai, accuracy, scan_date)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
            (username, filename, file_type, file_size, source, is_ai, accuracy,
             datetime.now().isoformat()),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert or commit fails.
        conn.close()
def get_user_history(username):
    """Return the user's 20 most recent history entries (scans and batches).

    Up to 20 rows are read from each of ``scan_history`` and
    ``test_batches``. Batch rows are reshaped to carry the same display keys
    as scan rows (``filename``, ``file_type``, ``file_size``, ``source``,
    ``is_ai``, ``scan_date``). The merged list is sorted by ``scan_date``
    descending and truncated to 20 entries.

    Args:
        username: User whose history is requested.

    Returns:
        A list of at most 20 dicts. Each has a ``"_type"`` key that is
        ``"scan"`` or ``"batch"``.
    """
    conn = get_connection()
    try:
        scan_rows = conn.execute(
            "SELECT * FROM scan_history WHERE username=? ORDER BY id DESC LIMIT 20",
            (username,),
        ).fetchall()
        batch_rows = conn.execute(
            '''SELECT tb.id, tb.total_images, tb.correct_count, tb.wrong_count, tb.accuracy, tb.test_date
               FROM test_batches tb WHERE tb.username=? ORDER BY tb.id DESC LIMIT 20''',
            (username,),
        ).fetchall()
    finally:
        # Close the connection even if one of the queries fails.
        conn.close()

    history = []
    for row in scan_rows:
        entry = dict(row)
        entry["_type"] = "scan"
        history.append(entry)

    for row in batch_rows:
        entry = dict(row)
        entry["_type"] = "batch"
        # Synthesize the scan-style display fields for a batch summary row;
        # "accuracy" is already present from the query and is left untouched.
        entry["filename"] = f"Batch #{entry['id']} ({entry['total_images']} gambar)"
        entry["file_type"] = "Batch"
        entry["file_size"] = "-"
        entry["source"] = f"{entry['correct_count']} benar / {entry['wrong_count']} salah"
        entry["is_ai"] = 0
        entry["scan_date"] = entry["test_date"]
        history.append(entry)

    # Missing dates sort as the empty string, i.e. last in descending order.
    history.sort(key=lambda item: item.get("scan_date") or "", reverse=True)
    return history[:20]
def clear_all_history():
    """Delete every row from the history-related tables.

    Empties ``scan_history``, ``test_batches``, ``test_results`` and
    ``learning_data`` for all users (there is no per-user filter) and
    commits once after all four deletes. The ``users`` table is not touched.
    """
    conn = get_connection()
    try:
        conn.execute("DELETE FROM scan_history")
        conn.execute("DELETE FROM test_batches")
        conn.execute("DELETE FROM test_results")
        conn.execute("DELETE FROM learning_data")
        conn.commit()
    finally:
        # Closing without a commit discards any partial deletes on failure.
        conn.close()
def create_test_batch(username, total_images, correct_count, wrong_count, accuracy):
    """Insert a batch summary into ``test_batches``.

    ``test_date`` is set to the current local time in ISO-8601 format.

    Args:
        username: Owner of the batch.
        total_images: Stored in the ``total_images`` column.
        correct_count: Stored in the ``correct_count`` column.
        wrong_count: Stored in the ``wrong_count`` column.
        accuracy: Stored in the ``accuracy`` column.

    Returns:
        The auto-generated ``id`` of the new row.
    """
    conn = get_connection()
    try:
        cur = conn.execute(
            '''INSERT INTO test_batches (username, total_images, correct_count, wrong_count, accuracy, test_date)
               VALUES (?, ?, ?, ?, ?, ?)''',
            (username, total_images, correct_count, wrong_count, accuracy,
             datetime.now().isoformat()),
        )
        conn.commit()
        return cur.lastrowid
    finally:
        # Close the connection even if the insert or commit fails.
        conn.close()
def save_test_result(batch_id, username, filename, folder_label, prediction, confidence, is_mismatch):
    """Insert one per-file result into ``test_results``.

    ``scan_date`` is set to the current local time in ISO-8601 format; the
    ``user_correction`` and ``corrected_label`` columns are left NULL.

    Args:
        batch_id: Id of the batch this result belongs to.
        username: Owner of the result.
        filename: Name of the tested file.
        folder_label: Label stored in ``folder_label``.
        prediction: Label stored in ``prediction``.
        confidence: Stored in the ``confidence`` REAL column.
        is_mismatch: Stored in the ``is_mismatch`` INTEGER column.
    """
    conn = get_connection()
    try:
        conn.execute(
            '''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, scan_date)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
            (batch_id, username, filename, folder_label, prediction, confidence,
             is_mismatch, datetime.now().isoformat()),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert or commit fails.
        conn.close()
def update_test_result_correction(result_id, user_correction, corrected_label):
    """Store a user's correction on an existing ``test_results`` row.

    Args:
        result_id: ``id`` of the row to update. If no row has this id the
            statement updates nothing and no error is raised.
        user_correction: Value written to the ``user_correction`` column.
        corrected_label: Value written to the ``corrected_label`` column.
    """
    conn = get_connection()
    try:
        conn.execute(
            "UPDATE test_results SET user_correction=?, corrected_label=? WHERE id=?",
            (user_correction, corrected_label, result_id),
        )
        conn.commit()
    finally:
        # Close the connection even if the update or commit fails.
        conn.close()
def save_learning_data(username, filename, original_prediction, correct_label, confidence, source="user_correction"):
    """Insert one row into ``learning_data``.

    ``scan_date`` is set to the current local time in ISO-8601 format.

    Args:
        username: Owner of the entry.
        filename: Name of the file the entry refers to.
        original_prediction: Label stored in ``original_prediction``.
        correct_label: Label stored in ``correct_label``.
        confidence: Stored in the ``confidence`` REAL column.
        source: Origin tag for the entry; defaults to ``"user_correction"``.
    """
    conn = get_connection()
    try:
        conn.execute(
            '''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date)
               VALUES (?, ?, ?, ?, ?, ?, ?)''',
            (username, filename, original_prediction, correct_label, confidence,
             source, datetime.now().isoformat()),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert or commit fails.
        conn.close()
def get_all_test_batches(username=None):
    """Return ``test_batches`` rows, newest first.

    Args:
        username: When truthy, only that user's batches are returned. When
            ``None`` or empty, batches of all users are returned.

    Returns:
        A list of dicts, one per row, ordered by ``id`` descending.
    """
    conn = get_connection()
    try:
        if username:
            rows = conn.execute(
                "SELECT * FROM test_batches WHERE username=? ORDER BY id DESC",
                (username,),
            ).fetchall()
        else:
            rows = conn.execute("SELECT * FROM test_batches ORDER BY id DESC").fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()
    return [dict(row) for row in rows]
def get_test_results_by_batch(batch_id):
    """Return every ``test_results`` row belonging to one batch.

    Args:
        batch_id: Value matched against the ``batch_id`` column.

    Returns:
        A list of dicts, one per matching row (empty when none match).
    """
    conn = get_connection()
    try:
        rows = conn.execute(
            "SELECT * FROM test_results WHERE batch_id=?", (batch_id,)
        ).fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()
    return [dict(row) for row in rows]
def get_overall_accuracy(username=None):
    """Compute aggregate accuracy over test results and user corrections.

    Two sources are counted:

    * ``test_results`` rows with a non-empty ``folder_label``. A row is
      correct when ``corrected_label`` (or ``prediction`` if there is no
      correction) equals the expected label, which is ``"AI"`` for folder
      labels ``fake``/``ai`` (case-insensitive) and ``"REAL"`` otherwise.
    * ``learning_data`` rows whose ``source`` is ``'user_correction'``. A
      row is correct when ``original_prediction`` equals ``correct_label``.

    Args:
        username: When truthy, restrict both sources to that user; otherwise
            include all users.

    Returns:
        A dict with keys ``total``, ``correct``, ``wrong`` and ``accuracy``
        (percentage rounded to one decimal, or ``0`` when ``total`` is 0).
    """
    conn = get_connection()
    try:
        if username:
            rows = conn.execute(
                "SELECT * FROM test_results WHERE username=?", (username,)
            ).fetchall()
            ld_rows = conn.execute(
                "SELECT * FROM learning_data WHERE username=? AND source='user_correction'",
                (username,),
            ).fetchall()
        else:
            rows = conn.execute("SELECT * FROM test_results").fetchall()
            ld_rows = conn.execute(
                "SELECT * FROM learning_data WHERE source='user_correction'"
            ).fetchall()
    finally:
        # Close the connection even if one of the queries fails.
        conn.close()

    total = 0
    correct = 0

    for r in rows:
        folder_label = r["folder_label"]
        # Rows without a ground-truth folder label cannot be scored.
        if not folder_label:
            continue
        total += 1
        final_label = r["corrected_label"] or r["prediction"]
        expected = "AI" if folder_label.lower() in ("fake", "ai") else "REAL"
        if final_label == expected:
            correct += 1

    for r in ld_rows:
        total += 1
        if r["original_prediction"] == r["correct_label"]:
            correct += 1

    wrong = total - correct
    acc = round((correct / total * 100), 1) if total > 0 else 0
    return {"total": total, "correct": correct, "wrong": wrong, "accuracy": acc}
def get_learning_data_count():
    """Return the number of rows in ``learning_data`` (all users).

    Returns:
        The row count as an ``int``; ``0`` when the table is empty.
    """
    conn = get_connection()
    try:
        row = conn.execute("SELECT COUNT(*) as cnt FROM learning_data").fetchone()
    finally:
        # Close the connection even if the query fails.
        conn.close()
    return row["cnt"] or 0
def migrate_existing_learning_data():
    """Copy mismatched test results into ``learning_data``.

    Selects ``test_results`` rows with ``is_mismatch = 1`` and a non-NULL
    ``folder_label`` for which no ``learning_data`` row exists with the same
    ``filename`` and ``username``. For each one it inserts a ``learning_data``
    row tagged ``source='batch_mismatch'`` whose ``correct_label`` is derived
    from the folder label (``"AI"`` for ``fake``/``ai``, case-insensitive;
    ``"REAL"`` otherwise). All inserts are committed together.
    """
    conn = get_connection()
    try:
        rows = conn.execute(
            '''SELECT tr.* FROM test_results tr
               LEFT JOIN learning_data ld ON tr.filename = ld.filename AND tr.username = ld.username
               WHERE tr.is_mismatch = 1 AND tr.folder_label IS NOT NULL AND ld.id IS NULL'''
        ).fetchall()
        for r in rows:
            expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL"
            conn.execute(
                '''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date)
                   VALUES (?, ?, ?, ?, ?, ?, ?)''',
                (r["username"], r["filename"], r["prediction"], expected,
                 r["confidence"], "batch_mismatch", r["scan_date"]),
            )
        conn.commit()
    finally:
        # Closing without a commit discards partial inserts on failure.
        conn.close()
def sync_scan_history_to_test_results():
    """Mirror single scans from ``scan_history`` into ``test_results``.

    For every ``scan_history`` row that has no ``test_results`` row with the
    same ``username`` and ``filename`` and a NULL ``batch_id``, a
    ``test_results`` row with ``batch_id = NULL`` is inserted:

    * ``prediction`` is ``"AI"`` when ``is_ai == 1``, else ``"REAL"``.
    * ``folder_label`` is inferred from the filename (contains ``real`` ->
      ``"REAL"``; contains ``fake`` or ``ai`` -> ``"AI"``; otherwise the
      prediction itself is assumed correct).
    * If ``learning_data`` holds a ``user_correction`` for the same user and
      file, the most recent one overrides the inferred label and is also
      stored as ``corrected_label``.

    Scans with a NULL filename are skipped: they cannot be matched by the
    join above, so they could never be recognised as already synced.
    """
    conn = get_connection()
    try:
        # Find every scan in scan_history that is not yet in test_results
        # (batch_id IS NULL marks rows that came from single scans).
        scans = conn.execute('''
            SELECT sh.username, sh.filename, sh.is_ai, sh.accuracy, sh.scan_date
            FROM scan_history sh
            LEFT JOIN test_results tr ON sh.username = tr.username AND sh.filename = tr.filename AND tr.batch_id IS NULL
            WHERE tr.id IS NULL AND sh.filename IS NOT NULL
        ''').fetchall()

        for s in scans:
            prediction_label = "AI" if s["is_ai"] == 1 else "REAL"

            # Infer the ground-truth label from the filename.
            # NOTE(review): these are plain substring tests, so "ai" also
            # matches names such as "train" or "main" - confirm this
            # heuristic is intended.
            fn_lower = s["filename"].lower()
            if "real" in fn_lower:
                inferred_label = "REAL"
            elif "fake" in fn_lower or "ai" in fn_lower:
                inferred_label = "AI"
            else:
                inferred_label = prediction_label  # Default: assume correct

            is_mismatch = 1 if prediction_label != inferred_label else 0

            # Check whether a user_correction exists in learning_data.
            ld = conn.execute(
                "SELECT correct_label FROM learning_data WHERE username=? AND filename=? AND source='user_correction' ORDER BY id DESC LIMIT 1",
                (s["username"], s["filename"]),
            ).fetchone()

            corrected_label = None
            # A correction row with a NULL label carries no information.
            if ld and ld["correct_label"]:
                corrected_label = "AI" if ld["correct_label"].upper() in ("FAKE", "AI") else "REAL"
                is_mismatch = 1 if prediction_label != corrected_label else 0
                inferred_label = corrected_label

            conn.execute(
                '''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, corrected_label, scan_date)
                   VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)''',
                (s["username"], s["filename"], inferred_label, prediction_label,
                 s["accuracy"], is_mismatch, corrected_label, s["scan_date"]),
            )
        conn.commit()
    finally:
        # Closing without a commit discards partial inserts on failure.
        conn.close()