File size: 8,991 Bytes
11eafe6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3c4e358
997d9c0
 
11eafe6
 
 
 
 
 
 
 
 
997d9c0
11eafe6
 
 
 
 
 
77d3459
11eafe6
 
 
 
 
 
 
 
 
 
 
 
 
 
3c4e358
11eafe6
997d9c0
11eafe6
997d9c0
11eafe6
997d9c0
11eafe6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import argparse
import os
import torch
import yaml
import numpy as np
import soundfile as sf
import librosa
from audiotools import AudioSignal
from model import DACVAE as VAE


class DACVAEInference:
    """Inference wrapper around a DACVAE model: load, encode, decode, reconstruct.

    Attributes set by ``__init__``:
        device: Device string the model and input tensors are moved to.
        config: Configuration dict; ``config['vae']`` holds the model kwargs.
        model: The ``VAE`` instance, switched to eval mode.
        sample_rate: ``config['vae']['sample_rate']``; input audio is resampled to it.
    """

    def __init__(self, checkpoint_path, config_path=None, device='cuda'):
        """
        Initialize DACVAE for inference.

        Args:
            checkpoint_path (str): Path to checkpoint file
            config_path (str): Path to config YAML (optional, will try to load from checkpoint)
            device (str): Device to run inference on ('cuda' or 'cpu')

        Raises:
            ValueError: If no ``config_path`` is given and the checkpoint has no
                ``'config'`` entry.
        """
        self.device = device

        # Load checkpoint onto CPU first; the model is moved to `device` below.
        # NOTE(review): torch.load unpickles the file - only load checkpoints
        # from trusted sources.
        print(f"Loading checkpoint from {checkpoint_path}")
        checkpoint = torch.load(checkpoint_path, map_location='cpu')

        # An explicit config file takes precedence over the one in the checkpoint
        if config_path:
            with open(config_path, 'r') as f:
                self.config = yaml.safe_load(f)
        elif 'config' in checkpoint:
            self.config = checkpoint['config']
        else:
            raise ValueError("Config not found in checkpoint and no config_path provided")

        # Initialize model
        print("Initializing DACVAE model")
        self.model = VAE(**self.config['vae'])

        # Weights are either stored under 'generator' or the checkpoint is
        # itself the state dict
        state_dict = checkpoint['generator'] if 'generator' in checkpoint else checkpoint
        self.model.load_state_dict(state_dict)

        self.model.to(self.device)
        self.model.eval()

        # Get sample rate from config
        self.sample_rate = self.config['vae']['sample_rate']
        print(f"Model loaded successfully. Sample rate: {self.sample_rate} Hz")

    def _load_audio(self, audio_path):
        """Load an audio file as a clamped mono ``[1, 1, T]`` tensor on ``self.device``.

        Raises:
            ValueError: If the file contains no samples.
        """
        # Function-scope import, as in the original methods (the module also
        # imports librosa at top level).
        import librosa

        # librosa converts to mono and resamples to the model's sample rate
        print(f"Loading audio from {audio_path}")
        audio, sr = librosa.load(audio_path, sr=self.sample_rate, mono=True)
        print(f"Audio loaded: shape={audio.shape}, sample_rate={sr}")

        if audio.size == 0:
            raise ValueError(f"No audio samples found in {audio_path}")

        # [T] -> [1, 1, T] (batch, channel, time)
        audio_tensor = torch.from_numpy(audio).float().unsqueeze(0).unsqueeze(0)
        audio_tensor = audio_tensor.to(self.device)

        # Clip (not rescale) any out-of-range samples to [-1, 1]
        return torch.clamp(audio_tensor, -1.0, 1.0)

    @staticmethod
    def _to_mono_numpy(audio_tensor):
        """Convert a model output tensor to a clipped 1-D numpy array.

        Size-1 axes are dropped; if more than one axis remains (e.g. several
        channels), only the first entry along each leading axis is kept.
        """
        audio = audio_tensor.detach().squeeze().cpu().numpy()

        # Keep the first channel (and first batch item) of multi-axis output
        while audio.ndim > 1:
            audio = audio[0]

        # A single-sample output squeezes to 0-D; keep it a 1-D signal
        audio = np.atleast_1d(audio)

        # Clip to the valid sample range
        return np.clip(audio, -1.0, 1.0)

    @torch.no_grad()
    def encode(self, audio_path):
        """
        Encode an audio file to latent representation.

        Args:
            audio_path (str): Path to input audio file

        Returns:
            tuple: (z, mu, logs) - latent representation and distribution parameters
        """
        audio_tensor = self._load_audio(audio_path)

        print("Encoding audio...")
        z, mu, logs = self.model.encode(audio_tensor, self.sample_rate)

        return z, mu, logs

    @torch.no_grad()
    def decode(self, z):
        """
        Decode latent representation back to audio.

        Args:
            z (torch.Tensor): Latent representation

        Returns:
            np.ndarray: Decoded 1-D audio, clipped to [-1, 1]. For multi-channel
                output only the first channel is returned.
        """
        print("Decoding latent representation...")
        return self._to_mono_numpy(self.model.decode(z))

    @torch.no_grad()
    def encode_decode(self, audio_path, output_path=None):
        """
        Full encode-decode pipeline for an audio file.

        Args:
            audio_path (str): Path to input audio file
            output_path (str): Path to save output audio (optional)

        Returns:
            tuple: (reconstructed_audio, z, mu, logs)
        """
        audio_tensor = self._load_audio(audio_path)

        # Forward pass through model
        print("Processing through DACVAE...")
        print('audio_tensor shape: ', audio_tensor.shape)
        out = self.model(audio_tensor, self.sample_rate)

        # Same mono conversion as decode(), so multi-channel output no longer
        # raises on squeeze and both paths agree
        recons_audio = self._to_mono_numpy(out['audio'])
        z = out['z']
        mu = out['mu']
        logs = out['logs']
        print('z shape: ', z.shape)

        # Save if output path provided
        if output_path:
            print(f"Saving reconstructed audio to {output_path}")
            print('shape of recons_audio: ', recons_audio.shape)
            sf.write(output_path, recons_audio, self.sample_rate)

        return recons_audio, z, mu, logs

    @torch.no_grad()
    def get_latent_shape(self, num_samples=None):
        """
        Get the shape of the latent representation for a given audio length.

        Args:
            num_samples (int): Length of the mono dummy input in samples
                (optional, defaults to one second, i.e. ``self.sample_rate``)

        Returns:
            torch.Size: Shape of ``z`` produced by encoding silent audio of that length
        """
        if num_samples is None:
            num_samples = self.sample_rate

        # Create dummy input - silent mono audio
        dummy_audio = torch.zeros(1, 1, num_samples, device=self.device)
        z, _, _ = self.model.encode(dummy_audio, self.sample_rate)
        return z.shape


def _save_latents(latent_path, z, mu, logs):
    """Write the latent tensors to ``latent_path`` with ``torch.save``."""
    torch.save({'z': z, 'mu': mu, 'logs': logs}, latent_path)
    print(f"Latent representation saved to {latent_path}")


def main():
    """Command-line entry point: reconstruct, encode, or decode audio with DACVAE.

    Raises:
        ValueError: In ``decode_only`` mode when ``--latent_path`` is missing.
        FileNotFoundError: In the encoding modes when the input file does not exist.
    """
    parser = argparse.ArgumentParser(description="DACVAE Audio Inference")
    parser.add_argument('--checkpoint', type=str, required=False, default="checkpoint.pt",
                        help='Path to model checkpoint')
    parser.add_argument('--config', type=str, default="./config.yml",
                        help='Path to config YAML (default: ./config.yml; if that default '
                             'file is missing, or an empty string is passed, the config '
                             'stored in the checkpoint is used)')
    parser.add_argument('--input', type=str, required=False, default='./output.wav',
                        help='Path to input audio file')
    parser.add_argument('--output', type=str, default='./test.wav',
                        help='Path to save output audio (default: ./test.wav; pass an '
                             'empty string to use <input>_reconstructed.wav)')
    parser.add_argument('--device', type=str, default='cuda',
                        choices=['cuda', 'cpu'], help='Device to run on')
    parser.add_argument('--mode', type=str, default='encode_decode',
                        choices=['encode_decode', 'encode_only', 'decode_only'],
                        help='Inference mode')
    parser.add_argument('--latent_path', type=str, default=None,
                        help='Path to save/load latent representation')

    args = parser.parse_args()

    # Validate cheap preconditions before the expensive checkpoint load
    if args.mode == 'decode_only':
        if not args.latent_path:
            raise ValueError("latent_path must be specified for decode_only mode")
    elif not os.path.isfile(args.input):
        raise FileNotFoundError(f"Input audio file not found: {args.input}")

    # Fall back to CPU instead of crashing when CUDA is unavailable
    if args.device == 'cuda' and not torch.cuda.is_available():
        print("Warning: CUDA is not available, falling back to CPU")
        args.device = 'cpu'

    # Only the *default* config path may silently fall back to the checkpoint's
    # embedded config; an explicitly given path that is missing still errors.
    config_path = args.config
    if config_path == parser.get_default('config') and not os.path.isfile(config_path):
        print(f"Config file {config_path} not found, using config stored in checkpoint")
        config_path = None

    # Initialize model
    dac = DACVAEInference(args.checkpoint, config_path, args.device)

    # Derive the output name from the input when no output path is given
    if not args.output:
        base_name = os.path.splitext(os.path.basename(args.input))[0]
        args.output = f"{base_name}_reconstructed.wav"

    if args.mode == 'encode_decode':
        # Full encode-decode pipeline
        recons_audio, z, mu, logs = dac.encode_decode(args.input, args.output)
        print(f"Reconstruction complete. Output saved to {args.output}")
        print(f"Latent shape: {z.shape}")

        # Optionally save latent
        if args.latent_path:
            _save_latents(args.latent_path, z, mu, logs)

    elif args.mode == 'encode_only':
        # Encode only
        z, mu, logs = dac.encode(args.input)
        print(f"Encoding complete. Latent shape: {z.shape}")

        # Save latent
        if args.latent_path:
            _save_latents(args.latent_path, z, mu, logs)
        else:
            print("Warning: No latent_path specified, latent representation not saved")

    elif args.mode == 'decode_only':
        # Decode only (latent_path was validated above)
        # NOTE(review): torch.load unpickles the file - only load latents from
        # trusted sources.
        print(f"Loading latent from {args.latent_path}")
        latent_data = torch.load(args.latent_path, map_location=args.device)
        z = latent_data['z'].to(args.device)

        audio = dac.decode(z)
        sf.write(args.output, audio, dac.sample_rate)
        print(f"Decoding complete. Output saved to {args.output}")


# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()