|
|
import os |
|
|
import subprocess |
|
|
import shutil |
|
|
import joblib |
|
|
import pandas as pd |
|
|
from imageio_ffmpeg import get_ffmpeg_exe |
|
|
import tensorflow as tf |
|
|
from tensorflow.keras.preprocessing import image |
|
|
import numpy as np |
|
|
import librosa |
|
|
import librosa.display |
|
|
import matplotlib |
|
|
matplotlib.use('Agg') |
|
|
import matplotlib.pyplot as plt |
|
|
import noisereduce as nr |
|
|
|
|
|
|
|
|
# --- Configuration ---

# Trained Keras CNN that classifies grayscale STFT spectrogram images (.h5 file).
AUDIO_MODEL_PATH = 'parkinson_cnn_model_stft_grayscale.h5'


# joblib-serialized classifier over the questionnaire features
# ('tremor', 'stiffness', 'walking_issue').
SYMPTOM_MODEL_PATH = 'symptom_model.joblib'


# Working directory where temporary spectrogram images are written for prediction.
SPECTROGRAM_PATH = 'spectrograms_stft_5s_grayscale'


# Length in seconds of the audio segment turned into a spectrogram.
TARGET_DURATION_S = 5
|
|
|
|
|
|
|
|
def _load_model_or_none(loader, path, success_message, kind_label):
    """Load a model from disk with *loader*; print a FATAL notice and return None on any failure."""
    try:
        loaded = loader(path)
    except Exception as err:
        print(f"FATAL: Could not load {kind_label} model. Error: {err}")
        return None
    print(success_message)
    return loaded


# Both models are loaded eagerly at import time. A failed load leaves the
# corresponding global as None, which downstream prediction code checks for.
audio_model = _load_model_or_none(
    tf.keras.models.load_model,
    AUDIO_MODEL_PATH,
    f"Successfully loaded Audio CNN model from: {AUDIO_MODEL_PATH}",
    "AUDIO",
)

symptom_model = _load_model_or_none(
    joblib.load,
    SYMPTOM_MODEL_PATH,
    f"Successfully loaded Symptom model from: {SYMPTOM_MODEL_PATH}",
    "SYMPTOM",
)
|
|
|
|
|
|
|
|
def create_stft_spectrogram_from_audio(audio_path, save_path):
    """Render a clean grayscale STFT spectrogram of *audio_path* to *save_path*.

    The audio is converted to WAV if needed (.webm via ffmpeg), reduced to a
    centered TARGET_DURATION_S-second segment (zero-padded if shorter),
    noise-reduced, and saved as an axis-free log-frequency spectrogram image.

    Args:
        audio_path: Path to the input audio file (.webm is converted first).
        save_path: Destination path for the spectrogram PNG.

    Returns:
        True on success, False on any failure (errors are printed, not raised).
    """
    audio_path = os.path.abspath(audio_path)
    save_path = os.path.abspath(save_path)
    # Scratch directory next to the input file, named after it to avoid
    # collisions between concurrent conversions of different files.
    temp_dir = os.path.join(os.path.dirname(audio_path), f"temp_{os.path.basename(audio_path)}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        if audio_path.lower().endswith('.webm'):
            # Browser recordings arrive as .webm; librosa cannot read them
            # directly, so transcode to 16-bit mono 44.1 kHz WAV first.
            processing_path = os.path.join(temp_dir, 'converted_audio.wav')
            try:
                ffmpeg_executable = get_ffmpeg_exe()
                command = [
                    ffmpeg_executable, '-i', audio_path,
                    '-acodec', 'pcm_s16le', '-ac', '1', '-ar', '44100',
                    '-y', processing_path,
                ]
                subprocess.run(command, check=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            except Exception as e:
                # Was silently swallowed before; report why conversion failed.
                print(f"Error converting .webm audio with ffmpeg: {e}")
                return False
        else:
            processing_path = audio_path

        # sr=None keeps the file's native sample rate.
        y, sr = librosa.load(processing_path, sr=None)
        target_samples = int(TARGET_DURATION_S * sr)
        if len(y) > target_samples:
            # Take the center of the recording (skips lead-in/out silence).
            start_index = (len(y) - target_samples) // 2
            y_segment = y[start_index:start_index + target_samples]
        else:
            # Shorter than the target: zero-pad symmetrically.
            y_segment = librosa.util.pad_center(y, size=target_samples)

        y_reduced = nr.reduce_noise(y=y_segment, sr=sr)

        N_FFT = 1024
        HOP_LENGTH = 256
        S_audio = librosa.stft(y_reduced, n_fft=N_FFT, hop_length=HOP_LENGTH)
        Y_db = librosa.amplitude_to_db(np.abs(S_audio), ref=np.max)

        fig = plt.figure(figsize=(12, 4))
        try:
            librosa.display.specshow(Y_db, sr=sr, hop_length=HOP_LENGTH,
                                     x_axis='time', y_axis='log', cmap='gray_r')
            # No axes/padding: the CNN expects a bare image, not a labeled plot.
            plt.axis('off')
            plt.savefig(save_path, bbox_inches='tight', pad_inches=0)
        finally:
            # Close even if savefig raises, so figures don't accumulate.
            plt.close(fig)
        return True
    except Exception as e:
        print(f"Error creating spectrogram for {audio_path}: {e}")
        return False
    finally:
        # Single cleanup point (was duplicated across three code paths).
        shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
|
|
|
|
def _symptom_probability(symptom_data):
    """Probability from the symptom model; 0.5 (neutral) when prediction fails."""
    try:
        symptom_df = pd.DataFrame([symptom_data])
        # Column order must match the order used when the model was trained.
        feature_order = ['tremor', 'stiffness', 'walking_issue']
        symptom_proba = symptom_model.predict_proba(symptom_df[feature_order])[0][1]
        print(f"Symptom Model (M1) Prediction: {symptom_proba:.4f}")
        return symptom_proba
    except Exception as e:
        print(f"Error getting symptom prediction: {e}")
        return 0.5


def _audio_probability(audio_path):
    """Probability from the audio CNN; 0.5 (neutral) when anything fails."""
    temp_predict_dir = os.path.join(SPECTROGRAM_PATH, 'temp')
    temp_spectrogram_path = os.path.join(temp_predict_dir, 'temp_spec.png')
    try:
        os.makedirs(temp_predict_dir, exist_ok=True)
        if not create_stft_spectrogram_from_audio(audio_path, temp_spectrogram_path):
            raise ValueError("Spectrogram creation failed.")
        # Preprocess exactly as in training: 224x224 grayscale, scaled to [0, 1].
        img = image.load_img(temp_spectrogram_path,
                             target_size=(224, 224), color_mode='grayscale')
        img_array = image.img_to_array(img)
        img_array = np.expand_dims(img_array, axis=0)
        img_array /= 255.0
        audio_proba = audio_model.predict(img_array)[0][0]
        print(f"Audio Model (M2) Prediction: {audio_proba:.4f}")
        return audio_proba
    except Exception as e:
        print(f"Error getting audio prediction: {e}")
        return 0.5
    finally:
        # Remove the temp PNG even when load/predict raises (previously leaked).
        if os.path.exists(temp_spectrogram_path):
            os.remove(temp_spectrogram_path)


def get_combined_prediction(symptom_data, audio_path, user_age):
    """Combine symptom-model and audio-model predictions into one result.

    Args:
        symptom_data: Mapping with 'tremor', 'stiffness' and 'walking_issue'
            feature values, as expected by the symptom model.
        audio_path: Path to the user's voice recording.
        user_age: Age in years; positives under 40 are overridden to negative.

    Returns:
        Tuple of (final_result_label, raw_model_label, final_score) where
        final_score is the weighted combination of both model probabilities.
    """
    # `is None` rather than truthiness: some model objects define __len__,
    # which would make a valid empty-ish model look falsy.
    if audio_model is None or symptom_model is None:
        print("ERROR: One or both models are not loaded.")
        return "Error: Model not loaded.", "Error", 0.5

    symptom_proba = _symptom_probability(symptom_data)
    audio_proba = _audio_probability(audio_path)

    # Symptoms dominate the final call; audio acts as a secondary signal.
    weight_symptoms = 0.7
    weight_audio = 0.3
    final_score = (weight_symptoms * symptom_proba) + (weight_audio * audio_proba)
    print(f"Final Combined Score: {final_score:.4f}")

    cnn_result_label = "Positive" if final_score > 0.5 else "Negative"

    # Business rule: Parkinson's is rare under 40, so downgrade young positives.
    final_result_label = cnn_result_label
    if cnn_result_label == "Positive" and user_age < 40:
        final_result_label = "Negative (Age Override)"

    return final_result_label, cnn_result_label, float(final_score)