# File: handler.py
import base64
import io
import os
import tempfile
from typing import Dict, Any

import cv2
import librosa
import numpy as np
import pandas as pd
import torch
import whisper_timestamped as whisper
from fer import FER
from moviepy.editor import VideoFileClip, AudioFileClip
from torch.nn.functional import softmax
from transformers import AutoModelForAudioClassification, pipeline
from translate import Translator


class EndpointHandler:
    """Inference handler running audio, facial, or text emotion analysis on a video.

    All models are loaded once in ``__init__``; each call decodes a
    base64-encoded video to a temporary file and dispatches to one of the
    private analysis methods based on ``analysis_type``.
    """

    def __init__(self, path=""):
        """
        Loads all models onto the device. This is called once when the endpoint starts.

        Args:
            path: Not used by this handler.
        """
        print("Loading models...")

        # Use GPU if available
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        # 1. Audio Emotion Model
        self.audio_model = AutoModelForAudioClassification.from_pretrained(
            "3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes",
            trust_remote_code=True
        ).to(self.device)
        # Normalisation statistics shipped with the model config.
        self.audio_mean = self.audio_model.config.mean
        self.audio_std = self.audio_model.config.std

        # 2. Facial Emotion Model
        self.face_detector = FER(mtcnn=True)

        # 3. Text Emotion Model
        self.text_classifier = pipeline(
            "text-classification",
            model="j-hartmann/emotion-english-distilroberta-base",
            top_k=None,
            device=self.device
        )

        # 4. Transcription Model
        self.transcription_model = whisper.load_model("medium", device=self.device)

        # 5. Translator
        self.translator = Translator(from_lang='ko', to_lang='en')

        print("All models loaded successfully.")

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Handles an inference request.

        Args:
            data (Dict[str, Any]): Dictionary containing request parameters,
                either at the top level or nested under an 'inputs' dict.
                Expected keys:
                'video': Base64 encoded video string.
                'analysis_type': One of "audio", "facial", or "text".

        Returns:
            ``{"status": "success", ...analysis result...}`` on success, or
            ``{"status": "error", "message": ...}`` if the analysis raised.

        Raises:
            ValueError: If 'video' is missing/empty or 'analysis_type' is
                missing or not one of the supported values.
        """
        print("Received inference request.")

        # --- 1. Parameter Extraction ---
        if 'inputs' in data and isinstance(data['inputs'], dict):
            params = data['inputs']
        else:
            params = data

        b64_video = params.get("video")
        if not b64_video:
            raise ValueError("Missing 'video' parameter (base64 encoded string)")

        analysis_type = params.get("analysis_type")
        if analysis_type not in ["audio", "facial", "text"]:
            raise ValueError("Missing or invalid 'analysis_type'. Must be 'audio', 'facial', or 'text'.")

        # --- 2. Video Decoding ---
        video_bytes = base64.b64decode(b64_video)

        # Use a temporary file to store the video, as the analysis functions expect a path.
        # The analysis must run INSIDE this block: the file is deleted on exit.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=True) as temp_video_file:
            temp_video_file.write(video_bytes)
            temp_video_file.flush()  # Ensure all data is written
            video_path = temp_video_file.name
            print(f"Video saved to temporary file: {video_path}")

            # --- 3. Dispatch to correct analysis function ---
            try:
                if analysis_type == "audio":
                    result = self._analyze_audio_emotions(video_path)
                elif analysis_type == "facial":
                    result = self._detect_faces_and_emotions(video_path)
                else:  # "text" -- the only remaining validated value
                    result = self._process_video_text(video_path)

                print("Analysis completed successfully.")
                return {"status": "success", **result}
            except Exception as e:
                # Top-level boundary: report a structured error instead of crashing.
                print(f"Error during {analysis_type} analysis: {e}")
                return {"status": "error", "message": str(e)}

    # ===================================================================
    # ANALYSIS FUNCTIONS
    # ===================================================================

    @staticmethod
    def _remove_if_exists(path) -> None:
        """Delete ``path`` if it is set and exists on disk."""
        if path and os.path.exists(path):
            os.remove(path)

    def _analyze_audio_emotions(self, video_path: str) -> Dict:
        """Classify the emotion of each one-second slice of the video's audio.

        Args:
            video_path: Path to the video file on disk.

        Returns:
            ``{"emotions_data": <JSON string, orient='split'>}`` with a
            "Time(s)" column followed by one probability column per emotion.

        Raises:
            ValueError: If the video has no audio track.
        """
        sampling_rate = self.audio_model.config.sampling_rate
        # Logit indices kept from the model output and the column name used
        # for each, in order. Index 6 is deliberately left out, as in the
        # original label mapping.
        kept_indices = [0, 1, 2, 3, 4, 5, 7]
        emotion_columns = ['anger', 'sadness', 'happy', 'surprise', 'fear', 'disgust', 'neutral']

        temp_audio_path = None
        try:
            # Extract audio
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
                temp_audio_path = temp_audio_file.name

            video_clip = VideoFileClip(video_path)
            try:
                if video_clip.audio is None:
                    raise ValueError("Video has no audio track.")
                video_clip.audio.write_audiofile(temp_audio_path, codec="pcm_s16le", logger=None)
            finally:
                # Release the ffmpeg readers held by the clip.
                video_clip.close()

            raw_wav, _ = librosa.load(temp_audio_path, sr=sampling_rate)
            norm_wav = (raw_wav - self.audio_mean) / (self.audio_std + 1e-6)

            times, rows = [], []
            # Step through the waveform one second (sampling_rate samples) at a time.
            # NOTE(review): the final slice can be much shorter than one second;
            # confirm the model accepts very short inputs.
            for start_sample in range(0, len(norm_wav), sampling_rate):
                audio_segment = norm_wav[start_sample:start_sample + sampling_rate]

                mask = torch.ones(1, len(audio_segment)).to(self.device)
                wavs = torch.tensor(audio_segment).unsqueeze(0).to(self.device)

                with torch.no_grad():
                    pred = self.audio_model(wavs, mask)

                logits = pred.logits if hasattr(pred, 'logits') else pred[0]
                probabilities = softmax(logits, dim=-1).squeeze(0)[kept_indices]
                # Re-normalise so the kept classes sum to 1.
                probabilities = probabilities / probabilities.sum()

                rows.append(probabilities.cpu().numpy())
                times.append(start_sample / sampling_rate)

            # Build the table once; an empty waveform yields an empty table
            # with the expected columns instead of a pd.concat error.
            if rows:
                prob_matrix = np.vstack(rows)
            else:
                prob_matrix = np.empty((0, len(emotion_columns)), dtype=np.float32)
            emotions_df = pd.DataFrame(prob_matrix, columns=emotion_columns)
            emotions_df.insert(0, "Time(s)", times)

            # Return DataFrame as JSON
            return {"emotions_data": emotions_df.to_json(orient='split')}
        finally:
            self._remove_if_exists(temp_audio_path)

    def _detect_faces_and_emotions(self, video_path: str) -> Dict:
        """Detect faces per frame, annotate the video, and average emotions per second.

        Args:
            video_path: Path to the video file on disk.

        Returns:
            A dict with:
            'emotions_data': JSON string (orient='split') of per-second mean
                emotion scores, zero-filled for seconds without a kept face;
                an empty table if no face was kept.
            'processed_video': Base64 string of the annotated video with the
                original audio re-attached.
        """
        # Confidence threshold for filtering false positives: a detected face
        # is kept only if its highest emotion score is at least this value.
        # Higher = stricter filtering.
        CONFIDENCE_THRESHOLD = 0.6

        emotions_data = []
        output_video_path = None
        temp_video_writer_path = None
        original_video = None
        processed_video_clip = None

        try:
            # Temporary file for the final output video.
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_out_video:
                output_video_path = temp_out_video.name
            # Temporary file for the silent, annotated intermediate video.
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_video_writer_file:
                temp_video_writer_path = temp_video_writer_file.name

            original_video = VideoFileClip(video_path)

            cap = cv2.VideoCapture(video_path)
            out = None
            try:
                # Keep the fractional frame rate (e.g. 29.97): truncating it
                # makes the re-encoded video drift out of sync with the audio.
                fps = cap.get(cv2.CAP_PROP_FPS)
                if not fps >= 1:  # Missing/invalid metadata (0, NaN, < 1)
                    fps = 30.0
                w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

                out = cv2.VideoWriter(temp_video_writer_path,
                                      cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))

                frame_number = 0
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    time_seconds = round(frame_number / fps)

                    # Process each face found in the frame
                    for face in self.face_detector.detect_emotions(frame):
                        emotions = face["emotions"]
                        dominant_emotion, dominant_score = max(
                            emotions.items(), key=lambda item: item[1])

                        # Skip low-confidence detections.
                        if dominant_score < CONFIDENCE_THRESHOLD:
                            continue

                        box = face["box"]
                        emotions_data.append({**emotions, "Time(s)": time_seconds})

                        cv2.rectangle(frame, (box[0], box[1]),
                                      (box[0] + box[2], box[1] + box[3]), (0, 155, 255), 2)

                        # Show the dominant emotion above the face box.
                        text_to_display = f"{dominant_emotion}: {dominant_score:.2f}"
                        cv2.putText(frame, text_to_display, (box[0], box[1] - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2, cv2.LINE_AA)

                    out.write(frame)
                    frame_number += 1
            finally:
                # Must be released before the intermediate file is read back.
                cap.release()
                if out is not None:
                    out.release()

            # Combine processed video frames with original audio
            processed_video_clip = VideoFileClip(temp_video_writer_path)
            final_clip = processed_video_clip.set_audio(original_video.audio)
            final_clip.write_videofile(output_video_path, codec='libx264', logger=None)

            # Read the final video bytes and encode to base64
            with open(output_video_path, "rb") as f:
                processed_video_b64 = base64.b64encode(f.read()).decode("utf-8")

            # Process DataFrame
            emotions_df = pd.DataFrame(emotions_data)
            if not emotions_df.empty:
                emotions_df['Time(s)'] = emotions_df['Time(s)'].round().astype(int)
                max_time = emotions_df['Time(s)'].max()
                # One row per second from 0 to the last second with a face.
                all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
                avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
                df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left').fillna(0)
                df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"
            else:
                df_merged = pd.DataFrame()

            return {
                "emotions_data": df_merged.to_json(orient='split'),
                "processed_video": processed_video_b64
            }
        finally:
            # Close clips and remove both temp files on success AND on failure.
            for clip in (processed_video_clip, original_video):
                if clip is not None:
                    clip.close()
            self._remove_if_exists(temp_video_writer_path)
            self._remove_if_exists(output_video_path)

    def _process_video_text(self, video_path: str) -> Dict:
        """Transcribe the video, translate each segment, and score text emotions.

        Args:
            video_path: Path to the video file on disk.

        Returns:
            A dict with two JSON strings (orient='split'):
            'segments_data': one row per transcript segment with its text,
                timing, translation, and one score column per emotion label.
            'words_data': one row per second of video with the words starting
                in that second and the emotion scores of the matching segment.
            Both are empty tables when the transcription has no segments.

        Raises:
            ValueError: If the video has no audio track.
        """
        temp_audio_path = None
        video_clip = None
        try:
            video_clip = VideoFileClip(video_path)
            if video_clip.audio is None:
                raise ValueError("Video has no audio track.")

            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
                temp_audio_path = temp_audio_file.name
            video_clip.audio.write_audiofile(temp_audio_path, logger=None)

            audio = whisper.load_audio(temp_audio_path)
            result = whisper.transcribe(self.transcription_model, audio)
            segments = result['segments']

            segments_df = pd.DataFrame(
                [{'text': seg['text'], 'start': seg['start'], 'end': seg['end']}
                 for seg in segments])

            if segments_df.empty:
                empty_json = pd.DataFrame().to_json(orient='split')
                return {"words_data": empty_json, "segments_data": empty_json}

            segments_df['Translated_Text'] = segments_df['text'].apply(self.translator.translate)
            # The classifier returns every label's score (top_k=None);
            # flatten each result into {label: score}.
            segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(
                lambda text: {entry['label']: entry['score']
                              for entry in self.text_classifier(text)[0]})

            sentiment_df = segments_df['Sentiment_Scores'].apply(pd.Series)
            final_segments_df = pd.concat(
                [segments_df.drop(columns=['Sentiment_Scores']), sentiment_df], axis=1)

            # Process words data
            words = [word for segment in segments for word in segment['words']]
            words_df = pd.DataFrame({
                'text': [word['text'] for word in words],
                'start': [word['start'] for word in words],
                'end': [word['end'] for word in words],
            })
            # Bucket each word by the ceiling of its start time.
            words_df['second'] = words_df['start'].apply(lambda x: int(np.ceil(x)))

            words_grouped = words_df.groupby('second').agg(
                {'text': lambda x: ' '.join(x), 'start': 'min', 'end': 'max'}).reset_index()

            # One row per second of the video, including seconds without words.
            max_second = int(video_clip.duration)
            all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)})
            words_grouped = all_seconds.merge(words_grouped, on='second', how='left').fillna(
                {'text': '', 'start': 0, 'end': 0})

            emotion_columns = final_segments_df.columns.difference(
                ['text', 'start', 'end', 'Translated_Text'])
            for col in emotion_columns:
                words_grouped[col] = np.nan

            # Copy the scores of the first segment that fully contains each
            # second's [start, end] word span.
            # NOTE(review): seconds with no words were filled with start=0 and
            # end=0 above, so they match any segment with start <= 0 -- confirm
            # that is intended.
            for i, row in words_grouped.iterrows():
                matching_segment = final_segments_df[
                    (final_segments_df['start'] <= row['start']) &
                    (final_segments_df['end'] >= row['end'])]
                if not matching_segment.empty:
                    for emotion in emotion_columns:
                        words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]

            words_grouped[emotion_columns] = words_grouped[emotion_columns].fillna(0)

            return {
                "words_data": words_grouped.to_json(orient='split'),
                "segments_data": final_segments_df.to_json(orient='split')
            }
        finally:
            if video_clip is not None:
                # Release the ffmpeg readers held by the clip.
                video_clip.close()
            self._remove_if_exists(temp_audio_path)