Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter | |
| from datetime import datetime | |
| from datasets import load_dataset | |
| from sklearn.metrics import accuracy_score | |
| import os | |
| import torch | |
| from torch import nn | |
| import torch.nn.functional as F | |
| from torch.utils.data import DataLoader, TensorDataset | |
| from torchaudio import transforms | |
| from torchvision import models | |
| from .utils.evaluation import AudioEvaluationRequest | |
| from .utils.emissions import tracker, clean_emissions_data, get_space_info | |
| from dotenv import load_dotenv | |
# Pull secrets (notably HF_TOKEN, needed for the gated dataset) from a local
# .env file into the process environment before anything reads them.
load_dotenv()

# Router that the host application mounts; the audio endpoint attaches to it.
router = APIRouter()

# Metadata echoed back with every evaluation result.
DESCRIPTION = "Tiny_DNN"
ROUTE = "/audio"
# BUGFIX(review): router/ROUTE were defined but the endpoint was never
# registered in the visible file — restore the registration so the API route
# actually exists.
@router.post(ROUTE, tags=["Audio Task"])
async def evaluate_audio(request: AudioEvaluationRequest):
    """Evaluate audio classification for rainforest (chainsaw) sound detection.

    Loads the gated dataset, builds log-mel spectrograms of the held-out test
    split, runs a pretrained BlazeFace-style CNN over them, and reports
    accuracy together with the energy/emissions tracked during inference.

    Args:
        request: carries ``dataset_name``, ``test_size`` and ``test_seed``
            used to reproduce the evaluation split.

    Returns:
        dict: accuracy, emissions data and submission metadata.
    """
    # Identify this Space for the results payload.
    username, space_url = get_space_info()

    # Label space of the task.
    # NOTE(review): kept for reference — the dataset already ships integer
    # labels, so this mapping is never applied below.
    LABEL_MAPPING = {
        "chainsaw": 0,
        "environment": 1
    }

    # The dataset is gated: authenticate via the HF_TOKEN environment variable.
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Deterministic train/test split driven by the request parameters.
    train_test = dataset["train"].train_test_split(
        test_size=request.test_size, seed=request.test_seed
    )
    test_dataset = train_test["test"]
    true_labels = test_dataset["label"]

    # Preprocessing: resample 12 kHz -> 16 kHz, then 64-band log-mel spectrogram.
    # NOTE(review): the 12 kHz source rate is assumed, not read from the
    # samples — confirm against the dataset's actual sampling rate.
    resampler = transforms.Resample(orig_freq=12000, new_freq=16000)
    mel_transform = transforms.MelSpectrogram(sample_rate=16000, n_mels=64)
    amplitude_to_db = transforms.AmplitudeToDB()

    def resize_audio(_waveform, target_length):
        """Force the waveform to exactly ``target_length`` samples via resampling."""
        num_frames = _waveform.shape[-1]
        if num_frames != target_length:
            _resampler = transforms.Resample(orig_freq=num_frames, new_freq=target_length)
            _waveform = _resampler(_waveform)
        return _waveform

    # Normalize every clip to a fixed length so spectrograms stack into a batch.
    resized_waveforms = [
        resize_audio(
            torch.tensor(sample['audio']['array'], dtype=torch.float32).unsqueeze(0),
            target_length=72000,
        )
        for sample in test_dataset
    ]

    waveforms, labels = [], []
    for waveform, label in zip(resized_waveforms, true_labels):
        waveforms.append(amplitude_to_db(mel_transform(resampler(waveform))))
        labels.append(label)
    waveforms = torch.stack(waveforms)
    labels = torch.tensor(labels)

    # shuffle=False keeps prediction order aligned with true_labels.
    test_loader = DataLoader(
        TensorDataset(waveforms, labels),
        batch_size=64,
        shuffle=False
    )

    class BlazeFace(nn.Module):
        """Lightweight depthwise-separable CNN backbone (BlazeFace-style).

        Produces a flat 64-dim feature vector per input spectrogram.
        """

        def __init__(self, input_channels=1, use_double_block=False,
                     activation="relu", use_optional_block=True):
            super(BlazeFace, self).__init__()
            self.activation = activation
            self.use_double_block = use_double_block
            # NOTE(review): stored but never used anywhere in this class.
            self.use_optional_block = use_optional_block

            def conv_block(in_channels, out_channels, kernel_size, stride, padding):
                # Conv -> BN -> activation (ReLU by default, Sigmoid otherwise).
                return nn.Sequential(
                    nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size,
                              stride=stride, padding=padding),
                    nn.BatchNorm2d(out_channels),
                    nn.ReLU() if activation == "relu" else nn.Sigmoid()
                )

            def depthwise_separable_block(in_channels, out_channels, stride):
                # 5x5 depthwise conv followed by a 1x1 pointwise projection.
                return nn.Sequential(
                    nn.Conv2d(in_channels, in_channels, kernel_size=5, stride=stride,
                              padding=2, groups=in_channels, bias=False),
                    nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0),
                    nn.BatchNorm2d(out_channels),
                    nn.ReLU() if activation == "relu" else nn.Sigmoid()
                )

            def double_block(in_channels, filters_1, filters_2, stride):
                # Two stacked depthwise-separable blocks.
                return nn.Sequential(
                    depthwise_separable_block(in_channels, filters_1, stride),
                    depthwise_separable_block(filters_1, filters_2, 1)
                )

            # Stem: first strided convolution.
            self.conv1 = conv_block(input_channels, 24, kernel_size=5, stride=2, padding=2)

            # Early stage: 24 -> 48 channels with one downsampling step.
            self.single_blocks = nn.ModuleList([
                depthwise_separable_block(24, 24, stride=1),
                depthwise_separable_block(24, 24, stride=1),
                depthwise_separable_block(24, 48, stride=2),
                depthwise_separable_block(48, 48, stride=1),
                depthwise_separable_block(48, 48, stride=1)
            ])

            # Late stage: 48 -> 96 channels, double or single blocks by config.
            if self.use_double_block:
                self.double_blocks = nn.ModuleList([
                    double_block(48, 24, 96, stride=2),
                    double_block(96, 24, 96, stride=1),
                    double_block(96, 24, 96, stride=2),
                    double_block(96, 24, 96, stride=1),
                    double_block(96, 24, 96, stride=2)
                ])
            else:
                self.double_blocks = nn.ModuleList([
                    depthwise_separable_block(48, 96, stride=2),
                    depthwise_separable_block(96, 96, stride=1),
                    depthwise_separable_block(96, 96, stride=2),
                    depthwise_separable_block(96, 96, stride=1),
                    depthwise_separable_block(96, 96, stride=2)
                ])

            # Head: project to 64 channels, then global-average-pool to a vector.
            self.conv_head = nn.Conv2d(96, 64, kernel_size=1, stride=1)
            self.bn_head = nn.BatchNorm2d(64)
            self.global_avg_pooling = nn.AdaptiveAvgPool2d(1)

        def forward(self, x):
            x = self.conv1(x)
            for block in self.single_blocks:
                x = block(x)
            for block in self.double_blocks:
                x = block(x)
            x = self.conv_head(x)
            x = self.bn_head(x)
            x = F.relu(x)
            x = self.global_avg_pooling(x)
            # Flatten to (batch, 64).
            x = torch.flatten(x, 1)
            return x

    class BlazeFaceModel(nn.Module):
        """BlazeFace backbone plus a linear classification head."""

        def __init__(self, input_channels, label_count, use_double_block=False,
                     activation="relu", use_optional_block=True):
            super(BlazeFaceModel, self).__init__()
            self.blazeface_backbone = BlazeFace(
                input_channels=input_channels,
                use_double_block=use_double_block,
                activation=activation,
                use_optional_block=use_optional_block,
            )
            self.fc = nn.Linear(64, label_count)

        def forward(self, x):
            features = self.blazeface_backbone(x)
            return self.fc(features)

    # Model configuration (spectrogram dims kept for reference).
    model_settings = {
        'spectrogram_length': 64,
        'dct_coefficient_count': 481,
        'label_count': 2
    }

    model = BlazeFaceModel(
        input_channels=1,
        label_count=model_settings['label_count'],
        use_double_block=False,
        activation='relu',
        use_optional_block=False,
    )
    # NOTE(review): torch.load unpickles arbitrary objects — fine for a
    # checkpoint we ship ourselves, never for untrusted files.
    model.load_state_dict(torch.load("./best_blazeface_model.pth",
                                     map_location=torch.device('cpu')))
    # BUGFIX: switch to eval mode. Without this, BatchNorm uses per-batch
    # statistics during inference (wrong predictions, and a hard failure on a
    # final batch of one sample).
    model.eval()

    # Start tracking emissions for the inference pass only.
    tracker.start()
    tracker.start_task("inference")

    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE — energy consumption and emissions are tracked in this span.
    #--------------------------------------------------------------------------------------------
    predictions = []
    with torch.inference_mode():
        for data, _ in test_loader:
            logits = model(data)  # (batch, 2)
            # BUGFIX: no .squeeze() here — it collapsed the batch dimension
            # when the last batch held a single sample, making .tolist() a
            # scalar and predictions.extend(...) raise TypeError.
            pred = torch.argmax(logits, dim=1)
            predictions.extend(pred.tolist())
    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------------------------

    # Stop tracking emissions.
    emissions_data = tracker.stop_task()

    # Accuracy over the ordered test split (DataLoader preserved the order).
    accuracy = accuracy_score(true_labels, predictions)

    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        # tracker reports kWh / kg; convert to Wh / g.
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed
        }
    }
    return results