Spaces:
Running
Running
File size: 11,974 Bytes
6f6024d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 | import sqlite3
import json
import os
import hashlib
from datetime import datetime
# Folder layout of the labelled image dataset. Every value in this mapping is
# handed to os.makedirs elsewhere in this module, so all entries must be
# directory paths. Note that "ai_dir" points at the "fake" sub-folder.
DATASET_CONFIG = {
    "dataset_dir": "./dataset_ai_vs_real",
    "real_dir": "./dataset_ai_vs_real/real",
    "ai_dir": "./dataset_ai_vs_real/fake",
}

# File name handed to sqlite3.connect() by get_connection().
DB_NAME = "app_database.db"
def get_connection():
    """Open and return a new SQLite connection to ``DB_NAME``.

    The connection's row factory is ``sqlite3.Row``, so fetched rows can be
    indexed by column name and converted with ``dict(row)``. The caller owns
    the connection and must close it.
    """
    connection = sqlite3.connect(DB_NAME)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the database schema and the dataset folders if they are missing.

    Creates the ``users``, ``scan_history``, ``test_batches``,
    ``test_results`` and ``learning_data`` tables (``IF NOT EXISTS``), adds
    the ``trust_score`` column to a pre-existing ``users`` table that lacks
    it, commits, and finally makes sure every directory listed in
    ``DATASET_CONFIG`` exists.

    Raises:
        sqlite3.Error: if any schema statement fails. The connection is
            closed in every case.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL,
            name TEXT NOT NULL,
            join_date TEXT,
            trust_score INTEGER DEFAULT 50)''')

        # Older databases may have a users table without trust_score. Inspect
        # the actual columns instead of attempting the ALTER and swallowing
        # every OperationalError, which would also hide real failures such as
        # a locked or read-only database.
        # Index 1 of each PRAGMA table_info row is the column name; indexing
        # by position works for both tuple rows and sqlite3.Row.
        user_columns = {
            column[1]
            for column in cursor.execute("PRAGMA table_info(users)").fetchall()
        }
        if "trust_score" not in user_columns:
            cursor.execute("ALTER TABLE users ADD COLUMN trust_score INTEGER DEFAULT 50")

        cursor.execute('''CREATE TABLE IF NOT EXISTS scan_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            filename TEXT,
            file_type TEXT,
            file_size TEXT,
            source TEXT,
            is_ai INTEGER,
            accuracy REAL,
            scan_date TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS test_batches (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            total_images INTEGER,
            correct_count INTEGER,
            wrong_count INTEGER,
            accuracy REAL,
            test_date TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS test_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            batch_id INTEGER,
            username TEXT NOT NULL,
            filename TEXT,
            folder_label TEXT,
            prediction TEXT,
            confidence REAL,
            is_mismatch INTEGER,
            user_correction TEXT,
            corrected_label TEXT,
            scan_date TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS learning_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            filename TEXT,
            original_prediction TEXT,
            correct_label TEXT,
            confidence REAL,
            source TEXT,
            scan_date TEXT)''')
        conn.commit()
    finally:
        # Release the database handle even when a statement above raised.
        conn.close()

    # exist_ok=True already tolerates existing directories, so no separate
    # os.path.exists() check is needed.
    for path in DATASET_CONFIG.values():
        os.makedirs(path, exist_ok=True)
def hash_password(password):
    """Return the hexadecimal SHA-256 digest of ``password``.

    The string is encoded with ``str.encode()`` (UTF-8 by default) before
    hashing.

    NOTE(review): the digest is unsalted and fast to compute, which is weak
    for password storage. Switching schemes would invalidate the hashes
    already stored by register_user, so it needs a coordinated migration.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.hexdigest()
def register_user(username, password, name):
    """Insert a new row into ``users`` with a hashed password.

    ``join_date`` is set to the current local time in ISO-8601 format.

    Returns:
        True when the row was inserted and committed; False when the insert
        raised ``sqlite3.IntegrityError`` (for example the UNIQUE constraint
        on ``username``).
    """
    insert_sql = "INSERT INTO users (username, password, name, join_date) VALUES (?, ?, ?, ?)"
    conn = get_connection()
    try:
        values = (username, hash_password(password), name, datetime.now().isoformat())
        conn.execute(insert_sql, values)
        conn.commit()
    except sqlite3.IntegrityError:
        return False
    else:
        return True
    finally:
        conn.close()
def login_user(username, password):
    """Look up a user by username and password.

    The password is hashed with ``hash_password`` and compared against the
    stored value inside the SQL query.

    Returns:
        A dict of the matching ``users`` row, or None when no row matches.
        Because the query selects every column, the dict includes the stored
        password hash; callers should not expose it.
    """
    conn = get_connection()
    try:
        user = conn.execute(
            "SELECT * FROM users WHERE username=? AND password=?",
            (username, hash_password(password)),
        ).fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return dict(user) if user is not None else None
def add_scan_history(username, filename, file_type, file_size, source, is_ai, accuracy):
    """Append one row to ``scan_history``.

    ``scan_date`` is set to the current local time in ISO-8601 format; all
    other columns are stored exactly as passed in.
    """
    conn = get_connection()
    try:
        conn.execute(
            '''INSERT INTO scan_history (username, filename, file_type, file_size, source, is_ai, accuracy, scan_date)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
            (username, filename, file_type, file_size, source, is_ai, accuracy, datetime.now().isoformat()),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert or commit raises.
        conn.close()
def get_user_history(username):
    """Return the user's most recent scans and test batches as one list.

    Up to 20 ``scan_history`` rows and up to 20 ``test_batches`` rows (newest
    ids first) are loaded, merged, sorted by ``scan_date`` descending, and
    the first 20 entries are returned.

    Each entry is a dict with a ``_type`` key of ``"scan"`` or ``"batch"``.
    Batch entries keep their own columns and additionally get the
    scan-shaped keys ``filename``, ``file_type``, ``file_size``, ``source``,
    ``is_ai`` and ``scan_date`` so both kinds can be rendered the same way.
    """
    conn = get_connection()
    try:
        scan_rows = conn.execute(
            "SELECT * FROM scan_history WHERE username=? ORDER BY id DESC LIMIT 20",
            (username,),
        ).fetchall()
        batch_rows = conn.execute(
            '''SELECT tb.id, tb.total_images, tb.correct_count, tb.wrong_count, tb.accuracy, tb.test_date
            FROM test_batches tb WHERE tb.username=? ORDER BY tb.id DESC LIMIT 20''',
            (username,),
        ).fetchall()
    finally:
        # Close the connection even if one of the queries raises.
        conn.close()

    history = []
    for row in scan_rows:
        entry = dict(row)
        entry["_type"] = "scan"
        history.append(entry)

    for row in batch_rows:
        entry = dict(row)
        # "accuracy" is already present from the query, so it needs no copy.
        entry.update({
            "_type": "batch",
            "filename": f"Batch #{entry['id']} ({entry['total_images']} gambar)",
            "file_type": "Batch",
            "file_size": "-",
            "source": f"{entry['correct_count']} benar / {entry['wrong_count']} salah",
            "is_ai": 0,
            "scan_date": entry["test_date"],
        })
        history.append(entry)

    # Dates are stored as ISO-8601 strings, so they are sorted as text here;
    # entries without a date sort last.
    history.sort(key=lambda item: item.get("scan_date") or "", reverse=True)
    return history[:20]
def clear_all_history():
    """Delete every row from the history and learning tables.

    Empties ``scan_history``, ``test_batches``, ``test_results`` and
    ``learning_data`` for all users; the ``users`` table is left untouched.
    The deletes are committed together at the end.
    """
    conn = get_connection()
    try:
        conn.execute("DELETE FROM scan_history")
        conn.execute("DELETE FROM test_batches")
        conn.execute("DELETE FROM test_results")
        conn.execute("DELETE FROM learning_data")
        conn.commit()
    finally:
        # Close the connection even if a delete or the commit raises.
        conn.close()
def create_test_batch(username, total_images, correct_count, wrong_count, accuracy):
    """Insert a summary row into ``test_batches`` and return its id.

    ``test_date`` is set to the current local time in ISO-8601 format.

    Returns:
        The ``lastrowid`` of the inserted row.
    """
    conn = get_connection()
    try:
        cur = conn.execute(
            '''INSERT INTO test_batches (username, total_images, correct_count, wrong_count, accuracy, test_date)
            VALUES (?, ?, ?, ?, ?, ?)''',
            (username, total_images, correct_count, wrong_count, accuracy, datetime.now().isoformat()),
        )
        conn.commit()
        return cur.lastrowid
    finally:
        # Close the connection even if the insert or commit raises.
        conn.close()
def save_test_result(batch_id, username, filename, folder_label, prediction, confidence, is_mismatch):
    """Insert one per-image row into ``test_results``.

    ``scan_date`` is set to the current local time in ISO-8601 format;
    ``user_correction`` and ``corrected_label`` are left NULL.
    """
    conn = get_connection()
    try:
        conn.execute(
            '''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, scan_date)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
            (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, datetime.now().isoformat()),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert or commit raises.
        conn.close()
def update_test_result_correction(result_id, user_correction, corrected_label):
    """Store a user's correction on the ``test_results`` row with ``result_id``.

    Sets ``user_correction`` and ``corrected_label`` on the row whose ``id``
    equals ``result_id``. No row is changed when the id does not exist.
    """
    conn = get_connection()
    try:
        conn.execute(
            "UPDATE test_results SET user_correction=?, corrected_label=? WHERE id=?",
            (user_correction, corrected_label, result_id),
        )
        conn.commit()
    finally:
        # Close the connection even if the update or commit raises.
        conn.close()
def save_learning_data(username, filename, original_prediction, correct_label, confidence, source="user_correction"):
    """Insert one row into ``learning_data``.

    ``source`` defaults to ``"user_correction"``. ``scan_date`` is set to the
    current local time in ISO-8601 format.
    """
    conn = get_connection()
    try:
        conn.execute(
            '''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date)
            VALUES (?, ?, ?, ?, ?, ?, ?)''',
            (username, filename, original_prediction, correct_label, confidence, source, datetime.now().isoformat()),
        )
        conn.commit()
    finally:
        # Close the connection even if the insert or commit raises.
        conn.close()
def get_all_test_batches(username=None):
    """Return ``test_batches`` rows as dicts, newest id first.

    Args:
        username: when truthy, only that user's batches are returned;
            otherwise (None or empty string) batches of all users are.
    """
    conn = get_connection()
    try:
        if username:
            rows = conn.execute(
                "SELECT * FROM test_batches WHERE username=? ORDER BY id DESC",
                (username,),
            ).fetchall()
        else:
            rows = conn.execute("SELECT * FROM test_batches ORDER BY id DESC").fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return [dict(row) for row in rows]
def get_test_results_by_batch(batch_id):
    """Return every ``test_results`` row of the given batch as a list of dicts."""
    conn = get_connection()
    try:
        rows = conn.execute(
            "SELECT * FROM test_results WHERE batch_id=?",
            (batch_id,),
        ).fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return [dict(row) for row in rows]
def _canonical_label(label):
    """Normalise a stored label so "fake"/"FAKE"/"ai" all compare equal to "AI".

    None stays None; any other value is returned stripped and upper-cased.
    """
    if label is None:
        return None
    text = str(label).strip().upper()
    return "AI" if text in ("AI", "FAKE") else text


def get_overall_accuracy(username=None):
    """Compute aggregate prediction accuracy from stored results.

    Two sources are counted:

    * ``test_results`` rows that have a non-empty ``folder_label``. The final
      label (``corrected_label`` if set, else ``prediction``) is compared
      with the label implied by the folder ("fake"/"ai" -> AI, anything
      else -> REAL).
    * ``learning_data`` rows with ``source='user_correction'``, where
      ``original_prediction`` is compared with ``correct_label``.

    Labels are compared after normalisation, so "FAKE", "fake" and "AI" are
    treated as the same class regardless of how a caller stored them.

    Args:
        username: when truthy, restrict both sources to that user.

    Returns:
        A dict with ``total``, ``correct``, ``wrong`` and ``accuracy``
        (percentage rounded to one decimal, 0 when there is no data).
    """
    conn = get_connection()
    try:
        if username:
            rows = conn.execute(
                "SELECT * FROM test_results WHERE username=?", (username,)
            ).fetchall()
            ld_rows = conn.execute(
                "SELECT * FROM learning_data WHERE username=? AND source='user_correction'",
                (username,),
            ).fetchall()
        else:
            rows = conn.execute("SELECT * FROM test_results").fetchall()
            ld_rows = conn.execute(
                "SELECT * FROM learning_data WHERE source='user_correction'"
            ).fetchall()
    finally:
        # Close the connection even if one of the queries raises.
        conn.close()

    total = 0
    correct = 0
    for r in rows:
        if not r["folder_label"]:
            # Rows without a ground-truth folder cannot be scored.
            continue
        total += 1
        final_label = _canonical_label(r["corrected_label"] or r["prediction"])
        expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL"
        if final_label == expected:
            correct += 1

    for r in ld_rows:
        total += 1
        if _canonical_label(r["original_prediction"]) == _canonical_label(r["correct_label"]):
            correct += 1

    wrong = total - correct
    acc = round((correct / total * 100), 1) if total > 0 else 0
    return {"total": total, "correct": correct, "wrong": wrong, "accuracy": acc}
def get_learning_data_count():
    """Return the number of rows in ``learning_data`` (0 when empty)."""
    conn = get_connection()
    try:
        row = conn.execute("SELECT COUNT(*) as cnt FROM learning_data").fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return row["cnt"] or 0
def migrate_existing_learning_data():
    """Back-fill ``learning_data`` from mismatched ``test_results`` rows.

    Selects ``test_results`` rows with ``is_mismatch = 1`` and a non-NULL
    ``folder_label`` for which no ``learning_data`` row with the same
    username and filename exists, and inserts one ``learning_data`` row for
    each with ``source='batch_mismatch'``. The correct label is derived from
    the folder label ("fake"/"ai" -> "AI", anything else -> "REAL").

    The join only hides pairs that were already in ``learning_data`` before
    this call, so a file that mismatched in several batches would otherwise
    be inserted several times in one run. Only the oldest such row (lowest
    id) is migrated, matching the one-row-per-(username, filename) rule the
    join enforces across runs.
    """
    conn = get_connection()
    try:
        rows = conn.execute(
            '''SELECT tr.* FROM test_results tr
            LEFT JOIN learning_data ld ON tr.filename = ld.filename AND tr.username = ld.username
            WHERE tr.is_mismatch = 1 AND tr.folder_label IS NOT NULL AND ld.id IS NULL
            ORDER BY tr.id'''
        ).fetchall()

        already_migrated = set()
        for r in rows:
            key = (r["username"], r["filename"])
            if key in already_migrated:
                continue
            already_migrated.add(key)

            expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL"
            conn.execute(
                '''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date)
                VALUES (?, ?, ?, ?, ?, ?, ?)''',
                (r["username"], r["filename"], r["prediction"], expected, r["confidence"], "batch_mismatch", r["scan_date"]),
            )
        conn.commit()
    finally:
        # Close the connection even if a statement raises; uncommitted
        # inserts are then discarded.
        conn.close()
def _filename_label_hint(filename):
    """Return "REAL" or "AI" when the file name hints at a label, else None.

    "real" and "fake" are matched as substrings. "ai" is matched only as a
    standalone alphabetic token (e.g. "ai_001.png", "cat-AI.jpg"), so that
    ordinary words containing the letters, such as "rain" or "portrait", do
    not count as AI hints.
    """
    lowered = (filename or "").lower()
    if "real" in lowered:
        return "REAL"
    if "fake" in lowered:
        return "AI"
    # Treat every non-letter (digits, "_", "-", ".") as a separator.
    tokens = "".join(ch if ch.isalpha() else " " for ch in lowered).split()
    if "ai" in tokens:
        return "AI"
    return None


def sync_scan_history_to_test_results():
    """Copy scans that have no stand-alone ``test_results`` row into that table.

    For every ``scan_history`` row without a matching ``test_results`` row
    (same username and filename, ``batch_id IS NULL``) one row is inserted
    with ``batch_id`` NULL:

    * ``prediction`` is "AI" when ``is_ai == 1``, otherwise "REAL".
    * ``folder_label`` is inferred from the file name; when the name gives
      no hint, the prediction is assumed to be correct.
    * If ``learning_data`` holds a ``user_correction`` for the same user and
      file, the newest one overrides the inferred label and is also stored
      as ``corrected_label``.
    * ``is_mismatch`` is 1 when the prediction differs from the label finally
      stored in ``folder_label``.

    All inserts are committed together at the end.
    """
    conn = get_connection()
    try:
        # Find every scan in scan_history that is not yet in test_results
        # (as a row with batch_id IS NULL).
        scans = conn.execute('''
            SELECT sh.username, sh.filename, sh.is_ai, sh.accuracy, sh.scan_date
            FROM scan_history sh
            LEFT JOIN test_results tr ON sh.username = tr.username AND sh.filename = tr.filename AND tr.batch_id IS NULL
            WHERE tr.id IS NULL
        ''').fetchall()

        for s in scans:
            prediction_label = "AI" if s["is_ai"] == 1 else "REAL"

            # Without a hint in the file name, assume the prediction was right.
            inferred_label = _filename_label_hint(s["filename"]) or prediction_label
            is_mismatch = 1 if prediction_label != inferred_label else 0

            # Check whether learning_data holds a user correction for this file.
            ld = conn.execute(
                "SELECT correct_label FROM learning_data WHERE username=? AND filename=? AND source='user_correction' ORDER BY id DESC LIMIT 1",
                (s["username"], s["filename"]),
            ).fetchone()

            corrected_label = None
            # correct_label is a nullable column; ignore corrections without a label.
            if ld is not None and ld["correct_label"]:
                corrected_label = "AI" if ld["correct_label"].upper() in ("FAKE", "AI") else "REAL"
                is_mismatch = 1 if prediction_label != corrected_label else 0
                inferred_label = corrected_label

            conn.execute(
                '''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, corrected_label, scan_date)
                VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)''',
                (s["username"], s["filename"], inferred_label, prediction_label, s["accuracy"], is_mismatch, corrected_label, s["scan_date"]),
            )
        conn.commit()
    finally:
        # Close the connection even if a statement raises; uncommitted
        # inserts are then discarded.
        conn.close()
|