# utils.py """ 웨이퍼 결함 데이터 처리 공용 유틸리티. 이 모듈은 LLS(Laser Light Scattering) 결함 분석 파이프라인에서 패턴 분류 이전·이후 단계에 공통으로 사용되는 함수들을 제공한다. 크게 6가지 범주를 포함: 1. 환경 설정 : 한글 폰트, JSON config 로드 2. 결함 라벨 매핑 : roughbin_no → 한글 결함 분류명 3. Zone 라벨링 : 시계방향 12구역 × Inner/Outer 분류 4. Fine-grid 처리 : 결함 좌표를 격자 cell에 할당 5. 필터링 : cell 단위 wafer 수 기준 노이즈 제거 6. 시각화 : 웨이퍼 맵 (산점도 + zone + centroid 마킹) 클래스 `WaferUtils`로 모든 유틸리티를 묶어 IDE 자동완성/타입힌트 일관성을 높이고, 하위 호환을 위해 동일 이름의 모듈 레벨 함수도 함께 노출한다. """ from __future__ import annotations import os import json from typing import Optional, Tuple import numpy as np import pandas as pd import matplotlib.pyplot as plt import matplotlib.font_manager as fm from matplotlib.patches import Circle, Wedge # ---------------------------------------------------------------------- # 모듈 상수 # ---------------------------------------------------------------------- # roughbin_no(정수) → 결함 분류명(한글/영문) 매핑. # 검사기 raw 코드를 운영에서 통용되는 분류명으로 변환할 때 사용. ROUGHBIN_MAPPING = { 0: 'LPD', 100: 'LPD-N', 110: 'Micro-Scratch', 111: 'Void', 115: 'PID', 120: 'LPD-E', 130: 'LPD-S', 140: 'LLPD', 141: 'Air Pocket', 150: 'DIC-Unique', 160: 'Stain', 170: 'COP', 200: 'Cluster Area', 205: 'Extended Defects', 210: 'Scratch', 220: 'Slipline', 230: 'Line', 231: 'Area', 233: 'Radial', 234: 'Ring', 512: 'Residue', 520: 'Boat Mark', 902: 'Streak', 999: 'Nuisance', 990: 'LPD Nuisance', 991: 'PPD Nuisance', 501: 'Haze Slipline', 502: 'Hazeline', 600: 'Grid', 700: 'ROI', 800: 'X Section', } # 시계방향 12구역 라벨. 12시부터 시작해서 시계방향(1, 2, ... 11시)으로 진행. CLOCK_LABELS = ["12", "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11"] # 패턴별 시각화 색상. PATTERN_COLORS = { "환형": "darkorange", "선형": "forestgreen", "군집": "mediumpurple", "정상/미달": "gray", "Others": "gray", } # ====================================================================== # WaferUtils # ====================================================================== class WaferUtils: """ 웨이퍼 결함 데이터 처리 유틸리티 클래스 (facade 형태). 상태가 필요 없는 순수 함수들이므로 대부분 `@staticmethod`로 구성되며, 인스턴스화 없이 `WaferUtils.method(...)` 형태로 사용할 수 있다. Examples -------- >>> df = WaferUtils.assign_fine_grid(df, cell_size_mm=3.0) >>> df = WaferUtils.add_zone_labels(df, inner_radius=105.0) >>> WaferUtils.plot_wafer_map(result_df, key="...", pattern_list=["환형"], ...) """ # ------------------------------------------------------------------ # 1. 환경 설정 # ------------------------------------------------------------------ @staticmethod def setup_korean_font() -> Optional[str]: """ 시스템 한글 폰트를 matplotlib 기본 폰트로 등록. 선호 순서: Malgun Gothic > Nanum Gothic > NanumBarunGothic > Batang > Gulim > AppleGothic. 선호 후보가 없으면 시스템에서 'gothic/mincho/dotum/gulim/malgun/sans/korean' 키워드가 포함된 첫 번째 한글 후보를 사용. Returns ------- Optional[str] 적용된 폰트명. 시스템에 한글 폰트가 전혀 없으면 None. """ korean_fonts = [ f.name for f in fm.fontManager.ttflist if any(k in f.name.lower() for k in ["gothic", "mincho", "dotum", "gulim", "malgun", "sans", "korean"]) ] preferred = ["Malgun Gothic", "Nanum Gothic", "NanumBarunGothic", "Batang", "Gulim", "AppleGothic"] selected = next((f for f in preferred if f in korean_fonts), None) if selected is None and korean_fonts: selected = korean_fonts[0] if selected: plt.rcParams["font.family"] = selected plt.rcParams["font.size"] = 10 # 한글 폰트에서 음수 부호(−)가 깨지는 현상 방지 plt.rcParams["axes.unicode_minus"] = False print(f"✅ 한글 폰트 설정 완료: {selected}") else: print("⚠️ 경고: 시스템에 한글 폰트가 없습니다. 기본 폰트 사용 (글자 깨짐 발생)") return selected @staticmethod def load_config(config_path: str = "./lls_config.json") -> dict: """ JSON 형식의 분석 설정 파일 로드. Parameters ---------- config_path : str 설정 파일 경로 (UTF-8 인코딩 가정). Returns ------- dict 설정 트리 (preprocessing / clustering / ring / linear / lof / cluster / misc / contact_mapping 등의 키 포함). Raises ------ FileNotFoundError 지정한 경로에 파일이 없는 경우. """ if not os.path.exists(config_path): raise FileNotFoundError(f"설정 파일 없음: {config_path}") with open(config_path, "r", encoding="utf-8") as f: return json.load(f) # ------------------------------------------------------------------ # 2. 결함 라벨 매핑 # ------------------------------------------------------------------ @staticmethod def map_roughbin_no(roughbin) -> Optional[str]: """ roughbin_no(검사기 raw 코드)를 운영 결함 분류명으로 매핑. 고정 매핑 테이블(`ROUGHBIN_MAPPING`)을 우선 조회하고, 범위형 코드(541~548 등)는 별도 if-조건으로 처리한다. Parameters ---------- roughbin : Any 정수 또는 정수 변환 가능한 값. NaN/None/문자열 등은 None 반환. Returns ------- Optional[str] 분류명("LPD", "Haze Slipline" 등). 매핑 실패 시 "Unknown", 입력이 NaN/변환 불가 시 None. """ if pd.isna(roughbin): return None try: roughbin = int(roughbin) except (TypeError, ValueError): return None if roughbin in ROUGHBIN_MAPPING: return ROUGHBIN_MAPPING[roughbin] # 범위형 코드 처리 if 541 <= roughbin <= 548: return "Haze Slipline" if 531 <= roughbin <= 538: return "Hazeline" if 601 <= roughbin <= 609: return "Grid" if 701 <= roughbin <= 709: return "ROI" if 801 <= roughbin <= 809: return "X Section" return "Unknown" # ------------------------------------------------------------------ # 3. Zone 라벨링 # ------------------------------------------------------------------ @staticmethod def add_zone_labels(df: pd.DataFrame, inner_radius: float = 105.0) -> pd.DataFrame: """ 결함 좌표에 zone 라벨을 부여. Zone 라벨 형식: `{Inner|Outer}_{시계위치 2자리}` 예) "Inner_03" = 반지름 ≤ inner_radius, 3시 방향 "Outer_12" = 반지름 > inner_radius, 12시 방향 각도 변환: - 수학 각도(atan2) → 12시 기준 시계방향 각도(`theta_from_12 = (90° - math) mod 360`) - sector index = floor(theta_from_12 / 30°) % 12 Parameters ---------- df : pd.DataFrame 'coor_x', 'coor_y' 컬럼을 포함한 결함 좌표 DF. inner_radius : float Inner / Outer 경계가 되는 반지름 (mm). Returns ------- pd.DataFrame 'zone_label', 'r' (원점 거리), 'theta_deg' (수학 각도) 컬럼 추가된 사본. """ df = df.copy() r = np.hypot(df["coor_x"], df["coor_y"]) theta_deg = np.degrees(np.arctan2(df["coor_y"], df["coor_x"])) # 시계 방향 환산: 12시 = 0°, 시계방향으로 증가 theta_from_12 = (90.0 - theta_deg) % 360.0 sector_index = (theta_from_12 // 30).astype(int) % 12 clock_str = pd.Series([CLOCK_LABELS[i] for i in sector_index], index=df.index) zone_type = np.where(r <= inner_radius, "Inner", "Outer") df["zone_label"] = [f"{zt}_{c}" for zt, c in zip(zone_type, clock_str)] df["r"] = r df["theta_deg"] = theta_deg return df # ------------------------------------------------------------------ # 4. Fine-grid 처리 # ------------------------------------------------------------------ @staticmethod def assign_fine_grid(df: pd.DataFrame, cell_size_mm: float = 3.0) -> pd.DataFrame: """ 결함 좌표를 fine-grid cell 에 할당. 웨이퍼 좌표 범위 [-150, 150] × [-150, 150] (mm)를 `cell_size_mm` 크기의 정사각 격자로 분할하고, 각 결함이 속하는 cell의 중심 좌표와 ID를 부여. Parameters ---------- df : pd.DataFrame 'coor_x', 'coor_y' 컬럼 포함 DF. cell_size_mm : float 셀 한 변의 크기 (mm). 기본 3.0. Returns ------- pd.DataFrame 'cell_x', 'cell_y' (셀 중심 좌표 mm), 'cell_id' ("{int_x}_{int_y}" 형식의 unique ID) 추가된 사본. Notes ----- cell_id는 cell_x/cell_y를 반올림하여 정수화한 문자열이라 cell_size_mm가 정수 경계와 어긋나면 충돌 가능. 통상 3.0/5.0 등 정수 권장. """ df = df.copy() # 좌표 평행이동 후 floor → bin index bin_x = np.floor((df["coor_x"] + 150) / cell_size_mm).astype(int) bin_y = np.floor((df["coor_y"] + 150) / cell_size_mm).astype(int) # 셀 중심 좌표 (mm) df["cell_x"] = bin_x * cell_size_mm - 150 + cell_size_mm / 2 df["cell_y"] = bin_y * cell_size_mm - 150 + cell_size_mm / 2 cell_x_int = np.round(df["cell_x"]).astype(int) cell_y_int = np.round(df["cell_y"]).astype(int) df["cell_id"] = cell_x_int.astype(str) + "_" + cell_y_int.astype(str) return df @staticmethod def get_cell_wafer_counts(df: pd.DataFrame) -> pd.DataFrame: """ 각 cell에서 결함이 발생한 unique wafer 수와 결함 수를 집계. Parameters ---------- df : pd.DataFrame 'cell_id', 'WAF_ID' 컬럼 포함 DF. Returns ------- pd.DataFrame index = cell_id columns = ['wafer_count', 'defect_count', 'wafer_ratio'] - wafer_count : 해당 cell에서 결함을 보인 unique wafer 수 - defect_count : 해당 cell의 전체 결함 수 - wafer_ratio : wafer_count / 전체 unique wafer 수 """ total_wafers = df["WAF_ID"].nunique() cell_stats = df.groupby("cell_id").agg( wafer_count=("WAF_ID", "nunique"), defect_count=("WAF_ID", "size"), ) cell_stats["wafer_ratio"] = cell_stats["wafer_count"] / total_wafers if total_wafers else 0.0 return cell_stats @staticmethod def filter_by_cell_wafer_count( df: pd.DataFrame, n1_min_wafer: int, cell_size_mm: float = 3.0, ) -> pd.DataFrame: """ Fine-grid 기반 n1 필터: 충분한 wafer에서 공통 발생한 cell의 결함만 유지. '공통 위치에 반복 발생하는 결함만 유의미하다'는 가정을 구현. unique wafer 수가 `n1_min_wafer` 미만인 cell은 노이즈로 간주 제거. Parameters ---------- df : pd.DataFrame 'coor_x', 'coor_y', 'WAF_ID' 포함 DF (cell 할당은 내부에서 수행). n1_min_wafer : int cell이 유효하기 위해 필요한 최소 unique wafer 수. cell_size_mm : float fine-grid cell 크기 (mm). Returns ------- pd.DataFrame n1 조건을 통과한 cell의 결함만 포함. 'cell_wafer_count' 컬럼 추가. """ df = WaferUtils.assign_fine_grid(df, cell_size_mm=cell_size_mm) cell_stats = WaferUtils.get_cell_wafer_counts(df) valid_cells = cell_stats[cell_stats["wafer_count"] >= n1_min_wafer].index df_filtered = df[df["cell_id"].isin(valid_cells)].copy() df_filtered = df_filtered.merge( cell_stats[["wafer_count", "wafer_ratio"]], left_on="cell_id", right_index=True, how="left", ) df_filtered.rename(columns={"wafer_count": "cell_wafer_count"}, inplace=True) return df_filtered @staticmethod def summarize_filtering_result( df_original: pd.DataFrame, df_filtered: pd.DataFrame, ) -> dict: """ 필터링 전후 결함/Cell 수 요약 통계. Returns ------- dict original_defects, filtered_defects, removed_defects, removal_rate(%), original_cells, valid_cells. """ orig = len(df_original) filt = len(df_filtered) removed = orig - filt rate = (removed / orig * 100) if orig else 0.0 return { "original_defects": orig, "filtered_defects": filt, "removed_defects": removed, "removal_rate": round(rate, 2), "original_cells": df_original["cell_id"].nunique() if "cell_id" in df_original.columns else 0, "valid_cells": df_filtered["cell_id"].nunique() if "cell_id" in df_filtered.columns else 0, } # ------------------------------------------------------------------ # 5. 시각화 # ------------------------------------------------------------------ @staticmethod def plot_wafer_map( result_df: pd.DataFrame, key: str, pattern_list, dominant_zone: str, meta: Optional[dict] = None, figsize: Tuple[int, int] = (8, 8), save_path: Optional[str] = None, show_mode: bool = False, ) -> None: """ 웨이퍼 결함 맵 시각화. 구성 요소 ---------- 1. 배경 영역 - 환형 패턴: 전체 원 영역을 베이지색으로 표시 - 그 외: dominant_zone에 해당하는 wedge만 베이지색 표시 2. 결함 산점도 - inlier 컬럼이 있으면 inlier/outlier 색상 분리 - inlier 색상은 패턴에 따라 PATTERN_COLORS 매핑 3. Centroid 마커 (환형 제외) - 빨간 원(10mm) + X 마커 4. 동심원: 30/45/60/90/120/150mm 5. 시계방향 그리드 + 12시·1시·...·11시 라벨 6. 캡션: 패턴/구역/결함수/장비/웨이퍼 Parameters ---------- result_df : pd.DataFrame 'coor_x', 'coor_y', (선택) 'inlier', 'zone_label' 컬럼. key : str 저장 파일명·캡션에 사용할 키. pattern_list : list[str] | str 패턴명. ['환형','군집'] 같은 리스트도 허용. dominant_zone : str 주요 zone 라벨 (예: 'Inner_03'). 'N/A' 이면 미표시. meta : dict, optional 'main_centroid_x', 'main_centroid_y', 'wafer_count', 'EQP_NM_8030' 등. figsize : (int, int) Figure 크기. save_path : str, optional 저장 경로. None이면 './result/result_figures/{key}.jpg'. show_mode : bool True 면 plt.show() 호출. """ solid_radii = [45, 90, 150] dashed_radii = [30, 60, 120] # 패턴 문자열 정규화 if isinstance(pattern_list, list): pattern_str = ", ".join(pattern_list) first_pattern = pattern_list[0] else: pattern_str = str(pattern_list) first_pattern = pattern_str.split(",")[0].strip() color = PATTERN_COLORS.get(first_pattern, "steelblue") fig, ax = plt.subplots(figsize=figsize) # --- 배경: 환형 → 전체 원, 그 외 → dominant zone wedge --- if "환형" in pattern_str: ax.add_patch(Circle((0, 0), 150, facecolor="#F5F5DC", edgecolor="none", alpha=0.8, zorder=1)) elif dominant_zone and dominant_zone != "N/A": try: for zone in [z.strip() for z in dominant_zone.split(",")]: ztype, zclock = zone.split("_") r_min = 0 if ztype == "Inner" else 105 r_max = 105 if ztype == "Inner" else 150 if zclock in CLOCK_LABELS: idx = CLOCK_LABELS.index(zclock) # 시계 각도 → 수학 각도 변환 (Wedge는 수학 각도 사용) math_start = 90 - (idx + 1) * 30 math_end = 90 - idx * 30 ax.add_patch(Wedge((0, 0), r_max, math_start, math_end, width=(r_max - r_min), facecolor="#F5F5DC", edgecolor="none", alpha=0.8, zorder=1)) except Exception: # zone 파싱 실패 시 배경 생략 (시각화는 계속 진행) pass # --- 결함 산점도 --- if "inlier" in result_df.columns: inliers = result_df[result_df["inlier"] == True] outliers = result_df[result_df["inlier"] == False] ax.scatter(outliers["coor_x"], outliers["coor_y"], c="lightgray", s=15, alpha=0.3, zorder=4) ax.scatter(inliers["coor_x"], inliers["coor_y"], c=color, s=35, alpha=0.5, label=f"Inlier ({pattern_str})", zorder=5) else: ax.scatter(result_df["coor_x"], result_df["coor_y"], c=color, s=30, alpha=0.5, zorder=5) # --- Centroid 마커: 환형이면 생략 (ring center는 원점 근처라 정보 없음) --- if meta and "환형" not in pattern_str: cx = meta.get("main_centroid_x") cy = meta.get("main_centroid_y") if cx is not None and cy is not None: ax.add_patch(Circle((cx, cy), radius=10, facecolor="none", edgecolor="red", linewidth=2.5, linestyle="-", alpha=0.9, zorder=7)) ax.scatter(cx, cy, c="red", s=80, marker="x", linewidths=2.5, zorder=8, label="Centroid") # --- 웨이퍼 동심원 --- for r in solid_radii: ax.add_patch(plt.Circle((0, 0), r, color="black", fill=False, linestyle="-", linewidth=1.2, alpha=0.7, zorder=2)) for r in dashed_radii: ax.add_patch(plt.Circle((0, 0), r, color="gray", fill=False, linestyle="--", linewidth=0.8, alpha=0.5, zorder=2)) # --- 시계 방향 그리드 + 라벨 --- clock_angles = {0: "12시", 30: "1시", 60: "2시", 90: "3시", 120: "4시", 150: "5시", 180: "6시", 210: "7시", 240: "8시", 270: "9시", 300: "10시", 330: "11시"} grid_end = max(solid_radii) + 12 label_r = grid_end * 0.93 for angle_deg, label_text in clock_angles.items(): # 시계 각도 → 수학 각도 math_rad = np.deg2rad(90 - angle_deg) ax.plot([0, grid_end * np.cos(math_rad)], [0, grid_end * np.sin(math_rad)], color="gray", linestyle=":", linewidth=0.8, zorder=2) ax.text(label_r * np.cos(math_rad), label_r * np.sin(math_rad), label_text, color="darkblue", fontsize=8, ha="center", va="center", weight="bold", alpha=0.75, zorder=3) ax.axhline(0, color="k", linewidth=0.4, zorder=3) ax.axvline(0, color="k", linewidth=0.4, zorder=3) max_range = max(solid_radii) + 20 ax.set_xlim(-max_range, max_range) ax.set_ylim(-max_range, max_range) ax.set_aspect("equal", "box") ax.set_xlabel("X (mm)") ax.set_ylabel("Y (mm)") ax.legend(loc="upper right", fontsize=8) ax.grid(True, alpha=0.15) # --- 캡션 --- total = len(result_df) dom_cnt = 0 if "zone_label" in result_df.columns and dominant_zone != "N/A": dom_zones = [z.strip() for z in dominant_zone.split(",")] dom_cnt = result_df[result_df["zone_label"].isin(dom_zones)].shape[0] ratio = (dom_cnt / total * 100) if total else 0.0 lines = [ f"Key: {key}", f"패턴: {pattern_str} | 발생구역: {dominant_zone}", f"전체 결함: {total}건 | 주요영역 결함: {dom_cnt}건 | 비율: {ratio:.1f}%", ] if meta: lines.append(f"장비: {meta.get('EQP_NM_8030', '-')} | 웨이퍼: {meta.get('wafer_count', '-')}매") ax.set_title("\n".join(lines), fontsize=9, loc="left", pad=8) plt.tight_layout() if save_path is None: save_dir = "./result/result_figures" os.makedirs(save_dir, exist_ok=True) save_path = os.path.join(save_dir, f"{key}.jpg") plt.savefig(save_path, dpi=150, bbox_inches="tight") if show_mode: plt.show() plt.close() # ====================================================================== # Backward-compat: 기존 모듈 레벨 함수 alias # (기존 코드 `from utils import setup_korean_font, ...` 형태를 그대로 지원) # ====================================================================== setup_korean_font = WaferUtils.setup_korean_font load_config = WaferUtils.load_config map_roughbin_no = WaferUtils.map_roughbin_no add_zone_labels = WaferUtils.add_zone_labels assign_fine_grid = WaferUtils.assign_fine_grid get_cell_wafer_counts = WaferUtils.get_cell_wafer_counts filter_by_cell_wafer_count = WaferUtils.filter_by_cell_wafer_count summarize_filtering_result = WaferUtils.summarize_filtering_result plot_wafer_map = WaferUtils.plot_wafer_map