Kevin King committed on
Commit cf09d5c · 1 Parent(s): abd725c

REFAC: Simplify emotion vector creation and enhance video processing logic in Streamlit app

Files changed (1):
  1. src/streamlit_app.py +137 -120
src/streamlit_app.py CHANGED
@@ -27,7 +27,6 @@ st.title("AffectLink: Post-Hoc Emotion Analysis")
 st.write("Upload a short video clip (under 30 seconds) to see a multimodal emotion analysis.")

 # --- Logger Configuration ---
-logging.basicConfig(level=logging.INFO)
 # [Logger setup remains the same]

 # --- Emotion Mappings ---
@@ -51,16 +50,17 @@ def load_models():
 whisper_model, text_classifier, ser_model, ser_feature_extractor = load_models()

 # --- Helper Functions for Analysis ---
-def create_unified_vector(scores_dict, mapping_dict):
+def create_unified_vector(scores_dict):
     vector = np.zeros(len(UNIFIED_EMOTIONS))
-    for label, score in scores_dict.items():
-        unified_label = mapping_dict.get(label)
-        if unified_label in UNIFIED_EMOTIONS:
-            vector[UNIFIED_EMOTIONS.index(unified_label)] += score
-    norm = np.linalg.norm(vector)
-    return vector / norm if norm > 0 else vector
+    total_score = sum(scores_dict.values())
+    if total_score > 0:
+        for label, score in scores_dict.items():
+            if label in UNIFIED_EMOTIONS:
+                vector[UNIFIED_EMOTIONS.index(label)] = score / total_score
+    return vector

 def get_consistency_level(cosine_sim):
+    if np.isnan(cosine_sim): return "N/A"
     if cosine_sim >= 0.8: return "High"
     if cosine_sim >= 0.6: return "Medium"
     if cosine_sim >= 0.3: return "Low"
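
Note on the rewritten helper: the old version L2-normalized a mapped score vector, while the new one rescales the raw scores so they sum to 1 over the unified labels. A minimal standalone sketch of the new behavior (the four-label UNIFIED_EMOTIONS list and the sample scores are assumptions for illustration; the app defines its own):

    import numpy as np

    UNIFIED_EMOTIONS = ['happy', 'sad', 'angry', 'neutral']  # assumed label set

    def create_unified_vector(scores_dict):
        # Each component becomes that label's share of the total score.
        vector = np.zeros(len(UNIFIED_EMOTIONS))
        total_score = sum(scores_dict.values())
        if total_score > 0:
            for label, score in scores_dict.items():
                if label in UNIFIED_EMOTIONS:
                    vector[UNIFIED_EMOTIONS.index(label)] = score / total_score
        return vector

    print(create_unified_vector({'happy': 0.6, 'neutral': 0.2, 'sad': 0.2}))
    # -> [0.6 0.2 0.  0.2]  (components sum to 1)

Since cosine similarity is scale-invariant, swapping the L2 norm for a sum-to-one rescaling leaves the downstream similarity scores unchanged; the gain is that each component now reads directly as a proportion.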
@@ -71,6 +71,9 @@ uploaded_file = st.file_uploader("Choose a video file...", type=["mp4", "mov", "

 if uploaded_file is not None:
     temp_video_path = None
+    # --- THIS IS THE FIX ---
+    video_clip_for_duration = None
+    # ========================
     try:
         with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tfile:
             tfile.write(uploaded_file.read())
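
The three added lines are the standard init-before-try pattern: binding video_clip_for_duration to None before the try block lets the finally clause close the clip unconditionally, even when an exception fires before the clip is opened. A generic sketch of the same pattern (a plain file handle stands in for the MoviePy clip; the filename is illustrative):

    resource = None
    try:
        resource = open('clip.mp4', 'rb')  # open() may raise; resource then stays None
        # ... work with the resource ...
    finally:
        if resource:
            resource.close()  # safe: the name is always bound here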
@@ -79,142 +82,156 @@ if uploaded_file is not None:
         st.video(temp_video_path)

         if st.button("Analyze Video"):
-            # Dictionaries to hold all results
-            fer_timeline = {}
-            ser_timeline = {}
-            ter_timeline = {}
+            fer_timeline, ser_timeline, ter_timeline = {}, {}, {}
             full_transcription = "No speech detected."

             video_clip_for_duration = VideoFileClip(temp_video_path)
             duration = video_clip_for_duration.duration
-            video_clip_for_duration.close()
-
-            # --- Video Processing ---
+
             with st.spinner("Analyzing facial expressions..."):
-                cap = None
-                try:
-                    cap = cv2.VideoCapture(temp_video_path)
-                    fps = cap.get(cv2.CAP_PROP_FPS) or 30
-                    frame_count = 0
-                    while cap.isOpened():
-                        ret, frame = cap.read()
-                        if not ret: break
-                        timestamp = frame_count / fps
-                        if frame_count % int(fps) == 0:
-                            analysis = DeepFace.analyze(frame, actions=['emotion'], enforce_detection=False, silent=True)
-                            if isinstance(analysis, list) and len(analysis) > 0:
-                                fer_timeline[timestamp] = analysis[0]['emotion']
-                        frame_count += 1
-                finally:
-                    if cap: cap.release()
-
-            # --- Audio Processing ---
+                cap = cv2.VideoCapture(temp_video_path)
+                fps = cap.get(cv2.CAP_PROP_FPS) or 30
+                frame_count = 0
+                while cap.isOpened():
+                    ret, frame = cap.read()
+                    if not ret: break
+                    timestamp = frame_count / fps
+                    if frame_count % int(fps) == 0:
+                        analysis = DeepFace.analyze(frame, actions=['emotion'], enforce_detection=False, silent=True)
+                        if isinstance(analysis, list) and len(analysis) > 0:
+                            fer_timeline[timestamp] = {k: v / 100.0 for k, v in analysis[0]['emotion'].items()}
+                    frame_count += 1
+                cap.release()
+
             with st.spinner("Analyzing audio and text..."):
-                # --- THIS IS THE FIX ---
-                video_clip = None
-                # =======================
-                try:
-                    video_clip = VideoFileClip(temp_video_path)
-                    if video_clip.audio:
-                        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as taudio:
-                            video_clip.audio.write_audiofile(taudio.name, fps=AUDIO_SAMPLE_RATE, logger=None)
-                            temp_audio_path = taudio.name
-
-                        whisper_result = whisper_model.transcribe(temp_audio_path, word_timestamps=True, fp16=False)
-                        full_transcription = whisper_result['text'].strip()
-
-                        audio_array, _ = sf.read(temp_audio_path, dtype='float32')
-                        if audio_array.ndim == 2: audio_array = audio_array.mean(axis=1)
-
-                        for i in range(int(duration)):
-                            start_sample = i * AUDIO_SAMPLE_RATE
-                            end_sample = (i + 1) * AUDIO_SAMPLE_RATE
-                            chunk = audio_array[start_sample:end_sample]
-
-                            if len(chunk) > 400:
-                                inputs = ser_feature_extractor(chunk, sampling_rate=AUDIO_SAMPLE_RATE, return_tensors="pt", padding=True)
-                                with torch.no_grad():
-                                    logits = ser_model(**inputs).logits
-                                scores = torch.nn.functional.softmax(logits, dim=1).squeeze()
-                                ser_timeline[i] = {ser_model.config.id2label[k]: score.item() for k, score in enumerate(scores)}
-
-                            words_in_segment = [seg['word'] for seg in whisper_result['segments'] if seg['start'] >= i and seg['start'] < i+1 for seg in seg.get('words', [])]
-                            segment_text = " ".join(words_in_segment).strip()
-                            if segment_text:
-                                text_emotions = text_classifier(segment_text)[0]
-                                ter_timeline[i] = {emo['label']: emo['score'] for emo in text_emotions}
-                finally:
-                    if video_clip: video_clip.close()
-                    if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path): os.unlink(temp_audio_path)
+                if video_clip_for_duration.audio:
+                    with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as taudio:
+                        video_clip_for_duration.audio.write_audiofile(taudio.name, fps=AUDIO_SAMPLE_RATE, logger=None)
+                        temp_audio_path = taudio.name
+
+                    whisper_result = whisper_model.transcribe(
+                        temp_audio_path,
+                        word_timestamps=True,
+                        fp16=False,
+                        condition_on_previous_text=False
+                    )
+                    full_transcription = whisper_result['text'].strip()
+
+                    audio_array, _ = sf.read(temp_audio_path, dtype='float32')
+                    if audio_array.ndim == 2: audio_array = audio_array.mean(axis=1)
+
+                    for i in range(int(duration)):
+                        start_sample, end_sample = i * AUDIO_SAMPLE_RATE, (i + 1) * AUDIO_SAMPLE_RATE
+                        chunk = audio_array[start_sample:end_sample]
+
+                        if len(chunk) > 400:
+                            inputs = ser_feature_extractor(chunk, sampling_rate=AUDIO_SAMPLE_RATE, return_tensors="pt", padding=True)
+                            with torch.no_grad():
+                                logits = ser_model(**inputs).logits
+                            scores = torch.nn.functional.softmax(logits, dim=1).squeeze()
+                            ser_timeline[i] = {ser_model.config.id2label[k]: score.item() for k, score in enumerate(scores)}
+
+                        words_in_segment = [seg['word'] for seg in whisper_result.get('segments', []) if seg['start'] >= i and seg['start'] < i+1 for seg in seg.get('words', [])]
+                        segment_text = " ".join(words_in_segment).strip()
+                        if segment_text:
+                            text_emotions = text_classifier(segment_text)[0]
+                            ter_timeline[i] = {emo['label']: emo['score'] for emo in text_emotions}

-            # --- Post-Analysis and Visualization ---
             st.header("Analysis Results")
-
-            fer_df = pd.DataFrame.from_dict(fer_timeline, orient='index').rename(columns=FACIAL_TO_UNIFIED)
-            ser_df = pd.DataFrame.from_dict(ser_timeline, orient='index').rename(columns=SER_TO_UNIFIED)
-            ter_df = pd.DataFrame.from_dict(ter_timeline, orient='index').rename(columns=TEXT_TO_UNIFIED)
-
-            fer_avg_scores = fer_df[UNIFIED_EMOTIONS].mean().to_dict()
-            ser_avg_scores = ser_df[UNIFIED_EMOTIONS].mean().to_dict()
-            ter_avg_scores = ter_df[UNIFIED_EMOTIONS].mean().to_dict()
-
-            fer_vector = create_unified_vector(fer_avg_scores, {e: e for e in UNIFIED_EMOTIONS})
-            ser_vector = create_unified_vector(ser_avg_scores, {e: e for e in UNIFIED_EMOTIONS})
-            text_vector = create_unified_vector(ter_avg_scores, {e: e for e in UNIFIED_EMOTIONS})
-
-            sim_face_text = cosine_similarity([fer_vector], [text_vector])[0][0]
-            sim_face_speech = cosine_similarity([fer_vector], [ser_vector])[0][0]
-            sim_speech_text = cosine_similarity([ser_vector], [text_vector])[0][0]
-            avg_similarity = np.mean([sim for sim in [sim_face_text, sim_face_speech, sim_speech_text] if not np.isnan(sim)])
-
-            dominant_fer = max(fer_avg_scores, key=fer_avg_scores.get) if fer_avg_scores else "N/A"
-            dominant_text_raw = max(ter_avg_scores, key=ter_avg_scores.get) if ter_avg_scores else "N/A"
-            dominant_ser_raw = max(ser_avg_scores, key=ser_avg_scores.get) if ser_avg_scores else "N/A"
-
-            display_fer = FACIAL_TO_UNIFIED.get(dominant_fer.lower(), "N/A").capitalize()
-            display_text = TEXT_TO_UNIFIED.get(dominant_text_raw, "N/A").capitalize()
-            display_ser = SER_TO_UNIFIED.get(dominant_ser_raw, "N/A").capitalize()
+
+            def process_and_get_dominant(timeline, mapping):
+                if not timeline: return "N/A", {}
+                df = pd.DataFrame.from_dict(timeline, orient='index')
+                unified_scores = {e: 0.0 for e in UNIFIED_EMOTIONS}
+                for raw_label, scores in df.items():
+                    unified_label = mapping.get(raw_label)
+                    if unified_label:
+                        unified_scores[unified_label] += scores.mean()
+                if sum(unified_scores.values()) == 0: return "N/A", {}
+                dominant_emotion = max(unified_scores, key=unified_scores.get)
+                return dominant_emotion.capitalize(), unified_scores
+
+            dominant_fer, fer_avg_scores = process_and_get_dominant(fer_timeline, FACIAL_TO_UNIFIED)
+            dominant_ser, ser_avg_scores = process_and_get_dominant(ser_timeline, SER_TO_UNIFIED)
+            dominant_text, ter_avg_scores = process_and_get_dominant(ter_timeline, TEXT_TO_UNIFIED)
+
+            fer_vector = create_unified_vector(fer_avg_scores)
+            ser_vector = create_unified_vector(ser_avg_scores)
+            text_vector = create_unified_vector(ter_avg_scores)
+
+            similarities = [cosine_similarity([fer_vector], [text_vector])[0][0], cosine_similarity([fer_vector], [ser_vector])[0][0], cosine_similarity([ser_vector], [text_vector])[0][0]]
+            avg_similarity = np.nanmean([s for s in similarities if not np.isnan(s)])

             col1, col2 = st.columns([1, 2])
             with col1:
                 st.subheader("Multimodal Summary")
                 st.write(f"**Transcription:** \"{full_transcription}\"")
-                st.metric("Dominant Facial Emotion", display_fer)
-                st.metric("Dominant Text Emotion", display_text)
-                st.metric("Dominant Speech Emotion", display_ser)
+                st.metric("Dominant Facial Emotion", dominant_fer)
+                st.metric("Dominant Text Emotion", dominant_text)
+                st.metric("Dominant Speech Emotion", dominant_ser)
                 st.metric("Emotion Consistency", get_consistency_level(avg_similarity), f"{avg_similarity:.2f} Avg. Cosine Similarity")

             with col2:
                 st.subheader("Unified Emotion Timeline")
-                combined_df = pd.DataFrame(index=range(int(duration)))
-                for emotion in UNIFIED_EMOTIONS:
-                    if emotion in fer_df: combined_df[f'Facial_{emotion}'] = fer_df[emotion]
-                    if emotion in ser_df: combined_df[f'Speech_{emotion}'] = ser_df[emotion]
-                    if emotion in ter_df: combined_df[f'Text_{emotion}'] = ter_df[emotion]
-
-                combined_df.ffill(inplace=True)
+
+                def create_timeline_df(timeline, mapping):
+                    if not timeline: return pd.DataFrame(columns=UNIFIED_EMOTIONS)
+                    df = pd.DataFrame.from_dict(timeline, orient='index')
+                    df_unified = pd.DataFrame(index=df.index, columns=UNIFIED_EMOTIONS).fillna(0.0)
+                    for raw_col in df.columns:
+                        unified_col = mapping.get(raw_col)
+                        if unified_col:
+                            df_unified[unified_col] += df[raw_col]
+                    return df_unified
+
+                fer_df = create_timeline_df(fer_timeline, FACIAL_TO_UNIFIED)
+                ser_df = create_timeline_df(ser_timeline, SER_TO_UNIFIED)
+                ter_df = create_timeline_df(ter_timeline, TEXT_TO_UNIFIED)
+
+                full_index = np.arange(0, duration, 0.5)
+                combined_df = pd.DataFrame(index=full_index)
+
+                if not fer_df.empty:
+                    fer_df_resampled = fer_df.reindex(fer_df.index.union(full_index)).interpolate(method='linear').reindex(full_index)
+                    for e in UNIFIED_EMOTIONS: combined_df[f'Facial_{e}'] = fer_df_resampled.get(e, 0.0)
+
+                if not ser_df.empty:
+                    ser_df_resampled = ser_df.reindex(ser_df.index.union(full_index)).interpolate(method='linear').reindex(full_index)
+                    for e in UNIFIED_EMOTIONS: combined_df[f'Speech_{e}'] = ser_df_resampled.get(e, 0.0)
+
+                if not ter_df.empty:
+                    ter_df_resampled = ter_df.reindex(ter_df.index.union(full_index)).interpolate(method='linear').reindex(full_index)
+                    for e in UNIFIED_EMOTIONS: combined_df[f'Text_{e}'] = ter_df_resampled.get(e, 0.0)
+
                 combined_df.fillna(0, inplace=True)
-
-                fig, ax = plt.subplots(figsize=(10, 5))
-                colors = {'happy': 'green', 'sad': 'blue', 'angry': 'red', 'neutral': 'gray'}
-                styles = {'Facial': '-', 'Speech': '--', 'Text': ':'}
-
-                for col in combined_df.columns:
-                    modality, emotion = col.split('_')
-                    if emotion in colors:
-                        ax.plot(combined_df.index, combined_df[col], label=f'{modality} {emotion.capitalize()}', color=colors[emotion], linestyle=styles[modality], alpha=0.8)
-
-                ax.set_title("Emotion Confidence Over Time")
-                ax.set_xlabel("Time (seconds)")
-                ax.set_ylabel("Confidence Score")
-                ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
-                ax.grid(True, which='both', linestyle='--', linewidth=0.5)
-                plt.tight_layout()
-                st.pyplot(fig)
+
+                if not combined_df.empty:
+                    fig, ax = plt.subplots(figsize=(10, 5))
+                    colors = {'happy': 'green', 'sad': 'blue', 'angry': 'red', 'neutral': 'gray'}
+                    styles = {'Facial': '-', 'Speech': '--', 'Text': ':'}
+
+                    for col in combined_df.columns:
+                        modality, emotion = col.split('_')
+                        if emotion in colors:
+                            ax.plot(combined_df.index, combined_df[col], label=f'{modality} {emotion.capitalize()}', color=colors[emotion], linestyle=styles[modality], alpha=0.8)
+
+                    ax.set_title("Emotion Confidence Over Time (Normalized)")
+                    ax.set_xlabel("Time (seconds)")
+                    ax.set_ylabel("Confidence Score (0-1)")
+                    ax.set_ylim(0, 1)
+                    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
+                    ax.grid(True, which='both', linestyle='--', linewidth=0.5)
+                    plt.tight_layout()
+                    st.pyplot(fig)
+                else:
+                    st.write("No emotion data available to plot.")

     finally:
+        if video_clip_for_duration: video_clip_for_duration.close()
+        if 'temp_audio_path' in locals() and os.path.exists(temp_audio_path): os.unlink(temp_audio_path)
         if temp_video_path and os.path.exists(temp_video_path):
             time.sleep(1)
             try:
                 os.unlink(temp_video_path)
-            except Exception: pass
+            except Exception:
+                pass
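
The FER timeline is keyed by frame timestamps while the SER/TER timelines are keyed by whole seconds, so the plotting code above resamples all three onto a shared 0.5-second grid with pandas' reindex-union-interpolate-reindex idiom. A minimal sketch with made-up numbers (one emotion, one modality):

    import numpy as np
    import pandas as pd

    # Sparse samples at uneven frame timestamps (values are illustrative).
    fer = pd.Series({0.0: 0.9, 1.03: 0.7, 2.07: 0.4})
    full_index = np.arange(0, 3, 0.5)  # shared 0.5 s grid

    # Union the indices so the original points survive, interpolate the
    # gaps, then keep only the grid points. Note that method='linear'
    # treats rows as evenly spaced; method='index' would weight by the
    # actual timestamps, which matters more as the sampling gets uneven.
    resampled = (fer.reindex(fer.index.union(full_index))
                    .interpolate(method='linear')
                    .reindex(full_index))
    print(resampled)

Grid points that precede the first sample are left NaN by the default forward interpolation, which the `combined_df.fillna(0, inplace=True)` line above then zeroes out.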