# pattern_detection.py """ LLS 결함 패턴 자동 분류 모듈. 웨이퍼 한 장(또는 한 그룹) 위의 결함 좌표 집합을 입력으로 받아 환형(Ring) / 선형(Linear) / 군집(Cluster) / Others 중 하나로 분류한다. 분류 파이프라인 ---------------- 1. HDBSCAN으로 1차 클러스터링 → outlier(-1) 제거 └ 실패 시 DBSCAN fallback 2. LOF로 2차 outlier 제거 (지역 밀도 기반) 3. inlier 집합에 대해 패턴 후보 평가 (우선순위 순) (a) 환형 검출 : 원 피팅 RMSE + 각도 커버리지 + 시계 sector 커버리지 + PCA 선형성 거부(원점 통과 선형 false-positive 방지) (b) 선형 검출 : PCA eigenvalue ratio + 직선 편차 + gap ratio (c) 군집 검출 : DBSCAN sub-cluster → compactness/PCA로 군집/선형 재판정 4. dominant_zone 계산 (시각화용) 5. centroid 좌표 산출 - 환형: inlier 전체 평균 - 선형/군집: dominant_zone 점들의 평균 (없으면 inlier 평균) API --- - `PatternDetector(config).classify(df)` (권장) - `classify_wafer_patterns(df, config)` (구버전 호환) 두 API 모두 `(result_df, dominant_zone, pattern_list, centroid)` 튜플 반환. """ from __future__ import annotations from collections import Counter from typing import Tuple, List, Optional import numpy as np import pandas as pd import hdbscan from sklearn.decomposition import PCA from sklearn.cluster import DBSCAN from sklearn.neighbors import LocalOutlierFactor from utils import WaferUtils # ====================================================================== # PatternDetector # ====================================================================== class PatternDetector: """ config를 주입받아 LLS 결함 패턴을 분류하는 검출기. 상태로 `self.cfg` 한 가지만 보유하므로 thread-safe하며, 동일 인스턴스를 여러 wafer 그룹에 반복 사용해도 무방하다. Parameters ---------- config : dict `lls_config.json` 구조의 dict. 필요한 키 (서브트리): - preprocessing.inner_radius_mm - clustering.{min_cluster_size, min_samples, cluster_selection_method, dbscan_eps, cluster_dbscan_eps} - lof.{lof_min_points, lof_n_neighbors, lof_contamination} - ring.{ring_min_points, ring_band_width, ring_r_absolute_tolerance, ring_min_angular_coverage, ring_min_sectors, ring_fit_rmse_max, (선택) ring_pca_ratio_max} - linear.{linear_pca_ratio_min, linear_max_deviation, linear_min_length, linear_max_gap_ratio, centroid_linear_min_length, centroid_linear_pca_min, centroid_linear_dev_max} - cluster.cluster_compactness_radius - misc.min_points_for_clustering """ def __init__(self, config: dict): self.cfg = config # ================================================================== # 공개 API # ================================================================== def classify( self, df: pd.DataFrame ) -> Tuple[pd.DataFrame, str, List[str], Optional[tuple]]: """ 결함 DataFrame을 받아 패턴을 분류. Parameters ---------- df : pd.DataFrame 'coor_x', 'coor_y' 컬럼을 반드시 포함. inner_radius 기반 zone 라벨은 내부에서 자동으로 부여한다. Returns ------- result_df : pd.DataFrame 원본 df + 'inlier' (bool) + 'zone_label'/'r'/'theta_deg' 컬럼. dominant_zone : str inlier 중 가장 많이 나타난 zone_label. inlier가 비면 "데이터 없음"/"N/A". pattern_list : list[str] ["환형"] / ["선형"] / ["군집"] / ["Others"] / ["정상/미달"]. centroid : tuple[float, float] | None 패턴 발생 중심 좌표. 분류 실패 시 None. """ cfg = self.cfg if df.empty: return df, "데이터 없음", ["None"], None # Zone 라벨링 + 좌표 평탄화 df = df.copy().reset_index(drop=True) df = WaferUtils.add_zone_labels(df, inner_radius=cfg["preprocessing"]["inner_radius_mm"]) coords = df[["coor_x", "coor_y"]].values n_total = len(df) if n_total < cfg["misc"]["min_points_for_clustering"]: return (df.assign(inlier=np.zeros(len(df), dtype=bool)), "데이터 없음", ["정상/미달"], None) # --- 1차 클러스터링 (HDBSCAN → DBSCAN fallback) --- labels = self._cluster_hdbscan(coords) if np.all(labels == -1): labels = self._cluster_dbscan_fallback(coords) inlier_mask = labels != -1 if not any(inlier_mask): return df.assign(inlier=inlier_mask), "데이터 없음", ["Others"], None # --- 2차 outlier 제거 (LOF) --- inlier_mask = self._apply_lof(coords, inlier_mask) inlier_df = df[inlier_mask].copy() inlier_coords = coords[inlier_mask] n_inlier = len(inlier_df) if n_inlier < cfg["clustering"]["min_cluster_size"]: return df.assign(inlier=inlier_mask), "데이터 없음", ["Others"], None # --- 패턴 판정: 환형 → 선형 → 군집(서브분류) --- if self._is_ring(inlier_df): zone = self._dominant_zone(inlier_df) centroid = tuple(np.mean(inlier_df[["coor_x", "coor_y"]].values, axis=0)) return df.assign(inlier=inlier_mask), zone, ["환형"], centroid if self._is_linear_set(inlier_coords): zone = self._dominant_zone(inlier_df) centroid = self._zone_centroid(inlier_df, inlier_coords, zone) return df.assign(inlier=inlier_mask), zone, ["선형"], centroid # 군집 후보: 서브클러스터 검사 zone = self._dominant_zone(inlier_df) centroid = self._zone_centroid(inlier_df, inlier_coords, zone) pattern = self._classify_cluster_or_sub_linear(inlier_coords) return df.assign(inlier=inlier_mask), zone, [pattern], centroid # ================================================================== # 1차 클러스터링 # ================================================================== def _cluster_hdbscan(self, coords: np.ndarray) -> np.ndarray: """HDBSCAN으로 클러스터 라벨 산출. outlier는 -1.""" c = self.cfg["clustering"] clusterer = hdbscan.HDBSCAN( min_cluster_size=c["min_cluster_size"], min_samples=c["min_samples"], cluster_selection_method=c["cluster_selection_method"], metric="euclidean", gen_min_span_tree=True, ) return clusterer.fit_predict(coords) def _cluster_dbscan_fallback(self, coords: np.ndarray) -> np.ndarray: """HDBSCAN 실패 시 DBSCAN fallback.""" c = self.cfg["clustering"] return DBSCAN(eps=c["dbscan_eps"], min_samples=c["min_cluster_size"]).fit(coords).labels_ # ================================================================== # 2차 outlier 제거 (LOF) # ================================================================== def _apply_lof(self, coords: np.ndarray, inlier_mask: np.ndarray) -> np.ndarray: """LOF로 1차 inlier에서 추가 outlier 제거.""" lof_cfg = self.cfg["lof"] inlier_coords = coords[inlier_mask] n_inlier = len(inlier_coords) if n_inlier < lof_cfg["lof_min_points"]: return inlier_mask n_neighbors = min(lof_cfg["lof_n_neighbors"], n_inlier - 1) if n_neighbors < 2: return inlier_mask lof = LocalOutlierFactor( n_neighbors=n_neighbors, contamination=lof_cfg["lof_contamination"], metric="euclidean", ) lof_labels = lof.fit_predict(inlier_coords) # inlier_mask와 동일 길이의 mask로 확장 full_mask = np.zeros(len(coords), dtype=bool) full_mask[inlier_mask] = lof_labels == 1 return inlier_mask & full_mask # ================================================================== # 환형 검출 # ================================================================== def _is_ring(self, inlier_df: pd.DataFrame) -> bool: """ 환형(ring) 판정. 단계 ---- 1. 최소 포인트 수 2. PCA 선형성 거부: 전체 inlier가 강한 선형성을 보이면 ring 아님 (원점 통과 선형 false-positive 방지) 3. r-히스토그램 top bin만 추출 (main ring band) 4. band 내 점 수 / r 폭 / 각도 커버리지 / sector 커버리지 5. 원 피팅 RMSE / 중심점 원점 근접도 """ cfg = self.cfg n_total = len(inlier_df) if n_total < cfg["ring"]["ring_min_points"]: return False # 선형성 거부 (Ring pre-check) coords = inlier_df[["coor_x", "coor_y"]].values if len(coords) >= 3: pca_all = PCA(n_components=2).fit(coords) if len(pca_all.explained_variance_) >= 2: eig_ratio = pca_all.explained_variance_[0] / (pca_all.explained_variance_[1] + 1e-9) ring_pca_max = cfg["ring"].get("ring_pca_ratio_max", cfg["linear"]["linear_pca_ratio_min"]) if np.sqrt(eig_ratio) >= ring_pca_max: return False # Main ring band (top r-bin) main_ring_df = self._filter_main_ring_band(inlier_df, r_bin_width=cfg["ring"]["ring_band_width"], top_n_bins=1) if len(main_ring_df) < cfg["ring"]["ring_min_points"]: return False r = main_ring_df["r"].values theta_deg = main_ring_df["theta_deg"].values x = main_ring_df["coor_x"].values y = main_ring_df["coor_y"].values if r.max() - r.min() > cfg["ring"]["ring_r_absolute_tolerance"]: return False if self._circular_range_deg(theta_deg) < cfg["ring"]["ring_min_angular_coverage"]: return False if not self._check_sector_coverage(theta_deg, min_sectors=cfg["ring"]["ring_min_sectors"]): return False cx, cy, _, rmse = self._fit_circle_least_squares(x, y) if rmse == np.inf or rmse > cfg["ring"]["ring_fit_rmse_max"]: return False # 중심이 원점에서 너무 멀면 wafer ring으로 보지 않음 (10mm 한계) if np.sqrt(cx ** 2 + cy ** 2) > 10.0: return False return True @staticmethod def _filter_main_ring_band( df: pd.DataFrame, r_bin_width: float = 5.0, top_n_bins: int = 1 ) -> pd.DataFrame: """r-축 히스토그램에서 점이 가장 많은 bin(들)에 속하는 점만 추출.""" if len(df) == 0 or "r" not in df.columns: return df.copy() r = df["r"].values r = r[(r >= 0) & (r <= 150)] if len(r) == 0: return pd.DataFrame(columns=df.columns) r_bins = np.arange(0, 150 + r_bin_width, r_bin_width) r_hist, r_edges = np.histogram(df["r"].values, bins=r_bins) top_idx = np.argsort(r_hist)[::-1][:top_n_bins] mask = np.zeros(len(df), dtype=bool) for bi in top_idx: r_min, r_max = r_edges[bi], r_edges[bi + 1] mask |= ((df["r"] >= r_min) & (df["r"] < r_max)).values return df[mask].copy() @staticmethod def _circular_range_deg(angles_deg: np.ndarray) -> float: """원형 각도 분포의 커버리지 (도, 360° 중).""" if len(angles_deg) < 2: return 0.0 a = np.sort(np.array(angles_deg) % 360.0) gaps = np.diff(a) circ_gap = 360.0 - a[-1] + a[0] return 360.0 - max(np.max(gaps), circ_gap) @staticmethod def _check_sector_coverage(theta_deg: np.ndarray, min_sectors: int = 8) -> bool: """30° 간격 12 sector 중 min_sectors 이상 커버하는지.""" if len(theta_deg) == 0: return False sectors = ((theta_deg % 360) // 30).astype(int) % 12 return len(np.unique(sectors)) >= min_sectors @staticmethod def _fit_circle_least_squares( x: np.ndarray, y: np.ndarray ) -> Tuple[Optional[float], Optional[float], Optional[float], float]: """ 대수적 최소제곱 원 피팅. Returns ------- (cx, cy, radius, rmse) — 실패 시 (None, None, None, inf) """ if len(x) < 3: return None, None, None, np.inf x = x[:, np.newaxis] y = y[:, np.newaxis] A = np.hstack([x, y, np.ones_like(x)]) b = x ** 2 + y ** 2 try: sol, *_ = np.linalg.lstsq(A, b, rcond=None) a, bb, c = sol.flatten() cx, cy = a / 2, bb / 2 radius = np.sqrt((a ** 2 + bb ** 2) / 4 + c) fitted = np.sqrt((x - cx) ** 2 + (y - cy) ** 2) rmse = np.sqrt(np.mean((fitted - radius) ** 2)) return cx, cy, radius, rmse except Exception: return None, None, None, np.inf # ================================================================== # 선형 검출 # ================================================================== def _is_linear_set(self, coords: np.ndarray) -> bool: """전체 inlier 집합이 직선에 충분히 가까운지.""" cfg = self.cfg["linear"] n = len(coords) if n < 3: return False centroid = np.mean(coords, axis=0) max_dist = np.max(np.linalg.norm(coords - centroid, axis=1)) # 길이 조건 (반지름의 2배 = 최대 길이) if 2 * max_dist < cfg["linear_min_length"]: return False pca = PCA(n_components=min(2, n)).fit(coords) if len(pca.explained_variance_) < 2: return False eig_ratio = pca.explained_variance_[0] / (pca.explained_variance_[1] + 1e-9) if np.sqrt(eig_ratio) < cfg["linear_pca_ratio_min"]: return False # 주축 직각방향 평균 편차 normal = np.array([-pca.components_[0][1], pca.components_[0][0]]) if np.mean(np.abs(np.dot(coords - pca.mean_, normal))) > cfg["linear_max_deviation"]: return False # 주축 투영 후 gap ratio (선이 끊겨있지 않은지) proj = np.sort(np.dot(coords - pca.mean_, pca.components_[0])) total_len = proj[-1] - proj[0] if total_len > 0 and np.max(np.diff(proj)) / total_len > cfg["linear_max_gap_ratio"]: return False return True def _is_centroids_linear(self, sub_coords_list: list) -> bool: """여러 서브클러스터의 중심점들이 일직선 위에 있는지.""" cfg = self.cfg["linear"] if len(sub_coords_list) < 3: return False centroids = np.array([np.mean(sc, axis=0) for sc in sub_coords_list]) max_span = 2 * np.max(np.linalg.norm(centroids - np.mean(centroids, axis=0), axis=1)) if max_span < cfg["centroid_linear_min_length"]: return False pca = PCA(n_components=2).fit(centroids) if len(pca.explained_variance_) < 2: return False if np.sqrt(pca.explained_variance_[0] / (pca.explained_variance_[1] + 1e-9)) < cfg["centroid_linear_pca_min"]: return False normal = np.array([-pca.components_[0][1], pca.components_[0][0]]) if np.mean(np.abs(np.dot(centroids - pca.mean_, normal))) > cfg["centroid_linear_dev_max"]: return False return True # ================================================================== # 군집 / 서브 분류 # ================================================================== def _classify_cluster_or_sub_linear(self, inlier_coords: np.ndarray) -> str: """ ring/linear 둘 다 아닐 때 호출: 서브 DBSCAN으로 분할 후 패턴 재판정. - 서브클러스터 ≥2개이고 중심점들이 일직선 → 선형 - 그 외: 각 서브를 군집/선형으로 라벨링 후 누적 다수결 """ cfg = self.cfg if len(inlier_coords) < 2: return "군집" sub = DBSCAN(eps=cfg["clustering"]["cluster_dbscan_eps"], min_samples=cfg["clustering"]["min_cluster_size"]).fit(inlier_coords) sub_labels = sub.labels_ n_sub = len(set(sub_labels)) - (1 if -1 in sub_labels else 0) if n_sub >= 2: sub_list = [inlier_coords[sub_labels == lbl] for lbl in set(sub_labels) if lbl != -1] if self._is_centroids_linear(sub_list): return "선형" results = [(self._classify_subcluster(sc), len(sc)) for sc in sub_list] totals = {} for pat, cnt in results: totals[pat] = totals.get(pat, 0) + cnt return max(totals, key=totals.get) return self._classify_subcluster(inlier_coords) def _classify_subcluster(self, sub_coords: np.ndarray) -> str: """단일 서브클러스터를 '군집' 또는 '선형'으로 라벨링.""" cfg = self.cfg n = len(sub_coords) if n < 3: return "군집" centroid = np.mean(sub_coords, axis=0) max_dist = np.max(np.linalg.norm(sub_coords - centroid, axis=1)) # compact한 군집 if max_dist <= cfg["cluster"]["cluster_compactness_radius"]: return "군집" pca = PCA(n_components=min(2, n)).fit(sub_coords) if len(pca.explained_variance_) >= 2: eig_ratio = pca.explained_variance_[0] / (pca.explained_variance_[1] + 1e-9) shape_idx = np.sqrt(eig_ratio) if shape_idx >= cfg["linear"]["linear_pca_ratio_min"]: normal = np.array([-pca.components_[0][1], pca.components_[0][0]]) mean_dev = np.mean(np.abs(np.dot(sub_coords - pca.mean_, normal))) if (mean_dev <= cfg["linear"]["linear_max_deviation"] and 2 * max_dist >= cfg["linear"]["linear_min_length"]): return "선형" return "군집" # ================================================================== # Zone / Centroid 유틸 # ================================================================== @staticmethod def _dominant_zone(df: pd.DataFrame) -> str: """가장 빈번한 zone_label.""" if len(df) == 0 or "zone_label" not in df.columns: return "N/A" counter = Counter(df["zone_label"]) return counter.most_common(1)[0][0] @staticmethod def _zone_centroid( inlier_df: pd.DataFrame, inlier_coords: np.ndarray, zone: str ) -> tuple: """dominant zone에 속한 점들의 평균. 없으면 inlier 전체 평균.""" dom = inlier_df[inlier_df["zone_label"] == zone] if "zone_label" in inlier_df.columns else inlier_df if not dom.empty: return tuple(np.mean(dom[["coor_x", "coor_y"]].values, axis=0)) return tuple(np.mean(inlier_coords, axis=0)) # ====================================================================== # Backward-compat: 기존 함수 API 유지 # ====================================================================== def classify_wafer_patterns(df: pd.DataFrame, cfg: dict): """`PatternDetector(cfg).classify(df)`의 함수형 alias.""" return PatternDetector(cfg).classify(df)