transcriber_tools / utils.py
rosyvs
Remove unused sort_transcript function and update column renaming in merge_ellipsis and parse_label_csv functions
693e4cf
import csv
import logging
import os
import re
import subprocess
from pathlib import Path
import sys
import gradio as gr
import pandas as pd
from pathlib import Path
import nltk
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.worksheet.datavalidation import DataValidation
os.makedirs(f'{os.getcwd()}/logs', exist_ok=True)
os.makedirs(f'{os.getcwd()}/results', exist_ok=True)
logging.basicConfig(filename=f'{os.getcwd()}/logs/logfile.log', level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logging.info('Starting the application...')
def subprocess_run_verbose(cmd):
res = subprocess.check_call(cmd, stdout=sys.stdout, stderr=subprocess.STDOUT)
return res
def HHMMSS_to_sec(time_str):
"""Get Seconds from timestamp string with milliseconds."""
if not time_str:
return None
if isinstance(time_str, (int, float)):
return float(time_str)
if time_str.count(':')==2:
h, m, s = time_str.split(':')
elif time_str.count(':')==3:
# weird timestamps where there is a field followign seconds delimited by colon
h, m, s, u = time_str.split(':')
# determine whether ms field is in tenths or hundredths or thousandths by countng how many digits
if len(u)==1:
print('Weird time format with 3 colons detected - HH:MM:SS:X . Interpreting X as tenths of a second. - please verify this is how you want the time interpreted')
ms = float(u)/10
elif len(u)==2: # hundredths
print('Weird time format with 3 colons detected - HH:MM:SS:XX . Interpreting XX as hundredths of a second. - please verify this is how you want the time interpreted')
ms = float(u)/100
elif len(u)==3: # hundredths
print('Weird time format with 3 colons detected - HH:MM:SS:XXX . Interpreting XX as milliseconds. - please verify this is how you want the time interpreted')
ms = float(u)/1000
else:
print(f'input string format not supported: {time_str}')
return None
s = int(s)+ms
elif time_str.count(':')==1:
# print('missing HH from timestamp, assuming MM:SS')
m, s = time_str.split(':')
h=0
else:
try:
time_str=float(time_str) # maybe its already in seconds!
return time_str
except Exception as e:
gr.Error(f"Error converting time to seconds: {e}")
return None
return int(h) * 3600 + int(m) * 60 + float(s)
def molly_xlsx_to_table(xl_file):
# contractor transcribers provide an xlsx with the following columns
# utt_ix: int
# Timecode: "HH:MM:SS:ss - HH:MM:SS:ss"
# Duration: HH:MM:SS:ss
# Speaker: str
# Dialogue: str
# Annotations: blank
# Error Type: blank
with pd.ExcelFile(xl_file) as xls:
sheetname = xls.sheet_names
table = pd.DataFrame(pd.read_excel(xls, sheetname[0]))
table[['start_time','end_time']] = table['Timecode'].str.split('-',expand=True)
table['start_sec'] = table['start_time'].str.strip().apply(HHMMSS_to_sec)
table['end_sec'] = table['end_time'].str.strip().apply(HHMMSS_to_sec)
table.drop(labels=['Annotations','Error Type','Duration'], axis=1, inplace=True)
table=table[['#','Speaker','Dialogue','start_sec','end_sec']]
table.rename(columns={'#':'uttID','Speaker':'speaker', 'Dialogue':'transcript'}, inplace=True)
return table
def xlsx_to_table(xl_file):
try:
# read the first sheet of the Excel file into a DataFrame
print(f'...reading {xl_file}...')
table = pd.read_excel(xl_file, sheet_name=0)
print(f'...done reading {xl_file}...')
# convert column names to lowercase
table.columns = map(str.lower, table.columns)
# extract start and end time from the Timecode column
print(f'...splitting Timecode column into start and end time...')
timecodes = table['timecode'].str.split(' - ', expand=True)
table['start_time'] = timecodes[0]
table['end_time'] = timecodes[1]
print(f'...done splitting Timecode column into start and end time...')
# convert start and end time to seconds using the HHMMSS_to_sec function
print(f'...converting start and end time to seconds...')
table['start_sec'] = table['start_time'].apply(HHMMSS_to_sec)
table['end_sec'] = table['end_time'].apply(HHMMSS_to_sec)
print(f'...done converting start and end time to seconds...')
# drop unnecessary columns
print(f'...dropping unnecessary columns...')
table.drop(['timecode', 'annotations', 'error type', 'duration'], axis=1, inplace=True)
# rename columns
print(f'...renaming columns...')
table.rename(columns={'#': 'uttID', 'speaker': 'speaker', 'dialogue': 'transcript'}, inplace=True)
# reorder columns
print(f'...reordering columns...')
table = table[['uttID', 'speaker', 'transcript', 'start_sec', 'end_sec']]
# sort by start time
table.sort_values('start_sec', inplace=True)
return table
except Exception as e:
gr.Error(f'Error converting {xl_file}: {e}')
def table_to_ELAN_tsv(table:pd.DataFrame, path:str):
# write table to tsv compatible with ELAN import
table.to_csv(path, index=False, float_format='%.3f',sep='\t')
return path
def convert_and_trim_video(media_in, media_out, start=None, end=None):
WAV_CHANNELS = 1
WAV_SAMPLE_RATE = 16000
start_sec = HHMMSS_to_sec(start)
end_sec = HHMMSS_to_sec(end)
try:
if start_sec is None and end_sec is None:
logging.info(f'...No start and end times provided. Converting entire video without trimming...')
trim_command=[]
else:
if start_sec is None:
logging.info(f'...No start time provided. Trimming video from start to specified end...')
start_sec = 0.0
trim_command = ['-ss',f'{start_sec}']
if end_sec is None:
logging.info(f'...No end time provided. Trimming video from specified start to end of video...')
end_sec = None
else:
trim_command.extend(['-to', f'{end_sec}'])
if not isinstance(media_in, (str, Path)):
raise TypeError("media_in must be a string or a PathLike object")
if not isinstance(media_out, (str, Path)):
raise TypeError("media_out must be a string or a PathLike object")
in_ext = Path(media_in).suffix.lower()
out_ext = Path(media_out).suffix.lower()
print(f'...detected extensions from filename: input={in_ext} output={out_ext}')
if in_ext == out_ext:
logging.info(f'...No media conversion needed...')
else:
logging.info(f'...Using ffmpeg to convert {in_ext} to {out_ext}...')
if out_ext == '.wav':
if in_ext == '.webm':
command = [
'ffmpeg', '-y',
'-i', media_in,
*trim_command,
media_out,
'-hide_banner', '-loglevel', 'info']
else:
# convert to wav with standard format for audio models
command = [
'ffmpeg',
"-f", "s16le",
'-y',
'-i', media_in,
*trim_command,
'-vn',
'-acodec', 'pcm_s16le',
'-ac', str(WAV_CHANNELS),
'-ar', str(WAV_SAMPLE_RATE),
media_out,
'-hide_banner', '-loglevel', 'info']
else: # convert using copy codec
if in_ext == '.webm':
command = [
'ffmpeg', '-y',
'-i', media_in,
'-strict', '-2',
*trim_command,
'-c:v', 'copy',
# '-vcodec', 'h264',
# '-acodec', 'aac',
media_out,
'-hide_banner', '-loglevel', 'info']
else: # not webm
command = [
'ffmpeg',
'-y',
'-i', media_in,
*trim_command,
'-c','copy',
media_out,
'-hide_banner', '-loglevel', 'info']
# run the ffmpeg command
logging.info(f"FFMPEG command: {' '.join(command)}")
gr.Info(f"FFMPEG command: {' '.join(command)}", visible=False)
print(f"...FFMPEG command: {' '.join(command)}")
# process = subprocess.run(command, capture_output=True, text=True)
# if process.returncode != 0:
# logging.info(f"FFMPEG error: {process.stderr}")
# print(f"FFMPEG error: {process.stderr}")
# gr.Error(f"FFMPEG error: {process.stderr}")
# else:
# logging.info(process.stdout)
# print(f"...FFMPEG status: {process.stdout}")
return_code = subprocess_run_verbose(command)
print(f"FFMPEG return code: {return_code}")
if return_code != 0:
logging.info(f"FFMPEG error: {return_code}")
print(f"FFMPEG error: {return_code}")
gr.Error(f"FFMPEG error: {return_code}")
return None
else:
logging.info(f"...FFMPEG completed successfully...")
print(f"...FFMPEG completed successfully...")
return media_out
except Exception as e:
print(f"Error converting video format: {e}")
gr.Error(f"Error converting video format: {e}")
###### TRANSCRIT UTILS ######
def convert_transcript_for_TM(file_list):
"""Convert transcripts for TalkMoves Annotation
Input can be xlsx or csv transcript file
Can handle sepraate start and end time columns or a single timecode column
Output will have separate start and end timestamps in HH:MM:SS.sss format
Args:
file_list (_type_): _description_
Raises:
gr.Error: _description_
gr.Error: _description_
Returns:
_type_: _description_
"""
# Regular expression pattern for matching speaker names and timecodes.
bracket_re = re.compile(r'(?:\[[UI|ui|Inaudible|inaudible|overlapping speech|VIDEO SILENCE|teacher explaining in background].*\]\W{0,2})')
# Regular expression pattern for matching anything enclosed in square brackets.
all_bracket_re = re.compile(r'(?:\[.*\]\W{0,2})')
# whether remove the inaudible
do_remove_inaudible = True
# whether_keep_context_switch
do_keep_context_switch = True
# whether_convert_to_timestamp if start and end time are in seconds and in separate columns
convert_to_timestamp = True
error_message = [] # List of error messages to be displayed to the user.
global_stat_dict = {} # Dictionary of global statistics.
output_filepath_list = [] # List of output file paths.
trans_log_filepath_list = [] # List of transcription log file paths.
for file in file_list:
filename = file.split('/')[-1] # Get the filename from the file.
filepath = os.path.dirname(file) # Get the file path from the file.
# Read the file into a Pandas DataFrame depending on its file format.
if filename.endswith('.xlsx'):
df = pd.read_excel(file, index_col=0)
output_filename = f"{filename[:-5]}" + "_TMcoded.xlsx"
elif filename.endswith('.csv'):
df = pd.read_csv(file, index_col=0, error_bad_lines=False)
output_filename = f"{filename[:-4]}" + "_TMcoded.xlsx"
else:
raise gr.Error(f"{file} format is wrong")
# Remove the "Copy of" prefix from the output filename, if present.
if output_filename.startswith("Copy of "):
output_filename = output_filename[8:]
# Remove the word "_Transcript" from the output filename, if present.
if '_Transcript' in output_filename:
# print("before: "+output_filename)
error_message.append("before: "+output_filename)
output_filename = ''.join(output_filename.split('_Transcript'))
# print("after: "+output_filename)
error_message.append("after: "+output_filename)
# Construct the output file and transcription log file paths.
output_filepath = os.path.join(filepath, output_filename)
trans_log_filepath = os.path.join(filepath, f"{output_filename}"+ ".log")
# Open the transcription log file for writing.
with open(trans_log_filepath, "w") as outfile:
sub_cnt_in_file = 0
empty_speaker_cnt_in_file = 0
turn_skipped_in_file = 0
turn_skipped_speaker_switch_in_file = 0
snt_mark_skip_in_file = 0
snt_skipped_in_file = 0
chat_flag_in_speaker_time_line = 0
chat_flag_in_content_line = 0
all_inaudible_in_file = 0
all_bracket_in_file = 0
all_snts_in_file = 0
all_token_cnt_in_file = 0
#index Timecode Duration Speaker Dialogue Annotations Error Type
#1 00:00:05:04 - 00:00:07:12 00:00:02:08 Tutor Did you... How was your Halloween?
turns = []
time_stamps = []
speakers = []
chat_flags = []
sentences = []
snt_ids = []
## parse the df flexibly: find key column names which might vary dependign on transcript source
# set all column names to lowercase
df.columns = map(str.lower, df.columns)
# several possibilities for column names, detect which are present
uttID_keys = ['utt','seg','utt_id','seg_id','index']
speaker_keys = ['speaker']
start_keys=['start_sec','start','start_time','timestart']
end_keys=['end_sec','end','end_time','timeend']
timestamp_keys = ['timecode','timestamp']
content_keys=['dialogue','utterance','transcript','text']
# detect which is used in this df
uttID_key = next((key for key in uttID_keys if key in df.columns), None)
speaker_key = next((key for key in speaker_keys if key in df.columns), None)
content_key = next((key for key in content_keys if key in df.columns), None)
# check if separate start and end times are present, otherwise assume single timecode column
if any(df.columns.isin(start_keys)):
start_key = next((key for key in start_keys if key in df.columns), None)
end_key = next((key for key in end_keys if key in df.columns), None)
time_format = 'seconds'
if convert_to_timestamp:
# convert to timestamp format HH:MM:SS.sss - HH:MM:SS.sss
df['timecode'] = df.apply(lambda x: f"{sec_to_HHMMSS(x[start_key])} - {sec_to_HHMMSS(x[end_key])}", axis=1)
timestamp_key='timecode'
time_format = 'timestamp'
else:
timestamp_key=next((key for key in timestamp_keys if key in df.columns), None)
time_format = 'timestamp'
# Turn started with 1, the same as molly's transcripts
for i, row in df.iterrows():
turn = row[uttID_key] if uttID_key else i+1
speaker = row[speaker_key]
time_str = row[timestamp_key]
content = "" if pd.isna(row[content_key]) else row[content_key].strip("\n")
# when speaker is empty, use the previous speaker
if speaker == "":
if speakers:
speaker = speakers[-1]
empty_speaker_cnt_in_file += 1
outfile.write(f"{turn}: found empty speaker, use the speaker in previous turn: {speaker}\n")
else:
raise gr.Error(f"{row}, the first turn is empty speaker")
# clean after the sentence tokenize
snts = sent_tokenize(content)
all_snts_in_file += len(snts)
snt_skipped_in_turn = 0
for i, snt in enumerate(snts):
remove_flag = False
inaudible_search = re.findall(bracket_re, snt)
if inaudible_search:
all_inaudible_in_file += len(inaudible_search)
outfile.write(f"{turn}, {inaudible_search}, inaudible found in snt: {snt}\n")
all_bracket_search = re.findall(all_bracket_re, snt)
if all_bracket_search:
all_bracket_in_file += len(all_bracket_search)
outfile.write(f"{turn}, {all_bracket_search} bracket found in snt: {snt}\n")
# only remove the [inaudible xxx] when it is the whole sentence.
inaudible_match = re.fullmatch(bracket_re, snt)
if inaudible_match:
if do_keep_context_switch:
# if keep context switch
if speakers and speaker == speakers[-1]:
# share the same speaker, no context switching, just remove it
remove_flag = True
else:
# different speakers, it is the context switching.
if len(snts) == 1:
# current empty sentence is the only single sentence
remove_flag = False
else:
if i != len(snts)-1:
# current empty utterance is not the last one, just delete it
remove_flag = True
else:
# current empty utterance is the last one, keep it.
if snt_skipped_in_turn == len(snts)-1:
# all previous snts are empty, then keep this to not skip the whole turn
remove_flag = False
else:
remove_flag = True
else:
# if not keep context switch, then simply remove all empty utterance
remove_flag = True
# If remove_flag is true:
if remove_flag:
# Increment sub_cnt_in_file and snt_mark_skip_in_file
sub_cnt_in_file += 1
snt_mark_skip_in_file += 1
# Write the following message to outfile:
outfile.write(f"{turn}, sub happend: {snt}, skip this sentence\n")
# If do_remove_inaudible is true:
if do_remove_inaudible:
snt_skipped_in_file += 1
snt_skipped_in_turn += 1
continue
# Add to pd:
# Append turn to turns list
turns.append(turn)
# Set snt_id to the string f"{turn}.{i}"
snt_id = f"{turn}.{i}"
# Append time_str to time_stamps list
time_stamps.append(time_str)
# Append speaker to speakers list
speakers.append(speaker)
# Set sentence to the string representation of snt, with whitespace removed from the start and end
sentence = str(snt).strip().rstrip("\n")
# Calculate the number of tokens in sentence and add to all_token_cnt_in_file
token_cnt = len(nltk.word_tokenize(sentence))
all_token_cnt_in_file += token_cnt
# Append snt_id to snt_ids list
snt_ids.append(snt_id)
# Append sentence to sentences list
sentences.append(sentence)
if snt_skipped_in_turn == len(snts):
# all snts in turn are skiped, then skip the turn
turn_skipped_in_file += 1
if (speakers and speaker != speakers[-1]) or not speakers:
turn_skipped_speaker_switch_in_file += 1
outfile.write(f"{turn}, since all snts are empty, skip this whole turn {content}\n")
# Create a new DataFrame with the following columns:
new_df = pd.DataFrame({
"Sentence_ID": snt_ids, # A
"TimeStamp": time_stamps, #B
"Turn" : turns, #C
"Speaker" : speakers, #D
"Sentence" : sentences #E
})
# assert turn_skipped_speaker_switch_in_file==0, "Some speaker switch turn skipped"
new_df["Teacher_TM"] = None #F
new_df["Student_TM"] = None #G
# write new_df to xlsx file
new_df.to_excel(output_filepath, index=False)
# https://openpyxl.readthedocs.io/en/latest/api/openpyxl.utils.dataframe.html#openpyxl.utils.dataframe.dataframe_to_rows
wb = Workbook()
ws = wb.active
teacher_dv = DataValidation(type="list", formula1='",1-None,2-Keep-Together,3-Getting-Student-to-Relate,4-Restating,5-Revoicing,6-Context,7-Press-for-Accuracy,8-Press-for-Reasoning"', allow_blank=True)
student_dv = DataValidation(type="list", formula1='",1-None,2-Relate-to-Another-Student,3-Asking-for-More-info,4-Making-a-Claim,5-Providing-Evidence/Reasoning"', allow_blank=True)
ws.add_data_validation(teacher_dv)
ws.add_data_validation(student_dv)
teacher_dv.add('F2:F1048576')
student_dv.add('G2:G1048576')
for r in dataframe_to_rows(new_df, index=False, header=True):
ws.append(r)
wb.save(output_filepath)
stat_dict = {
"chat_flag_in_speaker_time_line": chat_flag_in_speaker_time_line,
"chat_flag_in_content_line": chat_flag_in_content_line,
"empty_speaker_cnt_in_file": empty_speaker_cnt_in_file,
"ori_total_turn": df.shape[0],
"ori_total_snt": all_snts_in_file,
"turn_skipped": turn_skipped_in_file,
"turn_skipped_speaker_switch_in_file": turn_skipped_speaker_switch_in_file,
"snt_skipped": snt_skipped_in_file,
"remaining_snt": all_snts_in_file - snt_skipped_in_file,
"all_token_cnt_in_file": all_token_cnt_in_file,
"avg_token_cnt_per_snt": all_token_cnt_in_file/(all_snts_in_file - snt_skipped_in_file),
"sub_cnt_in_file": sub_cnt_in_file,
"all_inaudible_in_file": all_inaudible_in_file,
"all_bracket_in_file": all_bracket_in_file,
"other_bracket_in_file": all_bracket_in_file - all_inaudible_in_file
}
if all_inaudible_in_file != all_bracket_in_file:
# print(f"{filename} has special brakets")
error_message.append(f"Warning: {filename} has special brakets")
for k, v in stat_dict.items():
global_stat_dict[k] = global_stat_dict.get(k,0) + v
outfile.write(f"{output_filepath}, {json.dumps(stat_dict, indent=4)}")
output_filepath_list.append(output_filepath)
trans_log_filepath_list.append(trans_log_filepath)
for k, v in global_stat_dict.items():
if "avg" in k:
global_stat_dict[k] = global_stat_dict[k]/len(file_list)
global_log_filepath = os.path.join(filepath, "global_transfer"+ ".log")
with open(global_log_filepath, "w") as outfile:
outfile.write(f"global_stat_dict: {json.dumps(global_stat_dict, indent=4)}")
# error_check
if global_stat_dict["all_inaudible_in_file"] != global_stat_dict["all_bracket_in_file"]:
error_message.append("Error: 'all_inaudible_in_file' does not match 'all_bracket_in_file'")
if global_stat_dict["other_bracket_in_file"] != 0:
error_message.append("Error: 'other_bracket_in_file' is not zero")
return output_filepath_list, trans_log_filepath_list, error_message, global_log_filepath
def add_CPS_columns(df):
# Observation Instructions CONST_SharesU_Situation CONST_SharesU_CorrectSolutions CONST_SharesU_IncorrectSolutions CONST_EstablishesCG_Confirms CONST_EstablishesCG_Interrupts NEG_Responds_Reasons NEG_Responds_QuestionsOthers NEG_Responds_Responds MAINTAIN_Initiative_Criticizes NEG_MonitorsE_Results NEG_MonitorsE_GivingUp NEG_MonitorsE_Strategizes NEG_MonitorsE_Save MAINTAIN_Initiative_Suggestions MAINTAIN_Initiative_Compliments MAINTAIN_FulfillsR_InitiatesOffTopic MAINTAIN_FulfillsR_JoinsOffTopic MAINTAIN_FulfillsR_Support MAINTAIN_FulfillsR_Apologizes Notes
annotation_columns = ['Observation','Instructions', 'CONST_SharesU_Situation', 'CONST_SharesU_CorrectSolutions', 'CONST_SharesU_IncorrectSolutions', 'CONST_EstablishesCG_Confirms', 'CONST_EstablishesCG_Interrupts', 'NEG_Responds_Reasons', 'NEG_Responds_QuestionsOthers', 'NEG_Responds_Responds', 'MAINTAIN_Initiative_Criticizes', 'NEG_MonitorsE_Results', 'NEG_MonitorsE_GivingUp', 'NEG_MonitorsE_Strategizes', 'NEG_MonitorsE_Save', 'MAINTAIN_Initiative_Suggestions', 'MAINTAIN_Initiative_Compliments', 'MAINTAIN_FulfillsR_InitiatesOffTopic', 'MAINTAIN_FulfillsR_JoinsOffTopic', 'MAINTAIN_FulfillsR_Support', 'MAINTAIN_FulfillsR_Apologizes', 'Notes']
# add these columns to the end of the df in this order
for col in annotation_columns:
df[col]=''
return df
def add_TM_columns(df):
annotation_columns = ['Teacher_TM', 'Student_TM']
# add these columns to the end of the df in this order
for col in annotation_columns:
df[col]=''
return df
def convert_transcript_for_annotation(file, annotation_scheme=None):
"""Convert transcript for annotation:
Input standard csv transcript file
Output will have separate start and end timestamps in HH:MM:SS.sss format
Filename column will infer the video filename from the transcript filename
Columns for CPS annotators are added
"""
filename,ext = os.path.splitext(os.path.basename(file)) # Get the filename from the file.
filepath = os.path.dirname(file) # Get the file path from the file.
# Read the file into a Pandas DataFrame depending on its file format.
try:
table = parse_label_csv(file)
media_filename = get_sessname_from_filename(filename)
out_df=table.copy()
out_df['recordingID']=media_filename
out_df['TimeStart']=out_df['start_sec'].apply(sec_to_HHMMSS)
out_df['TimeEnd']=out_df['end_sec'].apply(sec_to_HHMMSS)
out_df=out_df[['speaker','TimeStart','TimeEnd','utterance','recordingID','uttID']]
if annotation_scheme=='CPS':
out_df=add_CPS_columns(out_df)
output_file = os.path.join(filepath, f"CPS_{filename}.xlsx")
out_df.to_excel(output_file, index=False)
elif annotation_scheme=='TM':
out_df=add_TM_columns(out_df)
output_file = os.path.join(filepath, f"TM_{filename}.xlsx")
out_df.to_excel(output_file, index=False)
else:
output_file = os.path.join(filepath, f"{filename}.xlsx")
out_df.to_excel(output_file, index=False)
return output_file
except Exception as e:
raise gr.Error(f"{filename}: error {e}")
def sec_to_HHMMSS(seconds):
"""Get timestamp string from seconds."""
seconds = float(seconds)
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
h=int(h)
m=int(m)
return f"{h:02d}:{m:02d}:{s:06.3f}"
def readELANtsv(file, fmt=None):
with open(file,'r',newline='') as in_file:
reader = csv.reader(in_file, delimiter="\t", quoting=csv.QUOTE_NONE)
skiprows=0
row=next(reader)
while not len(row)>=4: # 4 being the min numbert of cols ELAN exports have
skiprows+=1
row=next(reader)
in_file.seek(skiprows)
if skiprows>0:
print(f'Detected {skiprows} header rows to skip')
reader = csv.reader(in_file, delimiter="\t")
for _ in range(skiprows):
next(reader)
labels = [] # transcript with speaker labels and timestamp in sec
for i,utt in enumerate(reader):
if not ''.join(utt).strip(): # skip blank lines
continue
try:
if len(utt) == 5: # IF data comes straight from ELAN sometimes there is a superfluous blank column 2
if i==0:
print('detected extra blank column in first row, will remove')
if fmt=='AUG23':
if i==0:
print('detected extra blank 1st column, will remove')
_,speaker,start_HHMMSS,end_HHMMSS,utterance= utt
convert_timestamps=True
else:
if i==0:
print('detected extra blank 2nd column, will remove')
speaker,_,start_HHMMSS, end_HHMMSS, utterance = utt
convert_timestamps=True
elif len(utt) == 4: # sometimes the blank col is already removed
if i==0:
print('detected 4 columns, assuming: speaker,start_HHMMSS, end_HHMMSS, utterance ')
speaker,start_HHMMSS, end_HHMMSS, utterance = utt
convert_timestamps=True
elif len(utt) == 6: # New one from 2023 Aug has a redundant extra start col!?
if i==0:
print('detected 6 columns, assuming: _,speaker,start_HHMMSS, end_HHMMSS, utterance,_ ')
_,speaker,start_HHMMSS,end_HHMMSS,utterance,_ = utt
convert_timestamps=True
elif len(utt) == 9: # 2023 transcribers tend to give full elan output
if i==0:
print('detected 9 columns, assuming: speaker,_,start_HHMMSS,_,end_HHMMSS,_,_,_,utterance ')
speaker,_,start_HHMMSS,_,end_HHMMSS,_,_,_,utterance = utt
convert_timestamps=True
elif len(utt) == 10: # sometimes an extra blank column appears at the end
if i==0:
print('detected 10 columns, assuming: speaker,_,start_HHMMSS,_,end_HHMMSS,_,_,_,utterance,_ ')
speaker,_,start_HHMMSS,_,end_HHMMSS,_,_,_,utterance,_ = utt
convert_timestamps=True
elif len(utt) == 12: # WOw how many redundant columns can ELAN make...
if i==0:
print('detected 12 columns, assuming: speaker,_,start_HHMMSS,_,_,end_HHMMSS,_,_,_,_,_,utterance ')
speaker,_,start_HHMMSS,_,_,end_HHMMSS,_,_,_,_,_,utterance = utt
convert_timestamps=True
else:
raise ValueError(f'Unknown transcript format with {len(utt)} columns for {file}')
except BaseException as err:
print(f'!!! transcript parse error on line {i} for {file}')
print(utt)
raise err
if convert_timestamps:
start_sec = HHMMSS_to_sec(start_HHMMSS)
end_sec = HHMMSS_to_sec(end_HHMMSS)
labels.append((speaker, utterance, start_sec,end_sec))
labels= pd.DataFrame(labels, columns = ('speaker', 'utterance', 'start_sec','end_sec'))
labels.sort_values(by='start_sec', inplace=True, ignore_index=True)
labels.reset_index(inplace=True)
labels = labels.rename(columns = {'index':'seg'})
return(labels)
def merge_ellipsis(seg_labels):
# merge utterances with ellipsis
# input is seg_labels format: [optional index] speaker, utterance, start_sec, end_sec
if isinstance(seg_labels,str) and seg_labels.endswith(('.csv','.tsv','.txt')):
df=pd.read_csv(seg_labels)
elif isinstance(seg_labels, pd.DataFrame):
df=seg_labels
else:
raise ValueError('input seg_labels should be path to csv or pd.DataFrame')
if len(df.columns)==4:
# no seg index yet
df.reset_index(inplace=True)
df = df.rename(columns = {'index':'seg'})
elif len(df.columns)==5:
# first col is seg
df = df.rename(columns = {df.columns[0]:'seg'})
else:
raise ValueError('input seg_labels should have 4 or 5 columns')
df2=[]
prev_spk=None
prev_utt=""
prev_start=0
prev_end=0
segs=[0]
merge_utt={"seg":None, "speaker":None,"utterance":None,"start_sec":None, "end_sec":None}
for i,row in df.iterrows():
if i==0:
merge_utt=row
else:
# if same speaker as last and ellipsis
if merge_utt["speaker"]==row["speaker"] and str(merge_utt["utterance"]).endswith('...') and str(row["utterance"]).startswith('...'):
# append current to temporary merged utt: use prev_ items
merge_utt["utterance"]+=str(row["utterance"])
merge_utt["end_sec"]=row["end_sec"]
segs.append(row["seg"])
else:
# append merge_utt to df2
merge_utt["seg"]=segs
df2.append(merge_utt)
# clear merge_utt and set to current
merge_utt=row
segs=[merge_utt["seg"]]
merge_utt["seg"]=segs
# if not isinstance(merge_utt["seg"],list):
# merge_utt["seg"]=list(segs)
df2.append(merge_utt) # catch final merge_utt if not terminated
df2=pd.DataFrame(df2)
df2['utterance']=df2['utterance'].str.replace('\\.+',' ', regex=True)
# clear up "......"
# enumerate utterances
df2.reset_index(inplace=True,drop=True)
df2 = df2.reset_index().rename(columns = {'index':'utt'})
return df2
def add_dummy_seg_column(table):
# adds a dummy seg column (listing segments comprising utterance) for a df without this column
# labelfiles generated from merge_ellipsis have an 'utt' column giving utterance ID, and a seg column
# containing a list of original segments comprising each utterance
# but you may need all label files top have the exact same format even if they weren't produced by
# merge_ellipsis()
# returns a table with columns 'utt' and 'seg'
if 'seg' in table.columns.tolist():
print('\'seg\' column already exists, not changing anything')
return table
if 'uttID' in table.columns.tolist():
table=table.rename(columns={"uttID":"utt"})
if not 'utt' in table.columns.tolist():
table['utt']=table.index
table['seg']=[[u] for u in table['utt']]
table=table[['utt','seg','speaker','start_sec','end_sec','utterance']]
return table
def get_sessname_from_filename(filename):
sessname=Path(filename).stem
sessname = re.sub('reworked-transcript-diarized-timestamped-', '', sessname,flags=re.I)
sessname = re.sub('reworked_transcript-diarized-timestamped-', '', sessname,flags=re.I)
sessname = re.sub('reworked-diarized-timestamped-', '', sessname,flags=re.I)
sessname = re.sub('reworked_timestamped_', '', sessname,flags=re.I)
sessname = re.sub('reworked_', '', sessname,flags=re.I)
sessname = re.sub('reworked-', '', sessname,flags=re.I)
sessname = re.sub('transcript_diarized_timestamped_', '', sessname,flags=re.I)
sessname = re.sub('transcript-diarized-timestamped_', '', sessname,flags=re.I)
sessname = re.sub('transcript-diarized-timestamped-', '', sessname,flags=re.I)
sessname = re.sub('_transcript', '', sessname,flags=re.I)
sessname = re.sub('_tmcoded', '', sessname,flags=re.I)
sessname = re.sub('utt_labels_', '', sessname,flags=re.I)
sessname = re.sub('seg_labels_', '', sessname,flags=re.I)
sessname = re.sub('_redacted', '', sessname,flags=re.I)
return sessname
def ELAN_to_labels_csv(ELANfile, merge_segments = True):
# dumb but effective string wrangling to get sess name
sessname=get_sessname_from_filename(ELANfile)
# reads ELAN output to pd.DataFrame in a unified format
labels=readELANtsv(ELANfile)
if merge_segments:
save_file=f'utt_labels_{sessname}.csv'
# merge segments to form utterances where there have been splits separated by '...'
merged_labels=merge_ellipsis(labels)
merged_labels.to_csv(save_file,index=False, float_format='%.3f')
else:
save_file=f'seg_labels_{sessname}.csv'
labels.to_csv(save_file,index=False, float_format='%.3f')
return save_file
def parse_label_csv(label_csv:str):
# utt_labels_csv is the usual format used for diarized, timed transcripts in this repo
# There are several versions with differnt columns (with/without segment &/ utterance index,
# withouot column headers etc)
# table:
# [uttID, speaker, transcript, start_sec, end_sec]
table = pd.read_csv(label_csv,keep_default_na=False, header=None)
row0=table.iloc[0]
is_header = not any(str(cell).replace('.','').isdigit() for cell in row0)
if is_header:
table.columns=row0.tolist()
table=table.iloc[1:]
table=table.reset_index(drop=True)
else:
if len(table.columns)==4:
print('no header detected, assuming annotation file has columns [speaker,utterance,start_sec, end_sec] ')
table.columns=['speaker','utterance','start_sec', 'end_sec']
elif len(table.columns)==5:
print('no header detected, assuming annotation file has columns [seg,speaker,utterance,start_sec, end_sec] ')
table.columns=['seg','speaker','utterance','start_sec', 'end_sec']
elif len(table.columns)==6:
print('no header detected, assuming annotation file has columns [utt,seg,speaker,utterance,start_sec, end_sec] ')
table.columns=['utt','seg','speaker','utterance','start_sec', 'end_sec']
else:
print(f'no header detected, csv has {len(table.columns)} columns, could not determine column names.')
return None
# choose which column to use for uttID in table
if 'utt' in table.columns.tolist():
table=table.rename(columns={"utt":"uttID"}).drop('seg', axis=1)
elif 'seg' in table.columns.tolist():
table=table.rename(columns={"seg":"uttID"})
else:
table=table.reset_index().rename(columns={"index":"uttID"})
table=table[['uttID','speaker','utterance','start_sec','end_sec']]
return table
def deidentify_speaker(df, who='all'):
"""replace speaker ID with generic labels
in order of appearance (speaker1, speaker2)'
if who is "student", only student names are replaced
Args:
df (_type_): _description_
who (str, optional): 'all','student'. Which names to replace. Defaults to 'all'.
"""
colnames = df.columns.tolist()
speaker_key = next((key for key in ['speaker','Speaker','speaker_id','Speaker_ID'] if key in colnames),None)
if not speaker_key:
raise ValueError('No speaker column found in dataframe!')
speakers = df[speaker_key].unique()
if who=='student':
# detect student. ID format can be student_xxx or 00-0000 numeric
speakers = [s for s in speakers if ('student' in s.lower() or re.match(r'^\d{2}-\d{4}$',s))]
generic_speakers = [f'student_{i+1}' for i in range(len(speakers))]
else:
generic_speakers = [f'speaker_{i+1}' for i in range(len(speakers))]
speaker_dict = dict(zip(speakers, generic_speakers))
df[speaker_key] = df[speaker_key].replace(speaker_dict)
return df