|
|
| |
| import numpy as np |
| import soundfile |
| from Utils.text_utils import split_into_sentences |
| import msinference |
| import re |
| import srt |
| import subprocess |
| import cv2 |
| from pathlib import Path |
| from types import SimpleNamespace |
| from flask import Flask, request, send_from_directory |
| from moviepy.video.io.VideoFileClip import VideoFileClip |
| from moviepy.video.VideoClip import ImageClip |
| from audiocraft.builders import AudioGen |
|
|
| CACHE_DIR = 'flask_cache/' |
| PIECE_OF_SOUND_DURATION = 4.74 |
| sound_generator = AudioGen( |
| duration=PIECE_OF_SOUND_DURATION |
| ).to('cuda:0').eval() |
|
|
| Path(CACHE_DIR).mkdir(parents=True, exist_ok=True) |
|
|
| def _shorten(filename): |
| return filename.replace("/","")[-6:] |
|
|
| def _resize(image, width=None, height=None, inter=cv2.INTER_AREA): |
| '''https://github.com/PyImageSearch/imutils/blob/master/imutils/convenience.py''' |
| |
| |
| dim = None |
| (h, w) = image.shape[:2] |
|
|
| |
| |
| if width is None and height is None: |
| return image |
|
|
| |
| if width is None: |
| |
| |
| r = height / float(h) |
| dim = (int(w * r), height) |
|
|
| |
| else: |
| |
| |
| r = width / float(w) |
| dim = (width, int(h * r)) |
|
|
| |
| resized = cv2.resize(image, dim, interpolation=inter) |
|
|
| |
| return resized |
|
|
| def overlay(x, soundscape=None): |
| |
| if soundscape is not None: |
| |
| background = sound_generator.generate(soundscape, |
| n_repeat=int(len(x) / (PIECE_OF_SOUND_DURATION * 16000)) + 1 |
| ).detach().cpu().numpy() |
| |
| |
| |
| x = .5 * x + .5 * background[:len(x)] |
| |
| else: |
| |
| print('sound_background = None') |
| |
| return x |
|
|
|
|
| def tts_multi_sentence(precomputed_style_vector=None, |
| text=None, |
| voice=None, |
| soundscape=None, |
| speed=None): |
| '''create 24kHZ np.array with tts |
| |
| precomputed_style_vector : required if en_US or en_UK in voice, so |
| to perform affective TTS. |
| text : string |
| voice : string or None (falls to styleTTS) |
| soundscape : 'A castle in far away lands' -> if passed will generate background sound soundscape |
| ''' |
| |
| |
| |
| |
| if precomputed_style_vector is not None: |
| x = [] |
| for _sentence in text: |
| |
| |
| |
| _sentence = _sentence.lower() |
| if 'vctk_low#p326' in voice: |
| |
| _sentence = _sentence.replace('abstract', 'ahbstract') |
| x.append(msinference.inference(_sentence, |
| precomputed_style_vector) |
| ) |
| x = np.concatenate(x) |
| |
| |
| |
| else: |
| |
| |
| x = msinference.foreign(text=text, |
| lang=voice, |
| speed=speed) |
| |
| |
| |
| |
| x /= np.abs(x).max() + 1e-7 |
| |
| return overlay(x, soundscape=soundscape) |
| |
|
|
|
|
|
|
| |
| |
| |
|
|
| app = Flask(__name__) |
|
|
| @app.route("/", methods=['GET', 'POST', 'PUT']) |
| def serve_wav(): |
| |
| |
| r = request.form.to_dict(flat=False) |
| |
| |
| |
| for filename, obj in request.files.items(): |
| obj.save(f'{CACHE_DIR}{_shorten(filename)}') |
| |
| print('Saved all files on Server Side\n\n') |
|
|
| args = SimpleNamespace( |
| text = None if r.get('text') is None else CACHE_DIR + _shorten(r.get('text' )[0]), |
| video = None if r.get('video') is None else CACHE_DIR + _shorten(r.get('video')[0]), |
| image = None if r.get('image') is None else CACHE_DIR + _shorten(r.get('image')[0]), |
| native = None if r.get('native') is None else CACHE_DIR + _shorten(r.get('native')[0]), |
| affective = r.get('affective')[0], |
| voice = r.get('voice')[0], |
| speed = float(r.get('speed')[0]), |
| soundscape=r.get('soundscape')[0] if r.get('soundscape') is not None else None, |
| ) |
| |
| |
|
|
| print(args, 'ENTER Script') |
| do_video_dub = True if args.text.endswith('.srt') else False |
|
|
| SILENT_VIDEO = '_silent_video.mp4' |
| AUDIO_TRACK = '_audio_track.wav' |
|
|
| if do_video_dub: |
| print('==\nFound .srt : {args.txt}, thus Video should be given as well\n\n') |
| with open(args.text, "r") as f: |
| s = f.read() |
| text = [[j.content, j.start.total_seconds(), j.end.total_seconds()] for j in srt.parse(s)] |
| assert args.video is not None |
| native_audio_file = '_tmp.wav' |
| subprocess.call( |
| ["ffmpeg", |
| "-y", |
| "-i", |
| args.video, |
| "-f", |
| "mp3", |
| "-ar", |
| "24000", |
| "-vn", |
| native_audio_file]) |
| x_native, _ = soundfile.read(native_audio_file) |
| x_native = x_native[:, 0] |
| |
| else: |
| with open(args.text, 'r') as f: |
| t = ''.join(f) |
| t = re.sub(' +', ' ', t) |
| |
| text = split_into_sentences(t) |
| |
| |
|
|
| precomputed_style_vector = None |
| |
| if args.native: |
| try: |
| precomputed_style_vector = msinference.compute_style(args.native) |
| except soundfile.LibsndfileError: |
| print('\n Could not voice clone audio:', args.native, 'fallback to video or Internal TTS voice.\n') |
| if do_video_dub: |
| native_audio_file = args.video.replace('.', '').replace('/', '') |
| native_audio_file += '__native_audio_track.wav' |
| soundfile.write('tgt_spk.wav', |
| np.concatenate([ |
| x_native[:int(4 * 24000)]], 0).astype(np.float32), 24000) |
| precomputed_style_vector = msinference.compute_style('tgt_spk.wav') |
|
|
| |
| |
| |
|
|
| if precomputed_style_vector is None: |
| |
| if 'en_US' in args.voice or 'en_UK' in args.voice: |
| _dir = '/' if args.affective else '_v2/' |
| precomputed_style_vector = msinference.compute_style( |
| 'assets/wavs/style_vector' + _dir + args.voice.replace( |
| '/', '_').replace( |
| '#', '_').replace( |
| 'cmu-arctic', 'cmu_arctic').replace( |
| '_low', '') + '.wav') |
| |
| |
| |
| elif '_' in args.voice: |
| precomputed_style_vector = msinference.compute_style('assets/wavs/mimic3_foreign_4x/' + args.voice.replace( |
| '/', '_').replace('#', '_').replace( |
| 'cmu-arctic', 'cmu_arctic').replace( |
| '_low', '') + '.wav') |
| |
| |
| |
| else: |
| print(f'\n\n\n\n\n FallBack to MMS TTS due to: {args.voice=}') |
| |
| |
| |
| |
|
|
| if args.video is not None: |
| |
| frame_tts = np.zeros((104, 1920, 3), dtype=np.uint8) |
| font = cv2.FONT_HERSHEY_SIMPLEX |
| bottomLeftCornerOfText = (240, 74) |
| fontScale = 2 |
| fontColor = (255, 255, 255) |
| thickness = 4 |
| lineType = 2 |
| cv2.putText(frame_tts, 'TTS', |
| bottomLeftCornerOfText, |
| font, |
| fontScale, |
| fontColor, |
| thickness, |
| lineType) |
| |
| |
| frame_orig = np.zeros((104, 1920, 3), dtype=np.uint8) |
| font = cv2.FONT_HERSHEY_SIMPLEX |
| bottomLeftCornerOfText = (101, 74) |
| fontScale = 2 |
| fontColor = (255, 255, 255) |
| thickness = 4 |
| lineType = 1000 |
| cv2.putText(frame_orig, 'ORIGINAL VOICE', |
| bottomLeftCornerOfText, |
| font, |
| fontScale, |
| fontColor, |
| thickness, |
| lineType) |
| |
| print(f'\n______________________________\n' |
| f'Gen Banners for TTS/Native Title {frame_tts.shape=} {frame_orig.shape=}' |
| f'\n______________________________\n') |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| video_file = args.video |
| vf = VideoFileClip(video_file) |
| |
| |
| h, w, _ = vf.get_frame(0).shape |
| frame_tts = _resize(frame_tts, width=w) |
| frame_orig = _resize(frame_orig, width=w) |
| h, w, _ = frame_orig.shape |
| |
| try: |
| |
| |
| num = x_native.shape[0] |
| is_tts = .5 + .5 * np.tanh(4*(np.linspace(-10, 10, num) + 9.4)) |
| |
| def inpaint_banner(get_frame, t): |
| '''blend banner - (now plays) tts or native voic |
| ''' |
| |
| im = np.copy(get_frame(t)) |
| |
|
|
| ix = int(t * 24000) |
|
|
| if is_tts[ix] > .5: |
| frame = frame_tts |
| |
| |
| else: |
| frame = frame_orig |
| |
| |
| |
| |
|
|
| offset_h = 24 |
| |
| |
| print(f' > inpaint_banner() HAS NATIVE: {frame.shape=} {im.shape=}\n\n\n\n') |
| |
| |
| |
| im[offset_h:h + offset_h, :w, :] = (.4 * im[offset_h:h + offset_h, :w, :] |
| + .6 * frame).astype(np.uint8) |
| |
| |
| |
| return im |
| |
| except UnboundLocalError: |
| |
| def inpaint_banner(get_frame, t): |
| |
| im = np.copy(get_frame(t)) |
| |
| h, w, _ = frame_tts.shape |
| if w != im.shape[1]: |
| local_frame = _resize(frame_tts, width=im.shape[1]) |
| offset_h = 24 |
| im[offset_h:h + offset_h, :w, :] = (.4 * im[offset_h:h+offset_h, :w, :] |
| + .6 * local_frame).astype(np.uint8) |
| return im |
| vf = vf.fl(inpaint_banner) |
| vf.write_videofile(SILENT_VIDEO) |
|
|
| |
|
|
| if do_video_dub: |
| OUT_FILE = 'tmp.mp4' |
| subtitles = text |
| MAX_LEN = int(subtitles[-1][2] + 17) * 24000 |
| |
| print("TOTAL LEN SAMPLES ", MAX_LEN, '\n====================') |
| pieces = [] |
| for k, (_text_, orig_start, orig_end) in enumerate(subtitles): |
|
|
| |
|
|
|
|
| pieces.append(tts_multi_sentence(text=[_text_], |
| precomputed_style_vector=precomputed_style_vector, |
| voice=args.voice, |
| soundscape=args.soundscape, |
| speed=args.speed) |
| ) |
| total = np.concatenate(pieces, 0) |
| |
| |
| if len(x_native) > len(total): |
| total = np.pad(total, (0, max(0, x_native.shape[0] - total.shape[0]))) |
|
|
| else: |
| x_native = np.pad(x_native, (0, max(0, total.shape[0] - x_native.shape[0]))) |
| |
| soundfile.write(AUDIO_TRACK, |
| |
| (.64 * total + .27 * x_native)[:, None], |
| 24000) |
| else: |
| OUT_FILE = 'tmp.mp4' |
| x = tts_multi_sentence(text=text, |
| precomputed_style_vector=precomputed_style_vector, |
| voice=args.voice, |
| soundscape=args.soundscape, |
| speed=args.speed) |
| soundfile.write(AUDIO_TRACK, x, 24000) |
|
|
| |
|
|
| if args.image is not None: |
|
|
| STATIC_FRAME = args.image |
| OUT_FILE = 'tmp.mp4' |
|
|
| |
|
|
| clip_silent = ImageClip(img=STATIC_FRAME, |
| duration=5) |
| clip_silent.write_videofile(SILENT_VIDEO, fps=24) |
|
|
| x = tts_multi_sentence(text=text, |
| precomputed_style_vector=precomputed_style_vector, |
| voice=args.voice, |
| soundscape=args.soundscape, |
| speed=args.speed |
| ) |
| soundfile.write(AUDIO_TRACK, x, 24000) |
| if args.video or args.image: |
| |
| subprocess.call( |
| ["ffmpeg", |
| "-y", |
| "-i", |
| SILENT_VIDEO, |
| "-i", |
| AUDIO_TRACK, |
| "-c:v", |
| "copy", |
| "-map", |
| "0:v:0", |
| "-map", |
| " 1:a:0", |
| CACHE_DIR + OUT_FILE]) |
|
|
| print(f'\noutput video is saved as {OUT_FILE}') |
| |
| else: |
| |
| |
| x = tts_multi_sentence(text=text, |
| precomputed_style_vector=precomputed_style_vector, |
| voice=args.voice, |
| soundscape=args.soundscape, |
| speed=args.speed) |
| OUT_FILE = 'tmp.wav' |
| soundfile.write(CACHE_DIR + OUT_FILE, x, 24000) |
|
|
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| print(f'\n=SERVER saved as {OUT_FILE=}\n') |
| response = send_from_directory(CACHE_DIR, path=OUT_FILE) |
| response.headers['suffix-file-type'] = OUT_FILE |
| print('________________\n ? \n_______________') |
| return response |
|
|
|
|
| if __name__ == "__main__": |
| app.run(host="0.0.0.0") |
|
|