Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import json | |
| import os | |
| from functools import lru_cache | |
| from typing import Any, Dict, List | |
| import gradio as gr | |
| import numpy as np | |
| import torch | |
| from model_inverse import inverse_design as thermoforming_inverse_design | |
| from space_lib.cubic_curves import ( | |
| EPS33_ROW, | |
| LATERAL_ROW, | |
| MODE_TO_ROW, | |
| default_x_for_mode, | |
| evaluate_cubic_no_intercept, | |
| fit_cubic_no_intercept, | |
| ) | |
| from space_lib.infer import load_model_bundle, postprocess_sample, sample | |
| from space_lib.metadata_v5 import load_metadata_v5, normalize_condition_v5 | |
| from space_lib.plots_v5 import plot_condition_and_simulations_v5, plot_required_curves_from_series | |
| from space_lib.simulate import simulate_instances | |
try:
    import spaces  # type: ignore

    SPACES_RUNTIME = True
except ImportError:  # local / non-ZeroGPU fallback
    SPACES_RUNTIME = False

    class _SpacesCompat:
        """No-op stand-in used when the ``spaces`` package cannot be imported.

        Only the ``GPU`` decorator is provided; it leaves the decorated
        function untouched.
        """

        @staticmethod
        def GPU(*_args, **_kwargs):
            """Return the decorated function unchanged.

            Works both as a bare decorator (``@spaces.GPU``) and as a
            decorator factory (``@spaces.GPU(duration=...)``); any factory
            arguments are ignored.
            """
            # Bare usage: the function being decorated arrives as the only
            # positional argument, so hand it straight back.
            if len(_args) == 1 and not _kwargs and callable(_args[0]):
                return _args[0]

            def decorator(fn):
                return fn

            return decorator

    spaces = _SpacesCompat()  # type: ignore
# Asset locations, all resolved relative to the directory holding this file.
APP_DIR = os.path.dirname(os.path.abspath(__file__))
ASSETS_DIR = os.path.join(APP_DIR, "assets")
MODEL_ASSETS_DIR = os.path.join(ASSETS_DIR, "model")
SIM_ASSETS_DIR = os.path.join(ASSETS_DIR, "quarter_sim")

# Each directory can be overridden through an environment variable; the
# bundled asset folders are the fallback.
DEFAULT_CHECKPOINT_DIR = os.getenv("MG_CHECKPOINT_DIR", MODEL_ASSETS_DIR)
DEFAULT_DATA_DIR = os.getenv("MG_DATA_DIR", MODEL_ASSETS_DIR)
DEFAULT_CURVE_DIR = os.getenv("MG_CURVE_DIR", os.path.join(SIM_ASSETS_DIR, "RVE_datasets"))

# Names actually read by the rest of the module.
CHECKPOINT_DIR, DATA_DIR, CURVE_DIR = DEFAULT_CHECKPOINT_DIR, DEFAULT_DATA_DIR, DEFAULT_CURVE_DIR
NORMALIZATION_METHOD = "zscore"
ANGLE_RESOLUTION = 1.0

# Candidate-count limits. Both are clamped to at least 1 because the
# "Number of generated candidates" slider uses minimum=1; a zero or negative
# environment override would otherwise produce an invalid slider range/default.
MAX_N_GENERATE = max(1, int(os.environ.get("MG_MAX_GENERATE", "6")))
DEFAULT_N_GENERATE = max(1, min(int(os.environ.get("MG_DEFAULT_N_GENERATE", "3")), MAX_N_GENERATE))

GPU_DURATION_SEC = int(os.environ.get("ZEROGPU_DURATION", "45"))
DEFAULT_TEST_INDEX = int(os.environ.get("MG_DEFAULT_TEST_INDEX", "512"))

# Settings forwarded to the thermoforming inverse-design call.
THERMO_N_RESTARTS = int(os.environ.get("MG_THERMO_RESTARTS", "6"))
THERMO_EPOCHS = int(os.environ.get("MG_THERMO_EPOCHS", "300"))

# Integer material id (as stored in the test data) -> display name.
MATERIAL_TYPE_NAMES = {0: "CPP", 1: "CHDPE", 2: "GPP", 3: "GHDPE"}

# Placeholder texts shown before a design is selected / thermoforming has run.
NO_SELECTED_DESIGN = "No selected design is available."
THERMO_NOT_RUN = "Thermoforming has not run yet."
# One entry per requirement curve, in the row order of the 7x3 coefficient
# table. "curve3" requirements take three inputs, "ratio2" requirements take
# two and additionally carry a "ratio_name" used in the input labels.
REQUIREMENTS: List[Dict[str, Any]] = [
    {
        "id": req_id,
        "title": title,
        "mode": mode,
        "response": response,
        "input_kind": input_kind,
        **extra,
    }
    for req_id, title, mode, response, input_kind, extra in (
        ("11_sig11", "Mode 11 Stress", "11", "stress", "curve3", {}),
        ("11_eps22", "Mode 11 Lateral", "11", "lateral", "ratio2", {"ratio_name": "Poisson's ratio (vxy)"}),
        ("11_eps33", "Mode 11 eps33", "11", "eps33", "curve3", {}),
        ("22_sig22", "Mode 22 Stress", "22", "stress", "curve3", {}),
        ("22_eps11", "Mode 22 Lateral", "22", "lateral", "ratio2", {"ratio_name": "Poisson's ratio (vyx)"}),
        ("22_eps33", "Mode 22 eps33", "22", "eps33", "curve3", {}),
        ("12_sig12", "Mode 12 Stress", "12", "stress", "curve3", {}),
    )
]
def _curve_field_labels(mode: str, response: str) -> tuple[str, str, str]:
    """Return the three input-field labels for a three-value curve requirement.

    The percentage shown in the labels is the last value of
    ``default_x_for_mode(mode)`` multiplied by 100 and rounded to an integer.
    Any ``response`` other than ``"stress"`` or ``"lateral"`` gets the eps33
    labels.
    """
    last_x = float(default_x_for_mode(mode)[-1])
    pct = int(round(last_x * 100))
    if response == "stress":
        labels = ("Initial stiffness", f"{pct}% strain stiffness", f"{pct}% strain stress")
    elif response == "lateral":
        labels = ("Initial strain ratio", f"{pct}% strain ratio", f"{pct}% strain response")
    else:
        labels = ("Initial eps33 ratio", f"{pct}% strain ratio", f"{pct}% strain eps33")
    return labels
def _coeffs_from_requirements(initial_slope: float, end_slope: float, end_value: float, x_end: float) -> np.ndarray:
    """Solve for the no-intercept cubic ``c3*x**3 + c2*x**2 + c1*x`` meeting three requirements.

    The cubic has slope ``initial_slope`` at ``x = 0`` (so ``c1 = initial_slope``),
    slope ``end_slope`` at ``x = x_end`` and value ``end_value`` at ``x = x_end``.

    Returns:
        float32 array ``[c3, c2, c1]``.

    Raises:
        numpy.linalg.LinAlgError: if the 2x2 system is singular (e.g. ``x_end == 0``).
    """
    x = float(x_end)
    c1 = float(initial_slope)
    # The system matrix has entries of order x, x**2 and x**3, so it is badly
    # conditioned for small x_end. Solve in float64 and only cast the final
    # coefficients to float32 to avoid losing digits in the solve itself.
    system = np.array([[2.0 * x, 3.0 * x * x], [x * x, x * x * x]], dtype=np.float64)
    rhs = np.array([float(end_slope) - c1, float(end_value) - c1 * x], dtype=np.float64)
    c2, c3 = np.linalg.solve(system, rhs)
    return np.asarray([c3, c2, c1], dtype=np.float32)
def _requirements_from_coeffs(coeffs_7x3: np.ndarray) -> dict[str, float]:
    """Convert a 7x3 table of ``[c3, c2, c1]`` rows into requirement input values.

    For every entry of ``REQUIREMENTS`` the result holds ``<id>_initial`` (``c1``).
    ``ratio2`` requirements add ``<id>_end_ratio`` (the cubic's slope at the last
    default x of the mode); all others add ``<id>_end_slope`` (same slope) and
    ``<id>_end_value`` (the cubic's value at that x).
    """
    table = np.asarray(coeffs_7x3, dtype=np.float32).reshape(7, 3)
    requirements: dict[str, float] = {}
    for row, spec in zip(table, REQUIREMENTS):
        req_id = spec["id"]
        x_end = float(default_x_for_mode(spec["mode"])[-1])
        c3, c2, c1 = (float(v) for v in row)
        slope_at_end = 3.0 * c3 * x_end * x_end + 2.0 * c2 * x_end + c1
        requirements[f"{req_id}_initial"] = c1
        if spec["input_kind"] == "ratio2":
            requirements[f"{req_id}_end_ratio"] = slope_at_end
            continue
        requirements[f"{req_id}_end_slope"] = slope_at_end
        requirements[f"{req_id}_end_value"] = c3 * x_end**3 + c2 * x_end**2 + c1 * x_end
    return requirements
def _default_requirements(data_dir: str) -> dict[str, float]:
    """Return requirement defaults taken from ``test_data.npz`` in ``data_dir``.

    The row is ``DEFAULT_TEST_INDEX`` wrapped modulo the number of rows in the
    ``polynomial_coefficients`` array.
    """
    # np.load on an .npz returns a lazily-read archive that keeps the file open;
    # use it as a context manager so the handle is released once the array is read.
    with np.load(os.path.join(data_dir, "test_data.npz")) as data:
        arr = np.asarray(data["polynomial_coefficients"])
    idx = int(DEFAULT_TEST_INDEX) % int(len(arr))
    coeffs = np.asarray(arr[idx], dtype=np.float32).reshape(7, 3)
    return _requirements_from_coeffs(coeffs)
def _get_meta(data_dir: str):
    """Load and return the v5 metadata stored under ``data_dir``."""
    meta = load_metadata_v5(data_dir)
    return meta
# Loaded model bundles keyed by (checkpoint_dir, device, angle_resolution);
# populated on demand by _get_model.
_MODEL_CACHE: dict[tuple[str, str, float], Any] = {}
def _runtime_device() -> str:
    """Return ``"cuda"`` when torch reports CUDA as available, otherwise ``"cpu"``."""
    if torch.cuda.is_available():
        return "cuda"
    return "cpu"
def _runtime_summary() -> str:
    """Build the one-paragraph runtime description shown at the top of the app.

    Mentions the device currently reported by ``_runtime_device()`` and whether
    the ``spaces`` package was importable.
    """
    if SPACES_RUNTIME:
        spaces_mode = "available"
    else:
        spaces_mode = "not installed locally; using no-op fallback"
    sentences = [
        "GPU-compatible Gradio app for the quarter-v5 model. ",
        f"Generation uses `{_runtime_device()}` when invoked; the Hugging Face `spaces` runtime is {spaces_mode}. ",
        "This supports both ZeroGPU (`@spaces.GPU`) and dedicated GPU hardware such as `Nvidia T4 small`.",
    ]
    return "".join(sentences)
def _get_model(checkpoint_dir: str, device: str, angle_resolution: float):
    """Return the model bundle for the given settings, loading it on first use.

    Bundles are memoised in ``_MODEL_CACHE`` under the key
    ``(checkpoint_dir, device, float(angle_resolution))``.
    """
    cache_key = (checkpoint_dir, device, float(angle_resolution))
    if cache_key in _MODEL_CACHE:
        return _MODEL_CACHE[cache_key]
    bundle = load_model_bundle(checkpoint_dir, device=device, angle_resolution=angle_resolution)
    _MODEL_CACHE[cache_key] = bundle
    return _MODEL_CACHE[cache_key]
def _angle_categories_from_meta(meta, angle_resolution: float) -> np.ndarray:
    """Return the evenly spaced angle categories covering the metadata's angle range.

    The range is read from ``meta.raw`` (``angle_min`` default 0.0,
    ``angle_max`` default 90.0), widened outwards to multiples of
    ``angle_resolution``, and sampled at that resolution.

    Returns:
        float32 array running from the snapped minimum to the snapped maximum.
    """
    raw_min = float(meta.raw.get("angle_min", 0.0))
    raw_max = float(meta.raw.get("angle_max", 90.0))
    step = float(angle_resolution)
    lower = np.floor(raw_min / step) * step
    upper = np.ceil(raw_max / step) * step
    # (upper - lower) / step is an integer mathematically, but floating-point
    # error can leave it a hair below that integer; rounding (instead of
    # truncating) keeps the final category from being dropped in that case.
    count = int(round((upper - lower) / step)) + 1
    return np.linspace(lower, upper, count, dtype=np.float32)
def _balance_partner(angle: float) -> float:
    """Return the angle paired with ``angle`` when mirroring a stack.

    Angles whose magnitude modulo 180 is (numerically) 0 or 90 map to exactly
    0.0 or 90.0 respectively; every other angle maps to its negation.
    """
    folded = abs(float(angle)) % 180.0
    for self_paired in (0.0, 90.0):
        if np.isclose(folded, self_paired):
            return self_paired
    return -float(angle)
def _full_angles_from_quarter(quarter_angles: list[float]) -> list[float]:
    """Expand a quarter stack into the full stack.

    The quarter angles are sorted ascending, followed by the balance partners
    of those angles in reverse order; that half stack is then followed by its
    own reverse.
    """
    quarter = list(map(float, sorted(quarter_angles)))
    partners = [_balance_partner(angle) for angle in quarter[::-1]]
    half = quarter + partners
    return half + half[::-1]
def _mat_type_to_matrix_fiber(mat_type: str) -> tuple[str, str]:
    """Split a material code such as ``"CPP"`` into ``(matrix, fiber)`` names.

    The code is upper-cased and stripped first. A leading ``C``/``G`` maps to
    ``"Carbon"``/``"Glass"``; a trailing ``HDPE``/``PP`` maps to that matrix.
    Unrecognised codes fall back to the first character (fiber) and the
    remainder (matrix). ``None`` or an empty string yields ``("", "")``.
    """
    code = (mat_type or "").upper().strip()

    if code.startswith("C"):
        fiber = "Carbon"
    elif code.startswith("G"):
        fiber = "Glass"
    else:
        fiber = code[:1]

    if code.endswith("HDPE"):
        matrix = "HDPE"
    elif code.endswith("PP"):
        matrix = "PP"
    else:
        matrix = code[1:]

    return matrix, fiber
def _score_design(cond_coeffs: np.ndarray, sim_modes: Dict[str, Dict[str, np.ndarray]]) -> float:
    """Score simulated curves against the requested cubic curves (lower is closer).

    For every simulated mode the mean squared error between the requested
    cubic, evaluated at the simulated strains, and the simulated stress is
    collected. Modes ``"11"`` and ``"22"`` additionally contribute errors for
    ``"lateral"`` and ``"eps33"`` when those series are present. The score is
    the mean of all collected errors, or ``inf`` when there are none.
    """
    target = np.asarray(cond_coeffs, dtype=np.float32).reshape(7, 3)

    def _mse(row, strain, data, key):
        # Mean squared error between the requested cubic and one simulated series.
        predicted = evaluate_cubic_no_intercept(target[row], strain)
        return float(np.mean((predicted - np.asarray(data[key], dtype=np.float32)) ** 2))

    errors: list[float] = []
    for mode, data in sim_modes.items():
        strain = np.asarray(data["strain"], dtype=np.float32)
        errors.append(_mse(MODE_TO_ROW[mode], strain, data, "stress"))
        if mode not in ("11", "22"):
            continue
        for key, row_lookup in (("lateral", LATERAL_ROW), ("eps33", EPS33_ROW)):
            if data.get(key) is not None:
                errors.append(_mse(int(row_lookup[mode]), strain, data, key))
    if not errors:
        return float("inf")
    return float(np.mean(errors))
def _build_required_curves_and_condition(values: List[float]) -> tuple[dict[str, np.ndarray], np.ndarray]:
    """Turn the flat list of manual inputs into plot series and fitted coefficients.

    ``values`` is consumed in ``REQUIREMENTS`` order: two numbers for each
    ``ratio2`` requirement (initial, end ratio) and three for every other
    requirement (initial slope, end slope, end value).

    Returns:
        ``(curves, coeffs)`` where ``curves`` maps series names to arrays and
        ``coeffs`` is the float32 stack of the per-requirement cubic fits.

    Raises:
        RuntimeError: if ``fit_cubic_no_intercept`` returns ``None`` for a requirement.
    """
    # Requirement id -> (key for the x series, or None when only y is stored; key for the y series).
    series_keys = {
        "11_sig11": ("eps11", "sig11_mpa"),
        "11_eps22": (None, "eps22_from_eps11"),
        "11_eps33": (None, "eps33_from_eps11"),
        "22_sig22": ("eps22", "sig22_mpa"),
        "22_eps11": (None, "eps11_from_eps22"),
        "22_eps33": (None, "eps33_from_eps22"),
        "12_sig12": ("eps12", "sig12_mpa"),
    }
    fitted_rows = []
    curves: dict[str, np.ndarray] = {}
    cursor = 0
    for idx, spec in enumerate(REQUIREMENTS):
        x = default_x_for_mode(spec["mode"])
        x_end = float(x[-1])
        if spec["input_kind"] == "ratio2":
            initial = float(values[cursor])
            end_ratio = float(values[cursor + 1])
            cursor += 2
            # Quadratic whose slope moves from `initial` at 0 to `end_ratio` at x_end.
            quad = (end_ratio - initial) / (2.0 * x_end) if x_end > 0 else 0.0
            y = initial * x + quad * x * x
        else:
            initial = float(values[cursor])
            end_slope = float(values[cursor + 1])
            end_value = float(values[cursor + 2])
            cursor += 3
            y = evaluate_cubic_no_intercept(
                _coeffs_from_requirements(initial, end_slope, end_value, x_end), x
            )

        fitted = fit_cubic_no_intercept(x, y, degree=3)
        if fitted is None:
            raise RuntimeError(f"Failed to fit cubic coefficients for requirement index {idx}")
        fitted_rows.append(fitted)

        keys = series_keys.get(spec["id"])
        if keys is not None:
            x_key, y_key = keys
            if x_key is not None:
                curves[x_key] = x
            curves[y_key] = y
    return curves, np.stack(fitted_rows, axis=0).astype(np.float32)
def _load_condition_defaults_from_test(idx: int) -> List[float]:
    """Return the manual-input values equivalent to test condition ``idx``.

    ``idx`` wraps modulo the number of rows in ``polynomial_coefficients``.
    The values are ordered like the manual inputs: per ``REQUIREMENTS`` entry,
    (initial, end_ratio) for ``ratio2`` and (initial, end_slope, end_value)
    otherwise.
    """
    # Close the .npz archive as soon as the array has been read.
    with np.load(os.path.join(DATA_DIR, "test_data.npz")) as data:
        arr = np.asarray(data["polynomial_coefficients"])
    coeffs = np.asarray(arr[int(idx) % len(arr)], dtype=np.float32).reshape(7, 3)
    reqs = _requirements_from_coeffs(coeffs)
    values: List[float] = []
    for spec in REQUIREMENTS:
        if spec["input_kind"] == "ratio2":
            suffixes = ("initial", "end_ratio")
        else:
            suffixes = ("initial", "end_slope", "end_value")
        values.extend(float(reqs[f"{spec['id']}_{suffix}"]) for suffix in suffixes)
    return values
def _load_exact_test_coeffs(idx: int) -> np.ndarray:
    """Return the stored 7x3 float32 coefficient table for test condition ``idx``.

    ``idx`` wraps modulo the number of rows in ``polynomial_coefficients``.
    """
    # Close the .npz archive as soon as the array has been read.
    with np.load(os.path.join(DATA_DIR, "test_data.npz")) as data:
        arr = np.asarray(data["polynomial_coefficients"])
    return np.asarray(arr[int(idx) % len(arr)], dtype=np.float32).reshape(7, 3)
def _load_exact_test_payload(idx: int) -> str:
    """Return a JSON payload with the coefficients and ground truth of test condition ``idx``.

    ``idx`` wraps modulo the number of rows in ``polynomial_coefficients``. The
    payload has ``"coeffs"`` (7x3 nested list) and ``"ground_truth"`` with
    ``test_index``, ``mat_type``, ``vf``, ``quarter_angles`` and ``full_angles``.
    """
    # Read everything inside the context manager so the archive is closed
    # afterwards, and fetch each array from the archive only once.
    with np.load(os.path.join(DATA_DIR, "test_data.npz")) as data:
        all_coeffs = data["polynomial_coefficients"]
        idx = int(idx) % int(len(all_coeffs))
        coeffs = np.asarray(all_coeffs[idx], dtype=np.float32).reshape(7, 3)
        mat_id = int(data["material_type"][idx])
        vf = float(data["volume_fraction"][idx])
        seq_len = int(data["stacking_sequence_lengths"][idx])
        # Only the first seq_len entries of the stored sequence are used.
        quarter_angles = [float(a) for a in data["stacking_sequence"][idx][:seq_len]]
    return json.dumps(
        {
            "coeffs": coeffs.tolist(),
            "ground_truth": {
                "test_index": idx,
                "mat_type": MATERIAL_TYPE_NAMES.get(mat_id, str(mat_id)),
                "vf": vf,
                "quarter_angles": quarter_angles,
                "full_angles": _full_angles_from_quarter(quarter_angles),
            },
        }
    )
| def _condition_payload_from_coeffs(cond_coeffs: np.ndarray, source: str, ground_truth: dict[str, Any] | None = None) -> str: | |
| payload: dict[str, Any] = { | |
| "source": source, | |
| "coeffs": np.asarray(cond_coeffs, dtype=np.float32).reshape(7, 3).tolist(), | |
| } | |
| if ground_truth: | |
| payload["ground_truth"] = ground_truth | |
| return json.dumps(payload) | |
def _curves_from_coeffs(cond_coeffs: np.ndarray) -> dict[str, np.ndarray]:
    """Evaluate a 7x3 coefficient table into the named series used for plotting.

    Each row is evaluated at the default x values of its mode. The three
    stress rows also store their x series; the remaining rows store only y.
    """
    coeffs = np.asarray(cond_coeffs, dtype=np.float32).reshape(7, 3)
    # Per coefficient row: (mode, key for x or None, key for y).
    layout = (
        ("11", "eps11", "sig11_mpa"),
        ("11", None, "eps22_from_eps11"),
        ("11", None, "eps33_from_eps11"),
        ("22", "eps22", "sig22_mpa"),
        ("22", None, "eps11_from_eps22"),
        ("22", None, "eps33_from_eps22"),
        ("12", "eps12", "sig12_mpa"),
    )
    curves: dict[str, np.ndarray] = {}
    for row, (mode, x_key, y_key) in zip(coeffs, layout):
        x = default_x_for_mode(mode)
        y = evaluate_cubic_no_intercept(row, x)
        if x_key is not None:
            curves[x_key] = x
        curves[y_key] = y
    return curves
def _serialize_designs(designs: list[Dict[str, Any]]) -> str:
    """Encode the list of design dicts as a JSON string."""
    encoded = json.dumps(designs)
    return encoded
def _deserialize_designs(payload: str) -> list[Dict[str, Any]]:
    """Decode a JSON design payload; an empty or missing payload yields ``[]``."""
    return json.loads(payload) if payload else []
def _format_design_choice(design: Dict[str, Any]) -> str:
    """Return the one-line label used for a design in the selection radio.

    The same label is recomputed later to find the design a user picked, so
    its format must stay stable.
    """
    fields = (
        f"Gen {design['design_id']}",
        f"score={design['score']:.4g}",
        f"{design['mat_type']}",
        f"vf={design['vf']:.4f}",
        f"q={design['quarter_angles']}",
    )
    return " | ".join(fields)
def _format_ground_truth_summary(gt: Dict[str, Any]) -> str:
    """Return the summary line describing the ground-truth design of a test condition."""
    material = gt["mat_type"]
    volume_fraction = float(gt["vf"])
    quarter = gt["quarter_angles"]
    return f"Gen GT: {material}, vf={volume_fraction:.4f}, q={quarter}, score=ground truth"
def _design_markdown(design: Dict[str, Any]) -> str:
    """Render the details of one ranked design as a block of markdown paragraphs."""

    def _join_angles(angles) -> str:
        # One decimal place per angle, comma separated.
        return ", ".join(f"{a:.1f}" for a in angles)

    quarter = _join_angles(design["quarter_angles"])
    full = _join_angles(design["full_angles"])
    paragraphs = [
        f"Selected design: Gen {design['design_id']}",
        f"Material: {design['mat_type']} ({design['matrix']} matrix, {design['fiber']} fiber)",
        f"VF: {design['vf']:.4f}",
        f"Quarter angles: {quarter}",
        f"Full stack used by simulation: {full}",
        f"Similarity score: {design['score']:.6g}",
    ]
    return "\n\n".join(paragraphs)
def _run_generation_model(cond_coeffs: np.ndarray, n_generate: int) -> list[dict[str, Any]]:
    """Sample candidate designs from the generative model for one condition.

    The condition is normalised, repeated ``n_generate`` times (clamped to
    ``[1, MAX_N_GENERATE]``) and passed to ``sample`` as a single batch. Each
    batch row is post-processed into a dict with ``design_id`` (1-based),
    ``mat_type``, ``vf`` and ``quarter_angles``.
    """
    meta = _get_meta(DATA_DIR)
    device = _runtime_device()
    bundle = _get_model(CHECKPOINT_DIR, device=device, angle_resolution=ANGLE_RESOLUTION)
    cond_norm = normalize_condition_v5(cond_coeffs, meta, NORMALIZATION_METHOD)

    batch_size = int(max(1, min(int(n_generate), MAX_N_GENERATE)))
    cond_row = torch.tensor(cond_norm, dtype=torch.float32, device=device).view(1, -1)
    cond_t = cond_row.repeat(batch_size, 1)

    # The category table is only needed when the bundle uses discrete angles.
    if bundle.use_discrete_angles:
        angle_categories = _angle_categories_from_meta(meta, ANGLE_RESOLUTION)
    else:
        angle_categories = None

    out = sample(
        model=bundle.model,
        disc_diff_mat=bundle.disc_diff_mat,
        disc_diff_vf_category=bundle.disc_diff_vf_category,
        disc_diff_layer=bundle.disc_diff_layer,
        disc_diff_angle=bundle.disc_diff_angle,
        cont_diff=bundle.cont_diff,
        cond=cond_t,
        mask_ids=bundle.mask_ids,
        device=device,
        remask_prob=0.1,
        use_discrete_angles=bundle.use_discrete_angles,
    )

    sample_keys = ("material_t", "vf_category_t", "layer_t", "angle_t")
    designs: list[dict[str, Any]] = []
    for design_id in range(1, batch_size + 1):
        # Slice (rather than index) so each per-design entry keeps its leading batch axis.
        row = slice(design_id - 1, design_id)
        single = {key: out[key][row] for key in sample_keys}
        mat_type, vf, quarter_angles = postprocess_sample(
            single,
            use_discrete_angles=bundle.use_discrete_angles,
            angle_categories_deg=angle_categories,
        )
        designs.append(
            {
                "design_id": design_id,
                "mat_type": mat_type,
                "vf": float(vf),
                "quarter_angles": [float(a) for a in quarter_angles],
            }
        )
    return designs
def ui_generate_required_curves(*values: float):
    """Build the requirement plot and the condition payload from the manual inputs.

    Returns:
        ``(figure, payload)`` where ``payload`` is the JSON condition tagged
        with source ``"manual"``.
    """
    curves, cond_coeffs = _build_required_curves_and_condition(list(values))
    figure = plot_required_curves_from_series(curves)
    payload = _condition_payload_from_coeffs(cond_coeffs, source="manual")
    return figure, payload
def ui_set_buttons(preview: bool, load: bool, design: bool, thermo: bool):
    """Return four ``gr.update`` objects setting each button's ``interactive`` flag.

    The order is preview, load, design, thermo.
    """
    flags = (preview, load, design, thermo)
    return tuple(gr.update(interactive=flag) for flag in flags)
def ui_disable_all_buttons():
    """Return updates that make all four action buttons non-interactive."""
    return ui_set_buttons(preview=False, load=False, design=False, thermo=False)
def ui_clear_design_outputs():
    """Return the 13 values that reset every design and thermoforming output.

    In order: one ``None``, three empty strings, an emptied choice update, the
    "no selected design" text, an empty string, the "no selected design" text
    again, the "thermoforming not run" text, and four updates setting a value
    of 0.0.
    """
    blank_texts = ("", "", "")
    zeroed = tuple(gr.update(value=0.0) for _ in range(4))
    return (
        None,
        *blank_texts,
        gr.update(choices=[], value=None),
        NO_SELECTED_DESIGN,
        "",
        NO_SELECTED_DESIGN,
        THERMO_NOT_RUN,
        *zeroed,
    )
def ui_generation_started():
    """Return the 13 output values shown while a generation request is starting.

    Same layout as ``ui_clear_design_outputs`` except that the three text slots
    after the leading ``None`` carry "requesting ZeroGPU" messages.
    """
    alert_parts = [
        "### Requesting ZeroGPU for candidate generation\n\n",
        "The Space is requesting a temporary CUDA device from Hugging Face ZeroGPU. ",
        "If generation stops here or a backend error appears, ZeroGPU allocation likely failed ",
        "before CUDA became available, or the account/organization quota is exhausted. ",
        "Retry after quota is restored or switch the Space hardware to `Nvidia T4 small`.",
    ]
    message = "".join(alert_parts)
    zeroed = tuple(gr.update(value=0.0) for _ in range(4))
    return (
        None,
        "Requesting ZeroGPU allocation...",
        message,
        "Requesting ZeroGPU allocation for candidate generation.",
        gr.update(choices=[], value=None),
        NO_SELECTED_DESIGN,
        "",
        NO_SELECTED_DESIGN,
        THERMO_NOT_RUN,
        *zeroed,
    )
def ui_manual_requirements_changed():
    """Reset state after a manual requirement input was edited.

    Clears the stored payload and plot, sets a status message, resets all
    design outputs and leaves only the preview button enabled.
    """
    status = "Requirements changed. Generate curves before inverse design."
    cleared = ui_clear_design_outputs()
    buttons = ui_set_buttons(True, False, False, False)
    return ("", status, None, *cleared, *buttons)
def ui_test_index_changed():
    """Reset state after the test-condition index was edited.

    Clears the stored payload and plot, sets a status message, resets all
    design outputs and leaves only the load button enabled.
    """
    status = "Test index changed. Load the test condition before inverse design."
    cleared = ui_clear_design_outputs()
    buttons = ui_set_buttons(False, True, False, False)
    return ("", status, None, *cleared, *buttons)
def ui_input_mode_changed(mode: str):
    """Switch between the manual and load-from-test panels and reset dependent state.

    Returns the two panel visibility updates, an empty payload, a status
    message, a cleared plot, the cleared design outputs and the button states
    (preview enabled in manual mode, load enabled otherwise).
    """
    manual = mode == "Define by Input Values"
    status = (
        "Manual requirement mode is active. Generate curves before inverse design."
        if manual
        else "Load-from-test mode is active. Load a test condition before inverse design."
    )
    panels = (gr.update(visible=manual), gr.update(visible=not manual))
    cleared = ui_clear_design_outputs()
    buttons = ui_set_buttons(manual, not manual, False, False)
    return (*panels, "", status, None, *cleared, *buttons)
def ui_load_test_condition(idx: int):
    """Load a stored test condition into the UI.

    Returns the manual-input values for the condition, followed by a status
    message, the requirement plot built from the stored coefficients, and the
    JSON payload for the condition.
    """
    index = int(idx)
    input_values = _load_condition_defaults_from_test(index)
    curves = _curves_from_coeffs(_load_exact_test_coeffs(index))
    payload = _load_exact_test_payload(index)
    status = f"Loaded v5 test condition `{index}`."
    return (*input_values, status, plot_required_curves_from_series(curves), payload)
def ui_after_requirement_ready(mode: str):
    """Return button states once a requirement is ready.

    The design button is enabled, thermoforming stays disabled, and the
    preview/load button matching the active mode is enabled.
    """
    is_manual = mode == "Define by Input Values"
    return ui_set_buttons(is_manual, not is_manual, True, False)
def ui_inverse_design(n_generate: float, mode: str, load_idx: float, *values: float):
    """Generate candidate designs, simulate and score them, and rank the results.

    The condition comes from the stored test data when ``mode`` is
    ``"Load from Test Data"`` and from the manual ``values`` otherwise.

    Returns:
        An 8-tuple: requirement plot, comparison plot (``None`` when
        generation fails), three text values, the design-choice update, the
        selected-design text and the serialised ranked designs (empty string
        when nothing was ranked).
    """
    gt_summary = ""
    if mode != "Load from Test Data":
        curves, cond_coeffs = _build_required_curves_and_condition(list(values))
    else:
        test_payload = json.loads(_load_exact_test_payload(int(load_idx)))
        cond_coeffs = np.asarray(test_payload["coeffs"], dtype=np.float32).reshape(7, 3)
        curves = _curves_from_coeffs(cond_coeffs)
        ground_truth = test_payload.get("ground_truth", {})
        if ground_truth:
            gt_summary = _format_ground_truth_summary(ground_truth)
    req_plot = plot_required_curves_from_series(curves)
    batch_size = int(max(1, min(int(n_generate), MAX_N_GENERATE)))

    try:
        generated = _run_generation_model(cond_coeffs, batch_size)
    except Exception as exc:
        exc_text = str(exc)
        # Classify the failure from the exception text to pick a headline.
        generic_backend_error = exc_text.strip("'\"") == "RuntimeError"
        if "No CUDA GPUs are available" in exc_text or generic_backend_error:
            title = "ZeroGPU allocation failed or returned a generic backend error."
        elif "ZeroGPU quota" in exc_text or "quota" in exc_text.lower():
            title = "ZeroGPU quota is exhausted or unavailable."
        else:
            title = "Generation failed before candidate simulation."
        message = (
            f"{title}\n\n"
            f"Error: {type(exc).__name__}: {exc_text}\n\n"
            "Hugging Face's native ZeroGPU waiting popup only appears while a GPU allocation is actively queued. "
            "If the ZeroGPU backend immediately fails, times out, or returns a generic `RuntimeError`, the app may "
            "only receive this exception after waiting. Retry after quota is restored, authenticate for more quota, "
            "or switch the Space hardware to `Nvidia T4 small`."
        )
        # NOTE(review): here the 4th slot carries the "###" markdown and the 5th the
        # plain message, whereas both other return paths put run-log text 4th and
        # the markdown/alert text 5th — confirm against the event wiring.
        return (
            req_plot,
            None,
            message,
            f"### {title}\n\n{message}",
            message,
            gr.update(choices=[], value=None),
            title,
            "",
        )

    simulations_by_design: Dict[int, Dict[str, Dict[str, np.ndarray]]] = {}
    ranked: list[Dict[str, Any]] = []
    summaries: list[str] = []
    logs: list[str] = []
    for design in generated:
        mat_type = design["mat_type"]
        vf = float(design["vf"])
        quarter_angles = [float(a) for a in design["quarter_angles"]]
        design_label = design["design_id"]
        # Any failure while simulating or scoring one candidate is logged and
        # that candidate is left out of the ranking; the others still proceed.
        try:
            sim = simulate_instances(
                curve_dir=CURVE_DIR,
                mat_type=mat_type,
                vf=vf,
                quarter_angles=quarter_angles,
                instances=[int(design_label)],
                num_output_points=20,
            )
            simulations_by_design.update(sim)
            score = _score_design(cond_coeffs, sim.get(int(design_label), {}))
            matrix, fiber = _mat_type_to_matrix_fiber(mat_type)
            ranked.append(
                {
                    **design,
                    "score": float(score),
                    "matrix": matrix,
                    "fiber": fiber,
                    "full_angles": _full_angles_from_quarter(quarter_angles),
                }
            )
            summaries.append(
                f"Gen {design_label}: {mat_type}, vf={vf:.4f}, q={quarter_angles}, score={score:.6g}"
            )
            logs.append(f"Simulation complete for generated design {design_label}.")
        except Exception as exc:
            logs.append(f"Simulation failed for generated design {design_label}: {type(exc).__name__}: {exc}")

    # Lowest score first: the best match becomes the default selection.
    ranked.sort(key=lambda d: d["score"])
    fig = plot_condition_and_simulations_v5(cond_coeffs, simulations_by_design)

    if not ranked:
        return (
            req_plot,
            fig,
            "No valid generated designs were scored.",
            "\n".join(logs) if logs else "Generation failed.",
            "### No valid generated designs were scored.\n\nSee the run log for simulation details.",
            gr.update(choices=[], value=None),
            "No selected design is available.",
            "",
        )

    best = ranked[0]
    choices = [_format_design_choice(d) for d in ranked]
    designs_json = _serialize_designs(ranked)
    summary_prefix = (gt_summary + "\n") if gt_summary else ""
    return (
        req_plot,
        fig,
        summary_prefix + "\n".join(summaries),
        "\n".join(logs) if logs else "Done.",
        "",
        gr.update(choices=choices, value=choices[0]),
        _design_markdown(best),
        designs_json,
    )
def ui_after_design_complete(mode: str, payload: str):
    """Return button states after an inverse-design run.

    The design button stays enabled; thermoforming is enabled only when a
    non-empty design payload exists.
    """
    is_manual = mode == "Define by Input Values"
    return ui_set_buttons(is_manual, not is_manual, True, bool(payload))
def ui_select_design(selected_label: str, payload: str):
    """Return the markdown for the design whose radio label is ``selected_label``.

    Falls back to the first design when no label matches, and to a placeholder
    text when the payload holds no designs.
    """
    designs = _deserialize_designs(payload)
    if not designs:
        return "No selected design is available."
    chosen = next(
        (d for d in designs if _format_design_choice(d) == selected_label),
        designs[0],
    )
    return _design_markdown(chosen)
def ui_thermoforming(selected_label: str, payload: str, angle_a: float, angle_b: float, angle_c: float, max_stress: float):
    """Run the thermoforming inverse design for the selected generated design.

    The design is located by its radio label (first design if none matches).
    Conditions outside the ranges checked below do not block the run; they are
    reported as a warning prepended to the result.

    Returns:
        ``(selected_design_markdown, result_markdown)``.
    """
    designs = _deserialize_designs(payload)
    if not designs:
        return "No selected design is available.", "Thermoforming cannot run before inverse design."

    selected = next(
        (d for d in designs if _format_design_choice(d) == selected_label),
        designs[0],
    )

    # Collect every way the selected design falls outside the checked domain.
    domain_issues: list[str] = []
    if selected["fiber"] != "Carbon":
        domain_issues.append(
            f"unsupported fiber `{selected['fiber']}`; thermoforming surrogate was trained only on carbon-fiber systems"
        )
    vf_in_range = 0.4 <= float(selected["vf"]) <= 0.6
    if not vf_in_range:
        domain_issues.append(
            f"volume fraction {float(selected['vf']):.4f} is outside the surrogate training range [0.4, 0.6]"
        )
    n_layers = int(len(selected["full_angles"]))
    if n_layers < 2 or n_layers > 8:
        domain_issues.append(
            f"ply count {n_layers} is outside the surrogate training range [2, 8]"
        )

    # Material label passed to the surrogate, chosen from the design's matrix.
    if selected["matrix"] == "HDPE":
        cf_mat = "CF/PEEK"
    else:
        cf_mat = "CF/PA6"
    targets = [float(angle_a), float(angle_b), float(angle_c), float(max_stress)]
    best = thermoforming_inverse_design(
        material=cf_mat,
        ply_number=n_layers,
        fiber_vf=float(selected["vf"]),
        y_target=targets,
        n_restarts=THERMO_N_RESTARTS,
        epochs=THERMO_EPOCHS,
    )

    warning_md = ""
    if domain_issues:
        bullet_list = "\n".join(f"- {issue}" for issue in domain_issues)
        warning_md = (
            "Warning: surrogate is extrapolating outside its training domain.\n\n" + bullet_list + "\n\n"
        )

    paragraphs = [
        f"Thermoforming surrogate material: {cf_mat}",
        f"Forming temperature (C): {best['input'][0]:.3f}",
        f"Punching velocity (mm/s): {best['input'][1]:.3f}",
        f"Cooling time (s): {best['input'][2]:.3f}",
        f"Predicted angle A (deg): {best['output'][0]:.3f}",
        f"Predicted angle B (deg): {best['output'][1]:.3f}",
        f"Predicted angle C (deg): {best['output'][2]:.3f}",
        f"Predicted max residual stress (MPa): {best['output'][3]:.3f}",
    ]
    result_md = warning_md + "\n\n".join(paragraphs)
    return _design_markdown(selected), result_md
| def build_app(): | |
| defaults = _default_requirements(DATA_DIR) | |
| input_components: list[gr.components.Component] = [] | |
| css = """ | |
| .curve-input-row { gap: 8px !important; flex-wrap: nowrap !important; } | |
| .curve-card { min-width: 180px !important; } | |
| .gradio-container .gr-number input[type="number"] { | |
| width: 96px !important; | |
| min-width: 96px !important; | |
| } | |
| """ | |
| with gr.Blocks(css=css) as demo: | |
| gr.Markdown("# Quarter Material Inverse Design") | |
| gr.Markdown(_runtime_summary()) | |
| gr.Markdown( | |
| "Requirement curves are defined first, then batched candidates are generated, " | |
| "the closest design is selected, and thermoforming is run from that selected design." | |
| ) | |
| gr.Markdown("## 1. Define Material Requirements") | |
| input_mode = gr.Radio( | |
| choices=["Define by Input Values", "Load from Test Data"], | |
| value="Define by Input Values", | |
| label="Requirement source", | |
| ) | |
| with gr.Group(visible=True) as manual_panel: | |
| gr.Markdown("### Manual Requirement Input") | |
| with gr.Row(elem_classes=["curve-input-row"]): | |
| for spec in REQUIREMENTS: | |
| req_id = spec["id"] | |
| with gr.Column(elem_classes=["curve-card"]): | |
| gr.Markdown(f"**{spec['title']}**") | |
| if spec["input_kind"] == "ratio2": | |
| pct = int(round(float(default_x_for_mode(spec["mode"])[-1]) * 100)) | |
| ratio_name = str(spec["ratio_name"]) | |
| input_components.append( | |
| gr.Number(value=float(defaults[f"{req_id}_initial"]), label=f"Initial {ratio_name}", precision=6) | |
| ) | |
| input_components.append( | |
| gr.Number(value=float(defaults[f"{req_id}_end_ratio"]), label=f"{pct}% strain {ratio_name}", precision=6) | |
| ) | |
| else: | |
| l1, l2, l3 = _curve_field_labels(spec["mode"], spec["response"]) | |
| input_components.append(gr.Number(value=float(defaults[f"{req_id}_initial"]), label=l1, precision=6)) | |
| input_components.append(gr.Number(value=float(defaults[f"{req_id}_end_slope"]), label=l2, precision=6)) | |
| input_components.append(gr.Number(value=float(defaults[f"{req_id}_end_value"]), label=l3, precision=6)) | |
| preview_btn = gr.Button("Generate Required Stress-Strain Curves") | |
| with gr.Group(visible=False) as test_panel: | |
| gr.Markdown("### Load Requirement Curves from Saved Test Data") | |
| with gr.Row(): | |
| load_test_idx = gr.Number(value=DEFAULT_TEST_INDEX, precision=0, label="Load v5 test condition index") | |
| load_btn = gr.Button("Load Test Condition", interactive=False) | |
| n_generate = gr.Slider( | |
| minimum=1, | |
| maximum=MAX_N_GENERATE, | |
| step=1, | |
| value=DEFAULT_N_GENERATE, | |
| label="Number of generated candidates", | |
| ) | |
| design_btn = gr.Button("Generate Parameter Sets", interactive=False) | |
| load_status = gr.Markdown("") | |
| req_plot = gr.Plot(label="Required Stress-Strain Curves") | |
| exact_test_coeffs = gr.Textbox(value="", visible=False, label="Ready condition payload") | |
| gr.Markdown("## 2. Material Inverse Design") | |
| design_plot = gr.Plot(label="Conditioned vs Generated Curves") | |
| design_summary = gr.Textbox(label="Generated Designs", lines=6) | |
| generation_alert = gr.Markdown("") | |
| run_log = gr.Textbox(label="Run Log", lines=8, interactive=False) | |
| gr.Markdown("## 3. Choose the Closest Generated Design") | |
| design_choice = gr.Radio(choices=[], label="Generated designs") | |
| selected_design_md = gr.Markdown(NO_SELECTED_DESIGN) | |
| design_payload = gr.Textbox(value="", visible=False, label="Generated design payload") | |
| gr.Markdown("## 4. Thermoforming Requirements") | |
| with gr.Row(): | |
| thermo_image = gr.Image( | |
| value=os.path.join(APP_DIR, "figures", "forming_angle.png"), | |
| label="Thermoforming geometry", | |
| interactive=False, | |
| ) | |
| angle_a = gr.Number(value=0.0, label="Maximum warpage angle A (degree)", precision=3) | |
| angle_b = gr.Number(value=0.0, label="Maximum warpage angle B (degree)", precision=3) | |
| angle_c = gr.Number(value=0.0, label="Maximum warpage angle C (degree)", precision=3) | |
| max_stress = gr.Number(value=0.0, label="Maximum residual stress (MPa)", precision=3) | |
| thermo_btn = gr.Button("Thermoforming Process Design", interactive=False) | |
| thermo_selected_md = gr.Markdown(NO_SELECTED_DESIGN) | |
| thermo_result_md = gr.Markdown(THERMO_NOT_RUN) | |
| input_mode.change( | |
| fn=ui_input_mode_changed, | |
| inputs=[input_mode], | |
| outputs=[ | |
| manual_panel, | |
| test_panel, | |
| exact_test_coeffs, | |
| load_status, | |
| req_plot, | |
| design_plot, | |
| design_summary, | |
| generation_alert, | |
| run_log, | |
| design_choice, | |
| selected_design_md, | |
| design_payload, | |
| thermo_selected_md, | |
| thermo_result_md, | |
| angle_a, | |
| angle_b, | |
| angle_c, | |
| max_stress, | |
| preview_btn, | |
| load_btn, | |
| design_btn, | |
| thermo_btn, | |
| ], | |
| show_progress=False, | |
| ) | |
| for component in input_components: | |
| component.input( | |
| fn=ui_manual_requirements_changed, | |
| outputs=[ | |
| exact_test_coeffs, | |
| load_status, | |
| req_plot, | |
| design_plot, | |
| design_summary, | |
| generation_alert, | |
| run_log, | |
| design_choice, | |
| selected_design_md, | |
| design_payload, | |
| thermo_selected_md, | |
| thermo_result_md, | |
| angle_a, | |
| angle_b, | |
| angle_c, | |
| max_stress, | |
| preview_btn, | |
| load_btn, | |
| design_btn, | |
| thermo_btn, | |
| ], | |
| show_progress=False, | |
| ) | |
| load_test_idx.input( | |
| fn=ui_test_index_changed, | |
| outputs=[ | |
| exact_test_coeffs, | |
| load_status, | |
| req_plot, | |
| design_plot, | |
| design_summary, | |
| generation_alert, | |
| run_log, | |
| design_choice, | |
| selected_design_md, | |
| design_payload, | |
| thermo_selected_md, | |
| thermo_result_md, | |
| angle_a, | |
| angle_b, | |
| angle_c, | |
| max_stress, | |
| preview_btn, | |
| load_btn, | |
| design_btn, | |
| thermo_btn, | |
| ], | |
| show_progress=False, | |
| ) | |
| load_btn.click(fn=ui_disable_all_buttons, outputs=[preview_btn, load_btn, design_btn, thermo_btn], show_progress=False).then( | |
| fn=ui_load_test_condition, | |
| inputs=[load_test_idx], | |
| outputs=input_components + [load_status, req_plot, exact_test_coeffs], | |
| show_progress=False, | |
| ).then( | |
| fn=ui_clear_design_outputs, | |
| outputs=[ | |
| design_plot, | |
| design_summary, | |
| generation_alert, | |
| run_log, | |
| design_choice, | |
| selected_design_md, | |
| design_payload, | |
| thermo_selected_md, | |
| thermo_result_md, | |
| angle_a, | |
| angle_b, | |
| angle_c, | |
| max_stress, | |
| ], | |
| show_progress=False, | |
| ).then( | |
| fn=ui_after_requirement_ready, | |
| inputs=[input_mode], | |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], | |
| show_progress=False, | |
| ) | |
| preview_btn.click( | |
| fn=ui_disable_all_buttons, | |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], | |
| show_progress=False, | |
| ).then( | |
| fn=ui_generate_required_curves, | |
| inputs=input_components, | |
| outputs=[req_plot, exact_test_coeffs], | |
| ).then( | |
| fn=ui_clear_design_outputs, | |
| outputs=[ | |
| design_plot, | |
| design_summary, | |
| generation_alert, | |
| run_log, | |
| design_choice, | |
| selected_design_md, | |
| design_payload, | |
| thermo_selected_md, | |
| thermo_result_md, | |
| angle_a, | |
| angle_b, | |
| angle_c, | |
| max_stress, | |
| ], | |
| show_progress=False, | |
| ).then( | |
| fn=ui_after_requirement_ready, | |
| inputs=[input_mode], | |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], | |
| show_progress=False, | |
| ) | |
| design_btn.click( | |
| fn=ui_disable_all_buttons, | |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], | |
| show_progress=False, | |
| ).then( | |
| fn=ui_generation_started, | |
| outputs=[ | |
| design_plot, | |
| design_summary, | |
| generation_alert, | |
| run_log, | |
| design_choice, | |
| selected_design_md, | |
| design_payload, | |
| thermo_selected_md, | |
| thermo_result_md, | |
| angle_a, | |
| angle_b, | |
| angle_c, | |
| max_stress, | |
| ], | |
| show_progress=False, | |
| ).then( | |
| fn=ui_inverse_design, | |
| inputs=[n_generate, input_mode, load_test_idx] + input_components, | |
| outputs=[ | |
| req_plot, | |
| design_plot, | |
| design_summary, | |
| run_log, | |
| generation_alert, | |
| design_choice, | |
| selected_design_md, | |
| design_payload, | |
| ], | |
| ).then( | |
| fn=ui_after_design_complete, | |
| inputs=[input_mode, design_payload], | |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], | |
| show_progress=False, | |
| ) | |
| design_choice.change( | |
| fn=ui_select_design, | |
| inputs=[design_choice, design_payload], | |
| outputs=[selected_design_md], | |
| show_progress=False, | |
| ) | |
| thermo_btn.click( | |
| fn=ui_disable_all_buttons, | |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], | |
| show_progress=False, | |
| ).then( | |
| fn=ui_thermoforming, | |
| inputs=[design_choice, design_payload, angle_a, angle_b, angle_c, max_stress], | |
| outputs=[thermo_selected_md, thermo_result_md], | |
| ).then( | |
| fn=ui_after_design_complete, | |
| inputs=[input_mode, design_payload], | |
| outputs=[preview_btn, load_btn, design_btn, thermo_btn], | |
| show_progress=False, | |
| ) | |
| return demo | |
def resolve_server_port(environ: "Dict[str, str] | None" = None, default: int = 7860) -> int:
    """Return the TCP port the app server should listen on.

    Reads ``PORT`` from *environ* (``os.environ`` when omitted). A missing,
    empty, or whitespace-only value falls back to *default*, so a declared
    but blank ``PORT`` variable does not crash startup.

    Args:
        environ: Mapping to read ``PORT`` from; defaults to ``os.environ``.
        default: Port returned when ``PORT`` is unset or blank.

    Returns:
        The port number as an ``int``.

    Raises:
        ValueError: If ``PORT`` is set to a non-blank, non-integer value.
    """
    env = os.environ if environ is None else environ
    # `or ""` folds both "missing" and "set to empty string" into one case.
    raw = (env.get("PORT") or "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError as err:
        # Fail loudly on a misconfigured value instead of silently using the default.
        raise ValueError(f"PORT must be an integer, got {raw!r}") from err


if __name__ == "__main__":
    # Listen on all interfaces (0.0.0.0) rather than loopback only.
    build_app().launch(server_name="0.0.0.0", server_port=resolve_server_port())