import os import re import threading import time from pathlib import Path import random import gradio as gr from utils import (HHMMSS_to_sec, convert_and_trim_video, table_to_ELAN_tsv, parse_label_csv, xlsx_to_table, merge_ellipsis, convert_transcript_for_TM, convert_transcript_for_annotation, table_to_ELAN_tsv, ELAN_to_labels_csv, deidentify_speaker) def delete_files(files): time.sleep(300) for file in files: try: os.remove(file) except FileNotFoundError: print(f"File {file} not found for deletion.") pass print("...files deleted") def set_output_file(input_file, output_format, folder, insert_string = 'trimmed'): # Set output file name and extension if not os.path.exists(folder): os.makedirs(folder) file_name = f"{Path(input_file.name).stem.partition('.')[0]}_{insert_string}.{output_format}" output_file = os.path.join(folder, file_name) print(f"...set output file: {output_file}") return output_file def trim_video_helper(input_file, output_file, start_time, end_time): if not start_time: start_time = 0 end_time = 300 elif start_time and not end_time: end_time = 300 + HHMMSS_to_sec(time_str=start_time) print("...start time (s): ", start_time) print("...end time (s): ", end_time) # Trim the video print("...start trimming") output_file = convert_and_trim_video(input_file.name, output_file, start_time, end_time) if not output_file: print("...trimming failed due to FFMPEG error") return None print("...finished trimming") return output_file def convert_video_helper(input_file, output_file, output_format): # convert video output_file = convert_and_trim_video(input_file.name, output_file) if not output_file: print("...converting failed due to FFMPEG error") return None print("...finished converting") return output_file def convert_transcript_helper(input_transcript, output_transcript): # convert transcript table = xlsx_to_table(xl_file=input_transcript) print("...parsed transcript to table") output_file = table_to_ELAN_tsv(table, output_transcript) print("...finished converting transcript") return output_file def trim_video_vtr(input_file, output_format): print(f"BEGIN TASK: trimming {input_file} to 10-minute interval with random start time") # trim video to 10-min interval with a random selected start time try: # randomly select start time start_time = random.randint(300, 900) end_time = start_time + 600 # since 10 minutes audio_base_name = input_file.name.split("/")[-1].split(".")[-2] print("...audio_base_name: ", audio_base_name) # set output file insert_string = f"start{start_time}_end{end_time}" output_folder = f"{os.getcwd()}/results/" output_file = set_output_file(input_file, output_format, output_folder, insert_string) # write the start time, and end time to a txt file time_file = f"{os.getcwd()}/results/{audio_base_name}_start_end_time.txt" print("time_file: ", time_file) with open(time_file, "w") as f: f.write(f"{start_time}\n") f.write(f"{end_time}\n") # Trim the video output_file = trim_video_helper(input_file, output_file, start_time, end_time) if not output_file: gr.Error(f"Error: FFMPEG failed to trim the video.") return None, None # delete threading print("Done trimming. Deleting files...") path_to_delete = [input_file.name, output_file, time_file] threading.Thread(target=delete_files, args=([path_to_delete])).start() return output_file, time_file except Exception as e: gr.Error(f"Error: {str(e)}") return f"Error: {e}" def trim_video_wt(input_file, input_transcript, output_format, start_time, end_time): print(f"BEGIN TASK: trimming {input_file} with transcript {input_transcript} from {start_time} to {end_time}") # trim video with transcript try: # set output file output_folder = f"{os.getcwd()}/results/" output_file = set_output_file(input_file, output_format, output_folder) output_transcript = set_output_file(input_transcript, "tsv", output_folder) # Trim the video output_file = trim_video_helper(input_file, output_file, start_time, end_time) if not output_file: gr.Error(f"Error: FFMPEG failed to trim the video.") return None, None # convert transcript path = input_transcript.name output_transcript = convert_transcript_helper(path, output_transcript) # output_transcript = output_file # remove file after 10 minutes for security print("Done trimming. Deleting files...") path_to_delete = [input_file.name, input_transcript.name, output_file, output_transcript] threading.Thread(target=delete_files, args=([path_to_delete])).start() return output_file, output_transcript except Exception as e: gr.Error(f"Error: {str(e)}") return f"Error: {str(e)}" def trim_video(input_file, output_format, start_time, end_time): print(f"\nBEGIN TASK: trimming {input_file} from {start_time} to {end_time}") try: # Set output file output_folder = f"{os.getcwd()}/results/" output_file = set_output_file(input_file, output_format, output_folder) # Trim the video output_file = trim_video_helper(input_file, output_file, start_time, end_time) if not output_file: gr.Error(f"Error: FFMPEG failed to trim the video.") return None # Remove files after 10 minutes for security print("Done trimming. Deleting files...") path_to_delete = [input_file.name, output_file] threading.Thread(target=delete_files, args=([path_to_delete])).start() return output_file except Exception as e: gr.Error(f"Error: {str(e)}") return f"Error: {str(e)}" def convert_video(input_file, output_format): print(f"\nBEGIN TASK: converting {input_file} to {output_format}") try: # Set output file output_folder = f"{os.getcwd()}/results/" output_file = set_output_file(input_file, output_format, output_folder, \ insert_string = 'converted') # Convert video output_file = convert_video_helper(input_file, output_file, output_format) if not output_file: gr.Error(f"Error: FFMPEG failed to convert the video.") return None print(f"...created output file: {output_file}") # remove file after 10 minutes for security print("Done converting. Deleting files...") path_to_delete = [input_file.name, output_file] threading.Thread(target=delete_files, args=([path_to_delete])).start() return output_file except Exception as e: gr.Error(f"Error: {str(e)}") return f"Error: {str(e)}" def delete_files(output_filepath_list, trans_log_filepath_list, global_log_filepath): for output_filepath in output_filepath_list: try: os.remove(output_filepath) except FileNotFoundError: pass for trans_log_filepath in trans_log_filepath_list: try: os.remove(trans_log_filepath) except FileNotFoundError: pass try: os.remove(global_log_filepath) except FileNotFoundError: pass print("Files deleted") def delete_files_thread(output_filepath_list, trans_log_filepath_list, global_log_filepath): print("Thread started") time.sleep(20) delete_files(output_filepath_list, trans_log_filepath_list, global_log_filepath) def convert_xlsx_to_TMxlsx(input_file_list): file_list = [file.name for file in input_file_list] output_filepath_list, trans_log_filepath_list, error_check, global_transfer_log_path = convert_transcript_for_TM(file_list=file_list) if not error_check: error_check = "No errors found." delete_thread = threading.Thread(target=delete_files_thread, args=(output_filepath_list, trans_log_filepath_list, global_transfer_log_path)) delete_thread.start() return output_filepath_list, trans_log_filepath_list, global_transfer_log_path, error_check def convert_for_annotation(input_file_list, annotation_scheme): output_files=[] for input_transcript in input_file_list: print("start converting transcript") output_file = convert_transcript_for_annotation(file=input_transcript, annotation_scheme=annotation_scheme) print("finished converting transcript to xlsx for annotation") output_files.append(output_file) return output_files def convert_xlsx_to_ELANtsv(input_file_list): output_files=[] for input_transcript in input_file_list: # convert transcript print("start converting transcript") table = old_xlsx_to_table(xl_file=input_transcript) print("finished converting transcript to table") output_transcript = input_transcript.replace('.xlsx', '.tsv') output_file = table_to_ELAN_tsv(table, output_transcript) print("saved table to tsv") output_files.append(output_file) return output_files def sort_and_merge(input_file_list, merge_on_ellipsis=False): # simply load a csv file using parse_label_csv, then merge the segments on ellipsis # and save to a new file output_files=[] for input_transcript in input_file_list: # convert transcript # if is excel then use xlsx_to_table if input_transcript.endswith('.xlsx') or input_transcript.endswith('.xls'): print("...input is xlsx") table = xlsx_to_table(xl_file=input_transcript) input_transcript = input_transcript.replace('.xlsx', '.csv') elif input_transcript.endswith('.csv') or input_transcript.endswith('.txt') or input_transcript.endswith('.tsv'): print("...input is csv, txt, or tsv") table = parse_label_csv(input_transcript) else: print(f"...input {input_transcript} is not a supported file type") continue table = table.sort_values(by=['start_sec']) if merge_on_ellipsis: table = merge_ellipsis(table) print("finished sorting and merging segments") # make filename if 'seg_labels' in input_transcript: output_file= input_transcript.replace('seg_labels', 'utt_labels') elif 'seglabels' in input_transcript: output_file= input_transcript.replace('seglabels', 'utt_labels') else: # prepend it to the filename (but it could be a path so be careful) output_file_base = os.path.basename(input_transcript) output_file = os.path.join(os.path.dirname(input_transcript), f"utt_labels_{output_file_base}") else: print("finished sorting segments") # make filename output_file = input_transcript.replace('.csv', '_sorted.csv') # save to csv table.to_csv(output_file, index=False) print("saved processed transcript to csv") output_files.append(output_file) return output_files #TODO: support sort and merge for XLSX output if this is needed def convert_ELANtsv_to_CSV(input_file_list, merge_on_ellipsis=False): output_files=[] for input_transcript in input_file_list: # convert transcript print("start converting transcript") output_transcript = input_transcript.replace('.tsv', '.csv') output_file = ELAN_to_labels_csv(input_transcript, merge_segments = merge_on_ellipsis) print("finish converting transcript") output_files.append(output_file) return output_files # TODO: XLSX to csv (seg_labels or utt_labels) def convert_xlsx_to_csv(input_file_list, merge_on_ellipsis=False): output_files=[] for input_transcript in input_file_list: # read xl file to table # write table to csv with option to merge segments on ellipsis output_transcript = input_transcript.replace('.xlsx', '.csv') output_file = old_xlsx_to_labels_csv(input_transcript, merge_segments = merge_on_ellipsis) output_files.append(output_file) return output_files def deidentify_transcripts(input_file_list, who='student'): output_files=[] for file in input_file_list: basename = os.path.basename(file) ext = file.split('.')[-1] if file.endswith('.xlsx') or file.endswith('.xls'): df = pd.read_excel(file) elif file.endswith('.csv'): df = pd.read_csv(file) elif file.endswith('.tsv'): df = pd.read_csv(file, sep='\t') elif file.endswith('.txt'): df = pd.read_csv(file, sep='\t') else: gr.Warning("File type not supported (must be .xlsx, .xls, .csv, .tsv, or .txt)") try: df = deidentify_speaker(df, who=who) except ValueError as e: gr.Warning(f"{e}: {basename} ") continue output_file = file.replace(f'.{ext}', f'_deidentified.{ext}') if ext == 'xlsx' or ext == 'xls': df.to_excel(output_file, index=False) elif ext == 'csv': df.to_csv(output_file, index=False) elif ext == 'tsv' or ext == 'txt': df.to_csv(output_file, sep='\t', index=False) output_files.append(output_file) return output_files ###### GRADIO INTERFACE ###### # gr components for video trimmer input_file = gr.File(label="Select video file") output_format = gr.Dropdown(choices=["mkv", "MOV", "mp4", "wav"], label="Select output format", value="mp4", ) start_time = gr.Textbox(label="Start time (in seconds or HH:MM:SS). Leave blank to start at beginning.") end_time = gr.Textbox(label="End time (in seconds or HH:MM:SS). Leave blank to trim a 5-minute interval since start.") output_file = gr.File(label="Download trimmed file") interface = gr.Interface(fn=trim_video, inputs=[input_file, output_format, start_time, end_time], outputs=output_file, title="Video Trimmer", flagging_mode="never", description="Trim a video file to a specific time interval. Please wait for the file to upload before clicking the 'Submit' button.") # gr components for video converter input_file_c = gr.File(label="Select video file") output_format_c = gr.Dropdown(choices=["mkv", "MOV", "mp4", "wav"], label="Select output format", value="mp4",) output_file_c = gr.File(label="Download converted file") interface_c = gr.Interface(fn=convert_video, inputs=[input_file_c, output_format_c], outputs=output_file_c, title="Video Converter", flagging_mode="never", description="Convert a video file to a different format. Please wait for the file to upload before clicking the 'Submit' button.") # gr components for video trimmer with random start input_file_vtr = gr.File(label="Select video file") output_format_vtr = gr.Dropdown(choices=["mkv", "MOV", "mp4", "wav"], label="Select output format", value="mp4", ) output_file_vtr = gr.File(label="Download trimmed file") log_file_vtr = gr.File(label="Download log file") interface_vtr = gr.Interface(fn=trim_video_vtr, inputs=[input_file_vtr, output_format_vtr], outputs=[output_file_vtr, log_file_vtr], flagging_mode="never", title="Video Trimmer with Random Start Time", description="This app trims a 10-minute interval from a video file. \ The start time is randomly selected between 5 and 15 minutes. \ The log file contains the start time and end time of the trimmed video.", ) # gr components for video trimmer with transcript input_file_wt = gr.File(label="Select video file") input_transcript_wt = gr.File(label="Transcript of the video") output_format_wt = gr.Dropdown(choices=["mkv", "MOV", "mp4", "wav"], label="Select output format", value="mp4", ) start_time_wt = gr.Textbox(label="Start time (in seconds or HH:MM:SS). Leave blank to start at beginning.") end_time_wt = gr.Textbox(label="End time (in seconds or HH:MM:SS). Leave blank to trim a 5-minute interval since start.") output_file_wt = gr.File(label="Download trimmed file") output_transcript_wt = gr.File(label="Download trimmed transcript") interface_wt = gr.Interface(fn=trim_video_wt, inputs=[input_file_wt, input_transcript_wt, output_format_wt, start_time_wt, end_time_wt], outputs=[output_file_wt, output_transcript_wt], title="Video Trimmer with transcript converted", flagging_mode="never", description="Trim a video file to a specific time interval with transcript format converted. Please wait for the file to upload before clicking the 'Submit' button. \n\ This transcript should be .xlsx files from Happyscribe (an external transcription service). The columns in the file are as follows: \n\ `#`: an integer index over utterances. \n\ `Timecode`: a string in the format `HH:MM:SS:ss - HH:MM:SS:ss` representing the start and end time of the utterance. \n\ `Duration`: a string in the format `HH:MM:SS:ss` representing the duration of the utterance. \n\ `Speaker`: a string representing the speaker of the utterance. \n\ `Dialogue`: a string representing the text of the utterance. \n\ `Annotations`: a string that may be blank, representing any annotations for the utterance. \n\ `Error Type`: a string that may be blank, representing any errors in the transcription of the utterance. ") #### TRANSCRIPT COMPONENTS #### # gr components for TM converter input_xlsx = gr.Files(label="Input XLSX or CSV transcript file", type="filepath", file_types=[".xlsx", ".csv"]) output_xlsx_tm = gr.Files(label="Output XLSX file", type="filepath", file_types=[".xlsx"]) process_log_tm = gr.File(label="Process Log", type="filepath", file_types=[".log", ".txt"] ) global_transfer_log_tm = gr.File(label="Global transfer log", type="filepath", file_types=[".log", ".txt"]) error_check_tm = gr.Textbox(label="Error Check", type="text") interface_tm = gr.Interface(fn=convert_xlsx_to_TMxlsx, inputs=input_xlsx, outputs=[output_xlsx_tm, process_log_tm, global_transfer_log_tm, error_check_tm], title="transcript-->XLSX+TM", description="Converts XLSX or csv transcript to XLSX+TM transcript with prefilled dropdown for talkmoves", live=False, flagging_mode="never",) # gr components for xlsx to ELAN input_x2e = gr.Files(label="Input XLSX or CSV transcript file", type="filepath", file_types=[".xlsx", ".csv"]) output_x2e = gr.Files(label="Output ELAN-compatible tsv file", type="filepath", file_types=[".tsv",'.txt']) # process_log_x2e = gr.File(label="Process Log", type="filepath", file_types=[".log", ".txt"] ) # global_transfer_log_x2e = gr.File(label="Global transfer log", type="filepath", file_types=[".log", ".txt"]) # error_check_x2e = gr.Textbox(label="Error Check", type="text") interface_x2e = gr.Interface(fn=convert_xlsx_to_ELANtsv, # TODO: swap out for correct fn inputs=input_x2e, outputs=output_x2e, title="XLSX-->ELAN", description="Converts XLSX transcript to ELAN-compatible tsv file", live=False, flagging_mode="never",) # gr components for ELAN to CSV input_e2c = gr.Files(label="Input ELAN-compatible tsv file", type="filepath", file_types=[".tsv",'.txt']) merge_e2c = gr.Checkbox(label="Merge segments on ellipsis?") output_e2c = gr.Files(label="Output CSV file", type="filepath", file_types=[".csv"]) interface_e2c = gr.Interface(fn=convert_ELANtsv_to_CSV, # TODO: swap out for correct fn inputs=[input_e2c, merge_e2c], outputs=[output_e2c], title="ELAN-->CSV", description="Converts ELAN-exported file (.txt or .tsv, tab separated values) to standardized CSV file with rows sorted by segment start time. Optionally merges segments on ellipsis.", live=False, flagging_mode="never",) # gr components for XLSX to CSV input_x2c = gr.Files(label="Input XLSX file", type="filepath", file_types=[".xlsx", ".csv"]) merge_x2c = gr.Checkbox(label="Merge segments on ellipsis?") output_x2c = gr.Files(label="Output CSV file", type="filepath", file_types=[".csv"]) interface_x2c = gr.Interface(fn=convert_xlsx_to_csv, # TODO: swap out for correct fn inputs=[input_x2c, merge_x2c], outputs=[output_x2c], title="XLSX-->CSV", description="Converts old version XLSX transcript (with a single Timecode column) to standardized CSV file with rows sorted by segment start time. Optionally merges segments on ellipsis.", live=False, flagging_mode="never",) # gr components for annotation XLSX input_c2a = gr.Files(label="Input CSV file", type="filepath", file_types=[".csv"]) annotation_scheme_c2a = gr.Radio(label="Annotation Scheme", choices=[("CPS","CPS"), ("TalkMove","TM"),("None",None)]) output_c2a = gr.Files(label="Output XLSX file", type="filepath", file_types=[".xlsx"]) interface_c2a = gr.Interface( fn=convert_for_annotation, # TODO: swap out for correct fn inputs=[input_c2a, annotation_scheme_c2a], outputs=[output_c2a], title="CSV-->XLSX+annotation", description="Converts CSV file to XLSX file for annotation (added columns for CPS or TM or None)", live=False, flagging_mode="never", # submit_btn="Convert" ) # gr components for deidentification input_di = gr.Files(label="Input transcript file", type="filepath", file_types=[".xlsx", ".xls",".csv", ".tsv", ".txt"]) who_di = gr.Radio(label="Who to deidentify", choices=[("student","student"), ("all","all")]) output_di = gr.Files(label="Output deidentified transcript file", type="filepath", file_types=[".xlsx", ".xls",".csv", ".tsv", ".txt"]) interface_di = gr.Interface( fn=deidentify_transcripts, inputs=[input_di, who_di], outputs=[output_di], title="Deidentify", description="Deidentify speaker labels in a transcript. Compatible with .xlsx, .xls, .csv, .tsv, .txt files with a column containing speaker labels. Will not work if speaker column is missing a header. Speaker names or IDs will be replaced with a deidentified label numbered in order of appearance. Choose whether to deidentify just students or all speakers.", live=False, flagging_mode="never", ) # gr components for transcript sorter input_file_s = gr.Files(label="Select transcript files", type="filepath", file_types=[".csv", ".xlsx",".xls", ".tsv", ".txt"]) merge_s = gr.Checkbox(label="Merge segments on ellipsis?") output_file_s = gr.Files(label="Download sorted/merged transcript as .csv", type="filepath", file_types=[".csv"]) interface_s = gr.Interface(fn=sort_and_merge, inputs=[input_file_s, merge_s], outputs=output_file_s, title="Sort+Merge", description="Sort a transcript file by time, and optionally merge partial utterances on ellipsis. Output is a .csv file in standard format.", live=False, flagging_mode="never") ######## LAUNCH APP ######## demo = gr.TabbedInterface( [ interface_e2c, interface_x2e, interface_x2c, interface_c2a, interface_tm, interface_di, interface_s, interface_c, interface, interface_vtr, interface_wt ], [ "πŸ“β†’πŸ—’οΈ ELANβ†’CSV", "βŽβ†’πŸ“ XLSXβ†’ELAN", "βŽβ†’πŸ—’οΈ XLSXβ†’CSV", "πŸ—’οΈβ†’βŽβ˜· CSVβ†’XLSX", "πŸ—’οΈβ†’βŽπŸ’¬ CSVβ†’XLSX+TM", "πŸ—’οΈβ†’πŸ₯·πŸ» Deidentify", "πŸ—’οΈπŸ”€πŸ—’οΈ Sort+Merge", "πŸŽ₯β†’πŸ“½ Convert", "πŸŽ₯βœ‚οΈ Trim", "πŸŽ₯βœ‚οΈπŸŽ² Trim Random", "πŸŽ₯πŸ—’οΈβœ‚οΈ Trim + Transcript" ] ) demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)