| """ |
| The Sentinel Interface — FastAPI Backend |
| Main application with REST API + WebSocket endpoints for real-time emotion analysis. |
| """ |
| import os |
| import sys |
| import json |
| import base64 |
| import asyncio |
| import traceback |
| from datetime import datetime |
| import io |
| import csv |
|
|
| import numpy as np |
| import cv2 |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect, UploadFile, File, Form, HTTPException, Depends |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import FileResponse, JSONResponse, HTMLResponse, StreamingResponse |
| from pydantic import BaseModel |
| from typing import Optional, List |
|
|
| |
| sys.path.insert(0, os.path.dirname(__file__)) |
|
|
| import database as db |
|
|
| |
| # Application object; title, description and version are passed to FastAPI as metadata. |
| app = FastAPI( |
| title="The Sentinel Interface API", |
| description="Multisource Emotion Detection & Engagement Optimization for E-Learning", |
| version="1.0.0", |
| ) |
|
|
| # CORS is fully open: every origin, method and header is allowed, with credentials enabled. |
| # NOTE(review): FastAPI's CORS documentation advises against combining wildcard values |
| # with allow_credentials=True - confirm whether credentials are needed, or list explicit origins. |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| |
| # Per-model "has been loaded" flags, returned verbatim by the /api/health endpoint. |
| # All flags start False; the get_face_model / get_face_mesh / get_speech_model / |
| # get_text_model loaders (and startup_event for "face") set them to True. |
| _models_loaded = { |
| "face": False, |
| "face_mesh": False, |
| "speech": False, |
| "text": False, |
| } |
|
|
|
|
def get_face_model():
    """Import the face-emotion predictor at call time and flag the face model as loaded.

    Returns:
        The ``predict_emotion`` callable from ``models.face_model``.
    """
    # The import runs when this function is called, not when the module is imported.
    from models.face_model import predict_emotion as face_predictor

    _models_loaded.update(face=True)
    return face_predictor
|
|
|
|
def get_face_mesh():
    """Import the face-mesh helpers at call time and flag the mesh model as loaded.

    Returns:
        tuple: ``(process_frame, reset)`` from ``models.face_mesh``, in that order.
    """
    from models.face_mesh import process_frame as mesh_processor, reset as mesh_reset

    _models_loaded.update(face_mesh=True)
    return mesh_processor, mesh_reset
|
|
|
|
def get_speech_model():
    """Import the speech analyzers at call time and flag the speech model as loaded.

    Returns:
        tuple: ``(analyze_audio_bytes, analyze_audio_file)`` from
        ``models.speech_model``, in that order.
    """
    from models.speech_model import (
        analyze_audio_bytes as bytes_analyzer,
        analyze_audio_file as file_analyzer,
    )

    _models_loaded.update(speech=True)
    return bytes_analyzer, file_analyzer
|
|
|
|
def get_text_model():
    """Import the text analyzers at call time and flag the text model as loaded.

    Returns:
        tuple: ``(analyze_text, batch_analyze)`` from ``models.text_model``,
        in that order.
    """
    from models.text_model import (
        analyze_text as single_analyzer,
        batch_analyze as batch_analyzer,
    )

    _models_loaded.update(text=True)
    return single_analyzer, batch_analyzer
|
|
|
|
def get_engagement_calc():
    """Import and return ``calculate_engagement`` from ``models.engagement``.

    Unlike the model loaders, this does not touch ``_models_loaded``.
    """
    from models.engagement import calculate_engagement as engagement_calculator

    return engagement_calculator
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Print the startup banner, then warm up the face, text and speech models.

    Every warm-up step is best-effort: if it raises, a warning is printed and
    startup carries on with the next step.
    """
    separator = "=" * 60
    banner_lines = (
        separator,
        " THE SENTINEL INTERFACE β Backend Server",
        " Multisource Emotion Detection & Engagement Optimization",
        separator,
        f" Frontend: {FRONTEND_DIR}",
        f" Database: {db.DB_PATH}",
        " API Docs: http://localhost:8000/docs",
        separator,
        "[System] Firing up core models for instant response...",
    )
    for banner_line in banner_lines:
        print(banner_line)

    # Face: build the CNN itself, and only flag it loaded when that succeeded.
    try:
        from models.face_model import get_cnn_model

        get_cnn_model()
        _models_loaded["face"] = True
    except Exception as face_error:
        print(f"[System] Face model warning: {face_error}")

    # Text and speech: calling the loaders imports their modules.
    try:
        get_text_model()
    except Exception as text_error:
        print(f"[System] Text model warning: {text_error}")

    try:
        get_speech_model()
    except Exception as speech_error:
        print(f"[System] Speech model warning: {speech_error}")

    print("[System] Pre-loading complete. Systems nominal.")
|
|
|
|
| |
| # Request body for POST /api/analyze/text. |
| class TextRequest(BaseModel): |
| # Text handed to the text model by analyze_text_endpoint. |
| text: str |
| # Optional student identifier; the handler in this file does not read it. |
| student_id: Optional[str] = "default" |
|
|
|
|
| # Request body for POST /api/analyze/multimodal. |
| class MultimodalRequest(BaseModel): |
| # Face result supplied by the caller; forwarded unchanged to the engagement calculator. |
| face_data: Optional[dict] = None |
| # Speech result supplied by the caller; forwarded unchanged to the engagement calculator. |
| speech_data: Optional[dict] = None |
| # Raw text; analyzed server-side only when non-empty. |
| text: Optional[str] = None |
| # Optional student identifier; the handler in this file does not read it. |
| student_id: Optional[str] = "default" |
|
|
|
|
| # Request body for POST /api/session/save; every field is forwarded to the db layer by save_session. |
| class SessionSaveRequest(BaseModel): |
| # Student the session and performance record are stored under. |
| student_id: str = "default" |
| # Required score; stored on the performance record and passed to db.end_session as the average. |
| engagement_score: float |
| # Emotion label passed to db.end_session. |
| dominant_emotion: str = "neutral" |
| # Optional per-modality results stored with the performance record. |
| face_emotion: Optional[dict] = None |
| speech_emotion: Optional[dict] = None |
| text_sentiment: Optional[dict] = None |
| # Free-text summary stored with the performance record. |
| summary: str = "" |
| # Optional start time passed to db.create_session as start_time (format not visible here). |
| session_start_time: Optional[str] = None |
|
|
|
|
| |
|
|
@app.get("/api/health")
async def health_check():
    """Return service status, version, the per-model load flags and the server time."""
    payload = {
        "status": "online",
        "service": "The Sentinel Interface",
        "version": "1.0.0",
    }
    payload["models_loaded"] = _models_loaded
    payload["timestamp"] = datetime.now().isoformat()
    return payload
|
|
|
|
@app.post("/api/analyze/face")
async def analyze_face(file: UploadFile = File(...)):
    """Analyze an uploaded face image with the image emotion model.

    The upload is decoded with OpenCV, converted from BGR to RGB and passed to
    ``models.image_emotion_model.predict_from_image``. Per the original notes
    the model is an ML Vision Transformer (ViT) — pre-trained on facial
    expression datasets — that falls back to DeepFace and then to a pixel
    heuristic when ViT is unavailable; that logic lives in the model module
    and is not visible here.

    Args:
        file: Uploaded image file.

    Returns:
        Whatever ``predict_from_image`` returns for the RGB image.

    Raises:
        HTTPException: 400 when the upload is empty or cannot be decoded as an
            image; 500 when anything else fails.
    """
    try:
        contents = await file.read()

        # cv2.imdecode raises on an empty buffer instead of returning None,
        # which would be reported as a 500; treat it as the client error it is.
        if not contents:
            raise HTTPException(status_code=400, detail="Invalid image file")

        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:
            raise HTTPException(status_code=400, detail="Invalid image file")

        # OpenCV decodes to BGR; the model is given RGB.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        from models.image_emotion_model import predict_from_image
        return predict_from_image(img_rgb)

    except HTTPException:
        # Keep deliberate 4xx responses from being rewrapped as 500s below.
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
@app.post("/api/analyze/speech")
async def analyze_speech(file: UploadFile = File(...)):
    """Analyze an uploaded audio file for speech emotion.

    The upload is written to a temporary file, analyzed by the file-based
    speech analyzer, and the temporary file is always removed afterwards.

    Args:
        file: Uploaded audio file.

    Returns:
        Whatever the speech model's file analyzer returns.

    Raises:
        HTTPException: 500 when reading, writing or analysis fails.
    """
    try:
        contents = await file.read()

        import tempfile
        # Keep the upload's extension on the temp file; fall back to ".wav"
        # both when there is no filename and when the filename has no extension.
        suffix = os.path.splitext(file.filename or "")[1] or ".wav"

        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        tmp_path = tmp.name
        try:
            # Close the handle before analysis so the file can be reopened by
            # path; the finally below also cleans up if the write itself fails.
            with tmp:
                tmp.write(contents)

            _, analyze_file = get_speech_model()
            return analyze_file(tmp_path)
        finally:
            os.unlink(tmp_path)

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.post("/api/analyze/text")
async def analyze_text_endpoint(request: TextRequest):
    """Run the single-text analyzer on ``request.text`` and return its result.

    Raises:
        HTTPException: 500 (with the error text as detail) on any failure.
    """
    try:
        single_analyzer, _batch_analyzer = get_text_model()
        return single_analyzer(request.text)
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
|
|
|
|
@app.post("/api/analyze/text/batch")
async def analyze_text_batch(texts: List[str]):
    """Run the batch text analyzer over ``texts`` and return its result.

    Raises:
        HTTPException: 500 (with the error text as detail) on any failure.
    """
    try:
        _single_analyzer, batch_analyzer = get_text_model()
        return batch_analyzer(texts)
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
|
|
|
|
@app.post("/api/analyze/multimodal")
async def analyze_multimodal(request: MultimodalRequest):
    """Combine caller-supplied face/speech results with server-side text analysis.

    Text is analyzed only when ``request.text`` is non-empty; face and speech
    data are passed through as received. All three feed the engagement
    calculator.

    Returns:
        dict with keys ``face``, ``speech``, ``text`` and ``engagement``.

    Raises:
        HTTPException: 500 (with the error text as detail) on any failure.
    """
    try:
        text_result = None
        if request.text:
            single_analyzer, _batch_analyzer = get_text_model()
            text_result = single_analyzer(request.text)

        engagement_calculator = get_engagement_calc()
        engagement = engagement_calculator(
            face_result=request.face_data,
            speech_result=request.speech_data,
            text_result=text_result,
        )

        return {
            "face": request.face_data,
            "speech": request.speech_data,
            "text": text_result,
            "engagement": engagement,
        }
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
|
|
|
|
| |
|
|
@app.post("/api/session/start")
async def start_session(student_id: str = "default"):
    """Create a monitoring session for ``student_id`` and return its id and start time."""
    new_session_id = db.create_session(student_id)
    started_at = datetime.now().isoformat()
    return {
        "session_id": new_session_id,
        "student_id": student_id,
        "started_at": started_at,
    }
|
|
|
|
@app.post("/api/session/end")
async def end_session(session_id: int, avg_engagement: float = 0, dominant_emotion: str = "neutral"):
    """Close session ``session_id`` in the database and return the end timestamp."""
    db.end_session(session_id, avg_engagement, dominant_emotion)
    ended_at = datetime.now().isoformat()
    return {"session_id": session_id, "ended_at": ended_at}
|
|
|
|
@app.post("/api/session/save")
async def save_session(request: SessionSaveRequest):
    """Persist one finished session in three steps: create, save performance, end.

    Returns:
        dict with ``status`` set to ``"saved"`` and the new ``session_id``.
    """
    student = request.student_id
    score = request.engagement_score

    new_session_id = db.create_session(student, start_time=request.session_start_time)

    db.save_performance(
        student_id=student,
        session_id=new_session_id,
        engagement_score=score,
        face_emotion=request.face_emotion,
        speech_emotion=request.speech_emotion,
        text_sentiment=request.text_sentiment,
        summary=request.summary,
    )

    # The single submitted score doubles as the session's average engagement.
    db.end_session(new_session_id, score, request.dominant_emotion)

    return {"status": "saved", "session_id": new_session_id}
|
|
|
|
@app.get("/api/performance/{student_id}")
async def get_performance(student_id: str):
    """Return a student's performance records, overall stats and sessions."""
    # Dict values are evaluated top to bottom, so the db calls run in the
    # order performance -> stats -> sessions.
    return {
        "student_id": student_id,
        "performance": db.get_student_performance(student_id),
        "overall_stats": db.get_overall_stats(student_id),
        "sessions": db.get_all_sessions(student_id),
    }
|
|
@app.get("/api/stats/{session_id}")
async def get_stats(session_id: str):
    """Return the global stats, optionally overlaid with one session's figures.

    Args:
        session_id: ``"all"`` (any letter case) for the global stats, or a
            numeric session id to drill down into.

    Returns:
        The dict from ``db.get_overall_stats("all")``. For a specific session
        the three engagement figures are replaced by that session's values and
        a ``session_info`` entry is added.

    Raises:
        HTTPException: 404 when the id is not numeric, when the session does
            not exist, or when a ValueError is raised during the lookup.
    """
    stats = db.get_overall_stats("all")

    if session_id.lower() == 'all':
        return stats

    try:
        numeric_id = int(session_id)
        details = db.get_session_details(numeric_id)
        if not details:
            raise HTTPException(status_code=404, detail="User Not Found")

        for metric in ('avg_engagement', 'peak_engagement', 'min_engagement'):
            stats[metric] = details[metric]

        stats['session_info'] = {
            "id": numeric_id,
            "date_time": details['date_time'],
            "duration_mins": details['duration_mins'],
        }
    except ValueError:
        raise HTTPException(status_code=404, detail="User Not Found")

    return stats
|
|
@app.get("/api/sessions/latest")
async def get_latest_sessions():
    """Return the performance records and sessions the db layer reports for ``"all"``."""
    all_performance = db.get_student_performance("all")
    all_sessions = db.get_all_sessions("all")
    return {"performance": all_performance, "sessions": all_sessions}
|
|
@app.delete("/api/sessions/{session_id}")
async def delete_session(session_id: int):
    """Delete session ``session_id`` through the db layer.

    Raises:
        HTTPException: 500 when ``db.delete_session`` reports failure.
    """
    if db.delete_session(session_id):
        return {"status": "deleted", "deleted_id": session_id}
    raise HTTPException(status_code=500, detail="Failed to delete session")
|
|
def _parse_export_timestamp(raw):
    """Parse a stored ISO-8601 timestamp; return None when the value is missing.

    Both ``None`` (a NULL column) and ``''`` count as missing. Any ``Z`` is
    removed before parsing, as ``datetime.fromisoformat`` is given the bare
    string.
    """
    text = (raw or '').replace('Z', '')
    return datetime.fromisoformat(text) if text else None


def _export_time_cells(session, duration_re):
    """Build the (date, start time, end time, duration) CSV cells for one session.

    The duration comes from the minutes quoted in ``overall_summary`` when
    ``duration_re`` matches it; otherwise from ``end_time - start_time``
    (clamped at zero, and zero when ``end_time`` is missing). The exported end
    time is always ``start + duration``.
    """
    from datetime import timedelta

    started = _parse_export_timestamp(session.get('start_time')) or datetime.now()

    summary = session.get('overall_summary')
    minutes_match = duration_re.search(summary) if summary else None
    if minutes_match:
        seconds = int(minutes_match.group(1) or minutes_match.group(2)) * 60
    else:
        ended = _parse_export_timestamp(session.get('end_time')) or started
        seconds = max((ended - started).total_seconds(), 0)

    finished = started + timedelta(seconds=seconds)
    duration = "0 minutes" if seconds < 60 else f"{round(seconds / 60.0, 1)} minutes"

    # The ="..." wrapper makes spreadsheet apps keep the value as literal text.
    return (
        f'="{started.strftime("%Y-%m-%d")}"',
        f'="{started.strftime("%Y-%m-%d %H:%M")}"',
        f'="{finished.strftime("%Y-%m-%d %H:%M")}"',
        duration,
    )


def _percent_cell(value):
    """Format a score as ``"12.3%"``; a missing score is exported as ``"0.0%"``."""
    return "0.0%" if value is None else f"{value:.1f}%"


@app.get("/api/sessions/export")
async def export_sessions():
    """Export every session, joined with its performance record, as a CSV download.

    Returns:
        StreamingResponse: ``text/csv`` attachment named
        ``sentinel_sessions_export.csv`` with one row per joined record.
        Rows whose timestamps cannot be interpreted get ``"Error"`` in the
        date/time columns instead of failing the whole export.
    """
    import re

    # Compiled once per export rather than once per row.
    duration_re = re.compile(r'(?:lasted\s*(\d+)\s*minutes|Session:\s*(\d+)min)')

    conn = db.get_connection()
    try:
        rows = conn.execute("""
            SELECT s.*, p.overall_summary, p.engagement_score
            FROM sessions s
            LEFT JOIN student_performance p ON s.id = p.session_id
            ORDER BY s.id ASC
        """).fetchall()
        sessions = [dict(r) for r in rows]
    finally:
        # Release the connection even when the query fails.
        conn.close()

    output = io.StringIO()
    writer = csv.writer(output)

    writer.writerow([
        'session id', 'date', 'start time', 'end time',
        'duration', 'engagement', 'average engagement', 'dominant emotion'
    ])

    for s in sessions:
        try:
            date_str, start_time_str, end_time_str, duration_str = _export_time_cells(s, duration_re)
        except Exception:
            # Best-effort export: a malformed row is marked, not fatal.
            date_str = start_time_str = end_time_str = "Error"
            duration_str = "0 minutes"

        # NOTE(review): dominant_emotion is written as stored; if it can hold
        # client-supplied text, consider guarding against CSV formula injection.
        writer.writerow([
            s.get('id'),
            date_str,
            start_time_str,
            end_time_str,
            duration_str,
            _percent_cell(s.get('engagement_score')),
            _percent_cell(s.get('avg_engagement')),
            s.get('dominant_emotion')
        ])

    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=sentinel_sessions_export.csv"}
    )
|
|
|
|
| |
|
|
| @app.websocket("/ws/face") |
| async def websocket_face(websocket: WebSocket): |
| """ |
| Real-time face analysis via WebSocket. |
| Client sends base64-encoded video frames. |
| Server returns face mesh landmarks + emotion data. |
|  |
| Messages are JSON text, dispatched on their "type" field: |
| - "frame": "data" holds the base64 image; answered with an "analysis" message |
| (or an "error" message when the image cannot be decoded). |
| - "ping": answered with {"type": "pong"}. |
| - "stop": ends the receive loop. |
| A database session is created per connection and closed in the finally block. |
| """ |
| await websocket.accept() |
| print("[WebSocket] Face analysis client connected") |
|
|
| # Resolve the model callables once per connection, then reset mesh and calibration state. |
| process_frame, reset_mesh = get_face_mesh() |
| predict_emotion = get_face_model() |
| calc_engagement = get_engagement_calc() |
| reset_mesh() |
| from models.face_model import reset_calibration |
| reset_calibration() |
|
|
| 
| # NOTE(review): motion_score is assigned here but never read in this function. |
| motion_score = 0 |
| # Placeholder results, sent until the first real prediction replaces them. |
| emotion_result = {"emotion": "Neutral", "confidence": 0, "probabilities": {}, "engagement_score": 50, "provider": "Initializing"} |
| engagement = {"overall_score": 50, "level": "Neutral", "factors": {}} |
| frame_count = 0 |
| session_id = db.create_session("default") |
| # Running totals used to compute the session average in the finally block. |
| state_total_engagement = 0 |
| state_engagement_samples = 0 |
| state_dominant_emotion = "neutral" |
|
|
| try: |
| while True: |
| data = await websocket.receive_text() |
| msg = json.loads(data) |
|
|
| if msg.get("type") == "frame": |
| frame_count += 1 |
|
|
| 
| # NOTE(review): a missing "data" key or invalid base64 raises here, which is |
| # caught by the generic handler below and ends the whole session. |
| img_data = base64.b64decode(msg["data"]) |
| nparr = np.frombuffer(img_data, np.uint8) |
| frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) |
|
|
| if frame is None: |
| await websocket.send_json({"type": "error", "message": "Invalid frame"}) |
| continue |
|
|
| 
| # OpenCV decodes to BGR; the mesh and emotion models are given RGB. |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
| 
| mesh_result = process_frame(frame_rgb) |
|
|
| 
| # Emotion prediction runs only when frame_count is a multiple of 3; |
| # other frames reuse the previous emotion_result. |
| if frame_count % 3 == 0: |
| 
| # Landmarks are passed to the predictor only when the mesh reported a detection. |
| landmarks = mesh_result.get("landmarks", []) if mesh_result.get("detected") else None |
| emotion_result = predict_emotion(frame_rgb, landmarks) |
|
|
| 
| # NOTE(review): indentation was lost in this copy - confirm whether the engagement |
| # update below is nested under the every-3rd-frame branch or runs on every frame. |
| engagement = calc_engagement(face_result=emotion_result) |
| state_total_engagement += engagement.get("overall_score", 0) |
| state_engagement_samples += 1 |
| # NOTE(review): this keeps the most recent emotion label, not the most frequent one. |
| state_dominant_emotion = emotion_result.get("emotion", "neutral") |
|
|
| 
| # Persist an emotion sample when frame_count is a multiple of 10. |
| if frame_count % 10 == 0: |
| db.log_emotion( |
| session_id, "face", |
| emotion_result.get("emotion", "neutral"), |
| emotion_result.get("confidence", 0), |
| {"engagement": engagement.get("overall_score", 0)} |
| ) |
|
|
| 
| # Per-frame reply: mesh data from this frame plus the latest emotion and engagement. |
| response = { |
| "type": "analysis", |
| "frame_id": frame_count, |
| "mesh": { |
| "detected": mesh_result["detected"], |
| "landmarks": mesh_result.get("landmarks", []), |
| "landmark_count": mesh_result.get("landmark_count", 0), |
| }, |
| "blink": mesh_result.get("blink", {}), |
| "head_pose": mesh_result.get("head_pose", {}), |
| "emotion": emotion_result, |
| "engagement": engagement, |
| } |
| await websocket.send_json(response) |
|
|
| elif msg.get("type") == "ping": |
| await websocket.send_json({"type": "pong"}) |
|
|
| elif msg.get("type") == "stop": |
| break |
|
|
| except WebSocketDisconnect: |
| print("[WebSocket] Client disconnected") |
| except Exception as e: |
| print(f"[WebSocket] Error: {e}") |
| traceback.print_exc() |
| finally: |
| 
| # Always close the db session; max(..., 1) avoids dividing by zero when no sample was taken. |
| avg_eng = state_total_engagement / max(state_engagement_samples, 1) |
| db.end_session(session_id, avg_eng, state_dominant_emotion) |
| print(f"[WebSocket] Session {session_id} ended. Total frames: {frame_count}") |
|
|
|
|
| |
|
|
@app.websocket("/ws/speech")
async def websocket_speech(websocket: WebSocket):
    """
    Real-time speech analysis over a WebSocket.

    The client sends binary audio chunks. Chunks shorter than 1000 bytes are
    answered with a "waiting" message; longer ones are passed to the speech
    model's byte analyzer and its result is returned in an "analysis" message.
    The loop ends when the client disconnects or an error occurs.
    """
    await websocket.accept()
    print("[WebSocket] Speech analysis client connected")

    try:
        while True:
            chunk = await websocket.receive_bytes()

            if len(chunk) >= 1000:
                bytes_analyzer, _file_analyzer = get_speech_model()
                # Keys in the analyzer result are merged after "type".
                payload = {"type": "analysis", **bytes_analyzer(chunk)}
                await websocket.send_json(payload)
            else:
                await websocket.send_json({
                    "type": "waiting",
                    "message": "Collecting audio data..."
                })

    except WebSocketDisconnect:
        print("[WebSocket] Speech client disconnected")
    except Exception as exc:
        print(f"[WebSocket] Speech error: {exc}")
        traceback.print_exc()
|
|
|
|
| |
|
|
# Directory holding the static frontend: the "frontend" folder next to this file's parent directory.
_BACKEND_DIR = os.path.dirname(__file__)
FRONTEND_DIR = os.path.join(_BACKEND_DIR, "..", "frontend")
|
|
@app.get("/")
async def serve_index():
    """Serve ``index.html`` from ``FRONTEND_DIR``."""
    index_path = os.path.join(FRONTEND_DIR, "index.html")
    return FileResponse(index_path)
|
|
@app.get("/live")
@app.get("/live.html")
async def serve_live():
    """Serve ``live.html`` from ``FRONTEND_DIR`` (reachable with or without ``.html``)."""
    live_path = os.path.join(FRONTEND_DIR, "live.html")
    return FileResponse(live_path)
|
|
@app.get("/scan")
@app.get("/scan.html")
async def serve_scan():
    """Serve ``scan.html`` from ``FRONTEND_DIR`` (reachable with or without ``.html``)."""
    scan_path = os.path.join(FRONTEND_DIR, "scan.html")
    return FileResponse(scan_path)
|
|
@app.get("/stats")
@app.get("/stats.html")
async def serve_stats():
    """Serve ``stats.html`` from ``FRONTEND_DIR`` (reachable with or without ``.html``)."""
    stats_path = os.path.join(FRONTEND_DIR, "stats.html")
    return FileResponse(stats_path)
|
|
| |
# Mount each static asset folder that exists under FRONTEND_DIR, in the order css, js, assets.
for _url_prefix, _folder in (("/css", "css"), ("/js", "js"), ("/assets", "assets")):
    _folder_path = os.path.join(FRONTEND_DIR, _folder)
    if os.path.exists(_folder_path):
        app.mount(_url_prefix, StaticFiles(directory=_folder_path), name=_folder)
|
|
|
|
|
|
|
|
|
|
| # Development entry point: runs the app with uvicorn on all interfaces, port 8000, with auto-reload. |
| # NOTE(review): the "app:app" import string presumably means this module is app.py - confirm. |
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True) |
|
|