Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # | |
| # Copyright 2022-2024 Xiaomi Corp. (authors: Fangjun Kuang) | |
| # | |
| # See LICENSE for clarification regarding multiple authors | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # References: | |
| # https://gradio.app/docs/#dropdown | |
import logging
import os
import subprocess
import tempfile
import time
import urllib.parse
import urllib.request
from datetime import datetime

import gradio as gr
import soundfile as sf

from examples import examples
from model import decode, get_pretrained_model, models
def convert_to_wav(in_filename: str) -> str:
    """Convert the input audio file to a 16 kHz mono wave file with ffmpeg.

    The output is written to ``in_filename + ".wav"``; an existing file with
    that name is overwritten (``-y``).

    Args:
      in_filename:
        Path of the audio file to convert.

    Returns:
      The path of the converted file, i.e. ``in_filename + ".wav"``. The path
      is returned even when ffmpeg fails or is not installed (best effort);
      the failure is logged so that a later read error on the returned path
      can be traced back to the conversion.
    """
    out_filename = in_filename + ".wav"
    logging.info(f"Converting '{in_filename}' to '{out_filename}'")

    # Pass the arguments as a list and do not go through a shell: the file
    # name may come from a user and must never be interpreted as shell syntax
    # (quotes, ';', '$(...)', ...).
    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-y",  # global option: overwrite the output file without asking
        "-i",
        in_filename,
        "-ar",
        "16000",
        "-ac",
        "1",
        out_filename,
    ]

    try:
        completed = subprocess.run(cmd, check=False)
    except OSError as e:
        # Typically raised when the ffmpeg executable cannot be found.
        logging.error(f"Failed to run ffmpeg: {e}")
        return out_filename

    if completed.returncode != 0:
        logging.error(
            f"ffmpeg exited with code {completed.returncode} "
            f"while converting '{in_filename}'"
        )

    return out_filename
def build_html_output(s: str, style: str = "result_item_success") -> str:
    """Wrap ``s`` in the nested ``result`` / ``result_item`` div markup.

    Args:
      s:
        Content placed inside the inner div. It is inserted verbatim, without
        HTML escaping, so it may itself contain markup such as ``<br/>``.
      style:
        Extra CSS class put next to ``result_item`` on the inner div.

    Returns:
      The HTML snippet as a string.
    """
    markup = f"""
    <div class='result'>
        <div class='result_item {style}'>
          {s}
        </div>
    </div>
    """
    return markup
def _error_output(message: str):
    """Build the (event string, event rows, html) triple for a failure."""
    return "", [], build_html_output(message, "result_item_error")


def _recognize_local_file(
    repo_id: str,
    in_filename: str,
    source_description: str,
    not_found_message: str,
):
    """Run recognition on a file that is already on disk (upload/microphone)."""
    logging.info(f"Processing {source_description}: {in_filename}")

    if not os.path.exists(in_filename):
        return _error_output(not_found_message)

    try:
        return process(repo_id=repo_id, in_filename=in_filename)
    except Exception as e:
        logging.error(
            f"Error processing {source_description} {in_filename}: {e}",
            exc_info=True,
        )
        return _error_output(f"Error processing {source_description}: {str(e)}")


def _recognize_url(repo_id: str, url: str):
    """Download ``url`` to a temporary file, run recognition, then clean up."""
    logging.info(f"Processing URL: {url}")

    # urlretrieve() also understands schemes such as file://, which would let
    # a visitor point the server at its own local files. Only fetch over
    # http(s).
    scheme = urllib.parse.urlparse(url).scheme.lower()
    if scheme not in ("http", "https"):
        return _error_output(
            "Error processing URL: only http:// and https:// URLs are supported"
        )

    temp_file = None
    try:
        # delete=False: the file must outlive the 'with' block so that it can
        # be filled by urlretrieve() and then read by process().
        with tempfile.NamedTemporaryFile(delete=False, suffix=".tmpaudio") as f:
            temp_file = f.name
        urllib.request.urlretrieve(url, temp_file)
        return process(repo_id=repo_id, in_filename=temp_file)
    except Exception as e:
        logging.error(f"Error processing URL {url}: {e}", exc_info=True)
        return _error_output(f"Error processing URL: {str(e)}")
    finally:
        if temp_file:
            # convert_to_wav() writes its output to '<input>.wav'; remove that
            # derived file together with the download so nothing accumulates.
            for path in (temp_file, temp_file + ".wav"):
                try:
                    if os.path.exists(path):
                        os.unlink(path)
                except OSError as e:
                    logging.warning(f"Failed to remove temporary file {path}: {e}")


def submit_fn(
    repo_id: str,
    uploaded_file_path: str,
    microphone_audio_path: str = "",
    url_text: str = "",
):
    """Run audio tagging on the first available input.

    The inputs are tried in this order: uploaded file, microphone recording,
    URL. Only the first non-empty one is used.

    Args:
      repo_id:
        Identifier of the model to use; forwarded to ``process``.
      uploaded_file_path:
        Path of the uploaded audio file, or an empty value if none.
      microphone_audio_path:
        Path of the microphone recording, or an empty value if none.
        Optional so that callers providing only a model and an uploaded file
        (as the examples widget does) can invoke this function.
      url_text:
        URL of an audio file to download, or an empty value if none.
        Leading/trailing whitespace is ignored.

    Returns:
      A triple ``(event_string, event_rows, html_info)``. On failure the
      first two items are ``""`` and ``[]`` and ``html_info`` contains the
      error message.
    """
    if uploaded_file_path:
        return _recognize_local_file(
            repo_id,
            uploaded_file_path,
            "uploaded file",
            f"Uploaded file not found: {uploaded_file_path}, "
            "Please first upload a file and then click "
            'the button "submit for recognition"',
        )

    if microphone_audio_path:
        return _recognize_local_file(
            repo_id,
            microphone_audio_path,
            "microphone recording",
            f"Microphone recording not found: {microphone_audio_path}, "
            "Please first click 'Record from microphone', speak, "
            "click 'Stop recording', and then "
            "click the button 'submit for recognition'",
        )

    url = url_text.strip() if url_text else ""
    if url:
        return _recognize_url(repo_id, url)

    return _error_output(
        "Please provide an audio input via upload, microphone, or URL."
    )
def process(
    repo_id: str,
    in_filename: str,
):
    """Convert ``in_filename`` to wave, run the tagger on it and report timing.

    Args:
      repo_id:
        Identifier passed to ``get_pretrained_model`` to obtain the tagger.
      in_filename:
        Path of the audio file to tag. It is first converted with
        ``convert_to_wav``.

    Returns:
      A triple ``(event_string, event_rows, html_info)`` where
      ``event_string`` is the comma-separated event names, ``event_rows`` is
      a list of ``[name, probability]`` rows (probability formatted with two
      decimals) and ``html_info`` is an HTML snippet with the wave duration,
      processing time and real-time factor (RTF).
    """
    logging.info(f"repo_id: {repo_id}")
    logging.info(f"in_filename: {in_filename}")

    filename = convert_to_wav(in_filename)

    started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    logging.info(f"Started at {started_at}")

    start = time.time()

    tagger = get_pretrained_model(repo_id)
    detected = decode(tagger, filename)
    event_str = ", ".join(e.name for e in detected)
    events = [[e.name, f"{e.prob:.2f}"] for e in detected]

    # Take a fresh timestamp: re-formatting the start time here would log the
    # start time as the finish time.
    finished_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    end = time.time()

    duration = sf.info(filename).duration
    elapsed = end - start
    # A zero-length wave has no meaningful real-time factor; report NaN
    # instead of failing with ZeroDivisionError.
    rtf = elapsed / duration if duration > 0 else float("nan")

    logging.info(f"Finished at {finished_at} s. Elapsed: {elapsed: .3f} s")

    info = f"""
    Wave duration : {duration: .3f} s <br/>
    Processing time: {elapsed: .3f} s <br/>
    RTF: {elapsed: .3f}/{duration: .3f} = {rtf:.3f} <br/>
    """
    if rtf > 1:
        info += (
            "<br/>We are loading the model for the first run. "
            "Please run again to measure the real RTF.<br/>"
        )

    logging.info(info)
    logging.info(f"\nrepo_id: {repo_id}\nDetected events: {events}")

    return event_str, events, build_html_output(info)
# Markdown heading passed to gr.Markdown() as the first element of the page.
title = "# Audio tagging with [Next-gen Kaldi](https://github.com/k2-fsa) "

# Markdown text passed to gr.Markdown() as the last element of the page.
description = """
This space shows how to do audio tagging with [Next-gen Kaldi](https://github.com/k2-fsa)
It is running on a machine with 2 vCPUs with 16 GB RAM within a docker container provided by Hugging Face.
See more information by visiting the following links:
- <https://github.com/k2-fsa/sherpa-onnx>
If you want to deploy it locally, please see
<https://k2-fsa.github.io/sherpa/onnx>
"""

# css style is copied from
# https://huggingface.co/spaces/alphacep/asr/blob/main/app.py#L113
#
# The result / result_item / result_item_success / result_item_error classes
# are the ones used in the markup produced by build_html_output().
css = """
.result {display:flex;flex-direction:column}
.result_item {padding:15px;margin-bottom:8px;border-radius:15px;width:100%}
.result_item_success {background-color:mediumaquamarine;color:white;align-self:start}
.result_item_error {background-color:#ff7070;color:white;align-self:start}
"""
# Build the page: model selector, three input tabs, submit button, outputs,
# examples, and finally the description text.
with gr.Blocks(css=css) as demo:
    gr.Markdown(title)

    model_choices = list(models.keys())
    model_dropdown = gr.Dropdown(
        choices=model_choices,
        label="Select a model",
        value=model_choices[0],
    )

    # One tab per input source; each source is a separate component so that
    # submit_fn() receives all three and picks the one that is filled in.
    with gr.Tabs():
        with gr.TabItem("Upload from disk"):
            uploaded_file = gr.Audio(
                sources=["upload"],  # possible sources: "microphone", "upload"
                type="filepath",
                label="Upload from disk",
            )

        with gr.TabItem("Record from microphone"):
            microphone = gr.Audio(
                sources=["microphone"],  # possible sources: "microphone", "upload"
                type="filepath",
                label="Record from microphone",
            )

        with gr.TabItem("From URL"):
            url_textbox = gr.Textbox(
                max_lines=1,
                placeholder="URL to an audio file",
                label="URL",
                interactive=True,
            )

    submit_button = gr.Button("Submit for recognition", variant="primary")

    # The three outputs map onto the triple returned by submit_fn().
    output_string = gr.Textbox(
        label="Event (string)",
        show_label=True,
        show_copy_button=True,
    )
    output_dataframe = gr.Dataframe(
        headers=["Event", "Probability"],
        label="Detected Events",
    )
    output_html_info = gr.HTML(label="Processing Info")

    # Examples fill in only the model and the uploaded file; the microphone
    # and URL components are not part of the example inputs.
    gr.Examples(
        examples=examples,
        inputs=[model_dropdown, uploaded_file],
        outputs=[output_string, output_dataframe, output_html_info],
        fn=submit_fn,
    )

    # A single handler serves all three input tabs.
    submit_button.click(
        fn=submit_fn,
        inputs=[model_dropdown, uploaded_file, microphone, url_textbox],
        outputs=[output_string, output_dataframe, output_html_info],
    )

    gr.Markdown(description)
if __name__ == "__main__":
    # Prefix every log record with its time, level and source location.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s",
        level=logging.INFO,
    )

    # Start the Gradio server and open the page in the default browser.
    demo.launch(inbrowser=True)