rosyvs
Add transcript sorting and merging tool for xlsx or csv input t
5d0f90f
import os
import re
import threading
import time
from pathlib import Path
import random
import gradio as gr
from utils import (HHMMSS_to_sec, convert_and_trim_video,
table_to_ELAN_tsv, parse_label_csv,
xlsx_to_table, merge_ellipsis,
convert_transcript_for_TM, convert_transcript_for_annotation,
table_to_ELAN_tsv, ELAN_to_labels_csv, deidentify_speaker)
def delete_files(files):
time.sleep(300)
for file in files:
try:
os.remove(file)
except FileNotFoundError:
print(f"File {file} not found for deletion.")
pass
print("...files deleted")
def set_output_file(input_file, output_format, folder, insert_string = 'trimmed'):
# Set output file name and extension
if not os.path.exists(folder):
os.makedirs(folder)
file_name = f"{Path(input_file.name).stem.partition('.')[0]}_{insert_string}.{output_format}"
output_file = os.path.join(folder, file_name)
print(f"...set output file: {output_file}")
return output_file
def trim_video_helper(input_file, output_file, start_time, end_time):
if not start_time:
start_time = 0
end_time = 300
elif start_time and not end_time:
end_time = 300 + HHMMSS_to_sec(time_str=start_time)
print("...start time (s): ", start_time)
print("...end time (s): ", end_time)
# Trim the video
print("...start trimming")
output_file = convert_and_trim_video(input_file.name, output_file, start_time, end_time)
if not output_file:
print("...trimming failed due to FFMPEG error")
return None
print("...finished trimming")
return output_file
def convert_video_helper(input_file, output_file, output_format):
# convert video
output_file = convert_and_trim_video(input_file.name, output_file)
if not output_file:
print("...converting failed due to FFMPEG error")
return None
print("...finished converting")
return output_file
def convert_transcript_helper(input_transcript, output_transcript):
# convert transcript
table = xlsx_to_table(xl_file=input_transcript)
print("...parsed transcript to table")
output_file = table_to_ELAN_tsv(table, output_transcript)
print("...finished converting transcript")
return output_file
def trim_video_vtr(input_file, output_format):
print(f"BEGIN TASK: trimming {input_file} to 10-minute interval with random start time")
# trim video to 10-min interval with a random selected start time
try:
# randomly select start time
start_time = random.randint(300, 900)
end_time = start_time + 600 # since 10 minutes
audio_base_name = input_file.name.split("/")[-1].split(".")[-2]
print("...audio_base_name: ", audio_base_name)
# set output file
insert_string = f"start{start_time}_end{end_time}"
output_folder = f"{os.getcwd()}/results/"
output_file = set_output_file(input_file, output_format, output_folder, insert_string)
# write the start time, and end time to a txt file
time_file = f"{os.getcwd()}/results/{audio_base_name}_start_end_time.txt"
print("time_file: ", time_file)
with open(time_file, "w") as f:
f.write(f"{start_time}\n")
f.write(f"{end_time}\n")
# Trim the video
output_file = trim_video_helper(input_file, output_file, start_time, end_time)
if not output_file:
gr.Error(f"Error: FFMPEG failed to trim the video.")
return None, None
# delete threading
print("Done trimming. Deleting files...")
path_to_delete = [input_file.name, output_file, time_file]
threading.Thread(target=delete_files, args=([path_to_delete])).start()
return output_file, time_file
except Exception as e:
gr.Error(f"Error: {str(e)}")
return f"Error: {e}"
def trim_video_wt(input_file, input_transcript, output_format, start_time, end_time):
print(f"BEGIN TASK: trimming {input_file} with transcript {input_transcript} from {start_time} to {end_time}")
# trim video with transcript
try:
# set output file
output_folder = f"{os.getcwd()}/results/"
output_file = set_output_file(input_file, output_format, output_folder)
output_transcript = set_output_file(input_transcript, "tsv", output_folder)
# Trim the video
output_file = trim_video_helper(input_file, output_file, start_time, end_time)
if not output_file:
gr.Error(f"Error: FFMPEG failed to trim the video.")
return None, None
# convert transcript
path = input_transcript.name
output_transcript = convert_transcript_helper(path, output_transcript)
# output_transcript = output_file
# remove file after 10 minutes for security
print("Done trimming. Deleting files...")
path_to_delete = [input_file.name, input_transcript.name, output_file, output_transcript]
threading.Thread(target=delete_files, args=([path_to_delete])).start()
return output_file, output_transcript
except Exception as e:
gr.Error(f"Error: {str(e)}")
return f"Error: {str(e)}"
def trim_video(input_file, output_format, start_time, end_time):
print(f"\nBEGIN TASK: trimming {input_file} from {start_time} to {end_time}")
try:
# Set output file
output_folder = f"{os.getcwd()}/results/"
output_file = set_output_file(input_file, output_format, output_folder)
# Trim the video
output_file = trim_video_helper(input_file, output_file, start_time, end_time)
if not output_file:
gr.Error(f"Error: FFMPEG failed to trim the video.")
return None
# Remove files after 10 minutes for security
print("Done trimming. Deleting files...")
path_to_delete = [input_file.name, output_file]
threading.Thread(target=delete_files, args=([path_to_delete])).start()
return output_file
except Exception as e:
gr.Error(f"Error: {str(e)}")
return f"Error: {str(e)}"
def convert_video(input_file, output_format):
print(f"\nBEGIN TASK: converting {input_file} to {output_format}")
try:
# Set output file
output_folder = f"{os.getcwd()}/results/"
output_file = set_output_file(input_file, output_format, output_folder, \
insert_string = 'converted')
# Convert video
output_file = convert_video_helper(input_file, output_file, output_format)
if not output_file:
gr.Error(f"Error: FFMPEG failed to convert the video.")
return None
print(f"...created output file: {output_file}")
# remove file after 10 minutes for security
print("Done converting. Deleting files...")
path_to_delete = [input_file.name, output_file]
threading.Thread(target=delete_files, args=([path_to_delete])).start()
return output_file
except Exception as e:
gr.Error(f"Error: {str(e)}")
return f"Error: {str(e)}"
def delete_files(output_filepath_list, trans_log_filepath_list, global_log_filepath):
for output_filepath in output_filepath_list:
try:
os.remove(output_filepath)
except FileNotFoundError:
pass
for trans_log_filepath in trans_log_filepath_list:
try:
os.remove(trans_log_filepath)
except FileNotFoundError:
pass
try:
os.remove(global_log_filepath)
except FileNotFoundError:
pass
print("Files deleted")
def delete_files_thread(output_filepath_list, trans_log_filepath_list, global_log_filepath):
print("Thread started")
time.sleep(20)
delete_files(output_filepath_list, trans_log_filepath_list, global_log_filepath)
def convert_xlsx_to_TMxlsx(input_file_list):
file_list = [file.name for file in input_file_list]
output_filepath_list, trans_log_filepath_list, error_check, global_transfer_log_path = convert_transcript_for_TM(file_list=file_list)
if not error_check:
error_check = "No errors found."
delete_thread = threading.Thread(target=delete_files_thread, args=(output_filepath_list, trans_log_filepath_list, global_transfer_log_path))
delete_thread.start()
return output_filepath_list, trans_log_filepath_list, global_transfer_log_path, error_check
def convert_for_annotation(input_file_list, annotation_scheme):
output_files=[]
for input_transcript in input_file_list:
print("start converting transcript")
output_file = convert_transcript_for_annotation(file=input_transcript, annotation_scheme=annotation_scheme)
print("finished converting transcript to xlsx for annotation")
output_files.append(output_file)
return output_files
def convert_xlsx_to_ELANtsv(input_file_list):
output_files=[]
for input_transcript in input_file_list:
# convert transcript
print("start converting transcript")
table = old_xlsx_to_table(xl_file=input_transcript)
print("finished converting transcript to table")
output_transcript = input_transcript.replace('.xlsx', '.tsv')
output_file = table_to_ELAN_tsv(table, output_transcript)
print("saved table to tsv")
output_files.append(output_file)
return output_files
def sort_and_merge(input_file_list, merge_on_ellipsis=False):
# simply load a csv file using parse_label_csv, then merge the segments on ellipsis
# and save to a new file
output_files=[]
for input_transcript in input_file_list:
# convert transcript
# if is excel then use xlsx_to_table
if input_transcript.endswith('.xlsx') or input_transcript.endswith('.xls'):
print("...input is xlsx")
table = xlsx_to_table(xl_file=input_transcript)
input_transcript = input_transcript.replace('.xlsx', '.csv')
elif input_transcript.endswith('.csv') or input_transcript.endswith('.txt') or input_transcript.endswith('.tsv'):
print("...input is csv, txt, or tsv")
table = parse_label_csv(input_transcript)
else:
print(f"...input {input_transcript} is not a supported file type")
continue
table = table.sort_values(by=['start_sec'])
if merge_on_ellipsis:
table = merge_ellipsis(table)
print("finished sorting and merging segments")
# make filename
if 'seg_labels' in input_transcript:
output_file= input_transcript.replace('seg_labels', 'utt_labels')
elif 'seglabels' in input_transcript:
output_file= input_transcript.replace('seglabels', 'utt_labels')
else:
# prepend it to the filename (but it could be a path so be careful)
output_file_base = os.path.basename(input_transcript)
output_file = os.path.join(os.path.dirname(input_transcript), f"utt_labels_{output_file_base}")
else:
print("finished sorting segments")
# make filename
output_file = input_transcript.replace('.csv', '_sorted.csv')
# save to csv
table.to_csv(output_file, index=False)
print("saved processed transcript to csv")
output_files.append(output_file)
return output_files
#TODO: support sort and merge for XLSX output if this is needed
def convert_ELANtsv_to_CSV(input_file_list, merge_on_ellipsis=False):
output_files=[]
for input_transcript in input_file_list:
# convert transcript
print("start converting transcript")
output_transcript = input_transcript.replace('.tsv', '.csv')
output_file = ELAN_to_labels_csv(input_transcript, merge_segments = merge_on_ellipsis)
print("finish converting transcript")
output_files.append(output_file)
return output_files
# TODO: XLSX to csv (seg_labels or utt_labels)
def convert_xlsx_to_csv(input_file_list, merge_on_ellipsis=False):
output_files=[]
for input_transcript in input_file_list:
# read xl file to table
# write table to csv with option to merge segments on ellipsis
output_transcript = input_transcript.replace('.xlsx', '.csv')
output_file = old_xlsx_to_labels_csv(input_transcript, merge_segments = merge_on_ellipsis)
output_files.append(output_file)
return output_files
def deidentify_transcripts(input_file_list, who='student'):
output_files=[]
for file in input_file_list:
basename = os.path.basename(file)
ext = file.split('.')[-1]
if file.endswith('.xlsx') or file.endswith('.xls'):
df = pd.read_excel(file)
elif file.endswith('.csv'):
df = pd.read_csv(file)
elif file.endswith('.tsv'):
df = pd.read_csv(file, sep='\t')
elif file.endswith('.txt'):
df = pd.read_csv(file, sep='\t')
else:
gr.Warning("File type not supported (must be .xlsx, .xls, .csv, .tsv, or .txt)")
try:
df = deidentify_speaker(df, who=who)
except ValueError as e:
gr.Warning(f"{e}: {basename} ")
continue
output_file = file.replace(f'.{ext}', f'_deidentified.{ext}')
if ext == 'xlsx' or ext == 'xls':
df.to_excel(output_file, index=False)
elif ext == 'csv':
df.to_csv(output_file, index=False)
elif ext == 'tsv' or ext == 'txt':
df.to_csv(output_file, sep='\t', index=False)
output_files.append(output_file)
return output_files
###### GRADIO INTERFACE ######
# gr components for video trimmer
input_file = gr.File(label="Select video file")
output_format = gr.Dropdown(choices=["mkv", "MOV", "mp4", "wav"], label="Select output format", value="mp4", )
start_time = gr.Textbox(label="Start time (in seconds or HH:MM:SS). Leave blank to start at beginning.")
end_time = gr.Textbox(label="End time (in seconds or HH:MM:SS). Leave blank to trim a 5-minute interval since start.")
output_file = gr.File(label="Download trimmed file")
interface = gr.Interface(fn=trim_video, inputs=[input_file, output_format, start_time, end_time], outputs=output_file, title="Video Trimmer", flagging_mode="never",
description="Trim a video file to a specific time interval. Please wait for the file to upload before clicking the 'Submit' button.")
# gr components for video converter
input_file_c = gr.File(label="Select video file")
output_format_c = gr.Dropdown(choices=["mkv", "MOV", "mp4", "wav"], label="Select output format", value="mp4",)
output_file_c = gr.File(label="Download converted file")
interface_c = gr.Interface(fn=convert_video, inputs=[input_file_c, output_format_c], outputs=output_file_c, title="Video Converter", flagging_mode="never",
description="Convert a video file to a different format. Please wait for the file to upload before clicking the 'Submit' button.")
# gr components for video trimmer with random start
input_file_vtr = gr.File(label="Select video file")
output_format_vtr = gr.Dropdown(choices=["mkv", "MOV", "mp4", "wav"], label="Select output format", value="mp4", )
output_file_vtr = gr.File(label="Download trimmed file")
log_file_vtr = gr.File(label="Download log file")
interface_vtr = gr.Interface(fn=trim_video_vtr, inputs=[input_file_vtr, output_format_vtr],
outputs=[output_file_vtr, log_file_vtr], flagging_mode="never",
title="Video Trimmer with Random Start Time",
description="This app trims a 10-minute interval from a video file. \
The start time is randomly selected between 5 and 15 minutes. \
The log file contains the start time and end time of the trimmed video.",
)
# gr components for video trimmer with transcript
input_file_wt = gr.File(label="Select video file")
input_transcript_wt = gr.File(label="Transcript of the video")
output_format_wt = gr.Dropdown(choices=["mkv", "MOV", "mp4", "wav"], label="Select output format", value="mp4", )
start_time_wt = gr.Textbox(label="Start time (in seconds or HH:MM:SS). Leave blank to start at beginning.")
end_time_wt = gr.Textbox(label="End time (in seconds or HH:MM:SS). Leave blank to trim a 5-minute interval since start.")
output_file_wt = gr.File(label="Download trimmed file")
output_transcript_wt = gr.File(label="Download trimmed transcript")
interface_wt = gr.Interface(fn=trim_video_wt, inputs=[input_file_wt, input_transcript_wt, output_format_wt, start_time_wt, end_time_wt],
outputs=[output_file_wt, output_transcript_wt], title="Video Trimmer with transcript converted", flagging_mode="never",
description="Trim a video file to a specific time interval with transcript format converted. Please wait for the file to upload before clicking the 'Submit' button. \n\
This transcript should be .xlsx files from Happyscribe (an external transcription service). The columns in the file are as follows: \n\
`#`: an integer index over utterances. \n\
`Timecode`: a string in the format `HH:MM:SS:ss - HH:MM:SS:ss` representing the start and end time of the utterance. \n\
`Duration`: a string in the format `HH:MM:SS:ss` representing the duration of the utterance. \n\
`Speaker`: a string representing the speaker of the utterance. \n\
`Dialogue`: a string representing the text of the utterance. \n\
`Annotations`: a string that may be blank, representing any annotations for the utterance. \n\
`Error Type`: a string that may be blank, representing any errors in the transcription of the utterance. ")
#### TRANSCRIPT COMPONENTS ####
# gr components for TM converter
input_xlsx = gr.Files(label="Input XLSX or CSV transcript file", type="filepath", file_types=[".xlsx", ".csv"])
output_xlsx_tm = gr.Files(label="Output XLSX file", type="filepath", file_types=[".xlsx"])
process_log_tm = gr.File(label="Process Log", type="filepath", file_types=[".log", ".txt"] )
global_transfer_log_tm = gr.File(label="Global transfer log", type="filepath", file_types=[".log", ".txt"])
error_check_tm = gr.Textbox(label="Error Check", type="text")
interface_tm = gr.Interface(fn=convert_xlsx_to_TMxlsx,
inputs=input_xlsx,
outputs=[output_xlsx_tm, process_log_tm, global_transfer_log_tm, error_check_tm],
title="transcript-->XLSX+TM",
description="Converts XLSX or csv transcript to XLSX+TM transcript with prefilled dropdown for talkmoves",
live=False,
flagging_mode="never",)
# gr components for xlsx to ELAN
input_x2e = gr.Files(label="Input XLSX or CSV transcript file", type="filepath", file_types=[".xlsx", ".csv"])
output_x2e = gr.Files(label="Output ELAN-compatible tsv file", type="filepath", file_types=[".tsv",'.txt'])
# process_log_x2e = gr.File(label="Process Log", type="filepath", file_types=[".log", ".txt"] )
# global_transfer_log_x2e = gr.File(label="Global transfer log", type="filepath", file_types=[".log", ".txt"])
# error_check_x2e = gr.Textbox(label="Error Check", type="text")
interface_x2e = gr.Interface(fn=convert_xlsx_to_ELANtsv, # TODO: swap out for correct fn
inputs=input_x2e,
outputs=output_x2e,
title="XLSX-->ELAN",
description="Converts XLSX transcript to ELAN-compatible tsv file",
live=False,
flagging_mode="never",)
# gr components for ELAN to CSV
input_e2c = gr.Files(label="Input ELAN-compatible tsv file", type="filepath", file_types=[".tsv",'.txt'])
merge_e2c = gr.Checkbox(label="Merge segments on ellipsis?")
output_e2c = gr.Files(label="Output CSV file", type="filepath", file_types=[".csv"])
interface_e2c = gr.Interface(fn=convert_ELANtsv_to_CSV, # TODO: swap out for correct fn
inputs=[input_e2c, merge_e2c],
outputs=[output_e2c],
title="ELAN-->CSV",
description="Converts ELAN-exported file (.txt or .tsv, tab separated values) to standardized CSV file with rows sorted by segment start time. Optionally merges segments on ellipsis.",
live=False,
flagging_mode="never",)
# gr components for XLSX to CSV
input_x2c = gr.Files(label="Input XLSX file", type="filepath", file_types=[".xlsx", ".csv"])
merge_x2c = gr.Checkbox(label="Merge segments on ellipsis?")
output_x2c = gr.Files(label="Output CSV file", type="filepath", file_types=[".csv"])
interface_x2c = gr.Interface(fn=convert_xlsx_to_csv, # TODO: swap out for correct fn
inputs=[input_x2c, merge_x2c],
outputs=[output_x2c],
title="XLSX-->CSV",
description="Converts old version XLSX transcript (with a single Timecode column) to standardized CSV file with rows sorted by segment start time. Optionally merges segments on ellipsis.",
live=False,
flagging_mode="never",)
# gr components for annotation XLSX
input_c2a = gr.Files(label="Input CSV file", type="filepath", file_types=[".csv"])
annotation_scheme_c2a = gr.Radio(label="Annotation Scheme", choices=[("CPS","CPS"), ("TalkMove","TM"),("None",None)])
output_c2a = gr.Files(label="Output XLSX file", type="filepath", file_types=[".xlsx"])
interface_c2a = gr.Interface(
fn=convert_for_annotation, # TODO: swap out for correct fn
inputs=[input_c2a, annotation_scheme_c2a],
outputs=[output_c2a],
title="CSV-->XLSX+annotation",
description="Converts CSV file to XLSX file for annotation (added columns for CPS or TM or None)",
live=False,
flagging_mode="never",
# submit_btn="Convert"
)
# gr components for deidentification
input_di = gr.Files(label="Input transcript file", type="filepath", file_types=[".xlsx", ".xls",".csv", ".tsv", ".txt"])
who_di = gr.Radio(label="Who to deidentify", choices=[("student","student"), ("all","all")])
output_di = gr.Files(label="Output deidentified transcript file", type="filepath", file_types=[".xlsx", ".xls",".csv", ".tsv", ".txt"])
interface_di = gr.Interface(
fn=deidentify_transcripts,
inputs=[input_di, who_di],
outputs=[output_di],
title="Deidentify",
description="Deidentify speaker labels in a transcript. Compatible with .xlsx, .xls, .csv, .tsv, .txt files with a column containing speaker labels. Will not work if speaker column is missing a header. Speaker names or IDs will be replaced with a deidentified label numbered in order of appearance. Choose whether to deidentify just students or all speakers.",
live=False,
flagging_mode="never",
)
# gr components for transcript sorter
input_file_s = gr.Files(label="Select transcript files", type="filepath", file_types=[".csv", ".xlsx",".xls", ".tsv", ".txt"])
merge_s = gr.Checkbox(label="Merge segments on ellipsis?")
output_file_s = gr.Files(label="Download sorted/merged transcript as .csv", type="filepath", file_types=[".csv"])
interface_s = gr.Interface(fn=sort_and_merge,
inputs=[input_file_s, merge_s],
outputs=output_file_s,
title="Sort+Merge",
description="Sort a transcript file by time, and optionally merge partial utterances on ellipsis. Output is a .csv file in standard format.",
live=False,
flagging_mode="never")
######## LAUNCH APP ########
demo = gr.TabbedInterface(
[
interface_e2c,
interface_x2e,
interface_x2c,
interface_c2a,
interface_tm,
interface_di,
interface_s,
interface_c,
interface,
interface_vtr,
interface_wt
],
[
"๐Ÿ“โ†’๐Ÿ—’๏ธ ELANโ†’CSV",
"โŽโ†’๐Ÿ“ XLSXโ†’ELAN",
"โŽโ†’๐Ÿ—’๏ธ XLSXโ†’CSV",
"๐Ÿ—’๏ธโ†’โŽโ˜ท CSVโ†’XLSX",
"๐Ÿ—’๏ธโ†’โŽ๐Ÿ’ฌ CSVโ†’XLSX+TM",
"๐Ÿ—’๏ธโ†’๐Ÿฅท๐Ÿป Deidentify",
"๐Ÿ—’๏ธ๐Ÿ”€๐Ÿ—’๏ธ Sort+Merge",
"๐ŸŽฅโ†’๐Ÿ“ฝ Convert",
"๐ŸŽฅโœ‚๏ธ Trim",
"๐ŸŽฅโœ‚๏ธ๐ŸŽฒ Trim Random",
"๐ŸŽฅ๐Ÿ—’๏ธโœ‚๏ธ Trim + Transcript"
]
)
demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)