talk-move-router / handler.py
aekupor's picture
Update handler.py
1331c45
import os
os.system('pip install webvtt-py')
os.system('pip install spacy')
os.system('python3 -m spacy download en_core_web_sm')
os.system('pip install simpletransformers')
os.system('pip install pytorch')
from simpletransformers.classification import ClassificationModel, ClassificationArgs
from typing import Dict, List, Any
import pandas as pd
import webvtt
from datetime import datetime
import torch
import spacy
import json
import requests
from io import StringIO
nlp = spacy.load("en_core_web_sm")
tokenizer = nlp.tokenizer
token_limit = 200
class Utterance(object):
def __init__(self, starttime, endtime, speaker, chat, text,
idx, prev_utterance, prev_prev_utterance):
self.starttime = starttime
self.endtime = endtime
self.speaker = speaker
self.chat = chat
self.text = text
self.idx = idx
self.prev_utterance = prev_utterance
self.prev_prev_utterance = prev_prev_utterance
class Chat(object):
def __init__(self, time, speaker, text):
self.time = time
self.speaker = speaker
self.text = text
class EndpointHandler():
def __init__(self, path="."):
print("Loading models...")
def eliciting_utterance_to_str(self, utterance: Utterance) -> str:
# eliciting only uses text
doc = nlp(utterance.text)
if len(doc) > token_limit:
return self.handle_long_utterances(doc), 'list'
return utterance.text, 'single'
def connecting_utterance_to_str(self, utterance: Utterance) -> str:
# connecting only uses text
doc = nlp(utterance.text)
if len(doc) > token_limit:
return self.handle_long_utterances(doc), 'list'
return utterance.text, 'single'
def probing_utterance_to_str(self, utterance: Utterance) -> str:
#probing uses prior text and truncates end of the prior text
doc = nlp(utterance.text)
prior_text = self.truncate_end(self.get_prior_text(utterance))
if len(doc) > token_limit:
utterance_text_list = self.handle_long_utterances(doc)
utterance_with_prior_text = []
for text in utterance_text_list:
utterance_with_prior_text.append([prior_text, text])
return utterance_with_prior_text, 'list'
else:
return [prior_text, utterance.text], 'single'
def revoicing_utterance_to_str(self, utterance: Utterance) -> str:
# revoicing uses prior text and truncates end of the prior text
doc = nlp(utterance.text)
prior_text = self.truncate_end(self.get_prior_text(utterance))
if len(doc) > token_limit:
utterance_text_list = self.handle_long_utterances(doc)
utterance_with_prior_text = []
for text in utterance_text_list:
utterance_with_prior_text.append([prior_text, text])
return utterance_with_prior_text, 'list'
else:
return [prior_text, utterance.text], 'single'
def adding_on_utterance_to_str(self, utterance: Utterance) -> str:
#adding_on uses prior text
doc = nlp(utterance.text)
prior_text = self.get_prior_text(utterance)
if len(doc) > token_limit:
utterance_text_list = self.handle_long_utterances(doc)
utterance_with_prior_text = []
for text in utterance_text_list:
utterance_with_prior_text.append([prior_text, text])
return utterance_with_prior_text, 'list'
else:
return [prior_text, utterance.text], 'single'
def model_utterance_to_str(self, utterance: Utterance) -> str:
#model utterance uses prior text
doc = nlp(utterance.text)
prior_text = self.get_prior_text(utterance)
if len(doc) > token_limit:
utterance_text_list = self.handle_long_utterances(doc)
utterance_with_prior_text = []
for text in utterance_text_list:
utterance_with_prior_text.append([prior_text, text])
return utterance_with_prior_text, 'list'
else:
return [prior_text, utterance.text], 'single'
def truncate_end(self, prior_text: str) -> str:
max_seq_length = 512
prior_text_max_length = int(max_seq_length / 2) #divide by 2 because 2 columns
if len(prior_text) > prior_text_max_length:
starting_index = len(prior_text) - prior_text_max_length
return prior_text[starting_index:]
return prior_text
def format_speaker(self, speaker: str, chat: bool) -> str:
prior_text = ''
if speaker == 'student':
prior_text += '***STUDENT '
else:
prior_text += '***SECTION_LEADER '
if not chat:
prior_text += '(audio)*** : '
else:
prior_text += '(chat)*** : '
return prior_text
def get_sl(self, utterances: List[Utterance]) -> str:
for utterance in utterances:
if '(SL)' in utterance.speaker or 'Section Leader' in utterance.speaker:
return utterance.speaker
# decide based on talk time
talk_time = dict()
for utterance in utterances:
if utterance.speaker not in talk_time:
talk_time[utterance.speaker] = 0
talk_time[utterance.speaker] += utterance.endtime - utterance.starttime
max_talk_time = 0
max_speaker = ""
for speaker in talk_time:
if talk_time[speaker] > max_talk_time:
max_talk_time = talk_time[speaker]
max_speaker = speaker
return max_speaker
def get_prior_text(self, utterance: Utterance) -> str:
prior_text = ''
if utterance.prev_utterance != None and utterance.prev_prev_utterance != None:
prior_text = '\"' + self.format_speaker(utterance.prev_prev_utterance.speaker, utterance.prev_prev_utterance.chat) + utterance.prev_prev_utterance.text + ' \n '
prior_text += self.format_speaker(utterance.prev_utterance.speaker, utterance.prev_utterance.chat) + utterance.prev_utterance.text + ' \n '
else:
prior_text = 'No prior utterance'
return prior_text
def handle_long_utterances(self, doc: str) -> List[str]:
split_count = 1
total_sent = len([x for x in doc.sents])
sent_count = 0
token_count = 0
split_utterance = ''
utterances = []
for sent in doc.sents:
# add a sentence to split
split_utterance = split_utterance + ' ' + sent.text
token_count += len(sent)
sent_count +=1
if token_count >= token_limit or sent_count == total_sent:
# save utterance segment
utterances.append(split_utterance)
# restart count
split_utterance = ''
token_count = 0
split_count += 1
return utterances
def convert_time(self, time_str):
time = datetime.strptime(time_str, "%H:%M:%S.%f")
return 1000 * (3600 * time.hour + 60 * time.minute + time.second) + time.microsecond / 1000
def process_chat_transcript(self, chat_file) -> List[Chat]:
chat_list = []
chat_file = open(chat_file, 'r')
for line in chat_file.readlines():
split_line = line.split('\t')
if len(split_line) < 3 or split_line[0] == '': # had an edge case where no time
continue
time = split_line[0] + '.00'
name = split_line[1].replace(':', '')
text = split_line[2].replace('\n', '')
chat_list.append(Chat(time=self.convert_time(time), speaker=name, text=text))
return chat_list
def process_vtt_transcript(self, vttfile: str, chat_list: List[Chat]) -> List[Utterance]:
"""Process raw vtt file."""
utterances_list = []
text = ""
prev_speaker = None
prev_start = "00:00:00.000"
prev_end = "00:00:00.000"
idx = 0
prev_utterance = None
prev_prev_utterance = None
cur_chat = None
cur_chat_ptr = 0
if len(chat_list) > 0:
cur_chat = chat_list[cur_chat_ptr]
vtt = ""
try:
vtt = webvtt.read(vttfile)
except:
return utterances_list
for i in range(len(vtt)):
caption = vtt[i]
# add in chat, if chat is next
while cur_chat is not None and prev_utterance is not None and prev_utterance.endtime > cur_chat.time:
utterance = Utterance(starttime=cur_chat.time,
endtime=cur_chat.time,
speaker=cur_chat.speaker,
chat=True,
text=cur_chat.text,
idx=idx,
prev_utterance=prev_utterance,
prev_prev_utterance=prev_prev_utterance)
utterances_list.append(utterance)
prev_prev_utterance = prev_utterance
prev_utterance = utterance
idx+=1
# update chat ptr
cur_chat_ptr += 1
if cur_chat_ptr < len(chat_list):
cur_chat = chat_list[cur_chat_ptr]
else:
cur_chat = None
# Get speaker
check_for_speaker = caption.text.split(":")
if len(check_for_speaker) > 1: # the speaker was changed or restated
speaker = check_for_speaker[0]
else:
speaker = prev_speaker
# Get utterance
new_text = check_for_speaker[1] if len(check_for_speaker) > 1 else check_for_speaker[0]
# If speaker was changed, start new batch
if (prev_speaker is not None) and (speaker != prev_speaker):
utterance = Utterance(starttime=self.convert_time(prev_start),
endtime=self.convert_time(prev_end),
speaker=prev_speaker,
chat=False,
text=text.strip(),
idx=idx,
prev_utterance=prev_utterance,
prev_prev_utterance=prev_prev_utterance)
utterances_list.append(utterance)
# Start new batch
prev_start = caption.start
text = ""
prev_prev_utterance = prev_utterance
prev_utterance = utterance
idx+=1
text += new_text + " "
prev_end = caption.end
prev_speaker = speaker
# Append last one
if prev_speaker is not None:
utterance = Utterance(starttime=self.convert_time(prev_start),
endtime=self.convert_time(prev_end),
speaker=prev_speaker,
chat=False,
text=text.strip(),
idx=idx,
prev_utterance=prev_utterance,
prev_prev_utterance=prev_prev_utterance)
utterances_list.append(utterance)
return utterances_list
def transcript_to_json(self, utterances: List[Utterance]) -> List[str]:
formatted = []
for utterance in utterances:
formatted.append({'speaker': utterance.speaker, 'data': utterance.text, 'time': utterance.starttime, 'chat': utterance.chat})
return sorted(formatted, key=lambda d: d['time'])
def get_talk_time(self, utterances: List[Utterance]) -> (float, float, str):
sl_time = 0
student_time = 0
sl_name = self.get_sl(utterances)
for utterance in utterances:
if sl_name != utterance.speaker:
student_time += utterance.endtime - utterance.starttime
else:
sl_time += utterance.endtime - utterance.starttime
total_time = sl_time + student_time
return sl_time / total_time, student_time / total_time, sl_name
def talk_moves_list_to_json(self, utterances: List[Utterance]) -> List[str]:
formatted = []
for utterance in utterances:
is_model_utterance = utterances[utterance]
if utterance.prev_utterance is None:
formatted.append({'timing': utterance.starttime, 'is_model_utterance': is_model_utterance, 'excerpt': [
{'speaker': "", 'data': "", 'time': utterance.starttime, 'chat': False},
{'speaker': "", 'data': "", 'time': utterance.starttime, 'chat': False},
{'speaker': utterance.speaker, 'data': utterance.text, 'time': utterance.starttime, 'chat': utterance.chat}]})
elif utterance.prev_prev_utterance is None:
formatted.append({'timing': utterance.starttime, 'is_model_utterance': is_model_utterance, 'excerpt': [
{'speaker': "", 'data': "", 'time': utterance.starttime, 'chat': False},
{'speaker': utterance.prev_utterance.speaker, 'data': utterance.prev_utterance.text, 'time': utterance.prev_utterance.starttime, 'chat': utterance.prev_utterance.chat},
{'speaker': utterance.speaker, 'data': utterance.text, 'time': utterance.starttime, 'chat': utterance.chat}]})
else:
formatted.append({'timing': utterance.starttime, 'is_model_utterance': is_model_utterance, 'excerpt': [
{'speaker': utterance.prev_prev_utterance.speaker, 'data': utterance.prev_prev_utterance.text, 'time': utterance.prev_prev_utterance.starttime, 'chat': utterance.prev_prev_utterance.chat},
{'speaker': utterance.prev_utterance.speaker, 'data': utterance.prev_utterance.text, 'time': utterance.prev_utterance.starttime, 'chat': utterance.prev_utterance.chat},
{'speaker': utterance.speaker, 'data': utterance.text, 'time': utterance.starttime, 'chat': utterance.chat}]})
return sorted(formatted, key=lambda d: d['timing'])
def get_utterances_list(self, full_transcript, utterances_list, utterances_indexes, model_id):
sl_speaker = self.get_sl(full_transcript)
for i in range(len(full_transcript)):
utterance = full_transcript[i]
#filter out to only have SL utterances
if sl_speaker != utterance.speaker:
continue
if model_id == 'eliciting':
utterance_str, is_list = self.eliciting_utterance_to_str(utterance)
elif model_id == 'connecting':
utterance_str, is_list = self.connecting_utterance_to_str(utterance)
elif model_id == 'probing':
utterance_str, is_list = self.probing_utterance_to_str(utterance)
elif model_id == 'adding_on':
utterance_str, is_list = self.adding_on_utterance_to_str(utterance)
elif model_id == 'revoicing':
utterance_str, is_list = self.revoicing_utterance_to_str(utterance)
elif model_id == 'model_utterance':
utterance_str, is_list = self.model_utterance_to_str(utterance)
if is_list == 'list':
utterances_list.extend(utterance_str)
for j in range(len(utterance_str)):
utterances_indexes.append(i)
else:
utterances_list.append(utterance_str)
utterances_indexes.append(i)
return utterances_list, utterances_indexes
def do_prediction(self, full_transcript, model_id):
# utterances_indexes entry corresponds to utterance in full_transcript
utterances_list, utterances_indexes = self.get_utterances_list(full_transcript, [], [], model_id)
if len(utterances_list) == 0: # no SL found
return [], [], []
cuda_available = torch.cuda.is_available()
if model_id == 'eliciting':
self.model = ClassificationModel(
"roberta", "aekupor/eliciting", use_cuda=cuda_available
)
elif model_id == 'connecting':
self.model = ClassificationModel(
"roberta", "aekupor/connecting", use_cuda=cuda_available
)
elif model_id == 'probing':
self.model = ClassificationModel(
"roberta", "aekupor/probing", use_cuda=cuda_available
)
elif model_id == 'adding_on':
self.model = ClassificationModel(
"roberta", "aekupor/adding_on", use_cuda=cuda_available
)
elif model_id == 'revoicing':
self.model = ClassificationModel(
"roberta", "aekupor/revoicing", use_cuda=cuda_available
)
elif model_id == 'model_utterance':
self.model = ClassificationModel(
"roberta", "aekupor/model_utterance", use_cuda=cuda_available
)
predictions, _ = self.model.predict(utterances_list)
return utterances_list, utterances_indexes, predictions
def add_preds_to_list(self, utterance_talk_moves, predictions, utterances_indexes, full_transcript, model_utterances_predictions):
for i in range(len(predictions)):
if predictions[i] == 1:
if model_utterances_predictions[i] == 1:
utterance_talk_moves[full_transcript[utterances_indexes[i]]] = True
else:
utterance_talk_moves[full_transcript[utterances_indexes[i]]] = False
return utterance_talk_moves
def __call__(self, data: str) -> List[Dict[str, Any]]:
''' data_file is a str pointing to filename of type .vtt '''
# deserialize incoming request
transcript_file = data.pop("transcript_file", None)
chat_file = data.pop("chat_file", None)
talk_move = data.pop("talk_move", None)
if transcript_file is None:
raise ValueError("no data file provided")
chat_list = []
if chat_file is not None:
chat_list = self.process_chat_transcript(chat_file)
full_transcript = self.process_vtt_transcript(transcript_file, chat_list)
if len(full_transcript) == 0: # transcript is empty
return {}
utterance_talk_moves_json = ""
_, _, model_utterances_predictions = self.do_prediction(full_transcript, 'model_utterance')
gi_utterances_list, gi_utterances_indexes, gi_predictions = self.do_prediction(full_transcript, 'eliciting')
gi_utterance_talk_moves = self.add_preds_to_list(dict(), gi_predictions, gi_utterances_indexes, full_transcript, model_utterances_predictions)
if talk_move == 'getIdeas':
utterance_talk_moves_json = self.talk_moves_list_to_json(gi_utterance_talk_moves)
oi_utterances_list, oi_utterances_indexes, oi_predictions = self.do_prediction(full_transcript, 'connecting')
oi_utterance_talk_moves = self.add_preds_to_list(dict(), oi_predictions, oi_utterances_indexes, full_transcript, model_utterances_predictions)
if talk_move == 'orientIdeas':
utterance_talk_moves_json = self.talk_moves_list_to_json(oi_utterance_talk_moves)
bi_utterances_list, bi_utterances_indexes, bi_predictions = self.do_prediction(full_transcript, 'probing')
bi_utterance_talk_moves = self.add_preds_to_list(dict(), bi_predictions, bi_utterances_indexes, full_transcript, model_utterances_predictions)
bi_utterances_list, bi_utterances_indexes, bi_predictions = self.do_prediction(full_transcript, 'adding_on')
bi_utterance_talk_moves = self.add_preds_to_list(bi_utterance_talk_moves, bi_predictions, bi_utterances_indexes, full_transcript, model_utterances_predictions)
bi_utterances_list, bi_utterances_indexes, bi_predictions = self.do_prediction(full_transcript, 'revoicing')
bi_utterance_talk_moves = self.add_preds_to_list(bi_utterance_talk_moves, bi_predictions, bi_utterances_indexes, full_transcript, model_utterances_predictions)
if talk_move == 'buildIdeas':
utterance_talk_moves_json = self.talk_moves_list_to_json(bi_utterance_talk_moves)
# json formating
full_transcript_json = self.transcript_to_json(full_transcript)
sl_time, student_time, sl_name = self.get_talk_time(full_transcript)
talk_time_json = {'sl': sl_time, 'student': student_time}
num_moments_json = {'getIdeas': len(gi_utterance_talk_moves), 'buildIdeas': len(bi_utterance_talk_moves), 'orientIdeas': len(oi_utterance_talk_moves)}
response = {'talkTime': talk_time_json, 'talkMoveInFocus': talk_move, 'numberOfMoments': num_moments_json,
'talkMoveDemonstrations': utterance_talk_moves_json, 'transcript': full_transcript_json, 'slName': sl_name}
return response