Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ | |
| Hypertension Disease Progression and Intervention Effects Model | |
| with Beta (SE) structure and stochastic PSA option | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.linalg import expm | |
| import matplotlib.pyplot as plt | |
| import io | |
| import base64 | |
| # ----------------------------------------------------------- | |
| # 1) β coefficients (log-HR) + SE (from your provided table) | |
| # ----------------------------------------------------------- | |
| # Transition names for display | |
| TRANSITION_NAMES_EN = ["N→P", "P→S1", "S1→S2", "P→N"] | |
| beta_men = { | |
| "N2P": { # Normal → Prehypertension (β1) | |
| "Education_high": {"beta": -0.2950, "se": 0.1721}, | |
| "BMI_ge25": {"beta": 0.5275, "se": 0.1568}, | |
| "Waist_ge90": {"beta": 0.5040, "se": 0.2526}, | |
| "Fasting_glu_high": {"beta": -0.0164, "se": 0.3720}, | |
| "TC_ge200": {"beta": 0.1328, "se": 0.1385}, | |
| "UA_high": {"beta": 0.0398, "se": 0.1469}, | |
| "Smoking_current": {"beta": 0.1014, "se": 0.1455}, | |
| "Betel_current": {"beta": -0.3524, "se": 0.1871}, | |
| "Alcohol_current": {"beta": -0.2193, "se": 0.1458}, | |
| "Exercise_freq": {"beta": 0.3028, "se": 0.1754}, | |
| "FHx_yes": {"beta": 0.0896, "se": 0.1786} | |
| }, | |
| "P2S1": { # Prehypertension → Stage 1 hypertension (β2) | |
| "Education_high": {"beta": -0.1105, "se": 0.1079}, | |
| "BMI_ge25": {"beta": 0.1272, "se": 0.1062}, | |
| "Waist_ge90": {"beta": -0.0909, "se": 0.1222}, | |
| "Fasting_glu_high": {"beta": 0.0078, "se": 0.1778}, | |
| "TC_ge200": {"beta": -0.1197, "se": 0.0961}, | |
| "UA_high": {"beta": 0.3740, "se": 0.0986}, | |
| "Smoking_current": {"beta": -0.0505, "se": 0.1017}, | |
| "Betel_current": {"beta": 0.2878, "se": 0.1433}, | |
| "Alcohol_current": {"beta": 0.0422, "se": 0.1003}, | |
| "Exercise_freq": {"beta": 0.0642, "se": 0.1497}, | |
| "FHx_yes": {"beta": 0.2461, "se": 0.1280} | |
| }, | |
| "S12S2": { # Stage 1 → Stage 2 hypertension (β3) | |
| "Education_high": {"beta": -0.6211, "se": 0.3150}, | |
| "BMI_ge25": {"beta": -0.6488, "se": 0.3189}, | |
| "Waist_ge90": {"beta": 0.2272, "se": 0.3577}, | |
| "Fasting_glu_high": {"beta": 0.3553, "se": 0.4633}, | |
| "TC_ge200": {"beta": -0.0633, "se": 0.2687}, | |
| "UA_high": {"beta": 0.0411, "se": 0.2725}, | |
| "Smoking_current": {"beta": -0.3919, "se": 0.2850}, | |
| "Betel_current": {"beta": -0.0243, "se": 0.4090}, | |
| "Alcohol_current": {"beta": 0.6950, "se": 0.2863}, | |
| "Exercise_freq": {"beta": -0.5746, "se": 0.3871}, | |
| "FHx_yes": {"beta": -0.2716, "se": 0.4013} | |
| }, | |
| "P2N": { # Prehypertension → Normal (β4) | |
| "Education_high": {"beta": -0.3251, "se": 0.2192}, | |
| "BMI_ge25": {"beta": 0.0265, "se": 0.1978}, | |
| "Waist_ge90": {"beta": 0.4057, "se": 0.3004}, | |
| "Fasting_glu_high": {"beta": -0.3138, "se": 0.4235}, | |
| "TC_ge200": {"beta": -0.0306, "se": 0.1749}, | |
| "UA_high": {"beta": -0.3187, "se": 0.1964}, | |
| "Smoking_current": {"beta": 0.4710, "se": 0.1816}, | |
| "Betel_current": {"beta": -0.6040, "se": 0.2568}, | |
| "Alcohol_current": {"beta": -0.5499, "se": 0.1849}, | |
| "Exercise_freq": {"beta": 0.4304, "se": 0.2314}, | |
| "FHx_yes": {"beta": -0.0033, "se": 0.2351} | |
| } | |
| } | |
| beta_women = { | |
| "N2P": { # Normal → Prehypertension (β1) | |
| "Education_high": {"beta": -0.1497, "se": 0.1029}, | |
| "BMI_ge25": {"beta": 0.3171, "se": 0.1128}, | |
| "Waist_ge80": {"beta": -0.1668, "se": 0.1167}, | |
| "Fasting_glu_high": {"beta": 0.5199, "se": 0.2591}, | |
| "TC_ge200": {"beta": 0.2077, "se": 0.0940}, | |
| "UA_high": {"beta": 0.0705, "se": 0.1161}, | |
| "Smoking_current": {"beta": -0.5675, "se": 0.1968}, | |
| "Alcohol_current": {"beta": -0.1241, "se": 0.1670}, | |
| "Exercise_freq": {"beta": -0.1400, "se": 0.1177}, | |
| "FHx_yes": {"beta": -0.0344, "se": 0.1078} | |
| }, | |
| "P2S1": { # Prehypertension → Stage 1 hypertension (β2) | |
| "Education_high": {"beta": -0.0813, "se": 0.1061}, | |
| "BMI_ge25": {"beta": 0.1029, "se": 0.0982}, | |
| "Waist_ge80": {"beta": -0.0223, "se": 0.1020}, | |
| "Fasting_glu_high": {"beta": 0.1663, "se": 0.1416}, | |
| "TC_ge200": {"beta": -0.0473, "se": 0.0805}, | |
| "UA_high": {"beta": 0.2912, "se": 0.0957}, | |
| "Smoking_current": {"beta": -0.4045, "se": 0.2225}, | |
| "Alcohol_current": {"beta": -0.0472, "se": 0.1685}, | |
| "Exercise_freq": {"beta": -0.0872, "se": 0.1160}, | |
| "FHx_yes": {"beta": 0.3253, "se": 0.1075} | |
| }, | |
| "S12S2": { # Stage 1 → Stage 2 hypertension (β3) | |
| "Education_high": {"beta": -0.3908, "se": 0.3162}, | |
| "BMI_ge25": {"beta": -0.3142, "se": 0.2948}, | |
| "Waist_ge80": {"beta": 0.1843, "se": 0.2955}, | |
| "Fasting_glu_high": {"beta": -1.3789, "se": 0.7272}, | |
| "TC_ge200": {"beta": 0.1766, "se": 0.2380}, | |
| "UA_high": {"beta": 0.0682, "se": 0.2806}, | |
| "Smoking_current": {"beta": -8.0955, "se": 21.3033}, | |
| "Alcohol_current": {"beta": 0.0780, "se": 0.5369}, | |
| "Exercise_freq": {"beta": 0.2557, "se": 0.4326}, | |
| "FHx_yes": {"beta": 0.2805, "se": 0.3081} | |
| }, | |
| "P2N": { # Prehypertension → Normal (β4) | |
| "Education_high": {"beta": 0.0195, "se": 0.1214}, | |
| "BMI_ge25": {"beta": -0.2213, "se": 0.1391}, | |
| "Waist_ge80": {"beta": -0.4769, "se": 0.1649}, | |
| "Fasting_glu_high": {"beta": 0.1771, "se": 0.3247}, | |
| "TC_ge200": {"beta": 0.0501, "se": 0.1156}, | |
| "UA_high": {"beta": -0.2798, "se": 0.1517}, | |
| "Smoking_current": {"beta": -0.1689, "se": 0.2330}, | |
| "Alcohol_current": {"beta": -0.1731, "se": 0.1995}, | |
| "Exercise_freq": {"beta": -0.0527, "se": 0.1429}, | |
| "FHx_yes": {"beta": -0.4005, "se": 0.1375} | |
| } | |
| } | |
| # ----------------------------------------------------------- | |
| # 2) Baseline hazards | |
| # ----------------------------------------------------------- | |
| lam10_0, lam20_0, lam30_0, rho0_0 = 0.08, 0.10, 0.12, 0.05 | |
| # ----------------------------------------------------------- | |
| # 3) Lambda calculation with stochastic option | |
| # ----------------------------------------------------------- | |
| def calc_lambda(betas: dict, features: dict, baseline: float, randomize=False): | |
| """Compute λ = λ0 * exp(Xβ), optionally sampling β ~ Normal(mean, SE)""" | |
| logHR = 0.0 | |
| for k, vals in betas.items(): | |
| beta = vals["beta"] | |
| se = vals["se"] | |
| if randomize: | |
| beta = np.random.normal(beta, se) | |
| logHR += beta * features.get(k, 0) | |
| return baseline * np.exp(logHR) | |
| def hazards_from_beta(sex: str, features: dict, | |
| lam10, lam20, lam30, rho0, randomize=False): | |
| B = beta_men if sex.upper().startswith('M') else beta_women | |
| l1 = calc_lambda(B["N2P"], features, lam10, randomize) | |
| l2 = calc_lambda(B["P2S1"], features, lam20, randomize) | |
| l3 = calc_lambda(B["S12S2"], features, lam30, randomize) | |
| r = calc_lambda(B["P2N"], features, rho0, randomize) | |
| return l1, l2, l3, r | |
| def Q_matrix(l1, l2, l3, rho): | |
| # State order: [N, P, S1, S2] | |
| Q = np.zeros((4, 4)) | |
| Q[0, 1] = l1 | |
| Q[0, 0] = -l1 | |
| Q[1, 0] = rho | |
| Q[1, 2] = l2 | |
| Q[1, 1] = -(rho + l2) | |
| Q[2, 3] = l3 | |
| Q[2, 2] = -l3 | |
| Q[3, 3] = 0.0 | |
| return Q | |
| def discrete_P(Q, years=1.0): | |
| P = expm(Q * years) | |
| # Numerical stability | |
| P = np.clip(P, 0, 1) | |
| P = P / P.sum(axis=1, keepdims=True) | |
| return P | |
| # ----------------------------------------------------------- | |
| # 4) (Optional) Calibration: achieve target 5-year S2 cumulative proportion | |
| # ----------------------------------------------------------- | |
| def calibrate_scale(sex: str, features_ref: dict, | |
| lam10, lam20, lam30, rho0, | |
| target_s2_5y: float, | |
| max_iter=40): | |
| lo, hi = 0.2, 5.0 | |
| for _ in range(max_iter): | |
| mid = 0.5 * (lo + hi) | |
| l1, l2, l3, r = hazards_from_beta(sex, features_ref, | |
| lam10 * mid, lam20 * mid, lam30 * mid, rho0) | |
| Q = Q_matrix(l1, l2, l3, r) | |
| P = discrete_P(Q, 1.0) | |
| s = np.array([1, 0, 0, 0], float) | |
| for _ in range(5): | |
| s = s @ P | |
| if s[3] < target_s2_5y: | |
| lo = mid | |
| else: | |
| hi = mid | |
| return 0.5 * (lo + hi) | |
| # ----------------------------------------------------------- | |
| # 5) Markov CEA: cost, utility, discounting, ICER | |
| # ----------------------------------------------------------- | |
| def run_markov(P, C, U, start_dist, cycles=10, discount=0.03): | |
| s = start_dist.astype(float) | |
| total_cost, total_qaly = 0.0, 0.0 | |
| trace = [s.copy()] | |
| for t in range(cycles): | |
| total_cost += float(s @ C) / ((1 + discount) ** t) | |
| total_qaly += float(s @ U) / ((1 + discount) ** t) | |
| s = s @ P | |
| trace.append(s.copy()) | |
| return total_cost, total_qaly, np.vstack(trace) | |
| def icer(costA, qalyA, costB, qalyB): | |
| """Calculate ICER, handling edge cases""" | |
| deltaC = costB - costA | |
| deltaQ = qalyB - qalyA | |
| # Handle special cases | |
| if abs(deltaQ) < 1e-9: # QALY difference too small, consider equal | |
| return float('inf') if deltaC > 0 else float('-inf'), deltaC, deltaQ | |
| # Normal case | |
| return deltaC / deltaQ, deltaC, deltaQ | |
| # ----------------------------------------------------------- | |
| # 6) Graphics: CE plane, CEAC curve | |
| # ----------------------------------------------------------- | |
| def plot_ce_plane(deltaQ, deltaC, icer_val, intervention_name="Intervention"): | |
| plt.figure(figsize=(10, 7)) # 加大圖表尺寸 | |
| # Set up axes and quadrant lines | |
| plt.axhline(0, color='gray', linestyle='--', alpha=0.7, linewidth=1) | |
| plt.axvline(0, color='gray', linestyle='--', alpha=0.7, linewidth=1) | |
| # Plot ICER point with larger marker | |
| plt.scatter(deltaQ, deltaC, s=200, color='#DC143C', edgecolors='darkred', | |
| linewidths=2, zorder=5, alpha=0.9) | |
| # Add different explanations based on quadrant | |
| if deltaQ > 0 and deltaC > 0: # Northeast quadrant | |
| title_text = f"ICER = ${icer_val:,.1f}/QALY - More expensive but more effective" | |
| quadrant = "NE" | |
| elif deltaQ < 0 and deltaC > 0: # Northwest quadrant | |
| title_text = f"ICER = ${icer_val:,.1f}/QALY - More expensive and less effective" | |
| quadrant = "NW" | |
| elif deltaQ < 0 and deltaC < 0: # Southwest quadrant | |
| title_text = f"ICER = ${icer_val:,.1f}/QALY - Less expensive but less effective" | |
| quadrant = "SW" | |
| else: # Southeast quadrant | |
| title_text = f"ICER = ${icer_val:,.1f}/QALY - Less expensive and more effective" | |
| quadrant = "SE" | |
| plt.title(title_text, fontsize=14, fontweight='bold', pad=20) | |
| # Add labels | |
| plt.xlabel("Effect Difference (QALYs)", fontsize=12, fontweight='bold') | |
| plt.ylabel("Cost Difference ($)", fontsize=12, fontweight='bold') | |
| # Add WTP threshold line ($50,000/QALY) | |
| wtp = 50000 | |
| x_max = max(abs(deltaQ) * 1.3, 0.05) # 確保有足夠的範圍 | |
| x_range = [-x_max * 0.1, x_max] | |
| plt.xlim(x_range) | |
| # 計算 y 軸範圍 | |
| y_max = max(abs(deltaC) * 1.3, wtp * x_max * 0.5) | |
| y_range = [-y_max * 0.2, y_max] | |
| plt.ylim(y_range) | |
| # 畫 WTP 閾值線 | |
| plt.plot([0, x_range[1]], [0, x_range[1] * wtp], 'k--', alpha=0.5, | |
| linewidth=2, label=f'WTP Threshold ${wtp:,}/QALY') | |
| # Add annotation with better positioning | |
| # 根據點的位置調整標註位置 | |
| if deltaQ > 0 and deltaC < 0: | |
| # 右下象限 - 標註放在左上 | |
| xytext = (-80, 40) | |
| ha = 'right' | |
| elif deltaQ > 0 and deltaC > 0: | |
| # 右上象限 - 標註放在左下 | |
| xytext = (-80, -40) | |
| ha = 'right' | |
| elif deltaQ < 0 and deltaC < 0: | |
| # 左下象限 - 標註放在右上 | |
| xytext = (80, 40) | |
| ha = 'left' | |
| else: | |
| # 左上象限 - 標註放在右下 | |
| xytext = (80, -40) | |
| ha = 'left' | |
| plt.annotate( | |
| f"{intervention_name}\nΔC=${deltaC:.1f}\nΔQ={deltaQ:.3f}", | |
| xy=(deltaQ, deltaC), | |
| xytext=xytext, | |
| textcoords="offset points", | |
| fontsize=11, | |
| fontweight='bold', | |
| ha=ha, | |
| bbox=dict(boxstyle='round,pad=0.5', facecolor='yellow', alpha=0.7, edgecolor='black'), | |
| arrowprops=dict(arrowstyle="->", connectionstyle="arc3,rad=.2", | |
| lw=2, color='black') | |
| ) | |
| plt.grid(alpha=0.3, linestyle=':', linewidth=0.5) | |
| plt.legend(fontsize=11, loc='upper left', framealpha=0.9) | |
| # 調整邊距 | |
| plt.tight_layout() | |
| # Save figure as base64 format | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format='png', dpi=150, bbox_inches='tight') # 提高 DPI | |
| plt.close() | |
| buf.seek(0) | |
| img_str = base64.b64encode(buf.read()).decode('utf-8') | |
| return f"data:image/png;base64,{img_str}" | |
| def generate_psa_samples(costA, qalyA, costB, qalyB, n_samples=1000, cv=0.2): | |
| """Generate probabilistic sensitivity analysis samples""" | |
| samples = [] | |
| # Assume costs and QALYs follow lognormal distribution | |
| for _ in range(n_samples): | |
| c_a = np.random.lognormal(np.log(costA), cv) | |
| q_a = np.random.lognormal(np.log(qalyA), cv) | |
| c_b = np.random.lognormal(np.log(costB), cv) | |
| q_b = np.random.lognormal(np.log(qalyB), cv) | |
| # Calculate increments | |
| delta_c = c_b - c_a | |
| delta_q = q_b - q_a | |
| # Handle division by zero | |
| if abs(delta_q) < 1e-9: | |
| continue | |
| # Calculate ICER | |
| icer_val = delta_c / delta_q | |
| samples.append((delta_q, delta_c, icer_val)) | |
| return samples | |
| def plot_ceac(costA, qalyA, costB, qalyB, intervention_name="Intervention", n_samples=1000): | |
| # Generate PSA samples | |
| samples = generate_psa_samples(costA, qalyA, costB, qalyB, n_samples) | |
| # Set up WTP threshold range | |
| wtp_range = np.linspace(0, 100000, 100) | |
| prob_B_ce = [] | |
| # Calculate probability of B being cost-effective at each WTP threshold | |
| for wtp in wtp_range: | |
| count_ce = 0 | |
| for delta_q, delta_c, _ in samples: | |
| # Condition for B being more cost-effective than A: | |
| # Either (saves money and improves health) or (incremental cost/effect < WTP) | |
| if (delta_c < 0 and delta_q > 0) or (delta_q > 0 and (delta_c / delta_q) < wtp): | |
| count_ce += 1 | |
| prob_B_ce.append(count_ce / len(samples)) | |
| # Plot CEAC curve | |
| plt.figure(figsize=(8, 6)) | |
| plt.plot(wtp_range, prob_B_ce, 'b-', linewidth=2) | |
| plt.axhline(0.5, color='gray', linestyle='--', alpha=0.5) | |
| plt.grid(alpha=0.3) | |
| plt.xlabel("Willingness-to-Pay Threshold ($/QALY)") | |
| plt.ylabel("Probability of Intervention Being Cost-Effective") | |
| plt.title(f"{intervention_name} Cost-Effectiveness Acceptability Curve (CEAC)") | |
| plt.ylim(0, 1) | |
| # Save figure as base64 format | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format='png', dpi=100) | |
| plt.close() | |
| buf.seek(0) | |
| img_str = base64.b64encode(buf.read()).decode('utf-8') | |
| return f"data:image/png;base64,{img_str}", wtp_range, prob_B_ce | |
| def plot_state_distribution(trace, states, title="Health State Distribution"): | |
| plt.figure(figsize=(8, 6)) | |
| for i, state in enumerate(states): | |
| plt.plot(range(len(trace)), trace[:, i], label=state) | |
| plt.xlabel("Year") | |
| plt.ylabel("Proportion") | |
| plt.title(title) | |
| plt.grid(alpha=0.3) | |
| plt.legend() | |
| # Save figure as base64 format | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format='png', dpi=100) | |
| plt.close() | |
| buf.seek(0) | |
| img_str = base64.b64encode(buf.read()).decode('utf-8') | |
| return f"data:image/png;base64,{img_str}" | |
| # ----------------------------------------------------------- | |
| # 7) Wrapper: integrated function for general intervention analysis | |
| # ----------------------------------------------------------- | |
| def person_P(sex: str, features: dict, alpha=1.0, dt=1.0): | |
| """Get a person's annual transition probability matrix""" | |
| l1, l2, l3, r = hazards_from_beta(sex, features, | |
| lam10_0 * alpha, lam20_0 * alpha, lam30_0 * alpha, rho0_0) | |
| Q = Q_matrix(l1, l2, l3, r) | |
| P = discrete_P(Q, dt) | |
| return P, [l1, l2, l3, r] | |
| def run_analysis(sex: str, features: dict, intervention_feature: str, | |
| C_A, C_B, U, cycles=10, discount_rate=0.03, target_s2_5y=0.20): | |
| """Single intervention effect analysis""" | |
| # Define state names | |
| states = ["Normal", "Prehypertension", "Stage 1", "Stage 2"] | |
| # Create copies of baseline and intervention feature dictionaries | |
| features_base = features.copy() | |
| features_int = features.copy() | |
| # Set baseline and intervention values based on intervention type | |
| if intervention_feature == "Exercise_freq": | |
| # Exercise intervention: 0 -> 1 (increase exercise) | |
| features_base[intervention_feature] = 0 | |
| features_int[intervention_feature] = 1 | |
| intervention_name = "Increase Exercise" | |
| elif intervention_feature in ["BMI_ge25", "Waist_ge90", "Waist_ge80", | |
| "Fasting_glu_high", "TC_ge200", "UA_high", | |
| "Smoking_current", "Betel_current", "Alcohol_current"]: | |
| # These interventions go from 1->0 (reduce risk factor) | |
| features_base[intervention_feature] = 1 | |
| features_int[intervention_feature] = 0 | |
| if intervention_feature == "BMI_ge25": | |
| intervention_name = "Reduce BMI to <25" | |
| elif intervention_feature in ["Waist_ge90", "Waist_ge80"]: | |
| intervention_name = "Reduce Waist Circumference" | |
| elif intervention_feature == "Fasting_glu_high": | |
| intervention_name = "Lower Fasting Glucose" | |
| elif intervention_feature == "TC_ge200": | |
| intervention_name = "Lower Cholesterol" | |
| elif intervention_feature == "UA_high": | |
| intervention_name = "Lower Uric Acid" | |
| elif intervention_feature == "Smoking_current": | |
| intervention_name = "Quit Smoking" | |
| elif intervention_feature == "Betel_current": | |
| intervention_name = "Quit Betel Nut" | |
| elif intervention_feature == "Alcohol_current": | |
| intervention_name = "Quit Drinking" | |
| elif intervention_feature == "Education_high": | |
| # Education intervention: 0 -> 1 (increase education level) | |
| features_base[intervention_feature] = 0 | |
| features_int[intervention_feature] = 1 | |
| intervention_name = "Improve Education" | |
| else: | |
| # Other cases, default baseline=0, intervention=1 | |
| features_base[intervention_feature] = 0 | |
| features_int[intervention_feature] = 1 | |
| intervention_name = f"Intervention ({intervention_feature})" | |
| # Print parameter information | |
| print(f"Sex: {'Male' if sex.upper().startswith('M') else 'Female'}") | |
| print(f"Intervention: {intervention_name}") | |
| print(f"Baseline parameter: {features_base[intervention_feature]}") | |
| print(f"Post-intervention parameter: {features_int[intervention_feature]}") | |
| # Set reference features for calibration | |
| ref_features = {k: 0 for k in features.keys()} | |
| # Calibration | |
| alpha = calibrate_scale( | |
| sex=sex, features_ref=ref_features, | |
| lam10=lam10_0, lam20=lam20_0, lam30=lam30_0, rho0=rho0_0, | |
| target_s2_5y=target_s2_5y | |
| ) | |
| # Get transition matrices | |
| P_A, lamA = person_P(sex, features_base, alpha=alpha) | |
| P_B, lamB = person_P(sex, features_int, alpha=alpha) | |
| # Starting distribution | |
| start_dist = np.array([1, 0, 0, 0], float) # Start in Normal state | |
| # Run Markov model | |
| cost_A, qaly_A, trace_A = run_markov(P_A, C_A, U, start_dist, cycles, discount_rate) | |
| cost_B, qaly_B, trace_B = run_markov(P_B, C_B, U, start_dist, cycles, discount_rate) | |
| # Calculate ICER | |
| ICER, dC, dQ = icer(cost_A, qaly_A, cost_B, qaly_B) | |
| # Generate charts | |
| ce_plane_img = plot_ce_plane(dQ, dC, ICER, intervention_name) | |
| ceac_img, wtp_range, prob_B_ce = plot_ceac(cost_A, qaly_A, cost_B, qaly_B, intervention_name) | |
| stateA_img = plot_state_distribution(trace_A, states, "No Intervention") | |
| stateB_img = plot_state_distribution(trace_B, states, intervention_name) | |
| # Prepare results | |
| results = { | |
| "intervention": intervention_name, | |
| "feature_name": intervention_feature, | |
| "feature_base_value": features_base[intervention_feature], | |
| "feature_int_value": features_int[intervention_feature], | |
| "hazards_A": dict(zip(TRANSITION_NAMES_EN, np.round(lamA, 4))), | |
| "hazards_B": dict(zip(TRANSITION_NAMES_EN, np.round(lamB, 4))), | |
| "transition_matrix_A": pd.DataFrame(P_A, index=states, columns=states).round(4).to_dict(), | |
| "transition_matrix_B": pd.DataFrame(P_B, index=states, columns=states).round(4).to_dict(), | |
| "cost_A": cost_A, | |
| "cost_B": cost_B, | |
| "qaly_A": qaly_A, | |
| "qaly_B": qaly_B, | |
| "delta_cost": dC, | |
| "delta_qaly": dQ, | |
| "ICER": ICER, | |
| "CE_plane_img": ce_plane_img, | |
| "CEAC_img": ceac_img, | |
| "stateA_img": stateA_img, | |
| "stateB_img": stateB_img, | |
| "wtp_values": wtp_range.tolist(), | |
| "probability_cost_effective": prob_B_ce | |
| } | |
| # Display main results | |
| print(f"\n--- {intervention_name} Cost-Effectiveness Analysis Results ---") | |
| print( | |
| f"{cycles}-year total cost: Baseline=${results['cost_A']:.1f}, Intervention=${results['cost_B']:.1f}, ΔC=${results['delta_cost']:.1f}") | |
| print( | |
| f"{cycles}-year total QALY: Baseline={results['qaly_A']:.3f}, Intervention={results['qaly_B']:.3f}, ΔQ={results['delta_qaly']:.3f}") | |
| print(f"ICER = ${results['ICER']:.1f}/QALY") | |
| # Intervention effect explanation | |
| if dQ > 0: | |
| if dC <= 0: | |
| print("Conclusion: This intervention both saves money and improves health (Dominant)") | |
| elif ICER < 50000: | |
| print("Conclusion: This intervention is cost-effective (ICER < $50,000/QALY)") | |
| else: | |
| print("Conclusion: This intervention is not cost-effective") | |
| else: | |
| if dC >= 0: | |
| print("Conclusion: This intervention both costs more and worsens health (Dominated)") | |
| else: | |
| print("Conclusion: This intervention saves money but worsens health") | |
| return results | |
| # ----------------------------------------------------------- | |
| # 7) Standard simulation main function - test different interventions | |
| # ----------------------------------------------------------- | |
| def main_simulation(): | |
| # Standard male features | |
| male_features = { | |
| "Education_high": 0, | |
| "BMI_ge25": 1, | |
| "Waist_ge90": 1, | |
| "Fasting_glu_high": 0, | |
| "TC_ge200": 0, | |
| "UA_high": 1, | |
| "Smoking_current": 1, | |
| "Betel_current": 0, | |
| "Alcohol_current": 1, | |
| "Exercise_freq": 0, | |
| "FHx_yes": 1 | |
| } | |
| # Standard female features | |
| female_features = { | |
| "Education_high": 0, | |
| "BMI_ge25": 1, | |
| "Waist_ge80": 1, | |
| "Fasting_glu_high": 0, | |
| "TC_ge200": 0, | |
| "UA_high": 1, | |
| "Smoking_current": 0, | |
| "Betel_current": 0, | |
| "Alcohol_current": 0, | |
| "Exercise_freq": 0, | |
| "FHx_yes": 1 | |
| } | |
| # Cost and utility | |
| C_A = np.array([200, 600, 1200, 2200]) # No intervention | |
| U = np.array([1.00, 0.90, 0.70, 0.50]) # Utilities for each state | |
| # Test weight loss intervention (BMI_ge25) | |
| print("\n" + "=" * 50) | |
| print("Testing Weight Loss Intervention (BMI_ge25: 1->0)") | |
| print("=" * 50) | |
| # Male weight loss | |
| C_B_bmi = np.array([300, 650, 1250, 2250]) # Weight loss increases cost | |
| results_bmi_m = run_analysis( | |
| sex="M", | |
| features=male_features, | |
| intervention_feature="BMI_ge25", # Reduce BMI to <25 | |
| C_A=C_A, | |
| C_B=C_B_bmi, | |
| U=U, | |
| cycles=10, | |
| discount_rate=0.03, | |
| target_s2_5y=0.2365 | |
| ) | |
| # Female weight loss | |
| results_bmi_f = run_analysis( | |
| sex="F", | |
| features=female_features, | |
| intervention_feature="BMI_ge25", # Reduce BMI to <25 | |
| C_A=C_A, | |
| C_B=C_B_bmi, | |
| U=U, | |
| cycles=10, | |
| discount_rate=0.03, | |
| target_s2_5y=0.2365 | |
| ) | |
| # Test exercise intervention (Exercise_freq) | |
| print("\n" + "=" * 50) | |
| print("Testing Exercise Intervention (Exercise_freq: 0->1)") | |
| print("=" * 50) | |
| # Male exercise | |
| C_B_exercise = np.array([250, 600, 1200, 2200]) # Exercise increases cost slightly | |
| results_exercise_m = run_analysis( | |
| sex="M", | |
| features=male_features, | |
| intervention_feature="Exercise_freq", # Increase exercise frequency | |
| C_A=C_A, | |
| C_B=C_B_exercise, | |
| U=U, | |
| cycles=10, | |
| discount_rate=0.03, | |
| target_s2_5y=0.2365 | |
| ) | |
| # Female exercise | |
| results_exercise_f = run_analysis( | |
| sex="F", | |
| features=female_features, | |
| intervention_feature="Exercise_freq", # Increase exercise frequency | |
| C_A=C_A, | |
| C_B=C_B_exercise, | |
| U=U, | |
| cycles=10, | |
| discount_rate=0.03, | |
| target_s2_5y=0.2365 | |
| ) | |
| # Test smoking cessation intervention (Smoking_current) | |
| print("\n" + "=" * 50) | |
| print("Testing Smoking Cessation Intervention (Smoking_current: 1->0)") | |
| print("=" * 50) | |
| # Male smoking cessation | |
| C_B_smoking = np.array([220, 600, 1200, 2200]) # Smoking cessation increases cost slightly | |
| results_smoking_m = run_analysis( | |
| sex="M", | |
| features=male_features, | |
| intervention_feature="Smoking_current", # Quit smoking | |
| C_A=C_A, | |
| C_B=C_B_smoking, | |
| U=U, | |
| cycles=10, | |
| discount_rate=0.03, | |
| target_s2_5y=0.2365 | |
| ) | |
| return { | |
| "BMI_male": results_bmi_m, | |
| "BMI_female": results_bmi_f, | |
| "Exercise_male": results_exercise_m, | |
| "Exercise_female": results_exercise_f, | |
| "Smoking_male": results_smoking_m | |
| } | |
| # Main program | |
| if __name__ == "__main__": | |
| print("=" * 50) | |
| print("Hypertension Disease Progression and Intervention Effects Model") | |
| print("=" * 50) | |
| # Run main simulation | |
| results = main_simulation() | |