| |
"""
Automatic classification of LLS defect patterns.

Takes the set of defect coordinates found on one wafer (or one group of
wafers) and classifies it as one of Ring / Linear / Cluster / Others.

Classification pipeline
-----------------------
1. First-pass clustering with HDBSCAN -> drop outliers (label -1)
   -> fall back to DBSCAN on failure
2. Second-pass outlier removal with LOF (local-density based)
3. Evaluate pattern candidates on the inlier set (in priority order)
   (a) Ring detection    : circle-fit RMSE + angular coverage + clock-sector coverage
                           + PCA linearity rejection (prevents false positives
                             from a line passing through the origin)
   (b) Linear detection  : PCA eigenvalue ratio + deviation from the line + gap ratio
   (c) Cluster detection : DBSCAN sub-clusters -> re-judge cluster vs. linear
                           using compactness / PCA
4. Compute dominant_zone (for visualization)
5. Compute the centroid coordinates
   - Ring: mean of all inliers
   - Linear/Cluster: mean of the points in dominant_zone (inlier mean if there are none)

API
---
- `PatternDetector(config).classify(df)`      (recommended)
- `classify_wafer_patterns(df, config)`       (legacy-compatible)

Both APIs return a `(result_df, dominant_zone, pattern_list, centroid)` tuple.
"""
| from __future__ import annotations |
|
|
| from collections import Counter |
| from typing import Tuple, List, Optional |
|
|
| import numpy as np |
| import pandas as pd |
| import hdbscan |
| from sklearn.decomposition import PCA |
| from sklearn.cluster import DBSCAN |
| from sklearn.neighbors import LocalOutlierFactor |
|
|
| from utils import WaferUtils |
|
|
|
|
| |
| |
| |
class PatternDetector:
    """
    Detector that classifies LLS defect patterns using an injected config.

    The only instance state is ``self.cfg``, and the methods of this class
    only read it, so a single instance can be reused for many wafer groups.

    Parameters
    ----------
    config : dict
        Dict with the structure of ``lls_config.json``.
        Keys read by this class (by sub-tree):
        - preprocessing.inner_radius_mm
        - clustering.{min_cluster_size, min_samples, cluster_selection_method,
                      dbscan_eps, cluster_dbscan_eps}
        - lof.{lof_min_points, lof_n_neighbors, lof_contamination}
        - ring.{ring_min_points, ring_band_width, ring_r_absolute_tolerance,
                ring_min_angular_coverage, ring_min_sectors, ring_fit_rmse_max,
                (optional) ring_pca_ratio_max,
                (optional) ring_center_max_offset}
        - linear.{linear_pca_ratio_min, linear_max_deviation, linear_min_length,
                  linear_max_gap_ratio,
                  centroid_linear_min_length, centroid_linear_pca_min,
                  centroid_linear_dev_max}
        - cluster.cluster_compactness_radius
        - misc.min_points_for_clustering
    """

    def __init__(self, config: dict):
        self.cfg = config

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    def classify(
        self, df: pd.DataFrame
    ) -> Tuple[pd.DataFrame, str, List[str], Optional[tuple]]:
        """
        Classify the defect pattern of one defect DataFrame.

        Parameters
        ----------
        df : pd.DataFrame
            Must contain the 'coor_x' and 'coor_y' columns. Zone labels are
            attached internally through ``WaferUtils.add_zone_labels``
            (presumably this also adds the 'zone_label', 'r' and 'theta_deg'
            columns read further down -- confirm against ``utils``).

        Returns
        -------
        result_df : pd.DataFrame
            Copy of the input with the columns added by
            ``WaferUtils.add_zone_labels`` plus a boolean 'inlier' column.
            For an empty input the original ``df`` is returned untouched.
        dominant_zone : str
            Most frequent 'zone_label' among the inliers, or a
            "no data" / "N/A" marker when there is nothing to count.
        pattern_list : list[str]
            Single-element list holding the ring, linear, cluster, "Others",
            "None" (empty input) or "below minimum point count" label.
        centroid : tuple | None
            (x, y) centre of the detected pattern; None when no pattern
            could be evaluated.
        """
        # NOTE(review): the non-ASCII string literals below appear to be
        # mis-encoded Korean labels in this copy of the file. They are
        # reproduced verbatim because callers may compare against them --
        # confirm the intended encoding against the original source.
        cfg = self.cfg

        if df.empty:
            return df, "๋ฐ์ดํฐ ์์", ["None"], None

        # Work on a copy with a clean 0..n-1 index so boolean masks line up.
        df = df.copy().reset_index(drop=True)
        df = WaferUtils.add_zone_labels(
            df, inner_radius=cfg["preprocessing"]["inner_radius_mm"]
        )
        coords = df[["coor_x", "coor_y"]].values

        n_total = len(df)
        if n_total < cfg["misc"]["min_points_for_clustering"]:
            return (df.assign(inlier=np.zeros(len(df), dtype=bool)),
                    "๋ฐ์ดํฐ ์์", ["์ ์/๋ฏธ๋ฌ"], None)

        # Stage 1: density clustering; label -1 marks outliers.
        labels = self._cluster_hdbscan(coords)
        if np.all(labels == -1):
            # HDBSCAN found no cluster at all -> retry with plain DBSCAN.
            labels = self._cluster_dbscan_fallback(coords)
        inlier_mask = labels != -1
        if not inlier_mask.any():
            return df.assign(inlier=inlier_mask), "๋ฐ์ดํฐ ์์", ["Others"], None

        # Stage 2: LOF-based second outlier pass on the surviving points.
        inlier_mask = self._apply_lof(coords, inlier_mask)
        inlier_df = df[inlier_mask].copy()
        inlier_coords = coords[inlier_mask]
        n_inlier = len(inlier_df)

        if n_inlier < cfg["clustering"]["min_cluster_size"]:
            return df.assign(inlier=inlier_mask), "๋ฐ์ดํฐ ์์", ["Others"], None

        # Stage 3: pattern candidates in priority order: ring, linear, cluster.
        if self._is_ring(inlier_df):
            zone = self._dominant_zone(inlier_df)
            centroid = tuple(np.mean(inlier_df[["coor_x", "coor_y"]].values, axis=0))
            return df.assign(inlier=inlier_mask), zone, ["ํํ"], centroid

        if self._is_linear_set(inlier_coords):
            zone = self._dominant_zone(inlier_df)
            centroid = self._zone_centroid(inlier_df, inlier_coords, zone)
            return df.assign(inlier=inlier_mask), zone, ["์ ํ"], centroid

        zone = self._dominant_zone(inlier_df)
        centroid = self._zone_centroid(inlier_df, inlier_coords, zone)
        pattern = self._classify_cluster_or_sub_linear(inlier_coords)
        return df.assign(inlier=inlier_mask), zone, [pattern], centroid

    # ------------------------------------------------------------------
    # Clustering
    # ------------------------------------------------------------------
    def _cluster_hdbscan(self, coords: np.ndarray) -> np.ndarray:
        """Return HDBSCAN cluster labels for ``coords``; outliers are -1."""
        c = self.cfg["clustering"]
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=c["min_cluster_size"],
            min_samples=c["min_samples"],
            cluster_selection_method=c["cluster_selection_method"],
            metric="euclidean",
            gen_min_span_tree=True,
        )
        return clusterer.fit_predict(coords)

    def _cluster_dbscan_fallback(self, coords: np.ndarray) -> np.ndarray:
        """Return DBSCAN labels; used when HDBSCAN labels every point -1."""
        c = self.cfg["clustering"]
        return DBSCAN(eps=c["dbscan_eps"], min_samples=c["min_cluster_size"]).fit(coords).labels_

    # ------------------------------------------------------------------
    # Outlier removal
    # ------------------------------------------------------------------
    def _apply_lof(self, coords: np.ndarray, inlier_mask: np.ndarray) -> np.ndarray:
        """
        Remove additional outliers from the first-pass inliers with LOF.

        Returns ``inlier_mask`` unchanged when there are fewer inliers than
        ``lof.lof_min_points`` or fewer than two usable neighbours; otherwise
        returns a mask over all of ``coords`` that is True only for points
        that were inliers before and that LOF labels as inliers (+1).
        """
        lof_cfg = self.cfg["lof"]
        inlier_coords = coords[inlier_mask]
        n_inlier = len(inlier_coords)
        if n_inlier < lof_cfg["lof_min_points"]:
            return inlier_mask

        # LOF needs n_neighbors < n_samples.
        n_neighbors = min(lof_cfg["lof_n_neighbors"], n_inlier - 1)
        if n_neighbors < 2:
            return inlier_mask

        lof = LocalOutlierFactor(
            n_neighbors=n_neighbors,
            contamination=lof_cfg["lof_contamination"],
            metric="euclidean",
        )
        lof_labels = lof.fit_predict(inlier_coords)

        # Scatter the per-inlier verdict back onto the full-length mask.
        full_mask = np.zeros(len(coords), dtype=bool)
        full_mask[inlier_mask] = lof_labels == 1
        return inlier_mask & full_mask

    # ------------------------------------------------------------------
    # Ring detection
    # ------------------------------------------------------------------
    def _is_ring(self, inlier_df: pd.DataFrame) -> bool:
        """
        Decide whether the inliers form a ring.

        Steps
        -----
        1. Minimum point count.
        2. PCA linearity rejection: if the whole inlier set is strongly
           elongated it is not a ring (prevents false positives from a line
           passing through the origin).
        3. Keep only the top bin of the r-histogram (main ring band).
        4. Point count / r spread / angular coverage / sector coverage
           inside that band.
        5. Circle-fit RMSE and distance of the fitted centre from the origin
           (limit ``ring.ring_center_max_offset``, default 10.0).
        """
        cfg = self.cfg
        ring_cfg = cfg["ring"]
        if len(inlier_df) < ring_cfg["ring_min_points"]:
            return False

        coords = inlier_df[["coor_x", "coor_y"]].values
        if len(coords) >= 3:
            pca_all = PCA(n_components=2).fit(coords)
            if len(pca_all.explained_variance_) >= 2:
                eig_ratio = pca_all.explained_variance_[0] / (pca_all.explained_variance_[1] + 1e-9)
                # Fall back to the linear threshold when no ring-specific one is set.
                ring_pca_max = ring_cfg.get("ring_pca_ratio_max",
                                            cfg["linear"]["linear_pca_ratio_min"])
                if np.sqrt(eig_ratio) >= ring_pca_max:
                    return False

        main_ring_df = self._filter_main_ring_band(inlier_df,
                                                   r_bin_width=ring_cfg["ring_band_width"],
                                                   top_n_bins=1)
        if len(main_ring_df) < ring_cfg["ring_min_points"]:
            return False

        r = main_ring_df["r"].values
        theta_deg = main_ring_df["theta_deg"].values
        x = main_ring_df["coor_x"].values
        y = main_ring_df["coor_y"].values

        if r.max() - r.min() > ring_cfg["ring_r_absolute_tolerance"]:
            return False
        if self._circular_range_deg(theta_deg) < ring_cfg["ring_min_angular_coverage"]:
            return False
        if not self._check_sector_coverage(theta_deg, min_sectors=ring_cfg["ring_min_sectors"]):
            return False

        cx, cy, _, rmse = self._fit_circle_least_squares(x, y)
        # A failed or non-finite fit must never be accepted: NaN compares
        # False against every threshold, so it has to be rejected explicitly.
        if cx is None or not np.isfinite(rmse) or rmse > ring_cfg["ring_fit_rmse_max"]:
            return False

        # The fitted centre has to sit close to the coordinate origin.
        center_offset = np.sqrt(cx ** 2 + cy ** 2)
        max_offset = ring_cfg.get("ring_center_max_offset", 10.0)
        if not np.isfinite(center_offset) or center_offset > max_offset:
            return False
        return True

    @staticmethod
    def _filter_main_ring_band(
        df: pd.DataFrame, r_bin_width: float = 5.0, top_n_bins: int = 1,
        r_max: float = 150.0,
    ) -> pd.DataFrame:
        """
        Keep only the points that fall in the most populated r-histogram bin(s).

        Parameters
        ----------
        df : pd.DataFrame
            Points with an 'r' (radius) column.
        r_bin_width : float
            Width of each radial bin.
        top_n_bins : int
            Number of most-populated bins to keep.
        r_max : float
            Upper end of the histogram range; bins span ``[0, r_max]``.

        Returns
        -------
        pd.DataFrame
            Copy of the selected rows. A copy of ``df`` when it is empty or
            has no 'r' column; an empty frame when no radius lies in
            ``[0, r_max]``.
        """
        if len(df) == 0 or "r" not in df.columns:
            return df.copy()
        r = df["r"].values
        if not ((r >= 0) & (r <= r_max)).any():
            return pd.DataFrame(columns=df.columns)

        r_bins = np.arange(0, r_max + r_bin_width, r_bin_width)
        r_hist, r_edges = np.histogram(r, bins=r_bins)
        top_idx = np.argsort(r_hist)[::-1][:top_n_bins]
        last_bin = len(r_hist) - 1

        mask = np.zeros(len(df), dtype=bool)
        for bi in top_idx:
            lo, hi = r_edges[bi], r_edges[bi + 1]
            # np.histogram bins are half-open [lo, hi) except the last one,
            # which is closed [lo, hi]; select with the same convention so a
            # point exactly on the outer edge is not counted and then dropped.
            below_hi = (r <= hi) if bi == last_bin else (r < hi)
            mask |= (r >= lo) & below_hi
        return df[mask].copy()

    @staticmethod
    def _circular_range_deg(angles_deg: np.ndarray) -> float:
        """
        Angular coverage in degrees (out of 360).

        Computed as 360 minus the largest empty gap between consecutive
        angles, including the wrap-around gap. Returns 0.0 for fewer than
        two angles.
        """
        if len(angles_deg) < 2:
            return 0.0
        a = np.sort(np.array(angles_deg) % 360.0)
        gaps = np.diff(a)
        circ_gap = 360.0 - a[-1] + a[0]  # gap across the 360 -> 0 wrap
        return 360.0 - max(np.max(gaps), circ_gap)

    @staticmethod
    def _check_sector_coverage(theta_deg: np.ndarray, min_sectors: int = 8) -> bool:
        """Return True if at least ``min_sectors`` of the twelve 30-degree sectors hold a point."""
        if len(theta_deg) == 0:
            return False
        sectors = ((theta_deg % 360) // 30).astype(int) % 12
        return len(np.unique(sectors)) >= min_sectors

    @staticmethod
    def _fit_circle_least_squares(
        x: np.ndarray, y: np.ndarray
    ) -> Tuple[Optional[float], Optional[float], Optional[float], float]:
        """
        Algebraic least-squares circle fit.

        Solves ``x^2 + y^2 = a*x + b*y + c`` in the least-squares sense and
        derives the centre ``(a/2, b/2)`` and radius from the solution.

        Returns
        -------
        (cx, cy, radius, rmse) -- on failure (fewer than 3 points, the solver
        raising, or a non-finite result) ``(None, None, None, inf)``.
        """
        failure = (None, None, None, np.inf)
        if len(x) < 3:
            return failure
        x = x[:, np.newaxis]
        y = y[:, np.newaxis]
        A = np.hstack([x, y, np.ones_like(x)])
        b = x ** 2 + y ** 2
        try:
            sol, *_ = np.linalg.lstsq(A, b, rcond=None)
            a, bb, c = sol.flatten()
            cx, cy = a / 2, bb / 2
            radius = np.sqrt((a ** 2 + bb ** 2) / 4 + c)
            fitted = np.sqrt((x - cx) ** 2 + (y - cy) ** 2)
            rmse = np.sqrt(np.mean((fitted - radius) ** 2))
        except (np.linalg.LinAlgError, ValueError, FloatingPointError):
            return failure
        # NaN/inf inputs can yield a non-finite fit without raising; report
        # that as a failure instead of handing NaN to the caller.
        if not (np.isfinite(cx) and np.isfinite(cy)
                and np.isfinite(radius) and np.isfinite(rmse)):
            return failure
        return cx, cy, radius, rmse

    # ------------------------------------------------------------------
    # Linear detection
    # ------------------------------------------------------------------
    def _is_linear_set(self, coords: np.ndarray) -> bool:
        """
        Decide whether the whole inlier set lies close enough to a straight line.

        Checks, in order: minimum extent (twice the largest distance from the
        centroid), PCA elongation (sqrt of the eigenvalue ratio), mean
        absolute deviation from the principal axis, and the largest gap
        along the axis relative to the total length.
        """
        cfg = self.cfg["linear"]
        n = len(coords)
        if n < 3:
            return False

        centroid = np.mean(coords, axis=0)
        max_dist = np.max(np.linalg.norm(coords - centroid, axis=1))
        # 2 * max_dist approximates the end-to-end length of the set.
        if 2 * max_dist < cfg["linear_min_length"]:
            return False

        pca = PCA(n_components=min(2, n)).fit(coords)
        if len(pca.explained_variance_) < 2:
            return False
        eig_ratio = pca.explained_variance_[0] / (pca.explained_variance_[1] + 1e-9)
        if np.sqrt(eig_ratio) < cfg["linear_pca_ratio_min"]:
            return False

        # Mean perpendicular distance to the principal axis.
        normal = np.array([-pca.components_[0][1], pca.components_[0][0]])
        if np.mean(np.abs(np.dot(coords - pca.mean_, normal))) > cfg["linear_max_deviation"]:
            return False

        # Largest hole along the axis, relative to the total projected length.
        proj = np.sort(np.dot(coords - pca.mean_, pca.components_[0]))
        total_len = proj[-1] - proj[0]
        if total_len > 0 and np.max(np.diff(proj)) / total_len > cfg["linear_max_gap_ratio"]:
            return False
        return True

    def _is_centroids_linear(self, sub_coords_list: list) -> bool:
        """
        Decide whether the centroids of several sub-clusters are collinear.

        Requires at least three sub-clusters; applies the
        ``centroid_linear_*`` thresholds (minimum span, PCA elongation, mean
        deviation from the principal axis) to the centroid points.
        """
        cfg = self.cfg["linear"]
        if len(sub_coords_list) < 3:
            return False
        centroids = np.array([np.mean(sc, axis=0) for sc in sub_coords_list])
        max_span = 2 * np.max(np.linalg.norm(centroids - np.mean(centroids, axis=0), axis=1))
        if max_span < cfg["centroid_linear_min_length"]:
            return False
        pca = PCA(n_components=2).fit(centroids)
        if len(pca.explained_variance_) < 2:
            return False
        if np.sqrt(pca.explained_variance_[0] /
                   (pca.explained_variance_[1] + 1e-9)) < cfg["centroid_linear_pca_min"]:
            return False
        normal = np.array([-pca.components_[0][1], pca.components_[0][0]])
        if np.mean(np.abs(np.dot(centroids - pca.mean_, normal))) > cfg["centroid_linear_dev_max"]:
            return False
        return True

    # ------------------------------------------------------------------
    # Cluster detection
    # ------------------------------------------------------------------
    def _classify_cluster_or_sub_linear(self, inlier_coords: np.ndarray) -> str:
        """
        Re-judge the pattern when the set is neither a ring nor a line.

        The inliers are split with a secondary DBSCAN:

        - two or more sub-clusters whose centroids are collinear
          (``_is_centroids_linear``, which itself needs at least three)
          -> linear label;
        - otherwise with two or more sub-clusters: label each one as
          cluster/linear and take the label backed by the most points;
        - fewer than two sub-clusters: label the whole inlier set.
        """
        cfg = self.cfg
        if len(inlier_coords) < 2:
            return "๊ตฐ์ง"

        sub = DBSCAN(eps=cfg["clustering"]["cluster_dbscan_eps"],
                     min_samples=cfg["clustering"]["min_cluster_size"]).fit(inlier_coords)
        sub_labels = sub.labels_
        n_sub = len(set(sub_labels)) - (1 if -1 in sub_labels else 0)

        if n_sub >= 2:
            sub_list = [inlier_coords[sub_labels == lbl]
                        for lbl in set(sub_labels) if lbl != -1]
            if self._is_centroids_linear(sub_list):
                return "์ ํ"
            # Point-count-weighted majority vote over the sub-cluster labels.
            results = [(self._classify_subcluster(sc), len(sc)) for sc in sub_list]
            totals = {}
            for pat, cnt in results:
                totals[pat] = totals.get(pat, 0) + cnt
            return max(totals, key=totals.get)
        return self._classify_subcluster(inlier_coords)

    def _classify_subcluster(self, sub_coords: np.ndarray) -> str:
        """
        Label a single sub-cluster as the cluster or the linear pattern.

        A sub-cluster with fewer than three points, or one that fits inside
        ``cluster.cluster_compactness_radius`` around its centroid, is a
        cluster. Otherwise it is linear only if it passes the PCA elongation,
        mean-deviation and minimum-length thresholds of the ``linear`` config.
        """
        cfg = self.cfg
        n = len(sub_coords)
        if n < 3:
            return "๊ตฐ์ง"
        centroid = np.mean(sub_coords, axis=0)
        max_dist = np.max(np.linalg.norm(sub_coords - centroid, axis=1))

        # Compact blobs are clusters regardless of their shape.
        if max_dist <= cfg["cluster"]["cluster_compactness_radius"]:
            return "๊ตฐ์ง"

        pca = PCA(n_components=min(2, n)).fit(sub_coords)
        if len(pca.explained_variance_) >= 2:
            eig_ratio = pca.explained_variance_[0] / (pca.explained_variance_[1] + 1e-9)
            shape_idx = np.sqrt(eig_ratio)
            if shape_idx >= cfg["linear"]["linear_pca_ratio_min"]:
                normal = np.array([-pca.components_[0][1], pca.components_[0][0]])
                mean_dev = np.mean(np.abs(np.dot(sub_coords - pca.mean_, normal)))
                if (mean_dev <= cfg["linear"]["linear_max_deviation"]
                        and 2 * max_dist >= cfg["linear"]["linear_min_length"]):
                    return "์ ํ"
        return "๊ตฐ์ง"

    # ------------------------------------------------------------------
    # Zone / centroid helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _dominant_zone(df: pd.DataFrame) -> str:
        """Return the most frequent 'zone_label', or "N/A" if it cannot be determined."""
        if len(df) == 0 or "zone_label" not in df.columns:
            return "N/A"
        counter = Counter(df["zone_label"])
        return counter.most_common(1)[0][0]

    @staticmethod
    def _zone_centroid(
        inlier_df: pd.DataFrame, inlier_coords: np.ndarray, zone: str
    ) -> tuple:
        """
        Mean (x, y) of the points in the dominant zone.

        Falls back to the mean of all ``inlier_coords`` when no row of
        ``inlier_df`` carries ``zone``. When there is no 'zone_label' column
        all rows of ``inlier_df`` are used.
        """
        dom = inlier_df[inlier_df["zone_label"] == zone] if "zone_label" in inlier_df.columns else inlier_df
        if not dom.empty:
            return tuple(np.mean(dom[["coor_x", "coor_y"]].values, axis=0))
        return tuple(np.mean(inlier_coords, axis=0))
|
|
|
| |
| |
| |
def classify_wafer_patterns(df: pd.DataFrame, cfg: dict):
    """
    Functional alias for ``PatternDetector(cfg).classify(df)``.

    Builds a fresh ``PatternDetector`` from ``cfg`` and returns whatever its
    ``classify`` method returns for ``df``.
    """
    detector = PatternDetector(cfg)
    return detector.classify(df)
|
|