File size: 11,974 Bytes
6f6024d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import sqlite3
import json
import os
import hashlib
from datetime import datetime

# File name of the SQLite database opened by get_connection().
DB_NAME = "app_database.db"

# Dataset directories; init_db() creates each one that does not exist yet.
DATASET_CONFIG = dict(
    dataset_dir="./dataset_ai_vs_real",
    real_dir="./dataset_ai_vs_real/real",
    ai_dir="./dataset_ai_vs_real/fake",
)

def get_connection():
    """Open a new connection to the database file named by ``DB_NAME``.

    The connection's row factory is ``sqlite3.Row``, so fetched rows can be
    indexed by column name and converted with ``dict(row)``. The caller is
    responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_NAME)
    connection.row_factory = sqlite3.Row
    return connection

def init_db():
    """Create the application tables and dataset directories if missing.

    Creates the ``users``, ``scan_history``, ``test_batches``,
    ``test_results`` and ``learning_data`` tables when they do not exist,
    adds the ``trust_score`` column to a pre-existing ``users`` table that
    lacks it, and finally creates every directory listed in
    ``DATASET_CONFIG`` that is not present yet.

    Raises:
        sqlite3.Error: if any schema statement fails. The connection is
            closed before the error propagates.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS users (
                            id INTEGER PRIMARY KEY AUTOINCREMENT,
                            username TEXT UNIQUE NOT NULL,
                            password TEXT NOT NULL,
                            name TEXT NOT NULL,
                            join_date TEXT,
                            trust_score INTEGER DEFAULT 50)''')
        # Upgrade path for a users table created without trust_score. The
        # column list is inspected explicitly instead of swallowing every
        # OperationalError from ALTER TABLE, so unrelated failures (for
        # example a locked database) are no longer hidden.
        # Index 1 of each PRAGMA table_info row is the column name.
        user_columns = {
            row[1] for row in cursor.execute("PRAGMA table_info(users)").fetchall()
        }
        if "trust_score" not in user_columns:
            cursor.execute("ALTER TABLE users ADD COLUMN trust_score INTEGER DEFAULT 50")
        cursor.execute('''CREATE TABLE IF NOT EXISTS scan_history (
                            id INTEGER PRIMARY KEY AUTOINCREMENT,
                            username TEXT NOT NULL,
                            filename TEXT,
                            file_type TEXT,
                            file_size TEXT,
                            source TEXT,
                            is_ai INTEGER,
                            accuracy REAL,
                            scan_date TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS test_batches (
                            id INTEGER PRIMARY KEY AUTOINCREMENT,
                            username TEXT NOT NULL,
                            total_images INTEGER,
                            correct_count INTEGER,
                            wrong_count INTEGER,
                            accuracy REAL,
                            test_date TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS test_results (
                            id INTEGER PRIMARY KEY AUTOINCREMENT,
                            batch_id INTEGER,
                            username TEXT NOT NULL,
                            filename TEXT,
                            folder_label TEXT,
                            prediction TEXT,
                            confidence REAL,
                            is_mismatch INTEGER,
                            user_correction TEXT,
                            corrected_label TEXT,
                            scan_date TEXT)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS learning_data (
                            id INTEGER PRIMARY KEY AUTOINCREMENT,
                            username TEXT NOT NULL,
                            filename TEXT,
                            original_prediction TEXT,
                            correct_label TEXT,
                            confidence REAL,
                            source TEXT,
                            scan_date TEXT)''')
        conn.commit()
    finally:
        # Release the connection even when a schema statement raises.
        conn.close()
    for path in DATASET_CONFIG.values():
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=True)

def hash_password(password):
    """Return the hexadecimal SHA-256 digest of ``password``.

    The string is encoded with ``str.encode()`` defaults (UTF-8) before
    hashing.

    NOTE(review): this is a single unsalted SHA-256 round. register_user()
    stores this value and login_user() compares against it, so switching to
    a salted/slow scheme needs a migration for already-stored hashes.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.hexdigest()

def register_user(username, password, name):
    """Insert a new row into ``users`` and report whether it succeeded.

    The password is stored as the value returned by ``hash_password`` and
    ``join_date`` is set to the current local time in ISO format.

    Returns:
        True when the row was inserted and committed; False when SQLite
        raised ``IntegrityError`` (for example the username already exists,
        since ``users.username`` is declared UNIQUE).
    """
    insert_sql = "INSERT INTO users (username, password, name, join_date) VALUES (?, ?, ?, ?)"
    connection = get_connection()
    try:
        new_row = (username, hash_password(password), name, datetime.now().isoformat())
        connection.execute(insert_sql, new_row)
        connection.commit()
    except sqlite3.IntegrityError:
        created = False
    else:
        created = True
    finally:
        connection.close()
    return created

def login_user(username, password):
    """Look up a user by username and password.

    The supplied password is hashed with ``hash_password`` and compared to
    the stored value inside the SQL query.

    Returns:
        A dict of every ``users`` column for the matching row (the query is
        ``SELECT *``, so the stored password hash is included), or None when
        no row matches.
    """
    conn = get_connection()
    try:
        user = conn.execute("SELECT * FROM users WHERE username=? AND password=?",
                            (username, hash_password(password))).fetchone()
    finally:
        # Close the connection even if hashing or the query raises.
        conn.close()
    return dict(user) if user else None

def add_scan_history(username, filename, file_type, file_size, source, is_ai, accuracy):
    """Append one row to ``scan_history`` for a single scan.

    All arguments are stored unchanged in the columns of the same name;
    ``scan_date`` is set to the current local time in ISO format.
    """
    conn = get_connection()
    try:
        conn.execute('''INSERT INTO scan_history (username, filename, file_type, file_size, source, is_ai, accuracy, scan_date)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
                     (username, filename, file_type, file_size, source, is_ai, accuracy, datetime.now().isoformat()))
        conn.commit()
    finally:
        # Close the connection even if the insert or commit raises.
        conn.close()

def get_user_history(username):
    """Return up to 20 of the user's most recent history entries.

    Combines the 20 newest ``scan_history`` rows with the 20 newest
    ``test_batches`` rows for ``username``. Every entry is a dict with a
    ``_type`` key (``"scan"`` or ``"batch"``). Batch entries additionally get
    the scan-style keys ``filename``, ``file_type``, ``file_size``,
    ``source``, ``is_ai`` and ``scan_date`` filled in from the batch columns.

    Returns:
        The merged entries sorted by ``scan_date`` descending (missing dates
        sort as the empty string), truncated to 20 items.
    """
    conn = get_connection()
    try:
        scan_rows = conn.execute("SELECT * FROM scan_history WHERE username=? ORDER BY id DESC LIMIT 20", (username,)).fetchall()
        batch_rows = conn.execute('''SELECT tb.id, tb.total_images, tb.correct_count, tb.wrong_count, tb.accuracy, tb.test_date
                                      FROM test_batches tb WHERE tb.username=? ORDER BY tb.id DESC LIMIT 20''', (username,)).fetchall()
    finally:
        # Close the connection even if one of the queries raises.
        conn.close()

    history = []
    for row in scan_rows:
        entry = dict(row)
        entry["_type"] = "scan"
        history.append(entry)
    for row in batch_rows:
        entry = dict(row)
        # "accuracy" is already present from the query, so it is not reassigned.
        entry.update({
            "_type": "batch",
            "filename": f"Batch #{entry['id']} ({entry['total_images']} gambar)",
            "file_type": "Batch",
            "file_size": "-",
            "source": f"{entry['correct_count']} benar / {entry['wrong_count']} salah",
            "is_ai": 0,
            "scan_date": entry["test_date"],
        })
        history.append(entry)

    history.sort(key=lambda item: item.get("scan_date") or "", reverse=True)
    return history[:20]

def clear_all_history():
    """Delete every row from the four history/result tables.

    Empties ``scan_history``, ``test_batches``, ``test_results`` and
    ``learning_data`` for all users; the ``users`` table is not touched.
    The deletes are committed together.
    """
    conn = get_connection()
    try:
        conn.execute("DELETE FROM scan_history")
        conn.execute("DELETE FROM test_batches")
        conn.execute("DELETE FROM test_results")
        conn.execute("DELETE FROM learning_data")
        conn.commit()
    finally:
        # Close the connection even if a delete raises; uncommitted deletes
        # are then discarded with it.
        conn.close()

def create_test_batch(username, total_images, correct_count, wrong_count, accuracy):
    """Insert a summary row into ``test_batches`` and return its id.

    ``test_date`` is set to the current local time in ISO format.

    Returns:
        The ``lastrowid`` reported for the inserted row.
    """
    conn = get_connection()
    try:
        cur = conn.execute('''INSERT INTO test_batches (username, total_images, correct_count, wrong_count, accuracy, test_date)
                              VALUES (?, ?, ?, ?, ?, ?)''',
                           (username, total_images, correct_count, wrong_count, accuracy, datetime.now().isoformat()))
        conn.commit()
        return cur.lastrowid
    finally:
        # Close the connection even if the insert or commit raises.
        conn.close()

def save_test_result(batch_id, username, filename, folder_label, prediction, confidence, is_mismatch):
    """Insert one per-image result row into ``test_results``.

    All arguments are stored unchanged in the columns of the same name;
    ``scan_date`` is set to the current local time in ISO format. The
    ``user_correction`` and ``corrected_label`` columns are left unset.
    """
    conn = get_connection()
    try:
        conn.execute('''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, scan_date)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
                     (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, datetime.now().isoformat()))
        conn.commit()
    finally:
        # Close the connection even if the insert or commit raises.
        conn.close()

def update_test_result_correction(result_id, user_correction, corrected_label):
    """Store a user's correction on the ``test_results`` row with id ``result_id``.

    Sets the ``user_correction`` and ``corrected_label`` columns. If no row
    has that id the UPDATE simply affects nothing.
    """
    conn = get_connection()
    try:
        conn.execute("UPDATE test_results SET user_correction=?, corrected_label=? WHERE id=?",
                     (user_correction, corrected_label, result_id))
        conn.commit()
    finally:
        # Close the connection even if the update or commit raises.
        conn.close()

def save_learning_data(username, filename, original_prediction, correct_label, confidence, source="user_correction"):
    """Insert one row into ``learning_data``.

    All arguments are stored unchanged in the columns of the same name;
    ``source`` defaults to ``"user_correction"`` and ``scan_date`` is set to
    the current local time in ISO format.
    """
    conn = get_connection()
    try:
        conn.execute('''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date)
                        VALUES (?, ?, ?, ?, ?, ?, ?)''',
                     (username, filename, original_prediction, correct_label, confidence, source, datetime.now().isoformat()))
        conn.commit()
    finally:
        # Close the connection even if the insert or commit raises.
        conn.close()

def get_all_test_batches(username=None):
    """Return ``test_batches`` rows as dicts, newest id first.

    Args:
        username: when truthy, only that user's batches are returned;
            otherwise (None or empty string) batches of all users are.
    """
    conn = get_connection()
    try:
        if username:
            rows = conn.execute("SELECT * FROM test_batches WHERE username=? ORDER BY id DESC", (username,)).fetchall()
        else:
            rows = conn.execute("SELECT * FROM test_batches ORDER BY id DESC").fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return [dict(row) for row in rows]

def get_test_results_by_batch(batch_id):
    """Return every ``test_results`` row whose ``batch_id`` matches, as dicts."""
    conn = get_connection()
    try:
        rows = conn.execute("SELECT * FROM test_results WHERE batch_id=?", (batch_id,)).fetchall()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return [dict(row) for row in rows]

def get_overall_accuracy(username=None):
    """Compute accuracy statistics from ``test_results`` and user corrections.

    Two sources are counted:

    * ``test_results`` rows with a non-empty ``folder_label``. The row is
      correct when its final label (``corrected_label`` if set, otherwise
      ``prediction``) equals the class implied by ``folder_label``
      ("fake"/"ai" -> "AI", anything else -> "REAL").
    * ``learning_data`` rows with ``source='user_correction'``. The row is
      correct when ``original_prediction`` equals ``correct_label``.

    Labels are compared case-insensitively with "FAKE" treated as "AI", the
    same equivalence sync_scan_history_to_test_results() applies, so a
    spelling difference alone is not counted as a wrong prediction.

    NOTE(review): a correction that sync_scan_history_to_test_results() has
    copied into ``test_results.corrected_label`` also still exists in
    ``learning_data`` and is therefore counted by both loops. Confirm
    whether that double count is intended.

    Args:
        username: when truthy, restrict both sources to that user.

    Returns:
        Dict with ``total``, ``correct``, ``wrong`` and ``accuracy``
        (percentage rounded to one decimal, 0 when there are no rows).
    """

    def canonical(label):
        # Map known spellings onto "AI"/"REAL"; leave anything else
        # (including None) untouched so unknown values still compare raw.
        if isinstance(label, str):
            lowered = label.lower()
            if lowered in ("fake", "ai"):
                return "AI"
            if lowered == "real":
                return "REAL"
        return label

    conn = get_connection()
    try:
        if username:
            rows = conn.execute("SELECT * FROM test_results WHERE username=?", (username,)).fetchall()
            ld_rows = conn.execute("SELECT * FROM learning_data WHERE username=? AND source='user_correction'", (username,)).fetchall()
        else:
            rows = conn.execute("SELECT * FROM test_results").fetchall()
            ld_rows = conn.execute("SELECT * FROM learning_data WHERE source='user_correction'").fetchall()
    finally:
        # Close the connection even if one of the queries raises.
        conn.close()

    total = 0
    correct = 0

    for r in rows:
        if not r["folder_label"]:
            # Rows without a ground-truth label cannot be scored.
            continue
        total += 1
        final_label = canonical(r["corrected_label"] or r["prediction"])
        expected = "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL"
        if final_label == expected:
            correct += 1

    for r in ld_rows:
        total += 1
        if canonical(r["original_prediction"]) == canonical(r["correct_label"]):
            correct += 1

    wrong = total - correct
    acc = round((correct / total * 100), 1) if total > 0 else 0
    return {"total": total, "correct": correct, "wrong": wrong, "accuracy": acc}

def get_learning_data_count():
    """Return the number of rows in ``learning_data`` (0 when empty)."""
    conn = get_connection()
    try:
        row = conn.execute("SELECT COUNT(*) as cnt FROM learning_data").fetchone()
    finally:
        # Close the connection even if the query raises.
        conn.close()
    return row["cnt"] or 0

def migrate_existing_learning_data():
    """Copy mismatched test results into ``learning_data``.

    Selects ``test_results`` rows with ``is_mismatch = 1`` and a non-NULL
    ``folder_label`` for which no ``learning_data`` row with the same
    filename and username exists yet, and inserts one ``learning_data`` row
    per selected result with ``source='batch_mismatch'``. The stored
    ``correct_label`` is the class implied by the folder label
    ("fake"/"ai" -> "AI", anything else -> "REAL").

    NOTE(review): the selection runs before any insert, so several
    mismatched results for the same filename/username in one run each
    produce a row. Confirm whether de-duplication is wanted.
    """
    conn = get_connection()
    try:
        rows = conn.execute('''SELECT tr.* FROM test_results tr
                               LEFT JOIN learning_data ld ON tr.filename = ld.filename AND tr.username = ld.username
                               WHERE tr.is_mismatch = 1 AND tr.folder_label IS NOT NULL AND ld.id IS NULL''').fetchall()
        new_rows = [
            (
                r["username"],
                r["filename"],
                r["prediction"],
                "AI" if r["folder_label"].lower() in ("fake", "ai") else "REAL",
                r["confidence"],
                "batch_mismatch",
                r["scan_date"],
            )
            for r in rows
        ]
        conn.executemany('''INSERT INTO learning_data (username, filename, original_prediction, correct_label, confidence, source, scan_date)
                            VALUES (?, ?, ?, ?, ?, ?, ?)''',
                         new_rows)
        conn.commit()
    finally:
        # Close the connection even if the query or an insert raises.
        conn.close()

def sync_scan_history_to_test_results():
    """Mirror single scans into ``test_results`` as batch-less rows.

    For every ``scan_history`` row that has no ``test_results`` row with the
    same username and filename and ``batch_id IS NULL``, a new
    ``test_results`` row is inserted with ``batch_id`` NULL:

    * ``prediction`` is "AI" when ``is_ai == 1``, otherwise "REAL".
    * ``folder_label`` is inferred from the filename: "REAL" if it contains
      "real"; "AI" if it contains "fake" or a stand-alone "ai" token;
      otherwise the prediction itself (i.e. assumed correct).
    * If ``learning_data`` holds a ``user_correction`` for the same
      username/filename, the newest one overrides the inferred label and is
      also stored as ``corrected_label``.
    * ``is_mismatch`` is 1 when the prediction differs from the final label.
    """
    # Local import: the module's import header does not bring in ``re``.
    import re

    # "ai" must not be embedded in a longer word; a plain substring test
    # would classify names such as "rain.jpg" or "main.png" as AI.
    ai_token = re.compile(r"(?<![a-z])ai(?![a-z])")

    conn = get_connection()
    try:
        # Find every scan in scan_history that is not yet present in
        # test_results as a batch-less row (batch_id IS NULL).
        scans = conn.execute('''
            SELECT sh.username, sh.filename, sh.is_ai, sh.accuracy, sh.scan_date
            FROM scan_history sh
            LEFT JOIN test_results tr ON sh.username = tr.username AND sh.filename = tr.filename AND tr.batch_id IS NULL
            WHERE tr.id IS NULL
        ''').fetchall()

        for s in scans:
            prediction_label = "AI" if s["is_ai"] == 1 else "REAL"

            # Infer the ground-truth label from the filename. The column is
            # nullable, so a missing name is treated as an empty string.
            fn_lower = (s["filename"] or "").lower()
            if "real" in fn_lower:
                inferred_label = "REAL"
            elif "fake" in fn_lower or ai_token.search(fn_lower):
                inferred_label = "AI"
            else:
                inferred_label = prediction_label  # no hint: assume correct

            is_mismatch = 1 if prediction_label != inferred_label else 0

            # A user correction stored in learning_data takes precedence
            # over the filename heuristic.
            ld = conn.execute("SELECT correct_label FROM learning_data WHERE username=? AND filename=? AND source='user_correction' ORDER BY id DESC LIMIT 1",
                              (s["username"], s["filename"])).fetchone()
            corrected_label = None
            if ld is not None and ld["correct_label"] is not None:
                corrected_label = "AI" if ld["correct_label"].upper() in ("FAKE", "AI") else "REAL"
                is_mismatch = 1 if prediction_label != corrected_label else 0
                inferred_label = corrected_label

            conn.execute('''INSERT INTO test_results (batch_id, username, filename, folder_label, prediction, confidence, is_mismatch, corrected_label, scan_date)
                            VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)''',
                         (s["username"], s["filename"], inferred_label, prediction_label, s["accuracy"], is_mismatch, corrected_label, s["scan_date"]))
        conn.commit()
    finally:
        # Close the connection even if a query or insert raises; uncommitted
        # inserts are then discarded with it.
        conn.close()