Spaces:
Running on CPU Upgrade
| import sonogram_utility as su | |
| from pyannote.audio import Pipeline | |
| import pickle | |
| import torch | |
class Sonogram():
    def __init__(self, enableDenoise=False):
        '''
        Initialize Sonogram Class

        Parameters
        ----------
        enableDenoise : bool
            Legacy flag to support denoise, which has currently been removed.
            Consider removing if denoise will not be reimplemented in the future.
        '''
        # TODO: Should these be adjustable via initialization, or constants?
        self.secondDifference = 5
        self.gainWindow = 4
        self.minimumGain = -45
        self.maximumGain = -5
        self.attenLimDB = 3
        self.earlyCleanup = True
        # TPU support is currently disabled. The original code forced a
        # RuntimeError to skip the (unreachable) xla lookup; re-enable by
        # restoring the line below and setting self.isTPU accordingly.
        # device = xm.xla_device()
        self.isTPU = False
        print("TPU is not available")
        # Fall back to GPU if present, otherwise CPU.
        self.isGPU = torch.cuda.is_available()
        if not self.isGPU:
            print("GPU is not available")
        self.device = torch.device("cuda" if self.isGPU else "cpu")
        print(f"Using {self.device} instead.")
        self.pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1")
        self.pipeline.to(self.device)
        # Load SVM classifier used by __call__ to group/relabel speakers.
        # NOTE(review): pickle.load is only safe on trusted local artifacts;
        # never unpickle untrusted input.
        with open('groupClassifier.pkl', 'rb') as f:
            self.groupClassifier = pickle.load(f)

    def processFile(self, filePath):
        '''
        Processes audio file to generate diarization output

        Parameters
        ----------
        filePath : string
            Path to the audio file

        Returns
        --------
        diarizationOutput : DiarizeOutput
            found here https://github.com/pyannote/pyannote-audio/blob/main/src/pyannote/audio/pipelines/speaker_diarization.py#L64
        totalTimeInSeconds : int
            Approximate total seconds of audio file
        waveformGainAdjusted : np.array
            The waveform of the audio file after equalization
        sampleRate : int
            The sample rate of the audio file
        '''
        print(f"Loading file : {filePath}")
        # Load in 600-second segments, presumably to bound peak memory — TODO confirm.
        waveformList, sampleRate = su.splitIntoTimeSegments(filePath, 600)
        print("File loaded")
        waveformEnhanced = su.combineWaveforms(waveformList)
        if self.earlyCleanup:
            # Release the segment list as early as possible to reduce peak memory.
            del waveformList
        print("Equalizing Audio")
        # NOTE(review): su.equalizeVolume() is invoked as a factory returning a
        # callable — confirm this double-call is the intended API and not a typo.
        # Fixed: result is now bound to the same name the method returns
        # (the original bound 'waveform_gain_adjusted' but returned the
        # undefined 'waveformGainAdjusted', a guaranteed NameError).
        waveformGainAdjusted = su.equalizeVolume()(waveformEnhanced, sampleRate, self.gainWindow, self.minimumGain, self.maximumGain)
        if self.earlyCleanup:
            del waveformEnhanced
        print("Audio Equalized")
        print("Detecting speakers")
        # Removed two dead assignments that read diarizationOutput attributes
        # into unused locals (one clobbered the unpacked 'embeddings').
        diarizationOutput, embeddings = self.pipeline({"waveform": waveformGainAdjusted, "sample_rate": sampleRate}, return_embeddings=True)
        print("Speakers Detected")
        totalTimeInSeconds = int(waveformGainAdjusted.shape[-1] / sampleRate)
        print("Time in seconds calculated")
        return diarizationOutput, totalTimeInSeconds, waveformGainAdjusted, sampleRate

    def __call__(self, audioPath):
        '''
        Processes audio file to generate results necessary for app

        Parameters
        ----------
        audioPath : string
            Path to the audio file

        Returns
        --------
        annotation : pyannote.core.annotation
            found here https://pyannote.github.io/pyannote-core/_modules/pyannote/core/annotation.html
        totalTimeInSeconds : int
            Approximate total seconds of audio file
        waveformGainAdjusted : np.array
            The waveform of the audio file after equalization
        sampleRate : int
            The sample rate of the audio file
        '''
        diarizationOutput, totalTimeInSeconds, waveformGainAdjusted, sampleRate = self.processFile(audioPath)
        annotation = diarizationOutput.speaker_diarization
        # Relabel any existing silence and group speakers using the SVM
        # classifier: prediction 0 -> "silence", 2 -> "group", else keep label.
        labelMapping = {}
        # Fixed: the original iterated 'output.speaker_diarization.labels()'
        # where 'output' was undefined (NameError); it clearly meant the
        # diarization result loaded above.
        for s, speaker in enumerate(annotation.labels()):
            prediction = self.groupClassifier.predict(diarizationOutput.speaker_embeddings[s].reshape(1, -1))
            if prediction == 0:
                labelMapping[speaker] = "silence"
            elif prediction == 2:
                labelMapping[speaker] = "group"
            else:
                # May not be necessary, consider using to reformat default names away from SPEAKER_XX
                labelMapping[speaker] = speaker
        # Fixed: rename_labels returns a relabeled *copy* by default (it does
        # not rename in place), so the result must be bound — the original
        # discarded it and returned the unrenamed annotation.
        annotation = annotation.rename_labels(labelMapping)
        return annotation, totalTimeInSeconds, waveformGainAdjusted, sampleRate