File size: 3,850 Bytes
989a9b7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import copy
import random

import cv2
import numpy as np
from pyannote.core import Annotation, Segment

def colors(n):
  '''
  Creates a list size n of distinctive colors

  Hues are spread evenly around the 0-179 hue range (the code wraps hue
  with % 180) starting from a random offset; saturation and value are
  fixed at 200.

  Returns a list of n 3-tuples of floats, each channel being the 8-bit
  value divided by 255. The channel order is that of the
  cv2.COLOR_HSV2BGR result, i.e. (blue, green, red).
  NOTE(review): consumers expecting RGB order would need to reverse the
  tuple - confirm against the plotting code.

  Returns an empty list when n is 0 or negative.
  '''
  if n <= 0:
    return []
  start = int(random.random() * 180)
  step = 180 / n
  ret = []
  for i in range(n):
    # Derive every hue from the un-truncated offset. Truncating inside a
    # running sum (as before) made the hue stop advancing whenever
    # step < 1 (n > 180) and let the spacing drift for other n.
    h = int(start + (i + 1) * step) % 180
    hsv = np.uint8([[[h, 200, 200]]])
    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    # .item() converts the numpy scalar to a plain Python int before scaling
    ret.append(tuple(channel.item() / 255 for channel in bgr[0][0]))
  return ret

def _mergeSections(sections, maximumSecondDifference, minimumSecondDuration):
    '''
    Merges (start, duration) sections whose gap to the running section is at
    most maximumSecondDifference, then keeps only the merged sections whose
    duration is at least minimumSecondDuration.

    The input is read through sorted(), so the caller's sequence is not
    reordered. Returns a new list ordered by start time.
    '''
    merged = []
    current = None
    for section in sorted(sections):
        if current is None:
            current = copy.deepcopy(section)
        elif section[0] - (current[0] + current[1]) <= maximumSecondDifference:
            # Close enough to the running section: extend it. max() keeps the
            # longer reach when the new section lies inside the running one.
            current = (current[0], max(section[0] + section[1] - current[0], current[1]))
        else:
            if current[1] >= minimumSecondDuration:
                merged.append(current)
            current = copy.deepcopy(section)
    # Add the last section back in
    if current is not None and current[1] >= minimumSecondDuration:
        merged.append(current)
    return merged


def extendSpeakers(mySpeakerList, fileLabel = 'NONE', maximumSecondDifference = 1, minimumSecondDuration = 0):
    '''
    Assumes mySpeakerList is already split into Speaker/Audience

    For every speaker (one list of (start, duration) sections per speaker),
    joins sections separated by at most maximumSecondDifference seconds and
    drops joined sections shorter than minimumSecondDuration seconds.

    Returns (newSpeakerList, mySpeakerAnnotations):
      newSpeakerList       - one list of merged (start, duration) sections per
                             input speaker; always at least two lists, so the
                             Speaker/Audience layout is kept even when fewer
                             are supplied.
      mySpeakerAnnotations - Annotation (uri=fileLabel) mapping each merged
                             Segment(start, start + duration) to the index of
                             its speaker in mySpeakerList.

    The input lists are not modified.
    '''
    mySpeakerAnnotations = Annotation(uri=fileLabel)
    newSpeakerList = [[],[]]
    for i, speaker in enumerate(mySpeakerList):
        # Grow on demand so more than two speaker lists no longer raise IndexError
        while len(newSpeakerList) <= i:
            newSpeakerList.append([])
        for section in _mergeSections(speaker, maximumSecondDifference, minimumSecondDuration):
            newSpeakerList[i].append(section)
            mySpeakerAnnotations[Segment(section[0], section[0] + section[1])] = i
    return newSpeakerList,mySpeakerAnnotations

def twoClassExtendAnnotation(myAnnotation,maximumSecondDifference = 1, minimumSecondDuration = 0):
    '''
    Collapses an annotation into two classes (lecturer / everyone else) and
    passes the result through extendSpeakers.

    The lecturer is the label whose label_support() has the largest len();
    on a tie the label that comes first in myAnnotation.labels() wins.
    NOTE(review): len() of the support presumably counts segments rather
    than total speaking time - confirm this is the intended criterion.

    Returns whatever extendSpeakers returns for
    [[lecturer sections], [audience sections]], using myAnnotation.uri as
    the file label.
    '''
    supports = [(label, myAnnotation.label_support(label)) for label in myAnnotation.labels()]

    # Identify lecturer (strict ">" so the earliest label wins ties)
    lecturerID, largestSupport = None, 0
    for label, support in supports:
        supportSize = len(support)
        if supportSize > largestSupport:
            lecturerID, largestSupport = label, supportSize

    # Rebuild the sections as (start, duration) pairs, one list per class
    lecturerSections, audienceSections = [], []
    for label, support in supports:
        target = audienceSections if label != lecturerID else lecturerSections
        target.extend((segmentItem.start, segmentItem.duration) for segmentItem in support)

    return extendSpeakers(
        [lecturerSections, audienceSections],
        fileLabel = myAnnotation.uri,
        maximumSecondDifference = maximumSecondDifference,
        minimumSecondDuration = minimumSecondDuration,
    )

def loadAudioRTTM(sampleRTTM):
    '''
    Reads a diarization result from the RTTM file at path sampleRTTM.

    Each non-blank line is split on whitespace; field 3 is read as the turn
    start, field 4 as the turn duration (both in seconds) and the last two
    characters of field 7 as the integer speaker index.
    NOTE(review): this assumes speaker names end in a two-digit number
    (e.g. "SPEAKER_00"); other naming schemes raise ValueError - confirm
    against the tool producing the files.

    Returns (speakerList, prediction):
      speakerList - list indexed by speaker index; each entry is a list of
                    (start, duration) tuples, for convenient plotting.
      prediction  - Annotation (uri=sampleRTTM) mapping
                    Segment(start, start + duration) to the speaker index,
                    for convenient error rate calculation.
    '''
    speakerList = []
    prediction = Annotation(uri=sampleRTTM)
    with open(sampleRTTM, "r") as rttm:
        for line in rttm:
            # split() with no argument tolerates tabs and repeated spaces and
            # drops the trailing newline; split(' ') shifted the field
            # positions on such input.
            speakerResult = line.split()
            if not speakerResult:
                # Blank line (e.g. trailing newline at end of file)
                continue
            index = int(speakerResult[7][-2:])
            start = float(speakerResult[3])
            duration = float(speakerResult[4])
            # Grow the per-speaker list until this index exists
            while len(speakerList) < index + 1:
                speakerList.append([])
            speakerList[index].append((start, duration))
            prediction[Segment(start, start + duration)] = index

    return speakerList, prediction