"""Speech De-Identification Framework, version 2 (updated 17 Sept 2024).

Transcribes an uploaded audio file with the Whisper "medium" model (tested on
an RTX 4070, 8 GB VRAM), detects PII entities with a user-supplied spaCy
model, and overlays a beep over every PII word in the audio
(``AudioSegment.overlay``; the old ``beepify_segments`` helper is gone).

Beeped audio is written to ``pii_beep_audio_uploads`` next to this file.
Adjust the fallback beep path in ``start_worker`` ("Beeped_Audio_Path") for
your local machine.
"""

import os
import random
import re
import shutil
import threading
import time
import zipfile
from collections import Counter
from pathlib import Path

import gradio as gr
import numpy as np
import spacy
import torch
import whisper_timestamped as whisper
from gradio_rich_textbox import RichTextbox
from pydub import AudioSegment
from werkzeug.utils import secure_filename


class Worker(threading.Thread):
    """Background thread that loads the models and processes one audio file.

    Results are delivered through ``callback``: a dict of the four output
    strings/paths on success, or a plain status string.
    """

    def __init__(self, audio_file_path, model_directory, callback):
        threading.Thread.__init__(self)
        self._AudiofileName = audio_file_path
        self._ModelDirectory = model_directory
        # Beep sample, resolved relative to the current working directory.
        self._BeepAudiofileName = "beep2.wav"
        self.callback = callback
        # Accumulated report strings, filled in by processData().
        self._PII_text_and_Timestamp = ""
        self._Transcribe_Text_With_Entities = ""
        self._Metrics = ""
        self._BeepedAudiofileName = ""
        print(f"Audio File: {self._AudiofileName}")
        print(f"Model Directory: {self._ModelDirectory}")
        print(f"Beep Audio File: {self._BeepAudiofileName}")

    def run(self):
        """Load spaCy + Whisper models, then process the audio file."""
        try:
            print("loading SpaCy model with custom model ", str(self._ModelDirectory))
            # Load spaCy model from a directory path or a known model name.
            self.nlp = spacy.load(str(self._ModelDirectory))
            print("SpaCy model loaded.")
            devices = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            print(devices)
            time.sleep(0.2)
            self.model = whisper.load_model("medium", device=devices)
            print("Whisper model loaded.")
            self.processData()
            self.callback("callback Processing complete!")
        except Exception as e:
            # Top-of-thread boundary: an uncaught exception here would kill
            # the thread silently, so log it instead.
            print(f"Error during processing: {str(e)}")

    def count_entities(self, entities):
        """Return {entity_label: occurrence_count} for (text, label) pairs."""
        return dict(Counter(label for _, label in entities))

    def colorize_entities(self, data, entities):
        """Append each entity's label after its mention in *data* and return it."""
        color_map = {
            'PERSON': 'blue', 'GPE': 'green', 'LOC': 'purple',
            'PHONE': 'orange', 'EMAIL': 'blue', 'CAR_PLATE': 'red',
            'ORG': 'purple', 'NRIC': 'red', 'PASSPORT_NUM': 'green',
        }
        print("entities", entities)
        for entity, entity_type in entities:
            color = color_map.get(entity_type, 'blue')  # default to blue
            # NOTE(review): `color` is computed but never used — presumably the
            # RichTextbox markup once wrapped the entity in a colored tag.
            # Also str.replace hits every occurrence, so an entity that is a
            # substring of another (e.g. "Lee" inside "John Lee") can corrupt
            # an earlier replacement — confirm whether that matters here.
            colored_entity = f'{entity} {entity_type}'
            data = data.replace(entity, colored_entity)
        return data

    def processData(self):
        """Transcribe, run NER, build the report strings, and beep PII words."""
        try:
            # ---- Transcription ------------------------------------------------
            audio = whisper.load_audio(self._AudiofileName)
            output = whisper.transcribe(
                self.model,
                audio,
                beam_size=5,
                best_of=5,
                temperature=(0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
                vad=True,
                language="en",
                remove_punctuation_from_words=True,
                refine_whisper_precision=0.6,
                min_word_duration=0.01,
            )
            transcription_text = output['text']
            # Turn sentence-final dots into spaces so entity text matches the
            # punctuation-free word list built below.
            transcription_text = re.sub(r"\.(?!\S)", " ", transcription_text)
            print("~~~~~~~~~~~~~~~~")
            print(transcription_text)
            self._PII_text_and_Timestamp += transcription_text + "\n"

            # ---- Named-entity recognition -------------------------------------
            doc = self.nlp(transcription_text)
            entities = [(ent.text, ent.label_) for ent in doc.ents]
            uniqueentities = list(set(entities))
            entity_counts = self.count_entities(entities)
            for entity_type, count in entity_counts.items():
                self._Metrics += f"{entity_type} : {count}" + "\n"
            self._Transcribe_Text_With_Entities = self.colorize_entities(
                transcription_text, uniqueentities)
            print(f"Transcription: {transcription_text}")
            print(f"Entities: {entities}")

            # ---- PII timestamp report -----------------------------------------
            audio_to_beep = AudioSegment.from_file(self._AudiofileName)
            self.segments_to_beep = []
            pii_Text_TimeStamp = []
            for ent in doc.ents:
                self.segments_to_beep.append((ent.start_char, ent.end_char))
                # NOTE(review): char-offset * 200 ms is only a rough heuristic
                # timestamp (the real word timings come from Whisper below);
                # the old comment claimed 20 ms/char but the code uses 200 —
                # confirm which was intended.
                pii_Text_TimeStamp.append(
                    (ent.text, ent.start_char * 200, ent.end_char * 200))
                print("=======")
                print("ent.text", ent.text)
                print("ent.start", ent.start_char)
                print("ent.end", ent.end_char)
            print(pii_Text_TimeStamp)
            for text, start_ms, end_ms in pii_Text_TimeStamp:
                self._PII_text_and_Timestamp += (
                    "Timestamp: " + str(start_ms / 1000) + " --- "
                    + str(end_ms / 1000) + " sec") + "\n"
                self._PII_text_and_Timestamp += ("Text: " + text) + "\n"
            segments_in_ms = [(start * 200, end * 200)
                              for start, end in self.segments_to_beep]
            print("Segments:", segments_in_ms)

            # ---- Words to beep ------------------------------------------------
            # Entity phrases with dots stripped, split into single words,
            # de-duplicated while preserving first-seen order.
            words_to_beepify = [ent.text.replace('.', '') for ent in doc.ents]
            print(words_to_beepify)
            individual_words_to_beepify = []
            for phrase in words_to_beepify:
                individual_words_to_beepify.extend(phrase.split())
            individual_words_to_beepify = list(dict.fromkeys(individual_words_to_beepify))
            print(individual_words_to_beepify)

            # ---- Silence + beep each PII word ---------------------------------
            beep_sound = AudioSegment.from_file(self._BeepAudiofileName)
            for segment in output["segments"]:
                for word in segment["words"]:
                    if word["text"] not in individual_words_to_beepify:
                        continue
                    print("*******")
                    print(word)
                    start_time = word["start"]
                    end_time = word["end"]
                    start_index = float(start_time * 1000)
                    end_index = float(end_time * 1000 + 100)  # +100 ms buffer
                    word_duration = end_index - start_index
                    print(word_duration)
                    # Replace the word's span with silence of equal length.
                    silent_segment = AudioSegment.silent(duration=word_duration)
                    audio_to_beep = (audio_to_beep[:int(start_index)]
                                     + silent_segment
                                     + audio_to_beep[int(end_index):])
                    # BUGFIX: trim a per-word *copy* of the beep. The original
                    # reassigned beep_sound itself, so every subsequent word's
                    # beep was truncated to the shortest duration seen so far.
                    beep_clip = beep_sound[0:word_duration + 200]  # +200 ms
                    audio_to_beep = audio_to_beep.overlay(
                        beep_clip, position=int(start_index))

            # ---- Save and report ----------------------------------------------
            random_filename = (str(random.getrandbits(32))
                               + secure_filename(Path(self._AudiofileName).name))
            output_path = os.path.join("pii_beep_audio_uploads", f"new_{random_filename}")
            os.makedirs("pii_beep_audio_uploads", exist_ok=True)
            audio_to_beep.export(output_path)
            self._BeepedAudiofileName = output_path
            print(f"Beeped audio file saved at: {output_path}")

            # BUGFIX: the keys now carry their matching values. The original
            # sent them swapped and start_worker un-swapped them on return;
            # both ends are now consistent (net UI behavior unchanged).
            self.callback({
                "PII_text_and_Timestamp": self._PII_text_and_Timestamp,
                "Transcribe_Text_With_Entities": self._Transcribe_Text_With_Entities,
                "Metrics": self._Metrics,
                "Beeped_Audio_Path": self._BeepedAudiofileName,
            })
        except Exception as e:
            print(f"An error occurred during transcription: {str(e)}")


def start_worker(audio_file_path, model_directory):
    """Run a Worker synchronously and return the four Submit-handler outputs.

    Returns (transcript-with-entities, PII-timestamps, metrics, beeped-audio
    path) in the order wired to [pii_text_output, transcribe_text_output,
    metrics_output, beep_audio_file_output].
    """
    result = {
        "PII_text_and_Timestamp": "Processing...",
        "Transcribe_Text_With_Entities": "Processing...",
        "Metrics": "Processing...",
        # Fallback beep path; change per host.
        # "Beeped_Audio_Path": "/home/prema/Documents/Audio/beep2.wav"
        "Beeped_Audio_Path": "/content/drive/MyDrive/2024_Project/Pipeline/NER/beep2.wav",
    }

    def update_result(message):
        # The worker also calls back with a plain status string; ignore it.
        if isinstance(message, dict):
            result.update({
                "PII_text_and_Timestamp": str(message.get("PII_text_and_Timestamp")),
                "Transcribe_Text_With_Entities": message.get("Transcribe_Text_With_Entities"),
                "Metrics": str(message.get('Metrics')),
                "Beeped_Audio_Path": str(message.get('Beeped_Audio_Path')),
            })
            print("Processing complete.")

    # BUGFIX: the error branches previously returned only two values while the
    # Submit handler has four outputs, so Gradio raised instead of showing the
    # message.  Return a value for every output.
    if not audio_file_path or os.stat(audio_file_path).st_size == 0:
        return "Error: No input provided. Please upload a audio file", "", "", None
    # BUGFIX: model_directory is a directory path (or an error-message string
    # from load_model); os.stat(...).st_size made no sense for a directory and
    # raised FileNotFoundError for the error-message case.
    if not model_directory or not os.path.isdir(model_directory):
        return "Error: No input provided. Please upload model(.zip)file", "", "", None

    worker = Worker(audio_file_path, model_directory, update_result)
    worker.start()
    worker.join()  # block until processing finishes (handler is synchronous)

    # Order matches the Submit outputs (see docstring); mirrors the corrected
    # callback keys in Worker.processData.
    return (result["Transcribe_Text_With_Entities"],
            result["PII_text_and_Timestamp"],
            result["Metrics"],
            result["Beeped_Audio_Path"])


def reset():
    """Clear the five UI components wired to the Reset button."""
    return None, None, None, None, None


def get_audio_file_path(audio):
    """Pass the uploaded audio filepath through to the hidden textbox."""
    return audio


def load_model(files):
    """Extract an uploaded spaCy-model zip and return its base directory.

    Returns the directory containing ``meta.json`` on success, otherwise a
    human-readable error message (start_worker distinguishes the two with
    ``os.path.isdir``).
    """
    if files:
        # The uploaded file is a zip archive representing the model directory.
        zip_file_path = files.name
        extract_dir = "extracted_model"
        # Start from a clean extraction directory every time.
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir)
        os.makedirs(extract_dir, exist_ok=True)
        # SECURITY NOTE: extractall on a user-supplied zip is vulnerable to
        # path traversal ("zip slip") with hostile archives — consider
        # validating member names before extraction.
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        # Debug output: list everything that was extracted.
        extracted_files = []
        for root, dirs, filenames in os.walk(extract_dir):
            for filename in filenames:
                extracted_files.append(os.path.join(root, filename))
        print("Extracted files:")
        for path in extracted_files:
            print(path)

        # The model base directory is wherever meta.json lives.
        base_dir = None
        for root, dirs, filenames in os.walk(extract_dir):
            if filenames and 'meta.json' in filenames:
                base_dir = root
                break
        if base_dir and os.path.exists(os.path.join(base_dir, "meta.json")):
            return base_dir
        directory_message = "Invalid model directory: meta.json not found"
    else:
        directory_message = "No directory selected"
    return directory_message


def load_audio(beep_audio_file_output):
    """Return the path of the beeped-audio file for the audio player, or None."""
    if beep_audio_file_output is not None:
        return beep_audio_file_output.name
    return None


# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
with gr.Blocks(css="""
.centered { display: flex; justify-content: center; align-items: center; }
.custom-label { font-size: 14px; font-weight: bold; text-align: left; height: 100px; border: 0px solid black; }
""") as demo:
    gr.Markdown("# Speech De-Identification Framework ver-2.0", elem_classes="centered")
    with gr.Column():
        with gr.Row():
            audio_input = gr.Audio(label="Upload Audio File", type="filepath")
            audio_output = gr.Textbox(label="Audio File Path", interactive=False, visible=False)
            audio_input.change(fn=get_audio_file_path, inputs=audio_input, outputs=audio_output)
            # Model directory input, uploaded as a zip file.
            model_dir_input = gr.File(label="Select ML Model as zip file", file_count="single")
            model_output_path = gr.Textbox(label="Model Load Status", interactive=False, visible=False)
            model_dir_input.change(fn=load_model, inputs=model_dir_input, outputs=model_output_path)
        with gr.Row():
            # Empty markdown cells push the buttons to the right.
            gr.Markdown("")
            gr.Markdown("")
            gr.Markdown("")
            gr.Markdown("")
            gr.Markdown("")
            reset_button = gr.Button("Reset")
            submit_button = gr.Button("Submit")
        gr.Markdown("### Transcribe Text and Entities:")
        pii_text_output = RichTextbox(show_label=False, interactive=False)
        gr.Markdown("### PII Text and Time Stamps:")
        transcribe_text_output = gr.Textbox(show_label=False, interactive=False)
        gr.Markdown("### Metrics:")
        metrics_output = gr.Textbox(show_label=False, interactive=False)
        with gr.Row():
            # Download link for the beeped file plus a player for it.
            beep_audio_file_output = gr.File(label="Download Beeped Audio", interactive=False)
            audio_player = gr.Audio(label="Play Beeped Audio", type="filepath")
            # Keep the player in sync with the file component.
            beep_audio_file_output.change(load_audio,
                                          inputs=beep_audio_file_output,
                                          outputs=audio_player)

    # Event handlers.
    reset_button.click(reset, [],
                       [audio_input, model_dir_input, pii_text_output,
                        transcribe_text_output, metrics_output])
    submit_button.click(start_worker,
                        [audio_output, model_output_path],
                        [pii_text_output, transcribe_text_output,
                         metrics_output, beep_audio_file_output])

if __name__ == "__main__":
    demo.launch(inbrowser=True, show_error=True, share=True)