| #!/usr/bin/env python3 | |
| # import csv | |
| import typing | |
| import wave | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| import gradio as gr | |
| # from typing import Tuple, Dict | |
| # import pandas as pd | |
| import socket | |
| # import yaml | |
| # from os.path import exists | |
| # def read_config(file) -> Dict: | |
| # with open(file, 'r') as f: | |
| # return yaml.safe_load(f) | |
| # def write_config(file, config_data): | |
| # with open(file, 'w') as yaml_file: | |
| # yaml.dump(config_data, yaml_file, default_flow_style=False) | |
| # def load_data(filename: str) -> pd.DataFrame: | |
| # text_df = pd.read_csv(filename, header=None, names=['wav', 'text'], sep='|', on_bad_lines='skip') | |
| # print(text_df.head(4)) | |
| # return text_df | |
| # def validate_index(index: int) -> int: | |
| # max_index = Globals['max_index'] | |
| # if index > max_index: | |
| # index = max_index | |
| # if index < 0: | |
| # index = 0 | |
| # return index | |
def process_text(text: str) -> typing.List[str]:
    """Convert free-form input text into metadata records for synthesis.

    The text is lower-cased and split on newlines. Every non-blank line
    becomes one pipe-delimited record of the form::

        mucella_<NNNN>|500|<line>|ady-lt

    where ``<NNNN>`` is a zero-padded counter that only advances for lines
    that are kept, so tags are consecutive and start at ``0000``.

    Args:
        text: Raw, possibly multi-line text.

    Returns:
        The records in input order. Blank lines are skipped, so the list is
        empty when ``text`` contains nothing but whitespace.
    """
    records = []
    for raw_line in text.lower().split('\n'):
        # '|' is the field separator of the record. A literal pipe in the
        # text would add extra fields, so it is replaced by a space before
        # trimming (a line made only of pipes is then treated as blank).
        line = raw_line.replace('|', ' ').strip()
        if not line:
            print('-D- Skipping an empty line')
            continue
        print(f'-D- Processing line: {line}')
        # len(records) is the number of lines kept so far, i.e. the next tag.
        records.append(f'mucella_{len(records):04d}|500|{line}|ady-lt')
    print(f'-D- process_text() text\n-D- before:\n{text}\n-D- after:\n{records}')
    return records
| # def get_item_data(index: int) -> Tuple[int, str, str, str]: | |
| # index = validate_index(index) | |
| # df = Globals['text_df'] | |
| # text = df.at[index, 'text'] | |
| # if not isinstance(text, str): | |
| # text = '' | |
| # # Limit text to 'max_characters' | |
| # # if len(text) > Globals['max_characters']: | |
| # # text = text[0:Globals['max_characters']] | |
| # audio_tag = df.at[index, 'wav'] | |
| # audio_file = None | |
| # if isinstance(audio_tag, str): | |
| # audio_file = Globals['audio_dir'] + '/' + audio_tag + '.wav' | |
| # save_index(index) | |
| # return index, text, audio_tag, audio_file | |
| # Globals = { | |
| # 'input_csv': '', | |
| # 'output_csv': '', | |
| # 'audio_dir': '', | |
| # 'session_config_file': '', | |
| # 'text_df': pd.DataFrame(), | |
| # 'max_index': 0 | |
| # } | |
| # config_file = '/home/haroon/PycharmProjects/gradio_creating_web_apis/mucella_metadata.cfg' | |
| # python eval.py --model-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella --log-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella/synthesize/kamzegur1 --data-dir=/home/haroon/git_repos/few-shot-transformer-tts/Samples/ --eval_meta=/home/haroon/git_repos/few-shot-transformer-tts/Samples/kamzegur1.txt --start_step=2580000 --no_wait=True | |
# Settings and mutable state shared by the handlers in this module.
Globals = dict(
    # Interpreter used to launch the synthesizer script.
    python_interpreter='/home/haroon/python_virtual_envs/few_shot_tts/bin/python3',
    # Directory that contains eval.py.
    code_repository_dir='/home/haroon/git_repos/few-shot-transformer-tts',
    # Passed to eval.py as --model-dir.
    model_dir='/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella',
    # Passed to eval.py as --start_step; also names the eval_<step> directory
    # the per-line wav files are read from.
    start_step=2580000,
    # Passed to eval.py as --log-dir; the concatenated wav is written here.
    log_dir='/tmp/few-shot-transformer-tts-server/log',
    # Passed to eval.py as --data-dir; the metadata file text.txt lives here.
    data_dir='/tmp/few-shot-transformer-tts-server/text',
    # Initial content of the text box.
    default_text=' Maḣe keume sépĺı! \n \n Harun Şewgen \n',
    # Path of the most recently generated wav ('' while none exists).
    outfile='',
)
| # def incr_index(current_index: int) -> int: | |
| # current_index = current_index + 1 | |
| # current_index = validate_index(current_index) | |
| # return current_index | |
| # def decr_index(current_index: int) -> int: | |
| # current_index = current_index - 1 | |
| # current_index = validate_index(current_index) | |
| # return current_index | |
def concat_wavs(processed_text: typing.List[str]) -> str:
    """Join the per-line wav files of one synthesis run into a single file.

    For every metadata record the first pipe-delimited field (the audio tag)
    is mapped to ``<log_dir>/eval_<start_step>/<tag>.wav``. The frames of
    those files are written, in order, to a new uniquely named
    ``adiga_*.wav`` file inside ``Globals['log_dir']``. The output uses the
    wav parameters (channels, sample width, frame rate) of the first input.

    Args:
        processed_text: Metadata records as produced by ``process_text()``.

    Returns:
        Path of the newly created wav file.

    Raises:
        ValueError: If ``processed_text`` is empty.
        FileNotFoundError: If one of the expected per-line wav files is missing.
        wave.Error: If an input file is not a readable wav file.
    """
    records = list(processed_text)
    if not records:
        # Fail before anything is created on disk.
        raise ValueError('concat_wavs() needs at least one metadata line')

    eval_dir = f"{Globals['log_dir']}/eval_{Globals['start_step']}"
    segments = []
    for line in records:
        print(f'-D- concat_wavs() line before split: {line}')
        # Only the leading audio tag is needed; splitting once keeps this
        # working even if the text field itself contains a '|'.
        wav_tag = line.split('|', 1)[0]
        wav_file = f'{eval_dir}/{wav_tag}.wav'
        print(f'-D- wav_file: {wav_file}')
        with wave.open(wav_file, 'rb') as reader:
            segments.append((reader.getparams(), reader.readframes(reader.getnframes())))

    # The output file is created only after all inputs were read, so a missing
    # input does not leave an empty file behind. delete=False keeps the file
    # on disk after it is closed so the caller can serve it.
    with tempfile.NamedTemporaryFile(prefix='adiga_', suffix='.wav',
                                     dir=Globals['log_dir'], delete=False) as out_file:
        outfile = out_file.name
        print(f'-D- Concatenating to a single wav file: {outfile}')
        with wave.open(out_file, 'wb') as writer:
            writer.setparams(segments[0][0])
            for _, frames in segments:
                writer.writeframes(frames)
    return outfile
def speak(text: str) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Synthesize speech for ``text`` and return the resulting wav file.

    Steps: convert the text to metadata records, write them to
    ``<data_dir>/text.txt``, run ``eval.py`` from the code repository on that
    file, then concatenate the per-line wav files into one. The path of the
    result is also stored in ``Globals['outfile']`` (reset to ``''`` at the
    start of every call). The metadata file has a fixed name and is
    overwritten on each call.

    Args:
        text: Raw, possibly multi-line text to speak.

    Returns:
        The output path twice, one value per output component wired to this
        handler, or ``(None, None)`` when the text has no non-blank line.

    Raises:
        subprocess.CalledProcessError: If the synthesizer exits with a
            non-zero status.
    """
    Globals['outfile'] = ''
    print('-I- Generating speech ...')
    print(f'-D- speak() text: {text}')
    processed_text = process_text(text)
    if not processed_text:
        # Nothing to synthesize: skip the synthesizer run and the
        # concatenation, which cannot work without any input.
        # NOTE(review): assumes the UI treats None as "no value" for both
        # outputs - confirm against the gradio version in use.
        print('-W- No text to synthesize.')
        return None, None

    # Save the records to the metadata file read by the synthesizer. The
    # directory lives under /tmp and may not exist yet.
    Path(Globals['data_dir']).mkdir(parents=True, exist_ok=True)
    text_file = f"{Globals['data_dir']}/text.txt"
    # The text is non-ASCII; an explicit encoding avoids depending on the
    # locale. NOTE(review): assumes eval.py reads the file as UTF-8.
    with open(text_file, 'w', encoding='utf-8') as f:
        f.writelines(f'{line}\n' for line in processed_text)

    # Built as a list (not by splitting a string) so that paths containing
    # whitespace stay single arguments.
    cmd = [
        Globals['python_interpreter'],
        f"{Globals['code_repository_dir']}/eval.py",
        f"--model-dir={Globals['model_dir']}",
        f"--log-dir={Globals['log_dir']}",
        f"--data-dir={Globals['data_dir']}",
        f"--eval_meta={text_file}",
        f"--start_step={Globals['start_step']}",
        '--no_wait=True',
    ]
    print(f'-D- Speech synthesis command:\n{cmd}')
    # check=True: the audio tags restart at mucella_0000 on every call, so
    # after a failed run the concatenation step could otherwise pick up wav
    # files left over from an earlier request.
    subprocess.run(cmd, check=True)
    print('-D- Finished synthesizing speech.')

    outfile = concat_wavs(processed_text)
    Globals['outfile'] = outfile
    return outfile, outfile
def download() -> typing.Optional[str]:
    """Return the path of the most recently generated wav file, if any.

    Returns:
        ``Globals['outfile']`` when it is set and names an existing file,
        otherwise ``None``.
    """
    outfile = Globals['outfile']
    # Truthiness covers both the '' placeholder and a None value.
    if not outfile or not Path(outfile).is_file():
        return None
    print(f'-I- Downloading {outfile}')
    return outfile
| # def handle_text_editing(index: int, text: str) -> dict: | |
| # index = int(index) | |
| # index = validate_index(index) | |
| # new_text = text | |
| # if "\n" in new_text: | |
| # if index == Globals['max_index']: | |
| # print("-W- Can't split text since there are no audio tags left. Please add more audio tags.") | |
| # return gr.update(value=new_text) | |
| # [new_text1, new_text2] = new_text.split("\n", 1) | |
| # orig_text = Globals['text_df'].iat[index, 1] | |
| # orig_text2_index = orig_text.find(new_text2) | |
| # if orig_text2_index == -1: | |
| # # If text after the new line has been modified then just take the whole original text of this item to the | |
| # # next item (not easy way to figure out how to cut the original text). | |
| # new_text2 = orig_text | |
| # else: | |
| # new_text2 = orig_text[orig_text2_index:] | |
| # next_item_orig_text = Globals['text_df'].iat[index + 1, 1] | |
| # if not isinstance(next_item_orig_text, str): | |
| # next_item_orig_text = '' | |
| # new_text2 = new_text2 + ' ' + next_item_orig_text | |
| # save_text(index, new_text1) | |
| # save_text(index + 1, new_text2) | |
| # return gr.update(value=new_text1) | |
| # save_text(index, new_text) | |
| # return gr.update(value=new_text) | |
| # def update_output_file(output_file: str): | |
| # Globals['output_csv'] = output_file | |
| # def save_text(index: int, text: str): | |
| # index = int(index) | |
| # index = validate_index(index) | |
| # Globals['text_df'].iat[index, 1] = text | |
| # Globals['text_df'].to_csv(Globals['output_csv'], index=False, sep='|', header=False, quoting=csv.QUOTE_NONE) | |
| # def save_index(index: int): | |
| # index = validate_index(index) | |
| # session_config_data = {'current_index': int(index)} | |
| # write_config(Globals['session_config_file'], session_config_data) | |
def _detect_host_ip() -> str:
    """Return a non-loopback IPv4 address of this machine, if one is found.

    First the addresses the local host name resolves to are tried. If none
    of them is usable, a UDP socket is connected to 8.8.8.8:53 and its local
    address is used. Falls back to ``127.0.0.1`` when both attempts fail.
    """
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # Host name not resolvable; continue with the socket probe.
        addresses = []
    for ip in addresses:
        if not ip.startswith('127.'):
            return ip
    try:
        # The context manager closes the socket even if connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(('8.8.8.8', 53))
            return probe.getsockname()[0]
    except OSError:
        return '127.0.0.1'


def main():
    """Build the gradio UI, serve it, and exit with status 0 afterwards.

    The UI consists of a text box pre-filled with ``Globals['default_text']``
    and a 'Speak' button; clicking the button calls ``speak()`` and feeds its
    two return values to an audio component and a file component.
    """
    # Close port(s) in case it's still open from previous session
    gr.close_all()
    with gr.Blocks() as app:
        text_elem = gr.Text(show_label=False, value=Globals['default_text'], interactive=True, max_lines=5)
        speak_btn = gr.Button('Speak')
        audio_elem = gr.Audio(show_label=False)
        file_elem = gr.File()
        speak_btn.click(fn=speak, inputs=[text_elem], outputs=[audio_elem, file_elem])

    hostname = _detect_host_ip()
    print(f'-D- Hostname: {hostname}')
    # Alternative without a public share link, on a fixed port:
    # app.launch(server_name=hostname, server_port=6012, share=False)
    app.launch(server_name=hostname, share=True)
    # The bare exit() builtin is provided by the site module and is not
    # guaranteed to exist; raising SystemExit has the same effect.
    raise SystemExit(0)


if __name__ == '__main__':
    main()