File size: 7,052 Bytes
2b85b0c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# This software is licensed under a **dual-license model**
# For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
# Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.

# audio_processing.py

import numpy as np
import torch
from torch.cuda.amp import autocast

def decode_audio_chunk(audio_chunk, model, device, config):
    """
    Run one chunk of audio features through the model's encoder/decoder.

    Parameters:
        audio_chunk (np.array): Input frames, shape (num_frames, num_features).
        model: Object exposing ``encoder`` and ``decoder`` callables that map
            tensors to tensors.
        device: torch device (or device string) the input tensor is moved to.
        config (dict): Reads "use_half_precision" (default True). When true,
            the input is cast to float16 and inference runs under autocast;
            otherwise plain float32 is used.

    Returns:
        np.array: Decoded output with the leading batch dimension removed,
        copied back to host memory.
    """
    from contextlib import nullcontext

    use_half_precision = config.get("use_half_precision", True)
    dtype = torch.float16 if use_half_precision else torch.float32

    # Add a batch dimension and move the chunk to the target device.
    src_tensor = torch.tensor(audio_chunk, dtype=dtype).unsqueeze(0).to(device)

    # Single inference path: the only difference between the precision modes
    # is the autocast context, so select it conditionally instead of
    # duplicating the encoder/decoder calls in two branches.
    amp_ctx = autocast(dtype=torch.float16) if use_half_precision else nullcontext()
    with torch.no_grad(), amp_ctx:
        encoder_outputs = model.encoder(src_tensor)
        output_sequence = model.decoder(encoder_outputs)

    # Drop the batch dimension and convert back to a numpy array.
    return output_sequence.squeeze(0).cpu().numpy()


def concatenate_outputs(all_decoded_outputs, num_frames):
    """Join decoded chunks along the frame axis and clip to num_frames rows."""
    joined = np.concatenate(all_decoded_outputs, axis=0)
    return joined[:num_frames]

def ensure_2d(final_decoded_outputs):
    """Collapse a 3-D array to 2-D by flattening all leading axes; pass anything else through."""
    if final_decoded_outputs.ndim != 3:
        return final_decoded_outputs
    feature_dim = final_decoded_outputs.shape[-1]
    return final_decoded_outputs.reshape(-1, feature_dim)

def pad_audio_chunk(audio_chunk, frame_length, num_features, pad_mode='replicate'):
    """
    Pads the audio_chunk to ensure it has a number of frames equal to frame_length.

    Parameters:
        audio_chunk (np.array): Input audio data with shape (num_frames, num_features).
        frame_length (int): Desired number of frames.
        num_features (int): Number of features per frame.
        pad_mode (str): Type of padding to use. Options are:
                        - 'reflect': Pads using reflection of the trailing frames.
                        - 'replicate': Pads by replicating the last frame.

    Returns:
        np.array: Padded audio_chunk with shape (frame_length, num_features).
        Chunks already at least frame_length frames long are returned unchanged.

    Raises:
        ValueError: If an empty chunk needs padding (no frames to reflect or
            replicate), or if pad_mode is not one of the supported options.
    """
    num_frames = audio_chunk.shape[0]
    if num_frames >= frame_length:
        return audio_chunk

    pad_length = frame_length - num_frames
    if num_frames == 0:
        # Neither mode can synthesize frames from nothing; previously this
        # case silently produced an empty array of the wrong length.
        raise ValueError("Cannot pad an empty audio_chunk.")

    if pad_mode == 'reflect':
        # np.pad with mode='reflect' already returns the full
        # (frame_length, num_features) array — no extra stacking needed.
        audio_chunk = np.pad(
            audio_chunk,
            pad_width=((0, pad_length), (0, 0)),
            mode='reflect'
        )
    elif pad_mode == 'replicate':
        # Repeat the final frame until the chunk reaches frame_length.
        last_frame = audio_chunk[-1:]  # shape (1, num_features)
        replication = np.tile(last_frame, (pad_length, 1))
        audio_chunk = np.vstack((audio_chunk, replication))
    else:
        raise ValueError(f"Unsupported pad_mode: {pad_mode}. Choose 'reflect' or 'replicate'.")

    return audio_chunk


def blend_chunks(chunk1, chunk2, overlap):
    """
    Crossfade the tail of chunk1 into the head of chunk2 over `overlap` frames.

    The fade length is clamped to the shorter of the two chunks; with no
    usable overlap the chunks are simply stacked. The blended result keeps
    chunk1's frames (with its last fade frames mixed toward chunk2) followed
    by the remainder of chunk2.
    """
    fade_len = min(overlap, len(chunk1), len(chunk2))
    if fade_len == 0:
        return np.vstack((chunk1, chunk2))

    merged_head = np.copy(chunk1)
    tail_start = len(merged_head) - fade_len
    for step in range(fade_len):
        weight = step / fade_len
        idx = tail_start + step
        merged_head[idx] = (1 - weight) * chunk1[idx] + weight * chunk2[step]

    return np.vstack((merged_head, chunk2[fade_len:]))

def process_audio_features(audio_features, model, device, config):
    """
    Decode a full sequence of audio feature frames in overlapping chunks.

    The input is split into chunks of config['frame_size'] frames with
    config.get('overlap', 32) frames shared between consecutive chunks. Each
    chunk is decoded via decode_audio_chunk and crossfaded into its
    predecessor with blend_chunks. The concatenated result is trimmed to the
    input frame count, rescaled, given an ease-in ramp, and has a fixed set
    of columns zeroed by zero_columns.

    Parameters:
        audio_features (np.array): Input features, shape (num_frames, num_features).
        model: Model exposing encoder/decoder (used by decode_audio_chunk);
            switched to eval mode here.
        device: torch device the chunk tensors are moved to.
        config (dict): Must contain 'frame_size'; may contain 'overlap' and
            'use_half_precision'.

    Returns:
        np.array: 2-D decoded output with exactly num_frames rows.
    """
    # Configuration settings
    frame_length = config['frame_size']  # Number of frames per chunk (e.g., 64)
    overlap = config.get('overlap', 32)  # Number of overlapping frames between chunks
    num_features = audio_features.shape[1]
    num_frames = audio_features.shape[0]
    all_decoded_outputs = []

    # Set model to evaluation mode (disables dropout / batch-norm updates)
    model.eval()

    # Process chunks with the specified overlap
    start_idx = 0
    while start_idx < num_frames:
        end_idx = min(start_idx + frame_length, num_frames)

        # Select the chunk; pad_audio_chunk fills short final chunks up to
        # frame_length (replicating the last frame by default)
        audio_chunk = audio_features[start_idx:end_idx]
        audio_chunk = pad_audio_chunk(audio_chunk, frame_length, num_features)

        # Pass config through so decode_audio_chunk chooses its precision
        decoded_outputs = decode_audio_chunk(audio_chunk, model, device, config)
        # Drop the frames that came from padding
        decoded_outputs = decoded_outputs[:end_idx - start_idx]

        # Crossfade with the previous chunk; blending merges the shared
        # frames, so the running total length is shorter than the sum of the
        # raw chunk lengths
        if all_decoded_outputs:
            last_chunk = all_decoded_outputs.pop()
            blended_chunk = blend_chunks(last_chunk, decoded_outputs, overlap)
            all_decoded_outputs.append(blended_chunk)
        else:
            all_decoded_outputs.append(decoded_outputs)

        # Move start index forward by (frame_length - overlap)
        start_idx += frame_length - overlap

    # If blending left the output shorter than the input, decode the missing
    # tail once more and append only the frames still needed
    current_length = sum(len(chunk) for chunk in all_decoded_outputs)
    if current_length < num_frames:
        remaining_frames = num_frames - current_length
        final_chunk_start = num_frames - remaining_frames
        audio_chunk = audio_features[final_chunk_start:num_frames]
        audio_chunk = pad_audio_chunk(audio_chunk, frame_length, num_features)
        decoded_outputs = decode_audio_chunk(audio_chunk, model, device, config)
        all_decoded_outputs.append(decoded_outputs[:remaining_frames])

    # Concatenate all chunks and trim to the original frame count
    final_decoded_outputs = np.concatenate(all_decoded_outputs, axis=0)[:num_frames]

    # Collapse any leftover batch dimension, then rescale the first 61
    # columns (outputs appear to be in 0-100 units — verify against training)
    final_decoded_outputs = ensure_2d(final_decoded_outputs)
    final_decoded_outputs[:, :61] /= 100  # Normalize specific columns

    # Ease-in ramp over the first int(0.1 * 60) = 6 frames (~0.1 s assuming
    # 60 fps — TODO confirm frame rate; an earlier comment claimed 0.2 s)
    ease_duration_frames = min(int(0.1 * 60), final_decoded_outputs.shape[0])
    easing_factors = np.linspace(0, 1, ease_duration_frames)[:, None]
    final_decoded_outputs[:ease_duration_frames] *= easing_factors

    # Zero out unnecessary columns (optional post-processing)
    final_decoded_outputs = zero_columns(final_decoded_outputs)

    return final_decoded_outputs


def zero_columns(data):
    """Return a copy of `data` with a fixed set of feature columns zeroed out."""
    silenced = [0, 1, 2, 3, 4, 7, 8, 9, 10, 11] + list(range(51, 61))
    result = np.copy(data)
    result[:, silenced] = 0
    return result