File size: 19,202 Bytes
c4035de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
"""Helper functions for model loading and prediction."""

import os
import subprocess
import sys
import warnings
import joblib
import pandas as pd
from sklearn.exceptions import InconsistentVersionWarning

BASE_DIR    = os.path.dirname(os.path.abspath(__file__))
MODELS_DIR  = os.path.join(BASE_DIR, "..", "models")
ML_DIR      = os.path.abspath(os.path.join(BASE_DIR, ".."))
DISEASES    = ("heart", "diabetes", "cholesterol")

def load_all_models():
    """Load all models + scalers + feature metadata at startup."""
    loaded = {}
    load_errors = {}

    for d in DISEASES:
        model_path   = os.path.join(MODELS_DIR, f"{d}_model.joblib")
        scaler_path  = os.path.join(MODELS_DIR, f"{d}_scaler.joblib")
        feat_path    = os.path.join(MODELS_DIR, f"{d}_features.joblib")

        try:
            with warnings.catch_warnings():
                warnings.simplefilter("error", InconsistentVersionWarning)
                loaded[f"{d}_model"] = joblib.load(model_path)
                loaded[f"{d}_scaler"] = joblib.load(scaler_path)
                loaded[f"{d}_features"] = joblib.load(feat_path)
            print(f"   {d.capitalize()} model loaded")
        except FileNotFoundError:
            print(f"   {d.capitalize()} model not found at {model_path}")
            print(f"       Run: python ml/train_{d}.py first")
            loaded[f"{d}_model"]    = None
            loaded[f"{d}_scaler"]   = None
            loaded[f"{d}_features"] = None
            load_errors[d] = "model files not found"
        except InconsistentVersionWarning as exc:
            print(f"   {d.capitalize()} model version mismatch detected")
            print(f"       {exc}")
            loaded[f"{d}_model"] = None
            loaded[f"{d}_scaler"] = None
            loaded[f"{d}_features"] = None
            load_errors[d] = f"incompatible artifact: {exc}"
        except Exception as exc:
            print(f"   {d.capitalize()} model failed to load")
            print(f"       {type(exc).__name__}: {exc}")
            loaded[f"{d}_model"]    = None
            loaded[f"{d}_scaler"]   = None
            loaded[f"{d}_features"] = None
            load_errors[d] = f"{type(exc).__name__}: {exc}"

    loaded["_load_errors"] = load_errors
    return loaded


def get_missing_or_failed_diseases(models: dict) -> list[str]:
    """Return diseases with missing or failed model bundles."""
    unavailable = []
    for disease in DISEASES:
        if (
            models.get(f"{disease}_model") is None
            or models.get(f"{disease}_scaler") is None
            or models.get(f"{disease}_features") is None
        ):
            unavailable.append(disease)
    return unavailable


def retrain_models(diseases: list[str] | tuple[str, ...] | None = None) -> None:
    """Rebuild one or more model bundles using the current runtime dependencies."""
    diseases_to_train = list(diseases or DISEASES)

    for disease in diseases_to_train:
        script_path = os.path.join(ML_DIR, f"train_{disease}.py")
        print(f"   Rebuilding {disease} model from {script_path}")

        completed = subprocess.run(
            [sys.executable, script_path],
            cwd=ML_DIR,
            check=False,
            capture_output=True,
            text=True,
        )

        if completed.stdout:
            print(completed.stdout)
        if completed.stderr:
            print(completed.stderr)

        if completed.returncode != 0:
            raise RuntimeError(
                f"Training script failed for {disease} with exit code {completed.returncode}"
            )


def ensure_models_ready(auto_rebuild: bool = True) -> dict:
    """Load models and rebuild them automatically when artifacts are incompatible."""
    models = load_all_models()
    failed_diseases = get_missing_or_failed_diseases(models)

    if not failed_diseases or not auto_rebuild:
        return models

    print("Some model artifacts are missing or incompatible with this runtime.")
    print(f"   Affected diseases: {', '.join(failed_diseases)}")
    print("   Attempting to rebuild all models using the current environment...")

    retrain_models()
    models = load_all_models()

    remaining_failures = get_missing_or_failed_diseases(models)
    if remaining_failures:
        raise RuntimeError(
            "Model rebuild completed but some models are still unavailable: "
            + ", ".join(remaining_failures)
        )

    print("Model rebuild complete. All models loaded successfully.")
    return models

def validate_input(data: dict, required_fields: list) -> str | None:
    """Returns error string if validation fails, None if OK."""
    for field in required_fields:
        if field not in data or data[field] is None or data[field] == "":
            return f"Field '{field}' wajib diisi"
    return None


def get_risk_level(probability: float) -> dict:
    """Convert probability (0-1) to risk level label + color."""
    pct = round(probability * 100, 1)
    if probability <= 0.30:
        return {"risk_percent": pct, "risk_level": "Rendah",  "risk_color": "green"}
    elif probability <= 0.60:
        return {"risk_percent": pct, "risk_level": "Sedang",  "risk_color": "yellow"}
    else:
        return {"risk_percent": pct, "risk_level": "Tinggi",  "risk_color": "red"}


def get_top_factors(feature_importance: dict, top_n: int = 5) -> list:
    """Return top N most important features as human-readable labels."""
    sorted_feats = sorted(
        feature_importance.items(),
        key=lambda x: x[1]["importance"],
        reverse=True
    )
    return [data["label_id"] for _, data in sorted_feats[:top_n]]


MODIFIERS = {
    # (field, value_check_fn): probability_delta
    "family_history_yes":   +0.05,
    "smoking_active":       +0.08,
    "smoking_occasional":   +0.03,
    "diet_fat_daily":       +0.05,
    "diet_fat_frequent":    +0.03,
    "diet_sweet_daily":     +0.05,
    "diet_sweet_frequent":  +0.02,
    "exercise_never":       +0.05,
    "alcohol_frequent":     +0.03,
}

def apply_lifestyle_modifiers(base_prob: float, data: dict) -> float:
    """Add lifestyle risk modifiers not present in training dataset."""
    delta = 0.0

    # Family history
    if str(data.get("family_history", "tidak")).lower() in ("ya", "yes", "1", "true"):
        delta += MODIFIERS["family_history_yes"]

    # Smoking
    smoking = str(data.get("smoking", "tidak")).lower()
    if smoking in ("aktif setiap hari", "active", "yes", "ya", "1", "true"):
        delta += MODIFIERS["smoking_active"]
    elif smoking in ("kadang-kadang", "occasional"):
        delta += MODIFIERS["smoking_occasional"]

    # Diet fat (for cholesterol & heart)
    diet_fat = str(data.get("diet_fat", "jarang")).lower()
    if diet_fat in ("setiap hari", "daily"):
        delta += MODIFIERS["diet_fat_daily"]
    elif diet_fat in ("3-5x seminggu", "frequent"):
        delta += MODIFIERS["diet_fat_frequent"]

    # Diet sweet (for diabetes)
    diet_sweet = str(data.get("diet_sweet", "jarang")).lower()
    if diet_sweet in ("setiap hari", "daily"):
        delta += MODIFIERS["diet_sweet_daily"]
    elif diet_sweet in ("3-5x seminggu", "frequent"):
        delta += MODIFIERS["diet_sweet_frequent"]

    # Exercise
    exercise = str(data.get("exercise_freq", "jarang")).lower()
    if exercise in ("tidak pernah", "never", "0"):
        delta += MODIFIERS["exercise_never"]

    # Alcohol
    alcohol = str(data.get("alcohol", "tidak")).lower()
    if alcohol in ("sering", "frequent", "yes"):
        delta += MODIFIERS["alcohol_frequent"]

    # Cap between 0 and 1
    return min(1.0, max(0.0, base_prob + delta))


MEDIANS = {
    # Heart
    "trestbps":  122.0,
    "chol":      246.0,
    "thalach":   149.0,
    "oldpeak":   1.0,
    "restecg":   0.0,
    "slope":     1.0,
    "ca":        0.0,
    "thal":      2.0,
    # Diabetes
    "BloodPressure":  72.0,
    "SkinThickness":  29.0,
    "Insulin":        125.0,
    "DiabetesPedigreeFunction": 0.47,
    # Cholesterol
    "bmi_default":  27.5,
}

def safe_float(val, fallback: float) -> float:
    """Convert to float, or return fallback if unknown/invalid."""
    if val is None:
        return fallback
    s = str(val).lower().strip()
    if s in ("tidak tahu", "unknown", "", "none", "null"):
        return fallback
    try:
        return float(val)
    except (ValueError, TypeError):
        return fallback

def sex_to_int(val) -> int:
    """Convert sex field to 0/1 (0=female, 1=male)."""
    s = str(val).lower().strip()
    if s in ("laki-laki", "male", "l", "1"):
        return 1
    return 0

def fbs_to_int(val) -> int:
    """Fasting blood sugar > 120 mg/dL β†’ 1, else 0."""
    s = str(val).lower().strip()
    if s in ("ya", "yes", "1", "true"):
        return 1
    return 0

def cp_to_int(val) -> int:
    """Chest pain type 0-3."""
    cp_map = {
        "tidak pernah": 0, "none": 0,
        "nyeri ringan": 1, "mild": 1,
        "nyeri sedang": 2, "moderate": 2,
        "nyeri berat": 3, "severe": 3,
    }
    s = str(val).lower().strip()
    return cp_map.get(s, safe_float(val, 0))

def exang_to_int(val) -> int:
    """Exercise induced angina 0/1."""
    s = str(val).lower().strip()
    if s in ("ya", "yes", "1", "true"):
        return 1
    return 0

def bmi_from_input(data: dict) -> float:
    """Get BMI from direct input or calculate from weight/height."""
    if "bmi" in data and data["bmi"]:
        return safe_float(data["bmi"], MEDIANS["bmi_default"])
    if "weight_kg" in data and "height_cm" in data:
        w = safe_float(data["weight_kg"], 65)
        h = safe_float(data["height_cm"], 165) / 100
        if h > 0:
            return round(w / (h * h), 1)
    return MEDIANS["bmi_default"]

def pregnancies_from_input(data: dict) -> int:
    """Get pregnancies count; 0 for males."""
    if sex_to_int(data.get("sex", "perempuan")) == 1:
        return 0  # male
    return int(safe_float(data.get("pregnancies", 0), 0))


def build_feature_frame(feature_names: list, feature_values: list) -> pd.DataFrame:
    """Build a single-row DataFrame to preserve training feature names."""
    return pd.DataFrame([feature_values], columns=feature_names)


def predict_scaled_probability(model, scaler, feature_names: list, feature_values: list) -> float:
    """Run scaler + model prediction with consistent feature names."""
    feature_frame = build_feature_frame(feature_names, feature_values)
    scaled_features = scaler.transform(feature_frame)
    return float(model.predict_proba(scaled_features)[0][1])


# ─────────────────────────────────────────────
# 7. PREDICT HEART
# ─────────────────────────────────────────────
def predict_heart(data: dict, models: dict) -> dict:
    model    = models["heart_model"]
    scaler   = models["heart_scaler"]
    meta     = models["heart_features"]

    if model is None:
        raise RuntimeError("Heart model not loaded. Run train_heart.py first.")

    feature_names = meta["feature_names"]

    # Build feature vector matching training feature order
    feature_values = []
    defaults = {
        "age": 25, "sex": 1, "cp": 0, "trestbps": 122, "chol": 246,
        "fbs": 0, "restecg": 0, "thalach": 149, "exang": 0,
        "oldpeak": 1.0, "slope": 1, "ca": 0, "thal": 2
    }

    for feat in feature_names:
        if feat == "age":
            feature_values.append(safe_float(data.get("age"), defaults["age"]))
        elif feat == "sex":
            feature_values.append(sex_to_int(data.get("sex", "laki-laki")))
        elif feat == "cp":
            feature_values.append(cp_to_int(data.get("cp", 0)))
        elif feat == "trestbps":
            feature_values.append(safe_float(data.get("trestbps"), MEDIANS["trestbps"]))
        elif feat == "chol":
            feature_values.append(safe_float(data.get("chol"), MEDIANS["chol"]))
        elif feat == "fbs":
            feature_values.append(fbs_to_int(data.get("fbs", "tidak")))
        elif feat == "restecg":
            feature_values.append(safe_float(data.get("restecg"), MEDIANS["restecg"]))
        elif feat in ("thalach", "thalch"):
            feature_values.append(safe_float(data.get("thalach"), MEDIANS["thalach"]))
        elif feat == "exang":
            feature_values.append(exang_to_int(data.get("exang", "tidak")))
        elif feat == "oldpeak":
            feature_values.append(safe_float(data.get("oldpeak"), MEDIANS["oldpeak"]))
        elif feat == "slope":
            feature_values.append(safe_float(data.get("slope"), MEDIANS["slope"]))
        elif feat == "ca":
            feature_values.append(safe_float(data.get("ca"), MEDIANS["ca"]))
        elif feat == "thal":
            feature_values.append(safe_float(data.get("thal"), MEDIANS["thal"]))
        else:
            feature_values.append(defaults.get(feat, 0))

    base_prob = predict_scaled_probability(model, scaler, feature_names, feature_values)

    # Apply lifestyle modifiers
    final_prob = apply_lifestyle_modifiers(base_prob, data)
    risk = get_risk_level(final_prob)
    top_factors = get_top_factors(meta["feature_importance"])

    return {
        "disease": "Penyakit Jantung",
        "risk_percent": risk["risk_percent"],
        "risk_level": risk["risk_level"],
        "risk_color": risk["risk_color"],
        "top_factors": top_factors,
        "base_probability": round(base_prob * 100, 1),
        "lifestyle_adjustment": round((final_prob - base_prob) * 100, 1),
        "disclaimer": "Hasil ini adalah estimasi berdasarkan model Machine Learning dan tidak menggantikan diagnosis dokter."
    }


# ─────────────────────────────────────────────
# 8. PREDICT DIABETES
# ─────────────────────────────────────────────
def predict_diabetes(data: dict, models: dict) -> dict:
    model    = models["diabetes_model"]
    scaler   = models["diabetes_scaler"]
    meta     = models["diabetes_features"]

    if model is None:
        raise RuntimeError("Diabetes model not loaded. Run train_diabetes.py first.")

    feature_names = meta["feature_names"]
    bmi = bmi_from_input(data)
    pregnancies = pregnancies_from_input(data)

    feature_values = []
    for feat in feature_names:
        if feat == "Pregnancies":
            feature_values.append(pregnancies)
        elif feat == "Glucose":
            feature_values.append(safe_float(data.get("glucose"), 120))
        elif feat == "BloodPressure":
            feature_values.append(safe_float(data.get("blood_pressure"), MEDIANS["BloodPressure"]))
        elif feat == "SkinThickness":
            feature_values.append(safe_float(data.get("skin_thickness"), MEDIANS["SkinThickness"]))
        elif feat == "Insulin":
            feature_values.append(safe_float(data.get("insulin"), MEDIANS["Insulin"]))
        elif feat == "BMI":
            feature_values.append(bmi)
        elif feat == "DiabetesPedigreeFunction":
            # Map family history (Ya/Tidak) β†’ DiabetesPedigreeFunction proxy
            fam = str(data.get("family_history", "tidak")).lower()
            if fam in ("ya", "yes", "1"):
                feature_values.append(0.80)   # higher pedigree function β†’ higher risk
            elif fam in ("tidak tahu", "unknown"):
                feature_values.append(MEDIANS["DiabetesPedigreeFunction"])
            else:
                feature_values.append(0.25)   # low pedigree
        elif feat == "Age":
            feature_values.append(safe_float(data.get("age"), 25))
        else:
            feature_values.append(0)

    base_prob = predict_scaled_probability(model, scaler, feature_names, feature_values)
    final_prob = apply_lifestyle_modifiers(base_prob, data)
    risk = get_risk_level(final_prob)
    top_factors = get_top_factors(meta["feature_importance"])

    return {
        "disease": "Diabetes",
        "risk_percent": risk["risk_percent"],
        "risk_level": risk["risk_level"],
        "risk_color": risk["risk_color"],
        "top_factors": top_factors,
        "base_probability": round(base_prob * 100, 1),
        "lifestyle_adjustment": round((final_prob - base_prob) * 100, 1),
        "bmi_used": bmi,
        "disclaimer": "Hasil ini adalah estimasi berdasarkan model Machine Learning dan tidak menggantikan diagnosis dokter."
    }


# ─────────────────────────────────────────────
# 9. PREDICT CHOLESTEROL
# ─────────────────────────────────────────────
def predict_cholesterol(data: dict, models: dict) -> dict:
    model    = models["cholesterol_model"]
    scaler   = models["cholesterol_scaler"]
    meta     = models["cholesterol_features"]

    if model is None:
        raise RuntimeError("Cholesterol model not loaded. Run train_cholesterol.py first.")

    feature_names = meta["feature_names"]

    feature_values = []
    for feat in feature_names:
        if feat == "age":
            feature_values.append(safe_float(data.get("age"), 25))
        elif feat == "sex":
            feature_values.append(sex_to_int(data.get("sex", "laki-laki")))
        elif feat == "cp":
            feature_values.append(cp_to_int(data.get("cp", 0)))
        elif feat == "trestbps":
            feature_values.append(safe_float(data.get("trestbps"), MEDIANS["trestbps"]))
        elif feat == "fbs":
            feature_values.append(fbs_to_int(data.get("fbs", "tidak")))
        elif feat == "restecg":
            feature_values.append(safe_float(data.get("restecg"), MEDIANS["restecg"]))
        elif feat == "thalch":
            feature_values.append(safe_float(data.get("thalach"), MEDIANS["thalach"]))
        elif feat == "exang":
            feature_values.append(exang_to_int(data.get("exang", "tidak")))
        elif feat == "oldpeak":
            feature_values.append(safe_float(data.get("oldpeak"), MEDIANS["oldpeak"]))
        elif feat == "slope":
            feature_values.append(safe_float(data.get("slope"), MEDIANS["slope"]))
        else:
            feature_values.append(0)

    base_prob = predict_scaled_probability(model, scaler, feature_names, feature_values)

    # Apply lifestyle modifiers
    final_prob = apply_lifestyle_modifiers(base_prob, data)
    risk = get_risk_level(final_prob)
    top_factors = get_top_factors(meta["feature_importance"])

    return {
        "disease": "Kolesterol Tinggi",
        "risk_percent": risk["risk_percent"],
        "risk_level": risk["risk_level"],
        "risk_color": risk["risk_color"],
        "top_factors": top_factors,
        "base_probability": round(base_prob * 100, 1),
        "lifestyle_adjustment": round((final_prob - base_prob) * 100, 1),
        "disclaimer": "Hasil ini adalah estimasi berdasarkan model Machine Learning dan tidak menggantikan diagnosis dokter."
    }