hetest / classifier.py
jkkim
PII ๋ถ„์„ ์˜คํƒ ๋Œ€์ฑ… 3์ข… โ€” NRP/DATE_TIME ๊ฐ€์ค‘์น˜ 0 + ์ •๊ทœ์‹ ๊ฐ•ํ™” + ํ›„์ฒ˜๋ฆฌ ํ•„ํ„ฐ
64f5afe
"""๋ฌธ์„œ ๋ณด์•ˆ๋“ฑ๊ธ‰ ๋ถ„๋ฅ˜๊ธฐ (Rule-based / Zero-shot)
====================================================
SPEC_ํŒŒ์ผ๋ถ„๋ฅ˜_PoC.md ยง2.2 / ยง2.3 / ยง10 ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•œ **๊ฐ„๋‹จํ•œ** 1์ฐจ ๋ถ„๋ฅ˜ ๋ชจ๋ธ.
์„ค๊ณ„ ์˜๋„
---------
- SPEC ์˜ ์ตœ์ข… ๋ชฉํ‘œ๋Š” KoELECTRA + mDeBERTa ์•™์ƒ๋ธ” + ์‚ฌ์šฉ์ž ํ”ผ๋“œ๋ฐฑ ๋ฃจํ”„์ง€๋งŒ,
๊ทธ ๋ชจ๋ธ์€ **์‚ฌ์šฉ์ž ๋ ˆ์ด๋ธ”์ด ๋ˆ„์ ๋˜์–ด์•ผ** ํ•™์Šต์ด ๊ฐ€๋Šฅํ•˜๋‹ค (์ฝœ๋“œ์Šคํƒ€ํŠธ ๋ฌธ์ œ).
- ๋ณธ ๋ชจ๋“ˆ์€ **๋ถ€ํŠธ์ŠคํŠธ๋žฉ 1๋‹จ๊ณ„ โ€” zero-shot ์ž๋™ ๋ ˆ์ด๋ธ” ์ƒ์„ฑ๊ธฐ**.
(SPEC ยง2.3 โ‘  "๋ฃฐ + ํ‚ค์›Œ๋“œ ๋งค์นญ๋งŒ์œผ๋กœ ์ž๋™ ๋ ˆ์ด๋ธ”")
- ์ ์ˆ˜(score) ๊ธฐ๋ฐ˜ ๊ฒฐ์ • โ†’ ์ž„๊ณ„๊ฐ’(threshold) ์œผ๋กœ C / S / O ๋ถ„๋ฅ˜.
- "์™œ ๊ทธ ๋“ฑ๊ธ‰์ธ๊ฐ€" ๋ฅผ ํ•จ๊ป˜ ๋Œ๋ ค์ค€๋‹ค (SPEC ๊ธฐ๋Šฅ 4 โ€” XAI).
C / S / O ์ •์˜ (SPEC ยง2.2)
--------------------------
- **C** Critical / ์œ„ํ—˜ : ์ง์ ‘์‹๋ณ„์ž(์ฃผ๋ฏผ๋ฒˆํ˜ธยท์—ฌ๊ถŒยท์นด๋“œ ๋“ฑ) ๋˜๋Š” ๊ฐ•ํ•œ ๋“ฑ๊ธ‰ ๋งˆ์ปค
- **S** Sensitive / ๋ฏผ๊ฐ : API ํ‚ค, ์‚ฌ์—…์ž๋ฒˆํ˜ธ, ๋‚ด๋ถ€ ํ”„๋กœ์ ํŠธ๋ช…, VIP ๋“ฑ
- **O** Open / ๊ณต๊ฐœ : ๊ทธ ์™ธ
์ˆ˜์‹
----
score = ฮฃ (w_e ยท count_e) + ฮฃ (w_k ยท min(count_k, 3))
(entity) (grade keyword)
grade = C if score >= C_THRESHOLD
S if score >= S_THRESHOLD
O otherwise
์‹ ๋ขฐ๋„(confidence) = 0.55 + 0.4 ยท tanh(margin / 2)
margin = ์ž„๊ณ„๊ฐ’์—์„œ ๋–จ์–ด์ง„ ๊ฑฐ๋ฆฌ (๋ฐด๋“œ ๋‚ด๋ถ€ ํญ์€ ์–‘์ชฝ ๊ฒฝ๊ณ„๊นŒ์ง€์˜ ์ตœ์†Œ๊ฑฐ๋ฆฌ)
"""
from __future__ import annotations
import math
import re
from typing import Iterable
# ๋“ฑ๊ธ‰ โ†’ ์ •์ˆ˜ (SPEC ยง10.1, gap ๊ณ„์‚ฐ์šฉ)
GRADE_RANK = {"O": 0, "S": 1, "C": 2}
# entity_type ๋ณ„ ๋‹จ์ผ ๋งค์น˜๋‹น ๊ฐ€์‚ฐ์ 
ENTITY_WEIGHTS: dict[str, float] = {
# --- Critical ํ›„๋ณด (์ง์ ‘์‹๋ณ„์ž) ---
"KR_RRN": 5.0, # ํ•œ๊ตญ ์ฃผ๋ฏผ๋“ฑ๋ก๋ฒˆํ˜ธ
"KR_PASSPORT": 4.5,
"CREDIT_CARD": 4.5,
"US_SSN": 4.5,
"IBAN_CODE": 3.0,
# --- Sensitive ํ›„๋ณด (๊ณ„์ •ยท๋น„์ฆˆยท๋‚ด๋ถ€) ---
"AWS_ACCESS_KEY": 3.5,
"GENERIC_API_KEY": 1.0, # ์˜คํƒ ์žฆ์Œ (32์ž ์ž„์˜ ํ† ํฐ) โ†’ ๋‚ฎ์ถค
"KR_BIZ_NO": 2.5,
"VIP_NAMES": 2.0,
"INTERNAL_PROJECTS": 2.0,
"KR_ADDRESS": 1.5,
# --- ์•ฝํ•œ ์‹๋ณ„์ž ---
"IP_ADDRESS": 0.4,
"KR_PHONE": 0.5,
"PHONE_NUMBER": 0.5,
"EMAIL_ADDRESS": 0.4,
"PERSON": 0.3,
"LOCATION": 0.15, # ์ผ๋ฐ˜ ์ง€๋ช… ํ”ํ•จ โ€” ๋‚ฎ์ถค
"ORGANIZATION": 0.15, # ์ผ๋ฐ˜ ํšŒ์‚ฌ๋ช… ํ”ํ•จ โ€” ๋‚ฎ์ถค
"URL": 0.0, # ์ผ๋ฐ˜ URL โ€” PII ์•„๋‹˜
"DATE_TIME": 0.0, # ์ผ๋ฐ˜ ๋‚ ์งœ / ๋ฒ„์ „ ๋ฒˆํ˜ธ (1.9.16 ๋“ฑ) โ€” PII ์•„๋‹˜
# --- ๋น„-PII (NER ๋ชจ๋ธ noise) โ€” fallback DEFAULT ๊ฐ€ 0.3 ์œผ๋กœ ์žกํ˜€ ํ•™์Šต dominate ๋ฐฉ์ง€ ---
"NRP": 0.0, # NORP = Nationalities/Religious/Political (spaCy ์˜์–ด NER)
"NORP": 0.0, # spaCy ์›๋ณธ ๋ผ๋ฒจ (Presidio ๊ฐ€ NRP ๋กœ ์ถ•์•ฝ ํ‘œ๊ธฐ)
# --- ํ•œ๊ตญ ํ™˜๊ฒฝ ๋ฌด๊ด€ ์™ธ๊ตญ PII (Presidio default ์ •๊ทœ์‹, ํ•œ๊ตญ์–ด ํ…์ŠคํŠธ์—” ์˜คํƒ) ---
"US_DRIVER_LICENSE": 0.0, # s3, y3, X0 ๊ฐ™์€ 2๊ธ€์ž ์˜คํƒ ๋‹ค์ˆ˜
"US_ITIN": 0.0,
"US_PASSPORT": 0.0,
"IN_PAN": 0.0,
}
DEFAULT_ENTITY_WEIGHT = 0.05 # ๋ฏธ๋ช…์‹œ entity ๊ฐ€ ํ•™์Šต์„ dominate ํ•˜์ง€ ์•Š๋„๋ก (๊ธฐ์กด 0.3 โ†’ 0.05)
# ๋ณธ๋ฌธ ๋“ฑ๊ธ‰ ํ‚ค์›Œ๋“œ โ€” ๋ช…์‹œ์  ๋ผ๋ฒจ์ด ๋ฐ•ํ˜€์žˆ์„ ๋•Œ ๊ฐ•ํ•˜๊ฒŒ ๋ฐ˜์˜
# (kw_lower, weight, display_label)
GRADE_KEYWORDS: list[tuple[str, float, str]] = [
("๊ทน๋น„", 4.0, "๊ทน๋น„"),
("top secret", 4.0, "Top Secret"),
("๋Œ€์™ธ๋น„", 3.0, "๋Œ€์™ธ๋น„"),
("๊ธฐ๋ฐ€", 3.0, "๊ธฐ๋ฐ€"),
("confidential", 3.0, "Confidential"),
("secret", 2.5, "Secret"),
("๋‚ด๋ถ€์šฉ", 1.5, "๋‚ด๋ถ€์šฉ"),
("์‚ฌ๋‚ดํ•œ์ •", 1.5, "์‚ฌ๋‚ดํ•œ์ •"),
("internal use", 1.5, "Internal Use"),
("restricted", 1.5, "Restricted"),
("private", 1.0, "Private"),
("๊ฐœ์ธ์ •๋ณด", 1.0, "๊ฐœ์ธ์ •๋ณด ๋ผ๋ฒจ"),
]
# ๋™์ผ ํ‚ค์›Œ๋“œ ๋ˆ„์  ์ƒํ•œ โ€” "๋น„๋ฐ€ ๋น„๋ฐ€ ๋น„๋ฐ€..." ์–ด๋ทฐ์ง• ๋ฐฉ์ง€
KW_COUNT_CAP = 3
# ์ž„๊ณ„๊ฐ’ โ€” ์†ํŠœ๋‹๊ฐ’. ๋ฐ์ดํ„ฐ ๋ˆ„์  ํ›„ SPEC ยง10.4 ์˜ Platt scaling ์œผ๋กœ ๋ณด์ • ์˜ˆ์ •
C_THRESHOLD = 5.0
S_THRESHOLD = 2.0
CLASSIFIER_VERSION = "rule-v1" # ๋ชจ๋“ˆ ์ถœํ•˜ ์‹œ ๋ฒ ์ด์Šค ๋ฒ„์ „ (๋ถˆ๋ณ€)
_active_version = CLASSIFIER_VERSION # ํ•™์Šต/ํ•ซ์Šค์™‘ ํ›„ ๊ฐฑ์‹ ๋˜๋Š” ํ™œ์„ฑ ๋ฒ„์ „
def active_version() -> str:
"""ํ˜„์žฌ ํ™œ์„ฑ ๋ชจ๋ธ ๋ฒ„์ „. /api/analyze ์‘๋‹ต ๋“ฑ์—์„œ ์‚ฌ์šฉ."""
return _active_version
def set_active_version(v: str) -> None:
"""train.apply_new_weights() ๊ฐ€ ํ•ซ์Šค์™‘ ํ›„ ํ˜ธ์ถœ โ€” ๋ฒ„์ „ ๋ผ๋ฒจ ์—…๋ฐ์ดํŠธ."""
global _active_version
_active_version = v
# ---------------------------------------------------------------------------
# extra grade keywords (์‚ฌ์šฉ์ž ์ถ”๊ฐ€) โ€” ๋ถ€ํŒ… ์‹œ GRADE_KEYWORDS ์— ๋ณ‘ํ•ฉ / ์˜์†ํ™”
# ---------------------------------------------------------------------------
import json as _json
from pathlib import Path as _Path
_EXTRA_KW_FILE = _Path(__file__).resolve().parent / "extra_grade_keywords.json"
def _load_extra_kw() -> list[dict]:
if not _EXTRA_KW_FILE.exists():
return []
try:
return _json.loads(_EXTRA_KW_FILE.read_text(encoding="utf-8"))
except Exception:
return []
def _save_extra_kw(items: list[dict]) -> None:
_EXTRA_KW_FILE.write_text(
_json.dumps(items, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def _is_builtin_kw(label: str) -> bool:
"""์ดˆ๊ธฐ GRADE_KEYWORDS ์— ๋“ฑ๋ก๋œ ํ‚ค์›Œ๋“œ์ธ์ง€ โ€” ์‚ฌ์šฉ์ž ์‚ญ์ œ ๋ถˆ๊ฐ€."""
return label in {l for (_kw, _w, l) in _BUILTIN_GRADE_KEYWORDS}
# ๋นŒํŠธ์ธ ์Šค๋ƒ…์ƒท (load_extras ๊ฐ€ ์—ฌ๋Ÿฌ ๋ฒˆ ํ˜ธ์ถœ๋ผ๋„ ์ค‘๋ณต ์ถ”๊ฐ€ ์•ˆ ๋˜๋„๋ก)
_BUILTIN_GRADE_KEYWORDS: list[tuple[str, float, str]] = list(GRADE_KEYWORDS)
def reload_extra_grade_keywords() -> int:
"""ํŒŒ์ผ์—์„œ ๋‹ค์‹œ ์ฝ์–ด GRADE_KEYWORDS ๋ฅผ ๋นŒํŠธ์ธ + ์‚ฌ์šฉ์ž ์ •์˜๋กœ ์žฌ๊ตฌ์„ฑ. ๋ฐ˜ํ™˜: ์ถ”๊ฐ€๋œ ํ•ญ๋ชฉ ์ˆ˜."""
extras = _load_extra_kw()
GRADE_KEYWORDS.clear()
GRADE_KEYWORDS.extend(_BUILTIN_GRADE_KEYWORDS)
for x in extras:
kw = (x.get("keyword") or "").lower()
w = float(x.get("weight", 1.0))
lbl = x.get("label") or x.get("keyword") or ""
if kw:
GRADE_KEYWORDS.append((kw, w, lbl))
return len(extras)
def add_extra_grade_keyword(keyword: str, weight: float, label: str) -> dict:
"""์‚ฌ์šฉ์ž ์ •์˜ ๋“ฑ๊ธ‰ ํ‚ค์›Œ๋“œ 1๊ฐœ ์ถ”๊ฐ€ (์ด๋ฏธ ์žˆ์œผ๋ฉด ๊ฐ€์ค‘์น˜/๋ผ๋ฒจ ๊ฐฑ์‹ ).
Returns: {keyword, weight, label, action: 'added'|'updated'}
"""
keyword = (keyword or "").strip().lower()
if not keyword:
raise ValueError("keyword is empty")
label = (label or keyword).strip()
items = _load_extra_kw()
action = "added"
for it in items:
if it.get("keyword", "").lower() == keyword:
it["weight"] = float(weight)
it["label"] = label
action = "updated"
break
else:
items.append({"keyword": keyword, "weight": float(weight), "label": label})
_save_extra_kw(items)
reload_extra_grade_keywords()
return {"keyword": keyword, "weight": float(weight), "label": label, "action": action}
def remove_extra_grade_keyword(keyword: str) -> dict:
"""์‚ฌ์šฉ์ž ์ถ”๊ฐ€ ํ‚ค์›Œ๋“œ ์‚ญ์ œ. ๋นŒํŠธ์ธ์€ ์‚ญ์ œ ๋ถˆ๊ฐ€."""
keyword = (keyword or "").strip().lower()
items = _load_extra_kw()
before = len(items)
items = [it for it in items if it.get("keyword", "").lower() != keyword]
if len(items) == before:
return {"keyword": keyword, "removed": False, "reason": "not in extras (built-in or unknown)"}
_save_extra_kw(items)
reload_extra_grade_keywords()
return {"keyword": keyword, "removed": True}
def list_extra_grade_keywords() -> list[dict]:
return _load_extra_kw()
# ๋ชจ๋“ˆ ๋กœ๋“œ ์‹œ ์ž๋™ ๋ณ‘ํ•ฉ
reload_extra_grade_keywords()
def _grade_for_score(score: float) -> tuple[str, str, float]:
"""score โ†’ (grade, label, margin). margin ์€ ๊ฐ€์žฅ ๊ฐ€๊นŒ์šด ์ž„๊ณ„๊ฐ’๊นŒ์ง€์˜ ๊ฑฐ๋ฆฌ."""
if score >= C_THRESHOLD:
return "C", "์œ„ํ—˜ (Critical)", score - C_THRESHOLD
if score >= S_THRESHOLD:
return "S", "๋ฏผ๊ฐ (Sensitive)", min(score - S_THRESHOLD, C_THRESHOLD - score)
return "O", "๊ณต๊ฐœ (Open)", S_THRESHOLD - score
def _confidence(margin: float) -> float:
"""margin โ†’ 0.55..0.95 ๋ฒ”์œ„์˜ ์‹ ๋ขฐ๋„. ์ž„๊ณ„๊ฐ’์— ๊ฐ€๊นŒ์šธ์ˆ˜๋ก 0.55."""
if margin < 0:
margin = 0.0
return round(0.55 + 0.4 * math.tanh(margin / 2.0), 3)
def _scan_keywords(text: str) -> list[dict]:
"""๋ณธ๋ฌธ์—์„œ ๋“ฑ๊ธ‰ ํ‚ค์›Œ๋“œ ๋งค์นญ โ†’ ๊ธฐ์—ฌ๋„ dict ๋ฆฌ์ŠคํŠธ."""
if not text:
return []
lower = text.lower()
out: list[dict] = []
for kw, w, label in GRADE_KEYWORDS:
cnt = lower.count(kw)
if cnt <= 0:
continue
capped = min(cnt, KW_COUNT_CAP)
out.append({
"kind": "keyword",
"label": label,
"weight": round(w, 2),
"count": cnt,
"counted": capped,
"contribution": round(w * capped, 2),
})
return out
def _scan_entities(findings: Iterable[dict]) -> list[dict]:
"""findings โ†’ entity_type ๋ณ„ ๊ธฐ์—ฌ๋„ dict ๋ฆฌ์ŠคํŠธ."""
counts: dict[str, int] = {}
for f in findings or []:
et = f.get("entity_type")
if not et:
continue
counts[et] = counts.get(et, 0) + 1
out: list[dict] = []
for et, cnt in counts.items():
w = ENTITY_WEIGHTS.get(et, DEFAULT_ENTITY_WEIGHT)
out.append({
"kind": "entity",
"label": et,
"weight": round(w, 2),
"count": cnt,
"contribution": round(w * cnt, 2),
})
return out
def classify(findings: Iterable[dict] | None, text: str | None) -> dict:
"""findings + ๋ณธ๋ฌธ โ†’ ๋ถ„๋ฅ˜ ๊ฒฐ๊ณผ.
Args:
findings: /api/analyze ๊ฐ€ ๋งŒ๋“  findings ๋ฆฌ์ŠคํŠธ (entity_type ํ‚ค ํ•„์ˆ˜).
text: ๋ณธ๋ฌธ โ€” ๋“ฑ๊ธ‰ ํ‚ค์›Œ๋“œ ์Šค์บ”์— ์‚ฌ์šฉ. None/"" ์ด๋ฉด ํ‚ค์›Œ๋“œ ์Šค์บ” ์ƒ๋žต.
Returns: {
grade: "C" | "S" | "O"
grade_label: "์œ„ํ—˜ (Critical)" ๋“ฑ
score: ๋ˆ„์  ์ ์ˆ˜
confidence: 0.55..0.95
thresholds: {"C": 5.0, "S": 2.0}
reasons: ๊ธฐ์—ฌ ํฐ ์ˆœ ์ •๋ ฌ๋œ entity/keyword ํ•ญ๋ชฉ (์ตœ๋Œ€ 20)
version: "rule-v1"
text_chars: ์Šค์บ” ๋Œ€์ƒ ๋ณธ๋ฌธ ๊ธธ์ด
}
"""
entity_reasons = _scan_entities(findings or [])
keyword_reasons = _scan_keywords(text or "")
reasons = entity_reasons + keyword_reasons
reasons.sort(key=lambda r: -r["contribution"])
score = sum(r["contribution"] for r in reasons)
grade, grade_label, margin = _grade_for_score(score)
return {
"grade": grade,
"grade_label": grade_label,
"score": round(score, 2),
"confidence": _confidence(margin),
"thresholds": {"C": C_THRESHOLD, "S": S_THRESHOLD},
"reasons": reasons[:20],
"version": CLASSIFIER_VERSION,
"text_chars": len(text or ""),
}
def gap(ai_grade: str, user_grade: str) -> int:
"""SPEC ยง10.1 โ€” |aiRank - userRank|. 0=์ผ์น˜, 1=์ธ์ ‘ ๋ถˆ์ผ์น˜, 2=๋ฐ˜๋Œ€."""
return abs(GRADE_RANK.get(ai_grade, 0) - GRADE_RANK.get(user_grade, 0))
def sample_weight(g: int) -> float:
"""SPEC ยง10.2 โ€” ํ•™์Šต ์‹œ ๊ฐ€์ค‘์น˜. 1 + 1.5 * gap."""
return 1.0 + 1.5 * g