import csv import logging import os import re import subprocess from pathlib import Path import sys import gradio as gr import pandas as pd from pathlib import Path import nltk from openpyxl import Workbook from openpyxl.utils.dataframe import dataframe_to_rows from openpyxl.worksheet.datavalidation import DataValidation os.makedirs(f'{os.getcwd()}/logs', exist_ok=True) os.makedirs(f'{os.getcwd()}/results', exist_ok=True) logging.basicConfig(filename=f'{os.getcwd()}/logs/logfile.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.info('Starting the application...') def subprocess_run_verbose(cmd): res = subprocess.check_call(cmd, stdout=sys.stdout, stderr=subprocess.STDOUT) return res def HHMMSS_to_sec(time_str): """Get Seconds from timestamp string with milliseconds.""" if not time_str: return None if isinstance(time_str, (int, float)): return float(time_str) if time_str.count(':')==2: h, m, s = time_str.split(':') elif time_str.count(':')==3: # weird timestamps where there is a field followign seconds delimited by colon h, m, s, u = time_str.split(':') # determine whether ms field is in tenths or hundredths or thousandths by countng how many digits if len(u)==1: print('Weird time format with 3 colons detected - HH:MM:SS:X . Interpreting X as tenths of a second. - please verify this is how you want the time interpreted') ms = float(u)/10 elif len(u)==2: # hundredths print('Weird time format with 3 colons detected - HH:MM:SS:XX . Interpreting XX as hundredths of a second. - please verify this is how you want the time interpreted') ms = float(u)/100 elif len(u)==3: # hundredths print('Weird time format with 3 colons detected - HH:MM:SS:XXX . Interpreting XX as milliseconds. - please verify this is how you want the time interpreted') ms = float(u)/1000 else: print(f'input string format not supported: {time_str}') return None s = int(s)+ms elif time_str.count(':')==1: # print('missing HH from timestamp, assuming MM:SS') m, s = time_str.split(':') h=0 else: try: time_str=float(time_str) # maybe its already in seconds! return time_str except Exception as e: gr.Error(f"Error converting time to seconds: {e}") return None return int(h) * 3600 + int(m) * 60 + float(s) def molly_xlsx_to_table(xl_file): # contractor transcribers provide an xlsx with the following columns # utt_ix: int # Timecode: "HH:MM:SS:ss - HH:MM:SS:ss" # Duration: HH:MM:SS:ss # Speaker: str # Dialogue: str # Annotations: blank # Error Type: blank with pd.ExcelFile(xl_file) as xls: sheetname = xls.sheet_names table = pd.DataFrame(pd.read_excel(xls, sheetname[0])) table[['start_time','end_time']] = table['Timecode'].str.split('-',expand=True) table['start_sec'] = table['start_time'].str.strip().apply(HHMMSS_to_sec) table['end_sec'] = table['end_time'].str.strip().apply(HHMMSS_to_sec) table.drop(labels=['Annotations','Error Type','Duration'], axis=1, inplace=True) table=table[['#','Speaker','Dialogue','start_sec','end_sec']] table.rename(columns={'#':'uttID','Speaker':'speaker', 'Dialogue':'transcript'}, inplace=True) return table def xlsx_to_table(xl_file): try: # read the first sheet of the Excel file into a DataFrame print(f'...reading {xl_file}...') table = pd.read_excel(xl_file, sheet_name=0) print(f'...done reading {xl_file}...') # convert column names to lowercase table.columns = map(str.lower, table.columns) # extract start and end time from the Timecode column print(f'...splitting Timecode column into start and end time...') timecodes = table['timecode'].str.split(' - ', expand=True) table['start_time'] = timecodes[0] table['end_time'] = timecodes[1] print(f'...done splitting Timecode column into start and end time...') # convert start and end time to seconds using the HHMMSS_to_sec function print(f'...converting start and end time to seconds...') table['start_sec'] = table['start_time'].apply(HHMMSS_to_sec) table['end_sec'] = table['end_time'].apply(HHMMSS_to_sec) print(f'...done converting start and end time to seconds...') # drop unnecessary columns print(f'...dropping unnecessary columns...') table.drop(['timecode', 'annotations', 'error type', 'duration'], axis=1, inplace=True) # rename columns print(f'...renaming columns...') table.rename(columns={'#': 'uttID', 'speaker': 'speaker', 'dialogue': 'transcript'}, inplace=True) # reorder columns print(f'...reordering columns...') table = table[['uttID', 'speaker', 'transcript', 'start_sec', 'end_sec']] # sort by start time table.sort_values('start_sec', inplace=True) return table except Exception as e: gr.Error(f'Error converting {xl_file}: {e}') def table_to_ELAN_tsv(table:pd.DataFrame, path:str): # write table to tsv compatible with ELAN import table.to_csv(path, index=False, float_format='%.3f',sep='\t') return path def convert_and_trim_video(media_in, media_out, start=None, end=None): WAV_CHANNELS = 1 WAV_SAMPLE_RATE = 16000 start_sec = HHMMSS_to_sec(start) end_sec = HHMMSS_to_sec(end) try: if start_sec is None and end_sec is None: logging.info(f'...No start and end times provided. Converting entire video without trimming...') trim_command=[] else: if start_sec is None: logging.info(f'...No start time provided. Trimming video from start to specified end...') start_sec = 0.0 trim_command = ['-ss',f'{start_sec}'] if end_sec is None: logging.info(f'...No end time provided. Trimming video from specified start to end of video...') end_sec = None else: trim_command.extend(['-to', f'{end_sec}']) if not isinstance(media_in, (str, Path)): raise TypeError("media_in must be a string or a PathLike object") if not isinstance(media_out, (str, Path)): raise TypeError("media_out must be a string or a PathLike object") in_ext = Path(media_in).suffix.lower() out_ext = Path(media_out).suffix.lower() print(f'...detected extensions from filename: input={in_ext} output={out_ext}') if in_ext == out_ext: logging.info(f'...No media conversion needed...') else: logging.info(f'...Using ffmpeg to convert {in_ext} to {out_ext}...') if out_ext == '.wav': if in_ext == '.webm': command = [ 'ffmpeg', '-y', '-i', media_in, *trim_command, media_out, '-hide_banner', '-loglevel', 'info'] else: # convert to wav with standard format for audio models command = [ 'ffmpeg', "-f", "s16le", '-y', '-i', media_in, *trim_command, '-vn', '-acodec', 'pcm_s16le', '-ac', str(WAV_CHANNELS), '-ar', str(WAV_SAMPLE_RATE), media_out, '-hide_banner', '-loglevel', 'info'] else: # convert using copy codec if in_ext == '.webm': command = [ 'ffmpeg', '-y', '-i', media_in, '-strict', '-2', *trim_command, '-c:v', 'copy', # '-vcodec', 'h264', # '-acodec', 'aac', media_out, '-hide_banner', '-loglevel', 'info'] else: # not webm command = [ 'ffmpeg', '-y', '-i', media_in, *trim_command, '-c','copy', media_out, '-hide_banner', '-loglevel', 'info'] # run the ffmpeg command logging.info(f"FFMPEG command: {' '.join(command)}") gr.Info(f"FFMPEG command: {' '.join(command)}", visible=False) print(f"...FFMPEG command: {' '.join(command)}") # process = subprocess.run(command, capture_output=True, text=True) # if process.returncode != 0: # logging.info(f"FFMPEG error: {process.stderr}") # print(f"FFMPEG error: {process.stderr}") # gr.Error(f"FFMPEG error: {process.stderr}") # else: # logging.info(process.stdout) # print(f"...FFMPEG status: {process.stdout}") return_code = subprocess_run_verbose(command) print(f"FFMPEG return code: {return_code}") if return_code != 0: logging.info(f"FFMPEG error: {return_code}") print(f"FFMPEG error: {return_code}") gr.Error(f"FFMPEG error: {return_code}") return None else: logging.info(f"...FFMPEG completed successfully...") print(f"...FFMPEG completed successfully...") return media_out except Exception as e: print(f"Error converting video format: {e}") gr.Error(f"Error converting video format: {e}") ###### TRANSCRIT UTILS ###### def convert_transcript_for_TM(file_list): """Convert transcripts for TalkMoves Annotation Input can be xlsx or csv transcript file Can handle sepraate start and end time columns or a single timecode column Output will have separate start and end timestamps in HH:MM:SS.sss format Args: file_list (_type_): _description_ Raises: gr.Error: _description_ gr.Error: _description_ Returns: _type_: _description_ """ # Regular expression pattern for matching speaker names and timecodes. bracket_re = re.compile(r'(?:\[[UI|ui|Inaudible|inaudible|overlapping speech|VIDEO SILENCE|teacher explaining in background].*\]\W{0,2})') # Regular expression pattern for matching anything enclosed in square brackets. all_bracket_re = re.compile(r'(?:\[.*\]\W{0,2})') # whether remove the inaudible do_remove_inaudible = True # whether_keep_context_switch do_keep_context_switch = True # whether_convert_to_timestamp if start and end time are in seconds and in separate columns convert_to_timestamp = True error_message = [] # List of error messages to be displayed to the user. global_stat_dict = {} # Dictionary of global statistics. output_filepath_list = [] # List of output file paths. trans_log_filepath_list = [] # List of transcription log file paths. for file in file_list: filename = file.split('/')[-1] # Get the filename from the file. filepath = os.path.dirname(file) # Get the file path from the file. # Read the file into a Pandas DataFrame depending on its file format. if filename.endswith('.xlsx'): df = pd.read_excel(file, index_col=0) output_filename = f"{filename[:-5]}" + "_TMcoded.xlsx" elif filename.endswith('.csv'): df = pd.read_csv(file, index_col=0, error_bad_lines=False) output_filename = f"{filename[:-4]}" + "_TMcoded.xlsx" else: raise gr.Error(f"{file} format is wrong") # Remove the "Copy of" prefix from the output filename, if present. if output_filename.startswith("Copy of "): output_filename = output_filename[8:] # Remove the word "_Transcript" from the output filename, if present. if '_Transcript' in output_filename: # print("before: "+output_filename) error_message.append("before: "+output_filename) output_filename = ''.join(output_filename.split('_Transcript')) # print("after: "+output_filename) error_message.append("after: "+output_filename) # Construct the output file and transcription log file paths. output_filepath = os.path.join(filepath, output_filename) trans_log_filepath = os.path.join(filepath, f"{output_filename}"+ ".log") # Open the transcription log file for writing. with open(trans_log_filepath, "w") as outfile: sub_cnt_in_file = 0 empty_speaker_cnt_in_file = 0 turn_skipped_in_file = 0 turn_skipped_speaker_switch_in_file = 0 snt_mark_skip_in_file = 0 snt_skipped_in_file = 0 chat_flag_in_speaker_time_line = 0 chat_flag_in_content_line = 0 all_inaudible_in_file = 0 all_bracket_in_file = 0 all_snts_in_file = 0 all_token_cnt_in_file = 0 #index Timecode Duration Speaker Dialogue Annotations Error Type #1 00:00:05:04 - 00:00:07:12 00:00:02:08 Tutor Did you... How was your Halloween? turns = [] time_stamps = [] speakers = [] chat_flags = [] sentences = [] snt_ids = [] ## parse the df flexibly: find key column names which might vary dependign on transcript source # set all column names to lowercase df.columns = map(str.lower, df.columns) # several possibilities for column names, detect which are present uttID_keys = ['utt','seg','utt_id','seg_id','index'] speaker_keys = ['speaker'] start_keys=['start_sec','start','start_time','timestart'] end_keys=['end_sec','end','end_time','timeend'] timestamp_keys = ['timecode','timestamp'] content_keys=['dialogue','utterance','transcript','text'] # detect which is used in this df uttID_key = next((key for key in uttID_keys if key in df.columns), None) speaker_key = next((key for key in speaker_keys if key in df.columns), None) content_key = next((key for key in content_keys if key in df.columns), None) # check if separate start and end times are present, otherwise assume single timecode column if any(df.columns.isin(start_keys)): start_key = next((key for key in start_keys if key in df.columns), None) end_key = next((key for key in end_keys if key in df.columns), None) time_format = 'seconds' if convert_to_timestamp: # convert to timestamp format HH:MM:SS.sss - HH:MM:SS.sss df['timecode'] = df.apply(lambda x: f"{sec_to_HHMMSS(x[start_key])} - {sec_to_HHMMSS(x[end_key])}", axis=1) timestamp_key='timecode' time_format = 'timestamp' else: timestamp_key=next((key for key in timestamp_keys if key in df.columns), None) time_format = 'timestamp' # Turn started with 1, the same as molly's transcripts for i, row in df.iterrows(): turn = row[uttID_key] if uttID_key else i+1 speaker = row[speaker_key] time_str = row[timestamp_key] content = "" if pd.isna(row[content_key]) else row[content_key].strip("\n") # when speaker is empty, use the previous speaker if speaker == "": if speakers: speaker = speakers[-1] empty_speaker_cnt_in_file += 1 outfile.write(f"{turn}: found empty speaker, use the speaker in previous turn: {speaker}\n") else: raise gr.Error(f"{row}, the first turn is empty speaker") # clean after the sentence tokenize snts = sent_tokenize(content) all_snts_in_file += len(snts) snt_skipped_in_turn = 0 for i, snt in enumerate(snts): remove_flag = False inaudible_search = re.findall(bracket_re, snt) if inaudible_search: all_inaudible_in_file += len(inaudible_search) outfile.write(f"{turn}, {inaudible_search}, inaudible found in snt: {snt}\n") all_bracket_search = re.findall(all_bracket_re, snt) if all_bracket_search: all_bracket_in_file += len(all_bracket_search) outfile.write(f"{turn}, {all_bracket_search} bracket found in snt: {snt}\n") # only remove the [inaudible xxx] when it is the whole sentence. inaudible_match = re.fullmatch(bracket_re, snt) if inaudible_match: if do_keep_context_switch: # if keep context switch if speakers and speaker == speakers[-1]: # share the same speaker, no context switching, just remove it remove_flag = True else: # different speakers, it is the context switching. if len(snts) == 1: # current empty sentence is the only single sentence remove_flag = False else: if i != len(snts)-1: # current empty utterance is not the last one, just delete it remove_flag = True else: # current empty utterance is the last one, keep it. if snt_skipped_in_turn == len(snts)-1: # all previous snts are empty, then keep this to not skip the whole turn remove_flag = False else: remove_flag = True else: # if not keep context switch, then simply remove all empty utterance remove_flag = True # If remove_flag is true: if remove_flag: # Increment sub_cnt_in_file and snt_mark_skip_in_file sub_cnt_in_file += 1 snt_mark_skip_in_file += 1 # Write the following message to outfile: outfile.write(f"{turn}, sub happend: {snt}, skip this sentence\n") # If do_remove_inaudible is true: if do_remove_inaudible: snt_skipped_in_file += 1 snt_skipped_in_turn += 1 continue # Add to pd: # Append turn to turns list turns.append(turn) # Set snt_id to the string f"{turn}.{i}" snt_id = f"{turn}.{i}" # Append time_str to time_stamps list time_stamps.append(time_str) # Append speaker to speakers list speakers.append(speaker) # Set sentence to the string representation of snt, with whitespace removed from the start and end sentence = str(snt).strip().rstrip("\n") # Calculate the number of tokens in sentence and add to all_token_cnt_in_file token_cnt = len(nltk.word_tokenize(sentence)) all_token_cnt_in_file += token_cnt # Append snt_id to snt_ids list snt_ids.append(snt_id) # Append sentence to sentences list sentences.append(sentence) if snt_skipped_in_turn == len(snts): # all snts in turn are skiped, then skip the turn turn_skipped_in_file += 1 if (speakers and speaker != speakers[-1]) or not speakers: turn_skipped_speaker_switch_in_file += 1 outfile.write(f"{turn}, since all snts are empty, skip this whole turn {content}\n") # Create a new DataFrame with the following columns: new_df = pd.DataFrame({ "Sentence_ID": snt_ids, # A "TimeStamp": time_stamps, #B "Turn" : turns, #C "Speaker" : speakers, #D "Sentence" : sentences #E }) # assert turn_skipped_speaker_switch_in_file==0, "Some speaker switch turn skipped" new_df["Teacher_TM"] = None #F new_df["Student_TM"] = None #G # write new_df to xlsx file new_df.to_excel(output_filepath, index=False) # https://openpyxl.readthedocs.io/en/latest/api/openpyxl.utils.dataframe.html#openpyxl.utils.dataframe.dataframe_to_rows wb = Workbook() ws = wb.active teacher_dv = DataValidation(type="list", formula1='",1-None,2-Keep-Together,3-Getting-Student-to-Relate,4-Restating,5-Revoicing,6-Context,7-Press-for-Accuracy,8-Press-for-Reasoning"', allow_blank=True) student_dv = DataValidation(type="list", formula1='",1-None,2-Relate-to-Another-Student,3-Asking-for-More-info,4-Making-a-Claim,5-Providing-Evidence/Reasoning"', allow_blank=True) ws.add_data_validation(teacher_dv) ws.add_data_validation(student_dv) teacher_dv.add('F2:F1048576') student_dv.add('G2:G1048576') for r in dataframe_to_rows(new_df, index=False, header=True): ws.append(r) wb.save(output_filepath) stat_dict = { "chat_flag_in_speaker_time_line": chat_flag_in_speaker_time_line, "chat_flag_in_content_line": chat_flag_in_content_line, "empty_speaker_cnt_in_file": empty_speaker_cnt_in_file, "ori_total_turn": df.shape[0], "ori_total_snt": all_snts_in_file, "turn_skipped": turn_skipped_in_file, "turn_skipped_speaker_switch_in_file": turn_skipped_speaker_switch_in_file, "snt_skipped": snt_skipped_in_file, "remaining_snt": all_snts_in_file - snt_skipped_in_file, "all_token_cnt_in_file": all_token_cnt_in_file, "avg_token_cnt_per_snt": all_token_cnt_in_file/(all_snts_in_file - snt_skipped_in_file), "sub_cnt_in_file": sub_cnt_in_file, "all_inaudible_in_file": all_inaudible_in_file, "all_bracket_in_file": all_bracket_in_file, "other_bracket_in_file": all_bracket_in_file - all_inaudible_in_file } if all_inaudible_in_file != all_bracket_in_file: # print(f"{filename} has special brakets") error_message.append(f"Warning: {filename} has special brakets") for k, v in stat_dict.items(): global_stat_dict[k] = global_stat_dict.get(k,0) + v outfile.write(f"{output_filepath}, {json.dumps(stat_dict, indent=4)}") output_filepath_list.append(output_filepath) trans_log_filepath_list.append(trans_log_filepath) for k, v in global_stat_dict.items(): if "avg" in k: global_stat_dict[k] = global_stat_dict[k]/len(file_list) global_log_filepath = os.path.join(filepath, "global_transfer"+ ".log") with open(global_log_filepath, "w") as outfile: outfile.write(f"global_stat_dict: {json.dumps(global_stat_dict, indent=4)}") # error_check if global_stat_dict["all_inaudible_in_file"] != global_stat_dict["all_bracket_in_file"]: error_message.append("Error: 'all_inaudible_in_file' does not match 'all_bracket_in_file'") if global_stat_dict["other_bracket_in_file"] != 0: error_message.append("Error: 'other_bracket_in_file' is not zero") return output_filepath_list, trans_log_filepath_list, error_message, global_log_filepath def add_CPS_columns(df): # Observation Instructions CONST_SharesU_Situation CONST_SharesU_CorrectSolutions CONST_SharesU_IncorrectSolutions CONST_EstablishesCG_Confirms CONST_EstablishesCG_Interrupts NEG_Responds_Reasons NEG_Responds_QuestionsOthers NEG_Responds_Responds MAINTAIN_Initiative_Criticizes NEG_MonitorsE_Results NEG_MonitorsE_GivingUp NEG_MonitorsE_Strategizes NEG_MonitorsE_Save MAINTAIN_Initiative_Suggestions MAINTAIN_Initiative_Compliments MAINTAIN_FulfillsR_InitiatesOffTopic MAINTAIN_FulfillsR_JoinsOffTopic MAINTAIN_FulfillsR_Support MAINTAIN_FulfillsR_Apologizes Notes annotation_columns = ['Observation','Instructions', 'CONST_SharesU_Situation', 'CONST_SharesU_CorrectSolutions', 'CONST_SharesU_IncorrectSolutions', 'CONST_EstablishesCG_Confirms', 'CONST_EstablishesCG_Interrupts', 'NEG_Responds_Reasons', 'NEG_Responds_QuestionsOthers', 'NEG_Responds_Responds', 'MAINTAIN_Initiative_Criticizes', 'NEG_MonitorsE_Results', 'NEG_MonitorsE_GivingUp', 'NEG_MonitorsE_Strategizes', 'NEG_MonitorsE_Save', 'MAINTAIN_Initiative_Suggestions', 'MAINTAIN_Initiative_Compliments', 'MAINTAIN_FulfillsR_InitiatesOffTopic', 'MAINTAIN_FulfillsR_JoinsOffTopic', 'MAINTAIN_FulfillsR_Support', 'MAINTAIN_FulfillsR_Apologizes', 'Notes'] # add these columns to the end of the df in this order for col in annotation_columns: df[col]='' return df def add_TM_columns(df): annotation_columns = ['Teacher_TM', 'Student_TM'] # add these columns to the end of the df in this order for col in annotation_columns: df[col]='' return df def convert_transcript_for_annotation(file, annotation_scheme=None): """Convert transcript for annotation: Input standard csv transcript file Output will have separate start and end timestamps in HH:MM:SS.sss format Filename column will infer the video filename from the transcript filename Columns for CPS annotators are added """ filename,ext = os.path.splitext(os.path.basename(file)) # Get the filename from the file. filepath = os.path.dirname(file) # Get the file path from the file. # Read the file into a Pandas DataFrame depending on its file format. try: table = parse_label_csv(file) media_filename = get_sessname_from_filename(filename) out_df=table.copy() out_df['recordingID']=media_filename out_df['TimeStart']=out_df['start_sec'].apply(sec_to_HHMMSS) out_df['TimeEnd']=out_df['end_sec'].apply(sec_to_HHMMSS) out_df=out_df[['speaker','TimeStart','TimeEnd','utterance','recordingID','uttID']] if annotation_scheme=='CPS': out_df=add_CPS_columns(out_df) output_file = os.path.join(filepath, f"CPS_{filename}.xlsx") out_df.to_excel(output_file, index=False) elif annotation_scheme=='TM': out_df=add_TM_columns(out_df) output_file = os.path.join(filepath, f"TM_{filename}.xlsx") out_df.to_excel(output_file, index=False) else: output_file = os.path.join(filepath, f"{filename}.xlsx") out_df.to_excel(output_file, index=False) return output_file except Exception as e: raise gr.Error(f"{filename}: error {e}") def sec_to_HHMMSS(seconds): """Get timestamp string from seconds.""" seconds = float(seconds) m, s = divmod(seconds, 60) h, m = divmod(m, 60) h=int(h) m=int(m) return f"{h:02d}:{m:02d}:{s:06.3f}" def readELANtsv(file, fmt=None): with open(file,'r',newline='') as in_file: reader = csv.reader(in_file, delimiter="\t", quoting=csv.QUOTE_NONE) skiprows=0 row=next(reader) while not len(row)>=4: # 4 being the min numbert of cols ELAN exports have skiprows+=1 row=next(reader) in_file.seek(skiprows) if skiprows>0: print(f'Detected {skiprows} header rows to skip') reader = csv.reader(in_file, delimiter="\t") for _ in range(skiprows): next(reader) labels = [] # transcript with speaker labels and timestamp in sec for i,utt in enumerate(reader): if not ''.join(utt).strip(): # skip blank lines continue try: if len(utt) == 5: # IF data comes straight from ELAN sometimes there is a superfluous blank column 2 if i==0: print('detected extra blank column in first row, will remove') if fmt=='AUG23': if i==0: print('detected extra blank 1st column, will remove') _,speaker,start_HHMMSS,end_HHMMSS,utterance= utt convert_timestamps=True else: if i==0: print('detected extra blank 2nd column, will remove') speaker,_,start_HHMMSS, end_HHMMSS, utterance = utt convert_timestamps=True elif len(utt) == 4: # sometimes the blank col is already removed if i==0: print('detected 4 columns, assuming: speaker,start_HHMMSS, end_HHMMSS, utterance ') speaker,start_HHMMSS, end_HHMMSS, utterance = utt convert_timestamps=True elif len(utt) == 6: # New one from 2023 Aug has a redundant extra start col!? if i==0: print('detected 6 columns, assuming: _,speaker,start_HHMMSS, end_HHMMSS, utterance,_ ') _,speaker,start_HHMMSS,end_HHMMSS,utterance,_ = utt convert_timestamps=True elif len(utt) == 9: # 2023 transcribers tend to give full elan output if i==0: print('detected 9 columns, assuming: speaker,_,start_HHMMSS,_,end_HHMMSS,_,_,_,utterance ') speaker,_,start_HHMMSS,_,end_HHMMSS,_,_,_,utterance = utt convert_timestamps=True elif len(utt) == 10: # sometimes an extra blank column appears at the end if i==0: print('detected 10 columns, assuming: speaker,_,start_HHMMSS,_,end_HHMMSS,_,_,_,utterance,_ ') speaker,_,start_HHMMSS,_,end_HHMMSS,_,_,_,utterance,_ = utt convert_timestamps=True elif len(utt) == 12: # WOw how many redundant columns can ELAN make... if i==0: print('detected 12 columns, assuming: speaker,_,start_HHMMSS,_,_,end_HHMMSS,_,_,_,_,_,utterance ') speaker,_,start_HHMMSS,_,_,end_HHMMSS,_,_,_,_,_,utterance = utt convert_timestamps=True else: raise ValueError(f'Unknown transcript format with {len(utt)} columns for {file}') except BaseException as err: print(f'!!! transcript parse error on line {i} for {file}') print(utt) raise err if convert_timestamps: start_sec = HHMMSS_to_sec(start_HHMMSS) end_sec = HHMMSS_to_sec(end_HHMMSS) labels.append((speaker, utterance, start_sec,end_sec)) labels= pd.DataFrame(labels, columns = ('speaker', 'utterance', 'start_sec','end_sec')) labels.sort_values(by='start_sec', inplace=True, ignore_index=True) labels.reset_index(inplace=True) labels = labels.rename(columns = {'index':'seg'}) return(labels) def merge_ellipsis(seg_labels): # merge utterances with ellipsis # input is seg_labels format: [optional index] speaker, utterance, start_sec, end_sec if isinstance(seg_labels,str) and seg_labels.endswith(('.csv','.tsv','.txt')): df=pd.read_csv(seg_labels) elif isinstance(seg_labels, pd.DataFrame): df=seg_labels else: raise ValueError('input seg_labels should be path to csv or pd.DataFrame') if len(df.columns)==4: # no seg index yet df.reset_index(inplace=True) df = df.rename(columns = {'index':'seg'}) elif len(df.columns)==5: # first col is seg df = df.rename(columns = {df.columns[0]:'seg'}) else: raise ValueError('input seg_labels should have 4 or 5 columns') df2=[] prev_spk=None prev_utt="" prev_start=0 prev_end=0 segs=[0] merge_utt={"seg":None, "speaker":None,"utterance":None,"start_sec":None, "end_sec":None} for i,row in df.iterrows(): if i==0: merge_utt=row else: # if same speaker as last and ellipsis if merge_utt["speaker"]==row["speaker"] and str(merge_utt["utterance"]).endswith('...') and str(row["utterance"]).startswith('...'): # append current to temporary merged utt: use prev_ items merge_utt["utterance"]+=str(row["utterance"]) merge_utt["end_sec"]=row["end_sec"] segs.append(row["seg"]) else: # append merge_utt to df2 merge_utt["seg"]=segs df2.append(merge_utt) # clear merge_utt and set to current merge_utt=row segs=[merge_utt["seg"]] merge_utt["seg"]=segs # if not isinstance(merge_utt["seg"],list): # merge_utt["seg"]=list(segs) df2.append(merge_utt) # catch final merge_utt if not terminated df2=pd.DataFrame(df2) df2['utterance']=df2['utterance'].str.replace('\\.+',' ', regex=True) # clear up "......" # enumerate utterances df2.reset_index(inplace=True,drop=True) df2 = df2.reset_index().rename(columns = {'index':'utt'}) return df2 def add_dummy_seg_column(table): # adds a dummy seg column (listing segments comprising utterance) for a df without this column # labelfiles generated from merge_ellipsis have an 'utt' column giving utterance ID, and a seg column # containing a list of original segments comprising each utterance # but you may need all label files top have the exact same format even if they weren't produced by # merge_ellipsis() # returns a table with columns 'utt' and 'seg' if 'seg' in table.columns.tolist(): print('\'seg\' column already exists, not changing anything') return table if 'uttID' in table.columns.tolist(): table=table.rename(columns={"uttID":"utt"}) if not 'utt' in table.columns.tolist(): table['utt']=table.index table['seg']=[[u] for u in table['utt']] table=table[['utt','seg','speaker','start_sec','end_sec','utterance']] return table def get_sessname_from_filename(filename): sessname=Path(filename).stem sessname = re.sub('reworked-transcript-diarized-timestamped-', '', sessname,flags=re.I) sessname = re.sub('reworked_transcript-diarized-timestamped-', '', sessname,flags=re.I) sessname = re.sub('reworked-diarized-timestamped-', '', sessname,flags=re.I) sessname = re.sub('reworked_timestamped_', '', sessname,flags=re.I) sessname = re.sub('reworked_', '', sessname,flags=re.I) sessname = re.sub('reworked-', '', sessname,flags=re.I) sessname = re.sub('transcript_diarized_timestamped_', '', sessname,flags=re.I) sessname = re.sub('transcript-diarized-timestamped_', '', sessname,flags=re.I) sessname = re.sub('transcript-diarized-timestamped-', '', sessname,flags=re.I) sessname = re.sub('_transcript', '', sessname,flags=re.I) sessname = re.sub('_tmcoded', '', sessname,flags=re.I) sessname = re.sub('utt_labels_', '', sessname,flags=re.I) sessname = re.sub('seg_labels_', '', sessname,flags=re.I) sessname = re.sub('_redacted', '', sessname,flags=re.I) return sessname def ELAN_to_labels_csv(ELANfile, merge_segments = True): # dumb but effective string wrangling to get sess name sessname=get_sessname_from_filename(ELANfile) # reads ELAN output to pd.DataFrame in a unified format labels=readELANtsv(ELANfile) if merge_segments: save_file=f'utt_labels_{sessname}.csv' # merge segments to form utterances where there have been splits separated by '...' merged_labels=merge_ellipsis(labels) merged_labels.to_csv(save_file,index=False, float_format='%.3f') else: save_file=f'seg_labels_{sessname}.csv' labels.to_csv(save_file,index=False, float_format='%.3f') return save_file def parse_label_csv(label_csv:str): # utt_labels_csv is the usual format used for diarized, timed transcripts in this repo # There are several versions with differnt columns (with/without segment &/ utterance index, # withouot column headers etc) # table: # [uttID, speaker, transcript, start_sec, end_sec] table = pd.read_csv(label_csv,keep_default_na=False, header=None) row0=table.iloc[0] is_header = not any(str(cell).replace('.','').isdigit() for cell in row0) if is_header: table.columns=row0.tolist() table=table.iloc[1:] table=table.reset_index(drop=True) else: if len(table.columns)==4: print('no header detected, assuming annotation file has columns [speaker,utterance,start_sec, end_sec] ') table.columns=['speaker','utterance','start_sec', 'end_sec'] elif len(table.columns)==5: print('no header detected, assuming annotation file has columns [seg,speaker,utterance,start_sec, end_sec] ') table.columns=['seg','speaker','utterance','start_sec', 'end_sec'] elif len(table.columns)==6: print('no header detected, assuming annotation file has columns [utt,seg,speaker,utterance,start_sec, end_sec] ') table.columns=['utt','seg','speaker','utterance','start_sec', 'end_sec'] else: print(f'no header detected, csv has {len(table.columns)} columns, could not determine column names.') return None # choose which column to use for uttID in table if 'utt' in table.columns.tolist(): table=table.rename(columns={"utt":"uttID"}).drop('seg', axis=1) elif 'seg' in table.columns.tolist(): table=table.rename(columns={"seg":"uttID"}) else: table=table.reset_index().rename(columns={"index":"uttID"}) table=table[['uttID','speaker','utterance','start_sec','end_sec']] return table def deidentify_speaker(df, who='all'): """replace speaker ID with generic labels in order of appearance (speaker1, speaker2)' if who is "student", only student names are replaced Args: df (_type_): _description_ who (str, optional): 'all','student'. Which names to replace. Defaults to 'all'. """ colnames = df.columns.tolist() speaker_key = next((key for key in ['speaker','Speaker','speaker_id','Speaker_ID'] if key in colnames),None) if not speaker_key: raise ValueError('No speaker column found in dataframe!') speakers = df[speaker_key].unique() if who=='student': # detect student. ID format can be student_xxx or 00-0000 numeric speakers = [s for s in speakers if ('student' in s.lower() or re.match(r'^\d{2}-\d{4}$',s))] generic_speakers = [f'student_{i+1}' for i in range(len(speakers))] else: generic_speakers = [f'speaker_{i+1}' for i in range(len(speakers))] speaker_dict = dict(zip(speakers, generic_speakers)) df[speaker_key] = df[speaker_key].replace(speaker_dict) return df