Spaces:
Build error
Build error
| import os | |
| import contextlib | |
| import wave | |
| import librosa | |
| import numpy as np | |
| import pandas as pd | |
| import parselmouth | |
| import soundfile as sf | |
| import webrtcvad | |
| from tensorflow.keras.models import load_model | |
| import joblib | |
| import warnings | |
| import tempfile | |
| import json | |
| # --- Streamlit Imports --- | |
| import streamlit as st | |
| from sklearn.preprocessing import StandardScaler | |
| # --- Configuration --- | |
# Sample rate (Hz) that preprocess_audio() resamples recordings to by default.
TARGET_SR = 16_000

# Model artifacts, given as paths relative to the working directory.
MODEL_PATH = "vocal_model.h5"
SCALER_PATH_JSON = "vocal_scaler.json"  # scaler parameters are stored as JSON
FEATURES_PATH = "feature_names.joblib"

# --- Suppress Warnings ---
# Silence every Python warning for the whole process.
warnings.filterwarnings(action="ignore")
# NOTE(review): TensorFlow is already imported above when this runs; the
# variable is normally expected to be set before that import -- confirm it
# still has the intended effect here.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
| # --- Caching the Models and Scaler --- | |
| # This is a key Streamlit feature. It loads the models only once, making the app fast. | |
@st.cache_resource
def load_models_and_scaler():
    """Load and cache the model, scaler, and feature names.

    The function is wrapped in ``st.cache_resource`` so the artifacts are
    loaded once and reused across Streamlit reruns instead of being read
    from disk every time the script re-executes.

    Returns:
        tuple: ``(model, scaler, feature_names)`` where ``model`` is the
        result of ``load_model(MODEL_PATH)``, ``scaler`` is a
        ``StandardScaler`` whose ``mean_`` and ``scale_`` are taken from
        ``SCALER_PATH_JSON``, and ``feature_names`` is the object stored in
        ``FEATURES_PATH``.

    Raises:
        KeyError: If the scaler JSON lacks the ``mean_`` or ``scale_`` key.
        Exception: Any error raised while reading one of the artifact files
            propagates to the caller.
    """
    model = load_model(MODEL_PATH)

    # Rebuild the scaler from its JSON parameters; only the two attributes
    # stored in the file are restored on the fresh StandardScaler.
    with open(SCALER_PATH_JSON, "r", encoding="utf-8") as f:
        scaler_data = json.load(f)
    scaler = StandardScaler()
    scaler.mean_ = np.array(scaler_data["mean_"])
    scaler.scale_ = np.array(scaler_data["scale_"])

    feature_names = joblib.load(FEATURES_PATH)
    return model, scaler, feature_names
# --- Feature Extraction Functions ---
# NOTE: keep these identical to the original feature extraction functions.
def preprocess_audio(input_path, target_sr=TARGET_SR):
    """Write a single-channel, 16-bit PCM WAV copy of ``input_path``.

    The audio is loaded at its native sample rate, averaged over axis 0 when
    it has more than one dimension, and resampled to ``target_sr`` if needed.
    The result is written next to the input file with the suffix
    ``_processed_for_prediction.wav``.

    Args:
        input_path: Path of the audio file to convert.
        target_sr: Sample rate of the written file.

    Returns:
        The path of the written WAV file, or ``None`` if any step failed
        (the error is reported through ``st.error``).
    """
    try:
        samples, native_sr = librosa.load(input_path, sr=None, mono=False)

        # Collapse multi-dimensional data to one channel by averaging axis 0.
        if samples.ndim > 1:
            samples = samples.mean(axis=0)

        if native_sr != target_sr:
            samples = librosa.resample(
                samples, orig_sr=native_sr, target_sr=target_sr
            )

        stem = os.path.splitext(input_path)[0]
        output_path = f"{stem}_processed_for_prediction.wav"
        sf.write(output_path, samples, target_sr, subtype='PCM_16')
    except Exception as e:
        st.error(f"Error preprocessing audio: {e}")
        return None
    return output_path
def extract_features(file_path):
    """Compute the vocal feature dictionary for one WAV file.

    Args:
        file_path: Path to a WAV file. It is read by librosa, parselmouth and
            the ``wave`` module; the frame arithmetic assumes 2 bytes per
            sample (16-bit mono PCM, as written by ``preprocess_audio``).

    Returns:
        A dict with the keys ``Duration``, ``Pitch_Mean``, ``Pitch_Std``,
        ``Jitter``, ``Shimmer``, ``Speaking_Rate``, ``Silence_Ratio`` and
        ``MFCC_1`` .. ``MFCC_13``, or ``None`` if any step raised (the error
        is reported through ``st.error``).
    """
    try:
        # Duration and the per-coefficient mean of 13 MFCCs.
        signal, native_sr = librosa.load(file_path, sr=None)
        duration = librosa.get_duration(y=signal, sr=native_sr)
        mfcc_means = np.mean(
            librosa.feature.mfcc(y=signal, sr=native_sr, n_mfcc=13), axis=1
        )

        # Pitch statistics over the non-zero frequency entries only
        # (presumably zero marks unvoiced frames -- confirm).
        snd = parselmouth.Sound(file_path)
        f0 = snd.to_pitch().selected_array['frequency']
        f0 = f0[f0 != 0]
        if len(f0) > 0:
            pitch_mean = np.mean(f0)
            pitch_std = np.std(f0)
        else:
            pitch_mean = 0
            pitch_std = 0

        # Jitter and shimmer through Praat commands.
        point_process = parselmouth.praat.call(
            snd, "To PointProcess (periodic, cc)", 75, 500
        )
        jitter_local = parselmouth.praat.call(
            point_process, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3
        )
        shimmer_local = parselmouth.praat.call(
            [snd, point_process], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6
        )

        # Voice activity detection over consecutive 30 ms frames.
        vad = webrtcvad.Vad(1)
        pcm_bytes, wav_sr = _read_wave(file_path)
        voiced_seconds = 0
        num_segments = 0
        for frame in _frame_generator(30, pcm_bytes, wav_sr):
            if vad.is_speech(frame, wav_sr):
                voiced_seconds += 0.03  # 30ms frame
                # NOTE(review): counted per voiced frame; the original
                # nesting of this increment should be confirmed.
                num_segments += 1

        if duration > 0:
            silence_ratio = max(0, (duration - voiced_seconds) / duration)
            speaking_rate = num_segments / duration
        else:
            silence_ratio = 0
            speaking_rate = 0

        features = {
            'Duration': duration,
            'Pitch_Mean': pitch_mean,
            'Pitch_Std': pitch_std,
            'Jitter': jitter_local,
            'Shimmer': shimmer_local,
            'Speaking_Rate': speaking_rate,
            'Silence_Ratio': silence_ratio,
        }
        features.update(
            {f'MFCC_{pos}': val for pos, val in enumerate(mfcc_means, start=1)}
        )
        return features
    except Exception as e:
        st.error(f"Error extracting features: {e}")
        return None


def _read_wave(path):
    """Return ``(raw_frame_bytes, sample_rate)`` for the WAV file at ``path``."""
    with contextlib.closing(wave.open(path, 'rb')) as wf:
        return wf.readframes(wf.getnframes()), wf.getframerate()


def _frame_generator(frame_duration_ms, audio, sample_rate):
    """Yield consecutive fixed-size byte slices of ``audio``.

    A slice is only yielded while it ends strictly before the end of the
    buffer, so trailing bytes that do not satisfy that are not yielded.
    """
    # Bytes per frame; the factor 2 assumes 16-bit mono samples.
    frame_bytes = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
    start = 0
    while start + frame_bytes < len(audio):
        yield audio[start:start + frame_bytes]
        start += frame_bytes
| # --- Main Prediction Logic (Refactored for Streamlit) --- | |
def predict(audio_file_path, model, scaler, feature_names):
    """Run the full prediction pipeline on one audio file.

    Args:
        audio_file_path: Path of the recording to analyse.
        model: Object whose ``predict(X, verbose=0)`` returns a 2-D array-like;
            element ``[0][0]`` is used as the probability.
        scaler: Object whose ``transform`` scales the feature frame.
        feature_names: Column selection applied to the feature frame so the
            columns are in the order the scaler and model expect.

    Returns:
        tuple: ``(prediction_prob, features_dict)``. ``prediction_prob`` is a
        built-in ``float`` (not a NumPy scalar) so callers can hand it to
        APIs that only accept Python floats, such as ``st.progress``.
        Returns ``(None, None)`` when preprocessing or feature extraction
        fails.

    Raises:
        KeyError: If a name in ``feature_names`` is missing from the
            extracted features.
    """
    processed_path = preprocess_audio(audio_file_path)
    if not processed_path:
        return None, None

    try:
        features_dict = extract_features(processed_path)
    finally:
        # Remove the intermediate WAV even if extraction raises.
        os.remove(processed_path)

    if not features_dict:
        return None, None

    # Convert to DataFrame and ensure correct column order
    feature_df = pd.DataFrame([features_dict])
    feature_df = feature_df[feature_names]

    # Scale the features
    scaled_features = scaler.transform(feature_df)

    # The model output is a NumPy scalar; convert it to a plain float.
    prediction_prob = float(model.predict(scaled_features, verbose=0)[0][0])
    return prediction_prob, features_dict
| # --- Streamlit App Interface --- | |
# Page chrome and introduction text.
st.set_page_config(
    page_title="Parkinson's Voice Detector",
    page_icon="🩺",
    layout="centered",
)
st.title("🩺 Parkinson's Disease Detection from Voice")
st.markdown("""
This app uses a machine learning model to predict the likelihood of Parkinson's disease based on vocal features.
Upload a short voice recording (e.g., of someone saying "ahhh" for a few seconds) to get a prediction.
**Disclaimer:** This is a demonstration tool and not a substitute for professional medical advice.
""")

# Load the model artifacts; without them the app cannot do anything useful,
# so the script is halted after showing the error.
try:
    model, scaler, feature_names = load_models_and_scaler()
    st.sidebar.success("Model and components loaded successfully!")
except Exception as load_error:
    st.error(f"Error loading model components: {load_error}")
    st.stop()

# Upload widget restricted to the supported audio extensions.
uploaded_file = st.file_uploader(
    label="Choose a voice recording...",
    type=["wav", "mp3", "ogg", "flac"],
)
if uploaded_file is not None:
    # Play the upload back using its own MIME type; the uploader also accepts
    # mp3/ogg/flac, so hard-coding WAV would mislabel those files.
    st.audio(uploaded_file, format=uploaded_file.type or 'audio/wav')

    # When the user clicks the button, start prediction
    if st.button("Analyze Audio", type="primary"):
        # Save the upload to a temporary file, keeping its real extension so
        # decoders that look at the file name see the right format.
        upload_suffix = os.path.splitext(uploaded_file.name)[1] or ".wav"
        with tempfile.NamedTemporaryFile(delete=False, suffix=upload_suffix) as tmp_file:
            tmp_file.write(uploaded_file.getvalue())
            tmp_file_path = tmp_file.name

        with st.spinner('Analyzing audio... This may take a moment.'):
            try:
                prediction_prob, features = predict(tmp_file_path, model, scaler, feature_names)
                if prediction_prob is not None:
                    # Display results
                    st.subheader("Analysis Result")
                    is_parkinsons = prediction_prob > 0.5
                    if is_parkinsons:
                        st.warning(f"**Parkinson's Detected** (Confidence: {prediction_prob:.2%})")
                    else:
                        st.success(f"**Healthy** (Confidence of being healthy: {(1-prediction_prob):.2%})")

                    # st.progress needs a built-in float within [0.0, 1.0];
                    # the model output may be a NumPy scalar, so convert and
                    # clamp before drawing the bar.
                    st.progress(min(max(float(prediction_prob), 0.0), 1.0))
                    st.markdown(f"The model's confidence score for the presence of Parkinson's is **{prediction_prob:.2%}**.")

                    # Show extracted features in an expander
                    with st.expander("View Extracted Vocal Features"):
                        st.json(features)
                else:
                    st.error("Could not process the audio file. Please try a different file.")
            except Exception as e:
                st.error(f"An unexpected error occurred during analysis: {e}")
            finally:
                # Clean up the temporary file
                os.remove(tmp_file_path)