Spaces:
Running
Running
| # make_data_grid.py (clean + robust paths) | |
| from __future__ import annotations | |
| from decimal import Decimal | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple, Union | |
| import itertools | |
| import warnings | |
| import os | |
| import numpy as np | |
| import pandas as pd | |
# Ignore every FutureWarning raised after this point, process-wide.
# NOTE(review): this also hides deprecation notices from pandas/numpy calls
# made in this module -- confirm that is intended.
warnings.filterwarnings("ignore", category=FutureWarning)
| # ============================= | |
| # Grid & helpers | |
| # ============================= | |
def dseq(start: float, stop: float, step: float, q: str = "0.001") -> List[float]:
    """Return an inclusive arithmetic sequence computed with ``Decimal``.

    Each argument is converted through ``str`` into a ``Decimal`` so the
    stepping itself is done in decimal arithmetic rather than binary floats.

    Args:
        start: First value of the sequence.
        stop: Inclusive upper bound (a tolerance of 1e-12 is added to it).
        step: Increment between consecutive values.
        q: Quantization exponent, as a decimal string, applied to every value
            before it is converted back to ``float`` (``"0.001"`` keeps three
            decimals, ``"1"`` rounds to whole numbers).

    Returns:
        The generated values as floats; an empty list when ``start`` already
        exceeds ``stop``.

    Raises:
        ValueError: If ``step`` is zero or negative while ``start`` does not
            exceed ``stop``; the loop could never terminate in that case.
    """
    first, last, increment = (Decimal(str(v)) for v in (start, stop, step))
    upper = last + Decimal("1e-12")
    # A non-positive step can never move `current` past `upper`; fail fast
    # instead of appending forever.
    if first <= upper and increment <= 0:
        raise ValueError(f"step must be positive, got {step!r}")

    quantum = Decimal(q)  # loop-invariant, build it once
    values: List[float] = []
    current = first
    while current <= upper:
        values.append(float(current.quantize(quantum)))
        current += increment
    return values
def bead_to_lr(bead_value: Union[str, None]) -> Tuple[int, int]:
    """Translate a bead label into its two integer flags.

    String labels are matched case-insensitively. ``None`` maps to ``(2, 2)``
    and any label missing from the table maps to ``(0, 0)``.

    Args:
        bead_value: One of ``"none"``, ``"right"``, ``"left"``, ``"double"``
            (any letter case) or ``None``.

    Returns:
        The pair of flags associated with the label.
    """
    flags_by_label = {
        None: (2, 2),
        "none": (0, 0),
        "right": (0, 1),
        "left": (1, 0),
        "double": (1, 1),
    }
    if isinstance(bead_value, str):
        return flags_by_label.get(bead_value.lower(), (0, 0))
    return flags_by_label.get(bead_value, (0, 0))
def make_all_combinations(cfg: Dict) -> pd.DataFrame:
    """Build the full Cartesian grid of design parameters described by ``cfg``.

    Args:
        cfg: Configuration mapping. Reads ``beads`` (optional; a missing or
            empty value is treated as ``[None]``), ``materials``, and the
            ``min``/``max``/``step`` triples for thickness, diameter, upper
            radius, lower radius and degree.

    Returns:
        One row per combination with the columns ``material``, ``thickness``,
        ``upper_radius``, ``lower_radius``, ``diameter``, ``degree``, ``bead``,
        ``LB`` and ``RB``.
    """

    def _int_axis(lo_key: str, hi_key: str, step_key: str) -> List[int]:
        # Integer-valued axes are quantized to whole numbers before casting.
        return [int(v) for v in dseq(cfg[lo_key], cfg[hi_key], cfg[step_key], q="1")]

    bead_axis = [(name, *bead_to_lr(name)) for name in (cfg.get("beads") or [None])]
    material_axis = cfg["materials"]
    thickness_axis = dseq(cfg["min_thickness"], cfg["max_thickness"], cfg["thickness_step"])
    diameter_axis = _int_axis("min_diameter", "max_diameter", "diameter_step")
    upper_axis = dseq(cfg["upper_min"], cfg["upper_max"], cfg["upper_step"])
    lower_axis = dseq(cfg["lower_min"], cfg["lower_max"], cfg["lower_step"])
    degree_axis = _int_axis("min_degree", "max_degree", "degree_step")

    combos = itertools.product(
        material_axis, thickness_axis, upper_axis, lower_axis,
        diameter_axis, degree_axis, bead_axis,
    )
    records = [
        (str(mat), float(th), float(ur), float(lr), int(dia), int(deg),
         bead_name, int(lb), int(rb))
        for mat, th, ur, lr, dia, deg, (bead_name, lb, rb) in combos
    ]
    column_names = [
        "material", "thickness", "upper_radius", "lower_radius",
        "diameter", "degree", "bead", "LB", "RB",
    ]
    return pd.DataFrame(records, columns=column_names)
| # ============================= | |
| # Sweep-limit helpers | |
| # ============================= | |
def build_limit_dicts(df_sweep: pd.DataFrame):
    """Turn a wide sweep-limit table into two lookup dictionaries.

    Row labels are taken from the ``Sweep`` column when present (otherwise
    from the existing index) and are matched against
    ``<diameter>_upper_radius`` / ``<diameter>_lower_radius``. Column labels
    are interpreted as degrees. Cells that are not numeric (for example the
    ``"F"`` marker) end up as NaN limits.

    Args:
        df_sweep: Wide table with one row per diameter/side and one column per
            degree.

    Returns:
        ``(upper_dict, lower_dict)``; each maps ``(diameter, degree)`` integer
        pairs to the limit value (NaN when the cell was not numeric).
    """

    def _as_degree(label):
        # Keep labels that are not integer-like untouched; they are coerced to
        # NaN further down and dropped.
        try:
            return int(label)
        except (TypeError, ValueError, OverflowError):
            return label

    table = df_sweep.copy()
    if "Sweep" in table.columns:
        table = table.set_index("Sweep")
    table.columns = [_as_degree(c) for c in table.columns]

    # Long format, one record per (degree column, row label) cell. unstack()
    # keeps NaN cells and avoids the deprecated ``stack(dropna=False)`` call.
    long = table.unstack().reset_index()
    long.columns = ["degree", "row", "limit"]

    # astype(str) lets non-string row labels fall through as "no match"
    # instead of failing on the .str accessor.
    parsed = long["row"].astype(str).str.extract(
        r"(?P<diameter>\d+)_(?P<which>upper|lower)_radius"
    )
    long["which"] = parsed["which"]
    long["diameter"] = pd.to_numeric(parsed["diameter"], errors="coerce")
    long["degree"] = pd.to_numeric(long["degree"], errors="coerce")
    # Non-numeric markers such as "F" become NaN here.
    long["limit"] = pd.to_numeric(long["limit"], errors="coerce")

    def _limits_for(which: str) -> dict:
        part = long[long["which"] == which].dropna(subset=["diameter", "degree"])
        return {
            (int(d), int(g)): v
            for d, g, v in zip(part["diameter"], part["degree"], part["limit"])
        }

    return _limits_for("upper"), _limits_for("lower")
def filter_grid_by_sweep_limits(df_grid: pd.DataFrame, df_sweep: pd.DataFrame) -> pd.DataFrame:
    """Keep only the grid rows whose radii respect the sweep limits.

    Args:
        df_grid: Grid with ``diameter``, ``degree``, ``upper_radius`` and
            ``lower_radius`` columns.
        df_sweep: Wide sweep-limit table accepted by ``build_limit_dicts``.

    Returns:
        A re-indexed copy of the surviving rows with ``limit_upper`` and
        ``limit_lower`` columns added. Rows without both limits are dropped.
    """
    upper_limits, lower_limits = build_limit_dicts(df_sweep)
    pairs = list(zip(df_grid["diameter"].astype(int), df_grid["degree"].astype(int)))

    annotated = df_grid.copy()
    annotated["limit_upper"] = [upper_limits.get(pair, np.nan) for pair in pairs]
    annotated["limit_lower"] = [lower_limits.get(pair, np.nan) for pair in pairs]

    has_both_limits = annotated["limit_upper"].notna() & annotated["limit_lower"].notna()
    upper_ok = annotated["upper_radius"] <= annotated["limit_upper"]
    lower_ok = annotated["lower_radius"] <= annotated["limit_lower"]
    keep = has_both_limits & (upper_ok & lower_ok)
    return annotated[keep].reset_index(drop=True)
| # ============================= | |
| # Rule-by-bead helpers | |
| # ============================= | |
# Bead label -> name of the rule sheet that filter_all_by_bead() reads for
# rows carrying that label.
SHEET_BY_BEAD = {"left":"left", "right":"right", "double":"both", "none":"none"}
def _normalize_input_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned copy of ``df`` ready for rule filtering.

    Column names are stripped, the key columns are lower-cased, and the
    misspelling ``diamater`` is renamed to ``diameter``. ``bead`` is normalized
    to stripped lower-case text, ``material``/``diameter``/``degree`` are
    coerced to numbers, and rows where any of those three is not numeric are
    dropped.

    Args:
        df: Input frame; it is not modified.

    Returns:
        The normalized copy.

    Raises:
        ValueError: If ``material``, ``diameter``, ``degree`` or ``bead`` is
            missing after renaming.
    """
    key_columns = ("material", "diameter", "degree", "bead")
    numeric_columns = ["material", "diameter", "degree"]

    out = df.copy()
    out.columns = [str(c).strip() for c in out.columns]

    renames = {}
    for col in out.columns:
        lowered = col.lower().strip()
        if lowered == "diamater":  # tolerate the common misspelling
            renames[col] = "diameter"
        elif lowered in key_columns:
            renames[col] = lowered
    out = out.rename(columns=renames)

    missing = set(key_columns) - set(out.columns)
    if missing:
        # Message restored to readable Korean (it was mojibake):
        # "The input dataframe lacks required columns".
        raise ValueError(f"입력 데이터프레임에 필요한 컬럼이 없습니다: {missing}")

    out["bead"] = out["bead"].astype(str).str.strip().str.lower()
    for col in numeric_columns:
        out[col] = pd.to_numeric(out[col], errors="coerce")
    return out.dropna(subset=numeric_columns).copy()
def _read_rule_sheet(xlsx_path: str, sheet_name: str) -> pd.DataFrame:
    """Load one rule sheet from an Excel workbook and normalize it.

    Header names are stripped and lower-cased (non-string headers are
    stringified first) and the misspelling ``diamater`` is renamed to
    ``diameter``. The four rule columns are coerced to numbers, and rows with
    a non-numeric value in any of them are dropped.

    Args:
        xlsx_path: Path of the workbook.
        sheet_name: Sheet to read.

    Returns:
        A frame with exactly ``material``, ``diameter``, ``min_degree`` and
        ``max_degree``; the first two are cast to ``int64``.

    Raises:
        ValueError: If a required column is missing from the sheet.
    """
    required = ["material", "diameter", "min_degree", "max_degree"]

    rule = pd.read_excel(xlsx_path, sheet_name=sheet_name)
    # str(c) keeps this working when a header cell holds a number.
    rule.columns = [str(c).strip().lower() for c in rule.columns]
    rule = rule.rename(columns={"diamater": "diameter"})

    missing = set(required) - set(rule.columns)
    if missing:
        # Message restored to readable Korean (it was mojibake):
        # "Rule sheet '<name>' lacks required columns".
        raise ValueError(f"규칙 시트 '{sheet_name}'에 필요한 컬럼이 없습니다: {missing}")

    for col in required:
        rule[col] = pd.to_numeric(rule[col], errors="coerce")
    rule = rule.dropna(subset=required).copy()
    rule = rule.astype({"material": "int64", "diameter": "int64"})
    return rule[required]
def _apply_rules(df_part: pd.DataFrame, rule: pd.DataFrame) -> pd.DataFrame:
    """Keep the rows of ``df_part`` whose degree lies inside an allowed range.

    A row survives when ``rule`` has at least one entry with the same
    ``material`` and ``diameter`` and ``min_degree <= degree <= max_degree``.
    Each input row appears at most once in the result, even if several rule
    entries match it, and the original row order is preserved.

    Args:
        df_part: Rows to filter; needs ``material``, ``diameter`` and
            ``degree`` columns.
        rule: Rule table with ``material``, ``diameter``, ``min_degree`` and
            ``max_degree`` columns.

    Returns:
        A re-indexed frame with the same columns as ``df_part``.
    """
    if df_part.empty:
        return df_part.copy()

    candidates = df_part[df_part["material"].isin(rule["material"].unique())]
    if candidates.empty:
        return candidates.copy()
    candidates = candidates.reset_index(drop=True)

    # Merge only the key columns plus a row id. This avoids name clashes with
    # other columns of df_part and lets us collapse multiple rule hits back to
    # one output row.
    keys = candidates[["material", "diameter", "degree"]].rename_axis("_row").reset_index()
    bounds = rule[["material", "diameter", "min_degree", "max_degree"]]
    merged = keys.merge(bounds, on=["material", "diameter"], how="inner")

    # Comparisons against NaN bounds are False, so incomplete rules never match.
    in_range = (merged["degree"] >= merged["min_degree"]) & (
        merged["degree"] <= merged["max_degree"]
    )
    matched_rows = merged.loc[in_range, "_row"].unique()
    return candidates[candidates.index.isin(matched_rows)].reset_index(drop=True)
def filter_all_by_bead(df: pd.DataFrame, rules_xlsx: str) -> pd.DataFrame:
    """Filter ``df`` bead by bead using the matching sheet of a rules workbook.

    The input is normalized first. For every bead label in ``SHEET_BY_BEAD``
    (in that mapping's order) that occurs in the data, the corresponding sheet
    is read and applied to that subset; the surviving subsets are concatenated.

    Args:
        df: Input rows with material, diameter, degree and bead columns.
        rules_xlsx: Path of the Excel workbook holding one sheet per bead kind.

    Returns:
        The concatenated surviving rows with a fresh index, or an empty frame
        with the normalized columns when no bead label occurs in the data.
    """
    normalized = _normalize_input_df(df)

    survivors = []
    for bead_label in SHEET_BY_BEAD:
        subset = normalized.loc[normalized["bead"] == bead_label].copy()
        if not subset.empty:
            # Only touch the workbook for bead kinds that are actually present.
            sheet_rules = _read_rule_sheet(rules_xlsx, SHEET_BY_BEAD[bead_label])
            survivors.append(_apply_rules(subset, sheet_rules))

    if survivors:
        combined = pd.concat(survivors, axis=0, ignore_index=True)
        return combined.reset_index(drop=True)
    return normalized.iloc[0:0].copy()
| # ============================= | |
| # Predictors (blend models) | |
| # ============================= | |
# Material label as text -> the float-formatted text it is replaced with in
# _prep_pred_df() before prediction; labels not listed here pass through as-is.
DISPLAY_TO_MODEL = {"440":"440.0", "590":"590.0", "780":"780.0"}
def _here() -> Path:
    """Return the directory holding this module, or the current working
    directory when that location cannot be determined."""
    try:
        module_dir = Path(__file__).resolve().parent
    except Exception:
        # e.g. __file__ is undefined in some interactive/embedded contexts
        module_dir = Path.cwd()
    return module_dir
def _abs(p: Path | str) -> str:
    """Return ``p`` as a resolved, absolute path string."""
    resolved = Path(p).resolve()
    return os.fspath(resolved)
def _find_art_dir(name: str) -> str:
    """Resolve an artifact directory name to an absolute path.

    Search priority:
      1) the directory given by the environment variable for ``name``
         (``FS_MF_ART_DIR`` for ``artifacts_blend``, ``FS_THIN_ART_DIR`` for
         ``artifacts_blend_thinning``), if it is set and exists
      2) ``name`` inside this module's directory
      3) ``name`` inside the current working directory
      4) ``name`` inside the parent of this module's directory

    If none of these exist, the absolute form of ``name`` itself is returned
    without checking that it exists.
    """
    env_var_by_name = {
        "artifacts_blend": "FS_MF_ART_DIR",
        "artifacts_blend_thinning": "FS_THIN_ART_DIR",
    }
    env_var = env_var_by_name.get(name)
    override = os.getenv(env_var) if env_var is not None else None
    if override and Path(override).exists():
        return _abs(override)

    module_dir = _here()
    search_roots = (module_dir, Path.cwd(), module_dir.parent)
    candidates = ((root / name).resolve() for root in search_roots)
    found = next((path for path in candidates if path.exists()), None)
    if found is None:
        return _abs(name)
    return _abs(found)
# Module-level caches for the two predictors. They stay None until
# _get_predictor_mf() / _get_predictor_th() build the instance on first use.
_predictor_mf = None
_predictor_th = None
| def _prep_pred_df(df: pd.DataFrame) -> pd.DataFrame: | |
| need = ["material","thickness","diameter","degree","upper_radius","lower_radius","LB","RB"] | |
| missing = [c for c in need if c not in df.columns] | |
| if missing: | |
| raise ValueError(f"์์ธก ์ ๋ ฅ ์ปฌ๋ผ ๋๋ฝ: {missing}") | |
| x = df.copy() | |
| x["material"] = x["material"].astype(str).map(lambda v: DISPLAY_TO_MODEL.get(v, v)) | |
| x["LB"] = x["LB"].astype(int); x["RB"] = x["RB"].astype(int) | |
| for c in ["thickness","diameter","degree","upper_radius","lower_radius"]: | |
| x[c] = pd.to_numeric(x[c], errors="coerce") | |
| if x[["thickness","diameter","degree","upper_radius","lower_radius"]].isnull().any().any(): | |
| raise ValueError("์ซ์ ์ปฌ๋ผ์ NaN์ด ์์ต๋๋ค.") | |
| return x[need] | |
def _get_predictor_mf():
    """Return the shared ``BlendPredictor`` for the ``artifacts_blend``
    directory, creating and caching it on the first call."""
    global _predictor_mf
    if _predictor_mf is None:
        # Imported lazily so the module can be loaded without the model code.
        from predict_blend import BlendPredictor

        _predictor_mf = BlendPredictor(_find_art_dir("artifacts_blend"))
    return _predictor_mf
def _get_predictor_th():
    """Return the shared thinning predictor, creating and caching it on the
    first call.

    The predictor from ``predict_blend_thinning`` is built on the
    ``artifacts_blend_thinning`` directory. If that raises
    ``FileNotFoundError``, a closed-form heuristic predictor exposing the same
    ``predict_blend(df)`` method is cached instead.
    """
    global _predictor_th
    if _predictor_th is None:
        try:
            from predict_blend_thinning import BlendPredictor as ThinPredictor

            _predictor_th = ThinPredictor(_find_art_dir("artifacts_blend_thinning"))
        except FileNotFoundError:
            # Artifact folder missing: fall back to a simple heuristic.
            class _HeuristicThinPredictor:
                @staticmethod
                def _estimate(thickness, upperR, lowerR):
                    t = float(thickness)
                    ur = float(upperR)
                    lr = float(lowerR)
                    base = 0.18 + (0.9 - t) * 0.25
                    geom = max(0.0, ur - lr) * 0.01
                    capped = min(0.8, base + geom)
                    return float(max(0.05, capped))

                def predict_blend(self, df: pd.DataFrame):
                    estimates = []
                    for _, row in df.iterrows():
                        estimates.append(
                            self._estimate(
                                row["thickness"], row["upper_radius"], row["lower_radius"]
                            )
                        )
                    return np.array(estimates, dtype=float)

            _predictor_th = _HeuristicThinPredictor()
    return _predictor_th
def predict_max_failure(df: pd.DataFrame) -> np.ndarray:
    """Run the ``artifacts_blend`` predictor on the prepared features of ``df``.

    The input is validated before the predictor is loaded, so a malformed
    frame fails without touching the model.
    """
    features = _prep_pred_df(df)
    predictor = _get_predictor_mf()
    return predictor.predict_blend(features)
def predict_thinning(df: pd.DataFrame) -> np.ndarray:
    """Run the thinning predictor on the prepared features of ``df``.

    The input is validated before the predictor is loaded, so a malformed
    frame fails without touching the model.
    """
    features = _prep_pred_df(df)
    predictor = _get_predictor_th()
    return predictor.predict_blend(features)