| """ |
| Database module for The Sentinel Interface. |
| Uses SQLite for lightweight persistent storage of sessions and performance data. |
| """ |
| import sqlite3 |
| import os |
| import json |
| from datetime import datetime |
|
|
# The database file lives in the same directory as this module.
DB_PATH = os.path.join(os.path.split(__file__)[0], "sentinel.db")
|
|
|
|
def get_connection():
    """Open a new connection to the SQLite database at ``DB_PATH``.

    Rows fetched through the connection are ``sqlite3.Row`` objects, so
    columns can be read by name. The journal mode is set to WAL on every
    call.

    Returns:
        sqlite3.Connection: a fresh connection; the caller must close it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA journal_mode=WAL")
    return connection
|
|
|
|
def init_db():
    """Create the ``sessions``, ``emotion_logs`` and ``student_performance``
    tables if they don't exist.

    The connection is closed even when the schema script fails.
    """
    conn = get_connection()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                student_id TEXT NOT NULL DEFAULT 'default',
                start_time TEXT NOT NULL,
                end_time TEXT,
                avg_engagement REAL DEFAULT 0,
                dominant_emotion TEXT DEFAULT 'neutral',
                notes TEXT DEFAULT ''
            );

            CREATE TABLE IF NOT EXISTS emotion_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id INTEGER NOT NULL,
                timestamp TEXT NOT NULL,
                source TEXT NOT NULL,
                emotion TEXT NOT NULL,
                confidence REAL NOT NULL,
                raw_data TEXT DEFAULT '{}',
                FOREIGN KEY (session_id) REFERENCES sessions(id)
            );

            CREATE TABLE IF NOT EXISTS student_performance (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                student_id TEXT NOT NULL,
                session_id INTEGER,
                date TEXT NOT NULL,
                engagement_score REAL NOT NULL,
                face_emotion TEXT DEFAULT '{}',
                speech_emotion TEXT DEFAULT '{}',
                text_sentiment TEXT DEFAULT '{}',
                overall_summary TEXT DEFAULT '',
                FOREIGN KEY (session_id) REFERENCES sessions(id)
            );
        """)
        conn.commit()
    finally:
        # Release the handle whether or not the script succeeded.
        conn.close()
|
|
|
|
def create_session(student_id="default", start_time=None):
    """Insert a new session row and return its id.

    Args:
        student_id: identifier stored in the ``student_id`` column.
        start_time: value stored as the session start; when falsy, the
            current local time in ISO-8601 format is used.

    Returns:
        int: the id of the newly inserted session.
    """
    started_at = start_time if start_time else datetime.now().isoformat()
    conn = get_connection()
    try:
        cursor = conn.execute(
            "INSERT INTO sessions (student_id, start_time) VALUES (?, ?)",
            (student_id, started_at),
        )
        conn.commit()
        return cursor.lastrowid
    finally:
        # Close the connection even if the INSERT or commit fails.
        conn.close()
|
|
|
|
def end_session(session_id, avg_engagement, dominant_emotion):
    """Stamp a session with its end time and summary values.

    Args:
        session_id: id of the session row to update.
        avg_engagement: value written to ``avg_engagement``.
        dominant_emotion: value written to ``dominant_emotion``.

    The end time is the current local time in ISO-8601 format. If no row
    matches ``session_id``, nothing is updated.
    """
    ended_at = datetime.now().isoformat()
    conn = get_connection()
    try:
        conn.execute(
            "UPDATE sessions SET end_time=?, avg_engagement=?, dominant_emotion=? WHERE id=?",
            (ended_at, avg_engagement, dominant_emotion, session_id),
        )
        conn.commit()
    finally:
        # Close the connection even if the UPDATE or commit fails.
        conn.close()
|
|
|
|
def log_emotion(session_id, source, emotion, confidence, raw_data=None):
    """Append one emotion reading to ``emotion_logs``.

    Args:
        session_id: id of the session the reading belongs to.
        source: value stored in the ``source`` column.
        emotion: detected emotion label.
        confidence: confidence value for the reading.
        raw_data: optional JSON-serializable payload; a falsy value is
            stored as ``{}``.

    Raises:
        TypeError: if ``raw_data`` is not JSON-serializable.
    """
    # Serialize before opening the connection so a bad payload cannot leak it.
    payload = json.dumps(raw_data or {})
    logged_at = datetime.now().isoformat()
    conn = get_connection()
    try:
        conn.execute(
            "INSERT INTO emotion_logs (session_id, timestamp, source, emotion, confidence, raw_data) VALUES (?, ?, ?, ?, ?, ?)",
            (session_id, logged_at, source, emotion, confidence, payload),
        )
        conn.commit()
    finally:
        conn.close()
|
|
|
|
def save_performance(student_id, session_id, engagement_score, face_emotion=None, speech_emotion=None, text_sentiment=None, summary=""):
    """Insert one row into ``student_performance`` dated today.

    Args:
        student_id: identifier of the student.
        session_id: id of the related session.
        engagement_score: value stored in ``engagement_score``.
        face_emotion: optional JSON-serializable data; falsy is stored as ``{}``.
        speech_emotion: optional JSON-serializable data; falsy is stored as ``{}``.
        text_sentiment: optional JSON-serializable data; falsy is stored as ``{}``.
        summary: free text stored in ``overall_summary``.

    Raises:
        TypeError: if any of the emotion payloads is not JSON-serializable.
    """
    # Serialize before opening the connection so a bad payload cannot leak it.
    face_json = json.dumps(face_emotion or {})
    speech_json = json.dumps(speech_emotion or {})
    text_json = json.dumps(text_sentiment or {})
    today = datetime.now().strftime("%Y-%m-%d")

    conn = get_connection()
    try:
        conn.execute(
            "INSERT INTO student_performance (student_id, session_id, date, engagement_score, face_emotion, speech_emotion, text_sentiment, overall_summary) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (student_id, session_id, today, engagement_score,
             face_json, speech_json, text_json, summary),
        )
        conn.commit()
    finally:
        conn.close()
|
|
|
|
def get_student_performance(student_id="default"):
    """Return up to 50 performance rows, newest session first.

    Args:
        student_id: the student to filter on. The string ``'all'`` (any
            case) returns rows for every student. Any other value,
            including non-strings, is bound as the ``student_id`` filter,
            so ``None`` matches no rows.

    Returns:
        list[dict]: one dict per row, keyed by column name.
    """
    query = "SELECT * FROM student_performance"
    params = ()
    # Decide on the filter before touching the database; the isinstance check
    # keeps a non-string id from raising on .lower().
    if not (isinstance(student_id, str) and student_id.lower() == 'all'):
        query += " WHERE student_id=?"
        params = (student_id,)
    query += " ORDER BY session_id DESC, id DESC LIMIT 50"

    conn = get_connection()
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
|
|
|
|
def get_all_sessions(student_id=None):
    """Return up to 50 sessions, most recent id first.

    Args:
        student_id: optional filter. A falsy value or the string ``'all'``
            (any case) returns sessions for every student; any other value
            is bound as the ``student_id`` filter.

    Returns:
        list[dict]: one dict per session row, keyed by column name.
    """
    # The isinstance check keeps a truthy non-string id from raising on .lower().
    wants_all = not student_id or (
        isinstance(student_id, str) and student_id.lower() == 'all'
    )
    if wants_all:
        query = "SELECT * FROM sessions ORDER BY id DESC LIMIT 50"
        params = ()
    else:
        query = "SELECT * FROM sessions WHERE student_id=? ORDER BY id DESC LIMIT 50"
        params = (student_id,)

    conn = get_connection()
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
|
|
|
|
def delete_session(session_id):
    """
    Deletes a session and its associated logs, then shifts down all subsequent
    session IDs by 1 so the IDs remain perfectly contiguous.

    The renumbering only runs when a session row was actually removed.
    Without that guard, a stale, zero or negative ``session_id`` would shift
    every later session (and its logs) down by one without deleting anything.

    Args:
        session_id: id of the session to delete.

    Returns:
        bool: True when the statements were committed (also when no session
        with that id existed, in which case only stray child rows for that
        id are removed); False when an error occurred and the transaction
        was rolled back.
    """
    conn = get_connection()
    try:
        # Child rows go first, then the session itself.
        conn.execute("DELETE FROM emotion_logs WHERE session_id=?", (session_id,))
        conn.execute("DELETE FROM student_performance WHERE session_id=?", (session_id,))
        removed = conn.execute("DELETE FROM sessions WHERE id=?", (session_id,)).rowcount

        if removed:
            # Close the gap left by the deleted id, in the parent and both child tables.
            conn.execute("UPDATE sessions SET id = id - 1 WHERE id > ?", (session_id,))
            conn.execute("UPDATE student_performance SET session_id = session_id - 1 WHERE session_id > ?", (session_id,))
            conn.execute("UPDATE emotion_logs SET session_id = session_id - 1 WHERE session_id > ?", (session_id,))

            # Reset the AUTOINCREMENT counter so the next session continues the
            # contiguous run; COALESCE avoids storing NULL when the table is empty.
            conn.execute("UPDATE sqlite_sequence SET seq = (SELECT COALESCE(MAX(id), 0) FROM sessions) WHERE name = 'sessions'")

        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        print(f"Error deleting session: {e}")
        return False
    finally:
        conn.close()
|
|
|
|
def get_session_emotions(session_id):
    """Return every emotion log of a session, oldest timestamp first.

    Args:
        session_id: id of the session whose logs are wanted.

    Returns:
        list[dict]: one dict per ``emotion_logs`` row, keyed by column name;
        empty when the session has no logs.
    """
    conn = get_connection()
    try:
        rows = conn.execute(
            "SELECT * FROM emotion_logs WHERE session_id=? ORDER BY timestamp ASC",
            (session_id,),
        ).fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()
    return [dict(r) for r in rows]
|
|
|
|
def get_overall_stats(student_id="default"):
    """Aggregate engagement statistics over ``student_performance`` rows.

    Args:
        student_id: the student to aggregate for. The string ``'all'`` (any
            case) aggregates over every student; any other value, including
            non-strings, is bound as the ``student_id`` filter.

    Returns:
        dict: ``total_sessions``, ``avg_engagement``, ``peak_engagement`` and
        ``min_engagement``. All four are 0 when no rows match.
    """
    query = "SELECT COUNT(*) as total_sessions, AVG(engagement_score) as avg_engagement, MAX(engagement_score) as peak_engagement, MIN(engagement_score) as min_engagement FROM student_performance"
    params = ()
    # The isinstance check keeps a non-string id from raising on .lower().
    if not (isinstance(student_id, str) and student_id.lower() == 'all'):
        query += " WHERE student_id=?"
        params = (student_id,)

    conn = get_connection()
    try:
        row = conn.execute(query, params).fetchone()
    finally:
        conn.close()

    if row and row['total_sessions'] > 0:
        return dict(row)
    return {"total_sessions": 0, "avg_engagement": 0, "peak_engagement": 0, "min_engagement": 0}
|
|
def get_session_details(session_id):
    """Return one session enriched with engagement stats and a duration.

    Beyond the ``sessions`` columns and the joined ``overall_summary``, the
    result carries:

    * ``peak_engagement`` / ``min_engagement`` / ``avg_engagement``: taken
      from the ``confidence`` values of the session's emotion logs. When
      there are no logs, peak and min fall back to the stored
      ``avg_engagement``.
    * ``end_time``: recomputed as start plus the derived duration.
    * ``duration_mins``: a display string such as ``"12.5 minutes"``.
    * ``date_time``: the start time formatted like ``"Jan 05, 2025 14:30"``.

    The duration comes from the summary text when it contains
    ``lasted N minutes`` or ``Session: Nmin``; otherwise from
    ``end_time - start_time``. A session that has not ended yet
    (``end_time`` is NULL) gets a zero duration.

    Args:
        session_id: id of the session to load.

    Returns:
        dict | None: the enriched session, or None when no such session exists.
    """
    import re
    from datetime import datetime, timedelta

    conn = get_connection()
    try:
        row = conn.execute(
            "SELECT s.*, p.overall_summary FROM sessions s LEFT JOIN student_performance p ON s.id=p.session_id WHERE s.id=?",
            (session_id,),
        ).fetchone()
        if not row:
            return None
        session = dict(row)
        logs_rows = conn.execute(
            "SELECT timestamp, confidence FROM emotion_logs WHERE session_id=? AND confidence IS NOT NULL",
            (session_id,),
        ).fetchall()
    finally:
        # Close the connection on every path, including a failed query.
        conn.close()

    if logs_rows:
        scores = [log['confidence'] for log in logs_rows]
        session['peak_engagement'] = max(scores)
        session['min_engagement'] = min(scores)
        session['avg_engagement'] = sum(scores) / len(scores)
    else:
        # No readings: reuse the stored session average for both bounds.
        session['peak_engagement'] = session['avg_engagement']
        session['min_engagement'] = session['avg_engagement']

    try:
        # `or ''` matters: the keys always exist, so .get()'s default is never
        # used, and a NULL column arrives as None (end_time for open sessions).
        start_str = (session.get('start_time') or '').replace('Z', '')
        start = datetime.fromisoformat(start_str) if start_str else datetime.now()

        diff_seconds = None

        # Prefer the duration recorded in the summary text, when present.
        summary = session.get('overall_summary')
        if summary:
            match = re.search(r'(?:lasted\s*(\d+)\s*minutes|Session:\s*(\d+)min)', summary)
            if match:
                diff_seconds = int(match.group(1) or match.group(2)) * 60

        # Otherwise derive it from the timestamps, clamping negative spans to 0.
        if diff_seconds is None:
            end_str = (session.get('end_time') or '').replace('Z', '')
            end = datetime.fromisoformat(end_str) if end_str else start
            if end < start:
                end = start
            diff_seconds = (end - start).total_seconds()

        # Keep end_time consistent with whichever duration was chosen.
        session['end_time'] = (start + timedelta(seconds=diff_seconds)).isoformat()

        if diff_seconds < 60:
            session['duration_mins'] = "0 minutes"
        else:
            mins = round(diff_seconds / 60.0, 1)
            session['duration_mins'] = f"{mins} minutes"

        session['date_time'] = start.strftime("%b %d, %Y %H:%M")
    except (TypeError, ValueError, OverflowError, AttributeError):
        # Unparseable or inconsistent timestamps: fall back to raw values
        # rather than failing the whole lookup.
        session['duration_mins'] = "0 seconds"
        session['date_time'] = session.get('start_time') or 'Unknown Date'

    return session
| |
# Runs at import time: creates the tables if they are missing.
init_db()
|
|