| |
| import io |
| import json |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from tensorflow.keras.models import load_model |
| import numpy as np |
| import librosa |
|
|
| |
| MODEL_PATH = "resp_model.h5" |
| CLASS_NAMES = [ |
| "Bronchiectasis", |
| "Bronchiolitis", |
| "COPD", |
| "Healthy", |
| "Pneumonia", |
| "URTI" |
| ] |
| SR = 22050 |
| N_MFCC = 40 |
| MAX_PAD_LEN = 862 |
| CHUNK_DURATION = 4.0 |
| MIN_CONFIDENCE = 0.5 |
| MAX_FILE_SIZE = 10 * 1024 * 1024 |
| |
|
|
| |
| try: |
| model = load_model(MODEL_PATH) |
| except Exception as e: |
| raise RuntimeError(f"Failed to load model: {e}") |
|
|
| app = FastAPI(title="Respiratory Disease Prediction API") |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| def extract_mfcc(audio_bytes): |
| """Extract 40 MFCCs and pad/truncate to 862 timesteps""" |
| try: |
| audio, _ = librosa.load(io.BytesIO(audio_bytes), sr=SR, mono=True, duration=20) |
| mfcc = librosa.feature.mfcc(y=audio, sr=SR, n_mfcc=N_MFCC) |
| if mfcc.shape[1] < MAX_PAD_LEN: |
| mfcc = np.pad(mfcc, ((0, 0), (0, MAX_PAD_LEN - mfcc.shape[1])), mode='constant') |
| else: |
| mfcc = mfcc[:, :MAX_PAD_LEN] |
| return mfcc |
| except Exception as e: |
| raise ValueError(f"Audio processing failed: {e}") |
|
|
| def split_audio_chunks(audio, sr, chunk_duration=CHUNK_DURATION): |
| """Split audio into fixed-duration non-overlapping chunks""" |
| chunk_samples = int(chunk_duration * sr) |
| chunks = [] |
| for i in range(0, len(audio), chunk_samples): |
| chunk = audio[i:i + chunk_samples] |
| if len(chunk) >= sr: |
| chunks.append(chunk) |
| return chunks |
|
|
| def calculate_risk_assessment(disease, symptoms, duration, smoker, severity): |
| """Simple rule-based risk assessment""" |
| risk_score = 0 |
| |
| |
| disease_weights = { |
| "COPD": 3, "Pneumonia": 3, |
| "Bronchiectasis": 2, "Bronchiolitis": 2, |
| "URTI": 1, "Healthy": 0 |
| } |
| risk_score += disease_weights.get(disease, 0) |
| |
| |
| symptom_weights = { |
| "Shortness of Breath": 2, |
| "Wheezing": 1.5, |
| "Chest Pain": 2, |
| "Fever": 1, |
| "Fatigue": 0.5, |
| "Sore Throat": 0.5, |
| "Nasal Congestion": 0.5 |
| } |
| for symptom in symptoms: |
| risk_score += symptom_weights.get(symptom, 0) |
| |
| |
| if duration == "More than a week": |
| risk_score += 2 |
| elif duration == "3-7 days": |
| risk_score += 1 |
| |
| |
| if smoker == "Yes": |
| risk_score += 2 |
| |
| |
| risk_score += severity / 2 |
| |
| |
| if risk_score >= 10: |
| risk_level = "Severe" |
| elif risk_score >= 5: |
| risk_level = "Moderate" |
| else: |
| risk_level = "Mild" |
| |
| return { |
| "risk_level": risk_level, |
| "risk_score": round(risk_score, 2), |
| "message": f"Based on your cough sound and symptoms, your condition is assessed as {risk_level}." |
| } |
|
|
| @app.post("/predict") |
| async def predict_respiratory_disease( |
| file: UploadFile = File(...), |
| symptoms: str = Form("[]"), |
| duration: str = Form("3-7 days"), |
| smoker: str = Form("No"), |
| severity: int = Form(5) |
| ): |
| |
| if not file.filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')): |
| raise HTTPException(status_code=400, detail="Only audio files allowed (.wav, .mp3, .ogg, .flac)") |
| |
| |
| audio_bytes = await file.read() |
| if len(audio_bytes) > MAX_FILE_SIZE: |
| raise HTTPException(status_code=400, detail="File too large (>10 MB)") |
| |
| try: |
| |
| audio_buffer = io.BytesIO(audio_bytes) |
| audio, _ = librosa.load(audio_buffer, sr=SR, mono=True) |
| |
| |
| if np.max(np.abs(audio)) < 0.01: |
| return { |
| "disease": "Healthy", |
| "confidence": 0.99, |
| "probabilities": {cls: 0.0 for cls in CLASS_NAMES}, |
| "user_input": { |
| "symptoms": json.loads(symptoms), |
| "duration": duration, |
| "smoker": smoker, |
| "severity": severity |
| }, |
| "assessment": { |
| "risk_level": "Mild", |
| "risk_score": 0.0, |
| "message": "No significant sound detected. Likely healthy." |
| }, |
| "warning": "Silent or very quiet audio detected" |
| } |
| |
| |
| chunks = split_audio_chunks(audio, SR, CHUNK_DURATION) |
| if not chunks: |
| raise HTTPException(status_code=400, detail="Audio too short (<1 second)") |
| |
| |
| predictions = [] |
| for chunk in chunks: |
| mfcc = extract_mfcc(io.BytesIO(librosa.core.audio.to_wav(chunk, sr=SR)).getvalue()) |
| mfcc = np.expand_dims(mfcc, axis=0) |
| mfcc = np.expand_dims(mfcc, axis=-1) |
| pred = model.predict(mfcc, verbose=0)[0] |
| |
| |
| if np.max(pred) >= MIN_CONFIDENCE: |
| predictions.append(pred) |
| |
| if not predictions: |
| return { |
| "disease": "Uncertain", |
| "confidence": 0.0, |
| "probabilities": {cls: 0.0 for cls in CLASS_NAMES}, |
| "user_input": { |
| "symptoms": json.loads(symptoms), |
| "duration": duration, |
| "smoker": smoker, |
| "severity": severity |
| }, |
| "assessment": { |
| "risk_level": "Mild", |
| "risk_score": 0.0, |
| "message": "No clear respiratory pattern detected. Consider re-recording." |
| }, |
| "warning": "All chunks had low confidence" |
| } |
| |
| |
| avg_pred = np.mean(predictions, axis=0) |
| predicted_class = CLASS_NAMES[int(np.argmax(avg_pred))] |
| confidence = float(np.max(avg_pred)) |
| |
| |
| symptoms_list = json.loads(symptoms) |
| |
| |
| assessment = calculate_risk_assessment(predicted_class, symptoms_list, duration, smoker, severity) |
| |
| return { |
| "disease": predicted_class, |
| "confidence": round(confidence, 4), |
| "probabilities": { |
| cls: float(avg_pred[i]) for i, cls in enumerate(CLASS_NAMES) |
| }, |
| "user_input": { |
| "symptoms": symptoms_list, |
| "duration": duration, |
| "smoker": smoker, |
| "severity": severity |
| }, |
| "assessment": assessment, |
| "chunks_analyzed": len(chunks), |
| "usable_chunks": len(predictions) |
| } |
|
|
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}") |
|
|
| |
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=7860) |