"""EmoSphere -- HuggingFace Spaces Demo (Streamlit).
Trial-gated multimodal emotion recognition demo with fuzzy fusion.
Run: streamlit run app.py
Flow:
1. Landing page with email registration
2. 6-digit code verification
3. Demo: Upload video OR camera+mic quick capture
4. Full multimodal analysis (face, voice, speech, posture)
5. Session report with emotion timeline
6. Trial-ended screen with contact info
"""
from __future__ import annotations
import time
import io
from datetime import datetime
from collections import deque
from pathlib import Path
import numpy as np
import streamlit as st
try:
import cv2
HAS_CV2 = True
except ImportError:
HAS_CV2 = False
try:
from PIL import Image
HAS_PIL = True
except ImportError:
HAS_PIL = False
import base64
import streamlit.components.v1 as components_lib
import os as _os
# Custom webcam component (no WebRTC needed).
# The component's frontend lives in ./webcam_component next to this file; it is
# registered only when that directory exists, so the app still loads (with the
# webcam feature disabled) on deployments missing the frontend assets.
_WEBCAM_DIR = _os.path.join(_os.path.dirname(_os.path.abspath(__file__)), "webcam_component")
_webcam_component_func = None
if _os.path.isdir(_WEBCAM_DIR):
    _webcam_component_func = components_lib.declare_component("webcam_capture", path=_WEBCAM_DIR)
def webcam_capture(key="webcam"):
    """Custom webcam component — captures video frames + audio, sends to Python.

    Returns the component's latest message (or None when the component's
    frontend directory was not found at startup).
    """
    if _webcam_component_func is not None:
        return _webcam_component_func(key=key, default=None)
    st.error("Webcam component not found.")
    return None
from models import EmotionLabel, EMOTION_LABELS, CulturalRegion, EMOTION_EMOJI
from face_detector import FaceEmotionDetector
from text_detector import TextEmotionDetector
from posture_detector import PostureEmotionDetector
from auth import (
validate_email,
is_email_used,
mark_email_used,
generate_code,
send_code_email,
smtp_configured,
get_remaining_seconds,
is_trial_expired,
TRIAL_DURATION_SECONDS,
VIDEO_MAX_DURATION_SECONDS,
CAMERA_MAX_DURATION_SECONDS,
)
# =====================================================================
# Page Config
# =====================================================================
st.set_page_config(
    page_title="EmoSphere",
    page_icon="\U0001f300",  # 🌀 cyclone
    layout="wide",
    initial_sidebar_state="collapsed",
)
# =====================================================================
# Brand Constants
# =====================================================================
# Fixed hex color per emotion label, shared by every bar/bubble renderer below.
EMOTION_COLORS = {
    EmotionLabel.JOY: "#FFD700",
    EmotionLabel.SADNESS: "#4A90D9",
    EmotionLabel.SURPRISE: "#FF8C00",
    EmotionLabel.FEAR: "#8B5CF6",
    EmotionLabel.DISGUST: "#10B981",
    EmotionLabel.NEUTRAL: "#94A3B8",
    EmotionLabel.LOVE: "#F472B6",
    EmotionLabel.CALM: "#67E8F9",
}
# Display names for cultural-context options.
# NOTE(review): keys look like CulturalRegion values — confirm they stay in sync
# with the enum imported from models.
CULTURAL_OPTIONS = {
    "universal": "Universal",
    "western": "Western",
    "east_asian": "East Asian",
    "south_asian": "South Asian",
    "middle_eastern": "Middle Eastern",
    "african": "African",
    "latin_american": "Latin American",
}
# =====================================================================
# Global CSS
# =====================================================================
# Injected once at startup to style the whole page.
# NOTE(review): the CSS payload appears to have been stripped from this string
# in this copy of the file — the markdown call currently injects nothing.
GLOBAL_CSS = """
"""
st.markdown(GLOBAL_CSS, unsafe_allow_html=True)
# =====================================================================
# Session State Initialization
# =====================================================================
# Secret bypass: ?bypass=cait2026 skips auth (dev/demo shortcut)
_qp = st.query_params
if "auth_stage" not in st.session_state:
    if _qp.get("bypass") == "cait2026":
        st.session_state.auth_stage = "demo"
        # Start time pushed far into the future so the trial never expires.
        # NOTE(review): assumes the expiry helpers compute elapsed = now - start;
        # confirm against auth.is_trial_expired / get_remaining_seconds.
        st.session_state.demo_start_time = time.time() + 99999
    else:
        st.session_state.auth_stage = "email"  # email | code | demo | ended
if "auth_email" not in st.session_state:
    st.session_state.auth_email = ""  # registered email address
if "auth_code" not in st.session_state:
    st.session_state.auth_code = ""  # expected 6-digit access code
if "auth_code_shown" not in st.session_state:
    st.session_state.auth_code_shown = False
if "auth_email_pending" not in st.session_state:
    st.session_state.auth_email_pending = False  # send queued for next rerun
if "demo_start_time" not in st.session_state:
    st.session_state.demo_start_time = None
if "history" not in st.session_state:
    st.session_state.history = deque(maxlen=50)  # rolling result history
if "show_report" not in st.session_state:
    st.session_state.show_report = False
if "video_processing" not in st.session_state:
    st.session_state.video_processing = False
# ── Background email send (runs after rerun so UI stays responsive) ──
# The landing-page form only sets auth_email_pending and reruns; the actual
# (slow, network-bound) SMTP send happens here, on the next script run.
if st.session_state.get("auth_email_pending") and st.session_state.auth_stage == "code":
    st.session_state.auth_email_pending = False  # one attempt per submission
    _email = st.session_state.auth_email
    _code = st.session_state.auth_code
    print(f"[Auth] Attempting to send code to {_email}...")
    try:
        sent, _msg = send_code_email(_email, _code)
        print(f"[Auth] send_code_email returned: sent={sent}, msg={_msg}")
        if sent:
            st.session_state.auth_email_sent = True
        else:
            st.session_state.auth_email_error = _msg
    except Exception as _exc:
        # Keep the UI alive; surface the failure on the code screen instead.
        print(f"[Auth] send_code_email exception: {_exc}")
        st.session_state.auth_email_error = str(_exc)
# =====================================================================
# Model Loader (cached)
# =====================================================================
@st.cache_resource
def load_face_detector():
    """Build, load, and cache the face emotion detector (one per process)."""
    detector = FaceEmotionDetector()
    detector.load()
    return detector
@st.cache_resource
def load_text_detector():
    """Build, load, and cache the text emotion detector (one per process)."""
    detector = TextEmotionDetector()
    detector.load()
    return detector
@st.cache_resource
def load_posture_detector():
    """Build, load, and cache the posture emotion detector (one per process)."""
    detector = PostureEmotionDetector()
    detector.load()
    return detector
@st.cache_resource
def load_live_processor():
    """Build, initialize, and cache the live multimodal session processor.

    Imported lazily so the heavy model stack is only pulled in when the demo
    stage is actually reached.
    """
    from live_processor import LiveSessionProcessor
    processor = LiveSessionProcessor()
    processor.initialize()
    return processor
# =====================================================================
# Helper: Render Functions
# =====================================================================
def render_emotion_bars(scores):
    """Render one horizontal HTML bar per emotion, sorted by descending score.

    scores: mapping of EmotionLabel -> float score in [0, 1].
    """
    sorted_emotions = sorted(scores.items(), key=lambda x: -x[1])
    html_parts = []
    for label, score in sorted_emotions:
        color = EMOTION_COLORS.get(label, "#94A3B8")  # neutral grey fallback
        emoji = EMOTION_EMOJI.get(label, "")
        # Floor the bar width at 2% so near-zero emotions stay visible.
        pct = max(score * 100, 2)
        # NOTE(review): the HTML template literal below appears truncated /
        # garbled in this copy of the file (markup between the quotes is
        # missing) — restore the original template before shipping.
        html_parts.append(
            '
            '.format(pct, color, emoji, label.value, score * 100)
        )
    st.markdown("".join(html_parts), unsafe_allow_html=True)
def render_emotion_bubble(label, score):
    """Render the dominant emotion as a large colored badge ("bubble").

    label: EmotionLabel; score: float in [0, 1] (shown as a percentage).
    """
    color = EMOTION_COLORS.get(label, "#94A3B8")  # neutral grey fallback
    emoji = EMOTION_EMOJI.get(label, "")
    # NOTE(review): the HTML template literal below appears truncated /
    # garbled in this copy of the file; the .format() call supplies nine
    # placeholders (colors, emoji, score, label) — restore before shipping.
    st.markdown(
        ''
        '
        '
        '{}'
        '{:.0f}%'
        '
        '
        '
        {}
        '
        '
        '.format(
            color, color, color, color, emoji, color, score * 100, color, label.value
        ),
        unsafe_allow_html=True,
    )
def render_countdown_bar(remaining):
    """Draw the trial countdown bar; color escalates as time runs out.

    remaining: seconds left in the trial (float or int).
    """
    pct = (remaining / TRIAL_DURATION_SECONDS) * 100
    # Cyan while comfortable, red under 15s, bright red under 5s.
    if remaining > 15:
        color = "#00D4FF"
    elif remaining > 5:
        color = "#FF4444"
    else:
        color = "#FF0000"
    mins = int(remaining) // 60
    secs = int(remaining) % 60
    # NOTE(review): the HTML template literal below appears truncated /
    # garbled in this copy of the file — restore before shipping.
    st.markdown(
        ''
        ''
        'Trial: {}:{:02d}'
        '
        '.format(pct, color, color, mins, secs),
        unsafe_allow_html=True,
    )
# =====================================================================
# AUTH GATE: Stage 1 -- Email Input
# =====================================================================
def _get_logo_b64():
"""Load logo as base64 for HTML embedding."""
import base64
logo_path = Path(__file__).parent / "logo.png"
if logo_path.exists():
return base64.b64encode(logo_path.read_bytes()).decode()
return None
def show_landing_page():
    """Auth stage 1: brand header plus the email-registration form.

    On a valid, unused email this generates a 6-digit code, queues the email
    send for the next rerun (auth_email_pending), and advances to "code".
    """
    # NOTE(review): the HTML/markdown template literals in this function
    # appear truncated / garbled in this copy of the file (markup between
    # the quotes is missing) — restore the originals before shipping.
    logo_b64 = _get_logo_b64()
    if logo_b64:
        st.markdown(
            f''
            f'
            
            '
            f'
            ',
            unsafe_allow_html=True,
        )
    st.markdown(
        ''
        'Multimodal & cultural-aware emotion recognition — AI detection of emotions from face, text, posture and voice.'
        '
        ',
        unsafe_allow_html=True,
    )
    col_l, col_c, col_r = st.columns([1, 2, 1])
    with col_c:
        st.markdown(
            ''
            '
            Request Demo Access
            '
            '
            '
            'Enter your email to receive a one-time access code.
            '
            'Each email grants a single 60-second demo session.'
            '
            ',
            unsafe_allow_html=True,
        )
        with st.form("email_form", clear_on_submit=False):
            email = st.text_input(
                "Email address",
                placeholder="you@example.com",
                key="email_input",
            )
            submitted = st.form_submit_button("Send Access Code", use_container_width=True, type="primary")
            if submitted:
                # Validate, then reject emails that already used their one trial.
                if not email or not email.strip():
                    st.error("Please enter an email address.")
                elif not validate_email(email):
                    st.error("Please enter a valid email address.")
                elif is_email_used(email):
                    st.error(
                        "This email has already been used for a demo session. "
                        "Contact info@caitcore.com for full access."
                    )
                else:
                    # Generate the code now; the slow SMTP send is deferred to
                    # the post-rerun hook so the UI stays responsive.
                    code = generate_code()
                    st.session_state.auth_email = email.strip().lower()
                    st.session_state.auth_code = code
                    st.session_state.auth_stage = "code"
                    st.session_state.auth_email_pending = True
                    st.rerun()
    st.markdown(
        ''
        '
        EmoSphere v1.0 by '
        'CAIT
        '
        '
        No surveillance • No medical screening • No data storage
        '
        '
        ',
        unsafe_allow_html=True,
    )
# =====================================================================
# AUTH GATE: Stage 2 -- Code Verification
# =====================================================================
def show_code_verification():
    """Auth stage 2: verify the emailed 6-digit code.

    On a match: marks the email as used, stamps demo_start_time (starting the
    trial clock), and advances to "demo". "Back" returns to the email stage.
    """
    # NOTE(review): the HTML template literals in this function appear
    # truncated / garbled in this copy of the file — restore before shipping.
    logo_b64 = _get_logo_b64()
    if logo_b64:
        st.markdown(
            f''
            f'
            
            '
            f'
            ',
            unsafe_allow_html=True,
        )
    col_l, col_c, col_r = st.columns([1, 2, 1])
    with col_c:
        st.markdown(
            ''
            '
            Enter Access Code
            '
            '
            '
            'A 6-digit code was sent to '
            '{}'
            '
            '.format(st.session_state.auth_email),
            unsafe_allow_html=True,
        )
        with st.form("code_form", clear_on_submit=False):
            code_input = st.text_input(
                "6-digit code",
                placeholder="123456",
                max_chars=6,
                key="code_input",
            )
            verified = st.form_submit_button("Verify & Start Demo", use_container_width=True, type="primary")
            if verified:
                # Exact string comparison against the code generated at stage 1.
                if code_input.strip() == st.session_state.auth_code:
                    mark_email_used(st.session_state.auth_email)
                    st.session_state.demo_start_time = time.time()
                    st.session_state.auth_stage = "demo"
                    st.rerun()
                else:
                    st.error("Invalid code. Please try again.")
        # Back button sits outside the form (st.button is not allowed inside one).
        if st.button("Back", use_container_width=True):
            st.session_state.auth_stage = "email"
            st.session_state.auth_code = ""
            st.rerun()
# =====================================================================
# AUTH GATE: Stage 4 -- Trial Ended
# =====================================================================
def show_trial_ended():
    """Terminal screen after the trial ends: thanks + contact info, no way back."""
    # NOTE(review): the HTML template literals in this function appear
    # truncated / garbled in this copy of the file — restore before shipping.
    st.markdown(
        '',
        unsafe_allow_html=True,
    )
    st.markdown(
        ''
        '
        
        '
        '
        ',
        unsafe_allow_html=True,
    )
    col_l, col_c, col_r = st.columns([1, 2, 1])
    with col_c:
        st.markdown(
            ''
            '
            ⏰
            '
            '
            Trial Complete
            '
            '
            '
            'Your 60-second demo session has ended.
            '
            'Thank you for trying EmoSphere.'
            '
            '
            '
            '
            '
            '
            'EmoSphere v1.0 • Multimodal Emotion AI for Psychotherapy'
            '
            '
            '
            ',
            unsafe_allow_html=True,
        )
# =====================================================================
# AUTH GATE: Stage 3 -- Demo (time-limited)
# =====================================================================
def show_demo():
    """Show the demo, guarded by a 60-second timer.

    Routing inside the demo stage, in priority order:
    report view > video-processing screen > live session + upload section.
    The trial-expiry check runs both on entry and again at the end, so an
    active session auto-generates its report instead of being discarded.
    """
    start = st.session_state.demo_start_time
    # Allow report viewing even after trial expires
    has_report = st.session_state.get("show_report", False)
    if start is None:
        # Reached "demo" without a start time — treat as expired.
        st.session_state.auth_stage = "ended"
        st.rerun()
        return
    if is_trial_expired(start) and not has_report:
        st.session_state.auth_stage = "ended"
        st.rerun()
        return
    remaining = get_remaining_seconds(start)
    if not has_report:
        render_countdown_bar(remaining)
    # Load the (cached) live processor — heavy models initialize once.
    processor = load_live_processor()
    with st.sidebar:
        st.markdown("### EmoSphere Demo")
        mins = int(remaining) // 60
        secs = int(remaining) % 60
        st.markdown("**Time remaining:** {}:{:02d}".format(mins, secs))
        st.divider()
        st.markdown("### How it works")
        st.markdown(
            "1. Click **START** on the video stream\n"
            "2. All 4 modalities analyzed in real-time\n"
            "3. Fused with **Mamdani fuzzy logic**\n"
            "4. Click **Stop & View Report** when done"
        )
        st.divider()
        st.markdown("### Modalities")
        st.markdown(
            "🧑 **Face** — ViT expression \n"
            "🎙 **Voice** — Wav2Vec2 prosody \n"
            "💬 **Speech** — DistilRoBERTa NLP \n"
            "🧍 **Posture/Gesture** — MediaPipe pose + hands"
        )
    # Header
    # NOTE(review): the HTML template literals in this function appear
    # truncated / garbled in this copy of the file — restore before shipping.
    st.markdown(
        ''
        '
        
        '
        '
        EmoSphere — Live Emotion Analysis
        '
        '
        '
        'Multimodal AI emotion detection with fuzzy fusion '
        '— face, voice, speech & posture
        '
        '
        ',
        unsafe_allow_html=True,
    )
    # Show report if ready
    if st.session_state.get("show_report"):
        from session_report import render_session_report  # lazy: only for report view
        render_session_report(processor)
        if st.button("Done — End Trial", use_container_width=True, type="primary"):
            st.session_state.show_report = False
            st.session_state.auth_stage = "ended"
            st.rerun()
        return
    # Show video processing screen
    if st.session_state.get("video_processing"):
        _show_video_processing(processor, start)
        return
    # ── Primary: Live Streaming ──────────────────────────────────────
    _show_live_session(processor, remaining, start)
    # ── Secondary: Video Upload ──────────────────────────────────────
    st.divider()
    st.markdown(
        ''
        '
        Or Upload a Video
        '
        '
        Upload a short video (MP4, max 60s) for full multimodal analysis.
        '
        '
        ',
        unsafe_allow_html=True,
    )
    uploaded_video = st.file_uploader(
        "Choose video file",
        type=["mp4", "webm", "avi", "mov", "mkv"],
        key="video_upload",
        label_visibility="collapsed",
    )
    if uploaded_video is not None:
        st.video(uploaded_video)
        if st.button("🔍 Analyze Video", type="primary", use_container_width=True):
            # Stash the bytes in session state; processing happens after rerun
            # on the dedicated processing screen.
            video_bytes = uploaded_video.read()
            st.session_state.video_bytes = video_bytes
            st.session_state.video_processing = True
            st.rerun()
    # Check trial expiry (but allow report viewing)
    if is_trial_expired(start) and not st.session_state.get("show_report", False):
        if processor.is_active:
            processor.stop_session()
            # Auto-generate report instead of going to "ended"
            st.session_state.show_report = True
            st.rerun()
        else:
            st.session_state.auth_stage = "ended"
            st.rerun()
def _show_live_session(processor, remaining, start):
    """Live session using custom webcam component (no WebRTC needed).

    The component posts one dict per rerun with a "type" tag:
    "started" | "frame" (base64 JPEG data URL) | "audio" (base64 data URL) |
    "stopped" | "error". Frames/audio are forwarded to the processor; results
    render in the right column via the auto-refreshing fragment.
    """
    # Video + Results side by side
    col_video, col_results = st.columns([1.6, 1])
    with col_video:
        # Custom webcam component with built-in START/STOP + timer
        component_value = webcam_capture(key="webcam_live")
        # Handle component messages
        if component_value and isinstance(component_value, dict):
            msg_type = component_value.get("type")
            if msg_type == "started":
                if not processor.is_active:
                    processor.start_session()
                st.session_state["session_started"] = True
            elif msg_type == "frame":
                # A frame can arrive before "started" — start the session lazily.
                if not processor.is_active:
                    processor.start_session()
                    st.session_state["session_started"] = True
                # Decode base64 JPEG and process
                data_url = component_value.get("data", "")
                if "," in data_url:  # "data:image/jpeg;base64,<payload>"
                    try:
                        img_b64 = data_url.split(",", 1)[1]
                        img_bytes = base64.b64decode(img_b64)
                        processor.process_image(img_bytes)
                    except Exception as e:
                        # Drop the bad frame; the stream keeps going.
                        print(f"[App] Frame decode error: {e}")
            elif msg_type == "audio":
                data_url = component_value.get("data", "")
                if "," in data_url:
                    try:
                        audio_b64 = data_url.split(",", 1)[1]
                        audio_bytes = base64.b64decode(audio_b64)
                        # Language hint chosen in the selector below.
                        whisper_lang = st.session_state.get("whisper_language", None)
                        processor.process_audio_bytes(audio_bytes, language=whisper_lang)
                    except Exception as e:
                        print(f"[App] Audio decode error: {e}")
            elif msg_type == "stopped":
                if processor.is_active:
                    processor.stop_session()
                st.session_state["session_started"] = False
                st.session_state.show_report = True
                st.rerun()
            elif msg_type == "error":
                st.error("Camera/mic error: " + component_value.get("message", "unknown"))
    with col_results:
        if processor.is_active:
            _render_live_results(processor)
        else:
            # Idle placeholder before the user presses START.
            # NOTE(review): the HTML template literals below appear truncated /
            # garbled in this copy of the file — restore before shipping.
            st.markdown(
                ''
                '
                🎥'
                '
                Ready to Stream
                '
                '
                '
                'Click START SESSION on the left to begin '
                'your 60-second live emotion analysis.
                '
                '
                '
                '
                '
                '🧑 Face • 🎙 Voice • 💬 Speech • 🧍 Posture/Gesture
                '
                'All fused with fuzzy logic in real-time.
                '
                '
                '
                '
                ',
                unsafe_allow_html=True,
            )
            # Language selector under Ready to Stream
            st.markdown(
                ''
                'Select your speech language:
                ',
                unsafe_allow_html=True,
            )
            # Display name -> Whisper ISO-639-1 language code.
            lang_options = {
                "English": "en",
                "Greek": "el",
                "Spanish": "es",
                "French": "fr",
                "German": "de",
                "Italian": "it",
                "Portuguese": "pt",
                "Dutch": "nl",
                "Russian": "ru",
                "Chinese": "zh",
                "Japanese": "ja",
                "Korean": "ko",
                "Arabic": "ar",
                "Turkish": "tr",
                "Polish": "pl",
                "Swedish": "sv",
                "Romanian": "ro",
                "Bulgarian": "bg",
                "Serbian": "sr",
                "Croatian": "hr",
            }
            selected_lang = st.selectbox(
                "Speech language",
                options=list(lang_options.keys()),
                index=0,
                key="speech_language",
                label_visibility="collapsed",
            )
            st.session_state["whisper_language"] = lang_options[selected_lang]
    # Stop button — below the columns, always visible during session
    if processor.is_active or st.session_state.get("session_started", False):
        st.markdown("")  # spacer
        if st.button("⏹ Stop Session & View Report", type="primary", use_container_width=True, key="stop_session_btn"):
            if processor.is_active:
                processor.stop_session()
            st.session_state["session_started"] = False
            st.session_state.show_report = True
            st.rerun()
@st.fragment(run_every=2.0)
def _render_live_results(processor):
    """Auto-updating display of live emotion results. Refreshes every 2s.

    Reads the processor's latest fused/per-modality results and renders the
    dominant-emotion bubble, score bars, modality chips, recent transcript,
    topic tags, and session counters.
    """
    fused = processor.get_latest_fused()
    face = processor.get_latest_face()
    voice = processor.get_latest_voice()
    text = processor.get_latest_text()
    posture = processor.get_latest_posture()
    stats = processor.get_stats()
    transcript = processor.get_transcript()
    topics = processor.get_topics()
    # NOTE(review): the HTML template literals in this function appear
    # truncated / garbled in this copy of the file — restore before shipping.
    if fused is None:
        # Nothing fused yet — show a waiting placeholder.
        st.markdown(
            ''
            '
            🔮'
            '
            '
            'Analyzing... Speak, move, or express yourself.
            '
            '
            ',
            unsafe_allow_html=True,
        )
        return
    # Dominant emotion bubble
    render_emotion_bubble(fused.dominant, fused.dominant_score)
    # Fused emotion bars
    fused_scores = {s.label: s.score for s in fused.scores}
    render_emotion_bars(fused_scores)
    # Modality signals — one chip per modality, colored per mod_colors.
    st.markdown("#### Modality Signals")
    mod_data = [
        ("🧑 Face", face),
        ("🎙 Voice", voice),
        ("💬 Speech", text),
        ("🧍 Posture", posture),
    ]
    mod_colors = ["#E948A0", "#FFD700", "#00D4FF", "#10B981"]
    for (mod_label, mod_result), color in zip(mod_data, mod_colors):
        if mod_result is not None:
            dom = mod_result.dominant
            emoji = EMOTION_EMOJI.get(dom, "")
            conf = mod_result.confidence * 100
            st.markdown(
                ''
                '
                {}'
                '
                {}'
                '
                {}'
                '
                '
                '
                {:.0f}%'
                '
                '.format(mod_label, emoji, color, dom.value, conf, color, conf),
                unsafe_allow_html=True,
            )
        else:
            # Modality has produced no result yet.
            st.markdown(
                ''
                '{}'
                'waiting...'
                '
                '.format(mod_label),
                unsafe_allow_html=True,
            )
    # Live transcript — only the five most recent segments.
    if transcript:
        st.markdown("#### Live Transcript")
        recent = transcript[-5:]
        html_parts = ['']
        for seg in recent:
            emoji = EMOTION_EMOJI.get(seg.emotion, "") if seg.emotion else ""
            mins = int(seg.timestamp) // 60
            secs = int(seg.timestamp) % 60
            html_parts.append(
                '
                '
                '{}:{:02d} '
                '{} '
                '{}'
                '
                '.format(mins, secs, emoji, seg.text)
            )
        html_parts.append('
        ')
        st.markdown("".join(html_parts), unsafe_allow_html=True)
    # Topic tags ("some_topic" -> "Some Topic").
    if topics:
        topic_html = " ".join(
            '{}'.format(
                t.replace("_", " ").title()
            )
            for t in topics
        )
        st.markdown(
            ''
            'Topics: '
            '{}
            '.format(topic_html),
            unsafe_allow_html=True,
        )
    # Session counters
    st.markdown(
        ''
        'Frames: {} • Audio: {} • Transcript: {}'
        '
        '.format(
            stats.get("video_frames", 0),
            stats.get("audio_chunks", 0),
            stats.get("transcript_segments", 0),
        ),
        unsafe_allow_html=True,
    )
def _show_video_processing(processor, start):
    """Process an uploaded video and show results.

    Reads the bytes stashed in session_state.video_bytes by show_demo, runs
    the full multimodal pipeline with a progress bar, then either flips to
    the report view or shows an error with a retry button.
    """
    video_bytes = st.session_state.get("video_bytes")
    if not video_bytes:
        # Nothing to process (e.g. state cleared by a rerun) — bail out.
        st.session_state.video_processing = False
        st.rerun()
        return
    # NOTE(review): the HTML template literal below appears truncated /
    # garbled in this copy of the file — restore before shipping.
    st.markdown(
        ''
        '
        ⚙️'
        '
        Analyzing Video...
        '
        '
        '
        'Processing all 4 modalities (face, voice, speech, posture) with fuzzy fusion.
        '
        '
        ',
        unsafe_allow_html=True,
    )
    progress_bar = st.progress(0, text="Initializing...")
    def update_progress(pct):
        # pct is a 0..1 fraction from the processor's progress callback.
        progress_bar.progress(min(int(pct * 100), 100), text=f"Processing... {int(pct * 100)}%")
    summary = processor.process_video_file(video_bytes, progress_callback=update_progress)
    progress_bar.progress(100, text="Complete!")
    # Clean up the one-shot processing state regardless of outcome.
    st.session_state.video_processing = False
    st.session_state.pop("video_bytes", None)
    if summary is not None:
        st.session_state.show_report = True
        st.rerun()
    else:
        st.error("Video processing failed. Please try a different video format (MP4 recommended).")
        if st.button("Back", use_container_width=True):
            st.rerun()
def _schedule_rerun(remaining):
    """Schedule automatic page reload to update countdown and enforce expiry."""
    if remaining <= 0:
        return
    # Reload at most every 5 seconds, or sooner if less time remains.
    # NOTE(review): `interval` is currently unused and js_code is an empty
    # string — the JavaScript payload (presumably a setTimeout reload using
    # `interval`) appears stripped from this copy of the file; restore it.
    # Also note this helper is not called anywhere in the visible file.
    interval = min(5.0, remaining)
    try:
        import streamlit.components.v1 as components
        js_code = (
            ''
        )
        components.html(js_code, height=0)
    except Exception:
        # Best-effort: if injection fails the countdown simply won't auto-refresh.
        pass
# =====================================================================
# Main Router
# =====================================================================
def main():
    """Route the UI to the page matching the current auth stage."""
    stage = st.session_state.auth_stage
    # An expired demo session is forced onto the ended screen before routing.
    if stage == "demo" and st.session_state.demo_start_time:
        if is_trial_expired(st.session_state.demo_start_time):
            st.session_state.auth_stage = "ended"
            stage = "ended"
    routes = {
        "email": show_landing_page,
        "code": show_code_verification,
        "demo": show_demo,
        "ended": show_trial_ended,
    }
    # Unknown stages fall back to the landing page.
    routes.get(stage, show_landing_page)()
if __name__ == "__main__":
    main()