"""EmoSphere -- HuggingFace Spaces Demo (Streamlit). Trial-gated multimodal emotion recognition demo with fuzzy fusion. Run: streamlit run app.py Flow: 1. Landing page with email registration 2. 6-digit code verification 3. Demo: Upload video OR camera+mic quick capture 4. Full multimodal analysis (face, voice, speech, posture) 5. Session report with emotion timeline 6. Trial-ended screen with contact info """ from __future__ import annotations import time import io from datetime import datetime from collections import deque from pathlib import Path import numpy as np import streamlit as st try: import cv2 HAS_CV2 = True except ImportError: HAS_CV2 = False try: from PIL import Image HAS_PIL = True except ImportError: HAS_PIL = False import base64 import streamlit.components.v1 as components_lib import os as _os # Custom webcam component (no WebRTC needed) _WEBCAM_DIR = _os.path.join(_os.path.dirname(_os.path.abspath(__file__)), "webcam_component") _webcam_component_func = None if _os.path.isdir(_WEBCAM_DIR): _webcam_component_func = components_lib.declare_component("webcam_capture", path=_WEBCAM_DIR) def webcam_capture(key="webcam"): """Custom webcam component — captures video frames + audio, sends to Python.""" if _webcam_component_func is None: st.error("Webcam component not found.") return None return _webcam_component_func(key=key, default=None) from models import EmotionLabel, EMOTION_LABELS, CulturalRegion, EMOTION_EMOJI from face_detector import FaceEmotionDetector from text_detector import TextEmotionDetector from posture_detector import PostureEmotionDetector from auth import ( validate_email, is_email_used, mark_email_used, generate_code, send_code_email, smtp_configured, get_remaining_seconds, is_trial_expired, TRIAL_DURATION_SECONDS, VIDEO_MAX_DURATION_SECONDS, CAMERA_MAX_DURATION_SECONDS, ) # ===================================================================== # Page Config # ===================================================================== st.set_page_config( page_title="EmoSphere", page_icon="\U0001f300", layout="wide", initial_sidebar_state="collapsed", ) # ===================================================================== # Brand Constants # ===================================================================== EMOTION_COLORS = { EmotionLabel.JOY: "#FFD700", EmotionLabel.SADNESS: "#4A90D9", EmotionLabel.SURPRISE: "#FF8C00", EmotionLabel.FEAR: "#8B5CF6", EmotionLabel.DISGUST: "#10B981", EmotionLabel.NEUTRAL: "#94A3B8", EmotionLabel.LOVE: "#F472B6", EmotionLabel.CALM: "#67E8F9", } CULTURAL_OPTIONS = { "universal": "Universal", "western": "Western", "east_asian": "East Asian", "south_asian": "South Asian", "middle_eastern": "Middle Eastern", "african": "African", "latin_american": "Latin American", } # ===================================================================== # Global CSS # ===================================================================== GLOBAL_CSS = """ """ st.markdown(GLOBAL_CSS, unsafe_allow_html=True) # ===================================================================== # Session State Initialization # ===================================================================== # Secret bypass: ?bypass=cait2026 skips auth _qp = st.query_params if "auth_stage" not in st.session_state: if _qp.get("bypass") == "cait2026": st.session_state.auth_stage = "demo" st.session_state.demo_start_time = time.time() + 99999 else: st.session_state.auth_stage = "email" # email | code | demo | ended if "auth_email" not in st.session_state: st.session_state.auth_email = "" if "auth_code" not in st.session_state: st.session_state.auth_code = "" if "auth_code_shown" not in st.session_state: st.session_state.auth_code_shown = False if "auth_email_pending" not in st.session_state: st.session_state.auth_email_pending = False if "demo_start_time" not in st.session_state: st.session_state.demo_start_time = None if "history" not in st.session_state: st.session_state.history = deque(maxlen=50) if "show_report" not in st.session_state: st.session_state.show_report = False if "video_processing" not in st.session_state: st.session_state.video_processing = False # ── Background email send (runs after rerun so UI stays responsive) ── if st.session_state.get("auth_email_pending") and st.session_state.auth_stage == "code": st.session_state.auth_email_pending = False _email = st.session_state.auth_email _code = st.session_state.auth_code print(f"[Auth] Attempting to send code to {_email}...") try: sent, _msg = send_code_email(_email, _code) print(f"[Auth] send_code_email returned: sent={sent}, msg={_msg}") if sent: st.session_state.auth_email_sent = True else: st.session_state.auth_email_error = _msg except Exception as _exc: print(f"[Auth] send_code_email exception: {_exc}") st.session_state.auth_email_error = str(_exc) # ===================================================================== # Model Loader (cached) # ===================================================================== @st.cache_resource def load_face_detector(): det = FaceEmotionDetector() det.load() return det @st.cache_resource def load_text_detector(): det = TextEmotionDetector() det.load() return det @st.cache_resource def load_posture_detector(): det = PostureEmotionDetector() det.load() return det @st.cache_resource def load_live_processor(): from live_processor import LiveSessionProcessor proc = LiveSessionProcessor() proc.initialize() return proc # ===================================================================== # Helper: Render Functions # ===================================================================== def render_emotion_bars(scores): sorted_emotions = sorted(scores.items(), key=lambda x: -x[1]) html_parts = [] for label, score in sorted_emotions: color = EMOTION_COLORS.get(label, "#94A3B8") emoji = EMOTION_EMOJI.get(label, "") pct = max(score * 100, 2) html_parts.append( '
' '
' '{} {} - {:.1f}%' '
'.format(pct, color, emoji, label.value, score * 100) ) st.markdown("".join(html_parts), unsafe_allow_html=True) def render_emotion_bubble(label, score): color = EMOTION_COLORS.get(label, "#94A3B8") emoji = EMOTION_EMOJI.get(label, "") st.markdown( '
' '
' '{}' '{:.0f}%' '
' '
{}
' '
'.format( color, color, color, color, emoji, color, score * 100, color, label.value ), unsafe_allow_html=True, ) def render_countdown_bar(remaining): pct = (remaining / TRIAL_DURATION_SECONDS) * 100 if remaining > 15: color = "#00D4FF" elif remaining > 5: color = "#FF4444" else: color = "#FF0000" mins = int(remaining) // 60 secs = int(remaining) % 60 st.markdown( '
' '
' '
' '
' 'Trial: {}:{:02d}' '
'.format(pct, color, color, mins, secs), unsafe_allow_html=True, ) # ===================================================================== # AUTH GATE: Stage 1 -- Email Input # ===================================================================== def _get_logo_b64(): """Load logo as base64 for HTML embedding.""" import base64 logo_path = Path(__file__).parent / "logo.png" if logo_path.exists(): return base64.b64encode(logo_path.read_bytes()).decode() return None def show_landing_page(): logo_b64 = _get_logo_b64() if logo_b64: st.markdown( f'
' f'' f'
', unsafe_allow_html=True, ) st.markdown( '

' 'Multimodal & cultural-aware emotion recognition — AI detection of emotions from face, text, posture and voice.' '

', unsafe_allow_html=True, ) col_l, col_c, col_r = st.columns([1, 2, 1]) with col_c: st.markdown( '
' '

Request Demo Access

' '

' 'Enter your email to receive a one-time access code.
' 'Each email grants a single 60-second demo session.' '

', unsafe_allow_html=True, ) with st.form("email_form", clear_on_submit=False): email = st.text_input( "Email address", placeholder="you@example.com", key="email_input", ) submitted = st.form_submit_button("Send Access Code", use_container_width=True, type="primary") if submitted: if not email or not email.strip(): st.error("Please enter an email address.") elif not validate_email(email): st.error("Please enter a valid email address.") elif is_email_used(email): st.error( "This email has already been used for a demo session. " "Contact info@caitcore.com for full access." ) else: code = generate_code() st.session_state.auth_email = email.strip().lower() st.session_state.auth_code = code st.session_state.auth_stage = "code" st.session_state.auth_email_pending = True st.rerun() st.markdown( '
' '

EmoSphere v1.0 by ' 'CAIT

' '

No surveillance • No medical screening • No data storage

' '
', unsafe_allow_html=True, ) # ===================================================================== # AUTH GATE: Stage 2 -- Code Verification # ===================================================================== def show_code_verification(): logo_b64 = _get_logo_b64() if logo_b64: st.markdown( f'
' f'' f'
', unsafe_allow_html=True, ) col_l, col_c, col_r = st.columns([1, 2, 1]) with col_c: st.markdown( '
' '

Enter Access Code

' '

' 'A 6-digit code was sent to ' '{}' '

'.format(st.session_state.auth_email), unsafe_allow_html=True, ) with st.form("code_form", clear_on_submit=False): code_input = st.text_input( "6-digit code", placeholder="123456", max_chars=6, key="code_input", ) verified = st.form_submit_button("Verify & Start Demo", use_container_width=True, type="primary") if verified: if code_input.strip() == st.session_state.auth_code: mark_email_used(st.session_state.auth_email) st.session_state.demo_start_time = time.time() st.session_state.auth_stage = "demo" st.rerun() else: st.error("Invalid code. Please try again.") if st.button("Back", use_container_width=True): st.session_state.auth_stage = "email" st.session_state.auth_code = "" st.rerun() # ===================================================================== # AUTH GATE: Stage 4 -- Trial Ended # ===================================================================== def show_trial_ended(): st.markdown( '
' '
🌀
' '
', unsafe_allow_html=True, ) st.markdown( '
' '' '
', unsafe_allow_html=True, ) col_l, col_c, col_r = st.columns([1, 2, 1]) with col_c: st.markdown( '
' '
' '

Trial Complete

' '

' 'Your 60-second demo session has ended.
' 'Thank you for trying EmoSphere.' '

' '
' '

' 'Want full access?' '

' '

' 'Contact us for enterprise licensing, API access, and custom integrations.' '

' '' 'Contact info@caitcore.com' '' '
' '

' 'EmoSphere v1.0 • Multimodal Emotion AI for Psychotherapy' '

' '
', unsafe_allow_html=True, ) # ===================================================================== # AUTH GATE: Stage 3 -- Demo (time-limited) # ===================================================================== def show_demo(): """Show the demo, guarded by a 60-second timer.""" start = st.session_state.demo_start_time # Allow report viewing even after trial expires has_report = st.session_state.get("show_report", False) if start is None: st.session_state.auth_stage = "ended" st.rerun() return if is_trial_expired(start) and not has_report: st.session_state.auth_stage = "ended" st.rerun() return remaining = get_remaining_seconds(start) if not has_report: render_countdown_bar(remaining) # Load the live processor processor = load_live_processor() with st.sidebar: st.markdown("### EmoSphere Demo") mins = int(remaining) // 60 secs = int(remaining) % 60 st.markdown("**Time remaining:** {}:{:02d}".format(mins, secs)) st.divider() st.markdown("### How it works") st.markdown( "1. Click **START** on the video stream\n" "2. All 4 modalities analyzed in real-time\n" "3. Fused with **Mamdani fuzzy logic**\n" "4. Click **Stop & View Report** when done" ) st.divider() st.markdown("### Modalities") st.markdown( "🧑 **Face** — ViT expression \n" "🎙 **Voice** — Wav2Vec2 prosody \n" "💬 **Speech** — DistilRoBERTa NLP \n" "🧍 **Posture/Gesture** — MediaPipe pose + hands" ) # Header st.markdown( '
' '' '

EmoSphere — Live Emotion Analysis

' '

' 'Multimodal AI emotion detection with fuzzy fusion ' '— face, voice, speech & posture

' '
', unsafe_allow_html=True, ) # Show report if ready if st.session_state.get("show_report"): from session_report import render_session_report render_session_report(processor) if st.button("Done — End Trial", use_container_width=True, type="primary"): st.session_state.show_report = False st.session_state.auth_stage = "ended" st.rerun() return # Show video processing screen if st.session_state.get("video_processing"): _show_video_processing(processor, start) return # ── Primary: Live Streaming ────────────────────────────────────── _show_live_session(processor, remaining, start) # ── Secondary: Video Upload ────────────────────────────────────── st.divider() st.markdown( '
' '

Or Upload a Video

' '

Upload a short video (MP4, max 60s) for full multimodal analysis.

' '
', unsafe_allow_html=True, ) uploaded_video = st.file_uploader( "Choose video file", type=["mp4", "webm", "avi", "mov", "mkv"], key="video_upload", label_visibility="collapsed", ) if uploaded_video is not None: st.video(uploaded_video) if st.button("🔍 Analyze Video", type="primary", use_container_width=True): video_bytes = uploaded_video.read() st.session_state.video_bytes = video_bytes st.session_state.video_processing = True st.rerun() # Check trial expiry (but allow report viewing) if is_trial_expired(start) and not st.session_state.get("show_report", False): if processor.is_active: processor.stop_session() # Auto-generate report instead of going to "ended" st.session_state.show_report = True st.rerun() else: st.session_state.auth_stage = "ended" st.rerun() def _show_live_session(processor, remaining, start): """Live session using custom webcam component (no WebRTC needed).""" # Video + Results side by side col_video, col_results = st.columns([1.6, 1]) with col_video: # Custom webcam component with built-in START/STOP + timer component_value = webcam_capture(key="webcam_live") # Handle component messages if component_value and isinstance(component_value, dict): msg_type = component_value.get("type") if msg_type == "started": if not processor.is_active: processor.start_session() st.session_state["session_started"] = True elif msg_type == "frame": if not processor.is_active: processor.start_session() st.session_state["session_started"] = True # Decode base64 JPEG and process data_url = component_value.get("data", "") if "," in data_url: try: img_b64 = data_url.split(",", 1)[1] img_bytes = base64.b64decode(img_b64) processor.process_image(img_bytes) except Exception as e: print(f"[App] Frame decode error: {e}") elif msg_type == "audio": data_url = component_value.get("data", "") if "," in data_url: try: audio_b64 = data_url.split(",", 1)[1] audio_bytes = base64.b64decode(audio_b64) whisper_lang = st.session_state.get("whisper_language", None) processor.process_audio_bytes(audio_bytes, language=whisper_lang) except Exception as e: print(f"[App] Audio decode error: {e}") elif msg_type == "stopped": if processor.is_active: processor.stop_session() st.session_state["session_started"] = False st.session_state.show_report = True st.rerun() elif msg_type == "error": st.error("Camera/mic error: " + component_value.get("message", "unknown")) with col_results: if processor.is_active: _render_live_results(processor) else: st.markdown( '
' '🎥' '

Ready to Stream

' '

' 'Click START SESSION on the left to begin ' 'your 60-second live emotion analysis.

' '
' '

' '🧑 Face • 🎙 Voice • 💬 Speech • 🧍 Posture/Gesture
' 'All fused with fuzzy logic in real-time.

' '
' '
', unsafe_allow_html=True, ) # Language selector under Ready to Stream st.markdown( '

' 'Select your speech language:

', unsafe_allow_html=True, ) lang_options = { "English": "en", "Greek": "el", "Spanish": "es", "French": "fr", "German": "de", "Italian": "it", "Portuguese": "pt", "Dutch": "nl", "Russian": "ru", "Chinese": "zh", "Japanese": "ja", "Korean": "ko", "Arabic": "ar", "Turkish": "tr", "Polish": "pl", "Swedish": "sv", "Romanian": "ro", "Bulgarian": "bg", "Serbian": "sr", "Croatian": "hr", } selected_lang = st.selectbox( "Speech language", options=list(lang_options.keys()), index=0, key="speech_language", label_visibility="collapsed", ) st.session_state["whisper_language"] = lang_options[selected_lang] # Stop button — below the columns, always visible during session if processor.is_active or st.session_state.get("session_started", False): st.markdown("") # spacer if st.button("⏹ Stop Session & View Report", type="primary", use_container_width=True, key="stop_session_btn"): if processor.is_active: processor.stop_session() st.session_state["session_started"] = False st.session_state.show_report = True st.rerun() @st.fragment(run_every=2.0) def _render_live_results(processor): """Auto-updating display of live emotion results. Refreshes every 2s.""" fused = processor.get_latest_fused() face = processor.get_latest_face() voice = processor.get_latest_voice() text = processor.get_latest_text() posture = processor.get_latest_posture() stats = processor.get_stats() transcript = processor.get_transcript() topics = processor.get_topics() if fused is None: st.markdown( '
' '🔮' '

' 'Analyzing... Speak, move, or express yourself.

' '
', unsafe_allow_html=True, ) return # Dominant emotion bubble render_emotion_bubble(fused.dominant, fused.dominant_score) # Fused emotion bars fused_scores = {s.label: s.score for s in fused.scores} render_emotion_bars(fused_scores) # Modality signals st.markdown("#### Modality Signals") mod_data = [ ("🧑 Face", face), ("🎙 Voice", voice), ("💬 Speech", text), ("🧍 Posture", posture), ] mod_colors = ["#E948A0", "#FFD700", "#00D4FF", "#10B981"] for (mod_label, mod_result), color in zip(mod_data, mod_colors): if mod_result is not None: dom = mod_result.dominant emoji = EMOTION_EMOJI.get(dom, "") conf = mod_result.confidence * 100 st.markdown( '
' '{}' '{}' '{}' '
' '
' '
' '{:.0f}%' '
'.format(mod_label, emoji, color, dom.value, conf, color, conf), unsafe_allow_html=True, ) else: st.markdown( '
' '{}' 'waiting...' '
'.format(mod_label), unsafe_allow_html=True, ) # Live transcript if transcript: st.markdown("#### Live Transcript") recent = transcript[-5:] html_parts = ['
'] for seg in recent: emoji = EMOTION_EMOJI.get(seg.emotion, "") if seg.emotion else "" mins = int(seg.timestamp) // 60 secs = int(seg.timestamp) % 60 html_parts.append( '
' '{}:{:02d} ' '{} ' '{}' '
'.format(mins, secs, emoji, seg.text) ) html_parts.append('
') st.markdown("".join(html_parts), unsafe_allow_html=True) # Topics if topics: topic_html = " ".join( '{}'.format( t.replace("_", " ").title() ) for t in topics ) st.markdown( '
' 'Topics: ' '{}
'.format(topic_html), unsafe_allow_html=True, ) # Stats st.markdown( '
' 'Frames: {} • Audio: {} • Transcript: {}' '
'.format( stats.get("video_frames", 0), stats.get("audio_chunks", 0), stats.get("transcript_segments", 0), ), unsafe_allow_html=True, ) def _show_video_processing(processor, start): """Process an uploaded video and show results.""" video_bytes = st.session_state.get("video_bytes") if not video_bytes: st.session_state.video_processing = False st.rerun() return st.markdown( '
' '⚙️' '

Analyzing Video...

' '

' 'Processing all 4 modalities (face, voice, speech, posture) with fuzzy fusion.

' '
', unsafe_allow_html=True, ) progress_bar = st.progress(0, text="Initializing...") def update_progress(pct): progress_bar.progress(min(int(pct * 100), 100), text=f"Processing... {int(pct * 100)}%") summary = processor.process_video_file(video_bytes, progress_callback=update_progress) progress_bar.progress(100, text="Complete!") # Clean up st.session_state.video_processing = False st.session_state.pop("video_bytes", None) if summary is not None: st.session_state.show_report = True st.rerun() else: st.error("Video processing failed. Please try a different video format (MP4 recommended).") if st.button("Back", use_container_width=True): st.rerun() def _schedule_rerun(remaining): """Schedule automatic page reload to update countdown and enforce expiry.""" if remaining <= 0: return interval = min(5.0, remaining) try: import streamlit.components.v1 as components js_code = ( '' ) components.html(js_code, height=0) except Exception: pass # ===================================================================== # Main Router # ===================================================================== def main(): stage = st.session_state.auth_stage if stage == "demo" and st.session_state.demo_start_time: if is_trial_expired(st.session_state.demo_start_time): st.session_state.auth_stage = "ended" stage = "ended" if stage == "email": show_landing_page() elif stage == "code": show_code_verification() elif stage == "demo": show_demo() elif stage == "ended": show_trial_ended() else: show_landing_page() if __name__ == "__main__": main()