File size: 8,032 Bytes
9c6b905
279af50
9c6b905
279af50
 
9c6b905
279af50
 
9c6b905
279af50
 
 
 
 
9c6b905
279af50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9c6b905
 
 
279af50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9c6b905
279af50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import os
import logging
import numpy as np
import librosa
from sklearn.preprocessing import normalize
from tensorflow.keras.models import load_model
from scipy.signal import butter, sosfilt
import pandas as pd

# Configure logging once at import time: INFO and above, with every record
# prefixed by its timestamp and severity. `logger` is the module-wide logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("audio_classifier_test")

# Paths and Constants
# Directory holding the saved Keras models; classify_audio() joins it with a
# file name taken from MODELS.
MODEL_PATH = "./models"
# Recording that main() runs through every model/feature combination.
FILE_PATH = "./data/Respiratory_Sound_Database/testsample/101_1b1_Al_sc_Meditron.wav"
# Saved-model file names, keyed first by model type ("binary" / "multi") and
# then by feature type. The feature-type key is also what classify_audio()
# passes to preprocess_audio() as its `mode`.
MODELS = {
    "binary": {
        "augmented": "final_model_binary_augmented.h5",
        "log_mel": "final_model_binary_log_mel.h5",
        "mfcc": "final_model_binary_mfcc.h5",
    },
    "multi": {
        "augmented": "final_model_multi_augmented.h5",
        "log_mel": "final_model_multi_log_mel.h5",
        "mfcc": "final_model_multi_mfcc.h5",
    }
}
# Human-readable labels per model type; main() indexes these lists with the
# predicted class index (argmax of the model output).
# NOTE(review): assumes the list order matches the class indices the models
# were trained with -- confirm against the training code.
CLASS_NAMES = {
    "binary": ["Abnormal", "Normal"],
    "multi": ["Chronic Respiratory Diseases", "Normal", "Respiratory Infections"]
}


# Augmentation Functions
def add_noise(data, noise_factor=0.001):
    """Return ``data`` with scaled standard-normal noise added.

    ``len(data)`` noise samples are drawn from NumPy's global random state,
    multiplied by ``noise_factor`` and added to ``data``.
    """
    scaled_noise = noise_factor * np.random.standard_normal(len(data))
    return data + scaled_noise

def shift(data, shift_factor=1600):
    """Circularly shift ``data`` by ``shift_factor`` positions using ``np.roll``.

    Elements pushed past the end wrap around to the beginning.
    """
    rolled = np.roll(data, shift_factor)
    return rolled

def stretch(data, rate=1.2):
    """Time-stretch ``data`` by ``rate`` via ``librosa.effects.time_stretch``."""
    stretched = librosa.effects.time_stretch(data, rate=rate)
    return stretched

def pitch_shift(data, sr, n_steps=3):
    """Shift the pitch of ``data`` by ``n_steps`` via ``librosa.effects.pitch_shift``.

    ``sr`` is the sampling rate of ``data`` and is forwarded to librosa.
    """
    shifted = librosa.effects.pitch_shift(data, sr=sr, n_steps=n_steps)
    return shifted



def filtering(audio, sr, *, low_cutoff=50, high_cutoff=5000, order=10):
    """Apply a Butterworth band-pass filter to an audio signal.

    Args:
        audio: Samples to filter; handed unchanged to ``scipy.signal.sosfilt``.
        sr: Sampling rate of ``audio`` in Hz.
        low_cutoff: Lower band edge in Hz (default 50).
        high_cutoff: Upper band edge in Hz (default 5000). It is clamped to
            ``sr / 2 - 1`` so it always stays below the Nyquist frequency.
        order: Value passed as ``N`` to ``scipy.signal.butter`` (default 10).

    Returns:
        The filtered signal as returned by ``sosfilt``.

    Raises:
        ValueError: If ``low_cutoff`` is not positive or is not below the
            (clamped) upper edge, e.g. for a very low sampling rate.
    """
    # Keep the upper edge strictly below Nyquist for the given sampling rate.
    high_cutoff = min(high_cutoff, sr / 2 - 1)

    if low_cutoff <= 0 or low_cutoff >= high_cutoff:
        raise ValueError(
            f"Invalid filter range: low_cutoff={low_cutoff}, high_cutoff={high_cutoff} for sampling rate {sr}"
        )

    # Second-order sections keep a high-order band-pass numerically stable.
    sos = butter(N=order, Wn=[low_cutoff, high_cutoff], btype='band', fs=sr, output='sos')
    return sosfilt(sos, audio)


def preprocess_audio(audio_file, mode="augmented", input_shape=None):
    """Load an audio file and turn it into a feature array for a model.

    The recording is loaded at 16 kHz, passed through ``filtering``, forced to
    exactly 5 seconds of samples (zero-padded at the end or truncated) and then
    converted according to ``mode``:

    * ``'mfcc'``: 20 MFCCs, row-normalized with ``normalize(..., axis=1)``.
    * ``'log_mel'``: 20-band mel spectrogram (``fmax=8000``) converted to dB
      relative to its maximum, row-normalized the same way.
    * ``'augmented'``: the time-averaged 52-coefficient MFCC vector of the clip
      and of four augmented copies (noise, shift, stretch, pitch shift) are
      averaged together and normalized as a single row.

    Args:
        audio_file: Path of the audio file to load.
        mode: One of ``'mfcc'``, ``'log_mel'`` or ``'augmented'``.
        input_shape: Optional model input shape; when truthy the feature is
            passed through ``_reshape_feature`` before being returned.

    Returns:
        The feature array with an extra trailing (channel) axis.

    Raises:
        ValueError: If ``mode`` is not recognised. Any exception raised during
            preprocessing is logged and re-raised.
    """
    sr_new = 16000  # target sampling rate in Hz
    max_len = 5 * sr_new  # fixed clip length: 5 seconds of samples

    def _mean_mfcc(signal):
        # 52 MFCCs averaged over all time frames -> one vector per signal.
        return np.mean(librosa.feature.mfcc(y=signal, sr=sr_new, n_mfcc=52).T, axis=0)

    try:
        x, sr = librosa.load(audio_file, sr=sr_new)
        x = filtering(x, sr)
        logger.info(f"Loaded audio file '{audio_file}' with shape {x.shape} and sampling rate {sr}.")

        # Bring every clip to the same length so feature shapes are stable.
        shortfall = max_len - x.shape[0]
        if shortfall > 0:
            x = np.pad(x, (0, shortfall))
            logger.info(f"Audio padded to {max_len} samples.")
        else:
            x = x[:max_len]
            logger.info(f"Audio truncated to {max_len} samples.")

        if mode == 'augmented':
            augmenters = (
                lambda d: add_noise(d, 0.001),
                lambda d: shift(d, 1600),
                lambda d: stretch(d, 1.2),
                lambda d: pitch_shift(d, sr_new, 3),
            )
            # Un-augmented clip first, then each augmented copy in order.
            features = [_mean_mfcc(x)]
            features.extend(_mean_mfcc(augment(x)) for augment in augmenters)

            feature = np.mean(features, axis=0)
            # normalize() wants a 2-D array: treat the vector as one row.
            feature = normalize(feature.reshape(1, -1), axis=1).flatten()

        elif mode == 'log_mel':
            mel_spec = librosa.feature.melspectrogram(y=x, sr=sr_new, n_mels=20, fmax=8000)
            log_mel = librosa.power_to_db(mel_spec, ref=np.max)
            feature = normalize(log_mel, axis=1)

        elif mode == 'mfcc':
            mfcc = librosa.feature.mfcc(y=x, sr=sr_new, n_mfcc=20)
            feature = normalize(mfcc, axis=1)

        else:
            raise ValueError(f"Unknown mode: {mode}")

        # Fit the feature to the model's expected input when a shape is given.
        if input_shape:
            feature = _reshape_feature(feature, input_shape)

        logger.info(f"Feature extracted with shape {feature.shape}.")
        with_channel = np.expand_dims(feature, axis=-1)
        return with_channel

    except Exception as e:
        logger.error(f"Error in preprocessing audio: {e}")
        raise


def _reshape_feature(feature, input_shape):
    """

    Reshape the feature to match the expected input shape of the model.



    Returns reshaped feature.

    """
    expected_time_frames = input_shape[1]
    if len(feature) > expected_time_frames:
        feature = feature[:expected_time_frames]
    elif len(feature) < expected_time_frames:
        feature = np.pad(feature, (0, expected_time_frames - len(feature)))

    return feature


def classify_audio(model_type, feature_type, file_path):
    """Run one saved model on a single audio file.

    The model file name is looked up in ``MODELS[model_type][feature_type]``
    and loaded from ``MODEL_PATH``; the audio is preprocessed with
    ``feature_type`` as the mode and the model's own input shape.

    Args:
        model_type: Key into ``MODELS`` (e.g. ``"binary"`` or ``"multi"``).
        feature_type: Key into ``MODELS[model_type]``; also the preprocessing mode.
        file_path: Path of the audio file to classify.

    Returns:
        ``(predicted_class, probabilities)``: the argmax index of the first
        prediction row and that row as a plain list.

    Raises:
        FileNotFoundError: If the model file does not exist. Any exception
            raised along the way is logged and re-raised.
    """
    try:
        model_name = MODELS[model_type][feature_type]
        model_file = os.path.join(MODEL_PATH, model_name)
        if not os.path.exists(model_file):
            raise FileNotFoundError(f"Model file '{model_file}' not found.")

        model = load_model(model_file)

        # The model's own input shape drives how the feature is reshaped.
        features = preprocess_audio(
            file_path,
            mode=feature_type,
            input_shape=model.input_shape,
        )

        # The model expects a batch, so wrap the single sample in one.
        batch = np.expand_dims(features, axis=0)
        predictions = model.predict(batch)

        first_row = predictions[0]
        predicted_class = np.argmax(predictions, axis=1)[0]
        probabilities = first_row.tolist()

        logger.info(f"Prediction complete. Predicted class: {predicted_class}, Probabilities: {probabilities}")
        return predicted_class, probabilities

    except Exception as e:
        logger.error(f"Error in classification: {e}")
        raise


def main():
    """Classify ``FILE_PATH`` with every model in ``MODELS`` and print a summary.

    Each model-type / feature-type combination is tried independently: a
    failure is logged and recorded as an ``"Error"`` row (with the exception
    text in the probabilities column) so the remaining combinations still run.
    Returns early, after logging an error, when the audio file does not exist.
    """
    logger.info("Starting audio classification test script.")

    if not os.path.exists(FILE_PATH):
        logger.error(f"Audio file not found: {FILE_PATH}")
        return

    results = []  # one row per model/feature combination for the summary table

    for model_type, feature_models in MODELS.items():
        for feature_type in feature_models:
            row = {"Model Type": model_type, "Feature Type": feature_type}
            try:
                logger.info(f"Testing {model_type} model with {feature_type} features.")
                predicted_class, probabilities = classify_audio(model_type, feature_type, FILE_PATH)
                class_name = CLASS_NAMES[model_type][predicted_class]
                logger.info(f"Predicted Class: {class_name} ({predicted_class}), Probabilities: {probabilities}")

                row["Predicted Class"] = class_name
                row["Probabilities"] = probabilities
            except Exception as e:
                # Best effort: keep going so one broken model does not hide the rest.
                logger.error(f"Failed for {model_type} - {feature_type}: {e}")
                row["Predicted Class"] = "Error"
                row["Probabilities"] = str(e)
            results.append(row)

    # Render the collected rows as a plain-text table.
    df_results = pd.DataFrame(results)
    print("\nSummary of Results:")
    print(df_results.to_string(index=False))


if __name__ == "__main__":
    main()