# utils/analysis.py
from __future__ import annotations

import math
import random
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple, Optional

import pandas as pd

# -------------------------------------------------
# GLOBALS & LABELS
# -------------------------------------------------

# IMPORTANT: This order must match the training/order used for your WBC classifier.
CLASS_NAMES = [
    "neutrophil",
    "eosinophil",
    "basophil",
    "lymphocyte",
    "monocyte",
    "immature_granulocyte",
    "erythroblast",
    "platelet",
]

# Typographic dashes (en dash, em dash, minus sign) are mapped to "-" so that
# ranges such as "40–70" parse the same way as "40-70".
_DASH_TABLE = str.maketrans({"\u2013": "-", "\u2014": "-", "\u2212": "-"})

# Unsigned integer or decimal number.
_NUMBER_RE = re.compile(r"\d+(?:\.\d+)?")


# -------------------------------------------------
# CSV RANGE PARSING + REFERENCE HELPERS
# -------------------------------------------------

def _clean_text(value: object) -> str:
    """
    Return *value* as stripped text, or '' when it is missing.

    Empty CSV cells come back from pandas as NaN, which is truthy and prints
    as 'nan'; treating them as empty text keeps them out of the report.
    """
    if value is None:
        return ""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value).strip()


def _parse_range(txt: str | float | int | None) -> Tuple[Optional[float], Optional[float]]:
    """
    Parse a 'low-high' textual range (e.g. '40-70') into (low, high).

    Typographic dashes, a literal 'approx.' and '%' signs are tolerated, so
    '40–70%' parses like '40-70'.
    Returns (None, None) if parsing fails or the value is missing.
    """
    s = _clean_text(txt)
    if not s:
        return (None, None)

    s = s.translate(_DASH_TABLE).replace("approx.", "").replace("%", "")
    parts = [p.strip() for p in s.split("-") if p.strip()]
    if len(parts) < 2:
        return (None, None)

    try:
        return (float(parts[0]), float(parts[1]))
    except ValueError:
        return (None, None)


def _parse_upper_limit(txt: object) -> Optional[float]:
    """
    Extract an upper threshold from free text such as '<3%', '0-3' or 'Up to 0.5'.

    A parseable 'low-high' range yields its high end; otherwise the first
    number in the text is used. Returns None when no number is present.
    """
    s = _clean_text(txt)
    if not s:
        return None

    _, high = _parse_range(s)
    if high is not None:
        return high

    match = _NUMBER_RE.search(s)
    return float(match.group()) if match else None


def load_reference(
    age_group: str = "Adults (18-60y)",
    gender: Optional[str] = None,
    csv_path: str | Path = "",
) -> Dict:
    """
    Load a single reference row from the WBC differential reference CSV,
    filtered by age group and optionally gender.

    Expected CSV columns (example):
        - Age Group
        - Gender
        - Neutrophils % (Range)
        - Lymphocytes % (Range)
        - Monocytes % (Range)
        - Eosinophils % (Range)
        - Basophils % (Range)
        - Immature Granulocytes %
        - Infection Insights (High)
        - Infection Insights (Low)

    Gender matching is case-insensitive. An exact match wins; otherwise a
    combined row such as 'M/F' that lists the requested gender is used;
    otherwise the first row of the age group is returned.

    Returns:
        The selected row as a dict keyed by column name. Empty cells are NaN.

    Raises:
        ValueError: if csv_path is empty or the age group is not in the file.
        FileNotFoundError: if csv_path does not exist.
    """
    if not csv_path:
        raise ValueError("csv_path must be provided to load_reference().")

    csv_path = Path(csv_path)
    if not csv_path.exists():
        raise FileNotFoundError(f"Reference CSV not found: {csv_path}")

    df = pd.read_csv(csv_path)

    # Strip both sides of the comparison so stray whitespace in either the
    # CSV or the argument does not cause a spurious "not found".
    wanted_group = str(age_group).strip()
    sub = df[df["Age Group"].astype(str).str.strip() == wanted_group]
    if sub.empty:
        raise ValueError(f"Age group '{age_group}' not found in reference file.")

    wanted_gender = gender.strip().upper() if gender else ""
    if wanted_gender and "Gender" in sub.columns:
        gender_col = sub["Gender"].astype(str).str.strip().str.upper()
        exact = sub[gender_col == wanted_gender]
        if not exact.empty:
            sub = exact
        else:
            # Rows that apply to several genders are written like "M/F".
            shared = sub[
                gender_col.map(
                    lambda cell: wanted_gender in {tok.strip() for tok in cell.split("/")}
                )
            ]
            if not shared.empty:
                sub = shared

    return sub.iloc[0].to_dict()


# -------------------------------------------------
# AGE/GENDER HELPERS
# -------------------------------------------------

def map_age_to_group(age_years: float) -> str:
    """
    Map a numeric age (in years) to an age-group label as described
    in the WBC reference CSV.

    Raises:
        ValueError: if age_years is NaN or negative. Previously NaN fell
            through every comparison and was reported as 'Elderly (>60y)'.
    """
    if math.isnan(age_years) or age_years < 0:
        raise ValueError(f"age_years must be a non-negative number, got {age_years!r}")

    if age_years < 0.01:  # ~0–3 days
        return "Newborn (0-3d)"
    if age_years < 0.1:  # ~4–28 days
        return "Infant (4-28d)"
    if age_years < 2:  # 1m–2y
        return "Children (1m-2y)"
    if age_years < 6:
        return "Children (2-6y)"
    if age_years < 12:
        return "Children (6-12y)"
    if age_years < 18:
        return "Adolescents (12-18y)"
    if age_years <= 60:
        return "Adults (18-60y)"
    return "Elderly (>60y)"


def pick_gender_for_group(
    age_group: str,
    csv_path: str | Path,
) -> Optional[str]:
    """
    If gender is unknown, pick a valid gender for that age group
    from the reference CSV.

    The choice is random (module-level `random`), so repeated calls may
    return different values.

    Returns 'M' or 'F' when a combined 'M/F' row exists, otherwise one of the
    gender labels found for the age group, or None when the file, the needed
    columns, the age group or any gender label is missing.
    """
    csv_path = Path(csv_path)
    # is_file() also rejects directories (e.g. an empty path resolves to ".").
    if not csv_path.is_file():
        return None

    df = pd.read_csv(csv_path)
    if "Age Group" not in df.columns or "Gender" not in df.columns:
        return None

    sub = df[df["Age Group"].astype(str).str.strip() == str(age_group).strip()]
    if sub.empty:
        return None

    genders = [
        g
        for g in sub["Gender"].dropna().astype(str).str.strip().unique().tolist()
        if g
    ]
    if not genders:
        return None

    # If "M/F" is present, just pick randomly
    if any("M/F" in g or "M / F" in g for g in genders):
        return random.choice(["M", "F"])

    return random.choice(genders)


# -------------------------------------------------
# DIFFERENTIAL & REPORT GENERATION
# -------------------------------------------------

def compute_differential_percentages(
    wbc_subtypes: Dict[str, int],
) -> Dict[str, float]:
    """
    Convert WBC subtype counts to percentages.

    Returns a dict with percentages (rounded to one decimal), keyed by
    subtype name. None or an empty dict yields {}; an all-zero dict yields
    0.0 for every key.
    """
    if not wbc_subtypes:
        return {}

    total = sum(wbc_subtypes.values())
    if total == 0:
        return {k: 0.0 for k in wbc_subtypes}

    return {k: round((v / total) * 100.0, 1) for k, v in wbc_subtypes.items()}


def generate_report_from_ai(
    ai_result: Dict,
    age_group: str,
    gender: Optional[str],
    csv_path: str | Path,
) -> str:
    """
    Generate a human-readable report text using:
    - AI-derived results (coarse counts & WBC subtypes)
    - Reference ranges from CSV for the given age group & gender.

    ai_result expected keys:
        - patient_id
        - coarse_counts: {"WBC": int, "RBC": int, "Platelet": int}
        - wbc_subtypes: {subtype_name: int}
        - fovs_analyzed (optional)
        - calibration (optional, dict with FOV area, constant)
        - timestamp (optional)

    Returns:
        The report as a single newline-joined string.

    Raises:
        ValueError / FileNotFoundError: propagated from load_reference().
    """
    ref = load_reference(age_group=age_group, gender=gender, csv_path=csv_path)

    patient_id = ai_result.get("patient_id", "UNKNOWN")
    ts = ai_result.get("timestamp") or datetime.now().isoformat(timespec="seconds")
    fovs = ai_result.get("fovs_analyzed", 0)

    coarse = ai_result.get("coarse_counts", {}) or {}
    subtypes = ai_result.get("wbc_subtypes", {}) or {}
    calib = ai_result.get("calibration", {}) or {}

    fov_area = calib.get("fov_area_mm2")
    calib_const = calib.get("calibration_constant")

    total_wbc = coarse.get("WBC", 0)
    total_rbc = coarse.get("RBC", 0)
    total_plt = coarse.get("Platelet", 0)

    # The denominator is every classified cell in `subtypes`, whatever its class.
    classified_total = sum(subtypes.values()) if subtypes else 0
    percentages = compute_differential_percentages(subtypes)

    def ai_pct(name: str) -> float:
        return percentages.get(name, 0.0)

    ai_neut = ai_pct("neutrophil")
    ai_lymph = ai_pct("lymphocyte")
    ai_mono = ai_pct("monocyte")
    ai_eos = ai_pct("eosinophil")
    ai_baso = ai_pct("basophil")
    ai_ig = ai_pct("immature_granulocyte")
    ai_ery = ai_pct("erythroblast")

    def ref_text(column: str) -> str:
        # Empty CSV cells are NaN; print "n/a" rather than "nan".
        return _clean_text(ref.get(column)) or "n/a"

    # parse reference ranges from CSV columns
    ref_neut_lo, ref_neut_hi = _parse_range(ref.get("Neutrophils % (Range)"))
    ref_lymph_lo, ref_lymph_hi = _parse_range(ref.get("Lymphocytes % (Range)"))
    ref_mono_lo, ref_mono_hi = _parse_range(ref.get("Monocytes % (Range)"))
    ref_eos_lo, ref_eos_hi = _parse_range(ref.get("Eosinophils % (Range)"))
    ref_baso_lo, ref_baso_hi = _parse_range(ref.get("Basophils % (Range)"))

    # Upper threshold parsed from the cell text (e.g. "<3%" -> 3.0).
    ref_ig_max = _parse_upper_limit(ref.get("Immature Granulocytes %", ""))

    high_note = _clean_text(ref.get("Infection Insights (High)", ""))
    low_note = _clean_text(ref.get("Infection Insights (Low)", ""))

    insights: list[str] = []

    def check_range(label: str, ai_val: float, lo: Optional[float], hi: Optional[float]):
        if lo is None or hi is None:
            return
        # An out-of-range value is always flagged; the CSV note is appended
        # only when one exists (rstrip removes the separator otherwise).
        if ai_val > hi:
            insights.append(
                f"- {label} {ai_val}% is above reference ({lo}-{hi}%). {high_note}".rstrip()
            )
        elif ai_val < lo:
            insights.append(
                f"- {label} {ai_val}% is below reference ({lo}-{hi}%). {low_note}".rstrip()
            )

    # With no classified cells every percentage is 0.0, so comparing against
    # the reference would only produce false "below reference" findings.
    if classified_total:
        check_range("Neutrophils", ai_neut, ref_neut_lo, ref_neut_hi)
        check_range("Lymphocytes", ai_lymph, ref_lymph_lo, ref_lymph_hi)
        check_range("Monocytes", ai_mono, ref_mono_lo, ref_mono_hi)
        check_range("Eosinophils", ai_eos, ref_eos_lo, ref_eos_hi)
        check_range("Basophils", ai_baso, ref_baso_lo, ref_baso_hi)

    if ref_ig_max is not None and ai_ig > ref_ig_max:
        insights.append(
            f"- Immature granulocytes {ai_ig}% > allowed ({ref_ig_max}%), "
            "suggesting left shift or active marrow response. Recommend manual review."
        )

    if ai_ery > 0:
        insights.append(
            f"- Erythroblasts detected ({ai_ery}%), unusual in normal peripheral blood → manual review recommended."
        )

    if classified_total < 100:
        insights.append(
            f"- Only {classified_total} WBCs classified; differential may be statistically unstable. "
            "Consider reviewing more fields."
        )

    if not fov_area or not calib_const:
        insights.append(
            "- Absolute counts per µL are not reported (FOV area and calibration "
            "constant not provided). Use results as qualitative screening."
        )

    # Assemble multi-line report
    lines: list[str] = []
    lines.append("AI-Assisted Peripheral Blood Smear Report (Prototype)")
    lines.append("================================================================")
    lines.append(f"Patient ID : {patient_id}")
    lines.append(f"Date/Time : {ts}")
    lines.append(f"Age Group (ref) : {age_group}")
    if gender:
        lines.append(f"Gender (ref) : {gender}")
    if fovs:
        lines.append(f"FOVs Analyzed : {fovs}")
    lines.append("")

    lines.append("1. Coarse Counts (sum over analyzed fields)")
    lines.append(f" WBC : {total_wbc}")
    lines.append(f" RBC : {total_rbc}")
    lines.append(f" Platelets : {total_plt}")
    lines.append("")

    lines.append("2. AI Differential vs Reference")
    lines.append(f" Neutrophils : {ai_neut}% (ref {ref_text('Neutrophils % (Range)')})")
    lines.append(f" Lymphocytes : {ai_lymph}% (ref {ref_text('Lymphocytes % (Range)')})")
    lines.append(f" Monocytes : {ai_mono}% (ref {ref_text('Monocytes % (Range)')})")
    lines.append(f" Eosinophils : {ai_eos}% (ref {ref_text('Eosinophils % (Range)')})")
    lines.append(f" Basophils : {ai_baso}% (ref {ref_text('Basophils % (Range)')})")
    lines.append(f" Imm. granulocytes : {ai_ig}% (ref {ref_text('Immature Granulocytes %')})")
    lines.append(f" Erythroblasts : {ai_ery}% (no reference range in file)")
    lines.append("")

    lines.append("3. Calibration")
    lines.append(f" FOV area (mm²) : {fov_area if fov_area else 'not provided'}")
    lines.append(f" Calibration constant : {calib_const if calib_const else 'not provided'}")
    lines.append("")

    lines.append("4. AI Insights")
    if insights:
        for msg in insights:
            lines.append(f" {msg}")
    else:
        lines.append(" All AI-derived percentages fall within reference ranges for this age group.")
    lines.append("")

    lines.append(
        "Method: YOLO-based detector (RBC/WBC/Platelet) + WBC subtype classifier "
        "compared against age-/gender-specific reference ranges from CSV.\n"
        "This is a research prototype and not a substitute for formal lab testing."
    )

    return "\n".join(lines)