Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Dict | |
| import numpy as np | |
| import pandas as pd | |
| from .calibration import CONTINUOUS_GENES, RFCalibrationParamsV2, V5Thresholds, V5Weights | |
| from .core import calibrated_surrogate_v4_core | |
| from .materials import attach_material_recommendations | |
| from .metrics import ( | |
| apply_v5_filters_and_scores, | |
| bdr_eff, | |
| cooling_proxy_nc, | |
| cooling_proxy_sc, | |
| duty_cycle, | |
| pulse_heating_proxy, | |
| source_compatibility, | |
| tolerance_components, | |
| wall_power_avg_kW, | |
| ) | |
| from .sampling import sample_base_designs, sample_family_conditioned_operating_point | |
| from .utils import infer_family, infer_type_from_family, make_candidate_id | |
# Per-family clipping bounds, as (low, high) per gene. Insertion order is the
# order in which genes are written back to the candidate.
_NC_GENE_BOUNDS = {
    "Lband": {
        "freq_GHz": (0.20, 0.80),
        "R_over_Q_ohm": (60.0, 150.0),
        "Q0": (1e4, 8e4),
        "Eacc_MVpm": (8.0, 20.0),
        "L_m": (0.15, 0.60),
        "beta": (0.7, 2.5),
        "pulse_length_ns": (800.0, 3000.0),
        "rep_rate_Hz": (100.0, 2000.0),
        "P_aux_kW": (0.4, 1.5),
        "source_power_avail_kW": (4.0, 8.0),
        "cooling_capacity_kW": (6.0, 12.0),
        "surf_factor": (0.9, 1.1),
        "geom_factor": (0.8, 1.1),
        "freq_factor": (0.9, 1.05),
        "fab_sigma_um": (10.0, 40.0),
        "delta_allow_um": (40.0, 100.0),
        "S_f": (0.003, 0.010),
        "S_phi": (0.002, 0.008),
        "S_c": (0.001, 0.006),
    },
    "Cband": {
        "freq_GHz": (5.00, 6.00),
        "R_over_Q_ohm": (100.0, 280.0),
        "Q0": (2e4, 8e4),
        "Eacc_MVpm": (12.0, 30.0),
        "L_m": (0.20, 0.60),
        "beta": (0.8, 3.0),
        "pulse_length_ns": (200.0, 800.0),
        "rep_rate_Hz": (50.0, 500.0),
        "P_aux_kW": (0.8, 2.2),
        "source_power_avail_kW": (9.0, 15.0),
        "cooling_capacity_kW": (10.0, 20.0),
        "surf_factor": (0.95, 1.20),
        "geom_factor": (0.9, 1.2),
        "freq_factor": (1.0, 1.15),
        "fab_sigma_um": (5.0, 20.0),
        "delta_allow_um": (15.0, 40.0),
        "S_f": (0.010, 0.025),
        "S_phi": (0.008, 0.020),
        "S_c": (0.006, 0.015),
    },
    "Xband": {
        "freq_GHz": (9.00, 12.00),
        "R_over_Q_ohm": (80.0, 200.0),
        "Q0": (1e4, 6e4),
        "Eacc_MVpm": (20.0, 45.0),
        "L_m": (0.15, 0.45),
        "beta": (0.8, 3.0),
        "pulse_length_ns": (50.0, 300.0),
        "rep_rate_Hz": (10.0, 200.0),
        "P_aux_kW": (1.0, 3.0),
        "source_power_avail_kW": (12.0, 20.0),
        "cooling_capacity_kW": (12.0, 24.0),
        "surf_factor": (1.0, 1.35),
        "geom_factor": (1.0, 1.3),
        "freq_factor": (1.1, 1.3),
        "fab_sigma_um": (2.0, 10.0),
        "delta_allow_um": (5.0, 18.0),
        "S_f": (0.020, 0.060),
        "S_phi": (0.015, 0.050),
        "S_c": (0.010, 0.035),
    },
}

# Bounds applied to every family that is not one of the three keys above
# (the original code labels this fallback branch "SC").
_SC_GENE_BOUNDS = {
    "freq_GHz": (0.80, 1.60),
    "R_over_Q_ohm": (200.0, 400.0),
    "Q0": (5e9, 2e10),
    "Eacc_MVpm": (10.0, 25.0),
    "L_m": (0.60, 1.20),
    "beta": (0.8, 2.0),
    "pulse_length_ns": (5000.0, 100000.0),
    "rep_rate_Hz": (1.0, 100.0),
    "P_aux_kW": (4.0, 15.0),
    "source_power_avail_kW": (8.0, 20.0),
    "cooling_capacity_kW": (15.0, 40.0),
    "surf_factor": (0.9, 1.1),
    "geom_factor": (0.8, 1.0),
    "freq_factor": (0.9, 1.05),
    "fab_sigma_um": (10.0, 30.0),
    "delta_allow_um": (30.0, 80.0),
    "S_f": (0.004, 0.012),
    "S_phi": (0.003, 0.010),
    "S_c": (0.002, 0.008),
}


def repair_child(child: pd.Series) -> pd.Series:
    """Clamp a candidate's genes into the allowed window of its family.

    The family is read from ``child["family"]``; when that entry is missing,
    NaN or blank it is inferred from ``child["type"]`` via ``infer_family``.
    An ``"unknown"`` family is treated as ``"Lband"``. ``family`` and ``type``
    are then rewritten, every gene in the family's bounds table is clipped,
    ``cryo_temp_K`` is set (NaN for Lband/Cband/Xband, clipped to [1.5, 5.0]
    with a default of 2.0 otherwise), and the derived entries ``freq_Hz``,
    ``Vc_MV`` and ``Q_loaded`` are recomputed.

    Args:
        child: Candidate row. It is modified in place and must already
            contain every gene listed in the bounds tables.

    Returns:
        The same ``child`` object after repair.
    """
    type_hint = str(child.get("type", ""))
    family = child.get("family", infer_family(type_hint))
    if pd.isna(family) or not str(family).strip():
        family = infer_family(type_hint)
    if family == "unknown":
        family = "Lband"

    child["family"] = family
    child["type"] = infer_type_from_family(family)

    nc_bounds = _NC_GENE_BOUNDS.get(family)
    gene_bounds = _SC_GENE_BOUNDS if nc_bounds is None else nc_bounds
    for gene, (low, high) in gene_bounds.items():
        child[gene] = float(np.clip(child[gene], low, high))

    if nc_bounds is not None:
        # The three explicitly listed families carry no cryogenic temperature.
        child["cryo_temp_K"] = np.nan
    else:
        cryo_temp = child.get("cryo_temp_K", 2.0)
        if pd.isna(cryo_temp):
            cryo_temp = 2.0
        child["cryo_temp_K"] = float(np.clip(cryo_temp, 1.5, 5.0))

    # Derived quantities must track the (possibly clipped) genes.
    child["freq_Hz"] = child["freq_GHz"] * 1e9
    child["Vc_MV"] = child["Eacc_MVpm"] * child["L_m"]
    child["Q_loaded"] = child["Q0"] / (1.0 + child["beta"])
    return child
def recalculate_physics_and_score(
    df: pd.DataFrame,
    thresholds: V5Thresholds,
    weights: V5Weights,
    bdr_preset="nominal",
) -> pd.DataFrame:
    """Recompute every derived column of a design table and score it.

    Works on a copy of ``df``. Missing families are filled from ``type`` via
    ``infer_family``; the type label is normalised for the four known
    families; the surrogate core and the metric helpers are evaluated; the
    material recommendations are attached; and the result of
    ``apply_v5_filters_and_scores`` is returned.

    Args:
        df: Candidate designs, one per row. Not modified.
        thresholds: Passed through to ``apply_v5_filters_and_scores``.
        weights: Passed through to ``apply_v5_filters_and_scores``.
        bdr_preset: Forwarded to ``bdr_eff`` as ``preset``.

    Returns:
        Whatever ``apply_v5_filters_and_scores`` returns for the enriched
        table.
    """
    canonical_type = {
        "SC": "SC_elliptical_1p3GHz",
        "Lband": "NC_pillbox_Lband",
        "Cband": "NC_Cband_TW",
        "Xband": "NC_Xband_SW",
    }
    calibration = RFCalibrationParamsV2()
    table = df.copy()

    def col(name):
        # Raw value array of a column, looked up at call time.
        return table[name].values

    table["family"] = table["family"].fillna(table["type"].map(infer_family))
    # Known families get their canonical type; anything else keeps its type.
    mapped_type = table["family"].map(canonical_type)
    table["type"] = mapped_type.where(mapped_type.notna(), table["type"])

    table["freq_Hz"] = table["freq_GHz"] * 1e9
    table["Vc_MV"] = table["Eacc_MVpm"] * table["L_m"]
    table["Q_loaded"] = table["Q0"] / (1.0 + table["beta"])

    surrogate = calibrated_surrogate_v4_core(table, params=calibration)
    for column_name, column_values in surrogate.items():
        table[column_name] = column_values

    table["duty_cycle"] = duty_cycle(col("rep_rate_Hz"), col("pulse_length_ns"))
    table["P_wall_avg_kW"] = wall_power_avg_kW(
        col("P_wall_peak_kW"),
        col("duty_cycle"),
        col("P_aux_kW"),
    )
    table["deltaT_pulse"] = pulse_heating_proxy(
        col("Eacc_MVpm"),
        col("pulse_length_ns"),
        col("geom_factor"),
        col("family"),
    )
    table["BDR_eff"] = bdr_eff(
        col("Eacc_MVpm"),
        col("pulse_length_ns"),
        col("surf_factor"),
        col("freq_factor"),
        preset=bdr_preset,
    )
    table["C_src"] = source_compatibility(
        col("P_wall_peak_kW"),
        col("source_power_avail_kW"),
        col("pulse_length_ns"),
        col("rep_rate_Hz"),
        col("family"),
    )

    # The cooling proxy uses a different helper for SC rows and all other rows.
    sc_rows = table["family"].eq("SC").values
    nc_rows = ~sc_rows
    cooling = np.empty(len(table), dtype=float)
    if nc_rows.any():
        cooling[nc_rows] = cooling_proxy_nc(
            col("P_wall_avg_kW")[nc_rows],
            col("cooling_capacity_kW")[nc_rows],
        )
    if sc_rows.any():
        dynamic_load = col("P_wall_peak_kW")[sc_rows] * col("duty_cycle")[sc_rows]
        static_load = col("P_aux_kW")[sc_rows]
        capacity = col("cooling_capacity_kW")[sc_rows]
        cooling[sc_rows] = cooling_proxy_sc(dynamic_load, static_load, capacity)
    table["C_cool"] = cooling

    tolerance = tolerance_components(
        col("fab_sigma_um"),
        col("S_f"),
        col("S_phi"),
        col("S_c"),
        col("delta_allow_um"),
    )
    table["R_f"], table["R_phi"], table["R_c"], table["R_tol"] = tolerance

    table["candidate_id"] = table.apply(make_candidate_id, axis=1)
    table = attach_material_recommendations(table)
    return apply_v5_filters_and_scores(table, thresholds, weights)
def crossover(p1: pd.Series, p2: pd.Series, rng: np.random.Generator) -> pd.Series:
    """Build a child from two parents by per-gene uniform crossover.

    The child starts as a copy of ``p1``. For each gene in
    ``CONTINUOUS_GENES`` that both parents carry with a non-missing value,
    one uniform draw decides whether the child takes ``p2``'s value instead
    (draw > 0.5). Genes missing or NaN in either parent consume no draw.

    Args:
        p1: First parent; supplies every entry the child does not inherit
            from ``p2``.
        p2: Second parent.
        rng: Random generator used for the per-gene draws.

    Returns:
        A new Series; neither parent is modified.
    """
    offspring = p1.copy()
    for gene in CONTINUOUS_GENES:
        if gene not in p1.index or gene not in p2.index:
            continue
        if pd.isna(p1[gene]) or pd.isna(p2[gene]):
            continue
        if rng.random() > 0.5:
            offspring[gene] = p2[gene]
    return offspring
def mutate(
    child: pd.Series,
    rng: np.random.Generator,
    mutation_rate=0.20,
    mutation_scale=0.05,
) -> pd.Series:
    """Perturb a candidate's continuous genes, then repair it.

    Each gene in ``CONTINUOUS_GENES`` that is present and not NaN is mutated
    with probability ``mutation_rate`` by adding zero-mean Gaussian noise
    whose standard deviation is ``mutation_scale`` times the gene's absolute
    value (floored at 1e-9). The result is passed through ``repair_child``.

    Args:
        child: Candidate to mutate; it is modified in place.
        rng: Random generator used for both the mutation decision and the
            noise.
        mutation_rate: Per-gene mutation probability.
        mutation_scale: Relative standard deviation of the noise.

    Returns:
        The value returned by ``repair_child`` for the mutated candidate.
    """
    for gene in CONTINUOUS_GENES:
        if gene not in child.index:
            continue
        if pd.isna(child[gene]):
            continue
        # The draw happens only for eligible genes, keeping the RNG stream
        # independent of absent or NaN entries.
        if not (rng.random() < mutation_rate):
            continue
        value = float(child[gene])
        sigma = mutation_scale * max(abs(value), 1e-9)
        child[gene] = value + rng.normal(0.0, sigma)
    return repair_child(child)
def _assign_fitness(population: pd.DataFrame) -> pd.DataFrame:
    """Set ``fitness`` to ``balanced_v5`` on hard-pass rows and 0.0 elsewhere."""
    population["fitness"] = np.where(
        population["hard_pass"], population["balanced_v5"], 0.0
    )
    return population


def run_evolution(
    pop_size: int,
    generations: int,
    target_class: str,
    bdr_preset: str,
    class_config: Dict[str, V5Thresholds],
    seed: int = 7,
    out_dir: Path | None = None,
) -> pd.DataFrame:
    """Evolve a population of RF designs and return the final scored table.

    An initial population is sampled and scored. In each generation the
    population is ranked by fitness, the top 10 % (at least one row) is kept
    unchanged as the elite, and the remaining slots are filled with
    offspring bred from the top 40 % (at least two rows): a family is drawn
    from those present in the parent pool, two parents are drawn with
    replacement (from that family when it has at least two members, from
    the whole pool otherwise), recombined with ``crossover`` and perturbed
    with ``mutate``. The new generation is then re-scored.

    Args:
        pop_size: Number of candidates per generation; must be at least 1.
        generations: Number of generations to run.
        target_class: Key into ``class_config`` selecting the thresholds.
        bdr_preset: Forwarded to ``recalculate_physics_and_score``.
        class_config: Mapping from class name to thresholds.
        seed: Seed for the random generator and the initial sampling.
        out_dir: If given, directory (``Path`` or path string) that receives
            the population, history and winner CSV files and a JSON file
            with the run parameters. Created if missing.

    Returns:
        The scored population after the last generation, in the row order
        produced by ``recalculate_physics_and_score``.

    Raises:
        ValueError: If ``pop_size`` is smaller than 1.
        KeyError: If ``target_class`` is not a key of ``class_config``.
    """
    if pop_size < 1:
        raise ValueError(f"pop_size must be >= 1, got {pop_size}")

    rng = np.random.default_rng(seed)
    print(f"\nLaunching evolution ({generations} generations, population {pop_size}) target: {target_class}")
    thresholds = class_config[target_class]
    weights = V5Weights()

    df = sample_base_designs(n=pop_size, seed=seed)
    df = sample_family_conditioned_operating_point(df, seed=seed + 101)
    current_pop = _assign_fitness(
        recalculate_physics_and_score(df, thresholds, weights, bdr_preset)
    )

    # These sizes depend only on pop_size, so compute them once.
    elite_size = max(1, int(pop_size * 0.10))
    parent_pool_size = max(2, int(pop_size * 0.40))
    offspring_count = pop_size - elite_size

    history = []
    for gen in range(1, generations + 1):
        current_pop = current_pop.sort_values("fitness", ascending=False).reset_index(drop=True)
        leader = current_pop.iloc[0]
        best_fitness = float(current_pop["fitness"].max())
        best_yield = float(leader["delivered"]) if best_fitness > 0 else 0.0
        best_family = leader["family"]
        best_type = leader["type"]
        print(
            f"Generation {gen}/{generations} | Max balanced score: {best_fitness:.4f} "
            f"| Max yield: {best_yield:.2e} | Best: {best_family} / {best_type}"
        )
        # NOTE: entry `gen` describes the population entering generation
        # `gen`; the population produced by the last generation is returned
        # and saved but has no history row.
        history.append({
            "generation": gen,
            "best_fitness": best_fitness,
            "best_yield": best_yield,
            "best_family": best_family,
            "best_type": best_type,
            "qualified_count": int(current_pop["hard_pass"].sum()),
        })

        elite = current_pop.head(elite_size).copy()
        parents = current_pop.head(parent_pool_size).copy()

        # The parent pool is fixed for the whole generation, so the family
        # list and the per-family subsets are built once, not per offspring.
        families = parents["family"].dropna().unique()
        if len(families) == 0:
            families = np.array(["Lband"])
        parents_by_family = {
            fam: parents[parents["family"] == fam] for fam in families
        }

        offspring_list = []
        for _ in range(offspring_count):
            family = rng.choice(families)
            pool = parents_by_family[family]
            if len(pool) < 2:
                # Too few same-family parents: breed across the whole pool;
                # repair (via mutate) pulls the child back into `family`.
                pool = parents
            pair_idx = rng.choice(pool.index.to_numpy(), size=2, replace=True)
            p1 = pool.loc[pair_idx[0]]
            p2 = pool.loc[pair_idx[1]]
            child = crossover(p1, p2, rng)
            child["family"] = family
            child["type"] = infer_type_from_family(family)
            child = mutate(child, rng, mutation_rate=0.25, mutation_scale=0.08)
            offspring_list.append(child)

        next_gen_base = pd.concat([elite, pd.DataFrame(offspring_list)], ignore_index=True)
        current_pop = _assign_fitness(
            recalculate_physics_and_score(next_gen_base, thresholds, weights, bdr_preset)
        )

    print("\nEvolution completed. Best evolved RF design:")
    # One ranking serves both the reported winner and the population CSV, so
    # the winner is always the first row of that file, including on fitness
    # ties (e.g. when no candidate passes the hard filters).
    ranked = current_pop.sort_values(["fitness", "delivered"], ascending=False)
    winner = ranked.iloc[0]
    print(f"Type: {winner['type']} | Frequency: {winner['freq_GHz']:.3f} GHz | Eacc: {winner['Eacc_MVpm']:.2f} MV/m")
    print(f"Pulse: {winner['pulse_length_ns']:.1f} ns | Rep rate: {winner['rep_rate_Hz']:.1f} Hz")
    print(f"Yield: {winner['delivered']:.2e} | Pavg: {winner['P_wall_avg_kW']:.2f} kW | ΔT: {winner['deltaT_pulse']:.3f} K")
    print(f"Balanced v5: {winner['balanced_v5']:.4f}")
    print(f"Suggested material: {winner['suggested_primary_material']}")

    if out_dir is not None:
        out_dir = Path(out_dir)  # also accept a plain path string
        out_dir.mkdir(parents=True, exist_ok=True)
        ranked.to_csv(
            out_dir / "rf_mode_pack_v6_0_evolution_population.csv",
            index=False,
        )
        pd.DataFrame(history).to_csv(
            out_dir / "rf_mode_pack_v6_0_evolution_history.csv",
            index=False,
        )
        winner.to_frame().T.to_csv(
            out_dir / "rf_mode_pack_v6_0_evolution_winner.csv",
            index=False,
        )
        run_meta = {
            "pop_size": pop_size,
            "generations": generations,
            "target_class": target_class,
            "bdr_preset": bdr_preset,
            "seed": seed,
        }
        (out_dir / "rf_mode_pack_v6_0_evolution_meta.json").write_text(
            json.dumps(run_meta, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
    return current_pop