Spaces:
Sleeping
Sleeping
| import warnings | |
| from functions.models import models_dict | |
| warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow') | |
| import os | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' | |
| os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0' | |
| import logging | |
| logging.getLogger('absl').setLevel(logging.ERROR) | |
| from moviepy.editor import VideoFileClip | |
| import pandas as pd | |
| from tqdm import tqdm | |
| import time | |
| import json | |
| import cv2 | |
| import dlib | |
| from collections import Counter | |
| import statistics | |
| import shutil | |
| import asyncio | |
| import traceback | |
| from functions.valence_arousal import va_predict | |
| from functions.speech import speech_predict | |
| from functions.eye_track import Facetrack, eye_track_predict | |
| from functions.fer import extract_face,fer_predict,plot_graph,filter | |
| # from app.utils.session import send_analytics, send_individual_analytics_files, send_combined_analytics_files, send_error | |
| # from app.utils.socket import ConnectionManager | |
| from typing import Callable | |
# Per-session accumulator keyed by uid. analyze_live_video() appends one entry
# per analysed video to each list stored under a uid.
session_data = {}

# Face-extraction handles, passed to extract_face().
dnn_net, predictor = models_dict['face'][0], models_dict['face'][1]

# Speech model plus the three 'vad' entries, all passed to speech_predict().
speech_model = models_dict['speech']
valence_dict_path, arousal_dict_path, dominance_dict_path = (
    models_dict['vad'][0],
    models_dict['vad'][1],
    models_dict['vad'][2],
)

# Models passed to va_predict(); note index 1 is the predictor and index 0
# the feature model.
valence_arousal_model, val_ar_feat_model = (
    models_dict['valence_fer'][1],
    models_dict['valence_fer'][0],
)

# Model passed to fer_predict().
fer_model = models_dict['fer']
def _new_session_record():
    """Return an empty per-session accumulator (one list entry per analysed video)."""
    return {
        "vcount": [],
        "duration": [],
        "eye": [],
        "fer": [],
        "valence": [],
        "arousal": [],
        "stress": [],
        "blinks": [],
        "class_wise_frame_counts": [],
        "speech_emotions": [],
        "speech_data": [],
        "word_weights_list": [],
    }


def _mean_or_zero(values):
    """Return the arithmetic mean of *values* as a float, or 0 when it cannot be computed."""
    try:
        return float(sum(values) / len(values))
    except (ZeroDivisionError, TypeError):
        return 0


def _speech_metric(word, key):
    """Return word[key] as a float, or 0.0 when *word* or the value is missing/falsy."""
    return float(word[key]) if word and word[key] else 0.0


def _numeric_values(values):
    """Return only the int/float items of *values* (other entries are skipped)."""
    return [x for x in values if isinstance(x, (int, float))]


def _wait_for_previous_results(output_dir, count):
    """Block until the marker files 1.json .. (count-1).json exist in *output_dir*.

    NOTE(review): there is no timeout; if an earlier question never writes its
    marker (for example because its analysis failed) this loops forever.
    """
    for i in range(1, count):
        previous_file_name = os.path.join(output_dir, f"{i}.json")
        print(previous_file_name)
        while not os.path.exists(previous_file_name):
            time.sleep(1)


def _remove_if_exists(file_path):
    """Delete *file_path* when it exists and report the deletion."""
    if os.path.exists(file_path):
        os.remove(file_path)
        print(f"{file_path} deleted")


def analyze_live_video(video_path: str, uid: str, user_id: str, count: int, final: bool, log: Callable[[str], None]):
    """Analyse one recorded answer and, on the final one, aggregate the session.

    For every call the video is decoded at 30 fps, its audio is written to a
    temporary wav file, and eye tracking, facial expression recognition,
    valence/arousal/stress prediction and speech analysis are run. The
    per-video results are appended to ``session_data[uid]``.

    When ``final`` is false a ``<count>.json`` marker file is written to
    ``output/<uid>`` and the function returns. When ``final`` is true the
    function first waits for the markers of questions ``1 .. count-1``, then
    writes per-video plots, combined plots, ``combined_speech.csv``,
    ``combined_data.csv`` and ``combined.json`` to ``output/<uid>``.

    Args:
        video_path: Path of the video file to analyse.
        uid: Session identifier; selects the accumulator and the output folder.
        user_id: Only printed for diagnostics.
        count: Question number of this video within the session.
        final: Whether this is the last video of the session.
        log: Callback that receives human-readable progress messages.

    Returns:
        None. Any exception is caught, printed and reported through ``log``;
        in that case this session's output folder is removed.
    """
    # Computed before the try block so the error handler can always refer to it.
    output_dir = os.path.join('output', str(uid))
    source_clip = None
    try:
        if uid not in session_data:
            session_data[uid] = _new_session_record()
        session = session_data[uid]

        print(f"UID: {uid}, User ID: {user_id}, Count: {count}, Final: {final}, Video: {video_path}")
        log(f"Analyzing video for question - {count}")
        print(output_dir)
        os.makedirs(output_dir, exist_ok=True)

        # The final pass needs every earlier question to have finished.
        if final and count > 1:
            _wait_for_previous_results(output_dir, count)

        source_clip = VideoFileClip(video_path)
        video_clip = source_clip.set_fps(30)
        duration = video_clip.duration
        print("Duration: ", duration)
        session['vcount'].append(count)
        session['duration'].append(duration)
        fps = video_clip.fps

        audio = video_clip.audio
        audio_path = os.path.join(output_dir, 'extracted_audio.wav')
        audio.write_audiofile(audio_path)

        # Face extraction. Frames are streamed instead of being collected in a
        # list first, so only the extracted faces stay in memory.
        print("extracting faces")
        faces = [extract_face(frame, dnn_net, predictor) for frame in tqdm(video_clip.iter_frames())]
        print(f'{len([face for face in faces if face is not None])} faces found.')

        # Eye tracking
        fc = Facetrack()
        log(f"Extracting eye features for question - {count}")
        eye_preds, blink_durations, total_blinks = eye_track_predict(fc, faces, fps)
        print(len(eye_preds))
        print("total_blinks- ", total_blinks)
        session['eye'].append(eye_preds)
        session['blinks'].append(blink_durations)

        # Facial expression recognition
        log(f"Extracting facial features for question - {count}")
        fer_emotions, class_wise_frame_count, em_tensors = fer_predict(faces, fps, fer_model)
        print("face emotions", len(fer_emotions))
        session['fer'].append(fer_emotions)
        session['class_wise_frame_counts'].append(class_wise_frame_count)

        # Valence / arousal / stress
        valence_list, arousal_list, stress_list = va_predict(
            valence_arousal_model, val_ar_feat_model, faces, list(em_tensors)
        )
        session['valence'].append(valence_list)
        session['arousal'].append(arousal_list)
        session['stress'].append(stress_list)

        # Speech
        log(f"Extracting speech features for question - {count}")
        emotions, major_emotion, word = speech_predict(
            audio_path, speech_model, valence_dict_path, arousal_dict_path, dominance_dict_path
        )
        # All speech figures are read once, tolerating a missing `word` result
        # or missing values, and reused for both the session and the metadata.
        pause_length = _speech_metric(word, 'average_pause_length')
        articulation_rate = _speech_metric(word, 'articulation_rate')
        speaking_rate = _speech_metric(word, 'speaking_rate')
        word_weights = word['word_weights'] if word else {}
        session['speech_emotions'].append(emotions)
        session['word_weights_list'].append(word_weights)
        session['speech_data'].append([pause_length, articulation_rate, speaking_rate])

        # Metadata for this single video
        log(f"Generating the metadata for question - {count}")
        meta_data = {
            'vcount': count,
            'eye_emotion_recognition': {
                "blink_durations": blink_durations,
                "avg_blink_duration": _mean_or_zero(blink_durations),
                "total_blinks": total_blinks,
                "duration": duration,
            },
            'facial_emotion_recognition': {
                "class_wise_frame_count": class_wise_frame_count,
            },
            'speech_emotion_recognition': {
                'major_emotion': str(major_emotion),
                'pause_length': pause_length,
                'articulation_rate': articulation_rate,
                'speaking_rate': speaking_rate,
                'word_weights': word_weights,
            },
        }

        # Temporary audio artefacts are no longer needed. 'segment.wav' is not
        # created in this function (presumably by speech_predict - confirm).
        _remove_if_exists(audio_path)
        _remove_if_exists('segment.wav')
        print("Individual: ", meta_data)

        if not final:
            print("Not final Executing")
            log(f"Saving analytics for question - {count}")
            # The upload of individual analytics (send_analytics /
            # send_individual_analytics_files) is disabled in this file.
            print("Sent analytics")
            # Marker file that the final pass polls for.
            dummy_file_path = os.path.join(output_dir, f'{count}.json')
            print("Writing dummy file: ", dummy_file_path)
            with open(dummy_file_path, 'w') as dummy_file:
                json.dump({"status": "completed"}, dummy_file)
            return

        # ---- Combined processing (final video only) ----
        log("Processing gathered data for final output")
        vcount = session['vcount']
        sorted_indices = sorted(range(len(vcount)), key=lambda i: vcount[i])
        for key in session:
            # Only reorder lists that are the same length as vcount.
            if len(session[key]) == len(vcount):
                session[key] = [session[key][i] for i in sorted_indices]
        videos = len(session['vcount'])

        # Combined speech emotions; timestamps advance by 3 per entry.
        combined_speech = []
        vid_index = []
        for i in range(videos):
            video_speech = session['speech_emotions'][i]
            vid_index.extend([i + 1] * len(video_speech))
            combined_speech += video_speech
        timestamps = [i * 3 for i in range(len(combined_speech))]
        df = pd.DataFrame({
            'timestamps': timestamps,
            'video_index': vid_index,
            'speech_emotion': combined_speech,
        })
        df.to_csv(os.path.join(output_dir, 'combined_speech.csv'), index=False)

        # Per-video plots and frame-level series concatenated over all videos.
        combined_valence = []
        combined_arousal = []
        combined_stress = []
        combined_fer = []
        combined_eye = []
        combined_blinks = []
        vid_index = []
        for i in range(videos):
            # fps is 30 here because every clip is resampled with set_fps(30).
            timestamps = [j / fps for j in range(len(session['valence'][i]))]
            vid_index.extend([i + 1] * len(timestamps))
            folder_path = os.path.join(output_dir, f"{session['vcount'][i]}")
            os.makedirs(folder_path, exist_ok=True)
            plot_graph(timestamps, session['valence'][i], 'valence', os.path.join(folder_path, 'valence.png'))
            plot_graph(timestamps, session['arousal'][i], 'arousal', os.path.join(folder_path, 'arousal.png'))
            plot_graph(timestamps, session['stress'][i], 'stress', os.path.join(folder_path, 'stress.png'))
            combined_arousal += session['arousal'][i]
            combined_valence += session['valence'][i]
            combined_stress += session['stress'][i]
            combined_fer += session['fer'][i]
            combined_blinks += session['blinks'][i]
            # Numeric eye values are shifted by the largest numeric value seen
            # so far so they keep increasing across videos. The shift is
            # applied to a copy: the stored per-video series stays untouched,
            # so running the final pass again does not offset it twice.
            max_value = max(_numeric_values(combined_eye), default=0)
            combined_eye += [
                x + max_value if isinstance(x, (int, float)) else x
                for x in session['eye'][i]
            ]

        timestamps = [i / fps for i in range(len(combined_arousal))]
        plot_graph(timestamps, combined_valence, 'valence', os.path.join(output_dir, 'valence.png'))
        plot_graph(timestamps, combined_arousal, 'arousal', os.path.join(output_dir, 'arousal.png'))
        plot_graph(timestamps, combined_stress, 'stress', os.path.join(output_dir, 'stress.png'))
        print(len(timestamps), len(vid_index), len(combined_fer), len(combined_valence),
              len(combined_arousal), len(combined_stress), len(combined_eye))
        df = pd.DataFrame({
            'timestamps': timestamps,
            'video_index': vid_index,
            'fer': combined_fer,
            'valence': combined_valence,
            'arousal': combined_arousal,
            'stress': combined_stress,
            'eye': combined_eye,
        })
        df.to_csv(os.path.join(output_dir, 'combined_data.csv'), index=False)

        # Metadata for the whole session
        comb_meta_data = {}
        comb_meta_data['eye_emotion_recognition'] = {
            "avg_blink_duration": _mean_or_zero(combined_blinks),
            # default=0 avoids a ValueError when no numeric eye value exists.
            "total_blinks": max(_numeric_values(combined_eye), default=0),
        }

        frame_counts = {}
        for per_video_counts in session['class_wise_frame_counts']:
            for key, value in per_video_counts.items():
                frame_counts[key] = frame_counts.get(key, 0) + value
        comb_meta_data['facial_emotion_recognition'] = {
            "class_wise_frame_count": frame_counts,
        }

        combined_weights = Counter()
        for word_weight in session['word_weights_list']:
            combined_weights.update(word_weight)
        combined_weights_dict = dict(combined_weights)
        print(combined_weights_dict)
        speech_rows = session['speech_data']
        comb_meta_data['speech_emotion_recognition'] = {
            # NOTE(review): this is the major emotion of the last analysed
            # video only, not one derived from the whole session.
            'major_emotion': str(major_emotion),
            'pause_length': statistics.mean([row[0] for row in speech_rows]),
            'articulation_rate': statistics.mean([row[1] for row in speech_rows]),
            'speaking_rate': statistics.mean([row[2] for row in speech_rows]),
            'word_weights': combined_weights_dict,
        }
        with open(os.path.join(output_dir, 'combined.json'), 'w') as json_file:
            json.dump(comb_meta_data, json_file)
        log("Saving analytics for final output")
        # The upload of combined analytics and the cleanup of output_dir after
        # a successful run are disabled in this file.
    except Exception as e:
        print("Error analyzing video...: ", e)
        error_trace = traceback.format_exc()
        print("Error Trace: ", error_trace)
        log(f"Error analyzing video for question - {count}")
        # Remove only this session's folder; other sessions keep their output.
        shutil.rmtree(output_dir, ignore_errors=True)
        print(f"Deleted output directory: {output_dir}")
    finally:
        # Release the video reader held by the clip.
        if source_clip is not None:
            source_clip.close()
| # st=time.time() | |
| # # analyze_live_video(video_path, uid, user_id, count, final, log) | |
| # analyze_live_video('videos/s2.webm', 1,1,1,False,print) | |
| # analyze_live_video('videos/a4.webm', 1,1,2,True,print) | |
| # analyze_live_video('videos/s2.webm', 1,1,2,True,print) | |
| # print("time taken - ",time.time()-st) |