import cv2
import random
import copy
from pyannote.core import Annotation, Segment
import numpy as np
import torch
import torchaudio
import pandas as pd
import datetime as dt


def colors(n):
    '''
    Creates a list of n visually distinctive colors.

    Steps through the OpenCV hue wheel (range [0, 180)) from a random start
    point and converts each hue to BGR.

    Returns a list of (b, g, r) tuples with components scaled to [0, 1].
    '''
    if n == 0:
        return []
    ret = []
    h = int(random.random() * 180)  # random starting hue
    step = 180 / n                  # evenly spaced hues for distinctness
    for _ in range(n):
        h = int(h + step) % 180
        hsv = np.uint8([[[h, 200, 200]]])
        bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
        ret.append((bgr[0][0][0].item() / 255,
                    bgr[0][0][1].item() / 255,
                    bgr[0][0][2].item() / 255))
    return ret


def colorsCSS(n):
    '''
    Creates a list of n distinctive colors as CSS hex strings ('#rrggbb').

    Same hue-stepping scheme as colors().
    '''
    if n == 0:
        return []
    ret = []
    h = int(random.random() * 180)
    step = 180 / n
    for _ in range(n):
        h = int(h + step) % 180
        hsv = np.uint8([[[h, 200, 200]]])
        bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
        b = f'{bgr[0][0][0].item():02x}'
        g = f'{bgr[0][0][1].item():02x}'
        r = f'{bgr[0][0][2].item():02x}'
        # BUG FIX: CSS hex colors are #RRGGBB; original emitted '#'+b+g+r
        # (blue/green/red order), swapping the red and blue channels.
        ret.append('#' + r + g + b)
    return ret


def extendSpeakers(mySpeakerList, fileLabel='NONE', maximumSecondDifference=1, minimumSecondDuration=0):
    '''
    Merges nearby speech sections per speaker and drops short ones.

    mySpeakerList: list of speaker groups, each a list of (start, duration)
        tuples in seconds. Assumed already split into Speaker/Audience, but
        any number of groups is accepted.
    fileLabel: uri recorded on the returned Annotation.
    maximumSecondDifference: sections separated by at most this gap are merged.
    minimumSecondDuration: merged sections shorter than this are discarded.

    Returns (newSpeakerList, mySpeakerAnnotations) where newSpeakerList mirrors
    the input structure with merged (start, duration) tuples and the Annotation
    labels each kept section with its group index.
    '''
    mySpeakerAnnotations = Annotation(uri=fileLabel)
    # Generalized from a hard-coded [[], []]: one output slot per input group.
    newSpeakerList = [[] for _ in mySpeakerList]
    for i, speaker in enumerate(mySpeakerList):
        speaker.sort()
        lastEnd = -1
        tempSection = None
        for section in speaker:
            if lastEnd == -1:
                # First section of this speaker starts the running merge.
                tempSection = copy.deepcopy(section)
                lastEnd = section[0] + section[1]
            elif section[0] - lastEnd <= maximumSecondDifference:
                # Close enough: extend the running section (duration is the
                # max of the old duration and the span to the new end).
                tempSection = (tempSection[0],
                               max(section[0] + section[1] - tempSection[0], tempSection[1]))
                lastEnd = tempSection[0] + tempSection[1]
            else:
                # Gap too large: flush the running section if long enough.
                if tempSection[1] >= minimumSecondDuration:
                    newSpeakerList[i].append(tempSection)
                    mySpeakerAnnotations[Segment(tempSection[0], lastEnd)] = i
                tempSection = copy.deepcopy(section)
                lastEnd = section[0] + section[1]
        # Flush the final running section for THIS speaker (kept inside the
        # per-speaker loop so earlier speakers don't lose their last section).
        if tempSection is not None:
            if tempSection[1] >= minimumSecondDuration:
                newSpeakerList[i].append(tempSection)
                mySpeakerAnnotations[Segment(tempSection[0], lastEnd)] = i
    return newSpeakerList, mySpeakerAnnotations


def twoClassExtendAnnotation(myAnnotation, maximumSecondDifference=1, minimumSecondDuration=0):
    '''
    Splits an annotation into lecturer vs audience and merges their sections.

    The lecturer is the label whose label_support contains the most segments.
    NOTE(review): this picks the label with the most segments, not the longest
    total speaking time — confirm that is the intended criterion.

    Returns (newList, newAnnotation) from extendSpeakers, where group 0 is the
    lecturer and group 1 is everyone else.
    '''
    lecturerID = None
    lecturerLen = 0
    # Identify lecturer
    for speakerName in myAnnotation.labels():
        tempLen = len(myAnnotation.label_support(speakerName))
        if tempLen > lecturerLen:
            lecturerLen = tempLen
            lecturerID = speakerName
    # Recreate speakerList as [[lecturer (start, duration)], [audience ...]]
    tempSpeakerList = [[], []]
    for speakerName in myAnnotation.labels():
        slot = 0 if speakerName == lecturerID else 1
        for segmentItem in myAnnotation.label_support(speakerName):
            tempSpeakerList[slot].append((segmentItem.start, segmentItem.duration))
    return extendSpeakers(tempSpeakerList,
                          fileLabel=myAnnotation.uri,
                          maximumSecondDifference=maximumSecondDifference,
                          minimumSecondDuration=minimumSecondDuration)


def loadAudioRTTM(sampleRTTM):
    '''
    Reads diarization predictions from a space-delimited RTTM file.

    Returns (speakerList, prediction): speakerList is a list per speaker index
    of (start, duration) tuples (convenient for plotting); prediction is a
    pyannote Annotation (convenient for error-rate calculation).
    '''
    speakerList = []
    prediction = Annotation(uri=sampleRTTM)
    with open(sampleRTTM, "r") as rttm:
        for line in rttm:
            speakerResult = line.split(' ')
            # Field 7 is the speaker label; assumes it ends in a two-digit
            # index (e.g. 'speaker_01') — TODO confirm against the RTTM writer.
            index = int(speakerResult[7][-2:])
            start = float(speakerResult[3])
            end = start + float(speakerResult[4])
            # Grow the list so speaker indices can arrive out of order.
            while len(speakerList) < index + 1:
                speakerList.append([])
            speakerList[index].append((float(speakerResult[3]), float(speakerResult[4])))
            prediction[Segment(start, end)] = speakerResult[7]
    return speakerList, prediction


def loadAudioTXT(sampleTXT):
    '''
    Reads diarization predictions from a tab-delimited text file with
    start\\tend\\tspeaker lines.

    Returns ([], prediction) — the empty list keeps the return shape
    consistent with loadAudioRTTM/loadAudioCSV.
    '''
    prediction = Annotation(uri=sampleTXT)
    with open(sampleTXT, "r") as txt:
        for line in txt:
            # BUG FIX: removed leftover debug print of every parsed line and
            # the unused 'index'/'duration' locals.
            speakerResult = line.split('\t')
            if len(speakerResult) < 3:
                continue  # skip malformed/blank lines
            start = float(speakerResult[0])
            end = float(speakerResult[1])
            prediction[Segment(start, end)] = speakerResult[2]
    return [], prediction


def loadAudioCSV(sampleCSV):
    '''
    Reads diarization predictions from a CSV with Start/Finish/Resource columns.

    Returns ([], prediction) — shape-consistent with the other loaders.
    '''
    df = pd.read_csv(sampleCSV)
    df = df.reset_index()  # make sure indexes pair with number of rows
    prediction = Annotation(uri=sampleCSV)
    for _, row in df.iterrows():
        prediction[Segment(row['Start'], row['Finish'])] = row['Resource']
    return [], prediction


def splitIntoTimeSegments(testFile, maxDurationInSeconds=60):
    '''
    Loads an audio file and splits the waveform into consecutive chunks of at
    most maxDurationInSeconds each (the final chunk may be shorter).

    Returns (audioSegments, sample_rate).
    '''
    waveform, sample_rate = torchaudio.load(testFile)
    audioSegments = []
    outOfBoundsIndex = waveform.shape[-1]
    currentStart = 0
    currentEnd = min(maxDurationInSeconds * sample_rate, outOfBoundsIndex)
    while True:
        audioSegments.append(waveform[:, currentStart:currentEnd])
        if currentEnd >= outOfBoundsIndex:
            break
        currentStart = currentEnd
        currentEnd = min(currentStart + maxDurationInSeconds * sample_rate, outOfBoundsIndex)
    return audioSegments, sample_rate


def audioNormalize(waveform, sampleRate, stepSizeInSeconds=2, dbThreshold=-50, dbTarget=-5):
    '''
    Window-by-window volume normalization of up to two channels.

    For each stepSizeInSeconds window whose peak level exceeds dbThreshold,
    applies a dB gain that raises the window's peak toward dbTarget. Windows
    at or below the threshold (presumably silence/noise) are left untouched.

    Returns a normalized copy; the input waveform is not modified.

    NOTE(review): the window end is capped at len-1, so the very last sample
    is never processed — confirm whether that off-by-one is intentional.
    '''
    print("In audioNormalize")
    copyWaveform = waveform.clone().detach()
    print("Waveform copy made")
    transform = torchaudio.transforms.AmplitudeToDB(stype="amplitude", top_db=80)
    currStart = 0
    currEnd = int(min(currStart + stepSizeInSeconds * sampleRate, len(copyWaveform[0]) - 1))
    done = False
    while not done:
        # Measure levels on the ORIGINAL waveform so gains are independent
        # of adjustments already applied to copyWaveform.
        copyWaveform_db = waveform[:, currStart:currEnd].clone().detach()
        copyWaveform_db = transform(copyWaveform_db)
        if currStart == 0:
            print("First DB level calculated")
        if torch.max(copyWaveform_db[0]).item() > dbThreshold:
            # Gain that brings the loudest sample in the window to dbTarget.
            gain = torch.min(dbTarget - copyWaveform_db[0])
            adjustGain = torchaudio.transforms.Vol(gain, 'db')
            copyWaveform[0][currStart:currEnd] = adjustGain(copyWaveform[0][currStart:currEnd])
        if len(copyWaveform_db) > 1:
            # Second channel, when present, is normalized independently.
            if torch.max(copyWaveform_db[1]).item() > dbThreshold:
                gain = torch.min(dbTarget - copyWaveform_db[1])
                adjustGain = torchaudio.transforms.Vol(gain, 'db')
                copyWaveform[1][currStart:currEnd] = adjustGain(copyWaveform[1][currStart:currEnd])
        currStart += int(stepSizeInSeconds * sampleRate)
        if currStart > currEnd:
            done = True
        else:
            currEnd = int(min(currStart + stepSizeInSeconds * sampleRate, len(copyWaveform[0]) - 1))
    print("Waveform enhanced")
    return copyWaveform


class equalizeVolume(torch.nn.Module):
    '''nn.Module wrapper around audioNormalize for use in transform pipelines.'''

    def forward(self, waveform, sampleRate, stepSizeInSeconds, dbThreshold, dbTarget):
        print("In equalizeVolume")
        return audioNormalize(waveform, sampleRate, stepSizeInSeconds, dbThreshold, dbTarget)


def combineWaveforms(waveformList):
    '''Concatenates waveforms along the time (sample) dimension.'''
    return torch.cat(waveformList, 1)


def annotationToSpeakerList(myAnnotation):
    '''
    Converts an Annotation into a list (one entry per label) of
    (start, duration) tuples taken from each label's support.
    '''
    tempSpeakerList = []
    tempSpeakerNames = []
    for speakerName in myAnnotation.labels():
        if speakerName not in tempSpeakerNames:
            speakerIndex = len(tempSpeakerNames)
            tempSpeakerNames.append(speakerName)
            tempSpeakerList.append([])
        else:
            speakerIndex = tempSpeakerNames.index(speakerName)
        for segmentItem in myAnnotation.label_support(speakerName):
            tempSpeakerList[speakerIndex].append((segmentItem.start, segmentItem.duration))
    return tempSpeakerList


def _secondsToDatetime(timeInSeconds):
    '''Converts a time offset in seconds to a datetime on today's date.'''
    h = int(timeInSeconds // 3600)
    m = int(timeInSeconds % 3600 // 60)
    s = int(timeInSeconds % 60)
    ms = int(timeInSeconds * 1000000 % 1000000)  # fractional part as microseconds
    return dt.datetime.combine(dt.date.today(), dt.time(h, m, s, ms))


def speakerListToDataFrame(speakerList):
    '''
    Converts per-speaker (start, duration) tuples into a DataFrame with
    Task/Start/Finish/Resource columns (datetimes on today's date), suitable
    for Gantt-style timeline plotting.
    '''
    dataList = []
    for j, row in enumerate(speakerList):
        for k, speakingPoint in enumerate(row):
            dataList.append(dict(Task=f"Speaker {j}.{k}",
                                 Start=_secondsToDatetime(speakingPoint[0]),
                                 Finish=_secondsToDatetime(speakingPoint[0] + speakingPoint[1]),
                                 Resource=f"Speaker {j+1}"))
    return pd.DataFrame(dataList)


def removeOverlap(timeSegment, overlap):
    '''Returns the (0–2) parts of timeSegment that fall outside overlap.'''
    times = []
    if timeSegment.start < overlap.start:
        times.append(Segment(timeSegment.start, min(overlap.start, timeSegment.end)))
    if timeSegment.end > overlap.end:
        times.append(Segment(max(timeSegment.start, overlap.end), timeSegment.end))
    return times


def checkForOverlap(time1, time2):
    '''Returns the overlapping Segment of time1 and time2, or None if empty.'''
    overlap = time1 & time2
    return overlap if overlap else None


def sumSegments(segmentList):
    '''Sums the durations (seconds) of a list of Segments.'''
    return sum(s.duration for s in segmentList)


def sumTimes(myAnnotation):
    '''Total covered time of the annotation's timeline, in seconds.'''
    # get_timeline(False) skips copying the internal timeline — TODO confirm
    # the positional argument is 'copy' in the pyannote version in use.
    return myAnnotation.get_timeline(False).duration()


def sumTimesPerSpeaker(myAnnotation):
    '''Returns parallel lists (speakerList, timeList) of labels and totals.'''
    speakerList = []
    timeList = []
    for speaker in myAnnotation.labels():
        if speaker not in speakerList:
            speakerList.append(speaker)
            timeList.append(0)
        timeList[speakerList.index(speaker)] += sumTimes(myAnnotation.subset([speaker]))
    return speakerList, timeList


def sumMultiTimesPerSpeaker(myAnnotation):
    '''
    Like sumTimesPerSpeaker, but splits compound labels such as 'A+B' so that
    overlapping speech is credited to every individual speaker involved.
    '''
    speakerList = []
    timeList = []
    sList, tList = sumTimesPerSpeaker(myAnnotation)
    for i, speakerGroup in enumerate(sList):
        for speaker in speakerGroup.split('+'):
            if speaker not in speakerList:
                speakerList.append(speaker)
                timeList.append(0)
            timeList[speakerList.index(speaker)] += tList[i]
    return speakerList, timeList


def _collectSegments(myAnnotation):
    '''Maps each label to its list of Segments and its total duration.'''
    speakerDict = {}
    timeSummary = {}
    for currSpeaker in myAnnotation.labels():
        segments = list(myAnnotation.subset([currSpeaker]).itersegments())
        speakerDict[currSpeaker] = segments
        timeSummary[currSpeaker] = sum(s.duration for s in segments)
    return speakerDict, timeSummary


def annotationToDataFrame(myAnnotation):
    '''
    Converts an Annotation into (df, timeSummary): df has
    Task/Start/Finish/Resource columns with datetimes on today's date;
    timeSummary maps each label to its total speaking time in seconds.
    '''
    speakerDict, timeSummary = _collectSegments(myAnnotation)
    dataList = []
    for key, segments in speakerDict.items():
        for k, speakingSegment in enumerate(segments):
            dataList.append(dict(Task=key + f".{k}",
                                 Start=_secondsToDatetime(speakingSegment.start),
                                 Finish=_secondsToDatetime(speakingSegment.end),
                                 Resource=key))
    return pd.DataFrame(dataList), timeSummary


def annotationToSimpleDataFrame(myAnnotation):
    '''
    Like annotationToDataFrame, but Start/Finish stay as raw seconds
    instead of being converted to datetimes.
    '''
    speakerDict, timeSummary = _collectSegments(myAnnotation)
    dataList = []
    for key, segments in speakerDict.items():
        for k, speakingSegment in enumerate(segments):
            dataList.append(dict(Task=key + f".{k}",
                                 Start=speakingSegment.start,
                                 Finish=speakingSegment.end,
                                 Resource=key))
    return pd.DataFrame(dataList), timeSummary


def calcCategories(myAnnotation, categories):
    '''
    Buckets every speaker's segments into the category containing that speaker.

    categories: list of collections of speaker labels. Speakers not found in
    any category each get a new trailing category (reported in the returned
    extraCategories). Within each category, segments are sorted by start and
    overlapping segments are merged, joining speaker ids with '+'.

    Returns (cleanCategories, extraCategories) where cleanCategories is a list
    of lists of (speakerID, Segment) tuples.
    '''
    categorySlots = [[] for _ in categories]
    extraCategories = []
    for speaker in myAnnotation.labels():
        targetCategory = None
        for i, category in enumerate(categories):
            if speaker in category:
                targetCategory = i
        if targetCategory is None:
            # Unknown speaker: give it its own trailing category.
            targetCategory = len(categorySlots)
            categorySlots.append([])
            extraCategories.append(speaker)
        for timeSegment in myAnnotation.subset([speaker]).itersegments():
            categorySlots[targetCategory].append((speaker, timeSegment))
    # Clean up categories: merge overlapping segments within each category.
    cleanCategories = []
    for category in categorySlots:
        newCategory = []
        catSorted = copy.deepcopy(sorted(category, key=lambda cSegment: cSegment[1].start))
        currID, currSegment = None, None
        if len(catSorted) > 0:
            currID, currSegment = catSorted[0]
            for sp, segmentSlot in catSorted[1:]:
                if checkForOverlap(currSegment, segmentSlot) is None:
                    newCategory.append((currID, currSegment))
                    currID = sp
                    # BUG FIX: original assigned to a dead 'currTime' variable,
                    # leaving currSegment stale for all later comparisons.
                    currSegment = segmentSlot
                else:
                    currID = currID + "+" + sp
                    # BUG FIX: original did 'currTime[1] = ...' (item assignment
                    # on a tuple / undefined name); take the union of segments.
                    currSegment = currSegment | segmentSlot
        if currSegment is not None:
            newCategory.append((currID, currSegment))
        cleanCategories.append(newCategory)
    return cleanCategories, extraCategories


def _subtractSegment(segmentList, toRemove):
    '''Returns segmentList with toRemove carved out of every overlapping segment.'''
    result = copy.deepcopy(segmentList)
    for emptySlot in segmentList:
        if checkForOverlap(toRemove, emptySlot) is None:
            continue
        result.remove(emptySlot)
        result += removeOverlap(emptySlot, toRemove)
    return result


def calcSpeakingTypes(myAnnotation, maxTime):
    '''
    Partitions [0, maxTime] into silence, single-voice, and multi-voice time.

    Returns (nvAnnotation, ovAnnotation, mvAnnotation): silence labeled "None",
    single-voice labeled with the speaker id, multi-voice labeled with the
    '+'-joined ids of all speakers active in the overlap.
    '''
    noVoice = [Segment(0, maxTime)]
    oneVoice = []    # list of (speakerID, Segment)
    multiVoice = []  # list of ([speakerIDs], Segment)
    for speaker in myAnnotation.labels():
        timesToProcess = [(speaker, ts)
                          for ts in myAnnotation.subset([speaker]).itersegments()]
        while len(timesToProcess) > 0:
            currID, currSegment = timesToProcess.pop(0)
            resetCheck = False
            # Check against existing multi-voice intervals first.
            for compareID, timeSegment in multiVoice:
                overlapTime = checkForOverlap(currSegment, timeSegment)
                if overlapTime is None:
                    continue
                compareID.append(currID)  # this speaker also talks here
                # Re-queue the parts of the current interval outside the overlap.
                timesToProcess += [(currID, t)
                                   for t in removeOverlap(currSegment, timeSegment)]
                resetCheck = True
                break
            if resetCheck:
                continue
            # Then check against existing single-voice intervals.
            for timeSlot in oneVoice:
                tID = timeSlot[0]
                tSegment = timeSlot[1]
                overlapTime = checkForOverlap(currSegment, tSegment)
                if overlapTime is None:
                    continue
                oneVoice.remove(timeSlot)
                # Add back the non-overlapping parts of the existing interval.
                oneVoice += [(tID, t) for t in removeOverlap(tSegment, currSegment)]
                # The overlap now has two voices.
                multiVoice.append(([tID, currID], overlapTime))
                # Re-queue the non-overlapping parts of the current interval.
                timesToProcess += [(currID, t)
                                   for t in removeOverlap(currSegment, tSegment)]
                resetCheck = True
                break
            if resetCheck:
                continue
            # No overlap anywhere: plain single-voice time.
            oneVoice.append((currID, currSegment))
    ovAnnotation = Annotation()
    mvAnnotation = Annotation()
    for currID, timeSlot in multiVoice:
        mvAnnotation[timeSlot] = '+'.join(currID)
        noVoice = _subtractSegment(noVoice, timeSlot)
    for currID, timeSlot in oneVoice:
        ovAnnotation[timeSlot] = currID
        noVoice = _subtractSegment(noVoice, timeSlot)
    nvAnnotation = Annotation()
    for emptySlot in noVoice:
        nvAnnotation[emptySlot] = "None"
    return nvAnnotation, ovAnnotation, mvAnnotation


def timeToString(timeInSeconds):
    '''
    Formats a time in seconds as 'HH::MM::SS.ss'. A list input is formatted
    element-wise (recursively).
    '''
    if isinstance(timeInSeconds, list):
        return [timeToString(t) for t in timeInSeconds]
    h = int(timeInSeconds // 3600)
    m = int(timeInSeconds % 3600 // 60)
    s = timeInSeconds % 60
    # BUG FIX: was {s:02.2f} — a total width of 2 never pads (output is at
    # least 4 chars); 05.2f zero-pads seconds to match the HH/MM fields.
    return f'{h:02d}::{m:02d}::{s:05.2f}'