import csv
import json
import logging
import os
import re
import subprocess
import sys
from pathlib import Path

import gradio as gr
import nltk
import pandas as pd
from nltk.tokenize import sent_tokenize
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.worksheet.datavalidation import DataValidation

os.makedirs(f'{os.getcwd()}/logs', exist_ok=True)
os.makedirs(f'{os.getcwd()}/results', exist_ok=True)
logging.basicConfig(filename=f'{os.getcwd()}/logs/logfile.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info('Starting the application...')

def subprocess_run_verbose(cmd):
    # run a command, streaming its output to stdout; raises CalledProcessError on a nonzero exit
    res = subprocess.check_call(cmd, stdout=sys.stdout, stderr=subprocess.STDOUT)
    return res

def HHMMSS_to_sec(time_str):
    """Get seconds from a timestamp string, with optional milliseconds."""
    if not time_str:
        return None
    if isinstance(time_str, (int, float)):
        return float(time_str)
    if time_str.count(':') == 2:
        h, m, s = time_str.split(':')
    elif time_str.count(':') == 3:
        # weird timestamps where there is a field following seconds, delimited by a colon
        h, m, s, u = time_str.split(':')
        # determine whether the trailing field is tenths, hundredths or thousandths by counting its digits
        if len(u) == 1:
            print('Weird time format with 3 colons detected - HH:MM:SS:X. Interpreting X as tenths of a second - please verify this is how you want the time interpreted')
            ms = float(u) / 10
        elif len(u) == 2:  # hundredths
            print('Weird time format with 3 colons detected - HH:MM:SS:XX. Interpreting XX as hundredths of a second - please verify this is how you want the time interpreted')
            ms = float(u) / 100
        elif len(u) == 3:  # thousandths
            print('Weird time format with 3 colons detected - HH:MM:SS:XXX. Interpreting XXX as milliseconds - please verify this is how you want the time interpreted')
            ms = float(u) / 1000
        else:
            print(f'input string format not supported: {time_str}')
            return None
        s = int(s) + ms
    elif time_str.count(':') == 1:
        # missing HH from timestamp, assuming MM:SS
        m, s = time_str.split(':')
        h = 0
    else:
        try:
            time_str = float(time_str)  # maybe it is already in seconds
            return time_str
        except Exception as e:
            gr.Error(f"Error converting time to seconds: {e}")
            return None
    return int(h) * 3600 + int(m) * 60 + float(s)
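
# Illustrative examples (hypothetical values, not part of the app flow) showing the
# timestamp formats HHMMSS_to_sec() accepts and the seconds it returns.
def _example_HHMMSS_to_sec():
    assert HHMMSS_to_sec("00:01:30.250") == 90.25           # HH:MM:SS.sss
    assert HHMMSS_to_sec("01:30") == 90.0                   # MM:SS, hour assumed 0
    assert abs(HHMMSS_to_sec("00:00:05:04") - 5.04) < 1e-6  # HH:MM:SS:XX, XX read as hundredths
    assert HHMMSS_to_sec(12.5) == 12.5                      # numeric input passed through
    assert HHMMSS_to_sec("") is None                        # empty input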

def molly_xlsx_to_table(xl_file):
    # contractor transcribers provide an xlsx with the following columns
    # utt_ix: int
    # Timecode: "HH:MM:SS:ss - HH:MM:SS:ss"
    # Duration: HH:MM:SS:ss
    # Speaker: str
    # Dialogue: str
    # Annotations: blank
    # Error Type: blank
    with pd.ExcelFile(xl_file) as xls:
        sheetname = xls.sheet_names
        table = pd.DataFrame(pd.read_excel(xls, sheetname[0]))
    table[['start_time', 'end_time']] = table['Timecode'].str.split('-', expand=True)
    table['start_sec'] = table['start_time'].str.strip().apply(HHMMSS_to_sec)
    table['end_sec'] = table['end_time'].str.strip().apply(HHMMSS_to_sec)
    table.drop(labels=['Annotations', 'Error Type', 'Duration'], axis=1, inplace=True)
    table = table[['#', 'Speaker', 'Dialogue', 'start_sec', 'end_sec']]
    table.rename(columns={'#': 'uttID', 'Speaker': 'speaker', 'Dialogue': 'transcript'}, inplace=True)
    return table

def xlsx_to_table(xl_file):
    try:
        # read the first sheet of the Excel file into a DataFrame
        print(f'...reading {xl_file}...')
        table = pd.read_excel(xl_file, sheet_name=0)
        print(f'...done reading {xl_file}...')
        # convert column names to lowercase
        table.columns = map(str.lower, table.columns)
        # extract start and end time from the Timecode column
        print('...splitting Timecode column into start and end time...')
        timecodes = table['timecode'].str.split(' - ', expand=True)
        table['start_time'] = timecodes[0]
        table['end_time'] = timecodes[1]
        print('...done splitting Timecode column into start and end time...')
        # convert start and end time to seconds using the HHMMSS_to_sec function
        print('...converting start and end time to seconds...')
        table['start_sec'] = table['start_time'].apply(HHMMSS_to_sec)
        table['end_sec'] = table['end_time'].apply(HHMMSS_to_sec)
        print('...done converting start and end time to seconds...')
        # drop unnecessary columns
        print('...dropping unnecessary columns...')
        table.drop(['timecode', 'annotations', 'error type', 'duration'], axis=1, inplace=True)
        # rename columns
        print('...renaming columns...')
        table.rename(columns={'#': 'uttID', 'speaker': 'speaker', 'dialogue': 'transcript'}, inplace=True)
        # reorder columns
        print('...reordering columns...')
        table = table[['uttID', 'speaker', 'transcript', 'start_sec', 'end_sec']]
        # sort by start time
        table.sort_values('start_sec', inplace=True)
        return table
    except Exception as e:
        gr.Error(f'Error converting {xl_file}: {e}')

def table_to_ELAN_tsv(table: pd.DataFrame, path: str):
    # write table to a tab-separated file compatible with ELAN import
    table.to_csv(path, index=False, float_format='%.3f', sep='\t')
    return path
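
# Illustrative usage sketch (the .xlsx path is hypothetical): convert a contractor
# transcript to the internal table format, then write it out as an ELAN-importable TSV.
def _example_xlsx_to_elan_tsv():
    table = xlsx_to_table('MySession_Transcript.xlsx')
    if table is not None:
        table_to_ELAN_tsv(table, 'MySession_for_ELAN.tsv')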

def convert_and_trim_video(media_in, media_out, start=None, end=None):
    WAV_CHANNELS = 1
    WAV_SAMPLE_RATE = 16000
    start_sec = HHMMSS_to_sec(start)
    end_sec = HHMMSS_to_sec(end)
    try:
        if start_sec is None and end_sec is None:
            logging.info('...No start and end times provided. Converting entire video without trimming...')
            trim_command = []
        else:
            if start_sec is None:
                logging.info('...No start time provided. Trimming video from start to specified end...')
                start_sec = 0.0
            trim_command = ['-ss', f'{start_sec}']
            if end_sec is None:
                logging.info('...No end time provided. Trimming video from specified start to end of video...')
                end_sec = None
            else:
                trim_command.extend(['-to', f'{end_sec}'])
        if not isinstance(media_in, (str, Path)):
            raise TypeError("media_in must be a string or a PathLike object")
        if not isinstance(media_out, (str, Path)):
            raise TypeError("media_out must be a string or a PathLike object")
        in_ext = Path(media_in).suffix.lower()
        out_ext = Path(media_out).suffix.lower()
        print(f'...detected extensions from filename: input={in_ext} output={out_ext}')
        if in_ext == out_ext:
            logging.info('...No media conversion needed...')
        else:
            logging.info(f'...Using ffmpeg to convert {in_ext} to {out_ext}...')
        if out_ext == '.wav':
            if in_ext == '.webm':
                command = [
                    'ffmpeg', '-y',
                    '-i', media_in,
                    *trim_command,
                    media_out,
                    '-hide_banner', '-loglevel', 'info']
            else:
                # convert to wav with a standard format for audio models (mono, 16 kHz, 16-bit PCM)
                command = [
                    'ffmpeg',
                    '-y',
                    '-i', media_in,
                    *trim_command,
                    '-vn',
                    '-acodec', 'pcm_s16le',
                    '-ac', str(WAV_CHANNELS),
                    '-ar', str(WAV_SAMPLE_RATE),
                    media_out,
                    '-hide_banner', '-loglevel', 'info']
        else:  # convert using the copy codec
            if in_ext == '.webm':
                command = [
                    'ffmpeg', '-y',
                    '-i', media_in,
                    '-strict', '-2',
                    *trim_command,
                    '-c:v', 'copy',
                    # '-vcodec', 'h264',
                    # '-acodec', 'aac',
                    media_out,
                    '-hide_banner', '-loglevel', 'info']
            else:  # not webm
                command = [
                    'ffmpeg',
                    '-y',
                    '-i', media_in,
                    *trim_command,
                    '-c', 'copy',
                    media_out,
                    '-hide_banner', '-loglevel', 'info']
        # run the ffmpeg command
        logging.info(f"FFMPEG command: {' '.join(command)}")
        gr.Info(f"FFMPEG command: {' '.join(command)}", visible=False)
        print(f"...FFMPEG command: {' '.join(command)}")
        # process = subprocess.run(command, capture_output=True, text=True)
        # if process.returncode != 0:
        #     logging.info(f"FFMPEG error: {process.stderr}")
        #     print(f"FFMPEG error: {process.stderr}")
        #     gr.Error(f"FFMPEG error: {process.stderr}")
        # else:
        #     logging.info(process.stdout)
        #     print(f"...FFMPEG status: {process.stdout}")
        return_code = subprocess_run_verbose(command)
        print(f"FFMPEG return code: {return_code}")
        if return_code != 0:
            logging.info(f"FFMPEG error: {return_code}")
            print(f"FFMPEG error: {return_code}")
            gr.Error(f"FFMPEG error: {return_code}")
            return None
        else:
            logging.info("...FFMPEG completed successfully...")
            print("...FFMPEG completed successfully...")
            return media_out
    except Exception as e:
        print(f"Error converting video format: {e}")
        gr.Error(f"Error converting video format: {e}")

###### TRANSCRIPT UTILS ######

def convert_transcript_for_TM(file_list):
    """Convert transcripts for TalkMoves annotation.

    Input can be xlsx or csv transcript files.
    Can handle separate start and end time columns or a single timecode column.
    Output will have timestamps in HH:MM:SS.sss format.

    Args:
        file_list (list[str]): paths to the transcript files to convert.

    Raises:
        gr.Error: if a file is not .xlsx or .csv.
        gr.Error: if the first turn of a file has an empty speaker.

    Returns:
        tuple: (output_filepath_list, trans_log_filepath_list, error_message, global_log_filepath)
    """
    # Regular expression pattern for matching bracketed annotations such as [inaudible], [UI],
    # [overlapping speech], [VIDEO SILENCE] or [teacher explaining in background].
    bracket_re = re.compile(r'(?:\[(?:UI|ui|Inaudible|inaudible|overlapping speech|VIDEO SILENCE|teacher explaining in background).*\]\W{0,2})')
    # Regular expression pattern for matching anything enclosed in square brackets.
    all_bracket_re = re.compile(r'(?:\[.*\]\W{0,2})')
    # whether to remove inaudible markers
    do_remove_inaudible = True
    # whether to keep context switches
    do_keep_context_switch = True
    # whether to convert to timestamp format if start and end times are in seconds and in separate columns
    convert_to_timestamp = True
    error_message = []  # List of error messages to be displayed to the user.
    global_stat_dict = {}  # Dictionary of global statistics.
    output_filepath_list = []  # List of output file paths.
    trans_log_filepath_list = []  # List of transcription log file paths.
    for file in file_list:
        filename = file.split('/')[-1]  # Get the filename from the file.
        filepath = os.path.dirname(file)  # Get the file path from the file.
        # Read the file into a pandas DataFrame depending on its file format.
        if filename.endswith('.xlsx'):
            df = pd.read_excel(file, index_col=0)
            output_filename = f"{filename[:-5]}" + "_TMcoded.xlsx"
        elif filename.endswith('.csv'):
            df = pd.read_csv(file, index_col=0, on_bad_lines='skip')  # skip malformed rows rather than failing
            output_filename = f"{filename[:-4]}" + "_TMcoded.xlsx"
        else:
            raise gr.Error(f"{file} format is wrong")
        # Remove the "Copy of" prefix from the output filename, if present.
        if output_filename.startswith("Copy of "):
            output_filename = output_filename[8:]
        # Remove the word "_Transcript" from the output filename, if present.
        if '_Transcript' in output_filename:
            # print("before: "+output_filename)
            error_message.append("before: " + output_filename)
            output_filename = ''.join(output_filename.split('_Transcript'))
            # print("after: "+output_filename)
            error_message.append("after: " + output_filename)
        # Construct the output file and transcription log file paths.
        output_filepath = os.path.join(filepath, output_filename)
        trans_log_filepath = os.path.join(filepath, f"{output_filename}" + ".log")
        # Open the transcription log file for writing.
        with open(trans_log_filepath, "w") as outfile:
            sub_cnt_in_file = 0
            empty_speaker_cnt_in_file = 0
            turn_skipped_in_file = 0
            turn_skipped_speaker_switch_in_file = 0
            snt_mark_skip_in_file = 0
            snt_skipped_in_file = 0
            chat_flag_in_speaker_time_line = 0
            chat_flag_in_content_line = 0
            all_inaudible_in_file = 0
            all_bracket_in_file = 0
            all_snts_in_file = 0
            all_token_cnt_in_file = 0
            # expected input layout, e.g.:
            # index  Timecode                   Duration     Speaker  Dialogue                            Annotations  Error Type
            # 1      00:00:05:04 - 00:00:07:12  00:00:02:08  Tutor    Did you... How was your Halloween?
            turns = []
            time_stamps = []
            speakers = []
            chat_flags = []
            sentences = []
            snt_ids = []
            ## parse the df flexibly: find key column names, which might vary depending on transcript source
            # set all column names to lowercase
            df.columns = map(str.lower, df.columns)
            # several possibilities for column names, detect which are present
            uttID_keys = ['utt', 'seg', 'utt_id', 'seg_id', 'index']
            speaker_keys = ['speaker']
            start_keys = ['start_sec', 'start', 'start_time', 'timestart']
            end_keys = ['end_sec', 'end', 'end_time', 'timeend']
            timestamp_keys = ['timecode', 'timestamp']
            content_keys = ['dialogue', 'utterance', 'transcript', 'text']
            # detect which is used in this df
            uttID_key = next((key for key in uttID_keys if key in df.columns), None)
            speaker_key = next((key for key in speaker_keys if key in df.columns), None)
            content_key = next((key for key in content_keys if key in df.columns), None)
            # check if separate start and end times are present, otherwise assume a single timecode column
            if any(df.columns.isin(start_keys)):
                start_key = next((key for key in start_keys if key in df.columns), None)
                end_key = next((key for key in end_keys if key in df.columns), None)
                time_format = 'seconds'
                if convert_to_timestamp:
                    # convert to timestamp format HH:MM:SS.sss - HH:MM:SS.sss
                    df['timecode'] = df.apply(lambda x: f"{sec_to_HHMMSS(x[start_key])} - {sec_to_HHMMSS(x[end_key])}", axis=1)
                    timestamp_key = 'timecode'
                    time_format = 'timestamp'
            else:
                timestamp_key = next((key for key in timestamp_keys if key in df.columns), None)
                time_format = 'timestamp'
            # Turn numbering starts at 1, the same as Molly's transcripts
            for i, row in df.iterrows():
                turn = row[uttID_key] if uttID_key else i + 1
                speaker = row[speaker_key]
                time_str = row[timestamp_key]
                content = "" if pd.isna(row[content_key]) else row[content_key].strip("\n")
                # when speaker is empty, use the previous speaker
                # (empty Excel cells are read as NaN, so check for that too)
                if speaker == "" or pd.isna(speaker):
                    if speakers:
                        speaker = speakers[-1]
                        empty_speaker_cnt_in_file += 1
                        outfile.write(f"{turn}: found empty speaker, use the speaker in previous turn: {speaker}\n")
                    else:
                        raise gr.Error(f"{row}, the first turn is empty speaker")
                # clean after the sentence tokenize
                snts = sent_tokenize(content)
                all_snts_in_file += len(snts)
                snt_skipped_in_turn = 0
                for i, snt in enumerate(snts):
                    remove_flag = False
                    inaudible_search = re.findall(bracket_re, snt)
                    if inaudible_search:
                        all_inaudible_in_file += len(inaudible_search)
                        outfile.write(f"{turn}, {inaudible_search}, inaudible found in snt: {snt}\n")
                    all_bracket_search = re.findall(all_bracket_re, snt)
                    if all_bracket_search:
                        all_bracket_in_file += len(all_bracket_search)
                        outfile.write(f"{turn}, {all_bracket_search} bracket found in snt: {snt}\n")
                    # only remove the [inaudible xxx] when it is the whole sentence.
                    inaudible_match = re.fullmatch(bracket_re, snt)
                    if inaudible_match:
                        if do_keep_context_switch:
                            # if keeping context switches
                            if speakers and speaker == speakers[-1]:
                                # same speaker, no context switch, just remove it
                                remove_flag = True
                            else:
                                # different speakers, so this is a context switch
                                if len(snts) == 1:
                                    # the current empty sentence is the only sentence
                                    remove_flag = False
                                else:
                                    if i != len(snts) - 1:
                                        # the current empty utterance is not the last one, just delete it
                                        remove_flag = True
                                    else:
                                        # the current empty utterance is the last one, keep it
                                        if snt_skipped_in_turn == len(snts) - 1:
                                            # all previous snts are empty, keep this one so the whole turn is not skipped
                                            remove_flag = False
                                        else:
                                            remove_flag = True
                        else:
                            # if not keeping context switches, simply remove all empty utterances
                            remove_flag = True
                    # If remove_flag is true:
                    if remove_flag:
                        # Increment sub_cnt_in_file and snt_mark_skip_in_file
                        sub_cnt_in_file += 1
                        snt_mark_skip_in_file += 1
                        # Write the following message to outfile:
                        outfile.write(f"{turn}, sub happened: {snt}, skip this sentence\n")
                        # If do_remove_inaudible is true:
                        if do_remove_inaudible:
                            snt_skipped_in_file += 1
                            snt_skipped_in_turn += 1
                            continue
                    # Add to the output lists:
                    # Append turn to turns list
                    turns.append(turn)
                    # Set snt_id to the string f"{turn}.{i}"
                    snt_id = f"{turn}.{i}"
                    # Append time_str to time_stamps list
                    time_stamps.append(time_str)
                    # Append speaker to speakers list
                    speakers.append(speaker)
                    # Set sentence to the string representation of snt, with whitespace removed from the start and end
                    sentence = str(snt).strip().rstrip("\n")
                    # Count the tokens in sentence and add to all_token_cnt_in_file
                    token_cnt = len(nltk.word_tokenize(sentence))
                    all_token_cnt_in_file += token_cnt
                    # Append snt_id to snt_ids list
                    snt_ids.append(snt_id)
                    # Append sentence to sentences list
                    sentences.append(sentence)
                if snt_skipped_in_turn == len(snts):
                    # all snts in the turn were skipped, so skip the whole turn
                    turn_skipped_in_file += 1
                    if (speakers and speaker != speakers[-1]) or not speakers:
                        turn_skipped_speaker_switch_in_file += 1
                    outfile.write(f"{turn}, since all snts are empty, skip this whole turn {content}\n")
            # Create a new DataFrame with the following columns:
            new_df = pd.DataFrame({
                "Sentence_ID": snt_ids,    # A
                "TimeStamp": time_stamps,  # B
                "Turn": turns,             # C
                "Speaker": speakers,       # D
                "Sentence": sentences      # E
            })
            # assert turn_skipped_speaker_switch_in_file==0, "Some speaker switch turn skipped"
            new_df["Teacher_TM"] = None  # F
            new_df["Student_TM"] = None  # G
            # write new_df to an xlsx file (rewritten below with data-validation dropdowns)
            new_df.to_excel(output_filepath, index=False)
            # https://openpyxl.readthedocs.io/en/latest/api/openpyxl.utils.dataframe.html#openpyxl.utils.dataframe.dataframe_to_rows
            wb = Workbook()
            ws = wb.active
            teacher_dv = DataValidation(type="list", formula1='",1-None,2-Keep-Together,3-Getting-Student-to-Relate,4-Restating,5-Revoicing,6-Context,7-Press-for-Accuracy,8-Press-for-Reasoning"', allow_blank=True)
            student_dv = DataValidation(type="list", formula1='",1-None,2-Relate-to-Another-Student,3-Asking-for-More-info,4-Making-a-Claim,5-Providing-Evidence/Reasoning"', allow_blank=True)
            ws.add_data_validation(teacher_dv)
            ws.add_data_validation(student_dv)
            teacher_dv.add('F2:F1048576')
            student_dv.add('G2:G1048576')
            for r in dataframe_to_rows(new_df, index=False, header=True):
                ws.append(r)
            wb.save(output_filepath)
            stat_dict = {
                "chat_flag_in_speaker_time_line": chat_flag_in_speaker_time_line,
                "chat_flag_in_content_line": chat_flag_in_content_line,
                "empty_speaker_cnt_in_file": empty_speaker_cnt_in_file,
                "ori_total_turn": df.shape[0],
                "ori_total_snt": all_snts_in_file,
                "turn_skipped": turn_skipped_in_file,
                "turn_skipped_speaker_switch_in_file": turn_skipped_speaker_switch_in_file,
                "snt_skipped": snt_skipped_in_file,
                "remaining_snt": all_snts_in_file - snt_skipped_in_file,
                "all_token_cnt_in_file": all_token_cnt_in_file,
                "avg_token_cnt_per_snt": all_token_cnt_in_file / (all_snts_in_file - snt_skipped_in_file),
                "sub_cnt_in_file": sub_cnt_in_file,
                "all_inaudible_in_file": all_inaudible_in_file,
                "all_bracket_in_file": all_bracket_in_file,
                "other_bracket_in_file": all_bracket_in_file - all_inaudible_in_file
            }
            if all_inaudible_in_file != all_bracket_in_file:
                # print(f"{filename} has special brackets")
                error_message.append(f"Warning: {filename} has special brackets")
            for k, v in stat_dict.items():
                global_stat_dict[k] = global_stat_dict.get(k, 0) + v
            outfile.write(f"{output_filepath}, {json.dumps(stat_dict, indent=4)}")
        output_filepath_list.append(output_filepath)
        trans_log_filepath_list.append(trans_log_filepath)
    for k, v in global_stat_dict.items():
        if "avg" in k:
            global_stat_dict[k] = global_stat_dict[k] / len(file_list)
    global_log_filepath = os.path.join(filepath, "global_transfer" + ".log")
    with open(global_log_filepath, "w") as outfile:
        outfile.write(f"global_stat_dict: {json.dumps(global_stat_dict, indent=4)}")
    # error_check
    if global_stat_dict["all_inaudible_in_file"] != global_stat_dict["all_bracket_in_file"]:
        error_message.append("Error: 'all_inaudible_in_file' does not match 'all_bracket_in_file'")
    if global_stat_dict["other_bracket_in_file"] != 0:
        error_message.append("Error: 'other_bracket_in_file' is not zero")
    return output_filepath_list, trans_log_filepath_list, error_message, global_log_filepath
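
# Illustrative usage sketch (assumptions: the path below is a hypothetical transcript with
# Speaker, Dialogue and Timecode columns, and the NLTK 'punkt' tokenizer data is available
# for sent_tokenize/word_tokenize). Not called anywhere in the app.
def _example_convert_transcript_for_TM():
    nltk.download('punkt', quiet=True)
    outputs, logs, errors, global_log = convert_transcript_for_TM(
        ['transcripts/session01_Transcript.xlsx'])
    print(outputs, errors)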

def add_CPS_columns(df):
    # CPS annotation columns, in order: Observation Instructions CONST_SharesU_Situation CONST_SharesU_CorrectSolutions CONST_SharesU_IncorrectSolutions CONST_EstablishesCG_Confirms CONST_EstablishesCG_Interrupts NEG_Responds_Reasons NEG_Responds_QuestionsOthers NEG_Responds_Responds MAINTAIN_Initiative_Criticizes NEG_MonitorsE_Results NEG_MonitorsE_GivingUp NEG_MonitorsE_Strategizes NEG_MonitorsE_Save MAINTAIN_Initiative_Suggestions MAINTAIN_Initiative_Compliments MAINTAIN_FulfillsR_InitiatesOffTopic MAINTAIN_FulfillsR_JoinsOffTopic MAINTAIN_FulfillsR_Support MAINTAIN_FulfillsR_Apologizes Notes
    annotation_columns = ['Observation', 'Instructions', 'CONST_SharesU_Situation', 'CONST_SharesU_CorrectSolutions', 'CONST_SharesU_IncorrectSolutions', 'CONST_EstablishesCG_Confirms', 'CONST_EstablishesCG_Interrupts', 'NEG_Responds_Reasons', 'NEG_Responds_QuestionsOthers', 'NEG_Responds_Responds', 'MAINTAIN_Initiative_Criticizes', 'NEG_MonitorsE_Results', 'NEG_MonitorsE_GivingUp', 'NEG_MonitorsE_Strategizes', 'NEG_MonitorsE_Save', 'MAINTAIN_Initiative_Suggestions', 'MAINTAIN_Initiative_Compliments', 'MAINTAIN_FulfillsR_InitiatesOffTopic', 'MAINTAIN_FulfillsR_JoinsOffTopic', 'MAINTAIN_FulfillsR_Support', 'MAINTAIN_FulfillsR_Apologizes', 'Notes']
    # add these columns to the end of the df in this order
    for col in annotation_columns:
        df[col] = ''
    return df


def add_TM_columns(df):
    annotation_columns = ['Teacher_TM', 'Student_TM']
    # add these columns to the end of the df in this order
    for col in annotation_columns:
        df[col] = ''
    return df

def convert_transcript_for_annotation(file, annotation_scheme=None):
    """Convert transcript for annotation:
    Input: a standard csv transcript file
    Output will have separate start and end timestamps in HH:MM:SS.sss format
    A recordingID column is inferred from the transcript filename (giving the media/session name)
    Columns for CPS or TalkMoves annotators are added when requested
    """
    filename, ext = os.path.splitext(os.path.basename(file))  # Get the filename from the file.
    filepath = os.path.dirname(file)  # Get the file path from the file.
    # Read the file into a pandas DataFrame depending on its file format.
    try:
        table = parse_label_csv(file)
        media_filename = get_sessname_from_filename(filename)
        out_df = table.copy()
        out_df['recordingID'] = media_filename
        out_df['TimeStart'] = out_df['start_sec'].apply(sec_to_HHMMSS)
        out_df['TimeEnd'] = out_df['end_sec'].apply(sec_to_HHMMSS)
        out_df = out_df[['speaker', 'TimeStart', 'TimeEnd', 'utterance', 'recordingID', 'uttID']]
        if annotation_scheme == 'CPS':
            out_df = add_CPS_columns(out_df)
            output_file = os.path.join(filepath, f"CPS_{filename}.xlsx")
            out_df.to_excel(output_file, index=False)
        elif annotation_scheme == 'TM':
            out_df = add_TM_columns(out_df)
            output_file = os.path.join(filepath, f"TM_{filename}.xlsx")
            out_df.to_excel(output_file, index=False)
        else:
            output_file = os.path.join(filepath, f"{filename}.xlsx")
            out_df.to_excel(output_file, index=False)
        return output_file
    except Exception as e:
        raise gr.Error(f"{filename}: error {e}")

def sec_to_HHMMSS(seconds):
    """Get timestamp string from seconds."""
    seconds = float(seconds)
    m, s = divmod(seconds, 60)
    h, m = divmod(m, 60)
    h = int(h)
    m = int(m)
    return f"{h:02d}:{m:02d}:{s:06.3f}"

def readELANtsv(file, fmt=None):
    with open(file, 'r', newline='') as in_file:
        reader = csv.reader(in_file, delimiter="\t", quoting=csv.QUOTE_NONE)
        skiprows = 0
        row = next(reader)
        while not len(row) >= 4:  # 4 being the minimum number of columns ELAN exports have
            skiprows += 1
            row = next(reader)
        in_file.seek(0)  # rewind to the start of the file before re-reading
        if skiprows > 0:
            print(f'Detected {skiprows} header rows to skip')
        reader = csv.reader(in_file, delimiter="\t")
        for _ in range(skiprows):
            next(reader)
        labels = []  # transcript with speaker labels and timestamps in sec
        for i, utt in enumerate(reader):
            if not ''.join(utt).strip():  # skip blank lines
                continue
            try:
                if len(utt) == 5:  # if data comes straight from ELAN there is sometimes a superfluous blank column 2
                    if i == 0:
                        print('detected extra blank column in first row, will remove')
                    if fmt == 'AUG23':
                        if i == 0:
                            print('detected extra blank 1st column, will remove')
                        _, speaker, start_HHMMSS, end_HHMMSS, utterance = utt
                        convert_timestamps = True
                    else:
                        if i == 0:
                            print('detected extra blank 2nd column, will remove')
                        speaker, _, start_HHMMSS, end_HHMMSS, utterance = utt
                        convert_timestamps = True
                elif len(utt) == 4:  # sometimes the blank col is already removed
                    if i == 0:
                        print('detected 4 columns, assuming: speaker, start_HHMMSS, end_HHMMSS, utterance')
                    speaker, start_HHMMSS, end_HHMMSS, utterance = utt
                    convert_timestamps = True
                elif len(utt) == 6:  # the Aug 2023 format has a redundant extra start column
                    if i == 0:
                        print('detected 6 columns, assuming: _, speaker, start_HHMMSS, end_HHMMSS, utterance, _')
                    _, speaker, start_HHMMSS, end_HHMMSS, utterance, _ = utt
                    convert_timestamps = True
                elif len(utt) == 9:  # 2023 transcribers tend to give the full ELAN output
                    if i == 0:
                        print('detected 9 columns, assuming: speaker, _, start_HHMMSS, _, end_HHMMSS, _, _, _, utterance')
                    speaker, _, start_HHMMSS, _, end_HHMMSS, _, _, _, utterance = utt
                    convert_timestamps = True
                elif len(utt) == 10:  # sometimes an extra blank column appears at the end
                    if i == 0:
                        print('detected 10 columns, assuming: speaker, _, start_HHMMSS, _, end_HHMMSS, _, _, _, utterance, _')
                    speaker, _, start_HHMMSS, _, end_HHMMSS, _, _, _, utterance, _ = utt
                    convert_timestamps = True
                elif len(utt) == 12:  # ELAN can emit many more redundant columns
                    if i == 0:
                        print('detected 12 columns, assuming: speaker, _, start_HHMMSS, _, _, end_HHMMSS, _, _, _, _, _, utterance')
                    speaker, _, start_HHMMSS, _, _, end_HHMMSS, _, _, _, _, _, utterance = utt
                    convert_timestamps = True
                else:
                    raise ValueError(f'Unknown transcript format with {len(utt)} columns for {file}')
            except BaseException as err:
                print(f'!!! transcript parse error on line {i} for {file}')
                print(utt)
                raise err
            if convert_timestamps:
                start_sec = HHMMSS_to_sec(start_HHMMSS)
                end_sec = HHMMSS_to_sec(end_HHMMSS)
            labels.append((speaker, utterance, start_sec, end_sec))
    labels = pd.DataFrame(labels, columns=('speaker', 'utterance', 'start_sec', 'end_sec'))
    labels.sort_values(by='start_sec', inplace=True, ignore_index=True)
    labels.reset_index(inplace=True)
    labels = labels.rename(columns={'index': 'seg'})
    return labels
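
# Illustrative sketch (hypothetical data, not part of the app flow) showing the simplest
# 4-column ELAN export layout that readELANtsv() accepts: speaker, start, end, utterance,
# tab-separated with no header.
def _example_readELANtsv():
    import tempfile
    rows = [
        "Tutor\t00:00:01.000\t00:00:03.500\tHow was your week?",
        "Student\t00:00:04.000\t00:00:06.000\tPretty good, thanks.",
    ]
    with tempfile.NamedTemporaryFile('w', suffix='.tsv', delete=False) as f:
        f.write("\n".join(rows))
        tsv_path = f.name
    labels = readELANtsv(tsv_path)
    print(labels[['seg', 'speaker', 'start_sec', 'end_sec', 'utterance']])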

def merge_ellipsis(seg_labels):
    # merge utterances that were split across segments and joined with ellipses ('...')
    # input is seg_labels format: [optional index] speaker, utterance, start_sec, end_sec
    if isinstance(seg_labels, str) and seg_labels.endswith(('.csv', '.tsv', '.txt')):
        df = pd.read_csv(seg_labels)
    elif isinstance(seg_labels, pd.DataFrame):
        df = seg_labels
    else:
        raise ValueError('input seg_labels should be path to csv or pd.DataFrame')
    if len(df.columns) == 4:
        # no seg index yet
        df.reset_index(inplace=True)
        df = df.rename(columns={'index': 'seg'})
    elif len(df.columns) == 5:
        # first col is seg
        df = df.rename(columns={df.columns[0]: 'seg'})
    else:
        raise ValueError('input seg_labels should have 4 or 5 columns')
    df2 = []
    prev_spk = None
    prev_utt = ""
    prev_start = 0
    prev_end = 0
    segs = [0]
    merge_utt = {"seg": None, "speaker": None, "utterance": None, "start_sec": None, "end_sec": None}
    for i, row in df.iterrows():
        if i == 0:
            merge_utt = row
        else:
            # if same speaker as last and ellipsis
            if merge_utt["speaker"] == row["speaker"] and str(merge_utt["utterance"]).endswith('...') and str(row["utterance"]).startswith('...'):
                # append current row to the temporary merged utterance
                merge_utt["utterance"] += str(row["utterance"])
                merge_utt["end_sec"] = row["end_sec"]
                segs.append(row["seg"])
            else:
                # append merge_utt to df2
                merge_utt["seg"] = segs
                df2.append(merge_utt)
                # clear merge_utt and set to current
                merge_utt = row
                segs = [merge_utt["seg"]]
    merge_utt["seg"] = segs
    # if not isinstance(merge_utt["seg"], list):
    #     merge_utt["seg"] = list(segs)
    df2.append(merge_utt)  # catch final merge_utt if not terminated
    df2 = pd.DataFrame(df2)
    # clean up the "......" left at the join points
    df2['utterance'] = df2['utterance'].str.replace(r'\.+', ' ', regex=True)
    # enumerate utterances
    df2.reset_index(inplace=True, drop=True)
    df2 = df2.reset_index().rename(columns={'index': 'utt'})
    return df2
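
# Illustrative sketch (hypothetical data, not part of the app flow) showing how merge_ellipsis()
# joins same-speaker segments that were split with trailing/leading '...'.
def _example_merge_ellipsis():
    seg_labels = pd.DataFrame({
        'seg': [0, 1, 2],
        'speaker': ['Tutor', 'Tutor', 'Student'],
        'utterance': ['So what we want to do...', '...is add the fractions.', 'Okay.'],
        'start_sec': [0.0, 2.5, 5.0],
        'end_sec': [2.4, 4.8, 6.0],
    })
    merged = merge_ellipsis(seg_labels)
    # expect two rows: the first spans segs [0, 1], ends at 4.8 s, and has the ellipses collapsed
    print(merged[['utt', 'seg', 'speaker', 'utterance', 'start_sec', 'end_sec']])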

def add_dummy_seg_column(table):
    # adds a dummy 'seg' column (listing the segments comprising each utterance) for a df without this column.
    # label files generated from merge_ellipsis() have an 'utt' column giving the utterance ID and a 'seg'
    # column containing a list of the original segments comprising each utterance,
    # but you may need all label files to have the exact same format even if they weren't produced by
    # merge_ellipsis()
    # returns a table with columns 'utt' and 'seg'
    if 'seg' in table.columns.tolist():
        print("'seg' column already exists, not changing anything")
        return table
    if 'uttID' in table.columns.tolist():
        table = table.rename(columns={"uttID": "utt"})
    if 'utt' not in table.columns.tolist():
        table['utt'] = table.index
    table['seg'] = [[u] for u in table['utt']]
    table = table[['utt', 'seg', 'speaker', 'start_sec', 'end_sec', 'utterance']]
    return table

def get_sessname_from_filename(filename):
    sessname = Path(filename).stem
    sessname = re.sub('reworked-transcript-diarized-timestamped-', '', sessname, flags=re.I)
    sessname = re.sub('reworked_transcript-diarized-timestamped-', '', sessname, flags=re.I)
    sessname = re.sub('reworked-diarized-timestamped-', '', sessname, flags=re.I)
    sessname = re.sub('reworked_timestamped_', '', sessname, flags=re.I)
    sessname = re.sub('reworked_', '', sessname, flags=re.I)
    sessname = re.sub('reworked-', '', sessname, flags=re.I)
    sessname = re.sub('transcript_diarized_timestamped_', '', sessname, flags=re.I)
    sessname = re.sub('transcript-diarized-timestamped_', '', sessname, flags=re.I)
    sessname = re.sub('transcript-diarized-timestamped-', '', sessname, flags=re.I)
    sessname = re.sub('_transcript', '', sessname, flags=re.I)
    sessname = re.sub('_tmcoded', '', sessname, flags=re.I)
    sessname = re.sub('utt_labels_', '', sessname, flags=re.I)
    sessname = re.sub('seg_labels_', '', sessname, flags=re.I)
    sessname = re.sub('_redacted', '', sessname, flags=re.I)
    return sessname
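
# Illustrative example (hypothetical filename) of how get_sessname_from_filename() strips the
# transcript-naming prefixes/suffixes to recover the session name.
def _example_get_sessname_from_filename():
    assert get_sessname_from_filename('utt_labels_Session01_Group2_redacted.csv') == 'Session01_Group2'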

def ELAN_to_labels_csv(ELANfile, merge_segments=True):
    # dumb but effective string wrangling to get the session name
    sessname = get_sessname_from_filename(ELANfile)
    # read the ELAN output into a pd.DataFrame in a unified format
    labels = readELANtsv(ELANfile)
    if merge_segments:
        save_file = f'utt_labels_{sessname}.csv'
        # merge segments to form utterances where there have been splits separated by '...'
        merged_labels = merge_ellipsis(labels)
        merged_labels.to_csv(save_file, index=False, float_format='%.3f')
    else:
        save_file = f'seg_labels_{sessname}.csv'
        labels.to_csv(save_file, index=False, float_format='%.3f')
    return save_file

def parse_label_csv(label_csv: str):
    # utt_labels csv is the usual format used for diarized, timed transcripts in this repo.
    # There are several versions with different columns (with/without segment and/or utterance index,
    # without column headers, etc.)
    # returns a table with columns:
    # [uttID, speaker, utterance, start_sec, end_sec]
    table = pd.read_csv(label_csv, keep_default_na=False, header=None)
    row0 = table.iloc[0]
    # if no cell in the first row looks numeric, treat it as a header row
    is_header = not any(str(cell).replace('.', '').isdigit() for cell in row0)
    if is_header:
        table.columns = row0.tolist()
        table = table.iloc[1:]
        table = table.reset_index(drop=True)
    else:
        if len(table.columns) == 4:
            print('no header detected, assuming annotation file has columns [speaker, utterance, start_sec, end_sec]')
            table.columns = ['speaker', 'utterance', 'start_sec', 'end_sec']
        elif len(table.columns) == 5:
            print('no header detected, assuming annotation file has columns [seg, speaker, utterance, start_sec, end_sec]')
            table.columns = ['seg', 'speaker', 'utterance', 'start_sec', 'end_sec']
        elif len(table.columns) == 6:
            print('no header detected, assuming annotation file has columns [utt, seg, speaker, utterance, start_sec, end_sec]')
            table.columns = ['utt', 'seg', 'speaker', 'utterance', 'start_sec', 'end_sec']
        else:
            print(f'no header detected, csv has {len(table.columns)} columns, could not determine column names.')
            return None
    # choose which column to use for uttID in table
    if 'utt' in table.columns.tolist():
        table = table.rename(columns={"utt": "uttID"}).drop('seg', axis=1)
    elif 'seg' in table.columns.tolist():
        table = table.rename(columns={"seg": "uttID"})
    else:
        table = table.reset_index().rename(columns={"index": "uttID"})
    table = table[['uttID', 'speaker', 'utterance', 'start_sec', 'end_sec']]
    return table
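
# Illustrative sketch (hypothetical data, not part of the app flow): parse_label_csv() normalises a
# headerless 4-column label csv to the [uttID, speaker, utterance, start_sec, end_sec] layout.
def _example_parse_label_csv():
    import tempfile
    with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as f:
        f.write("Tutor,How was your week?,1.0,3.5\n")
        f.write("Student,Pretty good.,4.0,6.0\n")
        csv_path = f.name
    table = parse_label_csv(csv_path)
    print(table)  # the uttID column is generated from the row index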

def deidentify_speaker(df, who='all'):
    """Replace speaker IDs with generic labels
    in order of appearance (speaker_1, speaker_2, ...).
    If who is 'student', only student names are replaced.

    Args:
        df (pd.DataFrame): transcript table containing a speaker column.
        who (str, optional): 'all' or 'student'. Which names to replace. Defaults to 'all'.
    """
    colnames = df.columns.tolist()
    speaker_key = next((key for key in ['speaker', 'Speaker', 'speaker_id', 'Speaker_ID'] if key in colnames), None)
    if not speaker_key:
        raise ValueError('No speaker column found in dataframe!')
    speakers = df[speaker_key].unique()
    if who == 'student':
        # detect students: ID format can be student_xxx or a numeric 00-0000 pattern
        speakers = [s for s in speakers if ('student' in s.lower() or re.match(r'^\d{2}-\d{4}$', s))]
        generic_speakers = [f'student_{i+1}' for i in range(len(speakers))]
    else:
        generic_speakers = [f'speaker_{i+1}' for i in range(len(speakers))]
    speaker_dict = dict(zip(speakers, generic_speakers))
    df[speaker_key] = df[speaker_key].replace(speaker_dict)
    return df
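
# Illustrative example (hypothetical data, not part of the app flow) of student-only
# de-identification: the tutor label is kept, student names are replaced in order of appearance.
def _example_deidentify_speaker():
    df = pd.DataFrame({
        'speaker': ['Tutor', 'student_Ada', 'Tutor', 'student_Grace'],
        'utterance': ['Hi both.', 'Hello!', 'Ready to start?', 'Yes.'],
    })
    out = deidentify_speaker(df, who='student')
    assert out['speaker'].tolist() == ['Tutor', 'student_1', 'Tutor', 'student_2']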