import os
import sys
import uuid
from pathlib import Path
from contextlib import contextmanager

import ruptures as rpt
import numpy as np
import torch
import gradio as gr
import librosa

from pyharp.core import ModelCard, build_endpoint
from pyharp.media.audio import save_audio
from pyharp import LabelList, AudioLabel, OutputLabel

from audiotools import AudioSignal
from audioseal import AudioSeal

# Target loudness used to normalize the input before processing.
LOUDNESS_DB = -16.
# Working sample rate for the full-band signal.
SAMPLE_RATE = 48_000
ENCODEC_SAMPLE_RATE = 16_000
# Sample rate at which the AudioSeal generator/detector are invoked.
AUDIOSEAL_SAMPLE_RATE = 16_000

model_card = ModelCard(
    name="Meta AudioSeal Watermarking",
    description=("Meta AudioSeal watermarking generation and detection model\n"
                 "The watermark is applied under 16kHz."),
    author="Robin San Roman, Pierre Fernandez, Alexandre Défossez, Teddy Furon, Tuan Tran, Hady Elsahar",
    tags=["watermarking"]
)

# The models are loaded once at import time and switched to eval mode.
print("Initializing AudioSeal model...")
generator = AudioSeal.load_generator("audioseal_wm_16bits")
detector = AudioSeal.load_detector("audioseal_detector_16bits")
generator.eval()
detector.eval()


def load_audio(audio_path):
    """Load an audio file as a mono waveform at its native sample rate.

    Args:
        audio_path: Path of the audio file to read.

    Returns:
        A ``(wav, sr)`` pair as returned by ``librosa.load``.

    Raises:
        ValueError: If the file cannot be loaded; the original exception is
            attached as ``__cause__``.
    """
    try:
        # sr=None keeps the file's native rate. librosa's default (22050 Hz)
        # would silently discard the high band that `embed` tries to preserve.
        wav, sr = librosa.load(audio_path, sr=None, mono=True)
        return wav, sr
    except Exception as e:
        print(f"Audio preprocessing failed: {e}")
        raise ValueError(f"Failed to load audio: {str(e)}") from e


@torch.no_grad()
def split_bands(signal: AudioSignal, sample_rate: float = ENCODEC_SAMPLE_RATE):
    """Split ``signal`` into a low band and a high band.

    The cutoffs sit at 95% (high-pass) and 105% (low-pass) of the Nyquist
    frequency of ``sample_rate``, so the two bands overlap slightly.

    Args:
        signal: Signal to split. It is cloned before filtering.
        sample_rate: Rate the low band is resampled to.

    Returns:
        ``(low, high, loud_db)``: the low band resampled to ``sample_rate``,
        the high band at the input rate, and the loudness of the low band
        measured before resampling.
    """
    nyq = sample_rate // 2
    high = signal.clone().high_pass(cutoffs=int(nyq * 0.95), zeros=51)
    low = signal.clone().low_pass(cutoffs=int(nyq * 1.05), zeros=51)
    loud_db = low.loudness()
    low = low.resample(sample_rate)
    return low, high, loud_db


@torch.no_grad()
def merge_bands(low, high, loud_db):
    """Recombine a (possibly modified) low band with its high band.

    Args:
        low: Low-band signal; cloned, moved to ``high``'s device and
            resampled to ``high``'s sample rate.
        high: High-band signal that defines the output rate and length.
        loud_db: Loudness the low band is normalized to before summing.

    Returns:
        The sum of the loudness-normalized low band and ``high``.
    """
    low = low.clone().to(high.device).resample(high.sample_rate)
    # Resampling can change the length by a few samples: trim, then zero-pad,
    # so the low band matches the high band exactly.
    low.audio_data = low.audio_data[..., :high.signal_length]
    low.audio_data = torch.nn.functional.pad(
        low.audio_data, (0, max(0, high.signal_length - low.signal_length))
    )
    return low.normalize(loud_db) + high


@torch.no_grad()
def embed(signal: AudioSignal, embedder: torch.nn.Module):
    """Add a watermark to the low band of ``signal``.

    The signal is resampled to ``SAMPLE_RATE`` and split into bands. The
    watermark produced by ``embedder.get_watermark`` is added to the low band
    at ``AUDIOSEAL_SAMPLE_RATE``, the bands are merged, and the result is
    resampled back to the input's sample rate.

    Args:
        signal: Input signal; it is cloned, not modified.
        embedder: Model exposing ``get_watermark(audio, sample_rate)``.

    Returns:
        The watermarked signal at the original sample rate.
    """
    orig_ch, orig_sr = signal.num_channels, signal.sample_rate
    sig = signal.clone().resample(SAMPLE_RATE)

    if orig_ch > 1:
        # Fold channels into the batch axis so each channel is processed
        # as an independent mono signal.
        b, c, n = sig.audio_data.shape
        sig.audio_data = sig.audio_data.reshape(b * c, 1, n)

    low, high, loud = split_bands(sig.clone(), AUDIOSEAL_SAMPLE_RATE)
    wm = embedder.get_watermark(low.audio_data, AUDIOSEAL_SAMPLE_RATE)
    low.audio_data = low.audio_data + wm
    merged = merge_bands(low, high, loud)

    if orig_ch > 1:
        # Undo the channel folding done above.
        b2, c2, n2 = merged.audio_data.shape
        merged.audio_data = merged.audio_data.reshape(-1, orig_ch * c2, n2)

    return merged.resample(orig_sr)


@torch.no_grad()
def detect(signal: AudioSignal, detector: torch.nn.Module):
    """Run the watermark detector on a mono, 16 kHz copy of ``signal``.

    Args:
        signal: Input signal; it is cloned before being down-mixed.
        detector: Model whose ``forward(audio, sample_rate=...)`` returns a
            ``(result, message)`` pair.

    Returns:
        A 1-D NumPy array taken from ``result[0, 1, :]``: one score per
        sample of the first batch item. Presumably this is the watermark
        presence probability; confirm against the AudioSeal documentation.
    """
    sig = signal.clone().to_mono().resample(AUDIOSEAL_SAMPLE_RATE)
    result, _ = detector.forward(sig.audio_data, sample_rate=AUDIOSEAL_SAMPLE_RATE)
    return result[0, 1, :].detach().cpu().numpy()


def process_fn(inp_audio, option_text):
    """Generate or detect a watermark for the given audio file.

    Args:
        inp_audio: Path of the input audio file.
        option_text: ``"Generate Watermark"`` to embed a watermark; any other
            value runs detection.

    Returns:
        ``(audio_path, labels)``. For generation, the path comes from
        ``save_audio`` and a single label covers the whole clip. For
        detection, the input path is returned unchanged together with one
        label per segment of roughly constant detector score.
    """
    audio_np, sr = load_audio(inp_audio)
    print(f"sr: {sr}, audio shape: {audio_np.shape}")

    # AudioSignal is given (batch, channels, samples).
    if audio_np.ndim == 1:
        audio_np = audio_np[None, None, :]
    else:
        # librosa returns multi-channel audio as (channels, samples), so only
        # a batch axis is needed. This path is reached only if load_audio
        # stops down-mixing to mono.
        audio_np = audio_np[None, ...]
    print(f"formatted audio: {audio_np.shape}")

    ori_sig = AudioSignal(torch.from_numpy(audio_np).float(), sample_rate=sr)
    orig_loud = ori_sig.loudness()
    sig = ori_sig.to_mono().resample(SAMPLE_RATE).normalize(LOUDNESS_DB).ensure_max_of_audio()

    output_labels = LabelList()

    if option_text == "Generate Watermark":
        with torch.no_grad():
            # Restore the input's loudness after watermarking.
            wm_sig = embed(sig.clone(), generator).normalize(orig_loud).ensure_max_of_audio()

        output_labels.labels.append(
            AudioLabel(
                t=0,
                label="watermark: 1.0",
                duration=wm_sig.duration,
                description=f"watermark confidence: 1.0, start: 0.0s, end: {wm_sig.duration:.2f}s",
                color=OutputLabel.rgb_color_to_int(255, 0, 0),
                amplitude=1.0
            )
        )
        return save_audio(wm_sig), output_labels

    with torch.no_grad():
        scores = detect(sig, detector)  # one score per sample at AUDIOSEAL_SAMPLE_RATE

    # Average the per-sample scores over 10 ms frames; the last frame may be
    # shorter than `hop`.
    hop = int(0.01 * AUDIOSEAL_SAMPLE_RATE)
    frames_per_second = AUDIOSEAL_SAMPLE_RATE / hop
    avg_curve = np.array(
        [scores[i:i + hop].mean() for i in range(0, len(scores), hop)]
    )
    print(avg_curve.shape)

    if avg_curve.size == 0:
        # Nothing to segment; change-point detection needs at least one frame.
        return inp_audio, output_labels

    # Change-point detection on the frame-level curve. Breakpoints are frame
    # indices, and the last one equals len(avg_curve).
    bkps = rpt.Pelt(model="l2", min_size=1).fit_predict(avg_curve, 1.0)

    t0 = 0
    for t1 in bkps:
        print(t0, t1)
        value = avg_curve[t0:t1].mean()
        start_s = t0 / frames_per_second
        end_s = t1 / frames_per_second
        output_labels.labels.append(
            AudioLabel(
                t=start_s,
                label=f"watermark: {value:.2f}",
                duration=(t1 - t0) / frames_per_second,
                description=f"watermark confidence: {value:.2f}, start: {start_s:.2f}s, end: {end_s:.2f}s",
                # Red for high scores, green for low scores.
                color=OutputLabel.rgb_color_to_int(int(value * 255), int((1 - value) * 255), 0),
                # Map the score from [0, 1] to [-1, 1]; cast to a plain float
                # so the label does not carry a NumPy scalar.
                amplitude=float(value * 2 - 1)
            )
        )
        t0 = t1

    return inp_audio, output_labels


with gr.Blocks() as app:
    gr.Markdown("## Meta AudioSeal Watermarking")

    # Inputs
    input_audio = gr.Audio(
        label="Input Audio",
        type="filepath",
        sources=["upload", "microphone"]
    )
    option_dropdown = gr.Dropdown(
        ["Generate Watermark", "Detect Watermark"],
        value='Generate Watermark',
        label='Option',
        info='Model Options'
    )

    # Outputs
    output_wav = gr.Audio(
        type="filepath",
        label="Watermarked Speech"
    )
    output_label = gr.JSON(label="Watermark Confidence")

    # Wire the components and process_fn together through pyharp.
    _ = build_endpoint(
        model_card=model_card,
        input_components=[
            input_audio,
            option_dropdown
        ],
        output_components=[
            output_wav,
            output_label
        ],
        process_fn=process_fn
    )

if __name__ == '__main__':
    app.launch(share=True, show_error=True, debug=True)