import torch import torch.nn as nn import torch.nn.functional as F from transformers import BertTokenizer, BertModel import pickle import re import os import sys import numpy as np from collections import defaultdict # ============================================================================= # 1. 모델 클래스 정의 # ============================================================================= # (1) 규칙 기반 스코어러 클래스 class RuleBasedScorer: def __init__(self): # 패턴별 단어 사전 self.patterns = { 11: defaultdict(float), 12: defaultdict(float), 13: defaultdict(float), 14: defaultdict(float) } self.pattern_names = { 11: '의문 유발형(부호)', 12: '의문 유발형(은닉)', 13: '선정표현 사용형', 14: '속어/줄임말 사용형' } # 부호 패턴 (단순 물음표 제외, 과장된 부호만) self.symbol_patterns = { 'repeated': re.compile(r'([!?…~])\1+'), # 반복 부호 (??, !!) 'ellipsis': re.compile(r'\.\.\.|…') # 말줄임표 } def get_score(self, title): # 1. 텍스트 토큰화 (단순 띄어쓰기 및 문자 추출) words = re.findall(r'[가-힣A-Za-z0-9]+', str(title)) scores = {} # 2. 부호 점수 계산 rep = len(self.symbol_patterns['repeated'].findall(title)) ell = len(self.symbol_patterns['ellipsis'].findall(title)) symbol_score = (rep * 30) + (ell * 10) # 3. 패턴별(11~14) 점수 계산 for p in [11, 12, 13, 14]: word_score = 0 # 단어 매칭 점수 (사전에 있는 단어인지 확인) if p in self.patterns: # 안전장치 for word in words: if word in self.patterns[p]: # 가중치 적용 (로그 스케일) word_score += np.log1p(self.patterns[p][word]) * 2 total = 0 # 패턴별 점수 합산 로직 if p == 11: # 의문부호형 total = symbol_score # 오직 부호만 봄 elif p == 12: # 의문은닉형 ("...이유는") total = word_score + (symbol_score * 0.5) else: # 13(선정), 14(속어) total = word_score # 오직 단어만 봄 scores[p] = total # 4. 최종 점수 산출 (가장 높은 점수 선택) if not scores: return {'score': 0, 'pattern': 0, 'pattern_name': '정상'} max_pattern = max(scores, key=scores.get) max_score = min(100, scores[max_pattern]) # 100점 만점 return { 'score': max_score, 'pattern': max_pattern, 'pattern_name': self.pattern_names.get(max_pattern, '알 수 없음') } # 🚨 Pickle 로딩 에러 방지용 import __main__ setattr(__main__, "RuleBasedScorer", RuleBasedScorer) # (2) KoBERT 모델 클래스 class FishingClassifier(nn.Module): def __init__(self, bert, num_classes=2): super().__init__() self.bert = bert self.dropout = nn.Dropout(0.3) self.fc = nn.Linear(768, num_classes) def forward(self, input_ids, mask): _, pooled = self.bert(input_ids=input_ids, attention_mask=mask, return_dict=False) return self.fc(self.dropout(pooled)) # ============================================================================= # 2. 모델 로드 # ============================================================================= print("[AggroModel] 시스템 로딩 시작...") from kobert_transformers import get_tokenizer aggro_model = None tokenizer = None rule_scorer = None device = torch.device("cpu") BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # A. 규칙 모델 로드 try: with open(os.path.join(BASE_DIR, "rule_based_scorer.pkl"), "rb") as f: rule_scorer = pickle.load(f) print("✅ [Aggro] 규칙 모델 로드 성공") except: print("⚠️ [Aggro] 규칙 모델 없음, 빈 객체 생성") rule_scorer = RuleBasedScorer() # B. KoBERT 모델 로드 try: print("🔄 KoBERT 모델 로딩 중...") # 토크나이저 로드 # 이 함수가 알아서 사전 파일(.spm)을 다운로드하고 연결해줍니다. tokenizer = get_tokenizer() # 모델 로드 (monologg 구조 유지) bert_base = BertModel.from_pretrained('monologg/kobert') aggro_model = FishingClassifier(bert_base).to(device) # 가중치 파일 로드 pth_path = os.path.join(BASE_DIR, "bert_fishing_model_best.pth") pt_path = os.path.join(BASE_DIR, "kobert_aggro_score.pt") final_path = pth_path if os.path.exists(pth_path) else pt_path if os.path.exists(final_path): checkpoint = torch.load(final_path, map_location=device) # state_dict 추출 if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint: loaded_state_dict = checkpoint['model_state_dict'] elif isinstance(checkpoint, dict): loaded_state_dict = checkpoint else: loaded_state_dict = checkpoint.state_dict() new_state_dict = {} for k, v in loaded_state_dict.items(): name = k # 1. module. 접두어 제거 if name.startswith('module.'): name = name[7:] # 2. classifier -> fc 로 이름 변경 (여기서 매칭됨!) if 'classifier' in name: new_name = name.replace('classifier', 'fc') print(f"🔧 [Fix] 이름 변경 적용: {name} -> {new_name}") name = new_name new_state_dict[name] = v # 3. 로드 실행 (결과 확인) missing_keys, unexpected_keys = aggro_model.load_state_dict(new_state_dict, strict=False) # [중요] fc.weight가 누락(missing)되었는지 확인 if any("fc.weight" in key for key in missing_keys): print("🚨 [CRITICAL] fc 레이어가 여전히 로드되지 않았습니다! (점수 고정 원인)") print(f"Missing Keys: {missing_keys}") else: print("✅ [Success] fc 레이어(분류기)가 정상적으로 로드되었습니다!") # if os.path.exists(final_path): # state = torch.load(final_path, map_location=device) # if isinstance(state, dict) and 'model_state_dict' in state: # aggro_model.load_state_dict(state['model_state_dict'], strict=False) # else: # aggro_model.load_state_dict(state, strict=False) aggro_model.eval() print(f"✅ [Aggro] KoBERT 모델 로드 완료: {os.path.basename(final_path)}") else: print("⚠️ [Aggro] 가중치 파일(.pth/.pt)을 찾을 수 없습니다!") aggro_model = None except Exception as e: print(f"🚨 [Aggro] 모델 로딩 중 에러 발생: {e}") aggro_model = None # ============================================================================= # 3. 메인 함수 # ============================================================================= def get_aggro_score(title: str) -> dict: # 1. 규칙 점수 계산 rule_score = 0.0 rule_pattern = "분석 불가" try: res = rule_scorer.get_score(title) rule_score = res['score'] rule_pattern = res.get('pattern_name', '알 수 없음') except: pass # 2. KoBERT 점수 계산 bert_score = 0.0 if aggro_model and tokenizer: try: inputs = tokenizer( title, return_tensors='pt', padding="max_length", truncation=True, max_length=64 ) # 🕵️‍♂️ [핵심 디버그] 토큰이 제대로 만들어졌는지 로그에 출력! # 정상이라면: [2, 4532, 1234, 3, ...] 처럼 다양한 숫자가 나와야 함 # 에러라면: [2, 0, 0, 0, 3, ...] 처럼 0이 가득하거나 [2, 3] 처럼 비어있음 input_ids = inputs['input_ids'].to(device) mask = inputs['attention_mask'].to(device) oken_type_ids = inputs['token_type_ids'].to(device) print(f"\n👉 [토큰 확인] 입력: '{title}'") print(f"👉 [토큰 ID]: {input_ids[:15]} ...") # 앞부분 15개만 출력 with torch.no_grad(): outputs = aggro_model(input_ids, mask) probs = F.softmax(outputs / 2.0, dim=1) bert_score = probs[0][1].item() * 100 except Exception as e: print(f"🚨 [BERT Error] {e}") bert_score = 50.0 # Safety Net if rule_score < 5: bert_score *= 0.3 elif rule_score < 20: bert_score *= 0.8 #3. 합산 w_rule = 0.0 w_bert = 1.0 final_score = (rule_score * w_rule) + (bert_score * w_bert) # 4. 결과 normalized_score = min(final_score / 100.0, 1.0) # 5. 등급 판정 if final_score >= 80: reason = f"매우 높음 🔴" recommendation = "전면 수정 권장" elif final_score >= 60: reason = f"높음 🟠" recommendation = "과장된 표현 수정 필요" elif final_score >= 40: reason = f"보통 🟡" recommendation = "일부 표현 중립화 권장" else: reason = f"낮음 🟢" recommendation = "적절한 제목입니다" return { "score": round(normalized_score, 4), "reason": reason, "recommendation": recommendation }