# Repository page header (not part of the module):
#   ELLS / pattern_detection.py
#   Hyungseoky's picture
#   Create pattern_detection.py
#   eaa982f verified
# pattern_detection.py
import numpy as np
import pandas as pd
from collections import Counter
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
from sklearn.neighbors import LocalOutlierFactor
import hdbscan
from scipy.stats import circvar
from utils import add_zone_labels
def get_dominant_zone(df: pd.DataFrame) -> str:
    """Return the most frequent value of the ``zone_label`` column.

    Args:
        df: Frame expected to carry a ``zone_label`` column.

    Returns:
        The label with the highest count (the first one encountered wins a
        tie), or ``"N/A"`` when the frame has no rows or no ``zone_label``
        column.
    """
    has_labels = 'zone_label' in df.columns
    if not has_labels or len(df) == 0:
        return "N/A"

    tally = Counter(df['zone_label'])
    # max() keeps the first maximal item, matching Counter.most_common(1).
    zone, _count = max(tally.items(), key=lambda item: item[1])
    return zone
def circular_range_deg(angles_deg: np.ndarray) -> float:
    """Return the angular extent, in degrees, covered by a set of angles.

    The extent is 360 minus the largest empty gap between neighbouring
    angles on the circle, the gap across the 0/360 seam included.

    Args:
        angles_deg: Angles in degrees; values are wrapped into [0, 360).

    Returns:
        The covered arc in degrees, or ``0.0`` for fewer than two angles.
    """
    if len(angles_deg) < 2:
        return 0.0

    ordered = np.sort(np.array(angles_deg) % 360.0)
    interior_gap = np.max(np.diff(ordered))
    # Gap that wraps from the largest angle back round to the smallest.
    seam_gap = 360.0 - ordered[-1] + ordered[0]
    widest_gap = max(interior_gap, seam_gap)
    return 360.0 - widest_gap
def check_sector_coverage(theta_deg: np.ndarray, min_sectors: int = 8) -> bool:
    """Check whether the angles occupy enough of the twelve 30-degree sectors.

    Args:
        theta_deg: Angles in degrees.
        min_sectors: Minimum number of distinct occupied sectors required.

    Returns:
        ``True`` when at least ``min_sectors`` distinct sectors contain an
        angle; ``False`` otherwise, including for empty input.
    """
    if len(theta_deg) == 0:
        return False

    wrapped = theta_deg % 360
    # The trailing "% 12" folds a sector index of 12 back onto sector 0.
    sector_ids = (wrapped // 30).astype(int) % 12
    occupied = np.unique(sector_ids).size
    return occupied >= min_sectors
def fit_circle_least_squares(x: np.ndarray, y: np.ndarray):
    """Fit a circle to 2-D points with an algebraic least-squares solve.

    Solves ``x**2 + y**2 = a*x + b*y + c`` for ``(a, b, c)``; the centre is
    then ``(a/2, b/2)`` and the radius ``sqrt((a**2 + b**2)/4 + c)``.

    Args:
        x: X coordinates (any 1-D array-like of numbers).
        y: Y coordinates, same length as ``x``.

    Returns:
        ``(center_x, center_y, radius, rmse)`` where ``rmse`` is the
        root-mean-square difference between each point's distance to the
        fitted centre and the fitted radius. Returns
        ``(None, None, None, np.inf)`` when there are fewer than three
        points, the lengths differ, any coordinate is NaN/inf, or the solve
        fails.
    """
    failure = (None, None, None, np.inf)

    xs = np.asarray(x, dtype=float).ravel()
    ys = np.asarray(y, dtype=float).ravel()
    if xs.size < 3 or xs.size != ys.size:
        return failure
    # Non-finite input would otherwise produce a NaN rmse, which compares
    # False against any threshold and could be mistaken for a good fit.
    if not (np.isfinite(xs).all() and np.isfinite(ys).all()):
        return failure

    design = np.column_stack([xs, ys, np.ones_like(xs)])
    rhs = xs**2 + ys**2
    try:
        coeffs, _, _, _ = np.linalg.lstsq(design, rhs, rcond=None)
    except np.linalg.LinAlgError:
        return failure

    a, b, c = coeffs
    center_x = a / 2
    center_y = b / 2
    radius_sq = (a**2 + b**2) / 4 + c
    if not np.isfinite(radius_sq) or radius_sq < 0:
        return failure

    radius = np.sqrt(radius_sq)
    dists = np.hypot(xs - center_x, ys - center_y)
    rmse = np.sqrt(np.mean((dists - radius) ** 2))
    return center_x, center_y, radius, rmse
def filter_main_ring_band(
    df: pd.DataFrame,
    r_bin_width: float = 5.0,
    top_n_bins: int = 1,
    max_radius: float = 150.0,
) -> pd.DataFrame:
    """Keep only the rows whose ``r`` falls in the most populated radial bins.

    Radii in ``[0, max_radius]`` are histogrammed into bins of width
    ``r_bin_width``; rows lying in the ``top_n_bins`` bins with the highest
    counts are returned.

    Args:
        df: Frame with an ``r`` column.
        r_bin_width: Width of each radial bin; must be positive.
        top_n_bins: Number of highest-count bins to keep.
        max_radius: Largest radius considered; rows outside
            ``[0, max_radius]`` are never selected.

    Returns:
        A copy of the selected rows. A copy of ``df`` itself is returned when
        it is empty or has no ``r`` column, and an empty frame with the same
        columns when no radius lies in range.

    Raises:
        ValueError: If ``r_bin_width`` is not positive.
    """
    if len(df) == 0 or 'r' not in df.columns:
        return df.copy()
    if r_bin_width <= 0:
        raise ValueError("r_bin_width must be positive")

    radii = df['r'].to_numpy()
    in_range = (radii >= 0) & (radii <= max_radius)
    if not in_range.any():
        return df.iloc[0:0].copy()

    bin_edges = np.arange(0, max_radius + r_bin_width, r_bin_width)
    counts, bin_edges = np.histogram(radii[in_range], bins=bin_edges)
    ranked_bins = np.argsort(counts)[::-1][:top_n_bins]
    last_bin = len(counts) - 1

    selected = np.zeros(len(df), dtype=bool)
    for bin_idx in ranked_bins:
        if counts[bin_idx] == 0:
            continue  # an empty bin cannot contribute rows
        lower, upper = bin_edges[bin_idx], bin_edges[bin_idx + 1]
        if bin_idx == last_bin:
            # np.histogram closes the last bin on the right, so the mask must
            # too, or rows sitting exactly on the final edge are dropped.
            in_bin = (radii >= lower) & (radii <= upper)
        else:
            in_bin = (radii >= lower) & (radii < upper)
        # Restrict to the histogrammed range: the final edge can exceed
        # max_radius when the bin width does not divide it evenly.
        selected |= in_bin & in_range
    return df[selected].copy()
def is_ring_pattern_robust(inlier_df: pd.DataFrame, cfg: dict) -> bool:
    """Decide whether the inlier points form a ring around the origin.

    The densest radial band is isolated first; that band must then pass, in
    order, a radial-spread limit, an angular-extent minimum, a sector-count
    minimum, a circle-fit RMSE limit, and a limit of 10.0 on the fitted
    centre's distance from the origin.

    Args:
        inlier_df: Frame with ``r``, ``theta_deg``, ``coor_x`` and ``coor_y``
            columns.
        cfg: Configuration; the keys read are ``cfg['ring']`` ->
            ``ring_min_points``, ``ring_band_width``,
            ``ring_r_absolute_tolerance``, ``ring_min_angular_coverage``,
            ``ring_min_sectors`` and ``ring_fit_rmse_max``.

    Returns:
        ``True`` only if every check passes.
    """
    ring_cfg = cfg['ring']
    if len(inlier_df) < ring_cfg['ring_min_points']:
        return False

    band = filter_main_ring_band(
        inlier_df, r_bin_width=ring_cfg['ring_band_width'], top_n_bins=1
    )
    if len(band) < ring_cfg['ring_min_points']:
        return False

    radii = band['r'].values
    angles = band['theta_deg'].values
    xs = band['coor_x'].values
    ys = band['coor_y'].values

    radial_spread = radii.max() - radii.min()
    if radial_spread > ring_cfg['ring_r_absolute_tolerance']:
        return False

    if circular_range_deg(angles) < ring_cfg['ring_min_angular_coverage']:
        return False

    enough_sectors = check_sector_coverage(
        angles, min_sectors=ring_cfg['ring_min_sectors']
    )
    if not enough_sectors:
        return False

    cx, cy, _radius, rmse = fit_circle_least_squares(xs, ys)
    # An infinite rmse signals a failed fit (cx/cy are None in that case).
    if rmse == np.inf or rmse > ring_cfg['ring_fit_rmse_max']:
        return False

    center_offset = np.sqrt(cx**2 + cy**2)
    if center_offset > 10.0:
        return False
    return True
def _is_linear_set(coords: np.ndarray, cfg: dict) -> bool:
n = len(coords)
if n < 3: return False
centroid = np.mean(coords, axis=0)
max_dist = np.max(np.linalg.norm(coords - centroid, axis=1))
if 2 * max_dist < cfg['linear']['linear_min_length']: return False
pca = PCA(n_components=min(2, n)).fit(coords)
if len(pca.explained_variance_) < 2: return False
eig_ratio = pca.explained_variance_[0] / (pca.explained_variance_[1] + 1e-9)
if np.sqrt(eig_ratio) < cfg['linear']['linear_pca_ratio_min']: return False
normal_vec = np.array([-pca.components_[0][1], pca.components_[0][0]])
if np.mean(np.abs(np.dot(coords - pca.mean_, normal_vec))) > cfg['linear']['linear_max_deviation']: return False
proj = np.sort(np.dot(coords - pca.mean_, pca.components_[0]))
total_len = proj[-1] - proj[0]
if total_len > 0 and np.max(np.diff(proj)) / total_len > cfg['linear']['linear_max_gap_ratio']: return False
return True
def _is_centroids_linear(sub_coords_list: list, cfg: dict) -> bool:
if len(sub_coords_list) < 3: return False
centroids = np.array([np.mean(sc, axis=0) for sc in sub_coords_list])
max_span = 2 * np.max(np.linalg.norm(centroids - np.mean(centroids, axis=0), axis=1))
if max_span < cfg['linear']['centroid_linear_min_length']: return False
pca = PCA(n_components=2).fit(centroids)
if len(pca.explained_variance_) < 2: return False
if np.sqrt(pca.explained_variance_[0] / (pca.explained_variance_[1] + 1e-9)) < cfg['linear']['centroid_linear_pca_min']: return False
normal = np.array([-pca.components_[0][1], pca.components_[0][0]])
if np.mean(np.abs(np.dot(centroids - pca.mean_, normal))) > cfg['linear']['centroid_linear_dev_max']: return False
return True
def _classify_subcluster(sub_coords: np.ndarray, cfg: dict) -> str:
n = len(sub_coords)
if n < 3: return "군집"
centroid = np.mean(sub_coords, axis=0)
dists_from_centroid = np.linalg.norm(sub_coords - centroid, axis=1)
max_dist = np.max(dists_from_centroid)
if max_dist <= cfg['cluster']['cluster_compactness_radius']: return "군집"
pca = PCA(n_components=min(2, n)).fit(sub_coords)
if len(pca.explained_variance_) >= 2:
eig_ratio = pca.explained_variance_[0] / (pca.explained_variance_[1] + 1e-9)
shape_idx = np.sqrt(eig_ratio)
if shape_idx >= cfg['linear']['linear_pca_ratio_min']:
normal_vec = np.array([-pca.components_[0][1], pca.components_[0][0]])
mean_dev = np.mean(np.abs(np.dot(sub_coords - pca.mean_, normal_vec)))
if mean_dev <= cfg['linear']['linear_max_deviation'] and 2*max_dist >= cfg['linear']['linear_min_length']:
return "선형"
return "군집"
def _dominant_zone_and_centroid(inlier_df: pd.DataFrame, inlier_coords: np.ndarray) -> tuple:
    """Return the dominant zone label and the mean (x, y) of the inliers carrying it."""
    dominant_zone = get_dominant_zone(inlier_df)
    dom_points = inlier_df[inlier_df['zone_label'] == dominant_zone]
    # Fall back to the centroid of all inliers when no row carries the label.
    if dom_points.empty:
        source = inlier_coords
    else:
        source = dom_points[['coor_x', 'coor_y']].values
    return dominant_zone, tuple(np.mean(source, axis=0))


def classify_wafer_patterns(df: pd.DataFrame, cfg: dict) -> tuple:
    """Classify the spatial pattern formed by the points in ``df``.

    Steps:
      1. Label zones with ``add_zone_labels``.
      2. Cluster with HDBSCAN, falling back to DBSCAN when HDBSCAN marks
         every point as noise; noise points become outliers.
      3. Optionally prune the remaining inliers with LocalOutlierFactor.
      4. Test the inliers for a ring ("환형"), then a single line ("선형").
      5. Otherwise split them with DBSCAN. If there are two or more
         sub-clusters, report "선형" when their centroids are collinear, else
         the sub-cluster label ("선형"/"군집") covering the most points. With
         fewer sub-clusters, classify all inliers as one sub-cluster.

    Args:
        df: Frame with ``coor_x`` and ``coor_y`` columns. The ring test also
            reads ``r`` and ``theta_deg``, and the zone logic reads
            ``zone_label`` (presumably added by ``add_zone_labels`` —
            confirm against ``utils``).
        cfg: Nested configuration; reads ``preprocessing``, ``misc``,
            ``clustering`` and ``lof`` here, plus ``ring``, ``linear`` and
            ``cluster`` in the helpers it calls.

    Returns:
        ``(labelled_df, dominant_zone, [pattern], centroid)``.
        ``labelled_df`` is a copy of ``df`` with a boolean ``inlier`` column,
        except that an empty input is returned unchanged, without that
        column. ``dominant_zone`` is ``"데이터 없음"`` (no data) when nothing
        could be classified. ``centroid`` is an ``(x, y)`` tuple or ``None``.
    """
    if df.empty:
        return df, "데이터 없음", ["None"], None

    df = df.copy().reset_index(drop=True)
    df = add_zone_labels(df, inner_radius=cfg['preprocessing']['inner_radius_mm'])
    coords = df[["coor_x", "coor_y"]].values

    if len(df) < cfg['misc']['min_points_for_clustering']:
        no_inliers = np.zeros(len(df), dtype=bool)
        return df.assign(inlier=no_inliers), "데이터 없음", ["정상/미달"], None

    cluster_cfg = cfg['clustering']
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=cluster_cfg['min_cluster_size'],
        min_samples=cluster_cfg['min_samples'],
        cluster_selection_method=cluster_cfg['cluster_selection_method'],
        metric="euclidean",
        gen_min_span_tree=True,
    )
    labels = clusterer.fit_predict(coords)
    if np.all(labels == -1):
        # HDBSCAN found only noise: retry with a fixed-radius DBSCAN.
        labels = DBSCAN(
            eps=cluster_cfg['dbscan_eps'],
            min_samples=cluster_cfg['min_cluster_size'],
        ).fit(coords).labels_

    inlier_mask = labels != -1
    if not inlier_mask.any():
        return df.assign(inlier=inlier_mask), "데이터 없음", ["Others"], None

    # Second pass: drop local outliers among the clustered points.
    lof_cfg = cfg['lof']
    n_inlier = int(inlier_mask.sum())
    if n_inlier >= lof_cfg['lof_min_points']:
        n_neighbors_lof = min(lof_cfg['lof_n_neighbors'], n_inlier - 1)
        if n_neighbors_lof >= 2:
            lof = LocalOutlierFactor(
                n_neighbors=n_neighbors_lof,
                contamination=lof_cfg['lof_contamination'],
                metric="euclidean",
            )
            lof_labels = lof.fit_predict(coords[inlier_mask])
            kept_by_lof = np.zeros(len(df), dtype=bool)
            kept_by_lof[inlier_mask] = lof_labels == 1
            inlier_mask = inlier_mask & kept_by_lof

    labelled_df = df.assign(inlier=inlier_mask)
    inlier_df = df[inlier_mask].copy()
    inlier_coords = coords[inlier_mask]
    n_inlier = len(inlier_df)

    if n_inlier < cluster_cfg['min_cluster_size']:
        return labelled_df, "데이터 없음", ["Others"], None

    if is_ring_pattern_robust(inlier_df, cfg):
        # For rings the centroid is taken over all inliers, not one zone.
        ring_centroid = tuple(np.mean(inlier_df[['coor_x', 'coor_y']].values, axis=0))
        return labelled_df, get_dominant_zone(inlier_df), ["환형"], ring_centroid

    dominant_zone, centroid = _dominant_zone_and_centroid(inlier_df, inlier_coords)

    if _is_linear_set(inlier_coords, cfg):
        return labelled_df, dominant_zone, ["선형"], centroid

    if n_inlier < 2:
        return labelled_df, dominant_zone, ["Others"], None

    sub_labels = DBSCAN(
        eps=cluster_cfg['cluster_dbscan_eps'],
        min_samples=cluster_cfg['min_cluster_size'],
    ).fit(inlier_coords).labels_
    # np.unique gives a sorted, deterministic order, so ties between
    # pattern totals always resolve the same way.
    cluster_ids = [lbl for lbl in np.unique(sub_labels) if lbl != -1]

    if len(cluster_ids) >= 2:
        sub_coords_list = [inlier_coords[sub_labels == lbl] for lbl in cluster_ids]
        if _is_centroids_linear(sub_coords_list, cfg):
            return labelled_df, dominant_zone, ["선형"], centroid

        # Weight each sub-cluster's label by its point count.
        pat_totals = {}
        for sub_coords in sub_coords_list:
            pattern = _classify_subcluster(sub_coords, cfg)
            pat_totals[pattern] = pat_totals.get(pattern, 0) + len(sub_coords)
        dominant_pattern = max(pat_totals, key=pat_totals.get)
        return labelled_df, dominant_zone, [dominant_pattern], centroid

    pattern = _classify_subcluster(inlier_coords, cfg)
    return labelled_df, dominant_zone, [pattern], centroid