"""EmoSphere Live Session Processor — real-time multimodal processing. Designed for use with streamlit-webrtc. Handles: - Video frame processing (face + posture at ~2fps) - Audio chunk accumulation and processing (3-second windows) - Speech-to-text via faster-whisper (base, multilingual — 99 languages) - Fuzzy fusion after each processing cycle - Thread-safe shared state for Streamlit display - Emotion timeline tracking - Keyword-based topic/trigger extraction """ from __future__ import annotations import io import time import threading from collections import deque from dataclasses import dataclass, field from typing import Optional import numpy as np from models import ( EmotionLabel, EMOTION_LABELS, EmotionScore, EmotionDetectionResult, FusedDetectionResult, CulturalRegion, ) try: from PIL import Image HAS_PIL = True except ImportError: HAS_PIL = False try: import cv2 HAS_CV2 = True except ImportError: HAS_CV2 = False # ===================================================================== # Data Structures # ===================================================================== @dataclass class TranscriptSegment: text: str timestamp: float # seconds from session start emotion: Optional[EmotionLabel] = None confidence: float = 0.0 @dataclass class TimelineEntry: timestamp: float # seconds from session start fused_result: FusedDetectionResult transcript: str = "" topics: list[str] = field(default_factory=list) fired_rules: list[str] = field(default_factory=list) @dataclass class SessionSummary: duration_seconds: float total_video_frames: int total_audio_chunks: int total_transcript_segments: int dominant_emotion: EmotionLabel emotion_distribution: dict[str, float] modality_contribution: dict[str, float] topics_detected: list[str] emotional_shifts: list[dict] peaks: list[dict] # ===================================================================== # Keyword-based Topic Extractor (lightweight, no KeyBERT) # ===================================================================== # Reuse the keyword lists from text_detector.py for topic/trigger detection TOPIC_KEYWORDS: dict[str, list[str]] = { "happiness": ["happy", "glad", "excited", "wonderful", "great", "amazing", "awesome", "fantastic", "smile", "laugh", "fun", "enjoy", "delighted", "thrilled"], "grief": ["sad", "unhappy", "depressed", "lonely", "miss", "cry", "tears", "heartbreak", "grief", "loss", "mourning"], "anxiety": ["afraid", "scared", "worried", "anxious", "nervous", "terrified", "panic", "dread", "uneasy", "stressed", "overwhelmed", "tense"], "relationships": ["love", "adore", "partner", "together", "family", "friend", "relationship", "marriage", "breakup", "divorce", "trust"], "work": ["work", "job", "boss", "career", "office", "deadline", "project", "meeting", "fired", "promoted", "colleague", "salary"], "health": ["health", "sick", "pain", "doctor", "hospital", "tired", "sleep", "insomnia", "medication", "therapy", "diagnosis"], "self_esteem": ["worthless", "failure", "ugly", "stupid", "hopeless", "useless", "inadequate", "ashamed", "embarrassed", "confident"], "change": ["change", "different", "new", "suddenly", "unexpected", "surprise", "shocking", "unbelievable", "transform"], "calm": ["calm", "peaceful", "relaxed", "serene", "tranquil", "meditate", "breathe", "mindful", "quiet", "gentle"], "conflict": ["angry", "furious", "annoyed", "frustrated", "argue", "fight", "conflict", "hate", "rage", "resent"], } def extract_topics(text: str) -> list[str]: """Extract topic/trigger keywords from text using simple lexicon matching.""" lower = text.lower() found = [] for topic, keywords in TOPIC_KEYWORDS.items(): if any(kw in lower for kw in keywords): found.append(topic) return found # ===================================================================== # Live Session Processor # ===================================================================== class LiveSessionProcessor: """Thread-safe processor for real-time multimodal emotion analysis. Integrates with streamlit-webrtc callbacks for video and audio. Maintains shared state that Streamlit can poll for display updates. """ def __init__(self): self._lock = threading.Lock() # Detectors (loaded lazily) self._face_det = None self._voice_det = None self._text_det = None self._posture_det = None self._fusion = None self._whisper_model = None # Processing state self._session_active = False self._session_start: float = 0.0 self._last_video_process: float = 0.0 self._video_frame_count: int = 0 self._posture_frame_count: int = 0 # Audio accumulation self._audio_buffer: list[np.ndarray] = [] self._audio_sample_rate: int = 16000 self._last_audio_process: float = 0.0 self._audio_chunk_count: int = 0 # Latest results per modality self._face_result: Optional[EmotionDetectionResult] = None self._voice_result: Optional[EmotionDetectionResult] = None self._text_result: Optional[EmotionDetectionResult] = None self._posture_result: Optional[EmotionDetectionResult] = None self._fused_result: Optional[FusedDetectionResult] = None # Timeline and transcript self._timeline: list[TimelineEntry] = [] self._transcript: list[TranscriptSegment] = [] self._topics_seen: list[str] = [] # Modality processing time accumulators self._modality_times: dict[str, list[float]] = { "face": [], "voice": [], "text": [], "posture": [], } # ── Initialization ────────────────────────────────────────────── def initialize(self): """Load all detectors and models. Call once at startup.""" from face_detector import FaceEmotionDetector from voice_detector import VoiceEmotionDetector from text_detector import TextEmotionDetector from posture_detector import PostureEmotionDetector from fuzzy_fusion import FuzzyFusionEngine self._face_det = FaceEmotionDetector() self._face_det.load() self._voice_det = VoiceEmotionDetector() self._voice_det.load() self._text_det = TextEmotionDetector() self._text_det.load() self._posture_det = PostureEmotionDetector() self._posture_det.load() self._fusion = FuzzyFusionEngine() # Load faster-whisper for STT (base multilingual — good accuracy, fast on CPU) try: from faster_whisper import WhisperModel self._whisper_model = WhisperModel( "base", device="cpu", compute_type="int8" ) print("[LiveProcessor] faster-whisper base (multilingual) loaded") except ImportError: print("[LiveProcessor] faster-whisper not available, STT disabled") self._whisper_model = None except Exception as e: print(f"[LiveProcessor] Whisper load error: {e}") self._whisper_model = None print("[LiveProcessor] All detectors initialized") # ── Session Control ───────────────────────────────────────────── def start_session(self): """Start a new live session.""" with self._lock: self._session_active = True self._session_start = time.time() self._last_video_process = 0.0 self._last_audio_process = 0.0 self._video_frame_count = 0 self._posture_frame_count = 0 self._audio_chunk_count = 0 self._audio_buffer = [] self._face_result = None self._voice_result = None self._text_result = None self._posture_result = None self._fused_result = None self._timeline = [] self._transcript = [] self._topics_seen = [] self._modality_times = { "face": [], "voice": [], "text": [], "posture": [], } def stop_session(self): """Stop the current session.""" with self._lock: self._session_active = False @property def is_active(self) -> bool: return self._session_active @property def elapsed_seconds(self) -> float: if not self._session_active: return 0.0 return time.time() - self._session_start # ── Direct Image/Audio Processing (for custom webcam component) ── def process_image(self, image_bytes: bytes): """Process a single image for face + posture/gesture detection (non-blocking).""" if not self._session_active: return # Run in background thread to avoid blocking Streamlit UI t = threading.Thread( target=self._process_image_worker, args=(image_bytes,), daemon=True, ) t.start() def _process_image_worker(self, image_bytes: bytes): """Background worker for image processing (face + posture/gesture).""" self._video_frame_count += 1 # Face emotion detection if self._face_det is not None: try: result = self._face_det.detect(image_bytes) with self._lock: self._face_result = result self._modality_times["face"].append(result.processing_time_ms) except Exception as e: print(f"[LiveProcessor] Face detect error: {e}") # Posture/gesture detection every 2nd frame if self._video_frame_count % 2 == 0 and self._posture_det is not None: try: self._posture_frame_count += 1 p_result = self._posture_det.detect(image_bytes) with self._lock: self._posture_result = p_result self._modality_times["posture"].append(p_result.processing_time_ms) except Exception as e: print(f"[LiveProcessor] Posture detect error: {e}") self._run_fusion() def _convert_webm_to_wav(self, audio_bytes: bytes) -> Optional[str]: """Convert webm/opus audio bytes to a WAV file via ffmpeg. Returns wav path or None.""" import tempfile, subprocess, os tmp_in = None try: tmp_in = tempfile.NamedTemporaryFile(suffix=".webm", delete=False) tmp_in.write(audio_bytes) tmp_in.flush() tmp_in.close() tmp_out = tmp_in.name.replace(".webm", ".wav") result = subprocess.run( ["ffmpeg", "-y", "-i", tmp_in.name, "-ar", "16000", "-ac", "1", "-f", "wav", tmp_out], capture_output=True, timeout=10, ) os.unlink(tmp_in.name) if result.returncode == 0 and os.path.exists(tmp_out): return tmp_out print(f"[LiveProcessor] ffmpeg failed: {result.stderr[:200] if result.stderr else 'unknown'}") try: os.unlink(tmp_out) except Exception: pass except Exception as e: print(f"[LiveProcessor] webm→wav conversion error: {e}") if tmp_in: try: os.unlink(tmp_in.name) except Exception: pass return None def process_audio_bytes(self, audio_bytes: bytes, language: str = None): """Process raw audio bytes for voice emotion + STT (non-blocking via thread).""" if not self._session_active: return # Run audio processing in background thread to avoid blocking Streamlit UI t = threading.Thread( target=self._process_audio_worker, args=(audio_bytes, language), daemon=True, ) t.start() def _process_audio_worker(self, audio_bytes: bytes, language: str = None): """Background worker for audio processing (voice emotion + STT).""" import os self._audio_chunk_count += 1 # Convert webm to wav once — reuse for both voice detection and STT wav_path = self._convert_webm_to_wav(audio_bytes) # Voice emotion detection if self._voice_det is not None: try: # Use converted WAV if available, otherwise raw bytes audio_for_voice = open(wav_path, "rb").read() if wav_path else audio_bytes result = self._voice_det.detect(audio_for_voice) with self._lock: self._voice_result = result self._modality_times["voice"].append(result.processing_time_ms) print(f"[LiveProcessor] Voice: {result.dominant.value} ({result.dominant_score:.2f})") except Exception as e: print(f"[LiveProcessor] Voice detect error: {e}") # Speech-to-text via faster-whisper if self._whisper_model is not None and wav_path: try: segments, _info = self._whisper_model.transcribe( wav_path, beam_size=3, language=language, vad_filter=True, vad_parameters=dict(min_silence_duration_ms=300), ) detected_lang = getattr(_info, 'language', 'unknown') print(f"[LiveProcessor] Whisper detected language: {detected_lang}") for seg in segments: text = seg.text.strip() if not text or len(text) <= 2: continue # Skip common whisper hallucinations if text.lower() in ("thank you.", "thanks.", "you", "bye.", "the end.", "thanks for watching.", "subscribe.", "...", "thank you for watching."): print(f"[LiveProcessor] Skipping hallucination: {text}") continue print(f"[LiveProcessor] Transcript: '{text}'") # Text emotion t_result = None if self._text_det is not None: t_result = self._text_det.detect(text) with self._lock: self._text_result = t_result elapsed = time.time() - self._session_start segment = TranscriptSegment( text=text, timestamp=elapsed, emotion=t_result.dominant if t_result else None, confidence=t_result.confidence if t_result else 0.0, ) with self._lock: self._transcript.append(segment) topics = extract_topics(text) if topics: with self._lock: for t in topics: if t not in self._topics_seen: self._topics_seen.append(t) except Exception as e: print(f"[LiveProcessor] STT error: {e}") elif self._whisper_model is not None and not wav_path: print("[LiveProcessor] STT skipped — audio conversion failed") # Clean up wav file if wav_path: try: os.unlink(wav_path) except Exception: pass self._run_fusion() # ── Video Frame Callback (for streamlit-webrtc — legacy) ──────── def video_frame_callback(self, frame) -> object: """Process a video frame from streamlit-webrtc. Called for every frame. We subsample to ~2fps for face detection and ~every 5th processed frame for posture. Args: frame: av.VideoFrame from streamlit-webrtc Returns: The frame unchanged (passthrough for display). """ if not self._session_active: return frame now = time.time() # Subsample: process face at ~2fps (every 500ms) if now - self._last_video_process < 0.5: return frame self._last_video_process = now self._video_frame_count += 1 try: # Convert av.VideoFrame to numpy array img = frame.to_ndarray(format="bgr24") # Face emotion detection if self._face_det is not None: # Convert BGR to RGB for PIL rgb = img[:, :, ::-1] # Encode to bytes for the detector if HAS_PIL: pil_img = Image.fromarray(rgb) buf = io.BytesIO() pil_img.save(buf, format="JPEG", quality=70) img_bytes = buf.getvalue() else: success, enc = cv2.imencode(".jpg", img) img_bytes = enc.tobytes() if success else b"" if img_bytes: result = self._face_det.detect(img_bytes) with self._lock: self._face_result = result self._modality_times["face"].append(result.processing_time_ms) # Posture detection every 5th processed frame (~0.4fps) if self._video_frame_count % 5 == 0 and self._posture_det is not None: self._posture_frame_count += 1 if HAS_CV2: success, enc = cv2.imencode(".jpg", img) posture_bytes = enc.tobytes() if success else b"" elif HAS_PIL: pil_img = Image.fromarray(img[:, :, ::-1]) buf = io.BytesIO() pil_img.save(buf, format="JPEG", quality=70) posture_bytes = buf.getvalue() else: posture_bytes = b"" if posture_bytes: p_result = self._posture_det.detect(posture_bytes) with self._lock: self._posture_result = p_result self._modality_times["posture"].append(p_result.processing_time_ms) # Run fusion after face processing self._run_fusion() except Exception as e: print(f"[LiveProcessor] Video frame error: {e}") return frame # ── Audio Frame Callback (for streamlit-webrtc) ───────────────── def audio_frame_callback(self, frame) -> object: """Process an audio frame from streamlit-webrtc. Accumulates audio data and processes every ~3 seconds for: - Voice emotion detection - Speech-to-text transcription Args: frame: av.AudioFrame from streamlit-webrtc Returns: The frame unchanged (passthrough). """ if not self._session_active: return frame try: # Convert av.AudioFrame to numpy audio_array = frame.to_ndarray() # audio_array shape: (channels, samples) — take first channel if audio_array.ndim > 1: audio_array = audio_array[0] # Convert to float32 normalized if audio_array.dtype == np.int16: audio_array = audio_array.astype(np.float32) / 32768.0 elif audio_array.dtype == np.int32: audio_array = audio_array.astype(np.float32) / 2147483648.0 self._audio_buffer.append(audio_array) # Get sample rate from frame self._audio_sample_rate = frame.sample_rate or 16000 # Process every ~3 seconds of accumulated audio now = time.time() if now - self._last_audio_process >= 3.0 and len(self._audio_buffer) > 0: self._last_audio_process = now self._process_audio_chunk() except Exception as e: print(f"[LiveProcessor] Audio frame error: {e}") return frame def _process_audio_chunk(self): """Process accumulated audio buffer for voice emotion + STT.""" # Concatenate buffered audio with self._lock: if not self._audio_buffer: return audio = np.concatenate(self._audio_buffer) self._audio_buffer = [] self._audio_chunk_count += 1 # Resample to 16000 Hz if needed if self._audio_sample_rate != 16000: try: import librosa audio = librosa.resample( audio, orig_sr=self._audio_sample_rate, target_sr=16000 ) except ImportError: # Simple decimation/interpolation fallback ratio = 16000 / self._audio_sample_rate new_len = int(len(audio) * ratio) indices = np.linspace(0, len(audio) - 1, new_len).astype(int) audio = audio[indices] # Skip if audio is too short (< 0.5s) if len(audio) < 8000: return # Voice emotion detection if self._voice_det is not None: try: v_result = self._voice_det.detect(audio, sample_rate=16000) with self._lock: self._voice_result = v_result self._modality_times["voice"].append(v_result.processing_time_ms) except Exception as e: print(f"[LiveProcessor] Voice detection error: {e}") # Speech-to-text transcript_text = "" if self._whisper_model is not None: try: segments, info = self._whisper_model.transcribe( audio, beam_size=1, vad_filter=True, ) parts = [] for seg in segments: parts.append(seg.text.strip()) transcript_text = " ".join(parts).strip() except Exception as e: print(f"[LiveProcessor] STT error: {e}") # If we got text, run text emotion detection + topic extraction if transcript_text: if self._text_det is not None: try: t_result = self._text_det.detect(transcript_text) with self._lock: self._text_result = t_result self._modality_times["text"].append(t_result.processing_time_ms) except Exception as e: print(f"[LiveProcessor] Text detection error: {e}") # Record transcript segment elapsed = time.time() - self._session_start text_emotion = self._text_result.dominant if self._text_result else None text_conf = self._text_result.confidence if self._text_result else 0.0 segment = TranscriptSegment( text=transcript_text, timestamp=elapsed, emotion=text_emotion, confidence=text_conf, ) with self._lock: self._transcript.append(segment) # Topic extraction topics = extract_topics(transcript_text) if topics: with self._lock: for t in topics: if t not in self._topics_seen: self._topics_seen.append(t) # Run fusion after audio processing self._run_fusion() # ── Manual Text Input ─────────────────────────────────────────── def process_text(self, text: str): """Process manually typed text (e.g., from a text input field).""" if not self._session_active or not text.strip(): return if self._text_det is not None: try: t_result = self._text_det.detect(text) with self._lock: self._text_result = t_result except Exception: pass elapsed = time.time() - self._session_start text_emotion = self._text_result.dominant if self._text_result else None text_conf = self._text_result.confidence if self._text_result else 0.0 segment = TranscriptSegment( text=text, timestamp=elapsed, emotion=text_emotion, confidence=text_conf, ) with self._lock: self._transcript.append(segment) topics = extract_topics(text) if topics: with self._lock: for t in topics: if t not in self._topics_seen: self._topics_seen.append(t) self._run_fusion() # ── Fusion ────────────────────────────────────────────────────── def _run_fusion(self): """Run fuzzy fusion on the latest modality results.""" if self._fusion is None: return with self._lock: face = self._face_result voice = self._voice_result text = self._text_result posture = self._posture_result if not any([face, voice, text, posture]): return try: fused = self._fusion.fuse( face=face, voice=voice, text=text, posture=posture, ) elapsed = time.time() - self._session_start # Detect topics from latest transcript topics = [] with self._lock: if self._transcript: topics = extract_topics(self._transcript[-1].text) entry = TimelineEntry( timestamp=elapsed, fused_result=fused, transcript=self._transcript[-1].text if self._transcript else "", topics=topics, ) with self._lock: self._fused_result = fused self._timeline.append(entry) except Exception as e: print(f"[LiveProcessor] Fusion error: {e}") # ── State Accessors (thread-safe) ─────────────────────────────── def get_latest_fused(self) -> Optional[FusedDetectionResult]: with self._lock: return self._fused_result def get_latest_face(self) -> Optional[EmotionDetectionResult]: with self._lock: return self._face_result def get_latest_voice(self) -> Optional[EmotionDetectionResult]: with self._lock: return self._voice_result def get_latest_text(self) -> Optional[EmotionDetectionResult]: with self._lock: return self._text_result def get_latest_posture(self) -> Optional[EmotionDetectionResult]: with self._lock: return self._posture_result def get_timeline(self) -> list[TimelineEntry]: with self._lock: return list(self._timeline) def get_transcript(self) -> list[TranscriptSegment]: with self._lock: return list(self._transcript) def get_topics(self) -> list[str]: with self._lock: return list(self._topics_seen) def get_stats(self) -> dict: """Get session statistics.""" with self._lock: return { "duration": time.time() - self._session_start if self._session_active else 0.0, "video_frames": self._video_frame_count, "posture_frames": self._posture_frame_count, "audio_chunks": self._audio_chunk_count, "transcript_segments": len(self._transcript), "timeline_entries": len(self._timeline), "topics": list(self._topics_seen), } # ── Summary Generation ────────────────────────────────────────── def generate_summary(self) -> Optional[SessionSummary]: """Generate a session summary after the session ends.""" with self._lock: timeline = list(self._timeline) transcript = list(self._transcript) topics = list(self._topics_seen) stats = { "duration": (time.time() - self._session_start) if self._session_start else 0.0, "video_frames": self._video_frame_count, "audio_chunks": self._audio_chunk_count, "transcript_segments": len(transcript), } modality_times = {k: list(v) for k, v in self._modality_times.items()} if not timeline: return None # Compute emotion distribution from timeline emotion_accum: dict[str, float] = {label.value: 0.0 for label in EMOTION_LABELS} for entry in timeline: for score in entry.fused_result.scores: emotion_accum[score.label.value] += score.score total_entries = len(timeline) emotion_dist = {k: v / total_entries for k, v in emotion_accum.items()} # Dominant emotion overall dominant_val = max(emotion_dist, key=emotion_dist.get) # type: ignore dominant_emotion = EmotionLabel(dominant_val) # Modality contribution (average weight across timeline) mod_weights: dict[str, list[float]] = {} for entry in timeline: for mod, w in entry.fused_result.modality_weights.items(): if mod not in mod_weights: mod_weights[mod] = [] mod_weights[mod].append(w) modality_contribution = { mod: sum(ws) / len(ws) for mod, ws in mod_weights.items() } # Detect emotional shifts (dominant emotion changes) shifts: list[dict] = [] peaks: list[dict] = [] prev_dominant = None for entry in timeline: dom = entry.fused_result.dominant dom_score = entry.fused_result.dominant_score if prev_dominant is not None and dom != prev_dominant: shifts.append({ "timestamp": entry.timestamp, "from": prev_dominant.value, "to": dom.value, "score": dom_score, "transcript": entry.transcript[:80] if entry.transcript else "", }) # Detect peaks (score > 0.5 for non-neutral) if dom != EmotionLabel.NEUTRAL and dom_score > 0.5: peaks.append({ "timestamp": entry.timestamp, "emotion": dom.value, "score": dom_score, "transcript": entry.transcript[:80] if entry.transcript else "", }) prev_dominant = dom return SessionSummary( duration_seconds=stats["duration"], total_video_frames=stats["video_frames"], total_audio_chunks=stats["audio_chunks"], total_transcript_segments=stats["transcript_segments"], dominant_emotion=dominant_emotion, emotion_distribution=emotion_dist, modality_contribution=modality_contribution, topics_detected=topics, emotional_shifts=shifts, peaks=peaks, ) # ── Video File Processing ────────────────────────────────────── def process_video_file(self, video_bytes: bytes, progress_callback=None) -> Optional[SessionSummary]: """Process an uploaded video file frame by frame. Extracts frames at ~2fps for face + posture detection, extracts audio for voice emotion + STT, then fuses all modalities. Args: video_bytes: Raw video file bytes (mp4, webm, etc.) progress_callback: Optional callable(progress_float) for UI updates. Returns: SessionSummary after processing, or None on failure. """ import tempfile, os if not HAS_CV2: print("[LiveProcessor] cv2 required for video file processing") return None # Write to temp file for OpenCV tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) tmp.write(video_bytes) tmp.flush() tmp_path = tmp.name tmp.close() try: cap = cv2.VideoCapture(tmp_path) if not cap.isOpened(): print("[LiveProcessor] Failed to open video file") return None fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps if fps > 0 else 0 # Start a virtual session self.start_session() # Process at ~2fps frame_interval = max(1, int(fps / 2)) frame_idx = 0 while True: ret, frame = cap.read() if not ret: break if frame_idx % frame_interval == 0: current_time = frame_idx / fps self._session_start = time.time() - current_time # Face detection if self._face_det is not None: try: rgb = frame[:, :, ::-1] if HAS_PIL: pil_img = Image.fromarray(rgb) buf = io.BytesIO() pil_img.save(buf, format="JPEG", quality=70) img_bytes = buf.getvalue() else: success, enc = cv2.imencode(".jpg", frame) img_bytes = enc.tobytes() if success else b"" if img_bytes: result = self._face_det.detect(img_bytes) with self._lock: self._face_result = result self._video_frame_count += 1 except Exception as e: print(f"[VideoFile] Face error frame {frame_idx}: {e}") # Posture every 5th processed frame if (self._video_frame_count % 5 == 0) and self._posture_det is not None: try: success, enc = cv2.imencode(".jpg", frame) if success: p_result = self._posture_det.detect(enc.tobytes()) with self._lock: self._posture_result = p_result self._posture_frame_count += 1 except Exception as e: print(f"[VideoFile] Posture error frame {frame_idx}: {e}") # Run fusion self._run_fusion() # Progress callback if progress_callback and total_frames > 0: progress_callback(min(frame_idx / total_frames, 0.9)) frame_idx += 1 cap.release() # Extract and process audio track self._process_video_audio(tmp_path, duration) # Final fusion self._run_fusion() if progress_callback: progress_callback(1.0) self.stop_session() return self.generate_summary() except Exception as e: print(f"[LiveProcessor] Video file error: {e}") self.stop_session() return None finally: try: os.unlink(tmp_path) except Exception: pass def _process_video_audio(self, video_path: str, duration: float): """Extract audio from video file and process for voice emotion + STT.""" try: import subprocess, tempfile, os # Extract audio with ffmpeg (available on HF Spaces) audio_tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False) audio_path = audio_tmp.name audio_tmp.close() result = subprocess.run( ["ffmpeg", "-i", video_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", audio_path], capture_output=True, timeout=30, ) if result.returncode != 0: print(f"[VideoFile] ffmpeg audio extract failed: {result.stderr[:200]}") return # Read audio and process import soundfile as sf audio_data, sr = sf.read(audio_path) if audio_data.dtype != np.float32: audio_data = audio_data.astype(np.float32) # Voice emotion on full audio if self._voice_det is not None: try: audio_bytes = io.BytesIO() sf.write(audio_bytes, audio_data, sr, format="WAV") audio_bytes.seek(0) voice_result = self._voice_det.detect(audio_bytes.read()) with self._lock: self._voice_result = voice_result self._audio_chunk_count += 1 except Exception as e: print(f"[VideoFile] Voice detection error: {e}") # STT on audio if self._whisper_model is not None: try: segments, _info = self._whisper_model.transcribe( audio_path, beam_size=1 ) for seg in segments: text = seg.text.strip() if text: # Text emotion t_result = None if self._text_det is not None: t_result = self._text_det.detect(text) with self._lock: self._text_result = t_result segment = TranscriptSegment( text=text, timestamp=seg.start, emotion=t_result.dominant if t_result else None, confidence=t_result.confidence if t_result else 0.0, ) with self._lock: self._transcript.append(segment) topics = extract_topics(text) if topics: with self._lock: for t in topics: if t not in self._topics_seen: self._topics_seen.append(t) except Exception as e: print(f"[VideoFile] STT error: {e}") try: os.unlink(audio_path) except Exception: pass except Exception as e: print(f"[VideoFile] Audio processing error: {e}")