# pattern_analyzer.py """ LLS 일자별 데이터 분석 오케스트레이터. `./daily/YYYYMMDD.parquet` 형태의 일자별 결함 데이터를 일괄 처리하여 패턴 분류 + Contact 매핑 + 시각화까지 한 번에 수행한다. 두 가지 실행 모드 지원 ---------------------- - ``"by_cst"`` : CAR_ID(캐리어) × HIS_REGIST_DTTM(스캔시각) 단위로 분류. 동일 캐리어 내 동일 시각 그룹별 패턴 발생 추적에 사용. - ``"daily"`` : 하루 전체 결함을 한 그룹으로 합쳐 1회 분류. 일자별 공정 트렌드/장비 이상 추적에 사용. 유의 패턴 여부와 무관하게 필터링된 결함은 항상 보존. 내부 의존성 ----------- - :class:`utils.WaferUtils` : 전처리·시각화 유틸 - :func:`pattern_detection.classify_wafer_patterns` : 패턴 분류 - :class:`contact_mapper.ContactMapper` : 설비 부위 매핑 출력 구조 --------- output_dir/ ├── by_cst/{date}_LLS_CST_analysis.csv # Mode 2 ├── daily_agg/{date}_LLS_daily_analysis.csv # Mode 1 ├── daily_agg/filtered_defects/{date}_filtered.parquet ├── figures_by_cst/{date}/{CST_ID}_{dttm}.jpg ├── figures_daily/{significant|others}/DAILY_{date}.jpg ├── config_used/{ts}_config.json └── LLS_{by_cst|daily_agg}_full_analysis.csv """ from __future__ import annotations import os import sys import shutil import glob import warnings from datetime import datetime from typing import Optional, Literal, List import numpy as np import pandas as pd import urllib3 from tqdm import tqdm from utils import ( setup_korean_font, load_config, add_zone_labels, plot_wafer_map, assign_fine_grid, filter_by_cell_wafer_count, ) from pattern_detection import classify_wafer_patterns from contact_mapper import ContactMapper warnings.filterwarnings("ignore") urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # 실행 모드 타입. Mode = Literal["by_cst", "daily"] # Mode 1(daily aggregated)에서 '유의 패턴' 판정시 기본 제외 라벨. EXCLUDED_PATTERNS_DEFAULT = {"Others", "정상/미달", "데이터 없음", "None"} class LLSPatternAnalyzer: """ LLS 결함 일자별 분석 오케스트레이터. Parameters ---------- config_path : str ``lls_config.json`` 경로. daily_input_dir : str 일자별 parquet 파일 디렉터리. 파일명은 ``YYYYMMDD.parquet`` 형식이어야 함. output_dir : str 모든 결과(CSV, parquet, 이미지)가 저장될 루트 디렉터리. contact_csv : str, optional ``contact_angle.csv`` 경로. None 또는 파일 부재 시 contact 매핑 비활성화. setup_font : bool True 면 시작 시 한글 폰트 등록. Attributes ---------- config : dict ``lls_config.json`` 트리. contact_mapper : ContactMapper | None contact 매핑 활성화 시 인스턴스, 아니면 None. Examples -------- >>> analyzer = LLSPatternAnalyzer( ... config_path="./lls_config.json", ... daily_input_dir="./daily", ... output_dir="./result_daily", ... ) >>> df_daily = analyzer.run(mode="daily") # Mode 1 >>> df_by_cst = analyzer.run(mode="by_cst") # Mode 2 """ # ------------------------------------------------------------------ # 생성자 + 초기화 # ------------------------------------------------------------------ def __init__( self, config_path: str = "./lls_config.json", daily_input_dir: str = "./daily", output_dir: str = "./result_daily", contact_csv: Optional[str] = "./contact_angle.csv", setup_font: bool = True, ): if setup_font: setup_korean_font() self.config_path = config_path self.config = load_config(config_path) self.daily_input_dir = daily_input_dir self.output_dir = output_dir # --- Contact mapper (선택) --- cm_cfg = self.config.get("contact_mapping", {}) self.contact_tolerance_mm = cm_cfg.get("tolerance_mm", 30.0) self.contact_top_n = cm_cfg.get("top_n", 5) self.contact_mapper: Optional[ContactMapper] = None if contact_csv and os.path.exists(contact_csv): self.contact_mapper = ContactMapper( csv_path=contact_csv, tolerance_mm=self.contact_tolerance_mm, ) print(f"✅ Contact mapper 활성화: {contact_csv} (tolerance={self.contact_tolerance_mm}mm)") # --- 전처리 파라미터 (lls_config.json::preprocessing) --- pp = self.config["preprocessing"] self.cell_size_mm = pp["cell_size_mm"] self.n1_min_wafers = pp["n1_min_wafers"] # 구버전 config 호환: n2_min_cell_defects 또는 n2_min_zone_defects 모두 인식 self.n2_min_cell_defects = pp.get( "n2_min_cell_defects", pp.get("n2_min_zone_defects", 3) ) self.inner_radius_mm = pp["inner_radius_mm"] # --- Mode 1 유의 패턴 필터링 임계치 (lls_config.json::mode_daily) --- md = self.config.get("mode_daily", {}) self.daily_min_defect_count = md.get("min_defect_count", 30) self.daily_min_wafer_count = md.get("min_wafer_count", 3) self.daily_excluded_patterns = set( md.get("excluded_patterns", list(EXCLUDED_PATTERNS_DEFAULT)) ) self._prepare_output_dirs() self._backup_config() def _prepare_output_dirs(self) -> None: """출력 디렉터리 일괄 생성.""" self.by_cst_dir = os.path.join(self.output_dir, "by_cst") self.daily_agg_dir = os.path.join(self.output_dir, "daily_agg") self.figures_by_cst_dir = os.path.join(self.output_dir, "figures_by_cst") self.figures_daily_dir = os.path.join(self.output_dir, "figures_daily") self.config_used_dir = os.path.join(self.output_dir, "config_used") for d in [ self.output_dir, self.by_cst_dir, self.daily_agg_dir, self.figures_by_cst_dir, self.figures_daily_dir, self.config_used_dir, ]: os.makedirs(d, exist_ok=True) def _backup_config(self) -> None: """현재 사용된 config를 타임스탬프 파일명으로 백업 (재현성 확보).""" ts = datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = os.path.join(self.config_used_dir, f"{ts}_config.json") shutil.copy(self.config_path, backup_path) print(f"✅ 설정 파일 백업 완료: {backup_path}") # ------------------------------------------------------------------ # 공용 전처리 헬퍼 # ------------------------------------------------------------------ def _load_parquet(self, file_path: str) -> Optional[pd.DataFrame]: """ Parquet 로드 + HIS_REGIST_DTTM_8030 정규화. Returns ------- Optional[pd.DataFrame] 로드 실패 또는 빈 데이터 시 None. """ try: df = pd.read_parquet(file_path) except Exception as e: print(f"❌ 파일 읽기 실패: {file_path}, 오류: {e}") return None if df.empty: return None # 타임스탬프 14자리(YYYYMMDDHHMMSS)로 자르기 — 그룹화 키 일관성 확보 if "HIS_REGIST_DTTM_8030" in df.columns: df["HIS_REGIST_DTTM_8030"] = df["HIS_REGIST_DTTM_8030"].astype(str).str[:14] return df def _apply_grid_and_n1(self, df: pd.DataFrame) -> pd.DataFrame: """fine-grid 할당 + n1 필터 (cell당 최소 wafer 수).""" df = assign_fine_grid(df, cell_size_mm=self.cell_size_mm) df = filter_by_cell_wafer_count(df, self.n1_min_wafers, cell_size_mm=self.cell_size_mm) return df def _apply_n2(self, df: pd.DataFrame) -> pd.DataFrame: """n2 필터: cell당 최소 결함 수 미만인 cell 제거.""" if df.empty: return df cell_counts = df["cell_id"].value_counts() valid_cells = cell_counts[cell_counts >= self.n2_min_cell_defects].index return df[df["cell_id"].isin(valid_cells)].copy() def _classify(self, df_group: pd.DataFrame) -> Optional[dict]: """ 한 그룹의 결함을 패턴 분류 + centroid 산출. Returns ------- Optional[dict] 성공 시 result_df / pattern_list / centroid 등을 담은 dict. 결함이 0건이면 None. """ coords = df_group[["coor_x", "coor_y"]].dropna() if len(coords) == 0: return None df_for_classify = df_group.loc[coords.index].copy() result_df, dominant_zone, pattern_list, centroid = classify_wafer_patterns( df_for_classify, self.config ) if centroid: cx, cy = centroid angle = (np.arctan2(cy, cx) / np.pi * 180 + 360) % 360 distance = round(float(np.sqrt(cx ** 2 + cy ** 2)), 4) else: angle = distance = None return { "result_df": result_df, "dominant_zone": dominant_zone, "pattern_list": pattern_list, "centroid": centroid, "main_centroid_x": round(centroid[0], 4) if centroid else None, "main_centroid_y": round(centroid[1], 4) if centroid else None, "main_centroid_Angle": angle, "main_centroid_Distance": distance, "defect_count": len(coords), } @staticmethod def _pattern_str(pattern_list) -> str: """패턴 리스트를 쉼표 결합 문자열로 정규화.""" if isinstance(pattern_list, list): return ", ".join(pattern_list) return str(pattern_list) def _attach_contact_candidates(self, record: dict) -> dict: """ record에 Curling 라벨 + contact 매핑 결과 컬럼 추가. 추가되는 컬럼 (순서 보존) - Curling : "Curling" 또는 None (장비 정보 앞) - contact_candidate_count : 매칭 후보 총 개수 - contact_candidates : "EQP:Part | ..." 형식 top-N 요약 문자열 Curling 검출은 contact 매핑 사전 단계로, centroid가 외각(r ≥ 130mm)의 2시 방향(30°)에 있으면 부여한다. """ if self.contact_mapper is None: return record cx = record.get("main_centroid_x") cy = record.get("main_centroid_y") # Curling 라벨은 contact 매핑 이전에 부착 (장비 정보 앞에 위치) record["Curling"] = self.contact_mapper.detect_curling(cx, cy) pat = record.get("overall_pattern", "") candidates = self.contact_mapper.map_pattern(pat, centroid_x=cx, centroid_y=cy) record["contact_candidate_count"] = int(len(candidates)) record["contact_candidates"] = self.contact_mapper.summarize_candidates( candidates, top_n=self.contact_top_n ) return record def _is_significant( self, pattern_list, defect_count: int, wafer_count: int ) -> bool: """ Mode 1 '유의 패턴' 판정. 세 조건 모두 충족해야 유의: (a) pattern_list가 제외 라벨(Others 등)로만 구성되지 않을 것 (b) defect_count >= daily_min_defect_count (c) wafer_count >= daily_min_wafer_count """ patterns = pattern_list if isinstance(pattern_list, list) else [pattern_list] if all(p in self.daily_excluded_patterns for p in patterns): return False if defect_count < self.daily_min_defect_count: return False if wafer_count < self.daily_min_wafer_count: return False return True # ------------------------------------------------------------------ # Mode 2 : by CST × scan-time # ------------------------------------------------------------------ def run_by_cst(self, df: pd.DataFrame, date_str: str) -> List[dict]: """ Mode 2 단일 일자 처리: CAR_ID × HIS_REGIST_DTTM 그룹별 분류. Parameters ---------- df : pd.DataFrame 한 일자 분량의 결함 DF. date_str : str 'YYYYMMDD' 일자 문자열 (저장 경로용). Returns ------- List[dict] 각 그룹별 record 리스트. 빈 결과면 []. """ daily_results: List[dict] = [] daily_result_dfs: dict = {} figures_dir = os.path.join(self.figures_by_cst_dir, date_str) os.makedirs(figures_dir, exist_ok=True) for car_id in tqdm(df["CAR_ID"].unique(), desc=f"{date_str} CST", leave=False): df_cst = df[df["CAR_ID"] == car_id].copy() if df_cst.empty: continue df_cst = self._apply_grid_and_n1(df_cst) if df_cst.empty: continue df_cst = add_zone_labels(df_cst, inner_radius=self.inner_radius_mm) for dttm, df_group in df_cst.groupby("HIS_REGIST_DTTM_8030"): df_group = self._apply_n2(df_group) if df_group.empty: continue eqp_series = df_group["EQP_ID_8030"].dropna() eqp_nm = eqp_series.mode().iloc[0] if not eqp_series.empty else "Unknown" cls = self._classify(df_group) if cls is None: continue key = f"{car_id}_{dttm}" daily_result_dfs[key] = cls["result_df"] rec = { "status": "Success", "mode": "by_cst", "CST_ID": car_id, "HIS_REGIST_DTTM": dttm, "EQP_NM_8030": eqp_nm, "analysis_date": date_str, "wafer_count": df_group["WAF_ID"].nunique(), "defect_count": cls["defect_count"], "overall_pattern": self._pattern_str(cls["pattern_list"]), "overall_dominant_zone": cls["dominant_zone"], "main_centroid_x": cls["main_centroid_x"], "main_centroid_y": cls["main_centroid_y"], "main_centroid_Angle": cls["main_centroid_Angle"], "main_centroid_Distance": cls["main_centroid_Distance"], } daily_results.append(self._attach_contact_candidates(rec)) if daily_results: df_daily = pd.DataFrame(daily_results) df_daily.to_csv( os.path.join(self.by_cst_dir, f"{date_str}_LLS_CST_analysis.csv"), index=False, encoding="utf-8-sig", ) for key, result_df in tqdm(daily_result_dfs.items(), desc=f"{date_str} 시각화", leave=False): meta = next( (r for r in daily_results if f"{r['CST_ID']}_{r['HIS_REGIST_DTTM']}" == key), None, ) if not meta: continue plot_wafer_map( result_df=result_df, key=key, pattern_list=meta["overall_pattern"], dominant_zone=meta["overall_dominant_zone"], meta=meta, show_mode=False, save_path=os.path.join(figures_dir, f"{key}.jpg"), ) return daily_results # ------------------------------------------------------------------ # Mode 1 : daily aggregated # ------------------------------------------------------------------ def run_daily(self, df: pd.DataFrame, date_str: str) -> List[dict]: """ Mode 1 단일 일자 처리: 하루 전체 결함 통합 후 1회 분류. 패턴 분류 성공 여부와 무관하게 ``filtered_defects/{date}_filtered.parquet`` 에 필터링된 결함을 항상 보존한다. 시각화는 유의 여부에 따라 ``figures_daily/significant/`` 또는 ``others/`` 폴더로 분리 저장. Returns ------- List[dict] 성공 시 1건 record 리스트. 필터 단계에서 모두 제거되면 []. """ df_day = df.copy() df_day = self._apply_grid_and_n1(df_day) if df_day.empty: print(f"🟡 {date_str} n1 필터 통과 결함 없음 → 스킵") return [] df_day = add_zone_labels(df_day, inner_radius=self.inner_radius_mm) df_day = self._apply_n2(df_day) if df_day.empty: print(f"🟡 {date_str} n2 필터 통과 결함 없음 → 스킵") return [] wafer_count = df_day["WAF_ID"].nunique() cls = self._classify(df_day) # 분류 실패해도 필터링된 결함은 유지 (사용자 요구사항) if cls is None: result_df = df_day.assign(inlier=False) pattern_list = ["None"] dominant_zone = "N/A" defect_count = len(df_day) centroid_fields = { "main_centroid_x": None, "main_centroid_y": None, "main_centroid_Angle": None, "main_centroid_Distance": None, } else: result_df = cls["result_df"] pattern_list = cls["pattern_list"] dominant_zone = cls["dominant_zone"] defect_count = cls["defect_count"] centroid_fields = { "main_centroid_x": cls["main_centroid_x"], "main_centroid_y": cls["main_centroid_y"], "main_centroid_Angle": cls["main_centroid_Angle"], "main_centroid_Distance": cls["main_centroid_Distance"], } is_significant = self._is_significant(pattern_list, defect_count, wafer_count) eqp_series = (df_day["EQP_ID_8030"].dropna() if "EQP_ID_8030" in df_day.columns else pd.Series([], dtype=object)) eqp_nm = eqp_series.mode().iloc[0] if not eqp_series.empty else "Unknown" key = f"DAILY_{date_str}" record = { "status": "Success", "mode": "daily", "is_significant": is_significant, "CST_ID": "ALL", "HIS_REGIST_DTTM": date_str, "EQP_NM_8030": eqp_nm, "analysis_date": date_str, "wafer_count": wafer_count, "defect_count": defect_count, "overall_pattern": self._pattern_str(pattern_list), "overall_dominant_zone": dominant_zone, **centroid_fields, } record = self._attach_contact_candidates(record) # CSV 저장 pd.DataFrame([record]).to_csv( os.path.join(self.daily_agg_dir, f"{date_str}_LLS_daily_analysis.csv"), index=False, encoding="utf-8-sig", ) # 통합 결함 parquet 항상 저장 (분류 무관) defects_dir = os.path.join(self.daily_agg_dir, "filtered_defects") os.makedirs(defects_dir, exist_ok=True) result_df.to_parquet( os.path.join(defects_dir, f"{date_str}_filtered.parquet"), index=False, ) # 시각화: 유의/비유의 폴더 분리 sub_dir = "significant" if is_significant else "others" save_dir = os.path.join(self.figures_daily_dir, sub_dir) os.makedirs(save_dir, exist_ok=True) plot_wafer_map( result_df=result_df, key=key, pattern_list=record["overall_pattern"], dominant_zone=record["overall_dominant_zone"], meta=record, show_mode=False, save_path=os.path.join(save_dir, f"{key}.jpg"), ) return [record] # ------------------------------------------------------------------ # Dispatcher / 진입점 # ------------------------------------------------------------------ def run(self, mode: Mode = "by_cst") -> pd.DataFrame: """ 모드별 일자 일괄 처리. Parameters ---------- mode : {"by_cst", "daily"} "by_cst": CST × 스캔시각 단위 (세밀) "daily" : 일자 통합 단위 (트렌드) Returns ------- pd.DataFrame 모든 일자 record를 합친 통합 DF (`output_dir`에 CSV로도 저장). 결과 없으면 빈 DF. Raises ------ ValueError mode가 허용 값이 아닐 때. FileNotFoundError ``daily_input_dir`` 에 parquet 파일이 없을 때. """ if mode not in ("by_cst", "daily"): raise ValueError(f"mode는 'by_cst' 또는 'daily' 여야 합니다. got={mode}") parquet_files = sorted(glob.glob(os.path.join(self.daily_input_dir, "*.parquet"))) if not parquet_files: raise FileNotFoundError( f"❌ {self.daily_input_dir} 폴더에 parquet 파일이 없습니다." ) print(f"✅ 총 {len(parquet_files)}개의 일자별 파일 발견 (mode={mode})") all_results: List[dict] = [] for file_path in tqdm(parquet_files, desc=f"📅 일자별 처리 ({mode})"): date_str = os.path.basename(file_path).split(".")[0] if not (len(date_str) == 8 and date_str.isdigit()): print(f"🟡 건너뜀 (파일명 형식 오류): {file_path}") continue df = self._load_parquet(file_path) if df is None: print(f"🟡 데이터 없음: {file_path}") continue if mode == "by_cst": results = self.run_by_cst(df, date_str) else: results = self.run_daily(df, date_str) all_results.extend(results) if not all_results: print("❌ 분석된 결과가 없습니다.") return pd.DataFrame() final_df = pd.DataFrame(all_results) suffix = "by_cst" if mode == "by_cst" else "daily_agg" final_path = os.path.join(self.output_dir, f"LLS_{suffix}_full_analysis.csv") final_df.to_csv(final_path, index=False, encoding="utf-8-sig") print(f"✅ 전체 분석 완료: {len(all_results)}건 → {final_path}") return final_df # ---------------------------------------------------------------------- # CLI 진입점: `python pattern_analyzer.py [by_cst|daily]` # ---------------------------------------------------------------------- if __name__ == "__main__": sys.path.append(os.getcwd()) mode: Mode = sys.argv[1] if len(sys.argv) > 1 else "by_cst" analyzer = LLSPatternAnalyzer() analyzer.run(mode=mode)