"""Heart-murmur detection API.

Loads a pretrained TensorFlow model at import time and exposes a FastAPI
endpoint that accepts an uploaded audio file, runs the model on it and
returns a diagnosis together with a link to a small text report.
"""

import os
import tempfile
import threading
import time
import uuid
from datetime import datetime

import numpy as np
import soundfile as sf
import tensorflow as tf
from fastapi import FastAPI, UploadFile, File
from fastapi.staticfiles import StaticFiles
from scipy import signal

from team_code import base_model  # your architecture

# ----------------------------
# CONFIG
# ----------------------------
SIG_LEN = 32256
N_FEATURES = 1
MODEL_PATH = "pretrained_model.h5"
OUTPUT_DIR = "/tmp/audio_results"
os.makedirs(OUTPUT_DIR, exist_ok=True)

BASE_URL = "https://stroke-ia-murmur-api.hf.space"  # adjust to your deployment

# ----------------------------
# MODEL LOADING
# ----------------------------
print("[INFO] Chargement du modèle TensorFlow...")
model = base_model(SIG_LEN, N_FEATURES)
model.load_weights(MODEL_PATH)
model.compile()  # just to be safe
print("[INFO] Modèle chargé ✅")

# ----------------------------
# FASTAPI
# ----------------------------
app = FastAPI(title="Heart Murmur Detection API")
# Everything stored in OUTPUT_DIR is publicly downloadable under /files,
# so only generated reports (never raw uploads) should be written there.
app.mount("/files", StaticFiles(directory=OUTPUT_DIR), name="files")


# ----------------------------
# PREPROCESSING
# ----------------------------
def preprocess_audio(file_path):
    """Load an audio file and shape it into a single model input batch.

    Multi-channel audio is averaged down to mono, then the signal is
    resampled to exactly ``SIG_LEN`` samples.

    Args:
        file_path: Path of the audio file to read with ``soundfile``.

    Returns:
        A NumPy array of shape ``(1, SIG_LEN, 1)``.

    Raises:
        ValueError: If the file contains no audio samples.
    """
    # NOTE(review): the file's sample rate is ignored; the whole recording is
    # stretched/compressed to SIG_LEN samples whatever its duration. Confirm
    # this matches how the model was trained.
    data, _sample_rate = sf.read(file_path)
    if data.ndim > 1:
        data = np.mean(data, axis=1)
    if data.size == 0:
        # Resampling an empty signal cannot produce a meaningful input.
        raise ValueError("Audio file contains no samples")
    resampled = signal.resample(data, SIG_LEN)
    return np.expand_dims(resampled, axis=(0, 2))  # (1, sig_len, 1)


# ----------------------------
# MAIN ENDPOINT
# ----------------------------
@app.post("/predict/")
async def predict_murmur(audio_file: UploadFile = File(...)):
    """
    Upload an audio file (.wav, .mp3) → returns diagnosis + probability
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # A second-resolution timestamp alone collides when two requests arrive
    # in the same second; the random suffix keeps report names unique.
    request_id = uuid.uuid4().hex[:8]

    # The client-supplied filename is untrusted: keep only its extension so
    # it can never steer the path outside the temporary directory.
    suffix = os.path.splitext(os.path.basename(audio_file.filename or ""))[1]

    tmp_path = None
    try:
        # Store the upload in a private temp file rather than in OUTPUT_DIR,
        # which is served publicly and periodically purged.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = tmp.name
            tmp.write(await audio_file.read())

        x = preprocess_audio(tmp_path)
        # NOTE(review): model.predict is a blocking call inside an async
        # handler; consider offloading it to a thread pool under load.
        pred = float(model.predict(x)[0][0])
        label = "Abnormal" if pred > 0.5 else "Normal"
        prob = round(pred, 3)

        report_path = os.path.join(
            OUTPUT_DIR, f"report_{timestamp}_{request_id}.txt"
        )
        with open(report_path, "w", encoding="utf-8") as f:
            f.write(f"Result: {label}\nProbability: {prob}\n")

        return {
            "diagnosis": label,
            "probability": prob,
            "rapport_url": f"{BASE_URL}/files/{os.path.basename(report_path)}",
            "message": "✅ Analyse audio terminée.",
        }
    except Exception as e:
        # Top-level boundary: report the failure to the client in the same
        # {"error": ...} shape existing callers expect.
        return {"error": str(e)}
    finally:
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)


# ----------------------------
# AUTO-CLEANUP
# ----------------------------
def auto_cleanup(interval_minutes=10):
    """Periodically delete stale files from ``OUTPUT_DIR``; never returns.

    Every ``interval_minutes`` the directory is scanned and files whose
    modification time is at least ``interval_minutes`` old are removed.
    Newer files are kept so that a report created just before a sweep is
    still downloadable by the client that requested it.

    Args:
        interval_minutes: Minutes between sweeps, also used as the minimum
            age a file must reach before it is deleted.
    """
    max_age_seconds = interval_minutes * 60
    while True:
        time.sleep(max_age_seconds)
        cutoff = time.time() - max_age_seconds
        try:
            names = os.listdir(OUTPUT_DIR)
        except OSError as e:
            # Keep the cleanup thread alive even if the directory is
            # temporarily unavailable.
            print(f"[CLEANUP] Erreur : {e}")
            continue
        for file in names:
            path = os.path.join(OUTPUT_DIR, file)
            try:
                if os.path.getmtime(path) > cutoff:
                    continue  # too recent, leave it for the next sweep
                os.remove(path)
                print(f"[CLEANUP] Fichier supprimé : {file}")
            except OSError as e:
                print(f"[CLEANUP] Erreur : {e}")


threading.Thread(target=auto_cleanup, daemon=True).start()