| | import os |
| | import cv2 |
| | import pickle |
| | import numpy as np |
| | from keras.models import load_model |
| | from moviepy.editor import VideoFileClip |
| | import librosa |
| | from sklearn.preprocessing import LabelEncoder |
| | import xml.etree.ElementTree as ET |
| | import random |
| |
|
class VideoAudioFeatureExtractor:
    """Classifies a video's style with a pre-trained Keras model and emits an
    xmeml (FCP-style) XML cut list assembled from stored clip-item templates."""

    def __init__(self, video_path, output_path):
        """Load the classifier, label encoder and clip-item table from disk.

        Args:
            video_path: path of the input video to process.
            output_path: caller-supplied output location.
        """
        # Caller-supplied paths.
        self.video_path = video_path
        self.output_path = output_path

        # Fixed model-input geometry: 15 frames of 224x224 pixels with
        # 3 video + 3 audio channels stacked on the last axis.
        self.num_frames = 15
        self.height = 224
        self.width = 224
        self.channels = 6
        self.audio_feature_dim = 20
        self.num_actions = 3

        # Pre-trained artifacts, loaded relative to the working directory.
        # NOTE(review): pickle.load on these files assumes they are trusted.
        self.classification_model = load_model("EdAi-gamestyle.h5")
        with open("label_encoder.pkl", "rb") as fh:
            self.label_encoder = pickle.load(fh)
        with open("clip_items.pkl", "rb") as fh:
            self.all_clip_items = pickle.load(fh)
|
| | def generate_clipitem_id(self, video_name, num_digits=10): |
| | random_number = ''.join(random.choices('0123456789', k=num_digits)) |
| | return f"{video_name}{random_number}" |
| |
|
| | def combine_features(self, video_features, audio_features): |
| | audio_features_resized = cv2.resize(audio_features, (self.height, self.width)) |
| | audio_features_reshaped = np.repeat(audio_features_resized[:, :, np.newaxis], 3, axis=2) |
| | audio_features_reshaped = np.repeat(audio_features_reshaped[np.newaxis, :, :, :], self.num_frames, axis=0) |
| | combined_features = np.concatenate([video_features, audio_features_reshaped], axis=-1) |
| | return combined_features |
| |
|
| | def extract_audio(self, video_path): |
| | clip = VideoFileClip(video_path) |
| | audio = clip.audio |
| | audio_signal = audio.to_soundarray(fps=48000) |
| | audio_signal = audio_signal.astype(np.float32) |
| | if len(audio_signal.shape) == 2: |
| | audio_signal = librosa.to_mono(audio_signal.T) |
| | return audio_signal |
| |
|
| | def process_audio(self, audio_signal_segment): |
| | mfccs = librosa.feature.mfcc(y=audio_signal_segment, sr=48000, n_mfcc=self.audio_feature_dim) |
| | mfccs_fixed_length = librosa.util.fix_length(mfccs, size=self.num_frames) |
| | return mfccs_fixed_length.T |
| |
|
| | def preprocess_frame(self, frame): |
| | frame_resized = cv2.resize(frame, (self.width, self.height)) |
| | frame_normalized = (frame_resized / 255.0).astype(np.float32) |
| | return frame_normalized |
| |
|
| | def extract_clip(self, video, audio_features, decision): |
| | start_frame = int(decision["in"]) |
| | end_frame = int(decision["out"]) |
| | video.set(cv2.CAP_PROP_POS_FRAMES, start_frame) |
| | clip_frames = [] |
| | for i in range(start_frame, end_frame, 2): |
| | ret, frame = video.read() |
| | if not ret: |
| | break |
| | frame_processed = self.preprocess_frame(frame) |
| | clip_frames.append(frame_processed) |
| | while len(clip_frames) < self.num_frames: |
| | clip_frames.append(np.zeros((self.height, self.width, 3), dtype=np.float32)) |
| | clip_frames = np.array(clip_frames[:self.num_frames]) |
| | return clip_frames, audio_features |
| |
|
| | def create_audio_track(self, clip_item_ids, pan_value, link_ids, video_clip_items, video_file_elements): |
| | track = ET.Element("track") |
| | for idx, clipitem_id in enumerate(clip_item_ids): |
| | clipitem_element = ET.Element("clipitem", id=clipitem_id) |
| | video_clip_item = video_clip_items[idx] |
| | in_value = video_clip_item['in'] |
| | out_value = video_clip_item['out'] |
| | start_value = video_clip_item['start'] |
| | end_value = video_clip_item['end'] |
| | ET.SubElement(clipitem_element, "name").text = video_clip_item['name'] |
| | ET.SubElement(clipitem_element, "duration").text = video_clip_item['duration'] |
| | rate_elem = ET.SubElement(clipitem_element, "rate") |
| | ET.SubElement(rate_elem, "ntsc").text = "TRUE" |
| | ET.SubElement(rate_elem, "timebase").text = "30" |
| | ET.SubElement(clipitem_element, "in").text = in_value |
| | ET.SubElement(clipitem_element, "out").text = out_value |
| | ET.SubElement(clipitem_element, "start").text = start_value |
| | ET.SubElement(clipitem_element, "end").text = end_value |
| | ET.SubElement(clipitem_element, "masterclipid").text = os.path.basename(self.video_path) |
| | sourcetrack = ET.SubElement(clipitem_element, "sourcetrack") |
| | ET.SubElement(sourcetrack, "mediatype").text = "audio" |
| | ET.SubElement(sourcetrack, "trackindex").text = str(idx + 1) |
| | video_file_elem = video_clip_item.get("file") |
| | if video_file_elem is not None: |
| | ET.SubElement(clipitem_element, "file", id=video_file_elem.get("id")) |
| | else: |
| | ET.SubElement(clipitem_element, "file", id=os.path.basename(self.video_path).split('.')[0]) |
| | filter_elem = ET.SubElement(clipitem_element, "filter") |
| | effect_elem = ET.SubElement(filter_elem, "effect") |
| | ET.SubElement(effect_elem, "name").text = "Audio Levels" |
| | ET.SubElement(effect_elem, "effectid").text = "audiolevels" |
| | ET.SubElement(effect_elem, "effectcategory").text = "audiolevels" |
| | ET.SubElement(effect_elem, "effecttype").text = "audiolevels" |
| | ET.SubElement(effect_elem, "mediatype").text = "audio" |
| | parameter_elem = ET.SubElement(effect_elem, "parameter") |
| | ET.SubElement(parameter_elem, "name").text = "Level" |
| | ET.SubElement(parameter_elem, "parameterid").text = "level" |
| | ET.SubElement(parameter_elem, "valuemin").text = "0" |
| | ET.SubElement(parameter_elem, "valuemax").text = "3.98109" |
| | ET.SubElement(parameter_elem, "value").text = "1" |
| | filter_elem = ET.SubElement(clipitem_element, "filter") |
| | effect_elem = ET.SubElement(filter_elem, "effect") |
| | ET.SubElement(effect_elem, "name").text = "Audio Pan" |
| | ET.SubElement(effect_elem, "effectid").text = "audiopan" |
| | ET.SubElement(effect_elem, "effectcategory").text = "audiopan" |
| | ET.SubElement(effect_elem, "effecttype").text = "audiopan" |
| | ET.SubElement(effect_elem, "mediatype").text = "audio" |
| | parameter_elem = ET.SubElement(effect_elem, "parameter") |
| | ET.SubElement(parameter_elem, "name").text = "Pan" |
| | ET.SubElement(parameter_elem, "parameterid").text = "pan" |
| | ET.SubElement(parameter_elem, "valuemin").text = "-1" |
| | ET.SubElement(parameter_elem, "valuemax").text = "1" |
| | ET.SubElement(parameter_elem, "value").text = str(pan_value) |
| | for link_id in link_ids[idx]: |
| | link_elem = ET.SubElement(clipitem_element, "link") |
| | ET.SubElement(link_elem, "linkclipref").text = link_id |
| | track.append(clipitem_element) |
| | self.adjust_clipitem_start_end(track.findall('clipitem')) |
| | return track |
| |
|
| | def create_clip_structure(self, video_name, video_path, total_duration): |
| | clip = ET.Element("clip", id=video_name) |
| | ET.SubElement(clip, "updatebehavior").text = "add" |
| | ET.SubElement(clip, "name").text = video_name |
| | ET.SubElement(clip, "duration").text = str(total_duration) |
| | rate = ET.SubElement(clip, "rate") |
| | ET.SubElement(rate, "ntsc").text = "TRUE" |
| | ET.SubElement(rate, "timebase").text = "30" |
| | ET.SubElement(clip, "in").text = "-1" |
| | ET.SubElement(clip, "out").text = "-1" |
| | ET.SubElement(clip, "masterclipid").text = video_name |
| | ET.SubElement(clip, "ismasterclip").text = "TRUE" |
| |
|
| | |
| | logginginfo = ET.SubElement(clip, "logginginfo") |
| | ET.SubElement(logginginfo, "scene") |
| | ET.SubElement(logginginfo, "shottake") |
| | ET.SubElement(logginginfo, "lognote") |
| | ET.SubElement(logginginfo, "good").text = "FALSE" |
| |
|
| | |
| | labels = ET.SubElement(clip, "labels") |
| | ET.SubElement(labels, "label2") |
| |
|
| | |
| | comments = ET.SubElement(clip, "comments") |
| | ET.SubElement(comments, "mastercomment1") |
| | ET.SubElement(comments, "mastercomment2") |
| | ET.SubElement(comments, "mastercomment3") |
| | ET.SubElement(comments, "mastercomment4") |
| |
|
| | |
| | media = ET.SubElement(clip, "media") |
| | video = ET.SubElement(media, "video") |
| | track = ET.SubElement(video, "track") |
| | clipitem = ET.SubElement(track, "clipitem", id=f"{video_name}1") |
| | ET.SubElement(clipitem, "name").text = video_name |
| | ET.SubElement(clipitem, "duration").text = str(total_duration) |
| | rate = ET.SubElement(clipitem, "rate") |
| | ET.SubElement(rate, "ntsc").text = "TRUE" |
| | ET.SubElement(rate, "timebase").text = "30" |
| | ET.SubElement(clipitem, "in").text = "0" |
| | ET.SubElement(clipitem, "out").text = str(total_duration) |
| | ET.SubElement(clipitem, "start").text = "0" |
| | ET.SubElement(clipitem, "end").text = str(total_duration) |
| | ET.SubElement(clipitem, "pixelaspectratio").text = "Square" |
| | ET.SubElement(clipitem, "anamorphic").text = "FALSE" |
| | ET.SubElement(clipitem, "alphatype").text = "none" |
| | ET.SubElement(clipitem, "masterclipid").text = video_name |
| |
|
| | |
| | file_id = video_name.split('.')[0] |
| | file = ET.SubElement(clipitem, "file") |
| | file.text = f'file id="{file_id}"' |
| | ET.SubElement(file, "name").text = video_name |
| | ET.SubElement(file, "pathurl").text = f"file://localhost/{video_path}" |
| | rate = ET.SubElement(file, "rate") |
| | ET.SubElement(rate, "timebase").text = "30" |
| | ET.SubElement(rate, "ntsc").text = "TRUE" |
| | ET.SubElement(file, "duration").text = str(total_duration) |
| | timecode = ET.SubElement(file, "timecode") |
| | ET.SubElement(timecode, "string").text = "00:00:00:00" |
| | ET.SubElement(timecode, "frame").text = "0" |
| | ET.SubElement(timecode, "rate").text = "30" |
| | ET.SubElement(timecode, "displayformat").text = "NDF" |
| | media = ET.SubElement(file, "media") |
| | video = ET.SubElement(media, "video") |
| | audio = ET.SubElement(media, "audio") |
| |
|
| | |
| | for i in range(2): |
| | track = ET.SubElement(audio, "track") |
| | clipitem = ET.SubElement(track, "clipitem", id=f"{video_name}{i + 2}") |
| | ET.SubElement(clipitem, "name").text = video_name |
| | ET.SubElement(clipitem, "duration").text = str(total_duration) |
| | rate = ET.SubElement(clipitem, "rate") |
| | ET.SubElement(rate, "ntsc").text = "TRUE" |
| | ET.SubElement(rate, "timebase").text = "30" |
| | ET.SubElement(clipitem, "in").text = "0" |
| | ET.SubElement(clipitem, "out").text = str(total_duration) |
| | ET.SubElement(clipitem, "start").text = "0" |
| | ET.SubElement(clipitem, "end").text = str(total_duration) |
| | ET.SubElement(clipitem, "masterclipid").text = video_name |
| | file_id = video_name.split('.')[0] |
| | file_elem = ET.SubElement(clipitem, "file", id=file_id) |
| | ET.SubElement(file_elem, "name").text = video_name |
| | ET.SubElement(file_elem, "pathurl").text = f"file://localhost/{video_path}" |
| | sourcetrack = ET.SubElement(clipitem, "sourcetrack") |
| | ET.SubElement(sourcetrack, "mediatype").text = "audio" |
| | ET.SubElement(sourcetrack, "trackindex").text = str(i + 1) |
| |
|
| | return clip |
| |
|
| | def adjust_clipitem_start_end(self, clipitems): |
| | last_end = 0 |
| | for clipitem in clipitems: |
| | in_val = int(clipitem.find('in').text) |
| | out_val = int(clipitem.find('out').text) |
| | clipitem.find('start').text = str(last_end) |
| | clipitem.find('end').text = str(last_end + (out_val - in_val)) |
| | last_end = int(clipitem.find('end').text) |
| |
|
| | def dict_to_xml(self, tag, dictionary): |
| | attributes = dictionary.pop('attributes', {}) |
| | if tag == "clipitem" or tag == "file": |
| | attributes["id"] = dictionary.pop("id", None) |
| | for key, val in attributes.items(): |
| | if val is None: |
| | attributes[key] = "none" |
| | elem = ET.Element(tag, **attributes) |
| | for key, val in dictionary.items(): |
| | if val is None: |
| | val = "none" |
| | if isinstance(val, dict): |
| | child = self.dict_to_xml(key, val) |
| | elem.append(child) |
| | elif isinstance(val, list): |
| | for item in val: |
| | child = self.dict_to_xml(key, item) |
| | elem.append(child) |
| | else: |
| | child = ET.Element(key) |
| | child.text = str(val) if val is not None else "none" |
| | elem.append(child) |
| | return elem |
| |
|
| | def create_sequence_structure(self, video_name, total_duration): |
| | sequence = ET.Element("sequence", id="Sequence 1") |
| | ET.SubElement(sequence, "updatebehavior").text = "add" |
| | ET.SubElement(sequence, "name").text = "Sequence 1" |
| | ET.SubElement(sequence, "duration").text = str(total_duration) |
| | rate = ET.SubElement(sequence, "rate") |
| | ET.SubElement(rate, "ntsc").text = "TRUE" |
| | ET.SubElement(rate, "timebase").text = "30" |
| | timecode = ET.SubElement(sequence, "timecode") |
| | rate_timecode = ET.SubElement(timecode, "rate") |
| | ET.SubElement(rate_timecode, "ntsc").text = "TRUE" |
| | ET.SubElement(rate_timecode, "timebase").text = "30" |
| | ET.SubElement(timecode, "frame").text = "107891" |
| | ET.SubElement(timecode, "source").text = "source" |
| | ET.SubElement(timecode, "displayformat").text = "DF" |
| | ET.SubElement(sequence, "in").text = "-1" |
| | ET.SubElement(sequence, "out").text = "-1" |
| | media = ET.SubElement(sequence, "media") |
| | video = ET.SubElement(media, "video") |
| | format_ = ET.SubElement(video, "format") |
| | samplecharacteristics = ET.SubElement(format_, "samplecharacteristics") |
| | ET.SubElement(samplecharacteristics, "width").text = "1920" |
| | ET.SubElement(samplecharacteristics, "height").text = "1080" |
| | ET.SubElement(samplecharacteristics, "pixelaspectratio").text = "Square" |
| | ET.SubElement(samplecharacteristics, "anamorphic").text = "FALSE" |
| | ET.SubElement(samplecharacteristics, "fielddominance").text = "none" |
| | rate_sample = ET.SubElement(samplecharacteristics, "rate") |
| | ET.SubElement(rate_sample, "ntsc").text = "TRUE" |
| | ET.SubElement(rate_sample, "timebase").text = "30" |
| | ET.SubElement(samplecharacteristics, "colordepth").text = "24" |
| | track = ET.SubElement(video, "track") |
| | return sequence, track, media |
| |
|
    def process_video(self):
        """End-to-end pipeline: classify ``self.video_path`` and, when clip
        items exist for the predicted label, write an xmeml XML cut list next
        to the video.

        NOTE(review): ``self.output_path`` is never used here — the XML path
        is derived from the video's own directory; confirm which is intended.
        """
        # Single-element loop, apparently kept from a multi-video variant.
        for video_path in [self.video_path]:
            # Audio side: mono signal -> fixed-size MFCC matrix.
            audio_signal = self.extract_audio(video_path)
            audio_frame_features = self.process_audio(audio_signal)

            # Video side: read the first num_frames frames (consecutive).
            video = cv2.VideoCapture(video_path)
            ret, frame = video.read()
            if not ret:
                print(f"Failed to read video: {video_path}")
                continue
            clip_frames = []
            while len(clip_frames) < self.num_frames:
                frame_processed = self.preprocess_frame(frame)
                clip_frames.append(frame_processed)
                ret, frame = video.read()
                if not ret:
                    break
            # NOTE(review): a video shorter than num_frames yields fewer
            # frames here (no zero padding, unlike extract_clip) — confirm
            # the model tolerates that.
            clip_frames = np.array(clip_frames[:self.num_frames])
            video.release()

            # Stack video + audio channels the way the classifier expects.
            combined_features_for_prediction = self.combine_features(clip_frames, audio_frame_features)

            # Two-input model: combined tensor plus the raw MFCC matrix.
            prediction = self.classification_model.predict(
                [np.array([combined_features_for_prediction]), np.array([audio_frame_features])])
            predicted_label = self.label_encoder.inverse_transform([np.argmax(prediction)])[0]

            if predicted_label in self.all_clip_items:
                clip_items_for_predicted_label = self.all_clip_items[predicted_label]
                print("Clip items for predicted label:", clip_items_for_predicted_label)

                # Total timeline length in frames, summed over the templates.
                total_duration = sum([int(clip_item["duration"]) for clip_item in clip_items_for_predicted_label])

                # xmeml document skeleton: project -> children.
                root = ET.Element("xmeml", version="5")

                project = ET.SubElement(root, "project")
                ET.SubElement(project, "name").text = "Untitled Project 1"
                children = ET.SubElement(project, "children")

                # Empty bin placeholder.
                bin_element = ET.Element("bin")
                ET.SubElement(bin_element, "updatebehavior").text = "add"
                ET.SubElement(bin_element, "name").text = "Custom Bins"
                ET.SubElement(bin_element, "children")
                children.append(bin_element)

                video_name_with_extension = os.path.basename(video_path)

                # Master clip entry for the source video.
                clip_structure = self.create_clip_structure(video_name_with_extension, video_path,
                                                            total_duration)
                children.append(clip_structure)

                # Timeline sequence; `track`/`media` are filled in below.
                sequence, track, media = self.create_sequence_structure(video_name_with_extension, total_duration)
                children.append(sequence)

                clip_item_ids = []
                link_ids_list = []
                video_file_elements = []

                for clip_item in clip_items_for_predicted_label:
                    # Fresh ids: one video clipitem plus two linked audio items.
                    video_clip_item_id = self.generate_clipitem_id(video_name_with_extension)
                    audio_clip_item_id_1 = self.generate_clipitem_id(video_name_with_extension)
                    audio_clip_item_id_2 = self.generate_clipitem_id(video_name_with_extension)

                    clip_item_ids.append(video_clip_item_id)
                    link_ids_list.append([video_clip_item_id, audio_clip_item_id_1, audio_clip_item_id_2])

                    # NOTE(review): mutates the stored template dict in place.
                    clip_item['id'] = video_clip_item_id
                    clip_item['name'] = video_name_with_extension

                    clipitem_element = self.dict_to_xml("clipitem", clip_item)

                    # Drop any stale <link> elements carried by the template.
                    for link in clipitem_element.findall("link"):
                        clipitem_element.remove(link)

                    # Point the <file> reference at this video's id.
                    file_element = clipitem_element.find("file")
                    file_id_value = os.path.splitext(os.path.basename(video_path))[0]
                    if file_element is not None:
                        file_element.set("id", file_id_value)
                        file_element.text = None
                    else:
                        ET.SubElement(clipitem_element, "file", id=file_id_value)

                    # Temporarily pull <fielddominance> out so the <link>
                    # elements land before it; it is re-appended afterwards.
                    fielddominance_elem = clipitem_element.find("fielddominance")
                    if fielddominance_elem is not None:
                        clipitem_element.remove(fielddominance_elem)

                    link_elem_self = ET.SubElement(clipitem_element, "link")
                    ET.SubElement(link_elem_self, "linkclipref").text = video_clip_item_id
                    link_elem_audio1 = ET.SubElement(clipitem_element, "link")
                    ET.SubElement(link_elem_audio1, "linkclipref").text = audio_clip_item_id_1
                    link_elem_audio2 = ET.SubElement(clipitem_element, "link")
                    ET.SubElement(link_elem_audio2, "linkclipref").text = audio_clip_item_id_2

                    if fielddominance_elem is not None:
                        clipitem_element.append(fielddominance_elem)

                    track.append(clipitem_element)
                    # Re-chain the timeline after every insertion.
                    self.adjust_clipitem_start_end(track.findall('clipitem'))

                    file_element = clipitem_element.find("file")
                    if file_element is not None:
                        video_file_elements.append(file_element)

                video_clip_items_list = clip_items_for_predicted_label
                # Left (-1) and right (+1) panned audio tracks.
                audio_track_left = self.create_audio_track([id_list[1] for id_list in link_ids_list], -1, link_ids_list,
                                                           video_clip_items_list, video_file_elements)
                audio_track_right = self.create_audio_track([id_list[2] for id_list in link_ids_list], 1, link_ids_list,
                                                            video_clip_items_list, video_file_elements)

                audio = ET.SubElement(media, "audio")
                audio.append(audio_track_left)
                audio.append(audio_track_right)

                # Output lands next to the source video: <name>_predicted.xml.
                output_path = os.path.join(os.path.dirname(video_path), f"{os.path.basename(video_path)}_predicted.xml")
                tree = ET.ElementTree(root)
                with open(output_path, 'wb') as f:
                    tree.write(f, encoding='utf-8', xml_declaration=True)
            else:
                print(f"Predicted label for {video_path}: {predicted_label} (No clip items found)")
| |
|
| | |
| | |
| | |
| |
|
| |
|