# submission/tasks/audio.py
# (HuggingFace Space file-viewer header removed: uploaded by AB739,
#  "Update tasks/audio.py", commit 079338f, 9.4 kB — kept here as a comment
#  so the module remains importable.)
from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
from sklearn.metrics import accuracy_score
import os
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from torchaudio import transforms
from torchvision import models
from .utils.evaluation import AudioEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info
from dotenv import load_dotenv
load_dotenv()
router = APIRouter()
DESCRIPTION = "Tiny_DNN"
ROUTE = "/audio"
@router.post(ROUTE, tags=["Audio Task"],
description=DESCRIPTION)
async def evaluate_audio(request: AudioEvaluationRequest):
"""
Evaluate audio classification for rainforest sound detection.
Current Model: Random Baseline
- Makes random predictions from the label space (0-1)
- Used as a baseline for comparison
"""
# Get space info
username, space_url = get_space_info()
# Define the label mapping
LABEL_MAPPING = {
"chainsaw": 0,
"environment": 1
}
# Load and prepare the dataset
# Because the dataset is gated, we need to use the HF_TOKEN environment variable to authenticate
dataset = load_dataset(request.dataset_name,token=os.getenv("HF_TOKEN"))
# Split dataset
train_test = dataset["train"].train_test_split(test_size=request.test_size, seed=request.test_seed)
test_dataset = train_test["test"]
true_labels = test_dataset["label"]
resampler = transforms.Resample(orig_freq=12000, new_freq=16000)
mel_transform = transforms.MelSpectrogram(sample_rate=16000, n_mels=64)
amplitude_to_db = transforms.AmplitudeToDB()
def resize_audio(_waveform, target_length):
"""Resizes the audio waveform to the target length using resampling"""
num_frames = _waveform.shape[-1]
if num_frames != target_length:
_resampler = transforms.Resample(orig_freq=num_frames, new_freq=target_length)
_waveform = _resampler(_waveform)
return _waveform
resized_waveforms = [
resize_audio(torch.tensor(sample['audio']['array'], dtype=torch.float32).unsqueeze(0), target_length=72000)
for sample in test_dataset
]
waveforms, labels = [], []
for waveform, label in zip(resized_waveforms, true_labels):
waveforms.append(amplitude_to_db(mel_transform(resampler(waveform))))
labels.append(label)
waveforms = torch.stack(waveforms)
labels = torch.tensor(labels)
test_loader = DataLoader(
TensorDataset(waveforms, labels),
batch_size=64,
shuffle=False
)
class BlazeFace(nn.Module):
def __init__(self, input_channels=1, use_double_block=False, activation="relu", use_optional_block=True):
super(BlazeFace, self).__init__()
self.activation = activation
self.use_double_block = use_double_block
self.use_optional_block = use_optional_block
def conv_block(in_channels, out_channels, kernel_size, stride, padding):
return nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding),
nn.BatchNorm2d(out_channels),
nn.ReLU() if activation == "relu" else nn.Sigmoid() # Apply ReLU activation (default) or Sigmoid
)
def depthwise_separable_block(in_channels, out_channels, stride):
return nn.Sequential(
nn.Conv2d(in_channels, in_channels, kernel_size=5, stride=stride, padding=2, groups=in_channels, bias=False),
nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0),
nn.BatchNorm2d(out_channels),
nn.ReLU() if activation == "relu" else nn.Sigmoid()
)
def double_block(in_channels, filters_1, filters_2, stride):
return nn.Sequential(
depthwise_separable_block(in_channels, filters_1, stride),
depthwise_separable_block(filters_1, filters_2, 1)
)
# Define layers (first part: conv layers)
self.conv1 = conv_block(input_channels, 24, kernel_size=5, stride=2, padding=2)
# Define single blocks (subsequent conv blocks)
self.single_blocks = nn.ModuleList([
depthwise_separable_block(24, 24, stride=1),
depthwise_separable_block(24, 24, stride=1),
depthwise_separable_block(24, 48, stride=2),
depthwise_separable_block(48, 48, stride=1),
depthwise_separable_block(48, 48, stride=1)
])
# Define double blocks if `use_double_block` is True
if self.use_double_block:
self.double_blocks = nn.ModuleList([
double_block(48, 24, 96, stride=2),
double_block(96, 24, 96, stride=1),
double_block(96, 24, 96, stride=2),
double_block(96, 24, 96, stride=1),
double_block(96, 24, 96, stride=2)
])
else:
self.double_blocks = nn.ModuleList([
depthwise_separable_block(48, 96, stride=2),
depthwise_separable_block(96, 96, stride=1),
depthwise_separable_block(96, 96, stride=2),
depthwise_separable_block(96, 96, stride=1),
depthwise_separable_block(96, 96, stride=2)
])
# Final convolutional head
self.conv_head = nn.Conv2d(96, 64, kernel_size=1, stride=1)
self.bn_head = nn.BatchNorm2d(64)
# Global Average Pooling
self.global_avg_pooling = nn.AdaptiveAvgPool2d(1)
def forward(self, x):
# First conv layer
x = self.conv1(x)
# Apply single blocks
for block in self.single_blocks:
x = block(x)
# Apply double blocks
for block in self.double_blocks:
x = block(x)
# Final head
x = self.conv_head(x)
x = self.bn_head(x)
x = F.relu(x)
# Global Average Pooling and Flatten
x = self.global_avg_pooling(x)
x = torch.flatten(x, 1)
return x
class BlazeFaceModel(nn.Module):
def __init__(self, input_channels, label_count, use_double_block=False, activation="relu", use_optional_block=True):
super(BlazeFaceModel, self).__init__()
self.blazeface_backbone = BlazeFace(input_channels=input_channels, use_double_block=use_double_block, activation=activation, use_optional_block=use_optional_block)
self.fc = nn.Linear(64, label_count)
def forward(self, x):
features = self.blazeface_backbone(x)
output = self.fc(features)
return output
# Example Usage
model_settings = {
'spectrogram_length': 64,
'dct_coefficient_count': 481,
'label_count': 2
}
# Create model
model = BlazeFaceModel(input_channels=1, label_count=model_settings['label_count'], use_double_block=False, activation='relu', use_optional_block=False)
model.load_state_dict(torch.load("./best_blazeface_model.pth", map_location=torch.device('cpu')))
# Start tracking emissions
tracker.start()
tracker.start_task("inference")
#--------------------------------------------------------------------------------------------
# YOUR MODEL INFERENCE CODE HERE
# Update the code below to replace the random baseline by your model inference within the inference pass where the energy consumption and emissions are tracked.
#--------------------------------------------------------------------------------------------
predictions = []
with torch.inference_mode():
for data, target in test_loader:
output = model(data).squeeze()
pred = torch.argmax(output, dim=-1)
predictions.extend(pred.tolist())
#--------------------------------------------------------------------------------------------
# YOUR MODEL INFERENCE STOPS HERE
#--------------------------------------------------------------------------------------------
# Stop tracking emissions
emissions_data = tracker.stop_task()
# Calculate accuracy
accuracy = accuracy_score(true_labels, predictions)
# Prepare results dictionary
results = {
"username": username,
"space_url": space_url,
"submission_timestamp": datetime.now().isoformat(),
"model_description": DESCRIPTION,
"accuracy": float(accuracy),
"energy_consumed_wh": emissions_data.energy_consumed * 1000,
"emissions_gco2eq": emissions_data.emissions * 1000,
"emissions_data": clean_emissions_data(emissions_data),
"api_route": ROUTE,
"dataset_config": {
"dataset_name": request.dataset_name,
"test_size": request.test_size,
"test_seed": request.test_seed
}
}
return results