Spaces:
Paused
Paused
| import os | |
| import numpy as np | |
| import tensorflow as tf | |
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.staticfiles import StaticFiles | |
| from scipy import signal | |
| import soundfile as sf | |
| from datetime import datetime | |
| import threading, time | |
| from team_code import base_model # ton architecture | |
# ----------------------------
# CONFIGURATION
# ----------------------------
# Length (in samples) every recording is resampled to before inference; it is
# also the first argument used to build the model, so the two must stay in sync.
SIG_LEN = 32256
# Second argument used to build the model; the preprocessing step produces
# inputs whose last dimension is 1.
N_FEATURES = 1


def _env_or_default(name, default):
    """Return the environment variable `name`, or `default` when unset or empty."""
    return os.environ.get(name) or default


# Deployment-specific settings can be overridden through environment variables;
# the fallbacks are the values that were previously hard-coded.
MODEL_PATH = _env_or_default("MURMUR_MODEL_PATH", "pretrained_model.h5")
OUTPUT_DIR = _env_or_default("MURMUR_OUTPUT_DIR", "/tmp/audio_results")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Public base URL used to build report links. Trailing slashes are removed so
# that f"{BASE_URL}/files/..." never contains a doubled slash.
BASE_URL = _env_or_default(
    "MURMUR_BASE_URL", "https://stroke-ia-murmur-api.hf.space"
).rstrip("/")
# ----------------------------
# MODEL LOADING
# ----------------------------
def _load_model():
    """Build the network with `base_model` and restore its weights from MODEL_PATH."""
    print("[INFO] Chargement du modèle TensorFlow...")
    network = base_model(SIG_LEN, N_FEATURES)
    network.load_weights(MODEL_PATH)
    # Argument-less compile, kept purely as a precaution.
    network.compile()
    print("[INFO] Modèle chargé ✅")
    return network


# Loaded once at import time and shared by every request.
model = _load_model()
# ----------------------------
# FASTAPI
# ----------------------------
# Files present in OUTPUT_DIR are served under the /files prefix.
_results_files = StaticFiles(directory=OUTPUT_DIR)

app = FastAPI(title="Heart Murmur Detection API")
app.mount("/files", _results_files, name="files")
# ----------------------------
# PREPROCESSING
# ----------------------------
def preprocess_audio(file_path):
    """Load an audio file and shape it into a single model input.

    The recording is read with soundfile, averaged down to one channel when it
    has several, and resampled to exactly SIG_LEN samples regardless of its
    original length or sample rate.

    Args:
        file_path: Path of the audio file to read.

    Returns:
        A NumPy array of shape (1, SIG_LEN, 1).

    Raises:
        ValueError: If the file contains no audio samples.
    """
    # The sample rate is not needed: the signal is resampled to a fixed
    # number of points, not to a target rate.
    samples, _sample_rate = sf.read(file_path)
    if samples.ndim > 1:
        # Multi-channel data is averaged across the channel axis (mono mix).
        samples = np.mean(samples, axis=1)
    if samples.size == 0:
        # Fail with a clear message instead of an obscure resampling error.
        raise ValueError("Audio file contains no samples")
    resampled = signal.resample(samples, SIG_LEN)
    # Add the batch and feature axes -> (1, SIG_LEN, 1).
    return np.expand_dims(resampled, axis=(0, 2))
# ----------------------------
# MAIN ENDPOINT
# ----------------------------
# NOTE(review): no route decorator (e.g. @app.post(...)) is visible on this
# handler in this view - confirm that it is registered on `app` somewhere.
async def predict_murmur(audio_file: UploadFile = File(...)):
    """Upload an audio file (.wav, .mp3) and get back a diagnosis and probability.

    The upload is written to a temporary file in OUTPUT_DIR, preprocessed, and
    passed to the model. A short text report is saved in OUTPUT_DIR and its
    public URL is returned. The temporary upload is always removed afterwards.

    Args:
        audio_file: The uploaded recording.

    Returns:
        On success, a dict with the keys "diagnosis" ("Abnormal" when the
        model output is above 0.5, otherwise "Normal"), "probability" (model
        output rounded to 3 decimals), "rapport_url" and "message".
        On any failure, a dict with a single "error" key holding the message.
    """
    import uuid  # local import: only this handler needs it

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # The timestamp only has one-second resolution, so a random suffix keeps
    # concurrent requests from overwriting each other's upload or report.
    request_id = f"{timestamp}_{uuid.uuid4().hex[:8]}"

    # The filename is chosen by the client: keep only its last path component
    # so that values like "../../x" or "/etc/x" cannot escape OUTPUT_DIR.
    raw_name = (audio_file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(raw_name) or "upload"
    tmp_path = os.path.join(OUTPUT_DIR, f"{request_id}_{safe_name}")

    try:
        # Written inside the try block so that a partial file left by a failed
        # write is still removed by the finally clause.
        with open(tmp_path, "wb") as f:
            f.write(await audio_file.read())

        x = preprocess_audio(tmp_path)
        # NOTE(review): model.predict is a blocking call inside an async
        # handler; consider running it in a worker thread if latency matters.
        pred = float(model.predict(x)[0][0])
        label = "Abnormal" if pred > 0.5 else "Normal"
        prob = round(pred, 3)

        report_path = os.path.join(OUTPUT_DIR, f"report_{request_id}.txt")
        with open(report_path, "w", encoding="utf-8") as f:
            f.write(f"Result: {label}\nProbability: {prob}\n")

        return {
            "diagnosis": label,
            "probability": prob,
            "rapport_url": f"{BASE_URL}/files/{os.path.basename(report_path)}",
            "message": "✅ Analyse audio terminée.",
        }
    except Exception as e:
        # Endpoint boundary: report the failure to the client in the payload.
        return {"error": str(e)}
    finally:
        # The background cleanup thread may already have deleted the file, so
        # attempt the removal directly instead of checking for existence first.
        try:
            os.remove(tmp_path)
        except FileNotFoundError:
            pass
# ----------------------------
# AUTO-CLEANUP
# ----------------------------
def auto_cleanup(interval_minutes=10, max_age_minutes=None):
    """Periodically delete old files from OUTPUT_DIR; this function never returns.

    After each sleep of `interval_minutes`, every entry of OUTPUT_DIR whose
    modification time is older than `max_age_minutes` is removed. Recent files
    are kept so that a report written just before a cleanup pass stays
    downloadable and an upload still being processed is not deleted.

    Args:
        interval_minutes: Minutes to wait between two cleanup passes.
        max_age_minutes: Minimum age, in minutes, a file must reach before it
            is deleted. Defaults to `interval_minutes`.
    """
    if max_age_minutes is None:
        max_age_minutes = interval_minutes
    max_age_seconds = max_age_minutes * 60

    while True:
        time.sleep(interval_minutes * 60)
        try:
            names = os.listdir(OUTPUT_DIR)
        except OSError as e:
            # Keep the thread alive if the directory is temporarily unavailable.
            print(f"[CLEANUP] Erreur : {e}")
            continue

        cutoff = time.time() - max_age_seconds
        for name in names:
            path = os.path.join(OUTPUT_DIR, name)
            try:
                if os.path.getmtime(path) > cutoff:
                    continue  # too recent: may still be in use or not yet fetched
                os.remove(path)
                print(f"[CLEANUP] Fichier supprimé : {name}")
            except Exception as e:
                # Best effort: one failing entry must not stop the pass.
                print(f"[CLEANUP] Erreur : {e}")


# Daemon thread: it does not prevent the process from exiting.
threading.Thread(target=auto_cleanup, daemon=True).start()