| from __future__ import division |
|
|
| import json |
| import os |
| import re |
| import sys |
| from subprocess import Popen, PIPE |
| from math import log, ceil |
| from tempfile import TemporaryFile |
| from warnings import warn |
| from functools import wraps |
|
|
| try: |
| import audioop |
| except ImportError: |
| import pyaudioop as audioop |
|
|
# Python 3 has no ``basestring`` builtin; alias it to ``str`` so the
# isinstance checks in this module work there as well.
if sys.version_info >= (3, 0):
    basestring = str

# Supported bit depths, used as the keys of the lookup tables below.
_BIT_DEPTHS = (8, 16, 32)

# Bit depth -> frame width (the bit depth expressed in bytes).
FRAME_WIDTHS = {bits: bits // 8 for bits in _BIT_DEPTHS}

# Bit depth -> lowercase type code (upper-cased by get_array_type for
# unsigned data).
ARRAY_TYPES = dict(zip(_BIT_DEPTHS, ("b", "h", "i")))

# Bit depth -> inclusive (min, max) of a signed integer of that width.
ARRAY_RANGES = {
    bits: (-(1 << (bits - 1)), (1 << (bits - 1)) - 1)
    for bits in _BIT_DEPTHS
}
|
|
|
|
def get_frame_width(bit_depth):
    """Look up the frame width registered in FRAME_WIDTHS for ``bit_depth``.

    Raises KeyError when ``bit_depth`` has no entry in the table.
    """
    width = FRAME_WIDTHS[bit_depth]
    return width
|
|
|
|
def get_array_type(bit_depth, signed=True):
    """Return the type code registered in ARRAY_TYPES for ``bit_depth``.

    The table stores lowercase codes; when ``signed`` is false the code is
    returned upper-cased instead. Raises KeyError for an unknown bit depth.
    """
    type_code = ARRAY_TYPES[bit_depth]
    return type_code if signed else type_code.upper()
|
|
|
|
def get_min_max_value(bit_depth):
    """Return the ``(min, max)`` pair stored in ARRAY_RANGES for ``bit_depth``.

    Raises KeyError when ``bit_depth`` has no entry in the table.
    """
    value_range = ARRAY_RANGES[bit_depth]
    return value_range
|
|
|
|
| def _fd_or_path_or_tempfile(fd, mode='w+b', tempfile=True): |
| close_fd = False |
| if fd is None and tempfile: |
| fd = TemporaryFile(mode=mode) |
| close_fd = True |
|
|
| if isinstance(fd, basestring): |
| fd = open(fd, mode=mode) |
| close_fd = True |
|
|
| try: |
| if isinstance(fd, os.PathLike): |
| fd = open(fd, mode=mode) |
| close_fd = True |
| except AttributeError: |
| |
| |
| pass |
|
|
| return fd, close_fd |
|
|
|
|
def db_to_float(db, using_amplitude=True):
    """
    Convert a decibel value into the equivalent linear ratio.

    With ``using_amplitude`` (the default) the result is ``10 ** (db / 20)``;
    otherwise it is ``10 ** (db / 10)``.
    """
    divisor = 20 if using_amplitude else 10
    return 10 ** (float(db) / divisor)
|
|
|
|
def ratio_to_db(ratio, val2=None, using_amplitude=True):
    """
    Convert a linear ratio into decibels.

    When ``val2`` is given, the two arguments are treated as separate values
    and the ratio ``ratio / val2`` is converted instead. A ratio of zero
    yields negative infinity. The base-10 logarithm is scaled by 20 when
    ``using_amplitude`` is true (the default) and by 10 otherwise.
    """
    value = float(ratio)

    # Two values were supplied: reduce them to a single ratio first.
    if val2 is not None:
        value = value / val2

    # log(0) is undefined; report it as -infinity dB.
    if value == 0:
        return -float('inf')

    multiplier = 20 if using_amplitude else 10
    return multiplier * log(value, 10)
|
|
|
|
def register_pydub_effect(fn, name=None):
    """
    Decorator that attaches a function to AudioSegment as an effect.

    Used bare, the function is registered under its own name:
        @register_pydub_effect
        def normalize(audio_segment):
            ...
    Called with a string, that string is used as the attribute name:
        @register_pydub_effect("normalize")
        def normalize_audio_segment(audio_segment):
            ...

    The decorated function is returned unchanged.
    """
    # Called as @register_pydub_effect("some_name"): hand back the real
    # decorator, which re-enters this function with the name filled in.
    if isinstance(fn, basestring):
        effect_name = fn

        def decorator(func):
            return register_pydub_effect(func, effect_name)

        return decorator

    effect_name = fn.__name__ if name is None else name

    # Imported inside the function rather than at module level.
    from .audio_segment import AudioSegment
    setattr(AudioSegment, effect_name, fn)
    return fn
|
|
|
|
def make_chunks(audio_segment, chunk_length):
    """
    Split an AudioSegment into consecutive pieces of <chunk_length>
    milliseconds each.

    With a chunk_length of 50 the result is a list of 50 millisecond long
    segments; only the final one may be shorter.
    """
    chunk_count = int(ceil(len(audio_segment) / float(chunk_length)))

    chunks = []
    for index in range(chunk_count):
        start = index * chunk_length
        stop = (index + 1) * chunk_length
        chunks.append(audio_segment[start:stop])
    return chunks
|
|
|
|
def which(program):
    """
    Mimics behavior of UNIX which command.

    Looks for ``program`` in the current directory first and then in each
    directory listed in the PATH environment variable. When ``os.name`` is
    "nt" and the name does not already end in ".exe", that suffix is
    appended before searching.

    Returns the first joined path that is an executable regular file, or
    None when no candidate matches (including when PATH is not set).
    """
    if os.name == "nt" and not program.endswith(".exe"):
        program += ".exe"

    # PATH may be missing from the environment entirely; fall back to an
    # empty value instead of raising KeyError.
    search_dirs = [os.curdir] + os.environ.get("PATH", "").split(os.pathsep)

    for directory in search_dirs:
        candidate = os.path.join(directory, program)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate

    return None
|
|
|
|
def get_encoder_name():
    """
    Return the name of the encoder application to use: "avconv" if it can
    be located, otherwise "ffmpeg".

    When neither is found a RuntimeWarning is issued and "ffmpeg" is
    returned anyway.
    """
    for candidate in ("avconv", "ffmpeg"):
        if which(candidate):
            return candidate

    # Neither tool was located; warn and fall back to the ffmpeg name.
    warn("Couldn't find ffmpeg or avconv - defaulting to ffmpeg, but may not work", RuntimeWarning)
    return "ffmpeg"
|
|
|
|
def get_player_name():
    """
    Return the name of the player application to use: "avplay" if it can
    be located, otherwise "ffplay".

    When neither is found a RuntimeWarning is issued and "ffplay" is
    returned anyway.
    """
    for candidate in ("avplay", "ffplay"):
        if which(candidate):
            return candidate

    # Neither tool was located; warn and fall back to the ffplay name.
    warn("Couldn't find ffplay or avplay - defaulting to ffplay, but may not work", RuntimeWarning)
    return "ffplay"
|
|
|
|
def get_prober_name():
    """
    Return the name of the probe application to use: "avprobe" if it can
    be located, otherwise "ffprobe".

    When neither is found a RuntimeWarning is issued and "ffprobe" is
    returned anyway.
    """
    for candidate in ("avprobe", "ffprobe"):
        if which(candidate):
            return candidate

    # Neither tool was located; warn and fall back to the ffprobe name.
    warn("Couldn't find ffprobe or avprobe - defaulting to ffprobe, but may not work", RuntimeWarning)
    return "ffprobe"
|
|
|
|
def fsdecode(filename):
    """Version-tolerant stand-in for os.fsdecode (added in Python 3.2).

    Accepts text and bytes, plus ``os.PathLike`` objects on Python 3.6+,
    and returns the text form. Any other type raises TypeError.
    """
    if sys.version_info >= (3, 2):
        accepted_types = (basestring, bytes)
        if sys.version_info >= (3, 6):
            accepted_types = accepted_types + (os.PathLike,)
        if isinstance(filename, accepted_types):
            return os.fsdecode(filename)
    elif isinstance(filename, bytes):
        # No os.fsdecode available: decode with the filesystem encoding.
        return filename.decode(sys.getfilesystemencoding())
    elif isinstance(filename, basestring):
        return filename

    raise TypeError("type {0} not accepted by fsdecode".format(type(filename)))
|
|
|
|
def get_extra_info(stderr):
    """
    Pull per-stream details out of the prober's stderr text.

    avprobe sometimes reports more on stderr than in its json output.
    The relevant lines look like:
        '    Stream #0:0: Audio: flac, 88200 Hz, stereo, s32 (24 bit)'
    or (macOS version), spread over two lines:
        '    Stream #0:0: Audio: vorbis'
        '      44100 Hz, stereo, fltp, 320 kb/s'

    :type stderr: str
    :rtype: dict mapping the integer stream id to a list of str tokens
    """
    stream_pattern = r'(?P<space_start> +)Stream #0[:\.](?P<stream_id>([0-9]+))(?P<content_0>.+)\n?(?! *Stream)((?P<space_end> +)(?P<content_1>.+))?'

    tokens_by_stream = {}
    for match in re.finditer(stream_pattern, stderr):
        first_line = match.group('content_0')
        continuation_indent = match.group('space_end')

        # The following line is only merged in when it exists and is
        # indented at least as far as the "Stream" line itself.
        if (continuation_indent is None
                or len(match.group('space_start')) > len(continuation_indent)):
            content_line = first_line
        else:
            content_line = ','.join([first_line, match.group('content_1')])

        pieces = re.split('[:,]', content_line)
        tokens_by_stream[int(match.group('stream_id'))] = [
            piece.strip() for piece in pieces if piece
        ]
    return tokens_by_stream
|
|
|
|
def mediainfo_json(filepath, read_ahead_limit=-1):
    """Return json dictionary with media info(codec, duration, size, bitrate...) from filepath

    ``filepath`` may be anything ``fsdecode`` accepts, in which case it is
    passed to the prober on the command line. Any other object is treated
    as a file object: it is rewound, read completely and fed to the prober
    on stdin. ``read_ahead_limit`` is only used in that second case, and
    only when the prober is ffprobe.

    The parsed JSON is returned. If it contains at least one audio stream,
    the first one is additionally filled in with ``sample_fmt``,
    ``bits_per_sample`` and ``bits_per_raw_sample`` taken from the prober's
    stderr, but only for properties that are missing or zero.
    """
    prober = get_prober_name()
    command_args = [
        "-v", "info",
        "-show_format",
        "-show_streams",
    ]
    try:
        command_args += [fsdecode(filepath)]
        stdin_parameter = None
        stdin_data = None
    except TypeError:
        # Not a path: stream the content to the prober through stdin.
        if prober == 'ffprobe':
            command_args += ["-read_ahead_limit", str(read_ahead_limit),
                             "cache:pipe:0"]
        else:
            command_args += ["-"]
        stdin_parameter = PIPE
        source, close_source = _fd_or_path_or_tempfile(filepath, 'rb', tempfile=False)
        try:
            source.seek(0)
            stdin_data = source.read()
        finally:
            # Close a handle we opened ourselves even if reading failed.
            if close_source:
                source.close()

    command = [prober, '-of', 'json'] + command_args
    res = Popen(command, stdin=stdin_parameter, stdout=PIPE, stderr=PIPE)
    output, stderr = res.communicate(input=stdin_data)
    output = output.decode("utf-8", 'ignore')
    stderr = stderr.decode("utf-8", 'ignore')

    info = json.loads(output)

    if not info:
        # Nothing was reported; hand the empty result back unchanged.
        return info

    extra_info = get_extra_info(stderr)

    audio_streams = [x for x in info['streams'] if x['codec_type'] == 'audio']
    if len(audio_streams) == 0:
        return info

    # Only the first audio stream is enriched.
    stream = audio_streams[0]

    def set_property(stream, prop, value):
        # Never overwrite a value the JSON output already provided.
        if prop not in stream or stream[prop] == 0:
            stream[prop] = value

    # Raw strings keep the regex escapes (\( and \)) from being treated as
    # invalid string escape sequences.
    int_with_raw_bits = re.compile(r'([su]([0-9]{1,2})p?) \(([0-9]{1,2}) bit\)$')
    int_plain = re.compile(r'([su]([0-9]{1,2})p?)( \(default\))?$')
    float_32 = re.compile(r'(flt)p?( \(default\))?$')
    float_64 = re.compile(r'(dbl)p?( \(default\))?$')

    # stderr may contain no "Stream" line for this index; treat that as
    # "no extra tokens" instead of failing with KeyError.
    for token in extra_info.get(stream['index'], []):
        m = int_with_raw_bits.match(token)
        m2 = int_plain.match(token)
        if m:
            set_property(stream, 'sample_fmt', m.group(1))
            set_property(stream, 'bits_per_sample', int(m.group(2)))
            set_property(stream, 'bits_per_raw_sample', int(m.group(3)))
        elif m2:
            set_property(stream, 'sample_fmt', m2.group(1))
            set_property(stream, 'bits_per_sample', int(m2.group(2)))
            set_property(stream, 'bits_per_raw_sample', int(m2.group(2)))
        elif float_32.match(token):
            set_property(stream, 'sample_fmt', token)
            set_property(stream, 'bits_per_sample', 32)
            set_property(stream, 'bits_per_raw_sample', 32)
        elif float_64.match(token):
            set_property(stream, 'sample_fmt', token)
            set_property(stream, 'bits_per_sample', 64)
            set_property(stream, 'bits_per_raw_sample', 64)
    return info
|
|
|
|
def mediainfo(filepath):
    """Return dictionary with media info(codec, duration, size, bitrate...) from filepath

    The prober is first run with ``-of old``; if it exits with a non-zero
    status it is run again without that option. Each ``key=value`` line of
    the output becomes an entry of the result, and lines of the form
    ``prefix:key=value`` are stored in a nested dictionary under ``prefix``.
    All values are kept as strings.
    """
    prober = get_prober_name()
    command_args = [
        "-v", "quiet",
        "-show_format",
        "-show_streams",
        filepath
    ]

    res = Popen([prober, '-of', 'old'] + command_args, stdout=PIPE)
    output = res.communicate()[0].decode("utf-8")

    if res.returncode != 0:
        # Retry without the "-of old" option.
        retry = Popen([prober] + command_args, stdout=PIPE)
        output = retry.communicate()[0].decode("utf-8")

    rgx = re.compile(r"(?:(?P<inner_dict>.*?):)?(?P<key>.*?)\=(?P<value>.*?)$")
    info = {}

    if sys.platform == 'win32':
        output = output.replace("\r", "")

    for line in output.split("\n"):
        mobj = rgx.match(line)
        if not mobj:
            continue

        inner_dict, key, value = mobj.groups()

        # Prefixed keys go into a nested dict that is created on first use.
        target = info.setdefault(inner_dict, {}) if inner_dict else info
        target[key] = value

    return info
|
|
|
|
def cache_codecs(function):
    """Decorator that memoises a zero-argument function.

    The wrapped function is called on the first invocation only; its
    result is stored and returned by every later call. If the wrapped
    function raises, nothing is stored and the exception propagates.
    """
    cache = {}

    @wraps(function)
    def wrapper():
        try:
            return cache[0]
        except KeyError:
            # First call: only a missing cache entry is expected here, so
            # nothing else is caught.
            cache[0] = function()
            return cache[0]

    return wrapper
|
|
|
|
@cache_codecs
def get_supported_codecs():
    """Ask the encoder which codecs it supports.

    Runs ``<encoder> -codecs`` and parses its stdout. Returns a
    ``(decoders, encoders)`` tuple of sets of codec names, or an empty list
    when the command exits with a non-zero status.
    """
    process = Popen([get_encoder_name(), "-codecs"], stdout=PIPE, stderr=PIPE)
    output = process.communicate()[0].decode("utf-8")
    if process.returncode != 0:
        return []

    if sys.platform == 'win32':
        output = output.replace("\r", "")

    # Group 1 is the six-character flag field, group 2 the codec name.
    rgx = re.compile(r"^([D.][E.][AVS.][I.][L.][S.]) (\w*) +(.*)")
    decoders, encoders = set(), set()
    for line in output.split('\n'):
        match = rgx.match(line.strip())
        if match is None:
            continue
        flags = match.group(1)
        codec = match.group(2)

        # First flag marks decoding support, second flag encoding support.
        if flags[0] == 'D':
            decoders.add(codec)
        if flags[1] == 'E':
            encoders.add(codec)

    return (decoders, encoders)
|
|
|
|
def get_supported_decoders():
    """Return the set of codec names the encoder application can decode.

    get_supported_codecs() returns an empty list when the codec query
    fails; in that case an empty set is returned instead of raising
    IndexError.
    """
    codecs = get_supported_codecs()
    return codecs[0] if codecs else set()
|
|
|
|
def get_supported_encoders():
    """Return the set of codec names the encoder application can encode.

    get_supported_codecs() returns an empty list when the codec query
    fails; in that case an empty set is returned instead of raising
    IndexError.
    """
    codecs = get_supported_codecs()
    return codecs[1] if codecs else set()
|
|
def stereo_to_ms(audio_segment):
    '''
    Left-Right -> Mid-Side

    Splits ``audio_segment`` into mono channels, overlays the first with the
    second to form one channel and the first with the phase-inverted second
    to form the other, then combines both into a new segment via
    AudioSegment.from_mono_audiosegments.
    '''
    # AudioSegment is not imported at module level in this file; import it
    # here, the same way register_pydub_effect does.
    from .audio_segment import AudioSegment

    channel = audio_segment.split_to_mono()
    mid = channel[0].overlay(channel[1])
    side = channel[0].overlay(channel[1].invert_phase())
    return AudioSegment.from_mono_audiosegments(mid, side)
|
|
def ms_to_stereo(audio_segment):
    '''
    Mid-Side -> Left-Right

    Splits ``audio_segment`` into mono channels, overlays the first with the
    second and the first with the phase-inverted second, subtracts 3 from
    each result, and combines both into a new segment via
    AudioSegment.from_mono_audiosegments.
    '''
    # AudioSegment is not imported at module level in this file; import it
    # here, the same way register_pydub_effect does.
    from .audio_segment import AudioSegment

    channel = audio_segment.split_to_mono()
    # NOTE(review): "- 3" presumably lowers the gain by 3 dB to offset the
    # level added by the overlay - confirm against AudioSegment.__sub__.
    left = channel[0].overlay(channel[1]) - 3
    right = channel[0].overlay(channel[1].invert_phase()) - 3
    return AudioSegment.from_mono_audiosegments(left, right)
|
|
|
|