| from tqdm import tqdm |
| import subprocess |
| import numpy as np |
| from pathlib import Path |
| import librosa |
| from scipy.signal import ShortTimeFFT, resample |
| from scipy.signal.windows import dpss, hann |
| from numpy.typing import DTypeLike |
| from typing import List, Tuple, Optional, Union, Dict, Any, Callable |
| from i18n import _i18n |
|
|
# Names (or paths) of the external executables this module launches.
ffmpeg_path, ffprobe_path = "ffmpeg", "ffprobe"

# Module-wide STFT settings: window/FFT length and hop size, in samples.
n_fft, hop = 2048, 1024
|
|
def average(*ints: Union[int, float]) -> float:
    """
    Return the arithmetic mean of the given numbers.

    Args:
        *ints: Numbers to average.

    Returns:
        The sum of the arguments divided by their count.

    Raises:
        ZeroDivisionError: If called without arguments.
    """
    total = sum(ints)
    count = len(ints)
    return total / count
|
|
|
|
def check_installed() -> None:
    """
    Report whether the ffmpeg and ffprobe executables can be launched.

    Each tool is started with ``-version``; a localized "found" or
    "not found" message is printed for it. Nothing is returned or raised
    for a missing tool.
    """
    probes = (
        (ffmpeg_path, "ffmpeg_found", "ffmpeg_not_found"),
        (ffprobe_path, "ffprobe_found", "ffprobe_not_found"),
    )
    for executable, found_key, missing_key in probes:
        try:
            subprocess.check_output([executable, "-version"], text=True)
        except (OSError, subprocess.CalledProcessError):
            # OSError: the executable is missing or not runnable;
            # CalledProcessError: it started but exited with an error.
            # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
            print(_i18n(missing_key))
        else:
            print(_i18n(found_key))
|
|
|
|
def get_ogg_bitrate(sample_rate: int, channels: int = 2) -> int:
    """
    Pick a recommended OGG bitrate from the sample rate.

    Args:
        sample_rate: Sample rate of the audio.
        channels: Number of channels.

    Returns:
        The per-channel value for the matching sample-rate tier multiplied
        by the channel count, as an int.
    """
    # (minimum sample rate, per-channel value), highest tier first.
    tiers = (
        (40000, 240),
        (26000, 190),
        (15000, 90),
        (9000, 50),
        (8000, 42),
    )
    # Anything below the lowest tier falls back to 30.
    per_channel = next(
        (value for floor, value in tiers if sample_rate >= floor),
        30,
    )
    return int(per_channel * channels)
|
|
|
|
# Raw PCM format names passed to FFmpeg's ``-f`` option, keyed by dtype name.
_PCM_FORMAT_BY_NAME: Dict[str, str] = {
    "int16": "s16le",
    "int32": "s32le",
    "float32": "f32le",
    "float64": "f64le",
}

# Same table, addressable both by dtype name and by the NumPy scalar type.
SAMPLE_FORMATS_DICT: Dict[Union[str, type], str] = {
    **_PCM_FORMAT_BY_NAME,
    **{getattr(np, name): fmt for name, fmt in _PCM_FORMAT_BY_NAME.items()},
}
|
|
# Audio-only container/codec names accepted as input.
audio_formats: List[str] = (
    "aac ac3 ac4 adts aiff au caf dts eac3 "
    "flac m4a mp3 mp2 ogg oga opus ra raw "
    "snd voc wav wma wv"
).split()


# Video containers accepted as input (their audio track is what gets used).
video_formats_with_audio: List[str] = (
    "3gp 3g2 asf avi flv f4v m4v mkv mov "
    "mp4 mpeg mpg mts mxf ogv rm rmvb ts "
    "vob webm wmv"
).split()


# Every accepted input format: video containers first, then audio formats.
input_formats: List[str] = [*video_formats_with_audio, *audio_formats]


# Formats that can be written.
output_formats: List[str] = "mp3 wav flac ogg opus m4a aac ac3 aiff wma".split()


# The same lists expressed as file extensions with a leading dot.
input_extensions: List[str] = ["." + name for name in input_formats]


output_extensions: List[str] = ["." + name for name in output_formats]
|
|
# One row per output extension:
# (extension, (codec, sample_fmt) for float output, (codec, sample_fmt) otherwise).
_CODEC_TABLE = (
    (".mp3", ("libmp3lame", "fltp"), ("libmp3lame", "s16p")),
    (".wav", ("pcm_f32le", "flt"), ("pcm_s16le", "s16")),
    (".flac", ("flac", "s32"), ("flac", "s16")),
    (".ogg", ("libvorbis", "fltp"), ("libvorbis", "fltp")),
    (".opus", ("libopus", "flt"), ("libopus", "s16")),
    (".m4a", ("aac", "fltp"), ("aac", "fltp")),
    (".aac", ("aac", "fltp"), ("aac", "fltp")),
    (".ac3", ("ac3", "fltp"), ("ac3", "fltp")),
    (".aiff", ("pcm_f32be", "flt"), ("pcm_s16be", "s16")),
    (".wma", ("wmav2", "fltp"), ("wmav2", "fltp")),
)


# FFmpeg encoder arguments per extension; the inner key is the
# "prefer float" flag (True -> float variant, False -> integer variant).
codec_args: Dict[str, Dict[bool, List[str]]] = {
    extension: {
        True: ["-c:a", float_codec, "-sample_fmt", float_fmt],
        False: ["-c:a", int_codec, "-sample_fmt", int_fmt],
    }
    for extension, (float_codec, float_fmt), (int_codec, int_fmt) in _CODEC_TABLE
}
|
|
# Names of the supported ensemble algorithms; the first one is the default.
ensemble_types = tuple(f"{kind}_fft" for kind in ("avg", "min", "max", "median"))
|
|
def get_codec_args(extension: str, prefer_float: bool) -> List[str]:
    """
    Look up the FFmpeg codec arguments for an output extension.

    Args:
        extension: File extension including the leading dot (e.g. ".wav").
        prefer_float: Select the float variant when truthy, otherwise the
            integer variant.

    Returns:
        A new list of FFmpeg arguments, or an empty list when the extension
        is not present in ``codec_args``.
    """
    variants = codec_args.get(extension)
    if variants is None:
        return []
    # Return a copy: the lists in ``codec_args`` are shared module state and
    # callers commonly extend the result in place.
    return list(variants[bool(prefer_float)])
|
|
|
|
# Characters that survive sanitize_output(): decimal digits only.
allowed_chars: str = r"1234567890"


def sanitize_output(output: str) -> str:
    """
    Strip every character that is not listed in ``allowed_chars``.

    Args:
        output: Text to clean.

    Returns:
        The input with only the allowed characters kept, in their
        original order.
    """
    kept = filter(allowed_chars.__contains__, output)
    return "".join(kept)
|
|
|
|
def get_sr(path: str | Path, stream: int = 0) -> int:
    """
    Query the sample rate of an audio stream with ffprobe.

    Args:
        path: Path to the media file.
        stream: Index of the audio stream to inspect.

    Returns:
        The sample rate, or 0 (after printing a localized error message)
        when ffprobe produced no usable number.
    """
    path = Path(path)
    cmd = [ffprobe_path, "-i", path.as_posix(), "-v", "quiet", "-hide_banner",
           "-show_entries", "stream=sample_rate", "-select_streams", f"a:{stream}",
           "-of", "compact=p=0:nk=1"]
    completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    report = completed.stdout.decode("utf-8", errors="replace")

    # ffprobe can print the requested entry on more than one line (the same
    # stream may be listed again, e.g. inside a program section). Parse the
    # first line that contains digits instead of gluing all lines together,
    # which would turn "44100\n44100" into 4410044100.
    for line in report.splitlines():
        digits = sanitize_output(line)
        if digits.isdigit():
            return int(digits)

    print(_i18n("sr_read_error", path=path))
    return 0
|
|
|
|
def get_channels(path: str | Path, stream: int = 0) -> int:
    """
    Query the channel count of an audio stream with ffprobe.

    Args:
        path: Path to the media file.
        stream: Index of the audio stream to inspect.

    Returns:
        The number of channels, or 0 (after printing a localized error
        message) when ffprobe produced no usable number.
    """
    path = Path(path)
    cmd = [ffprobe_path, "-i", path.as_posix(), "-v", "quiet", "-hide_banner",
           "-show_entries", "stream=channels", "-select_streams", f"a:{stream}",
           "-of", "compact=p=0:nk=1"]
    completed = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    report = completed.stdout.decode("utf-8", errors="replace")

    # ffprobe can print the requested entry on more than one line (the same
    # stream may be listed again, e.g. inside a program section). Parse the
    # first line that contains digits instead of gluing all lines together,
    # which would turn "2\n2" into 22.
    for line in report.splitlines():
        digits = sanitize_output(line)
        if digits.isdigit():
            return int(digits)

    print(_i18n("channels_read_error", path=path))
    return 0
|
|
|
|
def check(path: str | Path) -> bool:
    """
    Tell whether a file contains readable audio.

    Args:
        path: Path to the file.

    Returns:
        True when both the channel count and the sample rate reported for
        the file are non-zero.
    """
    # Both probes always run, so each one reports its own error message.
    channel_count = get_channels(path)
    sample_rate = get_sr(path)
    return not (channel_count == 0 or sample_rate == 0)
|
|
|
|
def read(
    path: str | Path,
    sr: Optional[int] = None,
    mono: bool = False,
    dtype: DTypeLike = "float32",
    multi_channel: bool = False,
    num_channels: int = 2,
    stream: int = 0,
    flatten: bool = False
) -> Tuple[np.ndarray, int]:
    """
    Decode an audio stream into a NumPy array by piping raw PCM from FFmpeg.

    Args:
        path: Path to the media file.
        sr: Target sample rate; when falsy it is taken from the file.
        mono: Decode to a single channel.
        dtype: Sample dtype of the returned array.
        multi_channel: Use the channel count of the file instead of
            ``num_channels`` (ignored when ``mono`` is set).
        num_channels: Channel count requested from FFmpeg when neither
            ``mono`` nor ``multi_channel`` is set.
        stream: Index of the audio stream to decode.
        flatten: For mono output, return a 1-D array instead of (1, samples).

    Returns:
        Tuple of (audio array, sample rate). Non-flattened output has shape
        (channels, samples).
    """
    path = Path(path)
    native_format = SAMPLE_FORMATS_DICT.get(dtype, None)
    if not sr:
        sr = get_sr(path, stream)

    if mono:
        channels = 1
    elif multi_channel:
        channels = get_channels(path, stream)
    else:
        channels = num_channels

    # Dtypes without a matching raw format are decoded as float32 and
    # converted afterwards.
    wire_format = native_format if native_format else "f32le"
    cmd = [ffmpeg_path, "-i", path.as_posix(), "-map", f"0:a:{stream}", "-vn",
           "-f", wire_format, "-ac", str(channels), "-ar", str(sr), "-"]
    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10**8
    )
    stdout, _stderr = process.communicate()

    if native_format:
        y = np.frombuffer(stdout, dtype=dtype)
    else:
        y = convert_to_dtype(np.frombuffer(stdout, dtype=np.float32), dtype)

    # FFmpeg emits interleaved samples; de-interleave into (channels, samples).
    if not mono:
        y = y.reshape((-1, channels)).T
    elif flatten:
        y = y.flatten()
    else:
        y = y.reshape((-1, 1)).T

    # Copy so the result is a writable array.
    return y.copy(), sr
|
|
|
|
def multiread(
    paths: Union[List[str | Path], Tuple[str | Path, ...]],
    *args,
    **kwargs
) -> Tuple[List[np.ndarray], List[int]]:
    """
    Read several audio files with ``read`` while showing a progress bar.

    Args:
        paths: Paths of the files to read.
        *args: Positional arguments forwarded to ``read``.
        **kwargs: Keyword arguments forwarded to ``read``.

    Returns:
        Tuple of (list of audio arrays, list of sample rates), in the order
        of ``paths``.
    """
    file_count = len(paths)
    decoded: List[np.ndarray] = []
    rates: List[int] = []
    progress = tqdm(
        paths,
        desc=_i18n("multi_reading"),
        unit=_i18n("files"),
        total=file_count,
    )
    for path in progress:
        audio, rate = read(path, *args, **kwargs)
        decoded.append(audio)
        rates.append(rate)
    return decoded, rates
|
|
|
|
def bitrate_to_int(a: Union[str, int, float]) -> int:
    """
    Convert a bitrate given as text or number to an int.

    Strings may carry a trailing "k"/"K" suffix, which is dropped
    ("192k" -> 192), and may be surrounded by whitespace. Numbers are
    truncated with ``int()``.

    Args:
        a: Bitrate as a string or a number.

    Returns:
        The bitrate as an int. 320 is returned for values that cannot be
        interpreted; for unparsable strings and non-finite floats a
        localized message is printed first.
    """
    fallback = 320

    if isinstance(a, str):
        text = a.strip()
        if text.endswith(("k", "K")):
            text = text[:-1].rstrip()
        if text.isdigit():
            # str.isdigit() also accepts characters such as "²" that int()
            # rejects, so the conversion itself still needs guarding.
            try:
                return int(text)
            except ValueError:
                pass
        print(_i18n("invalid_bitrate", bitrate=a))
        return fallback

    if isinstance(a, (int, float)):
        try:
            return int(a)
        except (ValueError, OverflowError):
            # NaN and +/-inf cannot be converted to int.
            print(_i18n("invalid_bitrate", bitrate=a))
            return fallback

    return fallback
|
|
|
|
def get_info_array(y: np.ndarray) -> Tuple[int, int, Optional[int], bool]:
    """
    Describe the layout of an audio array.

    For 2-D input the shorter dimension is taken to be the channel axis;
    when both dimensions are equal the array is treated as
    (samples, channels).

    Args:
        y: Audio array with one or two dimensions.

    Returns:
        Tuple of (channel count, sample count, index of the sample axis,
        flat flag). For 1-D input the axis index is -1 and the flag is True.

    Raises:
        ValueError: If the array has more than two (or zero) dimensions.
    """
    if y.ndim == 1:
        return 1, len(y), -1, True
    if y.ndim != 2:
        raise ValueError(_i18n("array_dim_error", axis=y.ndim))

    rows, cols = y.shape
    if rows < cols:
        # (channels, samples): samples live on axis 1.
        return rows, cols, 1, False
    # (samples, channels): samples live on axis 0.
    return cols, rows, 0, False
|
|
|
|
def get_axis_from_array_index(index: int) -> int:
    """
    Translate a sample-axis index into the matching channel axis.

    Args:
        index: Sample-axis index as reported by ``get_info_array``.

    Returns:
        0 when ``index`` is 1, 1 when ``index`` is 0, and -1 for every
        other value (including -1).
    """
    if index == 1:
        return 0
    return 1 if index == 0 else -1
|
|
|
|
def get_duration_from_array(y: np.ndarray, sr: Optional[int] = None) -> Union[float, int]:
    """
    Measure the length of an audio array.

    Args:
        y: Audio array.
        sr: Sample rate; when omitted the length is reported in samples.

    Returns:
        Duration in seconds when ``sr`` is given, otherwise the number
        of samples.
    """
    _, sample_count, _, _ = get_info_array(y)
    return sample_count if sr is None else sample_count / sr
|
|
|
|
def is_float(y: np.ndarray) -> bool:
    """
    Tell whether an array holds floating-point samples.

    Args:
        y: Audio array.

    Returns:
        True when the array's dtype is a NumPy floating type.
    """
    scalar_type = y.dtype.type
    return issubclass(scalar_type, np.floating)
|
|
|
|
def is_float_dtype(dtype: DTypeLike) -> bool:
    """
    Tell whether a dtype specification denotes a floating-point type.

    Args:
        dtype: Anything NumPy accepts as a dtype (name, type, dtype object).

    Returns:
        True when the dtype is a NumPy floating type.
    """
    floating_family = np.floating
    verdict = np.issubdtype(dtype, floating_family)
    return verdict
|
|
|
|
def float_to_int(y: np.ndarray, dtype: DTypeLike) -> np.ndarray:
    """
    Convert floating-point samples to an integer dtype with saturation.

    Signed targets: samples are multiplied by the type's maximum, so
    [-1, 1] maps to [-max, max]. Unsigned targets: [-1, 1] maps to
    [0, max]. Values are rounded and out-of-range values saturate at the
    limits of the target type.

    Args:
        y: Floating-point array.
        dtype: Target integer dtype.

    Returns:
        Array of the target integer dtype with the same shape as ``y``.

    Raises:
        ValueError: If ``dtype`` is not an integer dtype.
    """
    info = np.iinfo(dtype)
    min_val, max_val = int(info.min), int(info.max)
    target = info.dtype

    # Work in float64: in float32 the 32-bit limits are not representable
    # (2**31 - 1 rounds up to 2**31), so a full-scale sample used to overflow
    # on the final cast.
    samples = np.asarray(y, dtype=np.float64)
    if min_val < 0:
        scaled = samples * float(max_val)
    elif min_val == 0:
        scaled = (samples + 1.0) / 2.0 * float(max_val)
    else:
        raise ValueError(_i18n("unexpected_min_val", value=min_val))
    rounded = np.round(scaled)

    # Saturate through masks instead of np.clip: for 64-bit targets even
    # float64 rounds the upper limit up, so a clipped float could still
    # overflow when cast.
    too_high = rounded >= float(max_val)
    too_low = rounded <= float(min_val)
    interior = np.where(too_high | too_low, 0.0, rounded).astype(target)
    result = np.where(too_high, target.type(max_val), interior)
    result = np.where(too_low, target.type(min_val), result)
    return result.astype(target, copy=False)
|
|
|
|
def int_to_int(y: np.ndarray, dtype: DTypeLike) -> np.ndarray:
    """
    Rescale integer samples from one integer dtype to another.

    The full value range of the source type is mapped linearly onto the
    full range of the target type (source minimum -> target minimum,
    source maximum -> target maximum).

    Args:
        y: Integer array.
        dtype: Target integer dtype.

    Returns:
        Array of the target dtype.
    """
    dst = np.iinfo(dtype)
    src = np.iinfo(y.dtype)
    as_float = y.astype(np.float64)

    src_span = src.max - src.min
    dst_span = dst.max - dst.min
    if src_span == 0:
        return np.full_like(y, dst.min, dtype=dtype)

    ratio = dst_span / src_span
    mapped = (as_float - src.min) * ratio + dst.min
    bounded = np.clip(np.round(mapped), dst.min, dst.max)
    return bounded.astype(dtype)
|
|
|
|
def int_to_float(y: np.ndarray, dtype: DTypeLike) -> np.ndarray:
    """
    Convert integer samples to floating point in the range [-1, 1].

    Signed input is divided by the largest magnitude of its type.
    Unsigned input is mapped so that 0 becomes -1 and the type's maximum
    becomes +1, which is the exact inverse of the unsigned branch of
    ``float_to_int``.

    Args:
        y: Integer array.
        dtype: Target floating-point dtype.

    Returns:
        Array of the target dtype.
    """
    info = np.iinfo(y.dtype)
    samples = y.astype(np.float64)
    if info.min == 0:
        # Previously (y - center) / max, which spans only about +/-0.5 and
        # therefore did not survive a round trip through float_to_int
        # (uint8 255 came back as 192).
        normalized = samples / info.max * 2.0 - 1.0
    elif info.min < 0:
        full_scale = max(abs(info.min), abs(info.max))
        normalized = samples / full_scale
    else:
        raise ValueError(_i18n("unexpected_min_val", value=info.min))
    return normalized.astype(dtype)
|
|
|
|
def float_to_float(y: np.ndarray, dtype: DTypeLike) -> np.ndarray:
    """
    Cast a floating-point array to another floating-point dtype.

    Args:
        y: Floating-point array.
        dtype: Target dtype.

    Returns:
        A new array holding the same values in the target dtype.
    """
    recast = y.astype(dtype)
    return recast
|
|
|
|
def get_center_value_from_dtype(dtype: DTypeLike) -> int:
    """
    Return the value used as "silence" for a sample dtype.

    Args:
        dtype: Sample dtype.

    Returns:
        0 for floating-point dtypes; for integer dtypes the average of the
        type's minimum and maximum, truncated to int.
    """
    if not is_float_dtype(dtype):
        limits = np.iinfo(dtype)
        return int(average(limits.min, limits.max))
    return 0
|
|
|
|
def convert_to_dtype(y: np.ndarray, dtype: DTypeLike) -> np.ndarray:
    """
    Convert an audio array to another sample dtype.

    The conversion routine is chosen from whether the source array and the
    target dtype are floating point or integer.

    Args:
        y: Input array.
        dtype: Target dtype.

    Returns:
        The converted array.
    """
    source_is_float = bool(is_float(y))
    target_is_float = bool(is_float_dtype(dtype))
    # Keyed by (source is float, target is float).
    converters = {
        (True, True): float_to_float,
        (True, False): float_to_int,
        (False, True): int_to_float,
        (False, False): int_to_int,
    }
    return converters[(source_is_float, target_is_float)](y, dtype)
|
|
|
|
def dc_offset(y: np.ndarray, offset: Union[float, int]) -> np.ndarray:
    """
    Add a constant (DC) offset to the signal.

    The addition is done on a float32 copy; the result is converted back
    to the dtype of the input.

    Args:
        y: Audio array.
        offset: Value added to every sample.

    Returns:
        The shifted array in the original dtype.
    """
    source_dtype = y.dtype
    shifted = convert_to_dtype(y, np.float32) + offset
    return convert_to_dtype(shifted, source_dtype)
|
|
|
|
def gain(y: np.ndarray, gain_value: Union[float, int]) -> np.ndarray:
    """
    Scale the signal by a linear gain factor.

    The multiplication is done on a float32 copy; the result is converted
    back to the dtype of the input.

    Args:
        y: Audio array.
        gain_value: Factor every sample is multiplied by.

    Returns:
        The scaled array in the original dtype.
    """
    source_dtype = y.dtype
    scaled = convert_to_dtype(y, np.float32) * gain_value
    return convert_to_dtype(scaled, source_dtype)
|
|
|
|
def normalize(y: np.ndarray, target_peak: Union[float, int] = 1.0) -> np.ndarray:
    """
    Peak-normalize the signal.

    The array is scaled (in float32) so that its largest absolute sample
    equals ``target_peak``, then converted back to the input dtype. Empty
    and all-zero input is returned without scaling.

    Args:
        y: Audio array.
        target_peak: Desired peak value on the float scale.

    Returns:
        The normalized array in the original dtype.
    """
    orig_dtype = y.dtype
    y = convert_to_dtype(y, np.float32)
    # np.max raises on a zero-size array, so empty input skips the scaling.
    if y.size > 0:
        current_peak = np.max(np.abs(y))
        if current_peak > 0:
            y = y * (target_peak / current_peak)
    return convert_to_dtype(y, orig_dtype)
|
|
|
|
def create_zero_array(samples: int, dtype: DTypeLike) -> np.ndarray:
    """
    Create a 1-D array of silence for the given dtype.

    Args:
        samples: Number of samples; non-positive values give an empty array.
        dtype: Sample dtype.

    Returns:
        1-D array of length ``samples`` filled with the dtype's center value.
    """
    # Compute the fill value once and let NumPy fill the buffer; the previous
    # implementation recomputed it in a Python loop for every sample.
    center = get_center_value_from_dtype(dtype)
    return np.full(max(samples, 0), center, dtype=dtype)
|
|
|
|
def split_channels(y: np.ndarray) -> Tuple[np.ndarray, ...]:
    """
    Split a multi-channel array into one array per channel.

    Args:
        y: Audio array (1-D, or 2-D in either layout).

    Returns:
        Tuple with one 1-D array per channel; a 1-D input is returned as a
        single-element tuple containing the input itself.
    """
    channels, _samples, array_index, flatten = get_info_array(y)
    if flatten:
        return (y,)
    if array_index == 1:
        # (channels, samples): each row is a channel.
        return tuple(y[ch, :] for ch in range(channels))
    # (samples, channels): each column is a channel.
    return tuple(y[:, ch] for ch in range(channels))
|
|
| def get_stft_obj(sr: int, n_fft: int, hop: int) -> ShortTimeFFT: |
| """ |
| Создает STFT с окном DPSS для сверхточного разделения частот |
| |
| Args: |
| sr: Частота дискретизации |
| n_fft: Размер FFT |
| hop: Шаг |
| |
| Returns: |
| Объект ShortTimeFFT |
| """ |
| win = hann(n_fft, sym=False) |
| return ShortTimeFFT(win, hop=hop, fs=sr, scale_to='magnitude', phase_shift=None) |
|
|
|
|
def split_mid_side(
    y: np.ndarray,
    var: int = 1,
    sr: Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Split a stereo signal into a mid (center) part and a side part.

    Variants:
        0: side = each channel minus the L/R average, mid = input - side.
        1: mid = L/R average on both channels, side = input - mid.
        2: per-sample center = the smaller magnitude of L and R where both
           have the same sign (else 0); side = channel - center.
        3: like 2 but per STFT bin, using bins where L and R are in phase;
           requires ``sr``.
        4: single-channel mid = (L + R) / 2 and side = L - R.

    Args:
        y: Stereo audio array, (2, samples) or (samples, 2).
        var: Variant number (0-4).
        sr: Sample rate; required for variant 3.

    Returns:
        Tuple (mid, side) in the dtype of the input. Variants 0-3 return
        two-channel arrays in the layout of the input, variant 4 returns
        1-D arrays.

    Raises:
        Exception: If the input is not stereo, or ``sr`` is missing for
            variant 3.
        ValueError: If ``var`` is not a known variant.
    """
    channels, samples, array_index, flatten = get_info_array(y)
    axis = get_axis_from_array_index(array_index)
    if channels != 2:
        raise Exception(_i18n("stereo_required"))
    orig_dtype = y.dtype
    y = convert_to_dtype(y, np.float32)
    left_channel, right_channel = split_channels(y)
    mid_channel_one = (left_channel * 0.5) + (right_channel * 0.5)

    if var == 0:
        print(_i18n("mid_side_var0"))
        side_channel = np.stack(
            [left_channel - mid_channel_one, right_channel - mid_channel_one],
            axis=axis,
        )
        mid_channel = y - side_channel
    elif var == 1:
        print(_i18n("mid_side_var1"))
        mid_channel = np.stack([mid_channel_one, mid_channel_one], axis=axis)
        side_channel = y - mid_channel
    elif var == 2:
        print(_i18n("mid_side_var2"))
        same_sign = (left_channel * right_channel) > 0
        center_mono = np.where(
            same_sign,
            np.minimum(np.abs(left_channel), np.abs(right_channel)) * np.sign(left_channel),
            0.0
        )
        mid_channel = np.stack([center_mono, center_mono], axis=axis)
        side_channel = np.stack(
            [left_channel - center_mono, right_channel - center_mono],
            axis=axis,
        )
    elif var == 3:
        print(_i18n("mid_side_var3"))
        if not sr:
            raise Exception(_i18n("sr_required"))

        sft = get_stft_obj(sr, n_fft=n_fft, hop=hop)
        Lf = sft.stft(left_channel)
        Rf = sft.stft(right_channel)

        # Bins where both channels point the same way (positive real part
        # of the cross-spectrum) are treated as center content.
        in_phase = np.real(Lf * np.conj(Rf)) > 0
        center_mag = np.minimum(np.abs(Lf), np.abs(Rf)) * in_phase

        # Left center is rebuilt with the right channel's phase and vice versa.
        C_L = center_mag * np.exp(1j * np.angle(Rf))
        C_R = center_mag * np.exp(1j * np.angle(Lf))
        SL = Lf - C_L
        SR = Rf - C_R

        # Use the sample count from get_info_array: y.shape[-1] is the
        # channel count (2) when the input is laid out as (samples, channels).
        center_l = sft.istft(C_L, k1=samples)
        center_r = sft.istft(C_R, k1=samples)
        side_l = sft.istft(SL, k1=samples)
        side_r = sft.istft(SR, k1=samples)

        # Return the caller's dtype like every other variant (y itself has
        # already been converted to float32 at this point).
        mid_ch = multi_channel_array_from_arrays(
            center_l, center_r, index=array_index, dtype=orig_dtype
        )
        side_ch = multi_channel_array_from_arrays(
            side_l, side_r, index=array_index, dtype=orig_dtype
        )
        return mid_ch, side_ch
    elif var == 4:
        print(_i18n("mid_side_var4"))
        mid_channel = mid_channel_one
        side_channel = left_channel - right_channel
    else:
        raise ValueError(_i18n("unknown_var", var=var))

    return convert_to_dtype(mid_channel, orig_dtype), convert_to_dtype(side_channel, orig_dtype)
|
|
|
|
def mid_side_to_stereo(
    y: np.ndarray,
    z: np.ndarray,
    index: int = -1,
    dtype: DTypeLike = np.float32
) -> np.ndarray:
    """
    Combine mid and side signals into a two-channel array.

    The first output channel is mid + side, the second is mid - side.

    Args:
        y: Mid signal.
        z: Side signal.
        index: Sample-axis index that selects the layout of the result.
        dtype: Dtype of the returned array.

    Returns:
        Two-channel array in the requested dtype.
    """
    mid_f32 = convert_to_dtype(y, np.float32)
    side_f32 = convert_to_dtype(z, np.float32)
    mid_pair = multi_channel_array_from_arrays(
        mid_f32, mid_f32, index=index, dtype=np.float32
    )
    side_pair = multi_channel_array_from_arrays(
        side_f32, -side_f32, index=index, dtype=np.float32
    )
    return convert_to_dtype(mid_pair + side_pair, dtype)
|
|
|
|
def mono_to_stereo(
    y: np.ndarray,
    index: int,
    num_channels: int = 2
) -> np.ndarray:
    """
    Expand an array to ``num_channels`` channels.

    Mono input is duplicated onto every channel and laid out according to
    ``index``. Multi-channel input that already has enough channels is
    returned unchanged; otherwise silent channels are appended along the
    array's own channel axis.

    Args:
        y: Audio array.
        index: Sample-axis index selecting the layout of the result for
            mono input.
        num_channels: Desired number of channels.

    Returns:
        Array with at least ``num_channels`` channels in the input dtype.
    """
    channels, samples, array_index, flatten = get_info_array(y)
    axis = get_axis_from_array_index(array_index)
    new_axis = get_axis_from_array_index(index)
    orig_dtype = y.dtype

    if channels == 1:
        mono = y if flatten else y.flatten()
        return np.stack([mono] * num_channels, axis=new_axis, dtype=orig_dtype)

    if num_channels <= channels:
        return y

    # The padding block must be 2-D like y; the previous code appended a 1-D
    # array with np.append(axis=...), which raises for mismatched dimensions.
    missing = num_channels - channels
    pad_shape = (missing, samples) if array_index == 1 else (samples, missing)
    silence = np.full(
        pad_shape, get_center_value_from_dtype(orig_dtype), dtype=orig_dtype
    )
    return np.concatenate([y, silence], axis=axis)
|
|
|
|
def stereo_to_mono(y: np.ndarray, to_flatten: bool = False) -> np.ndarray:
    """
    Downmix an array to a single channel by averaging its channels.

    Args:
        y: Audio array (1-D, or 2-D in either layout).
        to_flatten: Return a 1-D array instead of a 2-D single-channel one.

    Returns:
        Mono array in the input dtype. When not flattened, the result is
        (samples, 1) for samples-first input and (1, samples) otherwise.
    """
    channels, samples, array_index, flatten = get_info_array(y)
    orig_dtype = y.dtype
    y = convert_to_dtype(y, np.float32)
    # Shape of a non-flattened result, following the layout of the input.
    column_or_row = (-1, 1) if array_index == 0 else (1, -1)

    if channels > 1:
        weight = 1 / channels
        mono = create_zero_array(samples, np.float64)
        for channel in split_channels(y):
            mono = mono + gain(channel, weight)
        if to_flatten:
            return convert_to_dtype(mono, orig_dtype)
        return convert_to_dtype(mono.reshape(*column_or_row), orig_dtype)

    # Already mono: only the dimensionality may need to change.
    if to_flatten and not flatten:
        return convert_to_dtype(y.flatten(), orig_dtype)
    if flatten and not to_flatten:
        return convert_to_dtype(y.reshape(*column_or_row), orig_dtype)
    return convert_to_dtype(y, orig_dtype)
|
|
|
|
def multi_channel_array_from_arrays(
    *arrays: np.ndarray,
    index: int = -1,
    dtype: DTypeLike
) -> np.ndarray:
    """
    Stack separate channel arrays into one multi-channel array.

    Args:
        *arrays: One array per channel.
        index: Sample-axis index; it is translated to the stacking axis
            with ``get_axis_from_array_index``.
        dtype: Dtype every channel is converted to and of the result.

    Returns:
        The stacked multi-channel array.
    """
    converted = [convert_to_dtype(channel, dtype) for channel in arrays]
    stack_axis = get_axis_from_array_index(index)
    return np.stack(converted, axis=stack_axis, dtype=dtype)
|
|
|
|
def reshape(y: np.ndarray, shape: Tuple[str, ...] = ("channels", "samples")) -> np.ndarray:
    """
    Bring an audio array into a named layout.

    Args:
        y: Audio array (1-D, or 2-D in either layout).
        shape: Target layout: ("channels", "samples"),
            ("samples", "channels") or ("samples",). A list with the same
            items is accepted as well.

    Returns:
        The array in the requested layout. ("samples",) downmixes
        multi-channel input to mono.

    Raises:
        ValueError: If ``shape`` is not one of the supported layouts.
    """
    channels, samples, array_index, flatten = get_info_array(y)
    layout = tuple(shape)

    if layout == ("channels", "samples"):
        if flatten:
            # 1-D input becomes (1, samples). The previous check compared the
            # axis index with None, which get_info_array never returns, so
            # 1-D input came back unchanged.
            return y.reshape((-1, 1)).T
        return y.T if array_index == 0 else y

    if layout == ("samples", "channels"):
        if flatten:
            return y.reshape((-1, 1))
        return y.T if array_index == 1 else y

    if layout == ("samples",):
        if flatten:
            return y
        if channels == 1:
            return y.flatten()
        return stereo_to_mono(y, to_flatten=True)

    raise ValueError(f"{_i18n('unknown_shape')}: {shape}")
|
|
|
|
def easy_resampler(y: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
    """
    Resample an audio array with librosa.

    Args:
        y: Audio array (1-D, or 2-D in either layout).
        orig_sr: Sample rate of ``y``.
        target_sr: Desired sample rate.

    Returns:
        The resampled array in the dtype of the input.
    """
    _channels, _samples, array_index, _flatten = get_info_array(y)
    orig_dtype = y.dtype
    y = convert_to_dtype(y, np.float32)
    # Resample along the axis that actually holds the samples. librosa
    # defaults to the last axis, which is the channel axis for
    # (samples, channels) input. For 1-D input array_index is -1.
    resampled = librosa.resample(
        y,
        orig_sr=orig_sr,
        target_sr=target_sr,
        axis=array_index,
    )
    return convert_to_dtype(resampled, orig_dtype)
|
|
|
|
def add_zero_to_end(y: np.ndarray, max_samples: int) -> np.ndarray:
    """
    Force an array to exactly ``max_samples`` samples.

    Shorter input is padded at the end with the dtype's center value;
    input that is already long enough is passed to ``trim``.

    Args:
        y: Audio array (1-D, or 2-D in either layout).
        max_samples: Desired number of samples.

    Returns:
        The padded or trimmed array.
    """
    channels, samples, array_index, flatten = get_info_array(y)
    center_value = get_center_value_from_dtype(y.dtype)

    if samples >= max_samples:
        return trim(y, 0, max_samples)

    missing = max_samples - samples
    if flatten:
        pad_width = (0, missing)
    elif array_index == 1:
        # (channels, samples): pad the second axis only.
        pad_width = ((0, 0), (0, missing))
    else:
        # (samples, channels): pad the first axis only.
        pad_width = ((0, missing), (0, 0))
    return np.pad(y, pad_width, mode="constant", constant_values=center_value)
|
|
|
|
def fit_arrays(
    arrays: Union[Tuple[np.ndarray, ...], List[np.ndarray]],
    srs: Union[Tuple[int, ...], List[int]],
    max_channels: int = 2,
    min_sr: int = 44100,
    flatten: bool = False,
    extend: bool = True
) -> Tuple[np.ndarray, ...]:
    """
    Bring several audio arrays to a common sample rate, layout and length.

    Args:
        arrays: Audio arrays.
        srs: Sample rate of each array, in the same order.
        max_channels: Channel count of the result; values below 2 downmix
            to mono.
        min_sr: Sample rate every array is resampled to.
        flatten: Return 1-D mono arrays instead of (channels, samples).
        extend: Pad/trim every array to the length of the longest one.

    Returns:
        Tuple of the fitted arrays, in input order.

    Raises:
        Exception: If ``arrays`` and ``srs`` differ in length.
    """
    if len(arrays) != len(srs):
        raise Exception(_i18n("arrays_srs_mismatch"))

    new_arrays = []
    pairs = list(zip(arrays, srs))

    for array, sr in tqdm(pairs, desc=_i18n("fitting_progress"), unit=_i18n("arrays")):
        array_index = get_info_array(array)[2]
        # An array already at the target rate is used as is, whatever the
        # rates of the other arrays are; sending it through the resampler
        # would only cost a float32 round trip.
        a1 = array if sr == min_sr else easy_resampler(array, sr, min_sr)
        if flatten:
            a1 = stereo_to_mono(a1, to_flatten=True)
        else:
            if max_channels >= 2:
                a1 = mono_to_stereo(a1, array_index, max_channels)
            else:
                a1 = stereo_to_mono(a1)
            a1 = reshape(a1, shape=("channels", "samples"))
        new_arrays.append(a1)

    max_samples = max((get_duration_from_array(a) for a in new_arrays), default=0)

    if extend:
        # Wrap the list (not enumerate) so the progress bar knows its total.
        progress = tqdm(new_arrays, desc=_i18n("extending_progress"), unit=_i18n("arrays"))
        for i, array_ in enumerate(progress):
            new_arrays[i] = add_zero_to_end(array_, max_samples)

    return tuple(new_arrays)
|
|
|
|
def subtractor(
    y: np.ndarray,
    z: np.ndarray,
    sr1: int,
    sr2: int,
    spectrogram: bool = False
) -> Tuple[np.ndarray, int]:
    """
    Subtract one audio signal from another.

    Both signals are first fitted to the lower of the two sample rates and
    to the larger of the two channel counts. The subtraction is then done
    either sample by sample, or on STFT magnitudes (keeping the phase of
    the first signal).

    Args:
        y: Signal to subtract from.
        z: Signal to subtract.
        sr1: Sample rate of ``y``.
        sr2: Sample rate of ``z``.
        spectrogram: Subtract STFT magnitudes instead of samples.

    Returns:
        Tuple of (result in the dtype of ``y``, sample rate of the result).
    """
    channels_y = get_info_array(y)[0]
    channels_z = get_info_array(z)[0]
    result_dtype = y.dtype
    y = convert_to_dtype(y, np.float32)
    z = convert_to_dtype(z, np.float32)
    max_channels = max(channels_y, channels_z)
    min_sr = min(sr1, sr2)
    y, z = fit_arrays([y, z], [sr1, sr2], max_channels=max_channels, min_sr=min_sr)

    if not spectrogram:
        print(_i18n("subtract_phase"))
        return convert_to_dtype(y - z, result_dtype), min_sr

    print(_i18n("subtract_spectrogram"))
    sft = get_stft_obj(min_sr, n_fft=n_fft, hop=hop)
    res_channels = []

    for ch_y, ch_z in zip(split_channels(y), split_channels(z)):
        spec_y = sft.stft(ch_y.astype(np.float32))
        spec_z = sft.stft(ch_z.astype(np.float32))

        # Magnitude difference floored at zero, combined with the phase of
        # the first signal.
        magnitude = np.maximum(np.abs(spec_y) - np.abs(spec_z), 0)
        phase = np.exp(1j * np.angle(spec_y))
        res_spec = magnitude * phase
        del spec_y, spec_z

        res_channels.append(sft.istft(res_spec, k1=ch_y.shape[-1]))

    subtracted = multi_channel_array_from_arrays(*res_channels, index=1, dtype=result_dtype)
    return subtracted, min_sr
|
|
|
|
| def absmax(a: np.ndarray, *, axis: Optional[int] = None) -> np.ndarray: |
| """ |
| Получить элемент с максимальным абсолютным значением |
| |
| Args: |
| a: Входной массив |
| axis: Ось |
| |
| Returns: |
| Элемент с максимальным абсолютным значением |
| """ |
| if axis is None: |
| return a.flatten()[np.argmax(np.abs(a))] |
| dims = list(a.shape) |
| dims.pop(axis) |
| indices = np.ogrid[tuple(slice(0, d) for d in dims)] |
| argmax = np.abs(a).argmax(axis=axis) |
| indices = list(indices) |
| indices.insert(axis % len(a.shape), argmax) |
| return a[tuple(indices)] |
|
|
|
|
def absmin(a: np.ndarray, *, axis: Optional[int] = None) -> np.ndarray:
    """
    Pick the element with the smallest absolute value, keeping its sign.

    Args:
        a: Input array.
        axis: Axis to reduce; None reduces over the whole array.

    Returns:
        The selected element (axis is None) or an array with ``axis``
        removed.
    """
    if axis is None:
        return a.flatten()[np.argmin(np.abs(a))]

    dims = list(a.shape)
    dims.pop(axis)
    argmin = np.abs(a).argmin(axis=axis)
    # np.ogrid may hand back a tuple, which has no insert(); convert to a
    # list first, exactly as absmax does.
    indices = list(np.ogrid[tuple(slice(0, d) for d in dims)])
    indices.insert(axis % a.ndim, argmin)
    return a[tuple(indices)]
|
|
|
|
def lambda_max(
    arr: np.ndarray,
    axis: Optional[int] = None,
    key: Optional[Callable] = None,
    keepdims: bool = False
) -> np.ndarray:
    """
    Select the elements whose key value is largest.

    Args:
        arr: Input array.
        axis: Axis to reduce; None reduces over the flattened array.
        key: Function applied to ``arr`` to produce the values that are
            compared; defaults to ``np.abs``.
        keepdims: Keep the reduced axis with length 1 (only used when
            ``axis`` is given).

    Returns:
        The selected elements of ``arr`` itself (not the key values).
    """
    score = np.abs if key is None else key
    picks = np.argmax(score(arr), axis)
    if axis is None:
        return arr.flatten()[picks]

    selected = np.take_along_axis(arr, np.expand_dims(picks, axis), axis)
    return selected if keepdims else np.squeeze(selected, axis=axis)
|
|
|
|
def lambda_min(
    arr: np.ndarray,
    axis: Optional[int] = None,
    key: Optional[Callable] = None,
    keepdims: bool = False
) -> np.ndarray:
    """
    Select the elements whose key value is smallest.

    Args:
        arr: Input array.
        axis: Axis to reduce; None reduces over the flattened array.
        key: Function applied to ``arr`` to produce the values that are
            compared; defaults to ``np.abs``.
        keepdims: Keep the reduced axis with length 1 (only used when
            ``axis`` is given).

    Returns:
        The selected elements of ``arr`` itself (not the key values).
    """
    score = np.abs if key is None else key
    picks = np.argmin(score(arr), axis)
    if axis is None:
        return arr.flatten()[picks]

    selected = np.take_along_axis(arr, np.expand_dims(picks, axis), axis)
    return selected if keepdims else np.squeeze(selected, axis=axis)
|
|
|
|
def ensemble(
    pred_tracks: List[np.ndarray],
    srs: List[int],
    ensemble_type: str = ensemble_types[0],
    weights: List[float] = [],
    dtype: np.dtype = np.float32,
    disable_progress: bool = False,
) -> Tuple[np.ndarray, int]:
    """
    Combine several predictions into one track in the STFT domain.

    Each track is passed through ``fit_arrays``, its first two channels are
    transformed with an STFT, the spectrograms are combined bin by bin and
    the result is transformed back.

    Args:
        pred_tracks: Predictions to combine (expected shape
            [channels, samples]).
        srs: Sample rate of each prediction.
        ensemble_type: Combination algorithm: 'avg_fft' (weighted mean),
            'median_fft' (median of real and imaginary parts separately),
            'min_fft' / 'max_fft' (bin with the smallest / largest
            magnitude).
        weights: Per-track weights, used by 'avg_fft' only. Extra weights
            are dropped and missing ones default to 1.0. The default list
            is never mutated.
        dtype: Data type of the returned array.
        disable_progress: Hide the progress bar.

    Returns:
        Tuple (combined array, sample rate). The sample rate is the lowest
        input rate for 'min_fft' and the highest one otherwise.

    Raises:
        ValueError: If ``ensemble_type`` is not one of the four algorithms.
    """
    # Fail fast: reject an unsupported algorithm before any resampling or
    # STFT work is done.
    if ensemble_type not in ("avg_fft", "median_fft", "min_fft", "max_fft"):
        raise ValueError(_i18n("unknown_etype", alg=ensemble_type))

    averaging = ensemble_type == "avg_fft"
    result_sr = int(min(srs)) if ensemble_type == "min_fft" else int(max(srs))

    if averaging:
        track_count = len(pred_tracks)
        if weights:
            # Truncate or pad with 1.0 so there is exactly one weight per
            # track; this builds a new list and leaves the argument alone.
            missing = track_count - len(weights)
            weights = list(weights[:track_count]) + [1.0] * max(missing, 0)
        else:
            weights = [1.0] * track_count
        # NOTE: weights summing to zero make the final division produce
        # inf/NaN bins; callers are expected to pass a non-zero sum.
        total_weight = sum(weights)

    pred_tracks = list(fit_arrays(pred_tracks, srs, max_channels=2, min_sr=result_sr))

    sft = get_stft_obj(result_sr, n_fft=2048, hop=1024)
    final_length = pred_tracks[0].shape[-1]

    # 'avg_fft' keeps a running weighted sum; the other algorithms need every
    # spectrogram at once, so they are collected in lists.
    left_accumulator = None if averaging else []
    right_accumulator = None if averaging else []

    with tqdm(
        total=len(pred_tracks),
        desc=_i18n("ensemble_processing"),
        unit=_i18n("track"),
        disable=disable_progress,
        leave=False
    ) as pbar:
        for i, track in enumerate(pred_tracks):
            spec_left = sft.stft(convert_to_dtype(track[0], np.float32))
            spec_right = sft.stft(convert_to_dtype(track[1], np.float32))

            if averaging:
                weighted_left = spec_left * weights[i]
                weighted_right = spec_right * weights[i]
                if left_accumulator is None:
                    left_accumulator = weighted_left
                    right_accumulator = weighted_right
                else:
                    left_accumulator += weighted_left
                    right_accumulator += weighted_right
            else:
                left_accumulator.append(spec_left)
                right_accumulator.append(spec_right)

            pbar.update(1)

    if averaging:
        left_res_spec = left_accumulator / total_weight
        right_res_spec = right_accumulator / total_weight
    else:
        # Stack once along a new leading "track" axis and reduce over it.
        left_stack = np.array(left_accumulator)
        right_stack = np.array(right_accumulator)

        if ensemble_type == "median_fft":
            left_res_spec = (
                np.median(left_stack.real, axis=0)
                + 1j * np.median(left_stack.imag, axis=0)
            )
            right_res_spec = (
                np.median(right_stack.real, axis=0)
                + 1j * np.median(right_stack.imag, axis=0)
            )
        elif ensemble_type == "min_fft":
            left_res_spec = lambda_min(left_stack, axis=0, key=np.abs)
            right_res_spec = lambda_min(right_stack, axis=0, key=np.abs)
        else:
            # 'max_fft': same selection helper family as 'min_fft', so both
            # branches share one indexing code path.
            left_res_spec = lambda_max(left_stack, axis=0, key=np.abs)
            right_res_spec = lambda_max(right_stack, axis=0, key=np.abs)

    # k1 limits the inverse transform to the length of the fitted tracks.
    left_channel = sft.istft(left_res_spec, k1=final_length)
    right_channel = sft.istft(right_res_spec, k1=final_length)

    result = multi_channel_array_from_arrays(left_channel, right_channel, index=1, dtype=dtype)

    return result, result_sr
|
|
|
|
def concatenate(
    arrays: Union[Tuple[np.ndarray, ...], List[np.ndarray]],
    srs: Union[Tuple[int, ...], List[int]],
    dtype=np.float32
) -> Tuple[np.ndarray, int]:
    """
    Join several audio arrays one after another.

    The arrays are converted to float64, passed through ``fit_arrays`` with
    the highest input sample rate, concatenated along axis 1 and converted
    to ``dtype``.

    Args:
        arrays: Arrays to join.
        srs: Sample rate of each array.
        dtype: Data type of the returned array.

    Returns:
        Tuple (joined array, sample rate).
    """
    # max(srs), not max(*srs): unpacking a single-element sequence calls
    # max(44100), which raises TypeError because an int is not iterable.
    max_sr = int(max(srs))
    # Materialise the result: np.concatenate needs a sequence, and the other
    # callers in this file also wrap fit_arrays() in list().
    fitted = list(fit_arrays(
        [convert_to_dtype(array, np.float64) for array in arrays],
        srs, max_channels=2, min_sr=max_sr, extend=False
    ))
    result = np.concatenate(fitted, axis=1, dtype=np.float64)
    print(_i18n("concatenate_complete"))
    return convert_to_dtype(result, dtype), max_sr
|
|
|
|
def trim(y: np.ndarray, start: int = 0, end: int = -1) -> np.ndarray:
    """
    Cut a sample range out of an audio array.

    Args:
        y: Audio array.
        start: First sample to keep.
        end: Sample to stop before. Used only when ``0 < end <= samples``;
            any other value (including the default -1) means "up to the
            last sample".

    Returns:
        The sliced array, or ``y`` unchanged when the sample axis reported
        by ``get_info_array`` is neither 0 nor 1.
    """
    # Second value is the sample count, third the sample axis, fourth the
    # "flat array" flag (names taken from how they are used below).
    _, total_samples, sample_axis, is_flat = get_info_array(y)

    stop = end if 0 < end <= total_samples else total_samples
    window = slice(start, stop)

    if is_flat:
        return y[window]
    if sample_axis == 0:
        return y[window, :]
    if sample_axis == 1:
        return y[:, window]
    return y
|
|
|
|
def reverse(y: np.ndarray) -> np.ndarray:
    """
    Reverse an audio array.

    Args:
        y: Audio array.

    Returns:
        The array flipped along the axis reported by ``get_info_array``,
        or flipped along every axis when it is reported as flat.
    """
    _, _, sample_axis, is_flat = get_info_array(y)
    # axis=None makes np.flip reverse all axes, same as calling it without
    # an axis argument.
    flip_axis = None if is_flat else sample_axis
    return np.flip(y, axis=flip_axis)
|
|
|
|
def mix_arrays(
    arrays: list[np.ndarray],
    srs: list[int],
    target_sr: int,
    index: int = -1,
    dtype: DTypeLike = np.float32
) -> Tuple[np.ndarray, int]:
    """
    Mix several audio arrays by averaging them.

    The arrays are converted to float32, passed through ``fit_arrays``,
    summed and divided by their count. If the absolute peak of the mix
    exceeds 0.95 it is scaled down to 0.95.

    Args:
        arrays: Arrays to mix.
        srs: Sample rate of each array.
        target_sr: Sample rate passed to ``fit_arrays`` and returned.
        index: Output layout selector. -1 leaves the result untouched,
            0 transposes it, 1 keeps it, any other value reshapes it to
            ("channels", "samples"). Ignored for flat arrays.
        dtype: Data type of the returned array.

    Returns:
        Tuple (mixed array, sample rate).

    Raises:
        Exception: If ``arrays`` and ``srs`` differ in length, or if no
            arrays were given.
    """
    if len(arrays) != len(srs):
        raise Exception(_i18n("arrays_srs_mismatch"))

    if len(arrays) == 0:
        raise Exception(_i18n("no_arrays_to_mix"))

    float_inputs = [convert_to_dtype(array, np.float32) for array in arrays]

    # Widest channel count among the inputs, never less than 2.
    max_channels = max([2] + [get_info_array(item)[0] for item in float_inputs])

    fitted = list(fit_arrays(
        float_inputs,
        srs,
        max_channels=max_channels,
        min_sr=target_sr,
        extend=True
    ))
    num_arrays = len(fitted)

    # Sum in input order, starting from a copy of the first array.
    total = None
    for position, piece in enumerate(fitted):
        total = piece.copy() if position == 0 else total + piece

    mixed = total / num_arrays

    # Leave a little headroom below full scale.
    peak = np.max(np.abs(mixed))
    if peak > 0.95:
        mixed = mixed * (0.95 / peak)

    result = convert_to_dtype(mixed, dtype)

    if index != -1:
        _, _, _, is_flat = get_info_array(result)
        if not is_flat:
            if index == 0:
                result = result.T
            elif index != 1:
                result = reshape(result, shape=("channels", "samples"))

    print(_i18n("mix_complete", count=num_arrays))
    return result, target_sr
|
|
|
|
def write(
    path: str | Path,
    y: np.ndarray,
    sr: int,
    bitrate: Union[int, str] = 320,
    prefer_float: bool = False
) -> str:
    """
    Encode an audio array to a file by piping raw samples into ffmpeg.

    Args:
        path: Destination file. Missing parent directories are created.
        y: Audio array. It is reshaped to ("samples", "channels") before
            being serialised; dtypes without an entry in
            ``SAMPLE_FORMATS_DICT`` are converted to float32.
        sr: Sample rate.
        bitrate: Requested bitrate. It is capped for ``.ogg`` and ``.opus``
            outputs and finally clamped to the 32-320 range.
        prefer_float: Forwarded to ``get_codec_args``.

    Returns:
        The output path in POSIX form.

    Raises:
        Exception: If ``sr`` is missing/zero or ffmpeg exits with a
            non-zero code.
    """
    # Validate before touching the filesystem so a bad call does not leave
    # freshly created directories behind.
    if not sr:
        raise Exception(_i18n("sr_required"))
    sr = int(sr)

    output_path = Path(path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    dtype = y.dtype
    channels, *_ = get_info_array(y)
    y = reshape(y, shape=("samples", "channels"))

    sample_format = SAMPLE_FORMATS_DICT.get(str(dtype), None)
    if not sample_format:
        # Unknown dtype: fall back to 32-bit float samples.
        sample_format = "f32le"
        y = convert_to_dtype(y, np.float32)

    # Replace NaN and infinities with silence before encoding.
    y = np.nan_to_num(y, nan=0, posinf=0, neginf=0)

    bitrate_val = bitrate_to_int(bitrate)
    if output_path.suffix == ".ogg":
        max_bitrate = get_ogg_bitrate(sr, channels)
        if bitrate_val > max_bitrate:
            bitrate_val = max_bitrate
    elif output_path.suffix == ".opus":
        max_bitrate = 256 * channels
        if bitrate_val > max_bitrate:
            bitrate_val = max_bitrate

    bitrate_fixed = 32 if bitrate_val < 32 else 320 if bitrate_val > 320 else bitrate_val

    output_path_str = output_path.as_posix()

    cmd = [ffmpeg_path, "-y", "-f", sample_format, "-ar", str(sr), "-ac", str(channels),
           "-i", "-", *get_codec_args(output_path.suffix, prefer_float),
           "-ab", f"{bitrate_fixed}k", output_path_str]

    process = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=None,
        stderr=subprocess.PIPE,
        bufsize=10**8
    )

    try:
        _, stderr_data = process.communicate(input=y.tobytes())

        if process.returncode != 0:
            error_msg = stderr_data.decode('utf-8', errors='ignore')
            print(_i18n("ffmpeg_error", error=error_msg))
            raise Exception(_i18n("ffmpeg_exit_code", code=process.returncode))
    except Exception as e:
        print(_i18n("write_critical_error", error=str(e)))
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        # Runs for every exit path, including KeyboardInterrupt: make sure
        # no ffmpeg child is left running, and reap it after killing.
        if process.poll() is None:
            process.kill()
            process.wait()

    return output_path_str
|
|
|
|
def multiwrite(
    arrays: Union[Tuple[np.ndarray, ...], List[np.ndarray]],
    srs: Union[Tuple[int, ...], List[int]],
    paths: Union[Tuple[str | Path, ...], List[str | Path]],
    bitrate: Union[int, str] = 320,
    prefer_float: bool = False,
    callable_func: Optional[Callable] = None,
    strict: bool = False
) -> Tuple[str, ...]:
    """
    Write several audio arrays to files with ``write``.

    Args:
        arrays: Arrays to write.
        srs: Sample rate of each array.
        paths: Destination path of each array.
        bitrate: Bitrate forwarded to ``write``.
        prefer_float: Forwarded to ``write``.
        callable_func: Optional callback invoked with each path before it
            is written.
        strict: Re-raise the first write failure instead of collecting it
            and moving on to the next file.

    Returns:
        Tuple of the paths that were written successfully.

    Raises:
        Exception: In strict mode on the first failed write; otherwise when
            no file at all was written (including when the three inputs
            differ in length, in which case nothing is attempted).
    """
    saved_paths = []
    exceptions = []

    if len(arrays) == len(srs) == len(paths):
        # Materialise the zip so tqdm knows the total.
        save_arrays = list(zip(arrays, srs, paths))
        for array, sr, path in tqdm(save_arrays, desc=_i18n("multi_writing"), unit=_i18n("arrays")):
            if callable_func is not None:
                callable_func(path)
            try:
                saved_paths.append(write(path, array, sr, bitrate=bitrate, prefer_float=prefer_float))
            except Exception as e:
                if strict:
                    # Same exception type and message as before, with the
                    # original failure attached as the explicit cause.
                    raise Exception(str(e)) from e
                print(_i18n("write_error", error=str(e)))
                exceptions.append(str(e))
    else:
        # Nothing is written on a length mismatch; record the reason so the
        # error raised below is not an empty list.
        exceptions.append(
            f"len(arrays)={len(arrays)}, len(srs)={len(srs)}, len(paths)={len(paths)}"
        )

    if not saved_paths:
        exceptions_str = '\n'.join(exceptions)
        raise Exception(_i18n("no_files_written", errors=exceptions_str))

    return tuple(saved_paths)
|
|
def get_audio_files_from_list(input_paths: Union[str | Path, List[str | Path]], only_files: bool = False) -> List[str]:
    """
    Collect audio file paths from files and directories.

    Args:
        input_paths: A single file/directory path or a list of them.
        only_files: When True, directories are skipped entirely; when False
            they are scanned recursively.

    Returns:
        POSIX-style paths of every file accepted by ``check``. Paths that
        are neither an existing file nor a directory are ignored.
    """
    if isinstance(input_paths, (str, Path)):
        input_paths = [input_paths]

    input_list: List[str] = []

    for raw_path in input_paths:
        p = Path(raw_path)

        if p.is_dir():
            if only_files:
                continue
            # Append each matching file itself; the previous code appended
            # the directory path once per match.
            input_list.extend(
                file.as_posix()
                for file in p.rglob('*')
                if file.is_file() and check(file)
            )
        elif p.is_file() and check(p):
            input_list.append(p.as_posix())

    return input_list