Spaces:
Running
Running
| """EmoSphere Live Session Processor — real-time multimodal processing. | |
| Designed for use with streamlit-webrtc. Handles: | |
| - Video frame processing (face + posture at ~2fps) | |
| - Audio chunk accumulation and processing (3-second windows) | |
| - Speech-to-text via faster-whisper (base, multilingual — 99 languages) | |
| - Fuzzy fusion after each processing cycle | |
| - Thread-safe shared state for Streamlit display | |
| - Emotion timeline tracking | |
| - Keyword-based topic/trigger extraction | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import time | |
| import threading | |
| from collections import deque | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| import numpy as np | |
| from models import ( | |
| EmotionLabel, | |
| EMOTION_LABELS, | |
| EmotionScore, | |
| EmotionDetectionResult, | |
| FusedDetectionResult, | |
| CulturalRegion, | |
| ) | |
# Optional Pillow dependency — preferred path for JPEG-encoding video frames.
try:
    from PIL import Image
    HAS_PIL = True
except ImportError:
    HAS_PIL = False
# Optional OpenCV dependency — JPEG-encoding fallback, and required for
# uploaded-video-file processing (see process_video_file).
try:
    import cv2
    HAS_CV2 = True
except ImportError:
    HAS_CV2 = False
| # ===================================================================== | |
| # Data Structures | |
| # ===================================================================== | |
@dataclass
class TranscriptSegment:
    """A single piece of transcribed (or typed) speech with its text-emotion reading.

    The @dataclass decorator is required: the rest of this module constructs
    instances with keyword arguments (TranscriptSegment(text=..., ...)), which
    fails without a generated __init__.
    """

    text: str
    timestamp: float  # seconds from session start
    emotion: Optional[EmotionLabel] = None  # dominant text emotion, if any
    confidence: float = 0.0  # confidence of the text-emotion prediction
@dataclass
class TimelineEntry:
    """One fused-result snapshot on the session timeline.

    The @dataclass decorator is required: _run_fusion constructs entries with
    keyword arguments, and the mutable-list defaults rely on
    field(default_factory=list) being interpreted by dataclasses.
    """

    timestamp: float  # seconds from session start
    fused_result: FusedDetectionResult
    transcript: str = ""  # most recent transcript text at this point
    topics: list[str] = field(default_factory=list)  # topics from that transcript
    fired_rules: list[str] = field(default_factory=list)  # fuzzy rules that fired
@dataclass
class SessionSummary:
    """Aggregate statistics for a completed session (see generate_summary).

    The @dataclass decorator is required: generate_summary constructs this
    with keyword arguments, which fails without a generated __init__.
    """

    duration_seconds: float
    total_video_frames: int
    total_audio_chunks: int
    total_transcript_segments: int
    dominant_emotion: EmotionLabel  # highest averaged fused emotion
    emotion_distribution: dict[str, float]  # label -> mean score over timeline
    modality_contribution: dict[str, float]  # modality -> mean fusion weight
    topics_detected: list[str]
    emotional_shifts: list[dict]  # dominant-emotion change events
    peaks: list[dict]  # strong (>0.5) non-neutral moments
| # ===================================================================== | |
| # Keyword-based Topic Extractor (lightweight, no KeyBERT) | |
| # ===================================================================== | |
| # Reuse the keyword lists from text_detector.py for topic/trigger detection | |
# Lexicons mirroring text_detector.py; a topic fires when any of its keywords
# appears as a substring of the lowercased input.
TOPIC_KEYWORDS: dict[str, list[str]] = {
    "happiness": [
        "happy", "glad", "excited", "wonderful", "great", "amazing", "awesome",
        "fantastic", "smile", "laugh", "fun", "enjoy", "delighted", "thrilled",
    ],
    "grief": [
        "sad", "unhappy", "depressed", "lonely", "miss", "cry", "tears",
        "heartbreak", "grief", "loss", "mourning",
    ],
    "anxiety": [
        "afraid", "scared", "worried", "anxious", "nervous", "terrified",
        "panic", "dread", "uneasy", "stressed", "overwhelmed", "tense",
    ],
    "relationships": [
        "love", "adore", "partner", "together", "family", "friend",
        "relationship", "marriage", "breakup", "divorce", "trust",
    ],
    "work": [
        "work", "job", "boss", "career", "office", "deadline", "project",
        "meeting", "fired", "promoted", "colleague", "salary",
    ],
    "health": [
        "health", "sick", "pain", "doctor", "hospital", "tired",
        "sleep", "insomnia", "medication", "therapy", "diagnosis",
    ],
    "self_esteem": [
        "worthless", "failure", "ugly", "stupid", "hopeless",
        "useless", "inadequate", "ashamed", "embarrassed", "confident",
    ],
    "change": [
        "change", "different", "new", "suddenly", "unexpected",
        "surprise", "shocking", "unbelievable", "transform",
    ],
    "calm": [
        "calm", "peaceful", "relaxed", "serene", "tranquil",
        "meditate", "breathe", "mindful", "quiet", "gentle",
    ],
    "conflict": [
        "angry", "furious", "annoyed", "frustrated", "argue",
        "fight", "conflict", "hate", "rage", "resent",
    ],
}


def extract_topics(text: str) -> list[str]:
    """Return the topics whose keyword lexicon matches *text*.

    Matching is plain case-insensitive substring search; results preserve
    TOPIC_KEYWORDS insertion order.
    """
    haystack = text.lower()
    return [
        topic
        for topic, keywords in TOPIC_KEYWORDS.items()
        if any(keyword in haystack for keyword in keywords)
    ]
| # ===================================================================== | |
| # Live Session Processor | |
| # ===================================================================== | |
class LiveSessionProcessor:
    """Thread-safe processor for real-time multimodal emotion analysis.
    Integrates with streamlit-webrtc callbacks for video and audio.
    Maintains shared state that Streamlit can poll for display updates.
    """
    def __init__(self):
        # Single lock guarding all mutable shared state below.
        self._lock = threading.Lock()
        # Detectors (loaded lazily) — populated by initialize().
        self._face_det = None
        self._voice_det = None
        self._text_det = None
        self._posture_det = None
        self._fusion = None
        self._whisper_model = None
        # Processing state
        self._session_active = False
        self._session_start: float = 0.0  # time.time() when the session began
        self._last_video_process: float = 0.0  # wall-clock of last face pass (for ~2fps subsampling)
        self._video_frame_count: int = 0
        self._posture_frame_count: int = 0
        # Audio accumulation
        self._audio_buffer: list[np.ndarray] = []
        self._audio_sample_rate: int = 16000  # updated from incoming frames in audio_frame_callback
        self._last_audio_process: float = 0.0
        self._audio_chunk_count: int = 0
        # Latest results per modality (inputs to _run_fusion)
        self._face_result: Optional[EmotionDetectionResult] = None
        self._voice_result: Optional[EmotionDetectionResult] = None
        self._text_result: Optional[EmotionDetectionResult] = None
        self._posture_result: Optional[EmotionDetectionResult] = None
        self._fused_result: Optional[FusedDetectionResult] = None
        # Timeline and transcript
        self._timeline: list[TimelineEntry] = []
        self._transcript: list[TranscriptSegment] = []
        self._topics_seen: list[str] = []  # unique topics, in first-seen order
        # Modality processing time accumulators (milliseconds per detection)
        self._modality_times: dict[str, list[float]] = {
            "face": [], "voice": [], "text": [], "posture": [],
        }
| # ── Initialization ────────────────────────────────────────────── | |
| def initialize(self): | |
| """Load all detectors and models. Call once at startup.""" | |
| from face_detector import FaceEmotionDetector | |
| from voice_detector import VoiceEmotionDetector | |
| from text_detector import TextEmotionDetector | |
| from posture_detector import PostureEmotionDetector | |
| from fuzzy_fusion import FuzzyFusionEngine | |
| self._face_det = FaceEmotionDetector() | |
| self._face_det.load() | |
| self._voice_det = VoiceEmotionDetector() | |
| self._voice_det.load() | |
| self._text_det = TextEmotionDetector() | |
| self._text_det.load() | |
| self._posture_det = PostureEmotionDetector() | |
| self._posture_det.load() | |
| self._fusion = FuzzyFusionEngine() | |
| # Load faster-whisper for STT (base multilingual — good accuracy, fast on CPU) | |
| try: | |
| from faster_whisper import WhisperModel | |
| self._whisper_model = WhisperModel( | |
| "base", device="cpu", compute_type="int8" | |
| ) | |
| print("[LiveProcessor] faster-whisper base (multilingual) loaded") | |
| except ImportError: | |
| print("[LiveProcessor] faster-whisper not available, STT disabled") | |
| self._whisper_model = None | |
| except Exception as e: | |
| print(f"[LiveProcessor] Whisper load error: {e}") | |
| self._whisper_model = None | |
| print("[LiveProcessor] All detectors initialized") | |
| # ── Session Control ───────────────────────────────────────────── | |
| def start_session(self): | |
| """Start a new live session.""" | |
| with self._lock: | |
| self._session_active = True | |
| self._session_start = time.time() | |
| self._last_video_process = 0.0 | |
| self._last_audio_process = 0.0 | |
| self._video_frame_count = 0 | |
| self._posture_frame_count = 0 | |
| self._audio_chunk_count = 0 | |
| self._audio_buffer = [] | |
| self._face_result = None | |
| self._voice_result = None | |
| self._text_result = None | |
| self._posture_result = None | |
| self._fused_result = None | |
| self._timeline = [] | |
| self._transcript = [] | |
| self._topics_seen = [] | |
| self._modality_times = { | |
| "face": [], "voice": [], "text": [], "posture": [], | |
| } | |
| def stop_session(self): | |
| """Stop the current session.""" | |
| with self._lock: | |
| self._session_active = False | |
| def is_active(self) -> bool: | |
| return self._session_active | |
| def elapsed_seconds(self) -> float: | |
| if not self._session_active: | |
| return 0.0 | |
| return time.time() - self._session_start | |
| # ── Direct Image/Audio Processing (for custom webcam component) ── | |
| def process_image(self, image_bytes: bytes): | |
| """Process a single image for face + posture/gesture detection (non-blocking).""" | |
| if not self._session_active: | |
| return | |
| # Run in background thread to avoid blocking Streamlit UI | |
| t = threading.Thread( | |
| target=self._process_image_worker, | |
| args=(image_bytes,), | |
| daemon=True, | |
| ) | |
| t.start() | |
| def _process_image_worker(self, image_bytes: bytes): | |
| """Background worker for image processing (face + posture/gesture).""" | |
| self._video_frame_count += 1 | |
| # Face emotion detection | |
| if self._face_det is not None: | |
| try: | |
| result = self._face_det.detect(image_bytes) | |
| with self._lock: | |
| self._face_result = result | |
| self._modality_times["face"].append(result.processing_time_ms) | |
| except Exception as e: | |
| print(f"[LiveProcessor] Face detect error: {e}") | |
| # Posture/gesture detection every 2nd frame | |
| if self._video_frame_count % 2 == 0 and self._posture_det is not None: | |
| try: | |
| self._posture_frame_count += 1 | |
| p_result = self._posture_det.detect(image_bytes) | |
| with self._lock: | |
| self._posture_result = p_result | |
| self._modality_times["posture"].append(p_result.processing_time_ms) | |
| except Exception as e: | |
| print(f"[LiveProcessor] Posture detect error: {e}") | |
| self._run_fusion() | |
    def _convert_webm_to_wav(self, audio_bytes: bytes) -> Optional[str]:
        """Convert webm/opus audio bytes to a WAV file via ffmpeg. Returns wav path or None.

        The caller owns (and must delete) the returned file. Any failure —
        missing ffmpeg, non-zero exit, 10s timeout — is logged and yields None.
        """
        import tempfile, subprocess, os
        tmp_in = None
        try:
            # Persist the incoming bytes so ffmpeg can read them from disk.
            tmp_in = tempfile.NamedTemporaryFile(suffix=".webm", delete=False)
            tmp_in.write(audio_bytes)
            tmp_in.flush()
            tmp_in.close()
            tmp_out = tmp_in.name.replace(".webm", ".wav")
            # Transcode to 16 kHz mono WAV (the rate downstream consumers use).
            result = subprocess.run(
                ["ffmpeg", "-y", "-i", tmp_in.name,
                 "-ar", "16000", "-ac", "1", "-f", "wav", tmp_out],
                capture_output=True, timeout=10,
            )
            os.unlink(tmp_in.name)
            if result.returncode == 0 and os.path.exists(tmp_out):
                return tmp_out
            print(f"[LiveProcessor] ffmpeg failed: {result.stderr[:200] if result.stderr else 'unknown'}")
            # ffmpeg failed — drop any partial output file, best-effort.
            try:
                os.unlink(tmp_out)
            except Exception:
                pass
        except Exception as e:
            # Covers missing ffmpeg binary, subprocess timeout, and temp-file I/O errors.
            print(f"[LiveProcessor] webm→wav conversion error: {e}")
            if tmp_in:
                # The input temp file may still exist if we failed before the
                # normal unlink; a second unlink here just no-ops via except.
                try:
                    os.unlink(tmp_in.name)
                except Exception:
                    pass
        return None
| def process_audio_bytes(self, audio_bytes: bytes, language: str = None): | |
| """Process raw audio bytes for voice emotion + STT (non-blocking via thread).""" | |
| if not self._session_active: | |
| return | |
| # Run audio processing in background thread to avoid blocking Streamlit UI | |
| t = threading.Thread( | |
| target=self._process_audio_worker, | |
| args=(audio_bytes, language), | |
| daemon=True, | |
| ) | |
| t.start() | |
| def _process_audio_worker(self, audio_bytes: bytes, language: str = None): | |
| """Background worker for audio processing (voice emotion + STT).""" | |
| import os | |
| self._audio_chunk_count += 1 | |
| # Convert webm to wav once — reuse for both voice detection and STT | |
| wav_path = self._convert_webm_to_wav(audio_bytes) | |
| # Voice emotion detection | |
| if self._voice_det is not None: | |
| try: | |
| # Use converted WAV if available, otherwise raw bytes | |
| audio_for_voice = open(wav_path, "rb").read() if wav_path else audio_bytes | |
| result = self._voice_det.detect(audio_for_voice) | |
| with self._lock: | |
| self._voice_result = result | |
| self._modality_times["voice"].append(result.processing_time_ms) | |
| print(f"[LiveProcessor] Voice: {result.dominant.value} ({result.dominant_score:.2f})") | |
| except Exception as e: | |
| print(f"[LiveProcessor] Voice detect error: {e}") | |
| # Speech-to-text via faster-whisper | |
| if self._whisper_model is not None and wav_path: | |
| try: | |
| segments, _info = self._whisper_model.transcribe( | |
| wav_path, | |
| beam_size=3, | |
| language=language, | |
| vad_filter=True, | |
| vad_parameters=dict(min_silence_duration_ms=300), | |
| ) | |
| detected_lang = getattr(_info, 'language', 'unknown') | |
| print(f"[LiveProcessor] Whisper detected language: {detected_lang}") | |
| for seg in segments: | |
| text = seg.text.strip() | |
| if not text or len(text) <= 2: | |
| continue | |
| # Skip common whisper hallucinations | |
| if text.lower() in ("thank you.", "thanks.", "you", "bye.", "the end.", | |
| "thanks for watching.", "subscribe.", "...", | |
| "thank you for watching."): | |
| print(f"[LiveProcessor] Skipping hallucination: {text}") | |
| continue | |
| print(f"[LiveProcessor] Transcript: '{text}'") | |
| # Text emotion | |
| t_result = None | |
| if self._text_det is not None: | |
| t_result = self._text_det.detect(text) | |
| with self._lock: | |
| self._text_result = t_result | |
| elapsed = time.time() - self._session_start | |
| segment = TranscriptSegment( | |
| text=text, | |
| timestamp=elapsed, | |
| emotion=t_result.dominant if t_result else None, | |
| confidence=t_result.confidence if t_result else 0.0, | |
| ) | |
| with self._lock: | |
| self._transcript.append(segment) | |
| topics = extract_topics(text) | |
| if topics: | |
| with self._lock: | |
| for t in topics: | |
| if t not in self._topics_seen: | |
| self._topics_seen.append(t) | |
| except Exception as e: | |
| print(f"[LiveProcessor] STT error: {e}") | |
| elif self._whisper_model is not None and not wav_path: | |
| print("[LiveProcessor] STT skipped — audio conversion failed") | |
| # Clean up wav file | |
| if wav_path: | |
| try: | |
| os.unlink(wav_path) | |
| except Exception: | |
| pass | |
| self._run_fusion() | |
| # ── Video Frame Callback (for streamlit-webrtc — legacy) ──────── | |
    def video_frame_callback(self, frame) -> object:
        """Process a video frame from streamlit-webrtc.
        Called for every frame. We subsample to ~2fps for face detection
        and ~every 5th processed frame for posture.
        Args:
            frame: av.VideoFrame from streamlit-webrtc
        Returns:
            The frame unchanged (passthrough for display).
        """
        if not self._session_active:
            return frame
        now = time.time()
        # Subsample: process face at ~2fps (every 500ms)
        if now - self._last_video_process < 0.5:
            return frame
        self._last_video_process = now
        # NOTE(review): unlocked writes to the timer/counter — safe only if
        # streamlit-webrtc invokes this callback from a single thread; confirm.
        self._video_frame_count += 1
        try:
            # Convert av.VideoFrame to numpy array
            img = frame.to_ndarray(format="bgr24")
            # Face emotion detection
            if self._face_det is not None:
                # Convert BGR to RGB for PIL
                rgb = img[:, :, ::-1]
                # Encode to JPEG bytes for the detector (PIL preferred, cv2 fallback)
                if HAS_PIL:
                    pil_img = Image.fromarray(rgb)
                    buf = io.BytesIO()
                    pil_img.save(buf, format="JPEG", quality=70)
                    img_bytes = buf.getvalue()
                else:
                    success, enc = cv2.imencode(".jpg", img)
                    img_bytes = enc.tobytes() if success else b""
                if img_bytes:
                    result = self._face_det.detect(img_bytes)
                    with self._lock:
                        self._face_result = result
                        self._modality_times["face"].append(result.processing_time_ms)
            # Posture detection every 5th processed frame (~0.4fps)
            if self._video_frame_count % 5 == 0 and self._posture_det is not None:
                self._posture_frame_count += 1
                if HAS_CV2:
                    success, enc = cv2.imencode(".jpg", img)
                    posture_bytes = enc.tobytes() if success else b""
                elif HAS_PIL:
                    pil_img = Image.fromarray(img[:, :, ::-1])
                    buf = io.BytesIO()
                    pil_img.save(buf, format="JPEG", quality=70)
                    posture_bytes = buf.getvalue()
                else:
                    # Neither encoder available — skip posture for this frame.
                    posture_bytes = b""
                if posture_bytes:
                    p_result = self._posture_det.detect(posture_bytes)
                    with self._lock:
                        self._posture_result = p_result
                        self._modality_times["posture"].append(p_result.processing_time_ms)
            # Run fusion after face processing
            self._run_fusion()
        except Exception as e:
            # Never let a processing error break the webrtc pipeline.
            print(f"[LiveProcessor] Video frame error: {e}")
        return frame
| # ── Audio Frame Callback (for streamlit-webrtc) ───────────────── | |
    def audio_frame_callback(self, frame) -> object:
        """Process an audio frame from streamlit-webrtc.
        Accumulates audio data and processes every ~3 seconds for:
        - Voice emotion detection
        - Speech-to-text transcription
        Args:
            frame: av.AudioFrame from streamlit-webrtc
        Returns:
            The frame unchanged (passthrough).
        """
        if not self._session_active:
            return frame
        try:
            # Convert av.AudioFrame to numpy
            audio_array = frame.to_ndarray()
            # audio_array shape: (channels, samples) — take first channel
            if audio_array.ndim > 1:
                audio_array = audio_array[0]
            # Convert integer PCM to float32 normalized to [-1, 1]
            if audio_array.dtype == np.int16:
                audio_array = audio_array.astype(np.float32) / 32768.0
            elif audio_array.dtype == np.int32:
                audio_array = audio_array.astype(np.float32) / 2147483648.0
            # NOTE(review): unlocked append — safe only if this callback and
            # _process_audio_chunk run on the same thread; confirm.
            self._audio_buffer.append(audio_array)
            # Get sample rate from frame (fall back to 16 kHz if unset)
            self._audio_sample_rate = frame.sample_rate or 16000
            # Process every ~3 seconds of accumulated audio
            now = time.time()
            if now - self._last_audio_process >= 3.0 and len(self._audio_buffer) > 0:
                self._last_audio_process = now
                self._process_audio_chunk()
        except Exception as e:
            # Never let a processing error break the webrtc pipeline.
            print(f"[LiveProcessor] Audio frame error: {e}")
        return frame
    def _process_audio_chunk(self):
        """Process accumulated audio buffer for voice emotion + STT."""
        # Concatenate buffered audio and clear the buffer atomically.
        with self._lock:
            if not self._audio_buffer:
                return
            audio = np.concatenate(self._audio_buffer)
            self._audio_buffer = []
            self._audio_chunk_count += 1
        # Resample to 16000 Hz if needed
        if self._audio_sample_rate != 16000:
            try:
                import librosa
                audio = librosa.resample(
                    audio, orig_sr=self._audio_sample_rate, target_sr=16000
                )
            except ImportError:
                # Simple decimation/interpolation fallback: nearest-index pick,
                # no anti-aliasing — acceptable for emotion features.
                ratio = 16000 / self._audio_sample_rate
                new_len = int(len(audio) * ratio)
                indices = np.linspace(0, len(audio) - 1, new_len).astype(int)
                audio = audio[indices]
        # Skip if audio is too short (< 0.5s at 16 kHz)
        if len(audio) < 8000:
            return
        # Voice emotion detection
        if self._voice_det is not None:
            try:
                v_result = self._voice_det.detect(audio, sample_rate=16000)
                with self._lock:
                    self._voice_result = v_result
                    self._modality_times["voice"].append(v_result.processing_time_ms)
            except Exception as e:
                print(f"[LiveProcessor] Voice detection error: {e}")
        # Speech-to-text
        transcript_text = ""
        if self._whisper_model is not None:
            try:
                segments, info = self._whisper_model.transcribe(
                    audio,
                    beam_size=1,
                    vad_filter=True,
                )
                parts = []
                for seg in segments:
                    parts.append(seg.text.strip())
                transcript_text = " ".join(parts).strip()
            except Exception as e:
                print(f"[LiveProcessor] STT error: {e}")
        # If we got text, run text emotion detection + topic extraction
        if transcript_text:
            if self._text_det is not None:
                try:
                    t_result = self._text_det.detect(transcript_text)
                    with self._lock:
                        self._text_result = t_result
                        self._modality_times["text"].append(t_result.processing_time_ms)
                except Exception as e:
                    print(f"[LiveProcessor] Text detection error: {e}")
            # Record transcript segment
            elapsed = time.time() - self._session_start
            # NOTE(review): unlocked read of self._text_result here; a concurrent
            # writer could replace it between these two lines — confirm threading.
            text_emotion = self._text_result.dominant if self._text_result else None
            text_conf = self._text_result.confidence if self._text_result else 0.0
            segment = TranscriptSegment(
                text=transcript_text,
                timestamp=elapsed,
                emotion=text_emotion,
                confidence=text_conf,
            )
            with self._lock:
                self._transcript.append(segment)
            # Topic extraction
            topics = extract_topics(transcript_text)
            if topics:
                with self._lock:
                    for t in topics:
                        if t not in self._topics_seen:
                            self._topics_seen.append(t)
        # Run fusion after audio processing
        self._run_fusion()
| # ── Manual Text Input ─────────────────────────────────────────── | |
| def process_text(self, text: str): | |
| """Process manually typed text (e.g., from a text input field).""" | |
| if not self._session_active or not text.strip(): | |
| return | |
| if self._text_det is not None: | |
| try: | |
| t_result = self._text_det.detect(text) | |
| with self._lock: | |
| self._text_result = t_result | |
| except Exception: | |
| pass | |
| elapsed = time.time() - self._session_start | |
| text_emotion = self._text_result.dominant if self._text_result else None | |
| text_conf = self._text_result.confidence if self._text_result else 0.0 | |
| segment = TranscriptSegment( | |
| text=text, | |
| timestamp=elapsed, | |
| emotion=text_emotion, | |
| confidence=text_conf, | |
| ) | |
| with self._lock: | |
| self._transcript.append(segment) | |
| topics = extract_topics(text) | |
| if topics: | |
| with self._lock: | |
| for t in topics: | |
| if t not in self._topics_seen: | |
| self._topics_seen.append(t) | |
| self._run_fusion() | |
| # ── Fusion ────────────────────────────────────────────────────── | |
| def _run_fusion(self): | |
| """Run fuzzy fusion on the latest modality results.""" | |
| if self._fusion is None: | |
| return | |
| with self._lock: | |
| face = self._face_result | |
| voice = self._voice_result | |
| text = self._text_result | |
| posture = self._posture_result | |
| if not any([face, voice, text, posture]): | |
| return | |
| try: | |
| fused = self._fusion.fuse( | |
| face=face, | |
| voice=voice, | |
| text=text, | |
| posture=posture, | |
| ) | |
| elapsed = time.time() - self._session_start | |
| # Detect topics from latest transcript | |
| topics = [] | |
| with self._lock: | |
| if self._transcript: | |
| topics = extract_topics(self._transcript[-1].text) | |
| entry = TimelineEntry( | |
| timestamp=elapsed, | |
| fused_result=fused, | |
| transcript=self._transcript[-1].text if self._transcript else "", | |
| topics=topics, | |
| ) | |
| with self._lock: | |
| self._fused_result = fused | |
| self._timeline.append(entry) | |
| except Exception as e: | |
| print(f"[LiveProcessor] Fusion error: {e}") | |
| # ── State Accessors (thread-safe) ─────────────────────────────── | |
    def get_latest_fused(self) -> Optional[FusedDetectionResult]:
        """Most recent fused result, or None before the first fusion cycle."""
        with self._lock:
            return self._fused_result

    def get_latest_face(self) -> Optional[EmotionDetectionResult]:
        """Most recent face-modality result, or None."""
        with self._lock:
            return self._face_result

    def get_latest_voice(self) -> Optional[EmotionDetectionResult]:
        """Most recent voice-modality result, or None."""
        with self._lock:
            return self._voice_result

    def get_latest_text(self) -> Optional[EmotionDetectionResult]:
        """Most recent text-modality result, or None."""
        with self._lock:
            return self._text_result

    def get_latest_posture(self) -> Optional[EmotionDetectionResult]:
        """Most recent posture-modality result, or None."""
        with self._lock:
            return self._posture_result

    def get_timeline(self) -> list[TimelineEntry]:
        """Snapshot copy of the timeline (safe to iterate without the lock)."""
        with self._lock:
            return list(self._timeline)

    def get_transcript(self) -> list[TranscriptSegment]:
        """Snapshot copy of the transcript segments."""
        with self._lock:
            return list(self._transcript)

    def get_topics(self) -> list[str]:
        """Snapshot copy of the unique topics seen so far."""
        with self._lock:
            return list(self._topics_seen)
| def get_stats(self) -> dict: | |
| """Get session statistics.""" | |
| with self._lock: | |
| return { | |
| "duration": time.time() - self._session_start if self._session_active else 0.0, | |
| "video_frames": self._video_frame_count, | |
| "posture_frames": self._posture_frame_count, | |
| "audio_chunks": self._audio_chunk_count, | |
| "transcript_segments": len(self._transcript), | |
| "timeline_entries": len(self._timeline), | |
| "topics": list(self._topics_seen), | |
| } | |
| # ── Summary Generation ────────────────────────────────────────── | |
    def generate_summary(self) -> Optional[SessionSummary]:
        """Generate a session summary after the session ends.

        Returns None when the timeline is empty (nothing was processed).
        """
        # Snapshot all shared state under the lock, then compute outside it.
        with self._lock:
            timeline = list(self._timeline)
            transcript = list(self._transcript)
            topics = list(self._topics_seen)
            stats = {
                # NOTE(review): measured from _session_start to *now*, so this
                # keeps growing after stop_session() — confirm intended.
                "duration": (time.time() - self._session_start) if self._session_start else 0.0,
                "video_frames": self._video_frame_count,
                "audio_chunks": self._audio_chunk_count,
                "transcript_segments": len(transcript),
            }
            # NOTE(review): snapshotted but never used below — dead, or meant
            # for a per-modality latency section of the summary? Confirm.
            modality_times = {k: list(v) for k, v in self._modality_times.items()}
        if not timeline:
            return None
        # Compute emotion distribution from timeline (mean fused score per label)
        emotion_accum: dict[str, float] = {label.value: 0.0 for label in EMOTION_LABELS}
        for entry in timeline:
            for score in entry.fused_result.scores:
                emotion_accum[score.label.value] += score.score
        total_entries = len(timeline)
        emotion_dist = {k: v / total_entries for k, v in emotion_accum.items()}
        # Dominant emotion overall
        dominant_val = max(emotion_dist, key=emotion_dist.get)  # type: ignore
        dominant_emotion = EmotionLabel(dominant_val)
        # Modality contribution (average weight across timeline)
        mod_weights: dict[str, list[float]] = {}
        for entry in timeline:
            for mod, w in entry.fused_result.modality_weights.items():
                if mod not in mod_weights:
                    mod_weights[mod] = []
                mod_weights[mod].append(w)
        modality_contribution = {
            mod: sum(ws) / len(ws) for mod, ws in mod_weights.items()
        }
        # Detect emotional shifts (dominant emotion changes between entries)
        shifts: list[dict] = []
        peaks: list[dict] = []
        prev_dominant = None
        for entry in timeline:
            dom = entry.fused_result.dominant
            dom_score = entry.fused_result.dominant_score
            if prev_dominant is not None and dom != prev_dominant:
                shifts.append({
                    "timestamp": entry.timestamp,
                    "from": prev_dominant.value,
                    "to": dom.value,
                    "score": dom_score,
                    "transcript": entry.transcript[:80] if entry.transcript else "",
                })
            # Detect peaks (score > 0.5 for non-neutral)
            if dom != EmotionLabel.NEUTRAL and dom_score > 0.5:
                peaks.append({
                    "timestamp": entry.timestamp,
                    "emotion": dom.value,
                    "score": dom_score,
                    "transcript": entry.transcript[:80] if entry.transcript else "",
                })
            prev_dominant = dom
        return SessionSummary(
            duration_seconds=stats["duration"],
            total_video_frames=stats["video_frames"],
            total_audio_chunks=stats["audio_chunks"],
            total_transcript_segments=stats["transcript_segments"],
            dominant_emotion=dominant_emotion,
            emotion_distribution=emotion_dist,
            modality_contribution=modality_contribution,
            topics_detected=topics,
            emotional_shifts=shifts,
            peaks=peaks,
        )
| # ── Video File Processing ────────────────────────────────────── | |
    def process_video_file(self, video_bytes: bytes, progress_callback=None) -> Optional[SessionSummary]:
        """Process an uploaded video file frame by frame.
        Extracts frames at ~2fps for face + posture detection,
        extracts audio for voice emotion + STT, then fuses all modalities.
        Args:
            video_bytes: Raw video file bytes (mp4, webm, etc.)
            progress_callback: Optional callable(progress_float) for UI updates.
        Returns:
            SessionSummary after processing, or None on failure.
        """
        import tempfile, os
        if not HAS_CV2:
            print("[LiveProcessor] cv2 required for video file processing")
            return None
        # Write to temp file for OpenCV (VideoCapture needs a path)
        tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        tmp.write(video_bytes)
        tmp.flush()
        tmp_path = tmp.name
        tmp.close()
        try:
            cap = cv2.VideoCapture(tmp_path)
            if not cap.isOpened():
                print("[LiveProcessor] Failed to open video file")
                return None
            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = total_frames / fps if fps > 0 else 0
            # Start a virtual session (resets counters/timeline)
            self.start_session()
            # Process at ~2fps
            frame_interval = max(1, int(fps / 2))
            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_idx % frame_interval == 0:
                    current_time = frame_idx / fps
                    # Back-date the session start so timeline timestamps line
                    # up with the frame's position within the video.
                    self._session_start = time.time() - current_time
                    # Face detection
                    if self._face_det is not None:
                        try:
                            rgb = frame[:, :, ::-1]
                            if HAS_PIL:
                                pil_img = Image.fromarray(rgb)
                                buf = io.BytesIO()
                                pil_img.save(buf, format="JPEG", quality=70)
                                img_bytes = buf.getvalue()
                            else:
                                success, enc = cv2.imencode(".jpg", frame)
                                img_bytes = enc.tobytes() if success else b""
                            if img_bytes:
                                result = self._face_det.detect(img_bytes)
                                with self._lock:
                                    self._face_result = result
                                self._video_frame_count += 1
                        except Exception as e:
                            print(f"[VideoFile] Face error frame {frame_idx}: {e}")
                    # Posture every 5th processed frame
                    if (self._video_frame_count % 5 == 0) and self._posture_det is not None:
                        try:
                            success, enc = cv2.imencode(".jpg", frame)
                            if success:
                                p_result = self._posture_det.detect(enc.tobytes())
                                with self._lock:
                                    self._posture_result = p_result
                                self._posture_frame_count += 1
                        except Exception as e:
                            print(f"[VideoFile] Posture error frame {frame_idx}: {e}")
                    # Run fusion
                    self._run_fusion()
                    # Progress callback (cap at 0.9; audio pass takes the rest)
                    if progress_callback and total_frames > 0:
                        progress_callback(min(frame_idx / total_frames, 0.9))
                frame_idx += 1
            cap.release()
            # NOTE(review): cap.release() is skipped if an exception escapes the
            # loop above — consider moving it into a finally. Left unchanged.
            # Extract and process audio track
            self._process_video_audio(tmp_path, duration)
            # Final fusion
            self._run_fusion()
            if progress_callback:
                progress_callback(1.0)
            self.stop_session()
            return self.generate_summary()
        except Exception as e:
            print(f"[LiveProcessor] Video file error: {e}")
            self.stop_session()
            return None
        finally:
            # Best-effort cleanup of the temp video file
            try:
                os.unlink(tmp_path)
            except Exception:
                pass
| def _process_video_audio(self, video_path: str, duration: float): | |
| """Extract audio from video file and process for voice emotion + STT.""" | |
| try: | |
| import subprocess, tempfile, os | |
| # Extract audio with ffmpeg (available on HF Spaces) | |
| audio_tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False) | |
| audio_path = audio_tmp.name | |
| audio_tmp.close() | |
| result = subprocess.run( | |
| ["ffmpeg", "-i", video_path, "-vn", "-acodec", "pcm_s16le", | |
| "-ar", "16000", "-ac", "1", "-y", audio_path], | |
| capture_output=True, timeout=30, | |
| ) | |
| if result.returncode != 0: | |
| print(f"[VideoFile] ffmpeg audio extract failed: {result.stderr[:200]}") | |
| return | |
| # Read audio and process | |
| import soundfile as sf | |
| audio_data, sr = sf.read(audio_path) | |
| if audio_data.dtype != np.float32: | |
| audio_data = audio_data.astype(np.float32) | |
| # Voice emotion on full audio | |
| if self._voice_det is not None: | |
| try: | |
| audio_bytes = io.BytesIO() | |
| sf.write(audio_bytes, audio_data, sr, format="WAV") | |
| audio_bytes.seek(0) | |
| voice_result = self._voice_det.detect(audio_bytes.read()) | |
| with self._lock: | |
| self._voice_result = voice_result | |
| self._audio_chunk_count += 1 | |
| except Exception as e: | |
| print(f"[VideoFile] Voice detection error: {e}") | |
| # STT on audio | |
| if self._whisper_model is not None: | |
| try: | |
| segments, _info = self._whisper_model.transcribe( | |
| audio_path, beam_size=1 | |
| ) | |
| for seg in segments: | |
| text = seg.text.strip() | |
| if text: | |
| # Text emotion | |
| t_result = None | |
| if self._text_det is not None: | |
| t_result = self._text_det.detect(text) | |
| with self._lock: | |
| self._text_result = t_result | |
| segment = TranscriptSegment( | |
| text=text, | |
| timestamp=seg.start, | |
| emotion=t_result.dominant if t_result else None, | |
| confidence=t_result.confidence if t_result else 0.0, | |
| ) | |
| with self._lock: | |
| self._transcript.append(segment) | |
| topics = extract_topics(text) | |
| if topics: | |
| with self._lock: | |
| for t in topics: | |
| if t not in self._topics_seen: | |
| self._topics_seen.append(t) | |
| except Exception as e: | |
| print(f"[VideoFile] STT error: {e}") | |
| try: | |
| os.unlink(audio_path) | |
| except Exception: | |
| pass | |
| except Exception as e: | |
| print(f"[VideoFile] Audio processing error: {e}") | |