| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
import logging
import os
import shutil
import uuid

import gradio as gr
import librosa
import noisereduce as nr
import numpy as np
import soundfile as sf
import torch
import uvicorn
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import FileResponse
from pydub import AudioSegment, effects
from pydub.generators import Sine
from scipy.signal import butter, lfilter
from TTS.api import TTS
| |
|
| | |
# Optional speaker-verification backend. If SpeechBrain is installed, consent
# recordings can be scored against the cloned voice sample; otherwise
# verify_consent() falls back to manual confirmation only.
try:
    from speechbrain.pretrained import SpeakerRecognition
    speaker_verifier = SpeakerRecognition.from_hparams(source="speechbrain/spkrec-ecapa-voxceleb")
    CONSENT_VERIFICATION = True
except Exception as e:
    CONSENT_VERIFICATION = False
    logging.warning("Speaker verification unavailable.")
    # Improvement: the failure reason was previously discarded, making the
    # missing-backend situation impossible to diagnose from logs.
    logging.debug("SpeechBrain load failed: %s", e)
| |
|
| | |
# Application-wide logging format and the scratch directory used for all
# intermediate/output audio files.
TEMP_DIR = "temp"

logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
os.makedirs(TEMP_DIR, exist_ok=True)
| |
|
| | |
| | |
| | |
def verify_consent(voice_sample, consent_phrase="I consent to voice cloning"):
    """Confirm that the owner of *voice_sample* consents to voice cloning.

    When the optional SpeechBrain verifier is available the sample is scored
    and a low score rejects consent.  In every case the user must also confirm
    interactively on stdin.

    Args:
        voice_sample: path to the reference voice recording.
        consent_phrase: phrase the speaker is asked to say.

    Raises:
        PermissionError: if the voice match fails or the user answers anything
            other than 'y'.
    """
    print(f"Please say the following phrase: '{consent_phrase}'")
    if CONSENT_VERIFICATION:
        try:
            # NOTE(review): this compares the sample against itself, which will
            # always match; a real check should score a fresh consent recording
            # against the sample — TODO confirm intended design.
            score, _ = speaker_verifier.verify_files(voice_sample, voice_sample)
            logging.info(f"Speaker verification score: {score:.2f}")
            if score < 0.7:
                raise PermissionError("Consent phrase does not match voice sample.")
        except PermissionError:
            # BUG FIX: the rejection above was previously swallowed by the
            # broad handler below, so a failed voice match never actually
            # denied consent.  Propagate it.
            raise
        except Exception as e:
            # Best-effort: verifier errors degrade to manual confirmation only.
            logging.warning(f"Speaker verification failed: {e}")
    consent = input("Do you confirm this sample is used with your consent? (y/n): ")
    if consent.lower() != 'y':
        raise PermissionError("Consent not granted.")
    logging.info("Consent verified.")
| |
|
| | |
| | |
| | |
def clean_audio(input_file, output_file=None):
    """Denoise *input_file* and write the result as WAV; return the new path."""
    target = output_file or os.path.join(TEMP_DIR, f"clean_{uuid.uuid4()}.wav")
    signal, rate = librosa.load(input_file, sr=None)  # keep native sample rate
    denoised = nr.reduce_noise(y=signal, sr=rate)
    sf.write(target, denoised, rate)
    logging.info(f"Cleaned audio saved to {target}")
    return target
| |
|
| | |
| | |
| | |
def parametric_eq(samples, sr, bands):
    """Sum of band-pass filtered copies of *samples*, one per EQ band.

    Args:
        samples: 1-D float array of audio samples.
        sr: sample rate in Hz.
        bands: iterable of (center_freq_hz, Q, gain_dB) tuples.

    Returns:
        Array with the same shape as *samples* (sum of all band outputs).
    """
    nyquist = sr / 2.0

    def apply_band(center, q, gain_db):
        # BUG FIX: Q was previously ignored (bandwidth hard-coded to one
        # octave).  Standard band edges for a resonance of quality factor Q:
        #   f_lo, f_hi = f0 * (sqrt(1 + 1/(4Q^2)) -/+ 1/(2Q))
        half = 1.0 / (2.0 * q)
        spread = np.sqrt(1.0 + half * half)
        low = center * (spread - half) / nyquist
        high = center * (spread + half) / nyquist
        # Clamp into (0, 1) so butter() cannot fail near DC or Nyquist.
        high = min(high, 0.999999)
        low = max(min(low, high - 1e-6), 1e-7)
        b, a = butter(2, [low, high], btype='band')
        return lfilter(b, a, samples) * 10 ** (gain_db / 20)

    out = np.zeros_like(samples)
    for center, q, gain_db in bands:
        out += apply_band(center, q, gain_db)
    return out
| |
|
| | |
| | |
| | |
def add_reverb(samples, sr, early=0.1, late=0.3, decay=0.5):
    """Mix two delayed, attenuated copies of the signal onto itself.

    The early reflection arrives after *early* seconds at gain *decay*; the
    late reflection after *late* seconds at half that gain.
    """
    out = np.copy(samples)
    for delay_s, gain in ((early, decay), (late, decay / 2)):
        offset = int(delay_s * sr)
        if offset > 0:
            out[offset:] += gain * samples[:-offset]
    return out
| |
|
| | |
| | |
| | |
def dynamic_ducking(voice, music, threshold_db=-35, reduction_db=-12):
    """Attenuate the music bed when the voice is loud, then overlay the voice.

    NOTE(review): applies one static gain drop to the entire music segment
    rather than a time-varying envelope — confirm this is intended.
    """
    voice_level_db = 20 * np.log10(voice.rms + 1e-6)
    if voice_level_db > threshold_db:
        music = music - abs(reduction_db)
    return music.overlay(voice)
| |
|
| | |
| | |
| | |
def pitch_and_timing_correction(samples, sr, pitch_steps=0, target_tempo=1.0):
    """Optionally pitch-shift and time-stretch *samples*.

    Args:
        samples: 1-D float audio array.
        sr: sample rate in Hz.
        pitch_steps: semitones to shift (0 = no shift).
        target_tempo: stretch rate (>1.0 speeds up, <1.0 slows down, 1.0 = none).

    Returns:
        Processed samples (the input object unchanged when both stages are off).
    """
    if pitch_steps != 0:
        # BUG FIX: librosa >= 0.10 requires sr/n_steps as keyword arguments;
        # the old positional call raises TypeError.
        samples = librosa.effects.pitch_shift(y=samples, sr=sr, n_steps=pitch_steps)
    if target_tempo != 1.0:
        samples = librosa.effects.time_stretch(y=samples, rate=target_tempo)
    return samples
| |
|
| | |
| | |
| | |
def apply_effects_chain(audio_file, sr=16000, eq_bands=((100, 1, 0), (1000, 1, 0), (5000, 1, 0)),
                        pitch_steps=0, target_tempo=1.0,
                        reverb_early=0.05, reverb_late=0.3, reverb_decay=0.5,
                        compressor=True, delay_ms=0, chorus=False):
    """Run a DAW-style effects chain over *audio_file*; return the output path.

    Stage order: parametric EQ -> peak limiter -> pitch/tempo correction ->
    reverb -> optional slap-back delay -> optional chorus shimmer -> normalize.

    Args:
        audio_file: path to any format pydub/ffmpeg can decode.
        sr: sample rate assumed by the DSP stages and used for the output.
            NOTE(review): not validated against the file's own frame rate —
            a mismatch changes playback speed; confirm callers pass the right sr.
        eq_bands: iterable of (center_hz, Q, gain_dB) tuples.  (Was a mutable
            default list; now an immutable tuple — same values.)
        pitch_steps / target_tempo: see pitch_and_timing_correction().
        reverb_early / reverb_late / reverb_decay: see add_reverb().
        compressor: scale down if the peak exceeds 90% of full scale.
        delay_ms: slap-back delay offset; 0 disables.
        chorus: overlay a faint 2 Hz sine shimmer.

    Returns:
        Path of the processed WAV written into TEMP_DIR.
    """
    audio_seg = AudioSegment.from_file(audio_file)
    samples = np.array(audio_seg.get_array_of_samples()).astype(np.float32)

    # Full-scale peak for the segment's integer width (e.g. 32767 for 16-bit).
    # BUG FIX: the old code called np.iinfo(samples.dtype) on a float32 array,
    # which raises ValueError, so the compressor branch always crashed.
    peak = float(2 ** (8 * audio_seg.sample_width - 1) - 1)

    samples = parametric_eq(samples, sr, eq_bands)

    # Simple peak limiter.
    if compressor and samples.size:
        max_amp = np.max(np.abs(samples))
        if max_amp > 0.9 * peak:
            samples = samples * (0.9 * peak / max_amp)

    samples = pitch_and_timing_correction(samples, sr, pitch_steps=pitch_steps, target_tempo=target_tempo)

    samples = add_reverb(samples, sr, early=reverb_early, late=reverb_late, decay=reverb_decay)

    # BUG FIX: the float32 buffer was previously handed to AudioSegment with an
    # integer sample_width, producing garbage audio.  Clip and convert back to
    # the segment's original integer sample format first.
    int_dtype = {1: np.int8, 2: np.int16, 4: np.int32}.get(audio_seg.sample_width, np.int16)
    samples = np.clip(samples, -peak - 1, peak).astype(int_dtype)
    out_seg = AudioSegment(
        samples.tobytes(),
        frame_rate=sr,
        sample_width=audio_seg.sample_width,
        channels=audio_seg.channels
    )

    # Slap-back delay: overlay a quieter (-6 dB) copy after delay_ms.
    if delay_ms > 0:
        delayed = out_seg - 6
        out_seg = out_seg.overlay(delayed, delay=delay_ms)

    # "Chorus": a faint 2 Hz sine under the whole take.
    if chorus:
        chorus_tone = Sine(2).to_audio_segment(duration=len(out_seg))
        out_seg = out_seg.overlay(chorus_tone - 18)

    out_seg = effects.normalize(out_seg)

    output_file = os.path.join(TEMP_DIR, f"pro_effects_{uuid.uuid4()}.wav")
    out_seg.export(output_file, format="wav")
    logging.info(f"Effects applied: {output_file}")
    return output_file
| |
|
| | |
| | |
| | |
# Display name -> Coqui TTS model identifier, as consumed by load_tts_model()
# and offered in the CLI menu, Gradio dropdown, and API 'model' field.
AVAILABLE_MODELS = {
    "XTTS v2": "tts_models/multilingual/multi-dataset/xtts_v2",
    "VCTK VITS": "tts_models/en/vctk/vits"
}
| |
|
def load_tts_model(model_name="XTTS v2"):
    """Instantiate the Coqui TTS model registered under *model_name*.

    Raises:
        ValueError: if *model_name* is not a key of AVAILABLE_MODELS.
    """
    try:
        model_id = AVAILABLE_MODELS[model_name]
    except KeyError:
        raise ValueError(f"Model '{model_name}' not available.") from None
    logging.info(f"Loading {model_name}...")
    return TTS(model_id)
| |
|
| | |
| | |
| | |
def clone_voice(text, voice_sample, output_file=None, model_name="XTTS v2", effects_params=None):
    """Synthesize *text* in the voice of *voice_sample* (consent-gated).

    Args:
        text: text to speak.
        voice_sample: path to the reference voice recording.
        output_file: destination WAV; auto-generated in TEMP_DIR when falsy.
        model_name: key into AVAILABLE_MODELS.
        effects_params: optional kwargs forwarded to apply_effects_chain();
            falsy (None/{}) skips post-processing.  IDIOM FIX: the default was
            a shared mutable dict ({}); None behaves identically here.

    Returns:
        Path of the synthesized (and optionally post-processed) WAV file.

    Raises:
        PermissionError: if consent verification fails.
    """
    if not output_file:
        output_file = os.path.join(TEMP_DIR, f"cloned_{uuid.uuid4()}.wav")
    verify_consent(voice_sample)
    cleaned = clean_audio(voice_sample)
    model = load_tts_model(model_name)
    model.tts_to_file(text=text, speaker_wav=cleaned, file_path=output_file)
    if effects_params:
        output_file = apply_effects_chain(output_file, **effects_params)
    logging.info(f"Cloned voice saved: {output_file}")
    return output_file
| |
|
| | |
| | |
| | |
def batch_clone(texts, voice_samples, model_name="XTTS v2", effects_params=None):
    """Clone each (text, voice sample) pair; return the list of output paths.

    Pairs are matched positionally.  IDIOM FIX: effects_params defaulted to a
    shared mutable dict; a length mismatch was also silently truncated by zip
    — it still truncates (backward compatible) but is now logged.
    """
    if len(texts) != len(voice_samples):
        logging.warning("batch_clone: %d texts vs %d voice samples; extra items are ignored.",
                        len(texts), len(voice_samples))
    results = []
    for i, (text, voice_file) in enumerate(zip(texts, voice_samples)):
        logging.info(f"Processing batch {i+1}/{len(texts)}")
        out_file = clone_voice(text, voice_file, model_name=model_name, effects_params=effects_params)
        results.append(out_file)
    return results
| |
|
| | |
| | |
| | |
def mix_audio(voice_file, music_file, output_file=None):
    """Duck *music_file* under *voice_file* and export the blend as WAV."""
    destination = output_file or os.path.join(TEMP_DIR, f"mixed_{uuid.uuid4()}.wav")
    voice_track = AudioSegment.from_file(voice_file)
    music_track = AudioSegment.from_file(music_file).apply_gain(-15)  # music bed sits -15 dB down
    blended = dynamic_ducking(voice_track, music_track)
    blended.export(destination, format="wav")
    logging.info(f"Mixed audio saved: {destination}")
    return destination
| |
|
| | |
| | |
| | |
# Optional real-time voice conversion via OpenVoice + sounddevice.  If either
# dependency is missing (or converter init fails), a warning-only stub is
# installed instead, so callers may invoke realtime_voice_conversion() safely.
try:
    from openvoice import VoiceConverter
    import sounddevice as sd
    vc_model = VoiceConverter()
    def realtime_voice_conversion(target_voice, sr=16000, block=1024):
        # Streams mic input through the converter and back to the output
        # device, one block at a time, until the process is interrupted.
        logging.info("Real-time conversion ON. Ctrl+C to stop.")
        def callback(indata, outdata, frames, t, status):
            # NOTE(review): assumes vc_model.convert returns a mono signal the
            # same length as the input block — confirm against OpenVoice docs.
            audio_tensor = torch.tensor(indata[:, 0]).unsqueeze(0)
            converted = vc_model.convert(audio_tensor, target_voice)
            outdata[:] = converted.squeeze().numpy().reshape(-1, 1)
        with sd.Stream(channels=1, callback=callback, samplerate=sr, blocksize=block):
            sd.sleep(999999999)
except Exception:
    logging.warning("OpenVoice module unavailable.")
    def realtime_voice_conversion(*args, **kwargs):
        logging.warning("Real-time voice conversion unavailable.")
| |
|
| | |
| | |
| | |
def cleanup_temp():
    """Delete and recreate TEMP_DIR, discarding all intermediate files.

    Failures are logged, never raised (best-effort cleanup).
    """
    try:
        shutil.rmtree(TEMP_DIR)
        os.makedirs(TEMP_DIR, exist_ok=True)
    except Exception as e:
        logging.error(f"Error cleaning temp files: {e}")
    else:
        logging.info("Temporary files cleaned.")
| |
|
| | |
| | |
| | |
# Default values for the Gradio mixer widgets; keys mirror the keyword
# parameters of interactive_clone().
DEFAULT_EFFECTS = {
    "eq_low_gain": 2, "eq_mid_gain":0, "eq_high_gain":1,
    "reverb_early":0.05, "reverb_late":0.3, "reverb_decay":0.5,
    "pitch_steps":0, "tempo":1.0, "compressor":True, "delay_ms":50,
    "chorus":True, "music_ducking":True
}
| |
|
def interactive_clone(text, voice_file, music_file=None,
                      eq_low_gain=2, eq_mid_gain=0, eq_high_gain=1,
                      reverb_early=0.05, reverb_late=0.3, reverb_decay=0.5,
                      pitch_steps=0, tempo=1.0, compressor=True, delay_ms=50,
                      chorus=True, music_ducking=True, model="XTTS v2"):
    """Gradio-facing wrapper: clone *text* in the sampled voice with the chosen
    effects, then optionally duck the result over background music."""
    bands = [
        (100, 1, eq_low_gain),
        (1000, 1, eq_mid_gain),
        (5000, 1, eq_high_gain),
    ]
    fx = {
        "eq_bands": bands,
        "reverb_early": reverb_early,
        "reverb_late": reverb_late,
        "reverb_decay": reverb_decay,
        "pitch_steps": pitch_steps,
        "target_tempo": tempo,
        "compressor": compressor,
        "delay_ms": delay_ms,
        "chorus": chorus,
    }
    result = clone_voice(text, voice_file, model_name=model, effects_params=fx)
    if music_file and music_ducking:
        result = mix_audio(result, music_file)
    return result
| |
|
def launch_mixer_gui():
    """Assemble the Gradio front-end for interactive_clone() and launch it."""
    # Widget order must match interactive_clone()'s positional parameters.
    controls = [
        gr.Textbox(label="Text to speak"),
        gr.Audio(label="Voice Sample (consensual)", type="filepath"),
        gr.Audio(label="Background Music (optional)", type="filepath"),
        gr.Slider(-12, 12, value=DEFAULT_EFFECTS["eq_low_gain"], label="EQ Low Gain (dB)"),
        gr.Slider(-12, 12, value=DEFAULT_EFFECTS["eq_mid_gain"], label="EQ Mid Gain (dB)"),
        gr.Slider(-12, 12, value=DEFAULT_EFFECTS["eq_high_gain"], label="EQ High Gain (dB)"),
        gr.Slider(0, 0.5, step=0.01, value=DEFAULT_EFFECTS["reverb_early"], label="Reverb Early Reflections (s)"),
        gr.Slider(0, 1.0, step=0.01, value=DEFAULT_EFFECTS["reverb_late"], label="Reverb Late Reflections (s)"),
        gr.Slider(0, 1.0, step=0.01, value=DEFAULT_EFFECTS["reverb_decay"], label="Reverb Decay"),
        gr.Slider(-12, 12, step=1, value=DEFAULT_EFFECTS["pitch_steps"], label="Pitch Correction (semitones)"),
        gr.Slider(0.5, 2.0, step=0.01, value=DEFAULT_EFFECTS["tempo"], label="Tempo Adjustment"),
        gr.Checkbox(label="Compressor", value=DEFAULT_EFFECTS["compressor"]),
        gr.Slider(0, 500, step=10, value=DEFAULT_EFFECTS["delay_ms"], label="Delay (ms)"),
        gr.Checkbox(label="Chorus", value=DEFAULT_EFFECTS["chorus"]),
        gr.Checkbox(label="Dynamic Ducking for Music", value=DEFAULT_EFFECTS["music_ducking"]),
        gr.Dropdown(list(AVAILABLE_MODELS.keys()), value="XTTS v2", label="Voice Cloning Model"),
    ]
    gr.Interface(
        fn=interactive_clone,
        inputs=controls,
        outputs=[gr.Audio(label="Cloned Audio Output")],
        title="DAW-Style Voice Cloning Mixer",
        description="Interactive voice cloning studio with real-time adjustable effects."
    ).launch()
| |
|
| | |
| | |
| | |
| | app = FastAPI() |
| |
|
@app.post("/api/clone_batch")
async def api_clone_batch(texts: str = Form(...), voices: list[UploadFile] = File(...), model: str = Form("XTTS v2")):
    """Batch voice-cloning endpoint.

    *texts* is a ';'-separated list; each text is paired positionally with one
    uploaded voice file.  Returns the server-side paths of the generated WAVs.

    BUG FIX: 'voices' previously defaulted to None (TypeError when iterated,
    and not a proper FastAPI file-upload declaration); it is now a required
    File(...) parameter.  Text/file pairs are matched with zip so a count
    mismatch no longer raises IndexError.
    """
    texts_list = texts.split(";")
    output_files = []
    for text, voice in zip(texts_list, voices):
        temp_voice = os.path.join(TEMP_DIR, f"temp_{uuid.uuid4()}.wav")
        with open(temp_voice, "wb") as f:
            f.write(await voice.read())
        out_file = clone_voice(text, temp_voice, model_name=model)
        output_files.append(out_file)
    return {"outputs": output_files}
| |
|
| | |
| | |
| | |
def menu():
    """One-shot CLI menu: prompt for an option and dispatch to that feature."""
    print("""
    ========================================
    PROFESSIONAL DAW-STYLE VOICE CLONING STUDIO
    ========================================
    1. Clone voice (offline)
    2. Batch clone
    3. Real-time voice conversion
    4. Launch Gradio Mixer GUI
    5. Launch API server
    6. Cleanup temporary files
    7. Exit
    """)
    selection = input("Select option: ")

    if selection == "1":
        speak_text = input("Enter text: ")
        sample_path = input("Path to voice sample: ")
        music_path = input("Optional background music path: ")
        print("Available models:", list(AVAILABLE_MODELS.keys()))
        chosen_model = input("Choose model: ") or "XTTS v2"
        result = clone_voice(speak_text, sample_path, model_name=chosen_model)
        if music_path:
            result = mix_audio(result, music_path)
        print("Output saved:", result)

    elif selection == "2":
        batch_texts = input("Enter batch texts separated by ';': ")
        batch_voices = input("Enter batch voice sample paths separated by ';': ").split(";")
        print("Available models:", list(AVAILABLE_MODELS.keys()))
        chosen_model = input("Choose model: ") or "XTTS v2"
        batch_results = batch_clone(batch_texts.split(";"), batch_voices, model_name=chosen_model)
        print("Batch outputs:", batch_results)

    elif selection == "3":
        target_path = input("Path to target voice sample: ")
        realtime_voice_conversion(target_path)

    elif selection == "4":
        launch_mixer_gui()

    elif selection == "5":
        logging.info("Starting API server...")
        uvicorn.run(app, host="0.0.0.0", port=8000)

    elif selection == "6":
        cleanup_temp()

    else:
        # Any other entry (including "7") exits.
        print("Goodbye.")
| |
|
# Script entry point: run the interactive menu once.
if __name__ == "__main__":
    menu()
| |
|