| | import os |
| | import gradio as gr |
| | import numpy as np |
| | import soundfile as sf |
| | from semanticodec import SemantiCodec |
| | from huggingface_hub import HfApi |
| | import spaces |
| | import torch |
| | import tempfile |
| | import io |
| | import uuid |
| | import pickle |
| | import time |
| | from pathlib import Path |
| |
|
| | |
| | def load_model(): |
| | model = SemantiCodec(token_rate=100, semantic_vocab_size=32768) |
| | if torch.cuda.is_available(): |
| | |
| | model = model.to("cuda:0") |
| | |
| | dummy_input = torch.zeros(1, 1, 1, dtype=torch.long).cuda() |
| | try: |
| | with torch.no_grad(): |
| | _ = model.decoder(dummy_input) |
| | except: |
| | print("Dummy forward pass failed, but CUDA initialization attempted") |
| | return model |
| |
|
| | |
| | semanticodec = load_model() |
| | |
| | model_device = "cuda:0" if torch.cuda.is_available() else "cpu" |
| | print(f"Model initialized on device: {model_device}") |
| |
|
| | |
| | |
| | SAMPLE_RATE = 16000 |
| |
|
| | @spaces.GPU(duration=20) |
| | def encode_audio(audio_path): |
| | """Encode audio file to tokens and return them as a file""" |
| | try: |
| | print(f"Encoding audio on device: {model_device}") |
| | |
| | semanticodec.to(model_device) |
| | tokens = semanticodec.encode(audio_path) |
| | print(f"Tokens device after encode: {tokens.device if isinstance(tokens, torch.Tensor) else 'numpy'}") |
| | |
| | |
| | if isinstance(tokens, torch.Tensor): |
| | tokens = tokens.cpu().numpy() |
| | |
| | |
| | if tokens.ndim == 1: |
| | |
| | tokens = tokens.reshape(1, -1, 1) |
| | |
| | |
| | token_data = { |
| | 'tokens': tokens, |
| | 'shape': tokens.shape, |
| | 'device': str(model_device) |
| | } |
| | |
| | |
| | temp_dir = "/tmp" |
| | os.makedirs(temp_dir, exist_ok=True) |
| | temp_file_path = os.path.join(temp_dir, f"tokens_{uuid.uuid4()}.oterin") |
| | |
| | |
| | with open(temp_file_path, "wb") as f: |
| | pickle.dump(token_data, f) |
| | |
| | |
| | if not os.path.exists(temp_file_path) or os.path.getsize(temp_file_path) == 0: |
| | raise Exception("Failed to create token file") |
| | |
| | return temp_file_path, f"Encoded to {tokens.shape[1]} tokens" |
| | except Exception as e: |
| | print(f"Encoding error: {str(e)}") |
| | return None, f"Error encoding audio: {str(e)}" |
| |
|
| | @spaces.GPU(duration=160) |
| | def decode_tokens(token_file): |
| | """Decode tokens to audio""" |
| | |
| | if not token_file or not os.path.exists(token_file): |
| | return None, "Error: Empty or missing token file" |
| | |
| | try: |
| | |
| | with open(token_file, "rb") as f: |
| | token_data = pickle.load(f) |
| | |
| | tokens = token_data['tokens'] |
| | intended_device = token_data.get('device', model_device) |
| | print(f"Loaded tokens with shape {tokens.shape}, intended device: {intended_device}") |
| | |
| | |
| | semanticodec.to(model_device) |
| | print(f"Model device before tensor creation: {next(semanticodec.parameters()).device}") |
| | |
| | |
| | tokens_tensor = torch.tensor(tokens, dtype=torch.long) |
| | print(f"Tokens tensor created on device: {tokens_tensor.device} with dtype: {tokens_tensor.dtype}") |
| | |
| | |
| | tokens_tensor = tokens_tensor.to(model_device) |
| | print(f"Tokens moved to device: {tokens_tensor.device}") |
| | |
| | |
| | waveform = semanticodec.decode(tokens_tensor) |
| | print(f"Waveform device after decode: {waveform.device if isinstance(waveform, torch.Tensor) else 'numpy'}") |
| | |
| | |
| | if isinstance(waveform, torch.Tensor): |
| | waveform = waveform.cpu().numpy() |
| | |
| | |
| | audio_data = waveform[0, 0] |
| | |
| | print(f"Audio data shape: {audio_data.shape}, dtype: {audio_data.dtype}") |
| | |
| | |
| | return (SAMPLE_RATE, audio_data), f"Decoded {tokens.shape[1]} tokens to audio" |
| | except Exception as e: |
| | print(f"Decoding error: {str(e)}") |
| | return None, f"Error decoding tokens: {str(e)}" |
| |
|
| | @spaces.GPU(duration=250) |
| | def process_both(audio_path): |
| | """Encode and then decode the audio without saving intermediate files""" |
| | try: |
| | print(f"Processing both on device: {model_device}") |
| | |
| | semanticodec.to(model_device) |
| | |
| | tokens = semanticodec.encode(audio_path) |
| | print(f"Tokens device after encode: {tokens.device if isinstance(tokens, torch.Tensor) else 'numpy'}") |
| | |
| | if isinstance(tokens, torch.Tensor): |
| | tokens = tokens.cpu().numpy() |
| | |
| | |
| | if tokens.ndim == 1: |
| | |
| | tokens = tokens.reshape(1, -1, 1) |
| | |
| | |
| | tokens_tensor = torch.tensor(tokens, dtype=torch.long) |
| | print(f"Tokens tensor created on device: {tokens_tensor.device} with dtype: {tokens_tensor.dtype}") |
| | |
| | |
| | tokens_tensor = tokens_tensor.to(model_device) |
| | print(f"Tokens moved to device: {tokens_tensor.device}") |
| | |
| | |
| | semanticodec.to(model_device) |
| | print(f"Model device before decode: {next(semanticodec.parameters()).device}") |
| | |
| | |
| | waveform = semanticodec.decode(tokens_tensor) |
| | print(f"Waveform device after decode: {waveform.device if isinstance(waveform, torch.Tensor) else 'numpy'}") |
| | |
| | |
| | if isinstance(waveform, torch.Tensor): |
| | waveform = waveform.cpu().numpy() |
| | |
| | |
| | audio_data = waveform[0, 0] |
| | |
| | print(f"Audio data shape: {audio_data.shape}, dtype: {audio_data.dtype}") |
| | |
| | |
| | return (SAMPLE_RATE, audio_data), f"Encoded to {tokens.shape[1]} tokens\nDecoded {tokens.shape[1]} tokens to audio" |
| | except Exception as e: |
| | print(f"Processing error: {str(e)}") |
| | return None, f"Error processing audio: {str(e)}" |
| |
|
| | @spaces.GPU(duration=250) |
| | def stream_both(audio_path): |
| | """Encode and then stream decode the audio""" |
| | try: |
| | print(f"Processing both (streaming) on device: {model_device}") |
| | |
| | semanticodec.to(model_device) |
| | |
| | |
| | tokens = semanticodec.encode(audio_path) |
| | if isinstance(tokens, torch.Tensor): |
| | tokens = tokens.cpu().numpy() |
| | |
| | |
| | if tokens.ndim == 1: |
| | tokens = tokens.reshape(1, -1, 1) |
| | |
| | print(f"Encoded audio to {tokens.shape[1]} tokens, now streaming decoding...") |
| | yield None, f"Encoded to {tokens.shape[1]} tokens, starting decoding..." |
| | |
| | |
| | if tokens.shape[1] < 1500: |
| | |
| | tokens_tensor = torch.tensor(tokens, dtype=torch.long).to(model_device) |
| | |
| | |
| | semanticodec.to(model_device) |
| | waveform = semanticodec.decode(tokens_tensor) |
| | if isinstance(waveform, torch.Tensor): |
| | waveform = waveform.cpu().numpy() |
| | |
| | audio_data = waveform[0, 0] |
| | yield (SAMPLE_RATE, audio_data), f"Encoded to {tokens.shape[1]} tokens and decoded to audio" |
| | return |
| | |
| | |
| | chunk_size = 1500 |
| | num_chunks = (tokens.shape[1] + chunk_size - 1) // chunk_size |
| | |
| | all_audio_chunks = [] |
| | |
| | for i in range(num_chunks): |
| | start_idx = i * chunk_size |
| | end_idx = min((i + 1) * chunk_size, tokens.shape[1]) |
| | |
| | print(f"Decoding chunk {i+1}/{num_chunks}, tokens {start_idx} to {end_idx}") |
| | |
| | |
| | token_chunk = tokens[:, start_idx:end_idx, :] |
| | |
| | |
| | tokens_tensor = torch.tensor(token_chunk, dtype=torch.long).to(model_device) |
| | |
| | |
| | semanticodec.to(model_device) |
| | |
| | |
| | waveform = semanticodec.decode(tokens_tensor) |
| | if isinstance(waveform, torch.Tensor): |
| | waveform = waveform.cpu().numpy() |
| | |
| | |
| | audio_chunk = waveform[0, 0] |
| | all_audio_chunks.append(audio_chunk) |
| | |
| | |
| | combined_audio = np.concatenate(all_audio_chunks) |
| | |
| | |
| | yield (SAMPLE_RATE, combined_audio), f"Encoded to {tokens.shape[1]} tokens\nDecoded chunk {i+1}/{num_chunks} ({end_idx}/{tokens.shape[1]} tokens)" |
| | |
| | |
| | time.sleep(0.1) |
| | |
| | |
| | combined_audio = np.concatenate(all_audio_chunks) |
| | yield (SAMPLE_RATE, combined_audio), f"Completed: Encoded to {tokens.shape[1]} tokens and fully decoded" |
| | |
| | except Exception as e: |
| | print(f"Streaming process error: {str(e)}") |
| | yield None, f"Error processing audio: {str(e)}" |
| |
|
| | @spaces.GPU(duration=250) |
| | def stream_decode_tokens(token_file): |
| | """Decode tokens to audio in streaming chunks""" |
| | |
| | if not token_file or not os.path.exists(token_file): |
| | yield None, "Error: Empty or missing token file" |
| | return |
| | |
| | try: |
| | |
| | with open(token_file, "rb") as f: |
| | token_data = pickle.load(f) |
| | |
| | tokens = token_data['tokens'] |
| | intended_device = token_data.get('device', model_device) |
| | print(f"Loaded tokens with shape {tokens.shape}, intended device: {intended_device}") |
| | |
| | |
| | semanticodec.to(model_device) |
| | |
| | |
| | if tokens.shape[1] < 1500: |
| | |
| | tokens_tensor = torch.tensor(tokens, dtype=torch.long) |
| | tokens_tensor = tokens_tensor.to(model_device) |
| | |
| | |
| | waveform = semanticodec.decode(tokens_tensor) |
| | if isinstance(waveform, torch.Tensor): |
| | waveform = waveform.cpu().numpy() |
| | |
| | audio_data = waveform[0, 0] |
| | yield (SAMPLE_RATE, audio_data), f"Decoded {tokens.shape[1]} tokens to audio" |
| | return |
| | |
| | |
| | chunk_size = 1500 |
| | num_chunks = (tokens.shape[1] + chunk_size - 1) // chunk_size |
| | |
| | |
| | yield None, f"Starting decoding of {tokens.shape[1]} tokens in {num_chunks} chunks..." |
| | |
| | all_audio_chunks = [] |
| | |
| | for i in range(num_chunks): |
| | start_idx = i * chunk_size |
| | end_idx = min((i + 1) * chunk_size, tokens.shape[1]) |
| | |
| | print(f"Decoding chunk {i+1}/{num_chunks}, tokens {start_idx} to {end_idx}") |
| | |
| | |
| | token_chunk = tokens[:, start_idx:end_idx, :] |
| | |
| | |
| | tokens_tensor = torch.tensor(token_chunk, dtype=torch.long) |
| | tokens_tensor = tokens_tensor.to(model_device) |
| | |
| | |
| | semanticodec.to(model_device) |
| | |
| | |
| | waveform = semanticodec.decode(tokens_tensor) |
| | if isinstance(waveform, torch.Tensor): |
| | waveform = waveform.cpu().numpy() |
| | |
| | |
| | audio_chunk = waveform[0, 0] |
| | all_audio_chunks.append(audio_chunk) |
| | |
| | |
| | combined_audio = np.concatenate(all_audio_chunks) |
| | |
| | |
| | yield (SAMPLE_RATE, combined_audio), f"Decoded chunk {i+1}/{num_chunks} ({end_idx}/{tokens.shape[1]} tokens)" |
| | |
| | |
| | time.sleep(0.1) |
| | |
| | |
| | combined_audio = np.concatenate(all_audio_chunks) |
| | yield (SAMPLE_RATE, combined_audio), f"Completed decoding all {tokens.shape[1]} tokens" |
| | |
| | except Exception as e: |
| | print(f"Streaming decode error: {str(e)}") |
| | yield None, f"Error decoding tokens: {str(e)}" |
| |
|
| | |
| | with gr.Blocks(title="Oterin Audio Codec") as demo: |
| | gr.Markdown("# Oterin Audio Codec") |
| | gr.Markdown("Upload an audio file to encode it to semantic tokens, decode tokens back to audio, or do both.") |
| | |
| | with gr.Tab("Encode Audio"): |
| | with gr.Row(): |
| | encode_input = gr.Audio(type="filepath", label="Input Audio") |
| | encode_output = gr.File(label="Encoded Tokens (.oterin)", file_types=[".oterin"]) |
| | encode_status = gr.Textbox(label="Status") |
| | encode_btn = gr.Button("Encode") |
| | encode_btn.click(encode_audio, inputs=encode_input, outputs=[encode_output, encode_status]) |
| | |
| | with gr.Tab("Decode Tokens"): |
| | with gr.Row(): |
| | decode_input = gr.File(label="Token File (.oterin)", file_types=[".oterin"]) |
| | decode_output = gr.Audio(label="Decoded Audio") |
| | decode_status = gr.Textbox(label="Status") |
| | decode_btn = gr.Button("Decode") |
| | decode_btn.click(decode_tokens, inputs=decode_input, outputs=[decode_output, decode_status]) |
| | |
| | with gr.Tab("Stream Decode (Listen while decoding)"): |
| | with gr.Row(): |
| | stream_decode_input = gr.File(label="Token File (.oterin)", file_types=[".oterin"]) |
| | stream_decode_output = gr.Audio(label="Streaming Audio Output") |
| | stream_decode_status = gr.Textbox(label="Status") |
| | stream_decode_btn = gr.Button("Start Streaming Decode") |
| | stream_decode_btn.click( |
| | stream_decode_tokens, |
| | inputs=stream_decode_input, |
| | outputs=[stream_decode_output, stream_decode_status], |
| | show_progress=True |
| | ) |
| | |
| | with gr.Tab("Both (Encode & Decode)"): |
| | with gr.Row(): |
| | both_input = gr.Audio(type="filepath", label="Input Audio") |
| | both_output = gr.Audio(label="Reconstructed Audio") |
| | both_status = gr.Textbox(label="Status") |
| | both_btn = gr.Button("Process") |
| | both_btn.click(process_both, inputs=both_input, outputs=[both_output, both_status]) |
| | |
| | with gr.Tab("Both Streaming (Encode & Stream Decode)"): |
| | with gr.Row(): |
| | stream_both_input = gr.Audio(type="filepath", label="Input Audio") |
| | stream_both_output = gr.Audio(label="Streaming Reconstructed Audio") |
| | stream_both_status = gr.Textbox(label="Status") |
| | stream_both_btn = gr.Button("Encode & Stream Decode") |
| | stream_both_btn.click( |
| | stream_both, |
| | inputs=stream_both_input, |
| | outputs=[stream_both_output, stream_both_status], |
| | show_progress=True |
| | ) |
| |
|
| | if __name__ == "__main__": |
| | demo.launch(share=True) |