# Page header captured together with this file (not Python code):
#   ELLS / nu / pattern_detection.py
#   Hyungseoky's picture
#   Upload 10 files
#   4efdf15 verified
# pattern_detection.py
"""
LLS defect-pattern automatic classification module.

Takes the set of defect coordinates on one wafer (or one wafer group) as input
and classifies it as one of Ring / Linear / Cluster / Others.

Classification pipeline
-----------------------
1. First-pass clustering with HDBSCAN -> remove outliers (label -1)
   - DBSCAN fallback on failure
2. Second-pass outlier removal with LOF (local-density based)
3. Evaluate pattern candidates on the inlier set (in priority order)
   (a) Ring detection    : circle-fit RMSE + angular coverage + clock-sector coverage
                           + PCA linearity rejection (prevents false positives
                             from straight lines passing through the origin)
   (b) Linear detection  : PCA eigenvalue ratio + deviation from the line + gap ratio
   (c) Cluster detection : DBSCAN sub-clusters -> re-judged as cluster/linear
                           by compactness/PCA
4. Compute dominant_zone (for visualization)
5. Compute the centroid coordinates
   - Ring: mean of all inliers
   - Linear/Cluster: mean of the dominant_zone points (inlier mean if none)

API
---
- `PatternDetector(config).classify(df)` (recommended)
- `classify_wafer_patterns(df, config)` (legacy-compatible)

Both APIs return a `(result_df, dominant_zone, pattern_list, centroid)` tuple.
"""
from __future__ import annotations
from collections import Counter
from typing import Tuple, List, Optional
import numpy as np
import pandas as pd
import hdbscan
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
from sklearn.neighbors import LocalOutlierFactor
from utils import WaferUtils
# ======================================================================
# PatternDetector
# ======================================================================
class PatternDetector:
    """
    Detector that classifies LLS defect patterns using an injected config.

    The only instance state is ``self.cfg``; the methods read it and never
    modify it, so the same instance can be reused for many wafer groups.

    Parameters
    ----------
    config : dict
        Dict with the structure of ``lls_config.json``.
        Required keys (sub-trees):
        - preprocessing.inner_radius_mm
        - clustering.{min_cluster_size, min_samples, cluster_selection_method,
                      dbscan_eps, cluster_dbscan_eps}
        - lof.{lof_min_points, lof_n_neighbors, lof_contamination}
        - ring.{ring_min_points, ring_band_width, ring_r_absolute_tolerance,
                ring_min_angular_coverage, ring_min_sectors, ring_fit_rmse_max,
                (optional) ring_pca_ratio_max,
                (optional) ring_center_offset_max}
        - linear.{linear_pca_ratio_min, linear_max_deviation, linear_min_length,
                  linear_max_gap_ratio,
                  centroid_linear_min_length, centroid_linear_pca_min,
                  centroid_linear_dev_max}
        - cluster.cluster_compactness_radius
        - misc.min_points_for_clustering
    """

    def __init__(self, config: dict):
        self.cfg = config

    # ==================================================================
    # Public API
    # ==================================================================
    def classify(
        self, df: pd.DataFrame
    ) -> Tuple[pd.DataFrame, str, List[str], Optional[tuple]]:
        """
        Classify the pattern formed by a DataFrame of defects.

        Parameters
        ----------
        df : pd.DataFrame
            Must contain the 'coor_x' and 'coor_y' columns. Zone labels are
            assigned internally through ``WaferUtils.add_zone_labels`` using
            ``preprocessing.inner_radius_mm``.

        Returns
        -------
        result_df : pd.DataFrame
            Copy of ``df`` with a boolean 'inlier' column added, plus whatever
            columns ``WaferUtils.add_zone_labels`` adds (presumably
            'zone_label', 'r' and 'theta_deg', which the helpers below read).
            For an empty input only the 'inlier' column is added.
        dominant_zone : str
            Most frequent 'zone_label' among the inliers. "데이터 없음" on the
            early-exit paths, "N/A" when no 'zone_label' column is available.
        pattern_list : list[str]
            ["환형"] (ring) / ["선형"] (linear) / ["군집"] (cluster) /
            ["Others"] / ["정상/미달"] (below the minimum point count) /
            ["None"] (empty input).
        centroid : tuple[float, float] | None
            Centre coordinate of the pattern, or None when nothing was
            classified.
        """
        cfg = self.cfg
        if df.empty:
            # Keep the documented 'inlier' column even with nothing to classify.
            return (df.assign(inlier=np.zeros(len(df), dtype=bool)),
                    "데이터 없음", ["None"], None)

        # Zone labelling on a fresh, 0..n-1 indexed copy.
        df = df.copy().reset_index(drop=True)
        df = WaferUtils.add_zone_labels(
            df, inner_radius=cfg["preprocessing"]["inner_radius_mm"]
        )
        coords = df[["coor_x", "coor_y"]].values
        n_total = len(df)
        if n_total < cfg["misc"]["min_points_for_clustering"]:
            return (df.assign(inlier=np.zeros(n_total, dtype=bool)),
                    "데이터 없음", ["정상/미달"], None)

        # --- First-pass clustering (HDBSCAN -> DBSCAN fallback) ---
        # The fallback runs when HDBSCAN labels every point as noise.
        labels = self._cluster_hdbscan(coords)
        if np.all(labels == -1):
            labels = self._cluster_dbscan_fallback(coords)
        inlier_mask = labels != -1
        if not inlier_mask.any():
            return df.assign(inlier=inlier_mask), "데이터 없음", ["Others"], None

        # --- Second-pass outlier removal (LOF) ---
        inlier_mask = self._apply_lof(coords, inlier_mask)
        inlier_df = df[inlier_mask].copy()
        inlier_coords = coords[inlier_mask]
        if len(inlier_df) < cfg["clustering"]["min_cluster_size"]:
            return df.assign(inlier=inlier_mask), "데이터 없음", ["Others"], None

        # --- Pattern decision: ring -> linear -> cluster (sub-classified) ---
        if self._is_ring(inlier_df):
            zone = self._dominant_zone(inlier_df)
            centroid = tuple(np.mean(inlier_df[["coor_x", "coor_y"]].values, axis=0))
            return df.assign(inlier=inlier_mask), zone, ["환형"], centroid

        if self._is_linear_set(inlier_coords):
            zone = self._dominant_zone(inlier_df)
            centroid = self._zone_centroid(inlier_df, inlier_coords, zone)
            return df.assign(inlier=inlier_mask), zone, ["선형"], centroid

        # Cluster candidate: inspect the sub-clusters.
        zone = self._dominant_zone(inlier_df)
        centroid = self._zone_centroid(inlier_df, inlier_coords, zone)
        pattern = self._classify_cluster_or_sub_linear(inlier_coords)
        return df.assign(inlier=inlier_mask), zone, [pattern], centroid

    # ==================================================================
    # First-pass clustering
    # ==================================================================
    def _cluster_hdbscan(self, coords: np.ndarray) -> np.ndarray:
        """Return HDBSCAN cluster labels for ``coords``; outliers are -1."""
        c = self.cfg["clustering"]
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=c["min_cluster_size"],
            min_samples=c["min_samples"],
            cluster_selection_method=c["cluster_selection_method"],
            metric="euclidean",
            gen_min_span_tree=True,
        )
        return clusterer.fit_predict(coords)

    def _cluster_dbscan_fallback(self, coords: np.ndarray) -> np.ndarray:
        """Return DBSCAN labels; used when HDBSCAN finds no cluster."""
        c = self.cfg["clustering"]
        model = DBSCAN(eps=c["dbscan_eps"], min_samples=c["min_cluster_size"])
        return model.fit(coords).labels_

    # ==================================================================
    # Second-pass outlier removal (LOF)
    # ==================================================================
    def _apply_lof(self, coords: np.ndarray, inlier_mask: np.ndarray) -> np.ndarray:
        """
        Remove additional outliers from the first-pass inliers with LOF.

        Returns ``inlier_mask`` unchanged when there are fewer inliers than
        ``lof.lof_min_points`` or fewer than 2 usable neighbours; otherwise a
        mask of the same length as ``coords`` keeping only the LOF inliers.
        """
        lof_cfg = self.cfg["lof"]
        inlier_coords = coords[inlier_mask]
        n_inlier = len(inlier_coords)
        if n_inlier < lof_cfg["lof_min_points"]:
            return inlier_mask
        # LOF cannot use more neighbours than there are other points.
        n_neighbors = min(lof_cfg["lof_n_neighbors"], n_inlier - 1)
        if n_neighbors < 2:
            return inlier_mask
        lof = LocalOutlierFactor(
            n_neighbors=n_neighbors,
            contamination=lof_cfg["lof_contamination"],
            metric="euclidean",
        )
        lof_labels = lof.fit_predict(inlier_coords)
        # Expand the LOF result back to a mask over all input points.
        full_mask = np.zeros(len(coords), dtype=bool)
        full_mask[inlier_mask] = lof_labels == 1
        return inlier_mask & full_mask

    # ==================================================================
    # Shared PCA helper
    # ==================================================================
    @staticmethod
    def _principal_axis_stats(points: np.ndarray) -> Optional[tuple]:
        """
        Fit a 2-component PCA and summarise the shape of ``points``.

        Returns
        -------
        (shape_index, mean_deviation, projections) or None
            shape_index    : sqrt(var_1 / (var_2 + 1e-9)) of the PCA variances
            mean_deviation : mean absolute distance to the principal axis
            projections    : coordinate of every point along the principal axis
            None is returned when PCA yields fewer than 2 variances.
        """
        pca = PCA(n_components=2).fit(points)
        variances = pca.explained_variance_
        if len(variances) < 2:
            return None
        shape_index = np.sqrt(variances[0] / (variances[1] + 1e-9))
        axis = pca.components_[0]
        normal = np.array([-axis[1], axis[0]])
        centred = points - pca.mean_
        mean_deviation = np.mean(np.abs(np.dot(centred, normal)))
        projections = np.dot(centred, axis)
        return shape_index, mean_deviation, projections

    # ==================================================================
    # Ring detection
    # ==================================================================
    def _is_ring(self, inlier_df: pd.DataFrame) -> bool:
        """
        Decide whether the inliers form a ring.

        Steps
        -----
        1. Minimum number of points
        2. PCA linearity rejection: strongly linear inliers are not a ring
           (prevents false positives from lines passing through the origin)
        3. Keep only the top bin of the r-histogram (main ring band)
        4. Point count in the band / r width / angular coverage / sector coverage
        5. Circle-fit RMSE / distance of the fitted centre from the origin
           (limit: optional ``ring.ring_center_offset_max``, default 10.0)
        """
        cfg = self.cfg
        ring_cfg = cfg["ring"]
        if len(inlier_df) < ring_cfg["ring_min_points"]:
            return False

        # Linearity rejection (ring pre-check).
        coords = inlier_df[["coor_x", "coor_y"]].values
        if len(coords) >= 3:
            stats = self._principal_axis_stats(coords)
            if stats is not None:
                ring_pca_max = ring_cfg.get(
                    "ring_pca_ratio_max", cfg["linear"]["linear_pca_ratio_min"]
                )
                if stats[0] >= ring_pca_max:
                    return False

        # Main ring band (top r-bin).
        band_df = self._filter_main_ring_band(
            inlier_df, r_bin_width=ring_cfg["ring_band_width"], top_n_bins=1
        )
        if len(band_df) < ring_cfg["ring_min_points"]:
            return False

        r = band_df["r"].values
        theta_deg = band_df["theta_deg"].values
        if r.max() - r.min() > ring_cfg["ring_r_absolute_tolerance"]:
            return False
        if self._circular_range_deg(theta_deg) < ring_cfg["ring_min_angular_coverage"]:
            return False
        if not self._check_sector_coverage(theta_deg, min_sectors=ring_cfg["ring_min_sectors"]):
            return False

        cx, cy, _, rmse = self._fit_circle_least_squares(
            band_df["coor_x"].values, band_df["coor_y"].values
        )
        # A NaN RMSE would slip through a plain ``>`` comparison, so reject
        # every non-finite fit explicitly.
        if not np.isfinite(rmse) or rmse > ring_cfg["ring_fit_rmse_max"]:
            return False

        # A centre too far from the origin is not treated as a wafer ring.
        max_offset = ring_cfg.get("ring_center_offset_max", 10.0)
        if np.hypot(cx, cy) > max_offset:
            return False
        return True

    @staticmethod
    def _filter_main_ring_band(
        df: pd.DataFrame, r_bin_width: float = 5.0, top_n_bins: int = 1,
        r_max: float = 150.0,
    ) -> pd.DataFrame:
        """
        Keep only the points in the most populated bin(s) of the r-histogram.

        Parameters
        ----------
        df : pd.DataFrame
            Needs an 'r' column; without it (or when empty) a copy is returned.
        r_bin_width : float
            Width of the histogram bins along r.
        top_n_bins : int
            Number of most populated bins to keep.
        r_max : float
            Upper r limit used to build the bin edges (default 150).

        Returns
        -------
        pd.DataFrame
            Copy of the rows falling in the selected bins; an empty frame with
            the same columns when no r lies in [0, r_max].
        """
        if len(df) == 0 or "r" not in df.columns:
            return df.copy()
        r_all = df["r"].values
        if not np.any((r_all >= 0) & (r_all <= r_max)):
            return pd.DataFrame(columns=df.columns)

        bin_edges = np.arange(0, r_max + r_bin_width, r_bin_width)
        counts, bin_edges = np.histogram(r_all, bins=bin_edges)
        top_bins = np.argsort(counts)[::-1][:top_n_bins]
        last_bin = len(counts) - 1

        keep = np.zeros(len(df), dtype=bool)
        for bi in top_bins:
            lower, upper = bin_edges[bi], bin_edges[bi + 1]
            # np.histogram closes the last bin on the right, so points sitting
            # exactly on the final edge must be kept for that bin as well.
            below_upper = (r_all <= upper) if bi == last_bin else (r_all < upper)
            keep |= (r_all >= lower) & below_upper
        return df[keep].copy()

    @staticmethod
    def _circular_range_deg(angles_deg: np.ndarray) -> float:
        """
        Angular coverage in degrees (out of 360).

        Computed as 360 minus the largest empty gap between consecutive
        angles, including the wrap-around gap. Fewer than 2 angles give 0.0.
        """
        if len(angles_deg) < 2:
            return 0.0
        ordered = np.sort(np.asarray(angles_deg, dtype=float) % 360.0)
        wrap_gap = 360.0 - ordered[-1] + ordered[0]
        largest_gap = max(np.max(np.diff(ordered)), wrap_gap)
        return 360.0 - largest_gap

    @staticmethod
    def _check_sector_coverage(theta_deg: np.ndarray, min_sectors: int = 8) -> bool:
        """Whether the angles hit at least ``min_sectors`` of the 12 sectors of 30 degrees."""
        theta = np.asarray(theta_deg, dtype=float)
        if theta.size == 0:
            return False
        sectors = ((theta % 360) // 30).astype(int) % 12
        return len(np.unique(sectors)) >= min_sectors

    @staticmethod
    def _fit_circle_least_squares(
        x: np.ndarray, y: np.ndarray
    ) -> Tuple[Optional[float], Optional[float], Optional[float], float]:
        """
        Algebraic least-squares circle fit.

        Solves ``x^2 + y^2 = a*x + b*y + c`` for (a, b, c); the centre is
        (a/2, b/2) and the squared radius is ``(a^2 + b^2)/4 + c``.

        Returns
        -------
        (cx, cy, radius, rmse) -- (None, None, None, inf) when there are fewer
        than 3 points, the solver fails, or the fit is not finite.
        """
        failure = (None, None, None, np.inf)
        xs = np.asarray(x, dtype=float).ravel()
        ys = np.asarray(y, dtype=float).ravel()
        if len(xs) < 3:
            return failure

        design = np.column_stack([xs, ys, np.ones_like(xs)])
        target = xs ** 2 + ys ** 2
        try:
            solution, *_ = np.linalg.lstsq(design, target, rcond=None)
        except np.linalg.LinAlgError:
            return failure

        a, b, c = solution
        cx, cy = a / 2, b / 2
        radius_sq = (a ** 2 + b ** 2) / 4 + c
        if not np.isfinite(radius_sq) or radius_sq < 0:
            return failure
        radius = np.sqrt(radius_sq)
        distances = np.sqrt((xs - cx) ** 2 + (ys - cy) ** 2)
        rmse = np.sqrt(np.mean((distances - radius) ** 2))
        if not np.isfinite(rmse):
            return failure
        return float(cx), float(cy), float(radius), float(rmse)

    # ==================================================================
    # Linear detection
    # ==================================================================
    def _is_linear_set(self, coords: np.ndarray) -> bool:
        """Whether the whole inlier set lies close enough to a straight line."""
        lin_cfg = self.cfg["linear"]
        if len(coords) < 3:
            return False

        centroid = np.mean(coords, axis=0)
        max_dist = np.max(np.linalg.norm(coords - centroid, axis=1))
        # Length condition: twice the largest centroid distance is used as
        # the extent of the set.
        if 2 * max_dist < lin_cfg["linear_min_length"]:
            return False

        stats = self._principal_axis_stats(coords)
        if stats is None:
            return False
        shape_index, mean_deviation, projections = stats
        if shape_index < lin_cfg["linear_pca_ratio_min"]:
            return False
        # Mean deviation perpendicular to the principal axis.
        if mean_deviation > lin_cfg["linear_max_deviation"]:
            return False

        # Gap ratio along the principal axis (the line must not be broken).
        ordered = np.sort(projections)
        total_len = ordered[-1] - ordered[0]
        if total_len > 0 and np.max(np.diff(ordered)) / total_len > lin_cfg["linear_max_gap_ratio"]:
            return False
        return True

    def _is_centroids_linear(self, sub_coords_list: list) -> bool:
        """Whether the centroids of several sub-clusters lie on one line (needs >= 3)."""
        lin_cfg = self.cfg["linear"]
        if len(sub_coords_list) < 3:
            return False

        centroids = np.array([np.mean(sc, axis=0) for sc in sub_coords_list])
        offsets = np.linalg.norm(centroids - np.mean(centroids, axis=0), axis=1)
        if 2 * np.max(offsets) < lin_cfg["centroid_linear_min_length"]:
            return False

        stats = self._principal_axis_stats(centroids)
        if stats is None:
            return False
        shape_index, mean_deviation, _ = stats
        if shape_index < lin_cfg["centroid_linear_pca_min"]:
            return False
        if mean_deviation > lin_cfg["centroid_linear_dev_max"]:
            return False
        return True

    # ==================================================================
    # Cluster / sub-classification
    # ==================================================================
    def _classify_cluster_or_sub_linear(self, inlier_coords: np.ndarray) -> str:
        """
        Called when the set is neither ring nor linear: split it with a
        sub-DBSCAN and re-judge the pattern.

        - >= 2 sub-clusters whose centroids are collinear (the collinearity
          test itself needs >= 3 centroids) -> "선형" (linear)
        - otherwise with >= 2 sub-clusters: label each one as cluster/linear
          and take the point-count weighted majority
        - fewer than 2 sub-clusters: label the whole inlier set as one
        """
        cfg = self.cfg
        if len(inlier_coords) < 2:
            return "군집"

        sub_labels = DBSCAN(
            eps=cfg["clustering"]["cluster_dbscan_eps"],
            min_samples=cfg["clustering"]["min_cluster_size"],
        ).fit(inlier_coords).labels_

        # np.unique yields sorted labels, so the tie-break of the majority
        # vote below does not depend on set iteration order.
        cluster_ids = [lbl for lbl in np.unique(sub_labels) if lbl != -1]
        if len(cluster_ids) < 2:
            return self._classify_subcluster(inlier_coords)

        sub_list = [inlier_coords[sub_labels == lbl] for lbl in cluster_ids]
        if self._is_centroids_linear(sub_list):
            return "선형"

        totals = {}
        for sub_coords in sub_list:
            label = self._classify_subcluster(sub_coords)
            totals[label] = totals.get(label, 0) + len(sub_coords)
        return max(totals, key=totals.get)

    def _classify_subcluster(self, sub_coords: np.ndarray) -> str:
        """Label a single sub-cluster as "군집" (cluster) or "선형" (linear)."""
        cfg = self.cfg
        if len(sub_coords) < 3:
            return "군집"

        centroid = np.mean(sub_coords, axis=0)
        max_dist = np.max(np.linalg.norm(sub_coords - centroid, axis=1))
        # Compact cluster.
        if max_dist <= cfg["cluster"]["cluster_compactness_radius"]:
            return "군집"

        stats = self._principal_axis_stats(sub_coords)
        if stats is not None:
            shape_index, mean_deviation, _ = stats
            lin_cfg = cfg["linear"]
            if (shape_index >= lin_cfg["linear_pca_ratio_min"]
                    and mean_deviation <= lin_cfg["linear_max_deviation"]
                    and 2 * max_dist >= lin_cfg["linear_min_length"]):
                return "선형"
        return "군집"

    # ==================================================================
    # Zone / centroid utilities
    # ==================================================================
    @staticmethod
    def _dominant_zone(df: pd.DataFrame) -> str:
        """Most frequent 'zone_label'; "N/A" for an empty frame or a missing column."""
        if len(df) == 0 or "zone_label" not in df.columns:
            return "N/A"
        return Counter(df["zone_label"]).most_common(1)[0][0]

    @staticmethod
    def _zone_centroid(
        inlier_df: pd.DataFrame, inlier_coords: np.ndarray, zone: str
    ) -> tuple:
        """Mean of the points in the dominant zone; mean of all inliers if there are none."""
        if "zone_label" in inlier_df.columns:
            dom = inlier_df[inlier_df["zone_label"] == zone]
        else:
            dom = inlier_df
        if not dom.empty:
            return tuple(np.mean(dom[["coor_x", "coor_y"]].values, axis=0))
        return tuple(np.mean(inlier_coords, axis=0))
# ======================================================================
# Backward-compat: keep the legacy function API
# ======================================================================
def classify_wafer_patterns(df: pd.DataFrame, cfg: dict):
    """Function-style alias for ``PatternDetector(cfg).classify(df)``."""
    detector = PatternDetector(cfg)
    return detector.classify(df)