| from __future__ import annotations |
|
|
| import json |
| import os |
| from functools import lru_cache |
| from typing import Any, Dict, List |
|
|
| import gradio as gr |
| import numpy as np |
| import torch |
|
|
| from model_inverse import inverse_design as thermoforming_inverse_design |
| from space_lib.cubic_curves import ( |
| EPS33_ROW, |
| LATERAL_ROW, |
| MODE_TO_ROW, |
| default_x_for_mode, |
| evaluate_cubic_no_intercept, |
| fit_cubic_no_intercept, |
| ) |
| from space_lib.infer import load_model_bundle, postprocess_sample, sample |
| from space_lib.metadata_v5 import load_metadata_v5, normalize_condition_v5 |
| from space_lib.plots_v5 import plot_condition_and_simulations_v5, plot_required_curves_from_series |
| from space_lib.simulate import simulate_instances |
|
|
| try: |
| import spaces |
| SPACES_RUNTIME = True |
| except ImportError: |
| SPACES_RUNTIME = False |
|
|
| class _SpacesCompat: |
| @staticmethod |
| def GPU(*_args, **_kwargs): |
| def decorator(fn): |
| return fn |
|
|
| return decorator |
|
|
| spaces = _SpacesCompat() |
|
|
|
|
| APP_DIR = os.path.dirname(os.path.abspath(__file__)) |
| ASSETS_DIR = os.path.join(APP_DIR, "assets") |
| MODEL_ASSETS_DIR = os.path.join(ASSETS_DIR, "model") |
| SIM_ASSETS_DIR = os.path.join(ASSETS_DIR, "quarter_sim") |
| DEFAULT_CHECKPOINT_DIR = os.environ.get( |
| "MG_CHECKPOINT_DIR", |
| MODEL_ASSETS_DIR, |
| ) |
| DEFAULT_DATA_DIR = os.environ.get( |
| "MG_DATA_DIR", |
| MODEL_ASSETS_DIR, |
| ) |
| DEFAULT_CURVE_DIR = os.environ.get( |
| "MG_CURVE_DIR", |
| os.path.join(SIM_ASSETS_DIR, "RVE_datasets"), |
| ) |
|
|
| CHECKPOINT_DIR = DEFAULT_CHECKPOINT_DIR |
| DATA_DIR = DEFAULT_DATA_DIR |
| CURVE_DIR = DEFAULT_CURVE_DIR |
| NORMALIZATION_METHOD = "zscore" |
| ANGLE_RESOLUTION = 1.0 |
| DEFAULT_N_GENERATE = 3 |
| MAX_N_GENERATE = int(os.environ.get("MG_MAX_GENERATE", "6")) |
| DEFAULT_N_GENERATE = min(int(os.environ.get("MG_DEFAULT_N_GENERATE", str(DEFAULT_N_GENERATE))), MAX_N_GENERATE) |
| GPU_DURATION_SEC = int(os.environ.get("ZEROGPU_DURATION", "45")) |
| DEFAULT_TEST_INDEX = int(os.environ.get("MG_DEFAULT_TEST_INDEX", "512")) |
| THERMO_N_RESTARTS = int(os.environ.get("MG_THERMO_RESTARTS", "6")) |
| THERMO_EPOCHS = int(os.environ.get("MG_THERMO_EPOCHS", "300")) |
| MATERIAL_TYPE_NAMES = {0: "CPP", 1: "CHDPE", 2: "GPP", 3: "GHDPE"} |
| NO_SELECTED_DESIGN = "No selected design is available." |
| THERMO_NOT_RUN = "Thermoforming has not run yet." |
|
|
|
|
| REQUIREMENTS: List[Dict[str, Any]] = [ |
| {"id": "11_sig11", "title": "Mode 11 Stress", "mode": "11", "response": "stress", "input_kind": "curve3"}, |
| {"id": "11_eps22", "title": "Mode 11 Lateral", "mode": "11", "response": "lateral", "input_kind": "ratio2", "ratio_name": "Poisson's ratio (vxy)"}, |
| {"id": "11_eps33", "title": "Mode 11 eps33", "mode": "11", "response": "eps33", "input_kind": "curve3"}, |
| {"id": "22_sig22", "title": "Mode 22 Stress", "mode": "22", "response": "stress", "input_kind": "curve3"}, |
| {"id": "22_eps11", "title": "Mode 22 Lateral", "mode": "22", "response": "lateral", "input_kind": "ratio2", "ratio_name": "Poisson's ratio (vyx)"}, |
| {"id": "22_eps33", "title": "Mode 22 eps33", "mode": "22", "response": "eps33", "input_kind": "curve3"}, |
| {"id": "12_sig12", "title": "Mode 12 Stress", "mode": "12", "response": "stress", "input_kind": "curve3"}, |
| ] |
|
|
|
|
| def _curve_field_labels(mode: str, response: str) -> tuple[str, str, str]: |
| x_end = float(default_x_for_mode(mode)[-1]) |
| pct = int(round(x_end * 100)) |
| if response == "stress": |
| return "Initial stiffness", f"{pct}% strain stiffness", f"{pct}% strain stress" |
| if response == "lateral": |
| return "Initial strain ratio", f"{pct}% strain ratio", f"{pct}% strain response" |
| return "Initial eps33 ratio", f"{pct}% strain ratio", f"{pct}% strain eps33" |
|
|
|
|
| def _coeffs_from_requirements(initial_slope: float, end_slope: float, end_value: float, x_end: float) -> np.ndarray: |
| A = np.array([[2.0 * x_end, 3.0 * x_end * x_end], [x_end * x_end, x_end * x_end * x_end]], dtype=np.float32) |
| b = np.array([end_slope - initial_slope, end_value - initial_slope * x_end], dtype=np.float32) |
| quad, cubic = np.linalg.solve(A, b) |
| return np.asarray([cubic, quad, initial_slope], dtype=np.float32) |
|
|
|
|
| def _requirements_from_coeffs(coeffs_7x3: np.ndarray) -> dict[str, float]: |
| coeffs = np.asarray(coeffs_7x3, dtype=np.float32).reshape(7, 3) |
| out: dict[str, float] = {} |
| for idx, spec in enumerate(REQUIREMENTS): |
| mode = spec["mode"] |
| x_end = float(default_x_for_mode(mode)[-1]) |
| c3, c2, c1 = [float(v) for v in coeffs[idx]] |
| if spec["input_kind"] == "ratio2": |
| out[f"{spec['id']}_initial"] = c1 |
| out[f"{spec['id']}_end_ratio"] = 3.0 * c3 * x_end * x_end + 2.0 * c2 * x_end + c1 |
| else: |
| initial = c1 |
| end_slope = 3.0 * c3 * x_end * x_end + 2.0 * c2 * x_end + c1 |
| end_value = c3 * x_end**3 + c2 * x_end**2 + c1 * x_end |
| out[f"{spec['id']}_initial"] = initial |
| out[f"{spec['id']}_end_slope"] = end_slope |
| out[f"{spec['id']}_end_value"] = end_value |
| return out |
|
|
|
|
| @lru_cache(maxsize=4) |
| def _default_requirements(data_dir: str) -> dict[str, float]: |
| arr = np.load(os.path.join(data_dir, "test_data.npz"))["polynomial_coefficients"] |
| idx = int(DEFAULT_TEST_INDEX) % int(len(arr)) |
| coeffs = np.asarray(arr[idx], dtype=np.float32).reshape(7, 3) |
| return _requirements_from_coeffs(coeffs) |
|
|
|
|
| @lru_cache(maxsize=4) |
| def _get_meta(data_dir: str): |
| return load_metadata_v5(data_dir) |
|
|
|
|
| _MODEL_CACHE: dict[tuple[str, str, float], Any] = {} |
|
|
|
|
| def _runtime_device() -> str: |
| return "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
|
| def _runtime_summary() -> str: |
| spaces_mode = "available" if SPACES_RUNTIME else "not installed locally; using no-op fallback" |
| return ( |
| "GPU-compatible Gradio app for the quarter-v5 model. " |
| f"Generation uses `{_runtime_device()}` when invoked; the Hugging Face `spaces` runtime is {spaces_mode}. " |
| "This supports both ZeroGPU (`@spaces.GPU`) and dedicated GPU hardware such as `Nvidia T4 small`." |
| ) |
|
|
|
|
| def _get_model(checkpoint_dir: str, device: str, angle_resolution: float): |
| key = (checkpoint_dir, device, float(angle_resolution)) |
| if key not in _MODEL_CACHE: |
| _MODEL_CACHE[key] = load_model_bundle(checkpoint_dir, device=device, angle_resolution=angle_resolution) |
| return _MODEL_CACHE[key] |
|
|
|
|
| def _angle_categories_from_meta(meta, angle_resolution: float) -> np.ndarray: |
| angle_min_raw = float(meta.raw.get("angle_min", 0.0)) |
| angle_max_raw = float(meta.raw.get("angle_max", 90.0)) |
| res = float(angle_resolution) |
| ang_min = np.floor(angle_min_raw / res) * res |
| ang_max = np.ceil(angle_max_raw / res) * res |
| num = int((ang_max - ang_min) / res) + 1 |
| return np.linspace(ang_min, ang_max, num, dtype=np.float32) |
|
|
|
|
| def _balance_partner(angle: float) -> float: |
| a = abs(float(angle)) % 180.0 |
| if np.isclose(a, 0.0): |
| return 0.0 |
| if np.isclose(a, 90.0): |
| return 90.0 |
| return -float(angle) |
|
|
|
|
| def _full_angles_from_quarter(quarter_angles: list[float]) -> list[float]: |
| ordered_quarter = [float(a) for a in sorted(quarter_angles)] |
| half_angles = ordered_quarter + [_balance_partner(a) for a in reversed(ordered_quarter)] |
| return half_angles + list(reversed(half_angles)) |
|
|
|
|
| def _mat_type_to_matrix_fiber(mat_type: str) -> tuple[str, str]: |
| mt = (mat_type or "").upper().strip() |
| fiber = "Carbon" if mt.startswith("C") else ("Glass" if mt.startswith("G") else mt[:1]) |
| matrix = "HDPE" if mt.endswith("HDPE") else ("PP" if mt.endswith("PP") else mt[1:]) |
| return matrix, fiber |
|
|
|
|
| def _score_design(cond_coeffs: np.ndarray, sim_modes: Dict[str, Dict[str, np.ndarray]]) -> float: |
| cond = np.asarray(cond_coeffs, dtype=np.float32).reshape(7, 3) |
| losses: list[float] = [] |
| for mode, data in sim_modes.items(): |
| strain = np.asarray(data["strain"], dtype=np.float32) |
| stress_pred = evaluate_cubic_no_intercept(cond[MODE_TO_ROW[mode]], strain) |
| losses.append(float(np.mean((stress_pred - np.asarray(data["stress"], dtype=np.float32)) ** 2))) |
| if mode in ("11", "22") and data.get("lateral") is not None: |
| lateral_pred = evaluate_cubic_no_intercept(cond[int(LATERAL_ROW[mode])], strain) |
| losses.append(float(np.mean((lateral_pred - np.asarray(data["lateral"], dtype=np.float32)) ** 2))) |
| if mode in ("11", "22") and data.get("eps33") is not None: |
| eps33_pred = evaluate_cubic_no_intercept(cond[int(EPS33_ROW[mode])], strain) |
| losses.append(float(np.mean((eps33_pred - np.asarray(data["eps33"], dtype=np.float32)) ** 2))) |
| return float(np.mean(losses)) if losses else float("inf") |
|
|
|
|
| def _build_required_curves_and_condition(values: List[float]) -> tuple[dict[str, np.ndarray], np.ndarray]: |
| coeff_rows = [] |
| curves: dict[str, np.ndarray] = {} |
| cursor = 0 |
| for idx, spec in enumerate(REQUIREMENTS): |
| mode = spec["mode"] |
| x = default_x_for_mode(mode) |
| x_end = float(x[-1]) |
| if spec["input_kind"] == "ratio2": |
| initial = float(values[cursor]) |
| end_ratio = float(values[cursor + 1]) |
| cursor += 2 |
| quad = (end_ratio - initial) / (2.0 * x_end) if x_end > 0 else 0.0 |
| y = initial * x + quad * x * x |
| else: |
| initial = float(values[cursor]) |
| end_slope = float(values[cursor + 1]) |
| end_value = float(values[cursor + 2]) |
| cursor += 3 |
| coeffs = _coeffs_from_requirements(initial, end_slope, end_value, x_end) |
| y = evaluate_cubic_no_intercept(coeffs, x) |
| fitted = fit_cubic_no_intercept(x, y, degree=3) |
| if fitted is None: |
| raise RuntimeError(f"Failed to fit cubic coefficients for requirement index {idx}") |
| coeff_rows.append(fitted) |
|
|
| req_id = spec["id"] |
| if req_id == "11_sig11": |
| curves["eps11"] = x |
| curves["sig11_mpa"] = y |
| elif req_id == "11_eps22": |
| curves["eps22_from_eps11"] = y |
| elif req_id == "11_eps33": |
| curves["eps33_from_eps11"] = y |
| elif req_id == "22_sig22": |
| curves["eps22"] = x |
| curves["sig22_mpa"] = y |
| elif req_id == "22_eps11": |
| curves["eps11_from_eps22"] = y |
| elif req_id == "22_eps33": |
| curves["eps33_from_eps22"] = y |
| elif req_id == "12_sig12": |
| curves["eps12"] = x |
| curves["sig12_mpa"] = y |
| return curves, np.stack(coeff_rows, axis=0).astype(np.float32) |
|
|
|
|
| def _load_condition_defaults_from_test(idx: int) -> List[float]: |
| arr = np.load(os.path.join(DATA_DIR, "test_data.npz"))["polynomial_coefficients"] |
| coeffs = np.asarray(arr[int(idx) % len(arr)], dtype=np.float32).reshape(7, 3) |
| reqs = _requirements_from_coeffs(coeffs) |
| values: List[float] = [] |
| for spec in REQUIREMENTS: |
| req_id = spec["id"] |
| if spec["input_kind"] == "ratio2": |
| values.extend([float(reqs[f"{req_id}_initial"]), float(reqs[f"{req_id}_end_ratio"])]) |
| else: |
| values.extend( |
| [ |
| float(reqs[f"{req_id}_initial"]), |
| float(reqs[f"{req_id}_end_slope"]), |
| float(reqs[f"{req_id}_end_value"]), |
| ] |
| ) |
| return values |
|
|
|
|
| def _load_exact_test_coeffs(idx: int) -> np.ndarray: |
| arr = np.load(os.path.join(DATA_DIR, "test_data.npz"))["polynomial_coefficients"] |
| coeffs = np.asarray(arr[int(idx) % len(arr)], dtype=np.float32).reshape(7, 3) |
| return coeffs |
|
|
|
|
| def _load_exact_test_payload(idx: int) -> str: |
| data = np.load(os.path.join(DATA_DIR, "test_data.npz")) |
| idx = int(idx) % int(len(data["polynomial_coefficients"])) |
| coeffs = np.asarray(data["polynomial_coefficients"][idx], dtype=np.float32).reshape(7, 3) |
| mat_id = int(data["material_type"][idx]) |
| vf = float(data["volume_fraction"][idx]) |
| seq_len = int(data["stacking_sequence_lengths"][idx]) |
| quarter_angles = [float(a) for a in data["stacking_sequence"][idx][:seq_len]] |
| return json.dumps( |
| { |
| "coeffs": coeffs.tolist(), |
| "ground_truth": { |
| "test_index": idx, |
| "mat_type": MATERIAL_TYPE_NAMES.get(mat_id, str(mat_id)), |
| "vf": vf, |
| "quarter_angles": quarter_angles, |
| "full_angles": _full_angles_from_quarter(quarter_angles), |
| }, |
| } |
| ) |
|
|
|
|
| def _condition_payload_from_coeffs(cond_coeffs: np.ndarray, source: str, ground_truth: dict[str, Any] | None = None) -> str: |
| payload: dict[str, Any] = { |
| "source": source, |
| "coeffs": np.asarray(cond_coeffs, dtype=np.float32).reshape(7, 3).tolist(), |
| } |
| if ground_truth: |
| payload["ground_truth"] = ground_truth |
| return json.dumps(payload) |
|
|
|
|
| def _curves_from_coeffs(cond_coeffs: np.ndarray) -> dict[str, np.ndarray]: |
| coeffs = np.asarray(cond_coeffs, dtype=np.float32).reshape(7, 3) |
| curves: dict[str, np.ndarray] = {} |
| mapping = [ |
| ("11_sig11", "11"), |
| ("11_eps22", "11"), |
| ("11_eps33", "11"), |
| ("22_sig22", "22"), |
| ("22_eps11", "22"), |
| ("22_eps33", "22"), |
| ("12_sig12", "12"), |
| ] |
| for idx, (req_id, mode) in enumerate(mapping): |
| x = default_x_for_mode(mode) |
| y = evaluate_cubic_no_intercept(coeffs[idx], x) |
| if req_id == "11_sig11": |
| curves["eps11"] = x |
| curves["sig11_mpa"] = y |
| elif req_id == "11_eps22": |
| curves["eps22_from_eps11"] = y |
| elif req_id == "11_eps33": |
| curves["eps33_from_eps11"] = y |
| elif req_id == "22_sig22": |
| curves["eps22"] = x |
| curves["sig22_mpa"] = y |
| elif req_id == "22_eps11": |
| curves["eps11_from_eps22"] = y |
| elif req_id == "22_eps33": |
| curves["eps33_from_eps22"] = y |
| elif req_id == "12_sig12": |
| curves["eps12"] = x |
| curves["sig12_mpa"] = y |
| return curves |
|
|
|
|
| def _serialize_designs(designs: list[Dict[str, Any]]) -> str: |
| return json.dumps(designs) |
|
|
|
|
| def _deserialize_designs(payload: str) -> list[Dict[str, Any]]: |
| if not payload: |
| return [] |
| return json.loads(payload) |
|
|
|
|
| def _format_design_choice(design: Dict[str, Any]) -> str: |
| return ( |
| f"Gen {design['design_id']} | score={design['score']:.4g} | " |
| f"{design['mat_type']} | vf={design['vf']:.4f} | q={design['quarter_angles']}" |
| ) |
|
|
|
|
| def _format_ground_truth_summary(gt: Dict[str, Any]) -> str: |
| return ( |
| f"Gen GT: {gt['mat_type']}, vf={float(gt['vf']):.4f}, " |
| f"q={gt['quarter_angles']}, score=ground truth" |
| ) |
|
|
|
|
| def _design_markdown(design: Dict[str, Any]) -> str: |
| quarter = ", ".join(f"{a:.1f}" for a in design["quarter_angles"]) |
| full = ", ".join(f"{a:.1f}" for a in design["full_angles"]) |
| return ( |
| f"Selected design: Gen {design['design_id']}\n\n" |
| f"Material: {design['mat_type']} ({design['matrix']} matrix, {design['fiber']} fiber)\n\n" |
| f"VF: {design['vf']:.4f}\n\n" |
| f"Quarter angles: {quarter}\n\n" |
| f"Full stack used by simulation: {full}\n\n" |
| f"Similarity score: {design['score']:.6g}" |
| ) |
|
|
|
|
| @spaces.GPU(duration=GPU_DURATION_SEC) |
| def _run_generation_model(cond_coeffs: np.ndarray, n_generate: int) -> list[dict[str, Any]]: |
| meta = _get_meta(DATA_DIR) |
| device = _runtime_device() |
| bundle = _get_model(CHECKPOINT_DIR, device=device, angle_resolution=ANGLE_RESOLUTION) |
| cond_norm = normalize_condition_v5(cond_coeffs, meta, NORMALIZATION_METHOD) |
| n_generate = int(max(1, min(int(n_generate), MAX_N_GENERATE))) |
| cond_t = torch.tensor(cond_norm, dtype=torch.float32, device=device).view(1, -1).repeat(n_generate, 1) |
| angle_categories = _angle_categories_from_meta(meta, ANGLE_RESOLUTION) if bundle.use_discrete_angles else None |
| out = sample( |
| model=bundle.model, |
| disc_diff_mat=bundle.disc_diff_mat, |
| disc_diff_vf_category=bundle.disc_diff_vf_category, |
| disc_diff_layer=bundle.disc_diff_layer, |
| disc_diff_angle=bundle.disc_diff_angle, |
| cont_diff=bundle.cont_diff, |
| cond=cond_t, |
| mask_ids=bundle.mask_ids, |
| device=device, |
| remask_prob=0.1, |
| use_discrete_angles=bundle.use_discrete_angles, |
| ) |
|
|
| generated: list[dict[str, Any]] = [] |
| for i in range(n_generate): |
| sample_i = { |
| "material_t": out["material_t"][i : i + 1], |
| "vf_category_t": out["vf_category_t"][i : i + 1], |
| "layer_t": out["layer_t"][i : i + 1], |
| "angle_t": out["angle_t"][i : i + 1], |
| } |
| mat_type, vf, quarter_angles = postprocess_sample( |
| sample_i, |
| use_discrete_angles=bundle.use_discrete_angles, |
| angle_categories_deg=angle_categories, |
| ) |
| generated.append( |
| { |
| "design_id": i + 1, |
| "mat_type": mat_type, |
| "vf": float(vf), |
| "quarter_angles": [float(a) for a in quarter_angles], |
| } |
| ) |
| return generated |
|
|
|
|
| def ui_generate_required_curves(*values: float): |
| curves, cond_coeffs = _build_required_curves_and_condition(list(values)) |
| return plot_required_curves_from_series(curves), _condition_payload_from_coeffs(cond_coeffs, source="manual") |
|
|
|
|
| def ui_set_buttons(preview: bool, load: bool, design: bool, thermo: bool): |
| return ( |
| gr.update(interactive=preview), |
| gr.update(interactive=load), |
| gr.update(interactive=design), |
| gr.update(interactive=thermo), |
| ) |
|
|
|
|
| def ui_disable_all_buttons(): |
| return ui_set_buttons(False, False, False, False) |
|
|
|
|
| def ui_clear_design_outputs(): |
| return ( |
| None, |
| "", |
| "", |
| "", |
| gr.update(choices=[], value=None), |
| NO_SELECTED_DESIGN, |
| "", |
| NO_SELECTED_DESIGN, |
| THERMO_NOT_RUN, |
| gr.update(value=0.0), |
| gr.update(value=0.0), |
| gr.update(value=0.0), |
| gr.update(value=0.0), |
| ) |
|
|
|
|
| def ui_generation_started(): |
| message = ( |
| "### Requesting ZeroGPU for candidate generation\n\n" |
| "The Space is requesting a temporary CUDA device from Hugging Face ZeroGPU. " |
| "If generation stops here or a backend error appears, ZeroGPU allocation likely failed " |
| "before CUDA became available, or the account/organization quota is exhausted. " |
| "Retry after quota is restored or switch the Space hardware to `Nvidia T4 small`." |
| ) |
| return ( |
| None, |
| "Requesting ZeroGPU allocation...", |
| message, |
| "Requesting ZeroGPU allocation for candidate generation.", |
| gr.update(choices=[], value=None), |
| NO_SELECTED_DESIGN, |
| "", |
| NO_SELECTED_DESIGN, |
| THERMO_NOT_RUN, |
| gr.update(value=0.0), |
| gr.update(value=0.0), |
| gr.update(value=0.0), |
| gr.update(value=0.0), |
| ) |
|
|
|
|
| def ui_manual_requirements_changed(): |
| return ( |
| "", |
| "Requirements changed. Generate curves before inverse design.", |
| None, |
| *ui_clear_design_outputs(), |
| *ui_set_buttons(True, False, False, False), |
| ) |
|
|
|
|
| def ui_test_index_changed(): |
| return ( |
| "", |
| "Test index changed. Load the test condition before inverse design.", |
| None, |
| *ui_clear_design_outputs(), |
| *ui_set_buttons(False, True, False, False), |
| ) |
|
|
|
|
| def ui_input_mode_changed(mode: str): |
| manual = mode == "Define by Input Values" |
| status = "" |
| if manual: |
| status = "Manual requirement mode is active. Generate curves before inverse design." |
| else: |
| status = "Load-from-test mode is active. Load a test condition before inverse design." |
| return ( |
| gr.update(visible=manual), |
| gr.update(visible=not manual), |
| "", |
| status, |
| None, |
| *ui_clear_design_outputs(), |
| *ui_set_buttons(manual, not manual, False, False), |
| ) |
|
|
|
|
| def ui_load_test_condition(idx: int): |
| vals = _load_condition_defaults_from_test(int(idx)) |
| coeffs = _load_exact_test_coeffs(int(idx)) |
| curves = _curves_from_coeffs(coeffs) |
| payload = _load_exact_test_payload(int(idx)) |
| return (*vals, f"Loaded v5 test condition `{int(idx)}`.", plot_required_curves_from_series(curves), payload) |
|
|
|
|
| def ui_after_requirement_ready(mode: str): |
| manual = mode == "Define by Input Values" |
| return ui_set_buttons(manual, not manual, True, False) |
|
|
|
|
| def ui_inverse_design(n_generate: float, mode: str, load_idx: float, *values: float): |
| gt_summary = "" |
| if mode == "Load from Test Data": |
| payload = json.loads(_load_exact_test_payload(int(load_idx))) |
| cond_coeffs = np.asarray(payload["coeffs"], dtype=np.float32).reshape(7, 3) |
| curves = _curves_from_coeffs(cond_coeffs) |
| gt = payload.get("ground_truth", {}) |
| if gt: |
| gt_summary = _format_ground_truth_summary(gt) |
| else: |
| curves, cond_coeffs = _build_required_curves_and_condition(list(values)) |
| req_plot = plot_required_curves_from_series(curves) |
| n_generate_int = int(max(1, min(int(n_generate), MAX_N_GENERATE))) |
| try: |
| generated = _run_generation_model(cond_coeffs, n_generate_int) |
| except Exception as exc: |
| exc_text = str(exc) |
| if "No CUDA GPUs are available" in exc_text or exc_text.strip("'\"") == "RuntimeError": |
| title = "ZeroGPU allocation failed or returned a generic backend error." |
| elif "ZeroGPU quota" in exc_text or "quota" in exc_text.lower(): |
| title = "ZeroGPU quota is exhausted or unavailable." |
| else: |
| title = "Generation failed before candidate simulation." |
| message = ( |
| f"{title}\n\n" |
| f"Error: {type(exc).__name__}: {exc_text}\n\n" |
| "Hugging Face's native ZeroGPU waiting popup only appears while a GPU allocation is actively queued. " |
| "If the ZeroGPU backend immediately fails, times out, or returns a generic `RuntimeError`, the app may " |
| "only receive this exception after waiting. Retry after quota is restored, authenticate for more quota, " |
| "or switch the Space hardware to `Nvidia T4 small`." |
| ) |
| return ( |
| req_plot, |
| None, |
| message, |
| f"### {title}\n\n{message}", |
| message, |
| gr.update(choices=[], value=None), |
| title, |
| "", |
| ) |
|
|
| simulations_by_design: Dict[int, Dict[str, Dict[str, np.ndarray]]] = {} |
| ranked: list[Dict[str, Any]] = [] |
| summaries: list[str] = [] |
| logs: list[str] = [] |
| for design in generated: |
| mat_type = design["mat_type"] |
| vf = float(design["vf"]) |
| quarter_angles = [float(a) for a in design["quarter_angles"]] |
| try: |
| sim = simulate_instances( |
| curve_dir=CURVE_DIR, |
| mat_type=mat_type, |
| vf=vf, |
| quarter_angles=quarter_angles, |
| instances=[int(design["design_id"])], |
| num_output_points=20, |
| ) |
| simulations_by_design.update(sim) |
| score = _score_design(cond_coeffs, sim.get(int(design["design_id"]), {})) |
| matrix, fiber = _mat_type_to_matrix_fiber(mat_type) |
| ranked.append( |
| { |
| **design, |
| "score": float(score), |
| "matrix": matrix, |
| "fiber": fiber, |
| "full_angles": _full_angles_from_quarter(quarter_angles), |
| } |
| ) |
| summaries.append( |
| f"Gen {design['design_id']}: {mat_type}, vf={vf:.4f}, q={quarter_angles}, score={score:.6g}" |
| ) |
| logs.append(f"Simulation complete for generated design {design['design_id']}.") |
| except Exception as exc: |
| logs.append(f"Simulation failed for generated design {design['design_id']}: {type(exc).__name__}: {exc}") |
|
|
| ranked = sorted(ranked, key=lambda d: d["score"]) |
| fig = plot_condition_and_simulations_v5(cond_coeffs, simulations_by_design) |
| if ranked: |
| selected = ranked[0] |
| choices = [_format_design_choice(d) for d in ranked] |
| payload = _serialize_designs(ranked) |
| return ( |
| req_plot, |
| fig, |
| ((gt_summary + "\n") if gt_summary else "") + "\n".join(summaries), |
| "\n".join(logs) if logs else "Done.", |
| "", |
| gr.update(choices=choices, value=choices[0]), |
| _design_markdown(selected), |
| payload, |
| ) |
|
|
| return ( |
| req_plot, |
| fig, |
| "No valid generated designs were scored.", |
| "\n".join(logs) if logs else "Generation failed.", |
| "### No valid generated designs were scored.\n\nSee the run log for simulation details.", |
| gr.update(choices=[], value=None), |
| "No selected design is available.", |
| "", |
| ) |
|
|
|
|
| def ui_after_design_complete(mode: str, payload: str): |
| manual = mode == "Define by Input Values" |
| thermo_ready = bool(payload) |
| return ui_set_buttons(manual, not manual, True, thermo_ready) |
|
|
|
|
| def ui_select_design(selected_label: str, payload: str): |
| designs = _deserialize_designs(payload) |
| if not designs: |
| return "No selected design is available." |
| for design in designs: |
| if _format_design_choice(design) == selected_label: |
| return _design_markdown(design) |
| return _design_markdown(designs[0]) |
|
|
|
|
| def ui_thermoforming(selected_label: str, payload: str, angle_a: float, angle_b: float, angle_c: float, max_stress: float): |
| designs = _deserialize_designs(payload) |
| if not designs: |
| return "No selected design is available.", "Thermoforming cannot run before inverse design." |
|
|
| selected = designs[0] |
| for design in designs: |
| if _format_design_choice(design) == selected_label: |
| selected = design |
| break |
|
|
| domain_issues: list[str] = [] |
| if selected["fiber"] != "Carbon": |
| domain_issues.append( |
| f"unsupported fiber `{selected['fiber']}`; thermoforming surrogate was trained only on carbon-fiber systems" |
| ) |
| if not (0.4 <= float(selected["vf"]) <= 0.6): |
| domain_issues.append( |
| f"volume fraction {float(selected['vf']):.4f} is outside the surrogate training range [0.4, 0.6]" |
| ) |
| n_layers = int(len(selected["full_angles"])) |
| if not (2 <= n_layers <= 8): |
| domain_issues.append( |
| f"ply count {n_layers} is outside the surrogate training range [2, 8]" |
| ) |
|
|
| cf_mat = "CF/PEEK" if selected["matrix"] == "HDPE" else "CF/PA6" |
| best = thermoforming_inverse_design( |
| material=cf_mat, |
| ply_number=n_layers, |
| fiber_vf=float(selected["vf"]), |
| y_target=[float(angle_a), float(angle_b), float(angle_c), float(max_stress)], |
| n_restarts=THERMO_N_RESTARTS, |
| epochs=THERMO_EPOCHS, |
| ) |
| warning_md = "" |
| if domain_issues: |
| warning_md = "Warning: surrogate is extrapolating outside its training domain.\n\n" + "\n".join( |
| f"- {issue}" for issue in domain_issues |
| ) + "\n\n" |
|
|
| result_md = ( |
| warning_md |
| + f"Thermoforming surrogate material: {cf_mat}\n\n" |
| f"Forming temperature (C): {best['input'][0]:.3f}\n\n" |
| f"Punching velocity (mm/s): {best['input'][1]:.3f}\n\n" |
| f"Cooling time (s): {best['input'][2]:.3f}\n\n" |
| f"Predicted angle A (deg): {best['output'][0]:.3f}\n\n" |
| f"Predicted angle B (deg): {best['output'][1]:.3f}\n\n" |
| f"Predicted angle C (deg): {best['output'][2]:.3f}\n\n" |
| f"Predicted max residual stress (MPa): {best['output'][3]:.3f}" |
| ) |
| return _design_markdown(selected), result_md |
|
|
|
|
| def build_app(): |
| defaults = _default_requirements(DATA_DIR) |
| input_components: list[gr.components.Component] = [] |
|
|
| css = """ |
| .curve-input-row { gap: 8px !important; flex-wrap: nowrap !important; } |
| .curve-card { min-width: 180px !important; } |
| .gradio-container .gr-number input[type="number"] { |
| width: 96px !important; |
| min-width: 96px !important; |
| } |
| """ |
|
|
| with gr.Blocks(css=css) as demo: |
| gr.Markdown("# Quarter Material Inverse Design") |
| gr.Markdown(_runtime_summary()) |
| gr.Markdown( |
| "Requirement curves are defined first, then batched candidates are generated, " |
| "the closest design is selected, and thermoforming is run from that selected design." |
| ) |
|
|
| gr.Markdown("## 1. Define Material Requirements") |
| input_mode = gr.Radio( |
| choices=["Define by Input Values", "Load from Test Data"], |
| value="Define by Input Values", |
| label="Requirement source", |
| ) |
|
|
| with gr.Group(visible=True) as manual_panel: |
| gr.Markdown("### Manual Requirement Input") |
| with gr.Row(elem_classes=["curve-input-row"]): |
| for spec in REQUIREMENTS: |
| req_id = spec["id"] |
| with gr.Column(elem_classes=["curve-card"]): |
| gr.Markdown(f"**{spec['title']}**") |
| if spec["input_kind"] == "ratio2": |
| pct = int(round(float(default_x_for_mode(spec["mode"])[-1]) * 100)) |
| ratio_name = str(spec["ratio_name"]) |
| input_components.append( |
| gr.Number(value=float(defaults[f"{req_id}_initial"]), label=f"Initial {ratio_name}", precision=6) |
| ) |
| input_components.append( |
| gr.Number(value=float(defaults[f"{req_id}_end_ratio"]), label=f"{pct}% strain {ratio_name}", precision=6) |
| ) |
| else: |
| l1, l2, l3 = _curve_field_labels(spec["mode"], spec["response"]) |
| input_components.append(gr.Number(value=float(defaults[f"{req_id}_initial"]), label=l1, precision=6)) |
| input_components.append(gr.Number(value=float(defaults[f"{req_id}_end_slope"]), label=l2, precision=6)) |
| input_components.append(gr.Number(value=float(defaults[f"{req_id}_end_value"]), label=l3, precision=6)) |
| preview_btn = gr.Button("Generate Required Stress-Strain Curves") |
|
|
| with gr.Group(visible=False) as test_panel: |
| gr.Markdown("### Load Requirement Curves from Saved Test Data") |
| with gr.Row(): |
| load_test_idx = gr.Number(value=DEFAULT_TEST_INDEX, precision=0, label="Load v5 test condition index") |
| load_btn = gr.Button("Load Test Condition", interactive=False) |
|
|
| n_generate = gr.Slider( |
| minimum=1, |
| maximum=MAX_N_GENERATE, |
| step=1, |
| value=DEFAULT_N_GENERATE, |
| label="Number of generated candidates", |
| ) |
| design_btn = gr.Button("Generate Parameter Sets", interactive=False) |
|
|
| load_status = gr.Markdown("") |
| req_plot = gr.Plot(label="Required Stress-Strain Curves") |
| exact_test_coeffs = gr.Textbox(value="", visible=False, label="Ready condition payload") |
|
|
| gr.Markdown("## 2. Material Inverse Design") |
| design_plot = gr.Plot(label="Conditioned vs Generated Curves") |
| design_summary = gr.Textbox(label="Generated Designs", lines=6) |
| generation_alert = gr.Markdown("") |
| run_log = gr.Textbox(label="Run Log", lines=8, interactive=False) |
|
|
| gr.Markdown("## 3. Choose the Closest Generated Design") |
| design_choice = gr.Radio(choices=[], label="Generated designs") |
| selected_design_md = gr.Markdown(NO_SELECTED_DESIGN) |
| design_payload = gr.Textbox(value="", visible=False, label="Generated design payload") |
|
|
| gr.Markdown("## 4. Thermoforming Requirements") |
| with gr.Row(): |
| thermo_image = gr.Image( |
| value=os.path.join(APP_DIR, "figures", "forming_angle.png"), |
| label="Thermoforming geometry", |
| interactive=False, |
| ) |
| angle_a = gr.Number(value=0.0, label="Maximum warpage angle A (degree)", precision=3) |
| angle_b = gr.Number(value=0.0, label="Maximum warpage angle B (degree)", precision=3) |
| angle_c = gr.Number(value=0.0, label="Maximum warpage angle C (degree)", precision=3) |
| max_stress = gr.Number(value=0.0, label="Maximum residual stress (MPa)", precision=3) |
| thermo_btn = gr.Button("Thermoforming Process Design", interactive=False) |
| thermo_selected_md = gr.Markdown(NO_SELECTED_DESIGN) |
| thermo_result_md = gr.Markdown(THERMO_NOT_RUN) |
|
|
| input_mode.change( |
| fn=ui_input_mode_changed, |
| inputs=[input_mode], |
| outputs=[ |
| manual_panel, |
| test_panel, |
| exact_test_coeffs, |
| load_status, |
| req_plot, |
| design_plot, |
| design_summary, |
| generation_alert, |
| run_log, |
| design_choice, |
| selected_design_md, |
| design_payload, |
| thermo_selected_md, |
| thermo_result_md, |
| angle_a, |
| angle_b, |
| angle_c, |
| max_stress, |
| preview_btn, |
| load_btn, |
| design_btn, |
| thermo_btn, |
| ], |
| show_progress=False, |
| ) |
| for component in input_components: |
| component.input( |
| fn=ui_manual_requirements_changed, |
| outputs=[ |
| exact_test_coeffs, |
| load_status, |
| req_plot, |
| design_plot, |
| design_summary, |
| generation_alert, |
| run_log, |
| design_choice, |
| selected_design_md, |
| design_payload, |
| thermo_selected_md, |
| thermo_result_md, |
| angle_a, |
| angle_b, |
| angle_c, |
| max_stress, |
| preview_btn, |
| load_btn, |
| design_btn, |
| thermo_btn, |
| ], |
| show_progress=False, |
| ) |
| load_test_idx.input( |
| fn=ui_test_index_changed, |
| outputs=[ |
| exact_test_coeffs, |
| load_status, |
| req_plot, |
| design_plot, |
| design_summary, |
| generation_alert, |
| run_log, |
| design_choice, |
| selected_design_md, |
| design_payload, |
| thermo_selected_md, |
| thermo_result_md, |
| angle_a, |
| angle_b, |
| angle_c, |
| max_stress, |
| preview_btn, |
| load_btn, |
| design_btn, |
| thermo_btn, |
| ], |
| show_progress=False, |
| ) |
| load_btn.click(fn=ui_disable_all_buttons, outputs=[preview_btn, load_btn, design_btn, thermo_btn], show_progress=False).then( |
| fn=ui_load_test_condition, |
| inputs=[load_test_idx], |
| outputs=input_components + [load_status, req_plot, exact_test_coeffs], |
| show_progress=False, |
| ).then( |
| fn=ui_clear_design_outputs, |
| outputs=[ |
| design_plot, |
| design_summary, |
| generation_alert, |
| run_log, |
| design_choice, |
| selected_design_md, |
| design_payload, |
| thermo_selected_md, |
| thermo_result_md, |
| angle_a, |
| angle_b, |
| angle_c, |
| max_stress, |
| ], |
| show_progress=False, |
| ).then( |
| fn=ui_after_requirement_ready, |
| inputs=[input_mode], |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], |
| show_progress=False, |
| ) |
| preview_btn.click( |
| fn=ui_disable_all_buttons, |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], |
| show_progress=False, |
| ).then( |
| fn=ui_generate_required_curves, |
| inputs=input_components, |
| outputs=[req_plot, exact_test_coeffs], |
| ).then( |
| fn=ui_clear_design_outputs, |
| outputs=[ |
| design_plot, |
| design_summary, |
| generation_alert, |
| run_log, |
| design_choice, |
| selected_design_md, |
| design_payload, |
| thermo_selected_md, |
| thermo_result_md, |
| angle_a, |
| angle_b, |
| angle_c, |
| max_stress, |
| ], |
| show_progress=False, |
| ).then( |
| fn=ui_after_requirement_ready, |
| inputs=[input_mode], |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], |
| show_progress=False, |
| ) |
| design_btn.click( |
| fn=ui_disable_all_buttons, |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], |
| show_progress=False, |
| ).then( |
| fn=ui_generation_started, |
| outputs=[ |
| design_plot, |
| design_summary, |
| generation_alert, |
| run_log, |
| design_choice, |
| selected_design_md, |
| design_payload, |
| thermo_selected_md, |
| thermo_result_md, |
| angle_a, |
| angle_b, |
| angle_c, |
| max_stress, |
| ], |
| show_progress=False, |
| ).then( |
| fn=ui_inverse_design, |
| inputs=[n_generate, input_mode, load_test_idx] + input_components, |
| outputs=[ |
| req_plot, |
| design_plot, |
| design_summary, |
| run_log, |
| generation_alert, |
| design_choice, |
| selected_design_md, |
| design_payload, |
| ], |
| ).then( |
| fn=ui_after_design_complete, |
| inputs=[input_mode, design_payload], |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], |
| show_progress=False, |
| ) |
| design_choice.change( |
| fn=ui_select_design, |
| inputs=[design_choice, design_payload], |
| outputs=[selected_design_md], |
| show_progress=False, |
| ) |
| thermo_btn.click( |
| fn=ui_disable_all_buttons, |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], |
| show_progress=False, |
| ).then( |
| fn=ui_thermoforming, |
| inputs=[design_choice, design_payload, angle_a, angle_b, angle_c, max_stress], |
| outputs=[thermo_selected_md, thermo_result_md], |
| ).then( |
| fn=ui_after_design_complete, |
| inputs=[input_mode, design_payload], |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], |
| show_progress=False, |
| ) |
|
|
| return demo |
|
|
|
|
| if __name__ == "__main__": |
| build_app().launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860"))) |
|
|