Spaces:
Sleeping
Sleeping
| import cv2 | |
| import random | |
| import copy | |
| from pyannote.core import Annotation, Segment | |
| import numpy as np | |
| import torch | |
| import torchaudio | |
| import pandas as pd | |
| import datetime as dt | |
def colors(n):
    '''
    Build a palette of n visually distinct colors.

    The hue starts at a random value and is advanced by 180 / n for every
    color (truncated to an integer and wrapped at 180); saturation and value
    are fixed at 200. Each HSV triple is converted with cv2.COLOR_HSV2BGR.

    Returns a list of n 3-tuples of floats in [0, 1]. The channels are kept in
    the order delivered by the conversion, i.e. (blue, green, red). An empty
    list is returned when n is 0.
    '''
    if n == 0:
        return []
    palette = []
    hue = int(random.random() * 180)
    hueStep = 180 / n
    for _ in range(n):
        # Advance, truncate and wrap the hue in a single step.
        hue = int(hue + hueStep) % 180
        pixel = cv2.cvtColor(np.uint8([[[hue, 200, 200]]]), cv2.COLOR_HSV2BGR)[0][0]
        palette.append(tuple(channel / 255 for channel in pixel.tolist()))
    return palette
def colorsCSS(n):
    '''
    Creates a list size n of distinctive colors based on CSS formatting

    The hue starts at a random value and is advanced by 180 / n for every
    color (truncated to an integer and wrapped at 180); saturation and value
    are fixed at 200.

    Returns a list of n strings of the form '#rrggbb' (lowercase hex), or an
    empty list when n is 0.
    '''
    if n == 0:
        return []
    ret = []
    h = int(random.random() * 180)
    step = 180 / n
    for _ in range(n):
        h = int(h + step) % 180
        hsv = np.uint8([[[h, 200, 200]]])
        # The conversion yields the channels in blue, green, red order.
        b, g, r = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)[0][0].tolist()
        # CSS hex notation is #RRGGBB, so the channels must be emitted
        # red-first rather than in the BGR order used by the conversion.
        ret.append(f'#{r:02x}{g:02x}{b:02x}')
    return ret
def extendSpeakers(mySpeakerList, fileLabel = 'NONE', maximumSecondDifference = 1, minimumSecondDuration = 0):
    '''
    Merge neighbouring speaking sections of every speaker group.

    Assumes mySpeakerList is already split into Speaker/Audience, i.e. it is a
    sequence of groups, each group being a collection of (start, duration)
    sections in seconds.

    Within a group, sections are processed in sorted order. A section that
    starts no more than maximumSecondDifference seconds after the end of the
    current merged section is absorbed into it; otherwise the current merged
    section is closed and a new one is started. Closed sections shorter than
    minimumSecondDuration are dropped.

    Returns (newSpeakerList, mySpeakerAnnotations):
      * newSpeakerList has one list of kept sections per input group, and
        always at least two lists.
      * mySpeakerAnnotations is an Annotation (uri=fileLabel) in which every
        kept section is labelled with the integer index of its group.

    The input lists are not modified.
    '''
    mySpeakerAnnotations = Annotation(uri=fileLabel)
    groups = list(mySpeakerList)
    # Keep the two (speaker, audience) slots callers rely on, but grow the
    # result for longer inputs instead of failing with an IndexError.
    newSpeakerList = [[] for _ in range(max(2, len(groups)))]
    for i, speaker in enumerate(groups):
        tempSection = None
        lastEnd = None
        # sorted() leaves the caller's list untouched.
        for section in sorted(speaker):
            if tempSection is not None and section[0] - lastEnd <= maximumSecondDifference:
                # Close enough to the running section: stretch it so that it
                # also covers this one (never shrinking it).
                tempSection = (tempSection[0],
                               max(section[0] + section[1] - tempSection[0], tempSection[1]))
            else:
                if tempSection is not None and tempSection[1] >= minimumSecondDuration:
                    newSpeakerList[i].append(tempSection)
                    mySpeakerAnnotations[Segment(tempSection[0], lastEnd)] = i
                tempSection = copy.deepcopy(section)
            lastEnd = tempSection[0] + tempSection[1]
        # Add the last section back in
        if tempSection is not None and tempSection[1] >= minimumSecondDuration:
            newSpeakerList[i].append(tempSection)
            mySpeakerAnnotations[Segment(tempSection[0], lastEnd)] = i
    return newSpeakerList, mySpeakerAnnotations
def twoClassExtendAnnotation(myAnnotation,maximumSecondDifference = 1, minimumSecondDuration = 0):
    '''
    Collapse an annotation into lecturer/audience groups and merge them.

    The lecturer is the label whose label_support contains the most segments
    (the first such label wins a tie). Every other label is treated as
    audience. The two groups are handed to extendSpeakers as
    [[lecturer sections], [audience sections]], each section being a
    (start, duration) pair, together with the annotation's uri and the two
    merge thresholds.

    Returns whatever extendSpeakers returns: (newList, newAnnotation).
    '''
    labels = myAnnotation.labels()

    # Identify the lecturer by segment count.
    lecturerID = None
    mostSegments = 0
    for label in labels:
        segmentCount = len(myAnnotation.label_support(label))
        if segmentCount > mostSegments:
            lecturerID, mostSegments = label, segmentCount

    # Sort every label's sections into the lecturer or the audience bucket.
    lecturerSections = []
    audienceSections = []
    for label in labels:
        if label != lecturerID:
            bucket = audienceSections
        else:
            bucket = lecturerSections
        for piece in myAnnotation.label_support(label):
            bucket.append((piece.start, piece.duration))

    return extendSpeakers(
        [lecturerSections, audienceSections],
        fileLabel = myAnnotation.uri,
        maximumSecondDifference = maximumSecondDifference,
        minimumSecondDuration = minimumSecondDuration,
    )
def loadAudioRTTM(sampleRTTM):
    '''
    Read diarization results from an RTTM file.

    Field 3 of every line is taken as the start time, field 4 as the duration
    and field 7 as the speaker name (all zero-based). The last two characters
    of the speaker name must be digits; they select the slot of the speaker in
    the returned list.

    Returns (speakerList, prediction):
      * speakerList[index] is a list of (start, duration) tuples, convenient
        for plotting.
      * prediction is an Annotation (uri=sampleRTTM) mapping each
        Segment(start, start + duration) to the speaker name, convenient for
        error rate calculation.

    Raises ValueError when a numeric field or the speaker suffix cannot be
    parsed.
    '''
    speakerList = []
    prediction = Annotation(uri=sampleRTTM)
    with open(sampleRTTM, "r") as rttm:
        for line in rttm:
            # Split on any run of whitespace so tabs, repeated spaces and the
            # trailing newline cannot end up inside a field.
            fields = line.split()
            if len(fields) < 8:
                # Blank or truncated line: nothing to read.
                continue
            speakerName = fields[7]
            index = int(speakerName[-2:])
            start = float(fields[3])
            duration = float(fields[4])
            # Grow the list until the speaker's slot exists.
            while len(speakerList) <= index:
                speakerList.append([])
            speakerList[index].append((start, duration))
            prediction[Segment(start, start + duration)] = speakerName
    return speakerList, prediction
def loadAudioTXT(sampleTXT):
    '''
    Read labelled time spans from a tab-separated text file.

    Every line is expected to hold start<TAB>end<TAB>label, with start and end
    in seconds. Lines with fewer than three fields are skipped.

    Returns ([], prediction): the first element is always an empty list, the
    second is an Annotation (uri=sampleTXT) mapping each Segment(start, end)
    to its label.

    Raises ValueError when a start or end field is not a number.
    '''
    prediction = Annotation(uri=sampleTXT)
    with open(sampleTXT, "r") as txt:
        for line in txt:
            # Drop the line terminator first; otherwise it stays attached to
            # the label and the same speaker ends up with different labels
            # depending on whether the line is the last one in the file.
            fields = line.rstrip('\r\n').split('\t')
            if len(fields) < 3:
                continue
            start = float(fields[0])
            end = float(fields[1])
            prediction[Segment(start, end)] = fields[2]
    return [], prediction
def loadAudioCSV(sampleCSV):
    '''
    Read labelled time spans from a CSV file.

    The file must provide the columns 'Start', 'Finish' and 'Resource'; every
    row becomes Segment(Start, Finish) labelled with Resource.

    Returns ([], prediction): the first element is always an empty list, the
    second is an Annotation (uri=sampleCSV), convenient for error rate
    calculation.
    '''
    # Renumber the rows so the index simply counts them.
    table = pd.read_csv(sampleCSV).reset_index()
    prediction = Annotation(uri=sampleCSV)
    for _, record in table.iterrows():
        span = Segment(record['Start'], record['Finish'])
        prediction[span] = record['Resource']
    return [], prediction
def splitIntoTimeSegments(testFile,maxDurationInSeconds=60):
    '''
    Load an audio file and cut it into consecutive chunks.

    testFile is passed to torchaudio.load. Every chunk holds at most
    maxDurationInSeconds seconds of audio (fractional values are allowed and
    are truncated to a whole number of samples); the last chunk holds whatever
    remains. A file without samples yields a single empty chunk.

    Returns (audioSegments, sample_rate), where audioSegments is a list of
    slices of the loaded waveform along its last axis.

    Raises ValueError when maxDurationInSeconds does not cover at least one
    sample.
    '''
    waveform, sample_rate = torchaudio.load(testFile)
    # Slice bounds must be integers, and a non-positive chunk length would
    # never reach the end of the waveform.
    samplesPerSegment = int(maxDurationInSeconds * sample_rate)
    if samplesPerSegment <= 0:
        raise ValueError("maxDurationInSeconds must cover at least one sample")
    outOfBoundsIndex = waveform.shape[-1]
    # An empty range is falsy; fall back to one (empty) chunk in that case.
    starts = range(0, outOfBoundsIndex, samplesPerSegment) or range(1)
    # Slicing clamps at the end of the waveform, so the last chunk is short.
    audioSegments = [waveform[:, start:start + samplesPerSegment] for start in starts]
    return audioSegments, sample_rate
def audioNormalize(waveform,sampleRate,stepSizeInSeconds = 2,dbThreshold = -50,dbTarget = -5):
    '''
    Return a copy of waveform whose level is adjusted window by window.

    The waveform is walked in windows of stepSizeInSeconds * sampleRate
    samples. For every window the dB level of each sample is computed; when
    the loudest value of a channel exceeds dbThreshold, that channel's window
    is amplified by (dbTarget - loudest value) dB. Windows that stay at or
    below dbThreshold are left unchanged.

    Only channels 0 and 1 are adjusted. The input tensor is not modified; the
    adjusted clone is returned. Progress messages are printed along the way.

    Assumes waveform is indexed as [channel][sample] - TODO confirm.
    '''
    print("In audioNormalize")
    # Work on a detached clone so the caller's tensor stays untouched.
    copyWaveform = waveform.clone().detach()
    print("Waveform copy made")
    transform = torchaudio.transforms.AmplitudeToDB(stype="amplitude", top_db=80)
    currStart = 0
    # The window end is capped at the index of the last sample, and slices
    # exclude their end, so the final sample is never adjusted.
    currEnd = int(min(currStart + stepSizeInSeconds * sampleRate, len(copyWaveform[0])-1))
    done = False
    while(not done):
        # Levels are measured on the original waveform, not on the copy.
        # NOTE(review): the signed samples are fed to the dB transform as-is -
        # confirm how it treats negative values.
        copyWaveform_db = waveform[:,currStart:currEnd].clone().detach()
        copyWaveform_db = transform(copyWaveform_db)
        if currStart == 0:
            print("First DB level calculated")
        if torch.max(copyWaveform_db[0]).item() > dbThreshold:
            # min(dbTarget - level) == dbTarget - max(level): the gain that
            # lifts the loudest sample of the window to dbTarget.
            gain = torch.min(dbTarget - copyWaveform_db[0])
            adjustGain = torchaudio.transforms.Vol(gain,'db')
            copyWaveform[0][currStart:currEnd] = adjustGain(copyWaveform[0][currStart:currEnd])
        # Second channel, when present, gets its own independent gain.
        if len(copyWaveform_db) > 1:
            if torch.max(copyWaveform_db[1]).item() > dbThreshold:
                gain = torch.min(dbTarget - copyWaveform_db[1])
                adjustGain = torchaudio.transforms.Vol(gain,'db')
                copyWaveform[1][currStart:currEnd] = adjustGain(copyWaveform[1][currStart:currEnd])
        currStart += int(stepSizeInSeconds * sampleRate)
        # The walk ends once the next window would start past the previous
        # window's end, which only happens after the capped final window.
        # NOTE(review): if the next start lands exactly on the capped end, the
        # loop runs once more with an empty window - confirm torch.max copes.
        if currStart > currEnd:
            done = True
        else:
            currEnd = int(min(currStart + stepSizeInSeconds * sampleRate, len(copyWaveform[0])-1))
    print("Waveform enhanced")
    return copyWaveform
class equalizeVolume(torch.nn.Module):
    '''
    torch.nn.Module wrapper around audioNormalize.
    '''

    def forward(self, waveform,sampleRate,stepSizeInSeconds,dbThreshold,dbTarget):
        '''
        Print a progress message and return the result of audioNormalize
        called with the same arguments.
        '''
        print("In equalizeVolume")
        return audioNormalize(
            waveform,
            sampleRate,
            stepSizeInSeconds = stepSizeInSeconds,
            dbThreshold = dbThreshold,
            dbTarget = dbTarget,
        )
def combineWaveforms(waveformList):
    '''
    Concatenate the tensors in waveformList along dimension 1 and return the
    resulting tensor.
    '''
    joined = torch.cat(waveformList, dim=1)
    return joined
def annotationToSpeakerList(myAnnotation):
    '''
    Convert an annotation into a list of per-speaker section lists.

    For every label returned by myAnnotation.labels(), in that order, the
    segments of myAnnotation.label_support(label) are collected as
    (start, duration) tuples.

    Returns a list with one inner list per label.
    '''
    sectionsByLabel = {}
    for label in myAnnotation.labels():
        # setdefault keeps the slot of a label that was already seen.
        sections = sectionsByLabel.setdefault(label, [])
        for piece in myAnnotation.label_support(label):
            sections.append((piece.start, piece.duration))
    return list(sectionsByLabel.values())
def speakerListToDataFrame(speakerList):
    '''
    Turn per-speaker sections into a task table with datetime bounds.

    speakerList is a sequence of speakers, each a sequence of
    (start, duration) sections in seconds. Section k of speaker j becomes one
    row with
      Task     = "Speaker {j}.{k}"
      Start    = today's midnight + start seconds
      Finish   = today's midnight + (start + duration) seconds
      Resource = "Speaker {j+1}"

    Offsets of 24 hours or more roll over to the following day(s).

    Returns a pandas DataFrame with the columns Task, Start, Finish and
    Resource; the columns are present even when there are no rows.
    '''
    # Read the date once so every row shares the same reference day, even
    # when the call happens to straddle midnight.
    midnight = dt.datetime.combine(dt.date.today(), dt.time())
    dataList = []
    for j, row in enumerate(speakerList):
        for k, speakingPoint in enumerate(row):
            startPoint = speakingPoint[0]
            endPoint = speakingPoint[0] + speakingPoint[1]
            # timedelta arithmetic has no 24 hour limit, unlike dt.time.
            dtStart = midnight + dt.timedelta(seconds=float(startPoint))
            dtEnd = midnight + dt.timedelta(seconds=float(endPoint))
            dataList.append(dict(Task=f"Speaker {j}.{k}", Start=dtStart, Finish=dtEnd, Resource=f"Speaker {j+1}"))
    df = pd.DataFrame(dataList, columns=["Task", "Start", "Finish", "Resource"])
    return df
def removeOverlap(timeSegment,overlap):
    '''
    Return the parts of timeSegment that lie outside overlap.

    The result is a list of zero, one or two Segments: the part of
    timeSegment before overlap.start (if timeSegment starts earlier) followed
    by the part after overlap.end (if timeSegment ends later).
    '''
    leftovers = []
    startsEarlier = timeSegment.start < overlap.start
    endsLater = timeSegment.end > overlap.end
    if startsEarlier:
        headEnd = min(overlap.start, timeSegment.end)
        leftovers.append(Segment(timeSegment.start, headEnd))
    if endsLater:
        tailStart = max(timeSegment.start, overlap.end)
        leftovers.append(Segment(tailStart, timeSegment.end))
    return leftovers
def checkForOverlap(time1, time2):
    '''
    Intersect time1 and time2 with the & operator.

    Returns the intersection when it is truthy, otherwise None.
    '''
    return (time1 & time2) or None
def sumSegments(segmentList):
    '''
    Add up the duration attribute of every item in segmentList.

    Returns 0 for an empty list.
    '''
    return sum(piece.duration for piece in segmentList)
def sumTimes(myAnnotation):
    '''
    Return the duration() of the annotation's timeline.

    The timeline is requested with get_timeline(False); the positional False
    is presumably the copy flag - verify against the pyannote API.
    '''
    timeline = myAnnotation.get_timeline(False)
    return timeline.duration()
def sumTimesPerSpeaker(myAnnotation):
    '''
    Compute the total time per label.

    For every label of myAnnotation, sumTimes is applied to the sub-annotation
    that contains only that label.

    Returns (speakerList, timeList): two parallel lists holding the labels, in
    the order of myAnnotation.labels(), and their accumulated times.
    '''
    totals = {}
    for label in myAnnotation.labels():
        # Start from 0 on the first sighting of a label, then accumulate.
        totals[label] = totals.get(label, 0) + sumTimes(myAnnotation.subset([label]))
    return list(totals), list(totals.values())
def sumMultiTimesPerSpeaker(myAnnotation):
    '''
    Compute the total time per individual speaker for combined labels.

    The per-label totals of sumTimesPerSpeaker are taken, every label is split
    on '+', and the label's total is credited to each of the resulting names.

    Returns (speakerList, timeList): two parallel lists holding the individual
    names, in order of first appearance, and their accumulated times.
    '''
    groupLabels, groupTimes = sumTimesPerSpeaker(myAnnotation)
    totals = {}
    for groupLabel, groupTime in zip(groupLabels, groupTimes):
        for name in groupLabel.split('+'):
            totals[name] = totals.get(name, 0) + groupTime
    return list(totals), list(totals.values())
def annotationToDataFrame(myAnnotation):
    '''
    Turn an annotation into a task table with datetime bounds.

    For every label of myAnnotation, segment k of the sub-annotation holding
    only that label becomes one row with
      Task     = "{label}.{k}"
      Start    = today's midnight + segment.start seconds
      Finish   = today's midnight + segment.end seconds
      Resource = label

    Labels do not have to be strings. Offsets of 24 hours or more roll over to
    the following day(s).

    Returns (df, timeSummary):
      * df is a pandas DataFrame with the columns Task, Start, Finish and
        Resource; the columns are present even when there are no rows.
      * timeSummary maps every label to the sum of its segment durations.
    '''
    # Read the date once so every row shares the same reference day.
    midnight = dt.datetime.combine(dt.date.today(), dt.time())
    dataList = []
    timeSummary = {}
    for speakerName in myAnnotation.labels():
        segments = list(myAnnotation.subset([speakerName]).itersegments())
        timeSummary[speakerName] = sum(segment.duration for segment in segments)
        for k, speakingSegment in enumerate(segments):
            # timedelta arithmetic has no 24 hour limit, unlike dt.time.
            dtStart = midnight + dt.timedelta(seconds=float(speakingSegment.start))
            dtEnd = midnight + dt.timedelta(seconds=float(speakingSegment.end))
            # Format the label instead of concatenating so that non-string
            # labels (e.g. integer group indices) do not raise a TypeError.
            dataList.append(dict(Task=f"{speakerName}.{k}", Start=dtStart, Finish=dtEnd, Resource=speakerName))
    df = pd.DataFrame(dataList, columns=["Task", "Start", "Finish", "Resource"])
    return df, timeSummary
def annotationToSimpleDataFrame(myAnnotation):
    '''
    Turn an annotation into a task table with plain second offsets.

    For every label of myAnnotation, segment k of the sub-annotation holding
    only that label becomes one row with
      Task     = "{label}.{k}"
      Start    = segment.start
      Finish   = segment.end
      Resource = label

    Labels do not have to be strings.

    Returns (df, timeSummary):
      * df is a pandas DataFrame with the columns Task, Start, Finish and
        Resource; the columns are present even when there are no rows.
      * timeSummary maps every label to the sum of its segment durations.
    '''
    dataList = []
    timeSummary = {}
    for speakerName in myAnnotation.labels():
        segments = list(myAnnotation.subset([speakerName]).itersegments())
        timeSummary[speakerName] = sum(segment.duration for segment in segments)
        for k, speakingSegment in enumerate(segments):
            # Format the label instead of concatenating so that non-string
            # labels (e.g. integer group indices) do not raise a TypeError.
            dataList.append(dict(Task=f"{speakerName}.{k}", Start=speakingSegment.start, Finish=speakingSegment.end, Resource=speakerName))
    df = pd.DataFrame(dataList, columns=["Task", "Start", "Finish", "Resource"])
    return df, timeSummary
def calcCategories(myAnnotation,categories):
    '''
    Group the segments of an annotation by category and merge overlaps.

    categories is a sequence of containers; a label belongs to category i when
    `label in categories[i]` is true (the last matching category wins). A
    label that matches no category gets a new category of its own, appended
    after the given ones, and is recorded in extraCategories.

    Inside every category the segments are ordered by start time and
    overlapping segments are merged into their union. A merged entry is named
    after the labels it covers, joined with '+', each label listed once in
    order of appearance; an entry covering a single label keeps that label
    unchanged.

    Returns (cleanCategories, extraCategories):
      * cleanCategories has one list of (name, Segment) tuples per category.
      * extraCategories lists the labels that matched no given category, in
        the order of their appended categories.
    '''
    def mergedName(ids):
        # A lone label is passed through untouched; several are '+'-joined.
        if len(ids) == 1:
            return ids[0]
        return "+".join(str(part) for part in ids)

    categorySlots = [[] for _ in categories]
    extraCategories = []
    for speaker in myAnnotation.labels():
        targetCategory = None
        for i, category in enumerate(categories):
            if speaker in category:
                targetCategory = i
        if targetCategory is None:
            targetCategory = len(categorySlots)
            categorySlots.append([])
            extraCategories.append(speaker)
        for timeSegment in myAnnotation.subset([speaker]).itersegments():
            categorySlots[targetCategory].append((speaker, timeSegment))

    # Clean up categories
    cleanCategories = []
    for category in categorySlots:
        newCategory = []
        currIDs, currSegment = None, None
        for sp, segmentSlot in sorted(category, key=lambda cSegment: cSegment[1].start):
            if currSegment is None:
                # First segment of the category opens the first run.
                currIDs, currSegment = [sp], segmentSlot
            elif checkForOverlap(currSegment, segmentSlot) is None:
                # No overlap: close the running entry and start a new one.
                newCategory.append((mergedName(currIDs), currSegment))
                currIDs, currSegment = [sp], segmentSlot
            else:
                # Overlap: remember the label and grow the running segment to
                # the union of both segments.
                if sp not in currIDs:
                    currIDs.append(sp)
                currSegment = currSegment | segmentSlot
        if currSegment is not None:
            newCategory.append((mergedName(currIDs), currSegment))
        cleanCategories.append(newCategory)
    return cleanCategories, extraCategories
def calcSpeakingTypes(myAnnotation,maxTime):
    '''
    Split the time line into silence, single-voice and multi-voice parts.

    Every segment of every label is placed on a worklist and compared first
    with the multi-voice slots found so far, then with the single-voice slots.
    Overlapping parts move to (or join) the multi-voice slots; the
    non-overlapping remainders are put back on the worklist. A segment that
    overlaps nothing becomes a single-voice slot. Finally all voiced slots are
    cut out of Segment(0, maxTime) to obtain the silent parts.

    Returns (nvAnnotation, ovAnnotation, mvAnnotation):
      * nvAnnotation labels every silent part with the string "None".
      * ovAnnotation labels every single-voice slot with its label.
      * mvAnnotation labels every multi-voice slot with the '+'-joined labels
        collected for it (labels must therefore be strings).
    '''
    noVoice = [Segment(0,maxTime)]
    # oneVoice holds (label, segment) tuples; multiVoice holds
    # ([labels], segment) tuples whose label list is extended in place.
    oneVoice = []
    multiVoice = []
    for speaker in myAnnotation.labels():
        timesToProcess = []
        for timeSegment in myAnnotation.subset([speaker]).itersegments():
            timesToProcess.append((speaker,timeSegment))
        while len(timesToProcess) > 0:
            # Take the oldest pending piece off the worklist.
            currID, currSegment = timesToProcess[0]
            timesToProcess.remove(timesToProcess[0])
            resetCheck = False
            # Check in multi
            for compareID,timeSegment in multiVoice:
                overlapTime = checkForOverlap(currSegment,timeSegment)
                if overlapTime is None:
                    continue
                else:
                    # NOTE(review): the label is added to the whole slot even
                    # when only part of the slot is overlapped, and the same
                    # label may be added more than once - confirm intended.
                    compareID.append(currID)
                    # Re-queue the parts of the piece outside this slot.
                    newTimes = removeOverlap(currSegment,timeSegment)
                    for i in range(len(newTimes)):
                        newTimes[i] = (currID,newTimes[i])
                    timesToProcess += newTimes
                    resetCheck = True
                    break
            if resetCheck:
                continue
            # Check in one voice
            for timeSlot in oneVoice:
                tID = timeSlot[0]
                tSegment = timeSlot[1]
                overlapTime = checkForOverlap(currSegment,tSegment)
                if overlapTime is None:
                    continue
                else:
                    # Safe despite iterating oneVoice: the loop is left via
                    # break right after the list has been changed.
                    oneVoice.remove(timeSlot)
                    # Add back non overlap
                    newTimes = removeOverlap(tSegment,currSegment)
                    for i in range(len(newTimes)):
                        newTimes[i] = (tID,newTimes[i])
                    oneVoice += newTimes
                    # Add overlap time to multivoice
                    multiVoice.append(([tID,currID],overlapTime))
                    # Add new times back to process
                    newTimes = removeOverlap(currSegment,tSegment)
                    for i in range(len(newTimes)):
                        newTimes[i] = (currID,newTimes[i])
                    timesToProcess += newTimes
                    resetCheck = True
                    break
            if resetCheck:
                continue
            # Add to one voice
            oneVoice.append((currID,currSegment))
    ovAnnotation = Annotation()
    mvAnnotation = Annotation()
    for currID,timeSlot in multiVoice:
        currIDString = '+'.join(currID)
        mvAnnotation[timeSlot] = currIDString
        # Cut this slot out of the silent parts; edits go to a copy because
        # noVoice itself is being iterated.
        copyOfNo = copy.deepcopy(noVoice)
        for emptySlot in noVoice:
            if checkForOverlap(timeSlot,emptySlot) is None:
                continue
            else:
                copyOfNo.remove(emptySlot)
                copyOfNo += removeOverlap(emptySlot,timeSlot)
        noVoice = copyOfNo
    for currID,timeSlot in oneVoice:
        ovAnnotation[timeSlot] = currID
        # Same cut-out step as for the multi-voice slots.
        copyOfNo = copy.deepcopy(noVoice)
        for emptySlot in noVoice:
            if checkForOverlap(timeSlot,emptySlot) is None:
                continue
            else:
                copyOfNo.remove(emptySlot)
                copyOfNo += removeOverlap(emptySlot,timeSlot)
        noVoice = copyOfNo
    nvAnnotation = Annotation()
    for emptySlot in noVoice:
        nvAnnotation[emptySlot] = "None"
    return nvAnnotation, ovAnnotation, mvAnnotation
def timeToString(timeInSeconds):
    '''
    Format a time in seconds as 'HH::MM::SS.ss'.

    A list or tuple is converted element by element (recursively) and a list
    of strings is returned. Hours and minutes are zero-padded to two digits,
    seconds to two digits plus two decimals.
    '''
    if isinstance(timeInSeconds, (list, tuple)):
        return [timeToString(t) for t in timeInSeconds]
    # Round to the displayed precision first, so a value such as 59.999
    # carries into the minutes instead of being printed as 60.00 seconds.
    totalCentiseconds = int(round(float(timeInSeconds) * 100))
    h, remainder = divmod(totalCentiseconds, 360000)
    m, remainder = divmod(remainder, 6000)
    s = remainder / 100
    # Width 5 covers 'SS.ss'; a width of 2 would never pad the seconds.
    return f'{h:02d}::{m:02d}::{s:05.2f}'