video-analysis / handler.py
Matb09's picture
k
997d95e
# File: handler.py
import base64
import io
import os
import tempfile
from typing import Dict, Any
import cv2
import librosa
import numpy as np
import pandas as pd
import torch
import whisper_timestamped as whisper
from fer import FER
from moviepy.editor import VideoFileClip, AudioFileClip
from torch.nn.functional import softmax
from transformers import AutoModelForAudioClassification, pipeline
from translate import Translator
class EndpointHandler:
def __init__(self, path=""):
"""
Loads all models onto the device. This is called once when the endpoint starts.
"""
print("Loading models...")
# Use GPU if available
self.device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {self.device}")
# 1. Audio Emotion Model
self.audio_model = AutoModelForAudioClassification.from_pretrained(
"3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes", trust_remote_code=True
).to(self.device)
self.audio_mean = self.audio_model.config.mean
self.audio_std = self.audio_model.config.std
# 2. Facial Emotion Model
self.face_detector = FER(mtcnn=True)
# 3. Text Emotion Model
self.text_classifier = pipeline(
"text-classification", model="j-hartmann/emotion-english-distilroberta-base", top_k=None, device=self.device
)
# 4. Transcription Model
self.transcription_model = whisper.load_model("medium", device=self.device)
# 5. Translator
self.translator = Translator(from_lang='ko', to_lang='en')
print("All models loaded successfully.")
def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Handles an inference request.
Args:
data (Dict[str, Any]): Dictionary containing request parameters. Expected keys:
'video': Base64 encoded video string.
'analysis_type': One of "audio", "facial", or "text".
"""
print("Received inference request.")
# --- 1. Parameter Extraction ---
if 'inputs' in data and isinstance(data['inputs'], dict):
params = data['inputs']
else:
params = data
b64_video = params.get("video")
if not b64_video:
raise ValueError("Missing 'video' parameter (base64 encoded string)")
analysis_type = params.get("analysis_type")
if analysis_type not in ["audio", "facial", "text"]:
raise ValueError("Missing or invalid 'analysis_type'. Must be 'audio', 'facial', or 'text'.")
# --- 2. Video Decoding ---
video_bytes = base64.b64decode(b64_video)
# Use a temporary file to store the video, as the original functions expect a path
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=True) as temp_video_file:
temp_video_file.write(video_bytes)
temp_video_file.flush() # Ensure all data is written
video_path = temp_video_file.name
print(f"Video saved to temporary file: {video_path}")
# --- 3. Dispatch to correct analysis function ---
try:
if analysis_type == "audio":
result = self._analyze_audio_emotions(video_path)
elif analysis_type == "facial":
result = self._detect_faces_and_emotions(video_path)
elif analysis_type == "text":
result = self._process_video_text(video_path)
print("Analysis completed successfully.")
return {"status": "success", **result}
except Exception as e:
print(f"Error during {analysis_type} analysis: {e}")
# It's good practice to return a structured error
return {"status": "error", "message": str(e)}
# ===================================================================
# REFACTORED ANALYSIS FUNCTIONS
# ===================================================================
def _analyze_audio_emotions(self, video_path: str) -> Dict:
temp_audio_path = None
try:
# Extract audio
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
temp_audio_path = temp_audio_file.name
VideoFileClip(video_path).audio.write_audiofile(temp_audio_path, codec="pcm_s16le", logger=None)
raw_wav, _ = librosa.load(temp_audio_path, sr=self.audio_model.config.sampling_rate)
norm_wav = (raw_wav - self.audio_mean) / (self.audio_std + 1e-6)
times, emotions_dfs = [], []
for start_time in range(0, len(norm_wav), self.audio_model.config.sampling_rate):
audio_segment = norm_wav[start_time:start_time + self.audio_model.config.sampling_rate]
# Process segment
audio_np = np.array(audio_segment)
mask = torch.ones(1, len(audio_np)).to(self.device)
wavs = torch.tensor(audio_np).unsqueeze(0).to(self.device)
with torch.no_grad():
pred = self.audio_model(wavs, mask)
logits = pred.logits if hasattr(pred, 'logits') else pred[0]
labels = {0: 'Angry', 1: 'Sad', 2: 'Happy', 3: 'Surprise', 4: 'Fear', 5: 'Disgust', 7: 'Neutral'}
probabilities = softmax(logits, dim=-1).squeeze(0)[[0, 1, 2, 3, 4, 5, 7]]
probabilities = probabilities / probabilities.sum()
df = pd.DataFrame([probabilities.cpu().numpy()], columns=labels.values())
times.append(start_time / self.audio_model.config.sampling_rate)
emotions_dfs.append(df)
emotions_df = pd.concat(emotions_dfs, ignore_index=True)
emotions_df.insert(0, "Time(s)", times)
emotion_rename_map = {'Angry': 'anger', 'Sad': 'sadness', 'Happy': 'happy', 'Surprise': 'surprise',
'Fear': 'fear', 'Disgust': 'disgust', 'Neutral': 'neutral'}
emotions_df.rename(columns=emotion_rename_map, inplace=True)
# Return DataFrame as JSON
return {"emotions_data": emotions_df.to_json(orient='split')}
finally:
if temp_audio_path and os.path.exists(temp_audio_path):
os.remove(temp_audio_path)
def _detect_faces_and_emotions(self, video_path: str) -> Dict:
emotions_data = []
output_video_path = None
# ===================================================================
# NEW: Confidence threshold for filtering false positives.
# Only faces where at least one emotion has a score > 0.35 will be kept.
# You can adjust this value. Higher = stricter filtering. (e.g., 0.40)
# ===================================================================
CONFIDENCE_THRESHOLD = 0.6
try:
# Create a temporary file for the output video
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_out_video:
output_video_path = temp_out_video.name
original_video = VideoFileClip(video_path)
cap = cv2.VideoCapture(video_path)
fps = int(cap.get(cv2.CAP_PROP_FPS))
if fps == 0: # Handle potential issue with video metadata
fps = 30
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Use a temporary path for the video writer intermediate file
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_video_writer_file:
temp_video_writer_path = temp_video_writer_file.name
out = cv2.VideoWriter(temp_video_writer_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
frame_number = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret: break
time_seconds = round(frame_number / fps)
result = self.face_detector.detect_emotions(frame)
# Process each face found in the frame
for face in result:
# ===================================================================
# NEW: Filtering logic starts here
# ===================================================================
emotions = face["emotions"]
max_emotion_score = max(emotions.values())
# If the highest emotion score is below our threshold, skip this face
if max_emotion_score < CONFIDENCE_THRESHOLD:
continue # Ignore this low-confidence detection
# ===================================================================
# End of new filtering logic. The rest of the loop proceeds as before.
# ===================================================================
box = face["box"]
emotions["Time(s)"] = time_seconds
emotions_data.append(emotions)
cv2.rectangle(frame, (box[0], box[1]), (box[0] + box[2], box[1] + box[3]), (0, 155, 255), 2)
# Find the dominant emotion to display on the video
dominant_emotion = max(emotions, key=lambda k: emotions[k] if k != 'Time(s)' else -1)
text_to_display = f"{dominant_emotion}: {emotions[dominant_emotion]:.2f}"
cv2.putText(frame, text_to_display, (box[0], box[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2, cv2.LINE_AA)
out.write(frame)
frame_number += 1
cap.release()
out.release()
# Combine processed video frames with original audio
processed_video_clip = VideoFileClip(temp_video_writer_path)
final_clip = processed_video_clip.set_audio(original_video.audio)
final_clip.write_videofile(output_video_path, codec='libx264', logger=None)
os.remove(temp_video_writer_path) # Clean up intermediate video
# Read the final video bytes and encode to base64
with open(output_video_path, "rb") as f:
processed_video_b64 = base64.b64encode(f.read()).decode("utf-8")
# Process DataFrame
emotions_df = pd.DataFrame(emotions_data)
if not emotions_df.empty:
emotions_df['Time(s)'] = emotions_df['Time(s)'].round().astype(int)
max_time = emotions_df['Time(s)'].max()
all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left').fillna(0)
df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"
else:
df_merged = pd.DataFrame()
return {
"emotions_data": df_merged.to_json(orient='split'),
"processed_video": processed_video_b64
}
finally:
if output_video_path and os.path.exists(output_video_path):
os.remove(output_video_path)
def _process_video_text(self, video_path: str) -> Dict:
temp_audio_path = None
try:
video_clip = VideoFileClip(video_path)
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
temp_audio_path = temp_audio_file.name
video_clip.audio.write_audiofile(temp_audio_path, logger=None)
audio = whisper.load_audio(temp_audio_path)
result = whisper.transcribe(self.transcription_model, audio)
segments_data = [{'text': seg['text'], 'start': seg['start'], 'end': seg['end']} for seg in
result['segments']]
segments_df = pd.DataFrame(segments_data)
if segments_df.empty:
return {"words_data": pd.DataFrame().to_json(orient='split'),
"segments_data": pd.DataFrame().to_json(orient='split')}
segments_df['Translated_Text'] = segments_df['text'].apply(lambda x: self.translator.translate(x))
segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(
lambda x: {entry['label']: entry['score'] for entry in self.text_classifier(x)[0]})
sentiment_df = segments_df['Sentiment_Scores'].apply(pd.Series)
final_segments_df = pd.concat([segments_df.drop(columns=['Sentiment_Scores']), sentiment_df], axis=1)
# Process words data
word_texts, word_starts, word_ends = [], [], []
for segment in result['segments']:
for word in segment['words']:
word_texts.append(word['text'])
word_starts.append(word['start'])
word_ends.append(word['end'])
words_df = pd.DataFrame({'text': word_texts, 'start': word_starts, 'end': word_ends})
words_df['second'] = words_df['start'].apply(lambda x: int(np.ceil(x)))
words_grouped = words_df.groupby('second').agg(
{'text': lambda x: ' '.join(x), 'start': 'min', 'end': 'max'}).reset_index()
max_second = int(video_clip.duration)
all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)})
words_grouped = all_seconds.merge(words_grouped, on='second', how='left').fillna(
{'text': '', 'start': 0, 'end': 0})
emotion_columns = final_segments_df.columns.difference(['text', 'start', 'end', 'Translated_Text'])
for col in emotion_columns:
words_grouped[col] = np.nan
for i, row in words_grouped.iterrows():
matching_segment = final_segments_df[
(final_segments_df['start'] <= row['start']) & (final_segments_df['end'] >= row['end'])]
if not matching_segment.empty:
for emotion in emotion_columns:
words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]
words_grouped[emotion_columns] = words_grouped[emotion_columns].fillna(0)
return {
"words_data": words_grouped.to_json(orient='split'),
"segments_data": final_segments_df.to_json(orient='split')
}
finally:
if temp_audio_path and os.path.exists(temp_audio_path):
os.remove(temp_audio_path)