"""
The Sentinel Interface — FastAPI Backend
Main application with REST API + WebSocket endpoints for real-time emotion analysis.
"""

import os
import sys
import json
import base64
import asyncio
import traceback
import re
import tempfile
from datetime import datetime, timedelta
import io
import csv

import numpy as np
import cv2
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, UploadFile, File, Form, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse, HTMLResponse, StreamingResponse
from pydantic import BaseModel
from typing import Optional, List

# Ensure models are importable
sys.path.insert(0, os.path.dirname(__file__))

import database as db

# Directory holding the static frontend (HTML/CSS/JS), a sibling of this
# backend directory. Defined up front because the startup banner prints it.
FRONTEND_DIR = os.path.join(os.path.dirname(__file__), "..", "frontend")

# Duration phrases searched for in a session's ``overall_summary`` column,
# e.g. "lasted 12 minutes" or "Session: 12min". Compiled once for the export loop.
_SUMMARY_DURATION_RE = re.compile(r'(?:lasted\s*(\d+)\s*minutes|Session:\s*(\d+)min)')


# ── App Setup ──────────────────────────────────────────────

app = FastAPI(
    title="The Sentinel Interface API",
    description="Multisource Emotion Detection & Engagement Optimization for E-Learning",
    version="1.0.0",
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ── Lazy Model Loading ─────────────────────────────────────
# Models are loaded on first use to speed up server startup

# Flags reported by /api/health; flipped to True by the getters below.
_models_loaded = {
    "face": False,
    "face_mesh": False,
    "speech": False,
    "text": False,
}


def get_face_model():
    """Import the face emotion predictor on demand and mark it as loaded.

    Returns:
        ``models.face_model.predict_emotion``.
    """
    from models.face_model import predict_emotion
    _models_loaded["face"] = True
    return predict_emotion


def get_face_mesh():
    """Import the face-mesh helpers on demand and mark them as loaded.

    Returns:
        Tuple ``(process_frame, reset)`` from ``models.face_mesh``.
    """
    from models.face_mesh import process_frame, reset as reset_mesh
    _models_loaded["face_mesh"] = True
    return process_frame, reset_mesh


def get_speech_model():
    """Import the speech analyzers on demand and mark them as loaded.

    Returns:
        Tuple ``(analyze_audio_bytes, analyze_audio_file)`` from
        ``models.speech_model``.
    """
    from models.speech_model import analyze_audio_bytes, analyze_audio_file
    _models_loaded["speech"] = True
    return analyze_audio_bytes, analyze_audio_file


def get_text_model():
    """Import the text analyzers on demand and mark them as loaded.

    Returns:
        Tuple ``(analyze_text, batch_analyze)`` from ``models.text_model``.
    """
    from models.text_model import analyze_text, batch_analyze
    _models_loaded["text"] = True
    return analyze_text, batch_analyze


def get_engagement_calc():
    """Import and return ``models.engagement.calculate_engagement`` on demand."""
    from models.engagement import calculate_engagement
    return calculate_engagement


@app.on_event("startup")
async def startup_event():
    """Print the startup banner and warm up the model modules.

    Each warm-up step is best-effort: a failure is printed as a warning and
    startup continues, so a missing model does not prevent the server from
    serving the remaining endpoints.
    """
    print("=" * 60)
    print(" THE SENTINEL INTERFACE — Backend Server")
    print(" Multisource Emotion Detection & Engagement Optimization")
    print("=" * 60)
    print(f" Frontend: {FRONTEND_DIR}")
    print(f" Database: {db.DB_PATH}")
    print(" API Docs: http://localhost:8000/docs")
    print("=" * 60)
    print("[System] Firing up core models for instant response...")

    # Trigger lazy loaders to preload models into RAM before first API request hits
    try:
        from models.face_model import get_cnn_model
        get_cnn_model()
        _models_loaded["face"] = True
    except Exception as e:
        print(f"[System] Face model warning: {e}")

    # NOTE(review): these getters only import the model modules; whether the
    # weights are loaded at import time depends on those modules.
    try:
        get_text_model()
    except Exception as e:
        print(f"[System] Text model warning: {e}")

    try:
        get_speech_model()
    except Exception as e:
        print(f"[System] Speech model warning: {e}")

    print("[System] Pre-loading complete. Systems nominal.")


# ── Pydantic Models ────────────────────────────────────────

class TextRequest(BaseModel):
    """Request body for single-text analysis."""

    text: str
    student_id: Optional[str] = "default"


class MultimodalRequest(BaseModel):
    """Request body for combined analysis.

    ``face_data`` and ``speech_data`` are passed through to the engagement
    calculator as given; only ``text`` is analyzed server-side.
    """

    face_data: Optional[dict] = None
    speech_data: Optional[dict] = None
    text: Optional[str] = None
    student_id: Optional[str] = "default"


class SessionSaveRequest(BaseModel):
    """Request body for persisting a finished session in one call."""

    student_id: str = "default"
    engagement_score: float
    dominant_emotion: str = "neutral"
    face_emotion: Optional[dict] = None
    speech_emotion: Optional[dict] = None
    text_sentiment: Optional[dict] = None
    summary: str = ""
    session_start_time: Optional[str] = None  # ISO string from frontend


# ── REST API Endpoints ─────────────────────────────────────

@app.get("/api/health")
async def health_check():
    """Report service status, version, model-load flags and server time."""
    return {
        "status": "online",
        "service": "The Sentinel Interface",
        "version": "1.0.0",
        "models_loaded": _models_loaded,
        "timestamp": datetime.now().isoformat(),
    }


@app.post("/api/analyze/face")
async def analyze_face(file: UploadFile = File(...)):
    """
    Analyze an uploaded face image for emotion.

    The image is decoded with OpenCV, converted to RGB and passed to
    ``models.image_emotion_model.predict_from_image``. Responds with 400 when
    the upload cannot be decoded as an image and 500 on any other failure.
    """
    # NOTE(review): the original notes describe the predictor as a ViT model
    # with DeepFace and pixel-heuristic fallbacks; that lives in
    # models.image_emotion_model and is not visible from this file.
    try:
        contents = await file.read()
        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        if img is None:
            raise HTTPException(status_code=400, detail="Invalid image file")

        # OpenCV decodes to BGR; the predictor is handed an RGB array.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        from models.image_emotion_model import predict_from_image
        result = predict_from_image(img_rgb)
        return result
    except HTTPException:
        # Let the deliberate 400 through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/analyze/speech")
async def analyze_speech(file: UploadFile = File(...)):
    """Analyze uploaded audio file for speech emotion."""
    try:
        contents = await file.read()

        # Save to temp file for librosa
        suffix = os.path.splitext(file.filename)[1] if file.filename else ".wav"
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(contents)
            tmp_path = tmp.name

        try:
            _, analyze_file = get_speech_model()
            result = analyze_file(tmp_path)
            return result
        finally:
            # delete=False above, so the temp file must be removed by hand.
            os.unlink(tmp_path)
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/analyze/text")
async def analyze_text_endpoint(request: TextRequest):
    """Analyze text for sentiment and emotion."""
    try:
        analyze, _ = get_text_model()
        result = analyze(request.text)
        return result
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/analyze/text/batch")
async def analyze_text_batch(texts: List[str]):
    """Batch analyze multiple texts."""
    try:
        _, batch = get_text_model()
        result = batch(texts)
        return result
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/analyze/multimodal")
async def analyze_multimodal(request: MultimodalRequest):
    """Combined multimodal emotion analysis."""
    try:
        # Only the text modality is analyzed here; face and speech results are
        # taken from the request as-is.
        text_result = None
        if request.text:
            analyze, _ = get_text_model()
            text_result = analyze(request.text)

        calc = get_engagement_calc()
        engagement = calc(
            face_result=request.face_data,
            speech_result=request.speech_data,
            text_result=text_result,
        )

        return {
            "face": request.face_data,
            "speech": request.speech_data,
            "text": text_result,
            "engagement": engagement,
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# ── Session & Performance Endpoints ───────────────────────

@app.post("/api/session/start")
async def start_session(student_id: str = "default"):
    """Start a new monitoring session."""
    session_id = db.create_session(student_id)
    return {"session_id": session_id, "student_id": student_id, "started_at": datetime.now().isoformat()}


@app.post("/api/session/end")
async def end_session(session_id: int, avg_engagement: float = 0, dominant_emotion: str = "neutral"):
    """End a monitoring session."""
    db.end_session(session_id, avg_engagement, dominant_emotion)
    return {"session_id": session_id, "ended_at": datetime.now().isoformat()}


@app.post("/api/session/save")
async def save_session(request: SessionSaveRequest):
    """Save session performance data."""
    # Creates the session, stores its performance record and closes it in one go.
    session_id = db.create_session(request.student_id, start_time=request.session_start_time)
    db.save_performance(
        student_id=request.student_id,
        session_id=session_id,
        engagement_score=request.engagement_score,
        face_emotion=request.face_emotion,
        speech_emotion=request.speech_emotion,
        text_sentiment=request.text_sentiment,
        summary=request.summary,
    )
    db.end_session(session_id, request.engagement_score, request.dominant_emotion)
    return {"status": "saved", "session_id": session_id}


@app.get("/api/performance/{student_id}")
async def get_performance(student_id: str):
    """Get student performance history."""
    perf = db.get_student_performance(student_id)
    stats = db.get_overall_stats(student_id)
    sessions = db.get_all_sessions(student_id)
    return {
        "student_id": student_id,
        "performance": perf,
        "overall_stats": stats,
        "sessions": sessions,
    }


@app.get("/api/stats/{session_id}")
async def get_stats(session_id: str):
    """Get the 4 metric stats. If session_id is specific, drill down."""
    # Always get global stats to keep Total Sessions
    stats = db.get_overall_stats("all")

    if session_id.lower() != 'all':
        try:
            sid = int(session_id)
            details = db.get_session_details(sid)
            if not details:
                raise HTTPException(status_code=404, detail="User Not Found")

            # Override metrics with session specific ones
            stats['avg_engagement'] = details['avg_engagement']
            stats['peak_engagement'] = details['peak_engagement']
            stats['min_engagement'] = details['min_engagement']

            # Attach session info
            stats['session_info'] = {
                "id": sid,
                "date_time": details['date_time'],
                "duration_mins": details['duration_mins']
            }
        except ValueError:
            # A non-numeric id is reported the same way as an unknown session.
            raise HTTPException(status_code=404, detail="User Not Found")

    return stats


@app.get("/api/sessions/latest")
async def get_latest_sessions():
    """Get all global sessions and performance records."""
    perf = db.get_student_performance("all")
    sessions = db.get_all_sessions("all")
    return {
        "performance": perf,
        "sessions": sessions
    }


@app.delete("/api/sessions/{session_id}")
async def delete_session(session_id: int):
    """Delete a session completely and cascade/reindex."""
    success = db.delete_session(session_id)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to delete session")
    return {"status": "deleted", "deleted_id": session_id}


def _parse_iso(value):
    """Parse an ISO timestamp string, returning None for NULL/empty values.

    Any 'Z' characters are stripped before parsing, as the export has always
    done. A NULL column arrives as ``None`` and is treated like an empty
    string instead of raising.
    """
    text = (value or "").replace("Z", "")
    return datetime.fromisoformat(text) if text else None


def _session_time_columns(session):
    """Build the date / start / end / duration CSV cells for one session row.

    The duration is taken from the summary text when it contains a
    recognizable phrase; otherwise it falls back to ``end_time - start_time``
    (clamped at zero). The end time shown is always ``start + duration``.

    Returns:
        Tuple ``(date_str, start_time_str, end_time_str, duration_str)``.

    Raises:
        ValueError, TypeError, AttributeError, OverflowError: when the stored
        timestamps cannot be parsed or combined.
    """
    start_dt = _parse_iso(session.get("start_time")) or datetime.now()

    # ALWAYS prioritize parsing the true duration from the summary if available
    summary = session.get("overall_summary")
    match = _SUMMARY_DURATION_RE.search(summary) if summary else None
    if match:
        diff_sec = int(match.group(1) or match.group(2)) * 60
    else:
        # Fallback to timestamp delta; a missing or earlier end means zero length.
        end_dt = _parse_iso(session.get("end_time")) or start_dt
        if end_dt < start_dt:
            end_dt = start_dt
        diff_sec = (end_dt - start_dt).total_seconds()

    # Mathematically calculate the end time from the start time
    calculated_end_dt = start_dt + timedelta(seconds=diff_sec)

    # Format dates using Excel string formula to strictly prevent ######## masking
    date_str = f'="{start_dt.strftime("%Y-%m-%d")}"'
    start_time_str = f'="{start_dt.strftime("%Y-%m-%d %H:%M")}"'
    end_time_str = f'="{calculated_end_dt.strftime("%Y-%m-%d %H:%M")}"'

    if diff_sec < 60:
        duration_str = "0 minutes"
    else:
        duration_str = f"{round(diff_sec/60.0, 1)} minutes"

    return date_str, start_time_str, end_time_str, duration_str


def _format_percent(value):
    """Render a score as e.g. ``"73.5%"``; NULL scores become ``"0.0%"``."""
    return f"{value:.1f}%" if value is not None else "0.0%"


@app.get("/api/sessions/export")
async def export_sessions():
    """Export all sessions as a CSV file."""
    conn = db.get_connection()
    try:
        # Join with student_performance to get exactly the data needed
        rows = conn.execute("""
            SELECT s.*, p.overall_summary, p.engagement_score
            FROM sessions s
            LEFT JOIN student_performance p ON s.id = p.session_id
            ORDER BY s.id ASC
        """).fetchall()
        sessions = [dict(r) for r in rows]
    finally:
        # Release the connection even when the query fails.
        conn.close()

    output = io.StringIO()
    writer = csv.writer(output)

    # Headers exactly as requested by user
    writer.writerow([
        'session id', 'date', 'start time', 'end time',
        'duration', 'engagement', 'average engagement', 'dominant emotion'
    ])

    for s in sessions:
        try:
            date_str, start_time_str, end_time_str, duration_str = _session_time_columns(s)
        except (ValueError, TypeError, AttributeError, OverflowError) as e:
            # Best-effort export: one bad row must not abort the whole file.
            print(f"[Export] Could not format session {s.get('id')}: {e}")
            date_str = "Error"
            start_time_str = "Error"
            end_time_str = "Error"
            duration_str = "0 minutes"

        writer.writerow([
            s.get('id'),
            date_str,
            start_time_str,
            end_time_str,
            duration_str,
            _format_percent(s.get('engagement_score')),
            _format_percent(s.get('avg_engagement')),
            s.get('dominant_emotion')
        ])

    output.seek(0)
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=sentinel_sessions_export.csv"}
    )


# ── WebSocket for Real-time Face Analysis ─────────────────

@app.websocket("/ws/face")
async def websocket_face(websocket: WebSocket):
    """
    Real-time face analysis via WebSocket.
    Client sends base64-encoded video frames.
    Server returns face mesh landmarks + emotion data.

    Messages are JSON objects with a "type" of "frame" (with base64 "data"),
    "ping" or "stop". A database session is opened for the connection and
    closed with the running average engagement when the connection ends.
    """
    await websocket.accept()
    print("[WebSocket] Face analysis client connected")

    process_frame, reset_mesh = get_face_mesh()
    predict_emotion = get_face_model()
    calc_engagement = get_engagement_calc()

    reset_mesh()
    from models.face_model import reset_calibration
    reset_calibration()

    # Initialize analysis state for current session. These defaults are what
    # the client receives until the first emotion pass has run.
    emotion_result = {"emotion": "Neutral", "confidence": 0, "probabilities": {}, "engagement_score": 50, "provider": "Initializing"}
    engagement = {"overall_score": 50, "level": "Neutral", "factors": {}}

    frame_count = 0
    session_id = db.create_session("default")

    state_total_engagement = 0
    state_engagement_samples = 0
    state_dominant_emotion = "neutral"

    try:
        while True:
            data = await websocket.receive_text()
            msg = json.loads(data)
            msg_type = msg.get("type")

            if msg_type == "frame":
                frame_count += 1

                # Decode base64 image
                img_data = base64.b64decode(msg["data"])
                nparr = np.frombuffer(img_data, np.uint8)
                frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

                if frame is None:
                    await websocket.send_json({"type": "error", "message": "Invalid frame"})
                    continue

                # Convert BGR to RGB for MediaPipe
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Process face mesh
                mesh_result = process_frame(frame_rgb)

                # Emotion analysis (every 3rd frame for performance)
                if frame_count % 3 == 0:
                    # Pass the RGB frame to the PyTorch CNN alongside landmarks for crop guidance
                    landmarks = mesh_result.get("landmarks", []) if mesh_result.get("detected") else None
                    emotion_result = predict_emotion(frame_rgb, landmarks)

                    # Calculate engagement
                    engagement = calc_engagement(face_result=emotion_result)

                    state_total_engagement += engagement.get("overall_score", 0)
                    state_engagement_samples += 1
                    state_dominant_emotion = emotion_result.get("emotion", "neutral")

                # Log every 10th frame
                if frame_count % 10 == 0:
                    db.log_emotion(
                        session_id, "face",
                        emotion_result.get("emotion", "neutral"),
                        emotion_result.get("confidence", 0),
                        {"engagement": engagement.get("overall_score", 0)}
                    )

                # Send response
                response = {
                    "type": "analysis",
                    "frame_id": frame_count,
                    "mesh": {
                        # .get() like the other mesh fields, so a result
                        # without the key does not drop the connection.
                        "detected": mesh_result.get("detected", False),
                        "landmarks": mesh_result.get("landmarks", []),
                        "landmark_count": mesh_result.get("landmark_count", 0),
                    },
                    "blink": mesh_result.get("blink", {}),
                    "head_pose": mesh_result.get("head_pose", {}),
                    "emotion": emotion_result,
                    "engagement": engagement,
                }
                await websocket.send_json(response)

            elif msg_type == "ping":
                await websocket.send_json({"type": "pong"})

            elif msg_type == "stop":
                break

    except WebSocketDisconnect:
        print("[WebSocket] Client disconnected")
    except Exception as e:
        print(f"[WebSocket] Error: {e}")
        traceback.print_exc()
    finally:
        # End session; max(..., 1) avoids dividing by zero when no emotion
        # pass ever ran.
        avg_eng = state_total_engagement / max(state_engagement_samples, 1)
        db.end_session(session_id, avg_eng, state_dominant_emotion)
        print(f"[WebSocket] Session {session_id} ended. Total frames: {frame_count}")


# ── WebSocket for Real-time Speech Analysis ───────────────

@app.websocket("/ws/speech")
async def websocket_speech(websocket: WebSocket):
    """
    Real-time speech analysis via WebSocket.
    Client sends audio chunks.
    Server returns emotion + frequency visualization data.
    """
    await websocket.accept()
    print("[WebSocket] Speech analysis client connected")

    try:
        while True:
            data = await websocket.receive_bytes()

            # Chunks under 1000 bytes are acknowledged but not analyzed.
            if len(data) < 1000:
                await websocket.send_json({
                    "type": "waiting",
                    "message": "Collecting audio data..."
                })
                continue

            analyze_bytes, _ = get_speech_model()
            result = analyze_bytes(data)

            await websocket.send_json({
                "type": "analysis",
                **result,
            })

    except WebSocketDisconnect:
        print("[WebSocket] Speech client disconnected")
    except Exception as e:
        print(f"[WebSocket] Speech error: {e}")
        traceback.print_exc()


# ── Serve Frontend Static Files ───────────────────────────

@app.get("/")
async def serve_index():
    """Serve the frontend landing page."""
    return FileResponse(os.path.join(FRONTEND_DIR, "index.html"))


@app.get("/live")
@app.get("/live.html")
async def serve_live():
    """Serve live.html from the frontend directory."""
    return FileResponse(os.path.join(FRONTEND_DIR, "live.html"))


@app.get("/scan")
@app.get("/scan.html")
async def serve_scan():
    """Serve scan.html from the frontend directory."""
    return FileResponse(os.path.join(FRONTEND_DIR, "scan.html"))


@app.get("/stats")
@app.get("/stats.html")
async def serve_stats():
    """Serve stats.html from the frontend directory."""
    return FileResponse(os.path.join(FRONTEND_DIR, "stats.html"))


# Mount static files (CSS, JS, images); each mount is skipped when its
# directory does not exist.
if os.path.exists(os.path.join(FRONTEND_DIR, "css")):
    app.mount("/css", StaticFiles(directory=os.path.join(FRONTEND_DIR, "css")), name="css")
if os.path.exists(os.path.join(FRONTEND_DIR, "js")):
    app.mount("/js", StaticFiles(directory=os.path.join(FRONTEND_DIR, "js")), name="js")
if os.path.exists(os.path.join(FRONTEND_DIR, "assets")):
    app.mount("/assets", StaticFiles(directory=os.path.join(FRONTEND_DIR, "assets")), name="assets")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)