| | """ |
| | normative_calculator.py - v2 |
| | |
| | Utility functions for computing z-scores and percentiles for any biomarker |
| | contained in *Table_1_summary_measure.xlsx*. |
| | |
| | |
| | |
| | Author: Lars Masanneck 06-05-2025 |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import math |
| | import pathlib |
| | import warnings |
| | from typing import Dict, Iterable, List, Sequence, Union |
| |
|
| | import pandas as pd |
| | from scipy import stats |
| | from datetime import datetime |
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Names exported by a star-import of this module.
# NOTE(review): ``z_score`` and ``percentile_from_z`` are defined in this
# file without a leading underscore but are not listed here - confirm that
# leaving them out of the star-import surface is intentional.
__all__ = [
    "load_normative_table",
    "compute_normative_position",
    "add_normative_columns",
    "categorize_bmi",
    "compute_skew_corrected_position",
]
| |
|
| | |
| | |
| | |
| |
|
| | |
# BMI category table as (lower, upper, label) tuples. ``_categorize`` tests
# ``lower <= value < upper``, so every range is half-open: a BMI of exactly
# 25 is "Overweight", and ``math.inf`` leaves the last range open-ended.
_BMI_BOUNDS: List[tuple[float, float, str]] = [
    (0, 18.5, "Underweight"),
    (18.5, 25, "Healthy"),
    (25, 30, "Overweight"),
    (30, math.inf, "Obesity"),
]
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _categorize(value: float, bounds: Sequence[tuple]) -> str: |
| | """Return category *label* for *value* given (lower, upper, label) tuples.""" |
| | for lower, upper, label in bounds: |
| | if lower <= value < upper: |
| | return label |
| | raise ValueError(f"{value} outside defined bounds.") |
| |
|
| |
|
def categorize_bmi(bmi: Union[str, float]) -> str:
    """Return the BMI category string for *bmi*.

    Parameters
    ----------
    bmi : Union[str, float]
        A category name, or a numeric BMI value.

    Returns
    -------
    str
        For a string: the input stripped of surrounding whitespace and
        capitalised (first letter upper-case, the rest lower-case). For
        anything else: the label found by ``_categorize`` in ``_BMI_BOUNDS``
        after converting the value with ``float``.

    Raises
    ------
    ValueError
        If a numeric BMI lies outside every range in ``_BMI_BOUNDS``.
    """
    if not isinstance(bmi, str):
        return _categorize(float(bmi), _BMI_BOUNDS)
    # Strings are taken to be category names already; only normalise them.
    return bmi.strip().capitalize()
| |
|
| |
|
| | def _categorize_age(age: Union[str, int], normative_df: pd.DataFrame) -> str: |
| | """Return an age‐group string for a numeric age, or pass through if already a string.""" |
| | if isinstance(age, str): |
| | return age.strip() |
| | for grp in normative_df["Age"].unique(): |
| | grp = grp.strip() |
| | if "-" in grp: |
| | lo, hi = grp.split("-", 1) |
| | try: |
| | lo_i, hi_i = int(lo), int(hi) |
| | except ValueError: |
| | continue |
| | if lo_i <= age <= hi_i: |
| | return grp |
| | elif grp.endswith("+"): |
| | try: |
| | lo_i = int(grp[:-1]) |
| | except ValueError: |
| | continue |
| | if age >= lo_i: |
| | return grp |
| | raise ValueError(f"No normative age group found for age {age!r}.") |
| |
|
| |
|
def load_normative_table(path: Union[str, pathlib.Path]) -> pd.DataFrame:
    """Read the normative summary table from a CSV or Excel file.

    Parameters
    ----------
    path : Union[str, pathlib.Path]
        File to read. A ``.csv`` suffix (any case) is read with
        ``pandas.read_csv``; every other suffix with ``pandas.read_excel``.

    Returns
    -------
    pd.DataFrame
        The table with whitespace-stripped column names. The stratum columns
        (``Age``, ``area``, ``gender``, ``Bmi``, ``Biomarkers`` and, when
        present, ``nb_category``) are strings. Cells of the statistic columns
        are floats, or ``pd.NA`` where a cell could not be parsed.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    KeyError
        If one of the five mandatory stratum columns is missing.
    """
    path = pathlib.Path(path)
    if not path.exists():
        raise FileNotFoundError(path)

    required_str_cols = ["Age", "area", "gender", "Bmi", "Biomarkers"]
    # ``_extract_stats`` falls back to an "n" column when "nb_category" is
    # absent, so the loader must not insist on it either.
    optional_str_cols = ["nb_category"]
    str_cols = required_str_cols + optional_str_cols

    float_cols = [
        "min",
        "max",
        "median",
        "q1",
        "q3",
        "iqr",
        "mad",
        "mean",
        "sd",
        "se",
        "ci",
    ]

    def parse_num(x):
        """Convert one raw cell to float; unparseable cells become ``pd.NA``."""
        if isinstance(x, datetime):
            # A cell that arrives as a datetime is mapped back to a number.
            # NOTE(review): presumably this undoes a spreadsheet turning
            # decimals such as "12.05" into dates - confirm against the data.
            if x.year > datetime.now().year:
                # Year lies in the future: rebuild as <year>.<month>.
                return x.year + x.month / 100
            # Otherwise rebuild as <day>.<month>.
            return x.day + x.month / 100

        try:
            return float(x)
        except (TypeError, ValueError, OverflowError):
            return pd.NA

    converters = {col: str for col in str_cols}
    converters.update({col: parse_num for col in float_cols})

    if path.suffix.lower() == ".csv":
        df = pd.read_csv(path, converters=converters)
    else:
        df = pd.read_excel(path, converters=converters)

    # Normalise the headers *before* looking columns up by name, otherwise a
    # header with stray whitespace (" Age ") makes the lookups below fail.
    df.columns = df.columns.str.strip()

    missing = [col for col in required_str_cols if col not in df.columns]
    if missing:
        raise KeyError(f"Normative table is missing required column(s): {missing}")

    for col in str_cols:
        if col in df.columns:
            df[col] = df[col].astype(str)

    return df
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _extract_stats( |
| | normative_df: pd.DataFrame, |
| | biomarker: str, |
| | age_group: str, |
| | region: str, |
| | gender: str, |
| | bmi_category: str, |
| | ) -> Dict[str, Union[float, str]]: |
| | """Return all summary statistics for the requested stratum.""" |
| | mask = ( |
| | (normative_df["Biomarkers"].str.lower() == biomarker.lower()) |
| | & (normative_df["Age"].str.lower() == age_group.lower()) |
| | & (normative_df["area"].str.lower() == region.lower()) |
| | & (normative_df["gender"].str.lower() == gender.lower()) |
| | & (normative_df["Bmi"].str.lower() == bmi_category.lower()) |
| | ) |
| | subset = normative_df.loc[mask] |
| | if subset.empty: |
| | raise KeyError("No normative stats found for the specified stratum.") |
| | if len(subset) > 1: |
| | warnings.warn( |
| | "Multiple normative rows found; using the first one (check your table)." |
| | ) |
| | row = subset.iloc[0] |
| | |
| | n_col = "nb_category" if "nb_category" in row else "n" |
| | n_raw = row[n_col] |
| | n = str(row[n_col]) |
| |
|
| | return { |
| | "median": float(row["median"]), |
| | "q1": float(row["q1"]), |
| | "q3": float(row["q3"]), |
| | "iqr": float(row["iqr"]), |
| | "mad": float(row["mad"]), |
| | "mean": float(row["mean"]), |
| | "sd": float(row["sd"]), |
| | "se": float(row["se"]), |
| | "ci": float(row["ci"]), |
| | "n": n, |
| | } |
| |
|
| |
|
def z_score(value: float, mean: float, sd: float) -> float:
    """Standardise *value* against *mean* and *sd*.

    Returns ``(value - mean) / sd``, or NaN when *sd* equals zero so that a
    degenerate distribution never triggers a division by zero.
    """
    return float("nan") if sd == 0 else (value - mean) / sd
| |
|
| |
|
def percentile_from_z(z: float) -> float:
    """Return the standard-normal percentile (0-100) that corresponds to *z*.

    The value is the standard normal CDF at *z*, scaled to percent and cast
    to a built-in ``float``.
    """
    cumulative = stats.norm.cdf(z)
    return float(cumulative * 100)
| |
|
| |
|
def compute_normative_position(
    *,
    value: float,
    biomarker: str,
    age_group: Union[str, int],
    region: str,
    gender: str,
    bmi: Union[str, float],
    normative_df: pd.DataFrame,
) -> Dict[str, Union[float, str]]:
    """
    Locate one measurement within the normative distribution of its stratum.

    Parameters
    ----------
    value : float
        Raw measurement of the biomarker.
    biomarker : str
        Biomarker name; must match an entry of the "Biomarkers" column of
        `normative_df`.
    age_group : Union[str, int]
        Either an age-group label such as "40-49" matching
        `normative_df["Age"]`, or an integer age that is mapped onto the
        matching age-group bracket.
    region : str
        Region matching `normative_df["area"]` (case-insensitive).
    gender : str
        Gender matching `normative_df["gender"]` (case-insensitive).
    bmi : Union[str, float]
        Either a BMI category string such as "Healthy", or a numeric BMI that
        is bucketed into the WHO categories.
    normative_df : pd.DataFrame
        Normative summary table as returned by `load_normative_table`.

    Returns
    -------
    Dict[str, Union[float, str]]
        Keys, in this order:
        - "z_score" (float): z-score of `value`,
        - "percentile" (float): percentile on a 0-100 scale,
        - "mean" (float): normative mean,
        - "sd" (float): normative standard deviation,
        - "n" (str): sample-size category string from the table,
        - "median" (float): normative median,
        - "q1" (float): first quartile,
        - "q3" (float): third quartile,
        - "iqr" (float): interquartile range,
        - "mad" (float): median absolute deviation,
        - "se" (float): standard error,
        - "ci" (float): confidence interval.

    Raises
    ------
    KeyError
        If `normative_df` holds no row for the requested stratum.
    ValueError
        If an integer `age_group` fits none of the age brackets.
    """
    # Keyword arguments are evaluated left to right, so the age group is
    # resolved before the BMI category and both before the table lookup.
    reference = _extract_stats(
        normative_df=normative_df,
        biomarker=biomarker,
        age_group=_categorize_age(age_group, normative_df),
        region=region,
        gender=gender,
        bmi_category=categorize_bmi(bmi),
    )

    z = z_score(value, reference["mean"], reference["sd"])
    position: Dict[str, Union[float, str]] = {
        "z_score": z,
        "percentile": percentile_from_z(z),
    }
    # Pass the reference statistics through in the documented key order.
    for key in ("mean", "sd", "n", "median", "q1", "q3", "iqr", "mad", "se", "ci"):
        position[key] = reference[key]
    return position
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _compute_for_row( |
| | row: pd.Series, |
| | biomarker: str, |
| | normative_df: pd.DataFrame, |
| | age_col: str, |
| | region_col: str, |
| | gender_col: str, |
| | bmi_col: str, |
| | value_col: str, |
| | ): |
| | try: |
| | res = compute_normative_position( |
| | value=row[value_col], |
| | biomarker=biomarker, |
| | age_group=row[age_col], |
| | region=row[region_col], |
| | gender=row[gender_col], |
| | bmi=row[bmi_col], |
| | normative_df=normative_df, |
| | ) |
| | return pd.Series( |
| | [res["z_score"], res["percentile"]], |
| | index=[f"{biomarker}_z", f"{biomarker}_pct"], |
| | ) |
| | except Exception as exc: |
| | warnings.warn(str(exc)) |
| | return pd.Series( |
| | [float("nan"), float("nan")], index=[f"{biomarker}_z", f"{biomarker}_pct"] |
| | ) |
| |
|
| |
|
def add_normative_columns(
    df: pd.DataFrame,
    *,
    biomarkers: Iterable[str],
    normative_df: pd.DataFrame,
    age_col: str = "Age",
    region_col: str = "area",
    gender_col: str = "gender",
    bmi_col: str = "Bmi",
    value_cols: dict[str, str] | None = None,
    output_prefixes: dict[str, str] | None = None,
) -> pd.DataFrame:
    """
    Append z-score and percentile columns for multiple biomarkers, with optional
    custom prefixes for the output column names.

    Parameters
    ----------
    df : pd.DataFrame
        Participant-level data, must include demographic columns and raw biomarker
        values. May be empty, in which case empty output columns are added.
    biomarkers : Iterable[str]
        Biomarker names to process. Any iterable works, including one-shot
        generators; a single string is treated as one biomarker name.
    normative_df : pd.DataFrame
        Normative summary table as loaded by `load_normative_table`.
    age_col : str, default "Age"
        Column in `df` containing age-group labels or integer ages.
    region_col : str, default "area"
        Column in `df` matching the "area" field in `normative_df`.
    gender_col : str, default "gender"
        Column in `df` matching the "gender" field in `normative_df`.
    bmi_col : str, default "Bmi"
        Column in `df` containing BMI values or categories.
    value_cols : dict[str, str], optional
        Mapping from biomarker name to the column in `df` that holds its raw
        numeric value. Biomarkers without an entry use the column named like
        the biomarker itself.
    output_prefixes : dict[str, str], optional
        Mapping from biomarker name to the prefix of its output columns.
        Biomarkers without an entry use the biomarker name itself.

    Returns
    -------
    pd.DataFrame
        A copy of `df` with two new columns for each biomarker:
        `<prefix>_z` and `<prefix>_pct`. Rows that cannot be scored hold NaN.
    """
    # Materialise the names once: a generator would otherwise be exhausted
    # before the loop below runs, and a bare string would be walked
    # character by character.
    names = [biomarkers] if isinstance(biomarkers, str) else list(biomarkers)
    value_cols = value_cols or {}
    output_prefixes = output_prefixes or {}
    out = df.copy()

    for bm in names:
        prefix = output_prefixes.get(bm, bm)
        z_col, pct_col = f"{prefix}_z", f"{prefix}_pct"

        if df.empty:
            # A row-wise ``apply`` on an empty frame returns a copy of the
            # frame rather than a two-column result, so the assignment below
            # would fail; add the (all-NaN) output columns directly.
            out[z_col] = pd.Series(float("nan"), index=out.index, dtype="float64")
            out[pct_col] = pd.Series(float("nan"), index=out.index, dtype="float64")
            continue

        out[[z_col, pct_col]] = df.apply(
            _compute_for_row,
            axis=1,
            biomarker=bm,
            normative_df=normative_df,
            age_col=age_col,
            region_col=region_col,
            gender_col=gender_col,
            bmi_col=bmi_col,
            value_col=value_cols.get(bm, bm),
        )

    return out
| |
|
| |
|
| | |
def compute_skew_corrected_position(
    value: float, mean: float, sd: float, median: float
) -> dict[str, float]:
    """Place *value* in a Pearson type III distribution fitted to summary stats.

    The skewness is estimated as ``3 * (mean - median) / sd`` (NaN when *sd*
    is zero) and used as the shape of ``scipy.stats.pearson3`` with
    ``loc=mean`` and ``scale=sd``.

    Returns
    -------
    dict[str, float]
        - "z_skew_corrected": the standard-normal quantile of the Pearson III
          cumulative probability of *value*,
        - "percentile_skew_corrected": that cumulative probability in percent.
    """
    skew = float("nan") if sd == 0 else 3 * (mean - median) / sd
    cumulative = stats.pearson3(skew, loc=mean, scale=sd).cdf(value)
    return {
        # Map the probability back onto the z scale of a standard normal.
        "z_skew_corrected": stats.norm.ppf(cumulative),
        "percentile_skew_corrected": float(cumulative * 100),
    }
| |
|