| import os |
| import librosa |
| import soundfile as sf |
| import numpy as np |
| from pathlib import Path |
| import io |
|
|
| |
| |
class AudioStreamProcessor:
    """Accumulate streamed audio and emit WAV-encoded segments cut at silences.

    Chunks passed to :meth:`process` are appended to an internal buffer. When
    the buffer contains a sufficiently long run of quiet samples, everything
    up to the end of that quiet run is returned as WAV bytes and the
    remainder stays buffered for the next call.

    Loudness is measured in dB relative to the peak of the *current buffer*
    (``librosa.amplitude_to_db(..., ref=np.max)``), so ``threshold_db`` is a
    relative level, not an absolute one.

    NOTE(review): the index arithmetic assumes 1-D (mono) sample arrays --
    confirm against callers.
    """

    def __init__(self, sr=22050, min_silence_duration=0.1, threshold_db=-40):
        """
        params:
            sr: sample rate in Hz, used to convert durations to sample counts
                and written into the WAV output
            min_silence_duration: minimum length, in seconds, of a quiet run
                that qualifies as a split point
            threshold_db: level (dB relative to the buffer peak) below which
                a sample counts as silent
        """
        self.sr = sr
        self.min_silence_duration = min_silence_duration
        self.threshold_db = threshold_db
        # Samples received but not yet emitted.
        self.buffer = np.array([])

    def process(self, audio_data, last=False):
        """
        Add audio data and try to cut a finished segment off the buffer.

        params:
            audio_data: audio samples (numpy array or array-like)
            last: whether this is the last chunk of data; if True the whole
                buffer is flushed regardless of silence
        returns:
            WAV-encoded bytes of the emitted segment, or None if no split
            point is found (or nothing is buffered yet)
        """
        audio_data = np.asarray(audio_data)
        if len(self.buffer) > 0:
            self.buffer = np.concatenate([self.buffer, audio_data])
        else:
            # Nothing pending: adopt the chunk directly so its dtype is kept.
            self.buffer = audio_data

        if last:
            pending = self.buffer
            self.buffer = np.array([])
            return self._to_wav_bytes(pending)

        # An empty buffer has no peak to measure levels against.
        if len(self.buffer) == 0:
            return None

        split_point = self._find_silence_boundary(self.buffer)
        if split_point is None:
            return None

        silence_end = self._find_silence_end(split_point)
        segment = self.buffer[:silence_end]
        self.buffer = self.buffer[silence_end:]
        return self._to_wav_bytes(segment)

    @staticmethod
    def _relative_db(audio):
        """Return per-sample level of ``audio`` in dB relative to its peak."""
        return librosa.amplitude_to_db(np.abs(audio), ref=np.max)

    @staticmethod
    def _latest_window_start(indices, window):
        """
        Find the latest run of ``window`` consecutive integers in ``indices``.

        params:
            indices: strictly increasing integer array (e.g. the output of
                ``np.where(mask)[0]``)
            window: required run length; values below 1 are treated as 1
        returns:
            The first value of the latest such run, or None if there is none
        """
        window = max(int(window), 1)
        count = len(indices)
        if count < window:
            return None
        # For strictly increasing integers, a window of `window` entries is
        # consecutive exactly when its last and first values differ by
        # window - 1; this checks every window in one vectorised pass.
        spans = indices[window - 1:] - indices[:count - window + 1]
        hits = np.flatnonzero(spans == window - 1)
        if hits.size == 0:
            return None
        return indices[hits[-1]]

    def _find_silence_boundary(self, audio):
        """
        Find the starting point of the silence boundary in audio.

        params:
            audio: samples to search
        returns:
            Index where the latest window of ``min_silence_duration`` worth
            of consecutive silent samples begins, or None if there is no
            such window
        """
        if len(audio) == 0:
            return None

        db = self._relative_db(audio)
        silence_points = np.where(db < self.threshold_db)[0]
        if len(silence_points) == 0:
            return None

        min_silence_samples = int(self.min_silence_duration * self.sr)
        return self._latest_window_start(silence_points, min_silence_samples)

    def _find_silence_end(self, start_point):
        """
        Find the end point of the silence segment that begins at start_point.

        params:
            start_point: index into ``self.buffer`` where the silence starts
        returns:
            Index of the first non-silent sample at or after ``start_point``,
            or ``len(self.buffer)`` if the silence runs to the end
        """
        if start_point >= len(self.buffer):
            return len(self.buffer)

        # Measure against the peak of the whole buffer -- the same reference
        # _find_silence_boundary uses -- so both agree on what "silent" is.
        db = self._relative_db(self.buffer)[start_point:]
        loud_points = np.flatnonzero(db >= self.threshold_db)
        if loud_points.size == 0:
            return len(self.buffer)

        return start_point + loud_points[0]

    def _to_wav_bytes(self, audio_data):
        """
        Encode samples as an in-memory WAV file.

        params:
            audio_data: samples to encode
        returns:
            bytes of a WAV file at sample rate ``self.sr``
        """
        wav_buffer = io.BytesIO()
        sf.write(wav_buffer, audio_data, self.sr, format='WAV')
        return wav_buffer.getvalue()
| |
| |
|
|