ParkinsonDetection / app /ml_logic.py
Genos77's picture
first commit
e9ee222
import os
import subprocess
import shutil
import joblib
import pandas as pd
from imageio_ffmpeg import get_ffmpeg_exe
import tensorflow as tf
from tensorflow.keras.preprocessing import image
import numpy as np
import librosa
import librosa.display
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import noisereduce as nr
# --- Configuration ---
AUDIO_MODEL_PATH = 'parkinson_cnn_model_stft_grayscale.h5'
SYMPTOM_MODEL_PATH = 'symptom_model.joblib'
SPECTROGRAM_PATH = 'spectrograms_stft_5s_grayscale'
TARGET_DURATION_S = 5
# --- Load Both Models at Startup ---
try:
audio_model = tf.keras.models.load_model(AUDIO_MODEL_PATH)
print(f"Successfully loaded Audio CNN model from: {AUDIO_MODEL_PATH}")
except Exception as e:
print(f"FATAL: Could not load AUDIO model. Error: {e}")
audio_model = None
try:
symptom_model = joblib.load(SYMPTOM_MODEL_PATH)
print(f"Successfully loaded Symptom model from: {SYMPTOM_MODEL_PATH}")
except Exception as e:
print(f"FATAL: Could not load SYMPTOM model. Error: {e}")
symptom_model = None
# --- Spectrogram Creation Function (Unchanged) ---
def create_stft_spectrogram_from_audio(audio_path, save_path):
"""
Loads, converts, and saves a high-quality, clean grayscale STFT spectrogram.
"""
audio_path = os.path.abspath(audio_path)
save_path = os.path.abspath(save_path)
temp_dir = os.path.join(os.path.dirname(audio_path), f"temp_{os.path.basename(audio_path)}")
os.makedirs(temp_dir, exist_ok=True)
try:
if audio_path.lower().endswith('.webm'):
temp_wav_path = os.path.join(temp_dir, 'converted_audio.wav')
try:
ffmpeg_executable = get_ffmpeg_exe()
command = [ffmpeg_executable, '-i', audio_path, '-acodec', 'pcm_s16le', '-ac', '1', '-ar', '44100', '-y', temp_wav_path]
subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
shutil.rmtree(temp_dir)
return False
processing_path = temp_wav_path
else:
processing_path = audio_path
y, sr = librosa.load(processing_path, sr=None)
target_samples = TARGET_DURATION_S * sr
if len(y) > target_samples:
start_index = int((len(y) - target_samples) / 2)
y_segment = y[start_index : start_index + target_samples]
else:
y_segment = librosa.util.pad_center(y, size=target_samples)
y_reduced = nr.reduce_noise(y=y_segment, sr=sr)
N_FFT = 1024
HOP_LENGTH = 256
S_audio = librosa.stft(y_reduced, n_fft=N_FFT, hop_length=HOP_LENGTH)
Y_db = librosa.amplitude_to_db(np.abs(S_audio), ref=np.max)
plt.figure(figsize=(12, 4))
librosa.display.specshow(Y_db, sr=sr, hop_length=HOP_LENGTH, x_axis='time', y_axis='log', cmap='gray_r')
plt.axis('off')
plt.savefig(save_path, bbox_inches='tight', pad_inches=0)
plt.close()
shutil.rmtree(temp_dir)
return True
except Exception as e:
if 'temp_dir' in locals() and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
return False
# --- Master Prediction Function (Corrected Version) ---
def get_combined_prediction(symptom_data, audio_path, user_age):
"""
Gets predictions from both models, combines them, and applies business logic.
"""
if not audio_model or not symptom_model:
print("ERROR: One or both models are not loaded.")
return "Error: Model not loaded.", "Error", 0.5
# --- 1. Get Prediction from Symptom Model (M1) ---
try:
# Create a pandas DataFrame from the input dictionary.
symptom_df = pd.DataFrame([symptom_data])
# --- THIS IS THE CORRECTION ---
# Define the exact feature order the model was trained on.
feature_order = ['tremor', 'stiffness', 'walking_issue']
# Reorder the DataFrame columns to match the training order.
symptom_df_ordered = symptom_df[feature_order]
# Predict the probability using the correctly ordered data.
symptom_proba = symptom_model.predict_proba(symptom_df_ordered)[0][1]
print(f"Symptom Model (M1) Prediction: {symptom_proba:.4f}")
except Exception as e:
print(f"Error getting symptom prediction: {e}")
symptom_proba = 0.5
# --- 2. Get Prediction from Audio CNN Model (M2) ---
audio_proba = 0.5
try:
temp_predict_dir = os.path.join(SPECTROGRAM_PATH, 'temp')
os.makedirs(temp_predict_dir, exist_ok=True)
temp_spectrogram_path = os.path.join(temp_predict_dir, 'temp_spec.png')
if create_stft_spectrogram_from_audio(audio_path, temp_spectrogram_path):
img = image.load_img(temp_spectrogram_path, target_size=(224, 224), color_mode='grayscale')
img_array = image.img_to_array(img)
img_array = np.expand_dims(img_array, axis=0)
img_array /= 255.0
audio_proba = audio_model.predict(img_array)[0][0]
print(f"Audio Model (M2) Prediction: {audio_proba:.4f}")
if os.path.exists(temp_spectrogram_path):
os.remove(temp_spectrogram_path)
else:
raise ValueError("Spectrogram creation failed.")
except Exception as e:
print(f"Error getting audio prediction: {e}")
# --- 3. Calculate the Final Weighted Score ---
weight_symptoms = 0.7
weight_audio = 0.3
final_score = (weight_symptoms * symptom_proba) + (weight_audio * audio_proba)
print(f"Final Combined Score: {final_score:.4f}")
# --- 4. Make Final Decision Based on the Combined Score ---
cnn_result_label = "Positive" if final_score > 0.5 else "Negative"
final_result_label = cnn_result_label
if cnn_result_label == "Positive" and user_age < 40:
final_result_label = "Negative (Age Override)"
return final_result_label, cnn_result_label, float(final_score)