voxforge-world / engine /sim /rf /evolution.py
peiti's picture
Upload folder using huggingface_hub
b154e4c verified
from __future__ import annotations
import json
from pathlib import Path
from typing import Dict
import numpy as np
import pandas as pd
from .calibration import CONTINUOUS_GENES, RFCalibrationParamsV2, V5Thresholds, V5Weights
from .core import calibrated_surrogate_v4_core
from .materials import attach_material_recommendations
from .metrics import (
apply_v5_filters_and_scores,
bdr_eff,
cooling_proxy_nc,
cooling_proxy_sc,
duty_cycle,
pulse_heating_proxy,
source_compatibility,
tolerance_components,
wall_power_avg_kW,
)
from .sampling import sample_base_designs, sample_family_conditioned_operating_point
from .utils import infer_family, infer_type_from_family, make_candidate_id
# Clipping windows per cavity family, stored as (gene, lower, upper) in the
# order in which they are applied to a candidate.
_FAMILY_GENE_LIMITS = {
    "Lband": (
        ("freq_GHz", 0.20, 0.80),
        ("R_over_Q_ohm", 60.0, 150.0),
        ("Q0", 1e4, 8e4),
        ("Eacc_MVpm", 8.0, 20.0),
        ("L_m", 0.15, 0.60),
        ("beta", 0.7, 2.5),
        ("pulse_length_ns", 800.0, 3000.0),
        ("rep_rate_Hz", 100.0, 2000.0),
        ("P_aux_kW", 0.4, 1.5),
        ("source_power_avail_kW", 4.0, 8.0),
        ("cooling_capacity_kW", 6.0, 12.0),
        ("surf_factor", 0.9, 1.1),
        ("geom_factor", 0.8, 1.1),
        ("freq_factor", 0.9, 1.05),
        ("fab_sigma_um", 10.0, 40.0),
        ("delta_allow_um", 40.0, 100.0),
        ("S_f", 0.003, 0.010),
        ("S_phi", 0.002, 0.008),
        ("S_c", 0.001, 0.006),
    ),
    "Cband": (
        ("freq_GHz", 5.00, 6.00),
        ("R_over_Q_ohm", 100.0, 280.0),
        ("Q0", 2e4, 8e4),
        ("Eacc_MVpm", 12.0, 30.0),
        ("L_m", 0.20, 0.60),
        ("beta", 0.8, 3.0),
        ("pulse_length_ns", 200.0, 800.0),
        ("rep_rate_Hz", 50.0, 500.0),
        ("P_aux_kW", 0.8, 2.2),
        ("source_power_avail_kW", 9.0, 15.0),
        ("cooling_capacity_kW", 10.0, 20.0),
        ("surf_factor", 0.95, 1.20),
        ("geom_factor", 0.9, 1.2),
        ("freq_factor", 1.0, 1.15),
        ("fab_sigma_um", 5.0, 20.0),
        ("delta_allow_um", 15.0, 40.0),
        ("S_f", 0.010, 0.025),
        ("S_phi", 0.008, 0.020),
        ("S_c", 0.006, 0.015),
    ),
    "Xband": (
        ("freq_GHz", 9.00, 12.00),
        ("R_over_Q_ohm", 80.0, 200.0),
        ("Q0", 1e4, 6e4),
        ("Eacc_MVpm", 20.0, 45.0),
        ("L_m", 0.15, 0.45),
        ("beta", 0.8, 3.0),
        ("pulse_length_ns", 50.0, 300.0),
        ("rep_rate_Hz", 10.0, 200.0),
        ("P_aux_kW", 1.0, 3.0),
        ("source_power_avail_kW", 12.0, 20.0),
        ("cooling_capacity_kW", 12.0, 24.0),
        ("surf_factor", 1.0, 1.35),
        ("geom_factor", 1.0, 1.3),
        ("freq_factor", 1.1, 1.3),
        ("fab_sigma_um", 2.0, 10.0),
        ("delta_allow_um", 5.0, 18.0),
        ("S_f", 0.020, 0.060),
        ("S_phi", 0.015, 0.050),
        ("S_c", 0.010, 0.035),
    ),
    "SC": (
        ("freq_GHz", 0.80, 1.60),
        ("R_over_Q_ohm", 200.0, 400.0),
        ("Q0", 5e9, 2e10),
        ("Eacc_MVpm", 10.0, 25.0),
        ("L_m", 0.60, 1.20),
        ("beta", 0.8, 2.0),
        ("pulse_length_ns", 5000.0, 100000.0),
        ("rep_rate_Hz", 1.0, 100.0),
        ("P_aux_kW", 4.0, 15.0),
        ("source_power_avail_kW", 8.0, 20.0),
        ("cooling_capacity_kW", 15.0, 40.0),
        ("surf_factor", 0.9, 1.1),
        ("geom_factor", 0.8, 1.0),
        ("freq_factor", 0.9, 1.05),
        ("fab_sigma_um", 10.0, 30.0),
        ("delta_allow_um", 30.0, 80.0),
        ("S_f", 0.004, 0.012),
        ("S_phi", 0.003, 0.010),
        ("S_c", 0.002, 0.008),
    ),
}


def repair_child(child: pd.Series) -> pd.Series:
    """Clamp a candidate design into the parameter window of its family.

    The family is read from ``child["family"]``. When that entry is absent,
    NaN or blank, it is inferred from ``child["type"]`` with ``infer_family``;
    an ``"unknown"`` family is treated as ``"Lband"``. ``child["type"]`` is
    then overwritten with ``infer_type_from_family(family)``.

    Each bounded gene is clipped to the family's ``(lower, upper)`` window.
    Any family label other than ``"Lband"``, ``"Cband"`` or ``"Xband"`` is
    handled with the ``"SC"`` window. ``cryo_temp_K`` is set to NaN for the
    three named families; otherwise it is clipped to ``[1.5, 5.0]``, using
    ``2.0`` when the value is missing or NaN. Finally ``freq_Hz``, ``Vc_MV``
    and ``Q_loaded`` are re-derived from the clipped genes.

    Args:
        child: Candidate design. It is modified in place.

    Returns:
        The same ``child`` object after repair.

    Raises:
        KeyError: If one of the bounded genes is not present in ``child``.
    """
    family = child.get("family", infer_family(str(child.get("type", ""))))
    family_is_blank = pd.isna(family) or str(family).strip() == ""
    if family_is_blank:
        family = infer_family(str(child.get("type", "")))
    if family == "unknown":
        family = "Lband"

    child["family"] = family
    child["type"] = infer_type_from_family(family)

    # Anything that is not one of the three named families uses the SC window.
    limits_key = family if family in ("Lband", "Cband", "Xband") else "SC"
    for gene, lower, upper in _FAMILY_GENE_LIMITS[limits_key]:
        child[gene] = float(np.clip(child[gene], lower, upper))

    if limits_key == "SC":
        cryo = child.get("cryo_temp_K", 2.0)
        if pd.isna(cryo):
            cryo = 2.0
        child["cryo_temp_K"] = float(np.clip(cryo, 1.5, 5.0))
    else:
        child["cryo_temp_K"] = np.nan

    # Derived quantities must track the clipped genes.
    child["freq_Hz"] = child["freq_GHz"] * 1e9
    child["Vc_MV"] = child["Eacc_MVpm"] * child["L_m"]
    child["Q_loaded"] = child["Q0"] / (1.0 + child["beta"])
    return child
def recalculate_physics_and_score(
    df: pd.DataFrame,
    thresholds: V5Thresholds,
    weights: V5Weights,
    bdr_preset="nominal",
) -> pd.DataFrame:
    """Recompute derived quantities and proxy metrics, then filter and score.

    Works on a copy of ``df``; the caller's frame is not modified. The steps
    are, in order: fill missing ``family`` values from ``type``, rewrite
    ``type`` to a fixed label for the four known families, re-derive
    ``freq_Hz`` / ``Vc_MV`` / ``Q_loaded``, merge the output of
    ``calibrated_surrogate_v4_core``, compute the duty-cycle, wall-power,
    pulse-heating, breakdown, source-compatibility, cooling and tolerance
    columns, assign ``candidate_id``, attach material recommendations and
    finally apply the v5 filters and scores.

    Args:
        df: Candidate designs. Must contain ``family`` and ``type`` columns
            together with the gene columns read below.
        thresholds: Passed through to ``apply_v5_filters_and_scores``.
        weights: Passed through to ``apply_v5_filters_and_scores``.
        bdr_preset: Forwarded to ``bdr_eff`` as ``preset``.

    Returns:
        The result of ``apply_v5_filters_and_scores`` on the enriched frame.
    """
    calibration = RFCalibrationParamsV2()
    table = df.copy()

    def column(name):
        # Raw array view of one column of the working frame.
        return table[name].values

    def masked(mask, name):
        # Raw array of one column restricted to the rows selected by mask.
        return table.loc[mask, name].values

    table["family"] = table["family"].fillna(table["type"].map(infer_family))

    # Known families get a fixed type label; any other row keeps its type.
    resolved_type = table["type"]
    for family_name, canonical_type in (
        ("Xband", "NC_Xband_SW"),
        ("Cband", "NC_Cband_TW"),
        ("Lband", "NC_pillbox_Lband"),
        ("SC", "SC_elliptical_1p3GHz"),
    ):
        resolved_type = np.where(
            table["family"].eq(family_name), canonical_type, resolved_type
        )
    table["type"] = resolved_type

    table["freq_Hz"] = table["freq_GHz"] * 1e9
    table["Vc_MV"] = table["Eacc_MVpm"] * table["L_m"]
    table["Q_loaded"] = table["Q0"] / (1.0 + table["beta"])

    # NOTE(review): P_wall_peak_kW is not created in this function; it is
    # presumably provided by the surrogate output merged here - confirm.
    surrogate_output = calibrated_surrogate_v4_core(table, params=calibration)
    for key, value in surrogate_output.items():
        table[key] = value

    table["duty_cycle"] = duty_cycle(column("rep_rate_Hz"), column("pulse_length_ns"))
    table["P_wall_avg_kW"] = wall_power_avg_kW(
        column("P_wall_peak_kW"),
        column("duty_cycle"),
        column("P_aux_kW"),
    )
    table["deltaT_pulse"] = pulse_heating_proxy(
        column("Eacc_MVpm"),
        column("pulse_length_ns"),
        column("geom_factor"),
        column("family"),
    )
    table["BDR_eff"] = bdr_eff(
        column("Eacc_MVpm"),
        column("pulse_length_ns"),
        column("surf_factor"),
        column("freq_factor"),
        preset=bdr_preset,
    )
    table["C_src"] = source_compatibility(
        column("P_wall_peak_kW"),
        column("source_power_avail_kW"),
        column("pulse_length_ns"),
        column("rep_rate_Hz"),
        column("family"),
    )

    # SC rows and all other rows use different cooling proxies.
    sc_rows = table["family"].eq("SC").values
    other_rows = ~sc_rows
    cooling = np.empty(len(table), dtype=float)
    if np.any(other_rows):
        cooling[other_rows] = cooling_proxy_nc(
            masked(other_rows, "P_wall_avg_kW"),
            masked(other_rows, "cooling_capacity_kW"),
        )
    if np.any(sc_rows):
        dynamic_load = masked(sc_rows, "P_wall_peak_kW") * masked(sc_rows, "duty_cycle")
        static_load = masked(sc_rows, "P_aux_kW")
        capacity = masked(sc_rows, "cooling_capacity_kW")
        cooling[sc_rows] = cooling_proxy_sc(dynamic_load, static_load, capacity)
    table["C_cool"] = cooling

    tol_f, tol_phi, tol_c, tol_total = tolerance_components(
        column("fab_sigma_um"),
        column("S_f"),
        column("S_phi"),
        column("S_c"),
        column("delta_allow_um"),
    )
    table["R_f"] = tol_f
    table["R_phi"] = tol_phi
    table["R_c"] = tol_c
    table["R_tol"] = tol_total

    table["candidate_id"] = table.apply(make_candidate_id, axis=1)
    enriched = attach_material_recommendations(table)
    return apply_v5_filters_and_scores(enriched, thresholds, weights)
def crossover(p1: pd.Series, p2: pd.Series, rng: np.random.Generator) -> pd.Series:
    """Build a child by uniform crossover over ``CONTINUOUS_GENES``.

    The child starts as a copy of ``p1``. For every gene that is present and
    non-NaN in both parents, one random number is drawn and the value is
    taken from ``p2`` when the draw exceeds 0.5. Genes missing or NaN in
    either parent keep the ``p1`` value and consume no random draw.

    Args:
        p1: First parent; provides every entry that is not swapped.
        p2: Second parent.
        rng: Random generator used for the per-gene draws.

    Returns:
        A new Series; neither parent is modified.
    """
    offspring = p1.copy()
    for gene in CONTINUOUS_GENES:
        if gene not in p1.index or gene not in p2.index:
            continue
        if pd.isna(p1[gene]) or pd.isna(p2[gene]):
            continue
        if rng.random() > 0.5:
            offspring[gene] = p2[gene]
    return offspring
def mutate(
    child: pd.Series,
    rng: np.random.Generator,
    mutation_rate=0.20,
    mutation_scale=0.05,
) -> pd.Series:
    """Perturb genes of ``child`` with Gaussian noise, then repair it.

    For each gene in ``CONTINUOUS_GENES`` that is present and non-NaN, one
    random number is drawn; when it is below ``mutation_rate`` the gene
    receives zero-mean normal noise whose standard deviation is
    ``mutation_scale`` times the gene's magnitude. After all genes have been
    visited the candidate is passed through ``repair_child``.

    Args:
        child: Candidate to mutate. It is modified in place.
        rng: Random generator used for the draws and the noise.
        mutation_rate: Per-gene probability threshold for mutating.
        mutation_scale: Noise standard deviation relative to the gene value.

    Returns:
        The value returned by ``repair_child(child)``.
    """
    for gene in CONTINUOUS_GENES:
        if gene not in child.index:
            continue
        if pd.isna(child[gene]):
            continue
        if not (rng.random() < mutation_rate):
            continue
        current = float(child[gene])
        # The 1e-9 floor keeps the noise scale positive for a zero-valued gene.
        reference = max(abs(current), 1e-9)
        perturbation = rng.normal(0.0, mutation_scale * reference)
        child[gene] = current + perturbation
    return repair_child(child)
def _masked_fitness(population: pd.DataFrame) -> np.ndarray:
    """Return ``balanced_v5`` where ``hard_pass`` is truthy and 0.0 elsewhere."""
    return np.where(population["hard_pass"], population["balanced_v5"], 0.0)


def _breed_offspring(
    parents: pd.DataFrame,
    n_children: int,
    rng: np.random.Generator,
    mutation_rate: float,
    mutation_scale: float,
) -> list:
    """Create ``n_children`` candidates by family-aware crossover and mutation."""
    # The parent pool is fixed while breeding, so the family list and the
    # per-family mating pools are built once instead of once per child.
    families = parents["family"].dropna().unique()
    if len(families) == 0:
        families = np.array(["Lband"])

    mating_pools = {}
    for fam in families:
        members = parents[parents["family"] == fam]
        # A family with fewer than two members mates within the whole pool.
        mating_pools[fam] = members if len(members) >= 2 else parents

    offspring = []
    for _ in range(n_children):
        family = rng.choice(families)
        pool = mating_pools[family]
        first, second = rng.choice(pool.index.to_numpy(), size=2, replace=True)
        child = crossover(pool.loc[first], pool.loc[second], rng)
        # The child is forced into the drawn family before mutation/repair.
        child["family"] = family
        child["type"] = infer_type_from_family(family)
        child = mutate(
            child,
            rng,
            mutation_rate=mutation_rate,
            mutation_scale=mutation_scale,
        )
        offspring.append(child)
    return offspring


def _export_run(
    out_dir: Path,
    population: pd.DataFrame,
    history: list,
    winner: pd.Series,
    meta: dict,
) -> None:
    """Write the population, history, winner and metadata files to ``out_dir``."""
    out_dir.mkdir(parents=True, exist_ok=True)
    population.sort_values(["fitness", "delivered"], ascending=False).to_csv(
        out_dir / "rf_mode_pack_v6_0_evolution_population.csv",
        index=False,
    )
    pd.DataFrame(history).to_csv(
        out_dir / "rf_mode_pack_v6_0_evolution_history.csv",
        index=False,
    )
    winner.to_frame().T.to_csv(
        out_dir / "rf_mode_pack_v6_0_evolution_winner.csv",
        index=False,
    )
    (out_dir / "rf_mode_pack_v6_0_evolution_meta.json").write_text(
        json.dumps(meta, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


def run_evolution(
    pop_size: int,
    generations: int,
    target_class: str,
    bdr_preset: str,
    class_config: Dict[str, V5Thresholds],
    seed: int = 7,
    out_dir: Path | str | None = None,
    *,
    elite_fraction: float = 0.10,
    parent_fraction: float = 0.40,
    mutation_rate: float = 0.25,
    mutation_scale: float = 0.08,
) -> pd.DataFrame:
    """Evolve a population of RF designs and return the final scored population.

    An initial population is sampled and scored. In each generation the
    population is sorted by fitness, progress is printed and recorded, the top
    ``elite_fraction`` is carried over unchanged, and the remaining slots are
    filled with children bred from the top ``parent_fraction``. The new
    population is then re-scored. Fitness is ``balanced_v5`` for rows with a
    truthy ``hard_pass`` and 0.0 otherwise.

    Each history row describes the population at the start of its generation,
    before breeding. The returned population, produced by the last breeding
    step, therefore has no history row of its own.

    Args:
        pop_size: Number of individuals per generation; must be at least 1.
        generations: Number of breeding rounds. Zero or a negative value
            returns the scored initial population.
        target_class: Key into ``class_config`` selecting the thresholds.
        bdr_preset: Forwarded to ``recalculate_physics_and_score``.
        class_config: Mapping from class name to thresholds.
        seed: Seed for the random generator and the initial sampling.
        out_dir: When not None, directory (``Path`` or ``str``) receiving the
            population, history and winner CSV files and a JSON metadata file.
        elite_fraction: Share of the population copied unchanged into the
            next generation; at least one individual is always kept.
        parent_fraction: Share of the population used as breeding pool; the
            pool size is never below two.
        mutation_rate: Per-gene mutation probability passed to ``mutate``.
        mutation_scale: Relative mutation noise scale passed to ``mutate``.

    Returns:
        The scored population after the last generation, including a
        ``fitness`` column. Rows are not sorted by fitness.

    Raises:
        ValueError: If ``pop_size`` is smaller than 1.
        KeyError: If ``target_class`` is not a key of ``class_config``.
    """
    if pop_size < 1:
        raise ValueError(f"pop_size must be at least 1, got {pop_size}")

    rng = np.random.default_rng(seed)
    print(f"\nLaunching evolution ({generations} generations, population {pop_size}) target: {target_class}")
    thresholds = class_config[target_class]
    weights = V5Weights()

    df = sample_base_designs(n=pop_size, seed=seed)
    df = sample_family_conditioned_operating_point(df, seed=seed + 101)
    current_pop = recalculate_physics_and_score(df, thresholds, weights, bdr_preset)
    current_pop["fitness"] = _masked_fitness(current_pop)

    # Both sizes depend only on pop_size, so they are fixed for the whole run.
    elite_size = min(pop_size, max(1, int(pop_size * elite_fraction)))
    parent_pool_size = max(2, int(pop_size * parent_fraction))

    history = []
    for gen in range(1, generations + 1):
        current_pop = current_pop.sort_values("fitness", ascending=False).reset_index(drop=True)
        leader = current_pop.iloc[0]
        best_fitness = float(current_pop["fitness"].max())
        # A zero best fitness means no candidate passed the hard filters.
        best_yield = float(leader["delivered"]) if best_fitness > 0 else 0.0
        best_family = leader["family"]
        best_type = leader["type"]
        print(
            f"Generation {gen}/{generations} | Max balanced score: {best_fitness:.4f} "
            f"| Max yield: {best_yield:.2e} | Best: {best_family} / {best_type}"
        )
        history.append({
            "generation": gen,
            "best_fitness": best_fitness,
            "best_yield": best_yield,
            "best_family": best_family,
            "best_type": best_type,
            "qualified_count": int(current_pop["hard_pass"].sum()),
        })

        elite = current_pop.head(elite_size).copy()
        parents = current_pop.head(parent_pool_size).copy()
        offspring_list = _breed_offspring(
            parents,
            pop_size - elite_size,
            rng,
            mutation_rate,
            mutation_scale,
        )

        next_gen_base = pd.concat([elite, pd.DataFrame(offspring_list)], ignore_index=True)
        current_pop = recalculate_physics_and_score(next_gen_base, thresholds, weights, bdr_preset)
        current_pop["fitness"] = _masked_fitness(current_pop)

    print("\nEvolution completed. Best evolved RF design:")
    winner = current_pop.sort_values("fitness", ascending=False).iloc[0]
    print(f"Type: {winner['type']} | Frequency: {winner['freq_GHz']:.3f} GHz | Eacc: {winner['Eacc_MVpm']:.2f} MV/m")
    print(f"Pulse: {winner['pulse_length_ns']:.1f} ns | Rep rate: {winner['rep_rate_Hz']:.1f} Hz")
    print(f"Yield: {winner['delivered']:.2e} | Pavg: {winner['P_wall_avg_kW']:.2f} kW | ΔT: {winner['deltaT_pulse']:.3f} K")
    print(f"Balanced v5: {winner['balanced_v5']:.4f}")
    print(f"Suggested material: {winner['suggested_primary_material']}")

    if out_dir is not None:
        _export_run(
            Path(out_dir),
            current_pop,
            history,
            winner,
            {
                "pop_size": pop_size,
                "generations": generations,
                "target_class": target_class,
                "bdr_preset": bdr_preset,
                "seed": seed,
            },
        )
    return current_pop