import sqlite3 import datetime import os DB_NAME = os.path.join(os.path.dirname(__file__), 'database.db') def get_db_connection(): try: conn = sqlite3.connect(DB_NAME) conn.row_factory = sqlite3.Row return conn except sqlite3.Error as e: print(f"Database error: {e}") return None def init_db(): conn = get_db_connection() if conn: try: conn.execute(''' CREATE TABLE IF NOT EXISTS history ( id INTEGER PRIMARY KEY AUTOINCREMENT, filename TEXT NOT NULL, prediction TEXT NOT NULL, confidence REAL NOT NULL, fake_probability REAL NOT NULL, real_probability REAL NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() print("✅ Database initialized successfully.") except sqlite3.Error as e: print(f"Error initializing database: {e}") # Migration: Add image_path, notes, tags if not exists try: conn.execute('ALTER TABLE history ADD COLUMN image_path TEXT') print("✅ Added image_path column.") except sqlite3.Error: pass # Column likely exists try: conn.execute('ALTER TABLE history ADD COLUMN notes TEXT') print("✅ Added notes column.") except sqlite3.Error: pass try: conn.execute('ALTER TABLE history ADD COLUMN tags TEXT') print("✅ Added tags column.") except sqlite3.Error: pass try: conn.execute('ALTER TABLE history ADD COLUMN session_id TEXT') print("✅ Added session_id column.") except sqlite3.Error: pass # Create feedback table for user feedback on predictions try: conn.execute(''' CREATE TABLE IF NOT EXISTS feedback ( id INTEGER PRIMARY KEY AUTOINCREMENT, scan_id INTEGER NOT NULL, user_feedback TEXT NOT NULL, predicted_label TEXT NOT NULL, actual_label TEXT, image_path TEXT, confidence REAL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (scan_id) REFERENCES history(id) ) ''') conn.commit() print("✅ Feedback table initialized successfully.") except sqlite3.Error as e: print(f"Error initializing feedback table: {e}") finally: conn.close() def add_scan(filename, prediction, confidence, fake_prob, real_prob, image_path="", session_id=None): conn = get_db_connection() if conn: try: cursor = conn.execute(''' INSERT INTO history (filename, prediction, confidence, fake_probability, real_probability, image_path, session_id) VALUES (?, ?, ?, ?, ?, ?, ?) ''', (filename, prediction, confidence, fake_prob, real_prob, image_path, session_id)) conn.commit() scan_id = cursor.lastrowid return scan_id except sqlite3.Error as e: print(f"Error adding scan: {e}") return None finally: conn.close() return None def get_history(session_id=None): conn = get_db_connection() if conn: try: query = 'SELECT * FROM history' params = [] if session_id: query += ' WHERE session_id = ? OR session_id IS NULL' # Allow seeing public/legacy items if desired, or strictly session specific # Strict session isolation: query = 'SELECT * FROM history WHERE session_id = ?' params = [session_id] else: # If no session_id provided (legacy behavior), maybe show all or none? # Let's show only items with NULL session_id to avoid leaking user data query = 'SELECT * FROM history WHERE session_id IS NULL' query += ' ORDER BY timestamp DESC' cursor = conn.execute(query, params) history = [dict(row) for row in cursor.fetchall()] return history except sqlite3.Error as e: print(f"Error retrieving history: {e}") return [] finally: conn.close() return [] def clear_history(session_id=None): conn = get_db_connection() if conn: try: if session_id: conn.execute('DELETE FROM history WHERE session_id = ?', (session_id,)) else: conn.execute('DELETE FROM history WHERE session_id IS NULL') conn.commit() return True except sqlite3.Error as e: print(f"Error clearing history: {e}") return False finally: conn.close() return False def delete_scan(scan_id, session_id=None): conn = get_db_connection() if conn: try: if session_id: conn.execute('DELETE FROM history WHERE id = ? AND session_id = ?', (scan_id, session_id)) else: conn.execute('DELETE FROM history WHERE id = ? AND session_id IS NULL', (scan_id,)) conn.commit() return True except sqlite3.Error as e: print(f"Error deleting scan: {e}") return False finally: conn.close() return False def update_scan(scan_id, data): conn = get_db_connection() if conn: try: fields = [] values = [] if 'notes' in data: fields.append("notes = ?") values.append(data['notes']) if 'tags' in data: fields.append("tags = ?") values.append(data['tags']) if not fields: return True values.append(scan_id) query = f"UPDATE history SET {', '.join(fields)} WHERE id = ?" conn.execute(query, tuple(values)) conn.commit() return True except sqlite3.Error as e: print(f"Error updating scan: {e}") return False finally: conn.close() return False def add_feedback(scan_id, is_correct, predicted_label, actual_label=None, image_path=None, confidence=None): """Record user feedback on a prediction""" conn = get_db_connection() if conn: try: user_feedback = 'correct' if is_correct else 'incorrect' conn.execute(''' INSERT INTO feedback (scan_id, user_feedback, predicted_label, actual_label, image_path, confidence) VALUES (?, ?, ?, ?, ?, ?) ''', (scan_id, user_feedback, predicted_label, actual_label, image_path, confidence)) conn.commit() return True except sqlite3.Error as e: print(f"Error adding feedback: {e}") return False finally: conn.close() return False def get_incorrect_predictions(): """Get all incorrect predictions for model retraining""" conn = get_db_connection() if conn: try: cursor = conn.execute(''' SELECT f.*, h.filename FROM feedback f LEFT JOIN history h ON f.scan_id = h.id WHERE f.user_feedback = 'incorrect' ORDER BY f.timestamp DESC ''') incorrect = [dict(row) for row in cursor.fetchall()] return incorrect except sqlite3.Error as e: print(f"Error retrieving incorrect predictions: {e}") return [] finally: conn.close() return [] def get_feedback_stats(): """Get statistics on user feedback""" conn = get_db_connection() if conn: try: cursor = conn.execute(''' SELECT COUNT(*) as total_feedback, SUM(CASE WHEN user_feedback = 'correct' THEN 1 ELSE 0 END) as correct_count, SUM(CASE WHEN user_feedback = 'incorrect' THEN 1 ELSE 0 END) as incorrect_count FROM feedback ''') stats = dict(cursor.fetchone()) return stats except sqlite3.Error as e: print(f"Error retrieving feedback stats: {e}") return {'total_feedback': 0, 'correct_count': 0, 'incorrect_count': 0} finally: conn.close() return {'total_feedback': 0, 'correct_count': 0, 'incorrect_count': 0} # Initialize DB on module load init_db()