| | |
| |
|
| | import base64 |
| | import io |
| | import os |
| | import tempfile |
| | from typing import Dict, Any |
| |
|
| | import cv2 |
| | import librosa |
| | import numpy as np |
| | import pandas as pd |
| | import torch |
| | import whisper_timestamped as whisper |
| | from fer import FER |
| | from moviepy.editor import VideoFileClip, AudioFileClip |
| | from torch.nn.functional import softmax |
| | from transformers import AutoModelForAudioClassification, pipeline |
| | from translate import Translator |
| |
|
| |
|
class EndpointHandler:
    """Inference endpoint running audio, facial, or text emotion analysis on a video.

    All models are loaded once in ``__init__``. Each request carries a
    base64-encoded video and an ``analysis_type`` selecting one of three
    pipelines; results are returned as split-orient JSON tables (plus an
    annotated video for the facial pipeline).
    """

    def __init__(self, path=""):
        """Load all models onto the device. Called once when the endpoint starts.

        Args:
            path: Unused; kept for Hugging Face endpoint-handler compatibility.
        """
        print("Loading models...")

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        # Speech-emotion model; its config carries the mean/std used to
        # normalise raw waveforms before inference.
        self.audio_model = AutoModelForAudioClassification.from_pretrained(
            "3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes", trust_remote_code=True
        ).to(self.device)
        self.audio_mean = self.audio_model.config.mean
        self.audio_std = self.audio_model.config.std

        # MTCNN-based face detector with per-face emotion scores.
        self.face_detector = FER(mtcnn=True)

        # English text emotion classifier; top_k=None returns all label scores.
        self.text_classifier = pipeline(
            "text-classification", model="j-hartmann/emotion-english-distilroberta-base", top_k=None, device=self.device
        )

        # Whisper model with word-level timestamps.
        self.transcription_model = whisper.load_model("medium", device=self.device)

        # Korean -> English translator, feeding transcripts to the English classifier.
        self.translator = Translator(from_lang='ko', to_lang='en')

        print("All models loaded successfully.")

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle an inference request.

        Args:
            data: Request payload. Parameters may be nested under 'inputs' or
                sit at the top level. Expected keys:
                    'video': base64-encoded video bytes.
                    'analysis_type': one of 'audio', 'facial', 'text'.

        Returns:
            On success: {'status': 'success', **analysis_result}.
            On analysis failure: {'status': 'error', 'message': <reason>}.

        Raises:
            ValueError: If 'video' is missing or 'analysis_type' is invalid.
        """
        print("Received inference request.")

        # Hugging Face endpoints may wrap the parameters under 'inputs'.
        if 'inputs' in data and isinstance(data['inputs'], dict):
            params = data['inputs']
        else:
            params = data

        b64_video = params.get("video")
        if not b64_video:
            raise ValueError("Missing 'video' parameter (base64 encoded string)")

        analysis_type = params.get("analysis_type")
        if analysis_type not in ["audio", "facial", "text"]:
            raise ValueError("Missing or invalid 'analysis_type'. Must be 'audio', 'facial', or 'text'.")

        video_bytes = base64.b64decode(b64_video)

        # delete=True: the temp video lives exactly as long as the analysis.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=True) as temp_video_file:
            temp_video_file.write(video_bytes)
            temp_video_file.flush()
            video_path = temp_video_file.name
            print(f"Video saved to temporary file: {video_path}")

            try:
                dispatch = {
                    "audio": self._analyze_audio_emotions,
                    "facial": self._detect_faces_and_emotions,
                    "text": self._process_video_text,
                }
                result = dispatch[analysis_type](video_path)

                print("Analysis completed successfully.")
                return {"status": "success", **result}

            except Exception as e:
                # Surface analysis failures as a structured error response
                # instead of letting the endpoint 500.
                print(f"Error during {analysis_type} analysis: {e}")
                return {"status": "error", "message": str(e)}

    def _analyze_audio_emotions(self, video_path: str) -> Dict:
        """Classify speech emotions over one-second windows of the audio track.

        Args:
            video_path: Path to the input video file.

        Returns:
            Dict with 'emotions_data': split-orient JSON of per-second emotion
            probabilities with a leading 'Time(s)' column.

        Raises:
            ValueError: If the video has no audio track.
        """
        temp_audio_path = None
        video_clip = None
        try:
            video_clip = VideoFileClip(video_path)
            if video_clip.audio is None:
                raise ValueError("Video has no audio track.")

            # Extract audio as PCM WAV so librosa can read it directly.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
                temp_audio_path = temp_audio_file.name
            video_clip.audio.write_audiofile(temp_audio_path, codec="pcm_s16le", logger=None)

            sampling_rate = self.audio_model.config.sampling_rate
            raw_wav, _ = librosa.load(temp_audio_path, sr=sampling_rate)
            # Normalise with the model's training statistics; epsilon guards std == 0.
            norm_wav = (raw_wav - self.audio_mean) / (self.audio_std + 1e-6)

            # Output head indices kept below: 6 is intentionally skipped and 7 is
            # Neutral — presumably index 6 is an unused/extra class; TODO confirm
            # against the model's label config.
            labels = {0: 'Angry', 1: 'Sad', 2: 'Happy', 3: 'Surprise', 4: 'Fear', 5: 'Disgust', 7: 'Neutral'}
            kept_indices = [0, 1, 2, 3, 4, 5, 7]

            times, emotions_dfs = [], []
            for start_sample in range(0, len(norm_wav), sampling_rate):
                audio_segment = norm_wav[start_sample:start_sample + sampling_rate]

                audio_np = np.array(audio_segment)
                mask = torch.ones(1, len(audio_np)).to(self.device)
                wavs = torch.tensor(audio_np).unsqueeze(0).to(self.device)
                with torch.no_grad():
                    pred = self.audio_model(wavs, mask)
                logits = pred.logits if hasattr(pred, 'logits') else pred[0]
                # Keep only the categorical emotion heads, then renormalise to sum to 1.
                probabilities = softmax(logits, dim=-1).squeeze(0)[kept_indices]
                probabilities = probabilities / probabilities.sum()
                emotions_dfs.append(pd.DataFrame([probabilities.cpu().numpy()], columns=labels.values()))
                times.append(start_sample / sampling_rate)

            emotions_df = pd.concat(emotions_dfs, ignore_index=True)
            emotions_df.insert(0, "Time(s)", times)
            # Rename to the lower-case labels the client expects.
            emotion_rename_map = {'Angry': 'anger', 'Sad': 'sadness', 'Happy': 'happy', 'Surprise': 'surprise',
                                  'Fear': 'fear', 'Disgust': 'disgust', 'Neutral': 'neutral'}
            emotions_df.rename(columns=emotion_rename_map, inplace=True)

            return {"emotions_data": emotions_df.to_json(orient='split')}

        finally:
            # Close the ffmpeg reader and delete the temp WAV even on failure.
            if video_clip is not None:
                video_clip.close()
            if temp_audio_path and os.path.exists(temp_audio_path):
                os.remove(temp_audio_path)

    def _detect_faces_and_emotions(self, video_path: str) -> Dict:
        """Detect faces per frame, annotate them, and average emotions per second.

        Args:
            video_path: Path to the input video file.

        Returns:
            Dict with:
                'emotions_data': split-orient JSON of per-second mean emotion
                    scores (seconds without a confident face are filled with 0).
                'processed_video': base64-encoded annotated MP4 with the
                    original audio re-attached.
        """
        # Faces whose strongest emotion scores below this are treated as
        # false-positive detections and dropped.
        CONFIDENCE_THRESHOLD = 0.6

        emotions_data = []
        output_video_path = None
        temp_video_writer_path = None
        original_video = None
        processed_video_clip = None
        cap = None
        out = None
        try:
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_out_video:
                output_video_path = temp_out_video.name

            original_video = VideoFileClip(video_path)
            cap = cv2.VideoCapture(video_path)
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            if fps == 0:
                # Some containers lack fps metadata; assume a common default.
                fps = 30
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Intermediate silent annotated video; audio is muxed back afterwards.
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_video_writer_file:
                temp_video_writer_path = temp_video_writer_file.name
            out = cv2.VideoWriter(temp_video_writer_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))

            frame_number = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                time_seconds = round(frame_number / fps)
                for face in self.face_detector.detect_emotions(frame):
                    emotions = face["emotions"]
                    if max(emotions.values()) < CONFIDENCE_THRESHOLD:
                        continue

                    box = face["box"]
                    emotions["Time(s)"] = time_seconds
                    emotions_data.append(emotions)
                    cv2.rectangle(frame, (box[0], box[1]), (box[0] + box[2], box[1] + box[3]), (0, 155, 255), 2)

                    # Label the box with the strongest emotion, ignoring the
                    # Time(s) bookkeeping key.
                    dominant_emotion = max(emotions, key=lambda k: emotions[k] if k != 'Time(s)' else -1)
                    text_to_display = f"{dominant_emotion}: {emotions[dominant_emotion]:.2f}"
                    cv2.putText(frame, text_to_display, (box[0], box[1] - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2, cv2.LINE_AA)

                out.write(frame)
                frame_number += 1

            cap.release()
            out.release()

            # Re-attach the original audio track to the annotated frames.
            processed_video_clip = VideoFileClip(temp_video_writer_path)
            final_clip = processed_video_clip.set_audio(original_video.audio)
            final_clip.write_videofile(output_video_path, codec='libx264', logger=None)

            with open(output_video_path, "rb") as f:
                processed_video_b64 = base64.b64encode(f.read()).decode("utf-8")

            # Average scores per second and fill uncovered seconds with 0.
            emotions_df = pd.DataFrame(emotions_data)
            if not emotions_df.empty:
                emotions_df['Time(s)'] = emotions_df['Time(s)'].round().astype(int)
                max_time = emotions_df['Time(s)'].max()
                all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
                avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
                df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left').fillna(0)
                df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"
            else:
                df_merged = pd.DataFrame()

            return {
                "emotions_data": df_merged.to_json(orient='split'),
                "processed_video": processed_video_b64
            }
        finally:
            # Release every handle and delete every temp file even when the
            # analysis fails part-way through.
            if cap is not None:
                cap.release()
            if out is not None:
                out.release()
            if processed_video_clip is not None:
                processed_video_clip.close()
            if original_video is not None:
                original_video.close()
            if temp_video_writer_path and os.path.exists(temp_video_writer_path):
                os.remove(temp_video_writer_path)
            if output_video_path and os.path.exists(output_video_path):
                os.remove(output_video_path)

    def _process_video_text(self, video_path: str) -> Dict:
        """Transcribe speech, translate it to English, and score text emotions.

        Args:
            video_path: Path to the input video file.

        Returns:
            Dict with:
                'words_data': split-orient JSON of per-second transcript text
                    carrying the emotion scores of the covering segment.
                'segments_data': split-orient JSON of transcript segments with
                    translations and per-label sentiment scores.

        Raises:
            ValueError: If the video has no audio track.
        """
        temp_audio_path = None
        video_clip = None
        try:
            video_clip = VideoFileClip(video_path)
            if video_clip.audio is None:
                raise ValueError("Video has no audio track.")
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
                temp_audio_path = temp_audio_file.name
            video_clip.audio.write_audiofile(temp_audio_path, logger=None)

            # whisper_timestamped provides word-level timestamps per segment.
            audio = whisper.load_audio(temp_audio_path)
            result = whisper.transcribe(self.transcription_model, audio)

            segments_data = [{'text': seg['text'], 'start': seg['start'], 'end': seg['end']}
                             for seg in result['segments']]
            segments_df = pd.DataFrame(segments_data)

            if segments_df.empty:
                return {"words_data": pd.DataFrame().to_json(orient='split'),
                        "segments_data": pd.DataFrame().to_json(orient='split')}

            # Translate each segment, then score emotions on the English text.
            segments_df['Translated_Text'] = segments_df['text'].apply(lambda x: self.translator.translate(x))
            segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(
                lambda x: {entry['label']: entry['score'] for entry in self.text_classifier(x)[0]})
            sentiment_df = segments_df['Sentiment_Scores'].apply(pd.Series)
            final_segments_df = pd.concat([segments_df.drop(columns=['Sentiment_Scores']), sentiment_df], axis=1)

            # Flatten word timestamps and group words by the second they start in.
            word_rows = [(word['text'], word['start'], word['end'])
                         for segment in result['segments'] for word in segment['words']]
            words_df = pd.DataFrame(word_rows, columns=['text', 'start', 'end'])
            words_df['second'] = words_df['start'].apply(lambda x: int(np.ceil(x)))
            words_grouped = words_df.groupby('second').agg(
                {'text': lambda x: ' '.join(x), 'start': 'min', 'end': 'max'}).reset_index()

            # Make every second of the video present, even silent ones.
            max_second = int(video_clip.duration)
            all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)})
            words_grouped = all_seconds.merge(words_grouped, on='second', how='left').fillna(
                {'text': '', 'start': 0, 'end': 0})

            # Copy the covering segment's emotion scores onto each second.
            emotion_columns = final_segments_df.columns.difference(['text', 'start', 'end', 'Translated_Text'])
            for col in emotion_columns:
                words_grouped[col] = np.nan

            for i, row in words_grouped.iterrows():
                matching_segment = final_segments_df[
                    (final_segments_df['start'] <= row['start']) & (final_segments_df['end'] >= row['end'])]
                if not matching_segment.empty:
                    for emotion in emotion_columns:
                        words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]

            words_grouped[emotion_columns] = words_grouped[emotion_columns].fillna(0)

            return {
                "words_data": words_grouped.to_json(orient='split'),
                "segments_data": final_segments_df.to_json(orient='split')
            }
        finally:
            # Close the clip's ffmpeg reader and remove the temp WAV on all paths.
            if video_clip is not None:
                video_clip.close()
            if temp_audio_path and os.path.exists(temp_audio_path):
                os.remove(temp_audio_path)