NOS-Innovation commited on
Commit
bcb3d72
·
verified ·
1 Parent(s): d9bf64b

Upload folder using huggingface_hub

Browse files
Files changed (12) hide show
  1. .gitattributes +1 -0
  2. dataset_preprocess.py +231 -0
  3. features.py +150 -0
  4. inference.py +515 -0
  5. model/model.h5 +3 -0
  6. model/model.npy +3 -0
  7. params.py +36 -0
  8. requirements.txt +6 -0
  9. test/audio.wav +3 -0
  10. train.py +514 -0
  11. yamnet.py +164 -0
  12. yamnet_test.py +56 -0
.gitattributes CHANGED
@@ -33,3 +33,4 @@ saved_model/**/* filter=lfs diff=lfs merge=lfs -text
33
  *.zip filter=lfs diff=lfs merge=lfs -text
34
  *.zst filter=lfs diff=lfs merge=lfs -text
35
  *tfevents* filter=lfs diff=lfs merge=lfs -text
 
 
33
  *.zip filter=lfs diff=lfs merge=lfs -text
34
  *.zst filter=lfs diff=lfs merge=lfs -text
35
  *tfevents* filter=lfs diff=lfs merge=lfs -text
36
+ test/audio.wav filter=lfs diff=lfs merge=lfs -text
dataset_preprocess.py ADDED
@@ -0,0 +1,231 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Audio Converter
3
+
4
+ This script scans a directory for audio files and converts them to WAV format.
5
+ It only processes audio files and skips all other file types.
6
+
7
+ Usage:
8
+ python audio_converter.py --input_dir /path/to/audio/files --output_dir /path/to/output
9
+ """
10
+
11
+ import os
12
+ import sys
13
+ import argparse
14
+ from pathlib import Path
15
+ from typing import List, Tuple
16
+ import subprocess
17
+
18
+
19
+ def print_info(message):
20
+ print(f"INFO: {message}")
21
+
22
+ def print_error(message):
23
+ print(f"ERROR: {message}")
24
+
25
+ def print_debug(message):
26
+ if VERBOSE:
27
+ print(f"DEBUG: {message}")
28
+
29
+ VERBOSE = False
30
+
31
+ # Audio formats that can be converted
32
+ AUDIO_FORMATS = {
33
+ '.mp3', '.m4a', '.aac', '.flac', '.ogg', '.wma', '.aiff', '.ape', '.opus'
34
+ }
35
+
36
+
37
+ def check_dependencies() -> bool:
38
+ """
39
+ Check if required dependencies are installed.
40
+
41
+ Returns:
42
+ bool: True if dependencies are met, False otherwise
43
+ """
44
+ # Check for ffmpeg
45
+ try:
46
+ subprocess.run(
47
+ ["ffmpeg", "-version"],
48
+ stdout=subprocess.PIPE,
49
+ stderr=subprocess.PIPE
50
+ )
51
+ print_info("ffmpeg is installed.")
52
+ return True
53
+ except FileNotFoundError:
54
+ print_error("ffmpeg is not installed. Please install it before running this script.")
55
+ return False
56
+
57
+
58
+ def scan_directory(directory: str) -> Tuple[List[Path], List[Path]]:
59
+ """
60
+ Scan directory for audio files.
61
+
62
+ Args:
63
+ directory: Path to the directory to scan
64
+
65
+ Returns:
66
+ Tuple containing lists of audio files and files to skip
67
+ """
68
+ audio_files = []
69
+ skip_files = []
70
+
71
+ dir_path = Path(directory)
72
+ if not dir_path.exists():
73
+ raise FileNotFoundError(f"Directory not found: {directory}")
74
+
75
+ for file_path in dir_path.glob('**/*'):
76
+ if file_path.is_file():
77
+ file_ext = file_path.suffix.lower()
78
+
79
+ if file_ext in AUDIO_FORMATS:
80
+ audio_files.append(file_path)
81
+ elif file_ext == '.wav':
82
+ # Skip existing WAV files
83
+ print_debug(f"Skipping existing WAV file: {file_path}")
84
+ skip_files.append(file_path)
85
+ else:
86
+ # Skip non-audio files
87
+ print_debug(f"Skipping non-audio file: {file_path}")
88
+ skip_files.append(file_path)
89
+
90
+ print_info(f"Found {len(audio_files)} audio files to convert")
91
+ print_info(f"Skipping {len(skip_files)} files (WAV or non-audio)")
92
+
93
+ return audio_files, skip_files
94
+
95
+
96
+ def convert_audio_to_wav(input_file: Path, output_file: Path) -> bool:
97
+ """
98
+ Convert audio file to WAV format using ffmpeg.
99
+
100
+ Args:
101
+ input_file: Path to input audio file
102
+ output_file: Path to output WAV file
103
+
104
+ Returns:
105
+ bool: True if conversion was successful, False otherwise
106
+ """
107
+ try:
108
+ # Ensure output directory exists
109
+ output_file.parent.mkdir(parents=True, exist_ok=True)
110
+
111
+ cmd = [
112
+ "ffmpeg",
113
+ "-y", # Overwrite output file if it exists
114
+ "-i", str(input_file), # Input file
115
+ "-acodec", "pcm_s16le", # Output codec (16-bit PCM)
116
+ "-ar", "44100", # Sample rate (44.1kHz)
117
+ "-ac", "1", # Mono audio (1 channel)
118
+ str(output_file) # Output file
119
+ ]
120
+
121
+ process = subprocess.run(
122
+ cmd,
123
+ stdout=subprocess.PIPE,
124
+ stderr=subprocess.PIPE
125
+ )
126
+
127
+ if process.returncode != 0:
128
+ print_error(f"Error converting {input_file}: {process.stderr.decode()}")
129
+ return False
130
+
131
+ print_info(f"Successfully converted {input_file} to WAV")
132
+ return True
133
+
134
+ except Exception as e:
135
+ print_error(f"Error converting {input_file}: {str(e)}")
136
+ return False
137
+
138
+
139
+ def process_files(audio_files: List[Path], input_dir: str, output_dir: str,
140
+ preserve_structure: bool = True) -> Tuple[int, int]:
141
+ """
142
+ Process all identified audio files for conversion.
143
+
144
+ Args:
145
+ audio_files: List of audio files to convert
146
+ input_dir: Input directory path
147
+ output_dir: Output directory path
148
+
149
+ Returns:
150
+ Tuple of successful conversions, failed conversions
151
+ """
152
+ input_base = Path(input_dir)
153
+ output_base = Path(output_dir)
154
+
155
+ success_count = 0
156
+ failure_count = 0
157
+
158
+ # Process audio files
159
+ for audio_file in audio_files:
160
+ if preserve_structure:
161
+ rel_path = audio_file.relative_to(input_base)
162
+ output_file = output_base / rel_path.with_suffix('.wav')
163
+ else:
164
+
165
+ output_file = output_base / f"{audio_file.stem}.wav"
166
+
167
+ if convert_audio_to_wav(audio_file, output_file):
168
+ success_count += 1
169
+ else:
170
+ failure_count += 1
171
+
172
+ return success_count, failure_count
173
+
174
+
175
+ def parse_arguments() -> argparse.Namespace:
176
+ """Parse command-line arguments."""
177
+ parser = argparse.ArgumentParser(description="Convert audio files to WAV format")
178
+ parser.add_argument('--input_dir', type=str, required=True,
179
+ help='Directory containing files to convert')
180
+ parser.add_argument('--output_dir', type=str, required=True,
181
+ help='Directory for output WAV files')
182
+ parser.add_argument('--flat', action='store_true',
183
+ help='Don\'t preserve directory structure')
184
+ parser.add_argument('--verbose', '-v', action='store_true',
185
+ help='Enable verbose output')
186
+
187
+ return parser.parse_args()
188
+
189
+
190
+ def main():
191
+ """Main function to run the script."""
192
+ global VERBOSE
193
+
194
+ try:
195
+ args = parse_arguments()
196
+
197
+ VERBOSE = args.verbose
198
+
199
+ print_info(f"Input directory: {args.input_dir}")
200
+ print_info(f"Output directory: {args.output_dir}")
201
+
202
+ if not check_dependencies():
203
+ print_error("Missing dependencies. Please install required packages.")
204
+ sys.exit(1)
205
+
206
+ # Create output directory if it doesn't exist
207
+ os.makedirs(args.output_dir, exist_ok=True)
208
+
209
+ audio_files, skip_files = scan_directory(args.input_dir)
210
+
211
+ # Process files
212
+ preserve_structure = not args.flat
213
+ success_count, failure_count = process_files(
214
+ audio_files,
215
+ args.input_dir,
216
+ args.output_dir,
217
+ preserve_structure
218
+ )
219
+
220
+ # Print summary
221
+ print_info(f"Conversion complete!")
222
+ print_info(f"Successfully converted: {success_count} files")
223
+ print_info(f"Failed conversions: {failure_count} files")
224
+
225
+ except Exception as e:
226
+ print_error(f"Error during execution: {str(e)}")
227
+ sys.exit(1)
228
+
229
+
230
+ if __name__ == "__main__":
231
+ main()
features.py ADDED
@@ -0,0 +1,150 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Feature computation for YAMNet."""
2
+
3
+ import numpy as np
4
+ import tensorflow as tf
5
+
6
+
7
+ def waveform_to_log_mel_spectrogram_patches(waveform, params):
8
+ """Compute log mel spectrogram patches of a 1-D waveform."""
9
+ with tf.name_scope('log_mel_features'):
10
+ # waveform has shape [<# samples>]
11
+
12
+ # Convert waveform into spectrogram using a Short-Time Fourier Transform.
13
+ # Note that tf.signal.stft() uses a periodic Hann window by default.
14
+ window_length_samples = int(
15
+ round(params.sample_rate * params.stft_window_seconds))
16
+ hop_length_samples = int(
17
+ round(params.sample_rate * params.stft_hop_seconds))
18
+ fft_length = 2 ** int(np.ceil(np.log(window_length_samples) / np.log(2.0)))
19
+ num_spectrogram_bins = fft_length // 2 + 1
20
+ if params.tflite_compatible:
21
+ magnitude_spectrogram = _tflite_stft_magnitude(
22
+ signal=waveform,
23
+ frame_length=window_length_samples,
24
+ frame_step=hop_length_samples,
25
+ fft_length=fft_length)
26
+ else:
27
+ magnitude_spectrogram = tf.abs(tf.signal.stft(
28
+ signals=waveform,
29
+ frame_length=window_length_samples,
30
+ frame_step=hop_length_samples,
31
+ fft_length=fft_length))
32
+ # magnitude_spectrogram has shape [<# STFT frames>, num_spectrogram_bins]
33
+
34
+ # Convert spectrogram into log mel spectrogram.
35
+ linear_to_mel_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
36
+ num_mel_bins=params.mel_bands,
37
+ num_spectrogram_bins=num_spectrogram_bins,
38
+ sample_rate=params.sample_rate,
39
+ lower_edge_hertz=params.mel_min_hz,
40
+ upper_edge_hertz=params.mel_max_hz)
41
+ mel_spectrogram = tf.matmul(
42
+ magnitude_spectrogram, linear_to_mel_weight_matrix)
43
+ log_mel_spectrogram = tf.math.log(mel_spectrogram + params.log_offset)
44
+ # log_mel_spectrogram has shape [<# STFT frames>, params.mel_bands]
45
+
46
+ # Frame spectrogram (shape [<# STFT frames>, params.mel_bands]) into patches
47
+ # (the input examples). Only complete frames are emitted, so if there is
48
+ # less than params.patch_window_seconds of waveform then nothing is emitted
49
+ # (to avoid this, zero-pad before processing).
50
+ spectrogram_hop_length_samples = int(
51
+ round(params.sample_rate * params.stft_hop_seconds))
52
+ spectrogram_sample_rate = params.sample_rate / spectrogram_hop_length_samples
53
+ patch_window_length_samples = int(
54
+ round(spectrogram_sample_rate * params.patch_window_seconds))
55
+ patch_hop_length_samples = int(
56
+ round(spectrogram_sample_rate * params.patch_hop_seconds))
57
+ features = tf.signal.frame(
58
+ signal=log_mel_spectrogram,
59
+ frame_length=patch_window_length_samples,
60
+ frame_step=patch_hop_length_samples,
61
+ axis=0)
62
+ # features has shape [<# patches>, <# STFT frames in an patch>, params.mel_bands]
63
+
64
+ return log_mel_spectrogram, features
65
+
66
+
67
+ def pad_waveform(waveform, params):
68
+ """Pads waveform with silence if needed to get an integral number of patches."""
69
+ # In order to produce one patch of log mel spectrogram input to YAMNet, we
70
+ # need at least one patch window length of waveform plus enough extra samples
71
+ # to complete the final STFT analysis window.
72
+ min_waveform_seconds = (
73
+ params.patch_window_seconds +
74
+ params.stft_window_seconds - params.stft_hop_seconds)
75
+ min_num_samples = tf.cast(min_waveform_seconds * params.sample_rate, tf.int32)
76
+ num_samples = tf.shape(waveform)[0]
77
+ num_padding_samples = tf.maximum(0, min_num_samples - num_samples)
78
+
79
+ # In addition, there might be enough waveform for one or more additional
80
+ # patches formed by hopping forward. If there are more samples than one patch,
81
+ # round up to an integral number of hops.
82
+ num_samples = tf.maximum(num_samples, min_num_samples)
83
+ num_samples_after_first_patch = num_samples - min_num_samples
84
+ hop_samples = tf.cast(params.patch_hop_seconds * params.sample_rate, tf.int32)
85
+ num_hops_after_first_patch = tf.cast(tf.math.ceil(
86
+ tf.cast(num_samples_after_first_patch, tf.float32) /
87
+ tf.cast(hop_samples, tf.float32)), tf.int32)
88
+ num_padding_samples += (
89
+ hop_samples * num_hops_after_first_patch - num_samples_after_first_patch)
90
+
91
+ padded_waveform = tf.pad(waveform, [[0, num_padding_samples]],
92
+ mode='CONSTANT', constant_values=0.0)
93
+ return padded_waveform
94
+
95
+
96
+ def _tflite_stft_magnitude(signal, frame_length, frame_step, fft_length):
97
+ """TF-Lite-compatible version of tf.abs(tf.signal.stft())."""
98
+ def _hann_window():
99
+ return tf.reshape(
100
+ tf.constant(
101
+ (0.5 - 0.5 * np.cos(2 * np.pi * np.arange(0, 1.0, 1.0 / frame_length))
102
+ ).astype(np.float32),
103
+ name='hann_window'), [1, frame_length])
104
+
105
+ def _dft_matrix(dft_length):
106
+ """Calculate the full DFT matrix in NumPy."""
107
+ # See https://en.wikipedia.org/wiki/DFT_matrix
108
+ omega = (0 + 1j) * 2.0 * np.pi / float(dft_length)
109
+ # Don't include 1/sqrt(N) scaling, tf.signal.rfft doesn't apply it.
110
+ return np.exp(omega * np.outer(np.arange(dft_length), np.arange(dft_length)))
111
+
112
+ def _rdft(framed_signal, fft_length):
113
+ """Implement real-input Discrete Fourier Transform by matmul."""
114
+ # We are right-multiplying by the DFT matrix, and we are keeping only the
115
+ # first half ("positive frequencies"). So discard the second half of rows,
116
+ # but transpose the array for right-multiplication. The DFT matrix is
117
+ # symmetric, so we could have done it more directly, but this reflects our
118
+ # intention better.
119
+ complex_dft_matrix_kept_values = _dft_matrix(fft_length)[:(
120
+ fft_length // 2 + 1), :].transpose()
121
+ real_dft_matrix = tf.constant(
122
+ np.real(complex_dft_matrix_kept_values).astype(np.float32),
123
+ name='real_dft_matrix')
124
+ imag_dft_matrix = tf.constant(
125
+ np.imag(complex_dft_matrix_kept_values).astype(np.float32),
126
+ name='imaginary_dft_matrix')
127
+ signal_frame_length = tf.shape(framed_signal)[-1]
128
+ half_pad = (fft_length - signal_frame_length) // 2
129
+ padded_frames = tf.pad(
130
+ framed_signal,
131
+ [
132
+ # Don't add any padding in the frame dimension.
133
+ [0, 0],
134
+ # Pad before and after the signal within each frame.
135
+ [half_pad, fft_length - signal_frame_length - half_pad]
136
+ ],
137
+ mode='CONSTANT',
138
+ constant_values=0.0)
139
+ real_stft = tf.matmul(padded_frames, real_dft_matrix)
140
+ imag_stft = tf.matmul(padded_frames, imag_dft_matrix)
141
+ return real_stft, imag_stft
142
+
143
+ def _complex_abs(real, imag):
144
+ return tf.sqrt(tf.add(real * real, imag * imag))
145
+
146
+ framed_signal = tf.signal.frame(signal, frame_length, frame_step)
147
+ windowed_signal = framed_signal * _hann_window()
148
+ real_stft, imag_stft = _rdft(windowed_signal, fft_length)
149
+ stft_magnitude = _complex_abs(real_stft, imag_stft)
150
+ return stft_magnitude
inference.py ADDED
@@ -0,0 +1,515 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Audio Classification using YAMNet and Custom Models
3
+ A streamlined tool for classifying audio using pre-trained and custom models.
4
+ """
5
+
6
+ import os
7
+ import argparse
8
+ import logging
9
+ from pathlib import Path
10
+ from typing import Dict, List, Tuple, Optional, Union, Any
11
+ from dataclasses import dataclass, field
12
+
13
+ import numpy as np
14
+ import pandas as pd
15
+ import librosa
16
+ import resampy
17
+ import soundfile as sf
18
+ import tensorflow as tf
19
+ from tensorflow.keras.models import load_model
20
+
21
+
22
+ logging.basicConfig(
23
+ level=logging.INFO,
24
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
25
+ )
26
+ logger = logging.getLogger(__name__)
27
+
28
+ # Suppress TensorFlow warnings
29
+ tf.get_logger().setLevel(logging.ERROR)
30
+ os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
31
+
32
+
33
+ @dataclass(frozen=True)
34
+ class YAMNetParams:
35
+ """Parameters for YAMNet model."""
36
+ sample_rate: float = 16000.0
37
+ stft_window_seconds: float = 0.025
38
+ stft_hop_seconds: float = 0.010
39
+ mel_bands: int = 64
40
+ mel_min_hz: float = 125.0
41
+ mel_max_hz: float = 7500.0
42
+ log_offset: float = 0.001
43
+ patch_window_seconds: float = 0.96
44
+ patch_hop_seconds: float = 0.48
45
+ num_classes: int = 521
46
+ conv_padding: str = 'same'
47
+ batchnorm_center: bool = True
48
+ batchnorm_scale: bool = False
49
+ batchnorm_epsilon: float = 1e-4
50
+ classifier_activation: str = 'sigmoid'
51
+ tflite_compatible: bool = True
52
+
53
+ @property
54
+ def patch_frames(self) -> int:
55
+ """Calculate number of frames per patch."""
56
+ return int(round(self.patch_window_seconds / self.stft_hop_seconds))
57
+
58
+ @property
59
+ def patch_bands(self) -> int:
60
+ """Get number of mel bands."""
61
+ return self.mel_bands
62
+
63
+
64
+ @dataclass
65
+ class Config:
66
+ """Configuration for models and processing parameters."""
67
+
68
+ yamnet_model_path: str
69
+ yamnet_classes_path: str
70
+ model_path: Optional[str] = None
71
+ custom_classes_path: Optional[str] = None
72
+ output_dir: str = "results"
73
+ output_file: str = "classification.txt"
74
+
75
+ # Processing parameters
76
+ window_length: int = 10 # seconds
77
+ hop_length: int = 1 # seconds
78
+ custom_weight_factor: float = 5.0
79
+ top_k: int = 10 # Number of top predictions to keep
80
+
81
+ # Exclude certain classes
82
+ excluded_classes: List[str] = field(default_factory=lambda: ["Vehicle"])
83
+
84
+ def __post_init__(self):
85
+ """Convert paths to absolute paths and ensure output directory exists."""
86
+ self.yamnet_model_path = os.path.abspath(self.yamnet_model_path)
87
+ self.yamnet_classes_path = os.path.abspath(self.yamnet_classes_path)
88
+
89
+ if self.model_path:
90
+ self.model_path = os.path.abspath(self.model_path)
91
+
92
+ if self.custom_classes_path:
93
+ self.custom_classes_path = os.path.abspath(self.custom_classes_path)
94
+
95
+ # Create output directory
96
+ os.makedirs(Path(self.output_dir), exist_ok=True)
97
+
98
+ @property
99
+ def output_path(self) -> str:
100
+ """Get full path to output file."""
101
+ return os.path.join(self.output_dir, self.output_file)
102
+
103
+ @classmethod
104
+ def from_args(cls, args: argparse.Namespace) -> 'Config':
105
+ """Create config from command line arguments."""
106
+ output_dir = os.path.dirname(args.output) or "results"
107
+ output_file = os.path.basename(args.output) or "classification.txt"
108
+
109
+ return cls(
110
+ yamnet_model_path=args.yamnet_model,
111
+ yamnet_classes_path=args.yamnet_classes,
112
+ model_path=args.model if os.path.exists(args.model) else None,
113
+ custom_classes_path=args.custom_classes if os.path.exists(args.custom_classes) else None,
114
+ output_dir=output_dir,
115
+ output_file=output_file,
116
+ window_length=args.window,
117
+ hop_length=args.hop,
118
+ custom_weight_factor=args.weight
119
+ )
120
+
121
+
122
+ class AudioClassifier:
123
+ """Audio classification using YAMNet and custom models."""
124
+
125
+ def __init__(self, config: Config):
126
+ """Initialize classifier with configuration."""
127
+ self.config = config
128
+ self.params = YAMNetParams()
129
+
130
+ # Initialize models
131
+ self.yamnet_model = None
132
+ self.model = None
133
+ self.yamnet_classes = []
134
+ self.custom_classes = []
135
+
136
+ # Load models
137
+ self._load_models()
138
+
139
+ def _load_models(self) -> None:
140
+ """Load YAMNet and custom models."""
141
+ # Load YAMNet model
142
+ try:
143
+ from yamnet import yamnet_frames_model, class_names
144
+
145
+ logger.info(f"Loading YAMNet model from {self.config.yamnet_model_path}")
146
+ self.yamnet_model = yamnet_frames_model(self.params)
147
+ self.yamnet_model.load_weights(self.config.yamnet_model_path)
148
+
149
+ logger.info(f"Loading YAMNet classes from {self.config.yamnet_classes_path}")
150
+ self.yamnet_classes = class_names(self.config.yamnet_classes_path)
151
+
152
+ except ImportError:
153
+ logger.error("YAMNet module not found. Please install it or provide correct path.")
154
+ raise
155
+ except Exception as e:
156
+ logger.error(f"Failed to load YAMNet model: {e}")
157
+ raise
158
+
159
+ # Load custom model if available
160
+ if self.config.model_path:
161
+ try:
162
+ logger.info(f"Loading custom model from {self.config.model_path}")
163
+ self.model = load_model(self.config.model_path)
164
+
165
+ if self.config.custom_classes_path:
166
+ logger.info(f"Loading custom classes from {self.config.custom_classes_path}")
167
+ self.custom_classes = np.load(self.config.custom_classes_path, allow_pickle=True)
168
+
169
+ except Exception as e:
170
+ logger.warning(f"Failed to load custom model: {e}")
171
+ logger.warning("Continuing with YAMNet model only.")
172
+ self.model = None
173
+ self.custom_classes = []
174
+
175
+ def classify_file(self, audio_path: str) -> Dict[str, Any]:
176
+ """Classify audio file and return results."""
177
+ logger.info(f"Processing audio file: {audio_path}")
178
+
179
+ # Load audio
180
+ waveform, sr = self._load_audio(audio_path)
181
+
182
+ # Process audio segments
183
+ logger.info("Processing audio segments...")
184
+ segments_results = self._process_audio_segments(waveform, sr)
185
+
186
+ # Aggregate results
187
+ logger.info("Aggregating results...")
188
+ final_results = self._aggregate_results(segments_results)
189
+
190
+ # Save results
191
+ if self.config.output_path:
192
+ self._save_results(final_results)
193
+
194
+ return final_results
195
+
196
+ def _load_audio(self, file_path: str) -> Tuple[np.ndarray, int]:
197
+ """Load and preprocess audio file."""
198
+
199
+ if not os.path.exists(file_path):
200
+ raise FileNotFoundError(f"Audio file not found: {file_path}")
201
+
202
+ # Load audio data
203
+ logger.info(f"Loading audio from {file_path}")
204
+ wav_data, sr = sf.read(file_path, dtype=np.int16)
205
+
206
+ # Convert to float32 in range [-1.0, 1.0]
207
+ waveform = wav_data / 32768.0
208
+ waveform = waveform.astype('float32')
209
+
210
+ # Convert stereo to mono if needed
211
+ if len(waveform.shape) > 1:
212
+ logger.info("Converting stereo audio to mono")
213
+ waveform = np.mean(waveform, axis=1)
214
+
215
+ # Resample if needed
216
+ if sr != self.params.sample_rate:
217
+ logger.info(f"Resampling audio from {sr}Hz to {self.params.sample_rate}Hz")
218
+ waveform = resampy.resample(waveform, sr, self.params.sample_rate)
219
+ sr = int(self.params.sample_rate)
220
+
221
+ return waveform, sr
222
+
223
+ def _process_audio_segments(self, waveform: np.ndarray, sr: int) -> List[Dict[str, Any]]:
224
+ """Process audio in segments."""
225
+ segment_length_samples = int(sr * self.config.window_length)
226
+ hop_length_samples = int(sr * self.config.hop_length)
227
+
228
+ if segment_length_samples <= 0:
229
+ raise ValueError(f"Invalid segment length: {self.config.window_length} seconds")
230
+
231
+ segments_results = []
232
+
233
+ # Process each segment
234
+ total_segments = max(1, (len(waveform) - segment_length_samples + hop_length_samples) // hop_length_samples)
235
+ for i in range(0, len(waveform) - segment_length_samples + 1, hop_length_samples):
236
+ segment_idx = i // hop_length_samples + 1
237
+ logger.debug(f"Processing segment {segment_idx}/{total_segments}")
238
+
239
+ end_idx = min(i + segment_length_samples, len(waveform))
240
+ window = waveform[i:end_idx]
241
+
242
+ # Get YAMNet predictions
243
+ yamnet_predictions = self._get_yamnet_predictions(window)
244
+
245
+ # Get custom model predictions if available
246
+ custom_predictions = None
247
+ if self.model is not None:
248
+ custom_predictions = self._get_custom_predictions(window)
249
+
250
+ # Combine predictions
251
+ combined_results = self._combine_predictions(yamnet_predictions, custom_predictions)
252
+
253
+ # Store results
254
+ segment_result = {
255
+ 'yamnet_predictions': yamnet_predictions,
256
+ 'custom_predictions': custom_predictions,
257
+ 'combined_predictions': combined_results
258
+ }
259
+
260
+ segments_results.append(segment_result)
261
+
262
+ return segments_results
263
+
264
+ def _get_yamnet_predictions(self, audio_segment: np.ndarray) -> Dict[str, float]:
265
+ """Get YAMNet predictions for an audio segment."""
266
+ try:
267
+ scores, embeddings, spectrogram = self.yamnet_model(audio_segment)
268
+ prediction = np.mean(scores, axis=0)
269
+
270
+ # Get top predictions
271
+ top_indices = np.argsort(prediction)[::-1][:self.config.top_k]
272
+ top_labels = [self.yamnet_classes[i] for i in top_indices]
273
+ top_scores = prediction[top_indices]
274
+
275
+ return {label: float(score) for label, score in zip(top_labels, top_scores)}
276
+
277
+ except Exception as e:
278
+ logger.error(f"Error in YAMNet prediction: {e}")
279
+ return {}
280
+
281
+ def _get_custom_predictions(self, audio_segment: np.ndarray) -> Dict[str, float]:
282
+ """Get custom model predictions for an audio segment."""
283
+ try:
284
+ # Get YAMNet embeddings first
285
+ embeddings = self.yamnet_model(audio_segment)[1]
286
+
287
+ # Reshape embeddings for custom model
288
+ embeddings_reshaped = np.reshape(embeddings, (embeddings.shape[0], -1))
289
+
290
+ # Get predictions from custom model
291
+ predictions = self.model.predict(embeddings_reshaped, verbose=0)
292
+
293
+ # Calculate mean prediction over time
294
+ mean_predictions = np.mean(predictions, axis=0)
295
+
296
+ # Get top predictions
297
+ top_indices = np.argsort(mean_predictions)[::-1][:self.config.top_k]
298
+
299
+ # Check if custom classes are available
300
+ if len(self.custom_classes) > 0:
301
+ top_labels = [self.custom_classes[i] for i in top_indices]
302
+ else:
303
+ # Use numeric indices as labels if no class names are available
304
+ top_labels = [f"Class_{i}" for i in top_indices]
305
+
306
+ top_scores = mean_predictions[top_indices]
307
+
308
+ # Normalize scores
309
+ total_score = np.sum(top_scores)
310
+ if total_score > 0:
311
+ top_scores = top_scores / total_score
312
+
313
+ return {label: float(score) for label, score in zip(top_labels, top_scores)}
314
+
315
+ except Exception as e:
316
+ logger.error(f"Error in custom model prediction: {e}")
317
+ return {}
318
+
319
+ def _combine_predictions(
320
+ self,
321
+ yamnet_predictions: Dict[str, float],
322
+ custom_predictions: Optional[Dict[str, float]]
323
+ ) -> Dict[str, float]:
324
+ """Combine predictions from different models."""
325
+ combined = {}
326
+
327
+ # Add custom predictions with weighting
328
+ if custom_predictions:
329
+ for label, score in custom_predictions.items():
330
+ combined[label] = score * self.config.custom_weight_factor
331
+
332
+ # Add YAMNet predictions if not already present or if higher score
333
+ for label, score in yamnet_predictions.items():
334
+ if label not in self.config.excluded_classes:
335
+ if label not in combined or score > combined[label]:
336
+ combined[label] = score
337
+
338
+ return combined
339
+
340
+ def _aggregate_results(self, segments_results: List[Dict[str, Any]]) -> Dict[str, Any]:
341
+ """Aggregate results across all segments."""
342
+ # Initialize aggregated predictions
343
+ aggregated_predictions = {}
344
+
345
+ # Collect all combined predictions, keeping maximum score per label
346
+ for segment in segments_results:
347
+ for label, score in segment['combined_predictions'].items():
348
+ if label in aggregated_predictions:
349
+ aggregated_predictions[label] = max(aggregated_predictions[label], score)
350
+ else:
351
+ aggregated_predictions[label] = score
352
+
353
+ # Process results
354
+ if aggregated_predictions:
355
+ # Get top predictions
356
+ sorted_predictions = sorted(
357
+ aggregated_predictions.items(),
358
+ key=lambda x: x[1],
359
+ reverse=True
360
+ )[:self.config.top_k]
361
+
362
+ # Create a new dictionary with only the top predictions
363
+ top_predictions = {label: score for label, score in sorted_predictions}
364
+
365
+ # Normalize scores
366
+ total_score = sum(top_predictions.values())
367
+ if total_score > 0:
368
+ normalized_predictions = {
369
+ label: score / total_score
370
+ for label, score in top_predictions.items()
371
+ }
372
+ else:
373
+ # Default to equal probabilities if all scores are 0
374
+ normalized_predictions = {
375
+ label: 1.0 / len(top_predictions) if len(top_predictions) > 0 else 0.0
376
+ for label in top_predictions
377
+ }
378
+
379
+ # Find dominant label
380
+ dominant_label, dominant_score = max(normalized_predictions.items(), key=lambda x: x[1])
381
+ dominant_score_percentage = round(dominant_score * 100)
382
+
383
+ # Replace original predictions with normalized ones
384
+ aggregated_predictions = normalized_predictions
385
+ else:
386
+ dominant_label = "Unknown"
387
+ dominant_score = 0
388
+ dominant_score_percentage = 0
389
+
390
+ return {
391
+ 'aggregated_predictions': aggregated_predictions,
392
+ 'dominant_label': dominant_label,
393
+ 'dominant_score': dominant_score,
394
+ 'dominant_score_percentage': dominant_score_percentage
395
+ }
396
+
397
+ def _save_results(self, results: Dict[str, Any]) -> None:
398
+ """Save classification results to file."""
399
+ try:
400
+ with open(self.config.output_path, 'w') as file:
401
+ file.write("Audio Classification Results\n")
402
+ file.write("=========================\n\n")
403
+ file.write(f"Primary Classification: {results['dominant_label']} ({results['dominant_score_percentage']}%)\n\n")
404
+
405
+ # Add detailed breakdown
406
+ file.write("Classification Details:\n")
407
+ file.write("-----------------\n")
408
+ sorted_predictions = sorted(
409
+ results['aggregated_predictions'].items(),
410
+ key=lambda x: x[1],
411
+ reverse=True
412
+ )
413
+
414
+ for label, score in sorted_predictions:
415
+ percentage = round(score * 100)
416
+ file.write(f"{label}: {percentage}%\n")
417
+
418
+ logger.info(f"Results saved to {self.config.output_path}")
419
+
420
+ except Exception as e:
421
+ logger.error(f"Error saving results: {e}")
422
+ raise
423
+
424
+
425
+ def main():
426
+ """Main function to run audio classification."""
427
+ parser = argparse.ArgumentParser(
428
+ description='Audio classification using YAMNet and custom models',
429
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter
430
+ )
431
+
432
+ # Required arguments
433
+ parser.add_argument('audio_file', type=str, help='Path to audio file for classification')
434
+
435
+ # Model paths
436
+ parser.add_argument('--yamnet_model', type=str, default='yamnet/yamnet.h5',
437
+ help='Path to YAMNet model weights')
438
+ parser.add_argument('--yamnet_classes', type=str, default='yamnet/yamnet_class_map.csv',
439
+ help='Path to YAMNet class names')
440
+ parser.add_argument('--model', type=str, default='model/model.h5',
441
+ help='Path to custom model (optional)')
442
+ parser.add_argument('--custom_classes', type=str, default='model/model.npy',
443
+ help='Path to custom class names (optional)')
444
+
445
+ # Processing parameters
446
+ parser.add_argument('--window', type=int, default=10,
447
+ help='Window length in seconds')
448
+ parser.add_argument('--hop', type=int, default=1,
449
+ help='Hop length in seconds')
450
+ parser.add_argument('--weight', type=float, default=5.0,
451
+ help='Weighting factor for custom model predictions')
452
+
453
+ # Output options
454
+ parser.add_argument('--output', type=str, default='results/classification.txt',
455
+ help='Path to output file')
456
+
457
+ # Logging options
458
+ parser.add_argument('--verbose', action='store_true',
459
+ help='Enable verbose output')
460
+ parser.add_argument('--debug', action='store_true',
461
+ help='Enable debug logging')
462
+
463
+ args = parser.parse_args()
464
+
465
+ # Configure logging
466
+ if args.debug:
467
+ logger.setLevel(logging.DEBUG)
468
+ elif args.verbose:
469
+ logger.setLevel(logging.INFO)
470
+ else:
471
+ logger.setLevel(logging.WARNING)
472
+
473
+ try:
474
+ # Create configuration
475
+ config = Config.from_args(args)
476
+
477
+ # Create classifier
478
+ classifier = AudioClassifier(config)
479
+
480
+ # Process audio file
481
+ results = classifier.classify_file(args.audio_file)
482
+
483
+ # Print results
484
+ print("\nAudio Classification Results")
485
+ print("=========================")
486
+ print(f"\nPrimary Classification: {results['dominant_label']} ({results['dominant_score_percentage']}%)")
487
+
488
+ if args.verbose:
489
+ print("\nTop 10 Predictions:")
490
+ print("-----------------")
491
+ sorted_predictions = sorted(
492
+ results['aggregated_predictions'].items(),
493
+ key=lambda x: x[1],
494
+ reverse=True
495
+ )
496
+
497
+ for label, score in sorted_predictions:
498
+ percentage = round(score * 100)
499
+ print(f"{label}: {percentage}%")
500
+
501
+ print(f"\nFull results saved to: {config.output_path}")
502
+
503
+ except Exception as e:
504
+ logger.error(f"Error: {e}")
505
+ if args.debug:
506
+ import traceback
507
+ traceback.print_exc()
508
+ return 1
509
+
510
+ return 0
511
+
512
+
513
+ if __name__ == '__main__':
514
+ import sys
515
+ sys.exit(main())
model/model.h5 ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:fad065a5b7abb72dfe41e9fe1d5bb238c43b174559a9f94d62da217d41f44b6b
3
+ size 12671536
model/model.npy ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:0139d6fdecc7e44160ddbe67127e9b24760687be87ca6a030b9ffbccc2dbe126
3
+ size 640
params.py ADDED
@@ -0,0 +1,36 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Hyperparameters for YAMNet."""
2
+
3
+ from dataclasses import dataclass
4
+
5
+ # The following hyperparameters (except patch_hop_seconds) were used to train YAMNet,
6
+ # so expect some variability in performance if you change these. The patch hop can
7
+ # be changed arbitrarily: a smaller hop should give you more patches from the same
8
+ # clip and possibly better performance at a larger computational cost.
9
+ @dataclass(frozen=True) # Instances of this class are immutable.
10
+ class Params:
11
+ sample_rate: float = 16000.0
12
+ stft_window_seconds: float = 0.025
13
+ stft_hop_seconds: float = 0.010
14
+ mel_bands: int = 64
15
+ mel_min_hz: float = 125.0
16
+ mel_max_hz: float = 7500.0
17
+ log_offset: float = 0.001
18
+ patch_window_seconds: float = 0.96
19
+ patch_hop_seconds: float = 0.48
20
+
21
+ @property
22
+ def patch_frames(self):
23
+ return int(round(self.patch_window_seconds / self.stft_hop_seconds))
24
+
25
+ @property
26
+ def patch_bands(self):
27
+ return self.mel_bands
28
+
29
+ num_classes: int = 521
30
+ conv_padding: str = 'same'
31
+ batchnorm_center: bool = True
32
+ batchnorm_scale: bool = False
33
+ batchnorm_epsilon: float = 1e-4
34
+ classifier_activation: str = 'sigmoid'
35
+
36
+ tflite_compatible: bool = True
requirements.txt ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ librosa==0.11.0
2
+ numpy==1.26.4
3
+ pandas==2.2.3
4
+ tensorflow==2.9.0
5
+ resampy==0.4.3
6
+ ffmpeg
test/audio.wav ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:ebe25cec891496f3cdee7a8160dffe92f29fee75b2c1ac4424d7922350abfe98
3
+ size 1920044
train.py ADDED
@@ -0,0 +1,514 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Audio Classification System
3
+
4
+ This module trains a neural network model on audio data using YAMNet embeddings.
5
+ It extracts features from audio files and trains a classifier to recognize audio classes.
6
+
7
+ Usage:
8
+ python main.py --data_path <path_to_data> --model_name <model_name>
9
+
10
+ """
11
+
12
+ import os
13
+ import sys
14
+ import argparse
15
+ import logging
16
+ from pathlib import Path
17
+ from typing import Tuple, List, Dict, Optional, Any
18
+
19
+ import numpy as np
20
+ import pandas as pd
21
+ import tensorflow as tf
22
+ import librosa
23
+ from tqdm import tqdm
24
+ from sklearn.preprocessing import LabelBinarizer
25
+ from sklearn.utils import shuffle
26
+
27
+
28
+ logging.basicConfig(
29
+ level=logging.INFO,
30
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
31
+ handlers=[
32
+ logging.StreamHandler()
33
+ ]
34
+ )
35
+ logger = logging.getLogger(__name__)
36
+
37
+ # Default configuration
38
+ DEFAULT_CONFIG = {
39
+ 'yamnet_path': 'yamnet/yamnet.h5',
40
+ 'classes_path': 'yamnet/yamnet_class_map.csv',
41
+ 'sample_rate': 16000,
42
+ 'epochs': 100,
43
+ 'batch_size': 32,
44
+ 'learning_rate': 0.001,
45
+ 'num_hidden': 1024,
46
+ 'hidden_layer_size': 512,
47
+ 'num_extra_layers': 1,
48
+ 'dropout_rate': 0.3,
49
+ 'regularization': 0.01,
50
+ 'patience': 10,
51
+ 'validation_split': 0.2,
52
+ 'model_folder': 'model'
53
+ }
54
+
55
+
56
+ class Configuration:
57
+ """Handles configuration for the audio classification system."""
58
+
59
+ def __init__(self, custom_config: Optional[Dict[str, Any]] = None):
60
+ """
61
+ Initialize configuration handler.
62
+
63
+ Args:
64
+ custom_config: Custom configuration to override defaults
65
+ """
66
+ self.config = DEFAULT_CONFIG.copy()
67
+ if custom_config:
68
+ self.config.update(custom_config)
69
+
70
+ def get(self, key: str, default: Any = None) -> Any:
71
+ return self.config.get(key, default)
72
+
73
+ def set(self, key: str, value: Any) -> None:
74
+ self.config[key] = value
75
+
76
+ def __getitem__(self, key: str) -> Any:
77
+ return self.config[key]
78
+
79
+
80
+ class ClassMap:
81
+ """Handles audio class mapping and persistence."""
82
+
83
+ def __init__(self, config: Configuration):
84
+ """
85
+ Initialize class map.
86
+
87
+ Args:
88
+ config: Configuration handler
89
+ """
90
+ self.config = config
91
+ self.classes_path = config['classes_path']
92
+ self._ensure_classes_file_exists()
93
+
94
+ def _ensure_classes_file_exists(self) -> None:
95
+ """Ensure the classes mapping file exists."""
96
+ if not os.path.exists(self.classes_path):
97
+ logger.info(f"Class map file not found: {self.classes_path}. Creating a new one.")
98
+
99
+ pd.DataFrame({"display_name": [], "index": [], "mid": []}).to_csv(
100
+ self.classes_path, index=False
101
+ )
102
+
103
+ def load_yamnet_classes(self) -> np.ndarray:
104
+ """Load classes from YAMNet class map CSV file."""
105
+ try:
106
+ df = pd.read_csv(self.classes_path)
107
+ return df["display_name"].values
108
+ except Exception as e:
109
+ logger.error(f"Error loading classes: {str(e)}")
110
+ return np.array([])
111
+
112
+ def update_classes(self, data_path: str) -> List[str]:
113
+ """
114
+ Update classes based on directory structure.
115
+
116
+ Args:
117
+ data_path: Path to data directory
118
+
119
+ Returns:
120
+ List of all class names
121
+ """
122
+ try:
123
+ # Load existing classes mapping
124
+ existing_classes_df = pd.read_csv(self.classes_path)
125
+ existing_classes_set = set(existing_classes_df['display_name'])
126
+
127
+ # Find new classes
128
+ new_classes = []
129
+ for cls in sorted(os.listdir(data_path)):
130
+ class_path = os.path.join(data_path, cls)
131
+ if os.path.isdir(class_path) and cls not in existing_classes_set:
132
+ new_classes.append(cls)
133
+
134
+ # Append new classes to the existing classes dataframe
135
+ if new_classes:
136
+ logger.info(f"Adding {len(new_classes)} new classes: {', '.join(new_classes)}")
137
+ new_classes_df = pd.DataFrame({
138
+ 'display_name': new_classes,
139
+ 'index': [''] * len(new_classes),
140
+ 'mid': [''] * len(new_classes)
141
+ })
142
+ updated_classes_df = pd.concat([existing_classes_df, new_classes_df], ignore_index=True)
143
+ updated_classes_df.to_csv(self.classes_path, index=False)
144
+
145
+ # Return all classes from data directory
146
+ return [cls for cls in sorted(os.listdir(data_path))
147
+ if os.path.isdir(os.path.join(data_path, cls))]
148
+
149
+ except Exception as e:
150
+ logger.error(f"Error updating classes: {str(e)}")
151
+ raise
152
+
153
+
154
+ class FeatureExtractor:
155
+ """Extracts features from audio files using YAMNet."""
156
+
157
+ def __init__(self, config: Configuration):
158
+ """
159
+ Initialize feature extractor.
160
+
161
+ Args:
162
+ config: Configuration handler
163
+ """
164
+ self.config = config
165
+ self.yamnet_model = self._load_yamnet_model()
166
+
167
+ def _load_yamnet_model(self):
168
+ """Load YAMNet model for feature extraction."""
169
+ try:
170
+ logger.info("Loading YAMNet model...")
171
+ # Import here to avoid circular imports
172
+ from yamnet import yamnet_frames_model
173
+ from params import Params
174
+
175
+ model = yamnet_frames_model(Params())
176
+ model.load_weights(self.config['yamnet_path'])
177
+ return model
178
+ except Exception as e:
179
+ logger.error(f"Error loading YAMNet model: {str(e)}")
180
+ raise
181
+
182
+ def extract_features(self, audio_path: str) -> np.ndarray:
183
+ """
184
+ Extract features from an audio file using YAMNet.
185
+
186
+ Args:
187
+ audio_path: Path to audio file
188
+
189
+ Returns:
190
+ Numpy array of extracted features
191
+ """
192
+ try:
193
+ # Load audio file
194
+ wav, _ = librosa.load(
195
+ audio_path,
196
+ sr=self.config['sample_rate'],
197
+ mono=True
198
+ )
199
+ wav = wav.astype(np.float32)
200
+
201
+ if len(wav) == 0:
202
+ logger.warning(f"Warning: Empty audio file: {audio_path}")
203
+ return np.array([])
204
+
205
+ # Extract embeddings using YAMNet
206
+ _, embeddings, _ = self.yamnet_model(wav)
207
+ return embeddings.numpy()
208
+
209
+ except Exception as e:
210
+ logger.error(f"Error extracting features from {audio_path}: {str(e)}")
211
+ return np.array([])
212
+
213
+
214
+ class DatasetLoader:
215
+ """Creates a dataset from audio files."""
216
+
217
+ def __init__(self, config: Configuration, feature_extractor: FeatureExtractor):
218
+ """
219
+ Initialize dataset creator.
220
+
221
+ Args:
222
+ config: Configuration handler
223
+ feature_extractor: Feature extractor
224
+ """
225
+ self.config = config
226
+ self.feature_extractor = feature_extractor
227
+
228
+ def create_dataset(self, data_path: str, classes: List[str]) -> Tuple[np.ndarray, np.ndarray]:
229
+ """
230
+ Create a dataset from audio files in the specified path.
231
+
232
+ Args:
233
+ data_path: Path to the directory containing audio files organized in class folders
234
+ classes: List of class names
235
+
236
+ Returns:
237
+ samples: Numpy array of audio features
238
+ labels: Numpy array of corresponding labels
239
+ """
240
+ samples, labels = [], []
241
+
242
+ for cls in classes:
243
+ class_path = os.path.join(data_path, cls)
244
+ if not os.path.isdir(class_path):
245
+ continue
246
+
247
+ logger.info(f"Processing class: {cls}")
248
+ audio_files = os.listdir(class_path)
249
+
250
+ for sound in tqdm(audio_files, desc=f"Processing {cls}"):
251
+ audio_path = os.path.join(class_path, sound)
252
+ embeddings = self.feature_extractor.extract_features(audio_path)
253
+
254
+ if len(embeddings) == 0:
255
+ continue
256
+
257
+ # Store each embedding frame with its label
258
+ for embedding in embeddings:
259
+ samples.append(embedding)
260
+ labels.append(cls)
261
+
262
+ # Convert to numpy arrays
263
+ if not samples:
264
+ error_msg = "No valid audio samples were processed!"
265
+ logger.error(error_msg)
266
+ raise ValueError(error_msg)
267
+
268
+ samples = np.asarray(samples)
269
+ labels = np.asarray(labels)
270
+
271
+ logger.info(f"Created dataset with {len(samples)} samples across {len(set(labels))} classes")
272
+ return samples, labels
273
+
274
+
275
+ class ModelBuilder:
276
+ """Builds and trains neural network models for audio classification."""
277
+
278
+ def __init__(self, config: Configuration):
279
+ """
280
+ Initialize model builder.
281
+
282
+ Args:
283
+ config: Configuration handler
284
+ """
285
+ self.config = config
286
+
287
+ def build_model(self, num_classes: int) -> tf.keras.Model:
288
+ """
289
+ Build a neural network model for audio classification.
290
+
291
+ Args:
292
+ num_classes: Number of output classes
293
+
294
+ Returns:
295
+ Keras Model object
296
+ """
297
+ # Input layer (YAMNet embeddings are 1024-dimensional)
298
+ inputs = tf.keras.layers.Input(shape=(1024,))
299
+
300
+ # First hidden layer with L2 regularization
301
+ x = tf.keras.layers.Dense(
302
+ self.config['num_hidden'],
303
+ activation='relu',
304
+ kernel_regularizer=tf.keras.regularizers.l2(self.config['regularization'])
305
+ )(inputs)
306
+ x = tf.keras.layers.BatchNormalization()(x)
307
+ x = tf.keras.layers.Dropout(self.config['dropout_rate'])(x)
308
+
309
+ # Additional hidden layers
310
+ for i in range(self.config['num_extra_layers']):
311
+ layer_size = self.config['hidden_layer_size'] // (i+1)
312
+ x = tf.keras.layers.Dense(
313
+ layer_size,
314
+ activation='relu',
315
+ kernel_regularizer=tf.keras.regularizers.l2(self.config['regularization'])
316
+ )(x)
317
+ x = tf.keras.layers.BatchNormalization()(x)
318
+ x = tf.keras.layers.Dropout(self.config['dropout_rate'])(x)
319
+
320
+ # Output layer
321
+ outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(x)
322
+
323
+ # Create and return model
324
+ model = tf.keras.Model(inputs=inputs, outputs=outputs)
325
+ return model
326
+
327
+ def _create_callbacks(self, model_path: str) -> List[tf.keras.callbacks.Callback]:
328
+ """
329
+ Create callbacks for model training.
330
+
331
+ Args:
332
+ model_path: Path to save the model
333
+
334
+ Returns:
335
+ List of callbacks
336
+ """
337
+ # Create tensorboard callback
338
+ log_dir = Path(f"logs/{os.path.basename(model_path)}")
339
+ log_dir.mkdir(parents=True, exist_ok=True)
340
+
341
+ tensorboard = tf.keras.callbacks.TensorBoard(
342
+ log_dir=log_dir,
343
+ histogram_freq=1
344
+ )
345
+
346
+ # Early stopping callback
347
+ early_stopping = tf.keras.callbacks.EarlyStopping(
348
+ monitor='val_accuracy',
349
+ patience=self.config['patience'],
350
+ restore_best_weights=True,
351
+ verbose=1
352
+ )
353
+
354
+ # Learning rate reduction callback
355
+ reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
356
+ monitor='val_loss',
357
+ factor=0.5,
358
+ patience=5,
359
+ min_lr=0.00001,
360
+ verbose=1
361
+ )
362
+
363
+ return [early_stopping, reduce_lr, tensorboard]
364
+
365
+ def train_model(self, X: np.ndarray, y: np.ndarray, model_name: str) -> Tuple[tf.keras.Model, LabelBinarizer]:
366
+ """
367
+ Train a model on the provided data.
368
+
369
+ Args:
370
+ X: Input features
371
+ y: Target labels
372
+ model_name: Name of the model
373
+
374
+ Returns:
375
+ Tuple of (trained model, label encoder)
376
+ """
377
+ # Encode the labels (one-hot encoding)
378
+ encoder = LabelBinarizer()
379
+ encoded_labels = encoder.fit_transform(y)
380
+ num_classes = len(encoder.classes_)
381
+
382
+ logger.info(f"Training model with {num_classes} classes: {', '.join(encoder.classes_)}")
383
+
384
+ # Create model
385
+ model = self.build_model(num_classes=num_classes)
386
+
387
+ # Print model summary
388
+ model.summary()
389
+
390
+ # Compile model
391
+ optimizer = tf.keras.optimizers.Adam(learning_rate=self.config['learning_rate'])
392
+ model.compile(
393
+ optimizer=optimizer,
394
+ loss=tf.keras.losses.CategoricalCrossentropy(),
395
+ metrics=['accuracy']
396
+ )
397
+
398
+ model_folder = os.path.join(self.config['model_folder'])
399
+ os.makedirs(model_folder, exist_ok=True)
400
+
401
+ model_path = os.path.join(model_folder, model_name)
402
+
403
+ callbacks = self._create_callbacks(model_path)
404
+
405
+ # Train the model
406
+ history = model.fit(
407
+ X, encoded_labels,
408
+ epochs=self.config['epochs'],
409
+ batch_size=self.config['batch_size'],
410
+ validation_split=self.config['validation_split'],
411
+ callbacks=callbacks,
412
+ verbose=1
413
+ )
414
+
415
+ # Save the model and class names
416
+ model.save(f"{model_path}.h5")
417
+ np.save(f"{model_path}_classes.npy", encoder.classes_)
418
+
419
+ # Save training history
420
+ hist_df = pd.DataFrame(history.history)
421
+ hist_df.to_csv(f"{model_path}_history.csv", index=False)
422
+
423
+ logger.info(f"Model saved as {model_path}.h5")
424
+ logger.info(f"Class names saved as {model_path}_classes.npy")
425
+
426
+ return model, encoder
427
+
428
+
429
+ def parse_arguments() -> argparse.Namespace:
430
+ """Parse command-line arguments."""
431
+ parser = argparse.ArgumentParser(description="Train an audio classification model")
432
+ parser.add_argument('--data_path', type=str, required=True,
433
+ help='Path to the directory containing audio files')
434
+ parser.add_argument('--model_name', type=str, required=True,
435
+ help='Name for the saved model')
436
+ parser.add_argument('--config', type=str,
437
+ help='Path to config JSON file (optional)')
438
+ parser.add_argument('--epochs', type=int, default=DEFAULT_CONFIG['epochs'],
439
+ help='Number of training epochs')
440
+ parser.add_argument('--batch_size', type=int, default=DEFAULT_CONFIG['batch_size'],
441
+ help='Batch size for training')
442
+ parser.add_argument('--learning_rate', type=float, default=DEFAULT_CONFIG['learning_rate'],
443
+ help='Initial learning rate')
444
+ parser.add_argument('--model_folder', type=str, default=DEFAULT_CONFIG['model_folder'],
445
+ help='Folder to save the model')
446
+
447
+ return parser.parse_args()
448
+
449
+
450
+ def load_custom_config(config_path: Optional[str]) -> Dict[str, Any]:
451
+ """Load custom configuration from a JSON file."""
452
+ if not config_path:
453
+ return {}
454
+
455
+ try:
456
+ import json
457
+ with open(config_path, 'r') as f:
458
+ return json.load(f)
459
+ except Exception as e:
460
+ logger.error(f"Error loading config file: {str(e)}")
461
+ return {}
462
+
463
+
464
+ def main():
465
+ """Main function to run the script."""
466
+ try:
467
+
468
+ args = parse_arguments()
469
+
470
+ # Load custom configuration
471
+ custom_config = load_custom_config(args.config)
472
+
473
+
474
+ custom_config.update({
475
+ 'epochs': args.epochs,
476
+ 'batch_size': args.batch_size,
477
+ 'learning_rate': args.learning_rate,
478
+ 'model_folder': args.model_folder
479
+ })
480
+
481
+ # Create configuration handler
482
+ config = Configuration(custom_config)
483
+
484
+ logger.info(f"Data path: {args.data_path}")
485
+ logger.info(f"Model name: {args.model_name}")
486
+ logger.info(f"Model folder: {config['model_folder']}")
487
+
488
+ # Initialize components
489
+ class_map = ClassMap(config)
490
+ feature_extractor = FeatureExtractor(config)
491
+ dataset_creator = DatasetLoader(config, feature_extractor)
492
+ model_builder = ModelBuilder(config)
493
+
494
+ # Update classes and get class list
495
+ classes = class_map.update_classes(args.data_path)
496
+
497
+ # Create dataset
498
+ samples, labels = dataset_creator.create_dataset(args.data_path, classes)
499
+
500
+ # Shuffle the data for better training
501
+ samples, labels = shuffle(samples, labels, random_state=42)
502
+
503
+ # Train model
504
+ model, encoder = model_builder.train_model(samples, labels, args.model_name)
505
+
506
+ logger.info("Training completed successfully!")
507
+
508
+ except Exception as e:
509
+ logger.error(f"Error during execution: {str(e)}", exc_info=True)
510
+ sys.exit(1)
511
+
512
+
513
+ if __name__ == "__main__":
514
+ main()
yamnet.py ADDED
@@ -0,0 +1,164 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Core model definition of YAMNet."""
2
+ from tensorflow.keras.models import load_model
3
+
4
+ import csv
5
+
6
+ import numpy as np
7
+ import tensorflow as tf
8
+ from tensorflow.keras import Model, layers
9
+
10
+ import features as features_lib
11
+
12
+
13
+ def _batch_norm(name, params):
14
+ def _bn_layer(layer_input):
15
+ return layers.BatchNormalization(
16
+ name=name,
17
+ center=params.batchnorm_center,
18
+ scale=params.batchnorm_scale,
19
+ epsilon=params.batchnorm_epsilon)(layer_input)
20
+
21
+ return _bn_layer
22
+
23
+
24
+ def _conv(name, kernel, stride, filters, params):
25
+ def _conv_layer(layer_input):
26
+ output = layers.Conv2D(name='{}/conv'.format(name),
27
+ filters=filters,
28
+ kernel_size=kernel,
29
+ strides=stride,
30
+ padding=params.conv_padding,
31
+ use_bias=False,
32
+ activation=None)(layer_input)
33
+ output = _batch_norm('{}/conv/bn'.format(name), params)(output)
34
+ output = layers.ReLU(name='{}/relu'.format(name))(output)
35
+ return output
36
+
37
+ return _conv_layer
38
+
39
+
40
+ def _separable_conv(name, kernel, stride, filters, params):
41
+ def _separable_conv_layer(layer_input):
42
+ output = layers.DepthwiseConv2D(name='{}/depthwise_conv'.format(name),
43
+ kernel_size=kernel,
44
+ strides=stride,
45
+ depth_multiplier=1,
46
+ padding=params.conv_padding,
47
+ use_bias=False,
48
+ activation=None)(layer_input)
49
+ output = _batch_norm('{}/depthwise_conv/bn'.format(name), params)(output)
50
+ output = layers.ReLU(name='{}/depthwise_conv/relu'.format(name))(output)
51
+ output = layers.Conv2D(name='{}/pointwise_conv'.format(name),
52
+ filters=filters,
53
+ kernel_size=(1, 1),
54
+ strides=1,
55
+ padding=params.conv_padding,
56
+ use_bias=False,
57
+ activation=None)(output)
58
+ output = _batch_norm('{}/pointwise_conv/bn'.format(name), params)(output)
59
+ output = layers.ReLU(name='{}/pointwise_conv/relu'.format(name))(output)
60
+ return output
61
+
62
+ return _separable_conv_layer
63
+
64
+
65
+ _YAMNET_LAYER_DEFS = [
66
+ # (layer_function, kernel, stride, num_filters)
67
+ (_conv, [3, 3], 2, 32),
68
+ (_separable_conv, [3, 3], 1, 64),
69
+ (_separable_conv, [3, 3], 2, 128),
70
+ (_separable_conv, [3, 3], 1, 128),
71
+ (_separable_conv, [3, 3], 2, 256),
72
+ (_separable_conv, [3, 3], 1, 256),
73
+ (_separable_conv, [3, 3], 2, 512),
74
+ (_separable_conv, [3, 3], 1, 512),
75
+ (_separable_conv, [3, 3], 1, 512),
76
+ (_separable_conv, [3, 3], 1, 512),
77
+ (_separable_conv, [3, 3], 1, 512),
78
+ (_separable_conv, [3, 3], 1, 512),
79
+ (_separable_conv, [3, 3], 2, 1024),
80
+ (_separable_conv, [3, 3], 1, 1024)
81
+ ]
82
+
83
+
84
+ def yamnet(features, params):
85
+ """Define the core YAMNet mode in Keras."""
86
+ net = layers.Reshape(
87
+ (params.patch_frames, params.patch_bands, 1),
88
+ input_shape=(params.patch_frames, params.patch_bands))(features)
89
+ for (i, (layer_fun, kernel, stride, filters)) in enumerate(_YAMNET_LAYER_DEFS):
90
+ net = layer_fun('layer{}'.format(i + 1), kernel, stride, filters, params)(net)
91
+ embeddings = layers.GlobalAveragePooling2D()(net)
92
+ logits = layers.Dense(units=params.num_classes, use_bias=True)(embeddings)
93
+ predictions = layers.Activation(activation=params.classifier_activation)(logits)
94
+ return predictions, embeddings
95
+
96
+
97
+ def yamnet_frames_model(params):
98
+ """Defines the YAMNet waveform-to-class-scores model.
99
+
100
+ Args:
101
+ params: An instance of Params containing hyperparameters.
102
+
103
+ Returns:
104
+ A model accepting (num_samples,) waveform input and emitting:
105
+ - predictions: (num_patches, num_classes) matrix of class scores per time frame
106
+ - embeddings: (num_patches, embedding size) matrix of embeddings per time frame
107
+ - log_mel_spectrogram: (num_spectrogram_frames, num_mel_bins) spectrogram feature matrix
108
+ """
109
+ waveform = layers.Input(batch_shape=(None,), dtype=tf.float32)
110
+ waveform_padded = features_lib.pad_waveform(waveform, params)
111
+ log_mel_spectrogram, features = features_lib.waveform_to_log_mel_spectrogram_patches(
112
+ waveform_padded, params)
113
+ predictions, embeddings = yamnet(features, params)
114
+ frames_model = Model(
115
+ name='yamnet_frames', inputs=waveform,
116
+ outputs=[predictions, embeddings, log_mel_spectrogram])
117
+ return frames_model
118
+
119
+
120
+ def yamnet_transfer(features, params):
121
+ net = layers.Reshape(
122
+ (params.patch_frames, params.patch_bands, 1),
123
+ input_shape=(params.patch_frames, params.patch_bands))(features)
124
+ for (i, (layer_fun, kernel, stride, filters)) in enumerate(_YAMNET_LAYER_DEFS):
125
+ net = layer_fun('layer{}'.format(i + 1), kernel, stride, filters, params)(net)
126
+ embeddings = layers.GlobalAveragePooling2D()(net)
127
+ return embeddings
128
+
129
+
130
+ def yamnet_frames_model_transfer(params, last_layers):
131
+ """Defines the YAMNet waveform-to-class-scores model.
132
+
133
+ Args:
134
+ params: An instance of Params containing hyperparameters;
135
+ last_layers: Path to the classifier model.
136
+ Returns:
137
+ A model accepting (num_samples,) waveform input and emitting:
138
+ - predictions: (num_patches, num_classes) matrix of class scores per time frame
139
+ - embeddings: (num_patches, embedding size) matrix of embeddings per time frame
140
+ """
141
+
142
+ waveform = layers.Input(batch_shape=(None,), dtype=tf.float32)
143
+ waveform_padded = features_lib.pad_waveform(waveform, params)
144
+ _, features = features_lib.waveform_to_log_mel_spectrogram_patches(
145
+ waveform_padded, params)
146
+ embeddings = yamnet_transfer(features, params)
147
+ prediction = embeddings
148
+ last_layers = load_model(last_layers)
149
+ for layer in last_layers.layers[1:]:
150
+ prediction = layer(prediction)
151
+ frames_model = Model(
152
+ name='yamnet_frames', inputs=waveform,
153
+ outputs=[prediction, embeddings])
154
+ return frames_model
155
+
156
+
157
+ def class_names(class_map_csv):
158
+ """Read the class name definition file and return a list of strings."""
159
+ if tf.is_tensor(class_map_csv):
160
+ class_map_csv = class_map_csv.numpy()
161
+ with open(class_map_csv) as csv_file:
162
+ reader = csv.reader(csv_file)
163
+ next(reader) # Skip header
164
+ return np.array([display_name for (_, _, display_name) in reader])
yamnet_test.py ADDED
@@ -0,0 +1,56 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Installation test for YAMNet."""
2
+
3
+ import numpy as np
4
+ import tensorflow as tf
5
+
6
+ import params
7
+ import yamnet
8
+
9
+
10
+ class YAMNetTest(tf.test.TestCase):
11
+ _params = None
12
+ _yamnet = None
13
+ _yamnet_classes = None
14
+
15
+ @classmethod
16
+ def setUpClass(cls):
17
+ super().setUpClass()
18
+ cls._params = params.Params()
19
+ cls._yamnet = yamnet.yamnet_frames_model(cls._params)
20
+ cls._yamnet.load_weights('yamnet/yamnet.h5')
21
+ cls._yamnet_classes = yamnet.class_names('yamnet/yamnet_class_map.csv')
22
+
23
+ def clip_test(self, waveform, expected_class_name, top_n=10):
24
+ """Run the model on the waveform, check that expected class is in top-n."""
25
+ predictions, _, _ = YAMNetTest._yamnet(waveform)
26
+ clip_predictions = np.mean(predictions, axis=0)
27
+ top_n_indices = np.argsort(clip_predictions)[-top_n:]
28
+ top_n_scores = clip_predictions[top_n_indices]
29
+ top_n_class_names = YAMNetTest._yamnet_classes[top_n_indices]
30
+ top_n_predictions = list(zip(top_n_class_names, top_n_scores))
31
+ self.assertIn(expected_class_name, top_n_class_names,
32
+ 'Did not find expected class {} in top {} predictions: {}'.format(
33
+ expected_class_name, top_n, top_n_predictions))
34
+
35
+ def testZeros(self):
36
+ self.clip_test(
37
+ waveform=np.zeros((int(3 * YAMNetTest._params.sample_rate),)),
38
+ expected_class_name='Silence')
39
+
40
+ def testRandom(self):
41
+ # Create a numpy random Generator with a fixed seed for repeatability
42
+ rng = np.random.default_rng(51773)
43
+ self.clip_test(
44
+ waveform=rng.uniform(-1.0, +1.0,
45
+ (int(3 * YAMNetTest._params.sample_rate),)),
46
+ expected_class_name='White noise')
47
+
48
+ def testSine(self):
49
+ self.clip_test(
50
+ waveform=np.sin(2 * np.pi * 440 *
51
+ np.arange(0, 3, 1 / YAMNetTest._params.sample_rate)),
52
+ expected_class_name='Sine wave')
53
+
54
+
55
+ if __name__ == '__main__':
56
+ tf.test.main()