r-vasanthkumar73-dev's picture
Deploying backend and frontend folder modules.
099d157 verified
Raw
History Blame Contribute Delete
21.4 kB
"""
The Sentinel Interface — FastAPI Backend
Main application with REST API + WebSocket endpoints for real-time emotion analysis.
"""
import os
import sys
import json
import base64
import asyncio
import traceback
from datetime import datetime
import io
import csv
import numpy as np
import cv2
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, UploadFile, File, Form, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse, HTMLResponse, StreamingResponse
from pydantic import BaseModel
from typing import Optional, List
# Ensure models are importable
sys.path.insert(0, os.path.dirname(__file__))
import database as db
# ── App Setup ──────────────────────────────────────────────
app = FastAPI(
    title="The Sentinel Interface API",
    description="Multisource Emotion Detection & Engagement Optimization for E-Learning",
    version="1.0.0",
)

# Allowed CORS origins come from the comma-separated SENTINEL_CORS_ORIGINS
# environment variable; when it is unset or empty every origin ("*") is allowed,
# which matches the previous hard-coded behaviour.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("SENTINEL_CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # Credentialed cross-origin requests are only enabled for an explicit origin
    # list. Combining a wildcard origin with credentials would let any website
    # issue authenticated requests against this API.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── Lazy Model Loading ─────────────────────────────────────
# Model modules are imported on demand by the get_*() helpers below; this
# registry records which of them have been imported so far (all start False).
_models_loaded = dict.fromkeys(("face", "face_mesh", "speech", "text"), False)
def get_face_model():
    """Import ``predict_emotion`` from ``models.face_model`` and return it.

    The "face" entry of ``_models_loaded`` is set to True once the import
    has succeeded.
    """
    from models.face_model import predict_emotion as predictor

    _models_loaded.update(face=True)
    return predictor
def get_face_mesh():
    """Import the face-mesh helpers and return ``(process_frame, reset)``.

    Both callables come from ``models.face_mesh``; the "face_mesh" entry of
    ``_models_loaded`` is set to True once the import has succeeded.
    """
    from models.face_mesh import process_frame as frame_processor, reset as mesh_reset

    _models_loaded.update(face_mesh=True)
    return frame_processor, mesh_reset
def get_speech_model():
    """Import the speech analysers and return ``(analyze_audio_bytes, analyze_audio_file)``.

    Both callables come from ``models.speech_model``; the "speech" entry of
    ``_models_loaded`` is set to True once the import has succeeded.
    """
    from models.speech_model import (
        analyze_audio_bytes as from_bytes,
        analyze_audio_file as from_file,
    )

    _models_loaded.update(speech=True)
    return from_bytes, from_file
def get_text_model():
    """Import the text analysers and return ``(analyze_text, batch_analyze)``.

    Both callables come from ``models.text_model``; the "text" entry of
    ``_models_loaded`` is set to True once the import has succeeded.
    """
    from models.text_model import analyze_text as single, batch_analyze as batch

    _models_loaded.update(text=True)
    return single, batch
def get_engagement_calc():
    """Import ``calculate_engagement`` from ``models.engagement`` and return it.

    Unlike the other loaders this one does not touch ``_models_loaded``.
    """
    from models.engagement import calculate_engagement as calculator

    return calculator
@app.on_event("startup")
async def startup_event():
    """Print the startup banner, then warm up the face, text and speech models.

    Each warm-up step is best-effort: a failure is reported on stdout as a
    warning and startup carries on with the next step.
    """
    rule = "=" * 60
    print(rule)
    print(" THE SENTINEL INTERFACE β€” Backend Server")
    print(" Multisource Emotion Detection & Engagement Optimization")
    print(rule)
    print(f" Frontend: {FRONTEND_DIR}")
    print(f" Database: {db.DB_PATH}")
    print(" API Docs: http://localhost:8000/docs")
    print(rule)
    print("[System] Firing up core models for instant response...")

    # Face: call the model accessor directly so the "face" flag is only set
    # after get_cnn_model() has returned without raising.
    try:
        from models.face_model import get_cnn_model

        get_cnn_model()
        _models_loaded["face"] = True
    except Exception as err:
        print(f"[System] Face model warning: {err}")

    # Text and speech: the loaders import their modules (and set their flags).
    try:
        get_text_model()
    except Exception as err:
        print(f"[System] Text model warning: {err}")

    try:
        get_speech_model()
    except Exception as err:
        print(f"[System] Speech model warning: {err}")

    print("[System] Pre-loading complete. Systems nominal.")
# ── Pydantic Models ────────────────────────────────────────
# Request body for the single-text analysis endpoint.
class TextRequest(BaseModel):
    # Text to analyse (required).
    text: str
    # Optional student identifier; falls back to "default" when omitted.
    student_id: Optional[str] = "default"
# Request body for the combined (face + speech + text) analysis endpoint.
# Every modality is optional.
class MultimodalRequest(BaseModel):
    # Face result supplied by the client, passed through as-is.
    face_data: Optional[dict] = None
    # Speech result supplied by the client, passed through as-is.
    speech_data: Optional[dict] = None
    # Raw text to analyse on the server, if any.
    text: Optional[str] = None
    # Optional student identifier; falls back to "default" when omitted.
    student_id: Optional[str] = "default"
# Request body for saving a finished session together with its performance data.
class SessionSaveRequest(BaseModel):
    student_id: str = "default"
    # Only required field: the engagement score recorded for the session.
    engagement_score: float
    dominant_emotion: str = "neutral"
    # Optional per-modality results stored alongside the score.
    face_emotion: Optional[dict] = None
    speech_emotion: Optional[dict] = None
    text_sentiment: Optional[dict] = None
    summary: str = ""
    # ISO string from frontend
    session_start_time: Optional[str] = None
# ── REST API Endpoints ─────────────────────────────────────
# Health probe: reports fixed service metadata, the current model-loaded
# flags and the server's local timestamp.
@app.get("/api/health")
async def health_check():
    payload = dict(
        status="online",
        service="The Sentinel Interface",
        version="1.0.0",
        models_loaded=_models_loaded,
        timestamp=datetime.now().isoformat(),
    )
    return payload
@app.post("/api/analyze/face")
async def analyze_face(file: UploadFile = File(...)):
    """
    Analyze uploaded face image using the ML Vision Transformer (ViT) model.
    Same capability as the AI used in conversations β€” pre-trained on facial expression datasets.
    Falls back to DeepFace, then pixel heuristic if ViT is unavailable.
    """
    # Responds 400 when the upload cannot be decoded as an image and 500 (with
    # the exception text) for any other failure.
    try:
        raw_bytes = await file.read()
        bgr_image = cv2.imdecode(np.frombuffer(raw_bytes, np.uint8), cv2.IMREAD_COLOR)
        if bgr_image is None:
            raise HTTPException(status_code=400, detail="Invalid image file")

        # OpenCV decodes to BGR; the predictor is handed an RGB array.
        rgb_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

        # Imported only after the upload has been validated.
        from models.image_emotion_model import predict_from_image

        return predict_from_image(rgb_image)
    except HTTPException:
        # Let the deliberate 400 above through untouched.
        raise
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/api/analyze/speech")
async def analyze_speech(file: UploadFile = File(...)):
    """Analyze uploaded audio file for speech emotion."""
    # The upload is spooled to a temporary file because the analyser takes a
    # path; the file is removed again whether or not the analysis succeeds.
    # Any failure is reported as a 500 carrying the exception text.
    try:
        audio_bytes = await file.read()

        import tempfile

        # Keep the client's file extension; default to ".wav" without a filename.
        if file.filename:
            extension = os.path.splitext(file.filename)[1]
        else:
            extension = ".wav"

        with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as handle:
            handle.write(audio_bytes)
            temp_path = handle.name

        try:
            analyze_file = get_speech_model()[1]
            return analyze_file(temp_path)
        finally:
            os.unlink(temp_path)
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/api/analyze/text")
async def analyze_text_endpoint(request: TextRequest):
    """Analyze text for sentiment and emotion."""
    # Returns the analyser's result unchanged; any failure becomes a 500
    # carrying the exception text.
    try:
        analyze_one = get_text_model()[0]
        return analyze_one(request.text)
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/api/analyze/text/batch")
async def analyze_text_batch(texts: List[str]):
    """Batch analyze multiple texts."""
    # Returns the batch analyser's result unchanged; any failure becomes a 500
    # carrying the exception text.
    try:
        analyze_many = get_text_model()[1]
        return analyze_many(texts)
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/api/analyze/multimodal")
async def analyze_multimodal(request: MultimodalRequest):
    """Combined multimodal emotion analysis."""
    # Face and speech results are supplied by the client and passed through
    # as-is; only the text modality is analysed here. The response echoes the
    # three per-modality results plus the combined engagement value.
    # Any failure is reported as a 500 carrying the exception text.
    try:
        text_result = None
        if request.text:
            analyze, _ = get_text_model()
            text_result = analyze(request.text)

        calc = get_engagement_calc()
        engagement = calc(
            face_result=request.face_data,
            speech_result=request.speech_data,
            text_result=text_result,
        )
        return {
            "face": request.face_data,
            "speech": request.speech_data,
            "text": text_result,
            "engagement": engagement,
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
# ── Session & Performance Endpoints ───────────────────────
@app.post("/api/session/start")
async def start_session(student_id: str = "default"):
    """Start a new monitoring session."""
    # The session row is created first; the timestamp returned is the server's
    # local time taken right after that call.
    new_session_id = db.create_session(student_id)
    started_at = datetime.now().isoformat()
    return {"session_id": new_session_id, "student_id": student_id, "started_at": started_at}
@app.post("/api/session/end")
async def end_session(session_id: int, avg_engagement: float = 0, dominant_emotion: str = "neutral"):
    """End a monitoring session."""
    # Closes the session in the database with the supplied summary values, then
    # echoes the id together with the server's local end timestamp.
    db.end_session(session_id, avg_engagement, dominant_emotion)
    ended_at = datetime.now().isoformat()
    return {"session_id": session_id, "ended_at": ended_at}
@app.post("/api/session/save")
async def save_session(request: SessionSaveRequest):
    """Save session performance data."""
    # Three database steps, in order: create a session (optionally back-dated
    # to the client's start time), store the performance record against it,
    # then close the session with the same score and dominant emotion.
    new_session_id = db.create_session(request.student_id, start_time=request.session_start_time)

    performance_fields = dict(
        student_id=request.student_id,
        session_id=new_session_id,
        engagement_score=request.engagement_score,
        face_emotion=request.face_emotion,
        speech_emotion=request.speech_emotion,
        text_sentiment=request.text_sentiment,
        summary=request.summary,
    )
    db.save_performance(**performance_fields)

    db.end_session(new_session_id, request.engagement_score, request.dominant_emotion)
    return {"status": "saved", "session_id": new_session_id}
@app.get("/api/performance/{student_id}")
async def get_performance(student_id: str):
    """Get student performance history."""
    # The three lookups run in the same order as the keys below.
    return {
        "student_id": student_id,
        "performance": db.get_student_performance(student_id),
        "overall_stats": db.get_overall_stats(student_id),
        "sessions": db.get_all_sessions(student_id),
    }
@app.get("/api/stats/{session_id}")
async def get_stats(session_id: str):
    """Get the 4 metric stats. If session_id is specific, drill down."""
    # Global stats are always the starting point so that fields not overridden
    # below (e.g. the total session count) stay in the response.
    stats = db.get_overall_stats("all")
    if session_id.lower() == 'all':
        return stats

    try:
        sid = int(session_id)
        details = db.get_session_details(sid)
        if not details:
            raise HTTPException(status_code=404, detail="User Not Found")

        # Replace the three engagement metrics with this session's values.
        for metric in ('avg_engagement', 'peak_engagement', 'min_engagement'):
            stats[metric] = details[metric]

        # Attach identifying info for the selected session.
        stats['session_info'] = {
            "id": sid,
            "date_time": details['date_time'],
            "duration_mins": details['duration_mins']
        }
    except ValueError:
        # A ValueError here (e.g. a non-numeric session id) is reported as 404.
        raise HTTPException(status_code=404, detail="User Not Found")
    return stats
@app.get("/api/sessions/latest")
async def get_latest_sessions():
    """Get all global sessions and performance records."""
    # Both lookups use the "all" scope; performance is fetched before sessions.
    return {
        "performance": db.get_student_performance("all"),
        "sessions": db.get_all_sessions("all"),
    }
@app.delete("/api/sessions/{session_id}")
async def delete_session(session_id: int):
    """Delete a session completely and cascade/reindex."""
    # A falsy result from the database layer is reported as a 500.
    if db.delete_session(session_id):
        return {"status": "deleted", "deleted_id": session_id}
    raise HTTPException(status_code=500, detail="Failed to delete session")
@app.get("/api/sessions/export")
async def export_sessions():
    """Export all sessions as a CSV file."""
    # Rows come from sessions LEFT JOIN student_performance, so sessions
    # without a performance record are exported too. Date cells are written as
    # Excel ="..." formulas, and a row whose timestamps cannot be processed
    # gets "Error" placeholders instead of aborting the export.
    import re
    from datetime import timedelta

    # The summary text may state the session length as "lasted N minutes" or
    # "Session: Nmin"; compile once instead of once per row.
    duration_pattern = re.compile(r'(?:lasted\s*(\d+)\s*minutes|Session:\s*(\d+)min)')

    def _parse_iso(value):
        """Parse an ISO timestamp column; return None when it is NULL or empty."""
        # `or ''` matters: a NULL column arrives as None (the key exists), so a
        # plain .get(key, '') default would not protect the .replace() call.
        text = (value or '').replace('Z', '')
        return datetime.fromisoformat(text) if text else None

    def _percent(value):
        """Format a score as "12.3%", treating a missing value as 0."""
        return f"{value:.1f}%" if value is not None else "0.0%"

    conn = db.get_connection()
    try:
        # Join with student_performance to get exactly the data needed
        rows = conn.execute("""
            SELECT s.*, p.overall_summary, p.engagement_score
            FROM sessions s
            LEFT JOIN student_performance p ON s.id = p.session_id
            ORDER BY s.id ASC
        """).fetchall()
        sessions = [dict(r) for r in rows]
    finally:
        # Release the connection even when the query fails.
        conn.close()

    output = io.StringIO()
    writer = csv.writer(output)
    # Headers exactly as requested by user
    writer.writerow([
        'session id', 'date', 'start time', 'end time',
        'duration', 'engagement', 'average engagement', 'dominant emotion'
    ])

    for s in sessions:
        try:
            start_dt = _parse_iso(s.get('start_time')) or datetime.now()

            # ALWAYS prioritize the duration stated in the summary if available
            diff_sec = None
            summary = s.get('overall_summary')
            if summary:
                match = duration_pattern.search(summary)
                if match:
                    diff_sec = int(match.group(1) or match.group(2)) * 60

            # Fallback to the timestamp delta; a missing end time or an end
            # before the start counts as zero duration.
            if diff_sec is None:
                end_dt = _parse_iso(s.get('end_time')) or start_dt
                diff_sec = max((end_dt - start_dt).total_seconds(), 0)

            # The exported end time is derived from start + duration so the
            # three columns always agree with each other.
            calculated_end_dt = start_dt + timedelta(seconds=diff_sec)

            # Format dates using Excel string formula to strictly prevent ######## masking
            date_str = f'="{start_dt.strftime("%Y-%m-%d")}"'
            start_time_str = f'="{start_dt.strftime("%Y-%m-%d %H:%M")}"'
            end_time_str = f'="{calculated_end_dt.strftime("%Y-%m-%d %H:%M")}"'
            if diff_sec < 60:
                duration_str = "0 minutes"
            else:
                duration_str = f"{round(diff_sec/60.0, 1)} minutes"
        except Exception:
            # Best-effort export: one unparseable row must not fail the file.
            # (Exception, not a bare except, so SystemExit/KeyboardInterrupt
            # still propagate.)
            date_str = "Error"
            start_time_str = "Error"
            end_time_str = "Error"
            duration_str = "0 minutes"

        # NOTE(review): dominant_emotion is written verbatim; if it can hold
        # client-supplied text, consider neutralising leading "=", "+", "-",
        # "@" to avoid spreadsheet formula injection.
        writer.writerow([
            s.get('id'),
            date_str,
            start_time_str,
            end_time_str,
            duration_str,
            _percent(s.get('engagement_score')),
            _percent(s.get('avg_engagement')),
            s.get('dominant_emotion')
        ])

    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=sentinel_sessions_export.csv"}
    )
# ── WebSocket for Real-time Face Analysis ─────────────────
@app.websocket("/ws/face")
async def websocket_face(websocket: WebSocket):
    """
    Real-time face analysis via WebSocket.
    Client sends base64-encoded video frames.
    Server returns face mesh landmarks + emotion data.

    Incoming messages are JSON objects whose "type" is "frame" (with the
    image in "data"), "ping" (answered with "pong") or "stop" (ends the
    loop); other types are ignored. A malformed message or undecodable frame
    is answered with an "error" message and the connection stays open.

    A database session is opened for the connection and closed in all cases
    with the average engagement and the most frequently predicted emotion.
    """
    from collections import Counter

    await websocket.accept()
    print("[WebSocket] Face analysis client connected")
    process_frame, reset_mesh = get_face_mesh()
    predict_emotion = get_face_model()
    calc_engagement = get_engagement_calc()
    reset_mesh()
    from models.face_model import reset_calibration
    reset_calibration()

    # Placeholder results sent until the first emotion analysis has run.
    emotion_result = {"emotion": "Neutral", "confidence": 0, "probabilities": {}, "engagement_score": 50, "provider": "Initializing"}
    engagement = {"overall_score": 50, "level": "Neutral", "factors": {}}
    frame_count = 0
    session_id = db.create_session("default")
    state_total_engagement = 0
    state_engagement_samples = 0
    # Tally of predicted emotions, so the session's dominant emotion is the
    # most frequent one rather than whichever happened to be predicted last.
    emotion_counts = Counter()

    try:
        while True:
            data = await websocket.receive_text()
            try:
                msg = json.loads(data)
            except ValueError:
                msg = None
            if not isinstance(msg, dict):
                # One bad message should not tear down the whole session.
                await websocket.send_json({"type": "error", "message": "Invalid message"})
                continue

            msg_type = msg.get("type")
            if msg_type == "stop":
                break
            if msg_type == "ping":
                await websocket.send_json({"type": "pong"})
                continue
            if msg_type != "frame":
                continue

            frame_count += 1
            # Decode base64 image; missing/invalid payloads and empty buffers
            # are treated the same as an image OpenCV cannot decode.
            try:
                img_data = base64.b64decode(msg["data"])
            except (KeyError, TypeError, ValueError):
                frame = None
            else:
                nparr = np.frombuffer(img_data, np.uint8)
                frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if nparr.size else None
            if frame is None:
                await websocket.send_json({"type": "error", "message": "Invalid frame"})
                continue

            # Convert BGR to RGB for MediaPipe
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Process face mesh
            mesh_result = process_frame(frame_rgb)
            detected = mesh_result.get("detected", False)

            # Emotion analysis (every 3rd frame for performance)
            if frame_count % 3 == 0:
                # Landmarks are only passed along when a face was detected.
                landmarks = mesh_result.get("landmarks", []) if detected else None
                emotion_result = predict_emotion(frame_rgb, landmarks)
                # Calculate engagement
                engagement = calc_engagement(face_result=emotion_result)
                state_total_engagement += engagement.get("overall_score", 0)
                state_engagement_samples += 1
                emotion_counts[emotion_result.get("emotion", "neutral")] += 1

            # Log every 10th frame (using the most recent analysis results)
            if frame_count % 10 == 0:
                db.log_emotion(
                    session_id, "face",
                    emotion_result.get("emotion", "neutral"),
                    emotion_result.get("confidence", 0),
                    {"engagement": engagement.get("overall_score", 0)}
                )

            # Send response
            response = {
                "type": "analysis",
                "frame_id": frame_count,
                "mesh": {
                    "detected": detected,
                    "landmarks": mesh_result.get("landmarks", []),
                    "landmark_count": mesh_result.get("landmark_count", 0),
                },
                "blink": mesh_result.get("blink", {}),
                "head_pose": mesh_result.get("head_pose", {}),
                "emotion": emotion_result,
                "engagement": engagement,
            }
            await websocket.send_json(response)
    except WebSocketDisconnect:
        print("[WebSocket] Client disconnected")
    except Exception as e:
        print(f"[WebSocket] Error: {e}")
        traceback.print_exc()
    finally:
        # End session; max(..., 1) avoids dividing by zero when no frame was analysed.
        avg_eng = state_total_engagement / max(state_engagement_samples, 1)
        if emotion_counts:
            dominant_emotion = emotion_counts.most_common(1)[0][0]
        else:
            dominant_emotion = "neutral"
        db.end_session(session_id, avg_eng, dominant_emotion)
        print(f"[WebSocket] Session {session_id} ended. Total frames: {frame_count}")
# ── WebSocket for Real-time Speech Analysis ───────────────
@app.websocket("/ws/speech")
async def websocket_speech(websocket: WebSocket):
    """
    Real-time speech analysis over a WebSocket.

    The client sends binary audio chunks. Chunks shorter than 1000 bytes are
    answered with a "waiting" message; longer ones are analysed and the
    analyser's result fields are returned in an "analysis" message.
    """
    await websocket.accept()
    print("[WebSocket] Speech analysis client connected")
    try:
        while True:
            chunk = await websocket.receive_bytes()
            if len(chunk) >= 1000:
                analyze_bytes = get_speech_model()[0]
                analysis = analyze_bytes(chunk)
                await websocket.send_json({"type": "analysis", **analysis})
            else:
                # Too little audio to analyse yet; tell the client to keep sending.
                await websocket.send_json({
                    "type": "waiting",
                    "message": "Collecting audio data..."
                })
    except WebSocketDisconnect:
        print("[WebSocket] Speech client disconnected")
    except Exception as exc:
        print(f"[WebSocket] Speech error: {exc}")
        traceback.print_exc()
# ── Serve Frontend Static Files ───────────────────────────
# The frontend is looked up in a "frontend" directory that sits next to the
# directory containing this file (path is left un-normalised).
FRONTEND_DIR = os.path.join(
    os.path.dirname(__file__),
    os.pardir,
    "frontend",
)
# Landing page: serves index.html from the frontend directory.
@app.get("/")
async def serve_index():
    index_path = os.path.join(FRONTEND_DIR, "index.html")
    return FileResponse(index_path)
# Live page: reachable with or without the ".html" suffix.
@app.get("/live")
@app.get("/live.html")
async def serve_live():
    live_path = os.path.join(FRONTEND_DIR, "live.html")
    return FileResponse(live_path)
# Scan page: reachable with or without the ".html" suffix.
@app.get("/scan")
@app.get("/scan.html")
async def serve_scan():
    scan_path = os.path.join(FRONTEND_DIR, "scan.html")
    return FileResponse(scan_path)
# Stats page: reachable with or without the ".html" suffix.
@app.get("/stats")
@app.get("/stats.html")
async def serve_stats():
    stats_path = os.path.join(FRONTEND_DIR, "stats.html")
    return FileResponse(stats_path)
# Mount static files (CSS, JS, images)
# Each asset folder is mounted under a URL prefix of the same name, and only
# when that folder actually exists inside the frontend directory.
for _asset_kind in ("css", "js", "assets"):
    _asset_dir = os.path.join(FRONTEND_DIR, _asset_kind)
    if os.path.exists(_asset_dir):
        app.mount(f"/{_asset_kind}", StaticFiles(directory=_asset_dir), name=_asset_kind)
if __name__ == "__main__":
    # Script entry point: run the "app:app" ASGI application under uvicorn on
    # all interfaces, port 8000, with auto-reload enabled.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("app:app", **server_options)