Spaces:
Sleeping
Sleeping
File size: 4,683 Bytes
2e96e8a 618c3b6 2e96e8a c2db44f 2e96e8a 89b0e89 c2db44f 89b0e89 c2db44f 89b0e89 2e96e8a c2db44f 2e96e8a f5f0a79 2e96e8a 89b0e89 2e96e8a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
from sklearn.metrics import accuracy_score
import os
import torch
from torch.utils.data import DataLoader, TensorDataset
from torchaudio import transforms
#from torchvision import models
#import onnxruntime as ort # Add ONNX Runtime
from openvino.runtime import Core
import numpy as np
from .utils.evaluation import AudioEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info
from dotenv import load_dotenv
# Load environment variables (e.g. HF_TOKEN used below) from a local .env file.
load_dotenv()
# Router mounted by the FastAPI app; this module registers the /audio endpoint on it.
router = APIRouter()
# Submission description echoed back in the results payload and shown in the API docs.
DESCRIPTION = "Tiny_DNN_new_split"
ROUTE = "/audio"
# Cap PyTorch CPU threading — presumably tuned to the Space's small CPU
# allotment so preprocessing doesn't oversubscribe cores (TODO confirm).
torch.set_num_threads(4)
torch.set_num_interop_threads(2)
@router.post(ROUTE, tags=["Audio Task"], description=DESCRIPTION)
async def evaluate_audio(request: AudioEvaluationRequest):
    """Evaluate the OpenVINO audio classifier on the dataset's ``test`` split.

    Pipeline: load the Hugging Face dataset -> patch empty clips ->
    resize every waveform to a fixed length -> log-mel spectrograms ->
    batched CPU inference with OpenVINO (emissions tracked by codecarbon)
    -> accuracy + emissions report.

    Args:
        request: carries ``dataset_name`` plus ``test_size``/``test_seed``.
            The latter two are only echoed back in the response; the
            dataset's pre-made ``test`` split is used directly instead of
            re-splitting ``train`` (the "new split" scheme).

    Returns:
        dict: submission metadata, accuracy on the test split, and
        emissions/energy figures for the inference phase.
    """
    # Identify the Space submitting this result.
    username, space_url = get_space_info()

    # Label semantics for this binary task: "chainsaw" -> 0, "environment" -> 1
    # (the dataset already stores integer labels, so no mapping is applied here).

    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))
    test_dataset = dataset["test"]
    true_labels = test_dataset["label"]

    # Feature-extraction transforms, built once and reused for every clip.
    # NOTE(review): orig_freq=12000 assumes the source audio is 12 kHz — confirm
    # against the dataset card.
    resampler = transforms.Resample(orig_freq=12000, new_freq=16000)
    mel_transform = transforms.MelSpectrogram(sample_rate=16000, n_mels=64)
    amplitude_to_db = transforms.AmplitudeToDB()

    def replace_empty_audio_array(batch):
        """Replace zero-length audio arrays with a single zero sample.

        Some clips (e.g. mashpi_2020_..._64-67.wav) are empty and would break
        downstream resampling; a 1-sample silent array keeps them in the
        evaluation (instead of filtering them out, which would desync labels).
        """
        for audio in batch["audio"]:
            if audio["array"].shape[0] == 0:
                audio["array"] = np.array([0])
        return batch

    test_dataset = test_dataset.map(replace_empty_audio_array, batched=True)

    def resize_audio(waveform, target_length):
        """Stretch/shrink a waveform to exactly ``target_length`` frames.

        Uses torchaudio Resample with the current frame count as the
        "original rate", which interpolates the clip to a fixed length so
        all spectrograms share one shape.
        """
        num_frames = waveform.shape[-1]
        if num_frames != target_length:
            waveform = transforms.Resample(orig_freq=num_frames, new_freq=target_length)(waveform)
        return waveform

    # Normalize every clip to 72_000 frames (6 s at 12 kHz), then compute
    # log-mel spectrogram features.
    resized_waveforms = [
        resize_audio(
            torch.tensor(sample["audio"]["array"], dtype=torch.float32).unsqueeze(0),
            target_length=72000,
        )
        for sample in test_dataset
    ]
    waveforms = torch.stack(
        [amplitude_to_db(mel_transform(resampler(w))) for w in resized_waveforms]
    )
    labels = torch.tensor(true_labels)

    test_loader = DataLoader(
        TensorDataset(waveforms, labels),
        batch_size=128,
        shuffle=False,  # keep batch order aligned with true_labels for scoring
    )

    # Load the OpenVINO IR model compiled for CPU.
    core = Core()
    compiled_model = core.compile_model(model="./openvino_model/model.xml", device_name="CPU")
    output_layer = compiled_model.output(0)

    # Track emissions around the inference loop only (preprocessing excluded).
    tracker.start()
    tracker.start_task("inference")

    predictions = []
    for data, _target in test_loader:
        # Model expects NCHW input: (batch, 1 channel, 64 mels, 481 time steps).
        inputs = data.numpy().reshape((-1, 1, 64, 481))
        output = compiled_model([inputs])[output_layer]
        predictions.extend(np.argmax(output, axis=1).tolist())

    emissions_data = tracker.stop_task()

    accuracy = accuracy_score(true_labels, predictions)

    return {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        # codecarbon reports kWh / kgCO2eq; scaled to Wh / gCO2eq per the key names.
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }