dikdimon's picture
Upload extensions using SD-Hub extension
c336648 verified
import os
import subprocess
import concurrent.futures
from datetime import timedelta
def convert_mp4_to_wav(mp4_file, wav_file):
cmd = 'ffmpeg -i {} -acodec pcm_s16le -ar 48000 {}'
if wav_file is None or len(wav_file) == 0:
wav_file = mp4_file.replace('.mp4', '.wav')
os.system(cmd.format(mp4_file, wav_file))
segment_duration = 30 * 60
output_pattern = f"{wav_file.replace('.wav', '')}_%03d.wav"
cmd = f'ffmpeg -i "{wav_file}" -f segment -segment_time {segment_duration} -c copy "{output_pattern}"'
os.system(cmd)
def duration_get(file_path):
ffprobe_command = f'ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 "{file_path}"'
result = subprocess.check_output(ffprobe_command, shell=True, universal_newlines=True)
duration = float(result)
print(f'{file_path} : {duration}')
return duration
def duration_sum(directory):
total_duration = 0
with concurrent.futures.ThreadPoolExecutor() as executor:
# Collect the futures
futures = []
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
if os.path.isfile(filepath):
try:
future = executor.submit(duration_get, filepath)
futures.append(future)
except (subprocess.CalledProcessError, ValueError):
continue
# Collect the results
for future in concurrent.futures.as_completed(futures):
try:
duration = future.result()
total_duration += duration
except Exception as e:
print(f"Error occurred: {e}")
return total_duration
def duration_split(input_dir, output_dir, num_parts=2, file_ext="wav"):
if not input_dir:
print("Input directory not provided.")
return
if not output_dir:
output_dir = os.path.join(input_dir, "output")
os.makedirs(output_dir, exist_ok=True)
for file_name in os.listdir(input_dir):
if file_name.endswith("." + file_ext):
file_path = os.path.join(input_dir, file_name)
duration = duration_get(file_path)
part_duration = duration / num_parts
print("[ * ] Duration:", duration)
print("[ * ] Number of Parts:", num_parts)
print("[ * ] Part Duration:", part_duration)
base_name = os.path.splitext(file_name)[0]
for i in range(num_parts):
part_output_file = os.path.join(output_dir, f"{base_name}_part{i + 1}.{file_ext}")
start_time = i * part_duration
ffmpeg_split_command = f'ffmpeg -i "{file_path}" -ss {start_time} -t {part_duration} -c copy "{part_output_file}"'
subprocess.call(ffmpeg_split_command, shell=True)