feiyang-cai's picture
Clarify generic ZeroGPU runtime failures
4b289bc verified
Raw
History Blame Contribute Delete
39.6 kB
from __future__ import annotations
import json
import os
from functools import lru_cache
from typing import Any, Dict, List
import gradio as gr
import numpy as np
import torch
from model_inverse import inverse_design as thermoforming_inverse_design
from space_lib.cubic_curves import (
EPS33_ROW,
LATERAL_ROW,
MODE_TO_ROW,
default_x_for_mode,
evaluate_cubic_no_intercept,
fit_cubic_no_intercept,
)
from space_lib.infer import load_model_bundle, postprocess_sample, sample
from space_lib.metadata_v5 import load_metadata_v5, normalize_condition_v5
from space_lib.plots_v5 import plot_condition_and_simulations_v5, plot_required_curves_from_series
from space_lib.simulate import simulate_instances
try:
import spaces # type: ignore
SPACES_RUNTIME = True
except ImportError: # local / non-ZeroGPU fallback
SPACES_RUNTIME = False
class _SpacesCompat:
@staticmethod
def GPU(*_args, **_kwargs):
def decorator(fn):
return fn
return decorator
spaces = _SpacesCompat() # type: ignore
APP_DIR = os.path.dirname(os.path.abspath(__file__))
ASSETS_DIR = os.path.join(APP_DIR, "assets")
MODEL_ASSETS_DIR = os.path.join(ASSETS_DIR, "model")
SIM_ASSETS_DIR = os.path.join(ASSETS_DIR, "quarter_sim")
DEFAULT_CHECKPOINT_DIR = os.environ.get(
"MG_CHECKPOINT_DIR",
MODEL_ASSETS_DIR,
)
DEFAULT_DATA_DIR = os.environ.get(
"MG_DATA_DIR",
MODEL_ASSETS_DIR,
)
DEFAULT_CURVE_DIR = os.environ.get(
"MG_CURVE_DIR",
os.path.join(SIM_ASSETS_DIR, "RVE_datasets"),
)
CHECKPOINT_DIR = DEFAULT_CHECKPOINT_DIR
DATA_DIR = DEFAULT_DATA_DIR
CURVE_DIR = DEFAULT_CURVE_DIR
NORMALIZATION_METHOD = "zscore"
ANGLE_RESOLUTION = 1.0
DEFAULT_N_GENERATE = 3
MAX_N_GENERATE = int(os.environ.get("MG_MAX_GENERATE", "6"))
DEFAULT_N_GENERATE = min(int(os.environ.get("MG_DEFAULT_N_GENERATE", str(DEFAULT_N_GENERATE))), MAX_N_GENERATE)
GPU_DURATION_SEC = int(os.environ.get("ZEROGPU_DURATION", "45"))
DEFAULT_TEST_INDEX = int(os.environ.get("MG_DEFAULT_TEST_INDEX", "512"))
THERMO_N_RESTARTS = int(os.environ.get("MG_THERMO_RESTARTS", "6"))
THERMO_EPOCHS = int(os.environ.get("MG_THERMO_EPOCHS", "300"))
MATERIAL_TYPE_NAMES = {0: "CPP", 1: "CHDPE", 2: "GPP", 3: "GHDPE"}
NO_SELECTED_DESIGN = "No selected design is available."
THERMO_NOT_RUN = "Thermoforming has not run yet."
REQUIREMENTS: List[Dict[str, Any]] = [
{"id": "11_sig11", "title": "Mode 11 Stress", "mode": "11", "response": "stress", "input_kind": "curve3"},
{"id": "11_eps22", "title": "Mode 11 Lateral", "mode": "11", "response": "lateral", "input_kind": "ratio2", "ratio_name": "Poisson's ratio (vxy)"},
{"id": "11_eps33", "title": "Mode 11 eps33", "mode": "11", "response": "eps33", "input_kind": "curve3"},
{"id": "22_sig22", "title": "Mode 22 Stress", "mode": "22", "response": "stress", "input_kind": "curve3"},
{"id": "22_eps11", "title": "Mode 22 Lateral", "mode": "22", "response": "lateral", "input_kind": "ratio2", "ratio_name": "Poisson's ratio (vyx)"},
{"id": "22_eps33", "title": "Mode 22 eps33", "mode": "22", "response": "eps33", "input_kind": "curve3"},
{"id": "12_sig12", "title": "Mode 12 Stress", "mode": "12", "response": "stress", "input_kind": "curve3"},
]
def _curve_field_labels(mode: str, response: str) -> tuple[str, str, str]:
x_end = float(default_x_for_mode(mode)[-1])
pct = int(round(x_end * 100))
if response == "stress":
return "Initial stiffness", f"{pct}% strain stiffness", f"{pct}% strain stress"
if response == "lateral":
return "Initial strain ratio", f"{pct}% strain ratio", f"{pct}% strain response"
return "Initial eps33 ratio", f"{pct}% strain ratio", f"{pct}% strain eps33"
def _coeffs_from_requirements(initial_slope: float, end_slope: float, end_value: float, x_end: float) -> np.ndarray:
A = np.array([[2.0 * x_end, 3.0 * x_end * x_end], [x_end * x_end, x_end * x_end * x_end]], dtype=np.float32)
b = np.array([end_slope - initial_slope, end_value - initial_slope * x_end], dtype=np.float32)
quad, cubic = np.linalg.solve(A, b)
return np.asarray([cubic, quad, initial_slope], dtype=np.float32)
def _requirements_from_coeffs(coeffs_7x3: np.ndarray) -> dict[str, float]:
coeffs = np.asarray(coeffs_7x3, dtype=np.float32).reshape(7, 3)
out: dict[str, float] = {}
for idx, spec in enumerate(REQUIREMENTS):
mode = spec["mode"]
x_end = float(default_x_for_mode(mode)[-1])
c3, c2, c1 = [float(v) for v in coeffs[idx]]
if spec["input_kind"] == "ratio2":
out[f"{spec['id']}_initial"] = c1
out[f"{spec['id']}_end_ratio"] = 3.0 * c3 * x_end * x_end + 2.0 * c2 * x_end + c1
else:
initial = c1
end_slope = 3.0 * c3 * x_end * x_end + 2.0 * c2 * x_end + c1
end_value = c3 * x_end**3 + c2 * x_end**2 + c1 * x_end
out[f"{spec['id']}_initial"] = initial
out[f"{spec['id']}_end_slope"] = end_slope
out[f"{spec['id']}_end_value"] = end_value
return out
@lru_cache(maxsize=4)
def _default_requirements(data_dir: str) -> dict[str, float]:
arr = np.load(os.path.join(data_dir, "test_data.npz"))["polynomial_coefficients"]
idx = int(DEFAULT_TEST_INDEX) % int(len(arr))
coeffs = np.asarray(arr[idx], dtype=np.float32).reshape(7, 3)
return _requirements_from_coeffs(coeffs)
@lru_cache(maxsize=4)
def _get_meta(data_dir: str):
return load_metadata_v5(data_dir)
_MODEL_CACHE: dict[tuple[str, str, float], Any] = {}
def _runtime_device() -> str:
return "cuda" if torch.cuda.is_available() else "cpu"
def _runtime_summary() -> str:
spaces_mode = "available" if SPACES_RUNTIME else "not installed locally; using no-op fallback"
return (
"GPU-compatible Gradio app for the quarter-v5 model. "
f"Generation uses `{_runtime_device()}` when invoked; the Hugging Face `spaces` runtime is {spaces_mode}. "
"This supports both ZeroGPU (`@spaces.GPU`) and dedicated GPU hardware such as `Nvidia T4 small`."
)
def _get_model(checkpoint_dir: str, device: str, angle_resolution: float):
key = (checkpoint_dir, device, float(angle_resolution))
if key not in _MODEL_CACHE:
_MODEL_CACHE[key] = load_model_bundle(checkpoint_dir, device=device, angle_resolution=angle_resolution)
return _MODEL_CACHE[key]
def _angle_categories_from_meta(meta, angle_resolution: float) -> np.ndarray:
angle_min_raw = float(meta.raw.get("angle_min", 0.0))
angle_max_raw = float(meta.raw.get("angle_max", 90.0))
res = float(angle_resolution)
ang_min = np.floor(angle_min_raw / res) * res
ang_max = np.ceil(angle_max_raw / res) * res
num = int((ang_max - ang_min) / res) + 1
return np.linspace(ang_min, ang_max, num, dtype=np.float32)
def _balance_partner(angle: float) -> float:
a = abs(float(angle)) % 180.0
if np.isclose(a, 0.0):
return 0.0
if np.isclose(a, 90.0):
return 90.0
return -float(angle)
def _full_angles_from_quarter(quarter_angles: list[float]) -> list[float]:
ordered_quarter = [float(a) for a in sorted(quarter_angles)]
half_angles = ordered_quarter + [_balance_partner(a) for a in reversed(ordered_quarter)]
return half_angles + list(reversed(half_angles))
def _mat_type_to_matrix_fiber(mat_type: str) -> tuple[str, str]:
mt = (mat_type or "").upper().strip()
fiber = "Carbon" if mt.startswith("C") else ("Glass" if mt.startswith("G") else mt[:1])
matrix = "HDPE" if mt.endswith("HDPE") else ("PP" if mt.endswith("PP") else mt[1:])
return matrix, fiber
def _score_design(cond_coeffs: np.ndarray, sim_modes: Dict[str, Dict[str, np.ndarray]]) -> float:
cond = np.asarray(cond_coeffs, dtype=np.float32).reshape(7, 3)
losses: list[float] = []
for mode, data in sim_modes.items():
strain = np.asarray(data["strain"], dtype=np.float32)
stress_pred = evaluate_cubic_no_intercept(cond[MODE_TO_ROW[mode]], strain)
losses.append(float(np.mean((stress_pred - np.asarray(data["stress"], dtype=np.float32)) ** 2)))
if mode in ("11", "22") and data.get("lateral") is not None:
lateral_pred = evaluate_cubic_no_intercept(cond[int(LATERAL_ROW[mode])], strain)
losses.append(float(np.mean((lateral_pred - np.asarray(data["lateral"], dtype=np.float32)) ** 2)))
if mode in ("11", "22") and data.get("eps33") is not None:
eps33_pred = evaluate_cubic_no_intercept(cond[int(EPS33_ROW[mode])], strain)
losses.append(float(np.mean((eps33_pred - np.asarray(data["eps33"], dtype=np.float32)) ** 2)))
return float(np.mean(losses)) if losses else float("inf")
def _build_required_curves_and_condition(values: List[float]) -> tuple[dict[str, np.ndarray], np.ndarray]:
coeff_rows = []
curves: dict[str, np.ndarray] = {}
cursor = 0
for idx, spec in enumerate(REQUIREMENTS):
mode = spec["mode"]
x = default_x_for_mode(mode)
x_end = float(x[-1])
if spec["input_kind"] == "ratio2":
initial = float(values[cursor])
end_ratio = float(values[cursor + 1])
cursor += 2
quad = (end_ratio - initial) / (2.0 * x_end) if x_end > 0 else 0.0
y = initial * x + quad * x * x
else:
initial = float(values[cursor])
end_slope = float(values[cursor + 1])
end_value = float(values[cursor + 2])
cursor += 3
coeffs = _coeffs_from_requirements(initial, end_slope, end_value, x_end)
y = evaluate_cubic_no_intercept(coeffs, x)
fitted = fit_cubic_no_intercept(x, y, degree=3)
if fitted is None:
raise RuntimeError(f"Failed to fit cubic coefficients for requirement index {idx}")
coeff_rows.append(fitted)
req_id = spec["id"]
if req_id == "11_sig11":
curves["eps11"] = x
curves["sig11_mpa"] = y
elif req_id == "11_eps22":
curves["eps22_from_eps11"] = y
elif req_id == "11_eps33":
curves["eps33_from_eps11"] = y
elif req_id == "22_sig22":
curves["eps22"] = x
curves["sig22_mpa"] = y
elif req_id == "22_eps11":
curves["eps11_from_eps22"] = y
elif req_id == "22_eps33":
curves["eps33_from_eps22"] = y
elif req_id == "12_sig12":
curves["eps12"] = x
curves["sig12_mpa"] = y
return curves, np.stack(coeff_rows, axis=0).astype(np.float32)
def _load_condition_defaults_from_test(idx: int) -> List[float]:
arr = np.load(os.path.join(DATA_DIR, "test_data.npz"))["polynomial_coefficients"]
coeffs = np.asarray(arr[int(idx) % len(arr)], dtype=np.float32).reshape(7, 3)
reqs = _requirements_from_coeffs(coeffs)
values: List[float] = []
for spec in REQUIREMENTS:
req_id = spec["id"]
if spec["input_kind"] == "ratio2":
values.extend([float(reqs[f"{req_id}_initial"]), float(reqs[f"{req_id}_end_ratio"])])
else:
values.extend(
[
float(reqs[f"{req_id}_initial"]),
float(reqs[f"{req_id}_end_slope"]),
float(reqs[f"{req_id}_end_value"]),
]
)
return values
def _load_exact_test_coeffs(idx: int) -> np.ndarray:
arr = np.load(os.path.join(DATA_DIR, "test_data.npz"))["polynomial_coefficients"]
coeffs = np.asarray(arr[int(idx) % len(arr)], dtype=np.float32).reshape(7, 3)
return coeffs
def _load_exact_test_payload(idx: int) -> str:
data = np.load(os.path.join(DATA_DIR, "test_data.npz"))
idx = int(idx) % int(len(data["polynomial_coefficients"]))
coeffs = np.asarray(data["polynomial_coefficients"][idx], dtype=np.float32).reshape(7, 3)
mat_id = int(data["material_type"][idx])
vf = float(data["volume_fraction"][idx])
seq_len = int(data["stacking_sequence_lengths"][idx])
quarter_angles = [float(a) for a in data["stacking_sequence"][idx][:seq_len]]
return json.dumps(
{
"coeffs": coeffs.tolist(),
"ground_truth": {
"test_index": idx,
"mat_type": MATERIAL_TYPE_NAMES.get(mat_id, str(mat_id)),
"vf": vf,
"quarter_angles": quarter_angles,
"full_angles": _full_angles_from_quarter(quarter_angles),
},
}
)
def _condition_payload_from_coeffs(cond_coeffs: np.ndarray, source: str, ground_truth: dict[str, Any] | None = None) -> str:
payload: dict[str, Any] = {
"source": source,
"coeffs": np.asarray(cond_coeffs, dtype=np.float32).reshape(7, 3).tolist(),
}
if ground_truth:
payload["ground_truth"] = ground_truth
return json.dumps(payload)
def _curves_from_coeffs(cond_coeffs: np.ndarray) -> dict[str, np.ndarray]:
coeffs = np.asarray(cond_coeffs, dtype=np.float32).reshape(7, 3)
curves: dict[str, np.ndarray] = {}
mapping = [
("11_sig11", "11"),
("11_eps22", "11"),
("11_eps33", "11"),
("22_sig22", "22"),
("22_eps11", "22"),
("22_eps33", "22"),
("12_sig12", "12"),
]
for idx, (req_id, mode) in enumerate(mapping):
x = default_x_for_mode(mode)
y = evaluate_cubic_no_intercept(coeffs[idx], x)
if req_id == "11_sig11":
curves["eps11"] = x
curves["sig11_mpa"] = y
elif req_id == "11_eps22":
curves["eps22_from_eps11"] = y
elif req_id == "11_eps33":
curves["eps33_from_eps11"] = y
elif req_id == "22_sig22":
curves["eps22"] = x
curves["sig22_mpa"] = y
elif req_id == "22_eps11":
curves["eps11_from_eps22"] = y
elif req_id == "22_eps33":
curves["eps33_from_eps22"] = y
elif req_id == "12_sig12":
curves["eps12"] = x
curves["sig12_mpa"] = y
return curves
def _serialize_designs(designs: list[Dict[str, Any]]) -> str:
return json.dumps(designs)
def _deserialize_designs(payload: str) -> list[Dict[str, Any]]:
if not payload:
return []
return json.loads(payload)
def _format_design_choice(design: Dict[str, Any]) -> str:
return (
f"Gen {design['design_id']} | score={design['score']:.4g} | "
f"{design['mat_type']} | vf={design['vf']:.4f} | q={design['quarter_angles']}"
)
def _format_ground_truth_summary(gt: Dict[str, Any]) -> str:
return (
f"Gen GT: {gt['mat_type']}, vf={float(gt['vf']):.4f}, "
f"q={gt['quarter_angles']}, score=ground truth"
)
def _design_markdown(design: Dict[str, Any]) -> str:
quarter = ", ".join(f"{a:.1f}" for a in design["quarter_angles"])
full = ", ".join(f"{a:.1f}" for a in design["full_angles"])
return (
f"Selected design: Gen {design['design_id']}\n\n"
f"Material: {design['mat_type']} ({design['matrix']} matrix, {design['fiber']} fiber)\n\n"
f"VF: {design['vf']:.4f}\n\n"
f"Quarter angles: {quarter}\n\n"
f"Full stack used by simulation: {full}\n\n"
f"Similarity score: {design['score']:.6g}"
)
@spaces.GPU(duration=GPU_DURATION_SEC)
def _run_generation_model(cond_coeffs: np.ndarray, n_generate: int) -> list[dict[str, Any]]:
meta = _get_meta(DATA_DIR)
device = _runtime_device()
bundle = _get_model(CHECKPOINT_DIR, device=device, angle_resolution=ANGLE_RESOLUTION)
cond_norm = normalize_condition_v5(cond_coeffs, meta, NORMALIZATION_METHOD)
n_generate = int(max(1, min(int(n_generate), MAX_N_GENERATE)))
cond_t = torch.tensor(cond_norm, dtype=torch.float32, device=device).view(1, -1).repeat(n_generate, 1)
angle_categories = _angle_categories_from_meta(meta, ANGLE_RESOLUTION) if bundle.use_discrete_angles else None
out = sample(
model=bundle.model,
disc_diff_mat=bundle.disc_diff_mat,
disc_diff_vf_category=bundle.disc_diff_vf_category,
disc_diff_layer=bundle.disc_diff_layer,
disc_diff_angle=bundle.disc_diff_angle,
cont_diff=bundle.cont_diff,
cond=cond_t,
mask_ids=bundle.mask_ids,
device=device,
remask_prob=0.1,
use_discrete_angles=bundle.use_discrete_angles,
)
generated: list[dict[str, Any]] = []
for i in range(n_generate):
sample_i = {
"material_t": out["material_t"][i : i + 1],
"vf_category_t": out["vf_category_t"][i : i + 1],
"layer_t": out["layer_t"][i : i + 1],
"angle_t": out["angle_t"][i : i + 1],
}
mat_type, vf, quarter_angles = postprocess_sample(
sample_i,
use_discrete_angles=bundle.use_discrete_angles,
angle_categories_deg=angle_categories,
)
generated.append(
{
"design_id": i + 1,
"mat_type": mat_type,
"vf": float(vf),
"quarter_angles": [float(a) for a in quarter_angles],
}
)
return generated
def ui_generate_required_curves(*values: float):
curves, cond_coeffs = _build_required_curves_and_condition(list(values))
return plot_required_curves_from_series(curves), _condition_payload_from_coeffs(cond_coeffs, source="manual")
def ui_set_buttons(preview: bool, load: bool, design: bool, thermo: bool):
return (
gr.update(interactive=preview),
gr.update(interactive=load),
gr.update(interactive=design),
gr.update(interactive=thermo),
)
def ui_disable_all_buttons():
return ui_set_buttons(False, False, False, False)
def ui_clear_design_outputs():
return (
None,
"",
"",
"",
gr.update(choices=[], value=None),
NO_SELECTED_DESIGN,
"",
NO_SELECTED_DESIGN,
THERMO_NOT_RUN,
gr.update(value=0.0),
gr.update(value=0.0),
gr.update(value=0.0),
gr.update(value=0.0),
)
def ui_generation_started():
message = (
"### Requesting ZeroGPU for candidate generation\n\n"
"The Space is requesting a temporary CUDA device from Hugging Face ZeroGPU. "
"If generation stops here or a backend error appears, ZeroGPU allocation likely failed "
"before CUDA became available, or the account/organization quota is exhausted. "
"Retry after quota is restored or switch the Space hardware to `Nvidia T4 small`."
)
return (
None,
"Requesting ZeroGPU allocation...",
message,
"Requesting ZeroGPU allocation for candidate generation.",
gr.update(choices=[], value=None),
NO_SELECTED_DESIGN,
"",
NO_SELECTED_DESIGN,
THERMO_NOT_RUN,
gr.update(value=0.0),
gr.update(value=0.0),
gr.update(value=0.0),
gr.update(value=0.0),
)
def ui_manual_requirements_changed():
return (
"",
"Requirements changed. Generate curves before inverse design.",
None,
*ui_clear_design_outputs(),
*ui_set_buttons(True, False, False, False),
)
def ui_test_index_changed():
return (
"",
"Test index changed. Load the test condition before inverse design.",
None,
*ui_clear_design_outputs(),
*ui_set_buttons(False, True, False, False),
)
def ui_input_mode_changed(mode: str):
manual = mode == "Define by Input Values"
status = ""
if manual:
status = "Manual requirement mode is active. Generate curves before inverse design."
else:
status = "Load-from-test mode is active. Load a test condition before inverse design."
return (
gr.update(visible=manual),
gr.update(visible=not manual),
"",
status,
None,
*ui_clear_design_outputs(),
*ui_set_buttons(manual, not manual, False, False),
)
def ui_load_test_condition(idx: int):
vals = _load_condition_defaults_from_test(int(idx))
coeffs = _load_exact_test_coeffs(int(idx))
curves = _curves_from_coeffs(coeffs)
payload = _load_exact_test_payload(int(idx))
return (*vals, f"Loaded v5 test condition `{int(idx)}`.", plot_required_curves_from_series(curves), payload)
def ui_after_requirement_ready(mode: str):
manual = mode == "Define by Input Values"
return ui_set_buttons(manual, not manual, True, False)
def ui_inverse_design(n_generate: float, mode: str, load_idx: float, *values: float):
gt_summary = ""
if mode == "Load from Test Data":
payload = json.loads(_load_exact_test_payload(int(load_idx)))
cond_coeffs = np.asarray(payload["coeffs"], dtype=np.float32).reshape(7, 3)
curves = _curves_from_coeffs(cond_coeffs)
gt = payload.get("ground_truth", {})
if gt:
gt_summary = _format_ground_truth_summary(gt)
else:
curves, cond_coeffs = _build_required_curves_and_condition(list(values))
req_plot = plot_required_curves_from_series(curves)
n_generate_int = int(max(1, min(int(n_generate), MAX_N_GENERATE)))
try:
generated = _run_generation_model(cond_coeffs, n_generate_int)
except Exception as exc:
exc_text = str(exc)
if "No CUDA GPUs are available" in exc_text or exc_text.strip("'\"") == "RuntimeError":
title = "ZeroGPU allocation failed or returned a generic backend error."
elif "ZeroGPU quota" in exc_text or "quota" in exc_text.lower():
title = "ZeroGPU quota is exhausted or unavailable."
else:
title = "Generation failed before candidate simulation."
message = (
f"{title}\n\n"
f"Error: {type(exc).__name__}: {exc_text}\n\n"
"Hugging Face's native ZeroGPU waiting popup only appears while a GPU allocation is actively queued. "
"If the ZeroGPU backend immediately fails, times out, or returns a generic `RuntimeError`, the app may "
"only receive this exception after waiting. Retry after quota is restored, authenticate for more quota, "
"or switch the Space hardware to `Nvidia T4 small`."
)
return (
req_plot,
None,
message,
f"### {title}\n\n{message}",
message,
gr.update(choices=[], value=None),
title,
"",
)
simulations_by_design: Dict[int, Dict[str, Dict[str, np.ndarray]]] = {}
ranked: list[Dict[str, Any]] = []
summaries: list[str] = []
logs: list[str] = []
for design in generated:
mat_type = design["mat_type"]
vf = float(design["vf"])
quarter_angles = [float(a) for a in design["quarter_angles"]]
try:
sim = simulate_instances(
curve_dir=CURVE_DIR,
mat_type=mat_type,
vf=vf,
quarter_angles=quarter_angles,
instances=[int(design["design_id"])],
num_output_points=20,
)
simulations_by_design.update(sim)
score = _score_design(cond_coeffs, sim.get(int(design["design_id"]), {}))
matrix, fiber = _mat_type_to_matrix_fiber(mat_type)
ranked.append(
{
**design,
"score": float(score),
"matrix": matrix,
"fiber": fiber,
"full_angles": _full_angles_from_quarter(quarter_angles),
}
)
summaries.append(
f"Gen {design['design_id']}: {mat_type}, vf={vf:.4f}, q={quarter_angles}, score={score:.6g}"
)
logs.append(f"Simulation complete for generated design {design['design_id']}.")
except Exception as exc:
logs.append(f"Simulation failed for generated design {design['design_id']}: {type(exc).__name__}: {exc}")
ranked = sorted(ranked, key=lambda d: d["score"])
fig = plot_condition_and_simulations_v5(cond_coeffs, simulations_by_design)
if ranked:
selected = ranked[0]
choices = [_format_design_choice(d) for d in ranked]
payload = _serialize_designs(ranked)
return (
req_plot,
fig,
((gt_summary + "\n") if gt_summary else "") + "\n".join(summaries),
"\n".join(logs) if logs else "Done.",
"",
gr.update(choices=choices, value=choices[0]),
_design_markdown(selected),
payload,
)
return (
req_plot,
fig,
"No valid generated designs were scored.",
"\n".join(logs) if logs else "Generation failed.",
"### No valid generated designs were scored.\n\nSee the run log for simulation details.",
gr.update(choices=[], value=None),
"No selected design is available.",
"",
)
def ui_after_design_complete(mode: str, payload: str):
manual = mode == "Define by Input Values"
thermo_ready = bool(payload)
return ui_set_buttons(manual, not manual, True, thermo_ready)
def ui_select_design(selected_label: str, payload: str):
designs = _deserialize_designs(payload)
if not designs:
return "No selected design is available."
for design in designs:
if _format_design_choice(design) == selected_label:
return _design_markdown(design)
return _design_markdown(designs[0])
def ui_thermoforming(selected_label: str, payload: str, angle_a: float, angle_b: float, angle_c: float, max_stress: float):
designs = _deserialize_designs(payload)
if not designs:
return "No selected design is available.", "Thermoforming cannot run before inverse design."
selected = designs[0]
for design in designs:
if _format_design_choice(design) == selected_label:
selected = design
break
domain_issues: list[str] = []
if selected["fiber"] != "Carbon":
domain_issues.append(
f"unsupported fiber `{selected['fiber']}`; thermoforming surrogate was trained only on carbon-fiber systems"
)
if not (0.4 <= float(selected["vf"]) <= 0.6):
domain_issues.append(
f"volume fraction {float(selected['vf']):.4f} is outside the surrogate training range [0.4, 0.6]"
)
n_layers = int(len(selected["full_angles"]))
if not (2 <= n_layers <= 8):
domain_issues.append(
f"ply count {n_layers} is outside the surrogate training range [2, 8]"
)
cf_mat = "CF/PEEK" if selected["matrix"] == "HDPE" else "CF/PA6"
best = thermoforming_inverse_design(
material=cf_mat,
ply_number=n_layers,
fiber_vf=float(selected["vf"]),
y_target=[float(angle_a), float(angle_b), float(angle_c), float(max_stress)],
n_restarts=THERMO_N_RESTARTS,
epochs=THERMO_EPOCHS,
)
warning_md = ""
if domain_issues:
warning_md = "Warning: surrogate is extrapolating outside its training domain.\n\n" + "\n".join(
f"- {issue}" for issue in domain_issues
) + "\n\n"
result_md = (
warning_md
+ f"Thermoforming surrogate material: {cf_mat}\n\n"
f"Forming temperature (C): {best['input'][0]:.3f}\n\n"
f"Punching velocity (mm/s): {best['input'][1]:.3f}\n\n"
f"Cooling time (s): {best['input'][2]:.3f}\n\n"
f"Predicted angle A (deg): {best['output'][0]:.3f}\n\n"
f"Predicted angle B (deg): {best['output'][1]:.3f}\n\n"
f"Predicted angle C (deg): {best['output'][2]:.3f}\n\n"
f"Predicted max residual stress (MPa): {best['output'][3]:.3f}"
)
return _design_markdown(selected), result_md
def build_app():
defaults = _default_requirements(DATA_DIR)
input_components: list[gr.components.Component] = []
css = """
.curve-input-row { gap: 8px !important; flex-wrap: nowrap !important; }
.curve-card { min-width: 180px !important; }
.gradio-container .gr-number input[type="number"] {
width: 96px !important;
min-width: 96px !important;
}
"""
with gr.Blocks(css=css) as demo:
gr.Markdown("# Quarter Material Inverse Design")
gr.Markdown(_runtime_summary())
gr.Markdown(
"Requirement curves are defined first, then batched candidates are generated, "
"the closest design is selected, and thermoforming is run from that selected design."
)
gr.Markdown("## 1. Define Material Requirements")
input_mode = gr.Radio(
choices=["Define by Input Values", "Load from Test Data"],
value="Define by Input Values",
label="Requirement source",
)
with gr.Group(visible=True) as manual_panel:
gr.Markdown("### Manual Requirement Input")
with gr.Row(elem_classes=["curve-input-row"]):
for spec in REQUIREMENTS:
req_id = spec["id"]
with gr.Column(elem_classes=["curve-card"]):
gr.Markdown(f"**{spec['title']}**")
if spec["input_kind"] == "ratio2":
pct = int(round(float(default_x_for_mode(spec["mode"])[-1]) * 100))
ratio_name = str(spec["ratio_name"])
input_components.append(
gr.Number(value=float(defaults[f"{req_id}_initial"]), label=f"Initial {ratio_name}", precision=6)
)
input_components.append(
gr.Number(value=float(defaults[f"{req_id}_end_ratio"]), label=f"{pct}% strain {ratio_name}", precision=6)
)
else:
l1, l2, l3 = _curve_field_labels(spec["mode"], spec["response"])
input_components.append(gr.Number(value=float(defaults[f"{req_id}_initial"]), label=l1, precision=6))
input_components.append(gr.Number(value=float(defaults[f"{req_id}_end_slope"]), label=l2, precision=6))
input_components.append(gr.Number(value=float(defaults[f"{req_id}_end_value"]), label=l3, precision=6))
preview_btn = gr.Button("Generate Required Stress-Strain Curves")
with gr.Group(visible=False) as test_panel:
gr.Markdown("### Load Requirement Curves from Saved Test Data")
with gr.Row():
load_test_idx = gr.Number(value=DEFAULT_TEST_INDEX, precision=0, label="Load v5 test condition index")
load_btn = gr.Button("Load Test Condition", interactive=False)
n_generate = gr.Slider(
minimum=1,
maximum=MAX_N_GENERATE,
step=1,
value=DEFAULT_N_GENERATE,
label="Number of generated candidates",
)
design_btn = gr.Button("Generate Parameter Sets", interactive=False)
load_status = gr.Markdown("")
req_plot = gr.Plot(label="Required Stress-Strain Curves")
exact_test_coeffs = gr.Textbox(value="", visible=False, label="Ready condition payload")
gr.Markdown("## 2. Material Inverse Design")
design_plot = gr.Plot(label="Conditioned vs Generated Curves")
design_summary = gr.Textbox(label="Generated Designs", lines=6)
generation_alert = gr.Markdown("")
run_log = gr.Textbox(label="Run Log", lines=8, interactive=False)
gr.Markdown("## 3. Choose the Closest Generated Design")
design_choice = gr.Radio(choices=[], label="Generated designs")
selected_design_md = gr.Markdown(NO_SELECTED_DESIGN)
design_payload = gr.Textbox(value="", visible=False, label="Generated design payload")
gr.Markdown("## 4. Thermoforming Requirements")
with gr.Row():
thermo_image = gr.Image(
value=os.path.join(APP_DIR, "figures", "forming_angle.png"),
label="Thermoforming geometry",
interactive=False,
)
angle_a = gr.Number(value=0.0, label="Maximum warpage angle A (degree)", precision=3)
angle_b = gr.Number(value=0.0, label="Maximum warpage angle B (degree)", precision=3)
angle_c = gr.Number(value=0.0, label="Maximum warpage angle C (degree)", precision=3)
max_stress = gr.Number(value=0.0, label="Maximum residual stress (MPa)", precision=3)
thermo_btn = gr.Button("Thermoforming Process Design", interactive=False)
thermo_selected_md = gr.Markdown(NO_SELECTED_DESIGN)
thermo_result_md = gr.Markdown(THERMO_NOT_RUN)
input_mode.change(
fn=ui_input_mode_changed,
inputs=[input_mode],
outputs=[
manual_panel,
test_panel,
exact_test_coeffs,
load_status,
req_plot,
design_plot,
design_summary,
generation_alert,
run_log,
design_choice,
selected_design_md,
design_payload,
thermo_selected_md,
thermo_result_md,
angle_a,
angle_b,
angle_c,
max_stress,
preview_btn,
load_btn,
design_btn,
thermo_btn,
],
show_progress=False,
)
for component in input_components:
component.input(
fn=ui_manual_requirements_changed,
outputs=[
exact_test_coeffs,
load_status,
req_plot,
design_plot,
design_summary,
generation_alert,
run_log,
design_choice,
selected_design_md,
design_payload,
thermo_selected_md,
thermo_result_md,
angle_a,
angle_b,
angle_c,
max_stress,
preview_btn,
load_btn,
design_btn,
thermo_btn,
],
show_progress=False,
)
load_test_idx.input(
fn=ui_test_index_changed,
outputs=[
exact_test_coeffs,
load_status,
req_plot,
design_plot,
design_summary,
generation_alert,
run_log,
design_choice,
selected_design_md,
design_payload,
thermo_selected_md,
thermo_result_md,
angle_a,
angle_b,
angle_c,
max_stress,
preview_btn,
load_btn,
design_btn,
thermo_btn,
],
show_progress=False,
)
load_btn.click(fn=ui_disable_all_buttons, outputs=[preview_btn, load_btn, design_btn, thermo_btn], show_progress=False).then(
fn=ui_load_test_condition,
inputs=[load_test_idx],
outputs=input_components + [load_status, req_plot, exact_test_coeffs],
show_progress=False,
).then(
fn=ui_clear_design_outputs,
outputs=[
design_plot,
design_summary,
generation_alert,
run_log,
design_choice,
selected_design_md,
design_payload,
thermo_selected_md,
thermo_result_md,
angle_a,
angle_b,
angle_c,
max_stress,
],
show_progress=False,
).then(
fn=ui_after_requirement_ready,
inputs=[input_mode],
outputs=[preview_btn, load_btn, design_btn, thermo_btn],
show_progress=False,
)
preview_btn.click(
fn=ui_disable_all_buttons,
outputs=[preview_btn, load_btn, design_btn, thermo_btn],
show_progress=False,
).then(
fn=ui_generate_required_curves,
inputs=input_components,
outputs=[req_plot, exact_test_coeffs],
).then(
fn=ui_clear_design_outputs,
outputs=[
design_plot,
design_summary,
generation_alert,
run_log,
design_choice,
selected_design_md,
design_payload,
thermo_selected_md,
thermo_result_md,
angle_a,
angle_b,
angle_c,
max_stress,
],
show_progress=False,
).then(
fn=ui_after_requirement_ready,
inputs=[input_mode],
outputs=[preview_btn, load_btn, design_btn, thermo_btn],
show_progress=False,
)
design_btn.click(
fn=ui_disable_all_buttons,
outputs=[preview_btn, load_btn, design_btn, thermo_btn],
show_progress=False,
).then(
fn=ui_generation_started,
outputs=[
design_plot,
design_summary,
generation_alert,
run_log,
design_choice,
selected_design_md,
design_payload,
thermo_selected_md,
thermo_result_md,
angle_a,
angle_b,
angle_c,
max_stress,
],
show_progress=False,
).then(
fn=ui_inverse_design,
inputs=[n_generate, input_mode, load_test_idx] + input_components,
outputs=[
req_plot,
design_plot,
design_summary,
run_log,
generation_alert,
design_choice,
selected_design_md,
design_payload,
],
).then(
fn=ui_after_design_complete,
inputs=[input_mode, design_payload],
outputs=[preview_btn, load_btn, design_btn, thermo_btn],
show_progress=False,
)
design_choice.change(
fn=ui_select_design,
inputs=[design_choice, design_payload],
outputs=[selected_design_md],
show_progress=False,
)
thermo_btn.click(
fn=ui_disable_all_buttons,
outputs=[preview_btn, load_btn, design_btn, thermo_btn],
show_progress=False,
).then(
fn=ui_thermoforming,
inputs=[design_choice, design_payload, angle_a, angle_b, angle_c, max_stress],
outputs=[thermo_selected_md, thermo_result_md],
).then(
fn=ui_after_design_complete,
inputs=[input_mode, design_payload],
outputs=[preview_btn, load_btn, design_btn, thermo_btn],
show_progress=False,
)
return demo
if __name__ == "__main__":
build_app().launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860")))