fastapi / utils /audio /processing /audio_processing.py
precison9's picture
Add Dockerized Flask application files for deployment
2b85b0c
# This software is licensed under a **dual-license model**
# For individuals and businesses earning **under $1M per year**, this software is licensed under the **MIT License**
# Businesses or organizations with **annual revenue of $1,000,000 or more** must obtain permission to use this software commercially.
# audio_processing.py
import numpy as np
import torch
from torch.cuda.amp import autocast
def decode_audio_chunk(audio_chunk, model, device, config):
# Use precision based on config
use_half_precision = config.get("use_half_precision", True)
# Force float16 if half precision is desired; else float32
dtype = torch.float16 if use_half_precision else torch.float32
# Convert audio chunk directly to the desired precision
src_tensor = torch.tensor(audio_chunk, dtype=dtype).unsqueeze(0).to(device)
with torch.no_grad():
if use_half_precision:
with autocast(dtype=torch.float16):
encoder_outputs = model.encoder(src_tensor)
output_sequence = model.decoder(encoder_outputs)
else:
encoder_outputs = model.encoder(src_tensor)
output_sequence = model.decoder(encoder_outputs)
# Convert output tensor back to numpy array
decoded_outputs = output_sequence.squeeze(0).cpu().numpy()
return decoded_outputs
def concatenate_outputs(all_decoded_outputs, num_frames):
final_decoded_outputs = np.concatenate(all_decoded_outputs, axis=0)
final_decoded_outputs = final_decoded_outputs[:num_frames]
return final_decoded_outputs
def ensure_2d(final_decoded_outputs):
if final_decoded_outputs.ndim == 3:
final_decoded_outputs = final_decoded_outputs.reshape(-1, final_decoded_outputs.shape[-1])
return final_decoded_outputs
def pad_audio_chunk(audio_chunk, frame_length, num_features, pad_mode='replicate'):
"""
Pads the audio_chunk to ensure it has a number of frames equal to frame_length.
Parameters:
audio_chunk (np.array): Input audio data with shape (num_frames, num_features).
frame_length (int): Desired number of frames.
num_features (int): Number of features per frame.
pad_mode (str): Type of padding to use. Options are:
- 'reflect': Pads using reflection.
- 'replicate': Pads by replicating the last frame.
Returns:
np.array: Padded audio_chunk with shape (frame_length, num_features).
"""
if audio_chunk.shape[0] < frame_length:
pad_length = frame_length - audio_chunk.shape[0]
if pad_mode == 'reflect':
# --- Original reflect padding method ---
padding = np.pad(
audio_chunk,
pad_width=((0, pad_length), (0, 0)),
mode='reflect'
)
# Using the last pad_length frames from the reflected padding
audio_chunk = np.vstack((audio_chunk, padding[-pad_length:, :num_features]))
elif pad_mode == 'replicate':
# --- New replicate padding method ---
# Replicate the last frame to fill the remaining frames
last_frame = audio_chunk[-1:] # Select the last frame (shape: (1, num_features))
replication = np.tile(last_frame, (pad_length, 1)) # Replicate it pad_length times
audio_chunk = np.vstack((audio_chunk, replication))
else:
raise ValueError(f"Unsupported pad_mode: {pad_mode}. Choose 'reflect' or 'replicate'.")
return audio_chunk
def blend_chunks(chunk1, chunk2, overlap):
actual_overlap = min(overlap, len(chunk1), len(chunk2))
if actual_overlap == 0:
return np.vstack((chunk1, chunk2))
blended_chunk = np.copy(chunk1)
for i in range(actual_overlap):
alpha = i / actual_overlap
blended_chunk[-actual_overlap + i] = (1 - alpha) * chunk1[-actual_overlap + i] + alpha * chunk2[i]
return np.vstack((blended_chunk, chunk2[actual_overlap:]))
def process_audio_features(audio_features, model, device, config):
# Configuration settings
frame_length = config['frame_size'] # Number of frames per chunk (e.g., 64)
overlap = config.get('overlap', 32) # Number of overlapping frames between chunks
num_features = audio_features.shape[1]
num_frames = audio_features.shape[0]
all_decoded_outputs = []
# Set model to evaluation mode
model.eval()
# Process chunks with the specified overlap
start_idx = 0
while start_idx < num_frames:
end_idx = min(start_idx + frame_length, num_frames)
# Select and pad chunk if needed
audio_chunk = audio_features[start_idx:end_idx]
audio_chunk = pad_audio_chunk(audio_chunk, frame_length, num_features)
# 🔥 Pass config to dynamically choose precision
decoded_outputs = decode_audio_chunk(audio_chunk, model, device, config)
decoded_outputs = decoded_outputs[:end_idx - start_idx]
# Blend with the last chunk if it exists
if all_decoded_outputs:
last_chunk = all_decoded_outputs.pop()
blended_chunk = blend_chunks(last_chunk, decoded_outputs, overlap)
all_decoded_outputs.append(blended_chunk)
else:
all_decoded_outputs.append(decoded_outputs)
# Move start index forward by (frame_length - overlap)
start_idx += frame_length - overlap
# Process any remaining frames to ensure total frame count matches input
current_length = sum(len(chunk) for chunk in all_decoded_outputs)
if current_length < num_frames:
remaining_frames = num_frames - current_length
final_chunk_start = num_frames - remaining_frames
audio_chunk = audio_features[final_chunk_start:num_frames]
audio_chunk = pad_audio_chunk(audio_chunk, frame_length, num_features)
decoded_outputs = decode_audio_chunk(audio_chunk, model, device, config)
all_decoded_outputs.append(decoded_outputs[:remaining_frames])
# Concatenate all chunks and trim to the original frame count
final_decoded_outputs = np.concatenate(all_decoded_outputs, axis=0)[:num_frames]
# Normalize or apply any post-processing
final_decoded_outputs = ensure_2d(final_decoded_outputs)
final_decoded_outputs[:, :61] /= 100 # Normalize specific columns
# Easing effect for smooth start (fades in first 0.2 seconds)
ease_duration_frames = min(int(0.1 * 60), final_decoded_outputs.shape[0])
easing_factors = np.linspace(0, 1, ease_duration_frames)[:, None]
final_decoded_outputs[:ease_duration_frames] *= easing_factors
# Zero out unnecessary columns (optional post-processing)
final_decoded_outputs = zero_columns(final_decoded_outputs)
return final_decoded_outputs
def zero_columns(data):
columns_to_zero = [0, 1, 2, 3, 4, 7, 8, 9, 10, 11, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
modified_data = np.copy(data)
modified_data[:, columns_to_zero] = 0
return modified_data