| from __future__ import division |
|
|
| import array |
| import os |
| import subprocess |
| from tempfile import TemporaryFile, NamedTemporaryFile |
| import wave |
| import sys |
| import struct |
| from .logging_utils import log_conversion, log_subprocess_output |
| from .utils import mediainfo_json, fsdecode |
| import base64 |
| from collections import namedtuple |
|
|
| try: |
| from StringIO import StringIO |
| except: |
| from io import StringIO |
|
|
| from io import BytesIO |
|
|
| try: |
| from itertools import izip |
| except: |
| izip = zip |
|
|
| from .utils import ( |
| _fd_or_path_or_tempfile, |
| db_to_float, |
| ratio_to_db, |
| get_encoder_name, |
| get_array_type, |
| audioop, |
| ) |
| from .exceptions import ( |
| TooManyMissingFrames, |
| InvalidDuration, |
| InvalidID3TagVersion, |
| InvalidTag, |
| CouldntDecodeError, |
| CouldntEncodeError, |
| MissingAudioParameter, |
| ) |
|
|
| if sys.version_info >= (3, 0): |
| basestring = str |
| xrange = range |
| StringIO = BytesIO |
|
|
|
|
| class ClassPropertyDescriptor(object): |
|
|
| def __init__(self, fget, fset=None): |
| self.fget = fget |
| self.fset = fset |
|
|
| def __get__(self, obj, klass=None): |
| if klass is None: |
| klass = type(obj) |
| return self.fget.__get__(obj, klass)() |
|
|
| def __set__(self, obj, value): |
| if not self.fset: |
| raise AttributeError("can't set attribute") |
| type_ = type(obj) |
| return self.fset.__get__(obj, type_)(value) |
|
|
| def setter(self, func): |
| if not isinstance(func, (classmethod, staticmethod)): |
| func = classmethod(func) |
| self.fset = func |
| return self |
|
|
|
|
| def classproperty(func): |
| if not isinstance(func, (classmethod, staticmethod)): |
| func = classmethod(func) |
|
|
| return ClassPropertyDescriptor(func) |
|
|
|
|
| AUDIO_FILE_EXT_ALIASES = { |
| "m4a": "mp4", |
| "wave": "wav", |
| } |
|
|
| WavSubChunk = namedtuple('WavSubChunk', ['id', 'position', 'size']) |
| WavData = namedtuple('WavData', ['audio_format', 'channels', 'sample_rate', |
| 'bits_per_sample', 'raw_data']) |
|
|
|
|
| def extract_wav_headers(data): |
| |
| pos = 12 |
| subchunks = [] |
| while pos + 8 <= len(data) and len(subchunks) < 10: |
| subchunk_id = data[pos:pos + 4] |
| subchunk_size = struct.unpack_from('<I', data[pos + 4:pos + 8])[0] |
| subchunks.append(WavSubChunk(subchunk_id, pos, subchunk_size)) |
| if subchunk_id == b'data': |
| |
| break |
| pos += subchunk_size + 8 |
|
|
| return subchunks |
|
|
|
|
| def read_wav_audio(data, headers=None): |
| if not headers: |
| headers = extract_wav_headers(data) |
|
|
| fmt = [x for x in headers if x.id == b'fmt '] |
| if not fmt or fmt[0].size < 16: |
| raise CouldntDecodeError("Couldn't find fmt header in wav data") |
| fmt = fmt[0] |
| pos = fmt.position + 8 |
| audio_format = struct.unpack_from('<H', data[pos:pos + 2])[0] |
| if audio_format != 1 and audio_format != 0xFFFE: |
| raise CouldntDecodeError("Unknown audio format 0x%X in wav data" % |
| audio_format) |
|
|
| channels = struct.unpack_from('<H', data[pos + 2:pos + 4])[0] |
| sample_rate = struct.unpack_from('<I', data[pos + 4:pos + 8])[0] |
| bits_per_sample = struct.unpack_from('<H', data[pos + 14:pos + 16])[0] |
|
|
| data_hdr = headers[-1] |
| if data_hdr.id != b'data': |
| raise CouldntDecodeError("Couldn't find data header in wav data") |
|
|
| pos = data_hdr.position + 8 |
| return WavData(audio_format, channels, sample_rate, bits_per_sample, |
| data[pos:pos + data_hdr.size]) |
|
|
|
|
| def fix_wav_headers(data): |
| headers = extract_wav_headers(data) |
| if not headers or headers[-1].id != b'data': |
| return |
|
|
| |
| if len(data) > 2**32: |
| raise CouldntDecodeError("Unable to process >4GB files") |
|
|
| |
| data[4:8] = struct.pack('<I', len(data) - 8) |
|
|
| |
| pos = headers[-1].position |
| data[pos + 4:pos + 8] = struct.pack('<I', len(data) - pos - 8) |
|
|
|
|
| class AudioSegment(object): |
| """ |
| AudioSegments are *immutable* objects representing segments of audio |
| that can be manipulated using python code. |
| |
| AudioSegments are slicable using milliseconds. |
| for example: |
| a = AudioSegment.from_mp3(mp3file) |
| first_second = a[:1000] # get the first second of an mp3 |
| slice = a[5000:10000] # get a slice from 5 to 10 seconds of an mp3 |
| """ |
| converter = get_encoder_name() |
|
|
| |
| |
| @classproperty |
| def ffmpeg(cls): |
| return cls.converter |
|
|
| @ffmpeg.setter |
| def ffmpeg(cls, val): |
| cls.converter = val |
|
|
| DEFAULT_CODECS = { |
| "ogg": "libvorbis" |
| } |
|
|
| def __init__(self, data=None, *args, **kwargs): |
| self.sample_width = kwargs.pop("sample_width", None) |
| self.frame_rate = kwargs.pop("frame_rate", None) |
| self.channels = kwargs.pop("channels", None) |
|
|
| audio_params = (self.sample_width, self.frame_rate, self.channels) |
|
|
| if isinstance(data, array.array): |
| try: |
| data = data.tobytes() |
| except: |
| data = data.tostring() |
|
|
| |
| if any(audio_params) and None in audio_params: |
| raise MissingAudioParameter("Either all audio parameters or no parameter must be specified") |
|
|
| |
| elif self.sample_width is not None: |
| if len(data) % (self.sample_width * self.channels) != 0: |
| raise ValueError("data length must be a multiple of '(sample_width * channels)'") |
|
|
| self.frame_width = self.channels * self.sample_width |
| self._data = data |
|
|
| |
| elif kwargs.get('metadata', False): |
| |
| self._data = data |
| for attr, val in kwargs.pop('metadata').items(): |
| setattr(self, attr, val) |
| else: |
| |
| try: |
| data = data if isinstance(data, (basestring, bytes)) else data.read() |
| except(OSError): |
| d = b'' |
| reader = data.read(2 ** 31 - 1) |
| while reader: |
| d += reader |
| reader = data.read(2 ** 31 - 1) |
| data = d |
|
|
| wav_data = read_wav_audio(data) |
| if not wav_data: |
| raise CouldntDecodeError("Couldn't read wav audio from data") |
|
|
| self.channels = wav_data.channels |
| self.sample_width = wav_data.bits_per_sample // 8 |
| self.frame_rate = wav_data.sample_rate |
| self.frame_width = self.channels * self.sample_width |
| self._data = wav_data.raw_data |
| if self.sample_width == 1: |
| |
| self._data = audioop.bias(self._data, 1, -128) |
|
|
| |
| |
| if self.sample_width == 3: |
| byte_buffer = BytesIO() |
|
|
| |
| |
| pack_fmt = 'BBB' if isinstance(self._data[0], int) else 'ccc' |
|
|
| |
| |
| |
| i = iter(self._data) |
| padding = {False: b'\x00', True: b'\xFF'} |
| for b0, b1, b2 in izip(i, i, i): |
| byte_buffer.write(padding[b2 > b'\x7f'[0]]) |
| old_bytes = struct.pack(pack_fmt, b0, b1, b2) |
| byte_buffer.write(old_bytes) |
|
|
| self._data = byte_buffer.getvalue() |
| self.sample_width = 4 |
| self.frame_width = self.channels * self.sample_width |
|
|
| super(AudioSegment, self).__init__(*args, **kwargs) |
|
|
| @property |
| def raw_data(self): |
| """ |
| public access to the raw audio data as a bytestring |
| """ |
| return self._data |
|
|
| def get_array_of_samples(self, array_type_override=None): |
| """ |
| returns the raw_data as an array of samples |
| """ |
| if array_type_override is None: |
| array_type_override = self.array_type |
| return array.array(array_type_override, self._data) |
|
|
| @property |
| def array_type(self): |
| return get_array_type(self.sample_width * 8) |
|
|
| def __len__(self): |
| """ |
| returns the length of this audio segment in milliseconds |
| """ |
| return round(1000 * (self.frame_count() / self.frame_rate)) |
|
|
| def __eq__(self, other): |
| try: |
| return self._data == other._data |
| except: |
| return False |
|
|
| def __hash__(self): |
| return hash(AudioSegment) ^ hash((self.channels, self.frame_rate, self.sample_width, self._data)) |
|
|
| def __ne__(self, other): |
| return not (self == other) |
|
|
| def __iter__(self): |
| return (self[i] for i in xrange(len(self))) |
|
|
| def __getitem__(self, millisecond): |
| if isinstance(millisecond, slice): |
| if millisecond.step: |
| return ( |
| self[i:i + millisecond.step] |
| for i in xrange(*millisecond.indices(len(self))) |
| ) |
|
|
| start = millisecond.start if millisecond.start is not None else 0 |
| end = millisecond.stop if millisecond.stop is not None \ |
| else len(self) |
|
|
| start = min(start, len(self)) |
| end = min(end, len(self)) |
| else: |
| start = millisecond |
| end = millisecond + 1 |
|
|
| start = self._parse_position(start) * self.frame_width |
| end = self._parse_position(end) * self.frame_width |
| data = self._data[start:end] |
|
|
| |
| expected_length = end - start |
| missing_frames = (expected_length - len(data)) // self.frame_width |
| if missing_frames: |
| if missing_frames > self.frame_count(ms=2): |
| raise TooManyMissingFrames( |
| "You should never be filling in " |
| " more than 2 ms with silence here, " |
| "missing frames: %s" % missing_frames) |
| silence = audioop.mul(data[:self.frame_width], |
| self.sample_width, 0) |
| data += (silence * missing_frames) |
|
|
| return self._spawn(data) |
|
|
| def get_sample_slice(self, start_sample=None, end_sample=None): |
| """ |
| Get a section of the audio segment by sample index. |
| |
| NOTE: Negative indices do *not* address samples backword |
| from the end of the audio segment like a python list. |
| This is intentional. |
| """ |
| max_val = int(self.frame_count()) |
|
|
| def bounded(val, default): |
| if val is None: |
| return default |
| if val < 0: |
| return 0 |
| if val > max_val: |
| return max_val |
| return val |
|
|
| start_i = bounded(start_sample, 0) * self.frame_width |
| end_i = bounded(end_sample, max_val) * self.frame_width |
|
|
| data = self._data[start_i:end_i] |
| return self._spawn(data) |
|
|
| def __add__(self, arg): |
| if isinstance(arg, AudioSegment): |
| return self.append(arg, crossfade=0) |
| else: |
| return self.apply_gain(arg) |
|
|
| def __radd__(self, rarg): |
| """ |
| Permit use of sum() builtin with an iterable of AudioSegments |
| """ |
| if rarg == 0: |
| return self |
| raise TypeError("Gains must be the second addend after the " |
| "AudioSegment") |
|
|
| def __sub__(self, arg): |
| if isinstance(arg, AudioSegment): |
| raise TypeError("AudioSegment objects can't be subtracted from " |
| "each other") |
| else: |
| return self.apply_gain(-arg) |
|
|
| def __mul__(self, arg): |
| """ |
| If the argument is an AudioSegment, overlay the multiplied audio |
| segment. |
| |
| If it's a number, just use the string multiply operation to repeat the |
| audio. |
| |
| The following would return an AudioSegment that contains the |
| audio of audio_seg eight times |
| |
| `audio_seg * 8` |
| """ |
| if isinstance(arg, AudioSegment): |
| return self.overlay(arg, position=0, loop=True) |
| else: |
| return self._spawn(data=self._data * arg) |
|
|
| def _spawn(self, data, overrides={}): |
| """ |
| Creates a new audio segment using the metadata from the current one |
| and the data passed in. Should be used whenever an AudioSegment is |
| being returned by an operation that would alters the current one, |
| since AudioSegment objects are immutable. |
| """ |
| |
| if isinstance(data, list): |
| data = b''.join(data) |
|
|
| if isinstance(data, array.array): |
| try: |
| data = data.tobytes() |
| except: |
| data = data.tostring() |
|
|
| |
| if hasattr(data, 'read'): |
| if hasattr(data, 'seek'): |
| data.seek(0) |
| data = data.read() |
|
|
| metadata = { |
| 'sample_width': self.sample_width, |
| 'frame_rate': self.frame_rate, |
| 'frame_width': self.frame_width, |
| 'channels': self.channels |
| } |
| metadata.update(overrides) |
| return self.__class__(data=data, metadata=metadata) |
|
|
| @classmethod |
| def _sync(cls, *segs): |
| channels = max(seg.channels for seg in segs) |
| frame_rate = max(seg.frame_rate for seg in segs) |
| sample_width = max(seg.sample_width for seg in segs) |
|
|
| return tuple( |
| seg.set_channels(channels).set_frame_rate(frame_rate).set_sample_width(sample_width) |
| for seg in segs |
| ) |
|
|
| def _parse_position(self, val): |
| if val < 0: |
| val = len(self) - abs(val) |
| val = self.frame_count(ms=len(self)) if val == float("inf") else \ |
| self.frame_count(ms=val) |
| return int(val) |
|
|
| @classmethod |
| def empty(cls): |
| return cls(b'', metadata={ |
| "channels": 1, |
| "sample_width": 1, |
| "frame_rate": 1, |
| "frame_width": 1 |
| }) |
|
|
| @classmethod |
| def silent(cls, duration=1000, frame_rate=11025): |
| """ |
| Generate a silent audio segment. |
| duration specified in milliseconds (default duration: 1000ms, default frame_rate: 11025). |
| """ |
| frames = int(frame_rate * (duration / 1000.0)) |
| data = b"\0\0" * frames |
| return cls(data, metadata={"channels": 1, |
| "sample_width": 2, |
| "frame_rate": frame_rate, |
| "frame_width": 2}) |
|
|
| @classmethod |
| def from_mono_audiosegments(cls, *mono_segments): |
| if not len(mono_segments): |
| raise ValueError("At least one AudioSegment instance is required") |
|
|
| segs = cls._sync(*mono_segments) |
|
|
| if segs[0].channels != 1: |
| raise ValueError( |
| "AudioSegment.from_mono_audiosegments requires all arguments are mono AudioSegment instances") |
|
|
| channels = len(segs) |
| sample_width = segs[0].sample_width |
| frame_rate = segs[0].frame_rate |
|
|
| frame_count = max(int(seg.frame_count()) for seg in segs) |
| data = array.array( |
| segs[0].array_type, |
| b'\0' * (frame_count * sample_width * channels) |
| ) |
|
|
| for i, seg in enumerate(segs): |
| data[i::channels] = seg.get_array_of_samples() |
|
|
| return cls( |
| data, |
| channels=channels, |
| sample_width=sample_width, |
| frame_rate=frame_rate, |
| ) |
|
|
| @classmethod |
| def from_file_using_temporary_files(cls, file, format=None, codec=None, parameters=None, start_second=None, duration=None, **kwargs): |
| orig_file = file |
| file, close_file = _fd_or_path_or_tempfile(file, 'rb', tempfile=False) |
|
|
| if format: |
| format = format.lower() |
| format = AUDIO_FILE_EXT_ALIASES.get(format, format) |
|
|
| def is_format(f): |
| f = f.lower() |
| if format == f: |
| return True |
| if isinstance(orig_file, basestring): |
| return orig_file.lower().endswith(".{0}".format(f)) |
| if isinstance(orig_file, bytes): |
| return orig_file.lower().endswith((".{0}".format(f)).encode('utf8')) |
| return False |
|
|
| if is_format("wav"): |
| try: |
| obj = cls._from_safe_wav(file) |
| if close_file: |
| file.close() |
| if start_second is None and duration is None: |
| return obj |
| elif start_second is not None and duration is None: |
| return obj[start_second*1000:] |
| elif start_second is None and duration is not None: |
| return obj[:duration*1000] |
| else: |
| return obj[start_second*1000:(start_second+duration)*1000] |
| except: |
| file.seek(0) |
| elif is_format("raw") or is_format("pcm"): |
| sample_width = kwargs['sample_width'] |
| frame_rate = kwargs['frame_rate'] |
| channels = kwargs['channels'] |
| metadata = { |
| 'sample_width': sample_width, |
| 'frame_rate': frame_rate, |
| 'channels': channels, |
| 'frame_width': channels * sample_width |
| } |
| obj = cls(data=file.read(), metadata=metadata) |
| if close_file: |
| file.close() |
| if start_second is None and duration is None: |
| return obj |
| elif start_second is not None and duration is None: |
| return obj[start_second * 1000:] |
| elif start_second is None and duration is not None: |
| return obj[:duration * 1000] |
| else: |
| return obj[start_second * 1000:(start_second + duration) * 1000] |
|
|
| input_file = NamedTemporaryFile(mode='wb', delete=False) |
| try: |
| input_file.write(file.read()) |
| except(OSError): |
| input_file.flush() |
| input_file.close() |
| input_file = NamedTemporaryFile(mode='wb', delete=False, buffering=2 ** 31 - 1) |
| if close_file: |
| file.close() |
| close_file = True |
| file = open(orig_file, buffering=2 ** 13 - 1, mode='rb') |
| reader = file.read(2 ** 31 - 1) |
| while reader: |
| input_file.write(reader) |
| reader = file.read(2 ** 31 - 1) |
| input_file.flush() |
| if close_file: |
| file.close() |
|
|
| output = NamedTemporaryFile(mode="rb", delete=False) |
|
|
| conversion_command = [cls.converter, |
| '-y', |
| ] |
|
|
| |
| |
| if format: |
| conversion_command += ["-f", format] |
|
|
| if codec: |
| |
| conversion_command += ["-acodec", codec] |
|
|
| conversion_command += [ |
| "-i", input_file.name, |
| "-vn", |
| "-f", "wav" |
| ] |
|
|
| if start_second is not None: |
| conversion_command += ["-ss", str(start_second)] |
|
|
| if duration is not None: |
| conversion_command += ["-t", str(duration)] |
|
|
| conversion_command += [output.name] |
|
|
| if parameters is not None: |
| |
| conversion_command.extend(parameters) |
|
|
| log_conversion(conversion_command) |
|
|
| with open(os.devnull, 'rb') as devnull: |
| p = subprocess.Popen(conversion_command, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| p_out, p_err = p.communicate() |
|
|
| log_subprocess_output(p_out) |
| log_subprocess_output(p_err) |
|
|
| try: |
| if p.returncode != 0: |
| raise CouldntDecodeError( |
| "Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format( |
| p.returncode, p_err.decode(errors='ignore') )) |
| obj = cls._from_safe_wav(output) |
| finally: |
| input_file.close() |
| output.close() |
| os.unlink(input_file.name) |
| os.unlink(output.name) |
|
|
| if start_second is None and duration is None: |
| return obj |
| elif start_second is not None and duration is None: |
| return obj[0:] |
| elif start_second is None and duration is not None: |
| return obj[:duration * 1000] |
| else: |
| return obj[0:duration * 1000] |
|
|
|
|
| @classmethod |
| def from_file(cls, file, format=None, codec=None, parameters=None, start_second=None, duration=None, **kwargs): |
| orig_file = file |
| try: |
| filename = fsdecode(file) |
| except TypeError: |
| filename = None |
| file, close_file = _fd_or_path_or_tempfile(file, 'rb', tempfile=False) |
|
|
| if format: |
| format = format.lower() |
| format = AUDIO_FILE_EXT_ALIASES.get(format, format) |
|
|
| def is_format(f): |
| f = f.lower() |
| if format == f: |
| return True |
|
|
| if filename: |
| return filename.lower().endswith(".{0}".format(f)) |
|
|
| return False |
|
|
| if is_format("wav"): |
| try: |
| if start_second is None and duration is None: |
| return cls._from_safe_wav(file) |
| elif start_second is not None and duration is None: |
| return cls._from_safe_wav(file)[start_second*1000:] |
| elif start_second is None and duration is not None: |
| return cls._from_safe_wav(file)[:duration*1000] |
| else: |
| return cls._from_safe_wav(file)[start_second*1000:(start_second+duration)*1000] |
| except: |
| file.seek(0) |
| elif is_format("raw") or is_format("pcm"): |
| sample_width = kwargs['sample_width'] |
| frame_rate = kwargs['frame_rate'] |
| channels = kwargs['channels'] |
| metadata = { |
| 'sample_width': sample_width, |
| 'frame_rate': frame_rate, |
| 'channels': channels, |
| 'frame_width': channels * sample_width |
| } |
| if start_second is None and duration is None: |
| return cls(data=file.read(), metadata=metadata) |
| elif start_second is not None and duration is None: |
| return cls(data=file.read(), metadata=metadata)[start_second*1000:] |
| elif start_second is None and duration is not None: |
| return cls(data=file.read(), metadata=metadata)[:duration*1000] |
| else: |
| return cls(data=file.read(), metadata=metadata)[start_second*1000:(start_second+duration)*1000] |
|
|
| conversion_command = [cls.converter, |
| '-y', |
| ] |
|
|
| |
| |
| if format: |
| conversion_command += ["-f", format] |
|
|
| if codec: |
| |
| conversion_command += ["-acodec", codec] |
|
|
| read_ahead_limit = kwargs.get('read_ahead_limit', -1) |
| if filename: |
| conversion_command += ["-i", filename] |
| stdin_parameter = None |
| stdin_data = None |
| else: |
| if cls.converter == 'ffmpeg': |
| conversion_command += ["-read_ahead_limit", str(read_ahead_limit), |
| "-i", "cache:pipe:0"] |
| else: |
| conversion_command += ["-i", "-"] |
| stdin_parameter = subprocess.PIPE |
| stdin_data = file.read() |
|
|
| if codec: |
| info = None |
| else: |
| info = mediainfo_json(orig_file, read_ahead_limit=read_ahead_limit) |
| if info: |
| audio_streams = [x for x in info['streams'] |
| if x['codec_type'] == 'audio'] |
| |
| |
| audio_codec = audio_streams[0].get('codec_name') |
| if (audio_streams[0].get('sample_fmt') == 'fltp' and |
| audio_codec in ['mp3', 'mp4', 'aac', 'webm', 'ogg']): |
| bits_per_sample = 16 |
| else: |
| bits_per_sample = audio_streams[0]['bits_per_sample'] |
| if bits_per_sample == 8: |
| acodec = 'pcm_u8' |
| else: |
| acodec = 'pcm_s%dle' % bits_per_sample |
|
|
| conversion_command += ["-acodec", acodec] |
|
|
| conversion_command += [ |
| "-vn", |
| "-f", "wav" |
| ] |
|
|
| if start_second is not None: |
| conversion_command += ["-ss", str(start_second)] |
|
|
| if duration is not None: |
| conversion_command += ["-t", str(duration)] |
|
|
| conversion_command += ["-"] |
|
|
| if parameters is not None: |
| |
| conversion_command.extend(parameters) |
|
|
| log_conversion(conversion_command) |
|
|
| p = subprocess.Popen(conversion_command, stdin=stdin_parameter, |
| stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| p_out, p_err = p.communicate(input=stdin_data) |
|
|
| if p.returncode != 0 or len(p_out) == 0: |
| if close_file: |
| file.close() |
| raise CouldntDecodeError( |
| "Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format( |
| p.returncode, p_err.decode(errors='ignore') )) |
|
|
| p_out = bytearray(p_out) |
| fix_wav_headers(p_out) |
| p_out = bytes(p_out) |
| obj = cls(p_out) |
|
|
| if close_file: |
| file.close() |
|
|
| if start_second is None and duration is None: |
| return obj |
| elif start_second is not None and duration is None: |
| return obj[0:] |
| elif start_second is None and duration is not None: |
| return obj[:duration * 1000] |
| else: |
| return obj[0:duration * 1000] |
|
|
| @classmethod |
| def from_mp3(cls, file, parameters=None): |
| return cls.from_file(file, 'mp3', parameters=parameters) |
|
|
| @classmethod |
| def from_flv(cls, file, parameters=None): |
| return cls.from_file(file, 'flv', parameters=parameters) |
|
|
| @classmethod |
| def from_ogg(cls, file, parameters=None): |
| return cls.from_file(file, 'ogg', parameters=parameters) |
|
|
| @classmethod |
| def from_wav(cls, file, parameters=None): |
| return cls.from_file(file, 'wav', parameters=parameters) |
|
|
| @classmethod |
| def from_raw(cls, file, **kwargs): |
| return cls.from_file(file, 'raw', sample_width=kwargs['sample_width'], frame_rate=kwargs['frame_rate'], |
| channels=kwargs['channels']) |
|
|
| @classmethod |
| def _from_safe_wav(cls, file): |
| file, close_file = _fd_or_path_or_tempfile(file, 'rb', tempfile=False) |
| file.seek(0) |
| obj = cls(data=file) |
| if close_file: |
| file.close() |
| return obj |
|
|
| def export(self, out_f=None, format='mp3', codec=None, bitrate=None, parameters=None, tags=None, id3v2_version='4', |
| cover=None): |
| """ |
| Export an AudioSegment to a file with given options |
| |
| out_f (string): |
| Path to destination audio file. Also accepts os.PathLike objects on |
| python >= 3.6 |
| |
| format (string) |
| Format for destination audio file. |
| ('mp3', 'wav', 'raw', 'ogg' or other ffmpeg/avconv supported files) |
| |
| codec (string) |
| Codec used to encode the destination file. |
| |
| bitrate (string) |
| Bitrate used when encoding destination file. (64, 92, 128, 256, 312k...) |
| Each codec accepts different bitrate arguments so take a look at the |
| ffmpeg documentation for details (bitrate usually shown as -b, -ba or |
| -a:b). |
| |
| parameters (list of strings) |
| Aditional ffmpeg/avconv parameters |
| |
| tags (dict) |
| Set metadata information to destination files |
| usually used as tags. ({title='Song Title', artist='Song Artist'}) |
| |
| id3v2_version (string) |
| Set ID3v2 version for tags. (default: '4') |
| |
| cover (file) |
| Set cover for audio file from image file. (png or jpg) |
| """ |
| id3v2_allowed_versions = ['3', '4'] |
|
|
| if format == "raw" and (codec is not None or parameters is not None): |
| raise AttributeError( |
| 'Can not invoke ffmpeg when export format is "raw"; ' |
| 'specify an ffmpeg raw format like format="s16le" instead ' |
| 'or call export(format="raw") with no codec or parameters') |
|
|
| out_f, _ = _fd_or_path_or_tempfile(out_f, 'wb+') |
| out_f.seek(0) |
|
|
| if format == "raw": |
| out_f.write(self._data) |
| out_f.seek(0) |
| return out_f |
|
|
| |
| easy_wav = format == "wav" and codec is None and parameters is None |
|
|
| if easy_wav: |
| data = out_f |
| else: |
| data = NamedTemporaryFile(mode="wb", delete=False) |
|
|
| pcm_for_wav = self._data |
| if self.sample_width == 1: |
| |
| pcm_for_wav = audioop.bias(self._data, 1, 128) |
|
|
| wave_data = wave.open(data, 'wb') |
| wave_data.setnchannels(self.channels) |
| wave_data.setsampwidth(self.sample_width) |
| wave_data.setframerate(self.frame_rate) |
| |
| |
| wave_data.setnframes(int(self.frame_count())) |
| wave_data.writeframesraw(pcm_for_wav) |
| wave_data.close() |
|
|
| |
| if easy_wav: |
| out_f.seek(0) |
| return out_f |
|
|
| output = NamedTemporaryFile(mode="w+b", delete=False) |
|
|
| |
| conversion_command = [ |
| self.converter, |
| '-y', |
| "-f", "wav", "-i", data.name, |
| ] |
|
|
| if codec is None: |
| codec = self.DEFAULT_CODECS.get(format, None) |
|
|
| if cover is not None: |
| if cover.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tif', '.tiff')) and format == "mp3": |
| conversion_command.extend(["-i", cover, "-map", "0", "-map", "1", "-c:v", "mjpeg"]) |
| else: |
| raise AttributeError( |
| "Currently cover images are only supported by MP3 files. The allowed image formats are: .tif, .jpg, .bmp, .jpeg and .png.") |
|
|
| if codec is not None: |
| |
| conversion_command.extend(["-acodec", codec]) |
|
|
| if bitrate is not None: |
| conversion_command.extend(["-b:a", bitrate]) |
|
|
| if parameters is not None: |
| |
| conversion_command.extend(parameters) |
|
|
| if tags is not None: |
| if not isinstance(tags, dict): |
| raise InvalidTag("Tags must be a dictionary.") |
| else: |
| |
| |
| for key, value in tags.items(): |
| conversion_command.extend( |
| ['-metadata', '{0}={1}'.format(key, value)]) |
|
|
| if format == 'mp3': |
| |
| if id3v2_version not in id3v2_allowed_versions: |
| raise InvalidID3TagVersion( |
| "id3v2_version not allowed, allowed versions: %s" % id3v2_allowed_versions) |
| conversion_command.extend([ |
| "-id3v2_version", id3v2_version |
| ]) |
|
|
| if sys.platform == 'darwin' and codec == 'mp3': |
| conversion_command.extend(["-write_xing", "0"]) |
|
|
| conversion_command.extend([ |
| "-f", format, output.name, |
| ]) |
|
|
| log_conversion(conversion_command) |
|
|
| |
| with open(os.devnull, 'rb') as devnull: |
| p = subprocess.Popen(conversion_command, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| p_out, p_err = p.communicate() |
|
|
| log_subprocess_output(p_out) |
| log_subprocess_output(p_err) |
|
|
| if p.returncode != 0: |
| raise CouldntEncodeError( |
| "Encoding failed. ffmpeg/avlib returned error code: {0}\n\nCommand:{1}\n\nOutput from ffmpeg/avlib:\n\n{2}".format( |
| p.returncode, conversion_command, p_err.decode(errors='ignore') )) |
|
|
| output.seek(0) |
| out_f.write(output.read()) |
|
|
| data.close() |
| output.close() |
|
|
| os.unlink(data.name) |
| os.unlink(output.name) |
|
|
| out_f.seek(0) |
| return out_f |
|
|
| def get_frame(self, index): |
| frame_start = index * self.frame_width |
| frame_end = frame_start + self.frame_width |
| return self._data[frame_start:frame_end] |
|
|
| def frame_count(self, ms=None): |
| """ |
| returns the number of frames for the given number of milliseconds, or |
| if not specified, the number of frames in the whole AudioSegment |
| """ |
| if ms is not None: |
| return ms * (self.frame_rate / 1000.0) |
| else: |
| return float(len(self._data) // self.frame_width) |
|
|
| def set_sample_width(self, sample_width): |
| if sample_width == self.sample_width: |
| return self |
|
|
| frame_width = self.channels * sample_width |
|
|
| return self._spawn( |
| audioop.lin2lin(self._data, self.sample_width, sample_width), |
| overrides={'sample_width': sample_width, 'frame_width': frame_width} |
| ) |
|
|
| def set_frame_rate(self, frame_rate): |
| if frame_rate == self.frame_rate: |
| return self |
|
|
| if self._data: |
| converted, _ = audioop.ratecv(self._data, self.sample_width, |
| self.channels, self.frame_rate, |
| frame_rate, None) |
| else: |
| converted = self._data |
|
|
| return self._spawn(data=converted, |
| overrides={'frame_rate': frame_rate}) |
|
|
| def set_channels(self, channels): |
| if channels == self.channels: |
| return self |
|
|
| if channels == 2 and self.channels == 1: |
| fn = audioop.tostereo |
| frame_width = self.frame_width * 2 |
| fac = 1 |
| converted = fn(self._data, self.sample_width, fac, fac) |
| elif channels == 1 and self.channels == 2: |
| fn = audioop.tomono |
| frame_width = self.frame_width // 2 |
| fac = 0.5 |
| converted = fn(self._data, self.sample_width, fac, fac) |
| elif channels == 1: |
| channels_data = [seg.get_array_of_samples() for seg in self.split_to_mono()] |
| frame_count = int(self.frame_count()) |
| converted = array.array( |
| channels_data[0].typecode, |
| b'\0' * (frame_count * self.sample_width) |
| ) |
| for raw_channel_data in channels_data: |
| for i in range(frame_count): |
| converted[i] += raw_channel_data[i] // self.channels |
| frame_width = self.frame_width // self.channels |
| elif self.channels == 1: |
| dup_channels = [self for iChannel in range(channels)] |
| return AudioSegment.from_mono_audiosegments(*dup_channels) |
| else: |
| raise ValueError( |
| "AudioSegment.set_channels only supports mono-to-multi channel and multi-to-mono channel conversion") |
|
|
| return self._spawn(data=converted, |
| overrides={ |
| 'channels': channels, |
| 'frame_width': frame_width}) |
|
|
| def split_to_mono(self): |
| if self.channels == 1: |
| return [self] |
|
|
| samples = self.get_array_of_samples() |
|
|
| mono_channels = [] |
| for i in range(self.channels): |
| samples_for_current_channel = samples[i::self.channels] |
|
|
| try: |
| mono_data = samples_for_current_channel.tobytes() |
| except AttributeError: |
| mono_data = samples_for_current_channel.tostring() |
|
|
| mono_channels.append( |
| self._spawn(mono_data, overrides={"channels": 1, "frame_width": self.sample_width}) |
| ) |
|
|
| return mono_channels |
|
|
| @property |
| def rms(self): |
| return audioop.rms(self._data, self.sample_width) |
|
|
| @property |
| def dBFS(self): |
| rms = self.rms |
| if not rms: |
| return -float("infinity") |
| return ratio_to_db(self.rms / self.max_possible_amplitude) |
|
|
| @property |
| def max(self): |
| return audioop.max(self._data, self.sample_width) |
|
|
| @property |
| def max_possible_amplitude(self): |
| bits = self.sample_width * 8 |
| max_possible_val = (2 ** bits) |
|
|
| |
| return max_possible_val / 2 |
|
|
| @property |
| def max_dBFS(self): |
| return ratio_to_db(self.max, self.max_possible_amplitude) |
|
|
| @property |
| def duration_seconds(self): |
| return self.frame_rate and self.frame_count() / self.frame_rate or 0.0 |
|
|
| def get_dc_offset(self, channel=1): |
| """ |
| Returns a value between -1.0 and 1.0 representing the DC offset of a |
| channel (1 for left, 2 for right). |
| """ |
| if not 1 <= channel <= 2: |
| raise ValueError("channel value must be 1 (left) or 2 (right)") |
|
|
| if self.channels == 1: |
| data = self._data |
| elif channel == 1: |
| data = audioop.tomono(self._data, self.sample_width, 1, 0) |
| else: |
| data = audioop.tomono(self._data, self.sample_width, 0, 1) |
|
|
| return float(audioop.avg(data, self.sample_width)) / self.max_possible_amplitude |
|
|
| def remove_dc_offset(self, channel=None, offset=None): |
| """ |
| Removes DC offset of given channel. Calculates offset if it's not given. |
| Offset values must be in range -1.0 to 1.0. If channel is None, removes |
| DC offset from all available channels. |
| """ |
| if channel and not 1 <= channel <= 2: |
| raise ValueError("channel value must be None, 1 (left) or 2 (right)") |
|
|
| if offset and not -1.0 <= offset <= 1.0: |
| raise ValueError("offset value must be in range -1.0 to 1.0") |
|
|
| if offset: |
| offset = int(round(offset * self.max_possible_amplitude)) |
|
|
| def remove_data_dc(data, off): |
| if not off: |
| off = audioop.avg(data, self.sample_width) |
| return audioop.bias(data, self.sample_width, -off) |
|
|
| if self.channels == 1: |
| return self._spawn(data=remove_data_dc(self._data, offset)) |
|
|
| left_channel = audioop.tomono(self._data, self.sample_width, 1, 0) |
| right_channel = audioop.tomono(self._data, self.sample_width, 0, 1) |
|
|
| if not channel or channel == 1: |
| left_channel = remove_data_dc(left_channel, offset) |
|
|
| if not channel or channel == 2: |
| right_channel = remove_data_dc(right_channel, offset) |
|
|
| left_channel = audioop.tostereo(left_channel, self.sample_width, 1, 0) |
| right_channel = audioop.tostereo(right_channel, self.sample_width, 0, 1) |
|
|
| return self._spawn(data=audioop.add(left_channel, right_channel, |
| self.sample_width)) |
|
|
| def apply_gain(self, volume_change): |
| return self._spawn(data=audioop.mul(self._data, self.sample_width, |
| db_to_float(float(volume_change)))) |
|
|
| def overlay(self, seg, position=0, loop=False, times=None, gain_during_overlay=None): |
| """ |
| Overlay the provided segment on to this segment starting at the |
| specificed position and using the specfied looping beahvior. |
| |
| seg (AudioSegment): |
| The audio segment to overlay on to this one. |
| |
| position (optional int): |
| The position to start overlaying the provided segment in to this |
| one. |
| |
| loop (optional bool): |
| Loop seg as many times as necessary to match this segment's length. |
| Overrides loops param. |
| |
| times (optional int): |
| Loop seg the specified number of times or until it matches this |
| segment's length. 1 means once, 2 means twice, ... 0 would make the |
| call a no-op |
| gain_during_overlay (optional int): |
| Changes this segment's volume by the specified amount during the |
| duration of time that seg is overlaid on top of it. When negative, |
| this has the effect of 'ducking' the audio under the overlay. |
| """ |
|
|
| if loop: |
| |
| times = -1 |
| elif times is None: |
| |
| times = 1 |
| elif times == 0: |
| |
| return self._spawn(self._data) |
|
|
| output = StringIO() |
|
|
| seg1, seg2 = AudioSegment._sync(self, seg) |
| sample_width = seg1.sample_width |
| spawn = seg1._spawn |
|
|
| output.write(seg1[:position]._data) |
|
|
| |
| seg1 = seg1[position:]._data |
| seg2 = seg2._data |
| pos = 0 |
| seg1_len = len(seg1) |
| seg2_len = len(seg2) |
| while times: |
| remaining = max(0, seg1_len - pos) |
| if seg2_len >= remaining: |
| seg2 = seg2[:remaining] |
| seg2_len = remaining |
| |
| |
| times = 1 |
|
|
| if gain_during_overlay: |
| seg1_overlaid = seg1[pos:pos + seg2_len] |
| seg1_adjusted_gain = audioop.mul(seg1_overlaid, self.sample_width, |
| db_to_float(float(gain_during_overlay))) |
| output.write(audioop.add(seg1_adjusted_gain, seg2, sample_width)) |
| else: |
| output.write(audioop.add(seg1[pos:pos + seg2_len], seg2, |
| sample_width)) |
| pos += seg2_len |
|
|
| |
| times -= 1 |
|
|
| output.write(seg1[pos:]) |
|
|
| return spawn(data=output) |
|
|
| def append(self, seg, crossfade=100): |
| seg1, seg2 = AudioSegment._sync(self, seg) |
|
|
| if not crossfade: |
| return seg1._spawn(seg1._data + seg2._data) |
| elif crossfade > len(self): |
| raise ValueError("Crossfade is longer than the original AudioSegment ({}ms > {}ms)".format( |
| crossfade, len(self) |
| )) |
| elif crossfade > len(seg): |
| raise ValueError("Crossfade is longer than the appended AudioSegment ({}ms > {}ms)".format( |
| crossfade, len(seg) |
| )) |
|
|
| xf = seg1[-crossfade:].fade(to_gain=-120, start=0, end=float('inf')) |
| xf *= seg2[:crossfade].fade(from_gain=-120, start=0, end=float('inf')) |
|
|
| output = TemporaryFile() |
|
|
| output.write(seg1[:-crossfade]._data) |
| output.write(xf._data) |
| output.write(seg2[crossfade:]._data) |
|
|
| output.seek(0) |
| obj = seg1._spawn(data=output) |
| output.close() |
| return obj |
|
|
| def fade(self, to_gain=0, from_gain=0, start=None, end=None, |
| duration=None): |
| """ |
| Fade the volume of this audio segment. |
| |
| to_gain (float): |
| resulting volume_change in db |
| |
| start (int): |
| default = beginning of the segment |
| when in this segment to start fading in milliseconds |
| |
| end (int): |
| default = end of the segment |
| when in this segment to start fading in milliseconds |
| |
| duration (int): |
| default = until the end of the audio segment |
| the duration of the fade |
| """ |
| if None not in [duration, end, start]: |
| raise TypeError('Only two of the three arguments, "start", ' |
| '"end", and "duration" may be specified') |
|
|
| |
| if to_gain == 0 and from_gain == 0: |
| return self |
|
|
| start = min(len(self), start) if start is not None else None |
| end = min(len(self), end) if end is not None else None |
|
|
| if start is not None and start < 0: |
| start += len(self) |
| if end is not None and end < 0: |
| end += len(self) |
|
|
| if duration is not None and duration < 0: |
| raise InvalidDuration("duration must be a positive integer") |
|
|
| if duration: |
| if start is not None: |
| end = start + duration |
| elif end is not None: |
| start = end - duration |
| else: |
| duration = end - start |
|
|
| from_power = db_to_float(from_gain) |
|
|
| output = [] |
|
|
| |
| before_fade = self[:start]._data |
| if from_gain != 0: |
| before_fade = audioop.mul(before_fade, |
| self.sample_width, |
| from_power) |
| output.append(before_fade) |
|
|
| gain_delta = db_to_float(to_gain) - from_power |
|
|
| |
| |
| |
| if duration > 100: |
| scale_step = gain_delta / duration |
|
|
| for i in range(duration): |
| volume_change = from_power + (scale_step * i) |
| chunk = self[start + i] |
| chunk = audioop.mul(chunk._data, |
| self.sample_width, |
| volume_change) |
|
|
| output.append(chunk) |
| else: |
| start_frame = self.frame_count(ms=start) |
| end_frame = self.frame_count(ms=end) |
| fade_frames = end_frame - start_frame |
| scale_step = gain_delta / fade_frames |
|
|
| for i in range(int(fade_frames)): |
| volume_change = from_power + (scale_step * i) |
| sample = self.get_frame(int(start_frame + i)) |
| sample = audioop.mul(sample, self.sample_width, volume_change) |
|
|
| output.append(sample) |
|
|
| |
| after_fade = self[end:]._data |
| if to_gain != 0: |
| after_fade = audioop.mul(after_fade, |
| self.sample_width, |
| db_to_float(to_gain)) |
| output.append(after_fade) |
|
|
| return self._spawn(data=output) |
|
|
| def fade_out(self, duration): |
| return self.fade(to_gain=-120, duration=duration, end=float('inf')) |
|
|
| def fade_in(self, duration): |
| return self.fade(from_gain=-120, duration=duration, start=0) |
|
|
| def reverse(self): |
| return self._spawn( |
| data=audioop.reverse(self._data, self.sample_width) |
| ) |
|
|
| def _repr_html_(self): |
| src = """ |
| <audio controls> |
| <source src="data:audio/mpeg;base64,{base64}" type="audio/mpeg"/> |
| Your browser does not support the audio element. |
| </audio> |
| """ |
| fh = self.export() |
| data = base64.b64encode(fh.read()).decode('ascii') |
| return src.format(base64=data) |
|
|
|
|
| from . import effects |
|
|