|
|
import os
import subprocess
import sys


def _install_runtime_dependencies():
    """Best-effort install of the third-party packages this module imports.

    Each command is run with the interpreter that is executing this file
    (``sys.executable -m ...``) so packages land in the environment that will
    import them, rather than in whichever ``pip``/``python3`` is first on PATH.
    A failing command is reported on stderr and the remaining commands still
    run; the subsequent ``import`` statements will surface anything that is
    genuinely missing.
    """
    commands = (
        [sys.executable, '-m', 'pip', 'install', 'webvtt-py'],
        [sys.executable, '-m', 'pip', 'install', 'spacy'],
        [sys.executable, '-m', 'spacy', 'download', 'en_core_web_sm'],
        [sys.executable, '-m', 'pip', 'install', 'simpletransformers'],
        # The PyPI distribution of PyTorch is named "torch"; "pytorch" is a
        # placeholder project that refuses to install.
        [sys.executable, '-m', 'pip', 'install', 'torch'],
    )
    for command in commands:
        # An argument list (no shell) avoids quoting/PATH surprises.
        completed = subprocess.run(command, check=False)
        if completed.returncode != 0:
            print(
                f"dependency setup command failed ({completed.returncode}): "
                f"{' '.join(command)}",
                file=sys.stderr,
            )


_install_runtime_dependencies()
|
|
|
|
|
import json
from datetime import datetime
from io import StringIO
from typing import Any, Dict, List, Tuple

import pandas as pd
import requests
import spacy
import torch
import webvtt
from simpletransformers.classification import ClassificationModel, ClassificationArgs
|
|
|
|
|
|
|
|
# Shared spaCy pipeline; the handler below calls it on each utterance to count
# tokens and to iterate sentences when an utterance has to be split.
nlp = spacy.load("en_core_web_sm")

# Tokenizer of the shared pipeline (not referenced by the code visible in this file).
tokenizer = nlp.tokenizer

# Utterances with more than this many spaCy tokens are split into
# sentence-aligned chunks before being handed to a classifier.
token_limit = 200
|
|
|
|
|
class Utterance:
    """A single turn in the merged transcript.

    A turn is either a run of consecutive captions by one speaker or one chat
    message. Besides its own fields, each turn holds references to the two
    turns that precede it so callers can build conversational context.

    Instances are deliberately plain objects (identity-based equality and
    hashing): elsewhere in this file they are used as dictionary keys.
    """

    def __init__(self, starttime, endtime, speaker, chat, text,
                 idx, prev_utterance, prev_prev_utterance):
        # When the turn happened (the transcript parser stores milliseconds).
        self.starttime, self.endtime = starttime, endtime
        # Who produced it, whether it came from chat, and what was said.
        self.speaker, self.chat, self.text = speaker, chat, text
        # Position counter assigned by whoever builds the transcript.
        self.idx = idx
        # The one and two turns immediately before this one (None if absent).
        self.prev_utterance, self.prev_prev_utterance = (
            prev_utterance,
            prev_prev_utterance,
        )
|
|
|
|
|
|
|
|
class Chat:
    """One message from the chat log: when it was sent, by whom, and its text."""

    def __init__(self, time, speaker, text):
        # Stored exactly as given; the chat parser in this file supplies the
        # time already converted to milliseconds.
        self.time, self.speaker, self.text = time, speaker, text
|
|
|
|
|
|
|
|
class EndpointHandler():
    """Callable handler that analyses a section transcript for "talk moves".

    Given a WebVTT transcript (and optionally a tab-separated chat log) the
    handler merges both into a list of ``Utterance`` objects, runs several
    RoBERTa classifiers over the section leader's utterances, and returns
    talk-time ratios, the detected talk moves and the full transcript as
    JSON-serialisable structures.
    """

    # Classifier ids understood by get_utterances_list / do_prediction. Each id
    # is loaded from the Hugging Face repository "aekupor/<id>".
    _MODEL_IDS = (
        'eliciting',
        'connecting',
        'probing',
        'adding_on',
        'revoicing',
        'model_utterance',
    )

    def __init__(self, path="."):
        """Create the handler.

        Args:
            path: Accepted for interface compatibility; not used here.
        """
        print("Loading models...")

    # ------------------------------------------------------------------
    # Model-input builders
    # ------------------------------------------------------------------

    def _standalone_input(self, utterance: "Utterance") -> Tuple[Any, str]:
        """Build classifier input that consists of the utterance text only."""
        doc = nlp(utterance.text)
        if len(doc) > token_limit:
            return self.handle_long_utterances(doc), 'list'
        return utterance.text, 'single'

    def _contextual_input(self, utterance: "Utterance", truncate_prior: bool) -> Tuple[Any, str]:
        """Build ``[prior_text, text]`` pair input for context-aware classifiers.

        Args:
            utterance: The utterance to classify.
            truncate_prior: When true, the prior text is cut with
                ``truncate_end`` before it is paired with the utterance.
        """
        doc = nlp(utterance.text)
        prior_text = self.get_prior_text(utterance)
        if truncate_prior:
            prior_text = self.truncate_end(prior_text)
        if len(doc) > token_limit:
            # Every chunk of a long utterance is paired with the same context.
            pairs = [[prior_text, chunk] for chunk in self.handle_long_utterances(doc)]
            return pairs, 'list'
        return [prior_text, utterance.text], 'single'

    def eliciting_utterance_to_str(self, utterance: "Utterance") -> Tuple[Any, str]:
        """Return ``(model_input, kind)`` for the 'eliciting' classifier.

        ``kind`` is ``'single'`` (``model_input`` is the utterance text) or
        ``'list'`` (``model_input`` is a list of text chunks of a long utterance).
        """
        return self._standalone_input(utterance)

    def connecting_utterance_to_str(self, utterance: "Utterance") -> Tuple[Any, str]:
        """Return ``(model_input, kind)`` for the 'connecting' classifier.

        Same shape as ``eliciting_utterance_to_str``.
        """
        return self._standalone_input(utterance)

    def probing_utterance_to_str(self, utterance: "Utterance") -> Tuple[Any, str]:
        """Return ``(model_input, kind)`` for the 'probing' classifier.

        ``model_input`` is ``[prior_text, text]`` for ``'single'`` or a list of
        such pairs for ``'list'``; the prior text is truncated.
        """
        return self._contextual_input(utterance, truncate_prior=True)

    def revoicing_utterance_to_str(self, utterance: "Utterance") -> Tuple[Any, str]:
        """Return ``(model_input, kind)`` for the 'revoicing' classifier.

        Same shape as ``probing_utterance_to_str`` (prior text truncated).
        """
        return self._contextual_input(utterance, truncate_prior=True)

    def adding_on_utterance_to_str(self, utterance: "Utterance") -> Tuple[Any, str]:
        """Return ``(model_input, kind)`` for the 'adding_on' classifier.

        Same shape as ``probing_utterance_to_str`` but the prior text is used
        untruncated.
        """
        return self._contextual_input(utterance, truncate_prior=False)

    def model_utterance_to_str(self, utterance: "Utterance") -> Tuple[Any, str]:
        """Return ``(model_input, kind)`` for the 'model_utterance' classifier.

        Same shape as ``adding_on_utterance_to_str`` (prior text untruncated).
        """
        return self._contextual_input(utterance, truncate_prior=False)

    # ------------------------------------------------------------------
    # Text helpers
    # ------------------------------------------------------------------

    def truncate_end(self, prior_text: str) -> str:
        """Keep at most the last 256 characters of ``prior_text``.

        The budget is half of a 512 sequence length and is applied to
        characters, not tokens.
        """
        max_seq_length = 512
        prior_text_max_length = max_seq_length // 2
        # A negative slice start longer than the string simply yields the
        # whole string, so short inputs are returned unchanged.
        return prior_text[-prior_text_max_length:]

    def format_speaker(self, speaker: str, chat: bool) -> str:
        """Return the speaker tag that prefixes an utterance in prior text.

        NOTE(review): callers in this class pass the raw transcript speaker
        name, so every name other than the literal ``'student'`` is tagged as
        SECTION_LEADER - confirm this matches what the classifiers expect.
        """
        role = 'STUDENT' if speaker == 'student' else 'SECTION_LEADER'
        channel = 'chat' if chat else 'audio'
        return f'***{role} ({channel})*** : '

    def get_sl(self, utterances: List["Utterance"]) -> str:
        """Guess which speaker is the section leader.

        The first speaker whose name contains ``'(SL)'`` or
        ``'Section Leader'`` wins. Otherwise the speaker with the largest
        accumulated ``endtime - starttime`` is returned, or ``""`` when nobody
        has a positive talk time.
        """
        for utterance in utterances:
            if '(SL)' in utterance.speaker or 'Section Leader' in utterance.speaker:
                return utterance.speaker

        # No explicit marker: fall back to whoever talked the longest.
        talk_time = {}
        for utterance in utterances:
            duration = utterance.endtime - utterance.starttime
            talk_time[utterance.speaker] = talk_time.get(utterance.speaker, 0) + duration

        max_talk_time = 0
        max_speaker = ""
        for speaker, spoken in talk_time.items():
            # Strict comparison: on ties the earliest speaker is kept.
            if spoken > max_talk_time:
                max_talk_time = spoken
                max_speaker = speaker
        return max_speaker

    def get_prior_text(self, utterance: "Utterance") -> str:
        """Render the two preceding utterances as classifier context.

        Returns ``'No prior utterance'`` unless *both* predecessors exist.
        The leading double quote is never closed; it is reproduced as-is
        because the text is fed to the classifiers.
        """
        previous = utterance.prev_utterance
        before_previous = utterance.prev_prev_utterance
        if previous is None or before_previous is None:
            return 'No prior utterance'

        prior_text = '\"' + self.format_speaker(before_previous.speaker, before_previous.chat) + before_previous.text + ' \n '
        prior_text += self.format_speaker(previous.speaker, previous.chat) + previous.text + ' \n '
        return prior_text

    def handle_long_utterances(self, doc: Any) -> List[str]:
        """Split a parsed utterance into sentence-aligned chunks.

        Sentences are accumulated until the running token count reaches
        ``token_limit``; the chunk is then closed and a new one starts. The
        final sentence always closes the current chunk. Each sentence is
        prefixed with a single space, so every chunk starts with a space.

        Args:
            doc: Object exposing ``.sents`` whose items have ``.text`` and
                ``len()`` (the result of calling ``nlp`` on the text).
        """
        sentences = list(doc.sents)
        last_index = len(sentences) - 1
        chunks = []
        current_chunk = ''
        token_count = 0
        for index, sent in enumerate(sentences):
            current_chunk += ' ' + sent.text
            token_count += len(sent)
            if token_count >= token_limit or index == last_index:
                chunks.append(current_chunk)
                current_chunk = ''
                token_count = 0
        return chunks

    def convert_time(self, time_str):
        """Convert an ``HH:MM:SS.fff`` timestamp into milliseconds (float)."""
        parsed = datetime.strptime(time_str, "%H:%M:%S.%f")
        whole_seconds = 3600 * parsed.hour + 60 * parsed.minute + parsed.second
        return 1000 * whole_seconds + parsed.microsecond / 1000

    # ------------------------------------------------------------------
    # Transcript parsing
    # ------------------------------------------------------------------

    def process_chat_transcript(self, chat_file) -> List["Chat"]:
        """Parse a tab-separated chat log into ``Chat`` objects.

        Each usable line has the form ``time<TAB>speaker<TAB>text``. Lines
        with fewer than three fields or an empty time field are skipped.
        Colons are removed from the speaker field and ``'.00'`` is appended to
        the time so it can be parsed by ``convert_time``.

        Args:
            chat_file: Path of the chat log.
        """
        chat_list = []
        # The context manager guarantees the file handle is released.
        with open(chat_file, 'r') as handle:
            for line in handle:
                # Split on the first two tabs only so that a message which
                # itself contains a tab is kept whole.
                split_line = line.split('\t', 2)
                if len(split_line) < 3 or split_line[0] == '':
                    continue
                time = split_line[0] + '.00'
                name = split_line[1].replace(':', '')
                text = split_line[2].replace('\n', '')
                chat_list.append(Chat(time=self.convert_time(time), speaker=name, text=text))
        return chat_list

    def process_vtt_transcript(self, vttfile: str, chat_list: List["Chat"]) -> List["Utterance"]:
        """Merge WebVTT captions and chat messages into a list of utterances.

        Consecutive captions by the same speaker are concatenated into one
        spoken utterance. A caption names its speaker as ``"Speaker: text"``;
        a caption without a colon continues the previous speaker. Before each
        caption is handled, pending chat messages whose time is earlier than
        the end of the most recent utterance are inserted as chat utterances.

        Returns an empty list when the VTT file cannot be read or parsed.

        NOTE(review): the start of the first spoken utterance is the initial
        ``"00:00:00.000"`` rather than the first caption's start, and chat
        messages still pending once the captions run out are not emitted -
        confirm both are intended.
        """
        utterances_list = []
        try:
            vtt = webvtt.read(vttfile)
        except Exception:
            # Best-effort by design: an unreadable transcript yields no
            # utterances. (Unlike a bare ``except`` this lets
            # KeyboardInterrupt/SystemExit propagate.)
            return utterances_list

        text = ""
        prev_speaker = None
        prev_start = "00:00:00.000"
        prev_end = "00:00:00.000"
        idx = 0
        prev_utterance = None
        prev_prev_utterance = None
        cur_chat_ptr = 0
        cur_chat = chat_list[cur_chat_ptr] if len(chat_list) > 0 else None

        for caption in vtt:
            # Interleave chat messages that were sent before the latest
            # utterance ended.
            while (cur_chat is not None and prev_utterance is not None
                   and prev_utterance.endtime > cur_chat.time):
                utterance = Utterance(starttime=cur_chat.time,
                                      endtime=cur_chat.time,
                                      speaker=cur_chat.speaker,
                                      chat=True,
                                      text=cur_chat.text,
                                      idx=idx,
                                      prev_utterance=prev_utterance,
                                      prev_prev_utterance=prev_prev_utterance)
                utterances_list.append(utterance)
                prev_prev_utterance = prev_utterance
                prev_utterance = utterance
                idx += 1

                cur_chat_ptr += 1
                cur_chat = chat_list[cur_chat_ptr] if cur_chat_ptr < len(chat_list) else None

            # Split on the FIRST colon only: anything after it, including
            # further colons, is part of what was said.
            head, colon, tail = caption.text.partition(":")
            if colon:
                speaker, new_text = head, tail
            else:
                speaker, new_text = prev_speaker, head

            # Speaker changed: close the utterance collected so far.
            if (prev_speaker is not None) and (speaker != prev_speaker):
                utterance = Utterance(starttime=self.convert_time(prev_start),
                                      endtime=self.convert_time(prev_end),
                                      speaker=prev_speaker,
                                      chat=False,
                                      text=text.strip(),
                                      idx=idx,
                                      prev_utterance=prev_utterance,
                                      prev_prev_utterance=prev_prev_utterance)
                utterances_list.append(utterance)

                prev_start = caption.start
                text = ""
                prev_prev_utterance = prev_utterance
                prev_utterance = utterance
                idx += 1

            text += new_text + " "
            prev_end = caption.end
            prev_speaker = speaker

        # Flush whatever the last speaker was still saying.
        if prev_speaker is not None:
            utterance = Utterance(starttime=self.convert_time(prev_start),
                                  endtime=self.convert_time(prev_end),
                                  speaker=prev_speaker,
                                  chat=False,
                                  text=text.strip(),
                                  idx=idx,
                                  prev_utterance=prev_utterance,
                                  prev_prev_utterance=prev_prev_utterance)
            utterances_list.append(utterance)
        return utterances_list

    # ------------------------------------------------------------------
    # JSON shaping and statistics
    # ------------------------------------------------------------------

    @staticmethod
    def _excerpt_entry(utterance) -> Dict[str, Any]:
        """Return the JSON-ready dict describing one utterance."""
        return {'speaker': utterance.speaker, 'data': utterance.text,
                'time': utterance.starttime, 'chat': utterance.chat}

    def transcript_to_json(self, utterances: List["Utterance"]) -> List[Dict[str, Any]]:
        """Return one dict per utterance, sorted by start time."""
        formatted = [self._excerpt_entry(utterance) for utterance in utterances]
        return sorted(formatted, key=lambda d: d['time'])

    def get_talk_time(self, utterances: List["Utterance"]) -> Tuple[float, float, str]:
        """Compute the section leader's and the students' share of talk time.

        Returns:
            ``(sl_share, student_share, sl_name)`` where the shares are
            fractions of the total ``endtime - starttime``. When the total is
            zero (for example only zero-length utterances) both shares are
            ``0.0`` instead of raising ``ZeroDivisionError``.
        """
        sl_time = 0
        student_time = 0
        sl_name = self.get_sl(utterances)
        for utterance in utterances:
            duration = utterance.endtime - utterance.starttime
            if sl_name != utterance.speaker:
                student_time += duration
            else:
                sl_time += duration

        total_time = sl_time + student_time
        if total_time == 0:
            return 0.0, 0.0, sl_name
        return sl_time / total_time, student_time / total_time, sl_name

    def talk_moves_list_to_json(self, utterances: Dict["Utterance", bool]) -> List[Dict[str, Any]]:
        """Turn detected talk moves into JSON-ready excerpts sorted by time.

        Args:
            utterances: Mapping from an utterance that demonstrates a talk
                move to its "is model utterance" flag (as built by
                ``add_preds_to_list``).

        Returns:
            One dict per utterance with keys ``timing``,
            ``is_model_utterance`` and ``excerpt``. The excerpt always has
            three entries: the two preceding utterances and the utterance
            itself, with empty placeholders standing in for missing context.
        """
        formatted = []
        for utterance, is_model_utterance in utterances.items():
            previous = utterance.prev_utterance
            before_previous = utterance.prev_prev_utterance

            def placeholder(moment=utterance.starttime):
                # Fresh dict per slot so entries never share state.
                return {'speaker': "", 'data': "", 'time': moment, 'chat': False}

            if previous is None:
                context = [placeholder(), placeholder()]
            elif before_previous is None:
                context = [placeholder(), self._excerpt_entry(previous)]
            else:
                context = [self._excerpt_entry(before_previous), self._excerpt_entry(previous)]

            formatted.append({'timing': utterance.starttime,
                              'is_model_utterance': is_model_utterance,
                              'excerpt': context + [self._excerpt_entry(utterance)]})
        return sorted(formatted, key=lambda d: d['timing'])

    # ------------------------------------------------------------------
    # Classification
    # ------------------------------------------------------------------

    def get_utterances_list(self, full_transcript, utterances_list, utterances_indexes, model_id):
        """Collect classifier inputs for every section-leader utterance.

        Args:
            full_transcript: All utterances of the session.
            utterances_list: List that receives the model inputs (mutated).
            utterances_indexes: List that receives, for each model input, the
                index into ``full_transcript`` it came from (mutated).
            model_id: One of the ids in ``_MODEL_IDS``.

        Returns:
            ``(utterances_list, utterances_indexes)``.

        Raises:
            ValueError: If ``model_id`` is not a known classifier id.
        """
        builders = {
            'eliciting': self.eliciting_utterance_to_str,
            'connecting': self.connecting_utterance_to_str,
            'probing': self.probing_utterance_to_str,
            'adding_on': self.adding_on_utterance_to_str,
            'revoicing': self.revoicing_utterance_to_str,
            'model_utterance': self.model_utterance_to_str,
        }
        if model_id not in builders:
            # Fail with a clear message instead of an unbound-local error
            # deep inside the loop.
            raise ValueError(f"unknown model_id: {model_id!r}")
        build_input = builders[model_id]

        sl_speaker = self.get_sl(full_transcript)
        for i, utterance in enumerate(full_transcript):
            # Only the section leader's utterances are classified.
            if sl_speaker != utterance.speaker:
                continue

            utterance_str, is_list = build_input(utterance)
            if is_list == 'list':
                # A long utterance contributes several inputs that all point
                # back to the same transcript index.
                utterances_list.extend(utterance_str)
                utterances_indexes.extend([i] * len(utterance_str))
            else:
                utterances_list.append(utterance_str)
                utterances_indexes.append(i)
        return utterances_list, utterances_indexes

    def do_prediction(self, full_transcript, model_id):
        """Run the classifier ``model_id`` over the section leader's utterances.

        A fresh ``ClassificationModel`` is constructed on every call and kept
        in ``self.model``.

        Returns:
            ``(model_inputs, transcript_indexes, predictions)``; three empty
            lists when there is nothing to classify.

        Raises:
            ValueError: If ``model_id`` is not a known classifier id.
        """
        if model_id not in self._MODEL_IDS:
            raise ValueError(f"unknown model_id: {model_id!r}")

        utterances_list, utterances_indexes = self.get_utterances_list(full_transcript, [], [], model_id)
        if len(utterances_list) == 0:
            return [], [], []

        cuda_available = torch.cuda.is_available()
        self.model = ClassificationModel(
            "roberta", f"aekupor/{model_id}", use_cuda=cuda_available
        )

        predictions, _ = self.model.predict(utterances_list)
        return utterances_list, utterances_indexes, predictions

    def add_preds_to_list(self, utterance_talk_moves, predictions, utterances_indexes, full_transcript, model_utterances_predictions):
        """Record positively classified utterances in ``utterance_talk_moves``.

        For every position whose prediction equals 1, the corresponding
        utterance is stored as a key whose value tells whether the
        'model_utterance' classifier also predicted 1 at the same position.
        The value is always a plain ``bool`` so it stays JSON-serialisable
        even if the predictions are array scalars.

        Returns:
            The (mutated) ``utterance_talk_moves`` mapping.
        """
        for position, prediction in enumerate(predictions):
            if prediction == 1:
                utterance = full_transcript[utterances_indexes[position]]
                utterance_talk_moves[utterance] = bool(model_utterances_predictions[position] == 1)
        return utterance_talk_moves

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyse one session.

        Args:
            data: Request payload. The keys ``transcript_file`` (path of a
                .vtt file, required), ``chat_file`` (path of a chat log,
                optional) and ``talk_move`` (``'getIdeas'``,
                ``'orientIdeas'`` or ``'buildIdeas'``) are popped from it.

        Returns:
            ``{}`` when the transcript yields no utterances, otherwise a dict
            with the keys ``talkTime``, ``talkMoveInFocus``,
            ``numberOfMoments``, ``talkMoveDemonstrations``, ``transcript``
            and ``slName``. ``talkMoveDemonstrations`` is ``""`` when
            ``talk_move`` is not one of the three known values.

        Raises:
            ValueError: If ``transcript_file`` is missing.
        """
        transcript_file = data.pop("transcript_file", None)
        chat_file = data.pop("chat_file", None)
        talk_move = data.pop("talk_move", None)

        if transcript_file is None:
            raise ValueError("no data file provided")

        chat_list = []
        if chat_file is not None:
            chat_list = self.process_chat_transcript(chat_file)
        full_transcript = self.process_vtt_transcript(transcript_file, chat_list)
        if len(full_transcript) == 0:
            return {}

        _, _, model_utterances_predictions = self.do_prediction(full_transcript, 'model_utterance')

        def collect(model_id, talk_moves):
            """Run one classifier and merge its positives into ``talk_moves``."""
            _, indexes, predictions = self.do_prediction(full_transcript, model_id)
            return self.add_preds_to_list(talk_moves, predictions, indexes,
                                          full_transcript, model_utterances_predictions)

        gi_utterance_talk_moves = collect('eliciting', dict())
        oi_utterance_talk_moves = collect('connecting', dict())
        # "Build ideas" is the union of three classifiers.
        bi_utterance_talk_moves = dict()
        for model_id in ('probing', 'adding_on', 'revoicing'):
            bi_utterance_talk_moves = collect(model_id, bi_utterance_talk_moves)

        moves_by_focus = (
            ('getIdeas', gi_utterance_talk_moves),
            ('orientIdeas', oi_utterance_talk_moves),
            ('buildIdeas', bi_utterance_talk_moves),
        )
        utterance_talk_moves_json = ""
        for focus, talk_moves in moves_by_focus:
            if talk_move == focus:
                utterance_talk_moves_json = self.talk_moves_list_to_json(talk_moves)

        full_transcript_json = self.transcript_to_json(full_transcript)

        sl_time, student_time, sl_name = self.get_talk_time(full_transcript)
        talk_time_json = {'sl': sl_time, 'student': student_time}

        num_moments_json = {'getIdeas': len(gi_utterance_talk_moves),
                            'buildIdeas': len(bi_utterance_talk_moves),
                            'orientIdeas': len(oi_utterance_talk_moves)}

        response = {'talkTime': talk_time_json, 'talkMoveInFocus': talk_move,
                    'numberOfMoments': num_moments_json,
                    'talkMoveDemonstrations': utterance_talk_moves_json,
                    'transcript': full_transcript_json, 'slName': sl_name}
        return response
|
|
|
|
|
|
|
|
|