File size: 11,304 Bytes
2c2fc49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# utils/analysis.py

from __future__ import annotations

import random
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple, Optional

import pandas as pd

# -------------------------------------------------
# GLOBALS & LABELS
# -------------------------------------------------

# IMPORTANT: This order must match the training/order used for your WBC classifier.
CLASS_NAMES = [
    "neutrophil",
    "eosinophil",
    "basophil",
    "lymphocyte",
    "monocyte",
    "immature_granulocyte",
    "erythroblast",
    "platelet",
]


# -------------------------------------------------
# CSV RANGE PARSING + REFERENCE HELPERS
# -------------------------------------------------

def _parse_range(txt: str | float | int | None) -> Tuple[Optional[float], Optional[float]]:
    """
    Parse a 'low-high' textual range (e.g. '40-70') into (low, high).
    Returns (None, None) if parsing fails.
    """
    if txt is None or pd.isna(txt):
        return (None, None)

    s = str(txt).strip()
    if not s:
        return (None, None)

    s = s.replace("approx.", "")
    parts = [p.strip() for p in s.split("-") if p.strip()]
    if len(parts) < 2:
        return (None, None)

    try:
        low = float(parts[0])
        high = float(parts[1])
        return (low, high)
    except ValueError:
        return (None, None)


def load_reference(
    age_group: str = "Adults (18-60y)",
    gender: Optional[str] = None,
    csv_path: str | Path = "",
) -> Dict:
    """
    Load a single reference row from the WBC differential reference CSV,
    filtered by age group and optionally gender.

    Expected CSV columns (example):
      - Age Group
      - Gender
      - Neutrophils % (Range)
      - Lymphocytes % (Range)
      - Monocytes % (Range)
      - Eosinophils % (Range)
      - Basophils % (Range)
      - Immature Granulocytes %
      - Infection Insights (High)
      - Infection Insights (Low)
    """
    if not csv_path:
        raise ValueError("csv_path must be provided to load_reference().")

    csv_path = Path(csv_path)
    if not csv_path.exists():
        raise FileNotFoundError(f"Reference CSV not found: {csv_path}")

    df = pd.read_csv(csv_path)

    sub = df[df["Age Group"].astype(str).str.strip() == age_group]
    if sub.empty:
        raise ValueError(f"Age group '{age_group}' not found in reference file.")

    if gender:
        g = gender.strip().upper()
        sub2 = sub[sub["Gender"].astype(str).str.strip().str.upper() == g]
        if not sub2.empty:
            sub = sub2

    return sub.iloc[0].to_dict()


# -------------------------------------------------
# AGE/GENDER HELPERS
# -------------------------------------------------

def map_age_to_group(age_years: float) -> str:
    """
    Map a numeric age (in years) to an age-group label
    as described in the WBC reference CSV.
    """
    if age_years < 0.01:   # ~0–3 days
        return "Newborn (0-3d)"
    if age_years < 0.1:    # ~4–28 days
        return "Infant (4-28d)"
    if age_years < 2:      # 1m–2y
        return "Children (1m-2y)"
    if age_years < 6:
        return "Children (2-6y)"
    if age_years < 12:
        return "Children (6-12y)"
    if age_years < 18:
        return "Adolescents (12-18y)"
    if age_years <= 60:
        return "Adults (18-60y)"
    return "Elderly (>60y)"


def pick_gender_for_group(
    age_group: str,
    csv_path: str | Path,
) -> Optional[str]:
    """
    If gender is unknown, pick a valid gender for that age group
    from the reference CSV. Returns 'M', 'F', or None.
    """
    csv_path = Path(csv_path)
    if not csv_path.exists():
        return None

    df = pd.read_csv(csv_path)
    sub = df[df["Age Group"].astype(str).str.strip() == age_group]
    if sub.empty:
        return None

    genders = (
        sub["Gender"]
        .dropna()
        .astype(str)
        .str.strip()
        .unique()
        .tolist()
    )
    if not genders:
        return None

    # If "M/F" is present, just pick randomly
    if any("M/F" in g or "M / F" in g for g in genders):
        return random.choice(["M", "F"])

    return random.choice(genders)


# -------------------------------------------------
# DIFFERENTIAL & REPORT GENERATION
# -------------------------------------------------

def compute_differential_percentages(
    wbc_subtypes: Dict[str, int],
) -> Dict[str, float]:
    """
    Convert WBC subtype counts to percentages.
    Returns a dict with percentages, keyed by subtype name.
    """
    total = sum(wbc_subtypes.values()) if wbc_subtypes else 0
    if total == 0:
        return {k: 0.0 for k in wbc_subtypes.keys()}

    return {
        k: round((v / total) * 100.0, 1)
        for k, v in wbc_subtypes.items()
    }


def generate_report_from_ai(
    ai_result: Dict,
    age_group: str,
    gender: Optional[str],
    csv_path: str | Path,
) -> str:
    """
    Generate a human-readable report text using:
      - AI-derived results (coarse counts & WBC subtypes)
      - Reference ranges from CSV for the given age group & gender.

    ai_result expected keys:
      - patient_id
      - coarse_counts: {"WBC": int, "RBC": int, "Platelet": int}
      - wbc_subtypes: {subtype_name: int}
      - fovs_analyzed (optional)
      - calibration (optional, dict with FOV area, constant)
      - timestamp (optional)
    """

    ref = load_reference(age_group=age_group, gender=gender, csv_path=csv_path)

    patient_id = ai_result.get("patient_id", "UNKNOWN")
    ts = ai_result.get("timestamp") or datetime.now().isoformat(timespec="seconds")

    fovs = ai_result.get("fovs_analyzed", 0)
    coarse = ai_result.get("coarse_counts", {}) or {}
    subtypes = ai_result.get("wbc_subtypes", {}) or {}
    calib = ai_result.get("calibration", {}) or {}

    fov_area = calib.get("fov_area_mm2")
    calib_const = calib.get("calibration_constant")

    total_wbc = coarse.get("WBC", 0)
    total_rbc = coarse.get("RBC", 0)
    total_plt = coarse.get("Platelet", 0)

    classified_total = sum(subtypes.values()) if subtypes else 0

    def ai_pct(name: str) -> float:
        if not classified_total:
            return 0.0
        return round((subtypes.get(name, 0) / classified_total) * 100.0, 1)

    ai_neut = ai_pct("neutrophil")
    ai_lymph = ai_pct("lymphocyte")
    ai_mono = ai_pct("monocyte")
    ai_eos = ai_pct("eosinophil")
    ai_baso = ai_pct("basophil")
    ai_ig = ai_pct("immature_granulocyte")
    ai_ery = ai_pct("erythroblast")

    # parse reference ranges from CSV columns
    ref_neut_lo, ref_neut_hi = _parse_range(ref.get("Neutrophils % (Range)"))
    ref_lymph_lo, ref_lymph_hi = _parse_range(ref.get("Lymphocytes % (Range)"))
    ref_mono_lo, ref_mono_hi = _parse_range(ref.get("Monocytes % (Range)"))
    ref_eos_lo, ref_eos_hi = _parse_range(ref.get("Eosinophils % (Range)"))
    ref_baso_lo, ref_baso_hi = _parse_range(ref.get("Basophils % (Range)"))

    ref_ig_txt = str(ref.get("Immature Granulocytes %", "")).lower()
    # crude check — if "3" is mentioned, set 3% as an upper threshold
    ref_ig_max = 3.0 if "3" in ref_ig_txt else None

    high_note = ref.get("Infection Insights (High)", "")
    low_note = ref.get("Infection Insights (Low)", "")

    insights: list[str] = []

    def check_range(label: str, ai_val: float, lo: Optional[float], hi: Optional[float]):
        if lo is None or hi is None:
            return
        # Above reference range
        if ai_val > hi and high_note:
            insights.append(
                f"- {label} {ai_val}% is above reference ({lo}-{hi}%). {high_note}"
            )
        # Below reference range
        elif ai_val < lo and low_note:
            insights.append(
                f"- {label} {ai_val}% is below reference ({lo}-{hi}%). {low_note}"
            )

    check_range("Neutrophils", ai_neut, ref_neut_lo, ref_neut_hi)
    check_range("Lymphocytes", ai_lymph, ref_lymph_lo, ref_lymph_hi)
    check_range("Monocytes", ai_mono, ref_mono_lo, ref_mono_hi)
    check_range("Eosinophils", ai_eos, ref_eos_lo, ref_eos_hi)
    check_range("Basophils", ai_baso, ref_baso_lo, ref_baso_hi)

    if ref_ig_max is not None and ai_ig > ref_ig_max:
        insights.append(
            f"- Immature granulocytes {ai_ig}% > allowed ({ref_ig_max}%), "
            "suggesting left shift or active marrow response. Recommend manual review."
        )

    if ai_ery > 0:
        insights.append(
            f"- Erythroblasts detected ({ai_ery}%), unusual in normal peripheral blood → manual review recommended."
        )

    if classified_total < 100:
        insights.append(
            f"- Only {classified_total} WBCs classified; differential may be statistically unstable. "
            "Consider reviewing more fields."
        )

    if not fov_area or not calib_const:
        insights.append(
            "- Absolute counts per µL are not reported (FOV area and calibration "
            "constant not provided). Use results as qualitative screening."
        )

    # Assemble multi-line report
    lines: list[str] = []
    lines.append("AI-Assisted Peripheral Blood Smear Report (Prototype)")
    lines.append("================================================================")
    lines.append(f"Patient ID       : {patient_id}")
    lines.append(f"Date/Time        : {ts}")
    lines.append(f"Age Group (ref)  : {age_group}")
    if gender:
        lines.append(f"Gender (ref)     : {gender}")
    if fovs:
        lines.append(f"FOVs Analyzed    : {fovs}")
    lines.append("")

    lines.append("1. Coarse Counts (sum over analyzed fields)")
    lines.append(f"   WBC       : {total_wbc}")
    lines.append(f"   RBC       : {total_rbc}")
    lines.append(f"   Platelets : {total_plt}")
    lines.append("")

    lines.append("2. AI Differential vs Reference")
    lines.append(f"   Neutrophils       : {ai_neut}%   (ref {ref.get('Neutrophils % (Range)')})")
    lines.append(f"   Lymphocytes       : {ai_lymph}%  (ref {ref.get('Lymphocytes % (Range)')})")
    lines.append(f"   Monocytes         : {ai_mono}%   (ref {ref.get('Monocytes % (Range)')})")
    lines.append(f"   Eosinophils       : {ai_eos}%    (ref {ref.get('Eosinophils % (Range)')})")
    lines.append(f"   Basophils         : {ai_baso}%   (ref {ref.get('Basophils % (Range)')})")
    lines.append(f"   Imm. granulocytes : {ai_ig}%     (ref {ref.get('Immature Granulocytes %')})")
    lines.append(f"   Erythroblasts     : {ai_ery}%     (no reference range in file)")
    lines.append("")

    lines.append("3. Calibration")
    lines.append(f"   FOV area (mm²)         : {fov_area if fov_area else 'not provided'}")
    lines.append(f"   Calibration constant   : {calib_const if calib_const else 'not provided'}")
    lines.append("")

    lines.append("4. AI Insights")
    if insights:
        for msg in insights:
            lines.append(f"   {msg}")
    else:
        lines.append("   All AI-derived percentages fall within reference ranges for this age group.")
    lines.append("")
    lines.append(
        "Method: YOLO-based detector (RBC/WBC/Platelet) + WBC subtype classifier "
        "compared against age-/gender-specific reference ranges from CSV.\n"
        "This is a research prototype and not a substitute for formal lab testing."
    )

    return "\n".join(lines)