# ELLS / nu / pattern_analyzer.py
# Hyungseoky's picture
# Upload 10 files
# 0d69261 verified
# pattern_analyzer.py
"""
Orchestrator for day-by-day LLS data analysis.

Batch-processes daily defect data stored as `./daily/YYYYMMDD.parquet` and
performs pattern classification + contact mapping + visualization in one pass.

Two execution modes are supported
---------------------------------
- ``"by_cst"`` : classify per CAR_ID (carrier) x HIS_REGIST_DTTM (scan time).
  Used to track pattern occurrence for each same-scan-time group within the
  same carrier.
- ``"daily"`` : merge all defects of one day into a single group and classify
  once. Used to track daily process trends / equipment anomalies. The filtered
  defects are always preserved, regardless of whether a significant pattern
  was found.

Internal dependencies
---------------------
- :class:`utils.WaferUtils` : preprocessing / visualization utilities
- :func:`pattern_detection.classify_wafer_patterns` : pattern classification
- :class:`contact_mapper.ContactMapper` : equipment-part mapping

Output layout
-------------
output_dir/
├── by_cst/{date}_LLS_CST_analysis.csv              # Mode 2
├── daily_agg/{date}_LLS_daily_analysis.csv         # Mode 1
├── daily_agg/filtered_defects/{date}_filtered.parquet
├── figures_by_cst/{date}/{CST_ID}_{dttm}.jpg
├── figures_daily/{significant|others}/DAILY_{date}.jpg
├── config_used/{ts}_config.json
└── LLS_{by_cst|daily_agg}_full_analysis.csv
"""
from __future__ import annotations
import os
import sys
import shutil
import glob
import warnings
from datetime import datetime
from typing import Optional, Literal, List
import numpy as np
import pandas as pd
import urllib3
from tqdm import tqdm
from utils import (
setup_korean_font, load_config, add_zone_labels, plot_wafer_map,
assign_fine_grid, filter_by_cell_wafer_count,
)
from pattern_detection import classify_wafer_patterns
from contact_mapper import ContactMapper
# Install a process-wide "ignore" filter for every Python warning.
warnings.filterwarnings("ignore")
# Suppress urllib3's InsecureRequestWarning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Run-mode type accepted by LLSPatternAnalyzer.run().
Mode = Literal["by_cst", "daily"]
# Default labels excluded when judging a 'significant pattern' in
# Mode 1 (daily aggregated).
# NOTE(review): the non-ASCII labels below look mis-encoded (UTF-8 text read
# through a legacy code page) — confirm they match the labels that
# pattern_detection actually emits.
EXCLUDED_PATTERNS_DEFAULT = {"Others", "์ •์ƒ/๋ฏธ๋‹ฌ", "๋ฐ์ดํ„ฐ ์—†์Œ", "None"}
class LLSPatternAnalyzer:
    """
    Orchestrator for day-by-day LLS defect analysis.

    Parameters
    ----------
    config_path : str
        Path to ``lls_config.json``.
    daily_input_dir : str
        Directory holding the daily parquet files. File names must follow
        the ``YYYYMMDD.parquet`` format.
    output_dir : str
        Root directory under which every result (CSV, parquet, images) is
        written.
    contact_csv : str, optional
        Path to ``contact_angle.csv``. Contact mapping is disabled when this
        is None or the file does not exist.
    setup_font : bool
        When True, register the Korean font at start-up.

    Attributes
    ----------
    config : dict
        Parsed ``lls_config.json`` tree.
    contact_mapper : ContactMapper | None
        Instance when contact mapping is enabled, otherwise None.

    Examples
    --------
    >>> analyzer = LLSPatternAnalyzer(
    ...     config_path="./lls_config.json",
    ...     daily_input_dir="./daily",
    ...     output_dir="./result_daily",
    ... )
    >>> df_daily = analyzer.run(mode="daily")     # Mode 1
    >>> df_by_cst = analyzer.run(mode="by_cst")   # Mode 2
    """

    # Record keys describing the main centroid, in output-column order.
    _CENTROID_KEYS = (
        "main_centroid_x",
        "main_centroid_y",
        "main_centroid_Angle",
        "main_centroid_Distance",
    )

    # ------------------------------------------------------------------
    # Constructor + initialisation
    # ------------------------------------------------------------------
    def __init__(
        self,
        config_path: str = "./lls_config.json",
        daily_input_dir: str = "./daily",
        output_dir: str = "./result_daily",
        contact_csv: Optional[str] = "./contact_angle.csv",
        setup_font: bool = True,
    ):
        """Load the config, optionally enable contact mapping, create output dirs."""
        if setup_font:
            setup_korean_font()

        self.config_path = config_path
        self.config = load_config(config_path)
        self.daily_input_dir = daily_input_dir
        self.output_dir = output_dir

        # --- Contact mapper (optional) ---
        cm_cfg = self.config.get("contact_mapping", {})
        self.contact_tolerance_mm = cm_cfg.get("tolerance_mm", 30.0)
        self.contact_top_n = cm_cfg.get("top_n", 5)
        self.contact_mapper: Optional[ContactMapper] = None
        if contact_csv and os.path.exists(contact_csv):
            self.contact_mapper = ContactMapper(
                csv_path=contact_csv,
                tolerance_mm=self.contact_tolerance_mm,
            )
            # NOTE(review): the non-ASCII message literals in this class look
            # mis-encoded; they are kept exactly as found — confirm the
            # intended text before changing them.
            print(f"โœ… Contact mapper ํ™œ์„ฑํ™”: {contact_csv} (tolerance={self.contact_tolerance_mm}mm)")

        # --- Preprocessing parameters (lls_config.json::preprocessing) ---
        pp = self.config["preprocessing"]
        self.cell_size_mm = pp["cell_size_mm"]
        self.n1_min_wafers = pp["n1_min_wafers"]
        # Legacy-config compatibility: accept either n2_min_cell_defects or
        # the older n2_min_zone_defects key.
        self.n2_min_cell_defects = pp.get(
            "n2_min_cell_defects", pp.get("n2_min_zone_defects", 3)
        )
        self.inner_radius_mm = pp["inner_radius_mm"]

        # --- Mode 1 significance thresholds (lls_config.json::mode_daily) ---
        md = self.config.get("mode_daily", {})
        self.daily_min_defect_count = md.get("min_defect_count", 30)
        self.daily_min_wafer_count = md.get("min_wafer_count", 3)
        self.daily_excluded_patterns = set(
            md.get("excluded_patterns", list(EXCLUDED_PATTERNS_DEFAULT))
        )

        self._prepare_output_dirs()
        self._backup_config()

    def _prepare_output_dirs(self) -> None:
        """Create every output directory up front."""
        self.by_cst_dir = os.path.join(self.output_dir, "by_cst")
        self.daily_agg_dir = os.path.join(self.output_dir, "daily_agg")
        self.figures_by_cst_dir = os.path.join(self.output_dir, "figures_by_cst")
        self.figures_daily_dir = os.path.join(self.output_dir, "figures_daily")
        self.config_used_dir = os.path.join(self.output_dir, "config_used")
        for d in (
            self.output_dir, self.by_cst_dir, self.daily_agg_dir,
            self.figures_by_cst_dir, self.figures_daily_dir, self.config_used_dir,
        ):
            os.makedirs(d, exist_ok=True)

    def _backup_config(self) -> None:
        """Copy the config in use to a timestamp-named file (for reproducibility)."""
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = os.path.join(self.config_used_dir, f"{ts}_config.json")
        shutil.copy(self.config_path, backup_path)
        print(f"โœ… ์„ค์ • ํŒŒ์ผ ๋ฐฑ์—… ์™„๋ฃŒ: {backup_path}")

    # ------------------------------------------------------------------
    # Shared preprocessing helpers
    # ------------------------------------------------------------------
    def _load_parquet(self, file_path: str) -> Optional[pd.DataFrame]:
        """
        Load a parquet file and normalise ``HIS_REGIST_DTTM_8030``.

        Returns
        -------
        Optional[pd.DataFrame]
            None when the file cannot be read or contains no rows.
        """
        # Best-effort boundary: one unreadable file must not abort the batch.
        try:
            df = pd.read_parquet(file_path)
        except Exception as e:
            print(f"โŒ ํŒŒ์ผ ์ฝ๊ธฐ ์‹คํŒจ: {file_path}, ์˜ค๋ฅ˜: {e}")
            return None
        if df.empty:
            return None
        # Truncate the timestamp to 14 characters (YYYYMMDDHHMMSS) so the
        # grouping key is consistent.
        # NOTE(review): astype(str) also stringifies missing values — confirm
        # the column is never null in the input.
        if "HIS_REGIST_DTTM_8030" in df.columns:
            df["HIS_REGIST_DTTM_8030"] = df["HIS_REGIST_DTTM_8030"].astype(str).str[:14]
        return df

    def _apply_grid_and_n1(self, df: pd.DataFrame) -> pd.DataFrame:
        """Assign the fine grid, then apply the n1 filter (min wafers per cell)."""
        df = assign_fine_grid(df, cell_size_mm=self.cell_size_mm)
        df = filter_by_cell_wafer_count(df, self.n1_min_wafers, cell_size_mm=self.cell_size_mm)
        return df

    def _apply_n2(self, df: pd.DataFrame) -> pd.DataFrame:
        """n2 filter: drop cells holding fewer than ``n2_min_cell_defects`` rows."""
        if df.empty:
            return df
        cell_counts = df["cell_id"].value_counts()
        valid_cells = cell_counts[cell_counts >= self.n2_min_cell_defects].index
        return df[df["cell_id"].isin(valid_cells)].copy()

    @staticmethod
    def _dominant_eqp(df: pd.DataFrame):
        """Return the most frequent non-null ``EQP_ID_8030`` value, else "Unknown".

        "Unknown" is also returned when the column is absent, so both run
        modes tolerate inputs without equipment information.
        """
        if "EQP_ID_8030" not in df.columns:
            return "Unknown"
        eqp_series = df["EQP_ID_8030"].dropna()
        return eqp_series.mode().iloc[0] if not eqp_series.empty else "Unknown"

    def _classify(self, df_group: pd.DataFrame) -> Optional[dict]:
        """
        Classify the defects of one group and derive centroid metrics.

        Returns
        -------
        Optional[dict]
            On success a dict holding result_df / pattern_list / centroid and
            the derived centroid fields. None when no row has both
            ``coor_x`` and ``coor_y``.
        """
        # A boolean mask (rather than .loc[labels]) keeps exactly the rows
        # with both coordinates, even if the index contains repeated labels.
        has_xy = df_group[["coor_x", "coor_y"]].notna().all(axis=1)
        defect_count = int(has_xy.sum())
        if defect_count == 0:
            return None
        df_for_classify = df_group.loc[has_xy].copy()

        result_df, dominant_zone, pattern_list, centroid = classify_wafer_patterns(
            df_for_classify, self.config
        )

        # Explicit check: a NumPy-array centroid has no unambiguous truth
        # value, and an empty/None centroid means "no centroid".
        if centroid is not None and len(centroid) == 2:
            cx, cy = float(centroid[0]), float(centroid[1])
            centroid_x = round(cx, 4)
            centroid_y = round(cy, 4)
            # Polar angle of the centroid in degrees, mapped into [0, 360).
            angle = float((np.arctan2(cy, cx) / np.pi * 180 + 360) % 360)
            distance = round(float(np.sqrt(cx ** 2 + cy ** 2)), 4)
        else:
            centroid_x = centroid_y = angle = distance = None

        return {
            "result_df": result_df,
            "dominant_zone": dominant_zone,
            "pattern_list": pattern_list,
            "centroid": centroid,
            "main_centroid_x": centroid_x,
            "main_centroid_y": centroid_y,
            "main_centroid_Angle": angle,
            "main_centroid_Distance": distance,
            "defect_count": defect_count,
        }

    @staticmethod
    def _pattern_str(pattern_list) -> str:
        """Normalise a pattern list into a comma-joined string.

        List/tuple items are stringified before joining so a non-string label
        cannot raise; any other value is returned as ``str(value)``.
        """
        if isinstance(pattern_list, (list, tuple)):
            return ", ".join(str(p) for p in pattern_list)
        return str(pattern_list)

    def _attach_contact_candidates(self, record: dict) -> dict:
        """
        Add the Curling label and contact-mapping columns to ``record``.

        Columns added (in this order) when a contact mapper is configured:

        - Curling                 : value returned by ``detect_curling``
        - contact_candidate_count : number of matched candidates
        - contact_candidates      : top-N summary string from
          ``summarize_candidates``

        The record is returned unchanged when contact mapping is disabled.

        NOTE(review): the original note says Curling is assigned when the
        centroid lies on the outer edge (r >= 130 mm) in the 2 o'clock (30 deg)
        direction; that rule lives in ContactMapper and is not visible here.
        """
        if self.contact_mapper is None:
            return record
        cx = record.get("main_centroid_x")
        cy = record.get("main_centroid_y")
        # Curling is attached before the contact columns so it precedes them.
        record["Curling"] = self.contact_mapper.detect_curling(cx, cy)
        pat = record.get("overall_pattern", "")
        candidates = self.contact_mapper.map_pattern(pat, centroid_x=cx, centroid_y=cy)
        record["contact_candidate_count"] = int(len(candidates))
        record["contact_candidates"] = self.contact_mapper.summarize_candidates(
            candidates, top_n=self.contact_top_n
        )
        return record

    def _is_significant(
        self, pattern_list, defect_count: int, wafer_count: int
    ) -> bool:
        """
        Decide whether a Mode 1 result counts as a 'significant pattern'.

        All three conditions must hold:
          (a) pattern_list does not consist solely of excluded labels
          (b) defect_count >= daily_min_defect_count
          (c) wafer_count >= daily_min_wafer_count
        """
        # Stringify labels the same way _pattern_str does, so a bare None
        # matches the excluded "None" label instead of slipping through.
        if isinstance(pattern_list, (list, tuple, set, frozenset)):
            labels = [str(p) for p in pattern_list]
        else:
            labels = [str(pattern_list)]
        if all(label in self.daily_excluded_patterns for label in labels):
            return False
        return (
            defect_count >= self.daily_min_defect_count
            and wafer_count >= self.daily_min_wafer_count
        )

    @staticmethod
    def _date_from_filename(file_path: str) -> Optional[str]:
        """Return the ``YYYYMMDD`` stem of ``file_path``, or None if malformed.

        Only the final extension is stripped, so ``20240101.backup.parquet``
        is rejected instead of being mistaken for day ``20240101``. The stem
        must be eight ASCII digits forming a real calendar date.
        """
        stem, _ext = os.path.splitext(os.path.basename(file_path))
        if len(stem) != 8 or not (stem.isascii() and stem.isdigit()):
            return None
        try:
            datetime.strptime(stem, "%Y%m%d")
        except ValueError:
            return None
        return stem

    # ------------------------------------------------------------------
    # Mode 2 : by CST x scan-time
    # ------------------------------------------------------------------
    def run_by_cst(self, df: pd.DataFrame, date_str: str) -> List[dict]:
        """
        Mode 2, single day: classify each CAR_ID x HIS_REGIST_DTTM group.

        Parameters
        ----------
        df : pd.DataFrame
            Defect frame covering one day.
        date_str : str
            'YYYYMMDD' date string (used in output paths).

        Returns
        -------
        List[dict]
            One record per classified group; [] when nothing survives.
        """
        daily_results: List[dict] = []
        # (key, classified frame, record) per group; keeping the record next
        # to its frame avoids searching daily_results for every figure.
        plot_jobs: List[tuple] = []
        figures_dir = os.path.join(self.figures_by_cst_dir, date_str)
        os.makedirs(figures_dir, exist_ok=True)

        for car_id in tqdm(df["CAR_ID"].unique(), desc=f"{date_str} CST", leave=False):
            df_cst = df[df["CAR_ID"] == car_id].copy()
            if df_cst.empty:
                continue
            df_cst = self._apply_grid_and_n1(df_cst)
            if df_cst.empty:
                continue
            df_cst = add_zone_labels(df_cst, inner_radius=self.inner_radius_mm)

            for dttm, df_group in df_cst.groupby("HIS_REGIST_DTTM_8030"):
                df_group = self._apply_n2(df_group)
                if df_group.empty:
                    continue
                cls = self._classify(df_group)
                if cls is None:
                    continue
                rec = {
                    "status": "Success",
                    "mode": "by_cst",
                    "CST_ID": car_id,
                    "HIS_REGIST_DTTM": dttm,
                    "EQP_NM_8030": self._dominant_eqp(df_group),
                    "analysis_date": date_str,
                    "wafer_count": df_group["WAF_ID"].nunique(),
                    "defect_count": cls["defect_count"],
                    "overall_pattern": self._pattern_str(cls["pattern_list"]),
                    "overall_dominant_zone": cls["dominant_zone"],
                    **{k: cls[k] for k in self._CENTROID_KEYS},
                }
                rec = self._attach_contact_candidates(rec)
                daily_results.append(rec)
                plot_jobs.append((f"{car_id}_{dttm}", cls["result_df"], rec))

        if not daily_results:
            return daily_results

        # Per-day CSV first, then one wafer-map image per group.
        pd.DataFrame(daily_results).to_csv(
            os.path.join(self.by_cst_dir, f"{date_str}_LLS_CST_analysis.csv"),
            index=False, encoding="utf-8-sig",
        )
        for key, result_df, meta in tqdm(plot_jobs,
                                         desc=f"{date_str} ์‹œ๊ฐํ™”", leave=False):
            plot_wafer_map(
                result_df=result_df,
                key=key,
                pattern_list=meta["overall_pattern"],
                dominant_zone=meta["overall_dominant_zone"],
                meta=meta,
                show_mode=False,
                save_path=os.path.join(figures_dir, f"{key}.jpg"),
            )
        return daily_results

    # ------------------------------------------------------------------
    # Mode 1 : daily aggregated
    # ------------------------------------------------------------------
    def run_daily(self, df: pd.DataFrame, date_str: str) -> List[dict]:
        """
        Mode 1, single day: merge the whole day's defects and classify once.

        The filtered defects are always written to
        ``filtered_defects/{date}_filtered.parquet`` whether or not the
        classification produced a result. The figure goes to
        ``figures_daily/significant/`` or ``figures_daily/others/`` depending
        on the significance verdict.

        Returns
        -------
        List[dict]
            A one-record list on success; [] when the filters remove every row.
        """
        df_day = self._apply_grid_and_n1(df.copy())
        if df_day.empty:
            print(f"๐ŸŸก {date_str} n1 ํ•„ํ„ฐ ํ†ต๊ณผ ๊ฒฐํ•จ ์—†์Œ โ†’ ์Šคํ‚ต")
            return []
        df_day = add_zone_labels(df_day, inner_radius=self.inner_radius_mm)
        df_day = self._apply_n2(df_day)
        if df_day.empty:
            print(f"๐ŸŸก {date_str} n2 ํ•„ํ„ฐ ํ†ต๊ณผ ๊ฒฐํ•จ ์—†์Œ โ†’ ์Šคํ‚ต")
            return []

        wafer_count = df_day["WAF_ID"].nunique()
        cls = self._classify(df_day)

        # Keep the filtered defects even when classification yields nothing
        # (user requirement).
        if cls is None:
            result_df = df_day.assign(inlier=False)
            pattern_list = ["None"]
            dominant_zone = "N/A"
            defect_count = len(df_day)
            centroid_fields = dict.fromkeys(self._CENTROID_KEYS)
        else:
            result_df = cls["result_df"]
            pattern_list = cls["pattern_list"]
            dominant_zone = cls["dominant_zone"]
            defect_count = cls["defect_count"]
            centroid_fields = {k: cls[k] for k in self._CENTROID_KEYS}

        is_significant = self._is_significant(pattern_list, defect_count, wafer_count)
        key = f"DAILY_{date_str}"
        record = {
            "status": "Success",
            "mode": "daily",
            "is_significant": is_significant,
            "CST_ID": "ALL",
            "HIS_REGIST_DTTM": date_str,
            "EQP_NM_8030": self._dominant_eqp(df_day),
            "analysis_date": date_str,
            "wafer_count": wafer_count,
            "defect_count": defect_count,
            "overall_pattern": self._pattern_str(pattern_list),
            "overall_dominant_zone": dominant_zone,
            **centroid_fields,
        }
        record = self._attach_contact_candidates(record)

        # Per-day CSV.
        pd.DataFrame([record]).to_csv(
            os.path.join(self.daily_agg_dir, f"{date_str}_LLS_daily_analysis.csv"),
            index=False, encoding="utf-8-sig",
        )

        # Always store the merged defect parquet (independent of classification).
        defects_dir = os.path.join(self.daily_agg_dir, "filtered_defects")
        os.makedirs(defects_dir, exist_ok=True)
        result_df.to_parquet(
            os.path.join(defects_dir, f"{date_str}_filtered.parquet"),
            index=False,
        )

        # Figure: separate folders for significant / non-significant days.
        sub_dir = "significant" if is_significant else "others"
        save_dir = os.path.join(self.figures_daily_dir, sub_dir)
        os.makedirs(save_dir, exist_ok=True)
        plot_wafer_map(
            result_df=result_df,
            key=key,
            pattern_list=record["overall_pattern"],
            dominant_zone=record["overall_dominant_zone"],
            meta=record,
            show_mode=False,
            save_path=os.path.join(save_dir, f"{key}.jpg"),
        )
        return [record]

    # ------------------------------------------------------------------
    # Dispatcher / entry point
    # ------------------------------------------------------------------
    def run(self, mode: "Mode" = "by_cst") -> pd.DataFrame:
        """
        Process every daily file in the selected mode.

        Parameters
        ----------
        mode : {"by_cst", "daily"}
            "by_cst": per CST x scan time (fine-grained)
            "daily" : per merged day (trend)

        Returns
        -------
        pd.DataFrame
            All per-day records combined (also saved as CSV under
            ``output_dir``). An empty frame when there are no results.

        Raises
        ------
        ValueError
            When ``mode`` is not an allowed value.
        FileNotFoundError
            When ``daily_input_dir`` contains no parquet file.
        """
        if mode not in ("by_cst", "daily"):
            raise ValueError(f"mode๋Š” 'by_cst' ๋˜๋Š” 'daily' ์—ฌ์•ผ ํ•ฉ๋‹ˆ๋‹ค. got={mode}")

        parquet_files = sorted(glob.glob(os.path.join(self.daily_input_dir, "*.parquet")))
        if not parquet_files:
            raise FileNotFoundError(
                f"โŒ {self.daily_input_dir} ํด๋”์— parquet ํŒŒ์ผ์ด ์—†์Šต๋‹ˆ๋‹ค."
            )
        print(f"โœ… ์ด {len(parquet_files)}๊ฐœ์˜ ์ผ์ž๋ณ„ ํŒŒ์ผ ๋ฐœ๊ฒฌ (mode={mode})")

        process_day = self.run_by_cst if mode == "by_cst" else self.run_daily
        all_results: List[dict] = []
        for file_path in tqdm(parquet_files, desc=f"๐Ÿ“… ์ผ์ž๋ณ„ ์ฒ˜๋ฆฌ ({mode})"):
            # Skip files whose name is not a valid YYYYMMDD date.
            date_str = self._date_from_filename(file_path)
            if date_str is None:
                print(f"๐ŸŸก ๊ฑด๋„ˆ๋œ€ (ํŒŒ์ผ๋ช… ํ˜•์‹ ์˜ค๋ฅ˜): {file_path}")
                continue
            df = self._load_parquet(file_path)
            if df is None:
                print(f"๐ŸŸก ๋ฐ์ดํ„ฐ ์—†์Œ: {file_path}")
                continue
            all_results.extend(process_day(df, date_str))

        if not all_results:
            print("โŒ ๋ถ„์„๋œ ๊ฒฐ๊ณผ๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.")
            return pd.DataFrame()

        final_df = pd.DataFrame(all_results)
        suffix = "by_cst" if mode == "by_cst" else "daily_agg"
        final_path = os.path.join(self.output_dir, f"LLS_{suffix}_full_analysis.csv")
        final_df.to_csv(final_path, index=False, encoding="utf-8-sig")
        print(f"โœ… ์ „์ฒด ๋ถ„์„ ์™„๋ฃŒ: {len(all_results)}๊ฑด โ†’ {final_path}")
        return final_df
# ----------------------------------------------------------------------
# CLI entry point: `python pattern_analyzer.py [by_cst|daily]`
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # Add the current working directory to the module search path.
    sys.path.append(os.getcwd())
    # The first positional argument selects the mode; fall back to "by_cst".
    mode: Mode = next(iter(sys.argv[1:]), "by_cst")
    analyzer = LLSPatternAnalyzer()
    analyzer.run(mode=mode)