###############################################
# PROFESSIONAL DAW-STYLE VOICE CLONING STUDIO
# Features:
# - Noise Reduction
# - Multi-band Parametric EQ
# - Reverb with Early/Late Reflections
# - Compressor, Delay, Chorus
# - AI-based Pitch & Timing Correction
# - Dynamic Music Ducking
# - Batch Voice Cloning
# - Multiple Voice Models
# - Gradio Interactive Mixer GUI
# - FastAPI Server
###############################################
| import os, uuid, shutil, logging | |
| import numpy as np | |
| import librosa, soundfile as sf | |
| import noisereduce as nr | |
| from pydub import AudioSegment, effects | |
| from pydub.generators import Sine | |
| from scipy.signal import butter, lfilter | |
| import torch | |
| from fastapi import FastAPI, UploadFile, Form | |
| from fastapi.responses import FileResponse | |
| import uvicorn | |
| import gradio as gr | |
| from TTS.api import TTS | |
| # Optional speaker verification for consent | |
# Optional speaker verification used by verify_consent().
# If SpeechBrain (or its pretrained-model download) is unavailable, fall back
# to manual yes/no consent confirmation only.
try:
    from speechbrain.pretrained import SpeakerRecognition
    # NOTE(review): from_hparams downloads the model on first use, so this
    # needs network access at import time — confirm acceptable at startup.
    speaker_verifier = SpeakerRecognition.from_hparams(source="speechbrain/spkrec-ecapa-voxceleb")
    CONSENT_VERIFICATION = True
except Exception:
    CONSENT_VERIFICATION = False
    logging.warning("Speaker verification unavailable.")
# Logging: simple level-tagged messages.
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
# Working directory for all intermediate and output audio files.
TEMP_DIR = "temp"
os.makedirs(TEMP_DIR, exist_ok=True)
| ########################################### | |
| # 1. CONSENT VERIFICATION | |
| ########################################### | |
def verify_consent(voice_sample, consent_phrase="I consent to voice cloning"):
    """Confirm the speaker consents to having their voice cloned.

    Runs optional speaker verification on *voice_sample*, then always asks for
    an explicit interactive confirmation.

    Raises:
        PermissionError: if verification scores below threshold or the user
            does not confirm consent.
    """
    print(f"Please say the following phrase: '{consent_phrase}'")
    if CONSENT_VERIFICATION:
        try:
            # NOTE(review): comparing the sample against itself always yields a
            # near-perfect score — a real check needs a separate consent
            # recording. Kept as-is pending a second recording path.
            score, _ = speaker_verifier.verify_files(voice_sample, voice_sample)
        except Exception as e:
            # Best-effort: only backend failures are swallowed here.
            logging.warning(f"Speaker verification failed: {e}")
        else:
            logging.info(f"Speaker verification score: {score:.2f}")
            if score < 0.7:
                # Bug fix: this PermissionError was previously raised inside
                # the try block and swallowed by its own `except Exception`,
                # so a failed verification never actually blocked cloning.
                raise PermissionError("Consent phrase does not match voice sample.")
    consent = input("Do you confirm this sample is used with your consent? (y/n): ")
    if consent.lower() != 'y':
        raise PermissionError("Consent not granted.")
    logging.info("Consent verified.")
| ########################################### | |
| # 2. NOISE REDUCTION | |
| ########################################### | |
def clean_audio(input_file, output_file=None):
    """Denoise *input_file* and write the result as WAV; return the output path."""
    output_file = output_file or os.path.join(TEMP_DIR, f"clean_{uuid.uuid4()}.wav")
    # Load at the file's native sample rate, then apply spectral noise reduction.
    samples, rate = librosa.load(input_file, sr=None)
    denoised = nr.reduce_noise(y=samples, sr=rate)
    sf.write(output_file, denoised, rate)
    logging.info(f"Cleaned audio saved to {output_file}")
    return output_file
| ########################################### | |
| # 3. PARAMETRIC EQ (Multi-Band) | |
| ########################################### | |
def parametric_eq(samples, sr, bands):
    """Sum of band-passed, gain-scaled copies of *samples*.

    Args:
        samples: 1-D audio signal.
        sr: sample rate in Hz.
        bands: list of (center_freq_hz, Q, gain_dB) tuples. Q sets the band
            width as bandwidth = center / Q; gain_dB scales that band.

    Returns:
        float ndarray, same length as *samples*.

    Note: the output is the sum of the filtered bands only — the dry signal is
    not mixed back in, matching the original design.
    """
    nyq = sr / 2.0

    def _apply_band(center, Q, gain_db):
        # Bug fix: Q was previously accepted but ignored (band edges were
        # hard-coded to one octave around the center). Edges now derive
        # from Q and are clamped into the valid normalized range (0, 1).
        bw = center / Q
        low = max((center - bw / 2.0) / nyq, 1e-5)
        high = min((center + bw / 2.0) / nyq, 1.0 - 1e-5)
        b, a = butter(2, [low, high], btype='band')
        return lfilter(b, a, samples) * 10 ** (gain_db / 20.0)

    # Accumulate in float so integer input arrays don't truncate the result.
    out = np.zeros_like(samples, dtype=float)
    for center, Q, gain_db in bands:
        out += _apply_band(center, Q, gain_db)
    return out
| ########################################### | |
| # 4. REVERB (Early/Late Reflections) | |
| ########################################### | |
def add_reverb(samples, sr, early=0.1, late=0.3, decay=0.5):
    """Overlay two delayed, attenuated copies of the signal.

    *early* and *late* are tap delays in seconds; *decay* is the early tap's
    gain, and the late tap arrives at half that gain.
    """
    out = np.copy(samples)
    # (tap delay in seconds, tap gain) — both taps read from the dry signal.
    for delay_s, gain in ((early, decay), (late, decay / 2)):
        shift = int(delay_s * sr)
        if shift > 0:
            out[shift:] += gain * samples[:-shift]
    return out
| ########################################### | |
| # 5. DYNAMIC DUCKING | |
| ########################################### | |
def dynamic_ducking(voice, music, threshold_db=-35, reduction_db=-12):
    """Overlay *voice* on *music*, attenuating the music while the voice is audible.

    Args:
        voice, music: pydub AudioSegments.
        threshold_db: voice level (dBFS) above which ducking engages.
        reduction_db: gain cut (dB) applied to the music when ducking.

    Returns:
        AudioSegment with the voice overlaid on the (possibly ducked) music.
    """
    # Bug fix: previously compared 20*log10(raw RMS sample value) against a
    # dBFS threshold — for 16-bit audio that is almost always far above
    # -35, so the duck triggered unconditionally. AudioSegment.dBFS is the
    # RMS level relative to full scale, which is what the threshold means.
    if voice.dBFS > threshold_db:
        music = music - abs(reduction_db)
    return music.overlay(voice)
| ########################################### | |
| # 6. AI PITCH & TIMING CORRECTION | |
| ########################################### | |
def pitch_and_timing_correction(samples, sr, pitch_steps=0, target_tempo=1.0):
    """Apply pitch shift and/or time stretch to a float audio signal.

    Args:
        samples: 1-D float audio array.
        sr: sample rate in Hz.
        pitch_steps: semitones to shift (0 = no shift).
        target_tempo: time-stretch rate (>1 speeds up; 1.0 = unchanged).
    """
    if pitch_steps != 0:
        # Bug fix: `sr` is keyword-only in librosa >= 0.10; passing it
        # positionally raised a TypeError there.
        samples = librosa.effects.pitch_shift(samples, sr=sr, n_steps=pitch_steps)
    if target_tempo != 1.0:
        samples = librosa.effects.time_stretch(samples, rate=target_tempo)
    return samples
| ########################################### | |
| # 7. APPLY EFFECTS CHAIN | |
| ########################################### | |
def apply_effects_chain(audio_file, sr=16000, eq_bands=None,
                        pitch_steps=0, target_tempo=1.0,
                        reverb_early=0.05, reverb_late=0.3, reverb_decay=0.5,
                        compressor=True, delay_ms=0, chorus=False):
    """Run the full effects chain over *audio_file*; return the processed WAV path.

    Args:
        audio_file: path to any pydub-readable audio file.
        sr: sample rate assumed for the DSP stages and the output.
        eq_bands: list of (center_hz, Q, gain_dB); defaults to a flat 3-band EQ.
        Other args: per-effect parameters forwarded to the DSP helpers.
    """
    # Bug fix: mutable default list replaced with a None sentinel.
    if eq_bands is None:
        eq_bands = [(100, 1, 0), (1000, 1, 0), (5000, 1, 0)]
    audio_seg = AudioSegment.from_file(audio_file)
    samples = np.array(audio_seg.get_array_of_samples()).astype(np.float32)
    # Integer full-scale of the source format (e.g. 32767 for 16-bit); the
    # float32 working copy keeps the original integer scale.
    max_int = float(2 ** (8 * audio_seg.sample_width - 1) - 1)
    # EQ
    samples = parametric_eq(samples, sr, eq_bands)
    # Compressor: simple peak limiter at 90% of full scale.
    if compressor:
        # Bug fix: np.iinfo(float32) raises ValueError — derive the limit
        # from the source sample width instead. Also guard empty input.
        max_amp = np.max(np.abs(samples)) if samples.size else 0.0
        if max_amp > 0.9 * max_int:
            samples = samples * (0.9 * max_int / max_amp)
    # Pitch & timing
    samples = pitch_and_timing_correction(samples, sr, pitch_steps=pitch_steps, target_tempo=target_tempo)
    # Reverb
    samples = add_reverb(samples, sr, early=reverb_early, late=reverb_late, decay=reverb_decay)
    # Bug fix: the float32 buffer was previously handed to AudioSegment with
    # the original integer sample_width, producing garbage audio. Clip and
    # convert back to the source integer format first.
    int_dtype = {1: np.int8, 2: np.int16, 4: np.int32}.get(audio_seg.sample_width, np.int16)
    samples = np.clip(samples, -max_int, max_int).astype(int_dtype)
    out_seg = AudioSegment(
        samples.tobytes(),
        frame_rate=sr,
        sample_width=audio_seg.sample_width,
        channels=audio_seg.channels
    )
    # Delay: overlay a -6 dB copy shifted by delay_ms.
    if delay_ms > 0:
        delayed = out_seg - 6
        out_seg = out_seg.overlay(delayed, delay=delay_ms)
    # Chorus. NOTE(review): this overlays a quiet 2 Hz sine, which is
    # low-frequency rumble rather than a true chorus; kept for compatibility.
    if chorus:
        chorus_tone = Sine(2).to_audio_segment(duration=len(out_seg))
        out_seg = out_seg.overlay(chorus_tone - 18)
    # Normalize to full scale before export.
    out_seg = effects.normalize(out_seg)
    output_file = os.path.join(TEMP_DIR, f"pro_effects_{uuid.uuid4()}.wav")
    out_seg.export(output_file, format="wav")
    logging.info(f"Effects applied: {output_file}")
    return output_file
| ########################################### | |
| # 8. MULTIPLE VOICE CLONING MODELS | |
| ########################################### | |
# Registry of selectable cloning models: display name -> Coqui TTS model id.
AVAILABLE_MODELS = {
    "XTTS v2": "tts_models/multilingual/multi-dataset/xtts_v2",
    "VCTK VITS": "tts_models/en/vctk/vits"
}

def load_tts_model(model_name="XTTS v2"):
    """Instantiate the Coqui TTS model registered under *model_name*.

    Raises ValueError for names not present in AVAILABLE_MODELS.
    """
    try:
        model_id = AVAILABLE_MODELS[model_name]
    except KeyError:
        raise ValueError(f"Model '{model_name}' not available.") from None
    logging.info(f"Loading {model_name}...")
    return TTS(model_id)
| ########################################### | |
| # 9. CLONE VOICE | |
| ########################################### | |
def clone_voice(text, voice_sample, output_file=None, model_name="XTTS v2", effects_params=None):
    """Synthesize *text* in the voice of *voice_sample*; return the output path.

    Args:
        text: text to speak.
        voice_sample: path to the reference voice recording.
        output_file: destination WAV path (auto-generated in TEMP_DIR if None).
        model_name: key into AVAILABLE_MODELS.
        effects_params: optional kwargs forwarded to apply_effects_chain.

    Raises:
        PermissionError: if consent verification fails.
    """
    if not output_file:
        output_file = os.path.join(TEMP_DIR, f"cloned_{uuid.uuid4()}.wav")
    verify_consent(voice_sample)
    cleaned = clean_audio(voice_sample)  # denoise the reference sample first
    model = load_tts_model(model_name)
    model.tts_to_file(text=text, speaker_wav=cleaned, file_path=output_file)
    # Bug fix: mutable default {} replaced with a None sentinel; the truthiness
    # check keeps behavior identical for callers passing {} or None.
    if effects_params:
        output_file = apply_effects_chain(output_file, **effects_params)
    logging.info(f"Cloned voice saved: {output_file}")
    return output_file
| ########################################### | |
| # 10. BATCH CLONING | |
| ########################################### | |
def batch_clone(texts, voice_samples, model_name="XTTS v2", effects_params=None):
    """Clone each (text, voice sample) pair; return the list of output paths.

    Pairs are matched positionally; extra texts or samples beyond the shorter
    list are ignored (zip semantics).
    """
    # Bug fix: mutable default {} replaced with a None sentinel.
    results = []
    for i, (text, voice_file) in enumerate(zip(texts, voice_samples)):
        logging.info(f"Processing batch {i+1}/{len(texts)}")
        out_file = clone_voice(text, voice_file, model_name=model_name, effects_params=effects_params)
        results.append(out_file)
    return results
| ########################################### | |
| # 11. MUSIC MIXING | |
| ########################################### | |
def mix_audio(voice_file, music_file, output_file=None):
    """Duck *music_file* under *voice_file* and write the mix as WAV.

    Returns the path of the exported mix.
    """
    output_file = output_file or os.path.join(TEMP_DIR, f"mixed_{uuid.uuid4()}.wav")
    voice_seg = AudioSegment.from_file(voice_file)
    # Pre-attenuate the music bed before dynamic ducking.
    bed = AudioSegment.from_file(music_file).apply_gain(-15)
    mixed = dynamic_ducking(voice_seg, bed)
    mixed.export(output_file, format="wav")
    logging.info(f"Mixed audio saved: {output_file}")
    return output_file
| ########################################### | |
| # 12. REAL-TIME VOICE CONVERSION (Optional) | |
| ########################################### | |
# Optional real-time voice conversion via OpenVoice + sounddevice.
# If either import (or model construction) fails, a no-op stub with the same
# signature is installed so menu option 3 degrades gracefully.
try:
    from openvoice import VoiceConverter
    import sounddevice as sd
    vc_model = VoiceConverter()
    def realtime_voice_conversion(target_voice, sr=16000, block=1024):
        # Stream mic input through the converter and back out, block by block.
        # NOTE(review): inference runs synchronously on the audio callback
        # thread — if one block's compute exceeds block/sr seconds this will
        # cause dropouts; confirm latency is acceptable.
        logging.info("Real-time conversion ON. Ctrl+C to stop.")
        def callback(indata, outdata, frames, t, status):
            # Mono input: take channel 0 and add a batch dimension.
            audio_tensor = torch.tensor(indata[:, 0]).unsqueeze(0)
            converted = vc_model.convert(audio_tensor, target_voice)
            # Reshape back to (frames, 1) for the output buffer.
            outdata[:] = converted.squeeze().numpy().reshape(-1, 1)
        with sd.Stream(channels=1, callback=callback, samplerate=sr, blocksize=block):
            sd.sleep(999999999)  # effectively "sleep until interrupted"
except Exception:
    logging.warning("OpenVoice module unavailable.")
    def realtime_voice_conversion(*args, **kwargs):
        # Fallback stub keeping the same call signature.
        logging.warning("Real-time voice conversion unavailable.")
| ########################################### | |
| # 13. CLEANUP TEMP | |
| ########################################### | |
def cleanup_temp():
    """Delete and recreate the temp working directory (best-effort)."""
    try:
        shutil.rmtree(TEMP_DIR)
        os.makedirs(TEMP_DIR, exist_ok=True)
    except Exception as e:
        # Best-effort cleanup: report but never propagate.
        logging.error(f"Error cleaning temp files: {e}")
    else:
        logging.info("Temporary files cleaned.")
| ########################################### | |
| # 14. GRADIO INTERACTIVE MIXER GUI | |
| ########################################### | |
# Default slider/checkbox values for the Gradio mixer GUI; keys mirror the
# keyword parameters of interactive_clone.
DEFAULT_EFFECTS = {
    "eq_low_gain": 2, "eq_mid_gain": 0, "eq_high_gain": 1,
    "reverb_early": 0.05, "reverb_late": 0.3, "reverb_decay": 0.5,
    "pitch_steps": 0, "tempo": 1.0, "compressor": True, "delay_ms": 50,
    "chorus": True, "music_ducking": True
}
def interactive_clone(text, voice_file, music_file=None,
                      eq_low_gain=2, eq_mid_gain=0, eq_high_gain=1,
                      reverb_early=0.05, reverb_late=0.3, reverb_decay=0.5,
                      pitch_steps=0, tempo=1.0, compressor=True, delay_ms=50,
                      chorus=True, music_ducking=True, model="XTTS v2"):
    """Gradio callback: clone *text* with the chosen effects and optionally
    mix in background music. Returns the path of the resulting audio file."""
    fx = {
        "eq_bands": [(100, 1, eq_low_gain), (1000, 1, eq_mid_gain), (5000, 1, eq_high_gain)],
        "reverb_early": reverb_early,
        "reverb_late": reverb_late,
        "reverb_decay": reverb_decay,
        "pitch_steps": pitch_steps,
        "target_tempo": tempo,
        "compressor": compressor,
        "delay_ms": delay_ms,
        "chorus": chorus,
    }
    result = clone_voice(text, voice_file, model_name=model, effects_params=fx)
    if music_file and music_ducking:
        result = mix_audio(result, music_file)
    return result
def launch_mixer_gui():
    """Build and launch the Gradio mixer UI wrapping interactive_clone.

    The order of the inputs list must match interactive_clone's positional
    parameters exactly. launch() blocks until the server is stopped.
    """
    interface = gr.Interface(
        fn=interactive_clone,
        inputs=[
            gr.Textbox(label="Text to speak"),
            gr.Audio(label="Voice Sample (consensual)", type="filepath"),
            gr.Audio(label="Background Music (optional)", type="filepath"),
            # 3-band EQ gains in dB (low / mid / high).
            gr.Slider(-12,12,value=DEFAULT_EFFECTS["eq_low_gain"],label="EQ Low Gain (dB)"),
            gr.Slider(-12,12,value=DEFAULT_EFFECTS["eq_mid_gain"],label="EQ Mid Gain (dB)"),
            gr.Slider(-12,12,value=DEFAULT_EFFECTS["eq_high_gain"],label="EQ High Gain (dB)"),
            # Reverb tap delays (seconds) and decay factor.
            gr.Slider(0,0.5,step=0.01,value=DEFAULT_EFFECTS["reverb_early"],label="Reverb Early Reflections (s)"),
            gr.Slider(0,1.0,step=0.01,value=DEFAULT_EFFECTS["reverb_late"],label="Reverb Late Reflections (s)"),
            gr.Slider(0,1.0,step=0.01,value=DEFAULT_EFFECTS["reverb_decay"],label="Reverb Decay"),
            gr.Slider(-12,12,step=1,value=DEFAULT_EFFECTS["pitch_steps"],label="Pitch Correction (semitones)"),
            gr.Slider(0.5,2.0,step=0.01,value=DEFAULT_EFFECTS["tempo"],label="Tempo Adjustment"),
            gr.Checkbox(label="Compressor",value=DEFAULT_EFFECTS["compressor"]),
            gr.Slider(0,500,step=10,value=DEFAULT_EFFECTS["delay_ms"],label="Delay (ms)"),
            gr.Checkbox(label="Chorus",value=DEFAULT_EFFECTS["chorus"]),
            gr.Checkbox(label="Dynamic Ducking for Music",value=DEFAULT_EFFECTS["music_ducking"]),
            gr.Dropdown(list(AVAILABLE_MODELS.keys()),value="XTTS v2",label="Voice Cloning Model")
        ],
        outputs=[gr.Audio(label="Cloned Audio Output")],
        title="DAW-Style Voice Cloning Mixer",
        description="Interactive voice cloning studio with real-time adjustable effects."
    )
    interface.launch()
| ########################################### | |
| # 15. FASTAPI SERVER | |
| ########################################### | |
app = FastAPI()

@app.post("/api/clone_batch")
async def api_clone_batch(texts: str = Form(...), voices: list[UploadFile] = None, model: str = Form("XTTS v2")):
    """Clone a batch of texts against uploaded voice files.

    Args:
        texts: semicolon-separated list of texts to synthesize.
        voices: uploaded reference recordings, paired positionally with texts.
        model: key into AVAILABLE_MODELS.

    Returns:
        dict with "outputs": server-side paths of the generated files.
    """
    # Robustness fix: a request without uploads previously crashed with
    # TypeError (iterating None), and an unequal texts/voices count raised
    # IndexError. Pair positionally with zip; extras on either side are
    # ignored.
    if not voices:
        return {"outputs": []}
    texts_list = texts.split(";")
    output_files = []
    for text, voice in zip(texts_list, voices):
        # Persist each upload so downstream file-based helpers can read it.
        temp_voice = os.path.join(TEMP_DIR, f"temp_{uuid.uuid4()}.wav")
        with open(temp_voice, "wb") as f:
            f.write(await voice.read())
        output_files.append(clone_voice(text, temp_voice, model_name=model))
    return {"outputs": output_files}
| ########################################### | |
| # 16. MAIN MENU | |
| ########################################### | |
def menu():
    """Console entry point: show the option list once and run the chosen action."""
    print("""
    ========================================
    PROFESSIONAL DAW-STYLE VOICE CLONING STUDIO
    ========================================
    1. Clone voice (offline)
    2. Batch clone
    3. Real-time voice conversion
    4. Launch Gradio Mixer GUI
    5. Launch API server
    6. Cleanup temporary files
    7. Exit
    """)
    choice = input("Select option: ")
    if choice == "1":
        # Single offline clone, optionally mixed over a music bed.
        text = input("Enter text: ")
        voice_path = input("Path to voice sample: ")
        music_path = input("Optional background music path: ")
        print("Available models:", list(AVAILABLE_MODELS.keys()))
        model_choice = input("Choose model: ") or "XTTS v2"
        result = clone_voice(text, voice_path, model_name=model_choice)
        if music_path:
            result = mix_audio(result, music_path)
        print("Output saved:", result)
    elif choice == "2":
        # Batch clone: texts and sample paths are paired positionally.
        raw_texts = input("Enter batch texts separated by ';': ")
        sample_paths = input("Enter batch voice sample paths separated by ';': ").split(";")
        print("Available models:", list(AVAILABLE_MODELS.keys()))
        model_choice = input("Choose model: ") or "XTTS v2"
        batch_results = batch_clone(raw_texts.split(";"), sample_paths, model_name=model_choice)
        print("Batch outputs:", batch_results)
    elif choice == "3":
        target_path = input("Path to target voice sample: ")
        realtime_voice_conversion(target_path)
    elif choice == "4":
        launch_mixer_gui()
    elif choice == "5":
        logging.info("Starting API server...")
        uvicorn.run(app, host="0.0.0.0", port=8000)
    elif choice == "6":
        cleanup_temp()
    else:
        # Any other input (including "7") exits.
        print("Goodbye.")

if __name__ == "__main__":
    menu()