niol08's picture
Upload 8 files
a8e4c2f verified
import os
import tempfile

import librosa
import numpy as np
import pandas as pd
import soundfile as sf
from scipy.signal import resample
from sklearn.preprocessing import scale

from gemini import query_gemini_rest
# Number of samples a generic 1-D signal is resampled to before inference.
EXPECTED_LEN = 256
# NOTE(review): not referenced in the visible part of this module;
# presumably a hop size for sliding-window segmentation -- confirm.
STEP = 128

# Human-readable PCG class names.
PCG_LABELS = [
    "Normal",
    "Aortic Stenosis",
    "Mitral Stenosis",
    "Mitral Valve Prolapse",
    "Pericardial Murmurs",
]

# Human-readable EMG class names, indexed by predicted class id.
LABELS_EMG = ["healthy", "myopathy", "neuropathy"]
def load_uploaded_file(file, signal_type="ECG") -> np.ndarray:
    """Load an uploaded signal file into a NumPy array.

    Args:
        file: Binary file-like object exposing a ``name`` attribute and a
            ``read()`` method (presumably a web-framework upload object --
            confirm against callers).
        signal_type: One of ``"ECG"``, ``"EMG"``, ``"VAG"`` or ``"PCG"``.

    Returns:
        * ECG / EMG: 1-D float32 array of every number found in the text.
        * VAG ``.csv``: float32 vector of the five feature columns taken
          from the first row.
        * VAG ``.npy``: whatever array ``np.load`` yields (dtype untouched).
        * VAG ``.wav``: float32 samples as returned by ``soundfile``.
        * PCG ``.wav`` / ``.flac`` / ``.mp3``: float32 samples of the first
          channel.

    Raises:
        ValueError: If the extension / signal type combination is not
            supported, or if an ECG/EMG token is not a valid number.
    """
    name = file.name.lower()

    if signal_type in ("ECG", "EMG"):
        # "utf-8-sig" transparently drops a leading BOM, which would
        # otherwise make the first token unparsable.
        text = file.read().decode("utf-8-sig")
        # Treat commas and any whitespace (spaces, tabs, newlines) as
        # separators, so that single-line CSV, one-value-per-line files and
        # multi-row CSV exports that mix both are all accepted.
        tokens = text.replace(",", " ").split()
        return np.array([float(tok) for tok in tokens], dtype=np.float32)

    if signal_type == "VAG":
        if name.endswith(".csv"):
            df = pd.read_csv(file)
            features = [
                "rms_amplitude",
                "peak_frequency",
                "spectral_entropy",
                "zero_crossing_rate",
                "mean_frequency",
            ]
            # Only the first row of the CSV is used.
            return df[features].iloc[0].values.astype(np.float32)
        if name.endswith(".npy"):
            return np.load(file)
        if name.endswith(".wav"):
            data, _ = sf.read(file)
            return data.astype(np.float32)
        raise ValueError("Unsupported VAG file format.")

    if signal_type == "PCG" and name.endswith((".wav", ".flac", ".mp3")):
        data, _ = sf.read(file)
        if data.ndim > 1:
            # Multi-channel audio: keep the first channel only.
            data = data[:, 0]
        return data.astype(np.float32)

    raise ValueError("Unsupported file format.")
def preprocess_signal(x: np.ndarray) -> np.ndarray:
    """Bring a signal to ``EXPECTED_LEN`` samples and standardize it.

    The input is resampled with ``scipy.signal.resample`` only when its
    element count differs from ``EXPECTED_LEN``; the result is then passed
    through ``sklearn.preprocessing.scale`` and returned as float32.
    """
    needs_resampling = x.size != EXPECTED_LEN
    fixed_length = resample(x, EXPECTED_LEN) if needs_resampling else x
    standardized = scale(fixed_length)
    return standardized.astype(np.float32)
def segment_signal(raw: np.ndarray) -> np.ndarray:
    """Preprocess ``raw`` and shape it as a single-item model batch.

    Returns:
        Array of shape ``(1, EXPECTED_LEN, 1)``: one segment,
        ``EXPECTED_LEN`` time steps, one channel.
    """
    prepared = preprocess_signal(raw)
    return prepared.reshape(1, EXPECTED_LEN, 1)
# Fixed number of samples the PCG waveform is padded / truncated to.
PCG_INPUT_LEN = 995


def preprocess_pcg_waveform(wave: np.ndarray) -> np.ndarray:
    """Convert a PCG waveform into a fixed-length, z-scored float32 vector.

    Steps:
        1. Multi-dimensional input is averaged along axis 1.
        2. The signal is right-padded with zeros, or cut, to
           ``PCG_INPUT_LEN`` samples.
        3. The result is mean-centred and divided by its standard deviation
           (plus ``1e-8`` to avoid division by zero).
    """
    mono = wave.mean(axis=1) if wave.ndim > 1 else wave

    shortfall = PCG_INPUT_LEN - len(mono)
    if shortfall > 0:
        fixed = np.pad(mono, (0, shortfall))
    else:
        fixed = mono[:PCG_INPUT_LEN]

    centred = fixed - np.mean(fixed)
    normalized = centred / (np.std(fixed) + 1e-8)
    return normalized.astype(np.float32)
def analyze_pcg_signal(file, model, gemini_key=None):
    """Classify a PCG recording and optionally fetch a Gemini explanation.

    Args:
        file: Path or file-like object readable by ``soundfile.read``.
        model: Object exposing ``predict(batch, verbose=0)`` that returns
            one score per class (presumably a Keras model -- confirm).
        gemini_key: API key; when falsy no Gemini request is made.

    Returns:
        Tuple ``(label, label, confidence, gemini_text)``. The label is
        intentionally returned twice to keep the existing four-element
        contract; ``gemini_text`` is ``None`` when no key was supplied.
    """
    signal, _ = sf.read(file)
    signal = preprocess_pcg_waveform(signal)
    # Model input: one item, PCG_INPUT_LEN time steps, one channel.
    input_data = signal.reshape(1, PCG_INPUT_LEN, 1)
    preds = model.predict(input_data, verbose=0)[0]

    idx = int(np.argmax(preds))
    confidence = float(preds[idx])
    # Use the shared module-level label list rather than a private copy so
    # the class names cannot drift apart.
    label = PCG_LABELS[idx]

    gem_txt = None
    if gemini_key:
        gem_txt = query_gemini_rest("PCG", label, confidence, gemini_key)
    return label, label, confidence, gem_txt
def pcg_to_features(file_obj, target_sr=16000, n_mels=128, n_frames=112):
    """Turn an uploaded PCG recording into a flattened log-mel feature row.

    Args:
        file_obj: File-like object whose ``read()`` returns the raw audio
            bytes.
        target_sr: Sample rate the audio is resampled to on load.
        n_mels: Number of mel bands.
        n_frames: Number of spectrogram frames kept; shorter spectrograms
            are right-padded with zeros, longer ones are truncated.

    Returns:
        float32 array of shape ``(1, n_mels * n_frames)``.
    """
    # librosa loads by path, so the upload is spooled to disk first.
    # delete=False keeps the file around after the handle is closed; it is
    # therefore removed explicitly below so repeated calls do not leak
    # temporary files.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    try:
        with tmp:
            tmp.write(file_obj.read())
        y, sr = librosa.load(tmp.name, sr=target_sr, mono=True)
    finally:
        os.remove(tmp.name)

    mel = librosa.feature.melspectrogram(
        y=y, sr=sr, n_fft=512, hop_length=256, n_mels=n_mels
    )
    logmel = librosa.power_to_db(mel, ref=np.max)

    if logmel.shape[1] < n_frames:
        pad_width = n_frames - logmel.shape[1]
        pad = np.zeros((n_mels, pad_width))
        logmel = np.hstack((logmel, pad))
    else:
        logmel = logmel[:, :n_frames]

    feat = logmel.flatten().astype(np.float32)
    return feat[np.newaxis, ...]
def analyze_emg_signal(file, model, gemini_key=""):
    """Classify an EMG recording by majority vote over fixed windows.

    The signal is cut into non-overlapping 1000-sample windows (samples
    past the last full window are ignored; a signal shorter than one window
    is zero-padded to exactly one). Each window is z-scored on its own and
    the whole batch is passed to ``model.predict``.

    Args:
        file: Upload object accepted by ``load_uploaded_file``.
        model: Object exposing ``predict(batch, verbose=0)`` returning
            per-window class scores (presumably a Keras model -- confirm).
        gemini_key: API key; when falsy no Gemini request is made.

    Returns:
        Tuple ``(label, confidence, gemini_text)`` where ``label`` is the
        most frequently predicted class, ``confidence`` is that class's
        score averaged over all windows, and ``gemini_text`` is ``None``
        when no key was supplied.
    """
    raw = load_uploaded_file(file, signal_type="EMG")
    WINDOW = 1000

    def _zscore(chunk):
        # Per-window standardization, shaped (WINDOW, 1) for the model.
        normed = (chunk - chunk.mean()) / (chunk.std() + 1e-6)
        return normed.reshape(WINDOW, 1)

    if len(raw) < WINDOW:
        chunks = [np.pad(raw, (0, WINDOW - len(raw)))]
    else:
        starts = range(0, len(raw) - WINDOW + 1, WINDOW)
        chunks = [raw[start:start + WINDOW] for start in starts]

    X = np.array([_zscore(chunk) for chunk in chunks], dtype=np.float32)
    preds = model.predict(X, verbose=0)

    votes = np.argmax(preds, axis=1)
    final = int(np.bincount(votes).argmax())
    conf = float(preds[:, final].mean())
    human = LABELS_EMG[final]

    gemini_txt = (
        query_gemini_rest("EMG", human, conf, gemini_key)
        if gemini_key
        else None
    )
    return human, conf, gemini_txt
# CSV columns, in model input order, that make up a VAG feature vector.
FEATURE_COLS = [
    "rms_amplitude",
    "peak_frequency",
    "spectral_entropy",
    "zero_crossing_rate",
    "mean_frequency",
]


def vag_to_features(file_obj) -> np.ndarray:
    """Read a VAG feature CSV and return its first row as a model input.

    Args:
        file_obj: Path or file-like object accepted by ``pandas.read_csv``;
            it must contain every column listed in ``FEATURE_COLS``.

    Returns:
        float32 array of shape ``(1, len(FEATURE_COLS))``.
    """
    table = pd.read_csv(file_obj)
    first_row = table[FEATURE_COLS].iloc[0]
    vector = first_row.values.astype(np.float32)
    return vector[np.newaxis, :]
def predict_vag_from_features(file_obj, model_bundle, gemini_key=""):
    """Classify a VAG feature CSV with a scaler / model / encoder bundle.

    Args:
        file_obj: CSV source passed on to ``vag_to_features``.
        model_bundle: Mapping with the keys ``"model"`` (exposes
            ``predict_proba``), ``"scaler"`` (exposes ``transform``) and
            ``"encoder"`` (exposes ``inverse_transform``).
        gemini_key: API key; when falsy no Gemini request is made.

    Returns:
        Tuple ``(label, label, confidence, gemini_text)``. The title-cased
        label appears twice, and ``gemini_text`` is ``None`` when no key
        was supplied.
    """
    model, scaler, encoder = (
        model_bundle[key] for key in ("model", "scaler", "encoder")
    )

    scaled = scaler.transform(vag_to_features(file_obj))
    probabilities = model.predict_proba(scaled)[0]

    best = int(np.argmax(probabilities))
    conf = float(probabilities[best])
    # The winning column index is mapped back to its class name.
    label = encoder.inverse_transform([best])[0].title()

    gem_note = None
    if gemini_key:
        gem_note = query_gemini_rest("VAG", label, conf, gemini_key)
    return label, label, conf, gem_note