"""EmoSphere Live Session Processor — real-time multimodal processing.
Designed for use with streamlit-webrtc. Handles:
- Video frame processing (face + posture at ~2fps)
- Audio chunk accumulation and processing (3-second windows)
- Speech-to-text via faster-whisper (base, multilingual — 99 languages)
- Fuzzy fusion after each processing cycle
- Thread-safe shared state for Streamlit display
- Emotion timeline tracking
- Keyword-based topic/trigger extraction
"""
from __future__ import annotations
import io
import time
import threading
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
from models import (
EmotionLabel,
EMOTION_LABELS,
EmotionScore,
EmotionDetectionResult,
FusedDetectionResult,
CulturalRegion,
)
try:
from PIL import Image
HAS_PIL = True
except ImportError:
HAS_PIL = False
try:
import cv2
HAS_CV2 = True
except ImportError:
HAS_CV2 = False
# =====================================================================
# Data Structures
# =====================================================================
@dataclass
class TranscriptSegment:
text: str
timestamp: float # seconds from session start
emotion: Optional[EmotionLabel] = None
confidence: float = 0.0
@dataclass
class TimelineEntry:
timestamp: float # seconds from session start
fused_result: FusedDetectionResult
transcript: str = ""
topics: list[str] = field(default_factory=list)
fired_rules: list[str] = field(default_factory=list)
@dataclass
class SessionSummary:
duration_seconds: float
total_video_frames: int
total_audio_chunks: int
total_transcript_segments: int
dominant_emotion: EmotionLabel
emotion_distribution: dict[str, float]
modality_contribution: dict[str, float]
topics_detected: list[str]
emotional_shifts: list[dict]
peaks: list[dict]
# =====================================================================
# Keyword-based Topic Extractor (lightweight, no KeyBERT)
# =====================================================================
# Reuse the keyword lists from text_detector.py for topic/trigger detection
TOPIC_KEYWORDS: dict[str, list[str]] = {
"happiness": ["happy", "glad", "excited", "wonderful", "great", "amazing", "awesome",
"fantastic", "smile", "laugh", "fun", "enjoy", "delighted", "thrilled"],
"grief": ["sad", "unhappy", "depressed", "lonely", "miss", "cry", "tears",
"heartbreak", "grief", "loss", "mourning"],
"anxiety": ["afraid", "scared", "worried", "anxious", "nervous", "terrified",
"panic", "dread", "uneasy", "stressed", "overwhelmed", "tense"],
"relationships": ["love", "adore", "partner", "together", "family", "friend",
"relationship", "marriage", "breakup", "divorce", "trust"],
"work": ["work", "job", "boss", "career", "office", "deadline", "project",
"meeting", "fired", "promoted", "colleague", "salary"],
"health": ["health", "sick", "pain", "doctor", "hospital", "tired",
"sleep", "insomnia", "medication", "therapy", "diagnosis"],
"self_esteem": ["worthless", "failure", "ugly", "stupid", "hopeless",
"useless", "inadequate", "ashamed", "embarrassed", "confident"],
"change": ["change", "different", "new", "suddenly", "unexpected",
"surprise", "shocking", "unbelievable", "transform"],
"calm": ["calm", "peaceful", "relaxed", "serene", "tranquil",
"meditate", "breathe", "mindful", "quiet", "gentle"],
"conflict": ["angry", "furious", "annoyed", "frustrated", "argue",
"fight", "conflict", "hate", "rage", "resent"],
}
def extract_topics(text: str) -> list[str]:
"""Extract topic/trigger keywords from text using simple lexicon matching."""
lower = text.lower()
found = []
for topic, keywords in TOPIC_KEYWORDS.items():
if any(kw in lower for kw in keywords):
found.append(topic)
return found
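# Illustrative example (not part of the original module): because matching is a
# simple lexicon lookup, a single keyword hit is enough to tag a topic, e.g.:
#
#   extract_topics("I'm worried about my job and can't sleep")
#   # -> ["anxiety", "work", "health"]  (order follows TOPIC_KEYWORDS insertion order)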
# =====================================================================
# Live Session Processor
# =====================================================================
class LiveSessionProcessor:
"""Thread-safe processor for real-time multimodal emotion analysis.
Integrates with streamlit-webrtc callbacks for video and audio.
Maintains shared state that Streamlit can poll for display updates.
"""
def __init__(self):
self._lock = threading.Lock()
# Detectors (loaded lazily)
self._face_det = None
self._voice_det = None
self._text_det = None
self._posture_det = None
self._fusion = None
self._whisper_model = None
# Processing state
self._session_active = False
self._session_start: float = 0.0
self._last_video_process: float = 0.0
self._video_frame_count: int = 0
self._posture_frame_count: int = 0
# Audio accumulation
self._audio_buffer: list[np.ndarray] = []
self._audio_sample_rate: int = 16000
self._last_audio_process: float = 0.0
self._audio_chunk_count: int = 0
# Latest results per modality
self._face_result: Optional[EmotionDetectionResult] = None
self._voice_result: Optional[EmotionDetectionResult] = None
self._text_result: Optional[EmotionDetectionResult] = None
self._posture_result: Optional[EmotionDetectionResult] = None
self._fused_result: Optional[FusedDetectionResult] = None
# Timeline and transcript
self._timeline: list[TimelineEntry] = []
self._transcript: list[TranscriptSegment] = []
self._topics_seen: list[str] = []
# Modality processing time accumulators
self._modality_times: dict[str, list[float]] = {
"face": [], "voice": [], "text": [], "posture": [],
}
# ── Initialization ──────────────────────────────────────────────
def initialize(self):
"""Load all detectors and models. Call once at startup."""
from face_detector import FaceEmotionDetector
from voice_detector import VoiceEmotionDetector
from text_detector import TextEmotionDetector
from posture_detector import PostureEmotionDetector
from fuzzy_fusion import FuzzyFusionEngine
self._face_det = FaceEmotionDetector()
self._face_det.load()
self._voice_det = VoiceEmotionDetector()
self._voice_det.load()
self._text_det = TextEmotionDetector()
self._text_det.load()
self._posture_det = PostureEmotionDetector()
self._posture_det.load()
self._fusion = FuzzyFusionEngine()
# Load faster-whisper for STT (base multilingual — good accuracy, fast on CPU)
try:
from faster_whisper import WhisperModel
self._whisper_model = WhisperModel(
"base", device="cpu", compute_type="int8"
)
print("[LiveProcessor] faster-whisper base (multilingual) loaded")
except ImportError:
print("[LiveProcessor] faster-whisper not available, STT disabled")
self._whisper_model = None
except Exception as e:
print(f"[LiveProcessor] Whisper load error: {e}")
self._whisper_model = None
print("[LiveProcessor] All detectors initialized")
# ── Session Control ─────────────────────────────────────────────
def start_session(self):
"""Start a new live session."""
with self._lock:
self._session_active = True
self._session_start = time.time()
self._last_video_process = 0.0
self._last_audio_process = 0.0
self._video_frame_count = 0
self._posture_frame_count = 0
self._audio_chunk_count = 0
self._audio_buffer = []
self._face_result = None
self._voice_result = None
self._text_result = None
self._posture_result = None
self._fused_result = None
self._timeline = []
self._transcript = []
self._topics_seen = []
self._modality_times = {
"face": [], "voice": [], "text": [], "posture": [],
}
def stop_session(self):
"""Stop the current session."""
with self._lock:
self._session_active = False
@property
def is_active(self) -> bool:
return self._session_active
@property
def elapsed_seconds(self) -> float:
if not self._session_active:
return 0.0
return time.time() - self._session_start
# ── Direct Image/Audio Processing (for custom webcam component) ──
def process_image(self, image_bytes: bytes):
"""Process a single image for face + posture/gesture detection (non-blocking)."""
if not self._session_active:
return
# Run in background thread to avoid blocking Streamlit UI
t = threading.Thread(
target=self._process_image_worker,
args=(image_bytes,),
daemon=True,
)
t.start()
def _process_image_worker(self, image_bytes: bytes):
"""Background worker for image processing (face + posture/gesture)."""
self._video_frame_count += 1
# Face emotion detection
if self._face_det is not None:
try:
result = self._face_det.detect(image_bytes)
with self._lock:
self._face_result = result
self._modality_times["face"].append(result.processing_time_ms)
except Exception as e:
print(f"[LiveProcessor] Face detect error: {e}")
# Posture/gesture detection every 2nd frame
if self._video_frame_count % 2 == 0 and self._posture_det is not None:
try:
self._posture_frame_count += 1
p_result = self._posture_det.detect(image_bytes)
with self._lock:
self._posture_result = p_result
self._modality_times["posture"].append(p_result.processing_time_ms)
except Exception as e:
print(f"[LiveProcessor] Posture detect error: {e}")
self._run_fusion()
def _convert_webm_to_wav(self, audio_bytes: bytes) -> Optional[str]:
"""Convert webm/opus audio bytes to a WAV file via ffmpeg. Returns wav path or None."""
import tempfile, subprocess, os
tmp_in = None
try:
tmp_in = tempfile.NamedTemporaryFile(suffix=".webm", delete=False)
tmp_in.write(audio_bytes)
tmp_in.flush()
tmp_in.close()
tmp_out = tmp_in.name.replace(".webm", ".wav")
result = subprocess.run(
["ffmpeg", "-y", "-i", tmp_in.name,
"-ar", "16000", "-ac", "1", "-f", "wav", tmp_out],
capture_output=True, timeout=10,
)
os.unlink(tmp_in.name)
if result.returncode == 0 and os.path.exists(tmp_out):
return tmp_out
print(f"[LiveProcessor] ffmpeg failed: {result.stderr[:200] if result.stderr else 'unknown'}")
try:
os.unlink(tmp_out)
except Exception:
pass
except Exception as e:
print(f"[LiveProcessor] webm→wav conversion error: {e}")
if tmp_in:
try:
os.unlink(tmp_in.name)
except Exception:
pass
return None
    def process_audio_bytes(self, audio_bytes: bytes, language: Optional[str] = None):
"""Process raw audio bytes for voice emotion + STT (non-blocking via thread)."""
if not self._session_active:
return
# Run audio processing in background thread to avoid blocking Streamlit UI
t = threading.Thread(
target=self._process_audio_worker,
args=(audio_bytes, language),
daemon=True,
)
t.start()
    def _process_audio_worker(self, audio_bytes: bytes, language: Optional[str] = None):
"""Background worker for audio processing (voice emotion + STT)."""
import os
self._audio_chunk_count += 1
# Convert webm to wav once — reuse for both voice detection and STT
wav_path = self._convert_webm_to_wav(audio_bytes)
# Voice emotion detection
if self._voice_det is not None:
try:
                # Use converted WAV if available, otherwise raw bytes
                if wav_path:
                    with open(wav_path, "rb") as f:
                        audio_for_voice = f.read()
                else:
                    audio_for_voice = audio_bytes
result = self._voice_det.detect(audio_for_voice)
with self._lock:
self._voice_result = result
self._modality_times["voice"].append(result.processing_time_ms)
print(f"[LiveProcessor] Voice: {result.dominant.value} ({result.dominant_score:.2f})")
except Exception as e:
print(f"[LiveProcessor] Voice detect error: {e}")
# Speech-to-text via faster-whisper
if self._whisper_model is not None and wav_path:
try:
segments, _info = self._whisper_model.transcribe(
wav_path,
beam_size=3,
language=language,
vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=300),
)
detected_lang = getattr(_info, 'language', 'unknown')
print(f"[LiveProcessor] Whisper detected language: {detected_lang}")
for seg in segments:
text = seg.text.strip()
if not text or len(text) <= 2:
continue
# Skip common whisper hallucinations
if text.lower() in ("thank you.", "thanks.", "you", "bye.", "the end.",
"thanks for watching.", "subscribe.", "...",
"thank you for watching."):
print(f"[LiveProcessor] Skipping hallucination: {text}")
continue
print(f"[LiveProcessor] Transcript: '{text}'")
# Text emotion
t_result = None
if self._text_det is not None:
t_result = self._text_det.detect(text)
with self._lock:
self._text_result = t_result
elapsed = time.time() - self._session_start
segment = TranscriptSegment(
text=text,
timestamp=elapsed,
emotion=t_result.dominant if t_result else None,
confidence=t_result.confidence if t_result else 0.0,
)
with self._lock:
self._transcript.append(segment)
topics = extract_topics(text)
if topics:
with self._lock:
for t in topics:
if t not in self._topics_seen:
self._topics_seen.append(t)
except Exception as e:
print(f"[LiveProcessor] STT error: {e}")
elif self._whisper_model is not None and not wav_path:
print("[LiveProcessor] STT skipped — audio conversion failed")
# Clean up wav file
if wav_path:
try:
os.unlink(wav_path)
except Exception:
pass
self._run_fusion()
# ── Video Frame Callback (for streamlit-webrtc — legacy) ────────
def video_frame_callback(self, frame) -> object:
"""Process a video frame from streamlit-webrtc.
Called for every frame. We subsample to ~2fps for face detection
and ~every 5th processed frame for posture.
Args:
frame: av.VideoFrame from streamlit-webrtc
Returns:
The frame unchanged (passthrough for display).
"""
if not self._session_active:
return frame
now = time.time()
# Subsample: process face at ~2fps (every 500ms)
if now - self._last_video_process < 0.5:
return frame
self._last_video_process = now
self._video_frame_count += 1
try:
# Convert av.VideoFrame to numpy array
img = frame.to_ndarray(format="bgr24")
# Face emotion detection
if self._face_det is not None:
# Convert BGR to RGB for PIL
rgb = img[:, :, ::-1]
# Encode to bytes for the detector
if HAS_PIL:
pil_img = Image.fromarray(rgb)
buf = io.BytesIO()
pil_img.save(buf, format="JPEG", quality=70)
img_bytes = buf.getvalue()
                elif HAS_CV2:
                    success, enc = cv2.imencode(".jpg", img)
                    img_bytes = enc.tobytes() if success else b""
                else:
                    img_bytes = b""
if img_bytes:
result = self._face_det.detect(img_bytes)
with self._lock:
self._face_result = result
self._modality_times["face"].append(result.processing_time_ms)
# Posture detection every 5th processed frame (~0.4fps)
if self._video_frame_count % 5 == 0 and self._posture_det is not None:
self._posture_frame_count += 1
if HAS_CV2:
success, enc = cv2.imencode(".jpg", img)
posture_bytes = enc.tobytes() if success else b""
elif HAS_PIL:
pil_img = Image.fromarray(img[:, :, ::-1])
buf = io.BytesIO()
pil_img.save(buf, format="JPEG", quality=70)
posture_bytes = buf.getvalue()
else:
posture_bytes = b""
if posture_bytes:
p_result = self._posture_det.detect(posture_bytes)
with self._lock:
self._posture_result = p_result
self._modality_times["posture"].append(p_result.processing_time_ms)
# Run fusion after face processing
self._run_fusion()
except Exception as e:
print(f"[LiveProcessor] Video frame error: {e}")
return frame
# ── Audio Frame Callback (for streamlit-webrtc) ─────────────────
def audio_frame_callback(self, frame) -> object:
"""Process an audio frame from streamlit-webrtc.
Accumulates audio data and processes every ~3 seconds for:
- Voice emotion detection
- Speech-to-text transcription
Args:
frame: av.AudioFrame from streamlit-webrtc
Returns:
The frame unchanged (passthrough).
"""
if not self._session_active:
return frame
try:
# Convert av.AudioFrame to numpy
audio_array = frame.to_ndarray()
# audio_array shape: (channels, samples) — take first channel
if audio_array.ndim > 1:
audio_array = audio_array[0]
# Convert to float32 normalized
if audio_array.dtype == np.int16:
audio_array = audio_array.astype(np.float32) / 32768.0
elif audio_array.dtype == np.int32:
audio_array = audio_array.astype(np.float32) / 2147483648.0
self._audio_buffer.append(audio_array)
# Get sample rate from frame
self._audio_sample_rate = frame.sample_rate or 16000
# Process every ~3 seconds of accumulated audio
now = time.time()
if now - self._last_audio_process >= 3.0 and len(self._audio_buffer) > 0:
self._last_audio_process = now
self._process_audio_chunk()
except Exception as e:
print(f"[LiveProcessor] Audio frame error: {e}")
return frame
def _process_audio_chunk(self):
"""Process accumulated audio buffer for voice emotion + STT."""
# Concatenate buffered audio
with self._lock:
if not self._audio_buffer:
return
audio = np.concatenate(self._audio_buffer)
self._audio_buffer = []
self._audio_chunk_count += 1
# Resample to 16000 Hz if needed
if self._audio_sample_rate != 16000:
try:
import librosa
audio = librosa.resample(
audio, orig_sr=self._audio_sample_rate, target_sr=16000
)
except ImportError:
# Simple decimation/interpolation fallback
ratio = 16000 / self._audio_sample_rate
new_len = int(len(audio) * ratio)
indices = np.linspace(0, len(audio) - 1, new_len).astype(int)
audio = audio[indices]
# Skip if audio is too short (< 0.5s)
if len(audio) < 8000:
return
# Voice emotion detection
if self._voice_det is not None:
try:
v_result = self._voice_det.detect(audio, sample_rate=16000)
with self._lock:
self._voice_result = v_result
self._modality_times["voice"].append(v_result.processing_time_ms)
except Exception as e:
print(f"[LiveProcessor] Voice detection error: {e}")
# Speech-to-text
transcript_text = ""
if self._whisper_model is not None:
try:
segments, info = self._whisper_model.transcribe(
audio,
beam_size=1,
vad_filter=True,
)
parts = []
for seg in segments:
parts.append(seg.text.strip())
transcript_text = " ".join(parts).strip()
except Exception as e:
print(f"[LiveProcessor] STT error: {e}")
# If we got text, run text emotion detection + topic extraction
if transcript_text:
if self._text_det is not None:
try:
t_result = self._text_det.detect(transcript_text)
with self._lock:
self._text_result = t_result
self._modality_times["text"].append(t_result.processing_time_ms)
except Exception as e:
print(f"[LiveProcessor] Text detection error: {e}")
# Record transcript segment
elapsed = time.time() - self._session_start
text_emotion = self._text_result.dominant if self._text_result else None
text_conf = self._text_result.confidence if self._text_result else 0.0
segment = TranscriptSegment(
text=transcript_text,
timestamp=elapsed,
emotion=text_emotion,
confidence=text_conf,
)
with self._lock:
self._transcript.append(segment)
# Topic extraction
topics = extract_topics(transcript_text)
if topics:
with self._lock:
for t in topics:
if t not in self._topics_seen:
self._topics_seen.append(t)
# Run fusion after audio processing
self._run_fusion()
# ── Manual Text Input ───────────────────────────────────────────
def process_text(self, text: str):
"""Process manually typed text (e.g., from a text input field)."""
if not self._session_active or not text.strip():
return
if self._text_det is not None:
try:
t_result = self._text_det.detect(text)
with self._lock:
self._text_result = t_result
            except Exception as e:
                print(f"[LiveProcessor] Text detect error: {e}")
elapsed = time.time() - self._session_start
text_emotion = self._text_result.dominant if self._text_result else None
text_conf = self._text_result.confidence if self._text_result else 0.0
segment = TranscriptSegment(
text=text,
timestamp=elapsed,
emotion=text_emotion,
confidence=text_conf,
)
with self._lock:
self._transcript.append(segment)
topics = extract_topics(text)
if topics:
with self._lock:
for t in topics:
if t not in self._topics_seen:
self._topics_seen.append(t)
self._run_fusion()
# ── Fusion ──────────────────────────────────────────────────────
def _run_fusion(self):
"""Run fuzzy fusion on the latest modality results."""
if self._fusion is None:
return
with self._lock:
face = self._face_result
voice = self._voice_result
text = self._text_result
posture = self._posture_result
if not any([face, voice, text, posture]):
return
try:
fused = self._fusion.fuse(
face=face,
voice=voice,
text=text,
posture=posture,
)
elapsed = time.time() - self._session_start
            # Attach the latest transcript text and topics (read under the lock)
            topics = []
            last_text = ""
            with self._lock:
                if self._transcript:
                    last_text = self._transcript[-1].text
                    topics = extract_topics(last_text)
            entry = TimelineEntry(
                timestamp=elapsed,
                fused_result=fused,
                transcript=last_text,
                topics=topics,
            )
with self._lock:
self._fused_result = fused
self._timeline.append(entry)
except Exception as e:
print(f"[LiveProcessor] Fusion error: {e}")
# ── State Accessors (thread-safe) ───────────────────────────────
def get_latest_fused(self) -> Optional[FusedDetectionResult]:
with self._lock:
return self._fused_result
def get_latest_face(self) -> Optional[EmotionDetectionResult]:
with self._lock:
return self._face_result
def get_latest_voice(self) -> Optional[EmotionDetectionResult]:
with self._lock:
return self._voice_result
def get_latest_text(self) -> Optional[EmotionDetectionResult]:
with self._lock:
return self._text_result
def get_latest_posture(self) -> Optional[EmotionDetectionResult]:
with self._lock:
return self._posture_result
def get_timeline(self) -> list[TimelineEntry]:
with self._lock:
return list(self._timeline)
def get_transcript(self) -> list[TranscriptSegment]:
with self._lock:
return list(self._transcript)
def get_topics(self) -> list[str]:
with self._lock:
return list(self._topics_seen)
def get_stats(self) -> dict:
"""Get session statistics."""
with self._lock:
return {
"duration": time.time() - self._session_start if self._session_active else 0.0,
"video_frames": self._video_frame_count,
"posture_frames": self._posture_frame_count,
"audio_chunks": self._audio_chunk_count,
"transcript_segments": len(self._transcript),
"timeline_entries": len(self._timeline),
"topics": list(self._topics_seen),
}
# ── Summary Generation ──────────────────────────────────────────
def generate_summary(self) -> Optional[SessionSummary]:
"""Generate a session summary after the session ends."""
with self._lock:
timeline = list(self._timeline)
transcript = list(self._transcript)
topics = list(self._topics_seen)
stats = {
"duration": (time.time() - self._session_start) if self._session_start else 0.0,
"video_frames": self._video_frame_count,
"audio_chunks": self._audio_chunk_count,
"transcript_segments": len(transcript),
}
modality_times = {k: list(v) for k, v in self._modality_times.items()}
if not timeline:
return None
# Compute emotion distribution from timeline
emotion_accum: dict[str, float] = {label.value: 0.0 for label in EMOTION_LABELS}
for entry in timeline:
for score in entry.fused_result.scores:
emotion_accum[score.label.value] += score.score
total_entries = len(timeline)
emotion_dist = {k: v / total_entries for k, v in emotion_accum.items()}
# Dominant emotion overall
dominant_val = max(emotion_dist, key=emotion_dist.get) # type: ignore
dominant_emotion = EmotionLabel(dominant_val)
# Modality contribution (average weight across timeline)
mod_weights: dict[str, list[float]] = {}
for entry in timeline:
for mod, w in entry.fused_result.modality_weights.items():
if mod not in mod_weights:
mod_weights[mod] = []
mod_weights[mod].append(w)
modality_contribution = {
mod: sum(ws) / len(ws) for mod, ws in mod_weights.items()
}
# Detect emotional shifts (dominant emotion changes)
shifts: list[dict] = []
peaks: list[dict] = []
prev_dominant = None
for entry in timeline:
dom = entry.fused_result.dominant
dom_score = entry.fused_result.dominant_score
if prev_dominant is not None and dom != prev_dominant:
shifts.append({
"timestamp": entry.timestamp,
"from": prev_dominant.value,
"to": dom.value,
"score": dom_score,
"transcript": entry.transcript[:80] if entry.transcript else "",
})
# Detect peaks (score > 0.5 for non-neutral)
if dom != EmotionLabel.NEUTRAL and dom_score > 0.5:
peaks.append({
"timestamp": entry.timestamp,
"emotion": dom.value,
"score": dom_score,
"transcript": entry.transcript[:80] if entry.transcript else "",
})
prev_dominant = dom
return SessionSummary(
duration_seconds=stats["duration"],
total_video_frames=stats["video_frames"],
total_audio_chunks=stats["audio_chunks"],
total_transcript_segments=stats["transcript_segments"],
dominant_emotion=dominant_emotion,
emotion_distribution=emotion_dist,
modality_contribution=modality_contribution,
topics_detected=topics,
emotional_shifts=shifts,
peaks=peaks,
)
# ── Video File Processing ──────────────────────────────────────
def process_video_file(self, video_bytes: bytes, progress_callback=None) -> Optional[SessionSummary]:
"""Process an uploaded video file frame by frame.
Extracts frames at ~2fps for face + posture detection,
extracts audio for voice emotion + STT, then fuses all modalities.
Args:
video_bytes: Raw video file bytes (mp4, webm, etc.)
progress_callback: Optional callable(progress_float) for UI updates.
Returns:
SessionSummary after processing, or None on failure.
"""
import tempfile, os
if not HAS_CV2:
print("[LiveProcessor] cv2 required for video file processing")
return None
# Write to temp file for OpenCV
tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
tmp.write(video_bytes)
tmp.flush()
tmp_path = tmp.name
tmp.close()
try:
cap = cv2.VideoCapture(tmp_path)
if not cap.isOpened():
print("[LiveProcessor] Failed to open video file")
return None
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps if fps > 0 else 0
# Start a virtual session
self.start_session()
# Process at ~2fps
frame_interval = max(1, int(fps / 2))
frame_idx = 0
while True:
ret, frame = cap.read()
if not ret:
break
if frame_idx % frame_interval == 0:
current_time = frame_idx / fps
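                    # Shift the virtual session start so elapsed time lines up with the video timestamp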
self._session_start = time.time() - current_time
# Face detection
if self._face_det is not None:
try:
rgb = frame[:, :, ::-1]
if HAS_PIL:
pil_img = Image.fromarray(rgb)
buf = io.BytesIO()
pil_img.save(buf, format="JPEG", quality=70)
img_bytes = buf.getvalue()
else:
success, enc = cv2.imencode(".jpg", frame)
img_bytes = enc.tobytes() if success else b""
if img_bytes:
result = self._face_det.detect(img_bytes)
with self._lock:
self._face_result = result
self._video_frame_count += 1
except Exception as e:
print(f"[VideoFile] Face error frame {frame_idx}: {e}")
# Posture every 5th processed frame
if (self._video_frame_count % 5 == 0) and self._posture_det is not None:
try:
success, enc = cv2.imencode(".jpg", frame)
if success:
p_result = self._posture_det.detect(enc.tobytes())
with self._lock:
self._posture_result = p_result
self._posture_frame_count += 1
except Exception as e:
print(f"[VideoFile] Posture error frame {frame_idx}: {e}")
# Run fusion
self._run_fusion()
# Progress callback
if progress_callback and total_frames > 0:
progress_callback(min(frame_idx / total_frames, 0.9))
frame_idx += 1
cap.release()
# Extract and process audio track
self._process_video_audio(tmp_path, duration)
# Final fusion
self._run_fusion()
if progress_callback:
progress_callback(1.0)
self.stop_session()
return self.generate_summary()
except Exception as e:
print(f"[LiveProcessor] Video file error: {e}")
self.stop_session()
return None
finally:
try:
os.unlink(tmp_path)
except Exception:
pass
def _process_video_audio(self, video_path: str, duration: float):
"""Extract audio from video file and process for voice emotion + STT."""
try:
import subprocess, tempfile, os
# Extract audio with ffmpeg (available on HF Spaces)
audio_tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
audio_path = audio_tmp.name
audio_tmp.close()
result = subprocess.run(
["ffmpeg", "-i", video_path, "-vn", "-acodec", "pcm_s16le",
"-ar", "16000", "-ac", "1", "-y", audio_path],
capture_output=True, timeout=30,
)
if result.returncode != 0:
print(f"[VideoFile] ffmpeg audio extract failed: {result.stderr[:200]}")
return
# Read audio and process
import soundfile as sf
audio_data, sr = sf.read(audio_path)
if audio_data.dtype != np.float32:
audio_data = audio_data.astype(np.float32)
# Voice emotion on full audio
if self._voice_det is not None:
try:
audio_bytes = io.BytesIO()
sf.write(audio_bytes, audio_data, sr, format="WAV")
audio_bytes.seek(0)
voice_result = self._voice_det.detect(audio_bytes.read())
with self._lock:
self._voice_result = voice_result
self._audio_chunk_count += 1
except Exception as e:
print(f"[VideoFile] Voice detection error: {e}")
# STT on audio
if self._whisper_model is not None:
try:
segments, _info = self._whisper_model.transcribe(
audio_path, beam_size=1
)
for seg in segments:
text = seg.text.strip()
if text:
# Text emotion
t_result = None
if self._text_det is not None:
t_result = self._text_det.detect(text)
with self._lock:
self._text_result = t_result
segment = TranscriptSegment(
text=text,
timestamp=seg.start,
emotion=t_result.dominant if t_result else None,
confidence=t_result.confidence if t_result else 0.0,
)
with self._lock:
self._transcript.append(segment)
topics = extract_topics(text)
if topics:
with self._lock:
for t in topics:
if t not in self._topics_seen:
self._topics_seen.append(t)
except Exception as e:
print(f"[VideoFile] STT error: {e}")
try:
os.unlink(audio_path)
except Exception:
pass
except Exception as e:
print(f"[VideoFile] Audio processing error: {e}")