# pattern_detection.py
"""Spatial pattern classification for wafer point data.

The entry point is :func:`classify_wafer_patterns`. It clusters the points,
removes outliers, and labels the remaining inliers with one of the runtime
pattern strings "환형" (ring), "선형" (linear), "군집" (cluster) or "Others".
"""
from collections import Counter

import hdbscan
import numpy as np
import pandas as pd
from scipy.stats import circvar
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
from sklearn.neighbors import LocalOutlierFactor

from utils import add_zone_labels

# Default upper bound of the radial histogram in filter_main_ring_band.
# NOTE(review): presumably the wafer radius in mm -- confirm.
_DEFAULT_MAX_RADIUS = 150.0
# Largest allowed distance between a fitted ring centre and the origin.
# NOTE(review): presumably same unit as coor_x / coor_y -- confirm.
_RING_CENTER_MAX_OFFSET = 10.0
# Guards the eigenvalue ratio against division by zero.
_EIG_EPS = 1e-9


def get_dominant_zone(df: pd.DataFrame) -> str:
    """Return the most frequent value of ``df['zone_label']``.

    Returns "N/A" when *df* is empty or has no ``zone_label`` column. Ties
    are resolved by ``Counter.most_common`` (first label encountered wins).
    """
    if len(df) == 0 or 'zone_label' not in df.columns:
        return "N/A"
    most_common_zone, _ = Counter(df['zone_label']).most_common(1)[0]
    return most_common_zone


def circular_range_deg(angles_deg: np.ndarray) -> float:
    """Return the angular extent (degrees) covered by *angles_deg*.

    The extent is 360 minus the largest empty gap between consecutive
    angles on the circle, including the wrap-around gap. Fewer than two
    angles yield 0.0.
    """
    if len(angles_deg) < 2:
        return 0.0
    angles_sorted = np.sort(np.array(angles_deg) % 360.0)
    gaps = np.diff(angles_sorted)
    # Gap that crosses the 360 -> 0 boundary.
    wrap_gap = 360.0 - angles_sorted[-1] + angles_sorted[0]
    return 360.0 - max(np.max(gaps), wrap_gap)


def check_sector_coverage(theta_deg: np.ndarray, min_sectors: int = 8) -> bool:
    """Return True when the angles fall into at least *min_sectors* sectors.

    The circle is divided into twelve 30-degree sectors. Empty input
    returns False.
    """
    theta = np.asarray(theta_deg)
    if len(theta) == 0:
        return False
    # The trailing "% 12" is not redundant: a float modulo can round up to
    # exactly 360.0 for tiny negative inputs, which would give sector 12.
    sector_indices = ((theta % 360) // 30).astype(int) % 12
    return len(np.unique(sector_indices)) >= min_sectors


def fit_circle_least_squares(x: np.ndarray, y: np.ndarray) -> tuple:
    """Fit a circle to the points by linear least squares.

    Solves ``x**2 + y**2 = a*x + b*y + c`` for ``(a, b, c)``; the centre is
    ``(a/2, b/2)`` and the radius is ``sqrt((a**2 + b**2)/4 + c)``.

    Returns:
        ``(center_x, center_y, radius, rmse)`` where *rmse* is the RMS
        difference between each point's distance to the centre and the
        radius. ``(None, None, None, np.inf)`` is returned when there are
        fewer than three points, the solver fails, or the fit is not finite.
    """
    failure = (None, None, None, np.inf)
    if len(x) < 3:
        return failure

    try:
        x_col = np.asarray(x, dtype=float).reshape(-1, 1)
        y_col = np.asarray(y, dtype=float).reshape(-1, 1)
        design = np.hstack([x_col, y_col, np.ones_like(x_col)])
        rhs = x_col**2 + y_col**2
        solution = np.linalg.lstsq(design, rhs, rcond=None)[0]
    except (np.linalg.LinAlgError, TypeError, ValueError):
        # Best effort: non-numeric or unsolvable input means "no fit".
        return failure

    a, b, c = solution.flatten()
    radicand = (a**2 + b**2) / 4 + c
    if not np.isfinite(radicand) or radicand < 0:
        return failure

    center_x = a / 2
    center_y = b / 2
    radius = np.sqrt(radicand)
    fitted_dists = np.sqrt((x_col - center_x)**2 + (y_col - center_y)**2)
    rmse = np.sqrt(np.mean((fitted_dists - radius)**2))
    if not np.isfinite(rmse):
        return failure
    return center_x, center_y, radius, rmse


def filter_main_ring_band(df: pd.DataFrame, r_bin_width: float = 5.0,
                          top_n_bins: int = 1,
                          r_max: float = _DEFAULT_MAX_RADIUS) -> pd.DataFrame:
    """Keep the rows whose ``r`` lies in the most populated radial bin(s).

    Radii in ``[0, r_max]`` are histogrammed with bins of *r_bin_width* and
    the rows of the *top_n_bins* fullest bins are returned as a copy.

    Rows are selected with the same rule the histogram uses to count them:
    only radii inside ``[0, r_max]`` take part, and the last bin includes
    its right edge.

    Returns a copy of *df* when it is empty or has no ``r`` column, and an
    empty frame with the same columns when no radius is within range.
    """
    if len(df) == 0 or 'r' not in df.columns:
        return df.copy()

    radii = df['r'].values
    in_range = (radii >= 0) & (radii <= r_max)
    if not in_range.any():
        return pd.DataFrame(columns=df.columns)

    bin_edges = np.arange(0, r_max + r_bin_width, r_bin_width)
    counts, bin_edges = np.histogram(radii[in_range], bins=bin_edges)
    top_bin_indices = np.argsort(counts)[::-1][:top_n_bins]
    last_bin = len(counts) - 1

    keep = np.zeros(len(df), dtype=bool)
    for bin_idx in top_bin_indices:
        lower = bin_edges[bin_idx]
        upper = bin_edges[bin_idx + 1]
        # np.histogram closes only the final bin on the right.
        if bin_idx == last_bin:
            below_upper = radii <= upper
        else:
            below_upper = radii < upper
        keep |= in_range & (radii >= lower) & below_upper
    return df[keep].copy()


def is_ring_pattern_robust(inlier_df: pd.DataFrame, cfg: dict) -> bool:
    """Return True when the inliers form a ring around the origin.

    Reads ``r``, ``theta_deg``, ``coor_x`` and ``coor_y`` from *inlier_df*
    and thresholds from ``cfg['ring']``. All of the following must hold for
    the dominant radial band: enough points, radial spread within
    tolerance, sufficient angular range and sector coverage, a finite
    circle-fit RMSE below the maximum, and a fitted centre close to the
    origin.
    """
    ring_cfg = cfg['ring']
    min_points = ring_cfg['ring_min_points']
    if len(inlier_df) < min_points:
        return False

    main_ring_df = filter_main_ring_band(
        inlier_df, r_bin_width=ring_cfg['ring_band_width'], top_n_bins=1)
    if len(main_ring_df) < min_points:
        return False

    r = main_ring_df['r'].values
    theta_deg = main_ring_df['theta_deg'].values
    x = main_ring_df['coor_x'].values
    y = main_ring_df['coor_y'].values

    if r.max() - r.min() > ring_cfg['ring_r_absolute_tolerance']:
        return False
    if circular_range_deg(theta_deg) < ring_cfg['ring_min_angular_coverage']:
        return False
    if not check_sector_coverage(theta_deg,
                                 min_sectors=ring_cfg['ring_min_sectors']):
        return False

    cx, cy, _, rmse = fit_circle_least_squares(x, y)
    # isfinite rejects NaN as well as inf; a NaN rmse would otherwise slip
    # through both comparisons and be accepted as a valid fit.
    if not np.isfinite(rmse) or rmse > ring_cfg['ring_fit_rmse_max']:
        return False
    return bool(np.sqrt(cx**2 + cy**2) <= _RING_CENTER_MAX_OFFSET)


def _elongation_index(pca: PCA) -> float:
    """Return sqrt(major / minor explained variance) of a fitted 2-D PCA."""
    major, minor = pca.explained_variance_[:2]
    return np.sqrt(major / (minor + _EIG_EPS))


def _mean_offaxis_deviation(points: np.ndarray, pca: PCA) -> float:
    """Return the mean absolute distance of *points* from the first PCA axis."""
    axis = pca.components_[0]
    normal = np.array([-axis[1], axis[0]])
    return np.mean(np.abs(np.dot(points - pca.mean_, normal)))


def _is_linear_set(coords: np.ndarray, cfg: dict) -> bool:
    """Return True when *coords* form one elongated, gap-free line.

    Uses thresholds from ``cfg['linear']``: minimum length, minimum PCA
    elongation, maximum mean off-axis deviation and maximum relative gap
    along the principal axis.
    """
    linear_cfg = cfg['linear']
    n = len(coords)
    if n < 3:
        return False

    centroid = np.mean(coords, axis=0)
    max_dist = np.max(np.linalg.norm(coords - centroid, axis=1))
    if 2 * max_dist < linear_cfg['linear_min_length']:
        return False

    pca = PCA(n_components=min(2, n)).fit(coords)
    if len(pca.explained_variance_) < 2:
        return False
    if _elongation_index(pca) < linear_cfg['linear_pca_ratio_min']:
        return False
    if _mean_offaxis_deviation(coords, pca) > linear_cfg['linear_max_deviation']:
        return False

    # Reject lines with a large hole relative to their total extent.
    proj = np.sort(np.dot(coords - pca.mean_, pca.components_[0]))
    total_len = proj[-1] - proj[0]
    if total_len > 0:
        largest_gap_ratio = np.max(np.diff(proj)) / total_len
        if largest_gap_ratio > linear_cfg['linear_max_gap_ratio']:
            return False
    return True


def _is_centroids_linear(sub_coords_list: list, cfg: dict) -> bool:
    """Return True when the centroids of the sub-clusters lie on a line.

    Needs at least three sub-clusters. Thresholds come from the
    ``centroid_linear_*`` keys of ``cfg['linear']``.
    """
    if len(sub_coords_list) < 3:
        return False
    linear_cfg = cfg['linear']

    centroids = np.array([np.mean(sc, axis=0) for sc in sub_coords_list])
    offsets = centroids - np.mean(centroids, axis=0)
    max_span = 2 * np.max(np.linalg.norm(offsets, axis=1))
    if max_span < linear_cfg['centroid_linear_min_length']:
        return False

    pca = PCA(n_components=2).fit(centroids)
    if len(pca.explained_variance_) < 2:
        return False
    if _elongation_index(pca) < linear_cfg['centroid_linear_pca_min']:
        return False
    deviation = _mean_offaxis_deviation(centroids, pca)
    return bool(deviation <= linear_cfg['centroid_linear_dev_max'])


def _classify_subcluster(sub_coords: np.ndarray, cfg: dict) -> str:
    """Label one sub-cluster as "선형" (linear) or "군집" (cluster).

    A sub-cluster is a cluster when it has fewer than three points or fits
    inside ``cfg['cluster']['cluster_compactness_radius']``. Otherwise it is
    linear only if it is elongated, thin and long enough per
    ``cfg['linear']``.
    """
    n = len(sub_coords)
    if n < 3:
        return "군집"

    centroid = np.mean(sub_coords, axis=0)
    max_dist = np.max(np.linalg.norm(sub_coords - centroid, axis=1))
    if max_dist <= cfg['cluster']['cluster_compactness_radius']:
        return "군집"

    pca = PCA(n_components=min(2, n)).fit(sub_coords)
    if len(pca.explained_variance_) < 2:
        return "군집"

    linear_cfg = cfg['linear']
    if _elongation_index(pca) < linear_cfg['linear_pca_ratio_min']:
        return "군집"
    thin = (_mean_offaxis_deviation(sub_coords, pca)
            <= linear_cfg['linear_max_deviation'])
    long_enough = 2 * max_dist >= linear_cfg['linear_min_length']
    return "선형" if thin and long_enough else "군집"


def _initial_inlier_mask(coords: np.ndarray, cfg: dict) -> np.ndarray:
    """Cluster *coords* and return a boolean mask of non-noise points.

    HDBSCAN is tried first; if it marks every point as noise, DBSCAN is
    used as a fallback.
    """
    clustering_cfg = cfg['clustering']
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=clustering_cfg['min_cluster_size'],
        min_samples=clustering_cfg['min_samples'],
        cluster_selection_method=clustering_cfg['cluster_selection_method'],
        metric="euclidean",
        gen_min_span_tree=True,
    )
    labels = clusterer.fit_predict(coords)
    if np.all(labels == -1):
        labels = DBSCAN(
            eps=clustering_cfg['dbscan_eps'],
            min_samples=clustering_cfg['min_cluster_size'],
        ).fit(coords).labels_
    return labels != -1


def _refine_with_lof(coords: np.ndarray, inlier_mask: np.ndarray,
                     cfg: dict) -> np.ndarray:
    """Drop Local Outlier Factor outliers from *inlier_mask*.

    The mask is returned unchanged when there are fewer inliers than
    ``cfg['lof']['lof_min_points']`` or fewer than two usable neighbours.
    """
    lof_cfg = cfg['lof']
    n_inlier = int(np.count_nonzero(inlier_mask))
    if n_inlier < lof_cfg['lof_min_points']:
        return inlier_mask
    n_neighbors = min(lof_cfg['lof_n_neighbors'], n_inlier - 1)
    if n_neighbors < 2:
        return inlier_mask

    lof = LocalOutlierFactor(
        n_neighbors=n_neighbors,
        contamination=lof_cfg['lof_contamination'],
        metric="euclidean",
    )
    lof_labels = lof.fit_predict(coords[inlier_mask])
    kept = np.zeros(len(coords), dtype=bool)
    kept[inlier_mask] = lof_labels == 1
    return inlier_mask & kept


def _dominant_zone_centroid(inlier_df: pd.DataFrame, inlier_coords: np.ndarray,
                            dominant_zone: str) -> tuple:
    """Return the mean (x, y) of the inliers in *dominant_zone*.

    Falls back to the mean of all inliers when no row matches or the frame
    has no ``zone_label`` column.
    """
    if 'zone_label' in inlier_df.columns:
        dom_points = inlier_df[inlier_df['zone_label'] == dominant_zone]
        if not dom_points.empty:
            return tuple(np.mean(dom_points[['coor_x', 'coor_y']].values, axis=0))
    return tuple(np.mean(inlier_coords, axis=0))


def classify_wafer_patterns(df: pd.DataFrame, cfg: dict) -> tuple:
    """Classify the spatial pattern of the points in *df*.

    Steps: add zone labels, cluster (HDBSCAN with DBSCAN fallback), remove
    LOF outliers, then test for ring, single line, line of sub-clusters and
    finally per-sub-cluster shape.

    Args:
        df: Frame with ``coor_x`` and ``coor_y``; the ring test also reads
            ``r`` and ``theta_deg``.
        cfg: Nested dict with the ``preprocessing``, ``misc``,
            ``clustering``, ``lof``, ``ring``, ``linear`` and ``cluster``
            sections read by this module.

    Returns:
        ``(labeled_df, dominant_zone, [pattern], centroid)``. *labeled_df*
        is a re-indexed copy of *df* with a boolean ``inlier`` column;
        *centroid* is an ``(x, y)`` tuple or None. When no pattern can be
        determined, *dominant_zone* is "데이터 없음" (no data). An empty *df*
        is returned as-is, without the ``inlier`` column.
    """
    if df.empty:
        return df, "데이터 없음", ["None"], None

    df = df.copy().reset_index(drop=True)
    df = add_zone_labels(df, inner_radius=cfg['preprocessing']['inner_radius_mm'])
    coords = df[["coor_x", "coor_y"]].values

    if len(df) < cfg['misc']['min_points_for_clustering']:
        no_inliers = np.zeros(len(df), dtype=bool)
        return df.assign(inlier=no_inliers), "데이터 없음", ["정상/미달"], None

    inlier_mask = _initial_inlier_mask(coords, cfg)
    if not inlier_mask.any():
        return df.assign(inlier=inlier_mask), "데이터 없음", ["Others"], None

    inlier_mask = _refine_with_lof(coords, inlier_mask, cfg)
    labeled_df = df.assign(inlier=inlier_mask)
    inlier_df = df[inlier_mask].copy()
    inlier_coords = coords[inlier_mask]
    n_inlier = len(inlier_df)

    if n_inlier < cfg['clustering']['min_cluster_size']:
        return labeled_df, "데이터 없음", ["Others"], None

    if is_ring_pattern_robust(inlier_df, cfg):
        # A ring reports the centroid of every inlier, not just one zone.
        ring_centroid = tuple(np.mean(inlier_coords, axis=0))
        return labeled_df, get_dominant_zone(inlier_df), ["환형"], ring_centroid

    dominant_zone = get_dominant_zone(inlier_df)
    centroid = _dominant_zone_centroid(inlier_df, inlier_coords, dominant_zone)

    if _is_linear_set(inlier_coords, cfg):
        return labeled_df, dominant_zone, ["선형"], centroid

    if n_inlier < 2:
        return labeled_df, dominant_zone, ["Others"], None

    sub_labels = DBSCAN(
        eps=cfg['clustering']['cluster_dbscan_eps'],
        min_samples=cfg['clustering']['min_cluster_size'],
    ).fit(inlier_coords).labels_
    # np.unique gives a sorted, deterministic sub-cluster order.
    cluster_ids = [lbl for lbl in np.unique(sub_labels) if lbl != -1]

    if len(cluster_ids) < 2:
        pattern = _classify_subcluster(inlier_coords, cfg)
        return labeled_df, dominant_zone, [pattern], centroid

    sub_coords_list = [inlier_coords[sub_labels == lbl] for lbl in cluster_ids]
    if _is_centroids_linear(sub_coords_list, cfg):
        return labeled_df, dominant_zone, ["선형"], centroid

    # Majority vote weighted by the number of points in each sub-cluster.
    pattern_points = {}
    for sub_coords in sub_coords_list:
        label = _classify_subcluster(sub_coords, cfg)
        pattern_points[label] = pattern_points.get(label, 0) + len(sub_coords)
    dominant_pattern = max(pattern_points, key=pattern_points.get)
    return labeled_df, dominant_zone, [dominant_pattern], centroid