from __future__ import annotations import json from pathlib import Path from typing import Dict import numpy as np import pandas as pd from .calibration import CONTINUOUS_GENES, RFCalibrationParamsV2, V5Thresholds, V5Weights from .core import calibrated_surrogate_v4_core from .materials import attach_material_recommendations from .metrics import ( apply_v5_filters_and_scores, bdr_eff, cooling_proxy_nc, cooling_proxy_sc, duty_cycle, pulse_heating_proxy, source_compatibility, tolerance_components, wall_power_avg_kW, ) from .sampling import sample_base_designs, sample_family_conditioned_operating_point from .utils import infer_family, infer_type_from_family, make_candidate_id def repair_child(child: pd.Series) -> pd.Series: family = child.get("family", infer_family(str(child.get("type", "")))) if pd.isna(family) or str(family).strip() == "": family = infer_family(str(child.get("type", ""))) if family == "unknown": family = "Lband" child["family"] = family child["type"] = infer_type_from_family(family) if family == "Lband": child["freq_GHz"] = float(np.clip(child["freq_GHz"], 0.20, 0.80)) child["R_over_Q_ohm"] = float(np.clip(child["R_over_Q_ohm"], 60.0, 150.0)) child["Q0"] = float(np.clip(child["Q0"], 1e4, 8e4)) child["Eacc_MVpm"] = float(np.clip(child["Eacc_MVpm"], 8.0, 20.0)) child["L_m"] = float(np.clip(child["L_m"], 0.15, 0.60)) child["beta"] = float(np.clip(child["beta"], 0.7, 2.5)) child["pulse_length_ns"] = float(np.clip(child["pulse_length_ns"], 800.0, 3000.0)) child["rep_rate_Hz"] = float(np.clip(child["rep_rate_Hz"], 100.0, 2000.0)) child["P_aux_kW"] = float(np.clip(child["P_aux_kW"], 0.4, 1.5)) child["source_power_avail_kW"] = float(np.clip(child["source_power_avail_kW"], 4.0, 8.0)) child["cooling_capacity_kW"] = float(np.clip(child["cooling_capacity_kW"], 6.0, 12.0)) child["surf_factor"] = float(np.clip(child["surf_factor"], 0.9, 1.1)) child["geom_factor"] = float(np.clip(child["geom_factor"], 0.8, 1.1)) child["freq_factor"] = float(np.clip(child["freq_factor"], 0.9, 1.05)) child["fab_sigma_um"] = float(np.clip(child["fab_sigma_um"], 10.0, 40.0)) child["delta_allow_um"] = float(np.clip(child["delta_allow_um"], 40.0, 100.0)) child["S_f"] = float(np.clip(child["S_f"], 0.003, 0.010)) child["S_phi"] = float(np.clip(child["S_phi"], 0.002, 0.008)) child["S_c"] = float(np.clip(child["S_c"], 0.001, 0.006)) child["cryo_temp_K"] = np.nan elif family == "Cband": child["freq_GHz"] = float(np.clip(child["freq_GHz"], 5.00, 6.00)) child["R_over_Q_ohm"] = float(np.clip(child["R_over_Q_ohm"], 100.0, 280.0)) child["Q0"] = float(np.clip(child["Q0"], 2e4, 8e4)) child["Eacc_MVpm"] = float(np.clip(child["Eacc_MVpm"], 12.0, 30.0)) child["L_m"] = float(np.clip(child["L_m"], 0.20, 0.60)) child["beta"] = float(np.clip(child["beta"], 0.8, 3.0)) child["pulse_length_ns"] = float(np.clip(child["pulse_length_ns"], 200.0, 800.0)) child["rep_rate_Hz"] = float(np.clip(child["rep_rate_Hz"], 50.0, 500.0)) child["P_aux_kW"] = float(np.clip(child["P_aux_kW"], 0.8, 2.2)) child["source_power_avail_kW"] = float(np.clip(child["source_power_avail_kW"], 9.0, 15.0)) child["cooling_capacity_kW"] = float(np.clip(child["cooling_capacity_kW"], 10.0, 20.0)) child["surf_factor"] = float(np.clip(child["surf_factor"], 0.95, 1.20)) child["geom_factor"] = float(np.clip(child["geom_factor"], 0.9, 1.2)) child["freq_factor"] = float(np.clip(child["freq_factor"], 1.0, 1.15)) child["fab_sigma_um"] = float(np.clip(child["fab_sigma_um"], 5.0, 20.0)) child["delta_allow_um"] = float(np.clip(child["delta_allow_um"], 15.0, 40.0)) child["S_f"] = float(np.clip(child["S_f"], 0.010, 0.025)) child["S_phi"] = float(np.clip(child["S_phi"], 0.008, 0.020)) child["S_c"] = float(np.clip(child["S_c"], 0.006, 0.015)) child["cryo_temp_K"] = np.nan elif family == "Xband": child["freq_GHz"] = float(np.clip(child["freq_GHz"], 9.00, 12.00)) child["R_over_Q_ohm"] = float(np.clip(child["R_over_Q_ohm"], 80.0, 200.0)) child["Q0"] = float(np.clip(child["Q0"], 1e4, 6e4)) child["Eacc_MVpm"] = float(np.clip(child["Eacc_MVpm"], 20.0, 45.0)) child["L_m"] = float(np.clip(child["L_m"], 0.15, 0.45)) child["beta"] = float(np.clip(child["beta"], 0.8, 3.0)) child["pulse_length_ns"] = float(np.clip(child["pulse_length_ns"], 50.0, 300.0)) child["rep_rate_Hz"] = float(np.clip(child["rep_rate_Hz"], 10.0, 200.0)) child["P_aux_kW"] = float(np.clip(child["P_aux_kW"], 1.0, 3.0)) child["source_power_avail_kW"] = float(np.clip(child["source_power_avail_kW"], 12.0, 20.0)) child["cooling_capacity_kW"] = float(np.clip(child["cooling_capacity_kW"], 12.0, 24.0)) child["surf_factor"] = float(np.clip(child["surf_factor"], 1.0, 1.35)) child["geom_factor"] = float(np.clip(child["geom_factor"], 1.0, 1.3)) child["freq_factor"] = float(np.clip(child["freq_factor"], 1.1, 1.3)) child["fab_sigma_um"] = float(np.clip(child["fab_sigma_um"], 2.0, 10.0)) child["delta_allow_um"] = float(np.clip(child["delta_allow_um"], 5.0, 18.0)) child["S_f"] = float(np.clip(child["S_f"], 0.020, 0.060)) child["S_phi"] = float(np.clip(child["S_phi"], 0.015, 0.050)) child["S_c"] = float(np.clip(child["S_c"], 0.010, 0.035)) child["cryo_temp_K"] = np.nan else: # SC child["freq_GHz"] = float(np.clip(child["freq_GHz"], 0.80, 1.60)) child["R_over_Q_ohm"] = float(np.clip(child["R_over_Q_ohm"], 200.0, 400.0)) child["Q0"] = float(np.clip(child["Q0"], 5e9, 2e10)) child["Eacc_MVpm"] = float(np.clip(child["Eacc_MVpm"], 10.0, 25.0)) child["L_m"] = float(np.clip(child["L_m"], 0.60, 1.20)) child["beta"] = float(np.clip(child["beta"], 0.8, 2.0)) child["pulse_length_ns"] = float(np.clip(child["pulse_length_ns"], 5000.0, 100000.0)) child["rep_rate_Hz"] = float(np.clip(child["rep_rate_Hz"], 1.0, 100.0)) child["P_aux_kW"] = float(np.clip(child["P_aux_kW"], 4.0, 15.0)) child["source_power_avail_kW"] = float(np.clip(child["source_power_avail_kW"], 8.0, 20.0)) child["cooling_capacity_kW"] = float(np.clip(child["cooling_capacity_kW"], 15.0, 40.0)) child["surf_factor"] = float(np.clip(child["surf_factor"], 0.9, 1.1)) child["geom_factor"] = float(np.clip(child["geom_factor"], 0.8, 1.0)) child["freq_factor"] = float(np.clip(child["freq_factor"], 0.9, 1.05)) child["fab_sigma_um"] = float(np.clip(child["fab_sigma_um"], 10.0, 30.0)) child["delta_allow_um"] = float(np.clip(child["delta_allow_um"], 30.0, 80.0)) child["S_f"] = float(np.clip(child["S_f"], 0.004, 0.012)) child["S_phi"] = float(np.clip(child["S_phi"], 0.003, 0.010)) child["S_c"] = float(np.clip(child["S_c"], 0.002, 0.008)) cryo = child.get("cryo_temp_K", 2.0) if pd.isna(cryo): cryo = 2.0 child["cryo_temp_K"] = float(np.clip(cryo, 1.5, 5.0)) child["freq_Hz"] = child["freq_GHz"] * 1e9 child["Vc_MV"] = child["Eacc_MVpm"] * child["L_m"] child["Q_loaded"] = child["Q0"] / (1.0 + child["beta"]) return child def recalculate_physics_and_score( df: pd.DataFrame, thresholds: V5Thresholds, weights: V5Weights, bdr_preset="nominal", ) -> pd.DataFrame: params = RFCalibrationParamsV2() scored = df.copy() scored["family"] = scored["family"].fillna(scored["type"].map(infer_family)) scored["type"] = np.where( scored["family"].eq("SC"), "SC_elliptical_1p3GHz", np.where( scored["family"].eq("Lband"), "NC_pillbox_Lband", np.where( scored["family"].eq("Cband"), "NC_Cband_TW", np.where(scored["family"].eq("Xband"), "NC_Xband_SW", scored["type"]) ), ), ) scored["freq_Hz"] = scored["freq_GHz"] * 1e9 scored["Vc_MV"] = scored["Eacc_MVpm"] * scored["L_m"] scored["Q_loaded"] = scored["Q0"] / (1.0 + scored["beta"]) core = calibrated_surrogate_v4_core(scored, params=params) for k, v in core.items(): scored[k] = v scored["duty_cycle"] = duty_cycle( scored["rep_rate_Hz"].values, scored["pulse_length_ns"].values, ) scored["P_wall_avg_kW"] = wall_power_avg_kW( scored["P_wall_peak_kW"].values, scored["duty_cycle"].values, scored["P_aux_kW"].values, ) scored["deltaT_pulse"] = pulse_heating_proxy( scored["Eacc_MVpm"].values, scored["pulse_length_ns"].values, scored["geom_factor"].values, scored["family"].values, ) scored["BDR_eff"] = bdr_eff( scored["Eacc_MVpm"].values, scored["pulse_length_ns"].values, scored["surf_factor"].values, scored["freq_factor"].values, preset=bdr_preset, ) scored["C_src"] = source_compatibility( scored["P_wall_peak_kW"].values, scored["source_power_avail_kW"].values, scored["pulse_length_ns"].values, scored["rep_rate_Hz"].values, scored["family"].values, ) is_sc = scored["family"].eq("SC").values c_cool = np.empty(len(scored), dtype=float) if np.any(~is_sc): c_cool[~is_sc] = cooling_proxy_nc( scored.loc[~is_sc, "P_wall_avg_kW"].values, scored.loc[~is_sc, "cooling_capacity_kW"].values, ) if np.any(is_sc): p_dyn = ( scored.loc[is_sc, "P_wall_peak_kW"].values * scored.loc[is_sc, "duty_cycle"].values ) p_static = scored.loc[is_sc, "P_aux_kW"].values q_cool = scored.loc[is_sc, "cooling_capacity_kW"].values c_cool[is_sc] = cooling_proxy_sc( p_dyn, p_static, q_cool, ) scored["C_cool"] = c_cool R_f, R_phi, R_c, R_tol = tolerance_components( scored["fab_sigma_um"].values, scored["S_f"].values, scored["S_phi"].values, scored["S_c"].values, scored["delta_allow_um"].values, ) scored["R_f"] = R_f scored["R_phi"] = R_phi scored["R_c"] = R_c scored["R_tol"] = R_tol scored["candidate_id"] = scored.apply(make_candidate_id, axis=1) scored = attach_material_recommendations(scored) return apply_v5_filters_and_scores(scored, thresholds, weights) def crossover(p1: pd.Series, p2: pd.Series, rng: np.random.Generator) -> pd.Series: child = p1.copy() for gene in CONTINUOUS_GENES: if gene in p1.index and gene in p2.index: if not pd.isna(p1[gene]) and not pd.isna(p2[gene]): if rng.random() > 0.5: child[gene] = p2[gene] return child def mutate( child: pd.Series, rng: np.random.Generator, mutation_rate=0.20, mutation_scale=0.05, ) -> pd.Series: for gene in CONTINUOUS_GENES: if gene in child.index and not pd.isna(child[gene]) and rng.random() < mutation_rate: magnitude = max(abs(float(child[gene])), 1e-9) noise = rng.normal(0.0, mutation_scale * magnitude) child[gene] = float(child[gene]) + noise child = repair_child(child) return child def run_evolution( pop_size: int, generations: int, target_class: str, bdr_preset: str, class_config: Dict[str, V5Thresholds], seed: int = 7, out_dir: Path | None = None, ) -> pd.DataFrame: rng = np.random.default_rng(seed) print(f"\nLaunching evolution ({generations} generations, population {pop_size}) target: {target_class}") thresholds = class_config[target_class] weights = V5Weights() df = sample_base_designs(n=pop_size, seed=seed) df = sample_family_conditioned_operating_point(df, seed=seed + 101) current_pop = recalculate_physics_and_score(df, thresholds, weights, bdr_preset) current_pop["fitness"] = np.where(current_pop["hard_pass"], current_pop["balanced_v5"], 0.0) history = [] for gen in range(1, generations + 1): current_pop = current_pop.sort_values("fitness", ascending=False).reset_index(drop=True) best_fitness = float(current_pop["fitness"].max()) best_yield = float(current_pop.iloc[0]["delivered"]) if best_fitness > 0 else 0.0 best_family = current_pop.iloc[0]["family"] best_type = current_pop.iloc[0]["type"] print( f"Generation {gen}/{generations} | Max balanced score: {best_fitness:.4f} " f"| Max yield: {best_yield:.2e} | Best: {best_family} / {best_type}" ) history.append({ "generation": gen, "best_fitness": best_fitness, "best_yield": best_yield, "best_family": best_family, "best_type": best_type, "qualified_count": int(current_pop["hard_pass"].sum()), }) elite_size = max(1, int(pop_size * 0.10)) parent_pool_size = max(2, int(pop_size * 0.40)) elite = current_pop.head(elite_size).copy() parents = current_pop.head(parent_pool_size).copy() offspring_list = [] for _ in range(pop_size - elite_size): families = parents["family"].dropna().unique() if len(families) == 0: families = np.array(["Lband"]) family = rng.choice(families) fam_parents = parents[parents["family"] == family] if len(fam_parents) >= 2: pair_idx = rng.choice(fam_parents.index.to_numpy(), size=2, replace=True) p1 = fam_parents.loc[pair_idx[0]] p2 = fam_parents.loc[pair_idx[1]] else: pair_idx = rng.choice(parents.index.to_numpy(), size=2, replace=True) p1 = parents.loc[pair_idx[0]] p2 = parents.loc[pair_idx[1]] child = crossover(p1, p2, rng) child["family"] = family child["type"] = infer_type_from_family(family) child = mutate(child, rng, mutation_rate=0.25, mutation_scale=0.08) offspring_list.append(child) next_gen_base = pd.concat([elite, pd.DataFrame(offspring_list)], ignore_index=True) current_pop = recalculate_physics_and_score(next_gen_base, thresholds, weights, bdr_preset) current_pop["fitness"] = np.where(current_pop["hard_pass"], current_pop["balanced_v5"], 0.0) print("\nEvolution completed. Best evolved RF design:") winner = current_pop.sort_values("fitness", ascending=False).iloc[0] print(f"Type: {winner['type']} | Frequency: {winner['freq_GHz']:.3f} GHz | Eacc: {winner['Eacc_MVpm']:.2f} MV/m") print(f"Pulse: {winner['pulse_length_ns']:.1f} ns | Rep rate: {winner['rep_rate_Hz']:.1f} Hz") print(f"Yield: {winner['delivered']:.2e} | Pavg: {winner['P_wall_avg_kW']:.2f} kW | ΔT: {winner['deltaT_pulse']:.3f} K") print(f"Balanced v5: {winner['balanced_v5']:.4f}") print(f"Suggested material: {winner['suggested_primary_material']}") if out_dir is not None: out_dir.mkdir(parents=True, exist_ok=True) current_pop.sort_values(["fitness", "delivered"], ascending=False).to_csv( out_dir / "rf_mode_pack_v6_0_evolution_population.csv", index=False, ) pd.DataFrame(history).to_csv( out_dir / "rf_mode_pack_v6_0_evolution_history.csv", index=False, ) winner.to_frame().T.to_csv( out_dir / "rf_mode_pack_v6_0_evolution_winner.csv", index=False, ) (out_dir / "rf_mode_pack_v6_0_evolution_meta.json").write_text( json.dumps({ "pop_size": pop_size, "generations": generations, "target_class": target_class, "bdr_preset": bdr_preset, "seed": seed, }, ensure_ascii=False, indent=2), encoding="utf-8", ) return current_pop