File size: 7,052 Bytes
2b85b0c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# This software is licensed under a **dual-license model**
# For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
# Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
# audio_processing.py
import numpy as np
import torch
from torch.cuda.amp import autocast
def decode_audio_chunk(audio_chunk, model, device, config):
    """Run a single feature chunk through the model's encoder/decoder.

    Parameters:
        audio_chunk (np.array): Input features, shape (frames, num_features).
        model: Object exposing `.encoder(tensor)` and `.decoder(tensor)`.
        device: torch device (or device string) to run inference on.
        config (dict): Reads "use_half_precision" (default True).

    Returns:
        np.array: Decoded outputs with the batch dimension removed.
    """
    use_half_precision = config.get("use_half_precision", True)
    device_type = torch.device(device).type
    # fp16 kernels are only broadly available on CUDA; fall back to fp32
    # elsewhere so CPU runs don't hit unimplemented half-precision ops.
    run_half = use_half_precision and device_type == "cuda"
    dtype = torch.float16 if run_half else torch.float32
    # Add a leading batch dimension and move to the target device.
    src_tensor = torch.as_tensor(audio_chunk, dtype=dtype).unsqueeze(0).to(device)
    with torch.no_grad():
        if run_half:
            # torch.autocast supersedes the deprecated torch.cuda.amp.autocast.
            with torch.autocast(device_type, dtype=torch.float16):
                encoder_outputs = model.encoder(src_tensor)
                output_sequence = model.decoder(encoder_outputs)
        else:
            encoder_outputs = model.encoder(src_tensor)
            output_sequence = model.decoder(encoder_outputs)
    # Drop the batch dimension and return as a numpy array on the host.
    return output_sequence.squeeze(0).cpu().numpy()
def concatenate_outputs(all_decoded_outputs, num_frames):
    """Join decoded chunks along the frame axis and trim to num_frames rows."""
    joined = np.concatenate(all_decoded_outputs, axis=0)
    return joined[:num_frames]
def ensure_2d(final_decoded_outputs):
    """Collapse a 3-D (batch, frames, features) array to 2-D; pass others through."""
    if final_decoded_outputs.ndim != 3:
        return final_decoded_outputs
    feature_dim = final_decoded_outputs.shape[-1]
    return final_decoded_outputs.reshape(-1, feature_dim)
def pad_audio_chunk(audio_chunk, frame_length, num_features, pad_mode='replicate'):
    """
    Pads the audio_chunk to ensure it has a number of frames equal to frame_length.

    Parameters:
        audio_chunk (np.array): Input audio data with shape (num_frames, num_features).
        frame_length (int): Desired number of frames.
        num_features (int): Number of features per frame (kept for interface
            compatibility; the chunk's own column count is used).
        pad_mode (str): Type of padding to use. Options are:
            - 'reflect': Pads using reflection of the trailing frames.
            - 'replicate': Pads by replicating the last frame.

    Returns:
        np.array: Padded audio_chunk with shape (frame_length, num_features).

    Raises:
        ValueError: If the chunk is empty but padding is required, or if
            pad_mode is not one of 'reflect' / 'replicate'.
    """
    if audio_chunk.shape[0] >= frame_length:
        # Already long enough; nothing to do.
        return audio_chunk
    if audio_chunk.shape[0] == 0:
        # Neither mode can synthesize frames from an empty chunk.
        raise ValueError("Cannot pad an empty audio_chunk.")
    pad_length = frame_length - audio_chunk.shape[0]
    if pad_mode == 'reflect':
        # np.pad already returns the fully padded (frame_length, features)
        # array; no need to re-stack slices of it afterwards.
        audio_chunk = np.pad(
            audio_chunk,
            pad_width=((0, pad_length), (0, 0)),
            mode='reflect'
        )
    elif pad_mode == 'replicate':
        # Repeat the final frame pad_length times to fill the remainder.
        last_frame = audio_chunk[-1:]  # shape (1, num_features)
        replication = np.tile(last_frame, (pad_length, 1))
        audio_chunk = np.vstack((audio_chunk, replication))
    else:
        raise ValueError(f"Unsupported pad_mode: {pad_mode}. Choose 'reflect' or 'replicate'.")
    return audio_chunk
def blend_chunks(chunk1, chunk2, overlap):
    """Crossfade chunk2 onto the tail of chunk1 over `overlap` frames.

    The last `overlap` frames of chunk1 are linearly interpolated with the
    first `overlap` frames of chunk2 (alpha ramps 0 -> 1), then the rest of
    chunk2 is appended. Returns np.vstack((chunk1, chunk2)) when no overlap
    is possible.
    """
    actual_overlap = min(overlap, len(chunk1), len(chunk2))
    if actual_overlap == 0:
        return np.vstack((chunk1, chunk2))
    blended_chunk = np.copy(chunk1)
    # Vectorized crossfade: one C-level op instead of a per-frame Python loop.
    # alpha_i = i / actual_overlap for i in [0, actual_overlap), as before.
    alphas = (np.arange(actual_overlap) / actual_overlap)[:, None]
    blended_chunk[-actual_overlap:] = (
        (1 - alphas) * chunk1[-actual_overlap:] + alphas * chunk2[:actual_overlap]
    )
    return np.vstack((blended_chunk, chunk2[actual_overlap:]))
def process_audio_features(audio_features, model, device, config):
    """Decode a full feature sequence chunk-by-chunk with overlap blending.

    Parameters:
        audio_features (np.array): Shape (num_frames, num_features); columns
            :61 are assumed to be the normalizable outputs (see below).
        model: Encoder/decoder model; `.eval()` is called before inference.
        device: torch device used by decode_audio_chunk.
        config (dict): Requires 'frame_size'; reads 'overlap' (default 32)
            plus whatever decode_audio_chunk consumes.

    Returns:
        np.array: Exactly num_frames rows of post-processed decoder output.

    Raises:
        ValueError: If overlap >= frame_size (the chunk loop could not advance).
    """
    # Configuration settings
    frame_length = config['frame_size']   # Number of frames per chunk (e.g., 64)
    overlap = config.get('overlap', 32)   # Overlapping frames between chunks
    # Guard: the loop below advances by (frame_length - overlap); a
    # non-positive step would re-decode the same chunk forever.
    if overlap >= frame_length:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than frame_size ({frame_length})"
        )
    num_features = audio_features.shape[1]
    num_frames = audio_features.shape[0]
    all_decoded_outputs = []
    # Set model to evaluation mode
    model.eval()
    # Process chunks with the specified overlap
    start_idx = 0
    while start_idx < num_frames:
        end_idx = min(start_idx + frame_length, num_frames)
        # Select and pad chunk if needed
        audio_chunk = audio_features[start_idx:end_idx]
        audio_chunk = pad_audio_chunk(audio_chunk, frame_length, num_features)
        # Pass config through so precision is chosen dynamically
        decoded_outputs = decode_audio_chunk(audio_chunk, model, device, config)
        # Trim away any frames that only existed because of padding
        decoded_outputs = decoded_outputs[:end_idx - start_idx]
        # Crossfade with the previous chunk so seams are not audible/visible
        if all_decoded_outputs:
            last_chunk = all_decoded_outputs.pop()
            blended_chunk = blend_chunks(last_chunk, decoded_outputs, overlap)
            all_decoded_outputs.append(blended_chunk)
        else:
            all_decoded_outputs.append(decoded_outputs)
        # Move start index forward by (frame_length - overlap)
        start_idx += frame_length - overlap
    # Blending shortens the running total by `overlap` per seam, so decode a
    # final tail chunk if the accumulated output is shorter than the input.
    current_length = sum(len(chunk) for chunk in all_decoded_outputs)
    if current_length < num_frames:
        remaining_frames = num_frames - current_length
        final_chunk_start = num_frames - remaining_frames
        audio_chunk = audio_features[final_chunk_start:num_frames]
        audio_chunk = pad_audio_chunk(audio_chunk, frame_length, num_features)
        decoded_outputs = decode_audio_chunk(audio_chunk, model, device, config)
        all_decoded_outputs.append(decoded_outputs[:remaining_frames])
    # Concatenate all chunks and trim to the original frame count
    final_decoded_outputs = np.concatenate(all_decoded_outputs, axis=0)[:num_frames]
    # Flatten any stray batch dimension, then scale the first 61 columns.
    final_decoded_outputs = ensure_2d(final_decoded_outputs)
    final_decoded_outputs[:, :61] /= 100  # Normalize specific columns
    # Ease in over the first int(0.1 * 60) = 6 frames (0.1 s at an assumed
    # 60 fps — confirm frame rate against the caller), capped at the output
    # length so short sequences don't over-index.
    ease_duration_frames = min(int(0.1 * 60), final_decoded_outputs.shape[0])
    easing_factors = np.linspace(0, 1, ease_duration_frames)[:, None]
    final_decoded_outputs[:ease_duration_frames] *= easing_factors
    # Zero out unnecessary columns (optional post-processing)
    final_decoded_outputs = zero_columns(final_decoded_outputs)
    return final_decoded_outputs
def zero_columns(data):
    """Return a copy of `data` with a fixed set of feature columns zeroed."""
    zero_idx = [0, 1, 2, 3, 4, 7, 8, 9, 10, 11, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
    result = data.copy()
    result[:, zero_idx] = 0
    return result
|