czyoung commited on
Commit
ac9c2ba
·
verified ·
1 Parent(s): 12b447c

Reformat sonogram.py and create sonogram class

Browse files

Replaced unused sonogram file with sonogram class. This should be used to help clean up the app.py file and decouple the code.

Files changed (2) hide show
  1. sonogram +0 -96
  2. sonogram.py +77 -0
sonogram DELETED
@@ -1,96 +0,0 @@
1
- import cv2
2
- import random
3
- import copy
4
- from pyannote.core import Annotation, Segment
5
-
6
def colors(n):
    '''
    Create a list of n visually distinctive BGR colors.

    Hues are spaced evenly around the (OpenCV-style, 0-180) hue circle
    starting from a random offset, at fixed saturation/value (200/255),
    so consecutive colors are easy to tell apart.

    Parameters
    ----------
    n : int
        Number of colors to generate (0 yields an empty list).

    Returns
    -------
    list[tuple[float, float, float]]
        n (blue, green, red) tuples with each component in [0, 1].
    '''
    # Stdlib HSV->RGB conversion. The original used cv2.cvtColor on a
    # numpy array but referenced `np` without ever importing numpy, so it
    # raised NameError for any n > 0.
    import colorsys
    if n == 0:
        return []
    ret = []
    h = int(random.random() * 180)
    step = 180 / n
    for i in range(n):
        h += step
        h = int(h) % 180
        # OpenCV hue range is [0, 180); colorsys expects [0, 1).
        r, g, b = colorsys.hsv_to_rgb(h / 180, 200 / 255, 200 / 255)
        ret.append((b, g, r))  # keep the original BGR component order
    return ret
22
-
23
def extendSpeakers(mySpeakerList, fileLabel = 'NONE', maximumSecondDifference = 1, minimumSecondDuration = 0):
    '''
    Merge nearby speech sections per speaker group and drop short ones.

    Assumes mySpeakerList is already split into Speaker/Audience: one inner
    list per speaker group, each entry a (start, duration) pair in seconds.
    Each inner list is sorted IN PLACE (caller's lists are mutated).

    Parameters
    ----------
    mySpeakerList : list[list[tuple[float, float]]]
        Per-group lists of (start, duration) sections.
    fileLabel : str
        URI recorded on the returned Annotation.
    maximumSecondDifference : float
        Sections separated by a gap of at most this many seconds are merged.
    minimumSecondDuration : float
        Merged sections shorter than this are discarded.

    Returns
    -------
    tuple
        (newSpeakerList, mySpeakerAnnotations): the merged per-group lists
        and the equivalent pyannote Annotation labeled by group index.
    '''
    mySpeakerAnnotations = Annotation(uri=fileLabel)
    # One output list per input group. The original hard-coded [[],[]]
    # (exactly two groups) which would raise IndexError for more groups;
    # sizing from the input is backward-compatible.
    newSpeakerList = [[] for _ in mySpeakerList]
    for i, speaker in enumerate(mySpeakerList):
        speaker.sort()
        lastEnd = -1
        tempSection = None
        for section in speaker:
            if lastEnd == -1:
                # First section opens the current merge window.
                # (No deepcopy needed: sections are treated as immutable
                # tuples and tempSection is only ever rebound, not mutated.)
                tempSection = section
                lastEnd = section[0] + section[1]
            else:
                if section[0] - lastEnd <= maximumSecondDifference:
                    # Close enough: extend the window to cover this section
                    # (max() keeps the window if the new section ends inside it).
                    tempSection = (tempSection[0], max(section[0] + section[1] - tempSection[0], tempSection[1]))
                    lastEnd = tempSection[0] + tempSection[1]
                else:
                    # Gap too large: flush the window (if long enough) and restart.
                    if tempSection[1] >= minimumSecondDuration:
                        newSpeakerList[i].append(tempSection)
                        mySpeakerAnnotations[Segment(tempSection[0], lastEnd)] = i
                    tempSection = section
                    lastEnd = section[0] + section[1]
        if tempSection is not None:
            # Flush the final open window.
            if tempSection[1] >= minimumSecondDuration:
                newSpeakerList[i].append(tempSection)
                mySpeakerAnnotations[Segment(tempSection[0], lastEnd)] = i
    return newSpeakerList, mySpeakerAnnotations
53
-
54
def twoClassExtendAnnotation(myAnnotation,maximumSecondDifference = 1, minimumSecondDuration = 0):
    '''
    Collapse a diarization Annotation into two classes (lecturer, audience)
    and merge/filter each class's sections via extendSpeakers.

    The lecturer is the label whose support timeline contains the most
    segments (NOTE(review): segment count, not total duration — confirm
    this is the intended heuristic); every other label is pooled as
    "audience".

    Returns the (newList, newAnnotation) pair produced by extendSpeakers.
    '''
    # Pick the label with the largest number of support segments.
    lecturerID = None
    lecturerLen = 0
    for speakerName in myAnnotation.labels():
        supportSize = len(myAnnotation.label_support(speakerName))
        if supportSize > lecturerLen:
            lecturerLen = supportSize
            lecturerID = speakerName

    # Rebuild as [[lecturer sections], [audience sections]], each section
    # a (start, duration) pair.
    tempSpeakerList = [[], []]
    for speakerName in myAnnotation.labels():
        bucket = 0 if speakerName == lecturerID else 1
        for segmentItem in myAnnotation.label_support(speakerName):
            tempSpeakerList[bucket].append((segmentItem.start, segmentItem.duration))

    return extendSpeakers(tempSpeakerList,
                          fileLabel=myAnnotation.uri,
                          maximumSecondDifference=maximumSecondDifference,
                          minimumSecondDuration=minimumSecondDuration)
78
-
79
def loadAudioRTTM(sampleRTTM):
    '''
    Load per-speaker speech sections from an RTTM file.

    Parameters
    ----------
    sampleRTTM : str
        Path to the RTTM file; also recorded as the Annotation uri.

    Returns
    -------
    tuple
        (speakerList, prediction):
        speakerList — list indexed by speaker number of (start, duration)
        tuples, convenient for plotting;
        prediction — the same data as a pyannote Annotation, convenient
        for error-rate calculation.
    '''
    # Data in list form, for convenient plotting
    speakerList = []
    # Data in Annotation form, for convenient error rate calculation
    prediction = Annotation(uri=sampleRTTM)
    with open(sampleRTTM, "r") as rttm:
        for line in rttm:
            # split() with no argument tolerates runs of whitespace and the
            # trailing newline; the original split(' ') broke on double
            # spaces and left '\n' glued to the last field.
            speakerResult = line.split()
            if not speakerResult:
                continue  # skip blank lines instead of crashing
            # RTTM fields: type file chnl tbeg tdur ortho stype name conf ...
            # NOTE(review): the speaker index is parsed from the last two
            # characters of the name field (e.g. SPEAKER_07) — fragile if
            # names don't end in exactly two digits.
            index = int(speakerResult[7][-2:])
            start = float(speakerResult[3])
            duration = float(speakerResult[4])
            end = start + duration
            # Grow the list so speakerList[index] exists even when speakers
            # appear out of order.
            while len(speakerList) < index + 1:
                speakerList.append([])
            speakerList[index].append((start, duration))
            prediction[Segment(start, end)] = index

    return speakerList, prediction
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
sonogram.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import pickle
from types import SimpleNamespace

import torch
from pyannote.audio import Pipeline

import sonogram_utility as su
4
+
5
class Sonogram():
    '''
    End-to-end speaker-diarization wrapper.

    Loads an audio file, equalizes its volume, runs the pyannote speaker
    diarization pipeline, and relabels each detected speaker as "silence",
    "group", or an individual speaker using a pre-trained SVM classifier
    loaded from groupClassifier.pkl.
    '''

    def __init__(self,enableDenoise=False):
        # NOTE(review): enableDenoise is currently unused — confirm whether
        # denoising is still planned or the parameter should be dropped.
        #TODO: Should these be adjustable via initialization, or constants?
        self.secondDifference = 5
        self.gainWindow = 4
        self.minimumGain = -45
        self.maximumGain = -5
        self.attenLimDB = 3
        # Free large intermediate waveforms as soon as they are consumed.
        self.earlyCleanup = True

        # Device selection. The TPU branch is intentionally disabled by
        # raising immediately (torch_xla is not imported here); the except
        # branch is the live code path.
        self.isTPU = False
        self.isGPU = False
        try:
            raise(RuntimeError("Not an error"))
            #device = xm.xla_device()
            print("TPU is available.")
            self.isTPU = True
        except RuntimeError as e:
            print(f"TPU is not available: {e}")
            # Fallback to CPU or other devices if needed
            self.isGPU = torch.cuda.is_available()
            if not self.isGPU:
                print(f"GPU is not available")
            self.device = torch.device("cuda" if self.isGPU else "cpu")
            # BUG FIX: the original printed the undefined name `device`.
            print(f"Using {self.device} instead.")

        self.pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1")
        self.pipeline.to(self.device)

        # Load SVM classifier
        with open('groupClassifier.pkl', 'rb') as f:
            self.groupClassifier = pickle.load(f)

    def processFile(self,filePath):
        '''
        Load, volume-equalize, and diarize one audio file.

        Parameters
        ----------
        filePath : str
            Path to the audio file to process.

        Returns
        -------
        tuple
            (diarizationOutput, totalTimeInSeconds, waveformGainAdjusted,
            sampleRate) where diarizationOutput exposes .speaker_diarization
            (pyannote Annotation) and .speaker_embeddings (per-speaker
            embedding matrix).
        '''
        print("Loading file")
        waveformList, sampleRate = su.splitIntoTimeSegments(filePath,600)
        print("File loaded")
        waveformEnhanced = su.combineWaveforms(waveformList)
        if (self.earlyCleanup):
            del waveformList
        print("Equalizing Audio")
        # NOTE(review): equalizeVolume() is called and the result invoked —
        # assumes it is a factory returning a callable; confirm against
        # sonogram_utility.
        waveformGainAdjusted = su.equalizeVolume()(waveformEnhanced,sampleRate,self.gainWindow,self.minimumGain,self.maximumGain)
        if (self.earlyCleanup):
            del waveformEnhanced
        print("Audio Equalized")
        print("Detecting speakers")
        # With return_embeddings=True the 3.1 pipeline returns a plain
        # (Annotation, ndarray) tuple. BUG FIX: the original then read
        # .speaker_diarization / .speaker_embeddings off the Annotation,
        # which has no such attributes; wrap the pair so the attribute-style
        # access used by __call__ works.
        diarization, embeddings = self.pipeline({"waveform": waveformGainAdjusted, "sample_rate": sampleRate}, return_embeddings=True)
        diarizationOutput = SimpleNamespace(speaker_diarization=diarization,
                                            speaker_embeddings=embeddings)
        print("Speakers Detected")
        totalTimeInSeconds = int(waveformGainAdjusted.shape[-1]/sampleRate)
        print("Time in seconds calculated")
        # BUG FIX: the original returned `waveformGainAdjusted` while the
        # local variable was named waveform_gain_adjusted (NameError).
        return diarizationOutput, totalTimeInSeconds, waveformGainAdjusted, sampleRate

    def __call__(self,audioPath):
        '''
        Diarize audioPath and relabel speakers with the SVM group classifier.

        Returns
        -------
        tuple
            (annotation, totalTimeInSeconds, waveformGainAdjusted, sampleRate)
            where annotation has silence/group speakers renamed.
        '''
        diarizationOutput, totalTimeInSeconds, waveformGainAdjusted, sampleRate = self.processFile(audioPath)
        annotation = diarizationOutput.speaker_diarization

        # Relabel any existing silence and group speakers
        labelMapping = {}
        # BUG FIX: the original iterated `output.speaker_diarization.labels()`
        # but `output` was never defined.
        for s, speaker in enumerate(annotation.labels()):
            # SVM classes: 0 = silence, 2 = group, anything else = individual
            # speaker. (Assumes embedding row order matches labels() order —
            # TODO confirm against the pipeline's documentation.)
            prediction = self.groupClassifier.predict(diarizationOutput.speaker_embeddings[s].reshape(1,-1))
            if prediction == 0:
                labelMapping[speaker] = "silence"
            elif prediction == 2:
                labelMapping[speaker] = "group"
            else:
                # May not be necessary, consider using to reformat default names away from SPEAKER_XX
                labelMapping[speaker] = speaker
        # BUG FIX: rename_labels returns a renamed copy (it does not rename
        # in place by default); the original discarded the result, so the
        # relabeling silently had no effect.
        annotation = annotation.rename_labels(labelMapping)
        return annotation, totalTimeInSeconds, waveformGainAdjusted, sampleRate