"""
Database module for The Sentinel Interface.

Uses SQLite for lightweight persistent storage of sessions and performance data.
The schema is created (if missing) as a side effect of importing this module.
"""

import json
import os
import re
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta

DB_PATH = os.path.join(os.path.dirname(__file__), "sentinel.db")

# Extracts a whole-minute duration from a performance summary, matching either
# "lasted <N> minutes" or "Session: <N>min".
_DURATION_RE = re.compile(r'(?:lasted\s*(\d+)\s*minutes|Session:\s*(\d+)min)')

_STATS_QUERY = (
    "SELECT COUNT(*) as total_sessions, AVG(engagement_score) as avg_engagement, "
    "MAX(engagement_score) as peak_engagement, MIN(engagement_score) as min_engagement "
    "FROM student_performance"
)


def get_connection():
    """Open a connection to the database at DB_PATH.

    Rows are returned as ``sqlite3.Row`` and the journal is switched to WAL.
    The caller owns the connection and must close it.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
    except Exception:
        # Do not leak the handle if the connection cannot be configured.
        conn.close()
        raise
    return conn


def init_db():
    """Create tables if they don't exist."""
    with closing(get_connection()) as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                student_id TEXT NOT NULL DEFAULT 'default',
                start_time TEXT NOT NULL,
                end_time TEXT,
                avg_engagement REAL DEFAULT 0,
                dominant_emotion TEXT DEFAULT 'neutral',
                notes TEXT DEFAULT ''
            );

            CREATE TABLE IF NOT EXISTS emotion_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id INTEGER NOT NULL,
                timestamp TEXT NOT NULL,
                source TEXT NOT NULL,
                emotion TEXT NOT NULL,
                confidence REAL NOT NULL,
                raw_data TEXT DEFAULT '{}',
                FOREIGN KEY (session_id) REFERENCES sessions(id)
            );

            CREATE TABLE IF NOT EXISTS student_performance (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                student_id TEXT NOT NULL,
                session_id INTEGER,
                date TEXT NOT NULL,
                engagement_score REAL NOT NULL,
                face_emotion TEXT DEFAULT '{}',
                speech_emotion TEXT DEFAULT '{}',
                text_sentiment TEXT DEFAULT '{}',
                overall_summary TEXT DEFAULT '',
                FOREIGN KEY (session_id) REFERENCES sessions(id)
            );
        """)
        conn.commit()


def create_session(student_id="default", start_time=None):
    """Insert a new session row and return its id.

    ``start_time`` is stored as given; when falsy, the current local time in
    ISO format is used instead.
    """
    started_at = start_time if start_time else datetime.now().isoformat()
    with closing(get_connection()) as conn:
        cursor = conn.execute(
            "INSERT INTO sessions (student_id, start_time) VALUES (?, ?)",
            (student_id, started_at),
        )
        conn.commit()
        return cursor.lastrowid


def end_session(session_id, avg_engagement, dominant_emotion):
    """Stamp the session with the current time as end_time and store its summary values."""
    now = datetime.now().isoformat()
    with closing(get_connection()) as conn:
        conn.execute(
            "UPDATE sessions SET end_time=?, avg_engagement=?, dominant_emotion=? WHERE id=?",
            (now, avg_engagement, dominant_emotion, session_id),
        )
        conn.commit()


def log_emotion(session_id, source, emotion, confidence, raw_data=None):
    """Append one emotion reading to emotion_logs, timestamped with the current time.

    ``raw_data`` is serialized to JSON; a falsy value is stored as ``{}``.
    """
    now = datetime.now().isoformat()
    with closing(get_connection()) as conn:
        conn.execute(
            "INSERT INTO emotion_logs (session_id, timestamp, source, emotion, confidence, raw_data) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (session_id, now, source, emotion, confidence, json.dumps(raw_data or {})),
        )
        conn.commit()


def save_performance(student_id, session_id, engagement_score,
                     face_emotion=None, speech_emotion=None,
                     text_sentiment=None, summary=""):
    """Insert a student_performance row dated today (YYYY-MM-DD).

    The three emotion/sentiment arguments are serialized to JSON; falsy values
    are stored as ``{}``.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    with closing(get_connection()) as conn:
        conn.execute(
            "INSERT INTO student_performance (student_id, session_id, date, engagement_score, "
            "face_emotion, speech_emotion, text_sentiment, overall_summary) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (
                student_id,
                session_id,
                today,
                engagement_score,
                json.dumps(face_emotion or {}),
                json.dumps(speech_emotion or {}),
                json.dumps(text_sentiment or {}),
                summary,
            ),
        )
        conn.commit()


def get_student_performance(student_id="default"):
    """Return up to 50 performance rows (newest session first) as dicts.

    Passing ``'all'`` (case-insensitive) returns rows for every student.
    """
    with closing(get_connection()) as conn:
        if student_id.lower() == 'all':
            rows = conn.execute(
                "SELECT * FROM student_performance ORDER BY session_id DESC, id DESC LIMIT 50"
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM student_performance WHERE student_id=? "
                "ORDER BY session_id DESC, id DESC LIMIT 50",
                (student_id,),
            ).fetchall()
    return [dict(r) for r in rows]


def get_all_sessions(student_id=None):
    """Return up to 50 sessions (highest id first) as dicts.

    A falsy ``student_id`` or ``'all'`` (case-insensitive) returns sessions for
    every student; anything else filters on that student.
    """
    with closing(get_connection()) as conn:
        if student_id and student_id.lower() != 'all':
            rows = conn.execute(
                "SELECT * FROM sessions WHERE student_id=? ORDER BY id DESC LIMIT 50",
                (student_id,),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM sessions ORDER BY id DESC LIMIT 50"
            ).fetchall()
    return [dict(r) for r in rows]


def delete_session(session_id):
    """
    Deletes a session and its associated logs, then shifts down all
    subsequent session IDs by 1 so the IDs remain perfectly contiguous.

    Returns True on success. On any error the transaction is rolled back, the
    error is printed, and False is returned.
    """
    with closing(get_connection()) as conn:
        try:
            # Cascade delete
            conn.execute("DELETE FROM emotion_logs WHERE session_id=?", (session_id,))
            conn.execute("DELETE FROM student_performance WHERE session_id=?", (session_id,))
            conn.execute("DELETE FROM sessions WHERE id=?", (session_id,))

            # Dynamic re-indexing to close the gap
            conn.execute("UPDATE sessions SET id = id - 1 WHERE id > ?", (session_id,))
            conn.execute(
                "UPDATE student_performance SET session_id = session_id - 1 WHERE session_id > ?",
                (session_id,),
            )
            conn.execute(
                "UPDATE emotion_logs SET session_id = session_id - 1 WHERE session_id > ?",
                (session_id,),
            )

            # Reset the AUTOINCREMENT sequence so the next inserted ID is MAX(id) + 1.
            # COALESCE keeps seq an integer when the table is now empty; SQLite
            # documents a non-integer seq as undefined behaviour.
            conn.execute(
                "UPDATE sqlite_sequence SET seq = (SELECT COALESCE(MAX(id), 0) FROM sessions) "
                "WHERE name = 'sessions'"
            )

            conn.commit()
            return True
        except Exception as e:
            conn.rollback()
            print(f"Error deleting session: {e}")
            return False


def get_session_emotions(session_id):
    """Return every emotion log for the session, oldest first, as dicts."""
    with closing(get_connection()) as conn:
        rows = conn.execute(
            "SELECT * FROM emotion_logs WHERE session_id=? ORDER BY timestamp ASC",
            (session_id,),
        ).fetchall()
    return [dict(r) for r in rows]


def get_overall_stats(student_id="default"):
    """Return count/average/max/min of engagement_score over performance rows.

    ``'all'`` (case-insensitive) aggregates over every student. When no rows
    match, every statistic is reported as 0.
    """
    with closing(get_connection()) as conn:
        if student_id.lower() == 'all':
            row = conn.execute(_STATS_QUERY).fetchone()
        else:
            row = conn.execute(
                _STATS_QUERY + " WHERE student_id=?", (student_id,)
            ).fetchone()

    if row and row['total_sessions'] > 0:
        return dict(row)
    return {"total_sessions": 0, "avg_engagement": 0, "peak_engagement": 0, "min_engagement": 0}


def _parse_iso(value):
    """Parse an ISO timestamp after stripping 'Z'; return None for NULL/empty input."""
    text = (value or "").replace('Z', '')
    return datetime.fromisoformat(text) if text else None


def get_session_details(session_id):
    """Return one session as a dict enriched with display fields, or None if not found.

    Added/overwritten keys:
      * ``peak_engagement`` / ``min_engagement`` / ``avg_engagement`` - computed
        from the ``confidence`` values of the session's emotion logs; when there
        are no logs, peak and min fall back to the stored ``avg_engagement``.
      * ``end_time`` - recomputed as start_time + duration.
      * ``duration_mins`` - human-readable duration string.
      * ``date_time`` - start time formatted as ``"%b %d, %Y %H:%M"``.
    """
    with closing(get_connection()) as conn:
        row = conn.execute(
            "SELECT s.*, p.overall_summary FROM sessions s "
            "LEFT JOIN student_performance p ON s.id=p.session_id WHERE s.id=?",
            (session_id,),
        ).fetchone()
        if not row:
            return None
        logs_rows = conn.execute(
            "SELECT timestamp, confidence FROM emotion_logs "
            "WHERE session_id=? AND confidence IS NOT NULL",
            (session_id,),
        ).fetchall()

    session = dict(row)

    if logs_rows:
        scores = [log['confidence'] for log in logs_rows]
        session['peak_engagement'] = max(scores)
        session['min_engagement'] = min(scores)
        session['avg_engagement'] = sum(scores) / len(scores)
    else:
        # Fallback if no logs
        session['peak_engagement'] = session['avg_engagement']
        session['min_engagement'] = session['avg_engagement']

    # Calculate duration
    try:
        start = _parse_iso(session.get('start_time')) or datetime.now()

        # ALWAYS prioritize parsing the true duration from the summary if available
        match = _DURATION_RE.search(session.get('overall_summary') or '')
        if match:
            diff_seconds = int(match.group(1) or match.group(2)) * 60
        else:
            # Fallback to timestamp delta if no summary matched. end_time is
            # NULL while a session is still open, so treat it as "ends at start".
            end = _parse_iso(session.get('end_time')) or start
            if end < start:
                end = start
            diff_seconds = (end - start).total_seconds()

        # User explicitly requested that end_time must mathematically align
        # with start_time + duration
        session['end_time'] = (start + timedelta(seconds=diff_seconds)).isoformat()

        if diff_seconds < 60:
            session['duration_mins'] = "0 minutes"
        else:
            mins = round(diff_seconds / 60.0, 1)
            session['duration_mins'] = f"{mins} minutes"

        session['date_time'] = start.strftime("%b %d, %Y %H:%M")
    except Exception:
        # Deliberate best-effort: malformed stored timestamps must not break
        # the details view, so fall back to raw values.
        session['duration_mins'] = "0 seconds"
        session['date_time'] = session.get('start_time', 'Unknown Date')

    return session


# Initialize database on import
init_db()