r-vasanthkumar73-dev's picture
Deploying backend and frontend folder modules.
099d157 verified
Raw
History Blame Contribute Delete
9.7 kB
"""
Database module for The Sentinel Interface.
Uses SQLite for lightweight persistent storage of sessions and performance data.
"""
import sqlite3
import os
import json
from datetime import datetime
DB_PATH = os.path.join(os.path.dirname(__file__), "sentinel.db")
def get_connection():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
return conn
def init_db():
"""Create tables if they don't exist."""
conn = get_connection()
cursor = conn.cursor()
cursor.executescript("""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id TEXT NOT NULL DEFAULT 'default',
start_time TEXT NOT NULL,
end_time TEXT,
avg_engagement REAL DEFAULT 0,
dominant_emotion TEXT DEFAULT 'neutral',
notes TEXT DEFAULT ''
);
CREATE TABLE IF NOT EXISTS emotion_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
timestamp TEXT NOT NULL,
source TEXT NOT NULL,
emotion TEXT NOT NULL,
confidence REAL NOT NULL,
raw_data TEXT DEFAULT '{}',
FOREIGN KEY (session_id) REFERENCES sessions(id)
);
CREATE TABLE IF NOT EXISTS student_performance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id TEXT NOT NULL,
session_id INTEGER,
date TEXT NOT NULL,
engagement_score REAL NOT NULL,
face_emotion TEXT DEFAULT '{}',
speech_emotion TEXT DEFAULT '{}',
text_sentiment TEXT DEFAULT '{}',
overall_summary TEXT DEFAULT '',
FOREIGN KEY (session_id) REFERENCES sessions(id)
);
""")
conn.commit()
conn.close()
def create_session(student_id="default", start_time=None):
conn = get_connection()
cursor = conn.cursor()
now = start_time if start_time else datetime.now().isoformat()
cursor.execute(
"INSERT INTO sessions (student_id, start_time) VALUES (?, ?)",
(student_id, now)
)
conn.commit()
session_id = cursor.lastrowid
conn.close()
return session_id
def end_session(session_id, avg_engagement, dominant_emotion):
conn = get_connection()
now = datetime.now().isoformat()
conn.execute(
"UPDATE sessions SET end_time=?, avg_engagement=?, dominant_emotion=? WHERE id=?",
(now, avg_engagement, dominant_emotion, session_id)
)
conn.commit()
conn.close()
def log_emotion(session_id, source, emotion, confidence, raw_data=None):
conn = get_connection()
now = datetime.now().isoformat()
conn.execute(
"INSERT INTO emotion_logs (session_id, timestamp, source, emotion, confidence, raw_data) VALUES (?, ?, ?, ?, ?, ?)",
(session_id, now, source, emotion, confidence, json.dumps(raw_data or {}))
)
conn.commit()
conn.close()
def save_performance(student_id, session_id, engagement_score, face_emotion=None, speech_emotion=None, text_sentiment=None, summary=""):
conn = get_connection()
now = datetime.now().strftime("%Y-%m-%d")
conn.execute(
"INSERT INTO student_performance (student_id, session_id, date, engagement_score, face_emotion, speech_emotion, text_sentiment, overall_summary) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(student_id, session_id, now, engagement_score,
json.dumps(face_emotion or {}), json.dumps(speech_emotion or {}),
json.dumps(text_sentiment or {}), summary)
)
conn.commit()
conn.close()
def get_student_performance(student_id="default"):
conn = get_connection()
if student_id.lower() == 'all':
rows = conn.execute(
"SELECT * FROM student_performance ORDER BY session_id DESC, id DESC LIMIT 50"
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM student_performance WHERE student_id=? ORDER BY session_id DESC, id DESC LIMIT 50",
(student_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_all_sessions(student_id=None):
conn = get_connection()
if student_id and student_id.lower() != 'all':
rows = conn.execute(
"SELECT * FROM sessions WHERE student_id=? ORDER BY id DESC LIMIT 50",
(student_id,)
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM sessions ORDER BY id DESC LIMIT 50"
).fetchall()
conn.close()
return [dict(r) for r in rows]
def delete_session(session_id):
"""
Deletes a session and its associated logs, then shifts down all subsequent
session IDs by 1 so the IDs remain perfectly contiguous.
"""
conn = get_connection()
try:
# Cascade delete
conn.execute("DELETE FROM emotion_logs WHERE session_id=?", (session_id,))
conn.execute("DELETE FROM student_performance WHERE session_id=?", (session_id,))
conn.execute("DELETE FROM sessions WHERE id=?", (session_id,))
# Dynamic re-indexing to close the gap
conn.execute("UPDATE sessions SET id = id - 1 WHERE id > ?", (session_id,))
conn.execute("UPDATE student_performance SET session_id = session_id - 1 WHERE session_id > ?", (session_id,))
conn.execute("UPDATE emotion_logs SET session_id = session_id - 1 WHERE session_id > ?", (session_id,))
# Reset the AUTOINCREMENT sequence so the next inserted ID is MAX(id) + 1
conn.execute("UPDATE sqlite_sequence SET seq = (SELECT MAX(id) FROM sessions) WHERE name = 'sessions'")
conn.commit()
return True
except Exception as e:
conn.rollback()
print(f"Error deleting session: {e}")
return False
finally:
conn.close()
def get_session_emotions(session_id):
conn = get_connection()
rows = conn.execute(
"SELECT * FROM emotion_logs WHERE session_id=? ORDER BY timestamp ASC",
(session_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_overall_stats(student_id="default"):
conn = get_connection()
if student_id.lower() == 'all':
row = conn.execute(
"SELECT COUNT(*) as total_sessions, AVG(engagement_score) as avg_engagement, MAX(engagement_score) as peak_engagement, MIN(engagement_score) as min_engagement FROM student_performance"
).fetchone()
else:
row = conn.execute(
"SELECT COUNT(*) as total_sessions, AVG(engagement_score) as avg_engagement, MAX(engagement_score) as peak_engagement, MIN(engagement_score) as min_engagement FROM student_performance WHERE student_id=?",
(student_id,)
).fetchone()
conn.close()
if row and row['total_sessions'] > 0:
return dict(row)
return {"total_sessions": 0, "avg_engagement": 0, "peak_engagement": 0, "min_engagement": 0}
def get_session_details(session_id):
import re
from datetime import datetime, timedelta
conn = get_connection()
session = conn.execute("SELECT s.*, p.overall_summary FROM sessions s LEFT JOIN student_performance p ON s.id=p.session_id WHERE s.id=?", (session_id,)).fetchone()
if not session:
conn.close()
return None
session = dict(session)
logs_rows = conn.execute("SELECT timestamp, confidence FROM emotion_logs WHERE session_id=? AND confidence IS NOT NULL", (session_id,)).fetchall()
conn.close()
if logs_rows:
scores = [l['confidence'] for l in logs_rows]
session['peak_engagement'] = max(scores)
session['min_engagement'] = min(scores)
session['avg_engagement'] = sum(scores) / len(scores)
else:
# Fallback if no logs
session['peak_engagement'] = session['avg_engagement']
session['min_engagement'] = session['avg_engagement']
# Calculate duration
try:
start_str = session.get('start_time', '').replace('Z', '')
start = datetime.fromisoformat(start_str) if start_str else datetime.now()
diff_seconds = 0
parsed_from_summary = False
# ALWAYS prioritize parsing the true duration from the summary if available
if session.get('overall_summary'):
match = re.search(r'(?:lasted\s*(\d+)\s*minutes|Session:\s*(\d+)min)', session['overall_summary'])
if match:
val = match.group(1) or match.group(2)
diff_seconds = int(val) * 60
parsed_from_summary = True
# Fallback to timestamp delta if no summary matched
if not parsed_from_summary:
end_str = session.get('end_time', '').replace('Z', '')
end = datetime.fromisoformat(end_str) if end_str else start
if end < start: end = start
diff_seconds = (end - start).total_seconds()
# User explicitly requested that end_time must mathematically align with start_time + duration
calculated_end = start + timedelta(seconds=diff_seconds)
session['end_time'] = calculated_end.isoformat()
if diff_seconds < 60:
session['duration_mins'] = "0 minutes"
else:
mins = round(diff_seconds / 60.0, 1)
session['duration_mins'] = f"{mins} minutes"
session['date_time'] = start.strftime("%b %d, %Y %H:%M")
except Exception:
session['duration_mins'] = "0 seconds"
session['date_time'] = session.get('start_time', 'Unknown Date')
return session
# Initialize database on import
init_db()