Spaces:
Runtime error
Runtime error
| # This is version 2 updated on 17th Sept 2024. | |
| # Uses the Whisper Medium model (on RTX 4070 with 8GB VRAM) | |
| # Beep handling changed: the beepify_segments function is no longer used; audio_to_beep.overlay is used instead | |
| # Please change beep sound wave filepath according to your local dir in "Beeped_Audio_Path": line 254 | |
| #output audio stored in "pii_beep_audio_uploads" in local dir where this file located | |
| import gradio as gr | |
| import os | |
| import random | |
| import whisper_timestamped as whisper | |
| from pydub import AudioSegment | |
| import numpy as np | |
| import spacy | |
| import torch | |
| import threading | |
| import zipfile | |
| import shutil | |
| from pathlib import Path | |
| from werkzeug.utils import secure_filename | |
| import time | |
| from gradio_rich_textbox import RichTextbox | |
| import re | |
| # Worker class to process the audio file and load models | |
class Worker(threading.Thread):
    """Thread that transcribes an audio file, tags PII entities and beeps them out.

    The thread loads a spaCy pipeline and the Whisper "medium" model, transcribes
    the audio, runs NER on the transcript, overlays a beep on every transcribed
    word that belongs to a detected entity, and exports the result into the
    ``pii_beep_audio_uploads`` directory.

    Results are handed to ``callback`` as a dict (see ``processData``); a final
    plain-string message is sent from ``run`` afterwards.
    """

    def __init__(self, audio_file_path, model_directory, callback):
        """Store the inputs and initialise the (empty) result fields.

        Args:
            audio_file_path: Path of the audio file to process.
            model_directory: Directory (or model name) passed to ``spacy.load``.
            callback: Callable invoked with the result dict and status messages.
        """
        super().__init__()
        self._AudiofileName = audio_file_path
        self._ModelDirectory = model_directory
        # Beep sample, resolved relative to the current working directory.
        self._BeepAudiofileName = "beep2.wav"
        self.callback = callback
        # Transcript followed by "Timestamp"/"Text" lines for each entity.
        self._PII_text_and_Timestamp = ""
        # Transcript with entities wrapped in coloured <span> markup.
        self._Transcribe_Text_With_Entities = ""
        # One "LABEL : count" line per entity label.
        self._Metrics = ""
        # Path of the exported, beeped audio file.
        self._BeepedAudiofileName = ""
        print(f"Audio File: {self._AudiofileName}")
        print(f"Model Directory: {self._ModelDirectory}")
        print(f"Beep Audio File: {self._BeepAudiofileName}")

    def run(self):
        """Load both models, process the audio and notify the callback.

        Any exception raised while loading the models is caught and printed.
        ``processData`` handles its own exceptions, so the final status message
        is sent to the callback even when processing itself failed.
        """
        try:
            print("loading SpaCy model with custom model ", str(self._ModelDirectory))
            # Load spaCy model from directory or a known model name
            self.nlp = spacy.load(str(self._ModelDirectory))
            print("SpaCy model loaded.")
            # Load Whisper model on the GPU when one is available
            devices = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            print(devices)
            time.sleep(0.2)
            self.model = whisper.load_model("medium", device=devices)
            print("Whisper model loaded.")
            self.processData()
            self.callback("callback Processing complete!")
        except Exception as e:
            print(f"Error during processing: {str(e)}")

    def count_entities(self, entities):
        """Count how many times each entity label occurs.

        Args:
            entities: Iterable of ``(text, label)`` pairs.

        Returns:
            dict mapping each label to its number of occurrences, in order of
            first appearance.
        """
        entity_counts = {}
        for _, entity_type in entities:
            entity_counts[entity_type] = entity_counts.get(entity_type, 0) + 1
        return entity_counts

    def colorize_entities(self, data, entities):
        """Wrap every entity occurrence in ``data`` in a coloured ``<span>``.

        All entities are substituted in a single pass over the original text,
        longest entity first. This prevents an entity that is a substring of
        another entity (or of the inserted markup/labels) from being replaced
        inside an already generated ``<span>``.

        Args:
            data: The transcript text.
            entities: Iterable of ``(text, label)`` pairs. When the same text
                appears with several labels, the first one seen is used.

        Returns:
            ``data`` with each entity replaced by
            ``<span style="color: COLOR;">TEXT LABEL</span>``.
        """
        # Define color mappings (you can customize these)
        color_map = {
            'PERSON': 'blue',
            'GPE': 'green',
            'LOC': 'purple',
            'PHONE': 'orange',
            'EMAIL': 'blue',
            'CAR_PLATE': 'red',
            'ORG': 'purple',
            'NRIC': 'red',
            'PASSPORT_NUM': 'green'
        }
        print("entities", entities)

        labels = {}
        for entity, entity_type in entities:
            if entity:  # an empty pattern would match everywhere
                labels.setdefault(entity, entity_type)
        if not labels:
            return data

        # Longest alternatives first so "John Smith" wins over "John".
        alternatives = sorted(labels, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(text) for text in alternatives))

        def _wrap(match):
            entity = match.group(0)
            entity_type = labels[entity]
            color = color_map.get(entity_type, 'blue')  # Default to blue if type not found
            return f'<span style="color: {color};">{entity} {entity_type}</span>'

        return pattern.sub(_wrap, data)

    def processData(self):
        """Transcribe the audio, detect entities, beep them and report results.

        Requires ``self.model`` and ``self.nlp`` to have been set (``run`` does
        this). On success the callback receives a dict with the keys
        ``PII_text_and_Timestamp``, ``Transcribe_Text_With_Entities``,
        ``Metrics`` and ``Beeped_Audio_Path``. Any exception is caught and
        printed; in that case the callback is not invoked with a dict.
        """
        try:
            # Load audio and transcribe with word-level timestamps
            audio = whisper.load_audio(self._AudiofileName)
            output = whisper.transcribe(
                self.model,
                audio,
                beam_size=5,
                best_of=5,
                temperature=(0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
                vad=True,
                language="en",
                remove_punctuation_from_words=True,
                refine_whisper_precision=0.6,
                min_word_duration=0.01,
            )
            # Replace every period that is followed by whitespace or the end of
            # the text with a space (same length, so offsets are unchanged).
            transcription_text = re.sub(r"\.(?!\S)", " ", output['text'])
            print("~~~~~~~~~~~~~~~~")
            print(transcription_text)
            self._PII_text_and_Timestamp += transcription_text + "\n"

            # Run NER with spaCy
            doc = self.nlp(transcription_text)
            entities = [(ent.text, ent.label_) for ent in doc.ents]
            uniqueentities = list(set(entities))
            for entity_type, count in self.count_entities(entities).items():
                self._Metrics += entity_type + " : " + str(count) + "\n"
            self._Transcribe_Text_With_Entities = self.colorize_entities(
                transcription_text, uniqueentities
            )
            print(f"Transcription: {transcription_text}")
            print(f"Entities: {entities}")

            audio_to_beep = AudioSegment.from_file(self._AudiofileName)

            # NOTE(review): the reported timestamps are derived from character
            # offsets (1 character = 200 ms), not from Whisper's word timings,
            # so they are only a rough estimate.
            ms_per_char = 200
            self.segments_to_beep = []
            pii_Text_TimeStamp = []
            for ent in doc.ents:
                self.segments_to_beep.append((ent.start_char, ent.end_char))
                pii_Text_TimeStamp.append(
                    (ent.text, ent.start_char * ms_per_char, ent.end_char * ms_per_char)
                )
                print("=======")
                print("ent.text", ent.text)
                print("ent.start", ent.start_char)
                print("ent.end", ent.end_char)
            print(pii_Text_TimeStamp)
            for text, start_ms, end_ms in pii_Text_TimeStamp:
                self._PII_text_and_Timestamp += (
                    "Timestamp: " + str(start_ms / 1000) + " --- " + str(end_ms / 1000) + " sec\n"
                )
                self._PII_text_and_Timestamp += "Text: " + text + "\n"
            segments_in_ms = [
                (start * ms_per_char, end * ms_per_char)
                for start, end in self.segments_to_beep
            ]
            print("Segments:", segments_in_ms)

            # Entity phrases with periods stripped, then split into single
            # words (duplicates removed, first-seen order kept).
            words_to_beepify = [ent.text.replace('.', '') for ent in doc.ents]
            print(words_to_beepify)
            individual_words_to_beepify = list(dict.fromkeys(
                word for phrase in words_to_beepify for word in phrase.split()
            ))
            print(individual_words_to_beepify)
            words_to_match = set(individual_words_to_beepify)

            # Load the beep sound once; it is never modified below.
            beep_sound = AudioSegment.from_file(self._BeepAudiofileName)
            for segment in output["segments"]:
                for word in segment["words"]:
                    if word["text"] not in words_to_match:
                        continue
                    print("*******")
                    print(word)
                    start_ms = int(word["start"] * 1000)
                    end_ms = int(word["end"] * 1000 + 100)  # Add 100ms buffer
                    # Use the exact span that is cut out so the silence put
                    # back in has the same length.
                    word_duration = end_ms - start_ms
                    print(word_duration)
                    # Replace the word with silence of the same duration
                    silent_segment = AudioSegment.silent(duration=word_duration)
                    audio_to_beep = (
                        audio_to_beep[:start_ms] + silent_segment + audio_to_beep[end_ms:]
                    )
                    # Trim a *copy* of the beep for this word (word length plus
                    # 200 ms). Re-slicing beep_sound itself would permanently
                    # shorten it for every following word.
                    beep_clip = beep_sound[:word_duration + 200]
                    audio_to_beep = audio_to_beep.overlay(beep_clip, position=start_ms)

            # Save the beeped audio file under a randomised, sanitised name
            random_filename = str(random.getrandbits(32)) + secure_filename(
                Path(self._AudiofileName).name
            )
            output_path = os.path.join("pii_beep_audio_uploads", f"new_{random_filename}")
            os.makedirs("pii_beep_audio_uploads", exist_ok=True)
            # NOTE(review): export() is called without a format argument, so
            # pydub's default output format is used whatever the file
            # extension is -- confirm this is intended.
            audio_to_beep.export(output_path)
            self._BeepedAudiofileName = output_path
            print(f"Beeped audio file saved at: {output_path}")
            # NOTE: the two text values are deliberately stored under each
            # other's key. start_worker and the UI wiring in this file rely on
            # this arrangement, so it is preserved here.
            self.callback({
                "PII_text_and_Timestamp": self._Transcribe_Text_With_Entities,
                "Transcribe_Text_With_Entities": self._PII_text_and_Timestamp,
                "Metrics": self._Metrics,
                "Beeped_Audio_Path": self._BeepedAudiofileName
            })
        except Exception as e:
            print(f"An error occurred during transcription: {str(e)}")
| # Callback function for Gradio | |
def start_worker(audio_file_path, model_directory):
    """Run a ``Worker`` on the given inputs and return values for the four outputs.

    Args:
        audio_file_path: Path of the uploaded audio file.
        model_directory: Directory of the extracted spaCy model.

    Returns:
        A 4-tuple matching the outputs the Submit button is wired to: the two
        text results, the metrics text and the path of the beeped audio file.
        On invalid input or a failed run, the error message is returned in the
        second position and the file path is ``None``.
    """
    result = {
        "PII_text_and_Timestamp": "Processing...",
        "Transcribe_Text_With_Entities": "Processing...",
        "Metrics": "Processing...",
        # No file until the worker reports one.
        "Beeped_Audio_Path": None
    }
    results_received = False

    def update_result(message):
        # The worker sends one dict with the results and a plain-string status
        # message; only the dict carries data.
        nonlocal results_received
        if isinstance(message, dict):
            result.update({
                "PII_text_and_Timestamp": str(message.get("PII_text_and_Timestamp")),
                "Transcribe_Text_With_Entities": message.get("Transcribe_Text_With_Entities"),
                "Metrics": str(message.get('Metrics')),
                "Beeped_Audio_Path": str(message.get('Beeped_Audio_Path'))
            })
            results_received = True
            print("Processing complete.")

    # Every return below yields four values, one per wired output component.
    if (
        not audio_file_path
        or not os.path.isfile(audio_file_path)
        or os.path.getsize(audio_file_path) == 0
    ):
        return gr.update(visible=True), "Error: No input provided. Please upload a audio file", "", None
    # model_directory may hold an error message instead of a path, so check
    # that it really is a directory instead of calling os.stat on it.
    if not model_directory or not os.path.isdir(model_directory):
        return gr.update(visible=True), "Error: No input provided. Please upload model(.zip)file", "", None

    # Start worker in a separate thread and wait for it to finish
    worker = Worker(audio_file_path, model_directory, update_result)
    worker.start()
    worker.join()

    if not results_received:
        # The worker prints its own errors and never delivers a result dict
        # when it fails; report that instead of a stale "Processing...".
        return gr.update(visible=True), "Error: processing failed. See the server log for details.", "", None

    return (
        result["PII_text_and_Timestamp"],
        result["Transcribe_Text_With_Entities"],
        result["Metrics"],
        result["Beeped_Audio_Path"],
    )
def reset():
    """Return five ``None`` values, one for each component a reset clears."""
    cleared_outputs = (None,) * 5
    return cleared_outputs
def get_audio_file_path(audio):
    """Pass the value received from the audio component through unchanged."""
    selected_path = audio
    return selected_path
def load_model(files, extract_dir="extracted_model"):
    """Extract an uploaded model zip and locate the model directory inside it.

    Args:
        files: The uploaded zip file, either an object exposing the path as
            ``.name`` or the path itself as a string. Falsy means no upload.
        extract_dir: Directory the archive is extracted into. It is deleted
            and recreated on every call.

    Returns:
        The first directory (top-down) under ``extract_dir`` that contains a
        ``meta.json`` file, or a human-readable message string when nothing
        was uploaded, the file is not a zip archive, or no ``meta.json``
        exists in the archive.
    """
    if not files:
        return "No directory selected"

    zip_file_path = getattr(files, "name", files)

    # Start from a clean directory so files of a previous model do not linger.
    if os.path.exists(extract_dir):
        shutil.rmtree(extract_dir)
    os.makedirs(extract_dir, exist_ok=True)

    try:
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
    except zipfile.BadZipFile:
        return "Invalid model file: not a valid zip archive"

    # Single walk: list everything that was extracted (debug output) and
    # remember the first directory holding meta.json.
    print("Extracted files:")
    base_dir = None
    for root, _dirs, filenames in os.walk(extract_dir):
        for filename in filenames:
            print(os.path.join(root, filename))
        if base_dir is None and 'meta.json' in filenames:
            base_dir = root

    if base_dir is None:
        return "Invalid model directory: meta.json not found"
    return base_dir
| # Function to load and return the audio file path | |
def load_audio(beep_audio_file_output):
    """Return the file path carried by the file component's value.

    Args:
        beep_audio_file_output: ``None``, an object exposing the path as
            ``.name``, or the path itself as a plain string.

    Returns:
        The path, or ``None`` when no file is present.
    """
    if beep_audio_file_output is None:
        return None
    # Fall back to the value itself when it is already a plain path string.
    return getattr(beep_audio_file_output, "name", beep_audio_file_output)
| # Gradio UI | |
# Gradio UI: builds the page, wires the event handlers and launches the app.
# NOTE(review): the nesting of the layout blocks below was reconstructed from
# a copy of this file that had lost its indentation -- confirm against the
# intended layout.
with gr.Blocks(css="""
.centered {
display: flex;
justify-content: center;
align-items: center; }
.custom-label {
font-size: 14px;
font-weight: bold;
text-align: left;
height: 100px;
border: 0px solid black;
}
""") as demo:
    gr.Markdown("# Speech De-Identification Framework ver-2.0", elem_classes="centered")
    with gr.Column():
        with gr.Row():
            audio_input = gr.Audio(label="Upload Audio File", type="filepath")
            # Hidden textbox that carries the audio path to the Submit handler.
            audio_output = gr.Textbox(label="Audio File Path", interactive=False, visible = False)
            audio_input.change(fn=get_audio_file_path, inputs=audio_input, outputs=audio_output)
            # Model directory input (as a zip file)
            model_dir_input = gr.File(label="Select ML Model as zip file", file_count="single")
            # Hidden textbox that receives load_model's return value (the model
            # directory or a message) and feeds it to the Submit handler.
            model_output_path = gr.Textbox(label="Model Load Status", interactive=False, visible = False)
            model_dir_input.change(fn=load_model, inputs=model_dir_input, outputs=model_output_path)
        with gr.Row():
            # Empty Markdown blocks -- presumably spacers that push the buttons
            # to the right; verify visually.
            gr.Markdown("")
            gr.Markdown("")
            gr.Markdown("")
            gr.Markdown("")
            gr.Markdown("")
            reset_button = gr.Button("Reset")
            submit_button = gr.Button("Submit")
        gr.Markdown("### Transcribe Text and Entities:")
        pii_text_output = RichTextbox(show_label=False , interactive=False)
        gr.Markdown("### PII Text and Time Stamps:")
        transcribe_text_output = gr.Textbox(show_label=False , interactive=False)
        gr.Markdown("### Metrics:")
        metrics_output = gr.Textbox(show_label=False , interactive=False)
        with gr.Row():
            # File component that offers the beeped audio file for download
            beep_audio_file_output = gr.File(label="Download Beeped Audio", interactive=False)
            # Audio player component to play the selected audio file
            audio_player = gr.Audio(label="Play Beeped Audio", type="filepath")
        # Automatically update the audio player when the file component changes
        beep_audio_file_output.change(load_audio, inputs=beep_audio_file_output, outputs=audio_player)
    # Event Handlers
    # Reset writes to the five components listed here; the beeped-audio file
    # and player components are not among them.
    reset_button.click(reset, [], [audio_input, model_dir_input, pii_text_output, transcribe_text_output, metrics_output])
    # Submit reads the two hidden textboxes and writes start_worker's four
    # return values to the four listed outputs, in order.
    submit_button.click(start_worker, [audio_output, model_output_path], [pii_text_output, transcribe_text_output, metrics_output,beep_audio_file_output])
# Launched at import time: opens a browser tab, shows errors in the UI and
# requests a public share link.
demo.launch(inbrowser=True, show_error=True,share = True)